Skip to content

Instantly share code, notes, and snippets.

@omaskery
Created September 15, 2015 21:27
Show Gist options
  • Save omaskery/ef96810a093b39e8b546 to your computer and use it in GitHub Desktop.
Save omaskery/ef96810a093b39e8b546 to your computer and use it in GitHub Desktop.
A bit of code to poll my friend's mitmproxy web interface instance.
from urllib import request, parse
import binascii
import hashlib
import zipfile
import json
def main(host="", port=8081):
    """Print a one-line summary of every flow exposed by a mitmproxy web UI.

    The flow list is read from ``http://<host>:<port>/flows`` through a
    ``PageCache`` backed by ``cache.zip``, so pages that were already
    downloaded are served from the archive instead of the network.

    Args:
        host: Host name or address of the web interface. The default is the
            empty placeholder the script shipped with; pass the real host.
        port: TCP port of the web interface.
    """
    base_url = "http://{}:{}".format(host, port)
    flows_url = "{}/flows".format(base_url)

    cache = PageCache("cache.zip")
    cache.examine_cache()

    for entry in fetch_json(cache, flows_url)['data']:
        details = _describe_flow(cache, flows_url, entry)
        print("[{}:{}]{}".format(entry['id'], entry['request']['path'], details))


def _content_length(message):
    """Return the ``contentLength`` of a flow message as an int, or 0 if absent."""
    # A flow may carry no response at all, or a null length; treat both as
    # "no body" rather than crashing the whole listing.
    if not message:
        return 0
    return int(message.get('contentLength') or 0)


def _describe_flow(cache, flows_url, entry):
    """Build the detail suffix printed after a flow's id and request path."""
    user_agent = dict(entry['request']['headers']).get('User-Agent')
    if user_agent is not None and "python" in user_agent:
        return " I suspect this is a Ricky packet"

    # NOTE(review): assumes both content checks belong to the non-Python
    # branch of the user-agent test -- confirm against the intended behaviour.
    entry_id = entry['id']
    details = ""
    if _content_length(entry['request']) > 0:
        # The request body arrives as a form-encoded page whose first
        # ``httpMsg`` value is base64 text.
        request_body = binascii.a2b_base64(
            fetch_post(cache, "{}/{}/request/content".format(flows_url, entry_id))['httpMsg'][0]
        )
        details += " tx {} bytes".format(len(request_body))
    if _content_length(entry.get('response')) > 0:
        response_body = fetch_base64(cache, "{}/{}/response/content".format(flows_url, entry_id))
        details += " rx {} bytes".format(len(response_body))
    return details
class PageCache(object):
    """Zip-archive-backed cache of downloaded pages.

    Every page is stored as one archive member named after the hex SHA-256
    digest of the path it was fetched from.
    """

    def __init__(self, filepath):
        """Remember where the cache archive lives.

        Args:
            filepath: Path of the zip file. It is opened in append mode
                whenever the cache is used.
        """
        self._path = filepath

    def examine_cache(self):
        """Print how many entries the cache archive currently contains."""
        with zipfile.ZipFile(self._path, 'a') as archive:
            print("cache contains {} entries".format(len(archive.namelist())))

    def fetch(self, path):
        """Return the decoded body for *path*, downloading it on a cache miss.

        Args:
            path: URL to retrieve; also the key the cached copy is stored under.

        Returns:
            The page body decoded to ``str`` with the default codec.
        """
        path_hash = hashlib.sha256(path.encode()).hexdigest()
        with zipfile.ZipFile(self._path, 'a') as archive:
            try:
                # ZipFile.read opens *and closes* the member, unlike a bare
                # ``open(...).read()`` which leaves the handle dangling.
                cached = archive.read(path_hash)
            except KeyError:
                # No member with that name: fall through to the download.
                pass
            else:
                return cached.decode()

            print("hit internet for '{}'".format(path))
            # Resolves to the module-level fetch() function, not this method.
            data = fetch(path)
            archive.writestr(path_hash, data)
            return data.decode()
def hexify_bytes(byte_array):
    """Render each value of *byte_array* as two hex digits, space-separated."""
    rendered = [hexify(octet, 2) for octet in byte_array]
    return " ".join(rendered)
def hexify(value, nibbles=2):
    """Format an integer as lowercase hex, zero-padded to *nibbles* digits.

    Negative values keep a leading ``-``; the padding applies to the digits
    only, and longer numbers are never truncated.
    """
    negative = value < 0
    # hex() renders negatives as "-0x...", so skip the sign along with "0x".
    digits = hex(value)[3 if negative else 2:]
    padded = digits.rjust(nibbles, "0")
    return "-" + padded if negative else padded
def fetch(path):
    """Download *path* and return the raw response body.

    Args:
        path: URL to open with ``urllib.request.urlopen``.

    Returns:
        The complete response body as ``bytes``.
    """
    # Close the response deterministically instead of leaving the connection
    # open until the garbage collector gets to it.
    with request.urlopen(path) as response:
        return response.read()
def fetch_json(cache, path):
    """Fetch *path* through *cache* and parse the body as JSON."""
    payload = cache.fetch(path)
    return json.loads(payload)
def fetch_base64(cache, path):
    """Fetch *path* through *cache*, URL-unquote it and decode it from base64."""
    quoted = cache.fetch(path)
    encoded = parse.unquote(quoted)
    return binascii.a2b_base64(encoded)
def fetch_post(cache, path):
    """Fetch *path* through *cache* and parse the body as a query string.

    Returns a dict mapping each field name to the list of its values.
    """
    form_body = cache.fetch(path)
    return parse.parse_qs(form_body)
# Run the poller only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment