Skip to content

Instantly share code, notes, and snippets.

@gleb-lobastov
Last active June 1, 2016 16:26
Show Gist options
  • Save gleb-lobastov/3f1e57f69533368837a29406a69ed3cb to your computer and use it in GitHub Desktop.
Save gleb-lobastov/3f1e57f69533368837a29406a69ed3cb to your computer and use it in GitHub Desktop.
Локальный прокси с предобработкой контента
# -*- coding: utf-8 -*-
""" Локальный прокси с предобработкой контента
habraproxy.py — это простейший http-прокси-сервер, запускаемый локально, который показывает содержимое страниц
указанного сайта, с одним исключением: после каждого слова из определенного числа букв следует заданный текст
Параметры передаются как аргументы коммандной строки. По умолчанию сайт: habrahabr.ru, число букв в слове: 6,
добавляемый символ: «™». Так-же можно указать хост и порт локального сервера.
Examples:
http://habrahabr.ru/company/yandex/blog/258673/
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Сейчас на фоне уязвимости Logjam все в индустрии в очередной раз обсуждают проблемы и
особенности TLS. Я хочу воспользоваться этой возможностью, чтобы поговорить об одной из
них, а именно — о настройке ciphersiutes.
http://127.0.0.1:8232/company/yandex/blog/258673/
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Сейчас™ на фоне уязвимости Logjam™ все в индустрии в очередной раз обсуждают проблемы и
особенности TLS. Я хочу воспользоваться этой возможностью, чтобы поговорить об одной из
них, а именно™ — о настройке ciphersiutes.
"""
from __future__ import unicode_literals
import locale
import argparse
import webbrowser
import threading
import urllib2
from urlparse import urlsplit, urlunsplit
from SimpleHTTPServer import SimpleHTTPRequestHandler
from SocketServer import TCPServer
from HTMLParser import HTMLParser
class AbstractHTMLContentPreprocessor(HTMLParser):
""" Передает контент без разметки на обработку в метод process_content
Args:
url_handler (Optional[func]): callback для изменения url ссылок. На вход имеющийся url, на выходе измененый
"""
def __init__(self, url_handler=None):
HTMLParser.__init__(self)
self._skip = False
self._content = []
self._url_handler = url_handler
def handle_startendtag(self, tag, attrs):
self._content.append(self.get_starttag_text())
def handle_starttag(self, tag, attrs):
self._skip = tag in self.CDATA_CONTENT_ELEMENTS
tag_content = self.get_starttag_text()
if tag == 'a' and self._url_handler:
try:
original_url = next(val for (attr, val) in attrs if attr == 'href')
target_url = self._url_handler(original_url)
if target_url != original_url:
tag_content = tag_content.replace(original_url, target_url)
except StopIteration:
pass
self._content.append(tag_content)
def handle_endtag(self, tag):
self._content.append('</{}>'.format(tag))
self._skip = False
def handle_data(self, data):
self._content.append(data if self._skip else self._process_content(data))
def handle_charref(self, name):
self._content.append('&#{};'.format(name))
def handle_decl(self, decl):
self._content.append('<!{}>'.format(decl))
def handle_entityref(self, name):
self._content.append('&{};'.format(name))
def handle_comment(self, data):
self._content.append('<!--{}-->'.format(data))
def _process_content(self, content):
raise NotImplementedError
@property
def result(self):
result = ''.join(self._content)
self._skip = False
self._content = []
return result
def feed(self, data):
HTMLParser.feed(self, data)
return self
class MarkerInjector(AbstractHTMLContentPreprocessor):
""" Вставляет текст после каждого слова длиной указанной длины """
def __init__(self, mark, word_len, *args, **kwargs):
assert mark != '' and word_len > 0
AbstractHTMLContentPreprocessor.__init__(self, *args, **kwargs)
self._mark = mark
self._word_len = word_len
def _chunks(self, text):
""" Разбивает текст на куски, в конце каждого из которых, кроме последнего стоит слово заданной длины """
streak = marker = 0
for index, char in enumerate(text):
if char.isalpha():
streak += 1
continue
elif streak == self._word_len:
yield text[marker:index]
marker = index
streak = 0
yield text[marker: None]
if streak == self._word_len:
yield '' # Если последнее слово подходящей длины, пустой элемент приведет к вствке mark при конкатенации
def _process_content(self, content):
return self._mark.join(self._chunks(content))
class AbstractProxyRequestHandler(SimpleHTTPRequestHandler):
""" Проксирует вызовы и обрабатывает html-контент препроцессором """
target_host = None
preprocessor = None
def do_GET(self):
assert self.target_host is not None
response = urllib2.urlopen(self.target_host + self.path)
if self.preprocessor is not None and "text/html" in response.headers.gettype():
self.send_response(200)
for header, value in response.info().items():
if header.lower() != "content-length":
self.send_header(header, value)
charset = response.headers.getparam('charset')
content = self.preprocessor.feed(response.read().decode(charset)).result.encode(charset)
self.send_header("content-length", len(content))
self.end_headers()
self.wfile.write(content)
else:
self.copyfile(response, self.wfile)
class ProxyServer(TCPServer):
""" Реализация прокси сервера с препроцессором """
def __init__(self, server_address, target_host, preprocessor=None):
TCPServer.__init__(self, server_address, self._create_proxy(target_host, preprocessor))
@staticmethod
def _create_proxy(target_host, preprocessor):
return type(b'Proxy', (AbstractProxyRequestHandler, object),
{'target_host': target_host, 'preprocessor': preprocessor})
def run(params):
print 'starting server...'
server_address = 'http://{}:{}'.format(params['host'], params['port'])
server_address_parts = list(urlsplit(server_address))[:2]
site_hostname = urlsplit(params['site']).hostname
httpd = ProxyServer((params['host'], params['port']), params['site'], preprocessor=MarkerInjector(
params['mark'], params['len'],
url_handler=lambda url: urlunsplit((
lambda parts: server_address_parts + list(parts)[2:] if parts.hostname == site_hostname else parts
)(urlsplit(url)))
))
print 'running server...'
threading.Thread(target=lambda: webbrowser.open_new_tab(server_address)).start()
httpd.serve_forever()
def parse_args():
def ucs(arg):
return arg if isinstance(arg, unicode) else arg.decode(encoding)
encoding = locale.getpreferredencoding()
parser = argparse.ArgumentParser(description='Proxy with preprocessor. Will inject specified '
'symbol or text after each word of specified length')
parser.add_argument('--port', '-p', dest='port', type=int, default=31337, help='local server port')
parser.add_argument('--host', '-t', dest='host', type=ucs, default='localhost', help='local server host')
parser.add_argument('--site', '-s', dest='site', type=ucs, default='https://habrahabr.ru', help='the proxied site')
parser.add_argument('--mark', '-m', dest='mark', type=ucs, default='™', help='injected symbol or text')
parser.add_argument('--len', '-l', dest='len', type=int, default=6, help='length of the words preceding injection')
return parser.parse_args()
if __name__ == '__main__':
run(vars(parse_args()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment