@ryanbriones
Created July 6, 2012 21:12
A Storm ShellBolt written in R; logs the message and acks it
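# RJSONIO provides fromJSON()/toJSON() for the JSON messages exchanged with Storm
library(RJSONIO)
# Storm talks to the script over stdin/stdout; keep stdin open for the whole run
cStdin <- file('stdin')
open(cStdin)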
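# read lines from stdin up to the "end" terminator and return them as one string
# (returns "" once stdin is exhausted)
readMessage <- function() {
  lines <- c()
  while(length(line <- readLines(cStdin, n = 1)) > 0) {
    if(line == "end") { break }
    lines <- c(lines, line)
  }
  return(paste(lines, collapse = "\n"))
}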
# write one message to stdout, followed by the "end" terminator, then flush
sendMessage <- function(msg) {
  write(msg, stdout())
  write("end", stdout())
  flush(stdout())
}
# get config/context from Storm
handshake <- readMessage()
hs_obj <- fromJSON(handshake)
# send back pid of script and create an empty file named after the pid in pidDir
pid <- Sys.getpid()
handshake_response <- toJSON(list(pid=pid))
sendMessage(handshake_response)
file.create(file.path(hs_obj$pidDir, as.character(pid)))
# read in tuples until script is terminated
repeat {
  tuple <- readMessage()
  # stop cleanly when stdin is closed instead of failing on an empty message
  if(nchar(tuple) == 0) { break }
  tup_obj <- fromJSON(tuple)
  id <- as.character(tup_obj$id)
  stream <- as.character(tup_obj$stream)
  task <- as.character(tup_obj$task)
  # send a "log" command back to Storm
  # msg can be any string; the raw `tuple` JSON is used here for debugging
  logMessage <- toJSON(list(command="log", msg=tuple))
  sendMessage(logMessage)
  # send "ack" command so Storm treats the tuple as processed
  ackMessage <- toJSON(list(command="ack", id=id))
  sendMessage(ackMessage)
}
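The script above frames every message with a trailing `end` line. As a rough sketch of that framing only, the snippet below runs the same read loop against an in-memory connection so it can be tried without Storm. The helper name `readFramed`, the sample JSON payloads, and the `/tmp/pids` path are made up for illustration and are not taken from Storm or from the gist.

```r
# Hypothetical helper: same loop as readMessage(), but it reads from any
# connection, so it can be exercised without a running Storm worker.
readFramed <- function(con) {
  lines <- character(0)
  while (length(line <- readLines(con, n = 1)) > 0) {
    if (line == "end") break
    lines <- c(lines, line)
  }
  paste(lines, collapse = "\n")
}

# Two made-up messages, each terminated by an "end" line.
# The second one spans two lines to show that they are joined with "\n".
wire <- c(
  '{"pidDir": "/tmp/pids"}',
  "end",
  '{"id": "42",',
  ' "tuple": ["hello"]}',
  "end"
)
input <- textConnection(wire)

first  <- readFramed(input)  # the single-line message
second <- readFramed(input)  # the two-line message, joined with "\n"
third  <- readFramed(input)  # input exhausted, so an empty string
close(input)
```

Because an exhausted input yields an empty string, a caller can use that value as a signal to leave its read loop.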
@punitjajo

Thank you so much for posting this!
