Skip to content

Instantly share code, notes, and snippets.

@Xevion
Created March 11, 2023 03:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Xevion/7b57bbaa3bfd771763b4e8291d83bdb0 to your computer and use it in GitHub Desktop.
Save Xevion/7b57bbaa3bfd771763b4e8291d83bdb0 to your computer and use it in GitHub Desktop.
import asyncio
import io
import os
from typing import Optional
import websockets
from PIL import Image
from constants import Environment
class PlaceClient:
    """
    Defines a stateful client that manages a 'source of truth' image, built up from the
    incremental changes received over the Websocket.

    Instances are normally created through the :meth:`connect` factory, which also seeds the
    source image from the first message sent by the server.
    """

    def __init__(self, connection) -> None:
        """
        Wrap an already-established websocket connection and allocate a blank canvas.

        The canvas dimensions are read from the environment variables whose names are given by
        ``Environment.CANVAS_WIDTH`` and ``Environment.CANVAS_HEIGHT``.

        :param connection: The websocket connection that messages will be received from.
        :raises TypeError: If either canvas environment variable is unset (``int(None)``).
        :raises ValueError: If either canvas environment variable is not a valid integer.
        """
        self.connection: websockets.WebSocketClientProtocol = connection

        # A lock used to manage the 'source of truth' image while performing read intensive operations.
        self.source_lock = asyncio.Lock()

        # Width and height each come from their own variable; reading the height twice would
        # silently produce a square canvas.
        # NOTE(review): assumes `Environment.CANVAS_WIDTH` is defined next to `CANVAS_HEIGHT` in
        # constants.py - confirm.
        width = int(os.getenv(Environment.CANVAS_WIDTH))
        height = int(os.getenv(Environment.CANVAS_HEIGHT))

        # The 'source of truth' image describing what is currently on the canvas.
        self.source: Image.Image = Image.new("RGBA", (width, height), (255, 0, 0, 0))

        # The current targeted 'output' image.
        self.current_target: Optional[Image.Image] = None

    def lock(self) -> asyncio.Lock:
        """
        Return the lock guarding the source image.

        The lock is NOT acquired by this call, despite the message it prints; callers are expected
        to enter it themselves with ``async with client.lock(): ...``.
        """
        print('Lock acquired.')
        return self.source_lock

    @classmethod
    async def connect(cls, address: str) -> "PlaceClient":
        """
        A factory method for connecting to the websocket and instantiating the client.

        If the connection is open, the first message from the server is read, decoded as an image
        and pasted onto the client's source image.

        :param address: The websocket address to connect to.
        :return: A client bound to the new connection.
        :raises RuntimeError: If the first message from the server is not binary.
        """
        connection = await websockets.connect(address)
        try:
            client = cls(connection)

            if connection.open:
                message = await connection.recv()
                if not isinstance(message, bytes):
                    raise RuntimeError("Fatal: Initial message from websocket was not 'bytes'")

                img = Image.open(io.BytesIO(message))
                client.source.paste(img)
        except Exception:
            # Don't leak the open socket when setup fails; the caller never receives a client
            # through which it could close the connection itself.
            await connection.close()
            raise

        return client

    async def receive(self):
        """
        Receive server messages until the connection closes.

        Every binary message is decoded as an image and pasted onto the source image while holding
        the source lock. Non-binary messages are ignored.
        """
        while True:
            # Only `recv` can signal a closed connection, so keep the `try` body to that call.
            try:
                message = await self.connection.recv()
            except websockets.ConnectionClosed:
                print('Connection with server closed')
                break

            if not isinstance(message, bytes):
                continue

            img = Image.open(io.BytesIO(message))
            print('Waiting for lock to paste.')
            # asyncio.Lock only implements the asynchronous context manager protocol, so it must
            # be entered with `async with`; a plain `with` raises at runtime.
            async with self.lock():
                print('Acquired. Pasting...')
                self.source.paste(img)
                print('Done pasting.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment