Created
March 11, 2023 03:23
-
-
Save Xevion/7b57bbaa3bfd771763b4e8291d83bdb0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import io | |
import os | |
from typing import Optional | |
import websockets | |
from PIL import Image | |
from constants import Environment | |
class PlaceClient:
    """
    Defines a stateful client that manages a 'source of truth' image, built up from the incremental
    changes received over the Websocket.

    Instances are normally created with the `connect` factory, after which `receive` should be run
    to keep the image up to date.
    """

    def __init__(self, connection) -> None:
        """
        Store the websocket connection and allocate an empty canvas image.

        The canvas dimensions are read from the environment variables named by
        ``Environment.CANVAS_WIDTH`` and ``Environment.CANVAS_HEIGHT``.

        :param connection: The websocket connection messages are received from.
        :raises TypeError: If a canvas dimension variable is unset (``int(None)``).
        :raises ValueError: If a canvas dimension variable is not an integer string.
        """
        self.connection: websockets.WebSocketClientProtocol = connection

        # A lock used to manage the 'source of truth' image while performing read intensive operations.
        self.source_lock = asyncio.Lock()

        # NOTE(review): the width was previously read from CANVAS_HEIGHT (copy/paste slip), which
        # forced a square canvas. This assumes Environment also defines CANVAS_WIDTH - confirm.
        width = int(os.getenv(Environment.CANVAS_WIDTH))
        height = int(os.getenv(Environment.CANVAS_HEIGHT))

        # The 'source of truth' image describing what is currently on the canvas.
        self.source: Image.Image = Image.new("RGBA", (width, height), (255, 0, 0, 0))

        # The current targeted 'output' image.
        self.current_target: Optional[Image.Image] = None

    def lock(self) -> asyncio.Lock:
        """
        Return the lock guarding the 'source of truth' image.

        The lock is only returned here, not acquired: callers must use ``async with client.lock():``.
        Note that the message below is therefore printed *before* the lock is actually held.
        """
        print('Lock acquired.')
        return self.source_lock

    @classmethod
    async def connect(cls, address: str) -> "PlaceClient":
        """
        A factory method for connecting to the websocket and instantiating the client.

        If the connection is open, the first message is read and pasted onto the client's
        'source of truth' image as the initial canvas state.

        :param address: The websocket address to connect to.
        :return: The newly created client.
        :raises RuntimeError: If the first message from the websocket is not ``bytes``.
        """
        connection = await websockets.connect(address)
        client = cls(connection)

        if connection.open:
            message = await connection.recv()
            if not isinstance(message, bytes):
                raise RuntimeError("Fatal: Initial message from websocket was not 'bytes'")

            img = Image.open(io.BytesIO(message))
            client.source.paste(img)

        return client

    async def receive(self):
        """
        Receive all server messages and handle them until the connection closes.

        Every ``bytes`` message is decoded as an image and pasted onto the 'source of truth' image
        while holding the source lock. Messages of any other type are ignored.
        """
        while True:
            try:
                message = await self.connection.recv()
                if isinstance(message, bytes):
                    img = Image.open(io.BytesIO(message))
                    print('Waiting for lock to paste.')
                    # asyncio.Lock is an *asynchronous* context manager; a plain `with` fails at runtime.
                    async with self.lock():
                        print('Acquired. Pasting...')
                        self.source.paste(img)
                        print('Done pasting.')
            except websockets.ConnectionClosed:
                print('Connection with server closed')
                break
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment