Last active
May 22, 2023 01:37
-
-
Save JochiRaider/5adff21410ab408f37ec533447c25a90 to your computer and use it in GitHub Desktop.
Netcat like functions in python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import argparse | |
import socket | |
import threading | |
import subprocess | |
class NetCarpy: | |
def __init__(self, host:str, port:int, execute:str, command: bool, upload_dest:str) -> None: | |
self.host = host | |
self.port = port | |
self.execute = execute | |
self.command = command | |
self.upload_dest = upload_dest | |
def client_sender(self, buffer:str=''): | |
try: | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client: | |
client.connect((self.host,self.port)) | |
client.send(buffer.encode()) | |
print(f'{self.host}:{self.port} <- {buffer}') | |
while True: | |
recv_len = 1 | |
response = b'' | |
while recv_len: | |
data = client.recv(4096) | |
recv_len = len(data) | |
response += data | |
if recv_len < 4096: | |
break | |
print(f'{self.host}:{self.port} -> {response.decode().strip()}') | |
buffer = input(f'{self.host}:{self.port} <- ') | |
buffer += '\n' | |
client.send(buffer.encode()) | |
except: | |
print(f'connection to {self.host}:{self.port} failed. Closing') | |
sys.exit() | |
def run_command(self, command: str): | |
command = command.rstrip() | |
try: | |
output = subprocess.check_output(command, stderr = subprocess.STDOUT, shell = True) | |
except: | |
output = f'Failed to execute {command} command on {self.host}:{self.port}.\r\n'.encode() | |
return output | |
def server_loop(self): | |
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
server.bind((self.host,self.port)) | |
server.listen() | |
client_socket, addr = server.accept() | |
# print(f'Incoming -> {addr}') | |
client_thread = threading.Thread(target=self.client_handler,args=(client_socket, )) | |
client_thread.start() | |
def client_handler(self, socket_obj): | |
if len(self.upload_dest): | |
file_buffer = b'' | |
recv_len = 1 | |
while recv_len: | |
data = socket_obj.recv(4096) | |
recv_len = len(data) | |
file_buffer += data | |
if recv_len < 4096: | |
break | |
try: | |
with open(self.upload_dest,'wb') as f: | |
f.write(file_buffer) | |
f.close() | |
print(f'File saved to {self.upload_dest}\r\n') | |
except: | |
print(f'Failed to save file to {self.upload_dest}') | |
elif len(self.execute): | |
output = self.run_command(self.execute) | |
socket_obj.send(output) | |
elif self.command: | |
while True: | |
socket_obj.send(b'<NCPy:#> ') | |
cmd_buffer = b'' | |
while b'\n' not in cmd_buffer: | |
cmd_buffer += socket_obj.recv(1024) | |
if cmd_buffer.strip() == b'EXIT': | |
print('\n') | |
sys.exit() | |
response = self.run_command(cmd_buffer) | |
socket_obj.send(response) | |
else: | |
sys.exit() | |
def cli_interface(): | |
parser = argparse.ArgumentParser(description='NetCarPy CLI') | |
parser.add_argument('-t', required=False, metavar='IP', default='0', type = str, help='IP -t xxx.xxx.xxx.xxx') | |
parser.add_argument('-p', required=True, metavar='Port',type = int, help='Port as an integer(commonly between 1024 and 65535)') | |
parser.add_argument('-u', required=False, metavar='Upload path', default='', type = str, help='Listener will take data and write it to a file at the path given, on connection ') | |
parser.add_argument('-e', required=False, metavar='Execute path', default='', type = str, help='Run a file at a given path on the listener system, output is returned to the caller') | |
parser.add_argument('-l', required=False, action='store_true',help='listen on [host]:[port] for incoming connections') | |
parser.add_argument('-c', required=False, action='store_true',help='initialize a command shell on the listener, on connection') | |
pars_arg = parser.parse_args() | |
ncpy = NetCarpy(host=pars_arg.t, port=pars_arg.p, execute=pars_arg.e, command=pars_arg.c, upload_dest=pars_arg.u) | |
if not pars_arg.l and pars_arg.t and pars_arg.p > 0: | |
print('press CTRL-D to finish and send') | |
cli = sys.stdin.read() | |
ncpy.client_sender(buffer=cli) | |
elif pars_arg.l: | |
ncpy.server_loop() | |
def main(): | |
cli_interface() | |
if __name__ == '__main__': | |
main() | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import hashlib | |
import argparse | |
import re | |
class PasswordCheck: | |
def __init__(self, test_string: str, ascii_lc = True, ascii_uc = True, num = True, | |
spec_char = True, spec_char_list = ['@','$','!','%','*','#','?','&'], | |
length = 8, ) -> None: | |
'''Initialize the class values, defaults set above''' | |
self.ascii_lc = ascii_lc | |
self.ascii_uc = ascii_uc | |
self.num = num | |
self.spec_char = spec_char | |
self.spec_char_list = spec_char_list | |
self.length = length | |
self.test_string = test_string | |
def password_character_checker(self, ) -> list: | |
'''Initialize zero'd counters''' | |
count_lc = 0 | |
count_uc = 0 | |
count_num = 0 | |
count_spc = 0 | |
'''Preform tests for each character in the password''' | |
for check_char in self.test_string: | |
if check_char.islower(): | |
count_lc += 1 | |
elif check_char.isupper(): | |
count_uc += 1 | |
elif check_char.isdigit(): | |
count_num += 1 | |
elif check_char in self.spec_char_list: | |
count_spc += 1 | |
'''Build a list of the test results''' | |
check_return = [[self.ascii_lc, count_lc, 'lower case letters'],[self.ascii_uc, count_uc, 'upper case letters'],[self.num, count_num, '0-9 numbers'],[self.spec_char,count_spc, 'special characters']] | |
'''Initialize an empty list to hold the results''' | |
result = [] | |
'''Test password length''' | |
if len(self.test_string) >= self.length: | |
result.append([True, len(self.test_string), f'The password passes the length requirement of {self.length}.']) | |
else: | |
result.append([False, len(self.test_string), f'The password fails the length requirement of {self.length}.']) | |
'''Build the result list based on tests above [[bool,int,str],...]''' | |
for check in check_return: | |
if check[0] and check[1] < 1: | |
result.append([False, check[1], f'The password fails for not having any {check[2]}']) | |
elif not check[0] and check[1] >= 1: | |
result.append([False, check[1], f'{check[2]} are not allowed in passwords under the rules given.']) | |
elif check[0] and check[1] >= 1: | |
result.append([True, check[1], f'The password passed with 1 or more {check[2]}']) | |
else: | |
result.append([True, check[1], f'{check[2]} not found in the password, in compliance with the rules.']) | |
return result | |
def password_regex_checker(self, ) -> bool: | |
'''Build regex pattern based on rules from the class''' | |
pattern = '^' | |
if self.ascii_lc: | |
pattern += '(?=.*[a-z])' | |
if self.ascii_uc: | |
pattern += '(?=.*[A-Z])' | |
if self.num: | |
pattern += '(?=.*\d)' | |
if self.spec_char: | |
spec_char = ''.join(self.spec_char_list) | |
pattern += '(?=.*[' + spec_char + '])' | |
if self.length and self.spec_char: | |
pattern += '[A-Za-z\d' + spec_char + ']{' + str(self.length) + ',100}$' | |
elif self.length: | |
pattern += '[A-Za-z\d]{' + str(self.length) + ',100}$' | |
else: | |
pattern += '$' | |
'''Compile pattern for use in the search''' | |
pat_re = re.compile(pattern) | |
'''Test password against regex pattern''' | |
if re.search(pat_re,self.test_string): | |
return True | |
else: | |
return False | |
def password_rockyou_checker(self, ) -> bool: | |
'''Create a generator object for the rockyou.txt file''' | |
rockyou_gen = (row for row in open('rockyou.txt',errors='ignore')) | |
'''Test against values from the txt only returns True if the for loop completes without a break''' | |
for string in rockyou_gen: | |
if self.test_string == string.strip(): | |
return False | |
else: | |
return True | |
def password_test(self, ) -> list: | |
'''Pre-load the result list with the password and a sha1 hash''' | |
result = [[self.test_string,hashlib.sha1(str(self.test_string).encode()).hexdigest()],] | |
'''Checks if the password passes a regex pattern''' | |
if self.password_regex_checker(): | |
'''Checks the password against the rockyou list''' | |
if self.password_rockyou_checker(): | |
result.append([True, -1, f'The password {self.test_string} passed and was not found in the rockyou list']) | |
else: | |
result.append([False, -2, f'The password {self.test_string} passed but was found in the rockyou list']) | |
'''Use this test to return any failures based on class rules''' | |
result = result + self.password_character_checker() | |
return result | |
def password_hash_checker(self, ) -> bool: | |
'''future check against leaked password hash(sha1) DB from have you been pwnd''' | |
pass | |
def main(): | |
'''Set up command line argument parser''' | |
parser = argparse.ArgumentParser(description='Password Checker CLI') | |
'''Add arguments to be taken from the CL''' | |
parser.add_argument('Password', help='Enter a single password without white space and up to 100 characters') | |
parser.add_argument('-F', required=False, metavar='File path', help='Give the absolute path for a txt file of strings to be tested') | |
parser.add_argument('-SL', type=int, required=False, default=8, metavar='', help='Set minimum password length ex -SL 12') | |
parser.add_argument('-UC', action='store_false', required=False, help='Disallow upper case') | |
parser.add_argument('-LC', action='store_false', required=False, help='Disallow lower case') | |
parser.add_argument('-DC', action='store_false', required=False, help='Disallow digits') | |
parser.add_argument('-SC', action='store_false', required=False, help='Disallow special characters') | |
parser.add_argument('-SCL', action='extend', nargs='+', required=False, metavar='',default=['@','$','!','%','*','#','?','&'], help='set Special Character List ex. -SCL @ # ! ...') | |
'''Place the values in an object p''' | |
p = parser.parse_args() | |
'''Run tests and place results in a list [[bool,int,str],...]''' | |
test_result_list = PasswordCheck(p.Password,ascii_lc=p.LC,ascii_uc=p.UC,num=p.DC,spec_char=p.SC, length=p.SL, spec_char_list=p.SCL).password_test() | |
'''Initialize an empty list to hold any failure conditions''' | |
test_fails = [] | |
'''Append any fails from the results list to the fails list''' | |
for i in range(len(test_result_list)): | |
if not test_result_list[i][0]: | |
test_fails.append(test_result_list[i][2]) | |
'''Print the results''' | |
if test_fails: | |
print('\n'.join(test_fails)) | |
else: | |
print('passed') | |
if __name__ == "__main__": | |
main() | |
'''unit test lol''' | |
# a = PasswordCheck('JJ#!pj32') | |
# print(a.password_character_checker()) | |
# print(a.password_regex_checker()) | |
# print(a.password_rockyou_checker()) | |
# print(a.password_test()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import secrets | |
import string | |
class password_creator: | |
def __init__(self, ascii_lc = True, ascii_uc = True, ascii_num = True, | |
spec_char = True, spec_char_list = ['@','$','!','%','*','#','?','&'], | |
length = 12 ) -> None: | |
self.ascii_lc = ascii_lc | |
self.ascii_uc = ascii_uc | |
self.ascii_num = ascii_num | |
self.spec_char = spec_char | |
self.spec_char_list = spec_char_list | |
self.length = length | |
def password_gen_rnd_chr(self) -> str: | |
test_string = '' | |
while len(test_string) < self.length: | |
seed = secrets.randbelow(9) | |
if self.ascii_lc and seed <= 2 : | |
test_string += secrets.choice([x for x in string.ascii_lowercase]) | |
elif self.ascii_uc and 2 < seed <= 4: | |
test_string += secrets.choice([x for x in string.ascii_uppercase]) | |
elif self.ascii_num and 4 < seed <= 6: | |
test_string += secrets.choice([x for x in string.digits]) | |
elif self.spec_char and 6 < seed <= 8: | |
test_string += secrets.choice(self.spec_char_list) | |
return test_string | |
def password_gen_xkcd_style(self) -> str: | |
with open('/usr/share/dict/words') as f: | |
words = [word.strip() for word in f] | |
test_string = [secrets.choice(words), ] | |
for i in range(3): | |
if i % 2 == 0: | |
test_string.append(secrets.choice(string.digits)) | |
else: | |
test_string.append(secrets.choice(self.spec_char_list)) | |
test_string.append(secrets.choice(words)) | |
test_string = ''.join(test_string) | |
return test_string | |
#unit test lol | |
def main(): | |
pass | |
if __name__ == "__main__": | |
a = password_creator(ascii_num=False) | |
print(a.password_gen_rnd_chr()) | |
# print(a.password_gen_xkcd_style()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import secrets | |
from math import gcd | |
from base64 import b64encode,b64decode | |
class RandomPrimeGenerator: | |
"""A collection of tools for generating and testing prime numbers.""" | |
def __init__(self): | |
"""Initialize the RandomPrimeGenerator class.""" | |
pass | |
def miller_rabin(self, num): | |
"""Apply the Miller-Rabin primality test to a given number. | |
Args: | |
num (int): Number to be tested for primality. | |
Returns: | |
bool: True if number is probably prime, False otherwise. | |
""" | |
s = num - 1 | |
t = 0 | |
while s % 2 == 0: | |
s = s // 2 | |
t += 1 | |
for _ in range(5): # Perform the test 5 times for accuracy | |
a = secrets.randbelow(num+1) | |
v = pow(a, s, num) | |
if v != 1: | |
i = 0 | |
while v != (num - 1): | |
if i == t - 1: | |
return False | |
else: | |
i += 1 | |
v = pow(v, 2, num) | |
return True | |
def is_prime(self, num): | |
"""Check if a number is prime using low primes and the Miller-Rabin test. | |
Args: | |
num (int): Number to be tested for primality. | |
Returns: | |
bool: True if number is prime, False otherwise. | |
""" | |
# A list of low prime numbers for initial checks | |
low_primes = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, | |
67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, | |
157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, | |
251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313,317, 331, 337, 347, 349, | |
353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, | |
457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541, 547, 557, 563, 569, | |
571, 577, 587, 593, 599, 601, 607, 613, 617, 619, 631, 641, 643, 647, 653, 659, 661, | |
673, 677, 683, 691, 701, 709, 719, 727, 733, 739, 743, 751, 757, 761, 769, 773, 787, | |
797, 809, 811, 821, 823, 827, 829, 839, 853, 857, 859, 863, 877, 881, 883, 887, 907, | |
911, 919, 929, 937, 941, 947, 953, 967, 971, 977, 983, 991, 997] | |
if num in low_primes: | |
return True | |
for prime in low_primes: | |
if num % prime == 0: | |
return False | |
return self.miller_rabin(num) | |
def generate_large_prime(self, key_size=1024): | |
"""Generate a large prime number. | |
Args: | |
key_size (int): Desired size of the prime number in bits. | |
Returns: | |
int: A prime number of the specified size. | |
""" | |
while True: | |
num = secrets.randbelow(2**(key_size)+1) | |
if self.is_prime(num): | |
return num | |
class RSAKeyGenerator: | |
"""A class for generating RSA keys.""" | |
def __init__(self): | |
"""Initialize the RSAKeyGenerator class.""" | |
self.prime_tools = RandomPrimeGenerator() | |
def generate_keys(self, key_size=1024): | |
"""Generate a pair of RSA keys.""" | |
p = self.prime_tools.generate_large_prime(key_size) | |
q = self.prime_tools.generate_large_prime(key_size) | |
n = p * q | |
totient = (p - 1) * (q - 1) | |
e = self.prime_tools.generate_large_prime(16) | |
while gcd(e, totient) != 1: | |
e = self.prime_tools.generate_large_prime(16) | |
d = self.mod_inverse(e, totient) | |
return ((e, n), (d, n)) | |
def mod_inverse(self, a, m): | |
"""Perform the Extended Euclidean Algorithm to find the modular multiplicative inverse. | |
Args: | |
a (int): The number to find the inverse of. | |
m (int): The modulus. | |
Returns: | |
int: The modular multiplicative inverse of a modulo m. | |
Raises: | |
ValueError: If a and m are not coprime. | |
""" | |
if gcd(a, m) != 1: | |
raise ValueError('The given numbers are not coprime.') | |
u1, u2, u3 = 1, 0, a | |
v1, v2, v3 = 0, 1, m | |
while v3 != 0: | |
q = u3 // v3 | |
v1, v2, v3, u1, u2, u3 = (u1 - q * v1), (u2 - q * v2), (u3 - q * v3), v1, v2, v3 | |
return u1 % m | |
def create_key_files(self, name, key_size): | |
"""Create two files 'name_pubkey.pem' and 'name_privkey.pem' with RSA public and private keys. | |
Args: | |
name (str): The base name for the key files. | |
key_size (int): The size of the keys in bits. | |
Raises: | |
FileExistsError: If key files with the given names already exist. | |
""" | |
if os.path.exists(f'{name}_pubkey.pem') or os.path.exists(f'{name}_privkey.pem'): | |
raise FileExistsError('There is already a key file with this name.') | |
public_key, private_key = self.generate_keys(key_size) | |
with open(f'{name}_pubkey.pem', 'w') as pub_file: | |
pub_file.write('-----BEGIN RSA PUBLIC KEY-----\n') | |
pub_file.write(b64encode(f'{public_key[0]},{public_key[1]}'.encode()).decode() + '\n') | |
pub_file.write('-----END RSA PUBLIC KEY-----\n') | |
with open(f'{name}_privkey.pem', 'w') as priv_file: | |
priv_file.write('-----BEGIN RSA PRIVATE KEY-----\n') | |
priv_file.write(b64encode(f'{private_key[0]},{private_key[1]}'.encode()).decode() + '\n') | |
priv_file.write('-----END RSA PRIVATE KEY-----\n') | |
print(f'Successfully created key files {name}_pubkey.pem and {name}_privkey.pem') | |
class User: | |
def __init__(self, username, password, rsa_key_generator): | |
self.username = username | |
self.rsa_key_generator = rsa_key_generator | |
rsa_key_generator.create_key_files(self.username, 1024) # Use your existing key generator | |
self.public_key = self.load_key(f'{self.username}_pubkey.pem') | |
self.private_key = self.load_key(f'{self.username}_privkey.pem') | |
self.encrypted_password = self.encrypt_password(password) | |
def load_key(self, filename): | |
with open(filename, 'r') as file: | |
key_data = file.readlines()[1:-1] # Skip the first and last line | |
key_data = ''.join(key_data).replace('\n', '') | |
key_data = b64decode(key_data).decode() | |
key_data = tuple(map(int, key_data.split(','))) | |
return key_data | |
def encrypt_password(self, password): | |
"""Encrypts the password using the user's public key.""" | |
e, n = self.public_key | |
encrypted_password = [pow(ord(char), e, n) for char in password] | |
return encrypted_password | |
def check_password(self, password_attempt): | |
"""Decrypts the stored password and checks against the password attempt.""" | |
d, n = self.private_key | |
decrypted_password = ''.join(chr(pow(char, d, n)) for char in self.encrypted_password) | |
return decrypted_password == password_attempt | |
rsa_key_gen = RSAKeyGenerator() | |
user = User('username', 'password', rsa_key_gen) | |
print(user.check_password('password')) # Should print: True | |
print(user.check_password('wrong_password')) # Should print: False | |
# Testing the RSAKeyGenerator class | |
# rsa_key_gen = RSAKeyGenerator() | |
# rsa_key_gen.create_key_files('my_key', 2048) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import socket | |
import threading | |
class tcp_prox(): | |
def __init__(self,local_host: str,local_port: str,remote_host: str,remote_port: str,receive_first: bool) -> None: | |
self.local_host = local_host | |
self.local_port = local_port | |
self.remote_host = remote_host | |
self.remote_port = remote_port | |
self.receive_first = receive_first | |
def server_loop(self)->None: | |
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
try: | |
server.bind((self.local_host,self.local_port)) | |
except BaseException: | |
print(f'>> Failed to listen on {self.local_host}:{self.local_port}\nExitting.') | |
sys.exit() | |
print(f'>> Listening on {self.local_host}:{self.local_port}') | |
server.listen() | |
while True: | |
client_socket, addr = server.accept() | |
print(f'>> Incoming connection from {addr[0]}:{addr[1]}') | |
proxy_thread = threading.Thread(target=self.proxy_handler,args=(client_socket)) | |
proxy_thread.start() | |
def proxy_handler(self,client_socket: socket)->None: | |
remote_socket: socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) | |
remote_socket.connect((self.remote_host,self.remote_port)) | |
if self.receive_first: | |
remote_buffer: str = self.recevie_from(remote_socket) | |
hex_remote: str = self.hexdump(remote_buffer) | |
print(f'>> Incoming buffer dump from {self.remote_host}:{self.remote_port}\n{hex_remote}') | |
remote_buffer = self.response_handler(remote_buffer) | |
if len(remote_buffer): | |
print(f'<< {len(remote_buffer)} bytes sent to {self.local_host}:{self.local_port}') | |
client_socket.send(remote_buffer) | |
while True: | |
local_buffer: str = self.recevie_from(client_socket) | |
if len(local_buffer): | |
print(f'>> Received {len(local_buffer)} btyes from {self.local_host}:{self.local_port}') | |
hex_local:str = self.hexdump(local_buffer) | |
print(f'>> outgoing buffer dump from {self.local_host}:{self.local_port}\n{hex_local}') | |
local_buffer = self.response_handler(local_buffer) | |
remote_socket.send(local_buffer) | |
print(f'<< sent to {self.remote_host}:{self.remote_port}') | |
remote_buffer: str = self.recevie_from(remote_socket) | |
if len(remote_buffer): | |
print(f'>> Received {len(remote_buffer)} btyes from {self.remote_host}:{self.remote_port}') | |
hex_remote: str = self.hexdump(remote_buffer) | |
print(f'>> Inbound buffer dump from {self.remote_host}:{self.remote_port}\n{hex_remote}') | |
remote_buffer = self.response_handler(remote_buffer) | |
client_socket.send(local_buffer) | |
print(f'<< sent to {self.local_host}:{self.local_port}') | |
if not len(local_buffer) or not len(remote_buffer): | |
client_socket.close() | |
remote_socket.close() | |
print('\n>< No more data, conections closed.\n') | |
break | |
def hexdump(self, src, length=16): | |
result = [] | |
src = bytes(src, 'utf-8') # ensure bytes | |
for i in range(0, len(src), length): | |
s = src[i:i+length] | |
hexa = ' '.join(["{:02X}".format(x) for x in s]) | |
text = ''.join([chr(x) if 0x20 <= x < 0x7F else '.' for x in s]) | |
result.append("{:04X} {:<49} {}".format(i, hexa, text)) | |
return '\n'.join(result) | |
def receive_from(self, connection: socket): | |
""" | |
Receive data from a socket until no more data is available. | |
""" | |
buffer = b"" | |
connection.settimeout(2) | |
try: | |
while True: | |
data = connection.recv(4096) | |
if not data: | |
break | |
buffer += data | |
except BaseException: | |
pass | |
return buffer | |
def response_handler(self, buffer: bytes): | |
""" | |
Perform packet modifications here before sending to the remote host. | |
For now, this function doesn't do anything. | |
""" | |
return buffer |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment