#' Create Apache Kafka consumer using Confluent's REST Proxy API #' #' \code{CreateKafkaConsumer} creates an Apache Kafka consumer using the rest proxy api provided by Confluent. #' #' @param kafka.rest.proxy The URI of the REST Proxy. Defaults to \url{http://localhost:8082}. #' @param consumer A chr object with the name for the consumer. #' @param consumer_instance A chr object with the name for the consumer_instance. #' #' @return If the operation succeeds, returns a list with the created consumer instance and its base URI. #' If the operation fails, the received reply is returned. Use \code{jsonlite::fromJSON(content(response, "text"))} #' to obtain a list with the error code and message. #' CreateKafkaConsumer <- function(kafka.rest.proxy = "http://localhost:8082", consumer, consumer_instance) { response <- POST(url=paste(kafka_rest_proxy, "consumers", consumer, sep="/"), content_type("application/vnd.kafka.v2+json"), accept("application/vnd.kafka.v2+json"), body=paste0('{"name": "', consumer_instance, '", "format": "json", "auto.offset.reset": "earliest"}') ) if(response$status_code!=200) stop(response) else consumerDetails <- fromJSON(content(response, "text")) return(consumerDetails) }