This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def is_my_key(key, modules): | |
""" Returns True if the key belongs to one of the modules. | |
Args: | |
key: key of the stats dictionary. | |
modules: list of strings with the modules to be matched. | |
""" | |
fn = key[0] | |
for m in modules: | |
module_name = '%s' % m |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Load the profile stored in 'dataflow.prof' and print its entries, sorted
# with the 'cumulative' key.
import pstats

p = pstats.Stats('dataflow.prof')
p.sort_stats('cumulative')
p.print_stats()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Combine every file found in the 'profiling' directory into a single
# profile and write the result to 'dataflow.prof'.
import os
import pstats

directory = 'profiling'
# os.path.join builds the per-file paths instead of manual '%s/%s' formatting.
files = [os.path.join(directory, name) for name in os.listdir(directory)]
# pstats.Stats accepts several dump files and merges them into one object.
p = pstats.Stats(*files)
p.sort_stats('cumulative').dump_stats('dataflow.prof')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Trigger Dags #1 and #2 and do something if they succeed.""" | |
from airflow import DAG | |
from airflow.operators.sensors import ExternalTaskSensor | |
from airflow.operators.dummy_operator import DummyOperator | |
from airflow.utils.dates import days_ago | |
with DAG( | |
'master_dag', | |
schedule_interval='*/1 * * * *', # Every 1 minute |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Simple dag #2.""" | |
from airflow import models | |
from airflow.operators.dummy_operator import DummyOperator | |
from airflow.operators import python_operator | |
from airflow.utils.dates import days_ago | |
with models.DAG( | |
'dag_2', | |
schedule_interval='*/1 * * * *', # Every 1 minute |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Simple dag #1.""" | |
from airflow import models | |
from airflow.operators.dummy_operator import DummyOperator | |
from airflow.operators import python_operator | |
from airflow.utils.dates import days_ago | |
with models.DAG( | |
'dag_1', | |
schedule_interval='*/1 * * * *', # Every 1 minute |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// We assume that we have the following variables set | |
// String fieldName | |
// GenericRecord rec | |
// Let's recover the schema of the field, and its type | |
Schema fieldSchema = rec.getSchema().getField(fieldName).schema(); | |
Schema.Type type = fieldSchema.getType(); | |
// We can check if the type is an UNION | |
if (type == Schema.Type.UNION) { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
GenericRecord record = input.getRecord() | |
Schema fieldSchema = record.getSchema().getField(fieldName).schema(); | |
Schema.Type type = fieldSchema.getType(); | |
// LogicalType will be Decimal for NUMERIC, and null for most of other types | |
Optional<LogicalType> logicalType = Optional.ofNullable(fieldSchema.getLogicalType); | |
switch (type) { | |
case BYTES: | |
if (!logicalType.isPresent()) { | |
// Type is not NUMERIC, Bytes can also be used for the BigQuery BYTES type) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
GenericRecord record = input.getRecord(); | |
String someStringValue = (Utf8) recoord.get(fieldName1).toString(); | |
Long someLongValue = (Long) record.get(fieldName2); | |
Double someDoubleValue = (Double).record.get(fieldName3); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// p is a Pipeline | |
p.apply( | |
"Read from BigQuery", | |
BigQueryIO.read( | |
new SerializableFunction<SchemaAndRecord, SomeCustomObject>() { | |
@Override | |
public String apply(SchemaAndRecord input) { | |
GenericRecord record = input.getRecord(); | |
SomeCustomObject result = new SomeCustomObject(...); |