Israel Herraiz (iht)
Migrating the world to the cloud

import com.google.bigtable.v2.Row;
import org.apache.commons.lang3.StringUtils;

/**
 * TODO: Replace the existing key with a new key.
 *
 * <p>Here we just reverse the key, as a demo. Ideally, you should try different strategies,
 * measure the performance obtained with each key transform strategy, and then decide how you
 * need to change the keys.
 */
public static String transformKey(String key, Row record) {
  // record (the full input row) is not used by this demo transform
  return StringUtils.reverse(key);
}
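The TODO above suggests comparing several key transform strategies before settling on one. A minimal sketch in plain Java, assuming two common candidates: reversing the key (as the demo does, here with `StringBuilder` instead of Commons Lang) and salting it with a hash-based bucket prefix. The class `KeyStrategies`, the `#` separator and the bucket limit are hypothetical choices for illustration, not part of the original gist.

```java
import java.util.Locale;

/** Hypothetical helpers to compare Bigtable row key transforms. */
final class KeyStrategies {
  private KeyStrategies() {}

  /** Reverses the key, e.g. so that sequential ids do not all hit the same key range. */
  static String reverseKey(String key) {
    return new StringBuilder(key).reverse().toString();
  }

  /**
   * Prefixes the key with a deterministic two-digit bucket derived from its hash, so that
   * lexicographically close keys land in different key ranges. Readers then have to scan
   * every bucket prefix to read a range of original keys.
   */
  static String saltKey(String key, int buckets) {
    // Two-digit prefix, so at most 100 buckets (illustrative limit)
    if (buckets <= 0 || buckets > 100) {
      throw new IllegalArgumentException("buckets must be in 1..100");
    }
    int bucket = Math.floorMod(key.hashCode(), buckets);
    // Locale.ROOT keeps the digits ASCII regardless of the default locale
    return String.format(Locale.ROOT, "%02d#%s", bucket, key);
  }
}
```

For example, `saltKey("abc", 10)` returns `"04#abc"`. Salting spreads writes more evenly than reversing, at the cost of turning one range scan into one scan per bucket, which is the kind of trade-off worth measuring as the TODO recommends.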
// inputRow is of type Row; here we traverse it to create a copy of the row as a list of mutations.
// You can also traverse it to do anything else with the cells. Remember that a column may hold
// more than one version, so there may be more than one cell per column in that record.
ArrayList<Mutation> mutations = new ArrayList<>();
for (Family cf : inputRow.getFamiliesList()) {
  for (Column col : cf.getColumnsList()) {
    for (Cell cell : col.getCellsList()) {
      Mutation m =
          Mutation.newBuilder()
              .setSetCell(
iht / connectToBigtable.java
Created May 26, 2020 16:57
Trying to connect to the Bigtable emulator using a fake instance name
PCollection<Row> bigtableRows =
    p.apply(
        "Read from Bigtable",
        BigtableIO.read()
            .withProjectId("fake-project")
            .withInstanceId("fake-instance")
            .withTableId("mytable"));
iht / ReadSomeFields.java
Created June 15, 2020 14:48
Reading some straightforward fields from an Avro GenericRecord
GenericRecord record = input.getRecord();
// Avro typically returns strings as org.apache.avro.util.Utf8 (a CharSequence), so use toString()
String someStringValue = record.get(fieldName1).toString();
Long someLongValue = (Long) record.get(fieldName2);
Double someDoubleValue = (Double) record.get(fieldName3);
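Because Avro usually hands string fields back as `org.apache.avro.util.Utf8`, and a NULL value in a NULLABLE BigQuery column comes back as `null`, `record.get(fieldName1).toString()` throws a `NullPointerException` on missing values. A minimal null-safe sketch, assuming the values are fetched with `record.get(...)` as above; `AvroValues` and its methods are hypothetical helpers, written against `Object` so they do not depend on Avro itself.

```java
/** Hypothetical null-safe converters for values returned by GenericRecord.get(). */
final class AvroValues {
  private AvroValues() {}

  /** Works for both java.lang.String and org.apache.avro.util.Utf8 (both are CharSequences). */
  static String asString(Object value) {
    return value == null ? null : ((CharSequence) value).toString();
  }

  /** Avro long values (BigQuery INT64) arrive as java.lang.Long. */
  static Long asLong(Object value) {
    return value == null ? null : ((Number) value).longValue();
  }

  /** Avro double values (BigQuery FLOAT64) arrive as java.lang.Double. */
  static Double asDouble(Object value) {
    return value == null ? null : ((Number) value).doubleValue();
  }
}
```

Usage would look like `String someStringValue = AvroValues.asString(record.get(fieldName1));`, which yields `null` instead of throwing when the column is NULL.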
iht / ReadNumeric.java
Created June 15, 2020 14:59
Reading a NUMERIC from an Avro GenericRecord
GenericRecord record = input.getRecord();
Schema fieldSchema = record.getSchema().getField(fieldName).schema();
Schema.Type type = fieldSchema.getType();
// LogicalType will be Decimal for NUMERIC, and null for most other types
Optional<LogicalType> logicalType = Optional.ofNullable(fieldSchema.getLogicalType());
switch (type) {
  case BYTES:
    if (!logicalType.isPresent()) {
      // Not a NUMERIC: BYTES is also used for the BigQuery BYTES type
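In the BYTES case with a `Decimal` logical type, Avro stores the unscaled value as big-endian two's-complement bytes, and BigQuery NUMERIC uses a scale of 9. A minimal sketch of that decoding in plain Java, assuming the bytes have already been fetched as a `ByteBuffer` (for example with `(ByteBuffer) record.get(fieldName)`). In real code the scale is better read from the schema, e.g. `((LogicalTypes.Decimal) logicalType.get()).getScale()`, and Avro's own `Conversions.DecimalConversion` can do the same job; `NumericDecoder` is a hypothetical name.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

/** Hypothetical helper that turns Avro decimal bytes into a BigDecimal. */
final class NumericDecoder {
  /** Scale of the BigQuery NUMERIC type (9 digits after the decimal point). */
  static final int BIGQUERY_NUMERIC_SCALE = 9;

  private NumericDecoder() {}

  /**
   * Decodes the unscaled value (big-endian two's complement) and applies the scale.
   * Reads from a duplicate so the caller's buffer position is left untouched.
   */
  static BigDecimal decodeDecimal(ByteBuffer buffer, int scale) {
    ByteBuffer view = buffer.duplicate();
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return new BigDecimal(new BigInteger(bytes), scale);
  }
}
```

For example, the bytes of the unscaled value 1500000000 decode to 1.500000000 with scale 9, and the single byte `0xFF` (unscaled -1) decodes to -0.000000001.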
iht / ReadNullable.java
Last active June 21, 2020 18:58
Read a NULLABLE field from a GenericRecord from BigQueryIO
// We assume that we have the following variables set
// String fieldName
// GenericRecord rec
// Let's recover the schema of the field, and its type
Schema fieldSchema = rec.getSchema().getField(fieldName).schema();
Schema.Type type = fieldSchema.getType();
// NULLABLE fields are read as a UNION of null and the actual type, so check whether the type is a UNION
if (type == Schema.Type.UNION) {
"""Simple dag #2."""
from airflow import models
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import python_operator
from airflow.utils.dates import days_ago
with models.DAG(
'dag_2',
schedule_interval='*/1 * * * *', # Every 1 minute
iht / parent_dag_1.py
Last active June 22, 2020 19:11
A simple DAG that will be "sensed" using an ExternalTaskSensor in Airflow
"""Simple dag #1."""
from airflow import models
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import python_operator
from airflow.utils.dates import days_ago
with models.DAG(
'dag_1',
schedule_interval='*/1 * * * *', # Every 1 minute
iht / ReadBigQueryAsAvros.java
Last active June 30, 2020 21:04
Reading from BigQuery with BigQueryIO and a generic SerializableFunction
// p is a Pipeline
p.apply(
    "Read from BigQuery",
    BigQueryIO.read(
        new SerializableFunction<SchemaAndRecord, SomeCustomObject>() {
          @Override
          public SomeCustomObject apply(SchemaAndRecord input) {
            GenericRecord record = input.getRecord();
            SomeCustomObject result = new SomeCustomObject(...);
iht / pstats_agg.py
Last active November 21, 2020 22:22
Aggregate the output of Beam profiling into a single stats file
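import os
import pstats

directory = 'profiling'
# Every file in the directory is expected to be a profile written by the Beam profiler
files = [os.path.join(directory, name) for name in os.listdir(directory)]
p = pstats.Stats(*files)
# dump_stats() saves the merged raw stats; the sort order is not stored in the file,
# so the report sorted by cumulative time is printed as well
p.dump_stats('dataflow.prof')
p.sort_stats('cumulative').print_stats()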