--- /dev/null
+ConsumeKafkaMessages <- function(consumer.base.uri) {
+ messagesJSON <- GET(url = paste(consumer.base.uri,"records", sep = "/"),
+ accept("application/vnd.kafka.json.v2+json"),
+ encode="json")
+ if(messagesJSON$status_code!=200) stop(messagesJSON) else
+ messages <- fromJSON(content(messagesJSON,"text"))
+ return(messages)
+}
\ No newline at end of file
response
# Obtain all (or latest) messages on the topic
-messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"),
- accept("application/vnd.kafka.json.v2+json"),
- encode="json")
-
-messages <- fromJSON(content(messagesJSON,"text"))
+source("R/ConsumeKafkaMessages.R")
+messages <- ConsumeKafkaMessages(consumerDetails$base_uri)
createPlot(messages$value)