Skip to content

Instantly share code, notes, and snippets.

@mairbek
Created October 14, 2017 00:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mairbek/869f5c5d120ed668fb7132f551afe63a to your computer and use it in GitHub Desktop.
Save mairbek/869f5c5d120ed668fb7132f551afe63a to your computer and use it in GitHub Desktop.
package org.apache.beam.sdk.io.gcp.spanner;
import com.google.auto.value.AutoValue;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
public class Example {
@AutoValue
static abstract class Range implements Serializable {
public static Range of(Long start, Long end) {
return new AutoValue_Example_Range(start, end);
}
public abstract Long getStart();
public abstract Long getEnd();
}
public static void main(String[] args) {
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
PCollection<Range> keyRanges = p.apply(Create.of(
Range.of(0L, 1000L),
Range.of(1000L, 2000L),
Range.of(2000L, 3000L)
));
PCollection<ReadOperation> reads = keyRanges.apply(ParDo.of(new DoFn<Range, ReadOperation>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Range range = c.element();
Statement st = Statement
.newBuilder("SELECT * from usertable where id >= @start and id < @end")
.bind("start").to(range.getStart())
.bind("end").to(range.getEnd())
.build();
c.output(ReadOperation.create().withQuery(st));
}
}));
PCollection<Struct> rows = reads.apply(SpannerIO.readAll());
// Use rows.
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment