Skip to content

Instantly share code, notes, and snippets.

@MtkN1
Created March 25, 2022 12:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MtkN1/1efaf7928e8f823ae01a5e8744af309b to your computer and use it in GitHub Desktop.
Save MtkN1/1efaf7928e8f823ae01a5e8744af309b to your computer and use it in GitHub Desktop.
import asyncio
import itertools
import time
import aiohttp
"""
WebSocket テストクライアント
テストサーバーに接続して 0 から始まるサーバーのカウンターとローカルのカウンターの一致を照合する。
カウンターが 10 の時点で CPU バウンドのブロッキング処理を行う。(1億回ループの加算、自環境10秒程度)
カウンターの受信漏れがあれば一致せず AssertionError が発生すると想定。
結果:
受信漏れは発生しなかった。
ブロッキング処理を長くしたりサーバー側の送信速度などを早くしても同様。
想定:
ブロッキング処理後にカウンターの print 標準出力が一気(100 ms より早く)に表示される。
そしてサーバー側がタイムアウトしないことからも Python プログラムより上位レイヤーで常に通信が行われて WebSocket メッセージを受信している(?)
その為ブロッキング処理後待ち時間なしで print されているのだと思われる。
"""
def heavy_task():
print("heavy_task")
# time.sleep(10.0)
n = 0
for i in range(100000000):
n += i
async def main():
async with aiohttp.ClientSession() as session:
async with session.ws_connect("ws://localhost:8080/ws") as ws:
counter = itertools.count()
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
server_count = msg.json()
local_count = next(counter)
print(f"{server_count}:{local_count} {(server_count == local_count)=:}")
assert server_count == local_count, f"different count"
if server_count == 10:
heavy_task()
elif msg.type == aiohttp.WSMsgType.ERROR:
break
if server_count >= 99:
break
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
import asyncio
import itertools
from aiohttp import web
"""
WebSocket テストサーバー
クライアントが接続されたら 100 ms ごとに 0 から始まるカウンターを送信する。
送信に 1 ms 以上要した場合は TimeoutError を発生させる。(その場合クライアントはローカルのカウンターと一致しないと想定)
"""
async def send_counter(ws: web.WebSocketResponse):
for count in itertools.count():
try:
await asyncio.wait_for(ws.send_json(count), timeout=0.001)
except asyncio.TimeoutError:
print(f"TimeoutError: {count}")
await asyncio.sleep(0.1)
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
task = asyncio.create_task(send_counter(ws))
async for msg in ws: ...
task.cancel()
print("websocket connection closed")
return ws
app = web.Application()
app.add_routes([web.get("/ws", websocket_handler)])
web.run_app(app)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment