Skip to content

Instantly share code, notes, and snippets.

@ggurdanikita
Created March 9, 2021 16:11
Show Gist options
  • Save ggurdanikita/4863a05489545063695731e8a5cae92f to your computer and use it in GitHub Desktop.
Save ggurdanikita/4863a05489545063695731e8a5cae92f to your computer and use it in GitHub Desktop.
#! /usr/local/bin/python3
import re
from itertools import cycle
from typing import Any, Optional
from bs4 import BeautifulSoup # type: ignore
from proxy import main as proxy_main
from proxy.http.parser import HttpParser, httpParserStates, httpParserTypes
from proxy.http.proxy import HttpProxyBasePlugin
EMOJI_LIST = cycle(["①", "②", "③"])
REGEX = r"((^|\b)(?P<word>\w{6})(\b|$))(?![^<>]*>)"
class CustomManInTheMiddlePlugin(HttpProxyBasePlugin):
"""Modifies upstream server responses."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
# Create a new http protocol parser for response payloads
self.response = HttpParser(httpParserTypes.RESPONSE_PARSER)
def handle_upstream_chunk(self, chunk: memoryview) -> memoryview:
# Parse the response.
# Note that these chunks also include headers
self.response.parse(chunk.tobytes())
# If response is complete, modify and dispatch to client
if self.response.state == httpParserStates.COMPLETE:
self.__add_emojis(self.response.body)
self.client.queue(memoryview(self.response.build_response()))
return memoryview(b"")
def __add_emojis(self, data: Optional[bytes]) -> None:
"""Concatenates six-character words and emoji
Args:
data (Optional[bytes]): Response from server
"""
if not data:
# Empty response
return
root = BeautifulSoup(data, "lxml")
body = root.find("body")
if not body:
# Empty body
return
# Get all line with six-character words
for tag_text in body.findAll(text=re.compile(REGEX)):
tag_word_list = str(tag_text).split(" ")
# Get six-character words from tag_text
match_list = set()
for lines in re.findall(REGEX, str(tag_text), re.MULTILINE):
for six_simbol_word in lines:
match_list.add(six_simbol_word)
# Add emoji
for i, word in enumerate(tag_word_list):
if len(word) == 6 and word in match_list:
tag_word_list[i] = self.__emojify(word)
tag_text.replace_with(" ".join(tag_word_list))
self.response.body = str(root).encode("utf-8")
self.__update_content_length()
@staticmethod
def __emojify(word: str) -> str:
"""Concatenates word and next emoji
Args:
word (str): word to add a smiley
Returns:
str: Concatinated string
"""
return word + next(EMOJI_LIST)
def __update_content_length(self) -> None:
"""Updates content-length header after adding emojis"""
if b"content-length" not in self.response.headers:
return
content_length_list = list(self.response.headers[b"content-length"])
content_length_list[1] = str(len(self.response.body)).encode("utf-8")
self.response.headers[b"content-length"] = tuple(content_length_list)
# Abstract metods from base class:
def before_upstream_connection(self, request: HttpParser) -> Optional[HttpParser]:
return request
def handle_client_request(self, request: HttpParser) -> Optional[HttpParser]:
return request
def on_upstream_connection_close(self) -> None:
pass
if __name__ == "__main__":
proxy_main(
[
"--hostname",
"0.0.0.0",
"--port",
"8899",
"--ca-key-file",
"ca-key.pem",
"--ca-cert-file",
"ca-cert.pem",
"--ca-signing-key-file",
"ca-signing-key.pem",
],
plugins=[CustomManInTheMiddlePlugin],
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment