Last active
June 1, 2016 16:26
-
-
Save gleb-lobastov/3f1e57f69533368837a29406a69ed3cb to your computer and use it in GitHub Desktop.
Локальный прокси с предобработкой контента
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" Локальный прокси с предобработкой контента | |
habraproxy.py — это простейший http-прокси-сервер, запускаемый локально, который показывает содержимое страниц | |
указанного сайта, с одним исключением: после каждого слова из определенного числа букв следует заданный текст | |
Параметры передаются как аргументы коммандной строки. По умолчанию сайт: habrahabr.ru, число букв в слове: 6, | |
добавляемый символ: «™». Так-же можно указать хост и порт локального сервера. | |
Examples: | |
http://habrahabr.ru/company/yandex/blog/258673/ | |
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | |
Сейчас на фоне уязвимости Logjam все в индустрии в очередной раз обсуждают проблемы и | |
особенности TLS. Я хочу воспользоваться этой возможностью, чтобы поговорить об одной из | |
них, а именно — о настройке ciphersiutes. | |
http://127.0.0.1:8232/company/yandex/blog/258673/ | |
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | |
Сейчас™ на фоне уязвимости Logjam™ все в индустрии в очередной раз обсуждают проблемы и | |
особенности TLS. Я хочу воспользоваться этой возможностью, чтобы поговорить об одной из | |
них, а именно™ — о настройке ciphersiutes. | |
""" | |
from __future__ import unicode_literals | |
import locale | |
import argparse | |
import webbrowser | |
import threading | |
import urllib2 | |
from urlparse import urlsplit, urlunsplit | |
from SimpleHTTPServer import SimpleHTTPRequestHandler | |
from SocketServer import TCPServer | |
from HTMLParser import HTMLParser | |
class AbstractHTMLContentPreprocessor(HTMLParser): | |
""" Передает контент без разметки на обработку в метод process_content | |
Args: | |
url_handler (Optional[func]): callback для изменения url ссылок. На вход имеющийся url, на выходе измененый | |
""" | |
def __init__(self, url_handler=None): | |
HTMLParser.__init__(self) | |
self._skip = False | |
self._content = [] | |
self._url_handler = url_handler | |
def handle_startendtag(self, tag, attrs): | |
self._content.append(self.get_starttag_text()) | |
def handle_starttag(self, tag, attrs): | |
self._skip = tag in self.CDATA_CONTENT_ELEMENTS | |
tag_content = self.get_starttag_text() | |
if tag == 'a' and self._url_handler: | |
try: | |
original_url = next(val for (attr, val) in attrs if attr == 'href') | |
target_url = self._url_handler(original_url) | |
if target_url != original_url: | |
tag_content = tag_content.replace(original_url, target_url) | |
except StopIteration: | |
pass | |
self._content.append(tag_content) | |
def handle_endtag(self, tag): | |
self._content.append('</{}>'.format(tag)) | |
self._skip = False | |
def handle_data(self, data): | |
self._content.append(data if self._skip else self._process_content(data)) | |
def handle_charref(self, name): | |
self._content.append('&#{};'.format(name)) | |
def handle_decl(self, decl): | |
self._content.append('<!{}>'.format(decl)) | |
def handle_entityref(self, name): | |
self._content.append('&{};'.format(name)) | |
def handle_comment(self, data): | |
self._content.append('<!--{}-->'.format(data)) | |
def _process_content(self, content): | |
raise NotImplementedError | |
@property | |
def result(self): | |
result = ''.join(self._content) | |
self._skip = False | |
self._content = [] | |
return result | |
def feed(self, data): | |
HTMLParser.feed(self, data) | |
return self | |
class MarkerInjector(AbstractHTMLContentPreprocessor): | |
""" Вставляет текст после каждого слова длиной указанной длины """ | |
def __init__(self, mark, word_len, *args, **kwargs): | |
assert mark != '' and word_len > 0 | |
AbstractHTMLContentPreprocessor.__init__(self, *args, **kwargs) | |
self._mark = mark | |
self._word_len = word_len | |
def _chunks(self, text): | |
""" Разбивает текст на куски, в конце каждого из которых, кроме последнего стоит слово заданной длины """ | |
streak = marker = 0 | |
for index, char in enumerate(text): | |
if char.isalpha(): | |
streak += 1 | |
continue | |
elif streak == self._word_len: | |
yield text[marker:index] | |
marker = index | |
streak = 0 | |
yield text[marker: None] | |
if streak == self._word_len: | |
yield '' # Если последнее слово подходящей длины, пустой элемент приведет к вствке mark при конкатенации | |
def _process_content(self, content): | |
return self._mark.join(self._chunks(content)) | |
class AbstractProxyRequestHandler(SimpleHTTPRequestHandler): | |
""" Проксирует вызовы и обрабатывает html-контент препроцессором """ | |
target_host = None | |
preprocessor = None | |
def do_GET(self): | |
assert self.target_host is not None | |
response = urllib2.urlopen(self.target_host + self.path) | |
if self.preprocessor is not None and "text/html" in response.headers.gettype(): | |
self.send_response(200) | |
for header, value in response.info().items(): | |
if header.lower() != "content-length": | |
self.send_header(header, value) | |
charset = response.headers.getparam('charset') | |
content = self.preprocessor.feed(response.read().decode(charset)).result.encode(charset) | |
self.send_header("content-length", len(content)) | |
self.end_headers() | |
self.wfile.write(content) | |
else: | |
self.copyfile(response, self.wfile) | |
class ProxyServer(TCPServer): | |
""" Реализация прокси сервера с препроцессором """ | |
def __init__(self, server_address, target_host, preprocessor=None): | |
TCPServer.__init__(self, server_address, self._create_proxy(target_host, preprocessor)) | |
@staticmethod | |
def _create_proxy(target_host, preprocessor): | |
return type(b'Proxy', (AbstractProxyRequestHandler, object), | |
{'target_host': target_host, 'preprocessor': preprocessor}) | |
def run(params): | |
print 'starting server...' | |
server_address = 'http://{}:{}'.format(params['host'], params['port']) | |
server_address_parts = list(urlsplit(server_address))[:2] | |
site_hostname = urlsplit(params['site']).hostname | |
httpd = ProxyServer((params['host'], params['port']), params['site'], preprocessor=MarkerInjector( | |
params['mark'], params['len'], | |
url_handler=lambda url: urlunsplit(( | |
lambda parts: server_address_parts + list(parts)[2:] if parts.hostname == site_hostname else parts | |
)(urlsplit(url))) | |
)) | |
print 'running server...' | |
threading.Thread(target=lambda: webbrowser.open_new_tab(server_address)).start() | |
httpd.serve_forever() | |
def parse_args(): | |
def ucs(arg): | |
return arg if isinstance(arg, unicode) else arg.decode(encoding) | |
encoding = locale.getpreferredencoding() | |
parser = argparse.ArgumentParser(description='Proxy with preprocessor. Will inject specified ' | |
'symbol or text after each word of specified length') | |
parser.add_argument('--port', '-p', dest='port', type=int, default=31337, help='local server port') | |
parser.add_argument('--host', '-t', dest='host', type=ucs, default='localhost', help='local server host') | |
parser.add_argument('--site', '-s', dest='site', type=ucs, default='https://habrahabr.ru', help='the proxied site') | |
parser.add_argument('--mark', '-m', dest='mark', type=ucs, default='™', help='injected symbol or text') | |
parser.add_argument('--len', '-l', dest='len', type=int, default=6, help='length of the words preceding injection') | |
return parser.parse_args() | |
if __name__ == '__main__': | |
run(vars(parse_args())) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment