Skip to content

Instantly share code, notes, and snippets.

@zougloub
Last active May 13, 2023 07:58
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save zougloub/0747f84d45bc35413c0c19584c398b3d to your computer and use it in GitHub Desktop.
Save zougloub/0747f84d45bc35413c0c19584c398b3d to your computer and use it in GitHub Desktop.
Tiny DVR for RTSP/H264 cameras
#!/usr/bin/env python
# -*- coding:utf-8 vi:noet
# RTSP/H264 simple camera DVR
__author__ = "Jérôme Carretero <cJ-tub@zougloub.eu>"
__license__ = "MIT"
import sys, io, os, re, time, datetime
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import Gst, GObject, GstBase
import argparse

# Bus poll period in nanoseconds (100 ms). It bounds how late the recording
# duration check and Ctrl-C handling can be noticed.
POLL_INTERVAL_NS = 100 * 1000 * 1000

# Upper bound, in seconds, on how long shutdown waits for EOS to travel
# through the pipeline before tearing it down anyway.
EOS_TIMEOUT = 30.0


def build_parser():
    """Return the command-line parser for the DVR.

    Options:
      --url         RTSP URL to record; derived from ``basename`` when omitted.
      --duration    total recording time in seconds (default: one year).
      --chunk-size  length of each output file in seconds (default: one hour).
      basename      suffix of the output file names, and the camera host
                    used to build the default URL.
    """
    parser = argparse.ArgumentParser(
        description="Tiny DVR for RTSP/H264 cameras",
    )
    parser.add_argument(
        "--url",
        help="RTSP URL of the camera (default: built from basename)",
    )
    parser.add_argument(
        "--duration",
        type=float,
        default=365*24*60*60,
        help="total recording duration, in seconds",
    )
    parser.add_argument(
        "--chunk-size",
        type=float,
        default=60*60,
        help="duration of each recorded file, in seconds",
    )
    parser.add_argument(
        "basename",
        help="output file name suffix; also the camera host of the default URL",
    )
    return parser


def default_url(host):
    """Return the RTSP URL used when ``--url`` is not given."""
    return "rtsp://%s:554/user=admin&password=&channel=1&stream=0.sdp" % host


def chunk_filenames(basename):
    """Yield an endless sequence of timestamped ``.mp4`` file names.

    Each name is stamped with the local time at which it is requested and
    is echoed to stdout as it is produced.
    """
    while True:
        stamp = datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
        fn = "%s-%s.mp4" % (stamp, basename)
        print("- %s" % fn)
        yield fn


def build_pipeline(url, chunk_time, location):
    """Create the RTSP -> H264 -> splitmuxsink recording pipeline.

    ``url`` is the RTSP source, ``chunk_time`` the maximum duration of one
    file in nanoseconds and ``location`` the name of the first output file.

    The URL and file name are set as element properties rather than being
    spliced into the launch description, so that whitespace or quotes in
    them cannot break the description syntax.
    """
    # NOTE(review): splitmuxsink documents its muxer property as "muxer";
    # the "mux=" spelling is kept from the original -- confirm that it is
    # actually applied.
    pipeline = Gst.parse_launch("""
     rtspsrc protocols=4 name=rtsp
     rtsp.
      ! rtph264depay
      ! queue
      ! h264parse config-interval=1 name=parse
      ! splitmuxsink name=sink
         mux="mp4mux fragment-duration=1000"
    """)
    pipeline.get_by_name("rtsp").set_property("location", url)
    sink = pipeline.get_by_name("sink")
    sink.set_property("max-size-time", chunk_time)
    sink.set_property("location", location)
    return pipeline


def enable_pts_workaround(pipeline):
    """Work around the camera PTS issue on the element named "parse".

    Turns on timestamp inference and PTS interpolation on the parser.
    """
    parse = pipeline.get_by_name("parse")
    if parse is not None:
        GstBase.BaseParse.set_infer_ts(parse, True)
        GstBase.BaseParse.set_pts_interpolation(parse, True)


def wait_for_eos(bus, timeout=EOS_TIMEOUT):
    """Wait until EOS is posted on ``bus``.

    Gives up when an ERROR message is posted instead, or after ``timeout``
    seconds, so that shutdown cannot hang on a pipeline that will never
    deliver EOS. Returns True if EOS was seen, False otherwise.
    """
    wanted = Gst.MessageType.EOS | Gst.MessageType.ERROR
    deadline = time.time() + timeout
    while time.time() < deadline:
        msg = bus.poll(wanted, POLL_INTERVAL_NS)
        if msg is None:
            continue
        if msg.type == Gst.MessageType.ERROR:
            err, debug = msg.parse_error()
            print("Error: %s" % err, debug)
            return False
        return True
    print("Error, timed out waiting for EOS")
    return False


def record(pipeline, filenames, duration):
    """Run ``pipeline`` until ``duration`` seconds elapse or it stops.

    Every time splitmuxsink reports a closed fragment, the next name from
    ``filenames`` is installed as its ``location``. When the duration is
    reached or on Ctrl-C, EOS is sent and waited for before the pipeline
    is torn down. The pipeline is always set to NULL on exit.
    """
    # Message types that are expected during normal operation and ignored.
    ignored = (
        Gst.MessageType.STATE_CHANGED,
        Gst.MessageType.STREAM_STATUS,
        Gst.MessageType.STREAM_START,
        Gst.MessageType.PROGRESS,
        Gst.MessageType.ASYNC_DONE,
        Gst.MessageType.NEW_CLOCK,
    )

    pipeline.set_state(Gst.State.PLAYING)
    started = time.time()
    bus = pipeline.get_bus()

    try:
        while True:
            # Checked on every iteration so a busy bus cannot postpone it.
            if time.time() - started > duration:
                print("break")
                pipeline.send_event(Gst.Event.new_eos())
                wait_for_eos(bus)
                break

            msg = bus.poll(Gst.MessageType.ANY, POLL_INTERVAL_NS)
            if msg is None:
                continue

            t = msg.type
            if t == Gst.MessageType.EOS:
                print("Error, EOS")
                break
            elif t == Gst.MessageType.ERROR:
                err, debug = msg.parse_error()
                print("Error: %s" % err, debug)
                break
            elif t == Gst.MessageType.WARNING:
                err, debug = msg.parse_warning()
                print("Warning: %s" % err, debug)
            elif t in ignored:
                pass
            elif t == Gst.MessageType.ELEMENT:
                # A closed fragment means a new file name is needed.
                s = msg.get_structure()
                if s is not None and s.has_name("splitmuxsink-fragment-closed"):
                    msg.src.set_property("location", next(filenames))
            else:
                print("Error, unknown message: %s: %s" % (t, msg))
                break
    except KeyboardInterrupt:
        pipeline.send_event(Gst.Event.new_eos())
        wait_for_eos(bus)
    finally:
        pipeline.set_state(Gst.State.NULL)


def main(argv=None):
    """Parse the command line and record until done.

    ``argv`` defaults to ``sys.argv[1:]``.
    """
    parser = build_parser()

    # Shell completion is optional: only a missing module is tolerated.
    try:
        import argcomplete
    except ImportError:
        argcomplete = None
    if argcomplete is not None:
        argcomplete.autocomplete(parser)

    args = parser.parse_args(argv)

    basename = args.basename
    url = args.url
    if url is None:
        url = default_url(basename)

    GObject.threads_init()
    Gst.init(None)

    # splitmuxsink expects max-size-time in nanoseconds.
    chunk_time = int(args.chunk_size*1e9)

    filenames = chunk_filenames(basename)
    pipeline = build_pipeline(url, chunk_time, next(filenames))
    enable_pts_workaround(pipeline)
    record(pipeline, filenames, args.duration)


if __name__ == '__main__':
    main()
@wlinInspire
Copy link

This is awesome! I had the "Buffer has no PTS" issue, and adding interpolation seems to solve it. Thank you so much!

@zougloub
Copy link
Author

zougloub commented Dec 3, 2020

Hi, I'll create a proper repo for it then :)

@wlinInspire
Copy link

Can you explain why there is config-interval=1 for h264parse?

@toanqng
Copy link

toanqng commented May 13, 2023

This is awesome, thanks for fixing the "no PTS" issue.
How can I capture an image for each "chunk-size" interval?
Thank you so much.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment