Skip to content

Instantly share code, notes, and snippets.

@jimbaker
Last active December 17, 2015 20:19
Show Gist options
  • Save jimbaker/5666634 to your computer and use it in GitHub Desktop.
Save jimbaker/5666634 to your computer and use it in GitHub Desktop.
Storm code in Jython
import time
from backtype.storm import Config, LocalCluster
from backtype.storm.tuple import Fields, Values
from backtype.storm.topology import TopologyBuilder
from plumbing import ExclamationBolt, WordSpout
def get_topology_builder(spout_parallelism=4, bolt_parallelism=2):
    """Build the word -> exclaim1 -> exclaim2 demo topology.

    A ``WordSpout`` registered as "words" feeds an ``ExclamationBolt``
    registered as "exclaim1", which in turn feeds a second
    ``ExclamationBolt`` registered as "exclaim2". Both bolts subscribe to
    their upstream component with a shuffle grouping.

    Args:
        spout_parallelism: Parallelism hint passed to ``setSpout`` for the
            "words" spout. Defaults to 4.
        bolt_parallelism: Parallelism hint passed to ``setBolt`` for each
            of the two bolts. Defaults to 2.

    Returns:
        The populated ``TopologyBuilder``; call ``createTopology()`` on it
        to obtain the topology to submit.
    """
    builder = TopologyBuilder()
    builder.setSpout("words", WordSpout(), spout_parallelism)
    builder.setBolt(
        "exclaim1", ExclamationBolt(), bolt_parallelism
    ).shuffleGrouping("words")
    builder.setBolt(
        "exclaim2", ExclamationBolt(), bolt_parallelism
    ).shuffleGrouping("exclaim1")
    return builder
def main():
    """Run the demo topology on an in-process ``LocalCluster`` for 10 seconds.

    Builds a ``Config`` with debug enabled and two workers, submits the
    topology from ``get_topology_builder()`` under the name "test", lets it
    run for ten seconds, then kills the topology and shuts the cluster down.

    Cleanup is guaranteed: the cluster is shut down even if submission
    fails, and the topology is killed even if the wait is interrupted
    (for example by Ctrl-C).
    """
    conf = Config()
    conf.setDebug(True)
    conf.setNumWorkers(2)

    cluster = LocalCluster()
    try:
        builder = get_topology_builder()
        cluster.submitTopology("test", conf, builder.createTopology())
        try:
            time.sleep(10)
        finally:
            # Only reached after a successful submit, so "test" exists.
            cluster.killTopology("test")
    finally:
        # Always release the in-process cluster, whatever happened above.
        cluster.shutdown()
# Launch the demo only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()
import random
import time
from backtype.storm.topology.base import BaseRichBolt, BaseRichSpout
from backtype.storm.tuple import Fields, Values
# Pool of names that WordSpout draws from at random for each emitted tuple.
words = [
    "alice",
    "bob",
    "charles",
    "dana",
    "edward",
    "frances",
]
class WordSpout(BaseRichSpout):
    """Spout that emits one randomly chosen entry of ``words`` per call.

    Each tuple has a single field, declared as "word".
    """

    def open(self, conf, context, collector):
        """Keep the output collector for use in ``nextTuple``.

        ``conf`` and ``context`` are accepted but not used.
        """
        self._collector = collector

    def nextTuple(self):
        """Pause for one millisecond, then emit a random word."""
        time.sleep(0.001)
        chosen = random.choice(words)
        outgoing = Values([chosen])
        self._collector.emit(outgoing)

    def declareOutputFields(self, declarer):
        """Declare the single output field, "word"."""
        schema = Fields(["word"])
        declarer.declare(schema)
class ExclamationBolt(BaseRichBolt):
    """Bolt that appends " - nahhhh!!!" to the first field of each tuple.

    The result is emitted as a single field, declared as "word", anchored
    to the input tuple, which is then acknowledged.
    """

    def prepare(self, conf, context, collector):
        """Keep the output collector for use in ``execute``.

        ``conf`` and ``context`` are accepted but not used.
        """
        self._collector = collector

    def execute(self, t):
        """Emit the suffixed string for input tuple ``t``, then ack ``t``."""
        collector = self._collector
        shouted = t.getString(0) + " - nahhhh!!!"
        # Passing ``t`` as the first argument anchors the new tuple to it.
        collector.emit(t, Values([shouted]))
        collector.ack(t)

    def declareOutputFields(self, declarer):
        """Declare the single output field, "word"."""
        schema = Fields(["word"])
        declarer.declare(schema)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment