Skip to content

Instantly share code, notes, and snippets.

@jackersson
Last active March 5, 2019 14:59
Show Gist options
  • Save jackersson/1aa58b70690330777240aab90af9c7d2 to your computer and use it in GitHub Desktop.
Save jackersson/1aa58b70690330777240aab90af9c7d2 to your computer and use it in GitHub Desktop.
"""
python3 gst_video_split.py -f "/media/taras/Transcend/Dataai/OBI/test_with_overlay/00000004063000000.mp4" -o /media/taras/Transcend/Dataai/OBI/test_with_overlay/output -t 60
python3 gst_video_split.py -f "/media/taras/Transcend/Dataai/OBI/test/00000004063000000.mp4" -o /media/taras/Transcend/Dataai/OBI/test_with_overlay/output -t 60
"""
import os
import datetime
import traceback
import subprocess
import time
import cv2
import timeit
import argparse
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import Gst, GObject, GstBase
GObject.threads_init()
Gst.init(None)
ap = argparse.ArgumentParser()
ap.add_argument("-f", "--file", required=True, help="Path to video file")
ap.add_argument("-o", "--output", default="output",
required=False, help="Output folder with video")
ap.add_argument("-b", "--bitrate", default=1024,
required=False, help="Bitrate")
ap.add_argument("-t", "--maxtime", default=60,
required=False, help="Video files max size time in seconds")
args = vars(ap.parse_args())
def create_dir(directory):
if not os.path.exists(directory):
os.makedirs(directory)
IN_LOCATION = args["file"] # "/media/taras/Transcend/Dataai/OBI/test_with_overlay/00000004063000000.mp4"
assert os.path.exists(IN_LOCATION)
OUT_LOCATION = os.path.abspath(args["output"])
create_dir(OUT_LOCATION)
OUT_LOCATION = os.path.join(OUT_LOCATION, "video%05d.mp4")
BITRATE = args['bitrate']
MAX_SIZE_TIME = int(args['maxtime']) * 10**9
FRAME_WIDTH = 1344
FRAME_HEIGHT = 520
def get_frames_count(filename: str) -> int:
"""
Returns frames count from video file
Arguments:
filename: str
Returns:
frame_count: int
"""
assert os.path.exists(filename), 'Invalid filename {}'.format(filename)
cap = cv2.VideoCapture(filename)
count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()
return count
FRAMES_COUNT = get_frames_count(IN_LOCATION)
assert FRAMES_COUNT > 0, f"Invalid frames count {FRAMES_COUNT}"
# out_location = "/media/taras/Transcend/Dataai/OBI/test_with_overlay/00000004063000000_o.mp4"
identity = "identity"
textoverlay = "textoverlay"
gst_launch = f"filesrc location={IN_LOCATION} ! decodebin ! identity name={identity} ! \
textoverlay name={textoverlay} ! x264enc tune=zerolatency bitrate={BITRATE} ! \
splitmuxsink location={OUT_LOCATION} max-size-time={MAX_SIZE_TIME}"
print("gst-launch-1.0 ", gst_launch)
class UserData:
def __init__(self, loop, offset=0, max_offset=1, textoverlay=None):
self.offset = offset
self.local_offset = 0
self.max_offset = max_offset
self.textoverlay = textoverlay
self.loop = loop
self.previous_offsets = 0
def quit(self):
self.loop.quit()
def identity_message_handler(element, buffer, user_data):
if not user_data:
return
if user_data.textoverlay:
text = f"Local Offset={user_data.offset - user_data.previous_offsets}, Global Offset={user_data.offset}"
print(text)
user_data.textoverlay.set_property("text", text)
user_data.offset += 1
user_data.local_offset += 1
if user_data.offset % 100 == 0:
print("Progress {0:.2f}%".format((user_data.offset / user_data.max_offset) * 100))
def bus_message_handler(bus, message, loop):
mtype = message.type
if mtype == Gst.MessageType.EOS:
print("Video ended")
loop.quit()
elif mtype == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print("{0}: {1}".format(err, debug))
loop.quit()
elif mtype == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
print("{0}: {1}".format(err, debug))
loop.quit()
else:
structure = message.get_structure()
if structure is None:
return
if structure.has_name('splitmuxsink-fragment-opened'):
print("opened ", structure.get_value('location'), structure.get_value('running-time'))
loop.local_offset = 0
# print(loop.local_offset)
elif structure.has_name('splitmuxsink-fragment-closed'):
print("closed ", structure.get_value('location'), structure.get_value('running-time'))
# loop.previous_offsets += get_frames_count(structure.get_value('location'))
# print(Gst.TIME_ARGS(structure.get_value('running-time')))
else:
pass
return True
pipeline = Gst.parse_launch(gst_launch)
bus = pipeline.get_bus()
bus.add_signal_watch()
loop = GObject.MainLoop()
user_data = UserData(loop, textoverlay=pipeline.get_by_name(textoverlay),
max_offset=FRAMES_COUNT)
bus.connect("message", bus_message_handler, user_data)
element = pipeline.get_by_name(identity)
if element:
element.connect("handoff", identity_message_handler, user_data)
pass
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except:
traceback.print_exc()
loop.quit()
pipeline.set_state(Gst.State.NULL)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment