Skip to content

Instantly share code, notes, and snippets.

@Sorseg
Last active December 29, 2015 10:39
Show Gist options
  • Save Sorseg/7658392 to your computer and use it in GitHub Desktop.
Save Sorseg/7658392 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import tkinter as tk
import tkinter.ttk as ttk
from threading import Thread
import socket
# TCP port used by both the "Connect" (client) and "Host" (server) modes.
PORT = 1235

# Main application window.  The functions below also use it as a namespace
# for shared state (sockets, widgets, the current send callback).
root = tk.Tk()
root.title('QIP 0.0')
def on_exit(event=None):
    """Reset the chat session: clear the user list and close all sockets.

    Bound to the root window's ``<Destroy>`` event and also called directly
    by ``connect()`` and ``host()`` before they start a new session.  Every
    step is best-effort, because each resource may not exist yet or may
    already be closed.

    event -- Tk event object when used as an event handler; unused.
    """
    try:
        userlist.delete(0, 'end')
    except (NameError, tk.TclError):
        # The list box is not created yet or has already been destroyed.
        pass
    # Close the main socket and its file wrapper independently, so that a
    # missing or failing one does not prevent closing the other.
    for attr in ('sock', 'sockfile'):
        try:
            getattr(root, attr).close()
        except (AttributeError, OSError, ValueError):
            pass
    # Iterate over a snapshot: worker threads add and remove entries.
    for c in list(users.values()):
        if c:
            try:
                c.conn.close()
                c.close()
            except (AttributeError, OSError, ValueError):
                pass
# Login name -> connection file object of every chat participant; the
# hosting user is stored with a value of None.
users = {}
# Name put in front of the messages sent by the host itself.
root.host_login = 'User'
# Run the session cleanup when the main window is destroyed.
root.bind("<Destroy>", on_exit)
def to_thread(func):
    """Run ``func`` (called without arguments) in a new background thread.

    Also usable as a decorator to start a function immediately; the
    decorated name is then bound to ``None``, not to the function.

    The thread is a daemon thread: the workers started through this helper
    block indefinitely in ``accept()`` / ``readline()``, and a non-daemon
    thread stuck there would keep the process alive after the main window
    has been closed.
    """
    Thread(target=func, daemon=True).start()
def to_loop(func):
    """Schedule ``func`` to be called by the Tk event loop without delay.

    Also usable as a decorator; the decorated name is then bound to
    ``None``, not to the function.
    """
    delay_ms = 0
    root.after(delay_ms, func)
def socket_reader(sckfile):
    """Yield lines read from ``sckfile`` until end of stream or an error.

    Lines are yielded exactly as ``readline()`` returns them (trailing
    newline included) and the ``repr`` of each one is printed.  A read
    error is printed and ends the generator instead of propagating.
    """
    while True:
        try:
            line = sckfile.readline()
            print(repr(line))
        except Exception as error:
            print("Ex! "+str(error))
            break
        if line:
            yield line
        else:
            # An empty string means the peer closed the stream.
            break
def connect():
    """Connect to the host named in the address entry and start reading.

    Resets any previous session, opens a TCP connection to
    ``(address, PORT)``, sends the login name as the first line and starts
    a background thread that dispatches the incoming ``MESSAGE:``,
    ``USERIN:`` and ``USEROUT:`` lines.  A connection error is shown in the
    message area and leaves the application disconnected.
    """
    on_exit()
    address = root.address.get()
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        sock.connect((address, PORT))
    except Exception as e:
        # Do not leak the socket of a failed attempt.
        sock.close()
        message('>error: '+str(e))
        return
    root.sock = sock
    root.msg_send = client_send
    message('>Connected to: '+address)
    # Keep a local reference: the reader thread must keep using *this*
    # connection even if root.sockfile is replaced by a later connect().
    sockfile = sock.makefile('rw', buffering=1, encoding='utf-8')
    root.sockfile = sockfile
    root.msg_send(root.login.get()+'\n')
    enable_chat()
    root.options_window.withdraw()

    @to_thread
    def _():
        for l in socket_reader(sockfile):
            if l.startswith('MESSAGE:'):
                message(l[len('MESSAGE:'):])
            elif l.startswith('USERIN:'):
                # Drop the line terminator so it does not end up in the
                # list box, and update the widget from the Tk loop rather
                # than from this worker thread.
                name = l[len('USERIN:'):].rstrip('\n')
                to_loop(lambda name=name: add_user(name))
            elif l.startswith('USEROUT:'):
                name = l[len('USEROUT:'):].rstrip('\n')
                to_loop(lambda name=name: remove_user(name))
        message('>Disconnected')
        disable_chat()
def host():
    """Start hosting a chat on ``PORT`` and accept clients in the background.

    Resets any previous session, opens the listening socket, registers the
    local user (with no connection) and starts an accept thread.  Each
    accepted client sends its login as the first line and then gets its
    own thread that relays its messages to everybody.  A failure to open
    the listening socket is shown in the message area.
    """
    on_exit()
    sck = socket.socket()
    sck.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        sck.bind(('', PORT))
        sck.listen(1)
    except OSError as e:
        # E.g. the port is already in use: report it, do not leak the socket.
        sck.close()
        message('>error: '+str(e))
        return
    root.sock = sck
    message('>Hosting on '+str(PORT))
    root.msg_send = server_send
    enable_chat()
    login = root.login.get().strip()
    root.host_login = login
    add_user(login)
    users[login] = None
    root.options_window.withdraw()

    def serve(name, cfile):
        """Relay every line received from one client, then clean up."""
        for l in socket_reader(cfile):
            msg = name+': '+l
            message(msg)
            broadcast('MESSAGE:'+msg)
        print(name, 'disconnected')
        message('>Disconnected: '+name)
        unregister_user(name)
        # The list box must be updated from the Tk loop, not this thread.
        to_loop(lambda: remove_user(name))

    @to_thread
    def _():
        while 1:
            try:
                conn, addr = sck.accept()
            except OSError:
                # The listening socket was closed (session reset or exit).
                return
            cfile = conn.makefile('rw', buffering=1, encoding='utf-8')
            cfile.conn = conn
            try:
                username = cfile.readline().strip()
                client_login = register_user(username, cfile)
            except (OSError, ValueError):
                # Broken handshake: drop this client, keep accepting others.
                client_login = None
            if client_login is None:
                # Handshake failed or no free login name was available.
                for closable in (cfile, conn):
                    try:
                        closable.close()
                    except (OSError, ValueError):
                        pass
                continue
            message('>Connected: '+client_login+' '+str(addr[0]))
            # Bind this connection's values now; the loop variables are
            # reassigned by the next accepted client.
            to_thread(lambda name=client_login, cfile=cfile: serve(name, cfile))
def show_options():
    '''Show the options window, building it first if it does not exist.'''
    if not root.options_window.winfo_exists():
        window = tk.Toplevel(takefocus=True)
        root.options_window = window
        window.title('Options')

        # Row 0: login name.
        ttk.Label(window, text='Login:').grid(row=0, column=0)
        root.login = ttk.Entry(window)
        root.login.insert(0, 'User')
        root.login.grid(row=0, column=1)

        # Row 1: connect to the host given in the address entry.
        ttk.Button(window, text='Connect',
                   command=connect).grid(row=1, column=0)
        root.address = ttk.Entry(window)
        root.address.insert(0, 'sorseg.dyndns.org')
        root.address.grid(row=1, column=1)

        # Row 2: become the host.
        ttk.Button(window, text='Host', command=host).grid(row=2, column=0)

        window.lift()
    root.options_window.deiconify()
def register_user(login, connection = None):
    """Register a participant under a unique login and announce it.

    If ``login`` is taken, the first free ``login1`` .. ``login9999`` is
    used instead.  A new client is first sent one ``USERIN:`` line per
    already registered user; then the user is stored in ``users``, added to
    the list box and announced to everybody with a ``USERIN:`` broadcast.

    login      -- requested login name.
    connection -- the client's connection file object, or None.

    Returns the login actually registered, or None when no free name was
    found (the connection, if any, is closed in that case).
    """
    if login in users:
        for i in range(1, 10000):
            lgn = login+str(i)
            if lgn not in users:
                login = lgn
                break
        else:
            if connection:
                connection.close()
            return None
    if connection:
        # Take a snapshot of the names first: other threads unregister
        # users while this one is busy writing to the socket, and iterating
        # the live dict could fail with "dictionary changed size".
        known = tuple(users)
        connection.write(''.join('USERIN:'+u+'\n' for u in known))
        connection.flush()
    users[login] = connection
    add_user(login)
    broadcast('USERIN:'+login)
    return login
def unregister_user(login):
    """Forget ``login``, close its connection and announce the departure.

    A login that is not registered (any more) is not an error; the
    ``USEROUT:`` line is broadcast in either case.
    """
    connection = users.pop(login, None)
    if connection:
        # Close both the file wrapper and the socket it was made from, so
        # the connection of a departed client is not leaked.
        for closable in (connection, getattr(connection, 'conn', None)):
            if closable is None:
                continue
            try:
                closable.close()
            except (OSError, ValueError):
                pass
    broadcast('USEROUT:'+login)
def add_user(login):
    """Append ``login`` at the end of the user list box."""
    userlist.insert(tk.END, login)
def remove_user(login):
    """Delete the first list box entry equal to ``login``.

    Prints "USER NOT FOUND" when there is no such entry.
    """
    entries = userlist.get(0, 'end')
    try:
        position = entries.index(login)
    except ValueError:
        print("USER NOT FOUND")
    else:
        userlist.delete(position)
def build_from_dict(tkd, root):
    '''Build a tree of tkinter widgets from a nested description.

    tkd  -- dict mapping a name to a spec dict with the keys 'widget'
            (widget class, required), 'args' (constructor keyword
            arguments), 'pack' (pack options of the holder frame, merged
            over fill='both'), 'scroll' (a string; containing 'y' adds a
            vertical scrollbar) and 'children' (a nested description built
            inside the widget).
    root -- parent widget for the holder frames.

    Returns a dict mapping each name to {'widget': ...}, plus 'scrolly'
    when a scrollbar was created, plus the entries of its children.
    '''
    built = {}
    for name, spec in tkd.items():
        pack_options = dict({'fill': 'both'}, **spec.get('pack', {}))
        # Every widget lives in its own frame so that an optional
        # scrollbar can sit next to it.
        holder = tk.Frame(root)
        holder.pack(pack_options)
        widget = spec['widget'](holder, **spec.get('args', {}))
        widget.pack(expand=1, fill='both', side='left')
        entry = {'widget': widget}
        built[name] = entry
        if 'y' in spec.get('scroll', ''):
            scrollbar = ttk.Scrollbar(holder, orient='vertical')
            entry['scrolly'] = scrollbar
            scrollbar.pack(side='right', fill='y', expand=0)
            # Link scrollbar and widget in both directions.
            scrollbar['command'] = widget.yview
            widget['yscrollcommand'] = scrollbar.set
        if 'children' in spec:
            entry.update(build_from_dict(spec['children'], widget))
    return built
# Description of the main window layout, consumed by build_from_dict().
widgets_dict = {
    # Upper area: message log on the left, options button and user list on
    # the right.
    'top frame': {
        'widget': tk.Frame,
        'pack': {'side': 'top', 'expand': 1},
        'children': {
            'messages': {
                'widget': tk.Text,
                'args': {'state': 'disabled', 'wrap': 'word'},
                'pack': {'side': 'left', 'expand': 1, 'fill': 'both'},
                'scroll': 'y',
            },
            'right frame': {
                'widget': tk.Frame,
                'pack': {'side': 'right', 'fill': 'y'},
                'children': {
                    'options': {
                        'widget': ttk.Button,
                        'args': {'text': 'options',
                                 'command': show_options},
                        'pack': {'side': 'top'},
                    },
                    'users': {
                        'widget': tk.Listbox,
                        'scroll': 'y',
                        'pack': {'side': 'bottom', 'fill': 'y',
                                 'expand': 1},
                    },
                },
            },
        },
    },
    # Lower area: input box and send button.
    'bottom frame': {
        'widget': tk.Frame,
        'pack': {'side': 'bottom'},
        'children': {
            'text': {
                'widget': tk.Text,
                'args': {'height': 3, 'width': 50},
                'pack': {'side': 'left', 'fill': 'x', 'expand': 1},
            },
            'send': {
                'widget': ttk.Button,
                'args': {'text': 'Send'},
                'pack': {'side': 'right'},
            },
        },
    },
}
# Build the main window and keep module-level shortcuts to the widgets the
# rest of the program works with.
root.widgets = build_from_dict(widgets_dict, root)
messages = root.widgets['top frame']['messages']['widget']   # chat log
text = root.widgets['bottom frame']['text']['widget']        # input box
userlist = root.widgets['top frame']['right frame']['users']['widget']
def message(msg):
    """Append ``msg`` as a line to the chat log and scroll to the end.

    A newline is added when ``msg`` does not end with one.  The widget
    update is scheduled through ``to_loop()`` instead of being done
    directly; this function is called from worker threads as well as from
    Tk callbacks.
    """
    line = msg if msg.endswith('\n') else msg+'\n'

    def append():
        # The log is kept read-only; unlock it just for the insertion.
        messages.configure(state='normal')
        messages.insert('end', line)
        messages.configure(state='disabled')
        messages.yview('moveto', 1.0)

    to_loop(append)
def send(e=None):
    """Send the content of the input box as a chat message and clear it.

    Used both as the ``<Return>`` binding of the input box and as the
    command of the Send button.  Nothing is sent when the box holds only
    whitespace, or when no session has been started yet (``root.msg_send``
    is only set by ``connect()`` / ``host()``).

    e -- Tk event when called from the key binding; unused.

    Returns "break" so that Tk stops processing the event after this
    handler.
    """
    msg = text.get(1.0, 'end').strip()
    text.delete(1.0, 'end')
    sender = getattr(root, 'msg_send', None)
    if msg and sender is not None:
        msg += '\n'
        print(repr(msg))
        sender(msg)
    return "break"
def client_send(msg):
    """Write ``msg`` to the connection to the host and flush it."""
    out = root.sockfile
    out.write(msg)
    out.flush()
def server_send(msg):
    """Show the host's own message locally and relay it to all clients.

    The message is prefixed with the host's login name.
    """
    line = ": ".join((root.host_login, msg))
    message(line)
    broadcast("MESSAGE:"+line)
# Both the Send button and the Return key in the input box send the message.
root.widgets['bottom frame']['send']['widget'].configure(command=send)
text.bind('<Return>', send)
def broadcast(msg):
    """Send ``msg`` as one line to every participant that has a connection.

    Surrounding whitespace is stripped from ``msg`` and a newline is
    appended.  Entries without a connection (the host itself) are skipped.
    A delivery failure is printed and reported in the chat log, and does
    not stop delivery to the remaining participants.
    """
    line = msg.strip()+'\n'
    # Iterate over a snapshot: this function runs in several threads while
    # others register and unregister users, and iterating the live dict
    # could fail with "dictionary changed size during iteration".
    for u, c in list(users.items()):
        if not c:
            continue
        try:
            c.write(line)
            c.flush()
        except Exception as e:
            print(repr(e), str(e))
            message('>failed to deliver to '+u)
def disable_chat():
    """Grey out the user list and the input box and lock the input box.

    The change is scheduled on the Tk event loop through ``to_loop()``.
    """
    def apply():
        userlist.configure(bg='grey')
        text.configure(bg='grey')
        text.configure(state='disabled')

    to_loop(apply)
def enable_chat():
    """Give the user list and input box a white background and unlock input.

    The change is scheduled on the Tk event loop through ``to_loop()``.
    """
    def apply():
        userlist.configure(bg='white')
        text.configure(bg='white')
        text.configure(state='normal')

    to_loop(apply)
# The chat widgets stay greyed out until a session is started.
disable_chat()
# show_options() calls winfo_exists() on root.options_window, so start with
# an already destroyed window; the first call then builds the real dialog.
root.options_window = tk.Toplevel()
root.options_window.destroy()
# Open the options dialog shortly after the event loop has started.
root.after(50, show_options)
root.mainloop()
import socket
from threading import Thread
# TCP port the standalone chat server listens on.
PORT = 1235
def to_thread(func):
    """Run ``func`` (called without arguments) in a new thread.

    Also usable as a decorator to start a function immediately; the
    decorated name is then bound to ``None``, not to the function.
    """
    worker = Thread(target=func)
    worker.start()
def socket_reader(sckfile):
    """Yield lines read from ``sckfile`` until end of stream or an error.

    Lines are yielded exactly as ``readline()`` returns them (trailing
    newline included) and the ``repr`` of each one is printed.  A read
    error is printed and ends the generator instead of propagating.
    """
    while True:
        try:
            line = sckfile.readline()
            print(repr(line))
        except Exception as error:
            print("Ex!"+str(error))
            break
        if line:
            yield line
        else:
            # An empty string means the peer closed the stream.
            break
# Login name -> connection file object of every connected client.
users = {}

# Listening socket of the server, bound to PORT on all interfaces.
sck = socket.socket()
sck.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sck.bind(('', PORT))
sck.listen(1)
def register_user(username, cfile):
    """Register a client under a unique name and announce it.

    If ``username`` is taken, the first free ``username1``,
    ``username2``, ... is used instead.  The client is first sent one
    ``USERIN:`` line per already registered user; then it is stored in
    ``users`` and announced to everybody with a ``USERIN:`` broadcast.

    username -- requested login name.
    cfile    -- the client's connection file object.

    Returns the name actually registered, or None when no free name was
    found (``cfile`` is closed in that case).
    """
    if username in users:
        for i in range(1, 1000000):
            new_login = username + str(i)
            if new_login not in users:
                username = new_login
                break
        else:
            print("Too many", username+"'s")
            cfile.close()
            # Stop here: the connection is closed, so it must neither be
            # written to nor replace the entry of the existing user.
            return None
    # Take a snapshot of the names first: other threads unregister users
    # while this one is busy writing to the socket, and iterating the live
    # dict could fail with "dictionary changed size during iteration".
    known = tuple(users)
    cfile.write(''.join('USERIN:'+u+'\n' for u in known))
    cfile.flush()
    users[username] = cfile
    broadcast('USERIN:'+username)
    return username
def broadcast(msg):
    """Send ``msg`` as one line to every connected client.

    Surrounding whitespace is stripped from ``msg`` and a newline is
    appended.  Entries without a connection are skipped.  A delivery
    failure is printed and does not stop delivery to the other clients.
    """
    line = msg.strip()+'\n'
    # Iterate over a snapshot: this function runs in several client threads
    # while others register and unregister users, and iterating the live
    # dict could fail with "dictionary changed size during iteration".
    for u, c in list(users.items()):
        if not c:
            continue
        try:
            c.write(line)
            c.flush()
        except Exception as e:
            print(repr(e), str(e))
            print('>failed to deliver to '+u)
def unregister_user(name):
    """Forget ``name``, close its connection and announce the departure.

    A name that is not registered (any more) is not an error; the
    ``USEROUT:`` line is broadcast in either case.
    """
    cfile = users.pop(name, None)
    if cfile is not None:
        # Close both the file wrapper and the socket it was made from;
        # closing is best-effort, the peer is usually gone already.
        for closable in (cfile, getattr(cfile, 'conn', None)):
            if closable is None:
                continue
            try:
                closable.close()
            except (OSError, ValueError):
                pass
    broadcast('USEROUT:'+name)
@to_thread
def _():
    """Accept clients forever; each one is served in a thread of its own.

    A client sends its login as the first line.  Every further line is
    relayed to all clients as ``MESSAGE:<login>: <line>`` until the client
    disconnects, after which it is unregistered.
    """
    def serve(name, cfile):
        """Relay every line received from one client, then clean up."""
        for l in socket_reader(cfile):
            msg = name+': '+l
            broadcast('MESSAGE:'+msg)
        print(name, 'disconnected')
        print('>Disconnected: '+name)
        unregister_user(name)

    while 1:
        conn, addr = sck.accept()
        # Fix the text encoding instead of relying on the locale default,
        # so that it does not depend on the machine the server runs on.
        cfile = conn.makefile('rw', buffering=1, encoding='utf-8')
        cfile.conn = conn
        try:
            username = cfile.readline().strip()
            login = register_user(username, cfile)
        except (OSError, ValueError) as e:
            # Broken handshake: drop this client, keep accepting others.
            print("Ex!"+str(e))
            login = None
        if login is None:
            # Handshake failed or no free login name was available.
            for closable in (cfile, conn):
                try:
                    closable.close()
                except (OSError, ValueError):
                    pass
            continue
        print('>Connected: '+login+' '+str(addr[0]))
        # Bind this connection's values now; the loop variables are
        # reassigned by the next accepted client.
        to_thread(lambda name=login, cfile=cfile: serve(name, cfile))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment