pyflink + event platform experiments
# Pyflink Streaming Table Env + Event Utilities Event Platform integration.
# Download the Wikimedia Event Utilities Flink jar, e.g. from
# https://archiva.wikimedia.org/#artifact-details-download-content/org.wikimedia/eventutilities-flink/1.2.0
# wget http://archiva.wikimedia.org/repository/releases/org/wikimedia/eventutilities-flink/1.2.0/eventutilities-flink-1.2.0-jar-with-dependencies.jar
# Also download other dependencies we need:
# wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.15.2/flink-connector-kafka-1.15.2.jar
# wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar
# If you like, use ipython for your REPL, it's nicer!
# export PYFLINK_PYTHON=ipython
#
# Since this demo prints results, it only works in local mode.
# pyflink-shell.sh local
from pyflink.java_gateway import get_gateway
from pyflink.table import TableDescriptor
def flink_jvm():
    return get_gateway().jvm
# Change this to the path where you downloaded eventutilities-flink.jar
st_env.get_config().set("pipeline.jars", "file:///home/otto/eventutilities-flink-1.2.0-jar-with-dependencies.jar;file:///home/otto/flink-connector-kafka-1.15.2.jar;file:///home/otto/kafka-clients-3.2.3.jar")
# Import Wikimedia Event Utilities java classes.
EventTableDescriptorBuilder = flink_jvm().org.wikimedia.eventutilities.flink.table.EventTableDescriptorBuilder
# Set schema base URIs and Stream Config URIs.
schema_uris = [
    "https://schema.wikimedia.org/repositories/primary/jsonschema",
    "https://schema.wikimedia.org/repositories/secondary/jsonschema",
]
stream_config_uri = "https://meta.wikimedia.org/w/api.php"
# Instantiate an EventTableDescriptorBuilder using the schema_uris and stream_config_uri.
# TODO: We have to get the 'from' factory method by string name, because from is a keyword in python.
# We should probably rename this factory method.
EventTableDescriptorBuilderFrom = getattr(EventTableDescriptorBuilder, 'from')
tableDescriptorBuilder = EventTableDescriptorBuilderFrom(schema_uris, stream_config_uri)
# Create the TableDescriptor for the mediawiki.page-create stream.
# Note: We instantiate the TableDescriptor python class with the Java instance
# returned by tableDescriptorBuilder(...).build() so that we can pass it to our
# python st_env.
page_create_stream_table_descriptor = TableDescriptor(
    tableDescriptorBuilder.eventStream(
        "mediawiki.page-create"
    ).setupKafka(
        "kafka-jumbo1001.eqiad.wmnet:9092",
        "my_consumer_group",
    ).withKafkaTimestampAsWatermark().option("json.timestamp-format.standard", "ISO-8601").build()
)
# TODO: ^ we should make "json.timestamp-format.standard", "ISO-8601"
# the default in EventTableDescriptorBuilderFrom.
# Instantiate the mediawiki.page-create Table from the TableDescriptor
page_create_stream_table = st_env.from_descriptor(page_create_stream_table_descriptor)
# Register the table in the Table Env by a name we can use in SQL.
st_env.register_table("mediawiki_page_create", page_create_stream_table)
# This is a pretty simple streaming query that selects every row.
result_table = st_env.sql_query(
    """
    SELECT database, page_title FROM mediawiki_page_create
    """
)
# Print the results on the CLI.
streaming_result = result_table.execute()
streaming_result.print()
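# (Sketch, not part of the original gist:) result_table.execute() submits an
# unbounded streaming job, and .print() blocks until it is interrupted. If you
# want to stop the job from code instead, the TableResult exposes a job client:
#
# streaming_result.get_job_client().cancel()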
# Pyflink DataStream + Event Utilities Event Platform integration.
# Download the Wikimedia Event Utilities Flink jar, e.g. from
# https://archiva.wikimedia.org/#artifact-details-download-content/org.wikimedia/eventutilities-flink/1.2.0
# wget http://archiva.wikimedia.org/repository/releases/org/wikimedia/eventutilities-flink/1.2.0/eventutilities-flink-1.2.0-jar-with-dependencies.jar
# Also download other dependencies we need:
# wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.15.2/flink-connector-kafka-1.15.2.jar
# wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar
# If you like, use ipython for your REPL, it's nicer!
# export PYFLINK_PYTHON=ipython
#
# Since this demo prints results, it only works in local mode.
# pyflink-shell.sh local
from pyflink.java_gateway import get_gateway
from pyflink.datastream import StreamExecutionEnvironment, DataStream
from pyflink.datastream.connectors import Source
from pyflink.common import WatermarkStrategy, Encoder, Types
# Change this to the path where you downloaded eventutilities-flink.jar
s_env.add_jars("file:///home/otto/eventutilities-flink-1.2.0-jar-with-dependencies.jar", "file:///home/otto/flink-connector-kafka-1.15.2.jar", "file:///home/otto/kafka-clients-3.2.3.jar")
def flink_jvm():
    return get_gateway().jvm
# Import Wikimedia Event Utilities java classes.
EventDataStreamFactory = flink_jvm().org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory
schema_uris = [
    "https://schema.wikimedia.org/repositories/primary/jsonschema",
    "https://schema.wikimedia.org/repositories/secondary/jsonschema",
]
stream_config_uri = "https://meta.wikimedia.org/w/api.php"
# Instantiate an EventDataStreamFactory using the schema_uris and stream_config_uri.
# TODO: We have to get the 'from' factory method by string name, because from is a keyword in python.
# We should probably rename this factory method.
EventDataStreamFactoryFrom = getattr(EventDataStreamFactory, 'from')
datastream_factory = EventDataStreamFactoryFrom(schema_uris, stream_config_uri)
# Use the datastream_factory's Kafka source builder to create a Java KafkaSource,
# then wrap it in the python Source class so we can pass it to s_env.
kafka_source = Source(
    datastream_factory.kafkaSourceBuilder(
        "mediawiki.page-create",
        "kafka-jumbo1001.eqiad.wmnet:9092",
        "test_otto_0"
    ).build()
)
# Instantiate the python DataStream from the kafka_source
datastream = s_env.from_source(
    source=kafka_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name="kafka_mediawiki_page_create"
)
# Print out every record on the CLI.
datastream.print()
s_env.execute()
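# (Sketch, not part of the original gist:) s_env.execute() blocks the shell
# while the streaming job runs. To keep the REPL usable, you could submit the
# job asynchronously and cancel it later via the returned JobClient:
#
# job_client = s_env.execute_async("print_mediawiki_page_create")
# # (do other work, inspect output, etc.)
# job_client.cancel()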
# Pyflink Streaming Table Env + Event Utilities Event Platform integration.
# Download the Wikimedia Event Utilities Flink jar, e.g. from
# https://archiva.wikimedia.org/#artifact-details-download-content/org.wikimedia/eventutilities-flink/1.2.0
# wget http://archiva.wikimedia.org/repository/releases/org/wikimedia/eventutilities-flink/1.2.0/eventutilities-flink-1.2.0-jar-with-dependencies.jar
# Also download other dependencies we need:
# wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.15.2/flink-connector-kafka-1.15.2.jar
# wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar
# If you like, use ipython for your REPL, it's nicer!
# export PYFLINK_PYTHON=ipython
#
# Since this demo prints results, it only works in local mode.
# pyflink-shell.sh local
from pyflink.java_gateway import get_gateway
from pyflink.table import TableDescriptor
from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.types import _from_java_type
from pyflink.table.udf import udf
def flink_jvm():
    return get_gateway().jvm
# Change this to the path where you downloaded eventutilities-flink.jar
# st_env.get_config().set("pipeline.jars", "file:///home/otto/eventutilities-flink-1.2.0-jar-with-dependencies.jar;file:///home/otto/flink-connector-kafka-1.15.2.jar;file:///home/otto/kafka-clients-3.2.3.jar")
st_env.get_config().set("pipeline.jars", "file:///home/otto/event-utilities/eventutilities-flink/target/eventutilities-flink-1.2.1-SNAPSHOT-jar-with-dependencies.jar;file:///home/otto/flink-connector-kafka-1.15.2.jar;file:///home/otto/kafka-clients-3.2.3.jar")
# Import Wikimedia Event Utilities java classes.
JsonSchemaFlinkConverter = flink_jvm().org.wikimedia.eventutilities.flink.formats.json.JsonSchemaFlinkConverter
EventTableDescriptorBuilder = flink_jvm().org.wikimedia.eventutilities.flink.table.EventTableDescriptorBuilder
# Set schema base URIs and Stream Config URIs.
schema_uris = [
    "https://schema.wikimedia.org/repositories/primary/jsonschema",
    "https://schema.wikimedia.org/repositories/secondary/jsonschema",
]
stream_config_uri = "https://meta.wikimedia.org/w/api.php"
# Instantiate an EventTableDescriptorBuilder using the schema_uris and stream_config_uri.
# TODO: We have to get the 'from' factory method by string name, because from is a keyword in python.
# We should probably rename this factory method.
EventTableDescriptorBuilderFrom = getattr(EventTableDescriptorBuilder, 'from')
tableDescriptorBuilder = EventTableDescriptorBuilderFrom(schema_uris, stream_config_uri)
eventStreamFactory = tableDescriptorBuilder.getEventStreamFactory()
# Create the TableDescriptor for the mediawiki.page-create stream.
# Note: We instantiate the TableDescriptor python class with the Java instance
# returned by tableDescriptorBuilder(...).build() so that we can pass it to our
# python st_env.
page_create_stream_table_descriptor = TableDescriptor(
    tableDescriptorBuilder.eventStream(
        "mediawiki.page-create"
    ).setupKafka(
        "kafka-jumbo1001.eqiad.wmnet:9092",
        "my_consumer_group",
    ).withKafkaTimestampAsWatermark().option("json.timestamp-format.standard", "ISO-8601").build()
)
# TODO: ^ we should make "json.timestamp-format.standard", "ISO-8601"
# the default in EventTableDescriptorBuilderFrom.
# Instantiate the mediawiki.page-create Table from the TableDescriptor
page_create_stream_table = st_env.from_descriptor(page_create_stream_table_descriptor)
# Register the table in the Table Env by a name we can use in SQL.
st_env.register_table("mediawiki_page_create", page_create_stream_table)
# We are going to map to the revision-score schema, so we need to get its DataType.
# Create the TableDescriptor for the mediawiki.revision-score stream.
# Note: We instantiate the TableDescriptor python class with the Java instance
# returned by tableDescriptorBuilder(...).build() so that we can pass it to our
# python st_env.
revision_score_stream_table_descriptor = TableDescriptor(
    tableDescriptorBuilder.eventStream(
        "mediawiki.revision-score"
    ).setupKafka(
        "kafka-jumbo1001.eqiad.wmnet:9092",
        "my_consumer_group",
    ).withKafkaTimestampAsWatermark().option("json.timestamp-format.standard", "ISO-8601").build()
)
# TODO: ^ we should make "json.timestamp-format.standard", "ISO-8601"
# the default in EventTableDescriptorBuilderFrom.
# Instantiate the mediawiki.revision-score Table from the TableDescriptor
revision_score_stream_table = st_env.from_descriptor(revision_score_stream_table_descriptor)
revision_score_datatype = revision_score_stream_table.get_schema().to_row_data_type()
# Given a rev_id, return a value for the 'scores' field
# (hardcoded dummy scores here; a real UDF would look them up).
@udf(result_type=revision_score_datatype['scores'].data_type)
def get_score(rev_id: int):
    scores_row = {
        "awesomeness": Row(
            model_name="awesomeness",
            model_version="1.0.1",
            prediction=["yes", "mostly"],
            probability={
                'yes': 0.99,
                'mostly': 0.90,
                'hardly': 0.01
            }
        )
    }
    print("scores for rev_id", rev_id, scores_row)
    return scores_row
# Register a function called get_score with this udf
st_env.create_temporary_function("get_score", get_score)
# I can't get this to work. I can't figure out how to pass the full input Row
# to the UDF. Only individual parameters can be passed.
# (A possible alternative using Table.map() is sketched after this commented-out block.)
# @udf(result_type=revision_score_datatype)
# def add_score(revision_create: Row):
#     scores_map = {
#         "awesomeness": Row(
#             model_name="awesomeness",
#             model_version="1.0.1",
#             prediction=["yes", "mostly"],
#             probability={
#                 'yes': 0.99,
#                 'mostly': 0.90,
#                 'hardly': 0.01
#             }
#         )
#     }
#     revision_score_dict = revision_create.as_dict()
#     revision_score_dict['scores'] = scores_map
#     del revision_score_dict['kafka_timestamp']  # TODO: do we need this?
#     revision_score_event = Row(**revision_score_dict)
#     # print("scores for rev_id", rev_id, scores_row)
#     return revision_score_event
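# (Sketch, not part of the original gist, untested:) PyFlink's row-based
# Table.map() operation may be a way around this: when the udf's single
# parameter is annotated as Row, map() packs all columns of the input table
# into that Row. Assuming (as the commented-out attempt above does) that the
# page-create fields line up with the revision-score schema apart from
# 'scores' and 'kafka_timestamp', something like this could work:
#
# @udf(result_type=revision_score_datatype)
# def add_score(page_create: Row) -> Row:
#     event = page_create.as_dict()
#     event['scores'] = {
#         "awesomeness": Row(
#             model_name="awesomeness",
#             model_version="1.0.1",
#             prediction=["yes", "mostly"],
#             probability={'yes': 0.99, 'mostly': 0.90, 'hardly': 0.01},
#         )
#     }
#     event.pop('kafka_timestamp', None)
#     return Row(**event)
#
# revision_score_table = page_create_stream_table.map(add_score)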
# Select all input fields, plus a new scores field with the result of get_score
result_table = st_env.sql_query("SELECT *, get_score(rev_id) as scores FROM mediawiki_page_create LIMIT 10")
# Print the results on the CLI.
streaming_result = result_table.execute()
streaming_result.print()