Skip to content

Instantly share code, notes, and snippets.

@victorazzam
Last active February 14, 2019 22:53
Show Gist options
  • Save victorazzam/b34e9fb3d4b1e84e9c1841635071201d to your computer and use it in GitHub Desktop.
Save victorazzam/b34e9fb3d4b1e84e9c1841635071201d to your computer and use it in GitHub Desktop.
Simple peer-to-peer chat using Python sockets.
#!/usr/bin/env python3
from sys import argv, stdout, exit
from threading import Thread
import re, socket, random, gnureadline
PORT = 6310
def Bind_UDP():
    """Bind the listening UDP socket and process incoming chat datagrams forever.

    Intended to run as the target of the listener thread. Binds the global
    ``sock2`` to ``PORT`` on all interfaces, then loops on ``recvfrom``.
    Each datagram is a 4-character tag followed by a payload (only the first
    line is used):

    * ``HELO``          -- a newcomer; answer with ``PING<user>`` and announce
                           the newcomer's address to known peers via ``PEER``.
    * ``PEER<ip>``      -- another peer exists at ``<ip>``; send it ``PING<user>``.
    * ``PING<name>``    -- record the sender as ``<name>`` and answer ``LIVE<user>``.
    * ``LIVE<name>``    -- record the sender as ``<name>`` (it joined the chat).
    * ``TEXT<message>`` -- display a chat message from the sender.
    * ``OLEH``          -- the sender left; forget it.

    Anything else is ignored. Displayed events are printed above the input
    line, and the text the user was typing is restored afterwards.

    If the port cannot be bound, the message is written to stderr and the
    whole process is terminated with exit status 1.
    """
    import os
    import sys

    global sock2
    # Create the socket outside the ``try`` so the handler below can always
    # close it; only a failing bind() is treated as "port in use".
    sock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock2.bind(("", PORT))
    except socket.error:
        sock2.close()
        # sys.exit() would only raise SystemExit inside this worker thread,
        # which the threading module discards silently: the message would
        # never be shown and the program would keep running without a
        # listener. Report the problem and end the process explicitly.
        print(f"Port {PORT} is already in use.", file=sys.stderr, flush=True)
        os._exit(1)

    def reply(tag, host):
        # Best-effort UDP reply: an unreachable or unresolvable host must not
        # take the listener thread down with it.
        try:
            sock1.sendto(f"{tag}{user}".encode(), (host, PORT))
        except OSError:
            pass

    while 1:
        _data, addr = sock2.recvfrom(1024)
        sender = addr[0]
        # Datagrams come from the network: never let undecodable bytes raise.
        data_raw = _data.decode(errors="replace").split("\n")[0]
        prefix, data = data_raw[:4], data_raw[4:]

        if prefix == "HELO":
            reply("PING", sender)
            send("PEER" + sender)
            continue
        elif prefix == "PEER":
            # Only contact payloads that look like an IPv4 address.
            if check(data):
                reply("PING", data)
            continue
        elif prefix == "PING":
            reply("LIVE", sender)
            peers[sender] = data
            to_print = f"User [{data}] is online"
        elif prefix == "LIVE":
            peers[sender] = data
            to_print = f"* [{data}] * joined the chat"
        elif prefix == "TEXT":
            # A sender that never completed the PING/LIVE handshake has no
            # recorded name; fall back to its address instead of raising
            # KeyError.
            to_print = f"[{peers.get(sender, sender)}] {data}"
        elif prefix == "OLEH":
            name = peers.pop(sender, None)
            if name is None:
                # Goodbye from someone we never knew: nothing to report.
                continue
            to_print = f"* [{name}] * left the chat"
        else:
            continue

        # Print the event on its own line, then re-draw the prompt together
        # with whatever the user had typed so far.
        temp = gnureadline.get_line_buffer()
        print(f"\n{to_print}\n{prompt}", end="")
        stdout.flush()
        gnureadline.insert_text(temp)
        gnureadline.redisplay()
def send(msg):
    """Send *msg* to every known peer as one UDP datagram each.

    Args:
        msg: Text to transmit (protocol tag plus payload); it is encoded
            once and sent through the global ``sock1`` to ``PORT`` on every
            address currently stored in ``peers``.
    """
    payload = msg.encode()
    # Iterate over a snapshot of the keys: the listener thread inserts and
    # removes peers concurrently, and iterating the live dict could raise
    # "RuntimeError: dictionary changed size during iteration".
    for peer in list(peers):
        sock1.sendto(payload, (peer, PORT))
def check(H):
    """Return True if *H* is a dotted-quad IPv4 address, else False.

    The string must consist of exactly four groups of one to three ASCII
    digits separated by dots, and every group must be below 256.

    ``re.fullmatch`` is used instead of ``^...$`` because ``$`` also matches
    just before a trailing newline, and ``re.ASCII`` restricts ``\\d`` to
    ``0-9`` so digits from other scripts are rejected.

    Args:
        H: Candidate address string.

    Returns:
        bool: Whether *H* is a valid dotted-quad address.
    """
    if re.fullmatch(r"\d{1,3}(?:\.\d{1,3}){3}", H, re.ASCII) is None:
        return False
    return all(int(octet) < 256 for octet in H.split("."))
def main():
    """Start the listener, announce ourselves to the first peer, and chat.

    Creates the global sending socket ``sock1`` and the coloured ``prompt``,
    starts ``Bind_UDP`` in a daemon thread, sends ``HELO`` to the peer given
    on the command line, then reads lines from the user forever and
    broadcasts every non-empty line as a ``TEXT`` message.

    The loop ends only through an exception raised by ``input()``
    (``KeyboardInterrupt`` / ``EOFError``), which the caller handles.
    """
    global sock1, prompt
    # Create everything the listener thread reads *before* starting it, so
    # an early datagram can never find ``sock1`` or ``prompt`` undefined.
    sock1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # The position of the colour name in ``colors`` is the last digit of the
    # ANSI bright-foreground code (9N).
    prompt = f"[\033[9{colors.index(color)};1m{user}\033[0m] "

    # ``daemon=True`` replaces the deprecated Thread.setDaemon() call.
    listen = Thread(target=Bind_UDP, daemon=True)
    listen.start()

    sock1.sendto(b"HELO", (peer, PORT))
    while 1:
        ui = input(prompt).strip()
        if ui:
            send("TEXT" + ui)
# Readline:
# https://stackoverflow.com/a/28604256
# https://pymotw.com/3/readline/index.html
try:
    # Known peers, keyed by IP address with the peer's user name as value.
    peers = {}
    # A colour's index is the final digit of its ANSI code (9N); the empty
    # first entry makes "red" land on index 1.
    colors = ["", "red", "green", "yellow", "blue", "pink", "cyan"]

    # Both a user name and a valid peer IPv4 address are mandatory.
    if len(argv) < 3 or not check(argv[2]):
        exit(f"Usage: {argv[0]} username peer_ip [prompt_color]")

    user = argv[1]
    peer = argv[2]

    # Optional third argument selects the prompt colour; anything missing or
    # unknown falls back to red.
    if len(argv) > 3 and argv[3] in colors:
        color = argv[3]
    else:
        color = "red"

    main()
except (KeyboardInterrupt, EOFError):
    # Ctrl-C / Ctrl-D: tell the peers we are leaving, then shut down.
    send("OLEH")
    sock1.close()
    sock2.close()
    exit("")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment