Skip to content

Instantly share code, notes, and snippets.

@nik123
Last active November 7, 2023 17:53
Show Gist options
  • Save nik123/fb6c653bbe3081a097ae56a7dd1b8212 to your computer and use it in GitHub Desktop.
Save nik123/fb6c653bbe3081a097ae56a7dd1b8212 to your computer and use it in GitHub Desktop.
Delayed video recording in gstreamer
import gi
from time import sleep, time
from threading import Thread
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib # noqa
def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
print("EOS message. Quitting loop.")
loop.quit()
elif t == Gst.MessageType.ERROR:
print(f"ERROR message: {message}. Quitting loop.")
err, debug = message.parse_error()
loop.quit()
return True
def main():
Gst.init(None)
loop = GLib.MainLoop()
thread = Thread(target=loop.run)
thread.start()
pipeline = Gst.ElementFactory.make("pipeline")
src = Gst.ElementFactory.make("videotestsrc")
tee = Gst.ElementFactory.make("tee")
q = Gst.ElementFactory.make("queue")
convert = Gst.ElementFactory.make("videoconvert")
encode = Gst.ElementFactory.make("x264enc")
mux = Gst.ElementFactory.make("mp4mux")
sink = Gst.ElementFactory.make("filesink")
sink.set_property("location", "output.mp4")
pipeline.add(src, convert, encode, mux, tee, q, sink)
src.link(tee)
tee.link(q)
q.link(convert)
convert.link(encode)
encode.link(mux)
mux.link(sink)
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
pipeline.set_state(Gst.State.NULL)
return
recording = False
def start_recording():
print("start_recording called")
# Create extra elements for recording
q2 = Gst.ElementFactory.make("queue")
convert2 = Gst.ElementFactory.make("videoconvert")
encode2 = Gst.ElementFactory.make("x264enc")
mux2 = Gst.ElementFactory.make("mp4mux")
sink2 = Gst.ElementFactory.make("filesink")
sink2.set_property("location", "output2.mp4")
pipeline.add(q2, convert2, encode2, mux2, sink2)
q2.link(convert2)
convert2.link(encode2)
encode2.link(mux2)
mux2.link(sink2)
q2.sync_state_with_parent()
convert2.sync_state_with_parent()
encode2.sync_state_with_parent()
mux2.sync_state_with_parent()
sink2.sync_state_with_parent()
tee_pad_templ = tee.get_pad_template("src_%u")
tee_pad = tee.request_pad(tee_pad_templ)
sink_pad = q2.get_static_pad("sink")
tee_pad.link(sink_pad)
try:
start_ts = time()
while loop.is_running():
sleep(0.01)
if time() - start_ts > 2 and not recording:
recording = True
print("Adding 'start_recording' callback")
GLib.idle_add(start_recording)
except KeyboardInterrupt:
print("KeyboardInterrupt. Sending eos...")
pipeline.send_event(Gst.Event.new_eos())
while loop.is_running():
sleep(0.01)
finally:
pipeline.set_state(Gst.State.NULL)
loop.quit()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment