#' Reset a consumer's position to the beginning of a topic's partitions
#'
#' Sends a POST request to `<consumer.base.uri>/positions/beginning` with an
#' `application/vnd.kafka.v2+json` body listing the topic partitions to rewind.
#' NOTE(review): the content type suggests the Confluent Kafka REST Proxy v2
#' API -- confirm against the deployment this targets.
#'
#' @param consumer.base.uri Base URI of the consumer instance; the path
#'   segments `positions` and `beginning` are appended with `/` separators.
#' @param topic Name of the topic (a single string).
#' @param partitions Non-negative whole partition numbers to reset. Defaults
#'   to partition 0 only.
#' @return The response object returned by `POST()` when its status code is
#'   204. Any other status raises an error naming the topic and the status.
ResetKafkaConsumerOffset <- function(consumer.base.uri, topic,
                                     partitions = 0L) {
  topic <- as.character(topic)
  stopifnot(length(topic) == 1L, !is.na(topic))
  stopifnot(
    is.numeric(partitions),
    length(partitions) > 0L,
    !anyNA(partitions),
    all(partitions >= 0),
    all(partitions %% 1 == 0)
  )

  # Escape backslashes and double quotes so the topic name cannot break out of
  # its JSON string literal.
  topic.json <- gsub('(["\\\\])', '\\\\\\1', topic)

  # One {"topic": ..., "partition": ...} entry per requested partition.
  entries <- sprintf(
    '{"topic": "%s", "partition": %d}',
    topic.json,
    as.integer(partitions)
  )
  body <- paste0('{"partitions": [', paste(entries, collapse = ", "), "]}")

  response <- POST(
    url = paste(consumer.base.uri, "positions", "beginning", sep = "/"),
    content_type("application/vnd.kafka.v2+json"),
    body = body
  )

  # Only 204 is treated as success; isTRUE() also covers a missing status code.
  if (!isTRUE(response$status_code == 204)) {
    stop(
      "Failed to reset consumer offset for topic '", topic,
      "': HTTP status ", response$status_code,
      call. = FALSE
    )
  }

  response
}