Skip to content

Instantly share code, notes, and snippets.

@misaelnieto
Last active April 25, 2024 03:22
Show Gist options
  • Star 44 You must be signed in to star a gist
  • Fork 20 You must be signed in to fork a gist
  • Save misaelnieto/2409785 to your computer and use it in GitHub Desktop.
Save misaelnieto/2409785 to your computer and use it in GitHub Desktop.
Streaming MJPEG over HTTP with GStreamer and Python - WSGI version
#!/usr/bin/python
#based on the ideas from http://synack.me/blog/implementing-http-live-streaming
# Updates:
# - 2024-04-24: Apply suggestions from @Pin80
# Run this script and then launch the following pipeline:
# gst-launch videotestsrc pattern=ball ! video/x-raw-rgb, framerate=15/1, width=640, height=480 ! jpegenc ! multipartmux boundary=spionisto ! tcpclientsink port=9999
# Updated command line for GStreamer 1.x:
# gst-launch-1.0 videotestsrc pattern=ball ! videoconvert ! video/x-raw, framerate=15/1, width=640, height=480 ! jpegenc ! multipartmux boundary=spionisto ! tcpclientsink port=9999
from multiprocessing import Queue
from threading import Thread
from socket import socket
from select import select
from wsgiref.simple_server import WSGIServer, make_server, WSGIRequestHandler
from socketserver import ThreadingMixIn
from wsgiref.validate import validator
import time
class MyWSGIServer(ThreadingMixIn, WSGIServer):
    """WSGI server that handles every request in its own thread.

    Threading is needed because the ``/mjpeg_stream`` response in this file
    loops forever; a single-threaded server would be monopolised by the
    first viewer.
    """

    # Streaming handler threads only end when their client goes away.  As
    # daemon threads they cannot block ``server_close()`` or interpreter
    # exit when the operator presses Ctrl-C.
    daemon_threads = True
def create_server(host, port, app, server_class=MyWSGIServer,
                  handler_class=WSGIRequestHandler):
    """Build (but do not start) a WSGI server for ``app`` on ``host:port``.

    Thin wrapper around ``make_server`` whose default server class is the
    threaded ``MyWSGIServer``.
    """
    server = make_server(
        host,
        port,
        app,
        server_class=server_class,
        handler_class=handler_class,
    )
    return server
# Landing page returned for "/": embeds the MJPEG stream in an <img> element.
# Stored as bytes because wsgiref only accepts bytes as response body
# (see the AssertionError traceback quoted in the discussion below).
INDEX_PAGE = b"""
<html>
<head>
<title>Gstreamer testing</title>
</head>
<body>
<h1>Test page for dummy camera with GStreamer</h1>
<img src="http://127.0.0.1:1337/mjpeg_stream"/>
<hr />
</body>
</html>
"""
# Variant of the landing page that uses a <source> element instead of <img>.
# NOTE(review): not referenced by any code visible in this file -- confirm
# whether it is still needed.
INDEX_PAGE2 = b"""
<html>
<head>
<title>Gstreamer testing</title>
</head>
<body>
<h1>Test page for dummy camera with GStreamer</h1>
<source src="http://127.0.0.1:1337/mjpeg_stream"type="video/jpeg"></source>
<hr />
</body>
</html>
"""
# Body returned for every path other than "/" and "/mjpeg_stream".
ERROR_404 = b"""
<html>
<head>
<title>404 - Not Found</title>
</head>
<body>
<h1>404 - Not Found</h1>
</body>
</html>
"""
class IPCameraApp(object):
    """WSGI application serving the test page and relaying the MJPEG stream.

    Routes (matched on ``PATH_INFO``):
        ``/``              static HTML page (``INDEX_PAGE``)
        ``/mjpeg_stream``  endless multipart response, see ``stream``
        anything else      404 page (``ERROR_404``)
    """

    # One queue per connected viewer.  ``input_loop`` puts every received
    # chunk into each queue; ``stream`` registers and unregisters them.
    # Defined on the class, so the list is shared by all instances.
    queues = []

    def __call__(self, environ, start_response):
        """Dispatch the request and return the response body iterable."""
        path = environ['PATH_INFO']
        print("env=", path)
        if path == '/':
            return self._send_page(start_response, "200 OK", INDEX_PAGE)
        if path == '/mjpeg_stream':
            return self.stream(start_response)
        return self._send_page(start_response, "404 Not Found", ERROR_404)

    @staticmethod
    def _send_page(start_response, status, body):
        """Send a fixed HTML ``body`` (bytes) with the given ``status``."""
        start_response(status, [
            ("Content-Type", "text/html"),
            ("Content-Length", str(len(body))),
        ])
        return iter([body])

    def stream(self, start_response):
        """Yield raw multipart chunks for one viewer, indefinitely.

        This is a generator: the response is started and the viewer's queue
        registered on the first iteration.  The queue is unregistered again
        when the generator is closed or fails.
        """
        start_response('200 OK', [('Content-type', 'multipart/x-mixed-replace; boundary=--spionisto')])
        q = Queue(4096)
        self.queues.append(q)
        try:
            while True:
                yield q.get()
                time.sleep(0.001)
        finally:
            # try/finally instead of a bare ``except:``: the cleanup still
            # runs on generator close, but real errors are no longer
            # silently swallowed.
            if q in self.queues:
                self.queues.remove(q)
def input_loop(app):
    """Accept the producer's TCP feed on 127.0.0.1:9999 and fan it out.

    Runs forever.  Handles one producer connection at a time: every chunk
    received is put into each queue in ``app.queues``; when the producer
    disconnects the function goes back to waiting for the next connection.

    Args:
        app: object exposing ``queues``, a list of queue objects with ``put``.
    """
    sock = socket()
    sock.bind(('127.0.0.1', 9999))
    sock.listen(1)
    while True:
        print('Waiting for input stream')
        sd, addr = sock.accept()
        print('Accepted input stream from', addr)
        try:
            connected = True
            while connected:
                # Wait at most 0.1 s for data before polling again.
                readable = select([sd], [], [], 0.1)[0]
                for s in readable:
                    data = s.recv(1024)
                    if not data:
                        # Empty read means the peer closed the connection.
                        # The flag must be cleared here, otherwise select()
                        # keeps reporting EOF and the loop spins forever.
                        connected = False
                        break
                    # Iterate over a snapshot: viewer threads add and
                    # remove queues while this loop runs.
                    for q in list(app.queues):
                        q.put(data)
                        time.sleep(0.001)
        except OSError as exc:
            # A reset connection is just another way of losing the stream.
            print('Input stream error:', exc)
        finally:
            sd.close()
        print('Lost input stream from', addr)
if __name__ == '__main__':
    # Launch an instance of the WSGI server.
    app = IPCameraApp()
    port = 1337
    print('Launching camera server on port', port)
    httpd = create_server('127.0.0.1', port, app)
    # Streaming responses never finish by themselves; daemon handler threads
    # keep them from blocking server_close() and process exit.
    httpd.daemon_threads = True

    print('Launch input stream thread')
    t1 = Thread(target=input_loop, args=[app])
    t1.daemon = True
    t1.start()

    try:
        print('Httpd serve forever')
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("Shutdown camera server ...")
    finally:
        # The server object has no kill() method; server_close() is the
        # call that releases the listening socket.
        httpd.server_close()
@Hypnotriod
Copy link

Great! Thank you so much!

@misaelnieto
Copy link
Author

Great! Thank you so much!

I'm glad you found it useful

@Pin80
Copy link

Pin80 commented Mar 20, 2024

It seems the code is outdated. I updated it, but the errors didn't go away.
My code proposal:

#!/usr/bin/python
#based on the ideas from http://synack.me/blog/implementing-http-live-streaming
# Run this script and then launch the following pipeline:
# gst-launch videotestsrc pattern=ball ! video/x-raw-rgb, framerate=15/1, width=640, height=480 !  jpegenc ! multipartmux boundary=spionisto ! tcpclientsink port=9999

from multiprocessing import Queue
from threading import Thread
from socket import socket
from select import select
from wsgiref.simple_server import WSGIServer, make_server, WSGIRequestHandler
from socketserver import ThreadingMixIn


class MyWSGIServer(ThreadingMixIn, WSGIServer):
    """WSGI server that handles every request in its own thread.

    Threading is needed because the MJPEG stream response in this file
    loops forever; a single-threaded server would be monopolised by the
    first viewer.
    """

    # Streaming handler threads only end when their client goes away.  As
    # daemon threads they cannot block ``server_close()`` or interpreter
    # exit when the operator presses Ctrl-C.
    daemon_threads = True

def create_server(host, port, app, server_class=MyWSGIServer,
                  handler_class=WSGIRequestHandler):
    """Build (but do not start) a WSGI server for ``app`` on ``host:port``.

    Thin wrapper around ``make_server`` whose default server class is the
    threaded ``MyWSGIServer``.
    """
    server = make_server(
        host,
        port,
        app,
        server_class=server_class,
        handler_class=handler_class,
    )
    return server

# Landing page returned for "/".  It has to be bytes: wsgiref rejects str
# bodies with "write() argument must be a bytes instance".  The image URL is
# host-relative so the browser requests "/mjpeg_stream" from this server.
INDEX_PAGE = b"""
<html>
<head>
    <title>Gstreamer testing</title>
</head>
<body>
<h1>Testing a dummy camera with GStreamer</h1>
<img src="/mjpeg_stream"/>
<hr />
</body>
</html>
"""
# Body returned for unknown paths (bytes, for the same reason as above).
ERROR_404 = b"""
<html>
  <head>
    <title>404 - Not Found</title>
  </head>
  <body>
    <h1>404 - Not Found</h1>
  </body>
</html>
"""


class IPCameraApp(object):
    """WSGI application serving the test page and relaying the MJPEG stream.

    Routes (matched on ``PATH_INFO``):
        ``/``              static HTML page (``INDEX_PAGE``)
        ``/mjpeg_stream``  endless multipart response, see ``stream``
        anything else      404 page (``ERROR_404``)
    """

    # One queue per connected viewer.  ``input_loop`` puts every received
    # chunk into each queue; ``stream`` registers and unregisters them.
    # Defined on the class, so the list is shared by all instances.
    queues = []

    def __call__(self, environ, start_response):
        """Dispatch the request and return the response body iterable."""
        path = environ['PATH_INFO']
        if path == '/':
            return self._send_page(start_response, "200 OK", INDEX_PAGE)
        # PATH_INFO holds only the URL path, never host:port, so the stream
        # route has to be compared against "/mjpeg_stream".
        if path == '/mjpeg_stream':
            return self.stream(start_response)
        return self._send_page(start_response, "404 Not Found", ERROR_404)

    @staticmethod
    def _send_page(start_response, status, page):
        """Send a fixed HTML ``page`` with the given ``status``.

        ``page`` may be str or bytes; str is encoded as UTF-8 because
        wsgiref only accepts bytes and Content-Length counts bytes.
        """
        body = page if isinstance(page, bytes) else page.encode('utf-8')
        start_response(status, [
            ("Content-Type", "text/html"),
            ("Content-Length", str(len(body))),
        ])
        return iter([body])

    def stream(self, start_response):
        """Yield raw multipart chunks for one viewer, indefinitely.

        This is a generator: the response is started and the viewer's queue
        registered on the first iteration.  The queue is unregistered again
        when the generator is closed or fails.
        """
        start_response('200 OK', [('Content-type', 'multipart/x-mixed-replace; boundary=--spionisto')])
        q = Queue()
        self.queues.append(q)
        try:
            while True:
                yield q.get()
        finally:
            # try/finally instead of a bare ``except:``: the cleanup still
            # runs on generator close, but real errors are no longer
            # silently swallowed.
            if q in self.queues:
                self.queues.remove(q)


def input_loop(app):
    """Accept the producer's TCP feed on 127.0.0.1:9999 and fan it out.

    Runs forever.  Handles one producer connection at a time: every chunk
    received is put into each queue in ``app.queues``; when the producer
    disconnects the function goes back to waiting for the next connection.

    Args:
        app: object exposing ``queues``, a list of queue objects with ``put``.
    """
    sock = socket()
    sock.bind(('127.0.0.1', 9999))
    sock.listen(1)
    while True:
        print('Waiting for input stream')
        sd, addr = sock.accept()
        print('Accepted input stream from', addr)
        try:
            connected = True
            while connected:
                # Wait at most 0.1 s for data before polling again.
                readable = select([sd], [], [], 0.1)[0]
                for s in readable:
                    data = s.recv(1024)
                    if not data:
                        # Empty read means the peer closed the connection.
                        connected = False
                        break
                    # Iterate over a snapshot: viewer threads add and
                    # remove queues while this loop runs.
                    for q in list(app.queues):
                        q.put(data)
        except OSError as exc:
            # A reset connection is just another way of losing the stream.
            print('Input stream error:', exc)
        finally:
            # Release the per-connection socket; it was leaked before.
            sd.close()
        print('Lost input stream from', addr)

if __name__ == '__main__':
    # Launch an instance of the WSGI server.
    app = IPCameraApp()
    port = 1337
    print('Launching camera server on port', port)
    httpd = create_server('127.0.0.1', port, app)
    # Streaming responses never finish by themselves; daemon handler threads
    # keep them from blocking server_close() and process exit.
    httpd.daemon_threads = True

    print('Launch input stream thread')
    t1 = Thread(target=input_loop, args=[app])
    t1.daemon = True
    t1.start()

    try:
        print('Httpd serve forever')
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("Shutdown camera server ...")
    finally:
        # The server object has no kill() method; server_close() is the
        # call that releases the listening socket.
        httpd.server_close()

Errors:
user@usercomp:~/MySoftware/study_gstreamer/123$ python ./live-mjpeg-stream.py
Launching camera server on port 1337
Launch input stream thread
Httpd serve forever
Waiting for input stream
Accepted input stream from ('127.0.0.1', 36966)
Lost input stream from ('127.0.0.1', 36966)
Waiting for input stream
Accepted input stream from ('127.0.0.1', 46856)
Traceback (most recent call last):
File "/usr/lib/python3.10/wsgiref/handlers.py", line 138, in run
self.finish_response()
File "/usr/lib/python3.10/wsgiref/handlers.py", line 184, in finish_response
self.write(data)
File "/usr/lib/python3.10/wsgiref/handlers.py", line 279, in write
assert type(data) is bytes,
AssertionError: write() argument must be a bytes instance
127.0.0.1 - - [20/Mar/2024 12:27:13] "GET / HTTP/1.1" 500 59

@Pin80
Copy link

Pin80 commented Mar 21, 2024

I fixed all the bugs, but I haven't seen the picture in the browser yet.


#!/usr/bin/python
#based on the ideas from http://synack.me/blog/implementing-http-live-streaming
# Run this script and then launch the following pipeline:
# gst-launch videotestsrc pattern=ball ! video/x-raw-rgb, framerate=15/1, width=640, height=480 !  jpegenc ! multipartmux boundary=spionisto ! tcpclientsink port=9999
# Updated command line for GStreamer 1.x:
# gst-launch-1.0 videotestsrc pattern=ball ! videoconvert ! video/x-raw, framerate=15/1, width=640, height=480 ! jpegenc ! multipartmux boundary=spionisto ! tcpclientsink port=9999

from multiprocessing import Queue
from threading import Thread
from socket import socket
from select import select
from wsgiref.simple_server import WSGIServer, make_server, WSGIRequestHandler
from socketserver import ThreadingMixIn
from wsgiref.validate import validator

class MyWSGIServer(ThreadingMixIn, WSGIServer):
    """WSGI server that handles every request in its own thread.

    Threading is needed because the ``/mjpeg_stream`` response in this file
    is meant to run indefinitely; a single-threaded server would be
    monopolised by the first viewer.
    """

    # Streaming handler threads only end when their client goes away.  As
    # daemon threads they cannot block ``server_close()`` or interpreter
    # exit when the operator presses Ctrl-C.
    daemon_threads = True

def create_server(host, port, app, server_class=MyWSGIServer,
                  handler_class=WSGIRequestHandler):
    """Build (but do not start) a WSGI server for ``app`` on ``host:port``.

    Thin wrapper around ``make_server`` whose default server class is the
    threaded ``MyWSGIServer``.
    """
    server = make_server(
        host,
        port,
        app,
        server_class=server_class,
        handler_class=handler_class,
    )
    return server

# Landing page returned for "/": embeds the MJPEG stream in an <img> element
# via the host-relative URL "/mjpeg_stream".  Stored as bytes because wsgiref
# only accepts bytes as response body.
# NOTE(review): the "/ >" at the end of the <img> tag looks like a typo for
# "/>" -- confirm before changing the served markup.
INDEX_PAGE = b"""
<html>
<head>
    <title>Gstreamer testing</title>
</head>
	<body>
		<h1>Test page for dummy camera with GStreamer</h1>
			<img src="/mjpeg_stream"/ >
		<hr />
	</body>
</html>
"""
# Body returned for every path other than "/" and "/mjpeg_stream".
ERROR_404 = b"""
<html>
  <head>
    <title>404 - Not Found</title>
  </head>
  <body>
    <h1>404 - Not Found</h1>
  </body>
</html>
"""


class IPCameraApp(object):
    """WSGI application serving the test page and relaying the MJPEG stream.

    Routes (matched on ``PATH_INFO``):
        ``/``              static HTML page (``INDEX_PAGE``)
        ``/mjpeg_stream``  endless multipart response, see ``stream``
        anything else      404 page (``ERROR_404``)
    """

    # One queue per connected viewer.  ``input_loop`` puts every received
    # chunk into each queue; ``stream`` registers and unregisters them.
    # Defined on the class, so the list is shared by all instances.
    queues = []

    def __call__(self, environ, start_response):
        """Dispatch the request and return the response body iterable."""
        path = environ['PATH_INFO']
        if path == '/':
            return self._send_page(start_response, "200 OK", INDEX_PAGE)
        if path == '/mjpeg_stream':
            return self.stream(start_response)
        return self._send_page(start_response, "404 Not Found", ERROR_404)

    @staticmethod
    def _send_page(start_response, status, body):
        """Send a fixed HTML ``body`` (bytes) with the given ``status``."""
        start_response(status, [
            ("Content-Type", "text/html"),
            ("Content-Length", str(len(body))),
        ])
        return iter([body])

    def stream(self, start_response):
        """Yield raw multipart chunks for one viewer, indefinitely.

        This is a generator: the response is started and the viewer's queue
        registered on the first iteration.  The queue is unregistered again
        when the generator is closed or fails.
        """
        print("start mjpegt video")
        start_response('200 OK', [('Content-type', 'multipart/x-mixed-replace; boundary=--spionisto')])
        q = Queue()
        self.queues.append(q)
        try:
            # Keep yielding.  Previously a ``return`` sat inside the loop at
            # ``try`` level, which ended the response after the first chunk
            # and left the queue registered.
            while True:
                yield q.get()
        finally:
            if q in self.queues:
                self.queues.remove(q)


def input_loop(app):
    """Accept the producer's TCP feed on 127.0.0.1:9999 and fan it out.

    Runs forever.  Handles one producer connection at a time: every chunk
    received is put into each queue in ``app.queues``; when the producer
    disconnects the function goes back to waiting for the next connection.

    Args:
        app: object exposing ``queues``, a list of queue objects with ``put``.
    """
    sock = socket()
    sock.bind(('127.0.0.1', 9999))
    sock.listen(1)
    while True:
        print('Waiting for input stream')
        sd, addr = sock.accept()
        print('Accepted input stream from', addr)
        try:
            connected = True
            while connected:
                # Wait at most 0.1 s for data before polling again.
                readable = select([sd], [], [], 0.1)[0]
                for s in readable:
                    data = s.recv(1024)
                    if not data:
                        # Empty read means the peer closed the connection.
                        connected = False
                        break
                    # Iterate over a snapshot: viewer threads add and
                    # remove queues while this loop runs.
                    for q in list(app.queues):
                        q.put(data)
        except OSError as exc:
            # A reset connection is just another way of losing the stream.
            print('Input stream error:', exc)
        finally:
            # Release the per-connection socket; it was leaked before.
            sd.close()
        print('Lost input stream from', addr)

if __name__ == '__main__':
    # Launch an instance of the WSGI server.
    app = IPCameraApp()
    port = 1337
    print('Launching camera server on port', port)
    httpd = create_server('127.0.0.1', port, app)
    # Streaming responses never finish by themselves; daemon handler threads
    # keep them from blocking server_close() and process exit.
    httpd.daemon_threads = True

    print('Launch input stream thread')
    t1 = Thread(target=input_loop, args=[app])
    t1.daemon = True
    t1.start()

    try:
        print('Httpd serve forever')
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("Shutdown camera server ...")
    finally:
        # The server object has no kill() method; server_close() is the
        # call that releases the listening socket.
        httpd.server_close()

@Pin80
Copy link

Pin80 commented Mar 21, 2024

Final edition:

#!/usr/bin/python
#based on the ideas from http://synack.me/blog/implementing-http-live-streaming
# Run this script and then launch the following pipeline:
# gst-launch videotestsrc pattern=ball ! video/x-raw-rgb, framerate=15/1, width=640, height=480 !  jpegenc ! multipartmux boundary=spionisto ! tcpclientsink port=9999
# Updated command line for GStreamer 1.x:
# gst-launch-1.0 videotestsrc pattern=ball ! videoconvert ! video/x-raw, framerate=15/1, width=640, height=480 ! jpegenc ! multipartmux boundary=spionisto ! tcpclientsink port=9999

from multiprocessing import Queue
from threading import Thread
from socket import socket
from select import select
from wsgiref.simple_server import WSGIServer, make_server, WSGIRequestHandler
from socketserver import ThreadingMixIn
from wsgiref.validate import validator
import time
class MyWSGIServer(ThreadingMixIn, WSGIServer):
    """WSGI server that handles every request in its own thread.

    Threading is needed because the ``/mjpeg_stream`` response in this file
    loops forever; a single-threaded server would be monopolised by the
    first viewer.
    """

    # Streaming handler threads only end when their client goes away.  As
    # daemon threads they cannot block ``server_close()`` or interpreter
    # exit when the operator presses Ctrl-C.
    daemon_threads = True

def create_server(host, port, app, server_class=MyWSGIServer,
                  handler_class=WSGIRequestHandler):
    """Build (but do not start) a WSGI server for ``app`` on ``host:port``.

    Thin wrapper around ``make_server`` whose default server class is the
    threaded ``MyWSGIServer``.
    """
    server = make_server(
        host,
        port,
        app,
        server_class=server_class,
        handler_class=handler_class,
    )
    return server

# Landing page returned for "/": embeds the MJPEG stream in an <img> element.
# Stored as bytes because wsgiref only accepts bytes as response body.
INDEX_PAGE = b"""
<html>
<head>
    <title>Gstreamer testing</title>
</head>
	<body>
		<h1>Test page for dummy camera with GStreamer</h1>
			<img src="http://127.0.0.1:1337/mjpeg_stream"/>
		<hr />
	</body>
</html>
"""

# Variant of the landing page that uses a <source> element instead of <img>.
# NOTE(review): not referenced by any code visible in this script -- confirm
# whether it is still needed.
INDEX_PAGE2 = b"""
<html>
<head>
    <title>Gstreamer testing</title>
</head>
	<body>
		<h1>Test page for dummy camera with GStreamer</h1>
			<source src="http://127.0.0.1:1337/mjpeg_stream"type="video/jpeg"></source>
		<hr />
	</body>
</html>
"""


# Body returned for every path other than "/" and "/mjpeg_stream".
ERROR_404 = b"""
<html>
  <head>
    <title>404 - Not Found</title>
  </head>
  <body>
    <h1>404 - Not Found</h1>
  </body>
</html>
"""


class IPCameraApp(object):
    """WSGI application serving the test page and relaying the MJPEG stream.

    Routes (matched on ``PATH_INFO``):
        ``/``              static HTML page (``INDEX_PAGE``)
        ``/mjpeg_stream``  endless multipart response, see ``stream``
        anything else      404 page (``ERROR_404``)
    """

    # One queue per connected viewer.  ``input_loop`` puts every received
    # chunk into each queue; ``stream`` registers and unregisters them.
    # Defined on the class, so the list is shared by all instances.
    queues = []

    def __call__(self, environ, start_response):
        """Dispatch the request and return the response body iterable."""
        path = environ['PATH_INFO']
        print("env=", path)
        if path == '/':
            return self._send_page(start_response, "200 OK", INDEX_PAGE)
        if path == '/mjpeg_stream':
            return self.stream(start_response)
        return self._send_page(start_response, "404 Not Found", ERROR_404)

    @staticmethod
    def _send_page(start_response, status, body):
        """Send a fixed HTML ``body`` (bytes) with the given ``status``."""
        start_response(status, [
            ("Content-Type", "text/html"),
            ("Content-Length", str(len(body))),
        ])
        return iter([body])

    def stream(self, start_response):
        """Yield raw multipart chunks for one viewer, indefinitely.

        This is a generator: the response is started and the viewer's queue
        registered on the first iteration.  The queue is unregistered again
        when the generator is closed or fails.
        """
        start_response('200 OK', [('Content-type', 'multipart/x-mixed-replace; boundary=--spionisto')])
        q = Queue(4096)
        self.queues.append(q)
        try:
            while True:
                yield q.get()
                time.sleep(0.001)
        finally:
            # try/finally instead of a bare ``except:``: the cleanup still
            # runs on generator close, but real errors are no longer
            # silently swallowed.
            if q in self.queues:
                self.queues.remove(q)


def input_loop(app):
    """Accept the producer's TCP feed on 127.0.0.1:9999 and fan it out.

    Runs forever.  Handles one producer connection at a time: every chunk
    received is put into each queue in ``app.queues``; when the producer
    disconnects the function goes back to waiting for the next connection.

    Args:
        app: object exposing ``queues``, a list of queue objects with ``put``.
    """
    sock = socket()
    sock.bind(('127.0.0.1', 9999))
    sock.listen(1)
    while True:
        print('Waiting for input stream')
        sd, addr = sock.accept()
        print('Accepted input stream from', addr)
        try:
            connected = True
            while connected:
                # Wait at most 0.1 s for data before polling again.
                readable = select([sd], [], [], 0.1)[0]
                for s in readable:
                    data = s.recv(1024)
                    if not data:
                        # Empty read means the peer closed the connection.
                        # The flag must be cleared here, otherwise select()
                        # keeps reporting EOF and the loop spins forever.
                        connected = False
                        break
                    # Iterate over a snapshot: viewer threads add and
                    # remove queues while this loop runs.
                    for q in list(app.queues):
                        q.put(data)
                        time.sleep(0.001)
        except OSError as exc:
            # A reset connection is just another way of losing the stream.
            print('Input stream error:', exc)
        finally:
            sd.close()
        print('Lost input stream from', addr)

if __name__ == '__main__':
    # Launch an instance of the WSGI server.
    app = IPCameraApp()
    port = 1337
    print('Launching camera server on port', port)
    httpd = create_server('127.0.0.1', port, app)
    # Streaming responses never finish by themselves; daemon handler threads
    # keep them from blocking server_close() and process exit.
    httpd.daemon_threads = True

    print('Launch input stream thread')
    t1 = Thread(target=input_loop, args=[app])
    t1.daemon = True
    t1.start()

    try:
        print('Httpd serve forever')
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("Shutdown camera server ...")
    finally:
        # The server object has no kill() method; server_close() is the
        # call that releases the listening socket.
        httpd.server_close()

@unitedtimur
Copy link

Great job

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment