]> git.vanrenterghem.biz Git - R/project-using-kafka-in-R.git/commitdiff
Add ResetKafkaConsumerOffset function.
authorFrederik Vanrenterghem <frederik@vanrenterghem.io>
Fri, 14 Sep 2018 12:52:23 +0000 (20:52 +0800)
committerFrederik Vanrenterghem <frederik@vanrenterghem.io>
Fri, 14 Sep 2018 12:52:23 +0000 (20:52 +0800)
Add barebones R function to set a Kafka consumer's topic offset back to 0.

R/ResetKafkaConsumerOffset.R [new file with mode: 0644]

diff --git a/R/ResetKafkaConsumerOffset.R b/R/ResetKafkaConsumerOffset.R
new file mode 100644 (file)
index 0000000..7383fef
--- /dev/null
@@ -0,0 +1,13 @@
+ResetKafkaConsumerOffset <- function(consumer.base.uri, topic) {
+  response <- POST(url=paste(consumer.base.uri,
+                             "positions", "beginning", sep="/"),
+                   content_type("application/vnd.kafka.v2+json"),
+                   body = paste0('{"partitions": [ {"topic":  "',
+                                 topic,
+                                 '" , "partition": 0}',
+                                 ']}')
+                   )
+  if(response$status_code == 204) {
+    return(response)
+  } else stop(response)
+}
\ No newline at end of file