]> git.vanrenterghem.biz Git - R/project-using-kafka-in-R.git/commitdiff
Use returned base_uri.
authorFrederik Vanrenterghem <frederik@vanrenterghem.io>
Thu, 13 Sep 2018 13:00:11 +0000 (21:00 +0800)
committerFrederik Vanrenterghem <frederik@vanrenterghem.io>
Thu, 13 Sep 2018 13:00:11 +0000 (21:00 +0800)
Per doco: Because consumers are stateful, any consumer instances created with the REST API are tied to a specific REST proxy instance. A full URL is provided when the instance is created and it should be used to construct any subsequent requests. Failing to use the returned URL for future consumer requests will result in 404 errors because the consumer instance will not be found.

kafkaConsumer.R

index 4a77beb3037cab472757235da05d4948362a327f..e7da6ed25a27319c4369494650da48d1b0dfcec4 100644 (file)
@@ -22,14 +22,10 @@ response <- POST(url=paste(kafka_rest_proxy, "consumers", consumer, sep="/"),
                  consumer_instance,
                  '", "format": "json", "auto.offset.reset": "earliest"}')
      )
                  consumer_instance,
                  '", "format": "json", "auto.offset.reset": "earliest"}')
      )
-fromJSON(content(response, "text"))
+consumerDetails <- fromJSON(content(response, "text"))
 
 # Subscribe it to topic
 
 # Subscribe it to topic
-response <- POST(url=paste(kafka_rest_proxy,
-                           "consumers",
-                           consumer,
-                           "instances",
-                           consumer_instance,
+response <- POST(url=paste(consumerDetails$base_uri,
                            "subscription", sep="/"),
                content_type("application/vnd.kafka.v2+json"),
                body = paste0('{"topics":["',
                            "subscription", sep="/"),
                content_type("application/vnd.kafka.v2+json"),
                body = paste0('{"topics":["',
@@ -39,7 +35,7 @@ response <- POST(url=paste(kafka_rest_proxy,
 response
 
 # Obtain all (or latest) messages on the topic
 response
 
 # Obtain all (or latest) messages on the topic
-messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"),
+messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"),
                 accept("application/vnd.kafka.json.v2+json"),
                 encode="json")
 
                 accept("application/vnd.kafka.json.v2+json"),
                 encode="json")
 
@@ -50,7 +46,7 @@ createPlot(messages$value)
 Sys.sleep(120)
 
 # Obtain latest messages
 Sys.sleep(120)
 
 # Obtain latest messages
-messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"),
+messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"),
                 accept("application/vnd.kafka.json.v2+json"),
                 encode="json")
 
                 accept("application/vnd.kafka.json.v2+json"),
                 encode="json")
 
@@ -61,5 +57,5 @@ apachelog <- rbind(messages$value,messages2$value)
 createPlot(apachelog)
 
 # Remove the consumer
 createPlot(apachelog)
 
 # Remove the consumer
-DELETE(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance, sep = "/"),
+DELETE(url = consumerDetails$base_uri,
        content_type("application/vnd.kafka.v2+json"))
        content_type("application/vnd.kafka.v2+json"))