Skip to content

Instantly share code, notes, and snippets.

@TIS-Arne
Created November 8, 2022 12:31
Show Gist options
  • Save TIS-Arne/1d2cea0d51194f515a96e5eb27f0a9dc to your computer and use it in GitHub Desktop.
Save TIS-Arne/1d2cea0d51194f515a96e5eb27f0a9dc to your computer and use it in GitHub Desktop.
Slightly improved tcam-webserver example
# pip3 install flask
import sys
from flask import Flask, Response
import logging
import tcam_camera
import time
# Flask application object; the route decorators further down register on it.
app = Flask(__name__)
# Create the camera object and start the stream
# (this runs at import time, so the camera is opened as soon as the module loads).
camera = tcam_camera.tcam_camera()
# Let at most 5 frames per second pass through the pipeline's videorate element.
camera.set_max_rate(5)
def imagegenerator(frame_timeout=1.0):
    '''
    Endlessly yield camera frames as the parts of a multipart MJPEG stream.

    Every yielded chunk consists of the "imagingsource" boundary line, a
    JPEG content-type header and the JPEG bytes returned by
    camera.snap_image().

    :param frame_timeout: maximum time in seconds to wait for one frame.
        snap_image() takes its timeout in seconds; the previously hard-coded
        value of 1000 looks like milliseconds and meant waiting for more
        than 16 minutes when the camera delivered nothing.
    :return: generator of bytes objects, one per JPEG frame
    '''
    try:
        while True:
            frame = camera.snap_image(frame_timeout)
            if frame is None:
                # No frame within the timeout: log it and keep polling so
                # the stream recovers once the camera delivers again.
                logging.info("No frame received.")
                continue
            # Create the boundary between the sent jpegs
            yield (b'--imagingsource\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
    finally:
        # Runs when the generator is closed, e.g. after the response ended.
        logging.info("Stream closed")
def loadfile(name):
    '''
    Read a text file and return its content.

    open() never returns None, so the original "not found" fallback could
    not be reached and a missing file raised instead. The file handle was
    also never closed. Both are fixed here.

    :param name: path of the file to read
    :return: the file content, or the text "File <name> not found" if the
        file does not exist
    '''
    try:
        with open(name, "r") as f:
            return f.read()
    except FileNotFoundError:
        return "File " + name + " not found"
@app.route('/mjpeg_stream')
def imagestream():
    '''
    Serve the camera images as an endless multipart (MJPEG) HTTP response.

    :return: streaming Response fed by imagegenerator(); its boundary name
        "imagingsource" matches the boundary lines the generator emits
    '''
    logging.info("Starting mjpeg stream")
    frames = imagegenerator()
    stream_mimetype = 'multipart/x-mixed-replace; boundary=imagingsource'
    return Response(frames, mimetype=stream_mimetype)
@app.route('/',methods=['GET', 'POST'])
def index():
    '''
    Deliver the start page for GET and POST requests on "/".

    :return: the text loadfile() returns for "index.html", looked up
        relative to the current working directory
    '''
    logging.info('Send index')
    page = loadfile("index.html")
    return page
def main(argv):
    '''
    Configure logging and run the Flask server until it is stopped.

    :param argv: command line arguments; accepted but currently not evaluated
    '''
    level_name = "INFO"
    # Look the numeric level up by name on the logging module.
    logging.basicConfig(level=getattr(logging, level_name, None))
    logging.info("Logging enabled")
    # Listen on all interfaces; threaded=True handles requests in separate
    # threads.
    app.run(host='0.0.0.0', threaded=True)
# Start the web server only when this file is executed as a script;
# the command line arguments are passed on without the program name.
if __name__ == "__main__":
    main(sys.argv[1:])
import threading
import time

import gi
gi.require_version("Gst", "1.0")
gi.require_version("Tcam", "1.0")
from gi.repository import GLib, GObject, Gst, Tcam
'''
A very small camera class that creates the pipeline with an appsink.
In the pipeline the image is resized to a resolution of 640x480.
The JPEG quality is set to 60%.
'''
class tcam_camera(object):
    '''
    Small camera wrapper that hands out JPEG frames from a GStreamer pipeline.

    The pipeline (tcambin, videorate, videoconvert, videoscale, jpegenc,
    appsink) scales the image to 640x480 and encodes it as JPEG with a
    quality of 60. The appsink reports new samples through its 'new-sample'
    signal; snap_image() hands the most recent one to the caller.

    The sample state is shared between the thread that runs the appsink
    callback and the threads calling snap_image(), so it is guarded by a
    condition variable. Every thread remembers which frame it received last
    and therefore gets each frame at most once, even when several clients
    stream at the same time.
    '''

    def __init__(self):
        '''
        Build the pipeline, connect the appsink callback and start streaming.
        '''
        Gst.init([])
        # The callback may fire as soon as the pipeline runs, so the shared
        # state has to exist before the pipeline is created and started.
        self._reset_frame_state()
        self.pipeline = Gst.parse_launch("tcambin ! videorate name=videorate ! videoconvert ! videoscale ! video/x-raw, width=640, height=480 ! jpegenc quality=60 ! appsink name=sink")
        self.appsink = self.pipeline.get_by_name('sink')
        self.appsink.set_property("max-buffers",5)
        self.appsink.set_property("drop",1)
        self.appsink.set_property("emit-signals",1)
        self.appsink.connect('new-sample', self.on_new_buffer)
        self.videorate = self.pipeline.get_by_name("videorate")
        self.pipeline.set_state(Gst.State.PLAYING)
        # Give the state change up to 40 ms (the timeout is in nanoseconds).
        self.pipeline.get_state(40000000)

    def _reset_frame_state(self):
        '''
        Create the frame bookkeeping shared by the callback and the readers.
        '''
        self.sample = None
        # Kept for compatibility: while set to True by external code, the
        # callback leaves self.sample untouched.
        self.samplelocked = False
        self.newsample = False
        self.image = None
        # Guards sample/newsample/_frame_seq and wakes waiting readers.
        self._frame_cond = threading.Condition()
        # Counts stored samples so that readers can recognise a new frame.
        self._frame_seq = 0
        # Per-thread memory of the last frame number handed out.
        self._consumer = threading.local()

    def _has_unseen_frame(self):
        '''
        Tell whether a frame is stored that the calling thread did not get yet.

        Has to be called with self._frame_cond held.
        '''
        seen = getattr(self._consumer, "seq", 0)
        return bool(self.newsample) and self._frame_seq != seen

    def on_new_buffer(self, appsink):
        '''
        Callback for new images.

        Stores the appsink's most recent sample and wakes all threads that
        wait for a frame.

        :param appsink: The sink, that called the callback
        :return: False, as before (NOTE(review): presumably marshalled to
            0, i.e. Gst.FlowReturn.OK - confirm)
        '''
        with self._frame_cond:
            self.newsample = True
            if self.samplelocked is False:
                try:
                    self.sample = appsink.get_property('last-sample')
                except GLib.Error as error:
                    print("Error on_new_buffer pipeline: {0}".format(error))
                    self.newsample = False
                    raise
                self._frame_seq += 1
                self._frame_cond.notify_all()
        return False

    def wait_for_buffer(self,timeout):
        '''
        Wait until a frame arrives that the calling thread has not received.

        Returns as soon as the callback reports a new frame instead of
        sleeping in fixed slices of timeout / 10.

        :param timeout: maximum wait time in seconds (int or float)
        :return: True if an unseen frame is available, False on timeout
        '''
        with self._frame_cond:
            return self._frame_cond.wait_for(self._has_unseen_frame,
                                             timeout=float(timeout))

    def snap_image(self, timeout):
        '''
        Snap an image from stream using a timeout.

        Each frame is returned only once per calling thread; the next call
        blocks until the following frame arrives or the timeout expires.

        :param timeout: maximum wait time in seconds (int or float)
        :return: The JPEG image data as bytes, or None if no new frame
            arrived in time or the buffer could not be mapped.
        '''
        with self._frame_cond:
            if not self._frame_cond.wait_for(self._has_unseen_frame,
                                             timeout=float(timeout)):
                return None
            # Keep an own reference; the callback may replace self.sample
            # as soon as the lock is released.
            sample = self.sample
            self._consumer.seq = self._frame_seq
        if sample is None:
            return None
        buf = sample.get_buffer()
        mem = buf.get_all_memory()
        success, info = mem.map(Gst.MapFlags.READ)
        if not success:
            return None
        try:
            # Copy the data, the mapping is released right afterwards.
            data = bytes(info.data)
        finally:
            mem.unmap(info)
        self.image = data
        return data

    def set_max_rate(self, fps: int):
        '''
        Set maximum frames per second to pass through

        :param fps: value for the "max-rate" property of the videorate element
        '''
        self.videorate.set_property("max-rate", fps)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment