Skip to content

Instantly share code, notes, and snippets.

@zhuker
Last active October 3, 2020 23:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zhuker/dfaddd22f6acd6aa410f500cbc72fecc to your computer and use it in GitHub Desktop.
Save zhuker/dfaddd22f6acd6aa410f500cbc72fecc to your computer and use it in GitHub Desktop.
import datetime
import threading
import traceback
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
def debug(*args) -> None:
print(datetime.datetime.now().time().isoformat(), threading.current_thread().name, *args)
# doesnt work
mipi_camera = '''nvarguscamerasrc name=front do-timestamp=true silent=false
! video/x-raw(memory:NVMM), width=(int)1280, height=(int)720, format=(string)NV12, framerate=30/1
! nvvidconv
! queue
'''
# works
usb_camera = '''
v4l2src device=/dev/video1 do-timestamp=true name=front
! image/jpeg,width=1280,height=720,framerate=30/1
! jpegdec
! videoconvert
! nvvidconv
! queue
'''
pipeline_desc = f'''
nvcompositor name=comp
sink_0::xpos=0 sink_0::ypos=0 sink_0::width=1280 sink_0::height=1080
sink_1::xpos=0 sink_1::ypos=0 sink_1::width=1280 sink_1::height=720
videotestsrc is-live=true do-timestamp=true name=background !
video/x-raw,width=320,height=180,framerate=30/1 !
nvvidconv !
comp.sink_0
comp. ! nvegltransform ! nveglglessink
'''
# works
camera_desc = usb_camera
# doesnt work
camera_desc = mipi_camera
Gst.init(None)
pipe = Gst.parse_launch(pipeline_desc)
bus = pipe.get_bus()
comp = pipe.get_by_name('comp')
def _pad_added(element, pad):
debug('_pad_added', element.name, pad.name)
srcbin = Gst.parse_bin_from_description(camera_desc, True)
static_pad = srcbin.get_static_pad('src')
pipe.add(srcbin)
pipe.sync_children_states()
static_pad.link(pad)
comp.connect('pad-added', _pad_added)
pipe.set_state(Gst.State.PLAYING)
def connect_front():
debug("connecting front camera")
comp.get_request_pad('sink_1')
gloop = GObject.MainLoop()
GLib.timeout_add(3000, connect_front)
try:
gloop.run()
except:
traceback.print_exc()
pipe.send_event(Gst.Event.new_eos())
keep_polling = True
while keep_polling:
message = bus.poll(Gst.MessageType.ANY, 1005000000)
debug(message)
keep_polling = message is not None
pipe.set_state(Gst.State.NULL)
@chalesguo
Copy link

Can the program be executed?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment