@ceteri
Created September 7, 2012 23:53
Cascading wordcount sample
bash-3.2$ cd cascading.samples/
bash-3.2$ ls
build.gradle hadoop logparser settings.gradle
build.xml loganalysis sample.build.gradle wordcount
bash-3.2$ cd wordcount/
bash-3.2$ ls
README.TXT build.gradle data src
bash-3.2$ java -version
java version "1.6.0_33"
Java(TM) SE Runtime Environment (build 1.6.0_33-b03-424-11M3720)
Java HotSpot(TM) 64-Bit Server VM (build 20.8-b03-424, mixed mode)
bash-3.2$ hadoop version
Warning: $HADOOP_HOME is deprecated.
Hadoop 1.0.3
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0 -r 1335192
Compiled by hortonfo on Tue May 8 20:31:25 UTC 2012
From source with checksum e6b0c1e23dcf76907c5fecb4b832f3be
bash-3.2$ gradle -version
------------------------------------------------------------
Gradle 1.0
------------------------------------------------------------
Gradle build time: Tuesday, June 12, 2012 12:56:21 AM UTC
Groovy: 1.8.6
Ant: Apache Ant(TM) version 1.8.2 compiled on December 20 2010
Ivy: 2.2.0
JVM: 1.6.0_33 (Apple Inc. 20.8-b03-424)
OS: Mac OS X 10.7.4 x86_64
bash-3.2$ gradle clean jar
:wordcount:clean UP-TO-DATE
:wordcount:compileJava
Download http://conjars.org/repo/cascading/cascading-core/2.0.5-wip-354/cascading-core-2.0.5-wip-354.pom
Download http://conjars.org/repo/cascading/cascading-xml/2.0.5-wip-354/cascading-xml-2.0.5-wip-354.pom
Download http://conjars.org/repo/cascading/cascading-hadoop/2.0.5-wip-354/cascading-hadoop-2.0.5-wip-354.pom
Download http://repo1.maven.org/maven2/org/ccil/cowan/tagsoup/tagsoup/1.2/tagsoup-1.2.pom
Download http://conjars.org/repo/cascading/cascading-core/2.0.5-wip-354/cascading-core-2.0.5-wip-354.jar
Download http://conjars.org/repo/cascading/cascading-xml/2.0.5-wip-354/cascading-xml-2.0.5-wip-354.jar
Download http://conjars.org/repo/cascading/cascading-hadoop/2.0.5-wip-354/cascading-hadoop-2.0.5-wip-354.jar
Download http://repo1.maven.org/maven2/org/ccil/cowan/tagsoup/tagsoup/1.2/tagsoup-1.2.jar
:wordcount:processResources UP-TO-DATE
:wordcount:classes
:wordcount:jar
BUILD SUCCESSFUL
Total time: 14.857 secs
bash-3.2$ more README.TXT
To use, make sure Hadoop is in your path and, optionally, HADOOP_CONF is set.
From the downloaded archive execute,
> hadoop jar wordcount.jar data/url+page.200.txt output local
If building from source,
> gradle jar
> hadoop jar ./build/libs/wordcount.jar data/url+page.200.txt output local
Note that if HADOOP_CONF references a cluster, the above 'output' directory will show up in HDFS.
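The cascade in this transcript reads (url, page) records, then produces word counts overall ("output/words", fields ['word', 'count']) and per page ("output/urls", fields ['url', 'word', 'count']). The following is a minimal self-contained Python sketch of those two aggregations, not the sample's actual Cascading code; the whitespace/regex tokenization and the sample data are assumptions standing in for data/url+page.200.txt.

```python
# Sketch of the two aggregations the wordcount cascade computes:
# tokenize each page, then count per word and per (url, word) pair.
# The \w+ tokenizer below is an assumption, not the sample's exact scrubber.
import re
from collections import Counter

def word_counts(pages):
    """pages: iterable of (url, text) tuples, like the 'pages' tap."""
    words = Counter()       # plays the role of output/words: ('word', 'count')
    url_words = Counter()   # plays the role of output/urls: ('url', 'word', 'count')
    for url, text in pages:
        for word in re.findall(r"\w+", text.lower()):
            words[word] += 1
            url_words[(url, word)] += 1
    return words, url_words

# Hypothetical stand-in data for data/url+page.200.txt
pages = [
    ("http://example.com/a", "the quick brown fox"),
    ("http://example.com/b", "the lazy dog"),
]
words, url_words = word_counts(pages)
print(words["the"])                                 # 2
print(url_words[("http://example.com/b", "the")])   # 1
```

In the real sample, Cascading expresses the same grouping with GroupBy and Count pipes, and the MapReduce jobs visible in the log below (job_local_0002 and job_local_0003) carry out the two aggregations.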
bash-3.2$ hadoop jar ./build/libs/wordcount.jar data/url+page.200.txt output local
Warning: $HADOOP_HOME is deprecated.
12/09/07 16:49:53 INFO util.HadoopUtil: resolving application jar from found main method on: wordcount.Main
12/09/07 16:49:53 INFO planner.HadoopPlanner: using application jar: /Users/ceteri/src/concur/cascading.samples/wordcount/./build/libs/wordcount.jar
12/09/07 16:49:53 INFO property.AppProps: using app.id: 7EE7B99490AFB749787FCE41695E276B
2012-09-07 16:49:53.886 java[90957:1903] Unable to load realm info from SCDynamicStore
12/09/07 16:49:54 INFO planner.HadoopPlanner: using application jar: /Users/ceteri/src/concur/cascading.samples/wordcount/./build/libs/wordcount.jar
12/09/07 16:49:54 INFO planner.HadoopPlanner: using application jar: /Users/ceteri/src/concur/cascading.samples/wordcount/./build/libs/wordcount.jar
12/09/07 16:49:54 INFO planner.HadoopPlanner: using application jar: /Users/ceteri/src/concur/cascading.samples/wordcount/./build/libs/wordcount.jar
12/09/07 16:49:54 INFO cascade.Cascade: [import pages+url pipe+...] starting
12/09/07 16:49:54 INFO cascade.Cascade: [import pages+url pipe+...] parallel execution is enabled: false
12/09/07 16:49:54 INFO cascade.Cascade: [import pages+url pipe+...] starting flows: 4
12/09/07 16:49:54 INFO cascade.Cascade: [import pages+url pipe+...] allocating threads: 1
12/09/07 16:49:54 INFO cascade.Cascade: [import pages+url pipe+...] starting flow: import pages
12/09/07 16:49:54 INFO flow.Flow: [import pages] at least one sink does not exist
12/09/07 16:49:54 INFO flow.Flow: [import pages] starting
12/09/07 16:49:54 INFO flow.Flow: [import pages] source: Lfs["TextLine[['offset', 'line']->[ALL]]"]["data/url+page.200.txt"]"]
12/09/07 16:49:54 INFO flow.Flow: [import pages] sink: Hfs["SequenceFile[['url', 'page']]"]["output/pages"]"]
12/09/07 16:49:54 INFO flow.Flow: [import pages] parallel execution is enabled: false
12/09/07 16:49:54 INFO flow.Flow: [import pages] starting jobs: 1
12/09/07 16:49:54 INFO flow.Flow: [import pages] allocating threads: 1
12/09/07 16:49:54 INFO flow.FlowStep: [import pages] starting step: (1/1) output/pages
12/09/07 16:49:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/09/07 16:49:54 WARN snappy.LoadSnappy: Snappy native library not loaded
12/09/07 16:49:54 INFO mapred.FileInputFormat: Total input paths to process : 1
12/09/07 16:49:54 INFO flow.FlowStep: [import pages] submitted hadoop job: job_local_0001
12/09/07 16:49:54 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/09/07 16:49:54 INFO io.MultiInputSplit: current split input path: file:/Users/ceteri/src/concur/cascading.samples/wordcount/data/url+page.200.txt
12/09/07 16:49:54 INFO mapred.MapTask: numReduceTasks: 0
12/09/07 16:49:54 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.0.5-wip-354
12/09/07 16:49:54 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
12/09/07 16:49:54 INFO hadoop.FlowMapper: sourcing from: Lfs["TextLine[['offset', 'line']->[ALL]]"]["data/url+page.200.txt"]"]
12/09/07 16:49:54 INFO hadoop.FlowMapper: sinking to: Hfs["SequenceFile[['url', 'page']]"]["output/pages"]"]
12/09/07 16:49:54 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/09/07 16:49:54 INFO mapred.LocalJobRunner:
12/09/07 16:49:54 INFO mapred.Task: Task attempt_local_0001_m_000000_0 is allowed to commit now
12/09/07 16:49:54 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_m_000000_0' to file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/pages
12/09/07 16:49:57 INFO mapred.LocalJobRunner: file:/Users/ceteri/src/concur/cascading.samples/wordcount/data/url+page.200.txt:0+4922081
12/09/07 16:49:57 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
12/09/07 16:49:59 INFO util.Hadoop18TapUtil: deleting temp path output/pages/_temporary
12/09/07 16:49:59 INFO cascade.Cascade: [import pages+url pipe+...] completed flow: import pages
12/09/07 16:49:59 INFO cascade.Cascade: [import pages+url pipe+...] starting flow: url pipe+word pipe
12/09/07 16:49:59 INFO flow.Flow: [url pipe+word pipe] at least one sink does not exist
12/09/07 16:49:59 INFO flow.Flow: [url pipe+word pipe] starting
12/09/07 16:49:59 INFO flow.Flow: [url pipe+word pipe] source: Hfs["SequenceFile[['url', 'page']]"]["output/pages"]"]
12/09/07 16:49:59 INFO flow.Flow: [url pipe+word pipe] sink: Hfs["SequenceFile[['word', 'count']]"]["output/words"]"]
12/09/07 16:49:59 INFO flow.Flow: [url pipe+word pipe] sink: Hfs["SequenceFile[['url', 'word', 'count']]"]["output/urls"]"]
12/09/07 16:49:59 INFO flow.Flow: [url pipe+word pipe] parallel execution is enabled: false
12/09/07 16:49:59 INFO flow.Flow: [url pipe+word pipe] starting jobs: 2
12/09/07 16:49:59 INFO flow.Flow: [url pipe+word pipe] allocating threads: 1
12/09/07 16:49:59 INFO flow.FlowStep: [url pipe+word pipe] starting step: (1/2) output/urls
12/09/07 16:49:59 INFO mapred.FileInputFormat: Total input paths to process : 1
12/09/07 16:49:59 INFO flow.FlowStep: [url pipe+word pipe] submitted hadoop job: job_local_0002
12/09/07 16:49:59 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/09/07 16:49:59 INFO io.MultiInputSplit: current split input path: file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/pages/part-00000
12/09/07 16:49:59 INFO mapred.MapTask: numReduceTasks: 1
12/09/07 16:49:59 INFO mapred.MapTask: io.sort.mb = 100
12/09/07 16:49:59 INFO mapred.MapTask: data buffer = 79691776/99614720
12/09/07 16:49:59 INFO mapred.MapTask: record buffer = 262144/327680
12/09/07 16:49:59 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.0.5-wip-354
12/09/07 16:49:59 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
12/09/07 16:49:59 INFO hadoop.FlowMapper: sourcing from: Hfs["SequenceFile[['url', 'page']]"]["output/pages"]"]
12/09/07 16:49:59 INFO hadoop.FlowMapper: sinking to: GroupBy(url pipe)[by:[{2}:'url', 'word']]
12/09/07 16:50:03 INFO mapred.MapTask: Starting flush of map output
12/09/07 16:50:05 INFO mapred.LocalJobRunner: file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/pages/part-00000:0+4715627
12/09/07 16:50:06 INFO mapred.MapTask: Finished spill 0
12/09/07 16:50:06 INFO mapred.Task: Task:attempt_local_0002_m_000000_0 is done. And is in the process of commiting
12/09/07 16:50:08 INFO mapred.LocalJobRunner: file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/pages/part-00000:0+4715627
12/09/07 16:50:08 INFO mapred.LocalJobRunner: file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/pages/part-00000:0+4715627
12/09/07 16:50:08 INFO mapred.Task: Task 'attempt_local_0002_m_000000_0' done.
12/09/07 16:50:08 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/09/07 16:50:08 INFO mapred.LocalJobRunner:
12/09/07 16:50:08 INFO mapred.Merger: Merging 1 sorted segments
12/09/07 16:50:08 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 14264931 bytes
12/09/07 16:50:08 INFO mapred.LocalJobRunner:
12/09/07 16:50:08 INFO hadoop.FlowReducer: cascading version: Concurrent, Inc - Cascading 2.0.5-wip-354
12/09/07 16:50:08 INFO hadoop.FlowReducer: child jvm opts: -Xmx200m
12/09/07 16:50:08 INFO hadoop.FlowReducer: sourcing from: GroupBy(url pipe)[by:[{2}:'url', 'word']]
12/09/07 16:50:08 INFO hadoop.FlowReducer: sinking to: Hfs["SequenceFile[['url', 'word', 'count']]"]["output/urls"]"]
12/09/07 16:50:09 INFO mapred.Task: Task:attempt_local_0002_r_000000_0 is done. And is in the process of commiting
12/09/07 16:50:09 INFO mapred.LocalJobRunner:
12/09/07 16:50:09 INFO mapred.Task: Task attempt_local_0002_r_000000_0 is allowed to commit now
12/09/07 16:50:09 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0002_r_000000_0' to file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/urls
12/09/07 16:50:11 INFO mapred.LocalJobRunner: reduce > reduce
12/09/07 16:50:11 INFO mapred.Task: Task 'attempt_local_0002_r_000000_0' done.
12/09/07 16:50:14 INFO flow.FlowStep: [url pipe+word pipe] starting step: (2/2) output/words
12/09/07 16:50:14 INFO mapred.FileInputFormat: Total input paths to process : 1
12/09/07 16:50:14 INFO flow.FlowStep: [url pipe+word pipe] submitted hadoop job: job_local_0003
12/09/07 16:50:14 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/09/07 16:50:14 INFO io.MultiInputSplit: current split input path: file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/pages/part-00000
12/09/07 16:50:14 INFO mapred.MapTask: numReduceTasks: 1
12/09/07 16:50:14 INFO mapred.MapTask: io.sort.mb = 100
12/09/07 16:50:14 INFO mapred.MapTask: data buffer = 79691776/99614720
12/09/07 16:50:14 INFO mapred.MapTask: record buffer = 262144/327680
12/09/07 16:50:14 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.0.5-wip-354
12/09/07 16:50:14 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
12/09/07 16:50:14 INFO hadoop.FlowMapper: sourcing from: Hfs["SequenceFile[['url', 'page']]"]["output/pages"]"]
12/09/07 16:50:14 INFO hadoop.FlowMapper: sinking to: GroupBy(word pipe)[by:[{1}:'word']]
12/09/07 16:50:16 INFO mapred.MapTask: Starting flush of map output
12/09/07 16:50:17 INFO mapred.MapTask: Finished spill 0
12/09/07 16:50:17 INFO mapred.Task: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting
12/09/07 16:50:20 INFO mapred.LocalJobRunner: file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/pages/part-00000:0+4715627
12/09/07 16:50:20 INFO mapred.LocalJobRunner: file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/pages/part-00000:0+4715627
12/09/07 16:50:20 INFO mapred.Task: Task 'attempt_local_0003_m_000000_0' done.
12/09/07 16:50:20 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/09/07 16:50:20 INFO mapred.LocalJobRunner:
12/09/07 16:50:20 INFO mapred.Merger: Merging 1 sorted segments
12/09/07 16:50:20 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 14059334 bytes
12/09/07 16:50:20 INFO mapred.LocalJobRunner:
12/09/07 16:50:20 INFO hadoop.FlowReducer: cascading version: Concurrent, Inc - Cascading 2.0.5-wip-354
12/09/07 16:50:20 INFO hadoop.FlowReducer: child jvm opts: -Xmx200m
12/09/07 16:50:20 INFO hadoop.FlowReducer: sourcing from: GroupBy(word pipe)[by:[{1}:'word']]
12/09/07 16:50:20 INFO hadoop.FlowReducer: sinking to: Hfs["SequenceFile[['word', 'count']]"]["output/words"]"]
12/09/07 16:50:20 INFO mapred.Task: Task:attempt_local_0003_r_000000_0 is done. And is in the process of commiting
12/09/07 16:50:20 INFO mapred.LocalJobRunner:
12/09/07 16:50:20 INFO mapred.Task: Task attempt_local_0003_r_000000_0 is allowed to commit now
12/09/07 16:50:20 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0003_r_000000_0' to file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/words
12/09/07 16:50:23 INFO mapred.LocalJobRunner: reduce > reduce
12/09/07 16:50:23 INFO mapred.Task: Task 'attempt_local_0003_r_000000_0' done.
12/09/07 16:50:24 INFO util.Hadoop18TapUtil: deleting temp path output/urls/_temporary
12/09/07 16:50:24 INFO util.Hadoop18TapUtil: deleting temp path output/words/_temporary
12/09/07 16:50:24 INFO cascade.Cascade: [import pages+url pipe+...] completed flow: url pipe+word pipe
12/09/07 16:50:24 INFO cascade.Cascade: [import pages+url pipe+...] starting flow: export word
12/09/07 16:50:24 INFO flow.Flow: [export word] at least one sink does not exist
12/09/07 16:50:24 INFO flow.Flow: [export word] starting
12/09/07 16:50:24 INFO flow.Flow: [export word] source: Hfs["SequenceFile[['word', 'count']]"]["output/words"]"]
12/09/07 16:50:24 INFO flow.Flow: [export word] sink: Lfs["TextLine[['offset', 'line']->[ALL]]"]["local/words"]"]
12/09/07 16:50:24 INFO flow.Flow: [export word] parallel execution is enabled: false
12/09/07 16:50:24 INFO flow.Flow: [export word] starting jobs: 1
12/09/07 16:50:24 INFO flow.Flow: [export word] allocating threads: 1
12/09/07 16:50:24 INFO flow.FlowStep: [export word] starting step: (1/1) local/words
12/09/07 16:50:24 INFO mapred.FileInputFormat: Total input paths to process : 1
12/09/07 16:50:24 INFO flow.FlowStep: [export word] submitted hadoop job: job_local_0004
12/09/07 16:50:24 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/09/07 16:50:24 INFO io.MultiInputSplit: current split input path: file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/words/part-00000
12/09/07 16:50:24 INFO mapred.MapTask: numReduceTasks: 0
12/09/07 16:50:24 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.0.5-wip-354
12/09/07 16:50:24 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
12/09/07 16:50:24 INFO hadoop.FlowMapper: sourcing from: Hfs["SequenceFile[['word', 'count']]"]["output/words"]"]
12/09/07 16:50:24 INFO hadoop.FlowMapper: sinking to: Lfs["TextLine[['offset', 'line']->[ALL]]"]["local/words"]"]
12/09/07 16:50:24 INFO mapred.Task: Task:attempt_local_0004_m_000000_0 is done. And is in the process of commiting
12/09/07 16:50:24 INFO mapred.LocalJobRunner:
12/09/07 16:50:24 INFO mapred.Task: Task attempt_local_0004_m_000000_0 is allowed to commit now
12/09/07 16:50:24 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0004_m_000000_0' to file:/Users/ceteri/src/concur/cascading.samples/wordcount/local/words
12/09/07 16:50:27 INFO mapred.LocalJobRunner: file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/words/part-00000:0+622634
12/09/07 16:50:27 INFO mapred.Task: Task 'attempt_local_0004_m_000000_0' done.
12/09/07 16:50:29 INFO util.Hadoop18TapUtil: deleting temp path local/words/_temporary
12/09/07 16:50:29 INFO cascade.Cascade: [import pages+url pipe+...] completed flow: export word
12/09/07 16:50:29 INFO cascade.Cascade: [import pages+url pipe+...] starting flow: export url
12/09/07 16:50:29 INFO flow.Flow: [export url] at least one sink does not exist
12/09/07 16:50:29 INFO flow.Flow: [export url] starting
12/09/07 16:50:29 INFO flow.Flow: [export url] source: Hfs["SequenceFile[['url', 'word', 'count']]"]["output/urls"]"]
12/09/07 16:50:29 INFO flow.Flow: [export url] sink: Lfs["TextLine[['offset', 'line']->[ALL]]"]["local/urls"]"]
12/09/07 16:50:29 INFO flow.Flow: [export url] parallel execution is enabled: false
12/09/07 16:50:29 INFO flow.Flow: [export url] starting jobs: 1
12/09/07 16:50:29 INFO flow.Flow: [export url] allocating threads: 1
12/09/07 16:50:29 INFO flow.FlowStep: [export url] starting step: (1/1) local/urls
12/09/07 16:50:29 INFO mapred.FileInputFormat: Total input paths to process : 1
12/09/07 16:50:29 INFO flow.FlowStep: [export url] submitted hadoop job: job_local_0005
12/09/07 16:50:29 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/09/07 16:50:29 INFO io.MultiInputSplit: current split input path: file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/urls/part-00000
12/09/07 16:50:29 INFO mapred.MapTask: numReduceTasks: 0
12/09/07 16:50:29 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.0.5-wip-354
12/09/07 16:50:29 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
12/09/07 16:50:29 INFO hadoop.FlowMapper: sourcing from: Hfs["SequenceFile[['url', 'word', 'count']]"]["output/urls"]"]
12/09/07 16:50:29 INFO hadoop.FlowMapper: sinking to: Lfs["TextLine[['offset', 'line']->[ALL]]"]["local/urls"]"]
12/09/07 16:50:29 INFO mapred.Task: Task:attempt_local_0005_m_000000_0 is done. And is in the process of commiting
12/09/07 16:50:29 INFO mapred.LocalJobRunner:
12/09/07 16:50:29 INFO mapred.Task: Task attempt_local_0005_m_000000_0 is allowed to commit now
12/09/07 16:50:29 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0005_m_000000_0' to file:/Users/ceteri/src/concur/cascading.samples/wordcount/local/urls
12/09/07 16:50:32 INFO mapred.LocalJobRunner: file:/Users/ceteri/src/concur/cascading.samples/wordcount/output/urls/part-00000:0+5523853
12/09/07 16:50:32 INFO mapred.Task: Task 'attempt_local_0005_m_000000_0' done.
12/09/07 16:50:34 INFO util.Hadoop18TapUtil: deleting temp path local/urls/_temporary
12/09/07 16:50:34 INFO cascade.Cascade: [import pages+url pipe+...] completed flow: export url
bash-3.2$