CSV to Avro using Cascading
input.avsc
{
  "type": "record",
  "name": "wordcount",
  "namespace": "cascading.avro.examples",
  "fields": [
    {"name": "Time", "type": "string"},
    {"name": "User_ID", "type": "long"}
  ]
}
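The schema declares Time as an Avro string and User_ID as an Avro long, so whatever Java object occupies the User_ID slot of each tuple must be a java.lang.Long by the time it reaches the Avro writer; that pairing is what matters in the error below. A minimal sketch (the file path is the one used in WordCountAvroWrite.java further down) that confirms the parsed field types with the standard Avro API:

    import java.io.File;
    import org.apache.avro.Schema;

    // Parse the schema and print each field's declared Avro type.
    public class SchemaCheck
      {
      public static void main( String[] args ) throws Exception
        {
        Schema schema = new Schema.Parser().parse( new File( "sample/data/WC/input.avsc" ) );

        for( Schema.Field field : schema.getFields() )
          System.out.println( field.name() + " -> " + field.schema().getType() ); // Time -> STRING, User_ID -> LONG
        }
      }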
input.txt
Time,User_ID
1233,1321345345
22234,53643455
Console output:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/yash/.m2/repository/org/slf4j/slf4j-simple/1.6.1/slf4j-simple-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/yash/.m2/repository/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
165 [main] INFO cascading.flow.hadoop.util.HadoopUtil - resolving application jar from found main method on: com.aegis.adl.etl.WordCountAvroWrite
166 [main] INFO cascading.flow.hadoop.planner.HadoopPlanner - using application jar: null
183 [main] INFO cascading.property.AppProps - using app.id: 0A1CCD12F6384F0C9396BD5484CCDF65
339 [main] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
599 [flow wc] INFO cascading.util.Version - Concurrent, Inc - Cascading 2.2.0
600 [flow wc] INFO cascading.flow.Flow - [wc] starting
600 [flow wc] INFO cascading.flow.Flow - [wc] source: Hfs["TextDelimited[['Time', 'User_ID']]"]["sample/data/WC/input.txt"]
600 [flow wc] INFO cascading.flow.Flow - [wc] sink: Hfs["AvroScheme[['Time', 'User_ID']]"]["sample/data/WC/out"]
600 [flow wc] INFO cascading.flow.Flow - [wc] parallel execution is enabled: false
600 [flow wc] INFO cascading.flow.Flow - [wc] starting jobs: 1
600 [flow wc] INFO cascading.flow.Flow - [wc] allocating threads: 1
601 [pool-1-thread-1] INFO cascading.flow.FlowStep - [wc] starting step: (1/1) sample/data/WC/out
646 [pool-1-thread-1] WARN org.apache.hadoop.conf.Configuration - session.id is deprecated. Instead, use dfs.metrics.session-id
647 [pool-1-thread-1] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Initializing JVM Metrics with processName=JobTracker, sessionId=
696 [pool-1-thread-1] WARN org.apache.hadoop.mapred.JobClient - No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
713 [pool-1-thread-1] INFO org.apache.hadoop.mapred.FileInputFormat - Total input paths to process : 1
972 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - OutputCommitter set in config null
972 [pool-1-thread-1] INFO cascading.flow.FlowStep - [wc] submitted hadoop job: job_local229312516_0001
973 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
978 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - Waiting for map tasks
980 [pool-2-thread-1] INFO org.apache.hadoop.mapred.LocalJobRunner - Starting task: attempt_local229312516_0001_m_000000_0
991 [pool-2-thread-1] WARN mapreduce.Counters - Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
1063 [pool-2-thread-1] INFO org.apache.hadoop.util.ProcessTree - setsid exited with exit code 0
1073 [pool-2-thread-1] INFO org.apache.hadoop.mapred.Task - Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@74c000c
1081 [pool-2-thread-1] INFO cascading.tap.hadoop.io.MultiInputSplit - current split input path: file:/home/yash/prof/projects/aegis/code/adl/etl/sample/data/WC/input.txt
1081 [pool-2-thread-1] INFO org.apache.hadoop.mapred.MapTask - Processing split: cascading.tap.hadoop.io.MultiInputSplit@51f7f4e3
1084 [pool-2-thread-1] WARN mapreduce.Counters - Counter name MAP_INPUT_BYTES is deprecated. Use FileInputFormatCounters as group name and BYTES_READ as counter name instead
1086 [pool-2-thread-1] INFO org.apache.hadoop.mapred.MapTask - numReduceTasks: 0
1127 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - cascading version: 2.2.0
1127 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - child jvm opts: -Xmx200m
1146 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - sourcing from: Hfs["TextDelimited[['Time', 'User_ID']]"]["sample/data/WC/input.txt"]
1146 [pool-2-thread-1] INFO cascading.flow.hadoop.FlowMapper - sinking to: Hfs["AvroScheme[['Time', 'User_ID']]"]["sample/data/WC/out"]
1161 [pool-2-thread-1] ERROR cascading.flow.stream.TrapHandler - caught Throwable, no trap available, rethrowing
cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out
	at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
	at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134)
	at cascading.flow.stream.SinkStage.receive(SinkStage.java:90)
	at cascading.flow.stream.SinkStage.receive(SinkStage.java:37)
	at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
	at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
	at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
	at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263)
	at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:161)
	at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:158)
	at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716)
	at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526)
	at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
	at cascading.avro.AvroScheme.sink(AvroScheme.java:152)
	at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
	... 16 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:79)
	at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
	at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
	at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257)
	... 23 more
1162 [pool-2-thread-1] ERROR cascading.flow.stream.SourceStage - caught throwable
cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out
	at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
	at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134)
	at cascading.flow.stream.SinkStage.receive(SinkStage.java:90)
	at cascading.flow.stream.SinkStage.receive(SinkStage.java:37)
	at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
	at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
	at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
	at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263)
	at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:161)
	at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:158)
	at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716)
	at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526)
	at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
	at cascading.avro.AvroScheme.sink(AvroScheme.java:152)
	at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
	... 16 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:79)
	at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
	at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
	at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257)
	... 23 more
1163 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - Map task executor complete.
1165 [Thread-11] WARN org.apache.hadoop.mapred.LocalJobRunner - job_local229312516_0001
java.lang.Exception: cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
Caused by: cascading.tuple.TupleException: unable to sink into output identifier: sample/data/WC/out
	at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
	at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134)
	at cascading.flow.stream.SinkStage.receive(SinkStage.java:90)
	at cascading.flow.stream.SinkStage.receive(SinkStage.java:37)
	at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
	at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
	at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
	at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263)
	at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:161)
	at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:158)
	at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716)
	at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526)
	at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
	at cascading.avro.AvroScheme.sink(AvroScheme.java:152)
	at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
	... 16 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:79)
	at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
	at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
	at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257)
	... 23 more
5974 [pool-1-thread-1] WARN cascading.flow.FlowStep - [wc] task completion events identify failed tasks
5974 [pool-1-thread-1] WARN cascading.flow.FlowStep - [wc] task completion events count: 0
5983 [flow wc] INFO cascading.flow.Flow - [wc] stopping all jobs
5983 [flow wc] INFO cascading.flow.FlowStep - [wc] stopping: (1/1) sample/data/WC/out
5983 [flow wc] INFO cascading.flow.Flow - [wc] stopped all jobs
5985 [flow wc] INFO cascading.tap.hadoop.util.Hadoop18TapUtil - deleting temp path sample/data/WC/out/_temporary
Exception in thread "main" cascading.flow.FlowException: local step failed
	at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:212)
	at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:145)
	at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:120)
	at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:42)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
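The innermost cause in each trace is the same: java.lang.String cannot be cast to java.lang.Long, raised while GenericDatumWriter writes the User_ID field. TextDelimited, constructed without column types in the source below, hands every field to the flow as a String, and AvroScheme passes that String straight to Avro, which expects a Long for the schema's long field. One likely fix is to declare the column types on the source scheme so the coercion happens at parse time; a minimal sketch, assuming the TextDelimited constructor that also takes a Class[] of field types (present in Cascading 2.x), with the other names taken from the source below:

    // Declare column types so TextDelimited coerces User_ID to Long while
    // parsing, matching the Avro schema before the tuple reaches the sink.
    Fields toFields = new Fields( "Time", "User_ID" );
    Class[] types = new Class[]{ String.class, Long.class };
    Tap docTap = new Hfs( new TextDelimited( toFields, true, ",", types ), docPath );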
WordCountAvroWrite.java
import java.io.File;
import java.util.Properties;

import org.apache.avro.Schema;

import cascading.avro.AvroScheme;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class WordCountAvroWrite
  {
  public static void main( String[] args ) throws Exception
    {
    String docPath = "sample/data/WC/input.txt";
    String wcPath = "sample/data/WC/out";
    String schemaFile = "sample/data/WC/input.avsc";

    // Get the schema from a file
    Schema schema = new Schema.Parser().parse( new File( schemaFile ) );

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, WordCountAvroWrite.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    Fields toFields = new Fields( "Time", "User_ID" );

    // Create the source tap. Note: with no declared types, TextDelimited
    // sources both columns as Strings.
    Tap docTap = new Hfs( new TextDelimited( toFields, true, "," ), docPath );
    //Tap docTap = new Hfs( new TextLine( new Fields( "text" ) ), docPath );

    // Create the output tap with AvroScheme and the schema we read.
    Tap wcTap = new Hfs( new AvroScheme( schema ), wcPath, SinkMode.REPLACE );

    Pipe wcPipe = new Pipe( "wc" );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef()
      .setName( "wc" )
      .addSource( wcPipe, docTap )
      .addTailSink( wcPipe, wcTap );

    // run the flow
    Flow wcFlow = flowConnector.connect( flowDef );
    wcFlow.complete();
    }
  }
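If the source scheme is left untyped, the coercion can instead happen inside the pipe assembly, between source and sink; a sketch assuming Cascading's built-in Coerce subassembly (cascading.pipe.assembly.Coerce, available in 2.2):

    import cascading.pipe.assembly.Coerce;

    // Coerce User_ID from String to Long before the Avro sink, so
    // AvroScheme receives a java.lang.Long for the schema's long field.
    Pipe wcPipe = new Pipe( "wc" );
    wcPipe = new Coerce( wcPipe, new Fields( "User_ID" ), Long.class );

Either way, the value bound to User_ID has to arrive at the sink as a Long rather than a String; the Avro schema itself performs no coercion.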
Was this problem ever resolved? I am curious to see how you were able to surmount this problem. Thanks.