#' Create an Apache Kafka consumer using Confluent's REST Proxy API
#'
#' \code{CreateKafkaConsumer} creates an Apache Kafka consumer instance by
#' sending a POST request to the REST Proxy API provided by Confluent.
#'
#' @param kafka.rest.proxy The URI of the REST Proxy. Defaults to
#'   \url{http://localhost:8082}. Trailing slashes are ignored.
#' @param consumer A chr object with the name for the consumer. It is used as
#'   the path segment that follows \code{/consumers/} in the request URL.
#' @param consumer_instance A chr object with the name for the
#'   consumer_instance. It is sent as the \code{"name"} field of the JSON
#'   request body.
#'
#' @return If the operation succeeds (HTTP status 200), returns the parsed
#'   JSON reply: a list with the created consumer instance and its base URI.
#'   If the REST Proxy answers with any other status, an error is raised whose
#'   message contains the status code and the text of the reply.
CreateKafkaConsumer <- function(kafka.rest.proxy = "http://localhost:8082",
                                consumer,
                                consumer_instance) {
  # Drop trailing slashes so the joined URL never contains "//consumers".
  base_uri <- sub("/+$", "", kafka.rest.proxy)
  endpoint <- paste(base_uri, "consumers", consumer, sep = "/")

  # JSON request body naming the consumer instance to create.
  request_body <- paste0(
    '{"name": "',
    consumer_instance,
    '", "format": "json", "auto.offset.reset": "earliest"}'
  )

  response <- POST(
    url = endpoint,
    content_type("application/vnd.kafka.v2+json"),
    accept("application/vnd.kafka.v2+json"),
    body = request_body
  )

  # Fail loudly with a readable message instead of passing the raw response
  # object to stop().
  if (response$status_code != 200) {
    stop(
      "Kafka REST Proxy returned HTTP status ", response$status_code,
      " while creating consumer instance '", consumer_instance, "': ",
      content(response, "text"),
      call. = FALSE
    )
  }

  fromJSON(content(response, "text"))
}