-response <- POST(url=paste(kafka_rest_proxy, "consumers", "my_json_consumer", sep="/"),
- content_type("application/vnd.kafka.v2+json"),
- accept("application/vnd.kafka.v2+json"),
- body='{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}')
-fromJSON(content(response, "text"))
-
-response <- POST(url=paste(kafka_rest_proxy,
- "consumers",
- "my_json_consumer",
- "instances",
- "my_consumer_instance",
- "subscription", sep="/"),
- content_type("application/vnd.kafka.v2+json"),
- body = '{"topics":["accesslogapache"]}')
-response
-
-# Obtain all messages on the topic
-messages <- GET(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance","records", sep = "/"),
+# Load the helper and create the consumer; consumerDetails$base_uri is reused by the calls below
+source("R/CreateKafkaConsumer.R")
+consumerDetails <- CreateKafkaConsumer(kafka.rest.proxy = kafka_rest_proxy, consumer = consumer, consumer_instance = consumer_instance)
+
+# Subscribe the consumer (addressed by its base_uri) to the topic named by `topic`
+source("R/SubscribeKafkaTopic.R")
+response <- SubscribeKafkaTopic(consumerDetails$base_uri,topic = topic)
+
+# Fetch the messages currently available to the consumer (all or only the latest - presumably depends on its offset; TODO confirm)
+source("R/ConsumeKafkaMessages.R")
+messages <- ConsumeKafkaMessages(consumerDetails$base_uri)
+
+createPlot(messages$value)  # plot the `value` element of the fetched messages; createPlot is defined outside this view
+
+Sys.sleep(120)  # pause for 120 seconds before the records endpoint is queried again below
+
+# Obtain latest messages
+messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"),