
export AIRFLOW_HOME=/path/to/airflow/
# Obviously you will need the proper Read/Write access
# to this directory
mkdir -p ${AIRFLOW_HOME}
# Initialise the Airflow metadata database (Airflow 1.x CLI)
airflow initdb
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE clickstream (
    click_id            UUID PRIMARY KEY NOT NULL DEFAULT uuid_generate_v4(),
    click_timestamp     TIMESTAMP WITH TIME ZONE,
    user_id             UUID,
    is_ad_display_event BOOLEAN,
    is_ad_search_event  BOOLEAN
);
airflow connections -a --conn_id "mini-warehouse-db" \
  --conn_type "postgres" \
  --conn_host "localhost" \
  --conn_port "5432" \
  --conn_schema "miniwarehousedb" \
  --conn_login "etl" \
  --conn_password "password" \
  --conn_extra ""
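
The same connection can also be registered programmatically, which is handy in provisioning scripts. A minimal sketch, assuming the Airflow 1.x models used throughout this post (the values mirror the CLI call above):

from airflow import settings
from airflow.models import Connection

# Programmatic equivalent of the `airflow connections -a` call above.
conn = Connection(
    conn_id="mini-warehouse-db",
    conn_type="postgres",
    host="localhost",
    port=5432,
    schema="miniwarehousedb",
    login="etl",
    password="password")

session = settings.Session()
session.add(conn)
session.commit()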
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.sensors.sql_sensor import SqlSensor
from datetime import datetime, timedelta

# Default DAG parameters
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 1),
}

# The connection ID to the data warehouse DB (must match the connection
# handler defined in Airflow for the data source)
target_db = 'mini-warehouse-db'

# A simple sensor checking whether clickstream data is present
# for the day following the execution_date of the DAG Run
sensor_query_template = '''
SELECT TRUE
FROM clickstream
WHERE click_timestamp::DATE >= '{{ execution_date + macros.timedelta(days=1) }}'::DATE
'''
# A minimalist idempotent aggregation query for clickstream data
# NOTE: the aggregate column names below are assumptions; adapt them to the
# real clickstream_aggregated schema
aggregation_query_template = '''
BEGIN;
DELETE FROM clickstream_aggregated
WHERE click_date = '{{ execution_date }}'::DATE;
INSERT INTO clickstream_aggregated
SELECT click_timestamp::DATE AS click_date,
       COUNT(*) FILTER (WHERE is_ad_display_event) AS nb_ad_display_events,
       COUNT(*) FILTER (WHERE is_ad_search_event)  AS nb_ad_search_events
FROM clickstream
WHERE click_timestamp::DATE = '{{ execution_date }}'::DATE
GROUP BY 1;
COMMIT;
'''
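
The snippet stops before the DAG itself, so here is a minimal sketch of how the two templates could be wired together with the imported SqlSensor and PostgresOperator; the DAG id, task ids, schedule and poke interval are illustrative, not taken from the original post:

with DAG('clickstream_aggregation',
         default_args=default_args,
         schedule_interval='@daily') as dag:

    # Wait until clicks for the day after execution_date exist,
    # i.e. the execution_date's data can be considered complete
    check_clickstream = SqlSensor(
        task_id='check_clickstream_data',
        conn_id=target_db,
        sql=sensor_query_template,
        poke_interval=600)

    # Idempotent DELETE + INSERT aggregation for the execution_date
    aggregate_clickstream = PostgresOperator(
        task_id='aggregate_clickstream_data',
        postgres_conn_id=target_db,
        sql=aggregation_query_template)

    check_clickstream >> aggregate_clickstream

Keeping the wait-for-data logic in a sensor and the DELETE-then-INSERT in a single transactional query is what makes the DAG Run safe to retry or backfill.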
BEGIN;
INSERT INTO clickstream
  (click_timestamp, user_id, is_ad_display_event, is_ad_search_event)
VALUES
  ('2018-10-01 03:00:00', uuid_generate_v4(), TRUE, FALSE),
  ('2018-10-01 04:00:00', uuid_generate_v4(), FALSE, TRUE),
  ('2018-10-02 03:00:00', uuid_generate_v4(), TRUE, FALSE),
  ('2018-10-02 05:00:00', uuid_generate_v4(), TRUE, FALSE),
  ('2018-10-03 04:00:00', uuid_generate_v4(), FALSE, TRUE);
COMMIT;
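
With these fixture rows, the DAG Runs for 2018-10-01 and 2018-10-02 clear the sensor (next-day clicks exist), while the 2018-10-03 run keeps poking until 2018-10-04 data arrives. A throwaway sketch to eyeball the per-day counts through the same connection (expected: 2, 2 and 1 rows):

from airflow.hooks.postgres_hook import PostgresHook

# Per-day row counts for the fixture data above:
# 2018-10-01 -> 2, 2018-10-02 -> 2, 2018-10-03 -> 1
rows = PostgresHook(postgres_conn_id="mini-warehouse-db").get_records(
    "SELECT click_timestamp::DATE, COUNT(*) FROM clickstream GROUP BY 1 ORDER BY 1")
print(rows)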
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "topics": "leboncoin_staging_ads_ad-publications_public_avro",
  "s3.bucket.name": "leboncoin_kafka_events_bucket",
  "topics.dir": "staging/topics/ads/raw/parquet",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "path.format": "'schema_version'=VV/'event_date'=YYYY-MM-dd/'event_hour'=HH",
  "partitioner.class": "fr.leboncoin.data.archiver.partitioner.CustomTimePartitioner",
  "timestamp.extractor": "fr.leboncoin.data.archiver.parser.LeboncoinTimestampExtractor"
}
public interface Partitioner<T> {

    void configure(Map<String, Object> config);

    /**
     * Returns string representing the output path for a sinkRecord to be encoded and stored.
     *
     * @param sinkRecord The record to be stored by the Sink Connector
     * @return The path/filename the SinkRecord will be stored into after it is encoded
     */
    String encodePartition(SinkRecord sinkRecord);

    // (remaining interface methods omitted)
}
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "errors.tolerance": "none",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": "leboncoin_staging_ads_ad-publications_public_avro",
  "s3.bucket.name": "leboncoin_kafka_events_bucket",
  "topics.dir": "staging/topics/ads/raw/parquet",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat"
}