Skip to content

Instantly share code, notes, and snippets.

@dzil123
Last active August 7, 2019 07:31
Show Gist options
  • Save dzil123/0eed50967931bcd44d6c4c70004752a4 to your computer and use it in GitHub Desktop.
Save dzil123/0eed50967931bcd44d6c4c70004752a4 to your computer and use it in GitHub Desktop.
import asyncio
from typing import AsyncGenerator, Dict, List, Tuple
import cv2
import uvicorn
from starlette.applications import Starlette
from starlette.responses import HTMLResponse
# -----------------------------------------------------------------------------
# Reusable ASGI framework
# Takes care of waiting to start sending
# Nice way to be peacefully notified when the client has left
# Usage: ```
# async with ASGILifespan(receive) as lifespan:
#     # do your thing, and periodically, do
#     if lifespan.end:
#         # cleanup tasks
#         return
# ```
class ASGILifespan:
    """Track the lifetime of one ASGI HTTP request.

    Entering the context blocks until the final ``http.request`` message
    (or an ``http.disconnect``) has been received, then starts a background
    task that keeps reading ``receive`` until ``http.disconnect`` arrives.
    Poll :attr:`end` to find out whether the client has left.

    Usage::

        async with ASGILifespan(receive) as lifespan:
            # do your thing, and periodically, do
            if lifespan.end:
                # cleanup tasks
                return
    """

    def __init__(self, receive):
        """Store the ASGI ``receive`` callable; no I/O happens here."""
        self._receive = receive
        # Background disconnect watcher; None while outside the context.
        self._task = None

    @staticmethod
    def is_msg_start(message):
        """Return True if *message* is the last chunk of the request body."""
        # ``more_body`` is optional in ASGI and defaults to False, so it must
        # not be indexed directly.
        return message["type"] == "http.request" and not message.get(
            "more_body", False
        )

    @staticmethod
    def is_msg_end(message):
        """Return True if *message* reports that the client disconnected."""
        return message["type"] == "http.disconnect"

    # Blocks until it is time to start the response
    # Private, internal use only
    async def _task_start(self):
        while True:
            message = await self._receive()
            if self.is_msg_start(message) or self.is_msg_end(message):
                return

    # Blocks until it is time to end the response
    # Private, internal use only
    async def _task_end(self):
        while True:
            message = await self._receive()
            if self.is_msg_end(message):
                return

    async def wait_end(self):
        """Block until it is time to end the response."""
        if self.end:
            return
        await self._task

    @property
    def end(self):
        """True once the client has disconnected (or outside the context)."""
        if self._task is None:
            return True  # Invalid state
        return self._task.done()

    async def __aenter__(self):
        """Block until it is time to start the response, then start watching."""
        await self._task_start()
        if self._task is None:
            self._task = asyncio.ensure_future(self._task_end())
        return self

    async def __aexit__(self, *exc_info):
        """Cancel the disconnect watcher and return to the idle state."""
        if self._task is not None:
            self._task.cancel()
        self._task = None
# Takes care of all headers, preamble, and postamble
# Usage: ```
# async with ASGIApplication(receive, send) as app:
#     # do your thing, and periodically, do
#     if app.end:
#         # cleanup tasks
#         return
# ```
class ASGIApplication(ASGILifespan):
    """ASGILifespan that also sends the response start and final body.

    On entry it waits for the request (via the parent class) and sends
    ``http.response.start`` with the configured status and headers. On exit
    it sends a closing ``http.response.body`` message. In between, call
    :meth:`send` to stream body chunks.

    Usage::

        async with ASGIApplication(receive, send) as app:
            # do your thing, and periodically, do
            if app.end:
                # cleanup tasks
                return
    """

    def __init__(self, receive, send, *, status=200, headers=None):
        """Remember the ASGI callables, the status and a copy of *headers*.

        *headers* is a ``str -> str`` mapping (or None for no headers).
        """
        self._send = send
        self._status = status
        # Private copy: never alias a shared default or the caller's dict.
        self._headers = dict(headers) if headers else {}
        super().__init__(receive)

    @staticmethod
    def _encode_bytes(val):
        """Encode a str to bytes using latin-1."""
        return val.encode("latin-1")

    @classmethod
    def _convert_headers(cls, headers=None):
        """Turn a ``str -> str`` mapping into a list of (bytes, bytes) pairs."""
        if not headers:
            return []
        encode = cls._encode_bytes
        # ASGI requires header names to be lowercased.
        return [
            (encode(name.lower()), encode(value)) for name, value in headers.items()
        ]

    async def send(self, data):
        """Send one body chunk, signalling that more will follow."""
        await self._send(
            {"type": "http.response.body", "body": data, "more_body": True}
        )

    async def __aenter__(self):
        await super().__aenter__()
        try:
            await self._send(
                {
                    "type": "http.response.start",
                    "status": self._status,
                    "headers": self._convert_headers(self._headers),
                }
            )
        except BaseException:
            # __aexit__ will not run if entry fails, so stop the disconnect
            # watcher here instead of leaking it.
            await super().__aexit__(None, None, None)
            raise
        return self

    async def __aexit__(self, *exc_info):
        try:
            # A body message without more_body ends the response.
            await self._send({"type": "http.response.body"})
        finally:
            # Always cancel the watcher, even if the final send failed.
            result = await super().__aexit__(*exc_info)
        return result
# Takes care of streaming with multipart/x-mixed-replace
# Usage: ```
# async with ASGIStreamer(receive, send) as app:
#     # do your thing, and periodically, do
#     if app.end:
#         # cleanup tasks
#         return
# ```
class ASGIStreamer(ASGIApplication):
    """ASGIApplication that streams parts as ``multipart/x-mixed-replace``.

    Every chunk passed to :meth:`send` is prefixed with the multipart
    boundary line; the caller supplies the per-part headers and payload.

    Usage::

        async with ASGIStreamer(receive, send) as app:
            # do your thing, and periodically, do
            if app.end:
                # cleanup tasks
                return
    """

    # Headers this class always sets itself (compared case-insensitively).
    _FORCED_HEADERS = ("content-type", "connection")

    def __init__(self, receive, send, *, boundary="frame", status=200, headers=None):
        """Configure the boundary and force the streaming response headers.

        Extra *headers* are copied; the caller's mapping is never modified.
        ``Content-Type`` and ``Connection`` are always overridden.
        """
        self._boundary = self._encode_bytes(f"\r\n--{boundary}\r\n")
        # Build a fresh dict rather than mutating the argument (previously
        # this wrote into a shared mutable default). Case-variants of the
        # forced headers are dropped so they are not sent twice.
        merged = {
            name: value
            for name, value in (headers or {}).items()
            if name.lower() not in self._FORCED_HEADERS
        }
        merged["Content-Type"] = f"multipart/x-mixed-replace; boundary={boundary}"
        merged["Connection"] = "close"
        super().__init__(receive, send, status=status, headers=merged)

    async def send(self, data):
        """Send one multipart part: the boundary line followed by *data*."""
        await super().send(self._boundary + data)
# -----------------------------------------------------------------------------
# An ASGI application that streams mjpeg from a jpg iterable
class MjpegResponse:
    """ASGI application that streams MJPEG from an iterable of JPEG bytes."""

    # Per-part header block placed before every JPEG payload.
    HEADERS = ASGIApplication._encode_bytes("Content-Type: image/jpeg\r\n\r\n")

    def __init__(self, src):
        """Store *src*, an iterable yielding encoded JPEG frames as bytes."""
        self.src = src

    async def __call__(self, scope, receive, send):
        """Stream frames from ``src`` until it runs out or the client leaves."""
        async with ASGIStreamer(receive, send) as app:
            for img in self.src:
                if app.end:
                    return
                await app.send(self.HEADERS + img)
                # Awaiting send() does not guarantee a real suspension point.
                # Yield explicitly so the disconnect watcher and other
                # requests get a chance to run between frames.
                await asyncio.sleep(0)
# -----------------------------------------------------------------------------
# OpenCV wrappers
def mat_to_jpg(mat):
    """Encode an image matrix as JPEG and return the raw bytes."""
    # imencode returns a (status, buffer) pair; only the buffer is used.
    _, encoded = cv2.imencode(".jpg", mat)
    return encoded.tobytes()
def camera_to_jpg(cam):
    """Lazily yield each item of *cam* converted with ``mat_to_jpg``."""
    for mat in cam:
        yield mat_to_jpg(mat)
# Context Manager and iterable
# Usage: ```
# with Camera(0) as cam:
#     # cam is now an iterable of mats
# ```
class Camera:
    """Context manager and iterator around ``cv2.VideoCapture``.

    The capture is opened on ``__enter__`` with the constructor arguments
    and released on ``__exit__``. Iterating yields frames until a read fails.

    Usage::

        with Camera(0) as cam:
            # cam is now an iterable of mats
    """

    def __init__(self, *args, **kwargs):
        """Remember the arguments to pass to ``cv2.VideoCapture`` later."""
        self.args = args
        self.kwargs = kwargs
        # The underlying capture; None whenever the camera is not open.
        self.cam = None

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next frame; stop iterating when a read fails.

        Raises:
            RuntimeError: if the camera has not been opened.
        """
        if self.cam is None:
            raise RuntimeError("Camera is not open; use it inside a `with` block")
        ok, frame = self.cam.read()
        if not ok:
            # A failed read carries no usable frame; end the iteration
            # instead of handing None to consumers forever.
            raise StopIteration
        return frame

    def __enter__(self):
        if self.cam is None:
            self.cam = cv2.VideoCapture(*self.args, **self.kwargs)
        return self

    def __exit__(self, *exc_info):
        if self.cam is None:
            return
        self.cam.release()
        # Forget the released handle so a later `with` opens a fresh capture.
        self.cam = None
# -----------------------------------------------------------------------------
# Application
class WebServer:
    """Starlette app serving an index page and an MJPEG stream of a camera."""

    IMAGE_URL = "/image.mjpeg"

    def __init__(self, cam):
        """Build the app; *cam* is an iterable of image matrices."""
        # One shared JPEG generator feeds every stream request.
        self.cam = camera_to_jpg(cam)
        self.app = Starlette()
        self.app.debug = True
        # add_route registers the same routes as the deprecated
        # `app.route(path)(func)` decorator form.
        self.app.add_route(self.IMAGE_URL, self.image)
        self.app.add_route("/", self.index)

    def image(self, request):
        """Return the MJPEG streaming response."""
        return MjpegResponse(self.cam)

    def index(self, request):
        """Return a minimal HTML page embedding the stream."""
        return HTMLResponse(
            f"""<html><title>Hello</title><body><h1>Hello</h1><br/><img src="{self.IMAGE_URL}"/></body></html>"""
        )
def desktop_demo():
    """Show camera 0 in an OpenCV window until ``q`` is pressed."""
    quit_key = ord("q")
    try:
        with Camera(0) as cam:
            for frame in cam:
                cv2.imshow("Asdf", frame)
                # Mask to the low byte before comparing with the key code.
                pressed = cv2.waitKey(1) & 0xFF
                if pressed == quit_key:
                    break
    finally:
        # Close the window whether the loop ended normally or by exception.
        cv2.destroyAllWindows()
def web_demo():
    """Serve camera 0 through the web app with uvicorn."""
    with Camera(0) as cam:
        # The camera stays open for as long as uvicorn.run() blocks.
        uvicorn.run(WebServer(cam).app)
# -----------------------------------------------------------------------------
def main():
    """Script entry point: run the web streaming demo."""
    web_demo()


# Only start the demo when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment