Skip to content

Instantly share code, notes, and snippets.

@frhack
Created January 22, 2024 07:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save frhack/8f545d68f1cea2cbf0b401609724a958 to your computer and use it in GitHub Desktop.
Save frhack/8f545d68f1cea2cbf0b401609724a958 to your computer and use it in GitHub Desktop.
httpcat
#!/usr/bin/python
# nettool.py - Python 3 reimplemntation of the netcat script in Black Hat Python
# Copyright (C) 2019 Stephen Gream
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import socket
import subprocess
import sys
import threading
import select
import os
import signal
class Handler:
def init_connection(self, connection):
pass
def handle_msg(self, msg, connection):
return False
class EchoHandler(Handler):
def init_connection(self, connection):
connection.send(b'Echo enabled\r\n')
def handle_msg(self, msg, connection):
if len(msg) == 0:
return False
strmsg = msg.decode('utf-8')
strmsg.rstrip()
connection.send(bytes("{0}\r\n".format(strmsg), 'utf-8'))
return False
class CommandHandler(Handler):
__prompt = b'nettoolsh > '
def init_connection(self, connection):
connection.send(b'Call for papa Palpatine!\r\n')
connection.send(self.__prompt)
def handle_msg(self, msg, connection):
if len(msg) > 0:
strmsg = msg.decode('utf-8').strip()
print('[*] Running command "{0}"'.format(strmsg))
try:
output = subprocess.check_output(strmsg, stderr=subprocess.STDOUT, shell=True)
print("[*] Output: {0}".format(output))
connection.send(output)
connection.send(b'\r\n')
except Exception as e:
print("[*] Error trying to run command")
print(e)
connection.send(b'Error running command\r\n')
connection.send(self.__prompt)
return False
class UploadHandler(Handler):
def __init__(self, upload_location):
self.__upload_location = upload_location
def handle_msg(self, msg, connection):
try:
file_descriptor = open(self.__upload_location, "w")
print('[*] Writing {0} bytes to file {1}'.format(len(msg), self.__upload_location))
file_descriptor.write(msg)
file_descriptor.close()
connection.send(bytes("Successfully saved file to {0}\r\n".format(self.__upload_location), 'utf-8'))
except:
connection.send(bytes("Failed to save file {0}\r\n".format(self.__upload_location), 'utf-8'))
return True
class Server:
def __init__(self, target, port, handlers):
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__target = target
self.__port = port
self.__handlers = handlers
self.__stop = False
def listen(self):
print('[*] Listening on {0}:{1}'.format(self.__target, self.__port))
self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.__socket.bind((self.__target, self.__port))
self.__socket.listen(5)
try:
while True:
connection, addr = self.__socket.accept()
print("[*] Connection from {0}".format(addr))
proc = threading.Thread(target=self.__handle, args=(connection, addr))
proc.start()
except Exception as e:
""
#print("[*] Exception caught, shutting down")
#print(e)
except KeyboardInterrupt as e:
print("[!!] Server was interupted. Shutting down")
finally:
self.__stop = True
self.__socket.shutdown(socket.SHUT_RDWR)
self.__socket.close()
def __handle(self, client_conn, addr):
close = False
for handler in self.__handlers:
handler.init_connection(client_conn)
try:
while not close and not self.__stop:
raw_buffer = bytearray()
while True:
(sock_ready,x,y) = select.select([client_conn],[], [], 0.01)
if len(sock_ready) == 0:
break
data = client_conn.recv(1028)
if len(data) == 0 or data == b'\xff\xf4\xff\xfd\x06':
# Connection was probably closed
close = True
break
elif data:
raw_buffer.extend(data)
else:
break
if len(raw_buffer) > 0:
for handler in self.__handlers:
try:
close = handler.handle_msg(raw_buffer,client_conn)
if close:
break
except Exception as e:
print("[*] Caught an exception")
print(e)
except BrokenPipeError as e:
print("[*] Connection closed")
finally:
print("[*] Closing connection from {0}".format(addr))
client_conn.shutdown(socket.SHUT_RDWR)
client_conn.close()
class Client:
def __init__(self, target, port):
self.__target = target
self.__port = port
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__readerThread = threading.Thread(target=self.__reader, args=())
self.__stop = False
self.__target_disconnect = False
def run(self):
try:
self.__socket.connect((self.__target, self.__port))
self.__readerThread.start()
while not self.__stop:
t = input()
#t.strip()
msg = "{0}\r\n".format(t)
sent = self.__socket.send(bytes(msg, 'utf-8'))
if sent == 0:
print("[*] Collection closed")
break
except EOFError as e:
""
#print('[*] End of file reached')
except InterruptedError as e:
print('[*] Interrupted. Exiting')
except Exception as e:
""
#print('[*] Exception thrown')
#print(e)
finally:
if not self.__target_disconnect:
""
#self.__stop = True
#self.__socket.shutdown(socket.SHUT_RDWR)
#self.__socket.close()
self.__readerThread.join(100)
def __reader(self):
while not self.__stop:
try:
buffer = ""
while True:
(readylist, x, y) = select.select([self.__socket], [], [], 0.01)
if len(readylist) == 0:
# Target is probably done writing
break
data = self.__socket.recv(1024)
if len(data) == 0 or data == b'\xff\xf4\xff\xfd\x06':
# Socket closed
self.__stop = True
self.__target_disconnect = True
break
elif data:
#data.replace(b"\r", b"")
#print(data)
#print(data.decode('utf-8'))
buffer += data.decode('utf-8').replace("\r","")
#buffer += bytes(data, 'utf-8')
else:
break
if len(buffer) > 0:
#buffer.strip()
print(buffer,end='', flush=True)
except Exception as e:
""
#print("[*] Exception thrown: {0}".format(e))
if self.__target_disconnect:
# print("[!!] Target machine shut down")
# Interrupt the input
os.kill(os.getpid(), signal.SIGINT)
def parse_args():
parser = argparse.ArgumentParser(prog='nettool.py',description='Connect to a TCP server or create a server on a port')
parser.add_argument('-C', '--crlf', dest='crlf', action='store_true',
help='IP target or address to bind to')
parser.add_argument('-i', '--ip', dest='target', metavar='host', type=str,
help='IP target or address to bind to')
parser.add_argument('-p', '--port', dest='port', metavar='port', type=int,
help='Target port or port to bind to')
parser.add_argument('-l', '--listen', dest='listen', action='store_true',
help='Initialise a listener on {target}:{port}')
parser.add_argument('-c', '--command', dest='command', action='store_true',
help='Attach a command listener to a server. Cannot be used with -u')
parser.add_argument('-e', '--echo', dest='echo', action='store_true',
help='Attach an echo listener to a server')
parser.add_argument('-u', '--upload', dest='upload', metavar='upload_location', type=str,
help='Start an upload server and upload to {upload_location}. Cannot be used with -c')
parser.set_defaults(listen=False, command=False, echo=False)
args = parser.parse_args(sys.argv[1:])
arg_problems = arg_sanity_check(args)
if len(arg_problems) > 0:
for p in arg_problems:
print("[*] {0}".format(p))
parser.print_help()
sys.exit(1)
return args
def arg_sanity_check(args):
problems = []
if not args.target:
problems.append("Target is required")
if not args.port:
problems.append("Port is required")
if args.command and args.upload:
problems.append("Can't have an upload server and a command server at the same time")
return problems
def main():
sys.stdin.reconfigure(newline=None)
sys.stdout.reconfigure(newline=None)
sys.stdout.reconfigure(encoding="utf-8")
args = parse_args()
if args.crlf:
print("crlf")
if args.listen:
handlers = []
if args.command:
handlers.append(CommandHandler())
if args.echo:
handlers.append(EchoHandler())
if args.upload:
handlers.append(UploadHandler(args.upload))
s = Server(args.target, args.port, handlers)
s.listen()
if not args.listen:
client = Client(args.target, args.port)
client.run()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
# Just eat this, don't need a stack trace
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment