Skip to content

Instantly share code, notes, and snippets.

@lucasg
Created January 16, 2018 10:21
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save lucasg/65855d9255c79cec864f7779568cc417 to your computer and use it in GitHub Desktop.
Save lucasg/65855d9255c79cec864f7779568cc417 to your computer and use it in GitHub Desktop.
Download PDB and PE files from the Microsoft symbol store
import os
import re
import sys
import logging
import argparse
import subprocess
import requests
# Base URL of the symbol store queried by this script; download URLs are built
# as {base}/{filename}/{guid}/{filename}.
MICROSOFT_SYMBOL_STORE = "https://msdl.microsoft.com/download/symbols"
def try_download_pdb(url, filename, guid, output_filename):
    """Download one pdb from a symbol store and write it to disk.

    The file is requested at ``{url}/{filename}/{guid}/{filename}``.

    Args:
        url: base URL of the symbol store.
        filename: name of the pdb file to fetch.
        guid: identifier path component of that pdb on the store.
        output_filename: path the downloaded content is written to; missing
            parent directories are created, an existing file is overwritten.

    Returns:
        The HTTP status code of the request. The file is only written when
        the status is 200.
    """
    pdb_url = "{url:s}/{filename:s}/{guid:s}/{filename:s}".format(
        url=url,
        guid=guid,
        filename=filename,
    )
    logging.debug("[-] testing url : %s", pdb_url)

    response = requests.get(pdb_url, stream=True)
    try:
        if response.status_code != 200:
            logging.warning("[x] not found pdb at url : %s", pdb_url)
            return response.status_code

        logging.info("[+] found pdb at url : %s", pdb_url)

        # dirname is '' for a bare filename and os.makedirs('') would raise.
        output_dir = os.path.dirname(output_filename)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        with open(output_filename, 'wb') as f:
            for data in response.iter_content(32 * 1024):
                f.write(data)
        return response.status_code
    finally:
        # A streamed response keeps its connection checked out until it is
        # fully read or closed; always release it, including on non-200.
        response.close()
if __name__ == '__main__':
    # Keep the HTTP libraries quiet unless they report an actual problem.
    logging.getLogger('requests').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)

    # The help text must be passed as `description`: the first positional
    # parameter of ArgumentParser is `prog`, which would garble the usage line.
    arg_parser = argparse.ArgumentParser(
        description="download pdb files from the microsoft symbol store."
    )
    # All three options are needed to build the URL and the output path, so
    # let argparse reject a missing one instead of failing later on None.
    arg_parser.add_argument("--name", type=str, required=True, help="pdb filename")
    arg_parser.add_argument("--pdb", type=str, required=True, help="path to input pdb GUID file")
    arg_parser.add_argument("--dir", type=str, required=True, help="path to root directory")
    arg_parser.add_argument("-v", "--verbose", action="store_true", help="verbose output : activate debug logging.")
    args = arg_parser.parse_args()

    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.INFO)

    # The input file holds whitespace-separated identifiers, one per pdb.
    with open(args.pdb, "r") as pdb_guids_fd:
        pdb_guids = pdb_guids_fd.read()

    for pdb_guid in pdb_guids.split():
        # Each pdb lands in {dir}/{guid}/{name}; existing files are
        # downloaded again and overwritten.
        output_filename = os.path.join(args.dir, pdb_guid, args.name)
        try_download_pdb(
            MICROSOFT_SYMBOL_STORE,
            args.name,
            pdb_guid,
            output_filename,
        )
import os
import re
import sys
import argparse
import subprocess
import logging
import time
import struct
import multiprocessing
import shutil
import random
import requests
import yaml
import pefile
# Base URL of the symbol store queried for PE files.
MICROSOFT_SYMBOL_STORE = "https://msdl.microsoft.com/download/symbols"
# Granularity (4 KiB) used when rounding the image size recomputed from a pdb.
PAGE_SIZE = 4*1024
def LOWORD(dword):
    """Return the low-order 16 bits of ``dword``."""
    low_word_mask = 0xFFFF
    return dword & low_word_mask
def HIWORD(dword):
    """Return ``dword`` shifted right by 16 bits (higher bits are not masked off)."""
    word_bits = 16
    return dword >> word_bits
def get_product_version(pe):
    """Return the product version of a parsed PE as one 64-bit integer.

    Args:
        pe: object exposing ``VS_FIXEDFILEINFO`` (a ``pefile.PE`` instance in
            this script), either as a single structure or as a list of them.

    Returns:
        ``(ProductVersionMS << 32) + ProductVersionLS``.
    """
    # https://stackoverflow.com/a/16076661/1741450
    fixed_info = pe.VS_FIXEDFILEINFO
    # Current pefile releases expose VS_FIXEDFILEINFO as a list; older ones
    # exposed the structure directly. Accept both, using the first entry.
    if isinstance(fixed_info, (list, tuple)):
        fixed_info = fixed_info[0]

    ms = fixed_info.ProductVersionMS
    ls = fixed_info.ProductVersionLS
    return (ms << 32) + ls
def get_guid(dll):
    """Build the pdb identifier string stored in the first debug directory entry.

    Reads 16 bytes starting 4 bytes into the entry's raw data and renders them
    as 32 upper-case hex digits (one dword and two words as read, then four
    byte-swapped words), followed by the 16-bit value at offset 20 printed
    without padding. Presumably this is the CodeView GUID followed by the age.

    Returns:
        The identifier string, or None (after printing the error) when an
        AttributeError is raised while reading the entry.
    """
    # https://gist.github.com/steeve85/2665503
    try:
        rva = dll.DIRECTORY_ENTRY_DEBUG[0].struct.AddressOfRawData
        chunks = [
            '%0.*X' % (8, dll.get_dword_at_rva(rva + 4)),
            '%0.*X' % (4, dll.get_word_at_rva(rva + 8)),
            '%0.*X' % (4, dll.get_word_at_rva(rva + 10)),
        ]
        # The remaining eight bytes are emitted in storage order, so each
        # little-endian word has its two bytes swapped before formatting.
        for offset in (12, 14, 16, 18):
            word = dll.get_word_at_rva(rva + offset)
            swapped = struct.unpack('<H', struct.pack('>H', word))[0]
            chunks.append('%0.*X' % (4, swapped))
        # Trailing value: read as a 16-bit word, at least one hex digit.
        chunks.append('%0.*X' % (1, dll.get_word_at_rva(rva + 20)))
    except AttributeError as e:
        print (e)
        return None
    return ''.join(chunks).upper()
def extract_timestamp_from_pdb(pdb_file):
    """ Extract the PdbStream signature (which is the PE "timestamp") from a pdb

    Runs the ``llvm-pdbutil-x64.exe`` binary located next to this script with
    ``pdb2yaml -pdb-stream`` and reads ``PdbStream.Signature`` from its YAML
    output.

    Args:
        pdb_file: path to the pdb file to inspect.

    Returns:
        The value of the ``Signature`` field of the pdb stream.

    Raises:
        subprocess.CalledProcessError: when the tool exits with an error.
    """
    pdbutil_path = os.path.join(
        os.path.abspath(os.path.dirname(__file__)),
        "llvm-pdbutil-x64.exe"
    )

    # An argument list (rather than one formatted command string) stays
    # correct when either the tool path or the pdb path contains spaces or
    # quotes.
    output = subprocess.check_output(
        [pdbutil_path, "pdb2yaml", "-pdb-stream", pdb_file]
    )

    # safe_load is sufficient for the plain mapping expected from the tool,
    # and yaml.load() without an explicit Loader is rejected by PyYAML >= 6.
    pdb_stream = yaml.safe_load(output)
    return pdb_stream['PdbStream']['Signature']
def extract_image_size_from_pdb(pdb_file):
    """ Extract the PE sections headers from a pdb file and try to recompute the image size

    Runs the ``cvdump.exe`` binary located next to this script with
    ``-headers`` and derives the size from the section headers it prints.

    Args:
        pdb_file: path to the pdb file to inspect.

    Returns:
        The highest section end address (virtual address + virtual size)
        rounded up to a multiple of PAGE_SIZE; 0 when no section header is
        listed.

    Raises:
        subprocess.CalledProcessError: when cvdump exits with an error.
        ValueError: when a section header does not have the expected layout.
    """
    cvdump_path = os.path.join(
        os.path.abspath(os.path.dirname(__file__)),
        "cvdump.exe"
    )

    # Argument list instead of a formatted string: no quoting problems with
    # spaces in either path.
    output = subprocess.check_output([cvdump_path, "-headers", pdb_file])
    return _image_size_from_cvdump_headers(output.decode('ascii'))


def _image_size_from_cvdump_headers(output):
    """Return the page-aligned end of the highest section in a cvdump headers dump."""
    # The section headers sit between these two markers.
    idx_section_headers = output.find('*** SECTION HEADERS')
    if idx_section_headers == -1:
        return 0
    idx_orig_section_headers = output.find('*** ORIGINAL SECTION HEADERS')
    if idx_orig_section_headers == -1:
        # No end marker: take everything up to the end of the dump instead of
        # letting the -1 index silently drop the last character.
        idx_orig_section_headers = len(output)

    section_headers = output[idx_section_headers:idx_orig_section_headers]
    sections_headers = re.split(r"SECTION HEADER #", section_headers)[1:]

    # One field per line; accept both CRLF and LF line endings.
    header_regex = re.compile(r"\r?\n".join([
        r"(\d+)",
        r" *([\.\w]+) name",
        r" *([0-9A-F]+) virtual size",
        r" *([0-9A-F]+) virtual address",
        r" *([0-9A-F]+) size of raw data",
        r" *([0-9A-F]+) file pointer to raw data",
        r" *([0-9A-F]+) file pointer to relocation table",
        r" *([0-9A-F]+) file pointer to line numbers",
        r" *([0-9A-F]+) number of relocations",
        r" *([0-9A-F]+) number of line numbers",
        r" *([0-9A-F]+) flags",
    ]), re.MULTILINE)

    # Parse every section header and keep the highest end address.
    max_virtual_address = 0
    for hdr in sections_headers:
        header = header_regex.match(hdr)
        if header is None:
            raise ValueError(
                "unrecognized section header in cvdump output : %r" % hdr[:80]
            )
        vsize, vaddr = header.group(3, 4)
        vaddr_end_section = int(vaddr, 16) + int(vsize, 16)
        max_virtual_address = max(max_virtual_address, vaddr_end_section)

    # Round up to the next page boundary; an address that is already aligned
    # must stay unchanged rather than gain an extra page.
    return (max_virtual_address + PAGE_SIZE - 1) // PAGE_SIZE * PAGE_SIZE
def relocate_dll(filepath, filename):
    """Move a PE file into the folder named after the identifier it embeds.

    The identifier is read from the file itself with get_guid(); the file is
    then moved to {grandparent folder of filepath}/{identifier}/{filename},
    creating that folder when needed.

    Args:
        filepath: current location of the PE file.
        filename: name given to the file inside the destination folder.
    """
    pe_image = pefile.PE(filepath)
    embedded_guid = get_guid(pe_image)
    pe_image.close()

    store_root = os.path.dirname(os.path.dirname(filepath))
    destination_folder = os.path.join(store_root, embedded_guid)
    destination = os.path.join(destination_folder, filename)

    os.makedirs(destination_folder, exist_ok = True)
    shutil.move(filepath, destination)
