Created
November 7, 2022 20:38
-
-
Save ottomata/bc583fac4cafc4d7651db463dc755c9e to your computer and use it in GitHub Desktop.
Mediawiki Content Flink Streaming SQL Content Enrich
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Prerequisite: download flink-connector-kafka-1.15.2.jar and kafka-clients-2.4.1.jar;
# both jars are handed to the SQL client with -j below.
# Starts the Flink SQL client with the init script (-i), the Python UDF file (-pyfs)
# and the Python interpreter to use for UDF execution and for the client.
./bin/sql-client.sh \
    -i flink_sql_init.sql \
    -pyfs get_revision_content_udf.py \
    -pyexec /home/otto/pyflink_udf2/bin/python3 \
    -pyclientexec /home/otto/pyflink_udf2/bin/python3 \
    -j /home/otto/flink-connector-kafka-1.15.2.jar \
    -j /home/otto/kafka-clients-2.4.1.jar
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
from pyflink.table import DataTypes | |
from pyflink.table.udf import udf | |
from pyflink.common import Row | |
import logging | |
def get_revision_slot_content_body(domain: str, rev_id: int, slot_role: str = 'main') -> str:
    """
    Given a wiki domain, a rev_id, and a slot_role, uses the MW Action API to return the content
    for that revision's slot role.

    :param domain: wiki domain, used as the host part of the request URL.
    :param rev_id: revision id, sent as the ``revids`` query parameter.
    :param slot_role: slot role to request and to read from the response; defaults to 'main'.
    :return: the slot's content string when the API answers HTTP 200. For any other
        status an error is logged and None is returned.
    """
    # Request the same slot that is read from the response below. The slot used to be
    # pinned to 'main', which made every other slot_role fail on the lookup.
    api_url = (
        f'https://{domain}/w/api.php?action=query&format=json&prop=revisions'
        f'&formatversion=2&rvprop=content&rvslots={slot_role}&revids={rev_id}'
    )
    response = requests.get(api_url)

    if response.status_code == 200:
        return response.json()['query']['pages'][0]['revisions'][0]['slots'][slot_role]['content']

    # No module-level `logger` exists in this file, so obtain one here instead of
    # raising NameError on the failure path.
    logging.getLogger(__name__).error(
        f'Failed requesting revision slot content body from {api_url}. '
        f'Got HTTP response status: {response.status_code}.'
    )
    return None
# This result_type corresponds to the fragment/mediawiki/state/entity/revision_slots map type. | |
# It would be much nicer to look this field type up dynamically | |
# from the stream's schema. We can do this using wikimedia-event-utilities, | |
# but it seems accessing the py4j JVM gateway during flink-sql python UDF parsing | |
# is not possible. | |
# | |
# This UDF operates at the content_slots map field level, rather than | |
# the simple string content_body field, so as not to have to deal with | |
# querying and setting map types in SQL (which may not even be possible in Flink SQL) | |
# Alternatively, this UDF could operate at the revision entity field level. | |
# This would make the SQL even simpler, but move more of the DataTypes | |
# result_type boiler plate here. If we can abstract declaring | |
# DataTypes via JSONSchema (fields), this should be much simpler. | |
@udf(result_type=DataTypes.MAP(
    DataTypes.STRING(),
    DataTypes.ROW([
        DataTypes.FIELD('slot_role', DataTypes.STRING()),
        DataTypes.FIELD('origin_rev_id', DataTypes.BIGINT()),
        DataTypes.FIELD('content_format', DataTypes.STRING()),
        DataTypes.FIELD('content_model', DataTypes.STRING()),
        DataTypes.FIELD('content_sha1', DataTypes.STRING()),
        DataTypes.FIELD('content_size', DataTypes.BIGINT()),
        DataTypes.FIELD('content_body', DataTypes.STRING())
    ])
))
def enrich_content_slots_with_body(domain: str, rev_id: int, content_slots):
    """
    A Flink UDF that maps from a revision content_slots map field
    without content bodies set, looks up the content_body via
    the MW action API, and returns a content_slots map field
    with content bodies set.

    :param domain: wiki domain passed through to get_revision_slot_content_body.
    :param rev_id: revision id passed through to get_revision_slot_content_body.
    :param content_slots: map of slot role -> content slot row; may be None.
    :return: the same content_slots object with 'content_body' set on every slot,
        or None when content_slots is None.
    """
    # A null map has nothing to enrich; pass it through instead of failing on .items().
    if content_slots is None:
        return None

    # Iterate over the content_slots map field, look up each content_body, and set it.
    # NOTE: we can return the same content_slots field here because
    # we know it has the same DataType as the result_type. If this were not true,
    # we would need to construct a value for it manually.
    for slot_role, content_slot in content_slots.items():
        # content_slot is the map's own value object, so assigning through it updates the map.
        content_slot['content_body'] = get_revision_slot_content_body(domain, rev_id, slot_role)
    return content_slots
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- declare functions and source and sink tables

-- Register the Python UDF under the name used by the INSERT query.
-- The identifier is <python module>.<function name>.
CREATE TEMPORARY FUNCTION enrich_content_slots_with_body
    AS 'get_revision_content_udf.enrich_content_slots_with_body'
    LANGUAGE PYTHON;
-- Kafka-backed source table over the eqiad.rc0.mediawiki.page_change topic.
-- The columns are declared by hand; an Event Platform stream Catalog would make
-- this declaration unnecessary: https://phabricator.wikimedia.org/T322022
-- NOTE(review): `is_editor_visibile` looks like a misspelling of "visible". It is kept
-- as-is because the INSERT query and the sink table use the same name; confirm it
-- against the stream's schema.
CREATE TEMPORARY TABLE mediawiki_page_change (
    `meta` ROW<
        `domain` STRING,
        `dt` STRING,
        `id` STRING,
        `request_id` STRING,
        `stream` STRING,
        `uri` STRING
    >,
    `$schema` STRING,
    `dt` STRING,
    `changelog_kind` STRING,
    `page_change_kind` STRING,
    `wiki_id` STRING,
    `page` ROW<
        `page_id` BIGINT,
        `page_title` STRING,
        `is_redirect` BOOLEAN,
        `namespace_id` BIGINT,
        `revision_count` BIGINT
    >,
    `performer` ROW<
        `user_id` BIGINT,
        `user_text` STRING,
        `edit_count` BIGINT,
        `groups` ARRAY<STRING>,
        `is_bot` BOOLEAN,
        `is_registered` BOOLEAN,
        `is_system` BOOLEAN,
        `is_temp` BOOLEAN,
        `registration_dt` STRING
    >,
    -- Current revision; content_slots is keyed by slot role.
    `revision` ROW<
        `rev_id` BIGINT,
        `rev_parent_id` BIGINT,
        `rev_dt` STRING,
        `comment` STRING,
        `comment_html` STRING,
        `is_comment_visible` BOOLEAN,
        `is_editor_visibile` BOOLEAN,
        `is_content_visible` BOOLEAN,
        `is_minor_edit` BOOLEAN,
        `rev_sha1` STRING,
        `rev_size` BIGINT,
        `content_slots` MAP<
            STRING,
            ROW<
                `slot_role` STRING,
                `origin_rev_id` BIGINT,
                `content_format` STRING,
                `content_model` STRING,
                `content_sha1` STRING,
                `content_size` BIGINT,
                `content_body` STRING
            >
        >,
        `editor` ROW<
            `user_id` BIGINT,
            `user_text` STRING,
            `edit_count` BIGINT,
            `groups` ARRAY<STRING>,
            `is_bot` BOOLEAN,
            `is_registered` BOOLEAN,
            `is_system` BOOLEAN,
            `is_temp` BOOLEAN,
            `registration_dt` STRING
        >
    >,
    -- Same page / revision shapes as above, nested under prior_state.
    `prior_state` ROW<
        `page` ROW<
            `page_id` BIGINT,
            `page_title` STRING,
            `is_redirect` BOOLEAN,
            `namespace_id` BIGINT,
            `revision_count` BIGINT
        >,
        `revision` ROW<
            `rev_id` BIGINT,
            `rev_parent_id` BIGINT,
            `rev_dt` STRING,
            `comment` STRING,
            `comment_html` STRING,
            `is_comment_visible` BOOLEAN,
            `is_editor_visibile` BOOLEAN,
            `is_content_visible` BOOLEAN,
            `is_minor_edit` BOOLEAN,
            `rev_sha1` STRING,
            `rev_size` BIGINT,
            `content_slots` MAP<
                STRING,
                ROW<
                    `slot_role` STRING,
                    `origin_rev_id` BIGINT,
                    `content_format` STRING,
                    `content_model` STRING,
                    `content_sha1` STRING,
                    `content_size` BIGINT,
                    `content_body` STRING
                >
            >,
            `editor` ROW<
                `user_id` BIGINT,
                `user_text` STRING,
                `edit_count` BIGINT,
                `groups` ARRAY<STRING>,
                `is_bot` BOOLEAN,
                `is_registered` BOOLEAN,
                `is_system` BOOLEAN,
                `is_temp` BOOLEAN,
                `registration_dt` STRING
            >
        >
    >
) WITH (
    'connector' = 'kafka',
    'topic' = 'eqiad.rc0.mediawiki.page_change',
    'properties.bootstrap.servers' = 'kafka-jumbo1007.eqiad.wmnet:9092',
    'properties.group.id' = 'otto_test0',
    'scan.startup.mode' = 'latest-offset',
    'properties.auto.offset.reset' = 'latest',
    'format' = 'json'
);
-- Kafka-backed sink table over the ottotest0.rc0.mediawiki.page_content_change topic.
-- Its columns mirror mediawiki_page_change, in the same order.
-- The columns are declared by hand; an Event Platform stream Catalog would make
-- this declaration unnecessary: https://phabricator.wikimedia.org/T322022
-- NOTE(review): `is_editor_visibile` looks like a misspelling of "visible". It is kept
-- as-is because the source table and the INSERT query use the same name; confirm it
-- against the stream's schema.
CREATE TEMPORARY TABLE ottotest0_mediawiki_page_content_change (
    `meta` ROW<
        `domain` STRING,
        `dt` STRING,
        `id` STRING,
        `request_id` STRING,
        `stream` STRING,
        `uri` STRING
    >,
    `$schema` STRING,
    `dt` STRING,
    `changelog_kind` STRING,
    `page_change_kind` STRING,
    `wiki_id` STRING,
    `page` ROW<
        `page_id` BIGINT,
        `page_title` STRING,
        `is_redirect` BOOLEAN,
        `namespace_id` BIGINT,
        `revision_count` BIGINT
    >,
    `performer` ROW<
        `user_id` BIGINT,
        `user_text` STRING,
        `edit_count` BIGINT,
        `groups` ARRAY<STRING>,
        `is_bot` BOOLEAN,
        `is_registered` BOOLEAN,
        `is_system` BOOLEAN,
        `is_temp` BOOLEAN,
        `registration_dt` STRING
    >,
    -- Current revision; content_slots is keyed by slot role and carries content_body.
    `revision` ROW<
        `rev_id` BIGINT,
        `rev_parent_id` BIGINT,
        `rev_dt` STRING,
        `comment` STRING,
        `comment_html` STRING,
        `is_comment_visible` BOOLEAN,
        `is_editor_visibile` BOOLEAN,
        `is_content_visible` BOOLEAN,
        `is_minor_edit` BOOLEAN,
        `rev_sha1` STRING,
        `rev_size` BIGINT,
        `content_slots` MAP<
            STRING,
            ROW<
                `slot_role` STRING,
                `origin_rev_id` BIGINT,
                `content_format` STRING,
                `content_model` STRING,
                `content_sha1` STRING,
                `content_size` BIGINT,
                `content_body` STRING
            >
        >,
        `editor` ROW<
            `user_id` BIGINT,
            `user_text` STRING,
            `edit_count` BIGINT,
            `groups` ARRAY<STRING>,
            `is_bot` BOOLEAN,
            `is_registered` BOOLEAN,
            `is_system` BOOLEAN,
            `is_temp` BOOLEAN,
            `registration_dt` STRING
        >
    >,
    -- Same page / revision shapes as above, nested under prior_state.
    `prior_state` ROW<
        `page` ROW<
            `page_id` BIGINT,
            `page_title` STRING,
            `is_redirect` BOOLEAN,
            `namespace_id` BIGINT,
            `revision_count` BIGINT
        >,
        `revision` ROW<
            `rev_id` BIGINT,
            `rev_parent_id` BIGINT,
            `rev_dt` STRING,
            `comment` STRING,
            `comment_html` STRING,
            `is_comment_visible` BOOLEAN,
            `is_editor_visibile` BOOLEAN,
            `is_content_visible` BOOLEAN,
            `is_minor_edit` BOOLEAN,
            `rev_sha1` STRING,
            `rev_size` BIGINT,
            `content_slots` MAP<
                STRING,
                ROW<
                    `slot_role` STRING,
                    `origin_rev_id` BIGINT,
                    `content_format` STRING,
                    `content_model` STRING,
                    `content_sha1` STRING,
                    `content_size` BIGINT,
                    `content_body` STRING
                >
            >,
            `editor` ROW<
                `user_id` BIGINT,
                `user_text` STRING,
                `edit_count` BIGINT,
                `groups` ARRAY<STRING>,
                `is_bot` BOOLEAN,
                `is_registered` BOOLEAN,
                `is_system` BOOLEAN,
                `is_temp` BOOLEAN,
                `registration_dt` STRING
            >
        >
    >
-- NOTE(review): group.id, scan.startup.mode and auto.offset.reset look consumer-oriented;
-- presumably kept so this table can also be read back for inspection. Confirm.
) WITH (
    'connector' = 'kafka',
    'topic' = 'ottotest0.rc0.mediawiki.page_content_change',
    'properties.bootstrap.servers' = 'kafka-jumbo1007.eqiad.wmnet:9092',
    'properties.group.id' = 'otto_test0',
    'scan.startup.mode' = 'latest-offset',
    'properties.auto.offset.reset' = 'latest',
    'format' = 'json'
);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Insert into the ottotest0.rc0.mediawiki.page_content_change topic.
INSERT INTO ottotest0_mediawiki_page_content_change
-- The results of the following query.
-- Note that the fields selected here MUST be in the exact order of the sink table schema.
-- They are inserted by field order in the result, not by name.
SELECT
    meta,
    `$schema`,
    dt,
    changelog_kind,
    page_change_kind,
    wiki_id,
    page,
    performer,
    -- The revision row has to be rebuilt by hand so that its content_slots
    -- carry content bodies. Every field is passed through unchanged except
    -- content_slots, which is replaced by enriched_content_slots.
    ROW(
        rev_id,
        rev_parent_id,
        rev_dt,
        `comment`,
        comment_html,
        is_comment_visible,
        is_editor_visibile,
        is_content_visible,
        is_minor_edit,
        rev_sha1,
        rev_size,
        -- content_slots as returned by the UDF in the subquery below.
        enriched_content_slots,
        editor
    ) AS `revision`,
    prior_state
FROM (
    SELECT
        -- Only the top-level fields the outer query passes through. The original
        -- revision column is deliberately left out: it is rebuilt above.
        meta,
        `$schema`,
        dt,
        changelog_kind,
        page_change_kind,
        wiki_id,
        page,
        performer,
        prior_state,
        -- Hoist the revision row fields to the top level of the subquery result so they
        -- can be used in the ROW() construction above.
        -- (Apparently '.' cannot be used inside the ROW() constructor; Flink errors otherwise.)
        revision.rev_id,
        revision.rev_parent_id,
        revision.rev_dt,
        revision.`comment`,
        revision.comment_html,
        revision.is_comment_visible,
        revision.is_editor_visibile,
        revision.is_content_visible,
        revision.is_minor_edit,
        revision.rev_sha1,
        revision.rev_size,
        -- editor is hoisted like its sibling fields so the outer ROW() does not depend
        -- on implicit resolution of a nested field name.
        revision.editor,
        -- The original revision.content_slots is not needed. Select a new
        -- enriched_content_slots field, as returned by the enrich_content_slots_with_body UDF.
        enrich_content_slots_with_body(meta.domain, revision.rev_id, revision.content_slots) AS enriched_content_slots
    FROM mediawiki_page_change
    WHERE meta.domain <> 'canary'
) enriched;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment