Skip to content

Instantly share code, notes, and snippets.

@showsky
Created August 8, 2022 11:04
Show Gist options
  • Save showsky/e7f35c7df6b738280cf6d3ddca6b984b to your computer and use it in GitHub Desktop.
Save showsky/e7f35c7df6b738280cf6d3ddca6b984b to your computer and use it in GitHub Desktop.
Apache Beam
package tw.com.feebee.staging;
import com.google.api.services.bigquery.model.TableRow;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import java.util.ArrayList;
import java.util.List;
/**
* Created by showsky on 2022/8/8
*/
/**
 * Demo Apache Beam pipeline that builds a few in-memory {@link TableRow}s, tags each one
 * with a random group id, and then writes two outputs:
 * <ol>
 *   <li>one JSON document per group (group id, size and member rows) to {@code ./demo/result};</li>
 *   <li>every tagged row as a JSON line, GZIP-compressed in two shards, to {@code ./demo/output}.</li>
 * </ol>
 */
public class DemoWriteTableRow {

    /**
     * Shared JSON serializer, created once instead of once per processed element.
     * It is a static field so that it is not part of any DoFn's serialized state.
     */
    private static final Gson GSON = new Gson();

    /**
     * Builds the demo input: ten rows with a {@code name} ("showsky0".."showsky9")
     * and a {@code position} (the index as a string).
     *
     * @return a new mutable list holding the ten rows
     */
    public static List<TableRow> buildData() {
        List<TableRow> rows = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            TableRow row = new TableRow();
            row.set("name", "showsky" + i);
            row.set("position", String.valueOf(i));
            rows.add(row);
        }
        return rows;
    }

    /**
     * Configures the direct-runner pipeline, wires the transforms and blocks until the run finishes.
     *
     * @param args command-line pipeline options, parsed by {@link PipelineOptionsFactory}
     */
    public static void main(String[] args) {
        // 0. Init Pipeline
        DirectOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(DirectOptions.class);
        options.setTargetParallelism(5);
        options.setTempLocation("./demo/temp");
        Pipeline pipeline = Pipeline.create(options);

        // 1. Read data
        PCollection<TableRow> source = pipeline.apply("Read data", Create.of(buildData()));

        // 2. Add Group ID
        PCollection<TableRow> withGroupId = source.apply("App Group id", ParDo.of(new DoFn<TableRow, TableRow>() {
            @ProcessElement
            public void processElement(@Element TableRow tableRow, OutputReceiver<TableRow> out) {
                // Random group id, either 1 or 2.
                int groupId = (int) (Math.random() * 2 + 1);
                // Work on a copy: the input element itself is left unmodified.
                TableRow tagged = tableRow.clone();
                tagged.set("group_id", groupId);
                out.output(tagged);
            }
        }));

        // 3. Group By ID
        PCollection<String> groupedJson = withGroupId.apply("Group By Id", new PTransform<PCollection<TableRow>, PCollection<String>>() {
            @Override
            public PCollection<String> expand(PCollection<TableRow> input) {
                // Key every row by its group_id so it can be grouped.
                PCollection<KV<Integer, TableRow>> keyed = input.apply("1. Convert KV", ParDo.of(new DoFn<TableRow, KV<Integer, TableRow>>() {
                    @ProcessElement
                    public void processElement(@Element TableRow tableRow, OutputReceiver<KV<Integer, TableRow>> out) {
                        Integer groupId = Integer.valueOf(tableRow.get("group_id").toString());
                        out.output(KV.of(groupId, tableRow));
                    }
                }));

                PCollection<KV<Integer, Iterable<TableRow>>> grouped =
                        keyed.apply("2. Group By Key", GroupByKey.<Integer, TableRow>create());

                // Emit one JSON object per group: {"group_id": ..., "size": ..., "data": [...]}.
                return grouped.apply("3. Convert Group Data to String", ParDo.of(new DoFn<KV<Integer, Iterable<TableRow>>, String>() {
                    @ProcessElement
                    public void processElement(@Element KV<Integer, Iterable<TableRow>> in, OutputReceiver<String> out) {
                        JsonArray members = new JsonArray();
                        for (TableRow tableRow : in.getValue()) {
                            // Copy before stripping group_id so the grouped input is not mutated.
                            TableRow member = tableRow.clone();
                            member.remove("group_id");
                            // Build the JSON tree directly rather than serializing and re-parsing a string.
                            members.add(GSON.toJsonTree(member));
                        }
                        JsonObject group = new JsonObject();
                        group.addProperty("group_id", in.getKey());
                        group.addProperty("size", members.size());
                        group.add("data", members);
                        out.output(GSON.toJson(group));
                    }
                }));
            }
        });

        // 4. Save grouped data to a single, unsharded file
        groupedJson.apply("Write Group Data",
                TextIO.write().to("./demo/result")
                        .withoutSharding()
                        .withSuffix(".json")
        );

        // 5. TableRow convert json string
        PCollection<String> rowJson = withGroupId.apply("Convert TableRow to Json String", ParDo.of(new DoFn<TableRow, String>() {
            @ProcessElement
            public void processElement(@Element TableRow tableRow, OutputReceiver<String> out) {
                out.output(GSON.toJson(tableRow));
            }
        }));

        // 6. Save rows to File (GZIP-compressed, two shards)
        String filenamePath = "./demo/output";
        rowJson.apply("Write File",
                TextIO.write()
                        .to(filenamePath)
                        .withSuffix(".json")
                        .withCompression(Compression.GZIP)
                        .withNumShards(2)
        );

        pipeline.run().waitUntilFinish();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment