]> git.vanrenterghem.biz Git - R/project-using-kafka-in-R.git/blob - kafkaConsumer.R
a2ca952cc03dc55d75a77dfc600065973fb5e261
[R/project-using-kafka-in-R.git] / kafkaConsumer.R
1 # Read via REST API from Kafka topic
2 # Prerequisite: set up kafka with topic accesslogapache
5 library(httr)
6 library(jsonlite)
7 library(dplyr)
9 kafka_rest_proxy <- "http://localhost:8082"
11 # Create consumer
12 # See https://docs.confluent.io/current/kafka-rest/docs/intro.html#produce-and-consume-avro-messages
14 response <- POST(url=paste(kafka_rest_proxy, "consumers", "my_json_consumer", sep="/"),
15      content_type("application/vnd.kafka.v2+json"),
16      accept("application/vnd.kafka.v2+json"),
17      body='{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}')
18 fromJSON(content(response, "text"))
20 response <- POST(url=paste(kafka_rest_proxy,
21                            "consumers",
22                            "my_json_consumer",
23                            "instances",
24                            "my_consumer_instance",
25                            "subscription", sep="/"),
26                content_type("application/vnd.kafka.v2+json"),
27                body = '{"topics":["accesslogapache"]}')
28 response
30 # Obtain all messages on the topic
31 messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance","records", sep = "/"),
32                 accept("application/vnd.kafka.json.v2+json"),
33                 encode="json")
35 messages <- fromJSON(content(messagesJSON,"text"))
37 createPlot(messages$value)
39 Sys.sleep(120)
41 messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance","records", sep = "/"),
42                 accept("application/vnd.kafka.json.v2+json"),
43                 encode="json")
45 messages2 <- fromJSON(content(messagesJSON,"text"))
47 apachelog <- rbind(messages$value,messages2$value)
49 createPlot(apachelog)
51 # Remove the consumer
52 DELETE(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance", sep = "/"),
53        content_type("application/vnd.kafka.v2+json"))