Add ResetKafkaConsumerOffset function.
[R/project-using-kafka-in-R.git] / R / ResetKafkaConsumerOffset.R
1 ResetKafkaConsumerOffset <- function(consumer.base.uri, topic) {
2   response <- POST(url=paste(consumer.base.uri,
3                              "positions", "beginning", sep="/"),
4                    content_type("application/vnd.kafka.v2+json"),
5                    body = paste0('{"partitions": [ {"topic":  "',
6                                  topic,
7                                  '" , "partition": 0}',
8                                  ']}')
9                    )
10   if(response$status_code == 204) {
11     return(response)
12   } else stop(response)
13 }