--- /dev/null
+#' Create a consumer instance through the Kafka REST proxy
+#'
+#' Sends a POST request to `<kafka.rest.proxy>/consumers/<consumer>` asking the
+#' proxy to create an instance named `consumer_instance` that uses the "json"
+#' format and starts from the earliest offset.
+#'
+#' @param kafka.rest.proxy Base URL of the Kafka REST proxy, without a
+#'   trailing slash.
+#' @param consumer Path segment placed after `consumers/` in the request URL
+#'   (presumably the consumer group name -- confirm against the REST proxy
+#'   API).
+#' @param consumer_instance Name of the consumer instance. It is inserted
+#'   verbatim into the JSON request body, so it must not contain characters
+#'   that need JSON escaping (such as double quotes or backslashes).
+#' @return The response body parsed by `fromJSON()`.
+CreateKafkaConsumer <- function(kafka.rest.proxy = "http://localhost:8082", consumer, consumer_instance) {
+  # Build the URL from the function argument; the previous version read an
+  # undeclared `kafka_rest_proxy` variable and so ignored this parameter.
+  response <- POST(url = paste(kafka.rest.proxy, "consumers", consumer, sep = "/"),
+                   content_type("application/vnd.kafka.v2+json"),
+                   accept("application/vnd.kafka.v2+json"),
+                   body = paste0('{"name": "',
+                                 consumer_instance,
+                                 '", "format": "json", "auto.offset.reset": "earliest"}')
+  )
+  fromJSON(content(response, "text"))
+}
\ No newline at end of file
# See https://docs.confluent.io/current/kafka-rest/docs/intro.html#produce-and-consume-avro-messages
# Create consumer
-response <- POST(url=paste(kafka_rest_proxy, "consumers", consumer, sep="/"),
- content_type("application/vnd.kafka.v2+json"),
- accept("application/vnd.kafka.v2+json"),
- body=paste0('{"name": "',
- consumer_instance,
- '", "format": "json", "auto.offset.reset": "earliest"}')
- )
-consumerDetails <- fromJSON(content(response, "text"))
+# Load the helper definition, then create the consumer instance with it.
+source("R/CreateKafkaConsumer.R")
+consumerDetails <- CreateKafkaConsumer(
+  kafka.rest.proxy = kafka_rest_proxy,
+  consumer = consumer,
+  consumer_instance = consumer_instance
+)
# Subscribe it to topic
response <- POST(url=paste(consumerDetails$base_uri,