Skip to content

Instantly share code, notes, and snippets.

@robobe
Created December 24, 2023 17:17
Show Gist options
  • Save robobe/5e61b0fdaeeea5aa60b865abdb1e72a4 to your computer and use it in GitHub Desktop.
Save robobe/5e61b0fdaeeea5aa60b865abdb1e72a4 to your computer and use it in GitHub Desktop.
python gst compositor dynamic
import gi
import logging
gi.require_version("Gst", "1.0")
from gi.repository import GObject, Gst
from threading import Thread, Event
from dataclasses import dataclass
from fastapi import FastAPI
log = logging.getLogger(__name__)
Gst.init(None)
@dataclass
class Settings():
name: str
pos_x: int
pos_y: int
class Axis:
def __init__(self):
pipe = """
compositor name=m \
sink_1::xpos=50 sink_1::ypos=50 sink_1::width=150\
sink_2::xpos=400 sink_2::ypos=50 \
! videoconvert \
! autovideosink \
videotestsrc pattern=1 \
! video/x-raw, format=I420, framerate=5/1, width=740, height=480 \
! m. \
videotestsrc name=right pattern=red \
! video/x-raw, format=I420, framerate=5/1, width=300, height=200 \
! queue \
! m.sink_2 \
videotestsrc name=left pattern=green \
! video/x-raw, format=I420, framerate=5/1, width=300, height=200 \
! queue \
! m.sink_1
"""
self.pipeline = Gst.parse_launch(pipe)
self.bus = self.pipeline.get_bus()
self.mixer = self.pipeline.get_by_name("m")
self.settings = {
1: Settings("left", pos_x=50, pos_y=50),
2: Settings("right", pos_x=400, pos_y=50)
}
def custom_message(self, name, action, index):
if index not in self.settings:
log.warning(f"camera / source index not found: {index}")
return
custom_structure = Gst.Structure.new_empty(name)
custom_structure.set_value("index", index)
custom_structure.set_value("action", action)
custom_message = Gst.Message.new_application(None, custom_structure)
self.bus.post(custom_message)
def get_mapping(self, index: int) -> str:
data = {
1: "left",
2: "right"
}
return data[index]
def run(self):
self.pipeline.set_state(Gst.State.PLAYING)
while True:
try:
message = self.bus.timed_pop(Gst.SECOND)
if message == None:
pass
elif message.type == Gst.MessageType.APPLICATION:
data = message.get_structure()
if data.get_name().startswith("start"):
index = data.get_value("index")
action = data.get_string("action")
def action_message(pad, info):
self.custom_message(action, action, index)
pad.remove_probe(info.id)
return Gst.PadProbeReturn.OK
self.mixer.get_static_pad("src").add_probe(
Gst.PadProbeType.BLOCK, action_message
)
elif message.get_structure().get_name() == "close":
index = data.get_value("index")
src = self.pipeline.get_by_name(self.settings[index].name)
if not src:
log.warning(f"source not found: {self.settings[index].name}")
continue
src.set_state(Gst.State.NULL)
mixer_pad = self.mixer.get_static_pad(f"sink_{index}")
src_pad = mixer_pad.get_peer()
src_pad.unlink(mixer_pad)
self.pipeline.remove(src)
del src
self.mixer.remove_pad(mixer_pad)
del mixer_pad
elif message.get_structure().get_name() == "do_move":
index = data.get_value("index")
src = self.pipeline.get_by_name(self.settings[index].name)
mixer_pad = self.mixer.get_static_pad(f"sink_{index}")
current_x = mixer_pad.get_property("xpos")
mixer_pad.set_property("xpos", current_x+20)
elif message.get_structure().get_name() == "add":
index = data.get_value("index")
setting = self.settings[index]
src = self.pipeline.get_by_name(self.settings[index].name)
if src:
log.warning(f"source allready exists: {self.settings[index].name}")
continue
src = Gst.ElementFactory.make(
"videotestsrc", setting.name
)
src.set_property("pattern", 0)
self.pipeline.add(src)
mixer_pad = self.mixer.request_pad_simple(f"sink_{index}")
mixer_pad.set_property("xpos", setting.pos_x)
mixer_pad.set_property("ypos", setting.pos_y)
new_src_src_pad = src.get_static_pad("src")
new_src_src_pad.link(mixer_pad)
src.set_state(Gst.State.PLAYING)
elif message.type == Gst.MessageType.EOS:
log.warning("EOS")
break
elif message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
log.error(err)
log.error(debug)
break
except KeyboardInterrupt:
break
self.pipeline.set_state(Gst.State.NULL)
axis = Axis()
g_thread = Thread(target=axis.run, daemon=True, name="g_thread")
g_thread.start()
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
@app.get("/close")
async def close(index: int):
axis.custom_message("start_close", "close", index)
@app.get("/add")
async def add(index: int):
axis.custom_message("start_add", "add", index)
@app.get("/move")
async def add(index: int):
axis.custom_message("do_move", "move", index)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment