@kaeawc
Last active December 21, 2015 18:29
Notes from Storm + Kafka meetup on 08/26/2013

Storm Overview

What are the parts?

A Spout is the source of a stream: it takes some input from outside the topology and decides, based on some logic, whether that input is worth passing on as tuples.

Bolts do some (possibly intensive) operation on the data they receive, and can be composed together.

Creating a spout usually just means extending BaseRichSpout.

Some Java examples


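  // Fragment of a spout class (for example, one extending BaseRichSpout)
  Random random;

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // the field name was left blank in the notes
    declarer.declare(new Fields(""));
  }

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class HashTagCountTopology {
  public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();

    // Spout ids were left blank in the notes. The bolt below subscribes to
    // "blue-spout", so the first spout uses that id; the remaining ids need
    // unique names before this will run.
    builder.setSpout("blue-spout", new SpoutThing());
    builder.setSpout("", new SpoutThing());
    builder.setSpout("", new SpoutThing());
    builder.setSpout("", new SpoutThing());

    Config conf = new Config();
    conf.setDebug(true);
    conf.setNumWorkers(2);

    // 4 executors running 8 tasks, fed by a shuffle grouping on the spout
    builder.setBolt("green", new GreenBolt(), 4)
      .setNumTasks(8)
      .shuffleGrouping("blue-spout");

    // run in-process for local testing
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("word-count", conf, builder.createTopology());

  }
}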

Scala Examples

package spouts

import backtype.storm.spout.{SpoutOutputCollector, ISpout}
import backtype.storm.task.TopologyContext
import java.util.{Map => JavaMap}

class TestSpout extends ISpout {

  def open(conf: JavaMap[_, _], context: TopologyContext, collector: SpoutOutputCollector) {}
  def close {}
  def activate {}
  def deactivate {}
  def nextTuple {}
  def ack(msgId: AnyRef) {}
  def fail(msgId: AnyRef) {}

}

package bolts

import storm.trident._
import backtype.storm.generated.Bolt
import backtype.storm.topology.{OutputFieldsDeclarer, BasicOutputCollector, IBasicBolt}
import java.util.{Map => JavaMap}
import backtype.storm.task.TopologyContext
import backtype.storm.tuple.Tuple
import scala.language.implicitConversions

class TestBolt extends IBasicBolt {

  def prepare(stormConf: JavaMap[_,_], context: TopologyContext) {}

  def execute(input: Tuple, collector: BasicOutputCollector) {}

  def cleanup() {}

  def declareOutputFields(declarer: OutputFieldsDeclarer) {}

  // must return a java.util.Map to match the interface; null means no component-specific config
  def getComponentConfiguration: JavaMap[String, AnyRef] = null


}


Guaranteed Message Processing "Tuple Tree"

You have the ability to replay a tuple from the spout if it fails anywhere in its tuple tree.

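public void nextTuple() {
  // a unique message id lets Storm call ack/fail for this tuple later
  UUID msg = UUID.randomUUID();
  String tweet = tweets[random.nextInt(tweets.length)];
  // assumes `collector` is the SpoutOutputCollector saved in open()
  collector.emit(new Values(tweet), msg);
}

public void execute(Tuple tuple) {
  // get the sentence string from the tuple
  String sentence = tuple.getString(0);

  for (String word : sentence.split(" ")) {
    // for every word in the sentence, do something useful
  }

  // assumes `collector` is the OutputCollector saved in prepare()
  collector.ack(tuple);
}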

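And to finish processing, Storm calls back into the spout:

public void ack(Object msg) {
  // success: the whole tuple tree for msg was processed
}

public void fail(Object msg) {
  // failure or timeout: try again by re-emitting the tuple for msg?
}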
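
To make the "tuple tree" idea concrete, here is a minimal standalone sketch of XOR-based tree tracking, the general technique Storm's acker tasks use: every tuple id is XORed into a running value once when it is created and once when it is acked, so the value returns to zero exactly when the tree is complete. The class and method names (`AckTracker`, `rootEmitted`, `acked`) are made up for this illustration and are not Storm's API. Real tuple ids are random 64-bit values, which is what makes an accidental zero very unlikely.

```java
import java.util.HashMap;
import java.util.Map;

/** Standalone sketch of XOR-based tuple-tree tracking; not Storm's actual classes. */
final class AckTracker {
    // spout message id -> XOR of every tuple id created or acked so far
    private final Map<Object, Long> pending = new HashMap<>();

    /** The spout emitted a root tuple for msgId. */
    void rootEmitted(Object msgId, long rootTupleId) {
        pending.put(msgId, rootTupleId);
    }

    /**
     * A bolt acked ackedTupleId after anchoring newTupleIds to it.
     * Returns true when the whole tree is done, i.e. when the spout's
     * ack(msgId) would be called.
     */
    boolean acked(Object msgId, long ackedTupleId, long... newTupleIds) {
        Long current = pending.get(msgId);
        if (current == null) {
            return false; // unknown or already completed
        }
        long value = current ^ ackedTupleId;
        for (long id : newTupleIds) {
            value ^= id;
        }
        if (value == 0L) {
            pending.remove(msgId);
            return true;
        }
        pending.put(msgId, value);
        return false;
    }

    boolean isPending(Object msgId) {
        return pending.containsKey(msgId);
    }
}
```

A message that is still pending when a timeout expires is the case where `fail` would be called and the spout could replay it.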

And now for Outbrain

4,000 pages per second of traffic

Custom distributed processing system

Python and ZeroMQ (zmq)

Advantages: simple, but not scalable

Kafka and Storm

Someone suggested Flume, but nah, we got up and running way faster with Storm, without needing JVM knowledge.

Kafka is worth a whole presentation on its own. It's designed to handle high-throughput messaging.

Why Kafka?

A method to buffer clicks into a stream

Kafka and Storm is a common pattern for click tracking

Why Storm?

15-second latency requirement, close to real time

fault tolerance

easy to manage parallelism

stream grouping

active community

open source
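
On the "stream grouping" point above: a grouping decides which task of a bolt receives each tuple. The sketch below is a simplified standalone illustration of two common groupings, not Storm's implementation. `GroupingSketch`, `shuffleTask` and `fieldsTask` are hypothetical names, and picking a random task is an assumption standing in for however Storm actually spreads shuffled tuples.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Simplified illustration of stream groupings; not Storm's classes. */
final class GroupingSketch {
    /** Shuffle grouping: any task may receive the tuple, spreading load evenly. */
    static int shuffleTask(int numTasks) {
        return ThreadLocalRandom.current().nextInt(numTasks);
    }

    /** Fields grouping: equal field values always map to the same task. */
    static int fieldsTask(Object fieldValue, int numTasks) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }
}
```

Fields grouping is what makes per-key counting possible: every tuple for the same key lands on the same task, so that task can keep the running total in memory.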

Architecture

Clicks from customer traffic

=== AWS ===

ELB

Nginx EC2

Kafka EC2 Cluster

Storm Topology on EC2

splits to MongoDB and other

Kafka Cluster

40 producers from the Nginx tier (8 m1.large instances, Python brod)

4 brokers (4 m1.large)

10k clicks per second at peak

14 billion clicks per month

Kafka v0.7.2
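
As a quick sanity check on the numbers above, the monthly total can be turned into an average rate and compared with the stated peak. This is only arithmetic on the figures from the talk; the 30-day month and the `ClickRate` helper are assumptions for the illustration.

```java
/** Back-of-the-envelope check of the click figures quoted in the notes. */
final class ClickRate {
    /** Average events per second over a month of the given length. */
    static long averagePerSecond(long eventsPerMonth, int daysInMonth) {
        long secondsInMonth = daysInMonth * 24L * 60L * 60L;
        return eventsPerMonth / secondsInMonth;
    }
}
```

With 14 billion clicks and a 30-day month, `ClickRate.averagePerSecond(14_000_000_000L, 30)` comes out at roughly 5.4k clicks per second on average, about half of the 10k per second peak.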

Storm Topology

40 supervisors

46 bolts

1 kafka spout

250 executors in worker threads

160k tuples executed per second

Storm v0.8.2, Leiningen v1.7 (Clojure build tool, takes away the pain of package management)

Rough overview

Kafka spout -> aggregates every 15s -> pipes to the following 4 streams: customer position -> front page aggregate 5m -> arrangement social -> @handle
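
The "aggregates every 15s" step can be pictured as a tumbling-window counter: counts accumulate per key, and once the window has elapsed the finished counts are handed downstream and a fresh window starts. The sketch below is a standalone illustration of that idea, not the topology from the talk. `WindowedCounter` and its methods are hypothetical names, and time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.HashMap;
import java.util.Map;

/** Tumbling-window counter sketch; flushes when an event arrives after the window ends. */
final class WindowedCounter {
    private final long windowMillis;
    private long windowStart;
    private Map<String, Long> counts = new HashMap<>();

    WindowedCounter(long windowMillis, long startMillis) {
        this.windowMillis = windowMillis;
        this.windowStart = startMillis;
    }

    /**
     * Counts one event for key at time nowMillis.
     * Returns the finished window's counts if nowMillis crossed the window
     * boundary, otherwise null.
     */
    Map<String, Long> add(String key, long nowMillis) {
        Map<String, Long> flushed = null;
        long elapsed = nowMillis - windowStart;
        if (elapsed >= windowMillis) {
            flushed = counts;
            counts = new HashMap<>();
            // advance by whole windows so boundaries stay aligned
            windowStart += (elapsed / windowMillis) * windowMillis;
        }
        counts.merge(key, 1L, Long::sum);
        return flushed;
    }
}
```

In a bolt, the flushed map is what would be emitted to the downstream streams. A real topology would more likely flush on a timer than on the next event, so quiet periods still produce output.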

Challenges

Took about 4 months from start, through research and dev, to production.

Shell Bolts - because we're mainly a Python shop, we have to use shell bolts to wrap our Python code so that a JVM bolt can run it (as a subprocess) and have it act as a bolt.

Anchor Bolts / Replaying Stream

Acking Tuples

Monitoring
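
On the shell bolts challenge above: a shell bolt talks to the wrapped script over stdin/stdout, exchanging JSON messages that are each terminated by a line containing only `end`. The sketch below shows just that framing on the JVM side, under stated assumptions: `MultilangFraming`, `frame` and `readMessage` are hypothetical helper names, and the JSON payloads are left as opaque strings rather than the real message fields.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Sketch of "JSON, then a line saying end" framing; not Storm's ShellBolt code. */
final class MultilangFraming {
    /** Wraps one JSON message so the other side knows where it stops. */
    static String frame(String json) {
        return json + "\nend\n";
    }

    /** Reads lines up to the next "end" line; returns null if the stream closes first. */
    static String readMessage(BufferedReader in) {
        StringBuilder message = new StringBuilder();
        try {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.equals("end")) {
                    return message.toString();
                }
                if (message.length() > 0) {
                    message.append('\n');
                }
                message.append(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return null; // stream closed before a terminator was seen
    }
}
```

Every tuple crossing this boundary is serialized to JSON and parsed again on the other side, which is part of why shell bolts show up under challenges.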

Monitoring

Scribe Logging

Munin + Nagios

JMX + JMXTrans + Ganglia

Storm UI

Thrift

Future

Load testing

Break topology into smaller pieces

Move from AWS to a private data center (dude, it's expensive!)
