]> git.vanrenterghem.biz Git - R/project-using-kafka-in-R.git/commitdiff
Create 2 plots 2 minutes apart.
authorFrederik Vanrenterghem <frederik@vanrenterghem.io>
Sat, 8 Sep 2018 11:23:31 +0000 (19:23 +0800)
committerFrederik Vanrenterghem <frederik@vanrenterghem.io>
Sat, 8 Sep 2018 11:23:31 +0000 (19:23 +0800)
* Call function from yet unlinked script to plot and forecast.
* Iterate twice to verify the script works with streaming content.

kafkaConsumer.R

index 64c4544c21ca09b59762a7c8d08713d3ef8ee661..a2ca952cc03dc55d75a77dfc600065973fb5e261 100644 (file)
@@ -4,6 +4,8 @@
 
 library(httr)
 library(jsonlite)
 
 library(httr)
 library(jsonlite)
+library(dplyr)
+
 kafka_rest_proxy <- "http://localhost:8082"
 
 # Create consumer
 kafka_rest_proxy <- "http://localhost:8082"
 
 # Create consumer
@@ -26,11 +28,25 @@ response <- POST(url=paste(kafka_rest_proxy,
 response
 
 # Obtain all messages on the topic
 response
 
 # Obtain all messages on the topic
-messages <- GET(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance","records", sep = "/"),
+messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance","records", sep = "/"),
                 accept("application/vnd.kafka.json.v2+json"),
                 encode="json")
 
                 accept("application/vnd.kafka.json.v2+json"),
                 encode="json")
 
-apachelog <- fromJSON(content(messages,"text"))
+messages <- fromJSON(content(messagesJSON,"text"))
+
+createPlot(messages$value)
+
+Sys.sleep(120)
+
+messagesJSON <- GET(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance","records", sep = "/"),
+                accept("application/vnd.kafka.json.v2+json"),
+                encode="json")
+
+messages2 <- fromJSON(content(messagesJSON,"text"))
+
+apachelog <- rbind(messages$value,messages2$value)
+
+createPlot(apachelog)
 
 # Remove the consumer
 DELETE(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance", sep = "/"),
 
 # Remove the consumer
 DELETE(url = paste(kafka_rest_proxy,"consumers","my_json_consumer","instances","my_consumer_instance", sep = "/"),