Skip to content

Instantly share code, notes, and snippets.

@smoofra
Last active March 14, 2022 18:02
Show Gist options
  • Save smoofra/30c10d45f0a40af05b47762efac0b088 to your computer and use it in GitHub Desktop.
Save smoofra/30c10d45f0a40af05b47762efac0b088 to your computer and use it in GitHub Desktop.
from html.parser import HTMLParser
from typing import *
import json
class SAXText(NamedTuple):
    """SAX-style event carrying a run of character data."""

    text: str  # the character data carried by this event
# Attribute list of a start tag: (name, value) pairs where the value may be
# None.
Attrs = List[Tuple[str, Optional[str]]]


class SAXStartTag(NamedTuple):
    """SAX-style event for an opening tag."""

    tag: str  # element name
    attrs: Attrs  # (name, value) attribute pairs
class SAXEndTag(NamedTuple):
    """SAX-style event for a closing tag."""

    tag: str  # element name
# Any event that can flow through the parser; None stands for end of input.
SAXEvent = Optional[Union[SAXText, SAXStartTag, SAXEndTag]]
class SAXEvents:
    """Awaitable, async-iterable source of SAX events supplied from outside.

    Awaiting an instance suspends the enclosing coroutine on a bare ``yield``;
    the value the driver passes to the coroutine's ``send()`` becomes the
    result of that ``await``.
    """

    async def all_events(self) -> AsyncGenerator[SAXEvent, None]:
        """Yield every event sent in, finishing after the first falsy one.

        The terminating event (``None``) is itself yielded before the
        generator ends, so consumers get to observe it.
        """
        while True:
            event = await self
            yield event
            if not event:
                return

    def __await__(self) -> Generator[None, SAXEvent, SAXEvent]:
        """Suspend until an event is sent in, then return that event."""
        event: SAXEvent = yield
        return event

    def __aiter__(self) -> AsyncGenerator[SAXEvent, None]:
        """Return a fresh ``all_events()`` generator for ``async for``."""
        return self.all_events()
class InvertedParser(HTMLParser):
    """HTML parser that turns ``HTMLParser`` callbacks into a nested tree.

    The ``handle_*`` callbacks push SAX events into the coroutine ``self.co``
    with ``send()``; the coroutine itself is written as a recursive-descent
    parser that pulls events with ``async for``.  Tag attributes are carried
    by the events but are not included in the resulting tree.
    """

    async def a_parse_tag(self, tag: str, events: AsyncIterable[SAXEvent]):
        """Parse the contents of an already-opened ``<tag>`` element.

        Consumes events up to and including the matching end tag.

        Returns:
            ``{'tag': tag, 'contents': [...]}`` where the contents list holds
            text strings and nested element dicts in the order they arrived.

        Raises:
            Exception: on an end tag that does not match ``tag``, or when
                end of input arrives before the element is closed.
        """
        async def contents():
            async for event in events:
                if isinstance(event, SAXText):
                    yield event.text
                elif isinstance(event, SAXStartTag):
                    # Recurse: the child consumes events up to its own end tag.
                    yield await self.a_parse_tag(event.tag, events)
                elif isinstance(event, SAXEndTag):
                    if event.tag != tag:
                        raise Exception(f"unexpected end tag <{event.tag}> in <{tag}>")
                    return
                elif event is None:
                    raise Exception("unexpected EOF")

        return {
            'tag': tag,
            'contents': [x async for x in contents()],
        }

    async def a_parse_html(self, events: AsyncIterable[SAXEvent]):
        """Parse a whole document into a list of top-level element dicts.

        Whitespace-only text between top-level elements is ignored.

        Raises:
            Exception: on non-whitespace text or an end tag at top level.
        """
        async def elements():
            async for event in events:
                if isinstance(event, SAXText):
                    if event.text.strip():
                        raise Exception("unexpected text")
                elif isinstance(event, SAXStartTag):
                    yield await self.a_parse_tag(event.tag, events)
                elif isinstance(event, SAXEndTag):
                    raise Exception(f"unexpected end tag <{event.tag}> at toplevel")
                elif event is None:
                    # End of input: the document is complete.
                    break

        return [tag async for tag in elements()]

    def __init__(self):
        super().__init__()
        # The coroutine object is only created here; ``parse`` starts it
        # with a priming ``send(None)``.
        self.co = self.a_parse_html(SAXEvents())

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]):
        """Forward a start tag to the parsing coroutine."""
        self.co.send(SAXStartTag(tag, attrs))

    def handle_endtag(self, tag: str):
        """Forward an end tag to the parsing coroutine."""
        self.co.send(SAXEndTag(tag))

    def handle_data(self, text: str):
        """Forward character data to the parsing coroutine."""
        self.co.send(SAXText(text))

    @classmethod
    def parse(cls, s: str):
        """Parse the HTML string ``s`` and return its list of top-level elements.

        Raises:
            Exception: if the document is malformed (see ``a_parse_html`` and
                ``a_parse_tag``).
            AssertionError: if the coroutine is still running after end of
                input has been signalled.
        """
        parser = cls()
        # Prime the coroutine so it is suspended waiting for its first event.
        parser.co.send(None)
        parser.feed(s)
        # feed() may keep trailing input buffered (e.g. a dangling "&");
        # close() forces it through the handlers so nothing is dropped.
        parser.close()
        try:
            # None signals end of input; the coroutine should now return.
            parser.co.send(None)
        except StopIteration as stop:
            return stop.value
        else:
            raise AssertionError("coroutine did not terminate")
# Demo: parse a small document and pretty-print the resulting tree as JSON.
j = InvertedParser.parse("""
<html>
<body>
<h1>Hello, Worrld!</h1>
<p>
Zoo bar <a>baz</a> quux. <a>Lorem ipsum</a>.
</p>
</body>
</html>
""")
print(json.dumps(j, indent=1))
[
{
"tag": "html",
"contents": [
"\n",
{
"tag": "body",
"contents": [
"\n ",
{
"tag": "h1",
"contents": [
"Hello, Worrld!"
]
},
"\n ",
{
"tag": "p",
"contents": [
"\n Zoo bar ",
{
"tag": "a",
"contents": [
"baz"
]
},
" quux. ",
{
"tag": "a",
"contents": [
"Lorem ipsum"
]
},
".\n "
]
},
"\n"
]
},
"\n"
]
}
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment