Last active
July 5, 2016 14:37
-
-
Save orian/265e5494e61de931bbfa885ac60eb539 to your computer and use it in GitHub Desktop.
A simple wrapper that turns each element into a key/value pair by applying one function to derive the key and another to derive the value.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package eu.pawelsz.apache.beam; | |
import org.apache.beam.sdk.coders.CannotProvideCoderException; | |
import org.apache.beam.sdk.coders.Coder; | |
import org.apache.beam.sdk.coders.CoderRegistry; | |
import org.apache.beam.sdk.coders.KvCoder; | |
import org.apache.beam.sdk.transforms.*; | |
import org.apache.beam.sdk.values.KV; | |
import org.apache.beam.sdk.values.PCollection; | |
public class MakeKV<V0, K, V1> extends PTransform<PCollection<V0>, | |
PCollection<KV<K, V1>>> { | |
public static <V0T, KT, V1T> MakeKV<V0T, KT, V1T> of( | |
SerializableFunction<V0T, KT> keyMap, SerializableFunction<V0T, V1T> valueMap) { | |
return new MakeKV<>(keyMap, valueMap); | |
} | |
private final SerializableFunction<V0, K> keyMap; | |
private final SerializableFunction<V0, V1> valueMap; | |
private MakeKV(SerializableFunction<V0, K> keyMap, SerializableFunction<V0, V1> valueMap) { | |
this.keyMap = keyMap; | |
this.valueMap = valueMap; | |
} | |
@Override | |
public PCollection<KV<K, V1>> apply(PCollection<V0> in) { | |
// To understand please read a WithKeys.java of Apache Beam. | |
PCollection<KV<K, V1>> result = | |
in.apply("MapKeyValue", ParDo.of(new DoFn<V0, KV<K, V1>>() { | |
@Override | |
public void processElement(ProcessContext c) { | |
c.output( | |
KV.of(keyMap.apply(c.element()), | |
valueMap.apply(c.element()))); | |
} | |
})); | |
try { | |
Coder<K> keyCoder; | |
Coder<V1> valueCoder; | |
CoderRegistry coderRegistry = in.getPipeline().getCoderRegistry(); | |
keyCoder = coderRegistry.getDefaultOutputCoder(keyMap, in.getCoder()); | |
valueCoder = coderRegistry.getDefaultOutputCoder(valueMap, in.getCoder()); | |
result.setCoder(KvCoder.of(keyCoder, valueCoder)); | |
} catch (CannotProvideCoderException exc) { | |
// let lazy coder inference have a try | |
} | |
return result; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package eu.pawelsz.apache.beam; | |
import com.google.common.collect.Lists; | |
import org.apache.beam.sdk.coders.VarIntCoder; | |
import org.apache.beam.sdk.testing.PAssert; | |
import org.apache.beam.sdk.testing.TestPipeline; | |
import org.apache.beam.sdk.transforms.Create; | |
import org.apache.beam.sdk.transforms.SimpleFunction; | |
import org.apache.beam.sdk.values.KV; | |
import org.apache.beam.sdk.values.PCollection; | |
import org.junit.Before; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.junit.runners.JUnit4; | |
import java.util.ArrayList; | |
@RunWith(JUnit4.class) | |
public class MakeKVTest { | |
private static class ToStr extends SimpleFunction<Integer, String> { | |
@Override | |
public String apply(Integer i) { | |
return ""+i; | |
} | |
} | |
@Test | |
public void BaseTest() { | |
TestPipeline p = TestPipeline.create(); | |
PCollection<Integer> input = | |
p.apply(Create.of(Lists.newArrayList(1,2))).setCoder(VarIntCoder.of()); | |
ArrayList<KV<String, String>> expected = Lists.newArrayList(KV.of("1", "1"), KV.of("2", "2")); | |
PCollection<KV<String, String>> out = input.apply(MakeKV.of(new ToStr(), new ToStr())); | |
PAssert.that(out).containsInAnyOrder(expected); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment