Skip to content

Instantly share code, notes, and snippets.

@blackle
Created July 8, 2022 07:57
Show Gist options
  • Save blackle/15888f0dcee4caaf34e6bd6800ae465a to your computer and use it in GitHub Desktop.
Save blackle/15888f0dcee4caaf34e6bd6800ae465a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import http.server
import socketserver
import threading
from urllib.parse import urlparse
from math import sqrt
import time
import pyvips
import ssl
counter = 0
#creates a jpeg image as bytes
def image_from_counter(counter):
svg = ""
svg += '''<?xml version="1.0" encoding="UTF-8"?>'''
svg += '''<svg width="123mm" height="42mm" version="1.1" viewBox="0 0 123 42" xmlns="http://www.w3.org/2000/svg">'''
svg += '''<g transform="translate(7.3 -241)" fill="#eaf" font-weight="bold" stroke-dasharray="5.55624993, 2.77812495999999998" stroke-linecap="round" stroke-linejoin="round" stroke-miterlimit="10" stroke-width="2.8">'''
svg += f'''<text x="2.4973986" y="260.90326" font-family="sans" font-size="14px" style="text-align:start;text-anchor:start;line-height:1.25;paint-order:stroke fill markers" xml:space="preserve"><tspan x="0" y="260.90326" font-family="FreeSans" font-size="14px" font-weight="bold" stroke-width="2.8">{counter} {"person" if counter==1 else "people"}</tspan></text>'''
svg += f'''<text x="3.2082679" y="271.4866" font-family="FreeSans" font-size="4.6px" style="text-align:start;text-anchor:start;line-height:1.25;paint-order:stroke fill markers" xml:space="preserve"><tspan x="0" y="271.4866" font-size="5.6px" stroke-width="2.8">{"is" if counter==1 else "are"} currently looking at this image</tspan></text>'''
svg += '''</g>'''
svg += '''</svg>'''
image = pyvips.Image.new_from_buffer(svg.encode('utf8'),"")
return image.write_to_buffer('.jpg[Q=20]')
class MyHttpRequestHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
global counter
parsedpath = urlparse(self.path)
self.was_stream = False
if parsedpath.path == '/stream.mjpeg':
counter += 1
self.was_stream = True
self.send_response(200)
self.send_header("Content-type", "multipart/x-mixed-replace; boundary=--myboundary")
self.send_header("Cache-Control", "no-store, must-revalidate")
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
self.end_headers()
loops = 0
while True:
loops += 1
data = image_from_counter(counter)
sep = f'''--myboundary\r\nContent-Type: image/jpeg\r\nContent-Length: {len(data)}\r\n\r\n'''
self.wfile.write(sep.encode('utf8'))
self.wfile.write(data)
self.wfile.flush()
if loops > 2:
time.sleep(5)
else:
time.sleep(.1)
else:
self.send_response(400)
def __del__(self):
global counter
if self.was_stream == True:
counter -= 1
print("done")
class ThreadingSimpleServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
pass
if __name__ == "__main__":
socketserver.TCPServer.allow_reuse_address = True
server = ThreadingSimpleServer(("", 42069), MyHttpRequestHandler)
server.socket = ssl.wrap_socket (server.socket, keyfile="./key.pem", certfile='./cert.pem', server_side=True)
server.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment