Last active
May 13, 2019 21:33
-
-
Save dbousamra/cbd5c241ce66826caf99972d06caf740 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gi | |
gi.require_version("Gst", "1.0") | |
from gi.repository import Gst, GstApp, GObject | |
Gst.init(None) | |
class CameraPipeline: | |
def __init__(self): | |
self.mainloop = GObject.MainLoop() | |
self.pipeline = Gst.Pipeline() | |
# Forward messages from the bins | |
self.pipeline.set_property("message-forward", True) | |
self.bus = self.pipeline.get_bus() | |
self.bus.add_signal_watch() | |
self.bus.connect("message::error", self.on_error) | |
# Src | |
self.src = Gst.ElementFactory.make("videotestsrc") | |
# Appsink | |
self.appsink = Gst.ElementFactory.make("appsink") | |
self.appsink.set_property("emit-signals", True) | |
self.appsink.connect("new-sample", self.on_new_src_buffer) | |
# Appsrc | |
self.metadata_src = Gst.ElementFactory.make("appsrc") | |
# Appsink | |
self.metadata_sink = Gst.ElementFactory.make("appsink") | |
self.metadata_sink.set_property("emit-signals", True) | |
self.metadata_sink.connect("new-sample", self.on_new_metadata_buffer) | |
self.pipeline.add(self.src) | |
self.pipeline.add(self.appsink) | |
self.pipeline.add(self.metadata_src) | |
self.pipeline.add(self.metadata_sink) | |
self.src.link(self.appsink) | |
self.metadata_src.link(self.metadata_sink) | |
def on_new_src_buffer(self, appsink): | |
sample = GstApp.AppSink.pull_sample(appsink) | |
GstApp.AppSrc.push_sample(self.metadata_src, sample) | |
print("Pushed to metadata_src") | |
return Gst.FlowReturn.OK | |
def on_new_metadata_buffer(self, appsink): | |
print("Pulled in metadata_sink") | |
return Gst.FlowReturn.OK | |
def on_error(self, bus, msg): | |
print("on_error():", msg.parse_error()) | |
def run(self): | |
self.pipeline.set_state(Gst.State.PLAYING) | |
self.mainloop.run() | |
if __name__ == "__main__": | |
pipeline = CameraPipeline() | |
pipeline.run() |
For anyone wondering, the fix for the app is to set the async=false on the appsinks. A pipeline won't preroll until all elements have received a buffer.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
These are the messages on the bus: