Skip to content

Instantly share code, notes, and snippets.

@james4388
Last active December 26, 2023 04:19
Show Gist options
  • Save james4388/fa59f3e7c9d4bac238c06b4327b88b46 to your computer and use it in GitHub Desktop.
Save james4388/fa59f3e7c9d4bac238c06b4327b88b46 to your computer and use it in GitHub Desktop.
MJPEG stream using aiohttp, OpenCV, and MultipartWriter
import asyncio
import cv2
from aiohttp import web, MultipartWriter
async def mjpeg_handler(request):
    """Stream webcam frames to the client as a multipart MJPEG response.

    Opens capture device 0, JPEG-encodes every frame at quality 90 and
    writes each one as a part of a ``multipart/x-mixed-replace`` body. The
    loop runs until the client connection is reset or the handler is
    cancelled; the capture device is released in either case.

    Args:
        request: The incoming aiohttp request.

    Returns:
        The ``web.StreamResponse`` that the frames were written to.
    """
    boundary = "boundarydonotcross"
    response = web.StreamResponse(status=200, reason='OK', headers={
        # The boundary parameter is the bare token; the leading "--" belongs
        # only to the delimiter lines MultipartWriter emits in the body.
        'Content-Type': 'multipart/x-mixed-replace; boundary=%s' % boundary,
    })
    await response.prepare(request)

    encode_param = (int(cv2.IMWRITE_JPEG_QUALITY), 90)
    wc = cv2.VideoCapture(0)
    try:
        while True:
            # NOTE: read() is a synchronous call and runs on the event loop
            # thread, exactly as in the original implementation.
            _, frame = wc.read()
            if frame is None:
                # Yield to the event loop instead of spinning, so other
                # requests (and cancellation) are still serviced while the
                # camera has no frame to offer.
                await asyncio.sleep(0.01)
                continue

            encoded_ok, encimg = cv2.imencode('.jpg', frame, encode_param)
            if not encoded_ok:
                # Skip frames the encoder rejected rather than sending junk.
                await asyncio.sleep(0)
                continue

            with MultipartWriter('image/jpeg', boundary=boundary) as mpwriter:
                mpwriter.append(encimg.tobytes(), {
                    'Content-Type': 'image/jpeg'
                })
                # close_boundary=False keeps the multipart body open so the
                # next frame can be appended as a further part.
                await mpwriter.write(response, close_boundary=False)
            await response.drain()
    except ConnectionResetError:
        # The client went away; fall through to cleanup and return.
        pass
    finally:
        # Always free the capture device, including on task cancellation.
        wc.release()
    return response
async def index(request):
    """Serve a minimal HTML page whose only element is the stream image.

    Args:
        request: The incoming aiohttp request (unused).

    Returns:
        A ``web.Response`` with content type ``text/html``.
    """
    # The <img> source points at the route that serves the MJPEG stream.
    page_markup = '<img src="/image"/>'
    return web.Response(text=page_markup, content_type='text/html')
async def start_server(loop, address, port):
    """Build the web application and start listening on ``address:port``.

    Registers the ``/`` and ``/image`` GET routes and binds the application's
    request handler with ``loop.create_server``.

    Args:
        loop: The event loop the application and server are created on.
        address: Host/interface to bind to.
        port: TCP port to bind to.

    Returns:
        The server object returned by ``loop.create_server``.
    """
    # NOTE(review): the ``loop=`` argument and ``make_handler()`` appear to be
    # deprecated in aiohttp 3.x -- confirm against the installed version.
    application = web.Application(loop=loop)
    get_routes = (
        ("/", index),
        ("/image", mjpeg_handler),
    )
    for path, handler in get_routes:
        application.router.add_route('GET', path, handler)
    protocol_factory = application.make_handler()
    server = await loop.create_server(protocol_factory, address, port)
    return server
if __name__ == '__main__':
    # Script entry point: start the HTTP server on all interfaces, port 8888,
    # and serve until interrupted with Ctrl+C.
    loop = asyncio.get_event_loop()
    # Keep the server handle so the listening socket can be closed on exit.
    server = loop.run_until_complete(start_server(loop, '0.0.0.0', 8888))
    print("Server ready!")
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print("Shutting Down!")
    finally:
        # Stop accepting connections first, then release the loop; doing this
        # in ``finally`` guarantees cleanup on every exit path.
        server.close()
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment