Create a gist now

Instantly share code, notes, and snippets.

cascading.avro wordcount example
package cascading.avro.examples;
import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.avro.AvroScheme;
import org.apache.avro.Schema;
public class
AvroReadExample
{
public static void
main( String[] args ) throws Exception
{
String docPath = args[ 0 ];
String wcPath = args[ 1 ];
Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, AvroReadExample.class );
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );
// create source and sink taps
// Source is Avro, note there is no schema needed.
Tap docTap = new Hfs( new AvroScheme(), docPath );
Tap wcTap = new Hfs( new TextDelimited(), wcPath, true );
Pipe wcPipe = new Pipe( "wordcount" );
wcPipe = new GroupBy( wcPipe, new Fields("count") );
wcPipe = new Every( wcPipe, Fields.ALL, new Count(new Fields("countcount")), Fields.ALL );
// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
.setName( "wc" )
.addSource( wcPipe, docTap )
.addTailSink( wcPipe, wcTap );
// write a DOT file and run the flow
Flow wcFlow = flowConnector.connect( flowDef );
wcFlow.writeDOT( "dot/wcr.dot" );
wcFlow.complete();
}
}
package cascading.avro.examples;
import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.avro.AvroScheme;
import org.apache.avro.Schema;
public class
WordCountAvroWrite
{
public static void
main( String[] args ) throws Exception
{
String docPath = args[ 0 ];
String wcPath = args[ 1 ];
// Get the schema from a file
Schema schema = new Schema.Parser().parse(WordCountAvroWrite.class.getResourceAsStream("/wc.avsc"));
Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, WordCountAvroWrite.class );
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );
// create source and sink taps
Tap docTap = new Hfs( new TextLine( new Fields("text")), docPath );
// Create the output tap with AvroScheme and the schema we read up.
Tap wcTap = new Hfs( new AvroScheme( schema ), wcPath );
//Everything from here on is the same. No changes needed.
// specify a regex operation to split the "document" text lines into a token stream
Fields token = new Fields( "token" );
Fields text = new Fields( "text" );
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
// only returns "token"
Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS );
// determine the word counts
Pipe wcPipe = new Pipe( "wc", docPipe );
wcPipe = new GroupBy( wcPipe, token );
wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );
// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
.setName( "wc" )
.addSource( docPipe, docTap )
.addTailSink( wcPipe, wcTap );
// write a DOT file and run the flow
Flow wcFlow = flowConnector.connect( flowDef );
wcFlow.writeDOT( "dot/wc.dot" );
wcFlow.complete();
}
}
{
"type":"record",
"name":"wordcount",
"namespace":"cascading.avro.examples",
"fields":[
{"name":"token", "type":"string"},
{"name":"count", "type":"long"}
]
}
@ccsevers
Owner

Count words from your favorite source and then write to Avro with the WordCountAvroWrite job. Read up your new avros using the AvroReadExample job.

@JuhanLeemet
JuhanLeemet commented Jul 8, 2016 edited

Having trouble trying to compile and run this example. I'm using hadoop=2.7.2, cascading=3.0.2, trying cascading.avro:avro-scheme:2.5.0. Gradle build fails at compile steps: says (when creating new Hfs object) AvroScheme cannot be converted to Scheme<Configuration,RecordReader,OutputCollector,?,?>, presumably because (in some source, did I find the right one?) the definition of AvroScheme starts with JobConfig, instead of Configuration? In other source (for cascading-avro?) their AvroScheme is an extension of a simpler Scheme (without any qualifications?). I'm confused.

I did have to change

import cascading.flow.hadoop.HadoopFlowConnector;

to

import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;

Did that cause the problem? I'm stuck, dunno what to do. I've been trying to walk versions back/forth in build.gradle, to no avail. Some help or hints would be appreciated.

p.s. I have created some avro containers using Hive, so I can read/write avro using other means.

p.p.s. Hmm, gradle builds the .jar when I set cascading=2.5.6, but doesn't seem to run? When I look at Cascading compatibility table, it shows that cascading 3.0 is compatible with Apache Hadoop2, specifically cascading 3.0.4 works with hadoop2 2.7 (so I should be close?). It appears to me that cascading.avro is the problem? I cannot build that for anything greater than cascading=2.5.6. Is there a compatibility table for that? It is supposed to be compatible and "supported" (not abandoned)? Any more recent example programs for cascading avro access with more recent versions of cascading and hadoop? I would be willing to put in some "grunt work" on this, but I'm not sure how to proceed. Any hints welcome!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment