consumer_instance,
'", "format": "json", "auto.offset.reset": "earliest"}')
)
-fromJSON(content(response, "text"))
+consumerDetails <- fromJSON(content(response, "text"))
# Subscribe it to topic
-response <- POST(url=paste(kafka_rest_proxy,
- "consumers",
- consumer,
- "instances",
- consumer_instance,
+response <- POST(url=paste(consumerDetails$base_uri,
"subscription", sep="/"),
content_type("application/vnd.kafka.v2+json"),
body = paste0('{"topics":["',
response
# Obtain all (or latest) messages on the topic
-messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"),
+messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"),
accept("application/vnd.kafka.json.v2+json"),
encode="json")
Sys.sleep(120)
# Obtain latest messages
-messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"),
+messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"),
accept("application/vnd.kafka.json.v2+json"),
encode="json")
createPlot(apachelog)
# Remove the consumer
-DELETE(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance, sep = "/"),
+DELETE(url = consumerDetails$base_uri,
content_type("application/vnd.kafka.v2+json"))