Skip to content

Instantly share code, notes, and snippets.

@iht
Last active June 8, 2021 10:42
Show Gist options
  • Save iht/7f8a8f4984a7640fd05220096b098f64 to your computer and use it in GitHub Desktop.
Read a NUMERIC field from BigQuery using Apache Beam
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.cloud.pso;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes.Decimal;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
public class ReadNumericFromBigQuery {
public static Object getAvroField(String fieldName, GenericRecord rec) {
Schema fieldSchema = rec.getSchema().getField(fieldName).schema();
Schema.Type type = fieldSchema.getType();
Optional<LogicalType> logicalType = Optional.empty();
// NULLABLE fields are represented as UNION of NULL and the actual type of the element
if (type == Schema.Type.UNION) {
List<Schema> types = fieldSchema.getTypes();
if (types.get(0).getType() == Schema.Type.NULL) {
type = types.get(1).getType();
logicalType = Optional.ofNullable(types.get(1).getLogicalType());
} else {
type = types.get(0).getType();
logicalType = Optional.ofNullable(types.get(0).getLogicalType());
}
}
// Only some types are converted, the list of cases is not exhaustive
switch (type) {
case DOUBLE:
Double d = (Double) rec.get(fieldName);
return d
case LONG:
Long l = (Long) rec.get(fieldName);
return l;
case INT:
Integer i = (Integer) rec.get(fieldName);
return i;
case STRING:
Utf8 u = (Utf8) rec.get(fieldName);
return u.toString();
case BYTES:
if (!logicalType.isPresent()) {
// Type is not NUMERIC
ByteBuffer byteBuffer = (ByteBuffer) rec.get(fieldName);
byte[] bytes = new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
return BaseEncoding.base64().encode(bytes);
}
// Type is NUMERIC
// Get scale property
Decimal decimalType = (Decimal) logicalType.get();
int scale = decimalType.getScale();
// Transform value into BigDecimal
ByteBuffer value = (ByteBuffer) rec.get(fieldName);
byte[] bytes = new byte[value.remaining()];
value.get(bytes);
BigDecimal bigDecimalValue = new BigDecimal(new BigInteger(bytes), scale);
return bigDecimalValue;
default:
throw new RuntimeException(
"Unknown fieldType: " + type.getName() + " for field " + fieldName);
}
}
public static void main(String[] args) {
PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).withValidation().create();
String project = "your-gcp-project";
String dataset = "your-dataset";
String table = "your-table";
String tableSpec = project + ":" + dataset + "." + table;
Pipeline p = Pipeline.create(opts);
PCollection<String> values =
p.apply(
"Read from BigQuery",
BigQueryIO.read(
new SerializableFunction<SchemaAndRecord, String>() {
@Override
public String apply(SchemaAndRecord input) {
GenericRecord record = input.getRecord();
String key = (String) getAvroField("key", record);
BigDecimal value = (BigDecimal) getAvroField("value", record);
String result = key + "," + String.valueOf(value);
return result;
}
})
.from(tableSpec)
.withMethod(TypedRead.Method.DIRECT_READ));
values.apply("Write CSV", TextIO.write().to("gs://somewhere-in-gcs"));
p.run().waitUntilFinish();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment