Skip to content

Instantly share code, notes, and snippets.

@ottomata
Created November 3, 2022 21:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ottomata/4a0fbe57ddefeca3e9c08ac0015535c2 to your computer and use it in GitHub Desktop.
Save ottomata/4a0fbe57ddefeca3e9c08ac0015535c2 to your computer and use it in GitHub Desktop.
Flink SQL enrichment UDF experiments 2
-- Source table: page_change events consumed as JSON from the
-- eqiad.rc0.mediawiki.page_change Kafka topic. Only the fields used by the
-- enrichment query below are declared.
CREATE TEMPORARY TABLE mediawiki_page_change (
    `wiki_id`          STRING,
    `meta`             ROW<domain STRING>,
    `page_change_kind` STRING,
    `page`             ROW<page_id BIGINT, page_title STRING>,
    `revision`         ROW<
        rev_id BIGINT,
        content_slots MAP<STRING, ROW<slot_role STRING, content_format STRING, content_body STRING>>
    >
) WITH (
    'connector' = 'kafka',
    'topic' = 'eqiad.rc0.mediawiki.page_change',
    'properties.bootstrap.servers' = 'kafka-jumbo1007.eqiad.wmnet:9092',
    'properties.group.id' = 'otto_test0',
    -- Read the topic from its earliest offset each time the job starts.
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
-- Register the Python UDFs. Both live in the get_revision_content_udf module,
-- which must be importable by the Flink Python worker.
CREATE TEMPORARY FUNCTION get_main_revision_content
    AS 'get_revision_content_udf.get_main_revision_content'
    LANGUAGE PYTHON;
CREATE TEMPORARY FUNCTION add_content_body_to_content_slots
    AS 'get_revision_content_udf.add_content_body_to_content_slots'
    LANGUAGE PYTHON;
-- Sink table: enriched events written as JSON to a test Kafka topic. The
-- schema mirrors mediawiki_page_change so the INSERT can map columns 1:1.
-- NOTE(review): properties.group.id and scan.startup.mode are consumer/scan
-- options; they only take effect if this table is also read as a source.
CREATE TEMPORARY TABLE mediawiki_page_content_change_otto (
    `wiki_id`          STRING,
    `meta`             ROW<domain STRING>,
    `page_change_kind` STRING,
    `page`             ROW<page_id BIGINT, page_title STRING>,
    `revision`         ROW<
        rev_id BIGINT,
        content_slots MAP<STRING, ROW<slot_role STRING, content_format STRING, content_body STRING>>
    >
) WITH (
    'connector' = 'kafka',
    'topic' = 'ottotest0.rc0.mediawiki.page_content_change',
    'properties.bootstrap.servers' = 'kafka-jumbo1007.eqiad.wmnet:9092',
    'properties.group.id' = 'otto_test0',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
-- NOTE: Not quite working yet, but almost!
-- Enrich page_change events: the add_content_body_to_content_slots Python UDF
-- fills in revision.content_slots['main'].content_body, and the result is
-- written to the page_content_change sink topic.
-- Events whose meta.domain is 'canary' are skipped (presumably synthetic
-- canary events rather than real wiki changes -- confirm).
-- LIMIT 3 caps this experiment at a handful of rows; remove it to run continuously.
INSERT INTO mediawiki_page_content_change_otto
SELECT
    wiki_id,
    meta,
    page_change_kind,
    page,
    -- Rebuild the revision ROW positionally: (rev_id, content_slots).
    ROW(rev_id, content_slots) AS revision
FROM (
    SELECT
        wiki_id,
        meta,
        page_change_kind,
        page,
        revision.rev_id AS rev_id,
        add_content_body_to_content_slots(
            meta.domain,
            revision.rev_id,
            revision.content_slots
        ) AS content_slots
    FROM mediawiki_page_change
    WHERE meta.domain <> 'canary'
    LIMIT 3
) AS enriched;
import requests
from pyflink.table import DataTypes
from pyflink.table.udf import udf
from pyflink.common import Row
def get_content(domain: str, rev_id: int, slot_role: str, timeout: float = 10.0) -> str:
    """Fetch the content of one revision slot from the MediaWiki Action API.

    Issues ``GET https://{domain}/w/api.php`` with action=query, prop=revisions,
    rvprop=content, formatversion=2, requesting only ``slot_role`` for ``rev_id``.

    :param domain: wiki host name, e.g. ``en.wikipedia.org``.
    :param rev_id: revision id whose content to fetch.
    :param slot_role: slot whose content to return, e.g. ``'main'``.
    :param timeout: seconds to wait for the HTTP request before giving up.
    :return: the slot's content string, or ``"UNKNOWN"`` when the request fails,
        the API answers with a non-200 status, or the response body does not
        contain that revision/slot.
    """
    api_url = f'https://{domain}/w/api.php'
    # Let requests build and URL-encode the query string.
    params = {
        'action': 'query',
        'format': 'json',
        'prop': 'revisions',
        'formatversion': 2,
        'rvprop': 'content',
        # Request the same slot we read below; hard-coding 'main' here made
        # every other slot_role fail with a KeyError.
        'rvslots': slot_role,
        'revids': rev_id,
    }
    try:
        # Without a timeout, requests waits indefinitely, so one stuck API call
        # would stall the whole UDF.
        response = requests.get(api_url, params=params, timeout=timeout)
    except requests.RequestException as exc:
        print("OOPS request failed", exc)
        return "UNKNOWN"

    if response.status_code != 200:
        print("OOPS bad response", response)
        return "UNKNOWN"

    try:
        return response.json()['query']['pages'][0]['revisions'][0]['slots'][slot_role]['content']
    except (ValueError, KeyError, IndexError, TypeError) as exc:
        # Non-JSON body, or a payload without the expected pages/revisions/slot
        # (e.g. an unknown revision id or a revision lacking this slot).
        print("OOPS unexpected response body", exc)
        return "UNKNOWN"
@udf(result_type=DataTypes.STRING())
def get_main_revision_content(domain: str, rev_id: int):
    """Flink scalar UDF returning the 'main' slot content of ``rev_id`` on ``domain``.

    Delegates to ``get_content`` and returns its result unchanged.
    """
    main_slot_role = 'main'
    return get_content(domain=domain, rev_id=rev_id, slot_role=main_slot_role)
@udf(result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.ROW([
    DataTypes.FIELD('slot_role', DataTypes.STRING()),
    DataTypes.FIELD('content_format', DataTypes.STRING()),
    DataTypes.FIELD('content_body', DataTypes.STRING())
])))
def add_content_body_to_content_slots(domain: str, rev_id: int, content_slots):
    """Flink scalar UDF: return ``content_slots`` with the 'main' slot's body filled in.

    The main slot's content is fetched with ``get_content(domain, rev_id, 'main')``.
    Slots other than 'main' are copied through unchanged; their content is not
    fetched.

    :param domain: wiki host name (the query passes ``meta.domain``).
    :param rev_id: revision id whose main-slot content to fetch.
    :param content_slots: map of slot role -> Row(slot_role, content_format,
        content_body), or None.
    :return: a new map with the same keys, where 'main' (if present) carries the
        fetched content_body; None when ``content_slots`` is None.
    """
    if content_slots is None:
        return None

    # Copy so the input map is not mutated and non-main slots are preserved.
    new_content_slots = dict(content_slots)

    main_slot = content_slots.get('main')
    if main_slot is None:
        # No main slot to enrich; skip the API request entirely.
        return new_content_slots

    main_content = get_content(domain, rev_id, 'main')
    # Build the Row positionally, in the declared result field order
    # (slot_role, content_format, content_body). PyFlink's Row docs state that
    # keyword-constructed Rows have their fields sorted by name, which would
    # misalign them with this positional ROW result type.
    new_content_slots['main'] = Row('main', main_slot.content_format, main_content)
    return new_content_slots
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment