Skip to content

Instantly share code, notes, and snippets.

@ghost-ng
Created December 26, 2020 22:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ghost-ng/72df188aa2fa6a18a963c0310d895529 to your computer and use it in GitHub Desktop.
Save ghost-ng/72df188aa2fa6a18a963c0310d895529 to your computer and use it in GitHub Desktop.
Python port forwarder tcp
import socket
import threading
import sys
import argparse
import os
def handle(buffer):
return buffer
def transfer(src, dst, direction):
src_name = src.getsockname()
src_address = src_name[0]
src_port = src_name[1]
dst_name = dst.getsockname()
dst_address = dst_name[0]
dst_port = dst_name[1]
while True:
buffer = src.recv(0x400)
if len(buffer) == 0:
print("[-] No data received! Breaking...")
break
if direction:
print("[+] {}:{} >>> {}:{} [{}]".format(src_address,src_port,dst_address,dst_port,len(buffer)))
else:
print("[+] {}:{} <<< {}:{} [{}]".format(dst_address,dst_port,src_address,src_port,len(buffer)))
dst.send(handle(buffer))
print("[+] Closing connections! [{}:{}]".format(src_address,src_port))
src.shutdown(socket.SHUT_RDWR)
src.close()
print("[+] Closing connections! [{}:{}]".format(dst_address,dst_port))
dst.shutdown(socket.SHUT_RDWR)
dst.close()
def server(local_host, local_port, remote_host, remote_port, max_connection):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((local_host, local_port))
server_socket.listen(max_connection)
print("[+] Server started [{}:{}]".format(local_host,local_port))
print("[+] Connected to [{}:{}] to get the content of [{}:{}]".format(local_host,local_port,remote_host,remote_port))
while True:
local_socket, local_address = server_socket.accept()
print("[+] Detect connection from [{}:{}]".format(local_address[0],local_address[1]))
print("[+] Connecting to the REMOTE server [{}:{}]".format(remote_host,remote_port))
remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote_socket.connect((remote_host, remote_port))
print("[+] Tunnel connected! Transferring data...")
# threads = []
s = threading.Thread(target=transfer, args=(
remote_socket, local_socket, False))
r = threading.Thread(target=transfer, args=(
local_socket, remote_socket, True))
# threads.append(s)
# threads.append(r)
s.start()
r.start()
print("[+] Releasing resources...")
remote_socket.shutdown(socket.SHUT_RDWR)
remote_socket.close()
local_socket.shutdown(socket.SHUT_RDWR)
local_socket.close()
print("[+] Closing the server...")
server_socket.shutdown(socket.SHUT_RDWR)
server_socket.close()
print("[+] Shutting down the server!")
def main():
global VERBOSE
parser = argparse.ArgumentParser()
parser.add_argument("-l","--listen",action="store",help="listening interface: 10.10.10.220:8888",required=True)
parser.add_argument("-c","--connect",action="store",help="connect traffic to: 127.0.0.1:3060",required=True)
parser.add_argument("-v","--verbose",action="store_true",help="enable message output",default=False)
args = parser.parse_args()
if args.verbose is False:
sys.stdout = open(os.devnull, 'w')
sys.stderr = open(os.devnull, 'w')
LOCAL_HOST = args.listen.split(":")[0]
LOCAL_PORT = int(args.listen.split(":")[1])
REMOTE_HOST = args.connect.split(":")[0]
REMOTE_PORT = int(args.connect.split(":")[1])
MAX_CONNECTION = 0x10
server(LOCAL_HOST, LOCAL_PORT, REMOTE_HOST, REMOTE_PORT, MAX_CONNECTION)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment