Skip to content

Instantly share code, notes, and snippets.

@ThePirateWhoSmellsOfSunflowers
Last active December 11, 2024 02:23
Show Gist options
  • Save ThePirateWhoSmellsOfSunflowers/3673746454aef7d55a5efed4dc4e1a61 to your computer and use it in GitHub Desktop.
Save ThePirateWhoSmellsOfSunflowers/3673746454aef7d55a5efed4dc4e1a61 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# Impacket - Collection of Python classes for working with network protocols.
#
# Copyright Fortra, LLC and its affiliated companies
#
# All rights reserved.
#
# This software is provided under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# Description:
# Task Scheduler (TSCH) example: creates, runs and deletes a scheduled task.
# This example executes a command on the target machine through the Task Scheduler
# service, reached directly over an ncacn_ip_tcp RPC endpoint. This variant does
# not retrieve the output of the command.
#
# Author:
# Alberto Solino (@agsolino)
#
# Reference for:
# DCE/RPC for TSCH
#
from __future__ import division
from __future__ import print_function
import string
import sys
import argparse
import time
import random
import logging
import struct
from impacket.examples import logger
from impacket import version
from impacket.dcerpc.v5 import tsch, transport, rpcrt
from impacket.dcerpc.v5.dtypes import NULL
from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_GSS_NEGOTIATE, \
RPC_C_AUTHN_LEVEL_PKT_PRIVACY, RPC_C_AUTHN_LEVEL_PKT_INTEGRITY
from impacket.examples.utils import parse_target
from impacket.krb5.keytab import Keytab
from six import PY2
# Codec for text coming back from the target: starts as stdout's encoding and
# may be replaced in the __main__ section (-codec option, or 'utf-8' when
# stdout reports no encoding).
CODEC = sys.stdout.encoding
class TSCH_EXEC:
    """Execute a command on a remote host through the Task Scheduler RPC interface.

    The TSCH interface is reached directly over ``ncacn_ip_tcp``. When the
    configured port is a ``LOW-HIGH`` range, every port in the range is probed
    until one accepts a bind to the TSCH interface. A temporary, randomly named
    task is then registered, started, polled until it has run, and deleted.
    The output of the command is not retrieved.
    """

    # Task definition registered on the target. The two %s placeholders receive
    # the (XML-escaped) command and its arguments; the template must not
    # contain any other '%' character.
    _TASK_XML_TEMPLATE = """<?xml version="1.0" encoding="UTF-16"?>
<Task version="1.2" xmlns="http://schemas.microsoft.com/windows/2004/02/mit/task">
  <Triggers>
    <CalendarTrigger>
      <StartBoundary>2015-07-15T20:35:13.2757294</StartBoundary>
      <Enabled>true</Enabled>
      <ScheduleByDay>
        <DaysInterval>1</DaysInterval>
      </ScheduleByDay>
    </CalendarTrigger>
  </Triggers>
  <Principals>
    <Principal id="LocalSystem">
      <UserId>S-1-5-18</UserId>
      <RunLevel>HighestAvailable</RunLevel>
    </Principal>
  </Principals>
  <Settings>
    <MultipleInstancesPolicy>IgnoreNew</MultipleInstancesPolicy>
    <DisallowStartIfOnBatteries>false</DisallowStartIfOnBatteries>
    <StopIfGoingOnBatteries>false</StopIfGoingOnBatteries>
    <AllowHardTerminate>true</AllowHardTerminate>
    <RunOnlyIfNetworkAvailable>false</RunOnlyIfNetworkAvailable>
    <IdleSettings>
      <StopOnIdleEnd>true</StopOnIdleEnd>
      <RestartOnIdle>false</RestartOnIdle>
    </IdleSettings>
    <AllowStartOnDemand>true</AllowStartOnDemand>
    <Enabled>true</Enabled>
    <Hidden>true</Hidden>
    <RunOnlyIfIdle>false</RunOnlyIfIdle>
    <WakeToRun>false</WakeToRun>
    <ExecutionTimeLimit>P3D</ExecutionTimeLimit>
    <Priority>7</Priority>
  </Settings>
  <Actions Context="LocalSystem">
    <Exec>
      <Command>%s</Command>
      <Arguments>%s</Arguments>
    </Exec>
  </Actions>
</Task>
"""

    def __init__(self, username='', password='', domain='', hashes=None, aesKey=None, doKerberos=False, kdcHost=None,
                 command=None, sessionId=None, silentCommand=False, port=''):
        """Store the credentials and execution options.

        :param hashes: optional ``LMHASH:NTHASH`` string; split on ``:``.
        :param command: full command line to execute on the target.
        :param sessionId: optional logon session id to run the task in.
        :param silentCommand: when true the command is run as given instead of
            being wrapped in ``cmd.exe /C ... > file``.
        :param port: a single port, or a ``LOW-HIGH`` range to probe.
        """
        self.__username = username
        self.__password = password
        self.__domain = domain
        self.__lmhash = ''
        self.__nthash = ''
        self.__aesKey = aesKey
        self.__doKerberos = doKerberos
        self.__kdcHost = kdcHost
        self.__command = command
        self.__silentCommand = silentCommand
        self.sessionId = sessionId
        self.__port = port
        if hashes is not None:
            self.__lmhash, self.__nthash = hashes.split(':')

    @staticmethod
    def _xml_escape(data):
        """Return *data* with the five XML special characters replaced by entities."""
        replace_table = {
            "&": "&amp;",
            '"': "&quot;",
            "'": "&apos;",
            ">": "&gt;",
            "<": "&lt;",
        }
        return ''.join(replace_table.get(c, c) for c in data)

    @staticmethod
    def _cmd_split(cmdline):
        """Split *cmdline* at the first space into ``[command, arguments]``."""
        cmd, _, args = cmdline.partition(" ")
        return [cmd, args]

    @staticmethod
    def _port_candidates(port_spec):
        """Return the ports described by a ``LOW-HIGH`` spec, both ends included.

        :raises ValueError: if the spec is not two integers separated by ``-``.
        """
        low, high = port_spec.split('-', 1)
        # +1 so that the upper bound given by the user is probed as well.
        return range(int(low), int(high) + 1)

    def bf_rpc_port(self, stringbinding):
        """Probe the configured port range for the TSCH RPC endpoint.

        :param stringbinding: binding template with a ``{}`` placeholder for
            the port.
        :return: the first port that accepts a bind to the TSCH interface.

        Logs an error and exits the process with status 1 when no port in the
        range answers.
        """
        tsch_uuid = tsch.MSRPC_UUID_TSCHS
        for port in self._port_candidates(self.__port):
            binding_string_tsch = stringbinding.format(port)
            logging.debug('candidate: ' + binding_string_tsch)
            rpctransport = transport.DCERPCTransportFactory(binding_string_tsch)
            dce = rpctransport.get_dce_rpc()
            connected = False
            try:
                dce.connect()
                connected = True
                dce.bind(tsch_uuid)
            except (rpcrt.DCERPCException, struct.error) as e:
                # Expected for every port that is closed or hosts another
                # interface; keep scanning.
                logging.debug('rejected %s: %s' % (binding_string_tsch, e))
                continue
            finally:
                if connected:
                    # Release the probe connection whether or not the bind
                    # succeeded; a failure to close must not stop the scan.
                    try:
                        dce.disconnect()
                    except rpcrt.DCERPCException:
                        pass
            logging.debug('endpoint found! ' + binding_string_tsch)
            return port
        logging.error('TSCH RPC endpoint not found!')
        sys.exit(1)

    def play(self, addr):
        """Resolve the endpoint on *addr*, authenticate and run the command.

        Any exception raised while talking to the scheduler is logged (with a
        traceback when the root logger is at DEBUG level) and not re-raised.
        """
        stringbinding = 'ncacn_ip_tcp:' + addr + '[{}]'
        if '-' in self.__port:
            port = self.bf_rpc_port(stringbinding)
        else:
            port = self.__port
        rpctransport = transport.DCERPCTransportFactory(stringbinding.format(port))

        if hasattr(rpctransport, 'set_credentials'):
            # This method exists only for selected protocol sequences.
            rpctransport.set_credentials(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash,
                                         self.__aesKey)
            rpctransport.set_kerberos(self.__doKerberos, self.__kdcHost)
        try:
            self.doStuff(rpctransport)
        except Exception as e:
            if logging.getLogger().level == logging.DEBUG:
                import traceback
                traceback.print_exc()
            logging.error(e)
            if str(e).find('STATUS_OBJECT_NAME_NOT_FOUND') >= 0:
                logging.info('When STATUS_OBJECT_NAME_NOT_FOUND is received, try running again. It might work')

    def doStuff(self, rpctransport):
        """Register, run and delete a temporary task carrying the command.

        :param rpctransport: transport already configured with credentials.

        ``DCERPCSessionError`` raised by the scheduler calls is logged and its
        packet dumped; other exceptions propagate to the caller. The task is
        deleted and the connection closed in every case.
        """
        dce = rpctransport.get_dce_rpc()
        dce.set_credentials(*rpctransport.get_credentials())
        if self.__doKerberos is True:
            dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE)
        dce.connect()
        try:
            dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY)
            dce.bind(tsch.MSRPC_UUID_TSCHS)

            tmpName = ''.join([random.choice(string.ascii_letters) for _ in range(8)])
            tmpFileName = tmpName + '.tmp'
            taskPath = '\\%s' % tmpName

            if self.__silentCommand:
                # Run the command exactly as given: first token is the
                # executable, the remaining tokens are its arguments.
                parts = self.__command.split()
                cmd, args = parts[0], " ".join(parts[1:])
            elif self.sessionId is not None:
                cmd, args = self._cmd_split(self.__command)
            else:
                cmd = "cmd.exe"
                args = "/C %s > %%windir%%\\Temp\\%s 2>&1" % (self.__command, tmpFileName)

            # Always escape: a raw '&' or '<' would make the task definition
            # invalid XML.
            xml = self._TASK_XML_TEMPLATE % (self._xml_escape(cmd), self._xml_escape(args))

            taskCreated = False
            try:
                logging.info('Creating task \\%s' % tmpName)
                tsch.hSchRpcRegisterTask(dce, taskPath, xml, tsch.TASK_CREATE, NULL, tsch.TASK_LOGON_NONE)
                taskCreated = True

                logging.info('Running task \\%s' % tmpName)
                done = False

                if self.sessionId is None:
                    tsch.hSchRpcRun(dce, taskPath)
                else:
                    try:
                        tsch.hSchRpcRun(dce, taskPath, flags=tsch.TASK_RUN_USE_SESSION_ID,
                                        sessionId=self.sessionId)
                    except Exception as e:
                        if str(e).find('ERROR_FILE_NOT_FOUND') >= 0 or str(e).find('E_INVALIDARG') >= 0:
                            logging.info('The specified session doesn\'t exist!')
                            # Nothing was started, so there is nothing to wait for.
                            done = True
                        else:
                            raise

                # A non-zero year in the last-run timestamp means the task has run.
                while not done:
                    logging.debug('Calling SchRpcGetLastRunInfo for \\%s' % tmpName)
                    resp = tsch.hSchRpcGetLastRunInfo(dce, taskPath)
                    if resp['pLastRuntime']['wYear'] != 0:
                        done = True
                    else:
                        time.sleep(2)

                logging.info('Deleting task \\%s' % tmpName)
                tsch.hSchRpcDelete(dce, taskPath)
                taskCreated = False
            except tsch.DCERPCSessionError as e:
                logging.error(e)
                e.get_packet().dump()
            finally:
                # Do not leave the task behind if anything failed after
                # registration.
                if taskCreated is True:
                    tsch.hSchRpcDelete(dce, taskPath)
        finally:
            dce.disconnect()
# Process command-line arguments.
if __name__ == '__main__':
    print(version.BANNER)

    parser = argparse.ArgumentParser()

    parser.add_argument('target', action='store', help='[[domain/]username[:password]@]<targetName or address>')
    # The single-space default is the sentinel checked below for "no command given".
    parser.add_argument('command', action='store', nargs='*', default=' ', help='command to execute at the target ')
    parser.add_argument('-session-id', action='store', type=int, help='an existed logon session to use (no output, no cmd.exe)')
    parser.add_argument('-ts', action='store_true', help='adds timestamp to every logging output')
    parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')
    parser.add_argument('-codec', action='store', help='Sets encoding used (codec) from the target\'s output (default '
                                                       '"%s"). If errors are detected, run chcp.com at the target, '
                                                       'map the result with '
                                                       'https://docs.python.org/3/library/codecs.html#standard-encodings and then execute atexec.py '
                                                       'again with -codec and the corresponding codec ' % CODEC)

    group = parser.add_argument_group('authentication')

    group.add_argument('-hashes', action="store", metavar = "LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH')
    group.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful for -k)')
    group.add_argument('-k', action="store_true", help='Use Kerberos authentication. Grabs credentials from ccache file '
                       '(KRB5CCNAME) based on target parameters. If valid credentials cannot be found, it will use the '
                       'ones specified in the command line')
    group.add_argument('-aesKey', action="store", metavar = "hex key", help='AES key to use for Kerberos Authentication '
                                                                            '(128 or 256 bits)')
    group.add_argument('-dc-ip', action='store',metavar = "ip address", help='IP Address of the domain controller. '
                                         'If omitted it will use the domain part (FQDN) specified in the target parameter')
    group.add_argument('-keytab', action="store", help='Read keys for SPN from keytab file')
    group.add_argument('-port', action="store", metavar = "Y-X", default='49650-49680',
                       help='TCP port of the Task Scheduler RPC endpoint, or a Y-X port range to probe for it '
                            '(default: 49650-49680)')

    if len(sys.argv)==1:
        parser.print_help()
        sys.exit(1)

    options = parser.parse_args()

    # Init the example's logger theme
    logger.init(options.ts)

    # An explicit -codec wins; otherwise fall back to utf-8 only when stdout
    # did not report an encoding.
    if options.codec is not None:
        CODEC = options.codec
    else:
        if CODEC is None:
            CODEC = 'utf-8'

    logging.warning("This will work ONLY on Windows >= Vista")

    if ''.join(options.command) == ' ':
        logging.error('You need to specify a command to execute!')
        sys.exit(1)

    if options.debug is True:
        logging.getLogger().setLevel(logging.DEBUG)
        # Print the Library's installation path
        logging.debug(version.getInstallationPath())
    else:
        logging.getLogger().setLevel(logging.INFO)

    domain, username, password, address = parse_target(options.target)

    if domain is None:
        domain = ''

    if options.keytab is not None:
        Keytab.loadKeysFromKeytab (options.keytab, username, domain, options)
        options.k = True

    # Prompt only when no other credential material was supplied.
    if password == '' and username != '' and options.hashes is None and options.no_pass is False and options.aesKey is None:
        from getpass import getpass

        password = getpass("Password:")

    # An AES key can only be used with Kerberos.
    if options.aesKey is not None:
        options.k = True

    # silentCommand is always True here: the command is run as given.
    atsvc_exec = TSCH_EXEC(username, password, domain, options.hashes, options.aesKey, options.k, options.dc_ip,
                           ' '.join(options.command), options.session_id, True, options.port)
    atsvc_exec.play(address)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment