Skip to content

Instantly share code, notes, and snippets.

@ottomata
Created November 7, 2022 20:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ottomata/bc583fac4cafc4d7651db463dc755c9e to your computer and use it in GitHub Desktop.
Save ottomata/bc583fac4cafc4d7651db463dc755c9e to your computer and use it in GitHub Desktop.
Mediawiki Content Flink Streaming SQL Content Enrich
import requests
from pyflink.table import DataTypes
from pyflink.table.udf import udf
from pyflink.common import Row
import logging
def get_revision_slot_content_body(domain: str, rev_id: int, slot_role: str = 'main') -> str:
    """
    Given a wiki domain, a rev_id, and a slot_role, uses the MW Action API to return the content
    for that revision's slot role.

    :param domain: Host name of the wiki; the request goes to https://{domain}/w/api.php.
    :param rev_id: Revision id, passed to the API as ``revids``.
    :param slot_role: Slot role to fetch, passed as ``rvslots`` and used to index the response.
    :return: The slot's content string, or None if the API did not answer with HTTP 200
        (in which case an error is logged).
    :raises requests.exceptions.RequestException: on connection failure or timeout.
    :raises KeyError, IndexError: if a 200 response does not contain the requested
        page / revision / slot.
    """
    api_url = f'https://{domain}/w/api.php'
    params = {
        'action': 'query',
        'format': 'json',
        'prop': 'revisions',
        'formatversion': 2,
        'rvprop': 'content',
        # Ask for the slot that is read from the response below; a hard-coded
        # 'main' here would make every other slot_role fail with a KeyError.
        'rvslots': slot_role,
        'revids': rev_id,
    }
    # Always bound the request: without a timeout a stalled connection would
    # block the caller forever.
    response = requests.get(api_url, params=params, timeout=30)

    if response.status_code != 200:
        # The logger is resolved here because no module-level logger is defined.
        logging.getLogger(__name__).error(
            f'Failed requesting revision slot content body from {response.url}. '
            f'Got HTTP response status: {response.status_code}.'
        )
        return None

    return response.json()['query']['pages'][0]['revisions'][0]['slots'][slot_role]['content']
# This result_type corresponds to the fragment/mediawiki/state/entity/revision_slots map type.
# It would be much nicer to look this field type up dynamically
# from the stream's schema. We can do this using wikimedia-event-utilities,
# but it seems accessing the py4j JVM gateway during flink-sql python UDF parsing
# is not possible.
#
# This UDF operates at the content_slots map field level, rather than
# the simple string content_body field, so as not to have to deal with
# querying and setting map types in SQL (which may not even be possible in Flink SQL)
# Alternatively, this UDF could operate at the revision entity field level.
# This would make the SQL even simpler, but move more of the DataTypes
# result_type boiler plate here. If we can abstract declaring
# DataTypes via JSONSchema (fields), this should be much simpler.
@udf(result_type=DataTypes.MAP(
    DataTypes.STRING(),
    DataTypes.ROW([
        DataTypes.FIELD('slot_role', DataTypes.STRING()),
        DataTypes.FIELD('origin_rev_id', DataTypes.BIGINT()),
        DataTypes.FIELD('content_format', DataTypes.STRING()),
        DataTypes.FIELD('content_model', DataTypes.STRING()),
        DataTypes.FIELD('content_sha1', DataTypes.STRING()),
        DataTypes.FIELD('content_size', DataTypes.BIGINT()),
        DataTypes.FIELD('content_body', DataTypes.STRING())
    ])
))
def enrich_content_slots_with_body(domain: str, rev_id: int, content_slots):
    """
    A Flink UDF that maps from a revision content_slots map field
    without content bodies set, looks up the content_body via
    the MW action API, and returns a content_slots map field
    with content bodies set.

    :param domain: Wiki domain handed to get_revision_slot_content_body.
    :param rev_id: Revision id handed to get_revision_slot_content_body.
    :param content_slots: Map of slot role name -> content slot row. It is
        modified in place. May be None, in which case None is returned.
    :return: The same content_slots map, with 'content_body' set on every slot.
    """
    # A missing (NULL) content_slots field has nothing to enrich; pass it
    # through instead of failing on .items().
    if content_slots is None:
        return None

    # Iterate over content_slots map field, lookup content_body, and add it.
    # NOTE: we can return the same content_slots field here because
    # we know it has the same DataType as the result_type. If this were not true,
    # we would need to construct a value for it manually.
    for slot_role, content_slot in content_slots.items():
        content_slot['content_body'] = get_revision_slot_content_body(domain, rev_id, slot_role)
    return content_slots
-- Enrich mediawiki_page_change events with revision content bodies and write
-- them to the page_content_change sink table.
-- Insert into the ottotest0.rc0.mediawiki.page_content_change topic.
INSERT INTO ottotest0_mediawiki_page_content_change
-- The results of the following query.
-- Note that the fields selected here MUST be in the exact order of the sink table schema.
-- They are inserted by field order in the result, not by name.
SELECT
meta,
`$schema`,
dt,
changelog_kind,
page_change_kind,
wiki_id,
page,
performer,
-- We need to create the new revision (row) field manually in order to
-- supply one that has content_slots with content_bodies.
-- All the fields here are the same except for enriched_content_slots.
-- NOTE(review): 'is_editor_visibile' looks like a misspelling of 'is_editor_visible'.
-- It is spelled the same way in the subquery below, so it presumably matches the
-- source table's field name -- confirm against the schema before renaming.
row(
rev_id,
rev_parent_id,
rev_dt,
`comment`,
comment_html,
is_comment_visible,
is_editor_visibile,
is_content_visible,
is_minor_edit,
rev_sha1,
rev_size,
-- Use the enriched_content_slots field for content_slots
-- as returned by the UDF in the subquery below.
enriched_content_slots,
editor
) as `revision`,
prior_state
FROM (
SELECT
-- Get all the fields from mediawiki_page_change
*,
-- Hoist the revision row fields to the top level sub query result so we can use them
-- in the new revision row() field construction above.
-- (Apparently we can't use '.' inside the row() construction function; Flink errors otherwise.)
revision.rev_id,
revision.rev_parent_id,
revision.rev_dt,
revision.`comment`,
revision.comment_html,
revision.is_comment_visible,
revision.is_editor_visibile,
revision.is_content_visible,
revision.is_minor_edit,
revision.rev_sha1,
revision.rev_size,
-- We don't need the original revision.content_slots. Instead, include a new 'enriched_content_slots' field
-- as returned by the enrich_content_slots_with_body UDF.
enrich_content_slots_with_body(meta.domain, revision.rev_id, revision.content_slots) as enriched_content_slots
-- Skip events whose meta.domain is 'canary'.
FROM mediawiki_page_change WHERE meta.domain <> 'canary'
) enriched;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment