Photos
Blog
Projects
vanrenterghem.biz
projects
/
R
/
project-using-kafka-in-R.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Use returned base_uri.
[R/project-using-kafka-in-R.git]
/
kafkaConsumer.R
diff --git a/kafkaConsumer.R b/kafkaConsumer.R
index 4a77beb3037cab472757235da05d4948362a327f..e7da6ed25a27319c4369494650da48d1b0dfcec4 100644
(file)
--- a/kafkaConsumer.R
+++ b/kafkaConsumer.R
@@ -22,14 +22,10 @@ response <- POST(url=paste(kafka_rest_proxy, "consumers", consumer, sep="/"),
consumer_instance,
'", "format": "json", "auto.offset.reset": "earliest"}')
)
consumer_instance,
'", "format": "json", "auto.offset.reset": "earliest"}')
)
-fromJSON(content(response, "text"))
+consumerDetails <- fromJSON(content(response, "text"))
# Subscribe it to topic
# Subscribe it to topic
-response <- POST(url=paste(kafka_rest_proxy,
- "consumers",
- consumer,
- "instances",
- consumer_instance,
+response <- POST(url=paste(consumerDetails$base_uri,
"subscription", sep="/"),
content_type("application/vnd.kafka.v2+json"),
body = paste0('{"topics":["',
"subscription", sep="/"),
content_type("application/vnd.kafka.v2+json"),
body = paste0('{"topics":["',
@@ -39,7 +35,7 @@ response <- POST(url=paste(kafka_rest_proxy,
response
# Obtain all (or latest) messages on the topic
response
# Obtain all (or latest) messages on the topic
-messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"),
+messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"),
accept("application/vnd.kafka.json.v2+json"),
encode="json")
accept("application/vnd.kafka.json.v2+json"),
encode="json")
@@ -50,7 +46,7 @@ createPlot(messages$value)
Sys.sleep(120)
# Obtain latest messages
Sys.sleep(120)
# Obtain latest messages
-messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance,"records", sep = "/"),
+messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"),
accept("application/vnd.kafka.json.v2+json"),
encode="json")
accept("application/vnd.kafka.json.v2+json"),
encode="json")
@@ -61,5 +57,5 @@ apachelog <- rbind(messages$value,messages2$value)
createPlot(apachelog)
# Remove the consumer
createPlot(apachelog)
# Remove the consumer
-DELETE(url = paste(kafka_rest_proxy,"consumers",consumer,"instances",consumer_instance, sep = "/"),
+DELETE(url = consumerDetails$base_uri,
content_type("application/vnd.kafka.v2+json"))
content_type("application/vnd.kafka.v2+json"))