Skip to content

Instantly share code, notes, and snippets.

@willpatera
Last active June 24, 2020 12:22
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save willpatera/7984486 to your computer and use it in GitHub Desktop.
Save willpatera/7984486 to your computer and use it in GitHub Desktop.
GStreamer 1.0 + Python Tests

Setup

Modified brew formula for gstreamer gsettings-desktop-schemas. Only change made was to remove (comment out) disable-schemas-compile, which enables gst to work on MacOS 10.9 (Mavericks).

Test Scripts Using appsink

Documentation

require 'formula'
class GsettingsDesktopSchemas < Formula
homepage 'http://ftp.gnome.org/pub/GNOME/sources/gsettings-desktop-schemas/'
url 'http://ftp.gnome.org/pub/GNOME/sources/gsettings-desktop-schemas/3.10/gsettings-desktop-schemas-3.10.1.tar.xz'
sha256 '452378c4960a145747ec69f8c6a874e5b7715454df3e2452d1ff1a0a82e76811'
depends_on 'xz' => :build
depends_on 'pkg-config' => :build
depends_on 'intltool' => :build
depends_on 'glib' => :build # Yep, for glib-mkenums
depends_on 'gettext'
def install
system "./configure", "--disable-dependency-tracking",
"--prefix=#{prefix}"#,
#{}"--disable-schemas-compile"
system "make install"
end
end
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# GStreamer SDK Tutorials in Python
#
# basic-tutorial-2
#
"""
basic-tutorial-2: GStreamer concepts
http://docs.gstreamer.com/display/GstSDK/Basic+tutorial+2%3A+GStreamer+concepts
"""
import sys
from gi.repository import Gst
import cv2
import numpy
Gst.init(None)
image_arr = None
def gst_to_opencv(sample):
buf = sample.get_buffer()
caps = sample.get_caps()
print caps.get_structure(0).get_value('format')
print caps.get_structure(0).get_value('height')
print caps.get_structure(0).get_value('width')
print buf.get_size()
arr = numpy.ndarray(
(caps.get_structure(0).get_value('height'),
caps.get_structure(0).get_value('width'),
3),
buffer=buf.extract_dup(0, buf.get_size()),
dtype=numpy.uint8)
return arr
def new_buffer(sink, data):
global image_arr
sample = sink.emit("pull-sample")
# buf = sample.get_buffer()
# print "Timestamp: ", buf.pts
arr = gst_to_opencv(sample)
image_arr = arr
return Gst.FlowReturn.OK
# Create the elements
source = Gst.ElementFactory.make("videotestsrc", "source")
convert = Gst.ElementFactory.make("videoconvert", "convert")
sink = Gst.ElementFactory.make("appsink", "sink")
# Create the empty pipeline
pipeline = Gst.Pipeline.new("test-pipeline")
if not source or not sink or not pipeline:
print("Not all elements could be created.")
exit(-1)
sink.set_property("emit-signals", True)
# sink.set_property("max-buffers", 2)
# # sink.set_property("drop", True)
# # sink.set_property("sync", False)
caps = Gst.caps_from_string("video/x-raw, format=(string){BGR, GRAY8}; video/x-bayer,format=(string){rggb,bggr,grbg,gbrg}")
sink.set_property("caps", caps)
sink.connect("new-sample", new_buffer, sink)
# Build the pipeline
pipeline.add(source)
pipeline.add(convert)
pipeline.add(sink)
if not Gst.Element.link(source, convert):
print("Elements could not be linked.")
exit(-1)
if not Gst.Element.link(convert, sink):
print("Elements could not be linked.")
exit(-1)
# Modify the source's properties
source.set_property("pattern", 0)
# Start playing
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print("Unable to set the pipeline to the playing state.")
exit(-1)
# Wait until error or EOS
bus = pipeline.get_bus()
# Parse message
while True:
message = bus.timed_pop_filtered(10000, Gst.MessageType.ANY)
# print "image_arr: ", image_arr
if image_arr is not None:
cv2.imshow("appsink image arr", image_arr)
cv2.waitKey(1)
if message:
if message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print("Error received from element %s: %s" % (
message.src.get_name(), err))
print("Debugging information: %s" % debug)
break
elif message.type == Gst.MessageType.EOS:
print("End-Of-Stream reached.")
break
elif message.type == Gst.MessageType.STATE_CHANGED:
if isinstance(message.src, Gst.Pipeline):
old_state, new_state, pending_state = message.parse_state_changed()
print("Pipeline state changed from %s to %s." %
(old_state.value_nick, new_state.value_nick))
else:
print("Unexpected message received.")
# Free resources
pipeline.set_state(Gst.State.NULL)
import sys
from gi.repository import Gst
import cv2
import numpy
image_arr = None
Gst.init(None)
def gst_to_opencv(sample):
buf = sample.get_buffer()
caps = sample.get_caps()
# print caps.get_structure(0).get_value('format')
# print caps.get_structure(0).get_value('height')
# print caps.get_structure(0).get_value('width')
# print buf.get_size()
arr = numpy.ndarray(
(caps.get_structure(0).get_value('height'),
caps.get_structure(0).get_value('width'),
3),
buffer=buf.extract_dup(0, buf.get_size()),
dtype=numpy.uint8)
return arr
def new_buffer(sink, data):
global image_arr
sample = sink.emit("pull-sample")
# buf = sample.get_buffer()
# print "Timestamp: ", buf.pts
arr = gst_to_opencv(sample)
image_arr = arr
return Gst.FlowReturn.OK
# Create the elements
source = Gst.ElementFactory.make("playbin", "source")
sink = Gst.ElementFactory.make("appsink", "sink")
# Create the empty pipeline
pipeline = Gst.Pipeline.new("test-pipeline")
if not source or not sink or not pipeline:
print("Not all elements could be created.")
exit(-1)
source.set_property("uri", "file:///Volumes/HD_Two/Users/Will/Documents/Programming/pupil/recordings/2013_12_16/000/world.avi")
sink.set_property("emit-signals", True)
caps = Gst.caps_from_string("video/x-raw, format=BGR, width=1280, height=720")
sink.set_property("caps", caps)
source.set_property("video-sink", sink)
sink.connect("new-sample", new_buffer, sink)
# Build the pipeline
pipeline.add(source)
# pipeline.add(sink)
# Start playing
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print("Unable to set the pipeline to the playing state.")
exit(-1)
# Wait until error or EOS
bus = pipeline.get_bus()
# Parse message
while True:
message = bus.timed_pop_filtered(100 * Gst.MSECOND, Gst.MessageType.STATE_CHANGED | Gst.MessageType.ERROR | Gst.MessageType.EOS)
# print "image_arr: ", image_arr
if image_arr is not None:
cv2.imshow("appsink image arr", image_arr)
cv2.waitKey(1)
_, current = source.query_position(Gst.Format.TIME)
pos_sec = float(current)/Gst.SECOND
print "Current: ",pos_sec
if pos_sec > 5:
print("\nReached 5s, performing seek back to 1s...")
pipeline.seek_simple(Gst.Format.DEFAULT, Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE, 30)
if message:
if message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print("Error received from element %s: %s" % (
message.src.get_name(), err))
print("Debugging information: %s" % debug)
break
elif message.type == Gst.MessageType.EOS:
print("End-Of-Stream reached.")
break
elif message.type == Gst.MessageType.STATE_CHANGED:
if isinstance(message.src, Gst.Pipeline):
old_state, new_state, pending_state = message.parse_state_changed()
print("Pipeline state changed from %s to %s." %
(old_state.value_nick, new_state.value_nick))
else:
print message.type
# print("Unexpected message received.")
# Free resources
pipeline.set_state(Gst.State.NULL)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment