Skip to content

Instantly share code, notes, and snippets.

@lemon24
Last active January 24, 2022 16:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lemon24/10ae478fafb8fc1cb091f04e0ceec03f to your computer and use it in GitHub Desktop.
Save lemon24/10ae478fafb8fc1cb091f04e0ceec03f to your computer and use it in GitHub Desktop.
"""
Working off of feedparser 6.0.8.
To do:
* [x] don't read entire file in memory
* allow using alternative (defusedxml, lxml) sax handlers
* [x] fix content order bug
* [~] fix loose parsing
* still need to document that fallback only works for seekable streams
* full-featured parse()
"""
import io
import codecs
import xml.sax
import feedparser as fp
from feedparser.encodings import convert_to_utf8
from feedparser.sanitizer import replace_doctype
from feedparser.urls import make_safe_absolute_uri
from feedparser.util import FeedParserDict
def convert_stream_prefix_to_utf8(http_headers, file, result):
    """Detect the encoding of *file* from a prefix and convert it to UTF-8.

    Reads at most ~4 KiB (plus up to 4 extra bytes) from *file*, feeds the
    prefix to feedparser's convert_to_utf8(), merges that call's findings
    into *result*, and returns the converted prefix as bytes.
    """
    # based on https://gist.github.com/lemon24/dbe0f5f0cad3be3e1646c61cb026061d
    wanted = 2**12
    head = file.read(wanted)
    # retry up to 4 times, growing the prefix one byte per attempt,
    # so we eventually land on a code point boundary
    for _ in range(4):
        attempt = {}
        converted = convert_to_utf8(http_headers, head, attempt)
        if not attempt.get('bozo'):
            break
        # a short read means the prefix is actually the whole stream
        if len(head) < wanted:
            break
        extra = file.read(1)
        if not extra:
            break
        head += extra
        wanted += 1
    result.update(attempt)
    return converted
class Splice:
    """Present several file-like objects, read in order, as one stream.

    All spliced files must be of the same kind (all binary or all text).
    """

    def __init__(self, *files):
        self.files = files
        self.files_iter = iter(files)
        # the file we are currently reading from
        self.file = next(self.files_iter)

    def read(self, size=-1):
        """Read up to *size* units across the spliced files (-1 = read all)."""
        # zero-length read yields '' or b'' matching the stream type,
        # which also gives us the right joiner for the collected parts
        empty = self.file.read(0)
        parts = [empty]
        remaining = size
        while remaining:
            piece = self.file.read(remaining)
            parts.append(piece)
            if remaining != -1:
                remaining -= len(piece)
            if remaining != -1 and len(piece) >= remaining:
                # got everything we asked for from the current file
                continue
            # current file is (presumably) exhausted; move on to the next
            try:
                self.file = next(self.files_iter)
            except StopIteration:
                break
        return empty.join(parts)

    def close(self):
        # intentionally a no-op: the underlying files are not ours to close
        pass
def parse(
    file,
    response_headers=None,
    resolve_relative_uris=None,
    sanitize_html=None,
):
    """Parse a feed from a binary file object without reading it all in memory.

    Works like feedparser.parse(), except only a prefix of the stream is
    read for encoding detection; the rest is decoded lazily while the
    strict (SAX) parser consumes it.

    Args:
        file: binary file-like object positioned at the start of the feed.
        response_headers (dict or None): HTTP response headers, if any.
        resolve_relative_uris (bool or None): defaults to fp.RESOLVE_RELATIVE_URIS.
        sanitize_html (bool or None): defaults to fp.SANITIZE_HTML.

    Returns:
        FeedParserDict: same shape as the one feedparser.parse() returns.

    Raises:
        TypeError: if the strict parser cannot be used (or fails) and
            *file* is not seekable, so the loose parser cannot rewind.
    """
    # similar to the original feedparser code
    if sanitize_html is None:
        sanitize_html = fp.SANITIZE_HTML
    if resolve_relative_uris is None:
        resolve_relative_uris = fp.RESOLVE_RELATIVE_URIS

    result = FeedParserDict(
        bozo=False,
        entries=[],
        feed=FeedParserDict(),
        headers={},
    )
    result['headers'].update(response_headers or {})

    # similar to the original code, except we only convert a prefix
    prefix = convert_stream_prefix_to_utf8(result['headers'], file, result)
    # the strict parser is only usable when the encoding was detected
    use_strict_parser = bool(result['encoding'])
    result['version'], prefix, entities = replace_doctype(prefix)

    # remember where the unread part of the stream starts, so the
    # loose-parser fallback can rewind to it; None if not seekable
    file_offset = file.tell() if hasattr(file, 'tell') else None

    def assemble_text_file(encoding, errors='strict'):
        # the already-read prefix, followed by the rest of the stream,
        # decoded lazily; note we decode most of the file only once
        return Splice(
            io.StringIO(prefix.decode(encoding, errors)),
            codecs.getreader(encoding)(file, errors),
        )

    # similar to the original code
    contentloc = result['headers'].get('content-location', '')
    href = result.get('href', '')
    baseuri = make_safe_absolute_uri(href, contentloc) or make_safe_absolute_uri(contentloc) or href
    baselang = result['headers'].get('content-language', None)
    # isinstance() already excludes None; the original's extra
    # "and baselang is not None" check was redundant
    if isinstance(baselang, bytes):
        baselang = baselang.decode('utf-8', 'ignore')

    if not fp.api._XML_AVAILABLE:
        use_strict_parser = 0

    if use_strict_parser:
        feedparser = StrictFeedParser(baseuri, baselang, 'utf-8')
        feedparser.resolve_relative_uris = resolve_relative_uris
        feedparser.sanitize_html = sanitize_html
        saxparser = xml.sax.make_parser(fp.api.PREFERRED_XML_PARSERS)
        saxparser.setFeature(xml.sax.handler.feature_namespaces, 1)
        try:
            # disable external general entity resolution, if supported
            saxparser.setFeature(xml.sax.handler.feature_external_ges, 0)
        except xml.sax.SAXNotSupportedException:
            pass
        saxparser.setContentHandler(feedparser)
        saxparser.setErrorHandler(feedparser)
        source = xml.sax.xmlreader.InputSource()
        # here we pass a file, not a byte string
        if result.get('encoding'):
            source.setCharacterStream(assemble_text_file(result['encoding']))
        else:
            source.setByteStream(Splice(io.BytesIO(prefix), file))
        try:
            saxparser.parse(source)
        except xml.sax.SAXException as e:
            result['bozo'] = 1
            result['bozo_exception'] = feedparser.exc or e
            use_strict_parser = 0

    if not use_strict_parser:
        # falling back to the loose parser only works with seekable files;
        # bug fix: compare against None — an offset of 0 (e.g. an empty
        # prefix) is a perfectly valid position to rewind to
        if file_offset is None:
            raise TypeError("file must be seekable")
        file.seek(file_offset)
        if result.get('encoding'):
            data = assemble_text_file(result['encoding']).read()
        else:
            data = assemble_text_file('utf-8', 'replace').read()
        # similar to the original code
        feedparser = LooseFeedParser(baseuri, baselang, 'utf-8', entities)
        feedparser.resolve_relative_uris = resolve_relative_uris
        feedparser.sanitize_html = sanitize_html
        feedparser.feed(data)

    result['feed'] = feedparser.feeddata
    result['entries'] = feedparser.entries
    result['version'] = result['version'] or feedparser.version
    result['namespaces'] = feedparser.namespaces_in_use
    return result
class _SummaryMixin:
    """Mixin fixing the summary/content ordering for Atom feeds.

    For Atom, force <summary> into the plain 'summary' key instead of
    letting the stock handler treat it like content.
    """

    def _start_summary(self, attrs_d):
        if self.version.startswith('atom'):
            self._summaryKey = 'summary'
            self.push_content(self._summaryKey, attrs_d, 'text/plain', 1)
        else:
            # non-Atom feeds keep the stock behavior
            return super()._start_summary(attrs_d)
class StrictFeedParser(_SummaryMixin, fp.api.StrictFeedParser):
    """Strict (SAX-based) feed parser with the Atom summary fix applied."""
class LooseFeedParser(_SummaryMixin, fp.api.LooseFeedParser):
    """Loose (sgmllib-based) feed parser with the Atom summary fix applied."""
"""
# after passing file as-is to source (no encoding handling, no loose parsing)
error: _feeds/https-www-reddit-com-r-oilshell-rss.rss: loose parser not implemented
better 11.6 28
feedparser 13.9 56
noop 0.0 18
# after we convert_to_utf8() only the prefix and avoid reading the whole file
error: _feeds/https-www-reddit-com-r-oilshell-rss.rss: loose parser not implemented
better 10.1 32
feedparser 10.6 60
noop 0.0 19
better results are the same as for feedparser
"""
if __name__ == "__main__":
    from textwrap import dedent
    from pprint import pprint, pformat
    import sys, feedparser, difflib

    # assorted XML declarations, used by the convert_to_utf8() experiment
    # below; dedent() strips the common leading whitespace back off
    lines = dedent("""\
        <?xml version="1.0" encoding="utf-8"?>
        <?xml version="1.0" encoding="UTF-8"?>
        <?xml version="1.0" encoding="UTF-8" ?>
        <?xml version="1.0" encoding="utf-8" standalone="yes" ?>
        <rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom">
        <?xml version="1.0"?>
        <rss version='2.0'>
    """).splitlines()

    # experiment: how convert_to_utf8() weighs XML declarations against
    # (bogus) content-type charsets (kept as comments, not bare strings)
    # for line in lines:
    #     line = line.encode('gbk')
    #     result = {}
    #     headers = {'content-type': 'application/xml; charset=ms932'}
    #     data = convert_to_utf8(headers, line, result)
    #     print(line, data, result, sep='\n', end='\n\n')

    # experiment: compare parse() output with feedparser.parse() for a
    # list of feed paths read from stdin
    # for path in sys.stdin:
    #     path = path.rstrip()
    #     with open(path, 'rb') as f:
    #         original = feedparser.parse(f)
    #         f.seek(0)
    #         try:
    #             better = parse(f)
    #         except NotImplementedError:
    #             continue
    #     if original != better:
    #         content_equal = []
    #         for eo, eb in zip(original.entries, better.entries):
    #             eoa = ([eo.summary] if eo.summary else []) + [c.value for c in eo.content]
    #             eba = ([eb.summary] if eb.summary else []) + [c.value for c in eb.content]
    #             content_equal.append(set(eoa) == set(eba))
    #         if all(content_equal):
    #             continue
    #         print('===', path)
    #         print(*difflib.ndiff(pformat(original).splitlines(), pformat(better).splitlines()), sep='\n')
    # the only one that's different is _feeds/https-sobolevn-me-feed-xml.atom
    # but i checked it by hand and it looks OK (it has both content and summary)

    with open('index.xml', 'rb') as f:
        pprint(parse(f))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment