Created
November 3, 2022 21:50
-
-
Save ottomata/4a0fbe57ddefeca3e9c08ac0015535c2 to your computer and use it in GitHub Desktop.
Flink SQL enrichment UDF experiments 2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Kafka-backed table over the 'eqiad.rc0.mediawiki.page_change' topic.
-- Records are decoded as JSON and consumption starts at the earliest offset.
CREATE TEMPORARY TABLE mediawiki_page_change (
    `wiki_id` STRING,
    `meta` ROW<
        domain STRING
    >,
    `page_change_kind` STRING,
    `page` ROW<
        page_id BIGINT,
        page_title STRING
    >,
    `revision` ROW<
        rev_id BIGINT,
        -- Map key is looked up by slot role name elsewhere in this file (e.g. 'main').
        content_slots MAP<
            STRING,
            ROW<
                slot_role STRING,
                content_format STRING,
                content_body STRING
            >
        >
    >
) WITH (
    'connector' = 'kafka',
    'topic' = 'eqiad.rc0.mediawiki.page_change',
    'properties.bootstrap.servers' = 'kafka-jumbo1007.eqiad.wmnet:9092',
    'properties.group.id' = 'otto_test0',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
-- Register the Python functions from the get_revision_content_udf module
-- so they can be called by name from SQL.
CREATE TEMPORARY FUNCTION get_main_revision_content
    AS 'get_revision_content_udf.get_main_revision_content'
    LANGUAGE PYTHON;

CREATE TEMPORARY FUNCTION add_content_body_to_content_slots
    AS 'get_revision_content_udf.add_content_body_to_content_slots'
    LANGUAGE PYTHON;
-- Kafka-backed table over the 'ottotest0.rc0.mediawiki.page_content_change' topic.
-- It declares the same columns as mediawiki_page_change and is the target of
-- the INSERT statement in this script.
CREATE TEMPORARY TABLE mediawiki_page_content_change_otto (
    `wiki_id` STRING,
    `meta` ROW<
        domain STRING
    >,
    `page_change_kind` STRING,
    `page` ROW<
        page_id BIGINT,
        page_title STRING
    >,
    `revision` ROW<
        rev_id BIGINT,
        content_slots MAP<
            STRING,
            ROW<
                slot_role STRING,
                content_format STRING,
                content_body STRING
            >
        >
    >
) WITH (
    'connector' = 'kafka',
    'topic' = 'ottotest0.rc0.mediawiki.page_content_change',
    'properties.bootstrap.servers' = 'kafka-jumbo1007.eqiad.wmnet:9092',
    'properties.group.id' = 'otto_test0',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
-- NOT QUITE WORKING YET BUT ALMOST!
--
-- Copies page change events into mediawiki_page_content_change_otto, with
-- revision.content_slots replaced by the map returned from the
-- add_content_body_to_content_slots UDF.
INSERT INTO mediawiki_page_content_change_otto
SELECT
    wiki_id,
    meta,
    page_change_kind,
    page,
    ROW(
        rev_id,
        content_slots
    ) AS revision
FROM (
    -- rev_id and content_slots are projected as plain columns here so the
    -- outer ROW(...) constructor only references simple column names.
    SELECT
        wiki_id,
        meta,
        page_change_kind,
        page,
        revision.rev_id AS rev_id,
        add_content_body_to_content_slots(
            meta.domain,
            revision.rev_id,
            revision.content_slots
        ) AS content_slots
    FROM mediawiki_page_change
    -- Note: <> also filters out rows whose meta.domain is NULL.
    WHERE meta.domain <> 'canary'
    -- Caps the statement at three rows.
    LIMIT 3
);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
from pyflink.table import DataTypes | |
from pyflink.table.udf import udf | |
from pyflink.common import Row | |
def get_content(domain: str, rev_id: int, slot_role: str) -> str:
    """Fetch the content of one slot of a revision from a wiki's api.php.

    Sends an action=query / prop=revisions request to https://{domain}/w/api.php
    for the given revision id and slot role.

    :param domain: host name of the wiki, used to build the API URL.
    :param rev_id: revision id passed as ``revids``.
    :param slot_role: slot to request (``rvslots``) and to read from the response.
    :return: the slot's ``content`` string, or the sentinel ``"UNKNOWN"`` when
        the request fails, the response status is not 200, or the response
        does not contain the requested revision/slot.
    """
    # Let requests build and encode the query string instead of formatting it
    # by hand; rvslots follows slot_role so the slot we index below is the one
    # we actually asked for (it used to be hard-coded to 'main').
    query_params = {
        'action': 'query',
        'format': 'json',
        'prop': 'revisions',
        'formatversion': 2,
        'rvprop': 'content',
        'rvslots': slot_role,
        'revids': rev_id,
    }
    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.get(
            f'https://{domain}/w/api.php',
            params=query_params,
            timeout=30,
        )
    except requests.RequestException as err:
        print("OOPS request failed", err)
        return "UNKNOWN"

    if response.status_code != 200:
        print("OOPS bad response", response)
        return "UNKNOWN"

    try:
        return response.json()['query']['pages'][0]['revisions'][0]['slots'][slot_role]['content']
    except (KeyError, IndexError, TypeError, ValueError) as err:
        # A 200 response can still lack the revision or slot (or not be JSON);
        # fall back to the same sentinel as for a bad status code.
        print("OOPS unexpected response body", response, repr(err))
        return "UNKNOWN"
@udf(result_type=DataTypes.STRING())
def get_main_revision_content(domain: str, rev_id: int):
    """UDF returning the 'main' slot content of a revision as a STRING.

    Delegates to get_content with the slot role fixed to 'main'.
    """
    main_slot_text = get_content(domain, rev_id, slot_role='main')
    return main_slot_text
@udf(result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.ROW([
    DataTypes.FIELD('slot_role', DataTypes.STRING()),
    DataTypes.FIELD('content_format', DataTypes.STRING()),
    DataTypes.FIELD('content_body', DataTypes.STRING())
])))
def add_content_body_to_content_slots(domain: str, rev_id: int, content_slots):
    """UDF that fills in content_body for the 'main' entry of a content_slots map.

    :param domain: wiki host name, forwarded to get_content.
    :param rev_id: revision id, forwarded to get_content.
    :param content_slots: map of slot role -> row with slot_role,
        content_format and content_body fields; may be None.
    :return: a new map with the same entries as ``content_slots`` except that
        'main' is replaced by a Row carrying the fetched content body. When
        ``content_slots`` is None/empty or has no usable 'main' entry it is
        returned unchanged and no request is made.
    """
    main_slot = content_slots.get('main') if content_slots else None
    if main_slot is None:
        # Nothing to enrich; previously this path raised after already
        # having performed the HTTP request.
        return content_slots

    # Copy so the other slots are passed through instead of being dropped,
    # and so the input map is not mutated.
    enriched_slots = dict(content_slots)
    enriched_slots['main'] = Row(
        slot_role='main',
        content_format=main_slot.content_format,
        content_body=get_content(domain, rev_id, 'main')
    )
    return enriched_slots
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment