Skip to content

Instantly share code, notes, and snippets.

@DanaEpp
Last active August 11, 2023 14:55
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DanaEpp/1964076c89e1e7af3daae19657687449 to your computer and use it in GitHub Desktop.
Save DanaEpp/1964076c89e1e7af3daae19657687449 to your computer and use it in GitHub Desktop.
Implant Request Logger
#!/usr/bin/env python3
"""
request_logger.py
A simple HTTP server that will dump the requests being mirrored from an implant on an API server
WARNING: This will record all header and body content of every request to a JSON file. To reduce the risk of
information disclosure, restrict access to the output file with an appropriate ACL.
Author: Dana Epp (dana@vulscan.com) aka SilverStr
"""
from argparse import ArgumentParser
from http.server import BaseHTTPRequestHandler, HTTPServer
import logging
from datetime import datetime
import json
class HTTPRequestHandler(BaseHTTPRequestHandler):
    """Record every incoming HTTP request, then answer with an empty JSON list.

    Each request is echoed through the ``logging`` module and appended to
    ``output_file`` as one JSON object per line. GET, HEAD, POST, PUT and
    DELETE requests are all routed through :meth:`process_Command`.
    """

    # Reported in the ``Server`` response header instead of the stdlib default.
    server_version = "IRLHTTP/1.0"
    # Left blank so the ``Server`` header does not reveal the Python version.
    sys_version = ""
    # Path of the file that request records are appended to.
    output_file = ""
    # Body returned to every client (omitted for HEAD requests).
    _RESPONSE_BODY = b"[]"

    def _set_response(self):
        """Send the ``200`` status line and the JSON response headers."""
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(self._RESPONSE_BODY)))
        self.end_headers()

    def _log_item(self, body=""):
        """Echo the request line, headers and *body* to the root logger."""
        logging.info("\n%s %s %s\n%s\n%s\n---------\n",
                     str(self.command),
                     str(self.path),
                     str(self.request_version),
                     str(self.headers),
                     str(body))

    def _read_body(self) -> str:
        """Return the request body as text, or ``""`` when there is none.

        ``Content-Length`` is parsed as a plain decimal number, so a
        zero-padded value such as ``010`` is accepted. A malformed or
        non-positive length is treated as "no body" so the request itself
        still gets recorded.
        """
        raw_length = self.headers.get('Content-Length')
        if raw_length is None:
            return ""
        try:
            content_length = int(raw_length)
        except ValueError:
            logging.warning("Ignoring malformed Content-Length header: %r", raw_length)
            return ""
        if content_length <= 0:
            return ""
        # Mirrored traffic may carry binary payloads; substitute undecodable
        # bytes instead of raising, otherwise the whole request would be lost.
        return self.rfile.read(content_length).decode('utf-8', errors='replace')

    def process_Command(self):
        """Record the current request and reply with ``200`` and ``[]``.

        Any failure is logged with its traceback and otherwise suppressed so
        that one bad request cannot stop the server.
        """
        try:
            post_data = self._read_body()
            self._log_item(post_data)
            self._write_log_item(self._gen_log_item(post_data))
            self._set_response()
            # A response to HEAD must not include a message body.
            if self.command != 'HEAD':
                self.wfile.write(self._RESPONSE_BODY)
        except Exception:
            # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt and
            # SystemExit propagate so Ctrl-C still stops the server.
            logging.exception("Failed to process request.")

    # Every supported verb shares the same handler.
    do_GET = do_HEAD = do_POST = do_PUT = do_DELETE = process_Command

    def _gen_log_item(self, body) -> str:
        """Serialize the current request and *body* into a single JSON string.

        The timestamp is local time formatted as ``DD/MM/YYYY HH:MM:SS`` and
        the headers are stored as one string, exactly as received.
        """
        log_item = {
            'timestamp': datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
            'command': str(self.command),
            'request_version': str(self.request_version),
            'path': str(self.path),
            'headers': str(self.headers),
            'body': str(body),
        }
        return json.dumps(log_item)

    def _write_log_item(self, log_item: str):
        """Append *log_item* to ``output_file`` followed by a newline."""
        with open(self.output_file, "a", encoding="utf-8") as outfile:
            outfile.write(log_item + '\n')
def main(server_class=HTTPServer, handler_class=HTTPRequestHandler, port=80, output_file="request_log.json"):
    """Run the request logger until interrupted with Ctrl-C.

    Args:
        server_class: Server type to instantiate; called as
            ``server_class((host, port), handler_class)``.
        handler_class: Request handler type passed to the server.
        port: TCP port to listen on.
        output_file: Path of the file that request records are appended to.
    """
    logging.basicConfig(level=logging.INFO)
    # An empty host string binds the listening socket to every interface.
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    # The handler reads its destination from a class attribute, so set it
    # once on the class rather than on each per-request instance.
    httpd.RequestHandlerClass.output_file = output_file
    logging.info('Starting Implant Request Logger on port %d (writing to %s)...\n', port, output_file)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server.
        pass
    finally:
        # Release the listening socket even if serve_forever() fails with
        # something other than KeyboardInterrupt.
        httpd.server_close()
    logging.info('Stopping Implant Request Logger...\n')
# Command-line entry point: parse the options and start the logger.
if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument(
        "-p", "--port",
        action="store",
        # Let argparse validate the value so a bad port produces a usage
        # error instead of a ValueError traceback.
        type=int,
        default=8080,
        help="The HTTP port to listen to.",
    )
    parser.add_argument(
        "-o", "--output",
        action="store",
        default="request_log.json",
        help="The output file to record requests to.",
    )
    args = parser.parse_args()
    main(port=args.port, output_file=args.output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment