64c4544c21ca09b59762a7c8d08713d3ef8ee661
[R/project-using-kafka-in-R.git] / kafkaConsumer.R
1 # Read via REST API from Kafka topic
2 # Prerequisite: set up kafka with topic accesslogapache
5 library(httr)
6 library(jsonlite)
# Base URL of the Kafka REST Proxy and the media type used for its v2 API
# control requests (create / subscribe / delete).
kafka_rest_proxy <- "http://localhost:8082"
kafka_v2_json <- "application/vnd.kafka.v2+json"

# Every request after creation targets the same consumer instance, so its URL
# is built once instead of re-pasting the path segments for each call.
consumer_url <- paste(kafka_rest_proxy,
                      "consumers",
                      "my_json_consumer",
                      "instances",
                      "my_consumer_instance", sep = "/")

# Create consumer
# See https://docs.confluent.io/current/kafka-rest/docs/intro.html
# (the consumer below uses the JSON embedded format, not Avro).
response <- POST(url = paste(kafka_rest_proxy, "consumers", "my_json_consumer", sep = "/"),
                 content_type(kafka_v2_json),
                 accept(kafka_v2_json),
                 body = '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}')
# httr does not raise on HTTP error codes by itself; fail here rather than
# carrying an error payload into the following steps.
stop_for_status(response, task = "create the Kafka consumer instance")
# Explicit encoding avoids httr guessing it (and emitting a message).
fromJSON(content(response, "text", encoding = "UTF-8"))

# From here on the consumer instance exists on the proxy. Run the remaining
# steps inside tryCatch() so that the instance is removed in `finally` even
# when subscribing or fetching fails. Assignments inside the braces still land
# in the calling environment, so `response` and `messages` stay available.
apachelog <- tryCatch({
  # Subscribe the consumer to the topic
  response <- POST(url = paste(consumer_url, "subscription", sep = "/"),
                   content_type(kafka_v2_json),
                   body = '{"topics":["accesslogapache"]}')
  stop_for_status(response, task = "subscribe to topic accesslogapache")
  # Values are not auto-printed inside braces, so print explicitly.
  print(response)

  # Obtain all messages on the topic
  # The Accept header names the embedded JSON format of the records. The
  # former `encode` argument is dropped: a GET carries no request body, so it
  # had nothing to encode.
  # NOTE(review): the first poll right after subscribing may come back empty
  # while the consumer joins its group -- confirm and re-poll if needed.
  messages <- GET(url = paste(consumer_url, "records", sep = "/"),
                  accept("application/vnd.kafka.json.v2+json"))
  stop_for_status(messages, task = "fetch records from the Kafka consumer")

  # Value of the block: the parsed records, assigned to `apachelog`.
  fromJSON(content(messages, "text", encoding = "UTF-8"))
}, finally = {
  # Remove the consumer
  print(DELETE(url = consumer_url,
               content_type(kafka_v2_json)))
})