Last active
July 25, 2019 03:04
-
-
Save Sam-Belliveau/219f829ffcbecd3db2d65573a3d2d6c3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import tkinter as tk | |
import time | |
import _thread as thread | |
from hashlib import blake2b | |
# ----- Client configuration -----

# Text encoding used for everything sent to / received from the server
STRING_ENCODING = "utf-8"

# Marker character reserved for message framing
# (not referenced anywhere else in the visible client code)
MESSAGE_MARKER = "\x00"

# Largest chunk read from the socket in a single recv() call
BUFFER_SIZE = 4096

# Delay between receive polls; recv() blocks, so no extra delay is used
UPDATE_TIME = 0

# Address the client offers by default
DEFAULT_IP = "127.0.0.1"
DEFAULT_PORT = 8081

# Seconds to wait for connect() before giving up
CONNECTION_TIMEOUT = 8

# Look and feel shared by all widgets
USER_FONT = ("Courier", 16)
BORDER_SIZE = 8

# ----- Connection state -----

# Placeholder socket; connect_to_server() replaces it on every attempt
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# True only while a connection to a chat server is established
server_connected = False
# Connect to server ip | |
def connect_to_server():
    """Connect to the address typed into the IP and port boxes.

    Used as the "Connect" button callback.  Any current session is asked to
    end with '?leave', the chat log is cleared, and a fresh socket is
    connected with a CONNECTION_TIMEOUT-second limit.  Success or failure is
    reported through the status label and the chat log.
    """
    global server_connected
    global server

    status_text.set("Status: Connecting")
    status_box.update()

    # Leave whatever server we were on and start with an empty log
    send_leave()
    clear_chat_log()

    ip = str(ip_box.get('1.0', 'end').strip())
    port_text = port_box.get('1.0', 'end').strip()

    # A non-numeric port must not escape as an exception from the Tk
    # callback: that would leave the status stuck on "Connecting".
    try:
        port = int(port_text)
    except ValueError:
        say_chat_log("Could not connect to " + ip + ":" + port_text + "!")
        say_chat_log("Exception: invalid port number")
        status_text.set("Status: Failed to Connect")
        return

    # NOTE(review): the receive thread may still be using the previous
    # socket here; coordinating the hand-over needs changes outside this
    # function.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        # Bound the connection attempt, then go back to blocking mode
        server.settimeout(CONNECTION_TIMEOUT)
        server.connect((ip, port))
        server.settimeout(None)

        # Good Connection
        server_connected = True
        status_text.set("Status: Connected")
    except Exception as e:
        say_chat_log("Could not connect to " + str(ip) + ":" + str(port) + "!")
        say_chat_log("Exception: " + str(e))
        status_text.set("Status: Failed to Connect")
# Send Leave Command | |
def send_leave():
    """Send the '?leave' command to the server (no-op when not connected)."""
    leave_command = "?leave"
    send_to_server(leave_command)
# Leave Server | |
def leave_server():
    """Mark the client as disconnected, close the socket and reset the UI.

    The chat log is wiped and replaced by a single "You Left The Server!"
    notice, and the status label switches to "Status: Disconnected".
    """
    global server_connected

    # Flip the flag first so other code stops using the socket
    server_connected = False

    # Closing may fail on an already-dead socket; that is harmless
    try:
        server.close()
    except Exception as error:
        print("Exception: " + str(error))

    clear_chat_log()
    say_chat_log("You Left The Server!")
    status_text.set("Status: Disconnected")
# Puts Text In Chat Log | |
def say_chat_log(message, header="[CLIENT] ", end='\n'):
    """Append ``header + message + end`` to the read-only chat log widget."""
    # The log is kept disabled so the user cannot type into it;
    # unlock it only for the duration of the insert.
    chat_log.configure(state=tk.NORMAL)
    chat_log.insert(tk.END, header + message + end)
    chat_log.update()
    chat_log.configure(state=tk.DISABLED)
# Clears Chat Log | |
def clear_chat_log():
    """Remove every line from the read-only chat log widget."""
    # Unlock, wipe from the first character to the end, redraw, lock again
    chat_log.configure(state=tk.NORMAL)
    chat_log.delete("1.0", "end")
    chat_log.update()
    chat_log.configure(state=tk.DISABLED)
# Send text to server encrypted | |
def send_to_server(message):
    """Send *message* (stripped, UTF-8 encoded) to the connected server.

    Does nothing when no connection is established.  On any failure the
    client leaves the server, the error is printed and "" is returned;
    otherwise the function returns None.
    """
    if not server_connected:
        return None

    try:
        # sendall() keeps writing until the whole payload is out;
        # plain send() may transmit only part of it.
        server.sendall(message.strip().encode(STRING_ENCODING))
    except Exception as msg:
        leave_server()
        print("Exception: " + str(msg))
        return ""
    return None
# Get text from server decrypted | |
def get_from_server():
    """Block until the server sends data and return it as text.

    Always returns a string: the decoded data, or "" when the client is not
    connected, the server closed the connection, or an error occurred.  In
    the last two cases the client also leaves the server.
    """
    if not server_connected:
        # Callers concatenate the result, so never hand back None
        return ""

    try:
        data = server.recv(BUFFER_SIZE)
    except Exception as msg:
        leave_server()
        print("Exception: " + str(msg))
        return ""

    if not data:
        # An empty read means the server closed the connection
        leave_server()
        return ""

    # A multi-byte character can be cut at the buffer boundary; replace the
    # broken bytes instead of raising and dropping the whole connection.
    return data.decode(STRING_ENCODING, errors="replace")
# Check for text from the server | |
def chat_update(text_box, update_time):
    """Receive loop: copy everything the server sends into the chat log.

    Runs forever and is meant to be started on its own thread.

    text_box    -- widget scrolled to its end after each received chunk
    update_time -- seconds to sleep between reads while connected
    """
    # While disconnected there is nothing to block on; without a real pause
    # the loop would spin at full CPU (update_time is normally 0).
    idle_delay = max(update_time, 0.05)

    while True:
        if not server_connected:
            time.sleep(idle_delay)
            continue

        message = get_from_server()
        # Empty/None means the connection just ended; nothing to display
        if message:
            say_chat_log(message, header='', end='')
            text_box.see(tk.END)
        time.sleep(update_time)
# Send what is in entry bar when enter is pressed | |
def send_message(event):
    """<Return> handler: send the entry bar's text to the server and clear it.

    Does nothing while the client is not connected.  *event* is the Tk event
    object and is not used.
    """
    if not server_connected:
        return

    # Grab the text ("end-1c" leaves out the widget's trailing newline)
    typed = type_bar.get("1.0", "end-1c")
    message = str(typed).strip()

    # Empty the entry bar
    type_bar.config(state=tk.NORMAL)
    type_bar.delete('1.0', 'end')
    type_bar.update()

    # Hand the text over to the network layer
    send_to_server(message)
# Window
window = tk.Tk()
window.title("Chat Room Client")
window.geometry("1024x768")

# Server Frame: status label on top, then Leave / Connect / IP / Port
server_frame = tk.Frame(window, borderwidth=2)

status_text = tk.StringVar()
status_box = tk.Label(server_frame, width=24, height=1, textvariable=status_text, borderwidth=BORDER_SIZE/2.0)
status_box.config(font=USER_FONT)
status_text.set("Status: Disconnected")

# The entry boxes start from the configured defaults rather than from
# separate hard-coded copies of the same values.
ip_box = tk.Text(server_frame, width=16, height=1, borderwidth=BORDER_SIZE/2.0)
ip_box.config(font=USER_FONT)
ip_box.insert(tk.END, DEFAULT_IP)

port_box = tk.Text(server_frame, width=8, height=1, borderwidth=BORDER_SIZE/2.0)
port_box.config(font=USER_FONT)
port_box.insert(tk.END, str(DEFAULT_PORT))

connect_button = tk.Button(server_frame, text="Connect", height=1, borderwidth=BORDER_SIZE/2.0, command=connect_to_server)
connect_button.config(font=USER_FONT)

# "Leave" only sends '?leave'; the local cleanup happens once the server
# closes the connection and the receive loop notices it.
leave_button = tk.Button(server_frame, text="Leave", height=1, borderwidth=BORDER_SIZE/2.0, command=send_leave)
leave_button.config(font=USER_FONT)

# Pack Buttons
status_box.pack(side=tk.TOP, fill=tk.X, expand=0, padx=2, pady=2)
port_box.pack(side=tk.RIGHT, expand=0, padx=2, pady=2)
ip_box.pack(side=tk.RIGHT, expand=0, padx=2, pady=2)
connect_button.pack(side=tk.RIGHT, fill=tk.X, expand=1, padx=2, pady=2)
leave_button.pack(side=tk.LEFT, expand=0, padx=2, pady=2)
server_frame.pack(side=tk.TOP, fill=tk.X, expand=0, padx=2, pady=2)

# Message Entry
type_bar = tk.Text(window, height=2, borderwidth=BORDER_SIZE)
type_bar.config(font=USER_FONT)
window.bind('<Return>', send_message)

# Chat Log
chat_log = tk.Text(window, borderwidth=BORDER_SIZE)
chat_log.config(font=USER_FONT)

# Pack Elements (server_frame is already packed above)
type_bar.pack(side=tk.BOTTOM, fill=tk.X, expand=0)
chat_log.pack(side=tk.TOP, fill=tk.BOTH, expand=1)

# Check for server updates on a background thread
thread.start_new_thread(chat_update, (chat_log, UPDATE_TIME))
# Function used to scroll chat on resize | |
def scroll_chat(event):
    """<Configure> handler: keep the newest chat line in view."""
    newest = tk.END
    chat_log.see(newest)


# Re-scroll whenever the window is resized or otherwise reconfigured
window.bind("<Configure>", scroll_chat)

# Run the Tk event loop; execution continues below once it ends
window.mainloop()

# Close socket at end of file
server.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Time Stamps | |
from datetime import datetime as dt | |
import time | |
# Networking / Management
import socket | |
import _thread as thread | |
from random import randrange | |
from uuid import uuid4 | |
# Misc Operations | |
import sys | |
import signal | |
from hashlib import blake2b | |
# ----------= Server Constants =----------
STRING_ENCODING = 'utf-8'

# Marker character reserved for message framing
# (not referenced anywhere else in the visible server code)
MESSAGE_MARKER = '\0'

# Message Size Options
BUFFER_SIZE = 0x1000    # bytes read per recv() / upper bound for one send
MAX_CHAT_SIZE = 1024    # characters allowed in one incoming message
MAX_CHAT_NAME = 16      # characters kept from a requested user name

# User Configurations
MAX_USERS = 64
USER_UPDATE_TIME = 0  # Blocking Makes This unnecessary

# Default Networking Options ("" binds on all interfaces)
DEFAULT_IP = ""
DEFAULT_PORT = 8081

# Computer IP (only used for the start-up log line)
HOSTNAME = socket.gethostname()
try:
    COMPUTER_IP = socket.gethostbyname(HOSTNAME)
except OSError:
    # Some machines have a hostname that does not resolve; that must not
    # stop the server from starting, so fall back to loopback for display.
    COMPUTER_IP = "127.0.0.1"

# Chat Room Name
SERVER_NAME = "Sam's Chat Room"

# Hashed using blake2b. Change for your own server
# NOTE(review): the admin passphrase is stored in source in plain text;
# consider loading it from the environment or a config file.
ADMIN_HASH = blake2b()
ADMIN_HASH.update("PleaseGiveMeAdminIReallyWantIt".encode(STRING_ENCODING))
ADMIN_PASSWORD = ADMIN_HASH.hexdigest()

# ----------= Code =----------

# Initialize Server
chat_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
chat_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
client_list = []  # List of users who are currently connected
# Prompt User for IP information | |
def bind_server():
    """Ask the operator for a port, then bind and listen on it.

    Anything that is not a plain number selects DEFAULT_PORT.  When binding
    fails the error is printed and the prompt repeats.  If standard input is
    closed (no operator to ask) DEFAULT_PORT is tried once and a bind
    failure is re-raised instead of looping forever.
    """
    print("[SERVER] Chat Room!\n")

    while True:
        port = DEFAULT_PORT
        interactive = True

        try:
            answer = input("Enter Port (Default = {})\n>>> "
                           .format(DEFAULT_PORT)).strip()
        except EOFError:
            # No terminal attached: use the default without prompting
            answer = ""
            interactive = False

        if answer.isdigit():
            port = int(answer)

        try:
            # Create Server
            chat_server.bind((DEFAULT_IP, port))

            # Add extra space for detecting extra users
            chat_server.listen(MAX_USERS + 4)
            return
        except Exception as e:
            print("Error Connecting To " + str(DEFAULT_IP) + ':' + str(port))
            print(str(e) + "\n\n")
            if not interactive:
                # Re-prompting is impossible, so surface the failure
                raise
# Log information in server log | |
def log_server(message, header="SERVER", stream=sys.stderr):
    """Write one ``[header - HH:MM:SS] message`` line to *stream* and flush."""
    now = dt.fromtimestamp(time.time())
    stamp = now.strftime('%H:%M:%S')
    stream.write(f"[{header} - {stamp}] {message}\n")
    stream.flush()
# Broadcast message to users on client list | |
def broadcast_message(message, user=None, log_in_server=True, admins_only=False, strip=True):
    """Send *message* to the connected clients.

    message       -- text to deliver
    user          -- client to leave out (normally the sender), or None
    log_in_server -- also write the message to the server's chat log
    admins_only   -- deliver to admin clients only
    strip         -- forwarded to each client's send()
    """
    if log_in_server:
        # log_server() flushes the stream itself
        log_server(message, header="CHAT", stream=sys.stdout)

    # Work on a snapshot: a failing client is disconnected, which removes it
    # from client_list; removing while iterating the live list would skip
    # the client that follows it.
    for client in list(client_list):
        try:
            if client != user and (client.admin or not admins_only):
                client.send(message, strip=strip)
        except Exception as msg:
            client.disconnect()
            log_server(str(msg), header="Exception")
# Handles User and their info | |
class User: | |
# Makes a random guest user ID (hash of a fresh UUID, not of the IP address)
def create_id(self, d_size=3):
    """Return a random name: "Guest" plus ``2 * d_size`` upper-case hex digits."""
    # Hash a fresh random UUID down to d_size bytes
    digest = blake2b(uuid4().bytes, digest_size=d_size).hexdigest()
    return "Guest" + digest.upper()
# Setup User | |
def __init__(self, conn, addr):
    """Set up a newly accepted client, announce it and greet it.

    conn -- connected socket for this client
    addr -- the client's IP address as text

    Connections from '127.0.0.1' or '0.0.0.0' start out as admins.  Any
    error during setup disconnects the user and is logged.
    """
    try:
        self.open = True
        self.conn = conn
        self.ip_addr = addr
        self.name = self.create_id()
        self.admin = self.ip_addr in ('127.0.0.1', '0.0.0.0')
        self._user_thread_var = None

        # Tell everybody else (this user is not in client_list yet, hence +1)
        announcement = f"\n[INFO] User {self.name} Has Connected! ({len(client_list) + 1} / {MAX_USERS} Users)\n"
        broadcast_message(announcement, user=self, log_in_server=False, strip=False)
        log_server(f"User {self.name} Has Connected! ({self.ip_addr})")

        # Greeting
        self.send("Welcome to " + SERVER_NAME)
        self.send(f"Your user id is <{self.name}>")
        self.send("Type '?help' for commands")
        self.send("")
        self.send(f"{len(client_list)} of {MAX_USERS} Users Online:")

        # Numbered list of the users already present
        for position, other in enumerate(client_list, start=1):
            self.send(str(position) + " - " + other.name)
        self.send("")
    except Exception as msg:
        self.disconnect()
        log_server(str(msg), header="Exception")
# Correctly Disconnect User | |
def disconnect(self):
    """Close this user's connection once and announce the departure.

    Calls after the first one do nothing.
    """
    if not self.open:
        return
    self.open = False

    # Take the user off the roster before counting who is left
    if self in client_list:
        client_list.remove(self)

    try:
        self.conn.close()
    except Exception as msg:
        log_server(str(msg), header="Exception")

    remaining = len(client_list)

    # Notify the other clients, then the server log
    broadcast_message(
        f"\n[INFO] User {self.name} Has Disconnected! ({remaining} / {MAX_USERS} Users)\n",
        user=self, log_in_server=False, strip=False
    )
    log_server(
        f"User {self.name} Has Disconnected! ({remaining} / {MAX_USERS} Users)"
    )
# Start thread for checking user | |
def start(self):
    """Launch this user's receive loop on a new thread and keep its id."""
    no_args = ()
    self._user_thread_var = thread.start_new_thread(self._user_thread, no_args)
# Thread used to check for user | |
def _user_thread(self):
    """Receive loop for one user; runs until the user is disconnected.

    Each received message is split into lines.  Lines starting with '?' or
    '!' are treated as commands, everything else is chat.  An empty read
    disconnects the user; so does any unexpected error, which is logged.
    """
    try:
        while self.open:
            incoming = self.get_text()

            if not incoming:
                # Nothing came back: treat the connection as gone
                self.disconnect()
            elif len(incoming) >= MAX_CHAT_SIZE:
                self.send("[SERVER] Message Too Long! ({} > {})"
                          .format(len(incoming), MAX_CHAT_SIZE))
            else:
                for line in incoming.split('\n'):
                    if not line:
                        continue
                    if line[0] in ('?', '!'):
                        self.parse_command(line)
                    else:
                        self.say(line)

            time.sleep(USER_UPDATE_TIME)
    except Exception as msg:
        log_server(str(msg), header="Exception")
        self.disconnect()
    return
# Take command, parse and sort it | |
def parse_command(self, command_text):
    """Log and dispatch one command line ('?...' user or '!...' admin).

    The reply is framed by an empty line before and after it.  '!' commands
    from users who are not admins are ignored.
    """
    # Drop the prefix character and split the rest into words
    command = command_text[1:].split(' ')

    self.send("")

    prefix = command_text[0]
    if prefix == '?':
        # User commands
        entry = "'{}' Executed '{}'".format(self.name, command_text)
        log_server(entry, header="CMD", stream=sys.stdout)
        self.run_user_command(command)
    elif prefix == '!' and self.admin:
        # Admin commands
        entry = "'{}' Executed '{}'".format(self.name, command_text)
        log_server(entry, header="ADMIN-CMD", stream=sys.stderr)
        self.run_admin_command(command)

    self.send("")
# Run user commands | |
def run_user_command(self, command):
    """Execute a '?' command.

    command -- the command line split on spaces, without the leading '?';
               command[0] is the (case-insensitive) command name.
    """
    action = command[0].lower()

    if action == "help":
        help_lines = (
            "?help - List Commands\n",
            "- Fun Commands",
            "?coin - Flips a Coin",
            "?8ball [message] - Asks Magic 8 Ball",
            "?roulette - Play Russian Roulette",
            "?f - Pay Respect\n",
            "- Useful Commands",
            "?list - List Users Online",
            "?leave - Disconnect From Server",
            "?clear - Clears Screen With New Line",
            "?name <new name> - Set Chat Name",
            "?pm <name> <message> - Message Only 1 Person",
        )
        for help_line in help_lines:
            self.send_cmd(help_line)

    elif action == "coin":
        # Echo the command as chat, then announce the result to everyone
        self.say("?" + ' '.join(command))
        sides = ["Heads", "Tails"]
        broadcast_message("[Coin] " + sides[randrange(2)])
        broadcast_message("", self, strip=False, log_in_server=False)

    elif action == "8ball":
        self.say("?" + ' '.join(command))
        answers = ["Yes", "No", "Probably", "Probably Not", "Maybe", "Ask Again"]
        broadcast_message("[8Ball] " + answers[randrange(6)])
        broadcast_message("", self, strip=False, log_in_server=False)

    elif action == "roulette":
        self.say("?" + ' '.join(command))
        chamber = randrange(6)
        if chamber == 0:
            # One in six: the player is disconnected
            sounds = ["CRACK", "Ka-BOOM", "BOOM", "POW", "BANG"]
            broadcast_message("[Roulette] {}!".format(sounds[randrange(5)]))
            broadcast_message("", self, strip=False, log_in_server=False)
            self.disconnect()
        else:
            broadcast_message("[Roulette] Click...")
            broadcast_message("", self, strip=False, log_in_server=False)

    elif action == "f":
        self.say("Can we get an F in the chat")
        broadcast_message("[ChatBot{}] F".format(randrange(100)))
        broadcast_message("", self, strip=False, log_in_server=False)

    elif action == "list":
        self.send_cmd("{} of {} Users Online:"
                      .format(len(client_list), MAX_USERS))
        for position, other in enumerate(client_list, start=1):
            self.send_cmd(str(position) + " - " + other.name)

    elif action == "leave":
        self.disconnect()

    elif action == "clear":
        # 256 newlines push the old text off the client's screen
        self.send_cmd("Clearing Screen..." + "\n" * 0x100)
        self.send_cmd("Screen Cleared")

    elif action == "name":
        if len(command) > 1:
            # Words of a multi-word name are glued together with '_'
            requested = "_".join(command[1:]).rstrip()
            self.change_name(requested)
        else:
            self.send_cmd("Current Username: " + self.name)

    elif action == "pm":
        if len(command) > 2:
            for other in client_list:
                if other.name.lower() == command[1].lower():
                    # Deliver to the recipient and echo to the sender
                    other.send("<{} pm'd you> {}".format(
                        self.name, " ".join(command[2:])
                    ))
                    self.send("<you pm'd {}> {}".format(
                        other.name, " ".join(command[2:])
                    ))
                    return
            self.send_cmd("User '{}' Not Found!".format(command[1]))
        else:
            self.send_cmd("Invalid Input!")

    elif action == "admin" and len(command) > 1:
        # Compare the hash of the supplied passphrase with the stored one
        attempt = blake2b(command[1].encode('utf-8')).hexdigest()
        if attempt == ADMIN_PASSWORD and not self.admin:
            self.admin = True
            self.send_cmd("You Are Now Admin! Type '!help' for a list of Admin commands.")
            broadcast_message(
                "[INFO] User {} is now Admin".format(self.name),
                user=self, admins_only=True
            )
            broadcast_message("", user=self, admins_only=True, log_in_server=False)
        else:
            # Same reply as for a command that does not exist
            self.send_cmd("Unknown Command '?{}'".format(command[0]))

    else:
        self.send_cmd("Unknown Command '?{}'".format(command[0]))
# Run admin commands | |
def run_admin_command(self, command):
    """Execute a '!' command (the caller has already checked admin rights).

    command -- the command line split on spaces, without the leading '!';
               command[0] is the (case-insensitive) command name.
    """
    action = command[0].lower()

    if action == "help":
        self.send_cmd("!help - List Commands")
        self.send_cmd("!list - List Admins")
        self.send_cmd("!kick <user name> - Kick User")
        self.send_cmd("!rename <name> <new name> - Rename User Name")

    elif action == "list":
        self.send_cmd("Admins Online:")
        # Admins are numbered consecutively, starting at 1
        admins = [user for user in client_list if user.admin]
        for position, user in enumerate(admins, start=1):
            self.send_cmd(str(position) + " - " + user.name)

    elif action == "kick":
        if len(command) < 2:
            self.send_cmd("Invalid Input!")
            return

        # Everything after the command is the name, joined with '_'
        target = '_'.join(command[1:])

        # Snapshot: disconnect() removes the user from client_list
        for user in list(client_list):
            if user.name.lower() != target.lower():
                continue
            if user.admin:
                self.send_cmd("You can not kick other admins!")
            else:
                user.disconnect()
                self.send_cmd("User '{}' Kicked".format(user.name))
            return

        # Report the name that was actually searched for
        self.send_cmd("User '{}' Not Found!".format(target))

    elif action == "rename":
        if len(command) < 3:
            self.send_cmd("Invalid Input!")
            return

        for user in client_list:
            if user.name.lower() == command[1].lower():
                user.change_name("_".join(command[2:]))
                return
        self.send_cmd("User '{}' Not Found!".format(command[1]))

    else:
        self.send_cmd("Unknown Command '!{}'".format(command[0]))
# Change name of user and log it | |
def change_name(self, new_name):
    """Validate *new_name*, announce the change and store it as self.name.

    Spaces become underscores, every other character that is not a letter,
    digit or underscore is dropped, and the result is cut to MAX_CHAT_NAME
    characters.  The change is refused (with a message to this user) when
    the result is empty, already in use, or is 'you' in any letter case.
    """
    # Spaces -> underscores (str.replace returns a new string, so the
    # result has to be assigned for this to have any effect)
    new_name = new_name.replace(' ', '_')

    # Keep only letters, digits and underscores
    new_name = "".join(
        ch for ch in new_name
        if ch.isalpha() or ch.isdigit() or ch == '_'
    )

    # Check for empty name
    if not new_name:
        self.send_cmd("You Have To Have A Username")
        return

    # Cap the length of names
    new_name = new_name[:MAX_CHAT_NAME]

    # Names must be unique, ignoring case
    wanted = new_name.lower()
    for user in client_list:
        if user.name.lower() == wanted:
            self.send_cmd("Another User Has That Name")
            return

    # 'You' would be confused with the "You>" echo prefix
    if wanted == 'you':
        self.send_cmd("You can not set your name to 'You'")
        return

    # Tell the user, then everybody else
    self.send_cmd("Name Changed To '{}'!".format(new_name))
    broadcast_message(
        "\n[INFO] '{}' Changed Their Name To '{}'\n"
        .format(self.name, new_name),
        user=self, log_in_server=False, strip=False
    )

    # Update Name
    self.name = new_name
# Get text from user | |
def get_text(self):
    """Block until this user sends text and return it stripped.

    Returns "" when the peer closed the connection and None when the user
    is not open or receiving failed (the user is then disconnected and the
    error logged).  Packets that contain only whitespace are skipped:
    they strip to "", which callers would otherwise mistake for a closed
    connection.
    """
    while self.open:
        try:
            data = self.conn.recv(BUFFER_SIZE)
            if not data:
                # Zero bytes: the peer closed the connection
                return ""
            message = data.decode(STRING_ENCODING).strip()
        except Exception as msg:
            self.disconnect()
            log_server(str(msg), header="Exception")
            return None

        if message:
            return message
        # Whitespace only (e.g. a blank line): wait for real content

    return None
