Skip to content

Instantly share code, notes, and snippets.

@blinsay
Created June 26, 2012 21:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save blinsay/2999083 to your computer and use it in GitHub Desktop.
Save blinsay/2999083 to your computer and use it in GitHub Desktop.
HashJoin after Merge throwing an IllegalStateException
1,dog
2,cat
3,fox
4,bear
5,snake
import java.io.IOException;
import org.junit.Test;
import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.pipe.HashJoin;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Retain;
import cascading.pipe.joiner.InnerJoin;
import cascading.tap.SinkMode;
import cascading.test.HadoopPlatform;
import cascading.test.LocalPlatform;
import cascading.test.PlatformRunner;
import cascading.tuple.Fields;
/**
 * Platform test that feeds a {@link Merge} of two delimited sources into a
 * {@link HashJoin} against a third delimited source.
 *
 * <p>The class is annotated to run on both {@link LocalPlatform} and
 * {@link HadoopPlatform}.
 */
@PlatformRunner.Platform({LocalPlatform.class, HadoopPlatform.class})
public class TestHashJoin extends PlatformTestCase {
  /** Field delimiter passed to every delimited tap created by this test. */
  private static final String COMMA = ",";

  /**
   * Builds and runs a flow that:
   * <ol>
   *   <li>merges the {@code animals} and {@code vegetables} pipes (both declared as
   *       {@code id,item}),</li>
   *   <li>hash-joins the merged pipe with the {@code names} pipe (declared as
   *       {@code name,id}) on {@code id} using an {@link InnerJoin},</li>
   *   <li>retains {@code id}, {@code item} and {@code name}, and</li>
   *   <li>writes the result to a delimited sink opened with {@link SinkMode#REPLACE}.</li>
   * </ol>
   * After the flow completes, {@code validateLength(flow, 7)} is invoked
   * (presumably asserting that the sink holds seven tuples -- confirm against
   * {@code PlatformTestCase}).
   *
   * @throws IOException declared by the original test signature
   */
  @Test
  public void testHashJoin() throws IOException {
    final FlowDef flowDef = new FlowDef().setName("testHashJoin");

    // Left-hand side of the join: two item sources merged into one stream.
    Pipe animals = new Pipe("animals");
    addDelimitedSource(
        flowDef, animals, new Fields("id", "item"), "src/test/data/hash_join_test/animals.txt");

    Pipe vegetables = new Pipe("vegetables");
    addDelimitedSource(
        flowDef,
        vegetables,
        new Fields("id", "item"),
        "src/test/data/hash_join_test/vegetables.txt");

    Pipe allItems = new Merge(animals, vegetables);

    // Right-hand side of the join.
    Pipe names = new Pipe("names");
    addDelimitedSource(
        flowDef, names, new Fields("name", "id"), "src/test/data/hash_join_test/names.txt");

    // The right-hand "id" is declared as "_id" so the joined tuple has unique field names.
    Pipe[] joinSides = new Pipe[] {allItems, names};
    Fields[] joinKeys = new Fields[] {new Fields("id"), new Fields("id")};
    Fields declaredFields = new Fields("id", "item", "name", "_id");
    Pipe joined = new HashJoin(joinSides, joinKeys, declaredFields, new InnerJoin());

    // Drop the renamed duplicate key before writing.
    Pipe retained = new Retain(joined, new Fields("id", "item", "name"));

    flowDef.addTailSink(
        retained,
        getPlatform()
            .getDelimitedFile(
                Fields.ALL, COMMA, "src/test/data/hash_join_test/output", SinkMode.REPLACE));

    final Flow flow = getPlatform().getFlowConnector().connect(flowDef);
    flow.complete();

    validateLength(flow, 7);
  }

  /**
   * Registers a comma-delimited file as the source tap for {@code pipe} on {@code flowDef}.
   *
   * @param flowDef flow definition that receives the source binding
   * @param pipe head pipe the source is bound to
   * @param fields fields declared for the delimited file
   * @param path path of the delimited file
   * @throws IOException declared to mirror the calling test's signature
   */
  private void addDelimitedSource(FlowDef flowDef, Pipe pipe, Fields fields, String path)
      throws IOException {
    flowDef.addSource(pipe, getPlatform().getDelimitedFile(fields, COMMA, path));
  }
}
alice,1
bob,2
john,3
fred,4
2012-06-26 14:08:28,099 WARN mapred.LocalJobRunner (LocalJobRunner.java:run(298)) - job_local_0001
java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:432)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
... 5 more
Caused by: cascading.flow.FlowException: internal error during mapper configuration
at cascading.flow.hadoop.FlowMapper.configure(FlowMapper.java:96)
... 10 more
Caused by: java.lang.IllegalStateException: accumulated source conf property missing
at cascading.flow.hadoop.stream.HadoopMapStreamGraph.buildGraph(HadoopMapStreamGraph.java:92)
at cascading.flow.hadoop.stream.HadoopMapStreamGraph.<init>(HadoopMapStreamGraph.java:60)
at cascading.flow.hadoop.FlowMapper.configure(FlowMapper.java:80)
... 10 more
1,potato
2,tomato
3,banana
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment