Skip to content

Instantly share code, notes, and snippets.

@ak64th
Last active June 15, 2023 10:57
Show Gist options
  • Save ak64th/edce315ecf4ba7175b4abf264a49274b to your computer and use it in GitHub Desktop.
Save ak64th/edce315ecf4ba7175b4abf264a49274b to your computer and use it in GitHub Desktop.
Save rtsp video stream data into a .mp4 file without sounds, re-encoding with libx264.
import sys
import gi
gi.require_version('Gst', '1.0')
def rtspsrc_pad_added(element, pad, target):
caps = pad.query_caps(None).to_string()
if caps.startswith('application/x-rtp, media=(string)video'):
pad.link(target.get_static_pad('sink'))
def decodebin_pad_added(element, pad, target):
caps = pad.query_caps(None).to_string()
if caps.startswith('video/x-raw'):
pad.link(target.get_static_pad('sink'))
def main():
from gi.repository import Gst
Gst.init(None)
player = Gst.Pipeline.new('player')
rtspsrc = Gst.ElementFactory.make('rtspsrc', 'rtspsrc')
rtspsrc.set_property('location', 'rtsp://username:password@192.168.0.100/av_stream')
rtspsrc.set_property('protocols', 'tcp')
decodebin = Gst.ElementFactory.make('decodebin', 'decodebin')
encoder = Gst.ElementFactory.make('x264enc', 'encoder')
capsfilter = Gst.ElementFactory.make('capsfilter', 'capsfilter')
encoder_caps = Gst.Caps.from_string('video/x-h264,profile=main,stream_format=byte-stream,tune=zerolatency ')
capsfilter.set_property('caps', encoder_caps)
mp4mux = Gst.ElementFactory.make('mp4mux', 'mp4mux')
videosink = Gst.ElementFactory.make('filesink', 'videosink')
videosink.set_property('location', 'bleh.mp4')
player.add(rtspsrc)
player.add(decodebin)
player.add(encoder)
player.add(capsfilter)
player.add(mp4mux)
player.add(videosink)
rtspsrc.connect('pad-added', rtspsrc_pad_added, decodebin)
decodebin.connect('pad-added', decodebin_pad_added, encoder)
encoder.link(capsfilter)
capsfilter.link(mp4mux)
mp4mux.link(videosink)
try:
player.set_state(Gst.State.PLAYING)
bus = player.get_bus()
while True:
msg = bus.timed_pop_filtered(1000000000, Gst.MessageType.ERROR | Gst.MessageType.EOS)
if msg:
t = msg.type
if t == Gst.MessageType.ERROR:
err, dbg = msg.parse_error()
print('ERROR:', msg.src.get_name(), '\n', err.message)
if dbg:
print('Debugging info:', dbg)
elif t == Gst.MessageType.EOS:
print('End-Of-Stream reached')
except KeyboardInterrupt:
rtspsrc.send_event(Gst.Event.new_eos())
rtspsrc.set_state(Gst.State.NULL)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment