Skip to content

Instantly share code, notes, and snippets.

@quintona
Created May 11, 2013 03:25
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save quintona/5558787 to your computer and use it in GitHub Desktop.
Save quintona/5558787 to your computer and use it in GitHub Desktop.
A way of doing an outer join of two separate streams in Storm Trident: use an outer-join reducer. Here is the code for the reducer and its associated state. Simply call topology.multiReduce(s1, s2, function, outputFields).
package storm.cookbook.tfidf.functions;
import java.util.Map;
import storm.trident.operation.MultiReducer;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentMultiReducerContext;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;
/**
 * Multi-reducer that buffers the tuples of its input streams in an
 * {@link OuterJoinState} and emits the joined rows once the state is completed.
 *
 * <p>All join logic lives in {@link OuterJoinState}; this class only wires the
 * reducer callbacks to it.
 */
public class OuterJoinReducer implements MultiReducer<OuterJoinState> {

    private static final long serialVersionUID = 1L;

    /** No configuration is read, so there is nothing to set up. */
    @Override
    public void prepare(Map conf, TridentMultiReducerContext context) {
    }

    /**
     * Creates the empty accumulator that {@link #execute} fills.
     *
     * @param collector unused here; nothing is emitted before {@link #complete}
     * @return a fresh, empty {@link OuterJoinState}
     */
    @Override
    public OuterJoinState init(TridentCollector collector) {
        return new OuterJoinState();
    }

    /** Holds no resources, so there is nothing to release. */
    @Override
    public void cleanup() {
    }

    /**
     * Records one incoming tuple in the state under the index of the stream it came from.
     *
     * @param state       accumulator returned by {@link #init}
     * @param streamIndex index of the stream that produced {@code input}
     * @param input       the tuple to buffer
     * @param collector   unused here; output is deferred to {@link #complete}
     */
    @Override
    public void execute(OuterJoinState state, int streamIndex,
            TridentTuple input, TridentCollector collector) {
        state.addValues(streamIndex, input);
    }

    /**
     * Emits every row produced by {@link OuterJoinState#join()}.
     *
     * @param state     accumulator holding the buffered tuples
     * @param collector receives one emit per joined row
     */
    @Override
    public void complete(OuterJoinState state,
            TridentCollector collector) {
        for (Values joinedRow : state.join()) {
            collector.emit(joinedRow);
        }
    }
}
package storm.cookbook.tfidf.functions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;
/**
 * Accumulates the tuples of at most two streams, keyed by stream index, and
 * combines them in {@link #join()}.
 *
 * <p>NOTE(review): despite the name, {@link #join()} produces the cross product of
 * the two sides. A side that received no tuples yields no rows at all, so nothing is
 * emitted for unmatched tuples. Confirm this is the intended "outer join" behaviour.
 */
public class OuterJoinState {

    /** Number of distinct streams this state can hold. */
    private static final int MAX_STREAMS = 2;

    // 2 columns, keyed by stream index. Each column contains the tuples from the given stream.
    private final HashMap<Integer, List<Object[]>> bothSides = new HashMap<Integer, List<Object[]>>();

    /**
     * Buffers one tuple under the stream it came from.
     *
     * @param streamId index of the originating stream
     * @param input    tuple whose values are copied via {@code toArray()}
     * @throws IllegalArgumentException if {@code streamId} would be a third distinct stream
     */
    public void addValues(int streamId, TridentTuple input) {
        List<Object[]> side = bothSides.get(streamId);
        if (side == null) {
            if (bothSides.size() >= MAX_STREAMS) {
                throw new IllegalArgumentException("Outer join can only be performed between 2 streams");
            }
            side = new ArrayList<Object[]>();
            bothSides.put(streamId, side);
        }
        side.add(input.toArray());
    }

    /**
     * Picks the left-hand side: the stream with the fewest buffered tuples. On a tie
     * the stream visited first while iterating the map's keys wins. Returns 0 when no
     * stream has been seen.
     */
    private int getLHS() {
        int shortest = Integer.MAX_VALUE;
        int index = 0;
        for (int id : bothSides.keySet()) {
            int size = bothSides.get(id).size();
            if (size < shortest) {
                shortest = size;
                index = id;
            }
        }
        return index;
    }

    /**
     * Returns the stream index that is not {@code lhs}.
     *
     * @throws IllegalArgumentException if no other stream has been seen
     */
    private int getRhs(int lhs) {
        for (int candidate : bothSides.keySet()) {
            if (candidate != lhs) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Can't find RHS!");
    }

    /**
     * Combines every tuple of the shorter side with every tuple of the other side.
     * Each result row holds the shorter side's values followed by the other side's.
     *
     * <p>Because the shorter side always comes first, the column order of the result
     * depends on how many tuples each stream delivered.
     *
     * @return the joined rows; empty unless both streams delivered at least one tuple
     */
    public List<Values> join() {
        List<Values> ret = new ArrayList<Values>();
        // Explicit guard instead of catching the exception getRhs() throws when a side is missing.
        if (bothSides.size() < MAX_STREAMS) {
            return ret;
        }
        int lhsId = getLHS();
        int rhsId = getRhs(lhsId);
        List<Object[]> rightRows = bothSides.get(rhsId);
        for (Object[] lhs : bothSides.get(lhsId)) {
            for (Object[] rhs : rightRows) {
                // Values takes varargs, so the array is spread into individual fields.
                Values vals = new Values(lhs);
                vals.addAll(Arrays.asList(rhs));
                ret.add(vals);
            }
        }
        return ret;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment