From ad70a446e1f9c1d371008dc32f7374a7e3a54907 Mon Sep 17 00:00:00 2001 From: Frederik Vanrenterghem Date: Thu, 13 Sep 2018 21:00:11 +0800 Subject: [PATCH] Use returned base_uri. Per doco: Because consumers are stateful, any consumer instances created with the REST API are tied to a specific REST proxy instance. A full URL is provided when the instance is created and it should be used to construct any subsequent requests. Failing to use the returned URL for future consumer requests will result in 404 errors because the consumer instance will not be found. --- kafkaConsumer.R | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/kafkaConsumer.R b/kafkaConsumer.R index 4a77beb..e7da6ed 100644 --- a/kafkaConsumer.R +++ b/kafkaConsumer.R @@ -22,14 +22,10 @@ response <- POST(url=paste(kafka_rest_proxy, "consumers", consumer, sep="/"), consumer_instance, '", "format": "json", "auto.offset.reset": "earliest"}') ) -fromJSON(content(response, "text")) +consumerDetails <- fromJSON(content(response, "text")) # Subscribe it to topic -response <- POST(url=paste(kafka_rest_proxy, - "consumers", - consumer, - "instances", - consumer_instance, +response <- POST(url=paste(consumerDetails$base_uri, "subscription", sep="/"), content_type("application/vnd.kafka.v2+json"), body = paste0('{"topics":["', @@ -39,7 +35,7 @@ response <- POST(url=paste(kafka_rest_proxy, response # Obtain all (or latest) messages on the topic -messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"), +messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"), accept("application/vnd.kafka.json.v2+json"), encode="json") @@ -50,7 +46,7 @@ createPlot(messages$value) Sys.sleep(120) # Obtain latest messages -messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"), +messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"), accept("application/vnd.kafka.json.v2+json"), encode="json") @@ -61,5 +57,5 @@ apachelog <- rbind(messages$value,messages2$value) createPlot(apachelog) # Remove the consumer -DELETE(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance, sep = "/"), +DELETE(url = consumerDetails$base_uri, content_type("application/vnd.kafka.v2+json")) -- 2.30.2