Use returned base_uri.
author Frederik Vanrenterghem <frederik@vanrenterghem.io>
Thu, 13 Sep 2018 13:00:11 +0000 (21:00 +0800)
committer Frederik Vanrenterghem <frederik@vanrenterghem.io>
Thu, 13 Sep 2018 13:00:11 +0000 (21:00 +0800)
Per the documentation: "Because consumers are stateful, any consumer instances created with the REST API are tied to a specific REST proxy instance. A full URL is provided when the instance is created and it should be used to construct any subsequent requests. Failing to use the returned URL for future consumer requests will result in 404 errors because the consumer instance will not be found."

kafkaConsumer.R

index 4a77beb..e7da6ed 100644 (file)
@@ -22,14 +22,10 @@ response <- POST(url=paste(kafka_rest_proxy, "consumers", consumer, sep="/"),
                  consumer_instance,
                  '", "format": "json", "auto.offset.reset": "earliest"}')
      )
-fromJSON(content(response, "text"))
+consumerDetails <- fromJSON(content(response, "text"))
 
 # Subscribe it to topic
-response <- POST(url=paste(kafka_rest_proxy,
-                           "consumers",
-                           consumer,
-                           "instances",
-                           consumer_instance,
+response <- POST(url=paste(consumerDetails$base_uri,
                            "subscription", sep="/"),
                content_type("application/vnd.kafka.v2+json"),
                body = paste0('{"topics":["',
@@ -39,7 +35,7 @@ response <- POST(url=paste(kafka_rest_proxy,
 response
 
 # Obtain all (or latest) messages on the topic
-messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"),
+messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"),
                 accept("application/vnd.kafka.json.v2+json"),
                 encode="json")
 
@@ -50,7 +46,7 @@ createPlot(messages$value)
 Sys.sleep(120)
 
 # Obtain latest messages
-messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"),
+messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"),
                 accept("application/vnd.kafka.json.v2+json"),
                 encode="json")
 
@@ -61,5 +57,5 @@ apachelog <- rbind(messages$value,messages2$value)
 createPlot(apachelog)
 
 # Remove the consumer
-DELETE(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance, sep = "/"),
+DELETE(url = consumerDetails$base_uri,
        content_type("application/vnd.kafka.v2+json"))