Skip to content

Instantly share code, notes, and snippets.

@tylercubell
Created February 7, 2019 05:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save tylercubell/89a1c9738c892c6e5b101e57a9c41748 to your computer and use it in GitHub Desktop.
Save tylercubell/89a1c9738c892c6e5b101e57a9c41748 to your computer and use it in GitHub Desktop.
GStreamer Python Custom Message
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from threading import Thread, Event
Gst.init(None)
class Main:
    """Minimal GStreamer pipeline demonstrating custom application messages.

    A ``videotestsrc`` is linked to an ``autovideosink``. While ``run`` is
    active, a background thread posts an application message named
    ``custom_message`` on the pipeline bus roughly once per second, and the
    main loop prints the structure name of every application message it pops.
    """

    def __init__(self):
        """Build the pipeline and link the test source to the video sink.

        Raises:
            RuntimeError: If an element cannot be created or the two
                elements cannot be linked.
        """
        self.pipeline = Gst.Pipeline.new("pipeline")
        self.bus = self.pipeline.get_bus()

        # Created up front so send_custom_message() never hits a missing
        # attribute, even if it is invoked before run().
        self.stop_event = Event()
        self.change_source_thread = None

        self.videotestsrc = self._make_element("videotestsrc")
        self.autovideosink = self._make_element("autovideosink")

        if not self.videotestsrc.link(self.autovideosink):
            raise RuntimeError("Could not link videotestsrc to autovideosink")

    def _make_element(self, factory_name):
        """Create an element named after its factory and add it to the pipeline."""
        element = Gst.ElementFactory.make(factory_name, factory_name)
        if element is None:
            # make() returns None when the plugin providing the factory is
            # not installed; fail here with a readable message.
            raise RuntimeError(
                "Could not create element '{}'".format(factory_name)
            )
        self.pipeline.add(element)
        return element

    def send_custom_message(self):
        """Post a ``custom_message`` application message every second.

        Runs until ``self.stop_event`` is set. Intended as a thread target.
        """
        # Event.wait() returns True as soon as the event is set, so a single
        # call both sleeps for the interval and checks for shutdown.
        while not self.stop_event.wait(1):
            custom_structure = Gst.Structure.new_empty("custom_message")
            custom_message = Gst.Message.new_application(None, custom_structure)
            self.bus.post(custom_message)

    def run(self):
        """Play the pipeline and print application messages until stopped.

        The loop ends on EOS, on an ERROR message (which is reported on
        stderr), or on Ctrl+C. The worker thread is always stopped and joined
        and the pipeline is always set back to NULL, even if an unexpected
        exception escapes the loop.
        """
        import sys

        # Reset in case run() is called again after a previous run set it.
        self.stop_event.clear()
        self.change_source_thread = Thread(target=self.send_custom_message)

        try:
            self.pipeline.set_state(Gst.State.PLAYING)
            self.change_source_thread.start()

            while True:
                message = self.bus.timed_pop(Gst.SECOND)
                if message is None:
                    # Timeout with nothing on the bus; poll again.
                    continue

                if message.type == Gst.MessageType.APPLICATION:
                    print(message.get_structure().get_name())
                elif message.type == Gst.MessageType.EOS:
                    break
                elif message.type == Gst.MessageType.ERROR:
                    error, debug = message.parse_error()
                    sys.stderr.write("Error: {}\n".format(error.message))
                    if debug:
                        sys.stderr.write("Debug info: {}\n".format(debug))
                    break
        except KeyboardInterrupt:
            # Ctrl+C is the expected way to stop; fall through to cleanup.
            pass
        finally:
            self.stop_event.set()
            # join() on a thread that never started raises RuntimeError.
            if self.change_source_thread.is_alive():
                self.change_source_thread.join()
            self.pipeline.set_state(Gst.State.NULL)
# Only start the pipeline when executed as a script, so importing this
# module does not open a video window or block in the message loop.
if __name__ == "__main__":
    start = Main()
    start.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment