This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static String transformKey(String key, Row record) {
  /**
   * TODO: Replace the existing key with a new key here.
   *
   * <p>Here we just reverse the key, as a demo. Ideally, you should try different key transform
   * strategies, measure the performance obtained with each one, and then decide how to change
   * the keys.
   */
  return StringUtils.reverse(key);
}
// inputRow is of type Row. Here we traverse it to create a copy of the row as a list of mutations,
// but you can traverse it to do anything with the cells. Remember that a cell may have more than
// one version, so you may have more than one cell per column in that record.
ArrayList<Mutation> mutations = new ArrayList<>();
for (Family cf : inputRow.getFamiliesList()) {
  for (Column col : cf.getColumnsList()) {
    for (Cell cell : col.getCellsList()) {
      Mutation m =
          Mutation.newBuilder()
              .setSetCell(
PCollection<Row> bigtableRows =
    p.apply(
        "Read from BigTable",
        BigtableIO.read()
            .withProjectId("fake-project")
            .withInstanceId("fake-instance")
            .withTableId("mytable"));
GenericRecord record = input.getRecord();
// The string field is cast to Utf8 first, then converted to a String
String someStringValue = ((Utf8) record.get(fieldName1)).toString();
Long someLongValue = (Long) record.get(fieldName2);
Double someDoubleValue = (Double) record.get(fieldName3);
GenericRecord record = input.getRecord();
Schema fieldSchema = record.getSchema().getField(fieldName).schema();
Schema.Type type = fieldSchema.getType();
// LogicalType will be Decimal for NUMERIC, and null for most other types
Optional<LogicalType> logicalType = Optional.ofNullable(fieldSchema.getLogicalType());
switch (type) {
  case BYTES:
    if (!logicalType.isPresent()) {
      // Type is not NUMERIC (BYTES can also be used for the BigQuery BYTES type)
// We assume that the following variables are set:
//   String fieldName
//   GenericRecord rec
// Let's recover the schema of the field, and its type
Schema fieldSchema = rec.getSchema().getField(fieldName).schema();
Schema.Type type = fieldSchema.getType();
// We can check if the type is a UNION
if (type == Schema.Type.UNION) {
"""Simple dag #2."""
from airflow import models
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import python_operator
from airflow.utils.dates import days_ago

with models.DAG(
    'dag_2',
    schedule_interval='*/1 * * * *',  # Every 1 minute
"""Simple dag #1."""
from airflow import models
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import python_operator
from airflow.utils.dates import days_ago

with models.DAG(
    'dag_1',
    schedule_interval='*/1 * * * *',  # Every 1 minute
// p is a Pipeline
p.apply(
    "Read from BigQuery",
    BigQueryIO.read(
        new SerializableFunction<SchemaAndRecord, SomeCustomObject>() {
          @Override
          public SomeCustomObject apply(SchemaAndRecord input) {
            GenericRecord record = input.getRecord();
            SomeCustomObject result = new SomeCustomObject(...);
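import os
import pstats

# Collect every profile file written to the local 'profiling' directory
directory = 'profiling'
files = ['%s/%s' % (directory, x) for x in os.listdir('%s/' % directory)]
# Merge them into a single stats object, sorted by cumulative time, and save it
p = pstats.Stats(*files)
p.sort_stats('cumulative').dump_stats('dataflow.prof')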
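
The merge step above can be tried end to end without a real Dataflow job. The following is a minimal sketch under stated assumptions, not part of the original snippet: `merge_profiles`, `write_sample_profiles` and `busy_work` are hypothetical helper names, and the sample profiles generated with `cProfile` only stand in for the files that the workers would normally leave in `profiling/`.

```python
import cProfile
import os
import pstats
import tempfile


def busy_work(n):
    """Hypothetical workload, only here so there is something to profile."""
    return sum(i * i for i in range(n))


def write_sample_profiles(directory, count=2):
    """Write `count` small profile files, standing in for real worker profiles."""
    for i in range(count):
        profiler = cProfile.Profile()
        profiler.runcall(busy_work, 10000)
        profiler.dump_stats(os.path.join(directory, 'worker_%d.prof' % i))


def merge_profiles(directory, output_path):
    """Merge every profile file in `directory` and save the result to `output_path`."""
    files = [os.path.join(directory, name) for name in sorted(os.listdir(directory))]
    stats = pstats.Stats(*files)
    stats.sort_stats('cumulative').dump_stats(output_path)
    return stats


def main():
    with tempfile.TemporaryDirectory() as tmp:
        profiling_dir = os.path.join(tmp, 'profiling')
        os.mkdir(profiling_dir)
        write_sample_profiles(profiling_dir)

        merged_path = os.path.join(tmp, 'dataflow.prof')
        merge_profiles(profiling_dir, merged_path)

        # Reload the merged file and show the five most expensive entries
        # by cumulative time, with directory prefixes stripped for readability.
        merged = pstats.Stats(merged_path)
        merged.strip_dirs().sort_stats('cumulative').print_stats(5)


if __name__ == '__main__':
    main()
```

The merged `dataflow.prof` file can also be browsed interactively with `python -m pstats dataflow.prof`.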