From: Frederik Vanrenterghem Date: Thu, 13 Sep 2018 13:00:11 +0000 (+0800) Subject: Use returned base_uri. X-Git-Url: http://git.vanrenterghem.biz/R/project-using-kafka-in-R.git/commitdiff_plain/ad70a446e1f9c1d371008dc32f7374a7e3a54907 Use returned base_uri. Per doco: Because consumers are stateful, any consumer instances created with the REST API are tied to a specific REST proxy instance. A full URL is provided when the instance is created and it should be used to construct any subsequent requests. Failing to use the returned URL for future consumer requests will result in 404 errors because the consumer instance will not be found. --- diff --git a/kafkaConsumer.R b/kafkaConsumer.R index 4a77beb..e7da6ed 100644 --- a/kafkaConsumer.R +++ b/kafkaConsumer.R @@ -22,14 +22,10 @@ response <- POST(url=paste(kafka_rest_proxy, "consumers", consumer, sep="/"), consumer_instance, '", "format": "json", "auto.offset.reset": "earliest"}') ) -fromJSON(content(response, "text")) +consumerDetails <- fromJSON(content(response, "text")) # Subscribe it to topic -response <- POST(url=paste(kafka_rest_proxy, - "consumers", - consumer, - "instances", - consumer_instance, +response <- POST(url=paste(consumerDetails$base_uri, "subscription", sep="/"), content_type("application/vnd.kafka.v2+json"), body = paste0('{"topics":["', @@ -39,7 +35,7 @@ response <- POST(url=paste(kafka_rest_proxy, response # Obtain all (or latest) messages on the topic -messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"), +messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"), accept("application/vnd.kafka.json.v2+json"), encode="json") @@ -50,7 +46,7 @@ createPlot(messages$value) Sys.sleep(120) # Obtain latest messages -messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"), +messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"), accept("application/vnd.kafka.json.v2+json"), encode="json") @@ -61,5 +57,5 @@ apachelog <- rbind(messages$value,messages2$value) createPlot(apachelog) # Remove the consumer -DELETE(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance, sep = "/"), +DELETE(url = consumerDetails$base_uri, content_type("application/vnd.kafka.v2+json"))