Skip to content

Instantly share code, notes, and snippets.

@aviemzur
Last active March 24, 2017 12:55
Show Gist options
  • Save aviemzur/4ef08e440f989b29cb6f890ddf1f7e12 to your computer and use it in GitHub Desktop.
Save aviemzur/4ef08e440f989b29cb6f890ddf1f7e12 to your computer and use it in GitHub Desktop.
Test for a user-reported serialization exception.
package org.apache.beam;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.junit.Test;
/**
 * Reproduction test for a user-reported serialization exception.
 *
 * <p>The test builds a small pipeline on the Spark runner that combines {@link Row} elements
 * globally with {@link CoalesceFn} and exposes the result as a singleton view.
 */
public class SerializationExceptionTest {

  /**
   * Runs a pipeline that creates two {@link Row} elements and coalesces them into a single list.
   *
   * <p>There are no explicit assertions: the test passes when the pipeline runs to completion
   * without throwing.
   */
  @Test
  public void testSerializationException() {
    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Create.of(new Row("A"), new Row("B")))
        .apply(Combine.globally(new CoalesceFn<Row>()).asSingletonView());
    pipeline.run().waitUntilFinish();
  }

  /**
   * Custom combine function that gathers every input element into one {@link List}.
   *
   * @param <InputT> type of the elements being collected
   */
  public static class CoalesceFn<InputT>
      extends Combine.CombineFn<InputT, List<InputT>, List<InputT>> {

    /** Returns a new, empty, mutable list to accumulate into. */
    @Override
    public List<InputT> createAccumulator() {
      return new ArrayList<>();
    }

    /**
     * Appends {@code item} to {@code accumulator} in place.
     *
     * @param accumulator list collected so far; it is mutated
     * @param item element to append
     * @return the same {@code accumulator} instance
     */
    @Override
    public List<InputT> addInput(List<InputT> accumulator, InputT item) {
      accumulator.add(item);
      return accumulator;
    }

    /**
     * Concatenates all accumulators, in iteration order, into a single list.
     *
     * <p>The first accumulator is reused as the merge target instead of being copied; the
     * remaining accumulators are only read.
     *
     * @param accumulators partial lists to merge; may be empty
     * @return the first accumulator with the contents of the others appended, or a fresh empty
     *     accumulator when {@code accumulators} yields no elements
     */
    @Override
    public List<InputT> mergeAccumulators(Iterable<List<InputT>> accumulators) {
      Iterator<List<InputT>> iter = accumulators.iterator();
      // Guard against an empty iterable: calling next() unconditionally would throw
      // NoSuchElementException.
      if (!iter.hasNext()) {
        return createAccumulator();
      }
      List<InputT> merged = iter.next();
      while (iter.hasNext()) {
        merged.addAll(iter.next());
      }
      return merged;
    }

    /** Returns the accumulated list itself as the output; no copy is made. */
    @Override
    public List<InputT> extractOutput(List<InputT> accumulator) {
      return accumulator;
    }
  }

  /**
   * Serializable row holding an immutable list of string fields.
   */
  public static class Row implements Serializable {
    /** Field values, in the order they were passed to the constructor. */
    public final ImmutableList<String> fields;

    /**
     * Creates a row from the given field values.
     *
     * @param fields values to copy into this row's immutable field list
     */
    public Row(String... fields) {
      this.fields = ImmutableList.copyOf(fields);
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment