Skip to content

Instantly share code, notes, and snippets.

@orian
Last active July 5, 2016 14:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save orian/265e5494e61de931bbfa885ac60eb539 to your computer and use it in GitHub Desktop.
Save orian/265e5494e61de931bbfa885ac60eb539 to your computer and use it in GitHub Desktop.
A simple wrapper that applies one function to compute keys and another to compute values.
package eu.pawelsz.apache.beam;
import java.util.Objects;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
 * A {@link PTransform} that turns each input element into a {@link KV} by applying one function
 * to compute the key and another function to compute the value.
 *
 * <p>Both functions receive the same input element. When the pipeline's {@link CoderRegistry} can
 * provide coders for both function output types, the output coder is set to a {@link KvCoder} of
 * those two coders; otherwise the coder is left unset for Beam's lazy coder inference.
 *
 * @param <V0> type of the input elements
 * @param <K> type of the produced keys
 * @param <V1> type of the produced values
 */
public class MakeKV<V0, K, V1> extends PTransform<PCollection<V0>, PCollection<KV<K, V1>>> {

  /**
   * Creates a transform mapping each element {@code e} to {@code KV.of(keyMap(e), valueMap(e))}.
   *
   * @param keyMap function computing the key of each output pair
   * @param valueMap function computing the value of each output pair
   * @return the new transform
   * @throws NullPointerException if {@code keyMap} or {@code valueMap} is {@code null}
   */
  public static <V0T, KT, V1T> MakeKV<V0T, KT, V1T> of(
      SerializableFunction<V0T, KT> keyMap, SerializableFunction<V0T, V1T> valueMap) {
    return new MakeKV<>(keyMap, valueMap);
  }

  private final SerializableFunction<V0, K> keyMap;
  private final SerializableFunction<V0, V1> valueMap;

  private MakeKV(SerializableFunction<V0, K> keyMap, SerializableFunction<V0, V1> valueMap) {
    // Fail fast while building the pipeline graph rather than later during coder inference
    // or element processing.
    this.keyMap = Objects.requireNonNull(keyMap, "keyMap");
    this.valueMap = Objects.requireNonNull(valueMap, "valueMap");
  }

  @Override
  public PCollection<KV<K, V1>> apply(PCollection<V0> in) {
    // Modeled on Apache Beam's WithKeys transform.
    PCollection<KV<K, V1>> result =
        in.apply("MapKeyValue", ParDo.of(new MapKeyValueFn<V0, K, V1>(keyMap, valueMap)));
    try {
      CoderRegistry coderRegistry = in.getPipeline().getCoderRegistry();
      Coder<K> keyCoder = coderRegistry.getDefaultOutputCoder(keyMap, in.getCoder());
      Coder<V1> valueCoder = coderRegistry.getDefaultOutputCoder(valueMap, in.getCoder());
      result.setCoder(KvCoder.of(keyCoder, valueCoder));
    } catch (CannotProvideCoderException ignored) {
      // Deliberately best-effort: leave the coder unset and let lazy coder inference have a try.
    }
    return result;
  }

  /**
   * Emits {@code KV.of(keyMap(e), valueMap(e))} for each input element {@code e}.
   *
   * <p>Declared as a static nested class (instead of an anonymous class) so that serializing the
   * DoFn captures only the two functions, not the enclosing {@code MakeKV} transform.
   */
  private static class MapKeyValueFn<InputT, KeyT, ValueT> extends DoFn<InputT, KV<KeyT, ValueT>> {
    private final SerializableFunction<InputT, KeyT> keyMap;
    private final SerializableFunction<InputT, ValueT> valueMap;

    MapKeyValueFn(
        SerializableFunction<InputT, KeyT> keyMap, SerializableFunction<InputT, ValueT> valueMap) {
      this.keyMap = keyMap;
      this.valueMap = valueMap;
    }

    @Override
    public void processElement(ProcessContext c) {
      InputT element = c.element();
      c.output(KV.of(keyMap.apply(element), valueMap.apply(element)));
    }
  }
}
package eu.pawelsz.apache.beam;
import com.google.common.collect.Lists;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.ArrayList;
/** Tests for {@link MakeKV}. */
@RunWith(JUnit4.class)
public class MakeKVTest {

  /** Formats an integer as its decimal string. */
  private static class ToStr extends SimpleFunction<Integer, String> {
    @Override
    public String apply(Integer i) {
      return String.valueOf(i);
    }
  }

  /** Doubles an integer; used so that the key and value functions differ. */
  private static class Twice extends SimpleFunction<Integer, Integer> {
    @Override
    public Integer apply(Integer i) {
      return 2 * i;
    }
  }

  @Test
  public void BaseTest() {
    TestPipeline p = TestPipeline.create();
    PCollection<Integer> input =
        p.apply(Create.of(Lists.newArrayList(1, 2))).setCoder(VarIntCoder.of());
    ArrayList<KV<String, String>> expected = Lists.newArrayList(KV.of("1", "1"), KV.of("2", "2"));
    PCollection<KV<String, String>> out = input.apply(MakeKV.of(new ToStr(), new ToStr()));
    PAssert.that(out).containsInAnyOrder(expected);
    // PAssert assertions are only evaluated when the pipeline runs; without this call the
    // test would pass without checking anything.
    p.run();
  }

  @Test
  public void keyAndValueFunctionsAreAppliedSeparately() {
    TestPipeline p = TestPipeline.create();
    PCollection<Integer> input =
        p.apply(Create.of(Lists.newArrayList(1, 2))).setCoder(VarIntCoder.of());
    ArrayList<KV<String, Integer>> expected = Lists.newArrayList(KV.of("1", 2), KV.of("2", 4));
    PCollection<KV<String, Integer>> out = input.apply(MakeKV.of(new ToStr(), new Twice()));
    PAssert.that(out).containsInAnyOrder(expected);
    p.run();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment