From: Frederik Vanrenterghem Date: Thu, 6 Sep 2018 13:37:36 +0000 (+0800) Subject: Add R consumer for Kafka topic using REST api X-Git-Url: http://git.vanrenterghem.biz/R/project-using-kafka-in-R.git/commitdiff_plain/7df155dc850d206391becead77267dd0e1243715?ds=sidebyside Add R consumer for Kafka topic using REST api Uses Kafka REST Proxy by Confluent. --- diff --git a/kafkaConsumer.R b/kafkaConsumer.R new file mode 100644 index 0000000..64c4544 --- /dev/null +++ b/kafkaConsumer.R @@ -0,0 +1,37 @@ +# Read via REST API from Kafka topic +# Prerequisite: set up kafka with topic accesslogapache + + +library(httr) +library(jsonlite) +kafka_rest_proxy <- "http://localhost:8082" + +# Create consumer +# See https://docs.confluent.io/current/kafka-rest/docs/intro.html#produce-and-consume-avro-messages + +response <- POST(url=paste(kafka_rest_proxy, "consumers", "my_json_consumer", sep="/"), + content_type("application/vnd.kafka.v2+json"), + accept("application/vnd.kafka.v2+json"), + body='{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}') +fromJSON(content(response, "text")) + +response <- POST(url=paste(kafka_rest_proxy, + "consumers", + "my_json_consumer", + "instances", + "my_consumer_instance", + "subscription", sep="/"), + content_type("application/vnd.kafka.v2+json"), + body = '{"topics":["accesslogapache"]}') +response + +# Obtain all messages on the topic +messages <- GET(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance","records", sep = "/"), + accept("application/vnd.kafka.json.v2+json"), + encode="json") + +apachelog <- fromJSON(content(messages,"text")) + +# Remove the consumer +DELETE(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance", sep = "/"), + content_type("application/vnd.kafka.v2+json"))