Skip to content

Instantly share code, notes, and snippets.

@sphaero
Last active December 23, 2023 09:50
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sphaero/ddd11b6286605bd7e973 to your computer and use it in GitHub Desktop.
Save sphaero/ddd11b6286605bd7e973 to your computer and use it in GitHub Desktop.
Gstreamer seamless loop test
#!/usr/bin/env python3
#
# Seamless loop test
# Copyright (c) 2015 Arnaud Loonstra <arnaud@sphaero.org>
#
# This is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# There's a bug in gstreamer-vaapi <0.5.10, see: https://bugzilla.gnome.org/show_bug.cgi?id=743035
# The workaround is `export LIBVA_DRIVER_NAME=bla`, which makes vaapi fail and revert to software decoding.
#
# test video:
# https://github.com/raspberrypi/firmware/raw/master/opt/vc/src/hello_pi/hello_video/test.h264
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
# GStreamer has to be initialised before its API is used: calling
# Gst.version() first fails with
# NotInitialized("Please call Gst.init(argv) before using GStreamer").
# NOTE(review): a later, second Gst.init() call is expected to be harmless.
Gst.init(None)
print(Gst.version())
import argparse
import sys
import os
def bus_call(bus, msg, *args):
    """
    React to a message posted on the GStreamer bus.

    Uses the module-level ``prerolled``, ``playbin`` and ``loop`` globals.

    * ERROR: print the parsed error and quit the main loop.
    * SEGMENT_DONE: print "Looping..." and seek back to position 0 with the
      SEGMENT flag only (no flush).
    * ASYNC_DONE: the first time only, perform a flushing segment seek to
      position 0 and mark the pipeline as prerolled.

    Any other message type is ignored. ``bus`` and ``*args`` are accepted but
    not used. Always returns True (presumably so the bus watch stays
    installed -- confirm against the GLib/Gst watch documentation).
    """
    global prerolled, playbin, loop

    kind = msg.type
    if kind == Gst.MessageType.ERROR:
        print(msg.parse_error())
        loop.quit()  # quit.... (better restart app?)
    elif kind == Gst.MessageType.SEGMENT_DONE:
        print("Looping...")
        # Non-flushing segment seek back to the start.
        playbin.seek_simple(Gst.Format.TIME, Gst.SeekFlags.SEGMENT, 0)
    elif kind == Gst.MessageType.ASYNC_DONE and not prerolled:
        print("Initial seek ...")
        initial_flags = Gst.SeekFlags.FLUSH | Gst.SeekFlags.SEGMENT
        playbin.seek_simple(Gst.Format.TIME, initial_flags, 0)
        # Only do the initial seek once; later ASYNC_DONE messages are ignored.
        prerolled = True

    return True
# --- command line handling -------------------------------------------------
parser = argparse.ArgumentParser(description='Loop a videofile seamlessly.')
parser.add_argument('file', type=str, help='file to play')
args = parser.parse_args()

if not os.path.isfile(args.file):
    # Report the path the user actually passed in (the bare name `file` is
    # not defined in Python 3 and would raise NameError here).
    print("File {0} doesn't exist".format(args.file))
    sys.exit(1)

GObject.threads_init()
loop = GObject.MainLoop()
Gst.init(None)

# create playbin pipeline
# `prerolled`, `playbin` and `loop` are read/written as globals by bus_call.
prerolled = False
playbin = Gst.ElementFactory.make('playbin', 'playbin0')
if playbin is None:
    # Guard against a missing 'playbin' element so the failure is explicit
    # rather than an AttributeError on None further down.
    print("Could not create the 'playbin' element")
    sys.exit(1)

bus = playbin.get_bus()
bus.add_watch(0, bus_call, loop)  # 0 == GLib.PRIORITY_DEFAULT

# set properties of elements
# Let GStreamer build the file URI instead of concatenating "file://" with
# the path, which yields an unreadable URI on some platforms/paths
# ("Could not open resource for reading").
playbin.set_property("uri", Gst.filename_to_uri(os.path.abspath(args.file)))
playbin.set_state(Gst.State.PLAYING)

try:
    loop.run()
except Exception as e:
    print(e)
finally:
    # Always return the pipeline to NULL when the main loop ends.
    playbin.set_state(Gst.State.NULL)
@tylercubell
Copy link

To get it to work, I had to make two changes:

Problem: raise NotInitialized("Please call Gst.init(argv) before using GStreamer")
Solution: Comment out line 28.

Problem: GLib.Error('Could not open resource for reading.', 'gst-resource-error-quark', 5)
Solution: Add an extra forward slash after file:// on line 73.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment