git.vanrenterghem.biz Git - R/project-using-kafka-in-R.git/commitdiff
Add R consumer for Kafka topic using REST api
author Frederik Vanrenterghem <frederik@vanrenterghem.io>
Thu, 6 Sep 2018 13:37:36 +0000 (21:37 +0800)
committer Frederik Vanrenterghem <frederik@vanrenterghem.io>
Thu, 6 Sep 2018 13:37:36 +0000 (21:37 +0800)
Uses Kafka REST Proxy by Confluent.

kafkaConsumer.R [new file with mode: 0644]

diff --git a/kafkaConsumer.R b/kafkaConsumer.R
new file mode 100644 (file)
index 0000000..64c4544
--- /dev/null
@@ -0,0 +1,61 @@
+# Read messages from a Kafka topic via the Confluent Kafka REST Proxy.
+# Prerequisite: set up Kafka with topic accesslogapache and a REST Proxy
+# reachable at the kafka_rest_proxy URL below.
+
+library(httr)
+library(jsonlite)
+
+kafka_rest_proxy <- "http://localhost:8082"
+
+# Media type used for the create, subscribe and delete requests.
+kafka_v2_json <- "application/vnd.kafka.v2+json"
+
+# URLs of the consumer group and of the consumer instance created in it.
+consumer_group_url <- paste(kafka_rest_proxy, "consumers", "my_json_consumer",
+                            sep = "/")
+consumer_instance_url <- paste(consumer_group_url, "instances",
+                               "my_consumer_instance", sep = "/")
+
+# Create consumer
+# See https://docs.confluent.io/current/kafka-rest/docs/intro.html#produce-and-consume-avro-messages
+response <- POST(
+  url = consumer_group_url,
+  content_type(kafka_v2_json),
+  accept(kafka_v2_json),
+  body = '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}'
+)
+# Stop here rather than carrying on with an instance that was never created.
+stop_for_status(response, task = "create the Kafka consumer instance")
+fromJSON(content(response, "text", encoding = "UTF-8"))
+
+# The instance now exists on the proxy: remove it again even when subscribing
+# or fetching fails.
+messages <- tryCatch(
+  {
+    # Subscribe the consumer to the topic
+    response <- POST(
+      url = paste(consumer_instance_url, "subscription", sep = "/"),
+      content_type(kafka_v2_json),
+      body = '{"topics":["accesslogapache"]}'
+    )
+    stop_for_status(response, task = "subscribe to topic accesslogapache")
+    print(response)
+
+    # Obtain messages on the topic. The Accept type names the "json" format
+    # the consumer was created with.
+    # NOTE(review): a single request may not return every message on the
+    # topic - confirm whether repeated polling is needed.
+    records <- GET(
+      url = paste(consumer_instance_url, "records", sep = "/"),
+      accept("application/vnd.kafka.json.v2+json")
+    )
+    stop_for_status(records, task = "fetch records from the consumer")
+    records
+  },
+  finally = {
+    # Remove the consumer
+    print(DELETE(url = consumer_instance_url, content_type(kafka_v2_json)))
+  }
+)
+
+apachelog <- fromJSON(content(messages, "text", encoding = "UTF-8"))