Skip to content

Instantly share code, notes, and snippets.

@orian
Created May 31, 2016 15:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save orian/5fae3af167e71bf6cdb7d2ecacf2f5a1 to your computer and use it in GitHub Desktop.
Save orian/5fae3af167e71bf6cdb7d2ecacf2f5a1 to your computer and use it in GitHub Desktop.
Apache Beam - map only the keys or only the values of a PCollection<KV<?, ?>>
package pl.pawelsz.apache.beam;
import java.util.Objects;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
/**
 * Adapters that lift a single-element {@link SimpleFunction} into a function over {@link KV}
 * pairs, transforming only one side of the pair and passing the other side through unchanged.
 *
 * <p>Use {@link Values#with(SimpleFunction)} to rewrite only the value and
 * {@link Keys#with(SimpleFunction)} to rewrite only the key.
 */
public class Only {

  /**
   * Applies a function to the value of a {@link KV} and keeps the key as is.
   *
   * @param <TK> key type, passed through unchanged
   * @param <TI> input value type
   * @param <TO> output value type
   */
  public static class Values<TK, TI, TO> extends SimpleFunction<KV<TK, TI>, KV<TK, TO>> {
    /** Function applied to the value of every pair. */
    private final SimpleFunction<TI, TO> map;

    /**
     * Creates a value-only mapping function.
     *
     * @param map function applied to each value; must not be {@code null}
     * @return a function that maps {@code KV<Tk, Ti>} to {@code KV<Tk, To>}
     * @throws NullPointerException if {@code map} is {@code null}
     */
    public static <Tk, Ti, To> SimpleFunction<KV<Tk, Ti>, KV<Tk, To>> with(
        SimpleFunction<Ti, To> map) {
      return new Values<Tk, Ti, To>(map);
    }

    /**
     * @param map function applied to each value; must not be {@code null}
     * @throws NullPointerException if {@code map} is {@code null}
     */
    public Values(SimpleFunction<TI, TO> map) {
      // Fail at construction rather than later inside apply(), where the cause is harder to trace.
      this.map = Objects.requireNonNull(map, "map");
    }

    /**
     * Returns a new pair with the original key and the mapped value.
     *
     * @param kv input pair
     * @return pair of {@code kv.getKey()} and the result of the value function
     */
    @Override
    public KV<TK, TO> apply(KV<TK, TI> kv) {
      return KV.of(kv.getKey(), map.apply(kv.getValue()));
    }
  }

  /**
   * Applies a function to the key of a {@link KV} and keeps the value as is.
   *
   * @param <TKI> input key type
   * @param <TKO> output key type
   * @param <TV> value type, passed through unchanged
   */
  public static class Keys<TKI, TKO, TV> extends SimpleFunction<KV<TKI, TV>, KV<TKO, TV>> {
    /** Function applied to the key of every pair. */
    private final SimpleFunction<TKI, TKO> map;

    /**
     * Creates a key-only mapping function.
     *
     * @param map function applied to each key; must not be {@code null}
     * @return a function that maps {@code KV<Tki, Tv>} to {@code KV<Tko, Tv>}
     * @throws NullPointerException if {@code map} is {@code null}
     */
    public static <Tki, Tko, Tv> SimpleFunction<KV<Tki, Tv>, KV<Tko, Tv>> with(
        SimpleFunction<Tki, Tko> map) {
      return new Keys<Tki, Tko, Tv>(map);
    }

    /**
     * @param map function applied to each key; must not be {@code null}
     * @throws NullPointerException if {@code map} is {@code null}
     */
    public Keys(SimpleFunction<TKI, TKO> map) {
      // Fail at construction rather than later inside apply(), where the cause is harder to trace.
      this.map = Objects.requireNonNull(map, "map");
    }

    /**
     * Returns a new pair with the mapped key and the original value.
     *
     * @param kv input pair
     * @return pair of the result of the key function and {@code kv.getValue()}
     */
    @Override
    public KV<TKO, TV> apply(KV<TKI, TV> kv) {
      return KV.of(map.apply(kv.getKey()), kv.getValue());
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment