Skip to content

Instantly share code, notes, and snippets.

@SeanPesce
Created June 10, 2022 00:04
Show Gist options
  • Save SeanPesce/b9f3b9b6b87c57bc3d1d529fe0476d68 to your computer and use it in GitHub Desktop.
Save SeanPesce/b9f3b9b6b87c57bc3d1d529fe0476d68 to your computer and use it in GitHub Desktop.
Interactive pseudo-shell for executing shell commands on a remote MSSQL server via xp_cmdshell
#!/usr/bin/env python3
# Author: Sean Pesce
# This script acts as a pseudo-shell by executing shell commands on a remote MSSQL server instance
# using sqsh and xp_cmdshell.
import argparse
import os
import platform
import re
import shlex
import subprocess
import sys
from cmd import Cmd
# Project page for sqsh; printed as a download hint when sqsh cannot be run
SQSH_URL = 'https://github.com/vonloxley/sqsh'
# Message reported whenever sqsh output indicates that the server connection failed
CONNECTION_ERROR_MSG = 'Failed to connect to the MSSQL server.'
def sqsh_installed():
    """
    Returns True if sqsh is installed.
    https://github.com/vonloxley/sqsh

    The check runs "sqsh --help" directly (no intermediate shell) with its output
    discarded, and reports success only when the process exits with status 0.
    """
    try:
        result = subprocess.run(
            ['sqsh', '--help'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except OSError:
        # The executable is missing from PATH or cannot be started
        return False
    return result.returncode == 0
def output_has_connection_error(output):
    """
    Checks raw sqsh output (bytes) for a connection failure report.

    The match is case-insensitive: True is returned when the text
    "connection failed" appears anywhere in the output, False otherwise.
    """
    marker = b'connection failed'
    normalized = output.lower()
    return marker in normalized
class XpCmdShell(Cmd):
    """
    Interactive pseudo-shell that runs commands on a remote MSSQL server.

    Input lines that are not one of the built-in commands (exit, quit, sql, help)
    are passed to sqsh and executed on the server with xp_cmdshell.
    """

    # Fallback prompt/intro; every instance replaces them with its own values in __init__
    prompt = 'sql$ '
    intro = 'Connected to remote server.\n'
    # Class variables for cleaning up output
    output_delimiter = 'zZzZzZSeanPZzZzZz'
    output_whitespace_threshold = 100

    def __init__(self, host, port, user, password, encoding='utf8'):
        """
        Stores the connection settings. No connection is attempted here.

        host:      MSSQL server host name or IP address
        port:      MSSQL server port (anything accepted by int())
        user:      MSSQL login name
        password:  MSSQL login password
        encoding:  Encoding used to decode sqsh output
        """
        # Zero-argument super() stays correct when this class is subclassed
        super().__init__()
        self.host = host
        self.port = int(port)
        self.user = user
        self.password = password
        self.encoding = encoding
        # Instance attributes, so several shells can coexist without sharing a prompt
        self.prompt = f'{self.user}@{self.host} $ '
        self.intro = f'Connected to {self.host}:{self.port}\n'

    def sqsh_command_list(self, command):
        """
        Returns the sqsh argument list (suitable for subprocess without a shell) that
        executes the given raw SQL.
        """
        return [
            'sqsh',
            '-P', self.password,
            '-S', f'{self.host}:{int(self.port)}',
            '-U', self.user,
            '-C', command,
        ]

    def sqsh_command_str(self, command):
        """
        Returns a string containing a shell command to execute raw SQL via sqsh.

        Every argument is quoted for a POSIX shell, so the string splits back into
        exactly the arguments produced by sqsh_command_list().
        """
        return ' '.join(shlex.quote(arg) for arg in self.sqsh_command_list(command))

    @staticmethod
    def _xp_cmdshell_sql(command):
        """
        Returns the SQL statement that runs the given shell command via xp_cmdshell.
        """
        # A single quote inside a T-SQL string literal is written as two single quotes
        escaped = command.replace("'", "''")
        return f"xp_cmdshell '{escaped}'"

    def sqsh_xp_cmdshell_str(self, command):
        """
        Returns a string containing a shell command to execute shell commands on the
        remote MSSQL host via xp_cmdshell
        """
        return self.sqsh_command_str(self._xp_cmdshell_sql(command))

    def sqsh_xp_cmdshell_list(self, command):
        """
        Returns the sqsh argument list to execute shell commands on the remote MSSQL
        host via xp_cmdshell
        """
        return self.sqsh_command_list(self._xp_cmdshell_sql(command))

    def _run_sql(self, sql):
        """
        Runs one SQL statement through sqsh and returns the combined stdout/stderr bytes.

        Raises AssertionError if the output reports a connection failure, and
        subprocess.CalledProcessError if sqsh exits with a non-zero status.
        """
        proc = subprocess.run(
            self.sqsh_command_list(sql),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        # Explicit raise (rather than an assert statement) so the check survives "python -O"
        if output_has_connection_error(proc.stdout):
            raise AssertionError(CONNECTION_ERROR_MSG)
        proc.check_returncode()
        return proc.stdout

    def _report_error(self, err):
        """
        Prints a failure raised while running a command, without a traceback.
        """
        if isinstance(err, subprocess.CalledProcessError):
            # str(err) would include the full sqsh command line, password included
            print(f'[ERROR] sqsh exited with status {err.returncode}')
            if err.output:
                print(err.output.decode(self.encoding, errors='replace'))
        else:
            print(f'[ERROR] {err}')

    def get_xp_cmdshell_output_using_delims(self, command):
        """
        Runs a shell command on the server and returns only that command's output.

        The command is wrapped between two "echo <delimiter>" calls; the text between
        the delimiters is stripped, and very long whitespace runs are collapsed into a
        single newline.

        Raises AssertionError if the connection failed or the delimiter is missing
        from the output, and subprocess.CalledProcessError if sqsh exits non-zero.
        """
        delim = self.output_delimiter
        raw = self._run_sql(self._xp_cmdshell_sql(f'echo {delim} & {command} & echo {delim}'))
        # Undecodable bytes are replaced rather than aborting the command
        text = raw.decode(self.encoding, errors='replace')
        # Isolate output using hard-coded delimiter
        _, opening, remainder = text.partition(delim)
        if not opening:
            raise AssertionError(f'Failed to obtain output for command: "{command}"')
        # partition() returns the whole remainder when the closing delimiter is absent
        output = remainder.partition(delim)[0]
        # Remove unnecessary whitespace
        output = output.strip()
        threshold = self.output_whitespace_threshold
        return re.sub(r'[ \n\r\t]{' + str(threshold) + r',}', '\n', output)

    def check_connection(self):
        """
        Returns True if connection to the MSSQL server succeeded, i.e. sqsh exited
        with status 0 and its output reports no connection failure.
        """
        print(f'Checking connection to server {self.host}:{self.port}')
        proc = subprocess.run(
            self.sqsh_command_list("select ''"),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        return proc.returncode == 0 and not output_has_connection_error(proc.stdout)

    def enable_xp_cmdshell(self):
        """
        Reconfigures the server so that xp_cmdshell can be used.

        Sets 'show advanced options' and then 'xp_cmdshell' to 1 via SP_CONFIGURE,
        running RECONFIGURE after each change.

        Raises AssertionError if the connection failed or sqsh does not report that
        an option changed, and subprocess.CalledProcessError if sqsh exits non-zero.
        """
        print('Enabling xp_cmdshell')
        steps = (
            (
                "EXEC SP_CONFIGURE 'show advanced options', 1",
                b"'show advanced options' changed from",
                'Failed to enable advanced options. Try running this script again with "--no-reconfig"',
            ),
            (
                "EXEC SP_CONFIGURE 'xp_cmdshell', 1",
                b"'xp_cmdshell' changed from",
                'Failed to enable xp_cmdshell. Try running this script again with "--no-reconfig"',
            ),
        )
        for statement, confirmation, failure_msg in steps:
            output = self._run_sql(statement)
            if confirmation not in output:
                raise AssertionError(failure_msg)
            self._run_sql('RECONFIGURE')

    def emptyline(self):
        """
        Prints a blank line for empty input. Without this override, Cmd would re-run
        the previous command on the server.
        """
        print()

    def do_EOF(self, line):
        """Exit this shell on end-of-input."""
        # Otherwise "EOF" would be forwarded to the server as a shell command
        print()
        return self.do_exit(line)

    def do_exit(self, line):
        print("Exiting")
        return True

    def do_quit(self, line):
        return self.do_exit(line)

    def help_exit(self):
        print('Exit this shell.')

    def help_quit(self):
        self.help_exit()

    def do_sql(self, line):
        # Failures are reported instead of raised so the session stays usable
        try:
            output = self._run_sql(line)
        except (AssertionError, subprocess.CalledProcessError) as err:
            self._report_error(err)
            return
        print(output.decode(self.encoding, errors='replace'))

    def help_sql(self):
        print('Usage:\n\n\tsql <SQL STATEMENT>\n\nExecute a raw SQL statement on the server.')

    def default(self, line):
        """
        Default behavior: Run shell commands on the remote server via xp_cmdshell
        """
        if not line:
            print()
            return
        # Cleaner-looking output that has a (low) potential to be malformed due to the sanitization process
        try:
            output = self.get_xp_cmdshell_output_using_delims(line)
        except (AssertionError, subprocess.CalledProcessError) as err:
            self._report_error(err)
            return
        print(output)
if __name__ == '__main__':
    # Command-line interface
    cli = argparse.ArgumentParser(
        description='Wrapper shell for executing commands via xp_cmdshell on a Microsoft SQL (MSSQL) server.',
    )
    cli.add_argument('-H', '--host', required=True, help='MSSQL server host/IP address')
    cli.add_argument('-p', '--port', type=int, default=1433, help='MSSQL server port')
    cli.add_argument('-u', '--username', required=True, help='MSSQL server username')
    cli.add_argument('-P', '--password', required=True, help='MSSQL server password')
    cli.add_argument(
        '-n', '--no-reconfig',
        action='store_true',
        default=False,
        help='Disables automatic server reconfiguration to enable xp_cmdshell. Useful when the user has low privileges, but xp_cmdshell is already enabled.',
    )
    args = cli.parse_args()

    # sqsh does all of the talking to the server, so nothing works without it
    if not sqsh_installed():
        print('[ERROR] sqsh is not installed. You might be able to install it with "sudo apt-get install -y sqsh"; otherwise, get it here:')
        print(f'\t{SQSH_URL}')
        sys.exit(1)

    shell = XpCmdShell(args.host, args.port, args.username, args.password)
    if not shell.check_connection():
        print(f'[ERROR] {CONNECTION_ERROR_MSG}')
        sys.exit(1)

    # Reconfigure the server unless the user opted out with --no-reconfig
    if not args.no_reconfig:
        shell.enable_xp_cmdshell()

    shell.cmdloop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment