Created June 30, 2012 01:11
Cascading for the Impatient, Part 3
package impatient;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Retain;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class Main {
  public static void main( String[] args ) {
    String docPath = args[ 0 ];
    String wcPath = args[ 1 ];
    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create source and sink taps
    Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
    Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );

    // specify a regex operation to split the "document" text lines into a token stream
    Fields token = new Fields( "token" );
    Fields text = new Fields( "text" );
    RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
    Fields fieldSelector = new Fields( "doc_id", "token" );
    Pipe docPipe = new Each( "token", text, splitter, fieldSelector );

    // define "ScrubFunction" to clean up the token stream
    Fields scrubArguments = new Fields( "doc_id", "token" );
    docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( fieldSelector ), Fields.RESULTS );

    // determine the word counts
    Pipe wcPipe = new Pipe( "wc", docPipe );
    wcPipe = new Retain( wcPipe, token );
    wcPipe = new GroupBy( wcPipe, token );
    wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef()
      .setName( "wc" )
      .addSource( docPipe, docTap )
      .addTailSink( wcPipe, wcTap );

    // write a DOT file and run the flow
    Flow wcFlow = flowConnector.connect( flowDef );
    wcFlow.writeDOT( "dot/" );
    wcFlow.complete();
  }
}
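package impatient;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class ScrubFunction extends BaseOperation implements Function {
  public ScrubFunction( Fields fieldDeclaration ) {
    super( 2, fieldDeclaration ); // expects two arguments: doc_id and token
  }
  public void operate( FlowProcess flowProcess, FunctionCall functionCall ) {
    TupleEntry argument = functionCall.getArguments();
    String doc_id = argument.getString( 0 );
    String token = scrubText( argument.getString( 1 ) );
    if( token.length() > 0 ) { // emit only tokens that are non-empty after scrubbing
      Tuple result = new Tuple();
      result.add( doc_id );
      result.add( token );
      functionCall.getOutputCollector().add( result );
    }
  }
  public String scrubText( String text ) {
    return text.trim().toLowerCase();
  }
}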
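The effect of the regex split followed by ScrubFunction can be approximated in plain Java, without Cascading or Hadoop. What follows is a minimal sketch and not part of the original gist: the class name WordCountSketch, the method countTokens, and the sample sentence are hypothetical, and it assumes that RegexSplitGenerator splits a line roughly the way String.split does with the same pattern.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the tokenize -> scrub -> count steps (hypothetical helper, no Cascading needed).
final class WordCountSketch {
  // Same delimiter pattern that Main passes to RegexSplitGenerator.
  static final String DELIMITERS = "[ \\[\\]\\(\\),.]";

  // Mirrors ScrubFunction.scrubText: trim whitespace, then lowercase.
  static String scrubText( String text ) {
    return text.trim().toLowerCase();
  }

  // Splits one line of text, scrubs each token, skips empty tokens, and tallies the rest.
  static Map<String, Integer> countTokens( String line ) {
    Map<String, Integer> counts = new TreeMap<String, Integer>();
    for( String raw : line.split( DELIMITERS ) ) {
      String token = scrubText( raw );
      if( token.length() > 0 ) {
        Integer seen = counts.get( token );
        counts.put( token, seen == null ? 1 : seen + 1 );
      }
    }
    return counts;
  }

  public static void main( String[] args ) {
    // Hypothetical sample line, not taken from data/rain.txt.
    System.out.println( countTokens( "A rain shadow is a dry area (the lee side)." ) );
    // prints {a=2, area=1, dry=1, is=1, lee=1, rain=1, shadow=1, side=1, the=1}
  }
}
```

The space followed by "(" in the sample produces an empty token from the split, which the length check discards, much as ScrubFunction does before emitting a tuple.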
bash-3.2$ ls -lth
total 16
-rw-r--r-- 1 paco staff 1.3K Jun 29 18:08
-rw-r--r-- 1 paco staff 1.7K Jun 29 18:07 build.gradle
drwxr-xr-x 3 paco staff 102B Jun 26 14:17 src
drwxr-xr-x 3 paco staff 102B Jun 11 10:18 data
bash-3.2$ hadoop version
Warning: $HADOOP_HOME is deprecated.
Hadoop 1.0.3
Subversion -r 1335192
Compiled by hortonfo on Tue May 8 20:31:25 UTC 2012
From source with checksum e6b0c1e23dcf76907c5fecb4b832f3be
bash-3.2$ gradle -version
Gradle 1.0
Gradle build time: Tuesday, June 12, 2012 12:56:21 AM UTC
Groovy: 1.8.6
Ant: Apache Ant(TM) version 1.8.2 compiled on December 20 2010
Ivy: 2.2.0
JVM: 1.6.0_33 (Apple Inc. 20.8-b03-424)
OS: Mac OS X 10.6.8 x86_64
bash-3.2$ gradle clean jar
:clean UP-TO-DATE
:processResources UP-TO-DATE
Total time: 7.858 secs
bash-3.2$ hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc
Warning: $HADOOP_HOME is deprecated.
12/06/29 18:09:52 INFO util.HadoopUtil: resolving application jar from found main method on: impatient.Main
12/06/29 18:09:52 INFO planner.HadoopPlanner: using application jar: /Users/paco/src/concur/impatient/part3/./build/libs/impatient.jar
12/06/29 18:09:52 INFO property.AppProps: using 0C7F52A914C5FEA53FBD75502AF9A19E
12/06/29 18:09:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/06/29 18:09:53 WARN snappy.LoadSnappy: Snappy native library not loaded
12/06/29 18:09:53 INFO mapred.FileInputFormat: Total input paths to process : 1
12/06/29 18:09:53 INFO util.Version: Concurrent, Inc - Cascading 2.0.1
12/06/29 18:09:53 INFO flow.Flow: [wc] starting
12/06/29 18:09:53 INFO flow.Flow: [wc] source: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
12/06/29 18:09:53 INFO flow.Flow: [wc] sink: Hfs["TextDelimited[[UNKNOWN]->['token', 'count']]"]["output/wc"]"]
12/06/29 18:09:53 INFO flow.Flow: [wc] parallel execution is enabled: false
12/06/29 18:09:53 INFO flow.Flow: [wc] starting jobs: 1
12/06/29 18:09:53 INFO flow.Flow: [wc] allocating threads: 1
12/06/29 18:09:53 INFO flow.FlowStep: [wc] starting step: (1/1) output/wc
12/06/29 18:09:54 INFO mapred.FileInputFormat: Total input paths to process : 1
12/06/29 18:09:54 INFO flow.FlowStep: [wc] submitted hadoop job: job_local_0001
12/06/29 18:09:54 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/06/29 18:09:54 INFO io.MultiInputSplit: current split input path: file:/Users/paco/src/concur/impatient/part3/data/rain.txt
12/06/29 18:09:54 INFO mapred.MapTask: numReduceTasks: 1
12/06/29 18:09:54 INFO mapred.MapTask: io.sort.mb = 100
12/06/29 18:09:54 INFO mapred.MapTask: data buffer = 79691776/99614720
12/06/29 18:09:54 INFO mapred.MapTask: record buffer = 262144/327680
12/06/29 18:09:54 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
12/06/29 18:09:54 INFO hadoop.FlowMapper: sinking to: GroupBy(wc)[by:[{1}:'token']]
12/06/29 18:09:55 INFO mapred.MapTask: Starting flush of map output
12/06/29 18:09:55 INFO mapred.MapTask: Finished spill 0
12/06/29 18:09:55 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/06/29 18:09:57 INFO mapred.LocalJobRunner: file:/Users/paco/src/concur/impatient/part3/data/rain.txt:0+510
12/06/29 18:09:57 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
12/06/29 18:09:57 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/06/29 18:09:57 INFO mapred.LocalJobRunner:
12/06/29 18:09:57 INFO mapred.Merger: Merging 1 sorted segments
12/06/29 18:09:57 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1257 bytes
12/06/29 18:09:57 INFO mapred.LocalJobRunner:
12/06/29 18:09:57 INFO hadoop.FlowReducer: sourcing from: GroupBy(wc)[by:[{1}:'token']]
12/06/29 18:09:57 INFO hadoop.FlowReducer: sinking to: Hfs["TextDelimited[[UNKNOWN]->['token', 'count']]"]["output/wc"]"]
12/06/29 18:09:57 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
12/06/29 18:09:57 INFO mapred.LocalJobRunner:
12/06/29 18:09:57 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/06/29 18:09:57 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/Users/paco/src/concur/impatient/part3/output/wc
12/06/29 18:10:00 INFO mapred.LocalJobRunner: reduce > reduce
12/06/29 18:10:00 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
12/06/29 18:10:04 INFO util.Hadoop18TapUtil: deleting temp path output/wc/_temporary
bash-3.2$ more output/wc/part-00000
token count
a 8
air 1
an 1
and 2
area 4
as 2
australia 1
back 1
broken 1
california's 1
cause 1
cloudcover 1
death 1
deserts 1
downwind 1
dry 3
dvd 1
effect 1
in 1
is 4
known 1
land 2
lee 2
leeward 2
less 1
lies 1
mountain 3
mountainous 1
of 6
on 2
or 2
primary 1
bash-3.2$ pig -version
Warning: $HADOOP_HOME is deprecated.
Apache Pig version 0.10.0 (r1328203)
compiled Apr 19 2012, 22:54:12
bash-3.2$ pig -p docPath=./data/rain.txt -p wcPath=./output/wc ./src/scripts/wc.pig
Warning: $HADOOP_HOME is deprecated.
2012-08-28 10:06:19,860 [main] INFO org.apache.pig.Main - Apache Pig version 0.10.0 (r1328203) compiled Apr 19 2012, 22:54:12
2012-08-28 10:06:19,861 [main] INFO org.apache.pig.Main - Logging error messages to: /Users/ceteri/src/concur/Impatient/part3/pig_1346173579857.log
2012-08-28 10:06:19.991 java[73971:1903] Unable to load realm info from SCDynamicStore
2012-08-28 10:06:20,241 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
2012-08-28 10:06:20,941 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_CHARARRAY 2 time(s).
2012-08-28 10:06:20,941 [main] WARN org.apache.pig.PigServer - Encountered Warning USING_OVERLOADED_FUNCTION 1 time(s).
2012-08-28 10:06:21,145 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2012-08-28 10:06:21,157 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer - Choosing to move algebraic foreach to combiner
2012-08-28 10:06:21,185 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2012-08-28 10:06:21,185 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2012-08-28 10:06:21,228 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_CHARARRAY 2 time(s).
2012-08-28 10:06:21,228 [main] WARN org.apache.pig.PigServer - Encountered Warning USING_OVERLOADED_FUNCTION 1 time(s).
2012-08-28 10:06:21,231 [main] INFO - Pig features used in the script: GROUP_BY,FILTER
2012-08-28 10:06:21,258 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2012-08-28 10:06:21,260 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer - Choosing to move algebraic foreach to combiner
2012-08-28 10:06:21,264 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2012-08-28 10:06:21,264 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2012-08-28 10:06:21,289 [main] INFO - Pig script settings are added to the job
2012-08-28 10:06:21,306 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2012-08-28 10:06:21,310 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job8291599502780137612.jar
2012-08-28 10:06:25,386 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job8291599502780137612.jar created
2012-08-28 10:06:25,396 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2012-08-28 10:06:25,420 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=510
2012-08-28 10:06:25,420 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 1
2012-08-28 10:06:25,448 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2012-08-28 10:06:25,456 [Thread-6] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2012-08-28 10:06:25,552 [Thread-6] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2012-08-28 10:06:25,552 [Thread-6] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
2012-08-28 10:06:25,557 [Thread-6] WARN - Snappy native library not loaded
2012-08-28 10:06:25,558 [Thread-6] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
2012-08-28 10:06:25,740 [Thread-7] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : null
2012-08-28 10:06:25,750 [Thread-7] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader - Current split being processed file:/Users/ceteri/src/concur/Impatient/part3/data/rain.txt:0+510
2012-08-28 10:06:25,755 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - io.sort.mb = 100
2012-08-28 10:06:25,849 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - data buffer = 79691776/99614720
2012-08-28 10:06:25,851 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - record buffer = 262144/327680
2012-08-28 10:06:25,903 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - Starting flush of map output
2012-08-28 10:06:25,924 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - Finished spill 0
2012-08-28 10:06:25,926 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2012-08-28 10:06:25,950 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_local_0001
2012-08-28 10:06:25,950 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2012-08-28 10:06:28,721 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-08-28 10:06:28,722 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task 'attempt_local_0001_m_000000_0' done.
2012-08-28 10:06:28,733 [Thread-7] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : null
2012-08-28 10:06:28,733 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-08-28 10:06:28,736 [Thread-7] INFO org.apache.hadoop.mapred.Merger - Merging 1 sorted segments
2012-08-28 10:06:28,743 [Thread-7] INFO org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 1 segments left of total size: 1173 bytes
2012-08-28 10:06:28,743 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-08-28 10:06:28,766 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2012-08-28 10:06:28,767 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-08-28 10:06:28,767 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task attempt_local_0001_r_000000_0 is allowed to commit now
2012-08-28 10:06:28,769 [Thread-7] INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved output of task 'attempt_local_0001_r_000000_0' to file:/Users/ceteri/src/concur/Impatient/part3/output/wc
2012-08-28 10:06:31,729 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner - reduce > reduce
2012-08-28 10:06:31,730 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task 'attempt_local_0001_r_000000_0' done.
2012-08-28 10:06:31,731 [Thread-7] WARN org.apache.hadoop.mapred.FileOutputCommitter - Output path is null in cleanup
2012-08-28 10:06:35,969 [main] WARN - Failed to get RunningJob for job job_local_0001
2012-08-28 10:06:35,971 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2012-08-28 10:06:35,972 [main] INFO - Script Statistics:
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
1.0.3 0.10.0 ceteri 2012-08-28 10:06:21 2012-08-28 10:06:35 GROUP_BY,FILTER
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MaxReduceTime MinReduceTime AvgReduceTime Alias Feature Outputs
job_local_0001 1 1 n/a n/a n/a n/a n/a n/a docPipe,tokenGroups,tokenPipe,wcPipe GROUP_BY,COMBINER file:///Users/ceteri/src/concur/Impatient/part3/output/wc,
Successfully read 0 records from: "file:///Users/ceteri/src/concur/Impatient/part3/data/rain.txt"
Successfully stored 0 records in: "file:///Users/ceteri/src/concur/Impatient/part3/output/wc"
Total records written : 0
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
2012-08-28 10:06:35,973 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
bash-3.2$ cat output/wc/part-r-00000
a 8
an 1
as 2
in 1
is 4
of 6
on 2
or 2
air 1
and 2
dry 3
dvd 1
lee 2
the 5
two 1
area 4
back 1
land 2
less 1
lies 1
rain 5
side 2
such 1
that 1
this 2
with 1
cause 1
death 1
known 1
women 1
broken 1
effect 1
ranges 1
shadow 4
valley 1
deserts 1
leeward 2
primary 1
secrets 1
sinking 1
downwind 1
mountain 3
produces 1
australia 1
cloudcover 1
mountainous 1
california's 1
docPipe = LOAD '$docPath' USING PigStorage('\t', 'tagsource') AS (doc_id, text);
docPipe = FILTER docPipe BY doc_id != 'doc_id';
-- specify a regex operation to split the "document" text lines into a token stream
tokenPipe = FOREACH docPipe GENERATE doc_id, FLATTEN(TOKENIZE(LOWER(text), ' [](),.')) AS token;
tokenPipe = FILTER tokenPipe BY token MATCHES '\\w.*';
-- DUMP tokenPipe;
-- determine the word counts
tokenGroups = GROUP tokenPipe BY token;
wcPipe = FOREACH tokenGroups GENERATE group AS token, COUNT(tokenPipe) AS count;
-- output
STORE wcPipe INTO '$wcPath' USING PigStorage('\t', 'tagsource');
EXPLAIN -out dot/ -dot wcPipe;
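The FILTER on token MATCHES '\\w.*' in the script above plays the role that ScrubFunction's empty-token check plays in the Java version. A rough plain-Java sketch of that filter follows. It assumes Pig's MATCHES tests the whole string against a Java regular expression (as String.matches does) and that the Pig literal unescapes to the regex \w.*; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the Pig statement: FILTER tokenPipe BY token MATCHES '\\w.*';
final class TokenFilterSketch {
  // Assumed regex behind the Pig literal: one word character, then anything.
  static final String WORD_START = "\\w.*";

  // Keeps only tokens whose entire text matches WORD_START.
  static List<String> keepWordTokens( List<String> tokens ) {
    List<String> kept = new ArrayList<String>();
    for( String token : tokens ) {
      if( token.matches( WORD_START ) ) {
        kept.add( token );
      }
    }
    return kept;
  }

  public static void main( String[] args ) {
    // Sample tokens chosen for illustration only.
    System.out.println( keepWordTokens( Arrays.asList( "rain", "", "california's", "'s", "-" ) ) );
    // prints [rain, california's]
  }
}
```

Under these assumptions the Pig filter is slightly stricter than the Java check: it drops empty tokens and also any token that begins with a non-word character.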