Skip to content

Instantly share code, notes, and snippets.

@TIS-Arne
Created August 10, 2018 10:30
Show Gist options
  • Save TIS-Arne/76452cb32b3284984282da596ce3602a to your computer and use it in GitHub Desktop.
Save TIS-Arne/76452cb32b3284984282da596ce3602a to your computer and use it in GitHub Desktop.
Tiscamera OpenCV example printing the current framerate.
#!/usr/bin/env python2
# Copyright 2017 The Imaging Source Europe GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This example will show you how to receive data from gstreamer in your application
# and how to get the actual image data
#
import sys
import time
from list_devices import select_camera
from list_formats import select_format
import gi
gi.require_version("Tcam", "0.1")
gi.require_version("Gst", "1.0")
from gi.repository import Tcam, Gst, GLib
import cv2
import numpy as np
# Helper that hands an OpenCV result image to the GStreamer display pipeline.
#
# The display_buffers list keeps our raw byte strings alive: GStreamer only
# borrows the memory we wrap (no copy is made), so Python must not garbage
# collect it until GStreamer's destroy callback tells us it is done with it.
display_buffers = []


def show_img(display_input, img):
    """Push the numpy image *img* into the appsrc element *display_input*.

    The image bytes are wrapped zero-copy into a Gst.Buffer; the buffer's
    destroy callback drops our keep-alive reference once GStreamer has
    finished displaying it.
    """
    global display_buffers
    raw = img.tobytes()
    display_buffers.append(raw)
    wrapped = Gst.Buffer.new_wrapped_full(
        Gst.MemoryFlags.READONLY,
        raw,
        len(raw),
        0,
        None,
        # Buffers are consumed in FIFO order, so popping index 0 releases
        # the oldest keep-alive reference.
        lambda _notify: display_buffers.pop(0),
    )
    display_input.emit("push-buffer", wrapped)
# Shared state for the framerate measurement: the buffer timestamp at the
# start of the current 30-frame window, and the number of frame intervals
# observed inside that window.
last_ts = None
counter = 0


def callback(sink, display_input):
    """Handle the appsink "new-sample" signal.

    This function is called in a separate (GStreamer streaming) thread when
    our appsink says there is data for us. It pulls the sample, prints the
    measured framerate once per 30 frames, runs the OpenCV processing on the
    GRAY8 image and forwards the result to the display pipeline.

    :param sink: the appsink element that emitted "new-sample"
    :param display_input: the display pipeline's appsrc (signal user data)
    :return: Gst.FlowReturn.OK so streaming continues
    """
    global last_ts
    global counter
    sample = sink.emit("pull-sample")
    if sample:
        buf = sample.get_buffer()
        caps = sample.get_caps()
        width = caps[0].get_value("width")
        height = caps[0].get_value("height")
        if last_ts is None:
            # First frame: just open the measurement window. Buffer
            # timestamps (pts) are measured in nanoseconds. Do NOT count
            # this frame — otherwise the first window would span only 29
            # intervals while being divided as if it were 30 (the original
            # code's off-by-one).
            last_ts = buf.pts
        else:
            counter += 1
        if counter == 30:
            # The window now spans exactly 30 frame intervals.
            dt = buf.pts - last_ts
            seconds = dt * 1e-9  # convert nanoseconds to seconds
            print("30 frames received in %.2f seconds ==> %.1f FPS" % (seconds, 30 / seconds))
            counter = 0
            last_ts = buf.pts
        # Map the buffer memory for reading. Only enter the try/finally
        # once the mapping succeeded: the original code unconditionally
        # called buf.unmap(mapinfo) in `finally`, which raises NameError
        # when map() itself fails and unmaps a mapping that never existed.
        res, mapinfo = buf.map(Gst.MapFlags.READ)
        if res:
            try:
                # Create a numpy array from the raw mapped bytes and give
                # it the dimensions of the video image (GRAY8: one byte
                # per pixel, hence a 2-D (height, width) shape).
                img_array = np.asarray(bytearray(mapinfo.data), dtype=np.uint8)
                img = img_array.reshape((height, width))
                # Perform opencv operations on the image data
                img = cv2.medianBlur(img, 5)
                th = cv2.adaptiveThreshold(img, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 11, 2)
                # Show the result via our display pipeline
                show_img(display_input, th)
            finally:
                # Always release the mapping, even if OpenCV raises.
                buf.unmap(mapinfo)
    return Gst.FlowReturn.OK
def main():
    """Build the capture pipeline, attach the OpenCV callback and run it."""
    Gst.init(sys.argv)  # init GStreamer

    # We create a source element first so a device list can be retrieved
    # through it before the pipeline is assembled.
    source = Gst.ElementFactory.make("tcambin")
    serial = select_camera(source)
    if serial is not None:
        source.set_property("serial", serial)

    # Format of the video buffers that will get passed to opencv: the
    # callback's reshape((height, width)) relies on single-channel GRAY8.
    TARGET_FORMAT = "video/x-raw,width=640,height=480,format=GRAY8"

    # Ask the user for the format that should be used for capturing
    fmt = select_format(source.get_by_name("tcambin-source"))

    # If the user selected a bayer format, change it to BGRx so that the
    # tcambin will decode the bayer pattern to a color image.
    if fmt.get_name() == "video/x-bayer":
        fmt.set_name("video/x-raw")
        fmt.set_value("format", "BGRx")

    # Use a capsfilter to pin the video format of the camera source.
    capsfilter = Gst.ElementFactory.make("capsfilter")
    capsfilter.set_property("caps", Gst.Caps.from_string(fmt.to_string()))

    # Add a small leaky queue. Everything behind this queue runs in a
    # separate thread, and at most 2 buffers are held so a slow consumer
    # drops frames instead of stalling the camera.
    queue = Gst.ElementFactory.make("queue")
    queue.set_property("leaky", True)
    queue.set_property("max-size-buffers", 2)

    # videoconvert + videoscale convert whatever the camera delivers into
    # the target format for opencv.
    convert = Gst.ElementFactory.make("videoconvert")
    scale = Gst.ElementFactory.make("videoscale")

    # The appsink receives the converted video buffers and hands them to
    # opencv via the "new-sample" signal.
    output = Gst.ElementFactory.make("appsink")
    output.set_property("caps", Gst.Caps.from_string(TARGET_FORMAT))
    output.set_property("emit-signals", True)

    pipeline = Gst.Pipeline.new()

    # Add and link all elements in streaming order.
    elements = (source, capsfilter, queue, convert, scale, output)
    for element in elements:
        pipeline.add(element)
    for upstream, downstream in zip(elements, elements[1:]):
        upstream.link(downstream)

    # Usually one would use cv2.imshow(...) to display an image, but that
    # tends to hang in threaded environments. So we create a small display
    # pipeline to show the opencv buffers instead.
    display_pipeline = Gst.parse_launch("appsrc name=src ! videoconvert ! ximagesink")
    display_input = display_pipeline.get_by_name("src")
    display_input.set_property("caps", Gst.Caps.from_string(TARGET_FORMAT))

    output.connect("new-sample", callback, display_input)

    display_pipeline.set_state(Gst.State.PLAYING)
    pipeline.set_state(Gst.State.PLAYING)

    print("Press Ctrl-C to stop")

    loop = GLib.MainLoop()
    try:
        loop.run()
    except KeyboardInterrupt:
        print("Ctrl-C pressed, terminating")

    # Stop BOTH pipelines and free all resources. The original code only
    # stopped the capture pipeline and left the display pipeline playing.
    pipeline.set_state(Gst.State.NULL)
    display_pipeline.set_state(Gst.State.NULL)


if __name__ == "__main__":
    main()
@TIS-Arne
Copy link
Author

This example needs to be placed in the examples/python folder.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment