Israel Herraiz (iht)

iht / filter_and_sort_by_avg.py
Created November 21, 2020 23:32
Filter the stats for our modules only and sort by average time per function
def is_my_key(key, modules):
""" Returns True if the key belongs to one of the modules.
Args:
key: key of the stats dictionary.
modules: list of strings with the modules to be matched.
"""
fn = key[0]
for m in modules:
module_name = '%s' % m
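The gist preview stops partway through `is_my_key`. Below is a minimal standard-library sketch of the whole idea. It assumes that a key belongs to a module when the module name appears as a substring of the file name in `key[0]` (pstats keys are `(filename, lineno, funcname)` tuples). It also takes "average time per function" to mean cumulative time divided by total calls. Both are assumptions, not necessarily the author's definitions. `sort_by_avg` and the toy `slow`/`fast` workload are hypothetical names.

```python
import cProfile
import pstats


def is_my_key(key, modules):
    """Return True if the stats key belongs to one of the modules.

    Hypothetical completion: a substring match on the file name in key[0]
    is an assumption, not necessarily the original gist's rule.
    """
    filename = key[0]
    return any(m in filename for m in modules)


def sort_by_avg(stats, modules):
    """Return (key, ncalls, cumtime, avg) rows for our modules, slowest average first."""
    rows = []
    # stats.stats maps (filename, lineno, funcname) to
    # (primitive calls, total calls, tottime, cumtime, callers).
    for key, (cc, nc, tt, ct, callers) in stats.stats.items():
        if nc == 0 or not is_my_key(key, modules):
            continue
        rows.append((key, nc, ct, ct / nc))
    rows.sort(key=lambda row: row[3], reverse=True)
    return rows


def slow():
    return sum(i * i for i in range(20000))


def fast():
    return 1


def workload():
    for _ in range(5):
        slow()
        fast()


if __name__ == "__main__":
    profiler = cProfile.Profile()
    profiler.runcall(workload)
    stats = pstats.Stats(profiler)
    my_file = workload.__code__.co_filename
    for key, ncalls, cumtime, avg in sort_by_avg(stats, [my_file]):
        print(f"{key[2]:<12} calls={ncalls:<4} avg={avg:.6f}s")
```

Using cumulative time rather than `tottime` ranks functions by everything they trigger. Swap in `tt / nc` if you only want time spent in each function's own body.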
iht / print_pstats.py
Last active November 21, 2020 22:22
Print the output of pstats_agg.py
import pstats
p = pstats.Stats('dataflow.prof')
p.sort_stats('cumulative').print_stats()
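When `dataflow.prof` holds many functions, `print_stats()` accepts restrictions that trim the report. An integer caps the number of rows, and a string is treated as a regular expression matched against `file:line(function)`. The sketch below captures the report in a string via the `stream` argument of `pstats.Stats`, so it can be logged or inspected. It assumes a throwaway profile written to a temporary directory in place of the real `dataflow.prof`. `write_profile` and `top_cumulative` are hypothetical helpers.

```python
import cProfile
import io
import os
import pstats
import tempfile


def write_profile(path):
    """Profile a small workload and dump it to path (stand-in for dataflow.prof)."""
    profiler = cProfile.Profile()
    profiler.runcall(sorted, [str(i) for i in range(10000)])
    profiler.dump_stats(path)


def top_cumulative(path, limit=10, pattern=None):
    """Return the print_stats() report for path as a string, sorted by cumulative time.

    Restrictions are applied in order: the optional regex first, then the row limit.
    """
    buf = io.StringIO()
    stats = pstats.Stats(path, stream=buf)
    restrictions = [limit] if pattern is None else [pattern, limit]
    stats.sort_stats(pstats.SortKey.CUMULATIVE).print_stats(*restrictions)
    return buf.getvalue()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "dataflow.prof")
        write_profile(path)
        print(top_cumulative(path, limit=5))
```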
iht / pstats_agg.py
Last active November 21, 2020 22:22
Aggregate the output of Beam profiling and produce a sorted file
import os
import pstats
directory = 'profiling'
files = [os.path.join(directory, x) for x in os.listdir(directory)]
p = pstats.Stats(*files)
p.sort_stats('cumulative').dump_stats('dataflow.prof')
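Passing several files to `pstats.Stats` merges them: call counts and times of the same function are summed across files. That is what lets the per-worker profiles in `profiling/` be read as one. The following sketch checks that behaviour with two fake "worker" profiles written to a temporary directory. The helper names (`profile_to`, `aggregate`, `calls_to`) and file names (`worker-0.prof`, `worker-1.prof`) are hypothetical.

```python
import cProfile
import os
import pstats
import tempfile


def work(n):
    return sum(range(n))


def profile_to(path, repeats):
    """Write a profile in which work() is called `repeats` times (stand-in for one worker)."""
    profiler = cProfile.Profile()
    for _ in range(repeats):
        profiler.runcall(work, 1000)  # data accumulates across runcall() invocations
    profiler.dump_stats(path)


def aggregate(directory, output):
    """Merge every profile file in directory and dump a single sorted stats file."""
    files = [os.path.join(directory, name) for name in sorted(os.listdir(directory))]
    stats = pstats.Stats(*files)
    stats.sort_stats("cumulative").dump_stats(output)
    return stats


def calls_to(stats, funcname):
    # Sum the total call counts (nc) of every entry whose function name matches.
    return sum(nc for (_, _, name), (_, nc, _, _, _) in stats.stats.items() if name == funcname)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        prof_dir = os.path.join(tmp, "profiling")
        os.mkdir(prof_dir)
        profile_to(os.path.join(prof_dir, "worker-0.prof"), 2)
        profile_to(os.path.join(prof_dir, "worker-1.prof"), 3)
        merged = aggregate(prof_dir, os.path.join(tmp, "dataflow.prof"))
        print("work() calls across workers:", calls_to(merged, "work"))
```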
iht / child_master_dag.py
Created June 22, 2020 19:12
The child master DAG, which executes tasks only when both parent DAGs have completed successfully
"""Trigger Dags #1 and #2 and do something if they succeed."""
from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
with DAG(
'master_dag',
schedule_interval='*/1 * * * *', # Every 1 minute
"""Simple dag #2."""
from airflow import models
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import python_operator
from airflow.utils.dates import days_ago
with models.DAG(
'dag_2',
schedule_interval='*/1 * * * *', # Every 1 minute
iht / parent_dag_1.py
Last active June 22, 2020 19:11
A simple DAG that will be "sensed" using an ExternalTaskSensor in Airflow
"""Simple dag #1."""
from airflow import models
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import python_operator
from airflow.utils.dates import days_ago
with models.DAG(
'dag_1',
schedule_interval='*/1 * * * *', # Every 1 minute
iht / ReadNullable.java
Last active June 21, 2020 18:58
Read a NULLABLE field from a GenericRecord from BigQueryIO
// We assume that we have the following variables set
// String fieldName
// GenericRecord rec
// Let's recover the schema of the field, and its type
Schema fieldSchema = rec.getSchema().getField(fieldName).schema();
Schema.Type type = fieldSchema.getType();
// We can check if the type is a UNION
if (type == Schema.Type.UNION) {
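In Avro's JSON schema form, a union is a list of branch schemas. A BigQuery NULLABLE column therefore usually appears as `["null", <type>]`, which is the shape the Java code above is checking for. As an illustration in Python (the other language used in these gists), this sketch picks the non-null branch from a parsed schema. It works on the JSON schema rather than the Avro Java `Schema` API. The `Row` schema and the helper names are made up for the example.

```python
import json


def non_null_type(schema):
    """Return the non-null branch of a NULLABLE field's Avro schema.

    Unions are JSON lists of branches; non-union schemas are returned unchanged.
    """
    if not isinstance(schema, list):
        return schema
    branches = [b for b in schema if b != "null"]
    if len(branches) != 1:
        raise ValueError(f"expected exactly one non-null branch, got {branches!r}")
    return branches[0]


def field_schema(record_schema, field_name):
    """Look up the schema of one field in a parsed Avro record schema."""
    for field in record_schema["fields"]:
        if field["name"] == field_name:
            return field["type"]
    raise KeyError(field_name)


# Hypothetical record schema, loosely modelled on a BigQuery export.
SCHEMA = json.loads("""
{"type": "record", "name": "Row", "fields": [
  {"name": "id", "type": "long"},
  {"name": "comment", "type": ["null", "string"]},
  {"name": "price", "type": ["null", {"type": "bytes", "logicalType": "decimal",
                                      "precision": 38, "scale": 9}]}
]}
""")

if __name__ == "__main__":
    for name in ("id", "comment", "price"):
        print(name, "->", non_null_type(field_schema(SCHEMA, name)))
```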
iht / ReadNumeric.java
Created June 15, 2020 14:59
Reading a NUMERIC from an Avro GenericRecord
GenericRecord record = input.getRecord();
Schema fieldSchema = record.getSchema().getField(fieldName).schema();
Schema.Type type = fieldSchema.getType();
// LogicalType will be Decimal for NUMERIC, and null for most other types
Optional<LogicalType> logicalType = Optional.ofNullable(fieldSchema.getLogicalType());
switch (type) {
case BYTES:
if (!logicalType.isPresent()) {
// Not NUMERIC: BYTES is also used for the BigQuery BYTES type
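When the logical type is `Decimal`, the Avro specification says the BYTES value holds the unscaled number as a big-endian two's-complement integer. The real value is that integer times 10^-scale, and BigQuery NUMERIC uses scale 9. A Python sketch of that decoding with the standard `decimal` module is below. The function name is hypothetical. The `Decimal` is built from a digit tuple because arithmetic such as `scaleb` would round to the default 28-digit context, while NUMERIC allows up to 38 digits.

```python
from decimal import Decimal


def decode_avro_decimal(raw, scale):
    """Decode the bytes of an Avro 'decimal' logical type into an exact Decimal."""
    # Big-endian two's-complement unscaled value, as defined by the Avro spec.
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(d) for d in str(abs(unscaled)))
    # Building from (sign, digits, exponent) avoids context rounding.
    return Decimal((sign, digits, -scale))


if __name__ == "__main__":
    # 1.5 as a BigQuery NUMERIC (scale 9) has unscaled value 1_500_000_000.
    raw = (1_500_000_000).to_bytes(4, byteorder="big", signed=True)
    print(decode_avro_decimal(raw, scale=9))  # 1.500000000
```

In Java, the same conversion is `new BigDecimal(new BigInteger(bytes), scale)`, once the field's `ByteBuffer` has been copied into a `byte[]`.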
iht / ReadSomeFields.java
Created June 15, 2020 14:48
Reading some straightforward fields from an Avro GenericRecord
GenericRecord record = input.getRecord();
String someStringValue = ((Utf8) record.get(fieldName1)).toString();
Long someLongValue = (Long) record.get(fieldName2);
Double someDoubleValue = (Double) record.get(fieldName3);
iht / ReadBigQueryAsAvros.java
Last active June 30, 2020 21:04
Generic SerializableFunction read with BigQueryIO
// p is a Pipeline
p.apply(
"Read from BigQuery",
BigQueryIO.read(
new SerializableFunction<SchemaAndRecord, SomeCustomObject>() {
@Override
public SomeCustomObject apply(SchemaAndRecord input) {
GenericRecord record = input.getRecord();
SomeCustomObject result = new SomeCustomObject(...);