Skip to content

Instantly share code, notes, and snippets.

@codyaray
Created May 15, 2014 15:31
Show Gist options
  • Save codyaray/d58c1aaf688f27b72fdd to your computer and use it in GitHub Desktop.
Save codyaray/d58c1aaf688f27b72fdd to your computer and use it in GitHub Desktop.
Best practice to randomly shard data between multiple TridentStates
List<StateFactory> factories = Lists.newArrayListWithCapacity(mongoHosts.size());
for (String host : mongoHosts) {
factories.add(MongoStateFactory.opaque(host, mongoDatabase));
}
StateFactory stateFactory = new RandomShardState.Factory(factories);
// Copyright 2014 BrightTag, Inc. All rights reserved.
package com.brighttag.storm.shard;
import java.util.List;
import java.util.Map;
import java.util.Random;
import backtype.storm.task.IMetricsContext;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.state.map.MapState;
import storm.trident.state.map.ReadOnlyMapState;
import com.google.common.collect.Lists;
/**
* Randomly shards batches between delegate {@link State state}s.
*
* @author codyaray
* @since 4/23/2014
*/
public class RandomShardState implements MapState, State {
private final Random rand = new Random();
private final List<State> delegates;
// Set and unset in beginCommit/commit
private State shard;
private RandomShardState(List<State> delegates) {
this.delegates = delegates;
}
@Override
public void beginCommit(Long txid) {
shard = delegates.get(rand.nextInt(delegates.size()));
shard.beginCommit(txid);
}
@Override
public void commit(Long txid) {
shard.commit(txid);
shard = null;
}
@Override
public List multiGet(List keys) {
return ((ReadOnlyMapState) shard).multiGet(keys);
}
@Override
public List multiUpdate(List keys, List vals) {
return ((MapState) shard).multiUpdate(keys, vals);
}
@Override
public void multiPut(List keys, List vals) {
((MapState) shard).multiPut(keys, vals);
}
/**
* Factory to create {@link RandomShardState}s.
*/
public static class Factory implements StateFactory {
private static final long serialVersionUID = -6289401502963920055L;
private final List<StateFactory> delegates;
public Factory(List<StateFactory> delegates) {
this.delegates = delegates;
}
@Override
@SuppressWarnings({ "rawtypes" })
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
List<State> states = Lists.newArrayListWithCapacity(delegates.size());
for (StateFactory factory : delegates) {
states.add(factory.makeState(conf, metrics, partitionIndex, numPartitions));
}
return new RandomShardState(states);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment