Skip to content

Instantly share code, notes, and snippets.

@yoelk
Created June 20, 2018 19:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yoelk/9d57fac416fea8017e0c233dffaf67a5 to your computer and use it in GitHub Desktop.
Save yoelk/9d57fac416fea8017e0c233dffaf67a5 to your computer and use it in GitHub Desktop.
modified tis-camera python example
import numpy
import gi
from collections import namedtuple
import cv2
gi.require_version("Gst", "1.0")
gi.require_version("Tcam", "0.1")
from gi.repository import Tcam, Gst, GLib, GObject
class TIS(object):
    """The Imaging Source camera wrapped in a GStreamer capture pipeline.

    The pipeline is ``tcambin ! caps filter ! videoconvert ! appsink``.
    The appsink's ``new-sample`` signal is connected to ``on_new_buffer``,
    which stores the most recent sample; ``Get_image`` converts that sample
    into a numpy array on request.
    """

    def __init__(self, serial, width, height, numerator, denumerator, color):
        """Build the pipeline and bring it to the READY state.

        Args:
            serial: Camera serial number, passed to ``tcambin``.
            width: Requested frame width in pixels.
            height: Requested frame height in pixels.
            numerator: Frame rate numerator.
            denumerator: Frame rate denominator.
            color: ``False`` (or 0) requests ``GRAY8``; any other value
                requests ``BGRx``.

        Raises:
            GLib.Error: If the pipeline description cannot be parsed.
        """
        Gst.init([])
        self.height = height
        self.width = width
        self.sample = None          # last sample delivered by the appsink
        self.samplelocked = False   # True while Get_image reads self.sample
        self.newsample = False      # True when an unread sample is pending

        pixel_format = "BGRx"
        # Equality (not truthiness) is intentional: it keeps the historical
        # behaviour where None still selects the colour format.
        if color == False:  # noqa: E712
            pixel_format = "GRAY8"

        p = 'tcambin serial="%s" name=source ! video/x-raw,format=%s,width=%d,height=%d,framerate=%d/%d' % (
            serial, pixel_format, width, height, numerator, denumerator,)
        p += ' ! videoconvert ! appsink name=sink'
        print(p)

        try:
            self.pipeline = Gst.parse_launch(p)
        except GLib.Error as error:
            # The handler used to format an undefined name, which raised
            # NameError and hid the real GLib error.
            print("Error creating pipeline: {0}".format(error))
            raise

        self.pipeline.set_state(Gst.State.READY)
        # Block until the state change has completed.
        self.pipeline.get_state(Gst.CLOCK_TIME_NONE)

        # Query a pointer to our source, so we can set properties.
        self.source = self.pipeline.get_by_name("source")

        # Query a pointer to the appsink, so we can assign the callback function.
        self.appsink = self.pipeline.get_by_name("sink")
        self.appsink.set_property("max-buffers", 5)
        self.appsink.set_property("drop", 1)
        self.appsink.set_property("emit-signals", 1)
        self.appsink.connect('new-sample', self.on_new_buffer)

    def on_new_buffer(self, appsink):
        """Handle the appsink ``new-sample`` signal.

        Marks a new sample as available and, unless ``Get_image`` currently
        holds the lock flag, replaces ``self.sample`` with the appsink's
        ``last-sample`` property.

        Returns:
            ``Gst.FlowReturn.OK`` (numerically 0, the same value as the
            ``False`` that was returned previously).

        Raises:
            GLib.Error: Re-raised after logging if reading the sample fails.
        """
        self.newsample = True
        if not self.samplelocked:
            try:
                self.sample = appsink.get_property('last-sample')
            except GLib.Error as error:
                print("Error on_new_buffer pipeline: {0}".format(error))
                raise
        return Gst.FlowReturn.OK

    def Start_pipeline(self):
        """Set the pipeline to PLAYING and wait for the state change.

        Raises:
            GLib.Error: Re-raised after logging if the state change fails.
        """
        try:
            self.pipeline.set_state(Gst.State.PLAYING)
            self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
        except GLib.Error as error:
            print("Error starting pipeline: {0}".format(error))
            raise

    def Get_image(self):
        """Return the pending frame as a numpy array, or ``None``.

        Returns:
            A ``numpy.uint8`` array of shape ``(height, width, bpp)`` built
            from the sample's caps (``bpp`` is 1 for ``GRAY8`` and 4
            otherwise), or ``None`` when no unread sample is available.
            The array is also stored in ``self.img_mat``.
        """
        # Sample code from https://gist.github.com/cbenhagen/76b24573fa63e7492fb6#file-gst-appsink-opencv-py-L34
        if self.sample is None or not self.newsample:
            return None

        # Keep on_new_buffer from swapping self.sample while it is read.
        self.samplelocked = True
        try:
            buf = self.sample.get_buffer()
            structure = self.sample.get_caps().get_structure(0)

            # Bytes per pixel: 4 for BGRx (and as the fallback), 1 for GRAY8.
            bpp = 1 if structure.get_value('format') == "GRAY8" else 4

            self.img_mat = numpy.ndarray(
                (structure.get_value('height'),
                 structure.get_value('width'),
                 bpp),
                buffer=buf.extract_dup(0, buf.get_size()),
                dtype=numpy.uint8)
            self.newsample = False
        finally:
            # Always release the flag; otherwise one failed conversion
            # would block every later sample update.
            self.samplelocked = False
        return self.img_mat

    def Stop_pipeline(self):
        """Step the pipeline down through PAUSED and READY to NULL."""
        self.pipeline.set_state(Gst.State.PAUSED)
        self.pipeline.set_state(Gst.State.READY)
        self.pipeline.set_state(Gst.State.NULL)
# Demo driver: open one camera, show its frames in an OpenCV window and
# stop when Esc is pressed (or on Ctrl+C).
Gst.init([])
serial = "20810358"

################################################
# ADDING THESE TWO LINES MAKES THE CODE NOT WORK
# (kept on purpose: this is the reproduction case. tcam1 is never used
# afterwards; presumably the standalone tcamsrc conflicts with the tcambin
# created by TIS for the same serial - unconfirmed.)
tcam1 = Gst.ElementFactory.make('tcamsrc')
tcam1.set_property("serial", "20810358")
################################################

LeftCam = TIS(serial, 640, 480, 15, 1, False)

cv2.namedWindow('Left Camera', 1)

# Start the pipeline
LeftCam.Start_pipeline()

# Give the pipeline a moment before the first frame is requested.
cv2.waitKey(1000)

error = 0  # number of consecutive polls without a frame
print('Press Esc to stop')
lastkey = 0

try:
    while lastkey != 27:
        # Grab an image
        LeftImage = LeftCam.Get_image()
        if LeftImage is not None:
            error = 0
            # show the image
            cv2.imshow('Left Camera', LeftImage)
        else:
            print("No left image reveived ")
            error = error + 1

        # wait. The wait period should be somewhat longer than 1000 / fps.
        # Mask to the low byte so the Esc comparison does not depend on
        # extra high bits some OpenCV backends add to the key code.
        lastkey = cv2.waitKey(67) & 0xFF  # 67 = 1000 / 15 fps
except KeyboardInterrupt:
    # Ctrl+C is an accepted way to leave the loop; cleanup happens below.
    pass
finally:
    # Stop the pipeline and clean up. Runs on Esc, Ctrl+C and on errors.
    LeftCam.Stop_pipeline()
    # Destroy the window that was actually created; the handler used to
    # name a non-existent window 'Window'.
    cv2.destroyWindow('Left Camera')

print('Program ended')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment