Created
March 9, 2021 16:11
-
-
Save ggurdanikita/4863a05489545063695731e8a5cae92f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/local/bin/python3 | |
import re | |
from itertools import cycle | |
from typing import Any, Optional | |
from bs4 import BeautifulSoup # type: ignore | |
from proxy import main as proxy_main | |
from proxy.http.parser import HttpParser, httpParserStates, httpParserTypes | |
from proxy.http.proxy import HttpProxyBasePlugin | |
EMOJI_LIST = cycle(["①", "②", "③"]) | |
REGEX = r"((^|\b)(?P<word>\w{6})(\b|$))(?![^<>]*>)" | |
class CustomManInTheMiddlePlugin(HttpProxyBasePlugin): | |
"""Modifies upstream server responses.""" | |
def __init__(self, *args: Any, **kwargs: Any) -> None: | |
super().__init__(*args, **kwargs) | |
# Create a new http protocol parser for response payloads | |
self.response = HttpParser(httpParserTypes.RESPONSE_PARSER) | |
def handle_upstream_chunk(self, chunk: memoryview) -> memoryview: | |
# Parse the response. | |
# Note that these chunks also include headers | |
self.response.parse(chunk.tobytes()) | |
# If response is complete, modify and dispatch to client | |
if self.response.state == httpParserStates.COMPLETE: | |
self.__add_emojis(self.response.body) | |
self.client.queue(memoryview(self.response.build_response())) | |
return memoryview(b"") | |
def __add_emojis(self, data: Optional[bytes]) -> None: | |
"""Concatenates six-character words and emoji | |
Args: | |
data (Optional[bytes]): Response from server | |
""" | |
if not data: | |
# Empty response | |
return | |
root = BeautifulSoup(data, "lxml") | |
body = root.find("body") | |
if not body: | |
# Empty body | |
return | |
# Get all line with six-character words | |
for tag_text in body.findAll(text=re.compile(REGEX)): | |
tag_word_list = str(tag_text).split(" ") | |
# Get six-character words from tag_text | |
match_list = set() | |
for lines in re.findall(REGEX, str(tag_text), re.MULTILINE): | |
for six_simbol_word in lines: | |
match_list.add(six_simbol_word) | |
# Add emoji | |
for i, word in enumerate(tag_word_list): | |
if len(word) == 6 and word in match_list: | |
tag_word_list[i] = self.__emojify(word) | |
tag_text.replace_with(" ".join(tag_word_list)) | |
self.response.body = str(root).encode("utf-8") | |
self.__update_content_length() | |
@staticmethod | |
def __emojify(word: str) -> str: | |
"""Concatenates word and next emoji | |
Args: | |
word (str): word to add a smiley | |
Returns: | |
str: Concatinated string | |
""" | |
return word + next(EMOJI_LIST) | |
def __update_content_length(self) -> None: | |
"""Updates content-length header after adding emojis""" | |
if b"content-length" not in self.response.headers: | |
return | |
content_length_list = list(self.response.headers[b"content-length"]) | |
content_length_list[1] = str(len(self.response.body)).encode("utf-8") | |
self.response.headers[b"content-length"] = tuple(content_length_list) | |
# Abstract metods from base class: | |
def before_upstream_connection(self, request: HttpParser) -> Optional[HttpParser]: | |
return request | |
def handle_client_request(self, request: HttpParser) -> Optional[HttpParser]: | |
return request | |
def on_upstream_connection_close(self) -> None: | |
pass | |
if __name__ == "__main__": | |
proxy_main( | |
[ | |
"--hostname", | |
"0.0.0.0", | |
"--port", | |
"8899", | |
"--ca-key-file", | |
"ca-key.pem", | |
"--ca-cert-file", | |
"ca-cert.pem", | |
"--ca-signing-key-file", | |
"ca-signing-key.pem", | |
], | |
plugins=[CustomManInTheMiddlePlugin], | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment