
Andrew Otto (ottomata)

@ottomata
ottomata / 0_mediawiki_page_create_stream_table.py
Last active September 29, 2022 20:38
pyflink + event platform experiments
# Pyflink Streaming Table Env + Event Utilities Event Platform integration.
# Download the Wikimedia Event Utilities Flink jar, e.g.
# https://archiva.wikimedia.org/#artifact-details-download-content/org.wikimedia/eventutilities-flink/1.2.0
# wget http://archiva.wikimedia.org/repository/releases/org/wikimedia/eventutilities-flink/1.2.0/eventutilities-flink-1.2.0-jar-with-dependencies.jar
# Also download other dependencies we need:
# wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.15.2/flink-connector-kafka-1.15.2.jar
# wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar
# If you like, use ipython for your REPL, it's nicer!
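Below is a minimal sketch (not the gist itself) of how those jars might be registered with a PyFlink streaming TableEnvironment; the local file paths are assumptions, and the actual gist goes on to build a mediawiki page-create stream table with the Event Utilities integration.

# Sketch only: adjust the file:// paths to wherever the jars above were downloaded.
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Make the downloaded jars available to the Flink pipeline.
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join([
        "file:///path/to/eventutilities-flink-1.2.0-jar-with-dependencies.jar",
        "file:///path/to/flink-connector-kafka-1.15.2.jar",
        "file:///path/to/kafka-clients-3.2.3.jar",
    ]),
)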
@ottomata
ottomata / airflow_cli.sh
Created August 8, 2022 16:46
Airflow CLI command as analytics-privatedata (with kerberos)
# How to run an airflow dag and/or task using analytics-privatedata on the CLI.
#
# - Make sure the airflow-dags repo is on the PYTHONPATH, so wmf_airflow_common can be imported
#
# - Set AIRFLOW_HOME and HOME. These must point to a directory containing airflow.cfg
#   and the airflow sqlite dbs, and those must be readable and writable by the analytics-privatedata user
#
# - Run bin/airflow out of an airflow conda env. /usr/lib/airflow is fine, but you can also use
#   your development conda env (e.g. ~/.conda/envs/airflow_development/bin/airflow).
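As a rough illustration only (the paths, the example subcommand, and the user handling are assumptions, not taken from the gist), the environment described above could be wired up from Python like this:

# Sketch only: AIRFLOW_HOME/HOME/PYTHONPATH values are hypothetical, and the real
# gist runs the CLI as the analytics-privatedata user with kerberos.
import os
import subprocess

env = dict(
    os.environ,
    AIRFLOW_HOME="/srv/airflow-analytics-test",  # directory containing airflow.cfg and sqlite dbs
    HOME="/srv/airflow-analytics-test",
    PYTHONPATH="/srv/airflow-dags",              # so wmf_airflow_common is importable
)

# Example subcommand; a dag test run would look like ["dags", "test", "<dag_id>", "<date>"].
subprocess.run(["/usr/lib/airflow/bin/airflow", "dags", "list"], env=env, check=True)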
@ottomata
ottomata / EventDataStreamFactory.kafkaDataStream.java
Created July 14, 2022 15:06
EventDataStreamFactory.kafkaDataStream.java
/**
* Gets a {@link DataStreamSource} of {@link Row} for streamName that reads JSON events from Kafka.
* If you need more control of your KafkaSource, use
* {@link EventDataStreamFactory#kafkaSourceBuilder(String, String, String)} and then
* call {@link StreamExecutionEnvironment#fromSource(Source, WatermarkStrategy, String)} yourself.
*
* Example:
* <pre>{@code
* EventDataStreamFactory eventDataStreamFactory = EventDataStreamFactory.from(...)
* DataStreamSource<Row> eventStreamSource = eventDataStreamFactory.kafkaDataStream(
@ottomata
ottomata / _before.scala
Last active June 7, 2022 13:42
Event Platform Flink Integration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
// Write case classes (or Java POJOs) for every event schema out there.
// There are at least 117 of them!
case class Meta(
@ottomata
ottomata / spark_event_platform.py
Last active May 9, 2022 18:33
Event Platform Spark Structured Streaming DataFrame
"""
Example pyspark code to integrate with Wikimedia Event Platform to automate
getting a Spark Structured Streaming DataFrame using event streams and event JSONSchemas.
See also: https://wikitech.wikimedia.org/wiki/Event_Platform
You'll need the following jar dependencies:
- Kafka Client:
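For orientation, here is a hedged sketch of the kind of streaming DataFrame the gist automates; the broker, topic, and hand-written schema below are placeholders, whereas the gist derives the Spark schema from the stream's Event Platform JSONSchema.

# Sketch only: broker, topic, and schema are placeholders; the real code builds the
# schema from the event stream's JSONSchema instead of writing it by hand.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.appName("event_platform_streaming_example").getOrCreate()

schema = StructType([
    StructField("meta", StructType([StructField("domain", StringType())])),
    StructField("page_title", StringType()),
])

events = (
    spark.readStream
    .format("kafka")                                      # needs the Kafka client + Kafka SQL connector jars
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
    .option("subscribe", "eqiad.mediawiki.page-create")   # placeholder topic
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("event"))
    .select("event.*")
)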
with DAG(
'run_hql_and_arhive',
default_args=default_args,
...
) as dag:
hql_runner_task_leaf = generate_hive_triggered_hql_runner_tasks(
dag=dag,
...
# Test run the dag. This will wait for hdfs:///tmp/test_url_sensor.2022-01-01 to exist
~/.conda/envs/airflow1/bin/airflow dags test test_url_sensor 2022-01-01
# In another terminal, create the file and watch the dag begin to run:
hdfs dfs -touchz /tmp/test_url_sensor.2022-01-01
#!/usr/bin/env python
"""
Imports and calls a Python function.
call.py is a standalone Python module and
CLI, and should not import any dependencies
unless they are available in a standard
Python environment.
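A plausible core for such a module (assumed for illustration, not copied from the gist) is a dotted-path import plus a call:

# Sketch only: the real call.py's argument handling and CLI may differ.
import importlib
import sys

def call(qualified_name, *args):
    """Import `module.function` given as a dotted path and call it with *args."""
    module_name, func_name = qualified_name.rsplit(".", 1)
    func = getattr(importlib.import_module(module_name), func_name)
    return func(*args)

if __name__ == "__main__":
    print(call(sys.argv[1], *sys.argv[2:]))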
@ottomata
ottomata / _example.py
Last active January 13, 2022 22:25
SkeinSparkSubmitOperator
from wmf_airflow_common.operators import skein
op1 = skein.SkeinSparkSubmitOperator(
spark_submit_kwargs={
'application': 'hdfs:///user/otto/spark-examples_2.11-2.4.4.jar',
'spark_submit': '/usr/bin/spark2-submit',
'master': 'yarn',
'deploy_mode': 'client',
'java_class': 'org.apache.spark.examples.SparkPi',
'application_args': ['10'],
@ottomata
ottomata / Dockerfile
Last active December 13, 2021 17:21
Build a packed conda env .tgz from conda environment.yml and/or pip requirements.txt
# See also: https://pythonspeed.com/articles/conda-docker-image-size/
# The conda env build stage image:
# 1. Create conda env
# 2. Optionally install conda dependencies
# 3. Optionally install pip dependencies
FROM continuumio/miniconda3 AS conda_build_env
# Create a bare conda env.
# We need at minimum python, pip, conda-pack, and gcc/g++ (to build any binary pip packages later)