Created
November 8, 2022 12:31
-
-
Save TIS-Arne/1d2cea0d51194f515a96e5eb27f0a9dc to your computer and use it in GitHub Desktop.
Slightly improved tcam-webserver example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pip3 install flask | |
import sys | |
from flask import Flask, Response | |
import logging | |
import tcam_camera | |
import time | |
# One shared camera for all clients: constructing it builds the GStreamer
# pipeline and starts streaming right away.
camera = tcam_camera.tcam_camera()
# Let at most 5 frames per second through to the JPEG encoder.
camera.set_max_rate(fps=5)

# The Flask application that the route decorators below attach to.
app = Flask(__name__)
def imagegenerator():
    """Yield an endless series of multipart chunks, one JPEG frame per chunk.

    Each chunk consists of the ``--imagingsource`` boundary line, a
    ``Content-Type: image/jpeg`` header, the JPEG bytes delivered by
    ``camera.snap_image`` and a trailing CRLF. When no frame is available the
    event is logged and the loop simply tries again.

    The ``finally`` clause logs "Stream closed" when the generator is
    finalised (for example when it is closed or garbage collected).
    """
    # Boundary and header that precede every JPEG in the multipart stream.
    part_header = (b'--imagingsource\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n')
    try:
        while True:
            # NOTE(review): snap_image documents its timeout in seconds, so
            # 1000 looks like it was meant as milliseconds - confirm the unit.
            jpeg = camera.snap_image(1000)
            if jpeg is None:
                logging.info("No frame received.")
                continue
            yield part_header + jpeg + b'\r\n'
    finally:
        logging.info("Stream closed")
def loadfile(name):
    """Return the text content of the file *name*.

    :param name: Path of the file to read (opened in text mode).
    :return: The complete file content, or the string
             ``"File <name> not found"`` when the file does not exist.
    :raises OSError: For failures other than a missing file (for example
                     missing permissions); these are not swallowed.
    """
    try:
        # The context manager closes the handle even if read() fails.
        with open(name, "r") as f:
            return f.read()
    except FileNotFoundError:
        # open() never returns None; a missing file is reported by exception.
        return "File " + name + " not found"
@app.route('/mjpeg_stream')
def imagestream():
    """Serve the camera frames as a multipart (motion JPEG) HTTP response.

    The response body is produced by ``imagegenerator()``; the boundary named
    in the mimetype must match the one the generator writes between frames.
    """
    logging.info("Starting mjpeg stream")
    stream_mimetype = 'multipart/x-mixed-replace; boundary=imagingsource'
    return Response(imagegenerator(), mimetype=stream_mimetype)
@app.route('/', methods=['GET', 'POST'])
def index():
    """Serve the start page: the content of ``index.html`` read via loadfile().

    The path is relative, so it is resolved against the current working
    directory of the server process.
    """
    logging.info('Send index')
    page = loadfile("index.html")
    return page
def main(argv):
    """Configure logging at INFO level and run the Flask server.

    :param argv: Command line arguments without the program name.
                 Currently not evaluated.
    """
    # Resolve the level name to logging's numeric constant (None if unknown).
    logging.basicConfig(level=getattr(logging, "INFO", None))
    logging.info("Logging enabled")
    # host 0.0.0.0 listens on every network interface; threaded=True lets
    # several requests (e.g. page plus stream) be served concurrently.
    app.run(host='0.0.0.0', threaded=True)


if __name__ == "__main__":
    main(sys.argv[1:])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import gi | |
