Skip to content

Instantly share code, notes, and snippets.

@muhang
Created July 10, 2019 21:53
Show Gist options
  • Save muhang/2527088c0533f1ead3000e078d665f77 to your computer and use it in GitHub Desktop.
Save muhang/2527088c0533f1ead3000e078d665f77 to your computer and use it in GitHub Desktop.
import com.bouncex.ingestion.ProductFeedOptions;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.data.v2.models.Row;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
/**
 * Scratch pipeline that turns {@code Foo} elements into row keys, wraps each key in an HBase
 * {@link Scan}, and reads the matching {@link Result}s from a Cloud Bigtable table.
 */
class Scratch {

    /**
     * Placeholder element type.
     *
     * <p>Declared {@code static} and {@link Serializable} so it carries no hidden reference to an
     * enclosing {@code Scratch} instance. {@code data} is never assigned in this file, so
     * {@link #getKey()} currently yields {@code "null#null"}.
     */
    private static class Foo implements Serializable {
        private static final long serialVersionUID = 1L;

        private String data;

        /** Returns {@code data}, a {@code '#'} separator, and {@code data} again. */
        public String getKey() {
            return data + "#" + data;
        }
    }

    /** Maps each {@code Foo} to its key string. */
    private static class ToKey extends DoFn<Foo, String> {
        @ProcessElement
        public void processElement(ProcessContext ctx) {
            ctx.output(ctx.element().getKey());
        }
    }

    /** Builds a {@link Scan} whose start row and stop row are both the UTF-8 bytes of the key. */
    private static class ToScan extends DoFn<String, Scan> {
        @ProcessElement
        public void processElement(ProcessContext ctx) {
            // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
            final byte[] rowKey = ctx.element().getBytes(StandardCharsets.UTF_8);
            // NOTE(review): start == stop is presumably meant as a single-row lookup; confirm
            // against the HBase client version in use.
            ctx.output(new Scan(rowKey, rowKey));
        }
    }

    /**
     * Reads, for every incoming {@link Scan}, the rows it selects from the configured table.
     *
     * <p>NOTE(review): this assumes coders for {@code Scan} and {@code Result} are available to
     * the pipeline; nothing in this file registers them, so confirm before running.
     */
    private static class Read extends PTransform<PCollection<Scan>, PCollection<Result>> {
        private final CloudBigtableTableConfiguration config;

        /**
         * @param config table coordinates (project, instance, table) every scan is run against
         */
        public Read(CloudBigtableTableConfiguration config) {
            this.config = config;
        }

        /** Executes one scan per element and emits every {@link Result} the reader yields. */
        private static class ReadScan extends DoFn<Scan, Result> {
            private final CloudBigtableTableConfiguration config;

            ReadScan(CloudBigtableTableConfiguration config) {
                this.config = config;
            }

            /**
             * @throws IOException if the reader fails while starting, advancing, or closing
             */
            @ProcessElement
            public void processElement(ProcessContext ctx) throws IOException {
                final BoundedSource<Result> source = CloudBigtableIO.read(
                        CloudBigtableScanConfiguration.fromConfig(config, ctx.element()));
                // The options come from the context rather than a captured field, so this DoFn
                // does not have to carry a PipelineOptions instance of its own.
                try (BoundedSource.BoundedReader<Result> reader =
                        source.createReader(ctx.getPipelineOptions())) {
                    for (boolean available = reader.start();
                            available;
                            available = reader.advance()) {
                        ctx.output(reader.getCurrent());
                    }
                }
            }
        }

        @Override
        public PCollection<Result> expand(PCollection<Scan> input) {
            return input.apply("ReadScans", ParDo.of(new ReadScan(config)));
        }
    }

    /**
     * Builds the pipeline graph from the command-line arguments.
     *
     * @param args pipeline option flags passed to {@link PipelineOptionsFactory}
     */
    public static void main(String[] args) {
        PipelineOptionsFactory.register(ProductFeedOptions.class);
        final PipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(PipelineOptions.class);
        FileSystems.setDefaultPipelineOptions(options);

        final Pipeline pipeline = Pipeline.create(options);

        // Project, instance and table ids are empty placeholders here.
        final CloudBigtableTableConfiguration config = new CloudBigtableTableConfiguration.Builder()
                .withProjectId("")
                .withInstanceId("")
                .withTableId("")
                .build();

        pipeline
                .apply("emptyfoo", Create.empty(new TypeDescriptor<Foo>() {}))
                .apply("toKeys", ParDo.of(new ToKey()))
                .apply("toScan", ParDo.of(new ToScan()))
                .apply("Read", new Read(config));

        // NOTE(review): the pipeline is only constructed, never run; add pipeline.run() once
        // real project/instance/table ids are supplied.
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment