Andrew Otto ottomata

// Quick and hacky script that will use dyff to show the diff between
// any modified materialized schema version and its previous version.
//
// Defaults to using https://github.com/homeport/dyff, so install that first.
// This could be cleaned up and incorporated into jsonschema-tools itself, and
// then shown in CI.
//
const jsonschema_tools = require('@wikimedia/jsonschema-tools');
const _ = require('lodash');
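As a rough illustration of the same idea (not part of the gist's code), the sketch below shells out to dyff from Python to compare two materialized schema files. The schema paths are made-up examples; the only assumption is that the dyff binary from https://github.com/homeport/dyff is on PATH.

# Hedged sketch: compare two materialized schema versions with dyff.
# The file paths below are hypothetical examples.
import subprocess

def dyff_between(previous_schema, modified_schema):
    """Return dyff's human-readable diff between two schema files."""
    result = subprocess.run(
        ["dyff", "between", previous_schema, modified_schema],
        capture_output=True,
        text=True,
        check=False,  # don't raise if dyff exits non-zero
    )
    return result.stdout

print(dyff_between(
    "jsonschema/mediawiki/page/change/1.0.0.yaml",  # previous version (hypothetical path)
    "jsonschema/mediawiki/page/change/1.1.0.yaml",  # modified version (hypothetical path)
))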
@ottomata
ottomata / 0_mediawiki_page_create_stream_table.py
Last active September 29, 2022 20:38
pyflink + event platform experiments
# Pyflink Streaming Table Env + Event Utilities Event Platform integration.
# Download the Wikimedia Event Utilities Flink jar, e.g.
# https://archiva.wikimedia.org/#artifact-details-download-content/org.wikimedia/eventutilities-flink/1.2.0
# wget http://archiva.wikimedia.org/repository/releases/org/wikimedia/eventutilities-flink/1.2.0/eventutilities-flink-1.2.0-jar-with-dependencies.jar
# Also download other dependencies we need:
# wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.15.2/flink-connector-kafka-1.15.2.jar
# wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar
# If you like, use ipython for your REPL, it's nicer!
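As a hedged sketch (not the gist's code), the downloaded jars can then be registered with a PyFlink streaming table environment roughly like this; the local file:// paths are assumptions about where the wget commands above put the jars.

# Hedged sketch: make the downloaded jars available to a PyFlink streaming table env.
# The file:// paths are assumptions (wherever the jars were downloaded to).
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(
    "file:///path/to/eventutilities-flink-1.2.0-jar-with-dependencies.jar",
    "file:///path/to/flink-connector-kafka-1.15.2.jar",
    "file:///path/to/kafka-clients-3.2.3.jar",
)
table_env = StreamTableEnvironment.create(env)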
@ottomata
ottomata / spark-amm.sh
Created October 2, 2019 14:25
spark + ammonite
#!/usr/bin/env bash
export SPARK_HOME="${SPARK_HOME:-/usr/lib/spark2}"
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}"/conf}"
source "${SPARK_HOME}/bin/load-spark-env.sh"
export HIVE_CONF_DIR="${SPARK_CONF_DIR}"
export HADOOP_CONF_DIR=/etc/hadoop/conf
AMMONITE=~/bin/amm # This is amm binary release 2.11-1.6.7
@ottomata
ottomata / airflow_cli.sh
Created August 8, 2022 16:46
Airflow CLI command as analytics-privatedata (with kerberos)
# How to run an airflow dag and/or task using analytics-privatedata on the CLI.
#
# - Make sure the airflow-dags repo is on the PYTHONPATH, so wmf_airflow_common can be loaded
#
# - Set AIRFLOW_HOME and HOME. This must be a directory that contains airflow.cfg
#   and the airflow sqlite dbs, and those must be readable and writable by the analytics-privatedata user
#
# - Run bin/airflow out of an airflow conda env. /usr/lib/airflow is fine, but you can also use
#   your development conda env (e.g. ~/.conda/envs/airflow_development/bin/airflow).
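A hedged sketch of the environment setup described above, driven from Python rather than the gist's actual shell commands; every path, the example CLI subcommand, and the way the analytics-privatedata user is handled are assumptions.

# Hedged sketch (not the gist's actual commands): invoke the airflow CLI with
# the environment described in the comments above. All paths are assumptions.
import os
import subprocess

airflow_home = "/srv/airflow-analytics"  # must contain airflow.cfg and the sqlite dbs

env = dict(
    os.environ,
    AIRFLOW_HOME=airflow_home,
    HOME=airflow_home,
    PYTHONPATH="/srv/airflow-dags",  # so wmf_airflow_common can be imported
)

# /usr/lib/airflow is fine; a development conda env's bin/airflow also works.
# In practice this would be run as the analytics-privatedata user (e.g. via sudo),
# which is omitted from this sketch.
subprocess.run(["/usr/lib/airflow/bin/airflow", "dags", "list"], env=env, check=True)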
@ottomata
ottomata / EventDataStreamFactory.kafkaDataStream.java
Created July 14, 2022 15:06
EventDataStreamFactory.kafkaDataStream.java
/**
* Gets a {@link DataStreamSource} of {@link Row} for streamName that reads JSON events from Kafka.
* If you need more control of your KafkaSource, use
* {@link EventDataStreamFactory#kafkaSourceBuilder(String, String, String)} and then
* call {@link StreamExecutionEnvironment#fromSource(Source, WatermarkStrategy, String)} yourself.
*
* Example:
* <pre>{@code
* EventDataStreamFactory eventDataStreamFactory = EventDataStreamFactory.from(...)
 * DataStreamSource<Row> eventStreamSource = eventDataStreamFactory.kafkaDataStream(
@ottomata
ottomata / _before.scala
Last active June 7, 2022 13:42
Event Platform Flink Integration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
// Write case classes (or Java Pojos) for every event schema out there.
// There are at least 117 of them!
case class Meta(
@ottomata
ottomata / spark_event_platform.py
Last active May 9, 2022 18:33
Event Platform Spark Structured Streaming DataFrame
"""
Example pyspark code to integrate with Wikimedia Event Platform to automate
getting a Spark Structured Streaming DataFrame using event streams and event JSONSchemas.
See also: https://wikitech.wikimedia.org/wiki/Event_Platform
You'll need the following jar dependencies:
- Kafka Client:
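For orientation only, here is a hedged, generic sketch of the kind of pipeline the gist automates: read a Kafka topic as a Structured Streaming DataFrame and parse the JSON values with a hand-written schema. The broker, topic, and schema below are placeholder assumptions; the gist's point is to derive the Spark schema from the stream's Event Platform JSONSchema instead of writing it by hand.

# Hedged, generic sketch: Kafka -> Structured Streaming DataFrame with a
# hand-written placeholder schema. The gist builds the schema from the
# stream's JSONSchema automatically instead.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.appName("event_platform_sketch").getOrCreate()

kafka_df = (
    spark.readStream.format("kafka")
    # Placeholder assumptions: broker and topic names.
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "eqiad.mediawiki.page-create")
    .load()
)

# Minimal placeholder schema; real event schemas have many more fields.
event_schema = StructType([
    StructField("$schema", StringType()),
    StructField("meta", StructType([
        StructField("stream", StringType()),
        StructField("dt", StringType()),
    ])),
])

events = kafka_df.select(
    from_json(col("value").cast("string"), event_schema).alias("event")
).select("event.*")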
#!/bin/bash
execute="false"
if [ -z "${ZOOKEEPER_URL}" ]; then
    echo "Must set ZOOKEEPER_URL env variable to Kafka Zookeeper URL"
    exit 1
fi
# Test run the dag. This will wait for hdfs:///tmp/test_url_sensor.2022-01-01 to exist
~/.conda/envs/airflow1/bin/airflow dags test test_url_sensor 2022-01-01
# In another terminal, create the file and watch the dag begin to run:
hdfs dfs -touchz /tmp/test_url_sensor.2022-01-01
with DAG(
    'run_hql_and_archive',
    default_args=default_args,
    ...
) as dag:
    hql_runner_task_leaf = generate_hive_triggered_hql_runner_tasks(
        dag=dag,
        ...