Time-Series Event Aggregation with Storm
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.utils;

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

/**
 * Converts the first tuple value from a byte array into a string.
 */
public class BinaryToString extends BaseFunction {

  private static final long serialVersionUID = -8686873770270590062L;

  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
    String field = new String(tuple.getBinary(0));
    collector.emit(new Values(field));
  }

}
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.utils;

import java.math.RoundingMode;

import backtype.storm.tuple.Values;

import com.google.common.math.DoubleMath;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

/**
 * Maps a timestamp into a bucket of size {@code interval}.
 *
 * Assumes the first tuple value is a long timestamp.
 * Outputs the {@code bucketStart} and {@code bucketEnd}.
 */
public class Bucket extends BaseFunction {

  private static final long serialVersionUID = 1042081321412192768L;

  private final long interval;

  public Bucket(long interval) {
    this.interval = interval;
  }

  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
    long timestamp = tuple.getLong(0);
    // Note: timestamp / interval is long division, which already truncates
    // (the floor, for non-negative timestamps); Math.floor and the
    // UNNECESSARY rounding mode just assert that the result is exact.
    long bucketStart = DoubleMath.roundToLong(
        Math.floor(timestamp / interval), RoundingMode.UNNECESSARY) * interval;
    long bucketEnd = bucketStart + interval;
    collector.emit(new Values(String.valueOf(bucketStart), String.valueOf(bucketEnd)));
  }

}
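// Editor's illustrative sketch (not part of the original gist): the bucketing
// arithmetic above for a 30-second (30000 ms) interval, as used by the 30s
// bucket in the topology below. The concrete timestamp is hypothetical.
class BucketExample {
  public static void main(String[] args) {
    long interval = 30000L;                                // 30s buckets
    long timestamp = 1404757008123L;                       // hypothetical epoch-millis timestamp
    long bucketStart = (timestamp / interval) * interval;  // 1404756990000
    long bucketEnd = bucketStart + interval;               // 1404757020000
    System.out.println(bucketStart + " - " + bucketEnd);
  }
}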
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.mongo;

import java.util.Arrays;
import java.util.List;

import com.google.common.base.Objects;

/**
 * Represents a key in MongoDB. Provides a helper for
 * parsing a Trident key into a Mongo key.
 *
 * @author codyaray
 * @since 3/19/2014
 */
class MongoKey {

  final String collection;
  final long timestamp;
  final String siteId;
  final String fieldName;

  public MongoKey(String collection, long timestamp, String siteId, String fieldName) {
    this.collection = collection;
    this.timestamp = timestamp;
    this.siteId = siteId;
    this.fieldName = fieldName;
  }

  public static MongoKey fromTrident(List<Object> key) {
    String collection = key.get(0).toString();
    long timestamp = Long.parseLong(key.get(1).toString());
    String siteId = key.get(2).toString();
    String fieldName = key.get(3).toString();
    return new MongoKey(collection, timestamp, siteId, fieldName);
  }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
        .add("collection", collection)
        .add("timestamp", timestamp)
        .add("siteId", siteId)
        .add("fieldName", fieldName)
        .toString();
  }

  Object[] significantAttributes() {
    return new Object[] { collection, timestamp, siteId, fieldName };
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(significantAttributes());
  }

  @Override
  public boolean equals(Object obj) {
    if (obj == null) {
      return false;
    } else if (obj == this) {
      return true;
    } else if (!getClass().isAssignableFrom(obj.getClass())) {
      return false;
    }
    return Arrays.equals(significantAttributes(), getClass().cast(obj).significantAttributes());
  }

}
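// Editor's illustrative sketch (not part of the original gist): parsing a Trident
// grouping key into a MongoKey. The key order matches the groupBy in the topology
// below: (collection, bucketStart, siteId, field). The values here are hypothetical.
class MongoKeyExample {
  public static void main(String[] args) {
    java.util.List<Object> tridentKey =
        java.util.Arrays.<Object>asList("stats30s", "1404756990000", "site42", "siteCount");
    MongoKey key = MongoKey.fromTrident(tridentKey);
    System.out.println(key);  // MongoKey{collection=stats30s, timestamp=1404756990000, ...}
  }
}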
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.mongo.serializer;

import com.google.common.reflect.TypeToken;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;

/**
 * Serialize a key-value pair to MongoDB without any transactional metadata.
 *
 * @author codyaray
 * @since 3/19/2014
 * @param <T> the type of the value
 */
public class MongoNonTransactionalSerializer<T> implements MongoSerializer<T> {

  private static final long serialVersionUID = 1L;

  @Override
  public DBObject serialize(String fieldName, T val) {
    return new BasicDBObject(fieldName, val);
  }

  @Override
  public T deserialize(DBObject obj, String fieldName) {
    return MongoSerializerHelper.parse(obj.get(fieldName), getTypeParameter());
  }

  @SuppressWarnings("serial")
  private Class<? super T> getTypeParameter() {
    return new TypeToken<T>(getClass()) { }.getRawType();
  }

}
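// Editor's illustrative sketch (not part of the original gist): because
// getTypeParameter() relies on TypeToken reflection, the type argument must be
// reified via an anonymous subclass (note the trailing {}), exactly as the
// default serializers in MongoStateFactory are constructed below.
class MongoNonTransactionalSerializerExample {
  static final MongoSerializer<Integer> COUNTS =
      new MongoNonTransactionalSerializer<Integer>() { };  // {} captures Integer

  public static void main(String[] args) {
    com.mongodb.DBObject obj = COUNTS.serialize("siteCount", 5);
    System.out.println(obj);                                   // { "siteCount" : 5 }
    System.out.println(COUNTS.deserialize(obj, "siteCount"));  // 5
  }
}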
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.mongo.serializer;

import com.google.common.reflect.TypeToken;
import com.mongodb.DBObject;

import storm.trident.state.OpaqueValue;
import storm.trident.state.TransactionalValue;

/**
 * Serialize a key-value pair to MongoDB with opaque transactional metadata.
 *
 * @author codyaray
 * @since 3/19/2014
 * @param <T> the type of the value
 */
public class MongoOpaqueSerializer<T> implements MongoSerializer<OpaqueValue<T>> {

  private static final long serialVersionUID = 1L;

  private static final String PREV_PREFIX = "prev-";

  private final MongoTransactionalSerializer<T> base;

  public MongoOpaqueSerializer(MongoTransactionalSerializer<T> base) {
    this.base = base;
  }

  @Override
  public DBObject serialize(String fieldName, OpaqueValue<T> val) {
    TransactionalValue<T> value = new TransactionalValue<T>(val.getCurrTxid(), val.getCurr());
    DBObject obj = base.serialize(fieldName, value);
    obj.put(PREV_PREFIX + fieldName, val.getPrev());
    return obj;
  }

  @Override
  public OpaqueValue<T> deserialize(DBObject obj, String fieldName) {
    TransactionalValue<T> val = base.deserialize(obj, fieldName);
    T prev = MongoSerializerHelper.parse(obj.get(PREV_PREFIX + fieldName), getTypeParameter());
    return new OpaqueValue<T>(val.getTxid(), val.getVal(), prev);
  }

  @SuppressWarnings("serial")
  private Class<? super T> getTypeParameter() {
    return new TypeToken<T>(getClass()) { }.getRawType();
  }

}
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.mongo.serializer;

import java.io.Serializable;

import com.mongodb.DBObject;

/**
 * Serialize a key-value pair to MongoDB.
 *
 * @author codyaray
 * @since 3/19/2014
 * @param <T> the type of the value
 */
public interface MongoSerializer<T> extends Serializable {

  /**
   * Returns a mongo {@link DBObject} for the key-value pair.
   *
   * @param key the key
   * @param val the value
   * @return the mongo {@link DBObject}
   */
  DBObject serialize(String key, T val);

  /**
   * Returns the value corresponding to {@code key} in {@code obj}.
   *
   * @param obj the mongo {@link DBObject}
   * @param key the key
   * @return the value
   */
  T deserialize(DBObject obj, String key);

}
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.mongo.serializer;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helpers for the {@link MongoSerializer}s.
 *
 * @author codyaray
 * @since 3/19/2014
 */
class MongoSerializerHelper {

  private static final Logger log = LoggerFactory.getLogger(MongoSerializerHelper.class);

  /**
   * Parse {@code object} as {@code type}. Requires {@code type} to have
   * a constructor with a single String argument.
   *
   * @return {@code object} as a {@code type}, or {@code null} if {@code object} is {@code null}
   *    or {@code type} does not have a constructor which accepts a single String argument.
   */
  static @Nullable <T> T parse(@Nullable Object object, Class<? super T> type) {
    if (object == null) {
      return null;
    }
    try {
      @SuppressWarnings("unchecked")
      T instance = (T) type.getConstructor(String.class).newInstance(object.toString());
      return instance;
    } catch (Exception e) {
      log.warn("Can't parse Object {} as type {}: {}", new Object[] { object, type, e });
    }
    return null;
  }

}
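// Editor's illustrative sketch (not part of the original gist): parse() works for
// any type with a single-String-argument constructor, e.g. the boxed numeric types.
class MongoSerializerHelperExample {
  public static void main(String[] args) {
    Integer i = MongoSerializerHelper.parse("42", Integer.class);  // 42
    Long l = MongoSerializerHelper.parse(42L, Long.class);         // 42L, via toString()
    Object o = MongoSerializerHelper.parse("x", Object.class);     // null (no String ctor), warning logged
    System.out.println(i + " " + l + " " + o);
  }
}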
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.mongo;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import javax.annotation.Nullable;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;

import com.brighttag.storm.mongo.serializer.MongoSerializer;

import storm.trident.state.map.IBackingMap;

/**
 * A one-off {@link MongoState} which seeks to reproduce the existing Mongo schema
 * written by statscollector and read by stathub.
 *
 * @author codyaray
 * @since 3/19/2014
 */
public class MongoState<T> implements IBackingMap<T> {

  private final DB db;
  private final MongoSerializer<T> serializer;

  public MongoState(DB db, MongoSerializer<T> serializer) {
    this.db = db;
    this.serializer = serializer;
  }

  /**
   * Fetch multiple metrics from MongoDB.
   *
   * @param keys list of metric key fields in this order:
   *    (collection_name [string], start_date [long], site_id [string], field_name [string])
   * @return list of values corresponding to the {@code key} with the same index in {@code keys},
   *    or {@code null} for each key without a value
   */
  @Override
  public List<T> multiGet(List<List<Object>> keys) {
    List<T> values = Lists.newArrayList(Collections.<T>nCopies(keys.size(), null));
    Map<MongoKey, Map<String, Integer>> siteBuckets = new IdentityGenerator().toSiteBuckets(keys);
    for (Map.Entry<MongoKey, Map<String, Integer>> siteBucketFields : siteBuckets.entrySet()) {
      MongoKey key = siteBucketFields.getKey();
      DBCollection collection = db.getCollection(key.collection);
      DBObject obj = collection.findOne(createQuery(key.timestamp, key.siteId));
      for (Map.Entry<String, Integer> field : siteBucketFields.getValue().entrySet()) {
        if (obj != null) {
          values.set(field.getValue(), serializer.deserialize(obj, field.getKey()));
        }
      }
    }
    return values;
  }

  /**
   * Store multiple metrics into MongoDB. This save is idempotent; any data for
   * {@code keys} that already exists is overwritten with the new {@code vals}.
   *
   * @param keys list of metric key fields in this order:
   *    (collection_name [string], start_date [long], site_id [string], field_name [string])
   */
  @Override
  public void multiPut(List<List<Object>> keys, List<T> vals) {
    Map<MongoKey, Map<String, T>> siteBuckets = new ValueFromListGenerator<T>(vals).toSiteBuckets(keys);
    for (Map.Entry<MongoKey, Map<String, T>> entry : siteBuckets.entrySet()) {
      MongoKey key = entry.getKey();
      DBCollection collection = db.getCollection(key.collection);
      collection.update(createQuery(key.timestamp, key.siteId),
          createUpdateObject(key.timestamp, key.siteId, entry.getValue()), true, false);
    }
  }

  private DBObject createUpdateObject(long timestamp, String siteId, Map<String, T> fields) {
    DBObject upsert = createQuery(timestamp, siteId);
    for (Map.Entry<String, T> entry : fields.entrySet()) {
      upsert.putAll(serializer.serialize(entry.getKey(), entry.getValue()));
    }
    return new BasicDBObject("$set", upsert);
  }

  private static DBObject createQuery(long timeslot, String siteId) {
    DBObject query = new BasicDBObject();
    query.put("timeStamp", timeslot);
    query.put("siteId", siteId);
    return query;
  }

  /**
   * Nasty hack to work around the lack of callbacks/generators/blocks in Java.
   * We just need a way to provide a function to {@link #toSiteBuckets(List)}
   * that accepts the current index and returns the content for that field.
   *
   * @param <T> the type of the field value
   */
  private abstract static class Generator<T> {

    /**
     * Transforms a list of metric key tuples into a map of site-buckets to fields,
     * where each field can also have its own value. The field value is provided
     * by the generator function {@link #getContent(int)}.
     *
     * @param keys list of metric key fields in this order:
     *    (collection_name [string], start_date [long], site_id [string], field_name [string])
     * @return the map of site-buckets to fields
     */
    public Map<MongoKey, Map<String, T>> toSiteBuckets(List<List<Object>> keys) {
      // (collection, start_date, site_id) => {field_name => field_value}
      Map<MongoKey, Map<String, T>> siteBuckets = Maps.newHashMap();
      for (int i = 0; i < keys.size(); i++) {
        MongoKey key = MongoKey.fromTrident(keys.get(i));
        MongoKey siteBucket = new MongoKey(key.collection, key.timestamp, key.siteId, null);
        Map<String, T> fields = siteBuckets.containsKey(siteBucket)
            ? siteBuckets.get(siteBucket) : Maps.<String, T>newHashMap();
        fields.put(key.fieldName, getContent(i));
        siteBuckets.put(siteBucket, fields);
      }
      return siteBuckets;
    }

    /**
     * The callback/generator function/block itself.
     */
    protected abstract T getContent(int i);
  }

  /**
   * Generator that returns the content at index {@code i} from a list.
   *
   * @param <T> the type of the field value
   */
  private static class ValueFromListGenerator<T> extends Generator<T> {

    private final List<T> values;

    ValueFromListGenerator(@Nullable List<T> values) {
      this.values = values;
    }

    @Override
    protected T getContent(int i) {
      return values.get(i);
    }
  }

  /**
   * Generator that returns the index itself.
   */
  private static class IdentityGenerator extends Generator<Integer> {
    @Override
    protected Integer getContent(int i) {
      return i;
    }
  }

}
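// Editor's illustrative sketch (not part of the original gist): the document shape
// MongoState reads and writes. multiPut upserts on (timeStamp, siteId) with a $set
// of the serialized fields, so a 30s bucket document looks roughly like this
// (values hypothetical):
//
//   { "timeStamp" : 1404756990000,
//     "siteId"    : "site42",
//     "siteCount" : 17,
//     "tag99"     : 5 }
//
// with txid-*/prev-* companion fields per metric when a transactional or opaque
// serializer is used (see the serializer classes above and below).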
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.mongo;

import java.net.UnknownHostException;
import java.util.Map;

import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Values;

import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.mongodb.Mongo;
import com.mongodb.MongoException;

import com.brighttag.storm.mongo.serializer.MongoNonTransactionalSerializer;
import com.brighttag.storm.mongo.serializer.MongoOpaqueSerializer;
import com.brighttag.storm.mongo.serializer.MongoSerializer;
import com.brighttag.storm.mongo.serializer.MongoTransactionalSerializer;

import storm.trident.state.OpaqueValue;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.state.StateType;
import storm.trident.state.map.CachedMap;
import storm.trident.state.map.IBackingMap;
import storm.trident.state.map.MapState;
import storm.trident.state.map.NonTransactionalMap;
import storm.trident.state.map.OpaqueMap;
import storm.trident.state.map.SnapshottableMap;
import storm.trident.state.map.TransactionalMap;

/**
 * Factory for creating {@link MongoState}s.
 *
 * @author codyaray
 * @since 3/19/2014
 * @param <T> the type of value (wrapped with transaction metadata, if appropriate).
 */
@SuppressWarnings({ "rawtypes", "unchecked", "serial" })
public class MongoStateFactory<T> implements StateFactory {

  private static final long serialVersionUID = 80291612465975321L;

  private final StateType stateType;
  private final String host;
  private final String dbName;
  private final MongoStateOptions<T> options;
  private final MongoSerializer<T> serializer;

  private static final Map<StateType, MongoSerializer> DEFAULT_SERIALIZERS;

  static {
    MongoNonTransactionalSerializer<Integer> nonTransactional = new MongoNonTransactionalSerializer<Integer>() {};
    MongoTransactionalSerializer<Integer> transactional = new MongoTransactionalSerializer<Integer>(nonTransactional) {};
    MongoOpaqueSerializer<Integer> opaque = new MongoOpaqueSerializer<Integer>(transactional) {};
    DEFAULT_SERIALIZERS = ImmutableMap.<StateType, MongoSerializer>of(
        StateType.NON_TRANSACTIONAL, nonTransactional,
        StateType.TRANSACTIONAL, transactional,
        StateType.OPAQUE, opaque);
  }

  /**
   * Return an opaque {@link MongoStateFactory}.
   */
  public static <T> StateFactory opaque(String host, String dbName) {
    return opaque(host, dbName, new MongoStateOptions<OpaqueValue<T>>());
  }

  /**
   * Return an opaque {@link MongoStateFactory} with the given {@code options}.
   */
  public static <T> StateFactory opaque(String host, String dbName, MongoStateOptions<OpaqueValue<T>> options) {
    return new MongoStateFactory<OpaqueValue<T>>(StateType.OPAQUE, host, dbName, options);
  }

  public MongoStateFactory(StateType stateType, String host, String dbName, MongoStateOptions<T> options) {
    this.stateType = stateType;
    this.host = host;
    this.dbName = dbName;
    this.options = options;
    this.serializer = Objects.firstNonNull(options.serializer, DEFAULT_SERIALIZERS.get(stateType));
  }

  @Override
  public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
    try {
      MongoState<T> state = new MongoState<T>(new Mongo(host).getDB(dbName), serializer);
      CachedMap<T> cachedMap = new CachedMap<T>(state, options.localCacheSize);
      MapState<T> mapState = buildMapState(cachedMap);
      return new SnapshottableMap<T>(mapState, new Values(options.globalKey));
    } catch (UnknownHostException e) {
      throw Throwables.propagate(e);
    } catch (MongoException e) {
      throw Throwables.propagate(e);
    }
  }

  private MapState<T> buildMapState(IBackingMap cachedMap) {
    switch (stateType) {
      case NON_TRANSACTIONAL:
        return NonTransactionalMap.build(cachedMap);
      case OPAQUE:
        return OpaqueMap.build(cachedMap);
      case TRANSACTIONAL:
        return TransactionalMap.build(cachedMap);
      default:
        throw new RuntimeException("Unknown state type: " + stateType);
    }
  }

}
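// Editor's illustrative sketch (not part of the original gist): creating an opaque
// Mongo-backed state factory for use with persistentAggregate. The host and
// database names are hypothetical.
class MongoStateFactoryExample {
  public static void main(String[] args) {
    storm.trident.state.StateFactory factory =
        MongoStateFactory.<Integer>opaque("mongo1.example.com", "metrics");
    System.out.println(factory);
  }
}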
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.mongo;

import java.io.Serializable;

import com.brighttag.storm.mongo.serializer.MongoSerializer;

/**
 * Various options for configuring the {@link MongoState}.
 *
 * @author codyaray
 * @since 3/19/2014
 * @param <T> the type of value (wrapped with transaction metadata, if appropriate).
 */
public class MongoStateOptions<T> implements Serializable {

  private static final long serialVersionUID = 3936747562105453714L;

  public int localCacheSize = 10000;
  public String globalKey = "$__GLOBAL_KEY__$";
  public MongoSerializer<T> serializer;

}
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.mongo.serializer;

import com.mongodb.DBObject;

import storm.trident.state.TransactionalValue;

/**
 * Serialize a key-value pair to MongoDB with transactional metadata.
 *
 * @author codyaray
 * @since 3/19/2014
 * @param <T> the type of the value
 */
public class MongoTransactionalSerializer<T> implements MongoSerializer<TransactionalValue<T>> {

  private static final long serialVersionUID = 1L;

  private static final String TXID_PREFIX = "txid-";

  private final MongoNonTransactionalSerializer<T> base;

  public MongoTransactionalSerializer(MongoNonTransactionalSerializer<T> base) {
    this.base = base;
  }

  @Override
  public DBObject serialize(String fieldName, TransactionalValue<T> val) {
    DBObject obj = base.serialize(fieldName, val.getVal());
    obj.put(TXID_PREFIX + fieldName, val.getTxid());
    return obj;
  }

  @Override
  public TransactionalValue<T> deserialize(DBObject obj, String fieldName) {
    T val = base.deserialize(obj, fieldName);
    Object txidObj = obj.get(TXID_PREFIX + fieldName);
    long txid = (txidObj == null) ? 0 : Long.parseLong(txidObj.toString());
    return new TransactionalValue<T>(txid, val);
  }

}
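// Editor's illustrative sketch (not part of the original gist): the field layout
// produced by the serializer stack for fieldName "siteCount" (values hypothetical).
//
//   non-transactional: { "siteCount" : 17 }
//   transactional:     { "siteCount" : 17, "txid-siteCount" : 9 }
//   opaque:            { "siteCount" : 17, "txid-siteCount" : 9, "prev-siteCount" : 12 }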
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.shard;

import java.util.List;
import java.util.Map;
import java.util.Random;

import com.google.common.collect.Lists;

import backtype.storm.task.IMetricsContext;

import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.state.map.MapState;
import storm.trident.state.map.ReadOnlyMapState;

/**
 * Randomly shards batches between delegate {@link State state}s.
 *
 * @author codyaray
 * @since 4/23/2014
 */
public class RandomShardState implements MapState, State {

  private final Random rand = new Random();
  private final List<State> delegates;

  // Set and unset in beginCommit/commit
  private State shard;

  private RandomShardState(List<State> delegates) {
    this.delegates = delegates;
  }

  @Override
  public void beginCommit(Long txid) {
    shard = delegates.get(rand.nextInt(delegates.size()));
    shard.beginCommit(txid);
  }

  @Override
  public void commit(Long txid) {
    shard.commit(txid);
    shard = null;
  }

  @Override
  public List multiGet(List keys) {
    return ((ReadOnlyMapState) shard).multiGet(keys);
  }

  @Override
  public List multiUpdate(List keys, List vals) {
    return ((MapState) shard).multiUpdate(keys, vals);
  }

  @Override
  public void multiPut(List keys, List vals) {
    ((MapState) shard).multiPut(keys, vals);
  }

  /**
   * Factory to create {@link RandomShardState}s.
   */
  public static class Factory implements StateFactory {

    private static final long serialVersionUID = -6289401502963920055L;

    private final List<StateFactory> delegates;

    public Factory(List<StateFactory> delegates) {
      this.delegates = delegates;
    }

    @Override
    @SuppressWarnings({ "rawtypes" })
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
      List<State> states = Lists.newArrayListWithCapacity(delegates.size());
      for (StateFactory factory : delegates) {
        states.add(factory.makeState(conf, metrics, partitionIndex, numPartitions));
      }
      return new RandomShardState(states);
    }
  }

}
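// Editor's illustrative sketch (not part of the original gist): sharding writes
// randomly across two Mongo hosts, mirroring the wiring in TagRequestMongoBucketizer
// below. The hostnames are hypothetical.
class RandomShardStateExample {
  public static void main(String[] args) {
    java.util.List<storm.trident.state.StateFactory> delegates = java.util.Arrays.asList(
        com.brighttag.storm.mongo.MongoStateFactory.<Integer>opaque("mongo1.example.com", "metrics"),
        com.brighttag.storm.mongo.MongoStateFactory.<Integer>opaque("mongo2.example.com", "metrics"));
    storm.trident.state.StateFactory sharded = new RandomShardState.Factory(delegates);
    System.out.println(sharded);
  }
}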
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.utils;

import static com.google.common.base.Preconditions.checkArgument;

/**
 * Computes the parallelism for a particular topology and machine configuration.
 *
 * @author codyaray
 * @since 4/21/2014
 */
public class StormParallelism {

  private final int numberHosts;
  private final int numberWorkersPerHost;
  private final int numberWorkers;
  private final int numberKafkaPartitions;
  private final int numberKafkaPartitionsPerSpout;
  private final int spoutParallelism;
  private final int numberExecutorsPerCore;
  private final int numberCoresPerHost;
  private final int numberExecutorsPerHost;
  private final int numberExecutorsPerWorker;
  private final int transformParallelism;
  private final int persistenceParallelism;

  private StormParallelism(Builder builder) {
    // Number of workers should be a multiple of number of machines
    this.numberHosts = builder.numberHosts;
    this.numberWorkersPerHost = builder.numberWorkersPerHost;
    this.numberWorkers = numberHosts * numberWorkersPerHost;
    // Number of partitions should be a multiple of spout parallelism
    this.numberKafkaPartitions = builder.numberKafkaPartitions;
    this.numberKafkaPartitionsPerSpout = builder.numberKafkaPartitionsPerSpout;
    this.spoutParallelism = numberKafkaPartitions / numberKafkaPartitionsPerSpout;
    // Parallelism should be a multiple of number of workers
    this.numberExecutorsPerCore = builder.numberExecutorsPerCore;
    this.numberCoresPerHost = builder.numberCoresPerHost;
    this.numberExecutorsPerHost = numberExecutorsPerCore * numberCoresPerHost;
    this.numberExecutorsPerWorker = numberExecutorsPerHost / numberWorkersPerHost;
    this.transformParallelism = numberExecutorsPerWorker * numberWorkers;
    // Reduce parallelism in persistence for best cache efficiency and lowest bulk-request overhead
    this.persistenceParallelism = numberWorkers;
  }

  public int getNumberWorkers() {
    return numberWorkers;
  }

  public int forSpoutLayer() {
    return spoutParallelism;
  }

  public int forTransformLayer() {
    return transformParallelism;
  }

  public int forPersistenceLayer() {
    return persistenceParallelism;
  }

  public static Builder builder() {
    return new Builder();
  }

  /**
   * Builder of {@link StormParallelism} configurations.
   */
  public static class Builder {

    private int numberHosts;
    private int numberWorkersPerHost = 1;
    private int numberKafkaPartitions;
    private int numberKafkaPartitionsPerSpout = 1;
    private int numberCoresPerHost;
    private int numberExecutorsPerCore = 1;

    public Builder numberHosts(int numberHosts) {
      this.numberHosts = numberHosts;
      return this;
    }

    public Builder numberWorkersPerHost(int numberWorkersPerHost) {
      this.numberWorkersPerHost = numberWorkersPerHost;
      return this;
    }

    public Builder numberKafkaPartitions(int numberKafkaPartitions) {
      this.numberKafkaPartitions = numberKafkaPartitions;
      return this;
    }

    public Builder numberKafkaPartitionsPerSpout(int numberKafkaPartitionsPerSpout) {
      this.numberKafkaPartitionsPerSpout = numberKafkaPartitionsPerSpout;
      return this;
    }

    public Builder numberCoresPerHost(int numberCoresPerHost) {
      this.numberCoresPerHost = numberCoresPerHost;
      return this;
    }

    public Builder numberExecutorsPerCore(int numberExecutorsPerCore) {
      this.numberExecutorsPerCore = numberExecutorsPerCore;
      return this;
    }

    public StormParallelism build() {
      checkArgument(numberHosts > 0);
      checkArgument(numberKafkaPartitions > 0);
      checkArgument(numberCoresPerHost > 0);
      return new StormParallelism(this);
    }
  }

}
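// Editor's illustrative sketch (not part of the original gist): the derived
// parallelism for the cluster shape TagRequestMongoBucketizer configures below
// (3 hosts, 2 cores/host, 2 executors/core, 8 Kafka partitions, defaults elsewhere).
class StormParallelismExample {
  public static void main(String[] args) {
    StormParallelism p = StormParallelism.builder()
        .numberHosts(3)
        .numberCoresPerHost(2)
        .numberExecutorsPerCore(2)
        .numberKafkaPartitions(8)
        .build();
    System.out.println(p.getNumberWorkers());    // 3  (3 hosts x 1 worker/host)
    System.out.println(p.forSpoutLayer());       // 8  (8 partitions / 1 partition per spout)
    System.out.println(p.forTransformLayer());   // 12 (4 executors/worker x 3 workers)
    System.out.println(p.forPersistenceLayer()); // 3  (one per worker)
  }
}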
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.utils;

import javax.annotation.Nullable;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.utils.Utils;

/**
 * Helpers to run topologies in Storm.
 *
 * @author codyaray
 * @since 3/26/2014
 */
public class StormRunner {

  private static final int MILLIS_IN_SEC = 1000;

  private StormRunner() { /* No instances */ }

  /**
   * Runs an N-worker topology in a cluster, or locally if {@code topologyName} is {@code null}.
   *
   * @param topology the storm topology
   * @param numWorkers the number of workers for this topology, or {@code maxTaskParallelism} in local mode
   * @param topologyName the topology name or {@code null} to run in local mode
   */
  public static void runTopology(StormTopology topology, int numWorkers, @Nullable String topologyName)
      throws AlreadyAliveException, InvalidTopologyException {
    Config conf = new Config();
    if (topologyName != null) {
      conf.setNumWorkers(numWorkers);
    } else {
      conf.setMaxTaskParallelism(numWorkers);
    }
    runTopology(topology, conf, topologyName);
  }

  /**
   * Runs a topology in a cluster, or locally if {@code topologyName} is {@code null}.
   *
   * @param topology the storm topology
   * @param conf the topology configuration
   * @param topologyName the topology name or {@code null} to run in local mode
   */
  public static void runTopology(StormTopology topology, Config conf, @Nullable String topologyName)
      throws AlreadyAliveException, InvalidTopologyException {
    if (topologyName != null) {
      StormSubmitter.submitTopology(topologyName, conf, topology);
    } else {
      runTopologyLocally(topology, "stormseries", conf, 10000);
    }
  }

  /**
   * Runs a topology in local mode.
   *
   * @param topology the storm topology
   * @param topologyName the topology name
   * @param conf the topology configuration
   * @param runtimeInSeconds how long to run the topology before killing it
   */
  public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, long runtimeInSeconds) {
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(topologyName, conf, topology);
    Utils.sleep(runtimeInSeconds * MILLIS_IN_SEC);
    cluster.killTopology(topologyName);
    cluster.shutdown();
  }

}
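// Editor's illustrative usage note (not part of the original gist): a null topology
// name selects local mode (numWorkers becomes maxTaskParallelism); any non-null
// name submits to the cluster. The topology variable is assumed built elsewhere.
//
//   StormRunner.runTopology(topology, 4, null);          // local, for development
//   StormRunner.runTopology(topology, 4, "my-topology"); // submit to the cluster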
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.utils;

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

/**
 * Formats the incoming fields into a string using the given format string.
 */
public class StringFormatter extends BaseFunction {

  private static final long serialVersionUID = 9223269171323302719L;

  private final String format;

  public StringFormatter(String format) {
    this.format = format;
  }

  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
    Object[] formatArgs = new String[tuple.size()];
    for (int i = 0; i < tuple.size(); i++) {
      formatArgs[i] = tuple.getString(i);
    }
    collector.emit(new Values(String.format(format, formatArgs)));
  }

}
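// Editor's illustrative sketch (not part of the original gist): String.format
// ignores arguments beyond those the format string references, so a format with
// no specifiers emits a constant, which is how the topology below derives a fixed
// collection name per bucket size.
class StringFormatterExample {
  public static void main(String[] args) {
    System.out.println(String.format("metrics.%s", "30s"));  // "metrics.30s"
    System.out.println(String.format("metrics.30s"));        // constant; no args consumed
  }
}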
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.apps;

import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;

import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.Stream;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Sum;
import storm.trident.state.StateFactory;

import com.brighttag.storm.shard.RandomShardState;
import com.brighttag.storm.json.TagRequestMetricJsonParser;
import com.brighttag.storm.mongo.MongoStateFactory;
import com.brighttag.storm.utils.BinaryToString;
import com.brighttag.storm.utils.Bucket;
import com.brighttag.storm.utils.StormParallelism;
import com.brighttag.storm.utils.StormRunner;
import com.brighttag.storm.utils.StringFormatter;

/**
 * Storm topology which ingests metrics as {@link TagRequestMetrics} JSON from Kafka,
 * aggregates them into buckets, and stores the aggregated results in MongoDB.
 *
 * This is a transition topology designed to pre-aggregate data to reduce write load on Mongo.
 * It's designed to be a turn-key replacement for the current statscollectors, writing data
 * in the same format (but with additional txid-* and prev-* fields).
 *
 * @author codyaray
 * @since 3/18/2014
 */
public class TagRequestMongoBucketizer {

  private static final Map<String, Long> BUCKETS = ImmutableMap.of("30s", 30000L, "30m", 1800000L);
  private static final int NUMBER_KAFKA_PARTITIONS = 8;
  private static final int NUMBER_STORM_HOSTS = 3;
  private static final int NUMBER_STORM_CORES_PER_HOST = 2;
  private static final int NUMBER_STORM_EXECUTORS_PER_CORE = 2;

  /**
   * Builds the Storm topology.
   */
  private static StormTopology buildTopology(StormParallelism parallelism, String zookeepers,
      String kafkaTopic, List<String> mongoHosts, String mongoDatabase, String mongoCollectionPrefix)
      throws UnknownHostException {
    // Input Spout
    TridentKafkaConfig config = new TridentKafkaConfig(new ZkHosts(zookeepers), kafkaTopic);
    config.fetchSizeBytes = 5 * (1024 * 1024); // 5 MB
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(config);
    // Output State
    List<StateFactory> factories = Lists.newArrayListWithCapacity(mongoHosts.size());
    for (String host : mongoHosts) {
      factories.add(MongoStateFactory.opaque(host, mongoDatabase));
    }
    StateFactory stateFactory = new RandomShardState.Factory(factories);
    // Aggregation Topology
    TridentTopology topology = new TridentTopology();
    Stream stream = topology.newStream("spout2", spout)
        .parallelismHint(parallelism.forSpoutLayer()).shuffle().name("Transform")
        .each(new Fields("bytes"), new BinaryToString(), new Fields("string"))
        .each(new Fields("string"), new TagRequestMetricJsonParser(), new Fields("timestamp", "siteId", "field", "value"));
    for (Map.Entry<String, Long> entry : BUCKETS.entrySet()) {
      stream.each(new Fields("timestamp"), new Bucket(entry.getValue()), new Fields("bucketStart", "bucketEnd"))
          .each(new StringFormatter(mongoCollectionPrefix + entry.getKey()), new Fields("collection"))
          .parallelismHint(parallelism.forTransformLayer())
          .groupBy(new Fields("collection", "bucketStart", "siteId", "field"))
          .name("Aggregator-" + entry.getKey())
          .persistentAggregate(stateFactory, new Fields("value"), new Sum(), new Fields("count"))
          .parallelismHint(parallelism.forPersistenceLayer());
    }
    return topology.build();
  }

  private static final Splitter HOST_SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();

  private static List<String> split(String str) {
    return ImmutableList.copyOf(HOST_SPLITTER.split(str));
  }

  /**
   * Runs the topology locally, or in a cluster if a [cluster-topology-name] is given.
   */
  public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, UnknownHostException {
    if (args.length < 5) {
      System.out.println("Usage: " + TagRequestMongoBucketizer.class.getSimpleName()
          + " <zoo-hosts> <kafka-topic> <mongo-hosts> <mongo-database> <mongo-collection-prefix> [cluster-topology-name]");
      return;
    }
    StormParallelism parallelism = StormParallelism.builder()
        .numberKafkaPartitions(NUMBER_KAFKA_PARTITIONS)
        .numberHosts(NUMBER_STORM_HOSTS)
        .numberCoresPerHost(NUMBER_STORM_CORES_PER_HOST)
        .numberExecutorsPerCore(NUMBER_STORM_EXECUTORS_PER_CORE)
        .build();
    StormTopology stormTopology = buildTopology(parallelism, args[0], args[1], split(args[2]), args[3], args[4]);
    StormRunner.runTopology(stormTopology, parallelism.getNumberWorkers(), args.length < 6 ? null : args[5]);
  }

}
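// Editor's illustrative invocation (not part of the original gist; host names and
// topic are hypothetical). Note that the mongo hosts are comma-separated, and
// omitting the trailing topology name runs in local mode:
//
//   java com.brighttag.storm.apps.TagRequestMongoBucketizer \
//       zk1:2181,zk2:2181 metrics-topic mongo1,mongo2 metrics stats. my-topology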
// Copyright 2014 BrightTag, Inc. All rights reserved.

package com.brighttag.storm.json;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.tuple.Values;

import com.onetag.metrics.TagRequestMetrics;
import com.onetag.metrics.json.JSONTagRequestMetricsMarshaller;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

/**
 * Parses the first tuple value as a {@link TagRequestMetrics} JSON string and emits
 * key-value tuples per the current tagserve/statscollector logic.
 */
public class TagRequestMetricJsonParser extends BaseFunction {

  private static final long serialVersionUID = 7592816813615529588L;
  private static final Logger log = LoggerFactory.getLogger(TagRequestMetricJsonParser.class);

  private static final String TAG_PREFIX = "tag";
  private static final String PAGE_PREFIX = "page";

  private static final JSONTagRequestMetricsMarshaller MARSHALLER = new JSONTagRequestMetricsMarshaller();

  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
    try {
      TagRequestMetrics metrics = MARSHALLER.unmarshal(tuple.getString(0));
      String siteId = metrics.getSiteId();
      long timestamp = metrics.getRequestInterval().getStartMillis();
      if (!metrics.isSecondaryRequest()) {
        collector.emit(new Values(timestamp, siteId, "siteCount", 1));
        collector.emit(new Values(timestamp, siteId, metrics.getSource().lowerCamelName(), 1));
        for (Long pageId : metrics.getPageIds()) {
          collector.emit(new Values(timestamp, siteId, PAGE_PREFIX + pageId, 1));
        }
      }
      for (Long tagId : metrics.getTagFires().keySet()) {
        collector.emit(new Values(timestamp, siteId, TAG_PREFIX + tagId, 1));
      }
    } catch (Exception e) {
      log.info("Problem parsing JSON {}: {}", tuple.getString(0), e);
    }
  }

}
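// Editor's illustrative sketch (not part of the original gist): for a hypothetical
// primary request with siteId "42", one page view (pageId 7), and one tag fire
// (tagId 99), execute() emits tuples of (timestamp, siteId, field, value):
//
//   (t, "42", "siteCount", 1)
//   (t, "42", <source lowerCamelName>, 1)
//   (t, "42", "page7", 1)
//   (t, "42", "tag99", 1)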