gi.require_version("Gst", "1.0") | |
gi.require_version("Tcam", "1.0") | |
from gi.repository import GLib, GObject, Gst, Tcam | |
''' | |
A very small camera class that creates the pipeline with an appsink.
In the pipeline the image is resized to 640x480 resolution.
The JPEG Quality is set to 60%. | |
''' | |
class tcam_camera(object):
    """Small camera wrapper that hands out JPEG frames from a GStreamer pipeline.

    The pipeline is ``tcambin ! videorate ! videoconvert ! videoscale !
    640x480 raw caps ! jpegenc quality=60 ! appsink``. The appsink emits
    ``new-sample`` signals; the callback stores the most recent sample and
    ``snap_image`` converts it to ``bytes`` on request.
    """

    # Longest single sleep, in seconds, while polling for a frame.
    _POLL_INTERVAL = 0.01

    def __init__(self):
        """Build the pipeline, connect the appsink callback and start playing."""
        Gst.init([])
        self.sample = None         # latest Gst.Sample stored by on_new_buffer()
        self.samplelocked = False  # while True, on_new_buffer() keeps self.sample
        self.newsample = False     # True once a sample has been stored
        self.image = None          # bytes returned by the last successful snap

        self.pipeline = Gst.parse_launch(
            "tcambin ! videorate name=videorate ! videoconvert ! videoscale"
            " ! video/x-raw, width=640, height=480"
            " ! jpegenc quality=60 ! appsink name=sink"
        )

        self.appsink = self.pipeline.get_by_name('sink')
        self.appsink.set_property("max-buffers", 5)
        self.appsink.set_property("drop", 1)
        self.appsink.set_property("emit-signals", 1)
        self.appsink.connect('new-sample', self.on_new_buffer)

        self.videorate = self.pipeline.get_by_name("videorate")

        self.pipeline.set_state(Gst.State.PLAYING)
        # get_state() takes its timeout as a Gst.ClockTime (nanoseconds):
        # wait up to 40 ms for the state change to complete.
        self.pipeline.get_state(40000000)

    def on_new_buffer(self, appsink):
        """Callback for new images; stores the sink's latest sample.

        :param appsink: The sink that called the callback.
        :return: ``Gst.FlowReturn.OK``, the value the ``new-sample`` signal
                 expects when streaming should continue.
        :raises GLib.Error: Re-raised after logging if the sample cannot be
                            read; ``newsample`` is cleared in that case.
        """
        if not self.samplelocked:
            try:
                self.sample = appsink.get_property('last-sample')
            except GLib.Error as error:
                print("Error on_new_buffer pipeline: {0}".format(error))
                self.newsample = False
                raise
        # Raise the flag only after the sample is in place, so a reader in
        # another thread never sees newsample set while the data is missing.
        self.newsample = True
        return Gst.FlowReturn.OK

    def wait_for_buffer(self, timeout):
        """Wait until an image is available or *timeout* has elapsed.

        The flag is polled in short steps, so the call returns promptly once
        a frame arrives even when *timeout* is large.

        :param timeout: Maximum wait time in seconds (anything ``float()``
                        accepts). Zero or negative returns without waiting.
        :return: None
        """
        total = float(timeout)
        deadline = time.monotonic() + total
        # Never poll more coarsely than the original timeout/10 granularity.
        step = min(total / 10.0, self._POLL_INTERVAL)
        while not self.newsample:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(step, remaining))

    def snap_image(self, timeout):
        """Return the most recent image of the stream as JPEG ``bytes``.

        NOTE: ``newsample`` is never reset here, so once the first frame has
        arrived later calls return immediately with the latest stored sample,
        which may be the same frame as in the previous call.

        :param timeout: Wait time in seconds, forwarded to wait_for_buffer().
        :return: The image data, or None if no sample is available or the
                 buffer memory could not be mapped.
        """
        self.wait_for_buffer(timeout)

        # Read the attribute once: the streaming thread may replace it.
        sample = self.sample
        if sample is None or not self.newsample:
            return None

        buf = sample.get_buffer()
        mem = buf.get_all_memory()
        success, info = mem.map(Gst.MapFlags.READ)
        if not success:
            return None
        try:
            # Copy the data out before the mapping is released.
            data = bytes(info.data)
        finally:
            mem.unmap(info)

        self.image = data
        return data

    def set_max_rate(self, fps: int):
        """Set the maximum number of frames per second to pass through.

        :param fps: Value written to the videorate element's ``max-rate``
                    property.
        """
        self.videorate.set_property("max-rate", fps)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment