
@dchaplinsky
Created March 25, 2023 15:39
A custom fork of the gensim library's Wikipedia reader, better suited for dumps of the Ukrainian Wikipedia.
import bz2
import logging
import multiprocessing
import re
from pickle import PicklingError
# LXML isn't faster, so let's go with the built-in solution
from xml.etree.ElementTree import iterparse
from gensim import utils
from gensim.corpora.textcorpus import TextCorpus
from gensim.corpora.wikicorpus import (
    init_to_ignore_interrupt,
    extract_pages,
    IGNORED_NAMESPACES,
    RE_P0,
    RE_P1,
    RE_P9,
    RE_P10,
    RE_P11,
    RE_P14,
    RE_P5,
    RE_P6,
    RE_P13,
    RE_P17,
    RE_P2,
    remove_template,
    get_namespace,
)

logger = logging.getLogger(__name__)

RE_P12 = re.compile(
    r"\s(({\|)|(\|-(?!\d))|(\|}))(.*?)(?=\n)", re.UNICODE
)  # table formatting
RE_P15 = re.compile(
    r"\[\[([fF]ile:|[iI]mage:|[фФ]айл:|[кК]атегорія:)[^\]]*(\]\])", re.UNICODE
)
# Remove galleries
RE_P18 = re.compile(r"<gallery([> ].*?)(</gallery>|/>)", re.DOTALL | re.UNICODE)
# Remove headers formatting
RE_P19 = re.compile(r"={2,4}\s+([^=]*)\s+={2,4}", re.UNICODE)
RE_P20 = re.compile(r"'{2,3}([^']*)'{2,3}", re.UNICODE)
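# Illustrative examples (hedged) of what the custom patterns are intended to match;
# the inputs below are toy snippets, not taken from a real dump:
#   RE_P15 + remove_file(): "[[Файл:Kyiv.jpg|thumb|Майдан]]" -> "Майдан" (caption kept)
#   RE_P18: "<gallery>File:A.jpg|a</gallery>"  -> removed entirely
#   RE_P19: "== Історія =="                    -> "Історія"
#   RE_P20: "'''Київ''' та ''місто''"          -> "Київ та місто"
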
def parse_article(args):
    """Parse a Wikipedia article, extract plain text and filter out non-useful elements.

    Parameters
    ----------
    args : (int, str, str, str)
        Page id, article title, article text, revision timestamp.

    Returns
    -------
    (int, str, str, str)
        Page id, article title, filtered article text, revision timestamp.

    """
    pageid, title, text, date_of_publish = args
    text = filter_wiki(text)
    return pageid, title, text, date_of_publish

def remove_file(s):
    """Remove the 'File:' and 'Image:' markup, keeping the file caption.

    Parameters
    ----------
    s : str
        String containing 'File:' and 'Image:' markup.

    Returns
    -------
    str
        Copy of `s` with all the 'File:' and 'Image:' markup replaced by their `corresponding captions
        <http://www.mediawiki.org/wiki/Help:Images>`_.

    """
    # The regex RE_P15 matches File: or Image: markup
    for match in re.finditer(RE_P15, s):
        m = match.group(0)
        caption = ""
        if "|" in m:
            caption = m[:-2].split("|")[-1]
            if re.match(r"\d+.{2,3}$", caption):
                caption = ""
        s = s.replace(m, caption, 1)
    return s

def filter_wiki(raw, promote_remaining=True, simplify_links=True):
    """Filter out wiki markup from `raw`, leaving only text.

    Parameters
    ----------
    raw : str
        Unicode or utf-8 encoded string.
    promote_remaining : bool
        Whether uncaught markup should be promoted to plain text.
    simplify_links : bool
        Whether links should be simplified keeping only their description text.

    Returns
    -------
    str
        `raw` without markup.

    """
    # parsing of the wiki markup is not perfect, but sufficient for our purposes
    # contributions to improving this code are welcome :)
    text = utils.to_unicode(raw, "utf8", errors="ignore")
    text = utils.decode_htmlentities(text)  # '&amp;nbsp;' --> '\xa0'
    return remove_markup(text, promote_remaining, simplify_links)

def remove_markup(text, promote_remaining=True, simplify_links=True):
    """Filter out wiki markup from `text`, leaving only text.

    Parameters
    ----------
    text : str
        String containing markup.
    promote_remaining : bool
        Whether uncaught markup should be promoted to plain text.
    simplify_links : bool
        Whether links should be simplified keeping only their description text.

    Returns
    -------
    str
        `text` without markup.

    """
    text = re.sub(RE_P2, "", text)  # remove the last list (=languages)
    # the wiki markup is recursive (markup inside markup etc)
    # instead of writing a recursive grammar, here we deal with that by removing
    # markup in a loop, starting with inner-most expressions and working outwards,
    # for as long as something changes.
    text = remove_template(text)
    text = remove_file(text)
    iters = 0
    while True:
        old, iters = text, iters + 1
        text = re.sub(RE_P0, "", text)  # remove comments
        text = re.sub(RE_P1, "", text)  # remove footnotes
        text = re.sub(RE_P9, "", text)  # remove outside links
        text = re.sub(RE_P10, "", text)  # remove math content
        text = re.sub(RE_P18, "", text)  # remove gallery content
        text = re.sub(RE_P11, "", text)  # remove all remaining tags
        text = re.sub(RE_P14, "", text)  # remove categories
        text = re.sub(RE_P5, "\\3", text)  # remove urls, keep description
        if simplify_links:
            text = re.sub(RE_P6, "\\2", text)  # simplify links, keep description only
        # remove table markup
        text = text.replace("!!", "\n|")  # each table head cell on a separate line
        text = text.replace("|-||", "\n|")  # for cases where a cell is filled with '-'
        text = re.sub(RE_P12, "\n", text)  # remove formatting lines
        text = text.replace(
            "|||", "|\n|"
        )  # each table cell on a separate line (where |{{a|b}}||cell-content)
        text = text.replace("||", "\n|")  # each table cell on a separate line
        text = re.sub(RE_P13, "\n", text)  # leave only cell content
        text = re.sub(RE_P17, "\n", text)  # remove formatting lines
        # remove empty mark-up
        text = text.replace("[]", "")
        text = re.sub(RE_P19, "\\1", text)
        text = re.sub(RE_P20, "\\1", text)
        # stop if nothing changed between two iterations or after a fixed number of iterations
        if old == text or iters > 2:
            break
    if promote_remaining:
        text = text.replace("[", "").replace(
            "]", ""
        )  # promote all remaining markup to plain text
    return text
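
# Illustrative end-to-end example (hedged, toy input):
#   filter_wiki("'''Київ''' розташований на [[Дніпро|Дніпрі]].")
#   -> "Київ розташований на Дніпрі."
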
def extract_pages(f, filter_namespaces=False, filter_articles=None):
    """Extract pages from a MediaWiki database dump.

    Parameters
    ----------
    f : file
        File-like object.
    filter_namespaces : list of str or bool
        Namespaces that will be extracted.
    filter_articles : callable or None
        Optional callable used to decide whether a page should be kept.

    Yields
    ------
    tuple of (str or None, str, str, str)
        Title, text, page id and revision timestamp.

    """
    elems = (elem for _, elem in iterparse(f, events=("end",)))

    # We can't rely on the namespace for database dumps, since it changes
    # every time a small modification to the format is made. So, determine
    # the namespace from the first element we find, which will be part of the
    # metadata, and construct element paths.
    elem = next(elems)
    namespace = get_namespace(elem.tag)
    ns_mapping = {"ns": namespace}
    page_tag = "{%(ns)s}page" % ns_mapping
    text_path = "./{%(ns)s}revision/{%(ns)s}text" % ns_mapping
    title_path = "./{%(ns)s}title" % ns_mapping
    date_of_publish_path = "./{%(ns)s}revision/{%(ns)s}timestamp" % ns_mapping
    ns_path = "./{%(ns)s}ns" % ns_mapping
    pageid_path = "./{%(ns)s}id" % ns_mapping

    for elem in elems:
        if elem.tag == page_tag:
            title = elem.find(title_path).text
            date_of_publish = elem.find(date_of_publish_path).text
            text = elem.find(text_path).text

            if filter_namespaces:
                ns = elem.find(ns_path).text
                if ns not in filter_namespaces:
                    text = None

            if filter_articles is not None:
                if not filter_articles(
                    elem,
                    namespace=namespace,
                    title=title,
                    text=text,
                    page_tag=page_tag,
                    text_path=text_path,
                    title_path=title_path,
                    ns_path=ns_path,
                    pageid_path=pageid_path,
                ):
                    text = None

            pageid = elem.find(pageid_path).text
            yield title, text or "", pageid, date_of_publish  # empty page will yield None

            # Prune the element tree, as per
            # http://www.ibm.com/developerworks/xml/library/x-hiperfparse/
            # except that we don't need to prune backlinks from the parent
            # because we don't use LXML.
            # We do this only for <page>s, since we need to inspect the
            # ./revision/text element. The pages comprise the bulk of the
            # file, so in practice we prune away enough.
            elem.clear()

class StreamWikiCorpusReader(TextCorpus):
    """Treat a Wikipedia articles dump retrieved by scrapy as a read-only, streamed, memory-efficient corpus.

    WARNING: NOT compatible with the original TextCorpus contract. DOES NOT handle tokenization (it sucks anyway).
    TUNED to work properly with the Ukrainian language.

    Supported dump formats:

    * <LANG>wiki-<YYYYMMDD>-pages-articles.xml.bz2
    * <LANG>wiki-latest-pages-articles.xml.bz2

    The documents are extracted on the fly, so that the whole (massive) dump can stay compressed on disk.

    Notes
    -----
    Dumps for the English Wikipedia can be found at https://dumps.wikimedia.org/enwiki/.

    Warnings
    --------
    "Multistream" archives are *not* supported in Python 2 due to `limitations in the core bz2 library
    <https://docs.python.org/2/library/bz2.html#de-compression-of-files>`_.

    """

    def __init__(
        self,
        content,
        processes=None,
        dictionary=None,
        filter_namespaces=("0",),
        filter_articles=None,
    ):
        """Initialize the corpus.

        Parameters
        ----------
        content : io.BytesIO
            In-memory file with the dump.
        processes : int, optional
            Number of processes to run, defaults to `max(1, number of cpu - 1)`.
        dictionary : optional
            Accepted for signature compatibility, but not used by this reader.
        filter_namespaces : tuple of str, optional
            Namespaces to consider.
        filter_articles : callable or None, optional
            If set, each XML article element will be passed to this callable before being processed. Only articles
            for which the callable returns a truthy value (e.g. the XML element itself) are processed; returning
            None allows filtering out some articles based on customised rules.

        """
        self.content = content
        self.filter_namespaces = filter_namespaces
        self.filter_articles = filter_articles

        if processes is None:
            processes = max(1, multiprocessing.cpu_count() - 1)
        self.processes = processes

    def get_texts(self):
        """Iterate over the dump, yielding one article at a time as plain text with markup removed,
        with no excessive filtering (except for namespaces and service articles) and no tokenization.

        Uses multiprocessing internally to parallelize the work and process the dump more quickly.

        Yields
        ------
        (int, str, str, str)
            Page id, article title, article content and revision timestamp.

        """
        articles, articles_all = 0, 0

        texts = (
            (pageid, title, text, date_of_publish)
            for title, text, pageid, date_of_publish in extract_pages(
                bz2.BZ2File(self.content),
                self.filter_namespaces,
                self.filter_articles,
            )
        )
        pool = multiprocessing.Pool(self.processes, init_to_ignore_interrupt)

        try:
            # process the corpus in smaller chunks of docs, because multiprocessing.Pool
            # is dumb and would load the entire input into RAM at once...
            for group in utils.chunkize(
                texts, chunksize=10 * self.processes, maxsize=1
            ):
                for pageid, title, text, date_of_publish in pool.imap(
                    parse_article, group
                ):
                    articles_all += 1
                    if any(
                        title.startswith(ignore + ":") for ignore in IGNORED_NAMESPACES
                    ):
                        continue
                    articles += 1
                    yield pageid, title, text, date_of_publish
        except KeyboardInterrupt:
            logger.warning(
                "user terminated iteration over Wikipedia corpus after %i documents "
                "(total %i articles)",
                articles,
                articles_all,
            )
        except PicklingError as exc:
            raise PicklingError(
                f"Cannot send filtering function {self.filter_articles} to multiprocessing, "
                "make sure the function can be pickled."
            ) from exc
        else:
            logger.info(
                "finished iterating over Wikipedia corpus of %i documents "
                "(total %i articles)",
                articles,
                articles_all,
            )
            self.length = articles  # cache corpus length
        finally:
            pool.terminate()
dchaplinsky commented Mar 25, 2023

Usage example:

        bio = BytesIO(response.body)
        reader = StreamWikiCorpusReader(bio)
        for i, (id_, title, text, date_of_publish) in enumerate(reader.get_texts()):
            print(title, text)
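
A more self-contained sketch of the same idea, reading a dump already saved to disk instead of a scrapy response. The file name, the module name `uk_wikicorpus` and the `keep_text` filter below are illustrative assumptions, not part of the gist; the reader simply hands `content` to `bz2.BZ2File`, so any binary file object works:

        from uk_wikicorpus import StreamWikiCorpusReader  # hypothetical module holding this gist's code

        def keep_text(elem, text=None, **kwargs):
            # Illustrative filter_articles callable: drop empty pages and short stubs.
            return text is not None and len(text) > 500

        if __name__ == "__main__":
            # Illustrative path; bz2.BZ2File accepts a binary file object,
            # so the dump does not have to be read into memory first.
            with open("ukwiki-latest-pages-articles.xml.bz2", "rb") as dump:
                reader = StreamWikiCorpusReader(dump, filter_articles=keep_text)
                for pageid, title, text, date_of_publish in reader.get_texts():
                    print(pageid, title, date_of_publish)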
