1 # Read via REST API from Kafka topic
2 # Prerequisite: set up kafka with topic accesslogapache
7 kafka_rest_proxy <- "http://localhost:8082"
10 # See https://docs.confluent.io/current/kafka-rest/docs/intro.html#produce-and-consume-avro-messages
12 response <- POST(url=paste(kafka_rest_proxy, "consumers", "my_json_consumer", sep="/"),
13 content_type("application/vnd.kafka.v2+json"),
14 accept("application/vnd.kafka.v2+json"),
15 body='{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}')
16 fromJSON(content(response, "text"))
18 response <- POST(url=paste(kafka_rest_proxy,
22 "my_consumer_instance",
23 "subscription", sep="/"),
24 content_type("application/vnd.kafka.v2+json"),
25 body = '{"topics":["accesslogapache"]}')
28 # Obtain all messages on the topic
29 messages <- GET(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance","records", sep = "/"),
30 accept("application/vnd.kafka.json.v2+json"),
33 apachelog <- fromJSON(content(messages,"text"))
36 DELETE(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance", sep = "/"),
37 content_type("application/vnd.kafka.v2+json"))