Skip to content

Instantly share code, notes, and snippets.

@scientificRat
Last active July 9, 2018 04:28
Show Gist options
  • Save scientificRat/21e953ad13f05e98a27a2a8d98553a44 to your computer and use it in GitHub Desktop.
Save scientificRat/21e953ad13f05e98a27a2a8d98553a44 to your computer and use it in GitHub Desktop.
NAT traversal (text communication)
import socket as sc
import argparse
import os, sys
class NatTraversalTool(object):
    """UDP NAT-traversal client that pairs with a named peer through a server.

    The tool announces itself to the server with a ``CONN:<local>,<remote>``
    datagram, waits for the server's ``DIG:<host>,<port>`` reply carrying the
    peer's address, then exchanges ``CONN`` / ``CONN_ACK`` datagrams directly
    with that peer and finally reports ``SUCCESS:<local>`` to the server.
    """

    def __init__(self, server_address, local_name, remote_name):
        """Create the UDP socket and bind it to local port 8903.

        Args:
            server_address: ``(host, port)`` tuple of the server.
            local_name: Name this client announces to the server.
            remote_name: Name of the peer this client wants to reach.
        """
        self.server_address = server_address
        self.local_name = local_name
        self.remote_name = remote_name
        self.socket = sc.socket(type=sc.SOCK_DGRAM)
        self.socket.bind(('', 8903))
        # Set by traversal() once the server has sent the peer's address.
        self.remote_address = None

    @staticmethod
    def _split_message(payload):
        """Decode a datagram and split it into ``(command, info)``.

        The split happens at the first ':' only, so a payload without ':'
        yields an empty ``info`` and extra ':' characters stay inside
        ``info`` instead of raising on tuple-unpacking. Undecodable bytes
        are replaced rather than raising.
        """
        command, _, info = payload.decode(errors='replace').partition(':')
        return command, info

    @staticmethod
    def _parse_address(info):
        """Turn a ``"host,port"`` string into a ``(host, port)`` tuple.

        Raises:
            ValueError: If ``info`` does not have exactly two comma-separated
                fields or the port is not an integer.
        """
        fields = info.split(',')
        if len(fields) != 2:
            raise ValueError('receive wrong command')
        host, port = fields
        return host, int(port)

    def traversal(self):
        """Register with the server, dig towards the peer and connect to it.

        Returns:
            ``(socket, remote_address)``: the UDP socket, now connected to
            the peer, and the peer's ``(host, port)`` tuple.

        Raises:
            ValueError: If the server answers with anything other than a
                well-formed ``DIG`` command.
        """
        request = "CONN:%s,%s" % (self.local_name, self.remote_name)
        self.socket.sendto(request.encode(), self.server_address)
        while True:
            buffer, address = self.socket.recvfrom(1024)
            if address != self.server_address:
                # Only the server may tell us where the peer is.
                continue
            command, info = self._split_message(buffer)
            if command != 'DIG':
                # Fail here with a clear error instead of falling through to
                # connect(None), which would raise an unrelated TypeError.
                raise ValueError('unknown command received: %r' % command)
            self.remote_address = self._parse_address(info)
            print("start dig", self.remote_address)
            self._start_dig()
            break
        self.socket.connect(self.remote_address)
        return self.socket, self.remote_address

    def _send_ack(self):
        """Send ``CONN_ACK:<local_name>`` to the peer."""
        ack = "CONN_ACK:%s" % self.local_name
        self.socket.sendto(ack.encode(), self.remote_address)

    def _start_dig(self):
        """Exchange CONN / CONN_ACK with the peer, then report SUCCESS.

        A forked child listens for the peer's datagrams while the parent
        sends two ``CONN`` datagrams to the peer. The child exits once it has
        seen the peer's ``CONN_ACK``; the parent waits for that and then
        sends ``SUCCESS:<local_name>`` to the server.
        """
        # Flush first so output buffered before the fork is not written twice.
        sys.stdout.flush()
        pid = os.fork()
        if pid == 0:
            # child process: listen for the peer's datagrams
            print("fork")
            while True:
                buffer, address = self.socket.recvfrom(1024)
                print(address)
                if address != self.remote_address:
                    continue
                command, info = self._split_message(buffer)
                if command == 'CONN':
                    print("CONN received!")
                    # The peer sends "<its name>,<our name>"; a malformed
                    # payload simply fails this comparison.
                    if info.split(',') == [self.remote_name, self.local_name]:
                        self._send_ack()
                elif command == 'CONN_ACK' and info == self.remote_name:
                    self._send_ack()
                    # os._exit is the proper way to leave a forked child; it
                    # skips interpreter cleanup, so flush explicitly first.
                    sys.stdout.flush()
                    os._exit(0)
        else:
            request = ("CONN:%s,%s" % (self.local_name, self.remote_name)).encode()
            for _ in range(2):
                self.socket.sendto(request, self.remote_address)
                print("CONN SEND")
            # Wait for the listener forked above, not for an arbitrary child.
            os.waitpid(pid, 0)
            report = "SUCCESS:%s" % self.local_name
            self.socket.sendto(report.encode(), self.server_address)
def start_communication(socket: sc.socket):
    """Run a line-based text chat over a connected UDP socket.

    A forked child prints every received datagram to stderr, while the parent
    reads lines from stdin and sends each one to the peer. Typing ``q`` (or
    closing stdin) ends the chat: the socket is closed and the receiving
    child is terminated.

    Args:
        socket: A UDP socket already connected to the peer.
    """
    import signal  # local import: only this function needs it

    # Flush first so output buffered before the fork is not written twice.
    sys.stdout.flush()
    pid = os.fork()
    if pid == 0:
        # child process: receive until the parent terminates us. recv() never
        # returns None, so there is no in-band end-of-stream to test for.
        while True:
            data = socket.recv(1024)
            # Peer bytes are untrusted; do not crash on invalid UTF-8.
            print(data.decode(errors='replace'), file=sys.stderr)
    else:
        try:
            while True:
                try:
                    data = input()
                except EOFError:
                    # stdin closed: treat like an explicit quit.
                    break
                if data == "q":
                    break
                socket.send(data.encode())
        finally:
            socket.close()
            # Stop the receiver so it is not left running as an orphan.
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                pass
            os.waitpid(pid, 0)
def main():
    """Parse the command line, perform the NAT traversal and start the chat.

    Command line: ``[--server IP:PORT] <local_name> <remote_name>``.

    Raises:
        ValueError: If the server option is not of the form ``IP:PORT``.
    """
    cli = argparse.ArgumentParser(description='NAT Traversal')
    cli.add_argument('--server', '-s', metavar='IP:PORT', default="115.28.68.253:8088", type=str)
    cli.add_argument('local_name', metavar='<local_name>', type=str)
    cli.add_argument('remote_name', metavar='<remote_name>', type=str)
    options = cli.parse_args()

    # Turn "IP:PORT" into the (host, port) tuple the socket API expects.
    parts = options.server.split(':')
    if len(parts) != 2:
        raise ValueError('address format should be IP:PORT')
    host, port_text = parts
    endpoint = (host, int(port_text))

    tool = NatTraversalTool(endpoint, options.local_name, options.remote_name)
    connected_socket, _peer_address = tool.traversal()

    # start communication
    print("traversal success, connection established")
    start_communication(connected_socket)


if __name__ == '__main__':
    main()
import socket as sc
def main():
    """Run the UDP rendezvous server on port 8088.

    Protocol handled per datagram:

    * ``CONN:<client>,<peer>`` - if ``<peer>`` is already waiting for
      ``<client>``, send each side a ``DIG:<host>,<port>`` datagram with the
      other's address; if ``<peer>`` is not waiting at all, remember
      ``<client>`` as waiting for ``<peer>``.
    * ``SUCCESS:<client>`` - forget the waiting entry for ``<client>`` when
      the report comes from the address that registered it.

    Malformed or unexpected datagrams are ignored so that a single bad packet
    cannot stop the server.
    """
    # client name -> (name of the peer it wants, address it registered from)
    waiting_clients = {}
    with sc.socket(type=sc.SOCK_DGRAM) as server:
        server.bind(("0.0.0.0", 8088))
        while True:
            buffer, address = server.recvfrom(1024)
            try:
                data = buffer.decode()
            except UnicodeDecodeError:
                continue
            # Split at the first ':' only; a datagram without one yields an
            # unknown command and is ignored below.
            command, _, info = data.partition(':')
            if command == 'CONN':
                names = info.split(',')
                if len(names) != 2:
                    continue
                client_name, connecting_name = names
                waiting = waiting_clients.get(connecting_name)
                if waiting is None:
                    waiting_clients[client_name] = connecting_name, address
                    continue
                wanted_name, peer_address = waiting
                if wanted_name != client_name:
                    # The waiting peer asked for somebody else: ignore this
                    # request instead of shutting the server down.
                    continue
                server.sendto(("DIG:%s,%d" % peer_address).encode(), address)
                server.sendto(("DIG:%s,%d" % address).encode(), peer_address)
            elif command == 'SUCCESS':
                # Only one side of a pair has a waiting entry, so the other
                # side's SUCCESS report is expected to find nothing here.
                waiting = waiting_clients.get(info)
                if waiting is not None and waiting[1] == address:
                    del waiting_clients[info]


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment