@bvolpato
Created August 7, 2023 03:07
TextToBigQueryStreamingCustom
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch.Growth;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
public class TextToBigQueryStreamingCustom {
/** Default interval for polling files in GCS. */
private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(10);
/**
* The {@link TextToBigQueryStreamingOptions} class provides the custom execution options passed
* by the executor at the command-line.
*/
public interface TextToBigQueryStreamingOptions extends PipelineOptions {
String getInputFilePattern();
void setInputFilePattern(String value);
}
/**
* Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
* blocking execution is required, use the {@link
* TextToBigQueryStreamingCustom#run(TextToBigQueryStreamingOptions)} method to start the pipeline
* and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
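*
* <p>Example invocation (illustrative only; the file pattern and runner values below are
* placeholders, not part of the original gist):
*
* <pre>{@code
* --inputFilePattern=gs://<bucket>/input/*.json --runner=DataflowRunner --streaming
* }</pre>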
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
// Parse the user options passed from the command-line
TextToBigQueryStreamingOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(TextToBigQueryStreamingOptions.class);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(TextToBigQueryStreamingOptions options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
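// Register an explicit coder for TableRow so the KV<String, TableRow> collections below do not
// rely on fallback coder inference. This registration is an assumption suggested by the
// otherwise-unused CoderRegistry import; it is not part of the original gist.
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(TypeDescriptor.of(TableRow.class), TableRowJsonCoder.of());
// Continuously watch the input pattern for new files; Growth.never() means the watch never
// terminates, which makes this an unbounded (streaming) pipeline.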
PCollection<MatchResult.Metadata> files =
pipeline.apply(
"ReadFromSource",
FileIO.match()
.filepattern(options.getInputFilePattern())
.continuously(DEFAULT_POLL_INTERVAL, Growth.never()));
PCollection<FileIO.ReadableFile> readMatches = files.apply("ReadMatches", FileIO.readMatches());
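// Open each matched file and emit one element per line, keyed by the source file name.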
PCollection<KV<String, String>> lines =
readMatches.apply(
"ReadLines",
ParDo.of(
new DoFn<FileIO.ReadableFile, KV<String, String>>() {
@ProcessElement
public void processElement(
@Element FileIO.ReadableFile file,
OutputReceiver<KV<String, String>> outputReceiver)
throws IOException {
try (InputStream inputStream = Channels.newInputStream(file.open());
BufferedReader reader =
new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
// Read until end of file; reader.ready() only reports buffered input and can stop the
// loop early, so check readLine() for null instead.
String line;
while ((line = reader.readLine()) != null) {
// TODO: add filtering logic here if only some lines should be emitted.
// The line is expected to already be a JSON object understood by convertJsonToTableRow.
outputReceiver.output(
KV.of(file.getMetadata().resourceId().getFilename(), line));
}
}
}
}));
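// Each emitted line is expected to already be a JSON object; parse it into a TableRow while
// keeping the file-name key so the destination table can be chosen from it downstream.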
PCollection<KV<String, TableRow>> convertedTableRows =
lines.apply(
"MapToTableRow",
MapElements.into(
TypeDescriptors.kvs(
TypeDescriptors.strings(), TypeDescriptor.of(TableRow.class)))
.via(json -> convertJsonToTableRow(json)));
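// Stream the converted rows into BigQuery, choosing the destination table dynamically from
// the file-name key of each element.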
WriteResult writeResult =
convertedTableRows.apply(
"InsertIntoBigQuery",
BigQueryIO.<KV<String, TableRow>>write()
.to(
new DynamicDestinations<KV<String, TableRow>, String>() {
@Override
public String getDestination(
ValueInSingleWindow<KV<String, TableRow>> element) {
// Destination is keyed by the file name
return element.getValue().getKey();
}
@Override
public TableDestination getTable(String fileName) {
// TODO: define project based on the file name
String project = "<project>";
String dataset = "<dataset>";
String table = fileName.split("\\.")[0];
return new TableDestination(
new TableReference()
.setProjectId(project)
.setDatasetId(dataset)
.setTableId(table),
"Table for file " + fileName);
}
@Override
public TableSchema getSchema(String fileName) {
// TODO: Function to return the TableSchema based on the fileName
return new TableSchema()
.setFields(
List.of(
new TableFieldSchema().setName("id").setType("STRING"),
new TableFieldSchema().setName("name").setType("STRING")));
}
})
.withFormatFunction(kv -> kv.getValue())
.withExtendedErrorInfo()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
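// Note (not part of the original gist): because withExtendedErrorInfo() is used, rows that
// fail streaming inserts can be retrieved via writeResult.getFailedInsertsWithErr() and
// routed to a dead-letter sink before the pipeline result is returned.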
return pipeline.run();
}
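/**
* Parses a single JSON line into a {@link TableRow}, preserving the file-name key that later
* selects the BigQuery destination. For example (illustrative values only), a line
* {@code {"id": "1", "name": "Ada"}} read from a file named {@code users.json} becomes a
* {@link TableRow} with fields {@code id} and {@code name}, keyed by {@code "users.json"},
* which {@code getTable} maps to the table {@code users}.
*
* @param json the file name / JSON line pair to convert
* @return the same file name keyed to the parsed {@link TableRow}
*/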
public static KV<String, TableRow> convertJsonToTableRow(KV<String, String> json) {
TableRow row;
// Parse the JSON into a {@link TableRow} object.
try (InputStream inputStream =
new ByteArrayInputStream(json.getValue().getBytes(StandardCharsets.UTF_8))) {
row = TableRowJsonCoder.of().decode(inputStream, Coder.Context.OUTER);
} catch (IOException e) {
throw new RuntimeException("Failed to deserialize JSON to table row: " + json, e);
}
return KV.of(json.getKey(), row);
}
}