A spout takes some input and decides whether or not that input is worth passing on, based on some logic.
Bolts perform some intensive operation on the given data, and bolts can be composed together.
Creating a spout means just extending BaseRichSpout.
Some Java examples
private Random random;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields(""));
}
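Putting those pieces together, a minimal sketch of a complete spout against the backtype.storm-era API (the tweets array and the "tweet" field name are made up for illustration):

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
import java.util.Random;

public class TweetSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private Random random;
    // hypothetical in-memory source of tweets
    private final String[] tweets = {"#storm is neat", "#kafka too"};

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        // emit a random tweet as a one-field tuple
        collector.emit(new Values(tweets[random.nextInt(tweets.length)]));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweet"));
    }
}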
public class HashTagCountTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("blue-spout", new SpoutThing());

        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(2);

        builder.setBolt("green", new GreenBolt(), 4)
               .setNumTasks(8)
               .shuffleGrouping("blue-spout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());
    }
}
Scala Examples
package spouts
import backtype.storm.spout.{SpoutOutputCollector, ISpout}
import backtype.storm.task.TopologyContext
import java.util.{Map => JavaMap}
class TestSpout extends ISpout {
  def open(conf: JavaMap[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {}
  def close(): Unit = {}
  def activate(): Unit = {}
  def deactivate(): Unit = {}
  def nextTuple(): Unit = {}
  def ack(msgId: AnyRef): Unit = {}
  def fail(msgId: AnyRef): Unit = {}
}
package bolts

import backtype.storm.topology.{OutputFieldsDeclarer, BasicOutputCollector, IBasicBolt}
import backtype.storm.task.TopologyContext
import backtype.storm.tuple.Tuple
import java.util.{Map => JavaMap}

class TestBolt extends IBasicBolt {
  def prepare(stormConf: JavaMap[_, _], context: TopologyContext): Unit = {}
  def execute(input: Tuple, collector: BasicOutputCollector): Unit = {}
  def cleanup(): Unit = {}
  def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {}
  // IComponent expects a java.util.Map here, not a scala.collection Map
  def getComponentConfiguration: JavaMap[String, AnyRef] = new java.util.HashMap[String, AnyRef]()
}
You have the ability to replay tuples in a spout:
public void nextTuple() {
    UUID msgId = UUID.randomUUID();
    String tweet = tweets[random.nextInt(tweets.length)];
    // emitting with a message id is what makes the tuple replayable
    collector.emit(new Values(tweet), msgId);
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
    // get the sentence string from the tuple
    String sentence = tuple.getString(0);
    // for every word in the sentence, do something useful
    for (String word : sentence.split(" ")) {
        collector.emit(new Values(word));
    }
}
and to finish processing:
public void ack(Object msg) {
//success
}
public void fail(Object msg) {
//fail, try again?
}
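When a tuple fails, Storm passes fail() the same message id the spout emitted it with, so the spout can look up that id and re-emit the tuple; that is what the UUID in nextTuple above is for.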
4,000 pages per second of traffic.
First attempt: a custom distributed processing system built on Python and ZeroMQ. Advantage: simple, but not scalable.
Second attempt: Kafka and Storm. Someone suggested Flume, but we got up and running far faster with Storm, even without JVM knowledge.
Kafka is worth a whole presentation on its own. It's designed to handle high-throughput messaging.
It gives us a method to buffer clicks into a stream; Kafka plus Storm is a common pattern for click tracking.
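As an illustration of that buffering pattern, a minimal click producer sketched with the modern kafka-clients API (the deployment described below used Kafka 0.7 with Python producers; the broker address and "clicks" topic are assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ClickProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // each click is appended to the "clicks" topic; Kafka buffers them
            // into the stream that the Storm topology consumes downstream
            producer.send(new ProducerRecord<>("clicks", "customer-123", "/front-page"));
        }
    }
}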
15-second latency requirement, close to real time
fault tolerance
easy to manage parallelism
stream grouping
active community
open source
Clicks from customer traffic
=== AWS ===
ELB
Nginx EC2
Kafka EC2 Cluster
Storm Topology on EC2
splits to MongoDB and other sinks
40 producers on the Nginx side (8 m1.large instances, Python producers)
4 brokers (4 m1.large)
10k clicks per second at peak
14 billion clicks per month
Kafka v0.7.2
40 supervisors
46 bolts
1 kafka spout
250 executors in worker threads
160k tuples executed per second
Storm v0.8.2, Leiningen v1.7 (Clojure build tool; takes away the pain of package management)
Kafka spout -> aggregates every 15s -> pipes to the following 4 streams: customer position -> front-page aggregate (5m) -> arrangement social -> @handle
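A rough sketch of how that pipeline could be wired with the storm-kafka contrib spout; the bolt classes, ZooKeeper host, and grouping fields are hypothetical placeholders, not the production topology:

TopologyBuilder builder = new TopologyBuilder();

// storm-kafka contrib spout; host, topic, and ids are placeholders
SpoutConfig spoutConfig = new SpoutConfig(
        new ZkHosts("zk1:2181"), "clicks", "/kafka-spout", "click-tracking");
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig));

// 15-second aggregation stage before fanning out
builder.setBolt("aggregate-15s", new AggregateBolt(), 4)
       .shuffleGrouping("kafka-spout");

// the four downstream streams, each a hypothetical bolt
builder.setBolt("customer-position", new CustomerPositionBolt(), 4)
       .fieldsGrouping("aggregate-15s", new Fields("customer"));
builder.setBolt("front-page-5m", new FrontPageAggregateBolt(), 2)
       .shuffleGrouping("aggregate-15s");
builder.setBolt("arrangement-social", new ArrangementSocialBolt(), 2)
       .shuffleGrouping("aggregate-15s");
builder.setBolt("handle", new HandleBolt(), 2)
       .shuffleGrouping("aggregate-15s");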
Took about 4 months from initial research through development to production.
Shell Bolts - because we're mainly a Python shop, we use shell bolts to wrap our Python code and have it execute on the JVM as a bolt (see the sketch below).
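A minimal shell-bolt wrapper, assuming Storm's standard multilang support; "process_clicks.py" is a hypothetical script that speaks Storm's multilang protocol (e.g. via the storm.py module):

import backtype.storm.task.ShellBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import java.util.Map;

public class ClickShellBolt extends ShellBolt implements IRichBolt {
    public ClickShellBolt() {
        // launches the Python process that does the actual work
        super("python", "process_clicks.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("click"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}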
Anchor Bolts / Replaying the Stream
Acking Tuples (both sketched below)
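A minimal sketch of both, in a rich bolt with the standard OutputCollector (the uppercase transform is just a placeholder):

@Override
public void execute(Tuple input) {
    String word = input.getString(0);
    // anchoring: emitting relative to the input tuple ties the new tuple to it,
    // so a downstream failure replays the original spout tuple
    collector.emit(input, new Values(word.toUpperCase()));
    // acking tells Storm this bolt has fully processed the input tuple
    collector.ack(input);
}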
Monitoring
Scribe Logging
Munin + Nagios
JMX + JMXTrans + Ganglia
Storm UI
Thrift
Load testing
Break topology into smaller pieces
Move from AWS to a private data center (dude, it's expensive!)