# Send user text | |
def send(self, message, strip=True):
    """Send one line of text to this user.

    message -- text to send; a newline is appended
    strip   -- remove surrounding whitespace first

    Nothing is sent when the user is closed or the encoded message is
    BUFFER_SIZE bytes or longer.  A failed send disconnects the user and is
    logged.
    """
    # Remove White Space
    if strip:
        message = message.strip()

    # Check Size of Message (measured before the newline is added)
    if len(message.encode(STRING_ENCODING)) >= BUFFER_SIZE or not self.open:
        return

    try:
        payload = (message + '\n').encode(STRING_ENCODING)
        # sendall() keeps writing until the whole line is out;
        # plain send() may transmit only part of it.
        self.conn.sendall(payload)
    except Exception as msg:
        self.disconnect()
        log_server(str(msg), header="Exception")
# Send user command text without sanitizing | |
def send_cmd(self, message):
    """Send *message* to this user prefixed with "[CMD] ", without stripping."""
    tagged = "[CMD] " + message
    self.send(tagged, strip=False)
# Broadcast message as this user | |
def say(self, message):
    """Broadcast *message* as chat from this user and echo it back.

    Other clients receive "<name>> <message>", the sender "You> <message>".
    Messages of MAX_CHAT_SIZE characters or more are refused with a
    warning.  Nothing happens once the user is closed.
    """
    # Sanitize Message (str.strip returns a new string; keep the result)
    message = message.strip()

    if not self.open:
        return

    # Check Size of Message
    if len(message) < MAX_CHAT_SIZE:
        broadcast_message(self.name + "> " + message, user=self)
        self.send("You> " + message)
    else:
        # Report the same measure (characters) that the limit is checked in
        self.send(
            "[WARNING] Message Too Long ({} > {})"
            .format(len(message), MAX_CHAT_SIZE)
        )
# Handle closing of server | |
def close_server(signal, frame):
    """Signal handler: tell the clients, disconnect them and exit.

    signal, frame -- arguments supplied by the signal machinery; not used
    """
    # Log Closing of Server
    log_server("Closing Server...")
    broadcast_message("Server Has Closed!", log_in_server=False)

    # Disconnect every user.  disconnect() removes the user from
    # client_list, so walk a snapshot; iterating the live list would skip
    # every other client.
    for client in list(client_list):
        client.disconnect()

    # Close Socket
    chat_server.close()

    # Exit program (sys.exit, unlike the site-provided exit(), always exists)
    sys.exit(0)
# Loop used to search for incoming users and connect them | |
def listen_to_clients():
    """Bind the server, then accept clients until accept() fails.

    Each accepted connection becomes a User with its own receive thread as
    long as fewer than MAX_USERS are connected; otherwise the connection is
    told the room is full and closed.
    """
    bind_server()

    sys.stderr.write('\n')
    sys.stderr.flush()
    log_server("Created Chat Room on " + COMPUTER_IP +
               ':' + str(chat_server.getsockname()[1]))

    # Constantly check for new users
    while True:
        try:  # Accept new users
            conn, addr = chat_server.accept()
        except Exception as msg:
            log_server(str(msg), header="Exception")
            return

        # Check for user max limit
        if len(client_list) < MAX_USERS:
            new_user = User(conn, addr[0])
            # Setup disconnects the user when it fails; such a user must
            # not be listed or given a thread.
            if new_user.open:
                client_list.append(new_user)
                new_user.start()
        else:
            # Reject Connection
            log_server("IP Address " + addr[0] + " Could Not Connect! Room Full!")
            try:
                conn.sendall("Could Not Connect! Room Full!".encode(STRING_ENCODING))
            except OSError as msg:
                # The peer may already be gone; keep accepting others
                log_server(str(msg), header="Exception")
            finally:
                conn.close()
# Shut the server down cleanly when it receives SIGINT (Ctrl+C)
# Route SIGINT to close_server so clients are told before the process exits
signal.signal(signal.SIGINT, close_server)

# Bind, then serve until accept() fails or the process is interrupted
listen_to_clients()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment