Created
March 30, 2021 06:30
-
-
Save plaes/bf07da80a6b2dc9819e8dec94154dd8d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
import random | |
import argparse | |
import gi | |
import logging | |
gi.require_version("GLib", "2.0") | |
gi.require_version("Gst", "1.0") | |
elements = "agingtv,videoflip,vertigotv,gaussianblur,shagadelictv,edgetv,identity,exclusion,navigationtest".split( | |
"," | |
) | |
from gi.repository import Gst, GLib | |
class Main: | |
def __init__(self): | |
Gst.init(None) | |
self.pipeline = Gst.Pipeline() | |
src = Gst.ElementFactory.make("videotestsrc") | |
src.set_property("is-live", True) | |
filter1 = Gst.ElementFactory.make("capsfilter") | |
## XXX: set caps | |
# gst_util_set_object_arg (G_OBJECT (filter1), "caps", | |
# "video/x-raw, width=320, height=240, " | |
# "format={ I420, YV12, YUY2, UYVY, AYUV, Y41B, Y42B, " | |
# "YVYU, Y444, v210, v216, NV12, NV21, UYVP, A420, YUV9, YVU9, IYU1 }"); | |
q1 = Gst.ElementFactory.make("queue") | |
self.blockpad = q1.get_static_pad("src") | |
self.current_effect = Gst.ElementFactory.make("vertigotv") | |
self.conv_before = Gst.ElementFactory.make("videoconvert") | |
self.conv_after = Gst.ElementFactory.make("videoconvert") | |
q2 = Gst.ElementFactory.make("queue") | |
# Not used ? | |
## filter2 = Gst.ElementFactory.make("capsfilter") | |
## XXX: set caps | |
# gst_util_set_object_arg (G_OBJECT (filter1), "caps", | |
# "video/x-raw, width=320, height=240, " | |
# "format={ RGBx, BGRx, xRGB, xBGR, RGBA, BGRA, ARGB, ABGR, RGB, BGR }"); | |
sink = Gst.ElementFactory.make("waylandsink") | |
## | |
self.pipeline.add(src) | |
self.pipeline.add(filter1) | |
self.pipeline.add(q1) | |
self.pipeline.add(self.conv_before) | |
self.pipeline.add(self.current_effect) | |
self.pipeline.add(self.conv_after) | |
self.pipeline.add(q2) | |
self.pipeline.add(sink) | |
src.link(filter1) | |
filter1.link(q1) | |
q1.link(self.conv_before) | |
self.conv_before.link(self.current_effect) | |
self.current_effect.link(self.conv_after) | |
self.conv_after.link(q2) | |
q2.link(sink) | |
self.pipeline.set_state(Gst.State.PLAYING) | |
GLib.timeout_add_seconds(1, self.change_element) | |
GLib.MainLoop().run() | |
def change_element(self, user_data=None): | |
self.blockpad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, self.probe_callback) | |
return True | |
def probe_callback(self, pad, info, user_data=None): | |
# Unlock pad/probe ? | |
pad.remove_probe(info.id) | |
# Install new probe for EOS.. | |
srcpad = self.current_effect.get_static_pad("src") | |
srcpad.add_probe( | |
Gst.PadProbeType.BLOCK | Gst.PadProbeType.EVENT_DOWNSTREAM, | |
self.event_probe_callback, | |
) | |
# srcpad.unref() | |
# del srcpad | |
# ..and push EOS into element, | |
# the probe fill be fired when EOS leaves the effect and has drained all its data | |
sinkpad = self.current_effect.get_static_pad("sink") | |
sinkpad.send_event(Gst.Event.new_eos()) | |
# sinkpad.unref() | |
# del sinkpad | |
return Gst.PadProbeReturn.OK | |
def event_probe_callback(self, pad, info, user_data=None): | |
if info.get_event().type != Gst.EventType.EOS: | |
return Gst.PadProbeReturn.PASS | |
pad.remove_probe(info.id) | |
## Clean up current effect | |
self.current_effect.set_state(Gst.State.NULL) | |
self.pipeline.remove(self.current_effect) | |
self.current_effect.unref() | |
del self.current_effect | |
# Add and link new effect element | |
next_effect = Gst.ElementFactory.make(random.choice(elements)) | |
self.pipeline.add(next_effect) | |
self.conv_before.link(next_effect) | |
next_effect.link(self.conv_after) | |
next_effect.set_state(Gst.State.PLAYING) | |
self.current_effect = next_effect | |
return Gst.PadProbeReturn.DROP | |
if __name__ == "__main__": | |
logging.basicConfig(level=logging.DEBUG) | |
parser = argparse.ArgumentParser() | |
args = parser.parse_args() | |
Main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment