Skip to content

Instantly share code, notes, and snippets.

@teabot
Last active August 29, 2015 14:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save teabot/c77b18d56526d6882f04 to your computer and use it in GitHub Desktop.
Save teabot/c77b18d56526d6882f04 to your computer and use it in GitHub Desktop.
package com.hotels.plunger;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.PartitionTap;
import cascading.tap.partition.DelimitedPartition;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryCollector;
/**
 * Exercises writing through a {@link PartitionTap} directly, i.e. by adding tuples to the
 * collector returned from {@code openForWrite} rather than by running a flow, and then reading
 * the resulting part files back from a temporary folder.
 *
 * <p>Each input tuple carries two values: the first is written as the record ("value" field of
 * the parent {@link Hfs} tap) and the second names the partition ("partition" field of the
 * {@link DelimitedPartition}).
 */
public class PartitionTapTest {

  /** Part file name the tests look for inside each partition directory. */
  private static final String PART_FILE = "part-00000-00000";

  /** Charset used when reading part files back, so results do not depend on the platform default. */
  private static final String ENCODING = "UTF-8";

  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();

  /**
   * Writes one record to each of two partitions and expects one part file per partition.
   *
   * <p>Reported by the original author as FAILING with a {@code FileNotFoundException} when the
   * part files are read back.
   *
   * <p>NOTE(review): the cause is not visible in this file. Two things worth checking against the
   * Cascading sources: (1) whether the part file of the second opened partition really ends in
   * {@code -00000} or carries a different sequence suffix, and (2) whether closing the collector
   * outside of a flow cleans up Hadoop's temporary output directory before every partition has
   * been committed — TODO confirm.
   */
  @Test
  public void writeToMultiplePartitions() throws IOException {
    File output = temporaryFolder.newFolder("output");

    write(output, new Tuple("record1", "partition1"), new Tuple("record2", "partition2"));

    // Every record is newline-terminated (see the single-partition expectation below), so a
    // part file holding a single record must also end in '\n'.
    assertThat(readPart(output, "partition1"), is("record1\n"));
    assertThat(readPart(output, "partition2"), is("record2\n"));
  }

  /**
   * Writes two records to the same partition and expects both in a single part file.
   *
   * <p>Reported by the original author as PASSING.
   */
  @Test
  public void writeToSinglePartition() throws IOException {
    File output = temporaryFolder.newFolder("output");

    write(output, new Tuple("record1", "partition1"), new Tuple("record2", "partition1"));

    assertThat(readPart(output, "partition1"), is("record1\nrecord2\n"));
  }

  /**
   * Adds the given tuples to a {@link PartitionTap} rooted at {@code output} and closes the
   * collector.
   *
   * @param output directory used as the path of the parent {@link Hfs} tap
   * @param tuples (value, partition) pairs to write, in order
   * @throws IOException if the collector cannot be opened
   */
  private static void write(File output, Tuple... tuples) throws IOException {
    Hfs hfs = new Hfs(new TextDelimited(new Fields("value", String.class)), output.getAbsolutePath());
    PartitionTap partitionTap = new PartitionTap(hfs, new DelimitedPartition(new Fields("partition", String.class)));
    TupleEntryCollector collector = partitionTap.openForWrite(new HadoopFlowProcess());
    try {
      for (Tuple tuple : tuples) {
        collector.add(tuple);
      }
    } finally {
      // Always close, even if add() throws, so the collector is never left open.
      collector.close();
    }
  }

  /**
   * Reads the expected part file of one partition as a string.
   *
   * @param output root output directory passed to {@link #write}
   * @param partition name of the partition sub-directory
   * @return the full contents of {@code <output>/<partition>/part-00000-00000}
   * @throws IOException if the file does not exist or cannot be read
   */
  private static String readPart(File output, String partition) throws IOException {
    return FileUtils.readFileToString(new File(output, partition + "/" + PART_FILE), ENCODING);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment