Skip to content

Instantly share code, notes, and snippets.

@trentonstrong
Created August 31, 2016 18:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save trentonstrong/8b60933dca545eb2138b72899195019e to your computer and use it in GitHub Desktop.
Save trentonstrong/8b60933dca545eb2138b72899195019e to your computer and use it in GitHub Desktop.
MapValues spike
package com.projector.flow.transform;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
import com.projector.flow.flow.SessionFlow;
import java.util.Objects;
/**
 * A {@link PTransform} that applies a function to the value of every {@link KV} element while
 * copying the key through unchanged.
 *
 * <p>Obtain an instance either from {@link #via(SimpleFunction)} or from
 * {@link #via(SerializableFunction)} followed by
 * {@link MissingOutputTypeDescriptor#withOutputType(TypeDescriptor)}.
 *
 * @param <K> key type, copied through unchanged
 * @param <T> input value type
 * @param <U> output value type
 */
public class MapValues<K, T, U> extends PTransform<PCollection<KV<K, T>>, PCollection<KV<K, U>>> {

  /** Function applied to every value. */
  private final SerializableFunction<T, U> fn;

  /**
   * Output element type handed to {@link MapElements}. It is {@code transient} because it is
   * only read in {@link #apply}, while the pipeline is being constructed.
   */
  private final transient TypeDescriptor<KV<K, U>> typeDescriptor;

  /**
   * Starts building a {@code MapValues} from a plain serializable function (for example a lambda
   * or method reference). The result must be completed with
   * {@link MissingOutputTypeDescriptor#withOutputType(TypeDescriptor)}.
   *
   * @param fn function applied to each value; must not be null
   * @return a builder awaiting the output type descriptor
   * @throws NullPointerException if {@code fn} is null
   */
  public static <K, T, U> MissingOutputTypeDescriptor<K, T, U> via(SerializableFunction<T, U> fn) {
    return new MissingOutputTypeDescriptor<>(fn);
  }

  /**
   * Creates a {@code MapValues} from a {@link SimpleFunction}.
   *
   * <p>NOTE(review): the anonymous {@code TypeDescriptor} below is created inside a generic
   * static method. The type it captures is therefore {@code KV<K, U>}, with {@code K} and
   * {@code U} as type variables rather than concrete types. Coder inference may not be able to
   * resolve it (TODO confirm). If it cannot, use {@link #via(SerializableFunction)} with an
   * explicit output type instead.
   *
   * @param fn function applied to each value; must not be null
   * @return the configured transform
   * @throws NullPointerException if {@code fn} is null
   */
  public static <K, T, U> MapValues<K, T, U> via(SimpleFunction<T, U> fn) {
    return new MapValues<>(fn, new TypeDescriptor<KV<K, U>>() {});
  }

  private MapValues(SerializableFunction<T, U> fn, TypeDescriptor<KV<K, U>> typeDescriptor) {
    this.fn = Objects.requireNonNull(fn, "fn");
    this.typeDescriptor = Objects.requireNonNull(typeDescriptor, "typeDescriptor");
  }

  /**
   * Expands into a {@link MapElements} that rebuilds each element as
   * {@code KV.of(key, fn.apply(value))}.
   */
  @Override
  public PCollection<KV<K, U>> apply(PCollection<KV<K, T>> input) {
    // Referencing the field inside the lambda would capture `this` and serialize the whole
    // transform. Copying it to a local means only the user function is captured.
    final SerializableFunction<T, U> valueFn = fn;
    // Apply through the input collection so the MapElements step is registered with the
    // pipeline and its runner, rather than being expanded in place by calling its apply().
    return input.apply(
        MapElements.via(
                (SerializableFunction<KV<K, T>, KV<K, U>>)
                    kv -> KV.of(kv.getKey(), valueFn.apply(kv.getValue())))
            .withOutputType(typeDescriptor));
  }

  /**
   * Intermediate builder returned by {@link MapValues#via(SerializableFunction)}. It collects
   * the output type descriptor needed to finish building the transform.
   *
   * @param <K> key type
   * @param <T> input value type
   * @param <U> output value type
   */
  public static final class MissingOutputTypeDescriptor<K, T, U> {
    private final SerializableFunction<T, U> fn;

    private MissingOutputTypeDescriptor(SerializableFunction<T, U> fn) {
      this.fn = Objects.requireNonNull(fn, "fn");
    }

    /**
     * Completes the transform with an explicit output element type.
     *
     * @param outputType descriptor of the output {@code KV<K, U>} elements; must not be null
     * @return the configured transform
     * @throws NullPointerException if {@code outputType} is null
     */
    public MapValues<K, T, U> withOutputType(TypeDescriptor<KV<K, U>> outputType) {
      return new MapValues<>(fn, outputType);
    }
  }
}
package com.projector.flow.transform;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
import org.junit.Test;
/** Pipeline-level tests for {@link MapValues}. */
public class MapValuesTest {

  /**
   * Converts {@code Long} values to {@code Double} using a method reference and an explicit output
   * type. It checks that each key still pairs with its converted value.
   */
  @Test
  public void via() throws Exception {
    TestPipeline pipeline = TestPipeline.create();

    PCollection<KV<String, Long>> longValues =
        pipeline.apply(Create.of(KV.of("1", 1L), KV.of("2", 2L)));

    TypeDescriptor<KV<String, Double>> doubleKvType = new TypeDescriptor<KV<String, Double>>() {};
    MapValues<String, Long, Double> toDouble =
        MapValues.<String, Long, Double>via(Long::doubleValue).withOutputType(doubleKvType);
    final PCollection<KV<String, Double>> doubleValues = longValues.apply(toDouble);

    DataflowAssert.that(doubleValues)
        .containsInAnyOrder(KV.of("1", 1.0d), KV.of("2", 2.0d));
    pipeline.run();
  }
}
@trentonstrong
Copy link
Author

This is a really simple outline of a MapValues transform.

Obviously it needs more test coverage.

It could also be implemented in a way that doesn't depend on MapElements.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment