Add ResetKafkaConsumerOffset function.
[R/project-using-kafka-in-R.git] / R / ResetKafkaConsumerOffset.R
diff --git a/R/ResetKafkaConsumerOffset.R b/R/ResetKafkaConsumerOffset.R
new file mode 100644 (file)
index 0000000..7383fef
--- /dev/null
@@ -0,0 +1,37 @@
+#' Reset a Kafka consumer's offsets to the beginning via its REST endpoint.
+#'
+#' Sends a POST request to `<consumer.base.uri>/positions/beginning` with an
+#' `application/vnd.kafka.v2+json` body listing the partitions to rewind.
+#'
+#' @param consumer.base.uri Base URI of the consumer instance; trailing
+#'   slashes are dropped before the path is appended.
+#' @param topic Topic name (a single string).
+#' @param partitions Partition numbers to reset; defaults to partition 0.
+#' @return The response returned by `POST()` when its status code is 204;
+#'   any other status code raises an error.
+ResetKafkaConsumerOffset <- function(consumer.base.uri, topic,
+                                     partitions = 0) {
+  topic <- as.character(topic)
+  stopifnot(length(topic) == 1L, !is.na(topic),
+            is.numeric(partitions), length(partitions) > 0L,
+            !anyNA(partitions))
+  # Escape backslashes and double quotes so the topic name cannot break out
+  # of its JSON string literal.
+  topic.json <- gsub('(["\\\\])', "\\\\\\1", topic)
+  entries <- sprintf('{"topic": "%s", "partition": %d}',
+                     topic.json, as.integer(partitions))
+  body <- paste0('{"partitions": [', paste(entries, collapse = ", "), "]}")
+  # Drop trailing slashes so the URL never contains "//" before the path.
+  url <- paste(sub("/+$", "", consumer.base.uri),
+               "positions", "beginning", sep = "/")
+  response <- POST(url = url,
+                   content_type("application/vnd.kafka.v2+json"),
+                   body = body)
+  if (response$status_code != 204) {
+    # stop() on the raw response object would paste its coerced fields into
+    # an unreadable message; report the status code instead.
+    stop("Resetting consumer offset failed: HTTP status ",
+         response$status_code, " from ", url, call. = FALSE)
+  }
+  response
+}
\ No newline at end of file