Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Tool to pipe openexr images, apply an ocio display, and output through a bash pipe to ffmpeg and encode to mov

OpenImageIO / FFMPEG Daily

Tool to pipe openexr images, apply an ocio display, and output through a bash pipe to ffmpeg and encode to mov

Reads from a config file with the ability to set custom codec presets

Example Usage

	Process given image sequence with ocio display, resize and output to ffmpeg for encoding into a dailies movie.

	optional arguments:
	  -h, --help            show this help message and exit
	  -i IMAGE_SEQUENCE, --image_sequence IMAGE_SEQUENCE
	                        Input exr image sequence. Can be a folder containing
	                        images, a path to the first image, a percent 05d path,
	                        or a ##### path. If this is not given the tool searches for
	                        images in the current directory to process.
	  -c CODEC, --codec CODEC
	                        Codec name: Possible options are defined in the
	                        DAILIES_CONFIG: Possible options are
								dnxhd_175
								dnxhd_36
								dnxhr_hqx
								h264_hq
								h264_lq
								mjpeg
								prores_422
								prores_422hq
								prores_4444
	# For example
	daily -i /drive/video/20181108/exr/M02-0014/M02-0014_%06d.exr -c h264_hq

# Settings shared by every output profile. A profile may override any key
# defined here that it also defines with a non-empty value.
globals:
  ###############################################
  ## Color
  ###############################################
  # Path to the ocio config to use
  ocioconfig: /opt/ocio/aces/config.ocio
  # ocio display transform
  ociodisplay: ACES
  # ocio view transform
  ocioview: RRT
  # Optional colorspace conversion and look, applied (in that order) before the
  # display transform. Leave empty to skip.
  ociocolorconvert:
  ociolook:
  ###############################################
  ## Reformatting and Cropping
  ###############################################
  # If debug is true, debug messages are written to the log.
  # NOTE(review): this comment used to say that no encoding is done in debug
  # mode - confirm against the version of the script in use.
  debug: true
  # Number of pixels to crop from width and height before resizing to dailies res.
  # You can enter an int pixel number, or a percentage like 5% or 6.25%
  cropwidth:
  cropheight:
  # width and height are the resolution to reformat to.
  width: 2048
  height:
  # If fit is false, height will be ignored and the aspect ratio of the input will be preserved.
  fit: true
  # Enable an oiio pixel filter to use for scaling. Default is good: blackman-harris for scaling up, lanczos3 for scaling down.
  #
  # Possible filters: https://github.com/OpenImageIO/oiio/blob/master/src/libutil/filter.cpp
  # // name dim width fixedwidth scalable separable
  # { "box", 1, 1, false, true, true },
  # { "triangle", 1, 2, false, true, true },
  # { "gaussian", 1, 3, false, true, true },
  # { "sharp-gaussian", 1, 2, false, true, true },
  # { "catmull-rom", 1, 4, false, true, true },
  # { "blackman-harris", 1, 3, false, true, true },
  # { "sinc", 1, 4, false, true, true },
  # { "lanczos3", 1, 6, false, true, true },
  # { "mitchell", 1, 4, false, true, true },
  # { "bspline", 1, 4, false, true, true },
  # { "cubic", 1, 4, false, true, true },
  # { "keys", 1, 4, false, true, true },
  # { "simon", 1, 4, false, true, true },
  # { "rifman", 1, 4, false, true, true }
  # // clang-format on
  filter:
  # Output framerate. film=24, hdtv=23.98 = 24000/1001, pal=25, ntsc=30000/1001
  framerate: 24
  # specify a default video filter string to use in ffmpeg.
  # https://trac.ffmpeg.org/wiki/Scaling
  # https://ffmpeg.org/ffmpeg-scaler.html
  # Example: "lut3d=${lut},scale=w=${width}:h=${height}:force_original_aspect_ratio=1,pad=${width}:${height}:(ow-iw)/2:(oh-ih)/2"
  vf:
  # Choose which output profile to use by default. See options below.
  output_profile: h264_hq
  ###############################################
  ## Overlays
  ###############################################
  # Enable an overlay to crop the pixels to a specific aspect ratio.
  # For example to do a hard crop to 2.35 set cropmask_ar to 2.35 and cropmask_opacity to 1.0
  cropmask: true
  cropmask_ar: 2.35
  cropmask_opacity: 1.0
  # Choose the font file to use
  font: /opt/font.ttc
  framecounter: true
# Output profile definitions.
# Each profile maps onto ffmpeg encoder options. Empty values are skipped when
# the ffmpeg command is built; width / height / fit / framerate / vf override
# the matching key in "globals" when they are set here.
output_profiles:
  h264_hq:
    name: h264_hq
    width:
    height:
    fit:
    framerate:
    codec: libx264
    profile: high444
    qscale:
    preset: veryslow
    keyint: 1
    bframes: 0
    tune: film
    crf: 13
    pix_fmt: yuv444p10le
    vf: colormatrix=bt601:bt709
    bitdepth: 10
    vendor:
    metadata_s:
    bitrate:
  h264_lq:
    name: h264_lq
    width:
    height:
    fit:
    framerate:
    codec: libx264
    profile: high
    qscale:
    preset: veryslow
    keyint: 1
    bframes: 0
    tune: film
    crf: 17
    pix_fmt: yuv420p
    vf: colormatrix=bt601:bt709
    bitdepth: 8
    vendor:
    metadata_s:
    bitrate:
  prores_4444:
    name: prores_4444
    width:
    height:
    fit:
    framerate:
    codec: prores_ks
    profile: 4
    qscale: 7
    preset:
    keyint:
    bframes:
    tune:
    crf:
    pix_fmt: yuva444p10le
    vf: colormatrix=bt601:bt709
    bitdepth: 10
    # "apl0" is the vendor tag ffmpeg documents for mimicking the Apple encoder
    # (previously mistyped as "ap10").
    vendor: apl0
    metadata_s: encoder="Apple ProRes 4444"
    bitrate:
  prores_422hq:
    name: prores_422hq
    width:
    height:
    fit:
    framerate:
    codec: prores_ks
    profile: 3
    qscale: 7
    preset:
    keyint:
    bframes:
    tune:
    crf:
    # The 422 profiles of prores_ks only accept 4:2:2 10 bit input; the 4444
    # alpha format copied from prores_4444 does not match profile 3.
    pix_fmt: yuv422p10le
    vf: colormatrix=bt601:bt709
    bitdepth: 10
    # See prores_4444: "apl0", not "ap10".
    vendor: apl0
    metadata_s: encoder="Apple ProRes 422 HQ"
    bitrate:
  # NOTE(review): no settings are present for prores_422 and dnxhd_36 in this
  # copy of the config - fill them in before selecting either profile.
  prores_422:
  dnxhd_36:
  dnxhd_175:
    # http://forum.selur.de/topic1348-dnxhr-encoding.html
    # 1920x1080p 175 10 24000/1001
    name: dnxhd_175
    width:
    height:
    fit:
    framerate: 24000/1001
    codec: dnxhd
    profile: dnxhd
    qscale:
    preset:
    keyint:
    bframes:
    tune:
    crf:
    # DNxHD 175 is a 10 bit 4:2:2 format (see the line above); the dnxhd
    # encoder has no alpha / 4:4:4 variant for this profile.
    pix_fmt: yuv422p10le
    vf: colormatrix=bt601:bt709
    bitdepth: 10
    vendor:
    metadata_s:
    bitrate: 175M
  dnxhr_hqx:
    # https://askubuntu.com/questions/907398/how-to-convert-a-video-with-ffmpeg-into-the-dnxhd-dnxhr-format
    # dnxhd, dnxhr_444, dnxhr_hqx, dnxhr_hq, dnxhr_sq, dnxhr_lb
    # 12 bit 4:2:2
    name: dnxhr_hqx
    width:
    height:
    fit:
    framerate:
    codec: dnxhd
    profile: dnxhr_hqx
    qscale:
    preset:
    keyint:
    bframes:
    tune:
    crf:
    # dnxhr_hqx is a 4:2:2 profile; ffmpeg expects yuv422p10le for it.
    pix_fmt: yuv422p10le
    vf: colormatrix=bt601:bt709
    bitdepth: 12
    vendor:
    metadata_s:
    bitrate:
  # NOTE(review): no settings are present for mjpeg in this copy of the config.
  mjpeg:
#!/usr/bin/env python
from __future__ import with_statement
from __future__ import print_function
import os, sys, yaml
import OpenImageIO as oiio
import numpy as np
import os, sys, re, argparse, shlex
from glob import glob
import logging
import time
from datetime import timedelta
import subprocess
from tc import Timecode
"""
Daily
---------------------
This is a program to render a dailies movie from an input image sequence (jpegs or exrs).
It reads from a configuration file to define things like resize, color transforms, padding,
text overlays, slate frames and so forth.
"""
"""
Commandline python program to take an openexr image sequence, apply an ocio display transform, resize,
and output to stdout as raw uint16 byte data.
Inputs:
image sequence in /path/to/imagename.%05d.exr format
optional: frame range to use, startframe-endframe
ocio display and ocio view to apply.
ocio config to use
resize width
resize pad to fit (optional)
Example Command:
./exrpipe -i '/mnt/cave/dev/__pipeline-tools/generate_dailies/test_footage/monkey_test/M07-2031.%05d.exr' -s 190 -e 200 -d ACES -v RRT -r 2048x1152 | ffmpeg-10bit -f rawvideo -pixel_format rgb48le -video_size 1920x1080 -framerate 24 -i pipe:0
-c:v libx264 -profile:v high444 -preset veryslow -g 1 -tune film -crf 13 -pix_fmt yuv444p10le -vf "colormatrix=bt601:bt709" test.mov
"""
# Set up logging
log = logging.getLogger(__name__)
def oiio_transform(buf, xoffset, yoffset):
    """Shift an image by (xoffset, yoffset) pixels without any filtering.

    The origin stored in ``buf``'s spec is moved in place, then the shifted
    image is cropped to the region ``buf`` covered before the move.

    Returns a new ``oiio.ImageBuf`` holding the cropped result.
    """
    # Remember the region the image occupied before its origin is moved.
    roi_before_shift = buf.roi

    # Move the image origin; the pixels themselves are left untouched.
    shifted_spec = buf.specmod()
    shifted_spec.x += xoffset
    shifted_spec.y += yoffset

    # Cropping to the old region yields the translated picture.
    translated = oiio.ImageBuf()
    oiio.ImageBufAlgo.crop(translated, buf, roi_before_shift)
    return translated
def _crop_pixels(value, full_size):
    """Convert a crop setting into a whole number of pixels.

    ``value`` may be empty (no crop), a pixel count (``40`` or ``"40"``) or a
    percentage of ``full_size`` written as a string such as ``"6.25%"``.
    """
    if not value:
        return 0
    text = str(value)
    if "%" in text:
        pixels = int(float(text.split('%')[0]) / 100 * full_size)
        log.info("Got crop percentage: {0}".format(pixels))
        return pixels
    return int(float(text))


def process_frame(frame, framenumber, globals_config, codec_config):
    """
    Apply all color and reformat operations to the input image and return the result.

    :param frame: path of the image file to load.
    :param framenumber: frame number of the image (currently unused here).
    :param globals_config: dict of global settings; reads ``filter``, ``width``,
        ``height``, ``fit``, ``cropwidth``, ``cropheight``, the ``ocio*`` keys
        and the ``cropmask*`` keys.
    :param codec_config: dict of codec settings; reads ``bitdepth``.
    :returns: the processed ``oiio.ImageBuf`` (3 channels).
    """
    # Setup image buffer
    buf = oiio.ImageBuf(frame)
    spec = buf.spec()

    # Get Codec Config and gather information
    iwidth = spec.width
    iheight = spec.height
    iar = float(iwidth) / float(iheight)

    bitdepth = codec_config['bitdepth']
    if bitdepth > 8:
        pixel_data_type = oiio.UINT16
    else:
        pixel_data_type = oiio.UINT8

    px_filter = globals_config['filter']
    owidth = globals_config['width']
    oheight = globals_config['height']
    fit = globals_config['fit']
    cropwidth = globals_config['cropwidth']
    cropheight = globals_config['cropheight']

    # Remove alpha channel
    oiio.ImageBufAlgo.channels(buf, buf, (0,1,2))

    # Apply OCIO Display
    ocioconfig = globals_config['ocioconfig']
    ociocolorconvert = globals_config['ociocolorconvert']
    ociolook = globals_config['ociolook']
    ociodisplay = globals_config['ociodisplay']
    ocioview = globals_config['ocioview']
    if ocioconfig:
        if ociocolorconvert:
            oiio.ImageBufAlgo.ociocolorconvert(buf, buf, ociocolorconvert, ocioview, colorconfig=ocioconfig)
        if ociolook:
            # NOTE(review): only two positional colour arguments are passed here;
            # confirm this matches the ociolook() signature of the installed OIIO.
            oiio.ImageBufAlgo.ociolook(buf, buf, ociolook, ocioview, colorconfig=ocioconfig)
        if ociodisplay and ocioview:
            # Apply OCIO display transform onto specified image buffer
            oiio.ImageBufAlgo.ociodisplay(buf, buf, ociodisplay, ocioview, colorconfig=ocioconfig)

    # Setup for width and height
    resize = bool(owidth)
    if resize:
        # If no output height specified, resize keeping aspect ratio, long side = width - calc height
        oheight_noar = int(owidth / iar)
        if not oheight:
            oheight = oheight_noar
        oar = float(owidth) / float(oheight)

    # Apply cropwidth / cropheight to remove pixels on edges before applying resize.
    # Either value may be empty, an int, or a percentage string.
    crop_x = _crop_pixels(cropwidth, iwidth)
    crop_y = _crop_pixels(cropheight, iheight)
    if crop_x or crop_y:
        # Half of the crop is removed from each side; ROI bounds must be integers.
        xbegin = crop_x // 2
        xend = iwidth - crop_x // 2
        ybegin = crop_y // 2
        yend = iheight - crop_y // 2
        buf = oiio.ImageBufAlgo.crop(buf, roi=oiio.ROI(xbegin, xend, ybegin, yend))
        # buf.set_full(cropwidth / 2, iwidth - cropwidth / 2, cropheight / 2, iheight - cropheight / 2, 0, 0)
        log.debug("CROPPED:{0} {1}".format(buf.spec().width, buf.spec().height))

        # Recalculate input resolution and aspect ratio - since it may have changed with crop
        iwidth = xend - xbegin
        iheight = yend - ybegin
        iar = float(iwidth) / float(iheight)
        # NOTE(review): oheight_noar is still derived from the uncropped aspect
        # ratio so the output size keeps matching what ffmpeg was told to expect.

    # Apply Resize / Fit
    # If input and output resolution are the same, do nothing
    # If output width is bigger or smaller than input width, first resize without changing input aspect ratio
    # If "fit" is true,
    # If output height is different than input height: transform by the output height - input height / 2 to center,
    # then crop to change the roi to the output res (crop moves upper left corner)
    identical = owidth == iwidth and oheight == iheight
    resize = not identical and resize
    if resize:
        log.info("Performing Resize: \n\tinput: {0}x{1} ar{2}\n\toutput: {3}x{4} ar{5}".format(iwidth, iheight, iar, owidth, oheight, oar))

        if iwidth != owidth:
            # Perform resize, no change in AR
            log.debug("{0}, {1}".format(oheight_noar, px_filter))
            if px_filter:
                # (bug): using "lanczos3", 6.0, and upscaling causes artifacts
                # (bug): dst buf must be assigned or ImageBufAlgo.resize doesn't work
                buf = oiio.ImageBufAlgo.resize(buf, px_filter, roi=oiio.ROI(0, owidth, 0, oheight_noar))
            else:
                buf = oiio.ImageBufAlgo.resize(buf, roi=oiio.ROI(0, owidth, 0, oheight_noar))

        if fit:
            # If fitting is enabled..
            height_diff = oheight - oheight_noar
            log.debug("Height difference: {0} {1} {2}".format(height_diff, oheight, oheight_noar))

            # If we are cropping to a smaller height we need to transform first then crop
            # If we pad to a taller height, we need to crop first, then transform.
            # Floor division keeps the offset an integer on Python 2 and 3 alike.
            if oheight < oheight_noar:
                # If we are cropping...
                buf = oiio_transform(buf, 0, height_diff // 2)
                buf = oiio.ImageBufAlgo.crop(buf, roi=oiio.ROI(0, owidth, 0, oheight))
            elif oheight > oheight_noar:
                # If we are padding...
                buf = oiio.ImageBufAlgo.crop(buf, roi=oiio.ROI(0, owidth, 0, oheight))
                buf = oiio_transform(buf, 0, height_diff // 2)

    # Apply Cropmask if enabled
    enable_cropmask = globals_config['cropmask']
    if enable_cropmask:
        cropmask_ar = globals_config['cropmask_ar']
        cropmask_opacity = globals_config['cropmask_opacity']
        if not cropmask_ar or not cropmask_opacity:
            log.error("Cropmask enabled, but no crop specified. Skipping cropmask...")
        else:
            cropmask_height = int(round(owidth / cropmask_ar))
            cropmask_bar = int((oheight - cropmask_height)/2)
            log.debug("Cropmask height: \t{0} = {1} / {2} = {3} left".format(cropmask_height, oheight, cropmask_ar, cropmask_bar))

            cropmask_buf = oiio.ImageBuf(oiio.ImageSpec(owidth, oheight, 4, pixel_data_type))

            # Fill with black, alpha = cropmask opacity
            oiio.ImageBufAlgo.fill(cropmask_buf, (0, 0, 0, cropmask_opacity))

            # Make the center band fully transparent so the image shows through
            oiio.ImageBufAlgo.fill(cropmask_buf, (0, 0, 0, 0), oiio.ROI(0, owidth, cropmask_bar, oheight - cropmask_bar))

            # Merge cropmask buf over image (the image needs an alpha channel for "over")
            oiio.ImageBufAlgo.channels(buf, buf, (0,1,2, 1.0))
            buf = oiio.ImageBufAlgo.over(cropmask_buf, buf)
            oiio.ImageBufAlgo.channels(buf, buf, (0,1,2))

    # buf.write(os.path.splitext(os.path.split(frame)[-1])[0]+".jpg")
    return buf
def setup_ffmpeg(globals_config, codec_config, start_tc):
    """
    Set up ffmpeg command according to codec config.
    Return entire command.

    :param globals_config: dict of global settings; ``width``, ``height`` and
        ``framerate`` describe the raw frames piped into ffmpeg.
    :param codec_config: dict of codec settings. Keys that are missing, None,
        False or an empty string are left out of the command; ``0`` is a real
        value and is passed through (e.g. ``bframes: 0`` gives ``-bf 0``).
    :param start_tc: start timecode, formatted into the ``-timecode`` option.
    :returns: the ffmpeg command as a single string, without the output file.
    """
    def _is_set(value):
        # Zero is a legitimate setting (-bf 0, -crf 0), so test for "empty"
        # explicitly instead of relying on truthiness.
        return value is not None and value is not False and value != ""

    # More than 8 bits per channel means the frames are piped as 16 bit data
    # (the same threshold that is used when the pixel data is generated).
    if (codec_config.get('bitdepth') or 8) > 8:
        ffmpeg_command = "ffmpeg-10bit"
        pixel_format = "rgb48le"
    else:
        ffmpeg_command = "ffmpeg"
        pixel_format = "rgb24"

    # Set up input arguments for pipe:
    args = "{0} -y -f rawvideo -pixel_format {1} -video_size {2}x{3} -framerate {4} -i pipe:0".format(
        ffmpeg_command, pixel_format, globals_config['width'], globals_config['height'], globals_config['framerate'])

    # Add timecode so that start frame will display correctly in RV etc
    args += " -timecode {0}".format(start_tc)

    # (config key, ffmpeg option) pairs in the order they appear on the command
    # line. The output frame rate comes from the globals and sits between
    # -pix_fmt and -vf.
    options_before_rate = (
        ('codec', '-c:v'),
        ('profile', '-profile:v'),
        ('qscale', '-qscale:v'),
        ('preset', '-preset'),
        ('keyint', '-g'),
        ('bframes', '-bf'),
        ('tune', '-tune'),
        ('crf', '-crf'),
        ('pix_fmt', '-pix_fmt'),
    )
    options_after_rate = (
        ('vf', '-vf'),
        ('vendor', '-vendor'),
        ('metadata_s', '-metadata:s'),
        ('bitrate', '-b:v'),
    )

    for key, option in options_before_rate:
        value = codec_config.get(key)
        if _is_set(value):
            args += " {0} {1}".format(option, value)

    if globals_config['framerate']:
        args += " -r {0}".format(globals_config['framerate'])

    for key, option in options_after_rate:
        value = codec_config.get(key)
        if _is_set(value):
            args += " {0} {1}".format(option, value)

    return args
def parse_framenumbers(filename):
    """Split a frame file name into its base name and its frame number.

    Assumes the frame padding is separated from the name by a "_" or "."
    character. Takes a file name as input - no absolute paths!

    :param filename: file name with or without its extension, e.g.
        ``shot_0001``, ``shot.0001`` or ``shot.0001.exr``. Padding patterns
        such as ``shot.%05d`` or ``shot_####`` are accepted too.
    :returns: ``(base, framenumber)``. ``framenumber`` is None when the name
        holds no frame number (e.g. a padding pattern). ``(None, None)`` is
        returned when the text after the separator is not a plain integer.
    """
    # Remove frame numbers: everything from the first separator that is
    # followed by a digit, "%" or "#" is dropped.
    filename_base = re.split(r"[_.][0-9%#].*", filename)[0]
    log.debug("FILENAME BASE" + str(filename_base))

    # Drop a real extension, but keep a purely numeric one: splitext() sees the
    # ".0001" of an already extension-less "shot.0001" as the extension, which
    # would throw the frame number away.
    filename_noext = os.path.split(filename)[-1]
    stem, ext = os.path.splitext(filename_noext)
    if ext and not re.match(r"^\.[0-9]+$", ext):
        filename_noext = stem
    log.debug("FILENAME NOEXT" + str(filename_noext))

    matches = re.findall(r"[_.][0-9].*$", filename_noext)
    if not matches:
        return filename_base, None

    # Chop off the separator at the beginning of the framenumber
    digits = matches[0][1:]
    try:
        framenumber = int(digits)
    except ValueError:
        log.error("Could not convert frame number {0} to int!".format(digits))
        return None, None
    log.debug(framenumber)
    return filename_base, framenumber
def get_frames(image_sequence):
    """Find the frames of an image sequence on disk.

    :param image_sequence: a directory holding the frames (assumed to contain
        a single sequence), the path of one frame, or a padding pattern such as
        ``/path/name.%05d.exr`` or ``/path/name_#####.exr``.
    :returns: ``(dirname, filename, extension, frames)`` where ``filename`` is
        the sequence base name, ``extension`` includes the leading dot and
        ``frames`` is a list of ``(path, framenumber)`` tuples ordered by frame
        number. ``(None, None, None, None)`` is returned when nothing usable
        is found.
    """
    not_found = (None, None, None, None)
    known_extensions = ("tiff", "tif", "jpg", "jpeg", "exr", "png", "jp2", "j2c", "tga")

    if os.path.isdir(image_sequence):
        # Assume there is only one image sequence in this directory
        candidates = glob(os.path.join(image_sequence, "*"))
    else:
        # Either an existing frame of the sequence or a %05d / ### pattern:
        # both are reduced to the base name and globbed for.
        dirname, filename = os.path.split(image_sequence)
        filename = os.path.splitext(filename)[0]
        base = parse_framenumbers(filename)[0]
        if not base:
            log.error("Could not work out the sequence name of {0}".format(image_sequence))
            return not_found
        log.debug(os.path.join(dirname, base))
        candidates = glob(os.path.join(dirname, base) + "*")

    if not candidates:
        log.error("Could not find any frames to operate on!")
        return not_found

    # Collect (path, number) for every image file whose frame number is known.
    frame_tuples = []
    sequence_info = {}
    for path in candidates:
        stem, ext = os.path.splitext(os.path.split(path)[-1])
        # Extensions are compared case-insensitively so ".EXR" is picked up too
        if ext[1:].lower() not in known_extensions:
            continue
        base, framenumber = parse_framenumbers(stem)
        if framenumber is None:
            # A frame without a number cannot be placed in the sequence
            log.warning("Skipping {0}: could not determine its frame number".format(path))
            continue
        frame_tuples.append((path, framenumber))
        sequence_info[path] = (base, ext)

    if not frame_tuples:
        log.error("Could not find any frames to operate on!")
        return not_found

    # Order by frame number so unpadded sequences (2 before 10) sort correctly
    frame_tuples.sort(key=lambda item: (item[1], item[0]))

    first_path = frame_tuples[0][0]
    dirname = os.path.split(first_path)[0]
    filename, extension = sequence_info[first_path]
    log.debug("FILENAME {0}".format(filename))

    log.info("\nFound {0} {1} frames named {2} in \n\t{3}".format(len(frame_tuples), extension, filename, dirname))
    return dirname, filename, extension, frame_tuples
def setup():
    """Command line entry point: encode an image sequence into a dailies movie.

    Reads the config file named by ``$DAILIES_CONFIG`` (or a built-in default
    path), parses the command line, finds the frames, then pipes every
    processed frame into an ffmpeg subprocess. A log file is written next to
    the movie. Returns None; problems are reported on stdout / in the log.
    """
    start_time = time.time()

    # Parse Config File
    DAILIES_CONFIG = os.getenv("DAILIES_CONFIG")
    if not DAILIES_CONFIG:
        DAILIES_CONFIG = "/mnt/cave/dev/__pipeline-tools/generate_dailies/generate_dailies/DAILIES_CONFIG.yaml"

    # Get Config
    if not os.path.isfile(DAILIES_CONFIG):
        print("Error: Could not find config file {0}".format(DAILIES_CONFIG))
        return
    with open(DAILIES_CONFIG, 'r') as configfile:
        # safe_load only builds plain data and works with every PyYAML version
        # (yaml.load without a Loader is unsafe and rejected by newer releases).
        config = yaml.safe_load(configfile)

    if not config or not config.get("globals"):
        print("Error: Config file {0} has no \"globals\" section".format(DAILIES_CONFIG))
        return
    globals_config = config["globals"]

    # Get list of possible output profiles from config. The section is called
    # "output_codecs" in older configs and "output_profiles" in newer ones.
    profiles = config.get("output_codecs") or config.get("output_profiles") or {}
    output_codecs = sorted(profiles)

    # Parse input arguments
    parser = argparse.ArgumentParser(description='Process given exr image sequence with ocio display, resize and output to stdout.')
    parser.add_argument("-i", "--image_sequence", help="Input exr image sequence. Can be a folder containing images, a path to the first image, a percent 05d path, or a ##### path.")
    parser.add_argument('-c', "--codec", help="Codec name: Possible options are defined in the DAILIES_CONFIG:\n{0}".format("\n\t".join(output_codecs)))
    args = parser.parse_args()

    image_sequence = args.image_sequence
    if not image_sequence:
        image_sequence = os.getcwd()

    # Fall back to the configured default before validating, otherwise running
    # without -c is always rejected as an invalid codec.
    codec = args.codec or globals_config.get("output_profile") or "h264_hq"

    # Validate codec
    if codec not in output_codecs:
        print("Error: invalid codec specified. Possible options are \n\t{0}".format("\n\t".join(output_codecs)))
        return

    # Get the config dict for the chosen codec from the config file
    codec_config = profiles[codec]
    if not codec_config:
        print("Error: codec {0} has no settings in {1}".format(codec, DAILIES_CONFIG))
        return

    # Try to get ocio config from $OCIO env-var if it's not defined
    if not globals_config.get('ocioconfig'):
        if os.getenv("OCIO"):
            globals_config['ocioconfig'] = os.getenv("OCIO")

    # Codec Overrides Globals
    for key, value in codec_config.items():
        if key in globals_config and value:
            globals_config[key] = value

    # Find image sequence to operate on
    dirname, filename, extension, frames = get_frames(image_sequence)
    if not frames:
        print("Error: No frames found...")
        return

    # If output width or height is not defined, we need to calculate it from the input
    owidth = globals_config['width']
    oheight = globals_config['height']
    if not owidth or not oheight:
        buf = oiio.ImageBuf(frames[0][0])
        spec = buf.spec()
        iar = float(spec.width) / float(spec.height)
        if not owidth:
            owidth = spec.width
            globals_config['width'] = owidth
        if not oheight:
            oheight = int(round(owidth / iar))
            globals_config['height'] = oheight

    # Set up timecode and start / end frames
    firstframe = frames[0][1]
    totalframes = len(frames)
    tc = Timecode(globals_config['framerate'], start_timecode='00:00:00:00')
    # A frame without a parsed number would make the addition fail
    start_tc = tc + firstframe if firstframe is not None else tc

    bitdepth = codec_config['bitdepth']
    if bitdepth > 8:
        pixel_data_type = oiio.UINT16
    else:
        pixel_data_type = oiio.UINT8

    # Set up ffmpeg command
    args = setup_ffmpeg(globals_config, codec_config, start_tc)

    # Work out the output movie name: <sequence>[_<codec>].<ext>
    movie_ext = str(globals_config.get('movie_ext') or "mov").lstrip(".")
    movie_name = filename
    # Append codec to dailies movie name if requested
    if globals_config.get('movie_append_codec'):
        movie_name += "_" + str(codec_config.get('name') or codec)
    movie_name += "." + movie_ext
    movie_dir = os.path.join(dirname, globals_config.get('movie_location') or "")
    movie_path = os.path.join(movie_dir, movie_name)

    # The output path is added after splitting so that spaces in it survive
    command = shlex.split(args) + [movie_path]

    # Setup logger
    logpath = os.path.join(movie_dir, os.path.splitext(movie_name)[0] + ".log")
    if os.path.exists(logpath):
        os.remove(logpath)
    handler = logging.FileHandler(logpath)
    handler.setFormatter(
        logging.Formatter('%(levelname)s %(asctime)s \t%(message)s', '%Y-%m-%dT%H:%M:%S')
    )
    log.addHandler(handler)
    if globals_config.get('debug'):
        log.setLevel(logging.DEBUG)
    else:
        log.setLevel(logging.INFO)
    log.debug("Got config:\n\tCodec Config:\t{0}\n\tImage Sequence Path:\n\t\t{1}".format(
        codec_config.get('name') or codec, image_sequence))

    log.info("ffmpeg command:\n\t{0}".format(args + " " + movie_path))

    # Invoke ffmpeg subprocess
    ffproc = subprocess.Popen(command,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE)

    try:
        # Loop through every frame, passing the result to the ffmpeg subprocess
        for i, frame in enumerate(frames, 1):
            framepath, framenumber = frame
            log.info("Processing frame {0:04d}: \t{1:04d} of {2:04d}".format(
                framenumber if framenumber is not None else i, i, totalframes))
            # elapsed_time = timedelta(seconds = time.time() - start_time)
            # log.info("Time Elapsed: \t{0}".format(elapsed_time))
            buf = process_frame(framepath, framenumber, globals_config, codec_config)
            # Write the raw bytes; ndarray.tofile() needs a seekable file on
            # some Python / numpy combinations and a pipe is not seekable.
            ffproc.stdin.write(buf.get_pixels(pixel_data_type).tobytes())
    finally:
        # Always close the pipe and wait for ffmpeg, even if a frame failed
        result, error = ffproc.communicate()

    if ffproc.returncode:
        log.error("ffmpeg exited with return code {0}".format(ffproc.returncode))

    elapsed_time = timedelta(seconds = time.time() - start_time)
    log.info("Total Processing Time: \t{0}".format(elapsed_time))


if __name__=="__main__":
    setup()
#!-*- coding: utf-8 -*-
# The MIT License (MIT)
#
# Copyright (c) 2014 Joshua Banton and PyTimeCode developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
__version__ = '1.2.0'
class Timecode(object):
    """The main timecode class.

    Does all the calculation over frames, so the main data it holds is
    frames, then when required it converts the frames to a timecode by
    using the frame rate setting.

    :param framerate: The frame rate of the Timecode instance. It
      should be one of ['23.98', '24', '25', '29.97', '30', '50', '59.94',
      '60', 'NUMERATOR/DENOMINATOR', 'ms'] where "ms" equals to 1000 fps.
      Can not be skipped.
      Setting the framerate will automatically set the :attr:`.drop_frame`
      attribute to correct value.
    :param start_timecode: The start timecode. Use this to be able to
      set the timecode of this Timecode instance. It can be skipped and
      then the frames attribute will define the timecode, and if it is also
      skipped then the start_second attribute will define the start
      timecode, and if start_seconds is also skipped then the default value
      of '00:00:00:00' will be used.
      When using 'ms' frame rate, timecodes like '00:11:01.040' use '.040'
      as frame number. When used with other frame rates, '.040' represents
      a fraction of a second. So '00:00:00.040'@25fps is 1 frame.
    :type framerate: str or int or float or tuple
    :type start_timecode: str or None
    :param start_seconds: A float or integer value showing the seconds.
    :param int frames: Timecode objects can be initialized with an
      integer number showing the total frames.
    """

    def __init__(self, framerate, start_timecode=None, start_seconds=None,
                 frames=None):
        self.drop_frame = False
        self.ms_frame = False
        self.fraction_frame = False
        self._int_framerate = None
        self._framerate = None
        self.framerate = framerate

        self.frames = None

        # attribute override order
        # start_timecode > frames > start_seconds
        if start_timecode:
            self.frames = self.tc_to_frames(start_timecode)
        else:
            if frames is not None:  # because 0==False, and frames can be 0
                self.frames = frames
            elif start_seconds is not None:
                if start_seconds == 0:
                    raise ValueError("``start_seconds`` argument can not be 0")
                self.frames = self.float_to_tc(start_seconds)
            else:
                # use default value of 00:00:00:00
                self.frames = self.tc_to_frames('00:00:00:00')

    @property
    def framerate(self):
        """getter for _framerate attribute
        """
        return self._framerate

    @framerate.setter
    def framerate(self, framerate):  # lint:ok
        """setter for the framerate attribute

        Accepts a string, a number, a 'NUMERATOR/DENOMINATOR' string or a
        (numerator, denominator) tuple. Also sets the integer frame rate and
        the drop_frame / ms_frame flags that go with the given rate.

        :param framerate:
        :return:
        """
        # Convert rational frame rate to float
        numerator = None
        denominator = None

        try:
            if '/' in framerate:
                numerator, denominator = framerate.split('/')
        except TypeError:
            # not a string
            pass

        if isinstance(framerate, tuple):
            numerator, denominator = framerate

        if numerator and denominator:
            framerate = round(float(numerator) / float(denominator), 2)
            if framerate.is_integer():
                framerate = int(framerate)

        # check if number is passed and if so convert it to a string
        if isinstance(framerate, (int, float)):
            framerate = str(framerate)

        # set the int_frame_rate
        if framerate == '29.97':
            self._int_framerate = 30
            self.drop_frame = True
        elif framerate == '59.94':
            self._int_framerate = 60
            self.drop_frame = True
        elif framerate == '23.98':
            framerate = '24'
            self._int_framerate = 24
        elif framerate in ['ms', '1000']:
            self._int_framerate = 1000
            self.ms_frame = True
            framerate = 1000
        elif framerate == 'frames':
            self._int_framerate = 1
        else:
            self._int_framerate = int(float(framerate))

        self._framerate = framerate

    def set_fractional(self, state):
        """Set or unset timecode to be represented with fractional seconds

        :param bool state:
        """
        self.fraction_frame = state

    def set_timecode(self, timecode):
        """Sets the frames by using the given timecode
        """
        self.frames = self.tc_to_frames(timecode)

    def float_to_tc(self, seconds):
        """Returns the number of frames that correspond to the given seconds
        """
        return int(seconds * self._int_framerate)

    def tc_to_frames(self, timecode):
        """Converts the given timecode to frames

        The returned value is 1 based: '00:00:00:00' gives 1.
        """
        hours, minutes, seconds, frames = map(int,
                                              self.parse_timecode(timecode)
                                              )

        if isinstance(timecode, int):
            time_tokens = [hours, minutes, seconds, frames]
            timecode = ':'.join(str(t) for t in time_tokens)

            if self.drop_frame:
                timecode = ';'.join(timecode.rsplit(':', 1))

        ffps = float(self._framerate)

        if self.drop_frame:
            # Number of drop frames is 6% of framerate rounded to nearest
            # integer
            drop_frames = int(round(ffps * .066666))
        else:
            drop_frames = 0

        # We don't need the exact framerate anymore, we just need it rounded to
        # nearest integer
        ifps = self._int_framerate

        # Number of frames per hour (non-drop)
        hour_frames = ifps * 60 * 60

        # Number of frames per minute (non-drop)
        minute_frames = ifps * 60

        # Total number of minutes
        total_minutes = (60 * hours) + minutes

        # Handle case where frames are fractions of a second
        if len(timecode.split('.')) == 2 and not self.ms_frame:
            self.fraction_frame = True
            fraction = timecode.rsplit('.', 1)[1]

            frames = int(round(float('.' + fraction) * ffps))

        frame_number = \
            ((hour_frames * hours) + (minute_frames * minutes) +
             (ifps * seconds) + frames) - \
            (drop_frames * (total_minutes - (total_minutes // 10)))

        frames = frame_number + 1

        return frames

    def frames_to_tc(self, frames):
        """Converts frames back to timecode

        :returns: the (hours, minutes, seconds, frames) of the given frame
          count; pass them to :meth:`tc_to_string` for the string form
        """
        ffps = float(self._framerate)

        if self.drop_frame:
            # Number of frames to drop on the minute marks is the nearest
            # integer to 6% of the framerate
            drop_frames = int(round(ffps * .066666))
        else:
            drop_frames = 0

        # Number of frames in an hour
        frames_per_hour = int(round(ffps * 60 * 60))
        # Number of frames in a day - timecode rolls over after 24 hours
        frames_per_24_hours = frames_per_hour * 24
        # Number of frames per ten minutes
        frames_per_10_minutes = int(round(ffps * 60 * 10))
        # Number of frames per minute is the round of the framerate * 60 minus
        # the number of dropped frames
        frames_per_minute = int(round(ffps) * 60) - drop_frames

        frame_number = frames - 1

        if frame_number < 0:
            # Negative time. Add 24 hours.
            frame_number += frames_per_24_hours

        # If frame_number is greater than 24 hrs, next operation will rollover
        # clock
        frame_number %= frames_per_24_hours

        if self.drop_frame:
            d = frame_number // frames_per_10_minutes
            m = frame_number % frames_per_10_minutes
            if m > drop_frames:
                frame_number += (drop_frames * 9 * d) + \
                    drop_frames * ((m - drop_frames) // frames_per_minute)
            else:
                frame_number += drop_frames * 9 * d

        ifps = self._int_framerate

        frs = frame_number % ifps
        if self.fraction_frame:
            frs = round(frs / float(ifps), 3)

        secs = (frame_number // ifps) % 60
        mins = ((frame_number // ifps) // 60) % 60
        hrs = (((frame_number // ifps) // 60) // 60)

        return hrs, mins, secs, frs

    def tc_to_string(self, hrs, mins, secs, frs):
        """Formats hours, minutes, seconds and frames as a timecode string
        """
        if self.fraction_frame:
            return "{hh:02d}:{mm:02d}:{ss:06.3f}".format(hh=hrs,
                                                         mm=mins,
                                                         ss=secs + frs
                                                         )

        ff = "%02d"
        if self.ms_frame:
            ff = "%03d"

        return ("%02d:%02d:%02d%s" + ff) % (hrs,
                                            mins,
                                            secs,
                                            self.frame_delimiter,
                                            frs)

    @classmethod
    def parse_timecode(cls, timecode):
        """parses timecode string NDF '00:00:00:00' or DF '00:00:00;00' or
        milliseconds/fractionofseconds '00:00:00.000'
        """
        if isinstance(timecode, int):
            # An int is read as hex digit pairs, e.g. 0x01020304 -> 01:02:03:04
            indices = range(2, 10, 2)
            hrs, mins, secs, frs = [hex(timecode)[i:i + 2] for i in indices]
        else:
            bfr = timecode.replace(';', ':').replace('.', ':').split(':')
            hrs = int(bfr[0])
            mins = int(bfr[1])
            secs = int(bfr[2])
            frs = int(bfr[3])

        return hrs, mins, secs, frs

    @property
    def frame_delimiter(self):
        """Return correct symbol based on framerate."""
        if self.drop_frame:
            return ';'

        elif self.ms_frame or self.fraction_frame:
            return '.'

        else:
            return ':'

    def __iter__(self):
        return self

    def next(self):
        """Advances this timecode by one frame and returns it
        """
        self.add_frames(1)
        return self

    # Python 3 looks up __next__ for the iterator protocol; without it
    # ``next(tc)`` and ``for`` loops fail even though __iter__ is defined.
    __next__ = next

    def back(self):
        """Moves this timecode back by one frame and returns it
        """
        self.sub_frames(1)
        return self

    def add_frames(self, frames):
        """adds or subtracts frames number of frames
        """
        self.frames += frames

    def sub_frames(self, frames):
        """adds or subtracts frames number of frames
        """
        self.add_frames(-frames)

    def mult_frames(self, frames):
        """multiply frames
        """
        self.frames *= frames

    def div_frames(self, frames):
        """divides the frame count by the given number of frames

        Floor division keeps the frame count an integer on Python 3 as well
        (on Python 2 ``/`` already floors for integers).
        """
        self.frames = self.frames // frames

    def __eq__(self, other):
        """the overridden equality operator
        """
        if isinstance(other, Timecode):
            return self._framerate == other._framerate and \
                self.frames == other.frames
        elif isinstance(other, str):
            new_tc = Timecode(self._framerate, other)
            return self.__eq__(new_tc)
        elif isinstance(other, int):
            return self.frames == other

    def __ge__(self, other):
        """override greater or equal to operator"""
        if isinstance(other, Timecode):
            return self._framerate == other._framerate and \
                self.frames >= other.frames
        elif isinstance(other, str):
            new_tc = Timecode(self._framerate, other)
            return self.frames >= new_tc.frames
        elif isinstance(other, int):
            return self.frames >= other

    def __le__(self, other):
        """override less or equal to operator"""
        if isinstance(other, Timecode):
            return self._framerate == other._framerate and \
                self.frames <= other.frames
        elif isinstance(other, str):
            new_tc = Timecode(self._framerate, other)
            return self.frames <= new_tc.frames
        elif isinstance(other, int):
            return self.frames <= other

    def __add__(self, other):
        """returns new Timecode instance with the given timecode or frames
        added to this one
        """
        # duplicate current one
        tc = Timecode(self._framerate, frames=self.frames)

        if isinstance(other, Timecode):
            tc.add_frames(other.frames)
        elif isinstance(other, int):
            tc.add_frames(other)
        else:
            raise TimecodeError(
                'Type %s not supported for arithmetic.' %
                other.__class__.__name__
            )

        return tc

    def __sub__(self, other):
        """returns new Timecode instance with subtracted value"""
        if isinstance(other, Timecode):
            subtracted_frames = self.frames - other.frames
        elif isinstance(other, int):
            subtracted_frames = self.frames - other
        else:
            raise TimecodeError(
                'Type %s not supported for arithmetic.' %
                other.__class__.__name__
            )

        return Timecode(self._framerate, frames=subtracted_frames)

    def __mul__(self, other):
        """returns new Timecode instance with multiplied value"""
        if isinstance(other, Timecode):
            multiplied_frames = self.frames * other.frames
        elif isinstance(other, int):
            multiplied_frames = self.frames * other
        else:
            raise TimecodeError(
                'Type %s not supported for arithmetic.' %
                other.__class__.__name__
            )

        return Timecode(self._framerate, frames=multiplied_frames)

    def __div__(self, other):
        """returns new Timecode instance with divided value

        The frame count is floor divided so it stays an integer.
        """
        if isinstance(other, Timecode):
            div_frames = self.frames // other.frames
        elif isinstance(other, int):
            div_frames = self.frames // other
        else:
            raise TimecodeError(
                'Type %s not supported for arithmetic.' %
                other.__class__.__name__
            )

        return Timecode(self._framerate, frames=div_frames)

    # Python 3 dispatches the ``/`` operator to __truediv__, not __div__.
    __truediv__ = __div__

    def __repr__(self):
        return self.tc_to_string(*self.frames_to_tc(self.frames))

    @property
    def hrs(self):
        hrs, mins, secs, frs = self.frames_to_tc(self.frames)
        return hrs

    @property
    def mins(self):
        hrs, mins, secs, frs = self.frames_to_tc(self.frames)
        return mins

    @property
    def secs(self):
        hrs, mins, secs, frs = self.frames_to_tc(self.frames)
        return secs

    @property
    def frs(self):
        hrs, mins, secs, frs = self.frames_to_tc(self.frames)
        return frs

    @property
    def frame_number(self):
        """returns the 0 based frame number of the current timecode instance
        """
        return self.frames - 1

    @property
    def float(self):
        """returns the seconds as float
        """
        return self.frames / float(self.framerate)
class TimecodeError(Exception):
    """Error raised when a timecode calculation cannot be carried out."""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.