Created
July 10, 2019 21:53
-
-
Save muhang/2527088c0533f1ead3000e078d665f77 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.bouncex.ingestion.ProductFeedOptions;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.data.v2.models.Row;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
class Scratch { | |
private class Foo { | |
private String data; | |
public String getKey() { | |
return data + "#" + data; | |
} | |
} | |
private static class ToKey extends DoFn<Foo, String> { | |
@ProcessElement | |
public void processElement(ProcessContext ctx) { | |
ctx.output(ctx.element().getKey()); | |
} | |
} | |
private static class ToScan extends DoFn<String, Scan> { | |
@ProcessElement | |
public void processElement(ProcessContext ctx) { | |
ctx.output(new Scan(ctx.element().getBytes(), ctx.element().getBytes())); | |
} | |
} | |
private static class Read extends PTransform<PCollection<Scan>, PCollection<Row>> { | |
private CloudBigtableTableConfiguration config; | |
private PipelineOptions options; | |
public Read(CloudBigtableTableConfiguration config, PipelineOptions options) { | |
this.config = config; | |
this.options = options; | |
} | |
private static class ReadSource extends DoFn<BoundedSource<Result>, PCollection<Result>> { | |
@ProcessElement | |
public void processElement(ProcessContext ctx) { | |
BoundedSource.BoundedReader<Result> reader = ctx.element().createReader(options); | |
try { | |
for (boolean available = reader.start(); available; available = reader.advance()) { | |
Result item = reader.getCurrent(); | |
} | |
} finally { | |
reader.close(); | |
} | |
} | |
} | |
@Override | |
public PCollection<Row> expand(PCollection<Scan> input) { | |
input.apply("GetSource", ParDo.of(new DoFn<Scan, BoundedSource<Result>>() { | |
@ProcessElement | |
public void processElement(ProcessContext ctx) { | |
ctx.output(CloudBigtableIO.read(CloudBigtableScanConfiguration.fromConfig(config, ctx.element()))); | |
} | |
})); | |
} | |
} | |
public static void main(String[] args) { | |
PipelineOptionsFactory.register(ProductFeedOptions.class); | |
final PipelineOptions options = PipelineOptionsFactory | |
.fromArgs(args) | |
.withValidation() | |
.as(PipelineOptions.class); | |
FileSystems.setDefaultPipelineOptions(options); | |
final Pipeline pipeline = Pipeline.create(options); | |
CloudBigtableTableConfiguration config = new CloudBigtableTableConfiguration.Builder() | |
.withProjectId("") | |
.withInstanceId("") | |
.withTableId("") | |
.build(); | |
pipeline | |
.apply("emptyfoo", Create.empty(new TypeDescriptor<Foo>() {})) | |
.apply("toKeys", ParDo.of(new ToKey())) | |
.apply("toScan", ParDo.of(new ToScan())) | |
.apply("Read", new Read(config, options)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment