+# Read via REST API from Kafka topic
+# Prerequisite: set up kafka with topic accesslogapache
+
+
+library(httr)
+library(jsonlite)
+kafka_rest_proxy <- "http://localhost:8082"
+
+# Create consumer
+# See https://docs.confluent.io/current/kafka-rest/docs/intro.html#produce-and-consume-avro-messages
+
+response <- POST(url=paste(kafka_rest_proxy, "consumers", "my_json_consumer", sep="/"),
+ content_type("application/vnd.kafka.v2+json"),
+ accept("application/vnd.kafka.v2+json"),
+ body='{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}')
+fromJSON(content(response, "text"))
+
+response <- POST(url=paste(kafka_rest_proxy,
+ "consumers",
+ "my_json_consumer",
+ "instances",
+ "my_consumer_instance",
+ "subscription", sep="/"),
+ content_type("application/vnd.kafka.v2+json"),
+ body = '{"topics":["accesslogapache"]}')
+response
+
+# Obtain all messages on the topic
+messages <- GET(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance","records", sep = "/"),
+ accept("application/vnd.kafka.json.v2+json"),
+ encode="json")
+
+apachelog <- fromJSON(content(messages,"text"))
+
+# Remove the consumer
+DELETE(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance", sep = "/"),
+ content_type("application/vnd.kafka.v2+json"))