]> git.vanrenterghem.biz Git - R/project-using-kafka-in-R.git/commitdiff
Break out ConsumeKafkaMessages function.
authorFrederik Vanrenterghem <frederik@vanrenterghem.io>
Fri, 14 Sep 2018 11:56:50 +0000 (19:56 +0800)
committerFrederik Vanrenterghem <frederik@vanrenterghem.io>
Fri, 14 Sep 2018 11:56:50 +0000 (19:56 +0800)
R/ConsumeKafkaMessages.R [new file with mode: 0644]
kafkaConsumer.R

diff --git a/R/ConsumeKafkaMessages.R b/R/ConsumeKafkaMessages.R
new file mode 100644 (file)
index 0000000..4013b5c
--- /dev/null
@@ -0,0 +1,8 @@
+ConsumeKafkaMessages <- function(consumer.base.uri) {
+  messagesJSON <- GET(url = paste(consumer.base.uri,"records", sep = "/"),
+                      accept("application/vnd.kafka.json.v2+json"),
+                      encode="json")
+  if(messagesJSON$status_code!=200) stop(messagesJSON) else
+  messages <- fromJSON(content(messagesJSON,"text"))
+  return(messages)
+}
\ No newline at end of file
index 8b5e6c7f8acee1edb8c53debed53cc37fb002030..d6b092b917a6af028e2725f4876790ef317b203b 100644 (file)
@@ -29,11 +29,8 @@ response <- POST(url=paste(consumerDetails$base_uri,
 response
 
 # Obtain all (or latest) messages on the topic
 response
 
 # Obtain all (or latest) messages on the topic
-messagesJSON <- GET(url = paste(consumerDetails$base_uri,"records", sep = "/"),
-                accept("application/vnd.kafka.json.v2+json"),
-                encode="json")
-
-messages <- fromJSON(content(messagesJSON,"text"))
+source("R/ConsumeKafkaMessages.R")
+messages <- ConsumeKafkaMessages(consumerDetails$base_uri)
 
 createPlot(messages$value)
 
 
 createPlot(messages$value)