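Per the Kafka REST Proxy documentation: "Because consumers are stateful, any consumer instances created with the REST API are tied to a specific REST proxy instance. A full URL is provided when the instance is created and it should be used to construct any subsequent requests. Failing to use the returned URL for future consumer requests will result in 404 errors because the consumer instance will not be found." Accordingly, the subscription, records, and delete requests below now use the `base_uri` returned when the consumer instance is created (`consumerDetails$base_uri`) instead of rebuilding the URL from `kafka_rest_proxy`, `consumer`, and `consumer_instance`.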
consumer_instance,
'", "format": "json", "auto.offset.reset": "earliest"}')
)
consumer_instance,
'", "format": "json", "auto.offset.reset": "earliest"}')
)
-fromJSON(content(response, "text"))
+consumerDetails <- fromJSON(content(response, "text"))
-response <- POST(url=paste(kafka_rest_proxy,
- "consumers",
- consumer,
- "instances",
- consumer_instance,
+response <- POST(url=paste(consumerDetails$base_uri,
"subscription", sep="/"),
content_type("application/vnd.kafka.v2+json"),
body = paste0('{"topics":["',
"subscription", sep="/"),
content_type("application/vnd.kafka.v2+json"),
body = paste0('{"topics":["',
response
# Obtain all (or latest) messages on the topic
response
# Obtain all (or latest) messages on the topic
-messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"),
+messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"),
accept("application/vnd.kafka.json.v2+json"),
encode="json")
accept("application/vnd.kafka.json.v2+json"),
encode="json")
Sys.sleep(120)
# Obtain latest messages
Sys.sleep(120)
# Obtain latest messages
-messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"),
+messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"),
accept("application/vnd.kafka.json.v2+json"),
encode="json")
accept("application/vnd.kafka.json.v2+json"),
encode="json")
createPlot(apachelog)
# Remove the consumer
createPlot(apachelog)
# Remove the consumer
-DELETE(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance, sep = "/"),
+DELETE(url = consumerDetails$base_uri,
content_type("application/vnd.kafka.v2+json"))
content_type("application/vnd.kafka.v2+json"))