Skip to content

Instantly share code, notes, and snippets.

@pramos
Created March 13, 2022 13:47
Show Gist options
  • Save pramos/e5e6ac28a172244af92b5386e7390adc to your computer and use it in GitHub Desktop.
Python Script to extract PE Files from PCAP Files using Scapy
import logging

# Silence scapy's noisy runtime logger before scapy itself is imported.
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)

import argparse
import os
import re
from pathlib import Path

from scapy.all import *
from scapy.all import TCP
format_str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
def get_http_headers(http_payload):
    """Parse the header section of a raw HTTP message into a dict.

    Returns a mapping of header name -> value (both bytes), or None when
    the header/body separator is missing, parsing raises, or there is no
    Content-Type header.
    """
    try:
        # Keep the first \r\n of the separator so the regex also matches
        # the final header line.
        header_end = http_payload.index(b"\r\n\r\n") + 2
        raw_headers = http_payload[:header_end]
        headers = dict(re.findall(b"(?P<name>.*?): (?P<value>.*?)\\r\\n", raw_headers))
    except ValueError as err:
        logging.error('Could not find \\r\\n\\r\\n - %s' % err)
        return None
    except Exception as err:
        logging.error('Exception found trying to parse raw headers - %s' % err)
        logging.debug(str(http_payload))
        return None
    # A payload without a Content-Type is of no use downstream.
    if b"Content-Type" not in headers:
        logging.debug('Content Type not present in headers')
        logging.debug(headers.keys())
        return None
    return headers
def extract_object(headers, http_payload):
    """Extract a downloadable object from a reassembled HTTP payload.

    Only payloads whose Content-Type matches one of the PE-serving MIME
    types are extracted.

    Returns:
        (object_bytes, first_two_bytes) on a match, otherwise (None, None).
    """
    object_extracted = None
    object_type = None
    # MIME types commonly used when serving PE executables.
    content_type_filters = [b'application/x-msdownload', b'application/octet-stream']
    try:
        if b'Content-Type' in headers:
            if headers[b'Content-Type'] in content_type_filters:
                # Body starts right after the blank line that ends the headers.
                object_extracted = http_payload[http_payload.index(b"\r\n\r\n") + 4:]
                # The first two bytes identify the file type (b'MZ' for PE).
                object_type = object_extracted[:2]
                logging.info("Object Type: %s" % object_type)
            else:
                # BUG FIX: message read "did not matched with filters".
                logging.debug('Content Type did not match the filters - %s' % headers[b'Content-Type'])
                if len(http_payload) > 10:
                    logging.debug('Object first 50 bytes - %s' % str(http_payload[:50]))
        else:
            logging.info('No Content Type in Package')
            logging.debug(headers.keys())
        if b'Content-Length' in headers:
            # BUG FIX: label was misspelled 'Content-Lenght' and logged as a
            # bytes literal (b'...') instead of plain text.
            logging.info("Content-Length: %s" % headers[b'Content-Length'])
    except Exception as err:
        logging.error('Exception found trying to parse headers - %s' % err)
        return None, None
    return object_extracted, object_type
def create_output_directory_folder(directory_name, output_directory='objects') -> str:
    """Create (if needed) the per-pcap output directory and return its path.

    Args:
        directory_name: pcap file name; a trailing '.pcap' suffix is dropped.
        output_directory: parent directory, resolved against the current
            working directory unless given as an absolute path.

    Returns:
        Absolute path of the created (or pre-existing) directory.
    """
    # BUG FIX: str.replace('.pcap', '') removed the substring anywhere in
    # the name (e.g. 'a.pcapdump' -> 'adump'); only strip a trailing suffix.
    if directory_name.endswith('.pcap'):
        directory_name = directory_name[:-len('.pcap')]
    target_path = os.path.join(os.getcwd(), output_directory, directory_name)
    if not os.path.exists(target_path):
        logging.debug('Path %s does not exists - creating.' % target_path)
    # makedirs(exist_ok=True) replaces the racy exists-then-mkdir sequence
    # and also creates the intermediate output_directory level in one call.
    os.makedirs(target_path, exist_ok=True)
    return target_path
def parse_pcap_filename(pcap_file) -> str:
    """Return the file-name component of a pcap path.

    BUG FIX: the original hand-rolled split on '/' re-implemented
    os.path.basename and mishandled Windows '\\' separators; a bare file
    name (no separator) is still returned unchanged.
    """
    name = os.path.basename(pcap_file)
    logging.debug('Pcap File path %s - basename %s' % (pcap_file, name))
    return name
def extract_http_objects(pcap_file, output_directory):
    """Reassemble HTTP sessions from a pcap and save any PE objects found.

    Walks each TCP session, concatenates the port-80 payload bytes into an
    application-layer stream, parses the HTTP headers, and writes every
    extracted object whose first two bytes are b'MZ' (the DOS/PE magic)
    into a per-pcap directory under output_directory.

    Args:
        pcap_file: path to the pcap to parse.
        output_directory: parent directory for extracted objects.
    """
    logging.info('Starting to parse pcap/s')
    # Only save objects that look like PE files (DOS 'MZ' magic).
    filtered_object_types = [b'MZ']
    pcap_file_name = parse_pcap_filename(pcap_file)
    pcap_flow = rdpcap(pcap_file)
    target_directory = create_output_directory_folder(pcap_file_name, output_directory)
    sessions = pcap_flow.sessions()
    objects_count = 0
    objects_saved = 0
    for session in sessions:
        # Rebuild the application-layer byte stream for this session.
        http_payload = bytes()
        for packet in sessions[session]:
            if packet.haslayer(TCP):
                if packet[TCP].dport == 80 or packet[TCP].sport == 80:
                    if packet[TCP].payload:
                        http_payload += raw(packet[TCP].payload)
                if packet[TCP].dport == 443 or packet[TCP].sport == 443:
                    # Encrypted traffic cannot be parsed; just note it.
                    logging.debug('https traffic detected')
        if not len(http_payload):
            continue
        headers = get_http_headers(http_payload)
        if headers is None:
            continue
        # BUG FIX: log messages spelled "lenght".
        logging.debug("HTTP Payload length: %d" % len(http_payload))
        object_found, object_type = extract_object(headers, http_payload)
        # BUG FIX: the original `elif object_found:` / `elif object_type:`
        # branches were unreachable (extract_object sets both or neither),
        # and the latter formatted bytes with %d, which would raise
        # TypeError if ever reached. Both branches removed.
        if object_found is None or object_type is None:
            continue
        objects_count += 1
        if len(object_found) == 0:
            logging.debug("Object found with length 0")
            continue
        if object_type not in filtered_object_types:
            logging.debug("Non parseable Content Type %s" % (object_type))
            continue
        object_name = "%s_object_found_%d" % (pcap_file_name, objects_count)
        # BUG FIX: use a context manager so the handle is closed even if the
        # write raises, and build the path portably with os.path.join.
        with open(os.path.join(target_directory, object_name), "wb") as fd:
            fd.write(object_found)
        objects_saved += 1
    logging.info('Parsed all files')
    logging.info("Total Number of Objects Found: %d" % (objects_count))
    logging.info("Total Number of Objects Saved: %d" % (objects_saved))
def extract_http_objects_from_directory(target_directory, output_directory):
    """Run extract_http_objects on every '.pcap' file in target_directory.

    Non-pcap files are skipped (logged at debug level).
    """
    # List all files in the directory.
    directory_files = os.listdir(target_directory)
    logging.debug('Target directory has %d files for extraction' % len(directory_files))
    for target_file in directory_files:
        # BUG FIX: removed stray debugging print(target_file); the debug
        # log lines below already report every file considered.
        if Path(target_file).suffix == '.pcap':
            logging.debug('new pcap file to parse %s' % target_file)
            extract_http_objects(os.path.join(target_directory, target_file), output_directory)
        else:
            logging.debug('not a pcap file %s' % Path(target_file).suffix)
    logging.info('All files parsed')
def print_help():
    """Print a minimal usage hint for running the extractor."""
    usage = "python pcap_file_extraction.py --inputpcap <file>"
    print(usage)
def main():
    """Command-line entry point: parse arguments and dispatch on input type.

    Accepts a single pcap file or a directory of pcaps; extracted objects
    are written under the chosen output directory, and all progress is
    logged to the chosen log file.
    """
    parser = argparse.ArgumentParser(description="Parse pcap and extract files")
    parser.add_argument('-i', '--inputpcap', required=True, help='PCAP file or Directory to process files')
    parser.add_argument('-o', '--outputdir', default='objects', type=str, help='Output Directory where to place the Extracted files')
    parser.add_argument('-d', '--debug', help='Enable Debugging Logging', action='store_const', dest='loglevel', const=logging.DEBUG, default=logging.INFO)
    # BUG FIX: help text spelled "Specificy".
    parser.add_argument('-l', '--log', help='Specify Log File', dest='logfile', type=str, default='extractor.log')
    args = parser.parse_args()
    logging.basicConfig(filename=args.logfile, format=format_str, level=args.loglevel)
    logging.info("Starting up")
    # --inputpcap is required=True, so the old `if args.inputpcap:` wrapper
    # was redundant and has been dropped.
    if os.path.isfile(args.inputpcap):
        print('Parsing file - %s' % args.inputpcap)
        extract_http_objects(args.inputpcap, args.outputdir)
    elif os.path.isdir(args.inputpcap):
        print('Parsing Directory - %s' % args.inputpcap)
        extract_http_objects_from_directory(args.inputpcap, args.outputdir)
    else:
        # BUG FIX: a path that is neither a file nor a directory used to be
        # ignored silently; report it so the user knows nothing was parsed.
        logging.error('Input path does not exist - %s' % args.inputpcap)
        print('Input path does not exist - %s' % args.inputpcap)
    logging.info('Finishing up')
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment