Skip to content

Instantly share code, notes, and snippets.

@misaelnieto
Created May 4, 2011 00:17
Show Gist options
  • Save misaelnieto/954509 to your computer and use it in GitHub Desktop.
Save misaelnieto/954509 to your computer and use it in GitHub Desktop.
Gunicorn with gstreamer to make a MJPEG IP Camera
#Python imports
#GStreamer imports
import pygst
pygst.require("0.10")
import gst
class MjpegCamera(object):
pipeline = gst.parse_launch("videotestsrc ! jpegenc ! multipartmux boundary=spionisto ! appsink name=endpoint")
def __init__(self):
self.endpoint = self.pipeline.get_by_name('endpoint')
self.pipeline.set_state(gst.STATE_PLAYING)
def next(self):
return self.endpoint.emit('pull-buffer')
def stop(self):
self.pipeline.set_state(gst.STATE_NULL)
def __iter__(self):
return self
#GUnicorn stuff
HTML5_PAGE = """
<html>
<head>
<title>Gstreamer testing</title>
</head>
<body>
<h1>Testing a dummy camera with GStreamer</h1>
<img src="http://localhost:8000/mjpeg_stream" class="" alt="" />
<hr />
</body>
</html>
"""
ERROR_404 = """
<html>
<head>
<title>404 - Not Found</title>
</head>
<body>
<h1>404 - Not Found</h1>
</body>
</html>
"""
def server(environ, start_response):
if environ['PATH_INFO'] == '/':
start_response("200 OK", [
("Content-Type", "text/html"),
("Content-Length", str(len(HTML5_PAGE)))
])
return iter([HTML5_PAGE])
elif environ['PATH_INFO'] == '/mjpeg_stream':
start_response("200 OK", [
("Content-Type", "multipart/x-mixed-replace; boundary=--spionisto"),
])
return iter(MjpegCamera())
else:
data = "Direccion incorrecta"
start_response("404 Not Found", [
("Content-Type", "text/plain"),
("Content-Length", str(len(data)))
])
return iter([data])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment