Skip to content

Instantly share code, notes, and snippets.

@ngollperrier
Last active October 31, 2018 15:54
Show Gist options
  • Save ngollperrier/3c8e5b435ed9cb2319e01c5aa51b595e to your computer and use it in GitHub Desktop.
Save ngollperrier/3c8e5b435ed9cb2319e01c5aa51b595e to your computer and use it in GitHub Desktop.
# A minimalist idempotent aggregation query for clickstream data.
#
# The ``{{ execution_date }}`` placeholders are Jinja expressions, filled in
# when the operator renders its ``sql`` argument. The script first deletes
# any existing aggregate rows for that day from ``clickstream_aggregated``,
# then rebuilds them from the raw ``clickstream`` table. Both steps sit inside
# one explicit BEGIN/COMMIT, so re-running the task for the same date
# replaces its output rather than duplicating it.
#
# The raw-table filter is written as the half-open range [day, day + 1) on
# ``click_timestamp`` instead of ``click_timestamp::DATE = day``. For
# timestamp / timestamptz / date columns both forms select the same rows.
# However, casting the column on the left-hand side stops PostgreSQL from
# using a plain index on ``click_timestamp`` (if one exists), which forces a
# full scan of the clickstream table on every run.
#
# NOTE: the INSERT has no column list, so it relies on the column order of
# ``clickstream_aggregated`` matching the SELECT list below.
# NOTE(review): when run with autocommit=False the driver may already have a
# transaction open, in which case the leading BEGIN only emits a warning.
aggregation_query_template = '''
BEGIN;
DELETE FROM clickstream_aggregated
WHERE click_date = '{{ execution_date }}'::DATE;
INSERT INTO clickstream_aggregated
SELECT click_timestamp::DATE AS click_date,
SUM(CASE WHEN is_ad_display_event THEN 1 ELSE 0 END) AS nb_ad_display_events,
SUM(CASE WHEN is_ad_search_event THEN 1 ELSE 0 END) AS nb_ad_search_events
FROM clickstream
WHERE click_timestamp >= '{{ execution_date }}'::DATE
  AND click_timestamp < '{{ execution_date }}'::DATE + 1
GROUP BY 1;
COMMIT;
'''
# Execute the rendered aggregation script against the target Postgres
# connection. autocommit is disabled so the driver does not commit each
# statement individually; the script's own transaction governs the outcome.
aggregation_task = PostgresOperator(
    task_id='aggregate_clickstream_data',
    sql=aggregation_query_template,
    postgres_conn_id=target_db,
    autocommit=False,
    dag=dag,
)

# Wire the sensor upstream of the aggregation (bitshift composition is
# equivalent to aggregation_task.set_upstream(sensor_task)).
sensor_task >> aggregation_task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment