Skip to content

Instantly share code, notes, and snippets.

@misaelnieto
Last active June 26, 2024 10:08
Show Gist options
  • Save misaelnieto/2409785 to your computer and use it in GitHub Desktop.
Save misaelnieto/2409785 to your computer and use it in GitHub Desktop.
Streaming MJPEG over HTTP with gstreamr and python - WSGI version
#!/usr/bin/python
#based on the ideas from http://synack.me/blog/implementing-http-live-streaming
# Updates:
# - 2024-04-24: Apply suggestions from @Pin80
# Run this script and then launch the following pipeline:
# gst-launch videotestsrc pattern=ball ! video/x-raw-rgb, framerate=15/1, width=640, height=480 ! jpegenc ! multipartmux boundary=spionisto ! tcpclientsink port=9999
#updated command line
#gst-launch-1.0 videotestsrc pattern=ball ! videoconvert ! video/x-raw, framerate=15/1, width=640, height=480 ! jpegenc ! multipartmux boundary=spionisto ! #tcpclientsink port=9999
from multiprocessing import Queue
from threading import Thread
from socket import socket
from select import select
from wsgiref.simple_server import WSGIServer, make_server, WSGIRequestHandler
from socketserver import ThreadingMixIn
from wsgiref.validate import validator
import time
class MyWSGIServer(ThreadingMixIn, WSGIServer):
pass
def create_server(host, port, app, server_class=MyWSGIServer,
handler_class=WSGIRequestHandler):
return make_server(host, port, app, server_class, handler_class)
INDEX_PAGE = b"""
<html>
<head>
<title>Gstreamer testing</title>
</head>
<body>
<h1>Test page for dummy camera with GStreamer</h1>
<img src="http://127.0.0.1:1337/mjpeg_stream"/>
<hr />
</body>
</html>
"""
INDEX_PAGE2 = b"""
<html>
<head>
<title>Gstreamer testing</title>
</head>
<body>
<h1>Test page for dummy camera with GStreamer</h1>
<source src="http://127.0.0.1:1337/mjpeg_stream"type="video/jpeg"></source>
<hr />
</body>
</html>
"""
ERROR_404 = b"""
<html>
<head>
<title>404 - Not Found</title>
</head>
<body>
<h1>404 - Not Found</h1>
</body>
</html>
"""
class IPCameraApp(object):
queues = []
def __call__(self,environ, start_response):
print("env=",environ['PATH_INFO'])
if environ['PATH_INFO'] == '/':
start_response("200 OK", [
("Content-Type", "text/html"),
("Content-Length", str(len(INDEX_PAGE)))
])
return iter([INDEX_PAGE])
elif environ['PATH_INFO'] == '/mjpeg_stream':
return self.stream(start_response)
else:
start_response("404 Not Found", [
("Content-Type", "text/html"),
("Content-Length", str(len(ERROR_404)))
])
return iter([ERROR_404])
def stream(self, start_response):
start_response('200 OK', [('Content-type', 'multipart/x-mixed-replace; boundary=--spionisto')])
q = Queue(4096)
self.queues.append(q)
while True:
try:
yield q.get()
time.sleep(0.001)
except:
if q in self.queues:
self.queues.remove(q)
return
def input_loop(app):
sock = socket()
sock.bind(('127.0.0.1', 9999))
sock.listen(1)
while True:
print('Waiting for input stream')
sd, addr = sock.accept()
print('Accepted input stream from', addr)
data_flag = True
while data_flag:
readable = select([sd], [], [], 0.1)[0]
for s in readable:
data = s.recv(1024)
if not data:
break
for q in app.queues:
q.put(data)
time.sleep(0.001)
print('Lost input stream from', addr)
if __name__ == '__main__':
#Launch an instance of wsgi server
app = IPCameraApp()
port = 1337
print('Launching camera server on port', port)
httpd = create_server('127.0.0.1', port, app)
print('Launch input stream thread')
t1 = Thread(target=input_loop, args=[app])
#t1.setDaemon(True)
t1.daemon = True
t1.start()
try:
print('Httpd serve forever')
httpd.serve_forever()
except KeyboardInterrupt:
httpd.kill()
print("Shutdown camera server ...")
@Pin80
Copy link

Pin80 commented Mar 21, 2024

I fixed all bugs, but I haven't watched the picture yet in browser.


#!/usr/bin/python
#based on the ideas from http://synack.me/blog/implementing-http-live-streaming
# Run this script and then launch the following pipeline:
# gst-launch videotestsrc pattern=ball ! video/x-raw-rgb, framerate=15/1, width=640, height=480 !  jpegenc ! multipartmux boundary=spionisto ! tcpclientsink port=9999
#updated command line
#gst-launch-1.0 videotestsrc pattern=ball ! videoconvert ! video/x-raw, framerate=15/1, width=640, height=480 !  jpegenc ! multipartmux boundary=spionisto ! #tcpclientsink port=9999

from multiprocessing import Queue
from threading import Thread
from socket import socket
from select import select
from wsgiref.simple_server import WSGIServer, make_server, WSGIRequestHandler
from socketserver import ThreadingMixIn
from wsgiref.validate import validator

class MyWSGIServer(ThreadingMixIn, WSGIServer):
     pass 

def create_server(host, port, app, server_class=MyWSGIServer,  
          handler_class=WSGIRequestHandler):
     return make_server(host, port, app, server_class, handler_class) 

INDEX_PAGE = b"""
<html>
<head>
    <title>Gstreamer testing</title>
</head>
	<body>
		<h1>Test page for dummy camera with GStreamer</h1>
			<img src="/mjpeg_stream"/ >
		<hr />
	</body>
</html>
"""
ERROR_404 = b"""
<html>
  <head>
    <title>404 - Not Found</title>
  </head>
  <body>
    <h1>404 - Not Found</h1>
  </body>
</html>
"""


class IPCameraApp(object):
	queues = []
		
	def __call__(self,environ, start_response):
		if environ['PATH_INFO'] == '/':
			start_response("200 OK", [
			("Content-Type", "text/html"),
			("Content-Length", str(len(INDEX_PAGE)))
			])
			return iter([INDEX_PAGE])
		elif environ['PATH_INFO'] == '/mjpeg_stream':
			return self.stream(start_response)
		else:
			start_response("404 Not Found", [
				("Content-Type", "text/html"),
				("Content-Length", str(len(ERROR_404)))
			])
			return iter([ERROR_404]) 
	def stream(self, start_response):
		print("start mjpegt video") 
		start_response('200 OK', [('Content-type', 'multipart/x-mixed-replace; boundary=--spionisto')])
		q = Queue()
		self.queues.append(q)
		while True:
			try:
				yield q.get()
			except:
				if q in self.queues:
				    self.queues.remove(q)
			return


def input_loop(app):
    sock = socket()
    sock.bind(('127.0.0.1', 9999))
    sock.listen(1)
    while True:
        print('Waiting for input stream')
        sd, addr = sock.accept()
        print('Accepted input stream from', addr)
        data = True
        while data:
            readable = select([sd], [], [], 0.1)[0]
            for s in readable:
                data = s.recv(1024)
                if not data:
                    break
                for q in app.queues:
                    q.put(data)
        print('Lost input stream from', addr)

if __name__ == '__main__':

    #Launch an instance of wsgi server
    app = IPCameraApp()
    port = 1337
    print('Launching camera server on port', port)
    httpd = create_server('127.0.0.1', port, app)

    print('Launch input stream thread')
    t1 = Thread(target=input_loop, args=[app])
    #t1.setDaemon(True)
    t1.daemon = True
    t1.start()

    try:
        print('Httpd serve forever')
        httpd.serve_forever()
    except KeyboardInterrupt:
        httpd.kill()
        print("Shutdown camera server ...")

@Pin80
Copy link

Pin80 commented Mar 21, 2024

Final edition:

#!/usr/bin/python
#based on the ideas from http://synack.me/blog/implementing-http-live-streaming
# Run this script and then launch the following pipeline:
# gst-launch videotestsrc pattern=ball ! video/x-raw-rgb, framerate=15/1, width=640, height=480 !  jpegenc ! multipartmux boundary=spionisto ! tcpclientsink port=9999
#updated command line
#gst-launch-1.0 videotestsrc pattern=ball ! videoconvert ! video/x-raw, framerate=15/1, width=640, height=480 !  jpegenc ! multipartmux boundary=spionisto ! #tcpclientsink port=9999

from multiprocessing import Queue
from threading import Thread
from socket import socket
from select import select
from wsgiref.simple_server import WSGIServer, make_server, WSGIRequestHandler
from socketserver import ThreadingMixIn
from wsgiref.validate import validator
import time
class MyWSGIServer(ThreadingMixIn, WSGIServer):
     pass 

def create_server(host, port, app, server_class=MyWSGIServer,  
          handler_class=WSGIRequestHandler):
     return make_server(host, port, app, server_class, handler_class) 

INDEX_PAGE = b"""
<html>
<head>
    <title>Gstreamer testing</title>
</head>
	<body>
		<h1>Test page for dummy camera with GStreamer</h1>
			<img src="http://127.0.0.1:1337/mjpeg_stream"/>
		<hr />
	</body>
</html>
"""

INDEX_PAGE2 = b"""
<html>
<head>
    <title>Gstreamer testing</title>
</head>
	<body>
		<h1>Test page for dummy camera with GStreamer</h1>
			<source src="http://127.0.0.1:1337/mjpeg_stream"type="video/jpeg"></source>
		<hr />
	</body>
</html>
"""


ERROR_404 = b"""
<html>
  <head>
    <title>404 - Not Found</title>
  </head>
  <body>
    <h1>404 - Not Found</h1>
  </body>
</html>
"""


class IPCameraApp(object):
	queues = []
		
	def __call__(self,environ, start_response):
		print("env=",environ['PATH_INFO']) 
		if environ['PATH_INFO'] == '/':
			start_response("200 OK", [
			("Content-Type", "text/html"),
			("Content-Length", str(len(INDEX_PAGE)))
			])
			return iter([INDEX_PAGE])
		elif environ['PATH_INFO'] == '/mjpeg_stream':
			return self.stream(start_response)
		else:
			start_response("404 Not Found", [
				("Content-Type", "text/html"),
				("Content-Length", str(len(ERROR_404)))
			])
			return iter([ERROR_404]) 
	def stream(self, start_response):
		start_response('200 OK', [('Content-type', 'multipart/x-mixed-replace; boundary=--spionisto')])
		q = Queue(4096)
		self.queues.append(q)
		while True:
			try:
				yield q.get()
				time.sleep(0.001)
			except:
				if q in self.queues:
				    self.queues.remove(q)
				return


def input_loop(app):
	sock = socket()
	sock.bind(('127.0.0.1', 9999))
	sock.listen(1)
	while True:
		print('Waiting for input stream')
		sd, addr = sock.accept()
		print('Accepted input stream from', addr)
		data_flag = True
		while data_flag:
			readable = select([sd], [], [], 0.1)[0]
			for s in readable:
				data = s.recv(1024)
				if not data:
				    break
				for q in app.queues:
					q.put(data)
					time.sleep(0.001)
		print('Lost input stream from', addr)

if __name__ == '__main__':

    #Launch an instance of wsgi server
    app = IPCameraApp()
    port = 1337
    print('Launching camera server on port', port)
    httpd = create_server('127.0.0.1', port, app)

    print('Launch input stream thread')
    t1 = Thread(target=input_loop, args=[app])
    #t1.setDaemon(True)
    t1.daemon = True
    t1.start()

    try:
        print('Httpd serve forever')
        httpd.serve_forever()
    except KeyboardInterrupt:
        httpd.kill()
        print("Shutdown camera server ...")

@unitedtimur
Copy link

Great job

@0ct0cat
Copy link

0ct0cat commented May 31, 2024

Great job

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment