Skip to content

Instantly share code, notes, and snippets.

@JochiRaider
Last active May 22, 2023 01:37
Show Gist options
  • Save JochiRaider/5adff21410ab408f37ec533447c25a90 to your computer and use it in GitHub Desktop.
Save JochiRaider/5adff21410ab408f37ec533447c25a90 to your computer and use it in GitHub Desktop.
Netcat-like functions in Python
#!/usr/bin/env python3
import sys
import argparse
import socket
import threading
import subprocess
class NetCarpy:
    """Minimal netcat-style TCP client and single-connection listener.

    In listener mode exactly one of three actions is performed for the
    accepted connection, checked in this order: save an upload
    (``upload_dest``), run a single command (``execute``), or serve an
    interactive command shell (``command``).

    NOTE: the listener executes whatever the remote peer sends through a
    shell. That is the purpose of the tool; never expose it to untrusted
    networks.
    """

    def __init__(self, host: str, port: int, execute: str, command: bool, upload_dest: str) -> None:
        """Store the connection target and the listener-mode options.

        Args:
            host: Address to connect to (client) or bind to (listener).
            port: TCP port number.
            execute: Command line to run once on connection ('' to disable).
            command: Whether to serve an interactive command shell.
            upload_dest: Path the uploaded data is written to ('' to disable).
        """
        self.host = host
        self.port = port
        self.execute = execute
        self.command = command
        self.upload_dest = upload_dest

    @staticmethod
    def _recv_burst(sock, chunk_size: int = 4096) -> bytes:
        """Read from *sock* until a read returns fewer than *chunk_size* bytes.

        A short read is treated as "end of message"; this is a heuristic, not
        a framing protocol. An empty result means the peer closed the
        connection before sending anything.
        """
        chunks = []
        while True:
            data = sock.recv(chunk_size)
            chunks.append(data)
            if len(data) < chunk_size:
                break
        return b''.join(chunks)

    def client_sender(self, buffer: str = ''):
        """Connect to ``host:port``, send *buffer*, then chat interactively.

        After every response the user is prompted for the next line to send.
        The loop ends when the peer closes the connection. On a socket error,
        end of input, or Ctrl-C the failure message is printed and the process
        exits via ``sys.exit()``.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
                client.connect((self.host, self.port))
                # sendall: plain send() may transmit only part of the data.
                client.sendall(buffer.encode())
                print(f'{self.host}:{self.port} <- {buffer}')
                while True:
                    response = self._recv_burst(client)
                    if not response:
                        # recv() only returns b'' once the peer has closed.
                        print(f'{self.host}:{self.port} closed the connection.')
                        break
                    # Command output is arbitrary bytes; never crash on decoding.
                    text = response.decode(errors='replace').strip()
                    print(f'{self.host}:{self.port} -> {text}')
                    buffer = input(f'{self.host}:{self.port} <- ') + '\n'
                    client.sendall(buffer.encode())
        except (OSError, EOFError, KeyboardInterrupt):
            print(f'connection to {self.host}:{self.port} failed. Closing')
            sys.exit()

    def run_command(self, command: str):
        """Run *command* through the shell and return its combined output.

        Args:
            command: Command line as ``str``; ``bytes`` (as received from the
                socket) are accepted too and decoded first.

        Returns:
            bytes: stdout+stderr of the command, or a failure message when the
            command cannot be started or exits with a non-zero status.
        """
        if isinstance(command, bytes):
            command = command.decode(errors='replace')
        command = command.rstrip()
        try:
            # shell=True is deliberate: the tool is a remote shell.
            return subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
        except (subprocess.SubprocessError, OSError):
            return f'Failed to execute {command} command on {self.host}:{self.port}.\r\n'.encode()

    def server_loop(self):
        """Bind to ``host:port``, accept ONE connection and serve it in a thread."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            server.bind((self.host, self.port))
            server.listen()
            client_socket, _addr = server.accept()
        # The listening socket is closed above; the accepted socket stays valid.
        client_thread = threading.Thread(target=self.client_handler, args=(client_socket,))
        client_thread.start()

    def client_handler(self, socket_obj):
        """Serve one accepted connection according to the configured mode.

        The socket is always closed when the handler finishes.
        """
        try:
            if self.upload_dest:
                self._receive_upload(socket_obj)
            elif self.execute:
                socket_obj.sendall(self.run_command(self.execute))
            elif self.command:
                self._command_shell(socket_obj)
        except OSError as err:
            print(f'Connection error: {err}')
        finally:
            socket_obj.close()

    def _receive_upload(self, socket_obj):
        """Read one burst of data from the peer and write it to ``upload_dest``."""
        file_buffer = self._recv_burst(socket_obj)
        try:
            with open(self.upload_dest, 'wb') as f:
                f.write(file_buffer)
        except OSError:
            print(f'Failed to save file to {self.upload_dest}')
        else:
            print(f'File saved to {self.upload_dest}\r\n')

    def _command_shell(self, socket_obj):
        """Prompt the peer for newline-terminated commands until EXIT or disconnect."""
        while True:
            socket_obj.sendall(b'<NCPy:#> ')
            cmd_buffer = b''
            while b'\n' not in cmd_buffer:
                data = socket_obj.recv(1024)
                if not data:
                    # Peer went away; without this check the loop spins forever.
                    return
                cmd_buffer += data
            if cmd_buffer.strip() == b'EXIT':
                print('\n')
                return
            socket_obj.sendall(self.run_command(cmd_buffer))
def cli_interface():
    """Parse the NetCarPy command line and start either the listener or the client.

    With ``-l`` the listener is started. Otherwise, when a target and a
    positive port are given, stdin is read to EOF and sent by the client.
    """
    parser = argparse.ArgumentParser(description='NetCarPy CLI')
    parser.add_argument(
        '-t', required=False, metavar='IP', default='0', type=str,
        help='IP -t xxx.xxx.xxx.xxx',
    )
    parser.add_argument(
        '-p', required=True, metavar='Port', type=int,
        help='Port as an integer(commonly between 1024 and 65535)',
    )
    parser.add_argument(
        '-u', required=False, metavar='Upload path', default='', type=str,
        help='Listener will take data and write it to a file at the path given, on connection ',
    )
    parser.add_argument(
        '-e', required=False, metavar='Execute path', default='', type=str,
        help='Run a file at a given path on the listener system, output is returned to the caller',
    )
    parser.add_argument(
        '-l', required=False, action='store_true',
        help='listen on [host]:[port] for incoming connections',
    )
    parser.add_argument(
        '-c', required=False, action='store_true',
        help='initialize a command shell on the listener, on connection',
    )
    options = parser.parse_args()

    ncpy = NetCarpy(
        host=options.t,
        port=options.p,
        execute=options.e,
        command=options.c,
        upload_dest=options.u,
    )

    # Listener mode takes precedence over client mode.
    if options.l:
        ncpy.server_loop()
        return

    if options.t and options.p > 0:
        print('press CTRL-D to finish and send')
        payload = sys.stdin.read()
        ncpy.client_sender(buffer=payload)
def main():
    """Script entry point: hand control to the command-line interface."""
    cli_interface()


if __name__ == '__main__':
    main()
#!/usr/bin/env python3
import hashlib
import argparse
import re
class PasswordCheck:
    """Validate one candidate password against configurable composition rules.

    Each character class (lower case, upper case, digits, special characters)
    is either required (flag ``True``) or forbidden (flag ``False``).
    """

    # Special characters accepted when the caller does not supply a list.
    DEFAULT_SPEC_CHARS = ('@', '$', '!', '%', '*', '#', '?', '&')
    # Upper bound on password length enforced by the regex check.
    MAX_LENGTH = 100

    def __init__(self, test_string: str, ascii_lc=True, ascii_uc=True, num=True,
                 spec_char=True, spec_char_list=None,
                 length=8) -> None:
        """Store the password and the rules it is tested against.

        Args:
            test_string: The candidate password.
            ascii_lc: Require (True) or forbid (False) lower case letters.
            ascii_uc: Require (True) or forbid (False) upper case letters.
            num: Require (True) or forbid (False) digits.
            spec_char: Require (True) or forbid (False) special characters.
            spec_char_list: Characters counted as "special"; defaults to
                ``DEFAULT_SPEC_CHARS``. A fresh list is created per instance
                so instances never share a mutable default.
            length: Minimum password length.
        """
        self.ascii_lc = ascii_lc
        self.ascii_uc = ascii_uc
        self.num = num
        self.spec_char = spec_char
        if spec_char_list is None:
            spec_char_list = list(self.DEFAULT_SPEC_CHARS)
        self.spec_char_list = spec_char_list
        self.length = length
        self.test_string = test_string

    def password_character_checker(self) -> list:
        """Count characters per class and report every rule as pass/fail.

        Returns:
            list: ``[[bool, int, str], ...]`` - the length rule first, then
            one entry per character class (passed?, count, message).
        """
        count_lc = 0
        count_uc = 0
        count_num = 0
        count_spc = 0
        # Classify every character; characters in no class are not counted.
        for check_char in self.test_string:
            if check_char.islower():
                count_lc += 1
            elif check_char.isupper():
                count_uc += 1
            elif check_char.isdigit():
                count_num += 1
            elif check_char in self.spec_char_list:
                count_spc += 1

        # (rule flag, observed count, human-readable class name)
        checks = [
            (self.ascii_lc, count_lc, 'lower case letters'),
            (self.ascii_uc, count_uc, 'upper case letters'),
            (self.num, count_num, '0-9 numbers'),
            (self.spec_char, count_spc, 'special characters'),
        ]

        result = []
        actual_length = len(self.test_string)
        if actual_length >= self.length:
            result.append([True, actual_length, f'The password passes the length requirement of {self.length}.'])
        else:
            result.append([False, actual_length, f'The password fails the length requirement of {self.length}.'])

        for required, count, label in checks:
            if required and count < 1:
                result.append([False, count, f'The password fails for not having any {label}'])
            elif not required and count >= 1:
                result.append([False, count, f'{label} are not allowed in passwords under the rules given.'])
            elif required and count >= 1:
                result.append([True, count, f'The password passed with 1 or more {label}'])
            else:
                result.append([True, count, f'{label} not found in the password, in compliance with the rules.'])
        return result

    def password_regex_checker(self) -> bool:
        """Check the password against a regex built from the instance rules.

        The password must be between ``length`` and ``MAX_LENGTH`` characters
        long, contain at least one character of every required class, and
        consist only of ASCII letters, digits and (when special characters
        are enabled) the configured special characters.

        Returns:
            bool: True when the password satisfies the pattern.
        """
        candidate = self.test_string
        min_len = self.length or 0
        # Length is checked in Python so min > MAX_LENGTH cannot produce an
        # invalid ``{min,max}`` quantifier.
        if not min_len <= len(candidate) <= self.MAX_LENGTH:
            return False

        lookaheads = []
        if self.ascii_lc:
            lookaheads.append(r'(?=.*[a-z])')
        if self.ascii_uc:
            lookaheads.append(r'(?=.*[A-Z])')
        if self.num:
            lookaheads.append(r'(?=.*\d)')

        allowed = r'A-Za-z\d'
        if self.spec_char:
            # Escape so characters such as ] ^ - \ stay literal inside [...].
            escaped = ''.join(re.escape(char) for char in self.spec_char_list)
            if not escaped:
                # A special character is required but none is defined.
                return False
            lookaheads.append(f'(?=.*[{escaped}])')
            allowed += escaped

        pattern = ''.join(lookaheads) + f'[{allowed}]*'
        # fullmatch: unlike ``$``, it does not tolerate a trailing newline.
        return re.fullmatch(pattern, candidate) is not None

    def password_rockyou_checker(self, wordlist_path='rockyou.txt') -> bool:
        """Check that the password is absent from a leaked-password wordlist.

        Args:
            wordlist_path: Path of the wordlist, one password per line.

        Returns:
            bool: True when the password is NOT in the list, False when found.

        Raises:
            OSError: If the wordlist cannot be opened.
        """
        with open(wordlist_path, errors='ignore') as wordlist:
            return all(self.test_string != line.strip() for line in wordlist)

    def password_test(self, wordlist_path='rockyou.txt') -> list:
        """Run every check and collect the results.

        Args:
            wordlist_path: Wordlist passed on to ``password_rockyou_checker``.

        Returns:
            list: First entry is ``[password, sha1_hex]``. If the regex check
            passes, a wordlist entry follows (count field -1 = not found,
            -2 = found). The per-rule entries from
            ``password_character_checker`` are always appended.
        """
        digest = hashlib.sha1(str(self.test_string).encode()).hexdigest()
        result = [[self.test_string, digest]]
        # The (potentially slow) wordlist scan only runs for passwords that
        # already satisfy the composition rules.
        if self.password_regex_checker():
            if self.password_rockyou_checker(wordlist_path):
                result.append([True, -1, f'The password {self.test_string} passed and was not found in the rockyou list'])
            else:
                result.append([False, -2, f'The password {self.test_string} passed but was found in the rockyou list'])
        return result + self.password_character_checker()

    def password_hash_checker(self) -> bool:
        """Placeholder for a future check against a leaked SHA-1 hash database.

        Not implemented yet; currently returns None.
        """
        pass
def main():
    """Command-line entry point: test one password and print failures or 'passed'."""
    # Used when -SCL is not given. Kept out of the argparse default because
    # action='extend' would APPEND user values to a list default instead of
    # replacing it.
    default_spec_chars = ['@', '$', '!', '%', '*', '#', '?', '&']

    parser = argparse.ArgumentParser(description='Password Checker CLI')
    parser.add_argument('Password', help='Enter a single password without white space and up to 100 characters')
    # NOTE(review): -F is parsed but never used below - file input is not implemented.
    parser.add_argument('-F', required=False, metavar='File path', help='Give the absolute path for a txt file of strings to be tested')
    parser.add_argument('-SL', type=int, required=False, default=8, metavar='', help='Set minimum password length ex -SL 12')
    parser.add_argument('-UC', action='store_false', required=False, help='Disallow upper case')
    parser.add_argument('-LC', action='store_false', required=False, help='Disallow lower case')
    parser.add_argument('-DC', action='store_false', required=False, help='Disallow digits')
    parser.add_argument('-SC', action='store_false', required=False, help='Disallow special characters')
    parser.add_argument('-SCL', action='extend', nargs='+', required=False, metavar='', default=None, help='set Special Character List ex. -SCL @ # ! ...')
    p = parser.parse_args()

    spec_chars = p.SCL if p.SCL is not None else default_spec_chars
    test_result_list = PasswordCheck(
        p.Password,
        ascii_lc=p.LC,
        ascii_uc=p.UC,
        num=p.DC,
        spec_char=p.SC,
        length=p.SL,
        spec_char_list=spec_chars,
    ).password_test()

    # The first entry is the [password, sha1] header, not a [bool, int, str]
    # result; indexing it like one raises IndexError for an empty password.
    test_fails = [entry[2] for entry in test_result_list[1:] if not entry[0]]

    if test_fails:
        print('\n'.join(test_fails))
    else:
        print('passed')


if __name__ == "__main__":
    main()
'''unit test lol'''
# a = PasswordCheck('JJ#!pj32')
# print(a.password_character_checker())
# print(a.password_regex_checker())
# print(a.password_rockyou_checker())
# print(a.password_test())
import secrets
import string
class password_creator:
    """Generate random passwords from configurable character classes."""

    # Special characters used when the caller does not supply a list.
    _DEFAULT_SPEC_CHARS = ('@', '$', '!', '%', '*', '#', '?', '&')

    def __init__(self, ascii_lc=True, ascii_uc=True, ascii_num=True,
                 spec_char=True, spec_char_list=None,
                 length=12) -> None:
        """Store the generation rules.

        Args:
            ascii_lc: Include lower case ASCII letters.
            ascii_uc: Include upper case ASCII letters.
            ascii_num: Include digits.
            spec_char: Include special characters.
            spec_char_list: The special characters to draw from; defaults to
                a fresh copy of ``_DEFAULT_SPEC_CHARS`` per instance.
            length: Minimum length of passwords from ``password_gen_rnd_chr``.
        """
        self.ascii_lc = ascii_lc
        self.ascii_uc = ascii_uc
        self.ascii_num = ascii_num
        self.spec_char = spec_char
        if spec_char_list is None:
            spec_char_list = list(self._DEFAULT_SPEC_CHARS)
        self.spec_char_list = spec_char_list
        self.length = length

    def password_gen_rnd_chr(self) -> str:
        """Return a random password of at least ``length`` characters.

        A character class is picked first, then a character from it. Lower
        case is weighted 3 and every other class 2, which matches the
        distribution of the original ``randbelow(9)`` scheme.

        Raises:
            ValueError: If ``length`` is positive but no character class is
                enabled (this used to loop forever).
        """
        if self.length <= 0:
            return ''

        weighted_pools = []
        if self.ascii_lc:
            weighted_pools += [string.ascii_lowercase] * 3
        if self.ascii_uc:
            weighted_pools += [string.ascii_uppercase] * 2
        if self.ascii_num:
            weighted_pools += [string.digits] * 2
        if self.spec_char and self.spec_char_list:
            weighted_pools += [self.spec_char_list] * 2
        if not weighted_pools:
            raise ValueError('At least one non-empty character class must be enabled.')

        pieces = []
        total = 0
        # Track the running length: entries of spec_char_list may be longer
        # than one character.
        while total < self.length:
            piece = secrets.choice(secrets.choice(weighted_pools))
            pieces.append(piece)
            total += len(piece)
        return ''.join(pieces)

    def password_gen_xkcd_style(self, words_path='/usr/share/dict/words') -> str:
        """Return a passphrase of four dictionary words joined by separators.

        The layout is ``word digit word special word digit word``.

        Args:
            words_path: Word list file, one word per line.

        Raises:
            OSError: If the word list cannot be opened.
            ValueError: If the word list or ``spec_char_list`` is empty.
        """
        with open(words_path) as f:
            words = [word.strip() for word in f if word.strip()]
        if not words:
            raise ValueError(f'No words found in {words_path}')
        if not self.spec_char_list:
            raise ValueError('spec_char_list must not be empty.')

        parts = [secrets.choice(words)]
        for i in range(3):
            # Even positions get a digit separator, odd ones a special character.
            if i % 2 == 0:
                parts.append(secrets.choice(string.digits))
            else:
                parts.append(secrets.choice(self.spec_char_list))
            parts.append(secrets.choice(words))
        return ''.join(parts)
#unit test lol
def main():
    """Placeholder entry point; it intentionally does nothing."""


if __name__ == "__main__":
    # Ad-hoc smoke test: print one random password generated without digits.
    a = password_creator(ascii_num=False)
    print(a.password_gen_rnd_chr())
    # print(a.password_gen_xkcd_style())
import os
import secrets
from math import gcd
from base64 import b64encode,b64decode
class RandomPrimeGenerator:
    """A collection of tools for generating and testing prime numbers."""

    # All primes below 1000, used for cheap trial division before the
    # probabilistic test. Built once at class level instead of on every call.
    LOW_PRIMES = (
        2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61,
        67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151,
        157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241,
        251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349,
        353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449,
        457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541, 547, 557, 563, 569,
        571, 577, 587, 593, 599, 601, 607, 613, 617, 619, 631, 641, 643, 647, 653, 659, 661,
        673, 677, 683, 691, 701, 709, 719, 727, 733, 739, 743, 751, 757, 761, 769, 773, 787,
        797, 809, 811, 821, 823, 827, 829, 839, 853, 857, 859, 863, 877, 881, 883, 887, 907,
        911, 919, 929, 937, 941, 947, 953, 967, 971, 977, 983, 991, 997,
    )

    def __init__(self):
        """Initialize the RandomPrimeGenerator class."""
        pass

    def miller_rabin(self, num, rounds=5):
        """Apply the Miller-Rabin primality test to a given number.

        Args:
            num (int): Number to be tested for primality.
            rounds (int): Number of random witnesses to try.

        Returns:
            bool: True if number is probably prime, False otherwise.
        """
        # Small and even inputs are decided directly; the general algorithm
        # below needs an odd num >= 5 (num = 1 would never leave the
        # factoring loop because s = 0 stays even).
        if num < 2:
            return False
        if num < 4:
            return True
        if num % 2 == 0:
            return False

        # Write num - 1 as s * 2**t with s odd.
        s = num - 1
        t = 0
        while s % 2 == 0:
            s //= 2
            t += 1

        for _ in range(rounds):
            # Witness in [2, num - 2]: 0 and num would wrongly reject a
            # prime, 1 and num - 1 pass trivially and prove nothing.
            a = 2 + secrets.randbelow(num - 3)
            v = pow(a, s, num)
            if v == 1 or v == num - 1:
                continue
            for _ in range(t - 1):
                v = pow(v, 2, num)
                if v == num - 1:
                    break
            else:
                # No squaring reached num - 1, so num is composite.
                return False
        return True

    def is_prime(self, num):
        """Check if a number is prime using low primes and the Miller-Rabin test.

        Args:
            num (int): Number to be tested for primality.

        Returns:
            bool: True if number is (probably) prime, False otherwise.
        """
        if num < 2:
            return False
        if num in self.LOW_PRIMES:
            return True
        if any(num % prime == 0 for prime in self.LOW_PRIMES):
            return False
        return self.miller_rabin(num)

    def generate_large_prime(self, key_size=1024):
        """Generate a large prime number.

        Args:
            key_size (int): Desired size of the prime number in bits.

        Returns:
            int: A prime number with exactly ``key_size`` bits.

        Raises:
            ValueError: If ``key_size`` is smaller than 2.
        """
        if key_size < 2:
            raise ValueError('key_size must be at least 2 bits.')
        top_bit = 1 << (key_size - 1)
        while True:
            # Force the top bit (exact bit length) and the low bit (odd).
            candidate = (top_bit + secrets.randbelow(top_bit)) | 1
            if self.is_prime(candidate):
                return candidate
class RSAKeyGenerator:
    """A class for generating RSA keys."""

    def __init__(self):
        """Initialize the RSAKeyGenerator class."""
        self.prime_tools = RandomPrimeGenerator()

    def generate_keys(self, key_size=1024):
        """Generate a pair of RSA keys.

        Args:
            key_size (int): Size request passed to the prime generator for
                EACH of the two primes.

        Returns:
            tuple: ``((e, n), (d, n))`` - public key, then private key.
        """
        p = self.prime_tools.generate_large_prime(key_size)
        q = self.prime_tools.generate_large_prime(key_size)
        # p == q would make n a perfect square and the totient below wrong.
        while q == p:
            q = self.prime_tools.generate_large_prime(key_size)
        n = p * q
        totient = (p - 1) * (q - 1)
        e = self.prime_tools.generate_large_prime(16)
        while gcd(e, totient) != 1:
            e = self.prime_tools.generate_large_prime(16)
        d = self.mod_inverse(e, totient)
        return ((e, n), (d, n))

    def mod_inverse(self, a, m):
        """Perform the Extended Euclidean Algorithm to find the modular multiplicative inverse.

        Args:
            a (int): The number to find the inverse of.
            m (int): The modulus.

        Returns:
            int: The modular multiplicative inverse of a modulo m.

        Raises:
            ValueError: If a and m are not coprime.
        """
        if gcd(a, m) != 1:
            raise ValueError('The given numbers are not coprime.')
        u1, u2, u3 = 1, 0, a
        v1, v2, v3 = 0, 1, m
        while v3 != 0:
            q = u3 // v3
            v1, v2, v3, u1, u2, u3 = (u1 - q * v1), (u2 - q * v2), (u3 - q * v3), v1, v2, v3
        return u1 % m

    def create_key_files(self, name, key_size):
        """Create two files 'name_pubkey.pem' and 'name_privkey.pem' with RSA public and private keys.

        Each file holds one base64 line encoding ``"<exponent>,<modulus>"``
        between BEGIN/END marker lines. The private key file is created with
        owner-only permissions (0o600).

        Args:
            name (str): The base name for the key files.
            key_size (int): The size of the keys in bits.

        Raises:
            FileExistsError: If key files with the given names already exist.
        """
        pub_path = f'{name}_pubkey.pem'
        priv_path = f'{name}_privkey.pem'
        if os.path.exists(pub_path) or os.path.exists(priv_path):
            raise FileExistsError('There is already a key file with this name.')
        public_key, private_key = self.generate_keys(key_size)

        # Exclusive creation ('x' / O_EXCL) closes the gap between the
        # existence check above and the actual write.
        with open(pub_path, 'x') as pub_file:
            pub_file.write('-----BEGIN RSA PUBLIC KEY-----\n')
            pub_file.write(b64encode(f'{public_key[0]},{public_key[1]}'.encode()).decode() + '\n')
            pub_file.write('-----END RSA PUBLIC KEY-----\n')

        priv_fd = os.open(priv_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
        with os.fdopen(priv_fd, 'w') as priv_file:
            priv_file.write('-----BEGIN RSA PRIVATE KEY-----\n')
            priv_file.write(b64encode(f'{private_key[0]},{private_key[1]}'.encode()).decode() + '\n')
            priv_file.write('-----END RSA PRIVATE KEY-----\n')
        print(f'Successfully created key files {name}_pubkey.pem and {name}_privkey.pem')
class User:
    """A user whose password is stored RSA-encrypted, one integer per character.

    NOTE(review): every character is encrypted separately with unpadded RSA
    and the private key is kept on the same object, so this is a
    demonstration, not secure password storage.
    """

    def __init__(self, username, password, rsa_key_generator):
        """Create key files for *username*, load both keys and encrypt *password*."""
        self.username = username
        self.rsa_key_generator = rsa_key_generator
        # Writes '<username>_pubkey.pem' and '<username>_privkey.pem'.
        rsa_key_generator.create_key_files(self.username, 1024)
        self.public_key = self.load_key(f'{self.username}_pubkey.pem')
        self.private_key = self.load_key(f'{self.username}_privkey.pem')
        self.encrypted_password = self.encrypt_password(password)

    def load_key(self, filename):
        """Read a key file and return its comma-separated integers as a tuple."""
        with open(filename, 'r') as file:
            lines = file.readlines()
        # Drop the BEGIN/END marker lines, keep the base64 payload in between.
        payload = ''.join(lines[1:-1]).replace('\n', '')
        decoded = b64decode(payload).decode()
        return tuple(int(part) for part in decoded.split(','))

    def encrypt_password(self, password):
        """Encrypts the password using the user's public key, character by character."""
        e, n = self.public_key
        encrypted_password = []
        for char in password:
            encrypted_password.append(pow(ord(char), e, n))
        return encrypted_password

    def check_password(self, password_attempt):
        """Decrypts the stored password and checks against the password attempt."""
        d, n = self.private_key
        plain_chars = [chr(pow(value, d, n)) for value in self.encrypted_password]
        return ''.join(plain_chars) == password_attempt
# Demo: create a user (this writes the user's key files) and verify that only
# the correct password is accepted. Expected output: True, then False.
rsa_key_gen = RSAKeyGenerator()
user = User('username', 'password', rsa_key_gen)
for attempt in ('password', 'wrong_password'):
    print(user.check_password(attempt))
# Testing the RSAKeyGenerator class
# rsa_key_gen = RSAKeyGenerator()
# rsa_key_gen.create_key_files('my_key', 2048)
#!/usr/bin/env python3
import sys
import socket
import threading
class tcp_prox():
    """Simple TCP proxy that relays traffic between a local client and a remote host.

    Everything passing through is printed as a hex dump and can be modified
    in ``response_handler``.
    """

    def __init__(self, local_host: str, local_port: int, remote_host: str, remote_port: int, receive_first: bool) -> None:
        """Store the proxy endpoints.

        Args:
            local_host: Address the proxy listens on.
            local_port: Port the proxy listens on (numeric strings are accepted).
            remote_host: Host the traffic is forwarded to.
            remote_port: Port on the remote host (numeric strings are accepted).
            receive_first: Read from the remote side before the client sends
                anything (for protocols where the server speaks first).
        """
        self.local_host = local_host
        # bind()/connect() need integer ports.
        self.local_port = int(local_port)
        self.remote_host = remote_host
        self.remote_port = int(remote_port)
        self.receive_first = receive_first

    def server_loop(self) -> None:
        """Listen on the local endpoint and start one handler thread per client.

        Exits the process via ``sys.exit()`` when binding fails.
        """
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.bind((self.local_host, self.local_port))
        except (OSError, OverflowError):
            server.close()
            print(f'>> Failed to listen on {self.local_host}:{self.local_port}\nExitting.')
            sys.exit()
        print(f'>> Listening on {self.local_host}:{self.local_port}')
        server.listen()
        with server:
            while True:
                client_socket, addr = server.accept()
                print(f'>> Incoming connection from {addr[0]}:{addr[1]}')
                # args must be a tuple; "(client_socket)" is just the socket.
                proxy_thread = threading.Thread(target=self.proxy_handler, args=(client_socket,))
                proxy_thread.start()

    def proxy_handler(self, client_socket: socket.socket) -> None:
        """Relay data between *client_socket* and the remote host.

        The loop ends, and both sockets are closed, as soon as either side
        has no data within the receive timeout.
        """
        remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            remote_socket.connect((self.remote_host, self.remote_port))

            if self.receive_first:
                remote_buffer = self.receive_from(remote_socket)
                hex_remote = self.hexdump(remote_buffer)
                print(f'>> Incoming buffer dump from {self.remote_host}:{self.remote_port}\n{hex_remote}')
                remote_buffer = self.response_handler(remote_buffer)
                if len(remote_buffer):
                    print(f'<< {len(remote_buffer)} bytes sent to {self.local_host}:{self.local_port}')
                    client_socket.sendall(remote_buffer)

            while True:
                # Client -> remote.
                local_buffer = self.receive_from(client_socket)
                if len(local_buffer):
                    print(f'>> Received {len(local_buffer)} btyes from {self.local_host}:{self.local_port}')
                    hex_local = self.hexdump(local_buffer)
                    print(f'>> outgoing buffer dump from {self.local_host}:{self.local_port}\n{hex_local}')
                    local_buffer = self.response_handler(local_buffer)
                    remote_socket.sendall(local_buffer)
                    print(f'<< sent to {self.remote_host}:{self.remote_port}')

                # Remote -> client.
                remote_buffer = self.receive_from(remote_socket)
                if len(remote_buffer):
                    print(f'>> Received {len(remote_buffer)} btyes from {self.remote_host}:{self.remote_port}')
                    hex_remote = self.hexdump(remote_buffer)
                    print(f'>> Inbound buffer dump from {self.remote_host}:{self.remote_port}\n{hex_remote}')
                    remote_buffer = self.response_handler(remote_buffer)
                    # Forward the REMOTE data; echoing local_buffer back to
                    # the client would drop every server response.
                    client_socket.sendall(remote_buffer)
                    print(f'<< sent to {self.local_host}:{self.local_port}')

                if not len(local_buffer) or not len(remote_buffer):
                    print('\n>< No more data, conections closed.\n')
                    break
        finally:
            # Close both ends on normal completion and on errors alike.
            client_socket.close()
            remote_socket.close()

    def hexdump(self, src, length=16):
        """Return a hex + ASCII dump of *src*, *length* bytes per line.

        Args:
            src: ``bytes``-like data, or a ``str`` (encoded as UTF-8).
            length: Number of bytes shown per output line.

        Returns:
            str: One line per chunk: offset, hex bytes, printable characters
            ('.' for everything outside 0x20-0x7E).
        """
        # Data from recv() is already bytes; only str needs encoding.
        src = src.encode('utf-8') if isinstance(src, str) else bytes(src)
        result = []
        for i in range(0, len(src), length):
            s = src[i:i + length]
            hexa = ' '.join("{:02X}".format(x) for x in s)
            text = ''.join(chr(x) if 0x20 <= x < 0x7F else '.' for x in s)
            result.append("{:04X} {:<49} {}".format(i, hexa, text))
        return '\n'.join(result)

    def receive_from(self, connection: socket.socket, timeout=2):
        """
        Receive data from a socket until no more data is available.

        Reading stops when the peer closes the connection or when no data
        arrives within *timeout* seconds; whatever was read is returned.
        """
        chunks = []
        connection.settimeout(timeout)
        try:
            while True:
                data = connection.recv(4096)
                if not data:
                    break
                chunks.append(data)
        except OSError:
            # Timeouts and resets simply end the read (best effort).
            pass
        return b''.join(chunks)

    def response_handler(self, buffer: bytes):
        """
        Perform packet modifications here before sending to the remote host.
        For now, this function doesn't do anything.
        """
        return buffer
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment