Skip to content

Instantly share code, notes, and snippets.

@welmends
Created April 16, 2020 23:35
Show Gist options
  • Save welmends/ddde1549db71f9755432e75c13301715 to your computer and use it in GitHub Desktop.
Save welmends/ddde1549db71f9755432e75c13301715 to your computer and use it in GitHub Desktop.
WebSocket in Python - Example
# WS client example
import asyncio
import websockets
import time
async def receive():
    """Connect to the local WebSocket server and print each message.

    Opens a connection to ``ws://localhost:9999`` and then loops forever:
    waits for the next message, prints it, and pauses for one second.
    The coroutine never returns normally; it only ends when the
    connection or ``recv()`` raises.
    """
    uri = "ws://localhost:9999"
    async with websockets.connect(uri) as websocket:
        while True:
            message = await websocket.recv()
            print(message)
            # Pause without blocking: time.sleep() inside a coroutine would
            # freeze the whole event loop for the duration of the sleep.
            await asyncio.sleep(1)
# Drive the client coroutine on the default event loop, then keep the loop
# alive should the coroutine ever return.
client_loop = asyncio.get_event_loop()
client_loop.run_until_complete(receive())
client_loop.run_forever()
# WS server example
import asyncio
import websockets
import threading
import time
# State shared between the background thread running calc() and get_data().
# `amount` is the counter value, `changed` records whether it was updated
# since it was last read, and `mutex` (a semaphore with a single permit)
# is held around every access to the other two.
amount = 10
changed = False
mutex = threading.Semaphore(1)
def calc(argument):
    """Increment the shared counter every five seconds, forever.

    Written to be used as a thread target. On each pass it takes ``mutex``,
    adds one to ``amount``, sets ``changed`` to ``True``, releases the
    semaphore and then sleeps for five seconds. It never returns.

    Args:
        argument: Accepted for the thread's ``args`` tuple; not used.
    """
    global amount, changed
    while True:
        # The context manager releases the semaphore even if the body
        # raises, which the manual acquire()/release() pair did not.
        with mutex:
            amount += 1
            changed = True
        # Sleep outside the critical section so readers are never blocked
        # for the whole interval.
        time.sleep(5)
# Run calc() in the background. The thread is a daemon because calc() loops
# forever: a non-daemon thread would keep the interpreter alive after the
# main thread stops (for example on Ctrl-C or when the server fails to bind).
t = threading.Thread(
    target=calc,
    args=("thread sendo executada",),
    daemon=True,
)
t.start()
def get_data():
    """Return the counter as a string if it changed since the last read.

    The check and the reset of ``changed`` happen while holding ``mutex``.
    Reading consumes the flag, so after one successful call the next
    caller gets the sentinel until ``changed`` is set again.

    Returns:
        ``str(amount)`` when ``changed`` was set (the flag is cleared),
        otherwise the integer sentinel ``-1``.
    """
    global changed
    # `with` releases the semaphore on every exit path, replacing the two
    # separate release() calls.
    with mutex:
        if not changed:
            return -1
        changed = False
        return str(amount)
async def send(websocket, path=None):
    """Connection handler: push the counter to the client when it changes.

    Polls ``get_data`` once per second in the loop's default executor (it
    takes a blocking semaphore) and sends the result over ``websocket``
    whenever it is not the ``-1`` sentinel. Loops until ``websocket.send``
    or the executor call raises.

    Args:
        websocket: Connection object exposing an awaitable ``send(data)``.
        path: Request path passed by websockets versions that call handlers
            with two arguments; unused. Defaults to ``None`` so the handler
            also works when only the connection is passed.
    """
    loop = asyncio.get_event_loop()
    while True:
        data = await loop.run_in_executor(None, get_data)
        if data != -1:
            await websocket.send(data)
            print('> sent')
        # Pause without blocking: time.sleep() here would stall the event
        # loop, and with it every other connection, for a full second.
        await asyncio.sleep(1)
# Start the WebSocket server on localhost:9999 with send() as the handler,
# then run the event loop indefinitely.
start_server = websockets.serve(send, "localhost", 9999)
server_loop = asyncio.get_event_loop()
server_loop.run_until_complete(start_server)
server_loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment