Israel Herraiz (iht)
Migrating the world to the cloud

iht / RelayOptions.java
Created Jan 25, 2021
Relay your custom options to Dataflow, assuming you are using Flex Templates
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public static void main(String[] args) {
  // Parse the arguments passed to the Flex Template into generic pipeline options.
  PipelineOptions opts =
      PipelineOptionsFactory.fromArgs(args).create();
  // View them as Dataflow-specific options to relay or override runner settings.
  DataflowPipelineOptions dataflowPipelineOptions =
      opts.as(DataflowPipelineOptions.class);
iht / relay_options.py
Created Jan 25, 2021
Relay your custom runtime options to Dataflow, assuming you use Flex Templates
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

def run_pipeline(argv):
    # Parse the arguments passed by the Flex Template launcher.
    opts: PipelineOptions = PipelineOptions(argv)
    gcloud_opts: GoogleCloudOptions = opts.view_as(GoogleCloudOptions)
    # i_want_streaming_engine is a custom option, relayed here to the Dataflow runner options.
    if opts.i_want_streaming_engine:
        gcloud_opts.enable_streaming_engine = True
    else:
        gcloud_opts.enable_streaming_engine = False
    ...
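For the relay above to work, the custom flag has to be declared somewhere. A minimal sketch of how that could look, assuming a custom PipelineOptions subclass; the class name and the flag handling are illustrative, not from the original gist:

from apache_beam.options.pipeline_options import PipelineOptions

class MyCustomOptions(PipelineOptions):
    """Hypothetical options class declaring the custom Flex Template parameter."""

    @classmethod
    def _add_argparse_args(cls, parser):
        # Flex Template parameters arrive as regular command-line flags, as strings.
        parser.add_argument(
            '--i_want_streaming_engine',
            type=lambda s: s.lower() == 'true',
            default=False,
            help='Whether to enable Streaming Engine ("true"/"false").')

With a class like this, opts.view_as(MyCustomOptions).i_want_streaming_engine returns the parsed value used in the snippet above.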
iht / main.tf
Created Jan 25, 2021
Pass custom options to your Flex template
resource "google_dataflow_flex_template_job" "big_data_job" {
provider = google-beta
name = "dataflow-flextemplates-job"
container_spec_gcs_path = "gs://my-bucket/templates/template.json"
parameters = {
i_want_streaming_engine = true
}
}
iht / MyMainFile.java
Last active Jan 25, 2021
Set Dataflow pipeline options from your Java code
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public static void main(String[] args) {
  PipelineOptions opts =
      PipelineOptionsFactory.fromArgs(args).create();
  // Set Dataflow-specific options programmatically by viewing the options
  // as DataflowPipelineOptions.
  DataflowPipelineOptions dataflowPipelineOptions =
      opts.as(DataflowPipelineOptions.class);
iht / dataflow_options_with_terraform.py
Last active Jan 25, 2021
Set Dataflow options programmatically
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

def run_pipeline(argv):
    opts: PipelineOptions = PipelineOptions(argv)
    # View the options as GoogleCloudOptions to override Dataflow-specific settings
    # in code, regardless of what was passed on the command line.
    gcloud_opts: GoogleCloudOptions = opts.view_as(GoogleCloudOptions)
    gcloud_opts.enable_streaming_engine = True
    gcloud_opts.job_name = "Overwrite the name of your job"
    ...
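A possible continuation of run_pipeline, just as a sketch: the Pub/Sub topics and transforms below are placeholders, not part of the original gist.

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

def run_pipeline(argv):
    opts = PipelineOptions(argv)
    opts.view_as(GoogleCloudOptions).enable_streaming_engine = True
    # Build and run the pipeline with the programmatically-set options.
    with beam.Pipeline(options=opts) as p:
        # Placeholder transforms: replace with the real pipeline logic.
        (p
         | 'Read' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic')
         | 'Uppercase' >> beam.Map(lambda msg: msg.upper())
         | 'Write' >> beam.io.WriteToPubSub(topic='projects/my-project/topics/my-output'))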
iht / find_external_libs.py
Last active Nov 22, 2020
Find all the external functions called by my code
def find_external_functions(key, stats):
    """Return the stats entries whose callers include the given key."""
    output = {}
    for k, v in stats.items():
        # Each pstats entry is (primitive calls, total calls, total time, cumulative time, callers).
        _, _, _, _, callers = v
        if key in callers:
            output[k] = v
    return output
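A possible way to call it on the aggregated profile, assuming the stats dict is taken from the .stats attribute of a pstats.Stats object; the function key below is just an example:

import pstats

p = pstats.Stats('dataflow.prof')
# Keys in p.stats are (filename, line number, function name) tuples.
my_function_key = ('/path/to/my_module.py', 42, 'process')
external = find_external_functions(my_function_key, p.stats)
for (filename, line, func), (cc, nc, tt, ct, callers) in external.items():
    print(f'{func} ({filename}:{line}) cumulative={ct:.3f}s calls={nc}')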
iht / filter_and_sort_by_avg.py
Created Nov 21, 2020
Filter the stats for our modules only and sort by average time per function
def is_my_key(key, modules):
    """ Returns True if the key belongs to one of the modules.

    Args:
      key: key of the stats dictionary.
      modules: list of strings with the modules to be matched.
    """
    fn = key[0]
    for m in modules:
        module_name = '%s' % m
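The preview cuts off here; a sketch of how the filtering and sorting by average time per call could be finished, assuming is_my_key ends up returning True when the file name in the key matches one of the modules (the original logic may differ):

def filter_and_sort_by_avg(stats, modules):
    """Keep only the entries from our modules, sorted by average time per call."""
    mine = {k: v for k, v in stats.items() if is_my_key(k, modules)}
    # Each value is (primitive calls, total calls, total time, cumulative time, callers);
    # the average time per call is the cumulative time divided by the number of calls.
    return sorted(mine.items(), key=lambda kv: kv[1][3] / kv[1][1], reverse=True)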
iht / print_pstats.py
Last active Nov 21, 2020
Print the output of pstats_agg.py
import pstats
p = pstats.Stats('dataflow.prof')
p.sort_stats('cumulative').print_stats()
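print_stats also accepts restrictions, which helps with the very long output of a Dataflow profile; for example (the module name is a placeholder):

import pstats

p = pstats.Stats('dataflow.prof')
# Show only the 20 most expensive entries whose path matches our module.
p.sort_stats('cumulative').print_stats('my_module', 20)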
iht / pstats_agg.py
Last active Nov 21, 2020
Aggregate the output of Beam profiling and produce sorted file
import os
import pstats

# Directory with the per-bundle profile files produced by Beam profiling.
directory = 'profiling'
files = ['%s/%s' % (directory, x) for x in os.listdir('%s/' % directory)]
# pstats.Stats can aggregate several profile files into a single set of statistics.
p = pstats.Stats(*files)
p.sort_stats('cumulative').dump_stats('dataflow.prof')
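For context, the files in the profiling directory come from running the pipeline with Beam's profiling options enabled; a sketch of the relevant flags (from Beam's ProfilingOptions, with a placeholder bucket path):

from apache_beam.options.pipeline_options import PipelineOptions

# Profiling flags passed when launching the job on Dataflow (Python SDK).
opts = PipelineOptions([
    '--runner=DataflowRunner',
    '--profile_cpu',
    '--profile_location=gs://my-bucket/profiling',
])
# After the job finishes, copy the generated .prof files locally before aggregating, e.g.:
#   gsutil -m cp 'gs://my-bucket/profiling/*' profiling/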
iht / child_master_dag.py
Created Jun 22, 2020
The child master DAG, which executes tasks only when both parent DAGs have completed successfully
"""Trigger Dags #1 and #2 and do something if they succeed."""
from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
with DAG(
'master_dag',
schedule_interval='*/1 * * * *', # Every 1 minute
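        # The preview ends mid-definition; the rest of this block is a hedged sketch of
        # how the DAG could continue. DAG/task names below are illustrative, not from
        # the original gist.
        start_date=days_ago(1),
        default_args={'owner': 'airflow'},
) as dag:
    # Wait for each parent DAG run in the same schedule interval to succeed.
    wait_for_dag_1 = ExternalTaskSensor(
        task_id='wait_for_dag_1',
        external_dag_id='parent_dag_1',
        external_task_id=None,  # None waits for the whole DAG run, not a single task
    )
    wait_for_dag_2 = ExternalTaskSensor(
        task_id='wait_for_dag_2',
        external_dag_id='parent_dag_2',
        external_task_id=None,
    )
    # Placeholder for the work that should only run after both parents succeed.
    do_something = DummyOperator(task_id='do_something')

    [wait_for_dag_1, wait_for_dag_2] >> do_something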