From 96ee6f0f5f77d9fe656c14c8f6f5a9c6759981f1 Mon Sep 17 00:00:00 2001 From: Frederik Vanrenterghem Date: Sat, 8 Sep 2018 19:23:31 +0800 Subject: [PATCH] Create 2 plots 2 minutes apart. * Call function from yet unlinked script to plot and forecast. * Iterate twice to verify the script works with streaming content. --- kafkaConsumer.R | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/kafkaConsumer.R b/kafkaConsumer.R index 64c4544..a2ca952 100644 --- a/kafkaConsumer.R +++ b/kafkaConsumer.R @@ -4,6 +4,8 @@ library(httr) library(jsonlite) +library(dplyr) + kafka_rest_proxy <- "http://localhost:8082" # Create consumer @@ -26,11 +28,25 @@ response <- POST(url=paste(kafka_rest_proxy, response # Obtain all messages on the topic -messages <- GET(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance","records", sep = "/"), +messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance","records", sep = "/"), accept("application/vnd.kafka.json.v2+json"), encode="json") -apachelog <- fromJSON(content(messages,"text")) +messages <- fromJSON(content(messagesJSON,"text")) + +createPlot(messages$value) + +Sys.sleep(120) + +messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance","records", sep = "/"), + accept("application/vnd.kafka.json.v2+json"), + encode="json") + +messages2 <- fromJSON(content(messagesJSON,"text")) + +apachelog <- rbind(messages$value,messages2$value) + +createPlot(apachelog) # Remove the consumer DELETE(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance", sep = "/"), -- 2.30.2