Skip to content

Instantly share code, notes, and snippets.

@ak64th
Created June 16, 2023 06:49
Show Gist options
  • Save ak64th/74ba6834415db128be68f5743f0bf24a to your computer and use it in GitHub Desktop.
Save ak64th/74ba6834415db128be68f5743f0bf24a to your computer and use it in GitHub Desktop.
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
NAL_BREAKER = b'\x00\x00\x00\x01'
def rtspsrc_pad_added(element, pad, target):
caps = pad.query_caps(None).to_string()
if caps.startswith('application/x-rtp, media=(string)video'):
pad.link(target.get_static_pad('sink'))
def decodebin_pad_added(element, pad, target):
caps = pad.query_caps(None).to_string()
if caps.startswith('video/x-raw'):
pad.link(target.get_static_pad('sink'))
def appsink_new_sample(appsink):
# use action-signal instead of appsink.pull_sample()
sample = appsink.emit("pull-sample")
if isinstance(sample, Gst.Sample):
buffer = sample.get_buffer()
ret, map_info = buffer.map(Gst.MapFlags.READ)
try:
print(NAL_BREAKER in map_info.data, len(map_info.data))
print(map_info.data)
finally:
buffer.unmap(map_info) # avoid memory leaking
return Gst.FlowReturn.OK
return Gst.FlowReturn.ERROR
def main():
Gst.init(None)
player = Gst.Pipeline.new('player')
rtspsrc = Gst.ElementFactory.make('rtspsrc', 'rtspsrc')
rtspsrc.set_property('location', 'rtsp://username:password@192.168.0.100/av_stream')
rtspsrc.set_property('protocols', 'tcp')
rtspsrc.set_property('latency', 500)
decodebin = Gst.ElementFactory.make('decodebin', 'decodebin')
encoder = Gst.ElementFactory.make('x264enc', 'encoder')
capsfilter = Gst.ElementFactory.make('capsfilter', 'capsfilter')
# must set stream-format=byte-stream
encoder_caps = Gst.Caps.from_string('video/x-h264,profile=main,stream-format=byte-stream,tune=zerolatency ')
capsfilter.set_property('caps', encoder_caps)
appsink = Gst.ElementFactory.make('appsink', 'appsink')
appsink.set_property('emit-signals', True)
player.add(rtspsrc)
player.add(decodebin)
player.add(encoder)
player.add(capsfilter)
player.add(appsink)
rtspsrc.connect('pad-added', rtspsrc_pad_added, decodebin)
decodebin.connect('pad-added', decodebin_pad_added, encoder)
encoder.link(capsfilter)
capsfilter.link(appsink)
appsink.connect('new-sample', appsink_new_sample)
try:
player.set_state(Gst.State.PLAYING)
bus = player.get_bus()
while True:
msg = bus.timed_pop_filtered(1000000000, Gst.MessageType.ERROR | Gst.MessageType.EOS)
if msg:
t = msg.type
if t == Gst.MessageType.ERROR:
err, dbg = msg.parse_error()
print('ERROR:', msg.src.get_name(), '\n', err.message)
if dbg:
print('Debugging info:', dbg)
elif t == Gst.MessageType.EOS:
print('End-Of-Stream reached')
except KeyboardInterrupt:
rtspsrc.send_event(Gst.Event.new_eos())
rtspsrc.set_state(Gst.State.NULL)
player.set_state(Gst.State.NULL)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment