Add function documentation and error handling.
[R/project-using-kafka-in-R.git] / R / CreateKafkaConsumer.R
1 #' Create Apache Kafka consumer using Confluent's REST Proxy API
2 #' 
3 #' \code{CreateKafkaConsumer} creates an Apache Kafka consumer using the rest proxy api provided by Confluence.
4 #' 
5 #' @param kafka.rest.proxy The URI of the REST Proxy. Defaults to \url{http://localhost:8082}.
6 #' @param consumer A chr object with the name for the consumer.
7 #' @param consumer_instance A chr object with the name for the consumer_instance.
8 #' 
9 #' @return If the operation succeeds, returns a list with the created consumer instance and its base URI.
10 #'   If the operation fails, the received reply is returned. Use \code{jsonlite::fromJSON(content(response, "text"))}
11 #'   to obtain a list with the error code and message.
12 #'   
13 CreateKafkaConsumer <- function(kafka.rest.proxy = "http://localhost:8082", consumer, consumer_instance) {
14   response <- POST(url=paste(kafka_rest_proxy, "consumers", consumer, sep="/"),
15                    content_type("application/vnd.kafka.v2+json"),
16                    accept("application/vnd.kafka.v2+json"),
17                    body=paste0('{"name": "',
18                                consumer_instance,
19                                '", "format": "json", "auto.offset.reset": "earliest"}')
20   )
21   if(response$status_code!=200) stop(response) else
22   consumerDetails <- fromJSON(content(response, "text"))
23   return(consumerDetails)
24 }