def try_download_pe(url, filename, timestamp, image_size, output_filename):
    """Try to download a PE file from a symbol store for one timestamp/size pair.

    The file is requested at
    ``{url}/{filename}/{timestamp}{image_size}/{filename}`` where both numbers
    are rendered as upper-case hex without padding. The request is attempted
    up to five times when a connection error occurs.

    Args:
        url: base URL of the symbol store.
        filename: name of the PE file.
        timestamp: candidate timestamp (int).
        image_size: candidate image size (int).
        output_filename: path the downloaded content is written to.

    Returns:
        The HTTP status code of the request (the file is only written on
        200), or None when every attempt failed with a connection error.
    """
    pe_url = "{url:s}/{filename:s}/{timestamp:s}{image_size:s}/{filename:s}".format(
        url=url,
        filename=filename,
        timestamp="%X" % timestamp,
        image_size="%X" % image_size,
    )

    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(pe_url, stream=True)
            try:
                if response.status_code != 200:
                    return response.status_code

                with open(output_filename, 'wb') as f:
                    for data in response.iter_content(32 * 1024):
                        f.write(data)
                return response.status_code
            finally:
                # A streamed response holds its connection until it is read
                # to the end or closed; the non-200 path never reads it.
                response.close()
        except requests.exceptions.ConnectionError as ce:
            # Transient network failure: record it and retry.
            logging.debug(
                "[x] connection error (attempt %d/%d) for %s : %s",
                attempt, max_attempts, pe_url, ce,
            )

    logging.warning("[x] giving up on %s after %d connection errors", pe_url, max_attempts)
    return None
def try_download_pe_routine(params):
    """Unpack one 5-item work tuple and run the fuzzy-timestamp download on it.

    Args:
        params: ``(url, filename, timestamp, image_size, output_filename)``.

    Returns:
        None; the outcome of the download is not propagated.
    """
    store_url, pe_name, pdb_timestamp, size_of_image, destination = params
    try_download_pe_fuzzy_timestamp(
        store_url, pe_name, pdb_timestamp, size_of_image, destination
    )
def try_download_pe_fuzzy_timestamp(url, filename, timestamp, image_size, output_filename):
    """Probe a window of timestamps around ``timestamp`` until a PE downloads.

    Candidates run from ``timestamp - 10`` up to (excluding)
    ``timestamp + 5000``. The first candidate answered with HTTP 200 ends the
    search: the identifier embedded in the downloaded file is compared with
    the name of the folder containing ``output_filename``.

    Returns:
        True when a file was downloaded and its identifier matches the folder
        name. False when the identifiers differ (the file is then moved with
        relocate_dll) or when no candidate produced a download.
    """
    fuzz_range = 5000
    first_candidate = timestamp - 10
    end_candidate = timestamp + fuzz_range

    for candidate in range(first_candidate, end_candidate):
        if try_download_pe(url, filename, candidate, image_size, output_filename) != 200:
            continue

        # The folder holding the output file is named after the expected identifier.
        expected_guid = os.path.basename(os.path.dirname(output_filename))
        downloaded_pe = pefile.PE(output_filename)
        actual_guid = get_guid(downloaded_pe)
        downloaded_pe.close()

        if expected_guid == actual_guid:
            print("[!] Found matching fuzzy timestamp %s : %x" % (output_filename, candidate))
            return True

        print("[x] Found matching fuzzy timestamp (%x) but the GUID don't match (%s != %s)" % (candidate, expected_guid, actual_guid))
        relocate_dll(output_filename, filename)
        return False

    print("[x] Cound not found a matching timestamp in : [%x, %x] for %s" % (first_candidate, end_candidate, output_filename))
    return False
def bulk_download(root_pdb_folder, pe_filename = None):
    """Download the PE file matching every pdb found under a folder.

    Walks ``root_pdb_folder``, and for each ``.pdb`` file that has no PE file
    next to it, reads the timestamp and image size from the pdb and queues a
    fuzzy-timestamp download. The queue is shuffled and processed by a pool
    of 15 worker processes.

    Args:
        root_pdb_folder: folder scanned recursively for pdb files.
        pe_filename: name of the PE file to fetch for every pdb. When not
            given, each pdb uses its own base name with a ``.dll`` extension.
    """
    pdb_found = []

    # enumerate all the pdb files without an associated pe
    for root, _folders, files in os.walk(root_pdb_folder):
        for pdb_file in files:
            if not pdb_file.endswith(".pdb"):
                continue

            pdb_filepath = os.path.join(root, pdb_file)
            pdb_basename, _ = os.path.splitext(pdb_file)

            # Resolve the PE name per pdb. Assigning to `pe_filename` here
            # would make every later pdb reuse the first pdb's dll name.
            target_pe_filename = pe_filename or "%s.dll" % pdb_basename
            pe_filepath = os.path.join(
                os.path.dirname(pdb_filepath),
                target_pe_filename
            )
            if os.path.exists(pe_filepath):
                continue

            try:
                pdb_timestamp = extract_timestamp_from_pdb(pdb_filepath)
                pdb_size_of_image = extract_image_size_from_pdb(pdb_filepath)
            except subprocess.CalledProcessError as cpe:
                # Best effort: skip pdbs the external tools cannot read, but
                # leave a trace of it.
                logging.warning("[x] could not read pdb file : %s (%s)", pdb_filepath, cpe)
                continue

            logging.info("[-] Found pdb file : %s (%x %x)" % (pdb_filepath, pdb_timestamp, pdb_size_of_image))
            pdb_found.append((MICROSOFT_SYMBOL_STORE, target_pe_filename, pdb_timestamp, pdb_size_of_image, pe_filepath))

    # Nothing to fetch: do not spawn the worker processes at all.
    if not pdb_found:
        return

    random.shuffle(pdb_found)
    with multiprocessing.Pool(15) as p:
        p.map(try_download_pe_routine, pdb_found)
def export_summary_infos(root_pdb_folder):
    """Print a Markdown table comparing each dll under a folder with its pdb.

    The two header lines are always printed. One row follows for every
    ``.dll`` file that has a pdb with the same base name in the same folder;
    the row combines values read from the PE (version, timestamp, image size)
    with values recomputed from the pdb, plus the name of the containing
    folder as the pdb identifier.

    Args:
        root_pdb_folder: folder scanned recursively for dll files.
    """
    print("| Version | FileVersion | TimeDateStamp | Interpreted timestamp | SizeOfImage | PDBSignature | PDB reconstructed SizeOfImage | Timestamp difference | PDB GUID | ")
    print("| ------------- | ------------- | ------------- | ----------------------| ------------- | ------------- | ----------------------------- | -------------------- | ------------- | ")

    row_template = "|{Version:d}|{FileVersion:s}|0x{TimeDateStamp:x}|{Interpreted_Timestamp:s}|0x{SizeOfImage:x}|0x{PDBSignature:x}|0x{PDBSizeOfImage:x}|{TimeStampDifference:d}|{PDB_GUID:s}|"

    for root, folders, files in os.walk(root_pdb_folder):
        dll_names = [name for name in files if name.endswith(".dll")]
        for dll_name in dll_names:
            pe_filepath = os.path.join(root, dll_name)
            containing_folder = os.path.dirname(pe_filepath)
            stem = os.path.splitext(os.path.basename(pe_filepath))[0]
            pdb_filepath = os.path.join(containing_folder, "%s.pdb" % stem)

            # Only dlls that sit next to their pdb can be compared.
            if not os.path.exists(pdb_filepath):
                continue

            pdb_guid = os.path.basename(containing_folder)
            pdb_timestamp = extract_timestamp_from_pdb(pdb_filepath)
            pdb_size_of_image = extract_image_size_from_pdb(pdb_filepath)

            pe_obj = pefile.PE(pe_filepath)
            pe_size_of_image = pe_obj.OPTIONAL_HEADER.SizeOfImage
            pe_timestamp = pe_obj.FILE_HEADER.TimeDateStamp
            pe_version = get_product_version(pe_obj)
            pe_file_version = pe_obj.FileInfo[0].StringTable[0].entries[b'FileVersion'].decode('ascii')
            pe_obj.close()

            # Render the PE timestamp as a UTC date for readability.
            pe_timestamp_as_time = time.strftime(
                '%Y-%m-%d %H:%M:%S',
                time.gmtime(float(pe_timestamp))
            )

            row = row_template.format(
                Version = pe_version,
                FileVersion = pe_file_version,
                TimeDateStamp = pe_timestamp,
                Interpreted_Timestamp = pe_timestamp_as_time,
                SizeOfImage = pe_size_of_image,
                PDBSignature = pdb_timestamp,
                PDBSizeOfImage = pdb_size_of_image,
                TimeStampDifference = pe_timestamp - pdb_timestamp,
                PDB_GUID = pdb_guid,
                Filepath = pe_filepath
            )
            print(row)
def verify_downloads(root_pdb_folder):
    """Check that every downloaded dll belongs to the pdb stored next to it.

    For each ``.dll`` under ``root_pdb_folder`` that has a pdb with the same
    base name in the same folder, the identifier embedded in the dll is
    compared with the name of the containing folder. A status line is printed
    for every checked file, and a mismatching dll is moved to the folder
    named after its own identifier.

    Args:
        root_pdb_folder: folder scanned recursively for dll files.
    """
    for root, _folders, files in os.walk(root_pdb_folder):
        for pe_filename in files:
            if not pe_filename.endswith(".dll"):
                continue

            pe_filepath = os.path.join(root, pe_filename)
            pe_name, _ = os.path.splitext(pe_filename)
            pdb_filepath = os.path.join(
                os.path.dirname(pe_filepath),
                "%s.pdb" % pe_name
            )
            if not os.path.exists(pdb_filepath):
                continue

            pdb_guid = os.path.basename(os.path.dirname(pe_filepath))

            pe_obj = pefile.PE(pe_filepath)
            try:
                pe_pdb_GUID = get_guid(pe_obj)
            finally:
                # Release the file even if parsing fails, so it can be moved.
                pe_obj.close()

            guid_matches = pdb_guid == pe_pdb_GUID
            status = "+" if guid_matches else "x"
            print("[%s] %s (%s == %s)" % (status, pe_filepath, pdb_guid, pe_pdb_GUID))

            if guid_matches:
                continue
            if pe_pdb_GUID is None:
                # No identifier could be read, so there is no folder to move
                # the file to; the mismatch has been reported above.
                continue

            # Pass the full file name: the extension-less base name would
            # strip ".dll" from the relocated file.
            relocate_dll(pe_filepath, pe_filename)
if __name__ == '__main__':
    # Keep the HTTP libraries quiet unless they report an actual problem.
    logging.getLogger('requests').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)

    # The help text must be passed as `description`: the first positional
    # parameter of ArgumentParser is `prog`, which would garble the usage line.
    arg_parser = argparse.ArgumentParser(description="download pe from PDB file.")
    # Every command walks this folder, so reject a missing value up front.
    arg_parser.add_argument("--pdb", type=str, required=True, help="path to root pdb folder")
    arg_parser.add_argument("-v", "--verbose", action="store_true", help="verbose output : activate debug logging.")

    subparsers = arg_parser.add_subparsers(dest='command')
    download_parser = subparsers.add_parser('DOWNLOAD')
    # NOTE(review): --url is accepted but not used yet; bulk_download always
    # queries MICROSOFT_SYMBOL_STORE.
    download_parser.add_argument("--url", type=str, help="url to symbol store")
    subparsers.add_parser('EXPORT')
    subparsers.add_parser('VERIFY')

    args = arg_parser.parse_args()

    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.INFO)

    if args.command == 'DOWNLOAD':
        # The PE name is currently fixed to ntdll.dll for every pdb found.
        bulk_download(args.pdb, pe_filename="ntdll.dll")
    elif args.command == 'EXPORT':
        export_summary_infos(args.pdb)
    elif args.command == 'VERIFY':
        verify_downloads(args.pdb)
    else:
        # Reached when no sub-command is given: print the usage and exit
        # with an error status instead of a raw traceback.
        arg_parser.error("unsupported command : %s" % args.command)

    sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment