pyflink + event platform experiments
# Pyflink Streaming Table Env + Event Utilities Event Platform integration.

# Download the Wikimedia Event Utilities Flink jar, e.g. from
# https://archiva.wikimedia.org/#artifact-details-download-content/org.wikimedia/eventutilities-flink/1.2.0
# wget http://archiva.wikimedia.org/repository/releases/org/wikimedia/eventutilities-flink/1.2.0/eventutilities-flink-1.2.0-jar-with-dependencies.jar

# Also download the other dependencies we need:
# wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.15.2/flink-connector-kafka-1.15.2.jar
# wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar

# If you like, use ipython for your REPL, it's nicer!
# export PYFLINK_PYTHON=ipython
#
# Since this example demo prints results, it only works in local mode.
# pyflink-shell.sh local

from pyflink.java_gateway import get_gateway
from pyflink.table import TableDescriptor


def flink_jvm():
    return get_gateway().jvm


# Change this to the path where you downloaded eventutilities-flink.jar.
st_env.get_config().set("pipeline.jars", "file:///home/otto/eventutilities-flink-1.2.0-jar-with-dependencies.jar;file:///home/otto/flink-connector-kafka-1.15.2.jar;file:///home/otto/kafka-clients-3.2.3.jar")

# Import Wikimedia Event Utilities Java classes.
EventTableDescriptorBuilder = flink_jvm().org.wikimedia.eventutilities.flink.table.EventTableDescriptorBuilder

# Set schema base URIs and the stream config URI.
schema_uris = [
    "https://schema.wikimedia.org/repositories/primary/jsonschema",
    "https://schema.wikimedia.org/repositories/secondary/jsonschema",
]
stream_config_uri = "https://meta.wikimedia.org/w/api.php"

# Instantiate an EventTableDescriptorBuilder using the schema_uris and stream_config_uri.
# TODO: We have to get the 'from' factory method by string name, because from is a keyword in Python.
# We should probably rename this factory method.
EventTableDescriptorBuilderFrom = getattr(EventTableDescriptorBuilder, 'from')
tableDescriptorBuilder = EventTableDescriptorBuilderFrom(schema_uris, stream_config_uri)

# Create the TableDescriptor for the mediawiki.page-create stream.
# Note: We instantiate the TableDescriptor Python class with the Java instance
# returned by tableDescriptorBuilder(...).build() so that we can pass it to our
# Python st_env.
page_create_stream_table_descriptor = TableDescriptor(
    tableDescriptorBuilder.eventStream(
        "mediawiki.page-create"
    ).setupKafka(
        "kafka-jumbo1001.eqiad.wmnet:9092",
        "my_consumer_group",
    ).withKafkaTimestampAsWatermark().option("json.timestamp-format.standard", "ISO-8601").build()
)
# TODO: ^ we should make "json.timestamp-format.standard" = "ISO-8601"
# the default in EventTableDescriptorBuilderFrom.

# Instantiate the mediawiki.page-create Table from the TableDescriptor.
page_create_stream_table = st_env.from_descriptor(page_create_stream_table_descriptor)

# Register the table in the TableEnvironment by a name we can use in SQL.
st_env.register_table("mediawiki_page_create", page_create_stream_table)

# This is a pretty simple streaming query that selects every row.
result_table = st_env.sql_query(
    """
    SELECT database, page_title FROM mediawiki_page_create
    """
)

# Print the results on the CLI.
streaming_result = result_table.execute()
streaming_result.print()
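# A hedged alternative (not in the original gist): rather than letting print()
# run on the unbounded stream forever, pull a handful of rows with
# TableResult.collect(), which returns a closeable iterator:
#
# with result_table.execute().collect() as results:
#     for i, row in enumerate(results):
#         print(row)
#         if i >= 4:
#             break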
# Pyflink DataStream + Event Utilities Event Platform integration.

# Download the Wikimedia Event Utilities Flink jar, e.g. from
# https://archiva.wikimedia.org/#artifact-details-download-content/org.wikimedia/eventutilities-flink/1.2.0
# wget http://archiva.wikimedia.org/repository/releases/org/wikimedia/eventutilities-flink/1.2.0/eventutilities-flink-1.2.0-jar-with-dependencies.jar

# Also download the other dependencies we need:
# wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.15.2/flink-connector-kafka-1.15.2.jar
# wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar

# If you like, use ipython for your REPL, it's nicer!
# export PYFLINK_PYTHON=ipython
#
# Since this example demo prints results, it only works in local mode.
# pyflink-shell.sh local

from pyflink.java_gateway import get_gateway
from pyflink.datastream import StreamExecutionEnvironment, DataStream
from pyflink.datastream.connectors import Source
from pyflink.common import WatermarkStrategy, Encoder, Types

# Change this to the path where you downloaded eventutilities-flink.jar.
s_env.add_jars("file:///home/otto/eventutilities-flink-1.2.0-jar-with-dependencies.jar", "file:///home/otto/flink-connector-kafka-1.15.2.jar", "file:///home/otto/kafka-clients-3.2.3.jar")


def flink_jvm():
    return get_gateway().jvm


# Import Wikimedia Event Utilities Java classes.
EventDataStreamFactory = flink_jvm().org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory

# Set schema base URIs and the stream config URI.
schema_uris = [
    "https://schema.wikimedia.org/repositories/primary/jsonschema",
    "https://schema.wikimedia.org/repositories/secondary/jsonschema",
]
stream_config_uri = "https://meta.wikimedia.org/w/api.php"

# Instantiate an EventDataStreamFactory using the schema_uris and stream_config_uri.
# TODO: We have to get the 'from' factory method by string name, because from is a keyword in Python.
# We should probably rename this factory method.
EventDataStreamFactoryFrom = getattr(EventDataStreamFactory, 'from')
datastream_factory = EventDataStreamFactoryFrom(schema_uris, stream_config_uri)

# Use the datastream_factory source builder to create a Java KafkaSource,
# and then instantiate the Python Source so we can pass it to the s_env.
kafka_source = Source(
    datastream_factory.kafkaSourceBuilder(
        "mediawiki.page-create",
        "kafka-jumbo1001.eqiad.wmnet:9092",
        "test_otto_0"
    ).build()
)

# Instantiate the Python DataStream from the kafka_source.
datastream = s_env.from_source(
    source=kafka_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name="kafka_mediawiki_page_create"
)

# Print out every record on the CLI.
datastream.print()
s_env.execute()
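# A hedged variation (not in the original gist): DataStream transformations can
# be chained in before the print. This assumes the deserialized records are Row
# objects with fields matching the mediawiki.page-create schema (the 'database'
# field name here is illustrative; adjust to the real schema):
#
# enwiki_only = datastream.filter(lambda event: event['database'] == 'enwiki')
# enwiki_only.print()
# s_env.execute()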
# Pyflink Streaming Table Env + Event Utilities Event Platform integration,
# with a Python UDF that builds a mediawiki.revision-score 'scores' value.

# Download the Wikimedia Event Utilities Flink jar, e.g. from
# https://archiva.wikimedia.org/#artifact-details-download-content/org.wikimedia/eventutilities-flink/1.2.0
# wget http://archiva.wikimedia.org/repository/releases/org/wikimedia/eventutilities-flink/1.2.0/eventutilities-flink-1.2.0-jar-with-dependencies.jar

# Also download the other dependencies we need:
# wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.15.2/flink-connector-kafka-1.15.2.jar
# wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar

# If you like, use ipython for your REPL, it's nicer!
# export PYFLINK_PYTHON=ipython
#
# Since this example demo prints results, it only works in local mode.
# pyflink-shell.sh local

from pyflink.java_gateway import get_gateway
from pyflink.table import TableDescriptor, DataTypes
from pyflink.table.types import _from_java_type
from pyflink.table.udf import udf
from pyflink.common import Row


def flink_jvm():
    return get_gateway().jvm


# Change this to the path where you downloaded eventutilities-flink.jar.
# st_env.get_config().set("pipeline.jars", "file:///home/otto/eventutilities-flink-1.2.0-jar-with-dependencies.jar;file:///home/otto/flink-connector-kafka-1.15.2.jar;file:///home/otto/kafka-clients-3.2.3.jar")
st_env.get_config().set("pipeline.jars", "file:///home/otto/event-utilities/eventutilities-flink/target/eventutilities-flink-1.2.1-SNAPSHOT-jar-with-dependencies.jar;file:///home/otto/flink-connector-kafka-1.15.2.jar;file:///home/otto/kafka-clients-3.2.3.jar")

JsonSchemaFlinkConverter = flink_jvm().org.wikimedia.eventutilities.flink.formats.json.JsonSchemaFlinkConverter

# Import Wikimedia Event Utilities Java classes.
EventTableDescriptorBuilder = flink_jvm().org.wikimedia.eventutilities.flink.table.EventTableDescriptorBuilder

# Set schema base URIs and the stream config URI.
schema_uris = [
    "https://schema.wikimedia.org/repositories/primary/jsonschema",
    "https://schema.wikimedia.org/repositories/secondary/jsonschema",
]
stream_config_uri = "https://meta.wikimedia.org/w/api.php"

# Instantiate an EventTableDescriptorBuilder using the schema_uris and stream_config_uri.
# TODO: We have to get the 'from' factory method by string name, because from is a keyword in Python.
# We should probably rename this factory method.
EventTableDescriptorBuilderFrom = getattr(EventTableDescriptorBuilder, 'from')
tableDescriptorBuilder = EventTableDescriptorBuilderFrom(schema_uris, stream_config_uri)
eventStreamFactory = tableDescriptorBuilder.getEventStreamFactory()

# Create the TableDescriptor for the mediawiki.page-create stream.
# Note: We instantiate the TableDescriptor Python class with the Java instance
# returned by tableDescriptorBuilder(...).build() so that we can pass it to our
# Python st_env.
page_create_stream_table_descriptor = TableDescriptor(
    tableDescriptorBuilder.eventStream(
        "mediawiki.page-create"
    ).setupKafka(
        "kafka-jumbo1001.eqiad.wmnet:9092",
        "my_consumer_group",
    ).withKafkaTimestampAsWatermark().option("json.timestamp-format.standard", "ISO-8601").build()
)
# TODO: ^ we should make "json.timestamp-format.standard" = "ISO-8601"
# the default in EventTableDescriptorBuilderFrom.

# Instantiate the mediawiki.page-create Table from the TableDescriptor.
page_create_stream_table = st_env.from_descriptor(page_create_stream_table_descriptor)

# Register the table in the TableEnvironment by a name we can use in SQL.
st_env.register_table("mediawiki_page_create", page_create_stream_table)

# We are going to map to the revision-score schema, so we need its DataType.
# Create a TableDescriptor for the mediawiki.revision-score stream, wrapping
# the Java instance in the Python TableDescriptor class as above.
revision_score_stream_table_descriptor = TableDescriptor(
    tableDescriptorBuilder.eventStream(
        "mediawiki.revision-score"
    ).setupKafka(
        "kafka-jumbo1001.eqiad.wmnet:9092",
        "my_consumer_group",
    ).withKafkaTimestampAsWatermark().option("json.timestamp-format.standard", "ISO-8601").build()
)

# Instantiate the mediawiki.revision-score Table from the TableDescriptor,
# and get its row DataType.
revision_score_stream_table = st_env.from_descriptor(revision_score_stream_table_descriptor)
revision_score_datatype = revision_score_stream_table.get_schema().to_row_data_type()

# Given a rev_id, look up the revision score and return the value of the
# 'scores' field. (This demo just returns a hardcoded example score.)
@udf(result_type=revision_score_datatype['scores'].data_type)
def get_score(rev_id: int):
    scores_row = {
        "awesomeness": Row(
            model_name="awesomeness",
            model_version="1.0.1",
            prediction=["yes", "mostly"],
            probability={
                'yes': 0.99,
                'mostly': 0.90,
                'hardly': 0.01
            }
        )
    }
    print("scores for rev_id", rev_id, scores_row)
    return scores_row

# Register a function called get_score with this UDF.
st_env.create_temporary_function("get_score", get_score)

# I can't get this to work: I can't figure out how to pass the full input Row
# to the UDF. Only individual parameters can be passed.
# @udf(result_type=revision_score_datatype)
# def add_score(revision_create: Row):
#     scores_map = {
#         "awesomeness": Row(
#             model_name="awesomeness",
#             model_version="1.0.1",
#             prediction=["yes", "mostly"],
#             probability={
#                 'yes': 0.99,
#                 'mostly': 0.90,
#                 'hardly': 0.01
#             }
#         )
#     }
#     revision_score_dict = revision_create.as_dict()
#     revision_score_dict['scores'] = scores_map
#     del revision_score_dict['kafka_timestamp']  # TODO: do we need this?
#     revision_score_event = Row(**revision_score_dict)
#     # print("scores for rev_id", rev_id, scores_row)
#     return revision_score_event
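# One avenue that might be worth trying (untested sketch, not from the original
# gist): Flink SQL has a ROW constructor, so the needed columns could be packed
# into a single argument at the call site, e.g.
#
#   SELECT add_score(ROW(`database`, page_title, rev_id)) FROM mediawiki_page_create
#
# A PyFlink UDF whose declared input is a ROW DataType should then receive one
# pyflink.common.Row; whether the implicitly constructed row type lines up with
# revision_score_datatype above is exactly the open question. Alternatively,
# list just the fields the UDF needs as separate scalar arguments (as get_score
# does with rev_id) and rebuild the output Row inside the function.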
# Select all input fields, plus a new scores field with the result of get_score.
result_table = st_env.sql_query("SELECT *, get_score(rev_id) AS scores FROM mediawiki_page_create LIMIT 10")

# Print the results on the CLI.
streaming_result = result_table.execute()
streaming_result.print()