@yashk (last active February 22, 2018)
CSV to Avro using Cascading
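
The Avro schema, input.avsc: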
{
  "type": "record",
  "name": "wordcount",
  "namespace": "cascading.avro.examples",
  "fields": [
    {"name": "Time", "type": "string"},
    {"name": "User_ID", "type": "long"}
  ]
}
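
The sample CSV input, input.txt: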
Time,User_ID
1233,1321345345
22234,53643455
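
Running the flow produces the log below. The failure is a type mismatch: the schema declares User_ID as a long, but TextDelimited with untyped Fields parses every column as a String.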
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/yash/.m2/repository/org/slf4j/slf4j-simple/1.6.1/slf4j-simple-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/yash/.m2/repository/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
165 [main] INFO cascading.flow.hadoop.util.HadoopUtil - resolving application jar from found main method on: com.aegis.adl.etl.WordCountAvroWrite
166 [main] INFO cascading.flow.hadoop.planner.HadoopPlanner - using application jar: null
183 [main] INFO cascading.property.AppProps - using app.id: 0A1CCD12F6384F0C9396BD5484CCDF65
339 [main] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
599 [flow wc] INFO cascading.util.Version - Concurrent, Inc - Cascading 2.2.0
600 [flow wc] INFO cascading.flow.Flow - [wc] starting
600 [flow wc] INFO cascading.flow.Flow - [wc] source: Hfs["TextDelimited[['Time', 'User_ID']]"]["sample/data/WC/input.txt"]
600 [flow wc] INFO cascading.flow.Flow - [wc] sink: Hfs["AvroScheme[['Time', 'User_ID']]"]["sample/data/WC/out"]
600 [flow wc] INFO cascading.flow.Flow - [wc] parallel execution is enabled: false
600 [flow wc] INFO cascading.flow.Flow - [wc] starting jobs: 1
600 [flow wc] INFO cascading.flow.Flow - [wc] allocating threads: 1
601 [pool-1-thread-1] INFO cascading.flow.FlowStep - [wc] starting step: (1/1) sample/data/WC/out
646 [pool-1-thread-1] WARN org.apache.hadoop.conf.Configuration - session.id is deprecated. Instead, use dfs.metrics.session-id
647 [pool-1-thread-1] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Initializing JVM Metrics with processName=JobTracker, sessionId=
696 [pool-1-thread-1] WARN org.apache.hadoop.mapred.JobClient - No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
713 [pool-1-thread-1] INFO org.apache.hadoop.mapred.FileInputFormat - Total input paths to process : 1
972 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - OutputCommitter set in config null
972 [pool-1-thread-1] INFO cascading.flow.FlowStep - [wc] submitted hadoop job: job_local229312516_0001
973 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
978 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - Waiting for map tasks
980 [pool-2-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - Starting task: attempt_local229312516_0001_m_000000_0
991 [pool-2-thread-1] WARN mapreduce.Counters - Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
1063 [pool-2-thread-1] INFO org.apache.hadoop.util.ProcessTree - setsid exited with exit code 0
1073 [pool-2-thread-1] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@74c000c
1081 [pool-2-thread-1] INFO cascading.tap.hadoop.io.MultiInputSplit - current split input path: file:/home/yash/prof/projects/aegis/code/adl/etl/sample/data/WC/input.txt
1081 [pool-2-thread-1] INFO org.apache.hadoop.mapred.MapTask - Processing split: cascading.tap.hadoop.io.MultiInputSplit@51f7f4e3
1084 [pool-2-thread-1] WARN mapreduce.Counters - Counter name MAP_INPUT_BYTES is deprecated. Use FileInputFormatCounters as group name and BYTES_READ as counter name instead
1086 [pool-2-thread-1] INFO org.apache.hadoop.mapred.MapTask - numReduceTasks: 0
1127 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - cascading version: 2.2.0
1127 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - child jvm opts: -Xmx200m
1146 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - sourcing from: Hfs["TextDelimited[['Time', 'User_ID']]"]["sample/data/WC/input.txt"]
1146 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - sinking to: Hfs["AvroScheme[['Time', 'User_ID']]"]["sample/data/WC/out"]
1161 [pool-2-thread-1] ERROR cascading.flow.stream.TrapHandler - caught Throwable, no trap available, rethrowing
cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160)
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:90)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:37)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263)
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:161)
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:158)
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526)
at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
at cascading.avro.AvroScheme.sink(AvroScheme.java:152)
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
... 16 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:79)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257)
... 23 more
1162 [pool-2-thread-1] ERROR cascading.flow.stream.SourceStage - caught throwable
cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160)
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:90)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:37)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263)
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:161)
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:158)
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526)
at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
at cascading.avro.AvroScheme.sink(AvroScheme.java:152)
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
... 16 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:79)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257)
... 23 more
1163 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - Map task executor complete.
1165 [Thread-11] WARN org.apache.hadoop.mapred.LocalJobRunner - job_local229312516_0001
java.lang.Exception: cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
Caused by: cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160)
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:90)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:37)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263)
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:161)
at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:158)
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526)
at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
at cascading.avro.AvroScheme.sink(AvroScheme.java:152)
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
... 16 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:79)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257)
... 23 more
5974 [pool-1-thread-1] WARN cascading.flow.FlowStep - [wc] task completion events identify failed tasks
5974 [pool-1-thread-1] WARN cascading.flow.FlowStep - [wc] task completion events count: 0
5983 [flow wc] INFO cascading.flow.Flow - [wc] stopping all jobs
5983 [flow wc] INFO cascading.flow.FlowStep - [wc] stopping: (1/1) sample/data/WC/out
5983 [flow wc] INFO cascading.flow.Flow - [wc] stopped all jobs
5985 [flow wc] INFO cascading.tap.hadoop.util.Hadoop18TapUtil - deleting temp path sample/data/WC/out/_temporary
Exception in thread "main" cascading.flow.FlowException: local step failed
at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:212)
at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:145)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:120)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:42)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
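
WordCountAvroWrite.java, the flow that produced the failure: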
import java.io.File;
import java.util.Properties;

import org.apache.avro.Schema;

import cascading.avro.AvroScheme;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class WordCountAvroWrite {
    public static void main(String[] args) throws Exception {
        String docPath = "sample/data/WC/input.txt";
        String wcPath = "sample/data/WC/out";
        String schemaFile = "sample/data/WC/input.avsc";

        // parse the Avro schema from a file
        Schema schema = new Schema.Parser().parse(new File(schemaFile));

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, WordCountAvroWrite.class);
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);

        Fields toFields = new Fields("Time", "User_ID");

        // source tap: comma-delimited text with a header row; the untyped
        // Fields mean every column is parsed as a String
        Tap docTap = new Hfs(new TextDelimited(toFields, true, ","), docPath);
        //Tap docTap = new Hfs( new TextLine( new Fields("text")), docPath );

        // sink tap: AvroScheme built from the schema read above
        Tap wcTap = new Hfs(new AvroScheme(schema), wcPath, SinkMode.REPLACE);

        Pipe wcPipe = new Pipe("wc");

        // connect the taps and pipe into a flow
        FlowDef flowDef = FlowDef.flowDef()
            .setName("wc")
            .addSource(wcPipe, docTap)
            .addTailSink(wcPipe, wcTap);

        // run the flow
        Flow wcFlow = flowConnector.connect(flowDef);
        wcFlow.complete();
    }
}
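
The stack trace pins down the cause: the source tap hands User_ID to the Avro sink as a java.lang.String, and GenericDatumWriter fails to cast it to java.lang.Long for the schema's long field. A possible fix, sketched below and not tested here, is to declare field types on the source so that TextDelimited coerces the column while it parses; this assumes the typed-Fields support introduced in Cascading 2.2.

// Sketch of a possible fix (untested): typed Fields ask TextDelimited to
// coerce each column as it parses, so User_ID reaches the Avro sink as a
// Long rather than a String. Drop-in replacement for the two source-tap
// lines in main() above.
Fields toFields = new Fields("Time", "User_ID").applyTypes(String.class, Long.class);
Tap docTap = new Hfs(new TextDelimited(toFields, true, ","), docPath);

If typed Fields are not an option, a cascading.pipe.assembly.Coerce subassembly applied to wcPipe before the sink, coercing User_ID to Long.class, should have the same effect.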
@edikep2000

Was this problem ever resolved? I am curious to see how you were able to surmount it. Thanks
