@ceteri
Created January 5, 2013 05:18
COUNT(DISTINCT c) in Cascading, for Mikhail Gavryuchkov
// build.gradle
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'

archivesBaseName = 'example'

repositories {
  mavenLocal()
  mavenCentral()
  mavenRepo name: 'conjars', url: 'http://conjars.org/repo/'
}

ext.cascadingVersion = '2.1.0'

dependencies {
  compile( group: 'cascading', name: 'cascading-core', version: cascadingVersion )
  compile( group: 'cascading', name: 'cascading-hadoop', version: cascadingVersion )
}

jar {
  description = "Assembles a Hadoop ready jar file"

  doFirst {
    into( 'lib' ) {
      from configurations.compile
    }
  }

  manifest {
    attributes( "Main-Class": "example/Main" )
  }
}
// src/main/java/example/Main.java
package example;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.CoGroup;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.CountBy;
import cascading.pipe.assembly.Rename;
import cascading.pipe.assembly.Retain;
import cascading.pipe.assembly.Unique;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class Main {
  public static void main( String[] args ) {
    String sourcePath = args[ 0 ];
    String sinkPath = args[ 1 ];

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create the source and sink taps
    Tap sourceTap = new Hfs( new TextDelimited( true, "\t" ), sourcePath );
    Tap sinkTap = new Hfs( new TextDelimited( true, "\t" ), sinkPath );

    // specify a pipe to connect the taps
    Pipe samplePipe = new Pipe( "sample" );

    // one branch to COUNT(*)
    Pipe cntPipe = new Pipe( "cntPipe", samplePipe );
    Fields selector = new Fields( "a", "b" );
    Fields cnt = new Fields( "cnt" );
    cntPipe = new CountBy( cntPipe, selector, cnt );

    // one branch to COUNT(DISTINCT c): first dedupe on (a, b, c), then count per (a, b)
    Pipe ccntPipe = new Pipe( "ccntPipe", samplePipe );
    selector = new Fields( "a", "b", "c" );
    ccntPipe = new Unique( ccntPipe, selector );
    selector = new Fields( "a", "b" );
    Fields ccnt = new Fields( "ccnt" );
    ccntPipe = new CountBy( ccntPipe, selector, ccnt );

    // rename the grouping fields so the CoGroup keys don't collide
    Fields newFields = new Fields( "a2", "b2" );
    ccntPipe = new Rename( ccntPipe, selector, newFields );

    // put 'em together for the final view
    Pipe calcPipe = new CoGroup( "calcPipe", cntPipe, selector, ccntPipe, newFields );
    selector = new Fields( "a", "b", "cnt", "ccnt" );
    calcPipe = new Retain( calcPipe, selector );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef()
      .addSource( samplePipe, sourceTap )
      .addTailSink( calcPipe, sinkTap );

    // run the flow
    flowConnector.connect( flowDef ).complete();
  }
}
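For reference, here is what the two branches compute, sketched in plain Java over the same five rows as data/sample.tsv (this bypasses Cascading entirely; the class name CountSketch and the in-memory maps are illustrative, not part of the flow above):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class CountSketch {
  public static void main( String[] args ) {
    // the rows from data/sample.tsv: fields a, b, c
    String[][] rows = {
      { "x", "y", "1" }, { "x", "y", "1" }, { "x", "z", "1" },
      { "x", "y", "2" }, { "x", "y", "2" }
    };

    Map<String, Integer> cnt = new TreeMap<>();          // COUNT(*) per (a, b) -- the cntPipe branch
    Map<String, Set<String>> distinct = new TreeMap<>(); // distinct c per (a, b) -- the ccntPipe branch

    for( String[] row : rows ) {
      String key = row[ 0 ] + "\t" + row[ 1 ];
      cnt.merge( key, 1, Integer::sum );
      distinct.computeIfAbsent( key, k -> new HashSet<>() ).add( row[ 2 ] );
    }

    // join the two branches on (a, b), as the CoGroup does
    for( String key : cnt.keySet() )
      System.out.println( key + "\t" + cnt.get( key ) + "\t" + distinct.get( key ).size() );
    // prints:
    // x	y	4	2
    // x	z	1	1
  }
}
```

That matches the output/calc results shown at the bottom: the Unique on (a, b, c) followed by CountBy on (a, b) is the two-pass trick that stands in for COUNT(DISTINCT c).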
bash-3.2$ gradle clean jar
:clean
:compileJava
:processResources UP-TO-DATE
:classes
:jar
BUILD SUCCESSFUL
Total time: 4.652 secs
bash-3.2$ cat data/sample.tsv
a b c
x y 1
x y 1
x z 1
x y 2
x y 2
bash-3.2$ rm -rf output/
bash-3.2$ hadoop jar ./build/libs/example.jar data/sample.tsv output/calc
Warning: $HADOOP_HOME is deprecated.
13/01/04 21:15:36 INFO util.HadoopUtil: resolving application jar from found main method on: example.Main
13/01/04 21:15:36 INFO planner.HadoopPlanner: using application jar: /Users/ceteri/src/concur/scratch/./build/libs/example.jar
13/01/04 21:15:36 INFO property.AppProps: using app.id: 4AAC60FDA191F3DA6EE141DF17F17E93
2013-01-04 21:15:37.056 java[24165:1903] Unable to load realm info from SCDynamicStore
13/01/04 21:15:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/01/04 21:15:37 WARN snappy.LoadSnappy: Snappy native library not loaded
13/01/04 21:15:37 INFO mapred.FileInputFormat: Total input paths to process : 1
13/01/04 21:15:37 INFO util.Version: Concurrent, Inc - Cascading 2.1.0
13/01/04 21:15:37 INFO flow.Flow: [] starting
13/01/04 21:15:37 INFO flow.Flow: [] source: Hfs["TextDelimited[['a', 'b', 'c']->[ALL]]"]["data/sample.tsv"]"]
13/01/04 21:15:37 INFO flow.Flow: [] sink: Hfs["TextDelimited[[UNKNOWN]->['a', 'b', 'cnt', 'ccnt']]"]["output/calc"]"]
13/01/04 21:15:37 INFO flow.Flow: [] parallel execution is enabled: false
13/01/04 21:15:37 INFO flow.Flow: [] starting jobs: 4
13/01/04 21:15:37 INFO flow.Flow: [] allocating threads: 1
13/01/04 21:15:37 INFO flow.FlowStep: [] at least one sink does not exist
13/01/04 21:15:37 INFO flow.FlowStep: [] source modification date at: Fri Jan 04 20:14:44 PST 2013
13/01/04 21:15:37 INFO flow.FlowStep: [] starting step: (1/4)
13/01/04 21:15:37 INFO mapred.FileInputFormat: Total input paths to process : 1
13/01/04 21:15:37 INFO flow.FlowStep: [] submitted hadoop job: job_local_0001
13/01/04 21:15:37 INFO mapred.Task: Using ResourceCalculatorPlugin : null
13/01/04 21:15:37 INFO io.MultiInputSplit: current split input path: file:/Users/ceteri/src/concur/scratch/data/sample.tsv
13/01/04 21:15:37 INFO mapred.MapTask: numReduceTasks: 1
13/01/04 21:15:37 INFO mapred.MapTask: io.sort.mb = 100
13/01/04 21:15:37 INFO mapred.MapTask: data buffer = 79691776/99614720
13/01/04 21:15:37 INFO mapred.MapTask: record buffer = 262144/327680
13/01/04 21:15:37 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.1.0
13/01/04 21:15:37 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
13/01/04 21:15:37 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['a', 'b', 'c']->[ALL]]"]["data/sample.tsv"]"]
13/01/04 21:15:37 INFO hadoop.FlowMapper: sinking to: GroupBy(cntPipe)[by:[{2}:'a', 'b']]
13/01/04 21:15:37 INFO assembly.AggregateBy: using threshold value: 10000
13/01/04 21:15:37 INFO mapred.MapTask: Starting flush of map output
13/01/04 21:15:37 INFO mapred.MapTask: Finished spill 0
13/01/04 21:15:37 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/01/04 21:15:40 INFO mapred.LocalJobRunner: file:/Users/ceteri/src/concur/scratch/data/sample.tsv:0+36
13/01/04 21:15:40 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/01/04 21:15:40 INFO mapred.Task: Using ResourceCalculatorPlugin : null
13/01/04 21:15:40 INFO mapred.LocalJobRunner:
13/01/04 21:15:40 INFO mapred.Merger: Merging 1 sorted segments
13/01/04 21:15:40 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 42 bytes
13/01/04 21:15:40 INFO mapred.LocalJobRunner:
13/01/04 21:15:40 INFO hadoop.FlowReducer: cascading version: Concurrent, Inc - Cascading 2.1.0
13/01/04 21:15:40 INFO hadoop.FlowReducer: child jvm opts: -Xmx200m
13/01/04 21:15:40 INFO hadoop.FlowReducer: sourcing from: GroupBy(cntPipe)[by:[{2}:'a', 'b']]
13/01/04 21:15:40 INFO hadoop.FlowReducer: sinking to: TempHfs["SequenceFile[['a', 'b', 'cnt']]"][800651948/cntPipe/]
13/01/04 21:15:40 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
13/01/04 21:15:40 INFO mapred.LocalJobRunner:
13/01/04 21:15:40 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
13/01/04 21:15:40 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/tmp/hadoop-ceteri/800651948_cntPipe_30DB56030A81F7CC3E894B0583259553
13/01/04 21:15:43 INFO mapred.LocalJobRunner: reduce > reduce
13/01/04 21:15:43 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
13/01/04 21:15:47 INFO flow.FlowStep: [] at least one sink does not exist
13/01/04 21:15:47 INFO flow.FlowStep: [] source modification date at: Fri Jan 04 20:14:44 PST 2013
13/01/04 21:15:47 INFO flow.FlowStep: [] starting step: (2/4)
13/01/04 21:15:47 INFO mapred.FileInputFormat: Total input paths to process : 1
13/01/04 21:15:47 INFO flow.FlowStep: [] submitted hadoop job: job_local_0002
13/01/04 21:15:47 INFO mapred.Task: Using ResourceCalculatorPlugin : null
13/01/04 21:15:47 INFO io.MultiInputSplit: current split input path: file:/Users/ceteri/src/concur/scratch/data/sample.tsv
13/01/04 21:15:47 INFO mapred.MapTask: numReduceTasks: 1
13/01/04 21:15:47 INFO mapred.MapTask: io.sort.mb = 100
13/01/04 21:15:47 INFO mapred.MapTask: data buffer = 79691776/99614720
13/01/04 21:15:47 INFO mapred.MapTask: record buffer = 262144/327680
13/01/04 21:15:47 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.1.0
13/01/04 21:15:47 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
13/01/04 21:15:47 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['a', 'b', 'c']->[ALL]]"]["data/sample.tsv"]"]
13/01/04 21:15:47 INFO hadoop.FlowMapper: sinking to: GroupBy(ccntPipe)[by:[{3}:'a', 'b', 'c']]
13/01/04 21:15:47 INFO mapred.MapTask: Starting flush of map output
13/01/04 21:15:47 INFO mapred.MapTask: Finished spill 0
13/01/04 21:15:47 INFO mapred.Task: Task:attempt_local_0002_m_000000_0 is done. And is in the process of commiting
13/01/04 21:15:50 INFO mapred.LocalJobRunner: file:/Users/ceteri/src/concur/scratch/data/sample.tsv:0+36
13/01/04 21:15:50 INFO mapred.Task: Task 'attempt_local_0002_m_000000_0' done.
13/01/04 21:15:50 INFO mapred.Task: Using ResourceCalculatorPlugin : null
13/01/04 21:15:50 INFO mapred.LocalJobRunner:
13/01/04 21:15:50 INFO mapred.Merger: Merging 1 sorted segments
13/01/04 21:15:50 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 77 bytes
13/01/04 21:15:50 INFO mapred.LocalJobRunner:
13/01/04 21:15:50 INFO hadoop.FlowReducer: cascading version: Concurrent, Inc - Cascading 2.1.0
13/01/04 21:15:50 INFO hadoop.FlowReducer: child jvm opts: -Xmx200m
13/01/04 21:15:50 INFO hadoop.FlowReducer: sourcing from: GroupBy(ccntPipe)[by:[{3}:'a', 'b', 'c']]
13/01/04 21:15:50 INFO hadoop.FlowReducer: sinking to: TempHfs["SequenceFile[['a', 'b', 'ccnt']]"][6707102613/ccntPipe/]
13/01/04 21:15:50 INFO assembly.AggregateBy: using threshold value: 10000
13/01/04 21:15:50 INFO mapred.Task: Task:attempt_local_0002_r_000000_0 is done. And is in the process of commiting
13/01/04 21:15:50 INFO mapred.LocalJobRunner:
13/01/04 21:15:50 INFO mapred.Task: Task attempt_local_0002_r_000000_0 is allowed to commit now
13/01/04 21:15:50 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0002_r_000000_0' to file:/tmp/hadoop-ceteri/6707102613_ccntPipe_7637049F6989C7D6211D769F1D6C7811
13/01/04 21:15:53 INFO mapred.LocalJobRunner: reduce > reduce
13/01/04 21:15:53 INFO mapred.Task: Task 'attempt_local_0002_r_000000_0' done.
13/01/04 21:15:57 INFO flow.FlowStep: [] at least one sink does not exist
13/01/04 21:15:57 INFO flow.FlowStep: [] source modification date at: Fri Jan 04 21:15:53 PST 2013
13/01/04 21:15:57 INFO flow.FlowStep: [] starting step: (4/4)
13/01/04 21:15:57 INFO mapred.FileInputFormat: Total input paths to process : 1
13/01/04 21:15:57 INFO flow.FlowStep: [] submitted hadoop job: job_local_0003
13/01/04 21:15:57 INFO mapred.Task: Using ResourceCalculatorPlugin : null
13/01/04 21:15:57 INFO io.MultiInputSplit: current split input path: file:/tmp/hadoop-ceteri/6707102613_ccntPipe_7637049F6989C7D6211D769F1D6C7811/part-00000
13/01/04 21:15:57 INFO mapred.MapTask: numReduceTasks: 1
13/01/04 21:15:57 INFO mapred.MapTask: io.sort.mb = 100
13/01/04 21:15:57 INFO mapred.MapTask: data buffer = 79691776/99614720
13/01/04 21:15:57 INFO mapred.MapTask: record buffer = 262144/327680
13/01/04 21:15:57 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.1.0
13/01/04 21:15:57 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
13/01/04 21:15:57 INFO hadoop.FlowMapper: sourcing from: TempHfs["SequenceFile[['a', 'b', 'ccnt']]"][6707102613/ccntPipe/]
13/01/04 21:15:57 INFO hadoop.FlowMapper: sinking to: GroupBy(ccntPipe)[by:[{2}:'a', 'b']]
13/01/04 21:15:57 INFO mapred.MapTask: Starting flush of map output
13/01/04 21:15:57 INFO mapred.MapTask: Finished spill 0
13/01/04 21:15:57 INFO mapred.Task: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting
13/01/04 21:16:00 INFO mapred.LocalJobRunner: file:/tmp/hadoop-ceteri/6707102613_ccntPipe_7637049F6989C7D6211D769F1D6C7811/part-00000:0+118
13/01/04 21:16:00 INFO mapred.Task: Task 'attempt_local_0003_m_000000_0' done.
13/01/04 21:16:00 INFO mapred.Task: Using ResourceCalculatorPlugin : null
13/01/04 21:16:00 INFO mapred.LocalJobRunner:
13/01/04 21:16:00 INFO mapred.Merger: Merging 1 sorted segments
13/01/04 21:16:00 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 42 bytes
13/01/04 21:16:00 INFO mapred.LocalJobRunner:
13/01/04 21:16:00 INFO hadoop.FlowReducer: cascading version: Concurrent, Inc - Cascading 2.1.0
13/01/04 21:16:00 INFO hadoop.FlowReducer: child jvm opts: -Xmx200m
13/01/04 21:16:00 INFO hadoop.FlowReducer: sourcing from: GroupBy(ccntPipe)[by:[{2}:'a', 'b']]
13/01/04 21:16:00 INFO hadoop.FlowReducer: sinking to: TempHfs["SequenceFile[['ccnt', 'a2', 'b2']]"][2077149705/ccntPipe/]
13/01/04 21:16:00 INFO mapred.Task: Task:attempt_local_0003_r_000000_0 is done. And is in the process of commiting
13/01/04 21:16:00 INFO mapred.LocalJobRunner:
13/01/04 21:16:00 INFO mapred.Task: Task attempt_local_0003_r_000000_0 is allowed to commit now
13/01/04 21:16:00 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0003_r_000000_0' to file:/tmp/hadoop-ceteri/2077149705_ccntPipe_98072B3AE304B00528C66998A5C1CEEC
13/01/04 21:16:03 INFO mapred.LocalJobRunner: reduce > reduce
13/01/04 21:16:03 INFO mapred.Task: Task 'attempt_local_0003_r_000000_0' done.
13/01/04 21:16:07 INFO flow.FlowStep: [] at least one sink does not exist
13/01/04 21:16:07 INFO flow.FlowStep: [] source modification date at: Fri Jan 04 21:15:43 PST 2013
13/01/04 21:16:07 INFO flow.FlowStep: [] starting step: (3/4) output/calc
13/01/04 21:16:07 INFO mapred.FileInputFormat: Total input paths to process : 1
13/01/04 21:16:07 INFO mapred.FileInputFormat: Total input paths to process : 1
13/01/04 21:16:07 INFO flow.FlowStep: [] submitted hadoop job: job_local_0004
13/01/04 21:16:07 INFO mapred.Task: Using ResourceCalculatorPlugin : null
13/01/04 21:16:07 INFO io.MultiInputSplit: current split input path: file:/tmp/hadoop-ceteri/800651948_cntPipe_30DB56030A81F7CC3E894B0583259553/part-00000
13/01/04 21:16:07 INFO mapred.MapTask: numReduceTasks: 1
13/01/04 21:16:07 INFO mapred.MapTask: io.sort.mb = 100
13/01/04 21:16:07 INFO mapred.MapTask: data buffer = 79691776/99614720
13/01/04 21:16:07 INFO mapred.MapTask: record buffer = 262144/327680
13/01/04 21:16:07 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.1.0
13/01/04 21:16:07 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
13/01/04 21:16:07 INFO hadoop.FlowMapper: sourcing from: TempHfs["SequenceFile[['a', 'b', 'cnt']]"][800651948/cntPipe/]
13/01/04 21:16:07 INFO hadoop.FlowMapper: sinking to: CoGroup(calcPipe)[by:cntPipe:[{2}:'a', 'b']ccntPipe:[{2}:'a2', 'b2']]
13/01/04 21:16:07 INFO mapred.MapTask: Starting flush of map output
13/01/04 21:16:07 INFO mapred.MapTask: Finished spill 0
13/01/04 21:16:07 INFO mapred.Task: Task:attempt_local_0004_m_000000_0 is done. And is in the process of commiting
13/01/04 21:16:10 INFO mapred.LocalJobRunner: file:/tmp/hadoop-ceteri/800651948_cntPipe_30DB56030A81F7CC3E894B0583259553/part-00000:0+118
13/01/04 21:16:10 INFO mapred.Task: Task 'attempt_local_0004_m_000000_0' done.
13/01/04 21:16:10 INFO mapred.Task: Using ResourceCalculatorPlugin : null
13/01/04 21:16:10 INFO io.MultiInputSplit: current split input path: file:/tmp/hadoop-ceteri/2077149705_ccntPipe_98072B3AE304B00528C66998A5C1CEEC/part-00000
13/01/04 21:16:10 INFO mapred.MapTask: numReduceTasks: 1
13/01/04 21:16:10 INFO mapred.MapTask: io.sort.mb = 100
13/01/04 21:16:10 INFO mapred.MapTask: data buffer = 79691776/99614720
13/01/04 21:16:10 INFO mapred.MapTask: record buffer = 262144/327680
13/01/04 21:16:10 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.1.0
13/01/04 21:16:10 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
13/01/04 21:16:10 INFO hadoop.FlowMapper: sourcing from: TempHfs["SequenceFile[['ccnt', 'a2', 'b2']]"][2077149705/ccntPipe/]
13/01/04 21:16:10 INFO hadoop.FlowMapper: sinking to: CoGroup(calcPipe)[by:cntPipe:[{2}:'a', 'b']ccntPipe:[{2}:'a2', 'b2']]
13/01/04 21:16:10 INFO mapred.MapTask: Starting flush of map output
13/01/04 21:16:10 INFO mapred.MapTask: Finished spill 0
13/01/04 21:16:10 INFO mapred.Task: Task:attempt_local_0004_m_000001_0 is done. And is in the process of commiting
13/01/04 21:16:13 INFO mapred.LocalJobRunner: file:/tmp/hadoop-ceteri/2077149705_ccntPipe_98072B3AE304B00528C66998A5C1CEEC/part-00000:0+118
13/01/04 21:16:13 INFO mapred.Task: Task 'attempt_local_0004_m_000001_0' done.
13/01/04 21:16:13 INFO mapred.Task: Using ResourceCalculatorPlugin : null
13/01/04 21:16:13 INFO mapred.LocalJobRunner:
13/01/04 21:16:13 INFO mapred.Merger: Merging 2 sorted segments
13/01/04 21:16:13 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 92 bytes
13/01/04 21:16:13 INFO mapred.LocalJobRunner:
13/01/04 21:16:13 INFO hadoop.FlowReducer: cascading version: Concurrent, Inc - Cascading 2.1.0
13/01/04 21:16:13 INFO hadoop.FlowReducer: child jvm opts: -Xmx200m
13/01/04 21:16:13 INFO hadoop.FlowReducer: sourcing from: CoGroup(calcPipe)[by:cntPipe:[{2}:'a', 'b']ccntPipe:[{2}:'a2', 'b2']]
13/01/04 21:16:13 INFO hadoop.FlowReducer: sinking to: Hfs["TextDelimited[[UNKNOWN]->['a', 'b', 'cnt', 'ccnt']]"]["output/calc"]"]
13/01/04 21:16:13 INFO collect.SpillableTupleList: attempting to load codec: org.apache.hadoop.io.compress.GzipCodec
13/01/04 21:16:13 INFO collect.SpillableTupleList: found codec: org.apache.hadoop.io.compress.GzipCodec
13/01/04 21:16:13 INFO mapred.Task: Task:attempt_local_0004_r_000000_0 is done. And is in the process of commiting
13/01/04 21:16:13 INFO mapred.LocalJobRunner:
13/01/04 21:16:13 INFO mapred.Task: Task attempt_local_0004_r_000000_0 is allowed to commit now
13/01/04 21:16:13 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0004_r_000000_0' to file:/Users/ceteri/src/concur/scratch/output/calc
13/01/04 21:16:16 INFO mapred.LocalJobRunner: reduce > reduce
13/01/04 21:16:16 INFO mapred.Task: Task 'attempt_local_0004_r_000000_0' done.
13/01/04 21:16:17 INFO util.Hadoop18TapUtil: deleting temp path output/calc/_temporary
bash-3.2$ cat output/calc/part-00000
a b cnt ccnt
x y 4 2
x z 1 1
bash-3.2$