Created August 6, 2010 20:20
Random number in Cascalog
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.tuple.Tuple;
import java.util.Random;
import cascalog.CascalogFunction;

// Emits a random int for each call to operate(), optionally bounded to [0, max).
public class RandInt extends CascalogFunction {
    long _seed;    // base seed, fixed in the constructor and serialized with the instance
    Random _rand;  // per-task generator, created in prepare()
    Integer _max;  // exclusive upper bound, or null for the full int range

    public RandInt() {
        _seed = new Random().nextLong();
        _max = null;
    }

    public RandInt(int max) {
        this();
        _max = max;
    }

    public void prepare(FlowProcess flowProcess, OperationCall operationCall) {
        // Offset the base seed by the task number so each task gets its own stream.
        _rand = new Random(_seed + ((HadoopFlowProcess) flowProcess).getCurrentTaskNum());
    }

    public void operate(FlowProcess flow_process, FunctionCall fn_call) {
        int rand;
        if (_max == null) rand = _rand.nextInt();
        else rand = _rand.nextInt(_max);
        fn_call.getOutputCollector().add(new Tuple(rand));
    }
}
Oh.. cool. I didn't realize Cascading serialized the instance.
The seed is chosen before the job starts (in the constructor). Cascading then serializes the RandInt instance for use in the tasks. Each task calls "prepare" before it starts processing records. If a task fails and is retried, it reuses the same RandInt instance with the same seed, so the retry generates the same sequence of random numbers.
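To make the retry behavior concrete, here is a minimal sketch that uses only java.util.Random and no Cascading classes. The class TaskSeededRandom and its methods forTask and draw are hypothetical names made up for this illustration; they only mimic how the constructor and "prepare" combine the base seed with the task number.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical stand-in for the RandInt seeding scheme (not part of Cascading or Cascalog).
final class TaskSeededRandom {
    // Chosen once, like the seed picked in the RandInt constructor before the job starts.
    private final long baseSeed;

    TaskSeededRandom(long baseSeed) {
        this.baseSeed = baseSeed;
    }

    // Mimics prepare(): build the task's generator from base seed + task number.
    Random forTask(int taskNum) {
        return new Random(baseSeed + taskNum);
    }

    // Returns the first n bounded values that a given task would emit.
    int[] draw(int taskNum, int n, int max) {
        Random rand = forTask(taskNum);
        int[] out = new int[n];
        for (int i = 0; i < n; i++) {
            out[i] = rand.nextInt(max);
        }
        return out;
    }

    public static void main(String[] args) {
        TaskSeededRandom job = new TaskSeededRandom(new Random().nextLong());
        int[] firstAttempt = job.draw(3, 5, 100);
        int[] retry = job.draw(3, 5, 100); // same task number, same base seed
        System.out.println(Arrays.equals(firstAttempt, retry)); // prints true
    }
}
```

Because the base seed travels with the serialized instance, a retried task with the same task number rebuilds an identical generator, while tasks with different numbers start from different seeds.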