Skip to content

Instantly share code, notes, and snippets.

@MaZderMind
Created November 28, 2018 21:31
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MaZderMind/16d2a74e5b0e650a59d18bebf063964c to your computer and use it in GitHub Desktop.
Save MaZderMind/16d2a74e5b0e650a59d18bebf063964c to your computer and use it in GitHub Desktop.
quick and dirty gstreamer videomixer, switching between 1 of 20 sources
#!/usr/bin/env python3
import logging
import random
import signal
import sys
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstNet', '1.0')
from gi.repository import Gst, GObject
# check min-version
minGst = (1, 5)
minPy = (3, 0)
Gst.init([])
if Gst.version() < minGst:
raise Exception('GStreamer version', Gst.version(),
'is too old, at least', minGst, 'is required')
if sys.version_info < minPy:
raise Exception('Python version', sys.version_info,
'is too old, at least', minPy, 'is required')
# init GObject & Co. before importing local classes
GObject.threads_init()
class Pipeline(object):
n_sources = 20
active_source = 0
state_dirty = False
def __init__(self):
pipeline = ""
for n in range(0, self.n_sources):
color = "%06x" % random.randint(0, 0xffffff)
pipeline += """
videotestsrc is-live=true foreground-color=0x00{color} ! video/x-raw,width=1920,height=1080 ! mix.
""".format(
color=color
)
pipeline += """
compositor name=mix ! identity name=sig ! ximagesink
"""
print(pipeline)
self.pipeline = Gst.parse_launch(pipeline)
self.apply_mixer_state()
sig = self.pipeline.get_by_name('sig')
sig.connect('handoff', self.on_handoff)
def start(self):
self.pipeline.set_state(Gst.State.PLAYING)
def select_next_source(self):
self.active_source += 1
if self.active_source > self.n_sources:
self.active_source = 0
self.state_dirty = True
def on_handoff(self, object, buffer):
if self.state_dirty:
self.state_dirty = False
self.apply_mixer_state()
def apply_mixer_state(self):
print("switching to source ", self.active_source)
for n in range(0, self.n_sources):
mixerpad = (self.pipeline
.get_by_name('mix')
.get_static_pad('sink_%u' % n))
alpha = 1.0 if n == self.active_source else 0.0
mixerpad.set_property('alpha', alpha)
class CarlMix(object):
def __init__(self):
self.mainloop = GObject.MainLoop()
self.pipeline = Pipeline()
def next_input(self):
self.pipeline.select_next_source()
return True
def run(self):
self.pipeline.start()
GObject.timeout_add(1000, self.next_input)
try:
self.mainloop.run()
except KeyboardInterrupt:
return
# run mainclass
def main():
carl_mix = CarlMix()
carl_mix.run()
if __name__ == '__main__':
try:
main()
except RuntimeError as e:
logging.error(str(e))
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment