# Habr_proxy
#
# Source: GitHub Gist by @bersegosx
# (gist bersegosx/364fde2943d5c19460fe17f8a09af02c, last active May 26, 2019 13:46).
import aiohttp
from aiohttp import web
import re
from urllib.parse import urljoin
from html import unescape
from lxml import etree
from lxml.html import tostring as etree_tostring
# Address the local proxy server binds to.
SERVER_HOST = 'localhost'
SERVER_PORT = 8080

# Upstream site whose pages are fetched and rewritten.
BASE_URL = 'https://habr.com'

# Marker appended to words, and the exact word length that triggers it.
TM_APPENDER = '™'
WORD_LENGTH = 6

# Matches a whole word of exactly WORD_LENGTH word characters; the word
# itself is captured as group 1.
WORD_REGEX = re.compile(r'\b(\w{%d})\b' % WORD_LENGTH)

# Keyword options handed to the lxml HTML parser.
HTML_PARSER_OPTIONS = {
    'collect_ids': False,
    'encoding': 'utf-8',
}
def insert_tm_sigil(text: str) -> str:
    """Append ``TM_APPENDER`` to every word matched by ``WORD_REGEX``.

    Args:
        text: Plain text to process.

    Returns:
        A copy of *text* in which each match of ``WORD_REGEX`` is followed
        by ``TM_APPENDER``; text without matches is returned unchanged.
    """
    # A callable replacement inserts TM_APPENDER literally.  In a template
    # string a backslash or a leading digit in the appender would be parsed
    # as an escape or as part of the "\1" group reference.
    return WORD_REGEX.sub(lambda match: match.group(1) + TM_APPENDER, text)
def _relative_href(href: str) -> str:
    """Return *href* with a leading ``BASE_URL`` origin removed, if any."""
    if not href.startswith(BASE_URL):
        return href
    rest = href[len(BASE_URL):]
    if not rest:
        # A link to the bare site root; an empty href would instead point
        # at the current page.
        return '/'
    if rest[0] not in '/?#':
        # The prefix matched only part of a longer host name or a port
        # (e.g. BASE_URL + ".example.org"), so this is a different origin.
        return href
    return rest if rest[0] == '/' else '/' + rest


def transform(html_text: str) -> str:
    """Rewrite an HTML page fetched from ``BASE_URL``.

    Two changes are applied to the parsed document:

    * ``href`` attributes that point at ``BASE_URL`` lose that prefix and
      become site-relative;
    * text nodes are passed through ``insert_tm_sigil``, which appends
      ``TM_APPENDER`` to words of exactly ``WORD_LENGTH`` characters.

    Args:
        html_text: HTML source of the page.

    Returns:
        The modified document serialized back to a string.
    """
    parser = etree.HTMLPullParser(**HTML_PARSER_OPTIONS, events=('end',))
    parser.feed(html_text)
    events = parser.read_events()
    root = parser.close()

    # Elements whose own text is code or data rather than prose; adding the
    # marker there would corrupt scripts and stylesheets.
    raw_text_tags = {'script', 'style', 'path'}

    for _action, elem in events:
        href = elem.attrib.get('href')
        if href:
            elem.attrib['href'] = _relative_href(href)

        # The tail is the text that follows the closing tag, i.e. ordinary
        # page content, so it is processed even for raw-text elements.
        if elem.tag in raw_text_tags:
            attr_names = ('tail',)
        else:
            attr_names = ('text', 'tail')

        for attr_name in attr_names:
            text = getattr(elem, attr_name)
            if not text:
                continue
            # Cheap pre-check: a string shorter than WORD_LENGTH cannot
            # contain a matching word.
            if len(text.strip()) >= WORD_LENGTH:
                text = insert_tm_sigil(text)
            # NOTE(review): the parser has presumably decoded entities
            # already, so this second unescape() would also decode text that
            # was escaped on purpose (e.g. HTML shown in code samples).
            # Kept from the original - confirm whether it is still needed.
            setattr(elem, attr_name, unescape(text))

    return etree_tostring(root, encoding='unicode')
async def handle_request(request: web.Request) -> web.Response:
    """Fetch the requested path from ``BASE_URL`` and relay the response.

    HTML responses are passed through ``transform`` before being returned;
    any other content type is relayed as the raw bytes received.  The
    upstream status code is reused in both cases.

    Args:
        request: Incoming request; its path and query string select the
            upstream page.

    Returns:
        The response to send back to the client.
    """
    # Plain concatenation instead of urljoin(): path_qs is controlled by the
    # client, and urljoin() lets a reference such as "//other.host/x"
    # replace the host, which would make the proxy fetch arbitrary sites.
    # The route this handler is registered on only matches paths that start
    # with "/", so the result stays under BASE_URL.
    url = BASE_URL + request.path_qs

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            content_type_header = response.headers.get(
                'Content-Type', 'text/html')

            if 'text/html' in content_type_header:
                page = transform(await response.text())
                # Passing the page as text= makes aiohttp encode it as UTF-8
                # and declare that charset in the Content-Type header.
                return web.Response(text=page, content_type='text/html',
                                    status=response.status)

            payload = await response.read()
            # web.Response wants the bare media type, without parameters
            # such as "; charset=...".
            content_type = content_type_header.split(';', 1)[0].strip()
            return web.Response(body=payload, content_type=content_type,
                                status=response.status)
def start_server():
    """Build the aiohttp application and serve it until interrupted.

    A single catch-all GET route sends every path to ``handle_request``;
    the server listens on ``SERVER_HOST``:``SERVER_PORT``.
    """
    catch_all_routes = [web.get('/{tail:.*}', handle_request)]
    application = web.Application()
    application.add_routes(catch_all_routes)
    web.run_app(application, host=SERVER_HOST, port=SERVER_PORT)


# Start the proxy only when this file is run as a script.
if __name__ == '__main__':
    start_server()
# (GitHub Gist page footer: sign in on GitHub to comment on this gist.)