Cascading for the Impatient, Part 2
@ceteri, created June 29, 2012
Main.java:

package impatient;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class Main
  {
  public static void main( String[] args )
    {
    String docPath = args[ 0 ];
    String wcPath = args[ 1 ];

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create source and sink taps for tab-delimited text with a header row
    Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
    Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );

    // specify a regex operation to split the "document" text lines into a token stream
    Fields token = new Fields( "token" );
    Fields text = new Fields( "text" );
    RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
    // Fields.RESULTS discards the input fields, so the pipe only returns "token"
    Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS );

    // determine the word counts: group on "token", then count each group
    Pipe wcPipe = new Pipe( "wc", docPipe );
    wcPipe = new GroupBy( wcPipe, token );
    wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef()
      .setName( "wc" )
      .addSource( docPipe, docTap )
      .addTailSink( wcPipe, wcTap );

    // write a DOT file and run the flow
    Flow wcFlow = flowConnector.connect( flowDef );
    wcFlow.writeDOT( "dot/wc.dot" );
    wcFlow.complete();
    }
  }
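The writeDOT call above renders the flow's logical plan as a Graphviz file before any Hadoop jobs run. Assuming Graphviz is installed (it is not among this project's dependencies), the diagram can be rendered from the working directory after a run:

bash-3.2$ dot -Tpng dot/wc.dot -o dot/wc.png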
bash-3.2$ ls -lth
total 24
-rw-r--r-- 1 paco staff 819B Jul 3 14:55 LICENSE.txt
-rw-r--r-- 1 paco staff 1.3K Jul 3 14:54 README.md
-rw-r--r-- 1 paco staff 1.7K Jul 3 14:52 build.gradle
drwxr-xr-x 3 paco staff 102B Jul 3 14:52 data
drwxr-xr-x 4 paco staff 136B Jul 3 14:52 docs
drwxr-xr-x 3 paco staff 102B Jul 3 14:52 src
bash-3.2$ hadoop version
Warning: $HADOOP_HOME is deprecated.
Hadoop 1.0.3
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0 -r 1335192
Compiled by hortonfo on Tue May 8 20:31:25 UTC 2012
From source with checksum e6b0c1e23dcf76907c5fecb4b832f3be
bash-3.2$ gradle -version
------------------------------------------------------------
Gradle 1.0
------------------------------------------------------------
Gradle build time: Tuesday, June 12, 2012 12:56:21 AM UTC
Groovy: 1.8.6
Ant: Apache Ant(TM) version 1.8.2 compiled on December 20 2010
Ivy: 2.2.0
JVM: 1.6.0_33 (Apple Inc. 20.8-b03-424)
OS: Mac OS X 10.6.8 x86_64
bash-3.2$ gradle clean jar
:clean UP-TO-DATE
:compileJava
:processResources UP-TO-DATE
:classes
:jar
BUILD SUCCESSFUL
Total time: 10.335 secs
bash-3.2$ hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc
Warning: $HADOOP_HOME is deprecated.
12/07/09 14:13:23 INFO util.HadoopUtil: resolving application jar from found main method on: impatient.Main
12/07/09 14:13:23 INFO planner.HadoopPlanner: using application jar: /Users/paco/src/concur/Impatient/part2/./build/libs/impatient.jar
12/07/09 14:13:23 INFO property.AppProps: using app.id: E61BF52F91214C88F4334CB2FEFAD641
12/07/09 14:13:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/07/09 14:13:23 WARN snappy.LoadSnappy: Snappy native library not loaded
12/07/09 14:13:23 INFO mapred.FileInputFormat: Total input paths to process : 1
12/07/09 14:13:23 INFO util.Version: Concurrent, Inc - Cascading 2.0.1
12/07/09 14:13:23 INFO flow.Flow: [wc] starting
12/07/09 14:13:23 INFO flow.Flow: [wc] source: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
12/07/09 14:13:23 INFO flow.Flow: [wc] sink: Hfs["TextDelimited[[UNKNOWN]->['token', 'count']]"]["output/wc"]"]
12/07/09 14:13:23 INFO flow.Flow: [wc] parallel execution is enabled: false
12/07/09 14:13:23 INFO flow.Flow: [wc] starting jobs: 1
12/07/09 14:13:23 INFO flow.Flow: [wc] allocating threads: 1
12/07/09 14:13:23 INFO flow.FlowStep: [wc] starting step: (1/1) output/wc
12/07/09 14:13:24 INFO mapred.FileInputFormat: Total input paths to process : 1
12/07/09 14:13:24 INFO flow.FlowStep: [wc] submitted hadoop job: job_local_0001
12/07/09 14:13:24 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/07/09 14:13:24 INFO io.MultiInputSplit: current split input path: file:/Users/paco/src/concur/Impatient/part2/data/rain.txt
12/07/09 14:13:24 INFO mapred.MapTask: numReduceTasks: 1
12/07/09 14:13:24 INFO mapred.MapTask: io.sort.mb = 100
12/07/09 14:13:24 INFO mapred.MapTask: data buffer = 79691776/99614720
12/07/09 14:13:24 INFO mapred.MapTask: record buffer = 262144/327680
12/07/09 14:13:24 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
12/07/09 14:13:24 INFO hadoop.FlowMapper: sinking to: GroupBy(wc)[by:[{1}:'token']]
12/07/09 14:13:24 INFO mapred.MapTask: Starting flush of map output
12/07/09 14:13:24 INFO mapred.MapTask: Finished spill 0
12/07/09 14:13:24 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/07/09 14:13:27 INFO mapred.LocalJobRunner: file:/Users/paco/src/concur/Impatient/part2/data/rain.txt:0+510
12/07/09 14:13:27 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
12/07/09 14:13:27 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/07/09 14:13:27 INFO mapred.LocalJobRunner:
12/07/09 14:13:27 INFO mapred.Merger: Merging 1 sorted segments
12/07/09 14:13:27 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1347 bytes
12/07/09 14:13:27 INFO mapred.LocalJobRunner:
12/07/09 14:13:27 INFO hadoop.FlowReducer: sourcing from: GroupBy(wc)[by:[{1}:'token']]
12/07/09 14:13:27 INFO hadoop.FlowReducer: sinking to: Hfs["TextDelimited[[UNKNOWN]->['token', 'count']]"]["output/wc"]"]
12/07/09 14:13:27 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
12/07/09 14:13:27 INFO mapred.LocalJobRunner:
12/07/09 14:13:27 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/07/09 14:13:27 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/Users/paco/src/concur/Impatient/part2/output/wc
12/07/09 14:13:30 INFO mapred.LocalJobRunner: reduce > reduce
12/07/09 14:13:30 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
12/07/09 14:13:34 INFO util.Hadoop18TapUtil: deleting temp path output/wc/_temporary
bash-3.2$ more output/wc/part-00000
token count
9
A 3
Australia 1
Broken 1
California's 1
DVD 1
Death 1
Land 1
Secrets 1
This 2
Two 1
Valley 1
Women 1
a 5
air 1
an 1
and 2
area 4
as 2
back 1
cause 1
cloudcover 1
deserts 1
downwind 1
dry 3
effect 1
in 1
is 4
known 1
land 1
lee 2
leeward 2
less 1
lies 1
mountain 3
mountainous 1
of 6
on 2
or 2
primary 1
produces 1
rain 5
ranges 1
shadow 4
side 2
sinking 1
such 1
that 1
the 5
with 1
bash-3.2$
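Note the first data row in the listing above: an empty token with a count of 9. RegexSplitGenerator emits an empty string wherever two delimiter characters are adjacent (e.g., a comma followed by a space), and this version of the app does not scrub them out. The Pig script shown further down filters these with its MATCHES '\w.*' line; a minimal sketch of the equivalent scrub in the Cascading app (hypothetical, not part of this example's source) would insert a filter right after the splitter:

// hypothetical scrub step, requires: import cascading.operation.regex.RegexFilter;
// keeps only tuples whose "token" begins with a word character,
// mirroring the Pig script's: tokenPipe = FILTER tokenPipe BY token MATCHES '\w.*';
docPipe = new Each( docPipe, token, new RegexFilter( "\\w.*" ) );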
bash-3.2$ pig -version
Warning: $HADOOP_HOME is deprecated.
Apache Pig version 0.10.0 (r1328203)
compiled Apr 19 2012, 22:54:12
bash-3.2$ pig -p docPath=./data/rain.txt -p wcPath=./output/wc ./src/scripts/wc.pig
Warning: $HADOOP_HOME is deprecated.
2012-08-28 10:12:12,884 [main] INFO org.apache.pig.Main - Apache Pig version 0.10.0 (r1328203) compiled Apr 19 2012, 22:54:12
2012-08-28 10:12:12,885 [main] INFO org.apache.pig.Main - Logging error messages to: /Users/ceteri/src/concur/Impatient/part2/pig_1346173932880.log
2012-08-28 10:12:13.012 java[74053:1903] Unable to load realm info from SCDynamicStore
2012-08-28 10:12:13,268 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
2012-08-28 10:12:13,922 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_CHARARRAY 2 time(s).
2012-08-28 10:12:13,923 [main] WARN org.apache.pig.PigServer - Encountered Warning USING_OVERLOADED_FUNCTION 1 time(s).
2012-08-28 10:12:14,053 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2012-08-28 10:12:14,060 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer - Choosing to move algebraic foreach to combiner
2012-08-28 10:12:14,080 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2012-08-28 10:12:14,080 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2012-08-28 10:12:14,108 [main] WARN org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_CHARARRAY 2 time(s).
2012-08-28 10:12:14,108 [main] WARN org.apache.pig.PigServer - Encountered Warning USING_OVERLOADED_FUNCTION 1 time(s).
2012-08-28 10:12:14,109 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY,FILTER
2012-08-28 10:12:14,126 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2012-08-28 10:12:14,127 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer - Choosing to move algebraic foreach to combiner
2012-08-28 10:12:14,130 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2012-08-28 10:12:14,130 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2012-08-28 10:12:14,145 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
2012-08-28 10:12:14,156 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2012-08-28 10:12:14,158 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job4452410853593859089.jar
2012-08-28 10:12:18,099 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job4452410853593859089.jar created
2012-08-28 10:12:18,109 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2012-08-28 10:12:18,132 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=510
2012-08-28 10:12:18,132 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 1
2012-08-28 10:12:18,160 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2012-08-28 10:12:18,168 [Thread-6] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2012-08-28 10:12:18,266 [Thread-6] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2012-08-28 10:12:18,267 [Thread-6] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
2012-08-28 10:12:18,272 [Thread-6] WARN org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library not loaded
2012-08-28 10:12:18,273 [Thread-6] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
2012-08-28 10:12:18,463 [Thread-7] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : null
2012-08-28 10:12:18,475 [Thread-7] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader - Current split being processed file:/Users/ceteri/src/concur/Impatient/part2/data/rain.txt:0+510
2012-08-28 10:12:18,480 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - io.sort.mb = 100
2012-08-28 10:12:18,576 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - data buffer = 79691776/99614720
2012-08-28 10:12:18,576 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - record buffer = 262144/327680
2012-08-28 10:12:18,627 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - Starting flush of map output
2012-08-28 10:12:18,648 [Thread-7] INFO org.apache.hadoop.mapred.MapTask - Finished spill 0
2012-08-28 10:12:18,650 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2012-08-28 10:12:18,662 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_local_0001
2012-08-28 10:12:18,662 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2012-08-28 10:12:21,442 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-08-28 10:12:21,442 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task 'attempt_local_0001_m_000000_0' done.
2012-08-28 10:12:21,452 [Thread-7] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : null
2012-08-28 10:12:21,452 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-08-28 10:12:21,455 [Thread-7] INFO org.apache.hadoop.mapred.Merger - Merging 1 sorted segments
2012-08-28 10:12:21,462 [Thread-7] INFO org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 1 segments left of total size: 1218 bytes
2012-08-28 10:12:21,462 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-08-28 10:12:21,486 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2012-08-28 10:12:21,487 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner -
2012-08-28 10:12:21,488 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task attempt_local_0001_r_000000_0 is allowed to commit now
2012-08-28 10:12:21,490 [Thread-7] INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved output of task 'attempt_local_0001_r_000000_0' to file:/Users/ceteri/src/concur/Impatient/part2/output/wc
2012-08-28 10:12:24,449 [Thread-7] INFO org.apache.hadoop.mapred.LocalJobRunner - reduce > reduce
2012-08-28 10:12:24,449 [Thread-7] INFO org.apache.hadoop.mapred.Task - Task 'attempt_local_0001_r_000000_0' done.
2012-08-28 10:12:24,450 [Thread-7] WARN org.apache.hadoop.mapred.FileOutputCommitter - Output path is null in cleanup
2012-08-28 10:12:28,681 [main] WARN org.apache.pig.tools.pigstats.PigStatsUtil - Failed to get RunningJob for job job_local_0001
2012-08-28 10:12:28,684 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2012-08-28 10:12:28,685 [main] INFO org.apache.pig.tools.pigstats.SimplePigStats - Script Statistics:
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
1.0.3 0.10.0 ceteri 2012-08-28 10:12:14 2012-08-28 10:12:28 GROUP_BY,FILTER
Success!
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MaxReduceTime MinReduceTime AvgReduceTime Alias Feature Outputs
job_local_0001 1 1 n/a n/a n/a n/a n/a n/a docPipe,tokenGroups,tokenPipe,wcPipe GROUP_BY,COMBINER file:///Users/ceteri/src/concur/Impatient/part2/output/wc,
Input(s):
Successfully read 0 records from: "file:///Users/ceteri/src/concur/Impatient/part2/data/rain.txt"
Output(s):
Successfully stored 0 records in: "file:///Users/ceteri/src/concur/Impatient/part2/output/wc"
Counters:
Total records written : 0
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
job_local_0001
2012-08-28 10:12:28,686 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
bash-3.2$ cat output/wc/part-r-00000
A 3
a 5
an 1
as 2
in 1
is 4
of 6
on 2
or 2
DVD 1
Two 1
air 1
and 2
dry 3
lee 2
the 5
Land 1
This 2
area 4
back 1
land 1
less 1
lies 1
rain 5
side 2
such 1
that 1
with 1
Death 1
Women 1
cause 1
known 1
Broken 1
Valley 1
effect 1
ranges 1
shadow 4
Secrets 1
deserts 1
leeward 2
primary 1
sinking 1
downwind 1
mountain 3
produces 1
Australia 1
cloudcover 1
mountainous 1
California's 1
bash-3.2$
The same app as a Pig script, src/scripts/wc.pig (as invoked above):

docPipe = LOAD '$docPath' USING PigStorage('\t', 'tagsource') AS (doc_id, text);
docPipe = FILTER docPipe BY doc_id != 'doc_id';

-- specify a regex operation to split the "document" text lines into a token stream
tokenPipe = FOREACH docPipe GENERATE doc_id, FLATTEN(TOKENIZE(text, ' [](),.')) AS token;
tokenPipe = FILTER tokenPipe BY token MATCHES '\\w.*';
-- DUMP tokenPipe;

-- determine the word counts
tokenGroups = GROUP tokenPipe BY token;
wcPipe = FOREACH tokenGroups GENERATE group AS token, COUNT(tokenPipe) AS count;

-- output
STORE wcPipe INTO '$wcPath' USING PigStorage('\t', 'tagsource');
EXPLAIN -out dot/wc_pig.dot -dot wcPipe;
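Both versions read the same tab-delimited input: a header row naming the doc_id and text fields, followed by one document per line. The Cascading app consumes the header via TextDelimited( true, "\t" ), while the Pig script discards it with the FILTER doc_id != 'doc_id' line. An illustrative row in the shape of data/rain.txt (hypothetical contents, tab-separated):

doc_id	text
doc01	A rain shadow is a dry area on the lee side of a mountain range.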