Skip to content

Instantly share code, notes, and snippets.

@pold500
Created October 20, 2023 15:22
Show Gist options
  • Save pold500/3b3d693356eee04c525e74dc04f53a57 to your computer and use it in GitHub Desktop.
Save pold500/3b3d693356eee04c525e74dc04f53a57 to your computer and use it in GitHub Desktop.
websocket python server
import sys
from websocket_server import WebsocketServer
from pynput import keyboard
import threading
import queue
# Thread-safe queue used as a hand-off slot: main() puts the keyboard listener
# into it and keyboard_press() takes it out again to stop the listener.
shared_queue = queue.Queue()
# Per-thread storage for the keyboard callback's state: keyboard_press() keeps
# the last key pressed (`last_key`) and the partially typed command
# (`user_input`) on it.
threadLocal = threading.local()
def new_client(client, server):
    """Announce a newly connected client.

    Registered in main() via ``server.set_fn_new_client``; the original
    comment notes it is called for every client connecting, after the
    handshake.

    Args:
        client: Mapping describing the client; only ``client['id']`` is read.
        server: Server object; used to broadcast the join notice to everyone.
    """
    print("New client connected and was given id %d" % client['id'])
    server.send_message_to_all("Hey all, a new client has joined us")
def client_left(client, server):
    """Log that a client has disconnected.

    Registered in main() via ``server.set_fn_client_left``; the original
    comment notes it is called for every client disconnecting.

    Args:
        client: Mapping describing the client (only ``client['id']`` is
            read), or ``None``.
        server: Server object; unused here.
    """
    # NOTE(review): websocket_server appears to be able to invoke this callback
    # with client=None when a connection drops before its handshake completes;
    # confirm against the installed version. Guard so logging never raises.
    if client is None:
        print("Client(unknown) disconnected")
        return
    print("Client(%d) disconnected" % client['id'])
def message_received(client, server, message):
    """Log a message sent by a client.

    Registered in main() via ``server.set_fn_message_received``.

    Args:
        client: Mapping describing the sender; only ``client['id']`` is read.
        server: Server object; unused here.
        message: The text received from the client.
    """
    print("Client(%d) said: %s" % (client['id'], message))
def keyboard_press(key, server):
    """Handle one key press: build up a command line and broadcast it on Enter.

    Installed in main() as the ``on_press`` callback of the pynput keyboard
    listener. The partially typed command and the last key seen are kept on
    the module-level ``threadLocal`` object.

    Key handling:
        * Ctrl (left or right): stop the keyboard listener.
        * Esc: discard the command typed so far and show a fresh prompt.
        * Enter: send the stripped command to all clients, then reset.
        * Space and printable keys: echo and append to the command.
        * Any other special key: ignored (only recorded as ``last_key``).

    Args:
        key: The key object passed by pynput (a ``keyboard.Key`` member for
            special keys, otherwise an object exposing ``char``).
        server: Server object; ``send_message_to_all`` is called on Enter.

    Returns:
        ``False`` after a Ctrl press (pynput stops a listener whose callback
        returns ``False``), ``True`` otherwise.
    """
    if not hasattr(threadLocal, 'last_key'):
        threadLocal.last_key = None
    if getattr(threadLocal, 'user_input', None) is None:
        threadLocal.user_input = ""  # initialize cache

    if key in (keyboard.Key.ctrl_l, keyboard.Key.ctrl_r):
        # The listener is handed over through the queue exactly once; a second
        # Ctrl press must not blow up with queue.Empty inside the callback.
        try:
            listener = shared_queue.get(block=False)
        except queue.Empty:
            listener = None
        if listener is not None:
            print("\r\nStop")
            listener.stop()
        # Returning False also asks pynput to stop the calling listener, which
        # covers the case where the queue was already drained.
        return False

    try:
        read_key = key.char  # single-char keys
    except AttributeError:
        read_key = key.name  # other keys
    threadLocal.last_key = read_key

    if key == keyboard.Key.esc:
        print("Input erased")
        print("CMD>", end='', flush=True)
        threadLocal.user_input = ""
        return True

    if key == keyboard.Key.enter:
        print("")
        stripped_input = threadLocal.user_input.strip()
        server.send_message_to_all(stripped_input)
        print("Sending command {}".format(stripped_input))
        # Clear threadlocal cache
        threadLocal.user_input = ""
        print("CMD>", end='', flush=True)
        return True

    if key == keyboard.Key.space:
        # Space is reported as a special key (it has a name, not a char), so
        # without this branch commands could never contain a space.
        typed = ' '
    elif hasattr(key, 'name'):
        typed = None  # remaining special keys contribute no text
    else:
        typed = read_key  # may be None for keys that carry no character

    if typed:
        sys.stdout.write(typed)
        # No newline is written here, so flush to make the echo visible now.
        sys.stdout.flush()
        threadLocal.user_input += typed
    return True
def main():
    """Run the WebSocket server together with a keyboard-driven command prompt.

    Creates the server on port 8082, registers the connection/message
    callbacks, starts a pynput keyboard listener whose key presses are routed
    to keyboard_press(), and then blocks until that listener stops.
    """
    port = 8082
    server = WebsocketServer(port=port)
    server.set_fn_new_client(new_client)
    server.set_fn_client_left(client_left)
    server.set_fn_message_received(message_received)
    print("Current Thread: ", threading.current_thread().name)

    listener = keyboard.Listener(
        on_press=lambda key: keyboard_press(key, server)
    )
    # Hand the listener to keyboard_press() so it can stop it on Ctrl.
    shared_queue.put(listener)
    listener.start()  # start to listen on a separate thread

    # NOTE(review): the positional True is presumably websocket_server's
    # `threaded` flag (serve in a background thread) - otherwise the lines
    # below would not run until the server shuts down. Confirm against the
    # installed version.
    server.run_forever(True)
    print("WebSocket server has started...")
    print("Listening to user input...")
    print("Locking the main thread:")
    sys.stdout.write("CMD>")
    # The prompt has no trailing newline; flush so it is shown immediately.
    sys.stdout.flush()
    listener.join()  # remove if main thread is polling self.keys


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment