Skip to content

Instantly share code, notes, and snippets.

@papaver
Created October 20, 2013 20:10
Show Gist options
  • Save papaver/7074683 to your computer and use it in GitHub Desktop.
Save papaver/7074683 to your computer and use it in GitHub Desktop.
Avid Filmscribe Cutlist (.ctl) parser
#------------------------------------------------------------------------------
# filmscribeCutList.py - avid filmscribe cutlist (.ctl) parser
#------------------------------------------------------------------------------
import collections
import itertools
import logging
import os
import os.path
import re
#------------------------------------------------------------------------------
# defines
#------------------------------------------------------------------------------
# regex fragment matching an 'HH:MM:SS:FF' timecode, captured in the named
# group 'timecode'; kept as a raw string so the '\d' escapes are handed to the
# re module untouched instead of being parsed as (invalid) string escapes
kTimecodePattern = r'(?P<timecode>\d{2}:\d{2}:\d{2}:\d{2})'
# column header labels searched for in the assemble list header line
kFootage = 'Footage'
kRecordTC = 'Record TC'
kStartTC = 'Start TC'
kClipName = 'Clip Name'
# column header labels searched for in the optical block header lines
kOptical = 'Optical'
kSequence = 'Sequence'
kDescription = 'Description'
# keys identifying the two side sections of an optical block
kSideA = 'A'
kSideB = 'B'
#------------------------------------------------------------------------------
# structs
#------------------------------------------------------------------------------
# start/end pair; holds column slice offsets as well as start/end code pairs
Range = collections.namedtuple('Range', ['start', 'end'])
#------------------------------------------------------------------------------
# Timecode
#------------------------------------------------------------------------------
class Timecode(object):
    """Video timecode holding hours, minutes, seconds and frames.

    Built from a colon separated string such as '01:02:03:04'; each of the
    four fields is stored as an integer attribute.
    """
    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------
    def __init__(self, timecode):
        """Split the string on ':' and store the four integer fields.

        Raises ValueError when a field is not an integer or when the string
        does not contain exactly four fields.
        """
        super(Timecode, self).__init__()
        fields = [int(field) for field in timecode.split(':')]
        self.hours, self.minutes, self.seconds, self.frames = fields
    #--------------------------------------------------------------------------
    def __str__(self):
        """Format back to the zero padded 'HH:MM:SS:FF' form."""
        parts = (self.hours, self.minutes, self.seconds, self.frames)
        return "%02d:%02d:%02d:%02d" % parts
    #--------------------------------------------------------------------------
    def __repr__(self):
        return "Timecode(%s)" % str(self)
#------------------------------------------------------------------------------
# Keycode
#------------------------------------------------------------------------------
class Keycode(object):
    """Representation of a film keycode (edgecode) as a feet/frames pair.

    Instances are built from strings shaped like '0012+07' (feet, a plus
    sign, then frames).  Every conversion in this class uses 16 frames per
    foot.  Keycodes compare and hash by their (feet, frames) value.
    """
    # number of frames making up one foot in the conversions below
    _kFramesPerFoot = 16
    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------
    def __init__(self, timecode):
        """Parse a '<feet>+<frames>' string into integer attributes.

        Raises ValueError when the string does not split into exactly two
        integer fields.
        """
        super(Keycode, self).__init__()
        tokens = timecode.split('+')
        self.feet, self.frames = map(int, tokens)
    #--------------------------------------------------------------------------
    def _asTuple(self):
        """Return the (feet, frames) pair used for comparing and hashing."""
        return (self.feet, self.frames)
    #--------------------------------------------------------------------------
    def __cmp__(self, other):
        """Three way comparison, kept for python 2 callers.

        Implemented without the cmp() builtin, which python 3 removed.
        """
        if not isinstance(other, Keycode):
            return NotImplemented
        mine, theirs = self._asTuple(), other._asTuple()
        return (mine > theirs) - (mine < theirs)
    #--------------------------------------------------------------------------
    # rich comparisons; python 3 never calls __cmp__, so without these two
    # equal keycodes would only compare equal when they are the same object
    #--------------------------------------------------------------------------
    def __eq__(self, other):
        if not isinstance(other, Keycode):
            return NotImplemented
        return self._asTuple() == other._asTuple()
    #--------------------------------------------------------------------------
    def __ne__(self, other):
        if not isinstance(other, Keycode):
            return NotImplemented
        return self._asTuple() != other._asTuple()
    #--------------------------------------------------------------------------
    def __lt__(self, other):
        if not isinstance(other, Keycode):
            return NotImplemented
        return self._asTuple() < other._asTuple()
    #--------------------------------------------------------------------------
    def __le__(self, other):
        if not isinstance(other, Keycode):
            return NotImplemented
        return self._asTuple() <= other._asTuple()
    #--------------------------------------------------------------------------
    def __gt__(self, other):
        if not isinstance(other, Keycode):
            return NotImplemented
        return self._asTuple() > other._asTuple()
    #--------------------------------------------------------------------------
    def __ge__(self, other):
        if not isinstance(other, Keycode):
            return NotImplemented
        return self._asTuple() >= other._asTuple()
    #--------------------------------------------------------------------------
    def __hash__(self):
        # defining __eq__ disables the default hash on python 3, keep the
        # object hashable and consistent with equality
        return hash(self._asTuple())
    #--------------------------------------------------------------------------
    def __add__(self, other):
        """Return a new Keycode advanced by `other` frames.

        `other` is converted with int().  Frames overflowing a foot are
        carried into the feet field.
        """
        # divmod keeps the carry an integer on python 3, where '/' would
        # produce a float
        carry, frames = divmod(self.frames + int(other), self._kFramesPerFoot)
        return Keycode("%04d+%02d" % (self.feet + carry, frames))
    #--------------------------------------------------------------------------
    def __str__(self):
        return "%04d+%02d" % (self.feet, self.frames)
    #--------------------------------------------------------------------------
    def __repr__(self):
        return "Keycode(%s)" % str(self)
    #--------------------------------------------------------------------------
    # methods
    #--------------------------------------------------------------------------
    def totalFrames(self):
        """Return the keycode expressed as a single frame count."""
        return (self.feet * self._kFramesPerFoot) + self.frames
#------------------------------------------------------------------------------
# LineIterator
#------------------------------------------------------------------------------
class LineIterator(object):
    """Cursor over a list of lines.

    Keeps a current index into the list and offers helpers to classify a
    line (empty, start of header, end of table, separator) and to pop single
    lines or whole blocks of lines delimited by such marker lines.
    """
    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------
    def __init__(self, lines):
        super(LineIterator, self).__init__()
        self._lines = lines
        self._index = 0
    #--------------------------------------------------------------------------
    # properties
    #--------------------------------------------------------------------------
    @property
    def index(self):
        """Index of the current line."""
        return self._index
    #--------------------------------------------------------------------------
    @property
    def line(self):
        """The current line; raises IndexError when past the last line."""
        return self._lines[self._index]
    #--------------------------------------------------------------------------
    # methods
    #--------------------------------------------------------------------------
    def isEOF(self):
        """Check if the current line index points past the line array.
        """
        return self._index >= len(self._lines)
    #--------------------------------------------------------------------------
    def next(self):
        """Increment the current line index by one.
        """
        self._index += 1
    #--------------------------------------------------------------------------
    def peek(self):
        """Return the next line without moving the current index, None is
        returned if past eof.
        """
        following = self._index + 1
        if following >= len(self._lines):
            return None
        return self._lines[following]
    #--------------------------------------------------------------------------
    def isEmpty(self, line):
        """Checks to see if the line is empty.
        """
        return len(line) == 0
    #--------------------------------------------------------------------------
    def isStartOfHeader(self, line):
        """Checks to see if the line contains only a SOH (^A|\\x01) character.
        """
        return re.match(r'^\x01$', line) is not None
    #--------------------------------------------------------------------------
    def isEndOfTable(self, line):
        """Checks to see if the line contains an end of table entry, that is
        some text wrapped in parentheses.
        """
        return re.match(r'^\(.+\)$', line) is not None
    #--------------------------------------------------------------------------
    def isDoubleLineSeperator(self, line):
        """Checks to see if the line is a separator made with '=' chars.
        """
        return re.match(r'^=+$', line) is not None
    #--------------------------------------------------------------------------
    def forwardToStartOfHeader(self):
        """Moves the current line forward until a start of header line is
        found.  The current line index will point to the line after it.
        Raises ValueError if the lines run out first.
        """
        self._advanceUntil(self.isStartOfHeader, "Could not find SOH line")
        # consume the line with the SOH
        self.next()
    #--------------------------------------------------------------------------
    def skipEmpty(self):
        """Moves the current line forward until a line with content is found.
        Raises ValueError if the lines run out first.
        """
        self._advanceUntil(lambda line: not self.isEmpty(line),
            "Could not find more content")
    #--------------------------------------------------------------------------
    def popLine(self):
        """Return the current line and move to the next.  Raises ValueError
        when already past the last line.
        """
        # check if the lines ran out
        if self.isEOF():
            raise ValueError("Cannot pop line if at end of block.")
        # grab the current line and step forward
        line = self.line
        self.next()
        return line
    #--------------------------------------------------------------------------
    def popBlock(self):
        """Find the next empty line and return the block of lines in between.
        The empty line will be consumed and not returned.  The current index
        will point to the next line.  Raises ValueError if no empty line is
        found.
        """
        return self._popUntil(self.isEmpty, "Could not find empty line")
    #--------------------------------------------------------------------------
    def popOpticalBlock(self):
        """Finds the next double line separator and returns the lines
        consisting of the block.  The line separator will be consumed and not
        returned.  The current index will point to the next line.  Raises
        ValueError if no separator is found.
        """
        return self._popUntil(self.isDoubleLineSeperator,
            "Could not find double line seperator")
    #--------------------------------------------------------------------------
    def popTable(self):
        """Find the next end of table line and return the lines consisting of
        the table.  The end of table line will be consumed and not returned.
        The current index will point to the next line.  Raises ValueError if
        no end of table line is found.
        """
        return self._popUntil(self.isEndOfTable,
            "Could not find end of table line")
    #--------------------------------------------------------------------------
    # internal methods
    #--------------------------------------------------------------------------
    def _advanceUntil(self, predicate, failure):
        """Move forward until predicate(current line) is true, leaving the
        index on the matching line.  Raises ValueError, with a message
        starting with `failure`, when the lines run out.
        """
        # save the index we started at, it is reported on failure
        startIndex = self._index
        while True:
            # test for eof before touching the line so an exhausted iterator
            # reports a ValueError rather than leaking an IndexError
            if self.isEOF():
                startLine = '<EOF>'
                if startIndex < len(self._lines):
                    startLine = self._lines[startIndex]
                raise ValueError("%s, started looking at line %d (%s)" % \
                    (failure, startIndex, startLine))
            if predicate(self.line):
                return
            self.next()
    #--------------------------------------------------------------------------
    def _popUntil(self, predicate, failure):
        """Return the lines from the current one up to, but excluding, the
        first line satisfying the predicate, then consume that line.
        """
        startIndex = self._index
        self._advanceUntil(predicate, failure)
        # consume the terminating line
        self.next()
        # return the block iterated over, without the terminating line
        return self._lines[startIndex:self._index - 1]
#------------------------------------------------------------------------------
# Event
#------------------------------------------------------------------------------
class Event(object):
    """Represents an event found in the assemble table.  It's possible all
    event data may not be available if column header information is not
    found.  If the event carries an optical marker its optical id is parsed;
    the optical object itself is only available once linked with
    linkOptical().
    """
    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------
    def __init__(self, lines, headers):
        """Parse the event using the header information.

        lines   -- the lines making up the event block
        headers -- dict mapping column names to their (start, end) slice
                   offsets, the footage column is required

        Raises ValueError if the footage header is missing, if no event
        index line exists in the block or if the block ends before both the
        start and end lines were read.
        """
        super(Event, self).__init__()
        # verify at least the footage header was provided
        if kFootage not in headers:
            raise ValueError("Header '%s' is required to parse event." % \
                kFootage)
        # use the line iterator to ease the parsing process
        iterator = LineIterator(lines)
        if iterator.isEOF():
            raise ValueError("Cannot parse event from an empty block.")
        # check if this event has an optical marker, a line holding only
        # whitespace followed by a SOH character
        self._hasOptical = re.match(r'^\s+\x01$', iterator.line) is not None
        if self._hasOptical:
            iterator.next()
        # skip to the line containing the event index, bail out cleanly if
        # the block ends before one is found
        match = None
        while not iterator.isEOF():
            match = re.match(r'^\s*(?P<id>\d+)\.', iterator.line)
            if match is not None:
                break
            iterator.next()
        if match is None:
            raise ValueError("Could not find event index in block.")
        # save the event index
        self._id = int(match.group('id'))
        # parse out the optical id, found on the same line as the index
        self._opticalId = None
        if self._hasOptical:
            match = re.match(r'^.+OPTICAL #(?P<id>\d+)', iterator.line)
            if match is not None:
                self._opticalId = int(match.group('id'))
            else:
                logging.warning("Optical id could not be " \
                    "found on event '%s'." % self._id)
        # cut up the columns for the start/end data, the first line holds
        # the start values and the second line the end values
        starts = self._parseColumns(iterator.popLine(), headers)
        ends = self._parseColumns(iterator.popLine(), headers)
        # pair the values up by key instead of trusting both dicts to
        # iterate in the same order
        columns = dict((key, (starts[key], ends[key])) for key in starts)
        # save the available columns
        self._footage = Range(*map(Keycode, columns[kFootage]))
        self._recordTC = None
        if kRecordTC in headers:
            self._recordTC = Range(*map(Timecode, columns[kRecordTC]))
        # parse remaining data if not optical event
        self._startTC = None
        self._clipName = ''
        if not self._hasOptical:
            if kStartTC in headers:
                self._startTC = Range(*map(Timecode, columns[kStartTC]))
            if kClipName in headers:
                self._clipName = starts[kClipName].strip()
        # set default for optical object
        self._optical = None
    #--------------------------------------------------------------------------
    def __str__(self):
        """Clip name, or 'e<id>' when there is none, followed by the linked
        optical if any.
        """
        label = self._clipName if self._clipName != '' else 'e%s' % self._id
        if self._optical is not None:
            label += ', op%s' % self._optical
        return label
    #--------------------------------------------------------------------------
    def __repr__(self):
        return "Event(%s)" % str(self)
    #--------------------------------------------------------------------------
    # properties
    #--------------------------------------------------------------------------
    @property
    def id(self):
        return self._id
    #--------------------------------------------------------------------------
    @property
    def footage(self):
        return self._footage
    #--------------------------------------------------------------------------
    @property
    def recordTC(self):
        return self._recordTC
    #--------------------------------------------------------------------------
    @property
    def startTC(self):
        return self._startTC
    #--------------------------------------------------------------------------
    @property
    def clipName(self):
        return self._clipName
    #--------------------------------------------------------------------------
    @property
    def optical(self):
        return self._optical
    #--------------------------------------------------------------------------
    @property
    def hasOptical(self):
        return self._hasOptical
    #--------------------------------------------------------------------------
    @property
    def opticalId(self):
        return self._opticalId
    #--------------------------------------------------------------------------
    # methods
    #--------------------------------------------------------------------------
    def linkOptical(self, optical):
        """Attach the optical object belonging to this event."""
        self._optical = optical
    #--------------------------------------------------------------------------
    # internal methods
    #--------------------------------------------------------------------------
    def _parseColumns(self, line, headers):
        """Cut up the line into the columns provided by the headers.
        Returns a dict mapping each header key to its slice of the line.
        """
        columns = {}
        # cut up all available columns
        for key, (start, end) in headers.items():
            columns[key] = line[start:end]
        return columns
#------------------------------------------------------------------------------
# AssembleTable
#------------------------------------------------------------------------------
class AssembleTable(object):
    """Represents the data found in the assemble list in a filmscribe ctl
    file.  It's possible the data found may be different in every file.  If
    the required columns aren't found an exception will be thrown.
    """
    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------
    def __init__(self, lines):
        """Parse the assemble table from the lines provided.

        Raises ValueError if the lines run out while reading the table
        header or if the required footage column header is missing.
        """
        super(AssembleTable, self).__init__()
        # use the line iterator to ease the parsing process
        iterator = LineIterator(lines)
        # parse the meta data found in the table header, first 3 lines; the
        # second line is skipped
        eventCount = self._parseHeaderEvents(iterator.popLine())
        iterator.next()
        opticalCount, _ = self._parseHeaderOpticalsAndTime(iterator.popLine())
        # the next line should be a line break separating the table and header
        line = iterator.popLine()
        if re.match(r'^-+$', line) is None:
            logging.warning("Expecting header line break, " \
                "instead found: %s." % line)
        # the column headers should be next
        headers = self._parseColumnHeaders(iterator.popLine())
        iterator.skipEmpty()
        # parse the events from the rest of the table
        self._events = []
        while not iterator.isEOF():
            # grab the next block
            block = iterator.popBlock()
            # event blocks need at least 2 lines (start and end), skip both
            # single line blocks and the empty blocks produced by
            # consecutive blank lines
            if len(block) < 2:
                continue
            # parse the event
            self._events.append(Event(block, headers))
        # track if any opticals were present; built as a real list since
        # filter() returns an iterator without a length on python 3
        opticalEvents = [event for event in self._events if event.hasOptical]
        self._hasOpticals = len(opticalEvents) > 0
        # validate the correct number of events parsed
        if (eventCount is not None) and (len(self._events) != eventCount):
            logging.warning("Incorrect number of events parsed, only %s of " \
                "%s found" % (len(self._events), eventCount))
        # validate the correct number of events have opticals
        if (opticalCount is not None) and (len(opticalEvents) != opticalCount):
            logging.warning("Incorrect number of optical events parsed, " \
                "only %s of %s found" % (len(opticalEvents), opticalCount))
    #--------------------------------------------------------------------------
    # properties
    #--------------------------------------------------------------------------
    @property
    def events(self):
        return self._events
    #--------------------------------------------------------------------------
    @property
    def hasOpticals(self):
        return self._hasOpticals
    #--------------------------------------------------------------------------
    # internal methods
    #--------------------------------------------------------------------------
    def _parseHeaderEvents(self, line):
        """Parse the event count from the line.  Returns the count as an
        int, or None (after logging a warning) if the line doesn't match.
        """
        # parse the event count
        match = re.match(r'^.+\s+(?P<events>\d+)\sevents\s+LFOA:.*$', line)
        if match is not None:
            return int(match.group('events'))
        logging.warning("Event count could not be parsed from header.")
        return None
    #--------------------------------------------------------------------------
    def _parseHeaderOpticalsAndTime(self, line):
        """Parse the optical count and total time from the line.  Returns an
        (int, Timecode) pair, or (None, None) after logging a warning if the
        line doesn't match.
        """
        # parse the optical count and total time
        pattern = r'^\s+(?P<opticals>\d+)\sopticals\s+total time:\s%s.*$' % \
            kTimecodePattern
        match = re.match(pattern, line)
        if match is not None:
            return int(match.group('opticals')), Timecode(match.group('timecode'))
        logging.warning("Optical count and total time could not be parsed " \
            "from header.")
        return None, None
    #--------------------------------------------------------------------------
    def _parseColumnHeaders(self, line):
        """Since the header positions can change we need to figure out the
        cuts to properly extract the columns.  Returns a dict mapping each
        header found to the Range of its column.  Raises ValueError if the
        footage header is missing.
        """
        headers = {}
        # look for the footage keycode start, column is required; the column
        # is 7 characters wide, the length of a '0000+00' keycode
        start = line.find(kFootage)
        if start == -1:
            raise ValueError("Required header '%s' not found in line '%s'." % \
                (kFootage, line))
        headers[kFootage] = Range(start, start + 7)
        # look for the record and start timecodes, both optional and 11
        # characters wide, the length of a '00:00:00:00' timecode
        for key in (kRecordTC, kStartTC):
            start = line.find(key)
            if start != -1:
                headers[key] = Range(start, start + 11)
            else:
                logging.warning("Could not find expected '%s' " \
                    "header in line %s." % (key, line))
        # look for the clip name, since its a variable size, calculate the end
        start = line.find(kClipName)
        if start != -1:
            # whatever follows the header and its padding starts the next
            # column; it is a suffix of the line, so derive its position
            # from the lengths, searching for the text could hit an earlier
            # occurrence of it
            rest = line[start + len(kClipName):].lstrip(' ')
            end = None if len(rest) == 0 else len(line) - len(rest)
            headers[kClipName] = Range(start, end)
        else:
            logging.warning("Could not find expected '%s' " \
                "header in line %s." % (kClipName, line))
        return headers
#------------------------------------------------------------------------------
# OpticalClip
#------------------------------------------------------------------------------
class OpticalClip(object):
    """Represents a single clip found in the optical block.
    """
    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------
    def __init__(self, lines, headers, sideHeaders):
        """Parse the optical clip content from the block.

        lines       -- the lines making up the clip block
        headers     -- dict mapping column names to the Range of the column
        sideHeaders -- dict mapping each side key to the dict of column
                       Ranges found inside that side section

        Raises ValueError if the first line has no clip id or if the block
        ends before the start and end lines were read.
        """
        super(OpticalClip, self).__init__()
        # use the line iterator to ease the parsing process
        iterator = LineIterator(lines)
        if iterator.isEOF():
            raise ValueError("Cannot parse optical clip from an empty block.")
        # first line should contain clip id
        match = re.match(r'^\s*(?P<id>\d+)\.', iterator.line)
        if match is None:
            # report the offending line itself, no other local holds it yet
            raise ValueError("Failed to parse optical clip id on line: %s" % \
                iterator.line)
        # NOTE: the id is kept as the matched string, not converted to int
        self._id = match.group('id')
        # parse the columns with start codes
        starts = self._parseColumns(iterator.popLine(), headers)
        # check the next line for an overflow, a line that is blank up to
        # the start of the side A section
        sideAStart = headers[kSideA].start
        if (not iterator.isEOF()) and \
           (re.match(r'^\s+$', iterator.line[:sideAStart]) is not None):
            # patch the description with the text cut into the side A column
            starts[kDescription] += starts[kSideA]
            # reparse the sides from the overflow line
            line = iterator.popLine()
            for side in [kSideA, kSideB]:
                start, end = headers[side]
                starts[side] = line[start:end]
        # parse the column with end codes
        ends = self._parseColumns(iterator.popLine(), headers)
        # save the available columns, paired up by key instead of trusting
        # both dicts to iterate in the same order
        columns = dict((key, (starts[key], ends[key])) for key in starts)
        self._opticalFootage = Range(*map(Keycode, columns[kOptical]))
        self._sequenceFootage = Range(*map(Keycode, columns[kSequence]))
        self._description = starts[kDescription].strip()
        # parse side columns, use side B if side A is empty
        side = kSideB if re.match(r'^\s+$', starts[kSideA]) else kSideA
        activeHeaders = sideHeaders[side]
        self._side = side
        # check for start timecode, only kept when both the start and end
        # lines hold a valid timecode
        self._startTC = None
        if kStartTC in activeHeaders:
            start, end = activeHeaders[kStartTC]
            # real lists are required, on python 3 map()/filter() return
            # iterators that have no length and can only be consumed once
            startTCs = [text[start:end] for text in columns[side]]
            validTCs = [tc for tc in startTCs
                        if re.match(kTimecodePattern, tc)]
            if len(validTCs) == 2:
                self._startTC = Range(*map(Timecode, startTCs))
        # look for the clip name
        self._clipName = ''
        if kClipName in activeHeaders:
            start, end = activeHeaders[kClipName]
            self._clipName = starts[side][start:end].strip()
        # possible to link to a sub optical
        self._subMaster = None
    #--------------------------------------------------------------------------
    def __str__(self):
        return self._clipName
    #--------------------------------------------------------------------------
    def __repr__(self):
        return "OpticalClip(%s)" % str(self)
    #--------------------------------------------------------------------------
    # properties
    #--------------------------------------------------------------------------
    @property
    def id(self):
        return self._id
    #--------------------------------------------------------------------------
    @property
    def opticalFootage(self):
        return self._opticalFootage
    #--------------------------------------------------------------------------
    @property
    def sequenceFootage(self):
        return self._sequenceFootage
    #--------------------------------------------------------------------------
    @property
    def description(self):
        return self._description
    #--------------------------------------------------------------------------
    @property
    def side(self):
        return self._side
    #--------------------------------------------------------------------------
    @property
    def startTC(self):
        return self._startTC
    #--------------------------------------------------------------------------
    @property
    def clipName(self):
        return self._clipName
    #--------------------------------------------------------------------------
    @property
    def subMaster(self):
        return self._subMaster
    #--------------------------------------------------------------------------
    # methods
    #--------------------------------------------------------------------------
    def linkSubMaster(self, subMaster):
        """Attach the submaster optical this clip refers to."""
        self._subMaster = subMaster
    #--------------------------------------------------------------------------
    # internal methods
    #--------------------------------------------------------------------------
    def _parseColumns(self, line, headers):
        """Cut up the line into the columns provided by the headers.
        Returns a dict mapping each header key to its slice of the line.
        """
        columns = {}
        # cut up all available columns
        for key, (start, end) in headers.items():
            columns[key] = line[start:end]
        return columns
#------------------------------------------------------------------------------
# Optical
#------------------------------------------------------------------------------
class Optical(object):
    """Represents an optical found in the optical table.  It's possible all
    optical data may not be available if column header information is not
    found.  Opticals may also contain sub opticals.
    """
    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------
    def __init__(self, lines):
        """Parse the optical block, both header and content are contained.

        Raises ValueError if the id line can't be parsed, if a required
        column header is missing or if the lines run out unexpectedly.
        """
        super(Optical, self).__init__()
        # use the line iterator to ease the parsing process
        iterator = LineIterator(lines)
        # skip any empty lines
        iterator.skipEmpty()
        # first content line should contain optical/event ids, the submaster
        # id is optional
        match = re.match(
            r'^OPTICAL #(?P<id>\d+)\s(?:SubMaster #(?P<subId>\d+))?' \
            r'\s+Assemble Event #(?P<eventId>\d+)\s+total length:\s' \
            r'(?P<length>\d{4}\+\d{2})$', iterator.popLine())
        if match is None:
            raise ValueError("Could not parse optical id from block.")
        # save the ids
        groups = match.groupdict()
        self._id = int(groups['id'])
        self._subId = None
        if groups['subId'] is not None:
            self._subId = int(groups['subId'])
        self._eventId = int(groups['eventId'])
        self._length = Keycode(groups['length'])
        # skip forward to the blocks header
        iterator.skipEmpty()
        # parse the column headers next, spread over three lines, with
        # multiple sections, sideA and sideB
        headers, sideHeaders = self._parseColumnHeaders(iterator.popBlock())
        # parse the opticals from the rest of the table
        layer = []
        self._layers = []
        while not iterator.isEOF():
            # check for layer change, the layer line closes the layer being
            # collected unless that layer is still empty
            if re.match(r'^Layer \d+ of \d+$', iterator.line) is not None:
                iterator.popBlock()
                if len(layer) != 0:
                    self._layers.append(layer)
                    layer = []
                continue
            block = iterator.popBlock()
            # consecutive blank lines yield empty blocks, nothing to parse
            if len(block) == 0:
                continue
            # parse the optical clip
            layer.append(OpticalClip(block, headers, sideHeaders))
        # add the last layer to the list
        self._layers.append(layer)
    #--------------------------------------------------------------------------
    def __str__(self):
        return str(self._id)
    #--------------------------------------------------------------------------
    def __repr__(self):
        return "Optical(%s)" % str(self)
    #--------------------------------------------------------------------------
    # properties
    #--------------------------------------------------------------------------
    @property
    def id(self):
        return self._id
    #--------------------------------------------------------------------------
    @property
    def subMasterId(self):
        return self._subId
    #--------------------------------------------------------------------------
    @property
    def eventId(self):
        return self._eventId
    #--------------------------------------------------------------------------
    @property
    def length(self):
        return self._length
    #--------------------------------------------------------------------------
    @property
    def layers(self):
        return self._layers
    #--------------------------------------------------------------------------
    # internal methods
    #--------------------------------------------------------------------------
    def _parseColumnHeaders(self, lines):
        """Since the header positions can change we need to figure out the
        cuts to properly extract the columns.  The optical header also
        contains header sections for side A and side B.

        Returns a (headers, sideHeaders) pair: headers maps the column names
        and both side keys to Ranges within a full line, sideHeaders maps
        each side key to the Ranges found within that side's section.
        Raises ValueError if a required header or the side markers are
        missing.
        """
        headers = {}
        sideHeaders = { kSideA : {}, kSideB : {} }
        # use the line iterator to ease the parsing process
        iterator = LineIterator(lines)
        # the initial line should contain the optical and sequence footage
        # and the side sections
        line = iterator.popLine()
        # both footage keycode columns are required, each 7 characters wide,
        # the length of a '0000+00' keycode
        for key in (kOptical, kSequence):
            start = line.find(key)
            if start == -1:
                raise ValueError("Required header '%s' not found in line " \
                    "'%s'." % (key, line))
            headers[key] = Range(start, start + 7)
        # look for the side section markers, each section is enclosed by a
        # pair of '+' characters so exactly four are expected
        sectionMarkers = [match.start() for match in re.finditer(r'\+', line)]
        if len(sectionMarkers) != 4:
            raise ValueError("Required side section markers missing on " \
                "line '%s'." % line)
        headers[kSideA] = Range(sectionMarkers[0], sectionMarkers[1] + 1)
        headers[kSideB] = Range(sectionMarkers[2], sectionMarkers[3] + 1)
        # grab the description column from the second line, it runs up to
        # the start of the side A section
        line = iterator.popLine()
        start = line.find(kDescription)
        if start == -1:
            raise ValueError("Required header '%s' not found in line '%s'." % \
                (kDescription, line))
        headers[kDescription] = Range(start, headers[kSideA].start)
        # the last line contains the side headings
        line = iterator.popLine()
        for side in [kSideA, kSideB]:
            # cut out the side section from the line, keeps parsing easier
            start, end = headers[side]
            sideLine = line[start:end]
            # look for start timecode, 11 characters wide
            start = sideLine.find(kStartTC)
            if start != -1:
                sideHeaders[side][kStartTC] = Range(start, start + 11)
            # look for the clip name, since its variable look for the end
            start = sideLine.find(kClipName)
            if start != -1:
                # the text following the header and its padding is a suffix
                # of the section, derive its position from the lengths,
                # searching for the text could hit an earlier occurrence
                rest = sideLine[start + len(kClipName):].lstrip(' ')
                end = None if len(rest) == 0 else len(sideLine) - len(rest)
                sideHeaders[side][kClipName] = Range(start, end)
        return (headers, sideHeaders)
#------------------------------------------------------------------------------
# OpticalTable
#------------------------------------------------------------------------------
class OpticalTable(object):
    """Represents the data found in the optical list in a filmscribe ctl
    file.  It's possible the data found may be different in every file.
    """
    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------
    def __init__(self, lines):
        """Parse the optical table from the lines provided.

        Opticals carrying a submaster id are not listed in `opticals`;
        they are linked to the matching clip of the optical sharing
        their id.  Raises ValueError if the lines run out while parsing.
        """
        super(OpticalTable, self).__init__()
        # the optical table doesn't contain a separator for the last entry,
        # put one in place of the last line so an edge case doesn't exist;
        # done on a copy so the list handed in by the caller stays untouched
        lines = list(lines[:-1]) + ['=' * 59]
        # use the line iterator to ease the parsing process
        iterator = LineIterator(lines)
        # parse the meta data found in the table header, first 3 lines
        opticalCount = self._parseHeaderOpticals(iterator.popLine())
        iterator.next()
        iterator.next()
        # the next line should be a line break separating the table and header
        line = iterator.popLine()
        if re.match(r'^-+$', line) is None:
            logging.warning("Expecting header line break, " \
                "instead found: %s." % line)
        # parse the opticals from the rest of the table
        submasters = []
        self._opticals = []
        while not iterator.isEOF():
            # grab the next block
            block = iterator.popOpticalBlock()
            # parse the optical
            optical = Optical(block)
            if optical.subMasterId is None:
                self._opticals.append(optical)
            else:
                submasters.append(optical)
        # link up the sub master opticals
        for submaster in submasters:
            opticalId = submaster.id
            # the first clip of the first layer locates the submaster
            if (len(submaster.layers) == 0) or (len(submaster.layers[0]) == 0):
                logging.warning("Submaster %d of optical %d contains no " \
                    "clips, cannot link it." % \
                    (submaster.subMasterId, opticalId))
                continue
            startFootage = submaster.layers[0][0].sequenceFootage.start
            # attempt to find by index, only valid when the id is in range
            optical = None
            if opticalId < len(self._opticals):
                optical = self._opticals[opticalId]
            # if not valid search for it
            if (optical is None) or (optical.id != opticalId):
                opticalsWithId = [o for o in self._opticals
                                  if o.id == opticalId]
                if len(opticalsWithId) != 1:
                    logging.warning("Could not find optical %d linked to " \
                        "submaster %d." % (opticalId, submaster.subMasterId))
                    continue
                optical = opticalsWithId[0]
            # link up submaster to proper optical clip, footage is compared
            # by frame count so the match does not depend on keycode
            # objects supporting '=='
            for clip in itertools.chain(*optical.layers):
                if clip.clipName.startswith('Optical') and \
                   (clip.sequenceFootage.start.totalFrames() == \
                    startFootage.totalFrames()):
                    clip.linkSubMaster(submaster)
        # validate the correct number of opticals parsed
        if (opticalCount is not None) and \
           (len(self._opticals) != opticalCount):
            logging.warning("Incorrect number of opticals parsed, " \
                "only %s of %s found" % (len(self._opticals), opticalCount))
    #--------------------------------------------------------------------------
    # properties
    #--------------------------------------------------------------------------
    @property
    def opticals(self):
        return self._opticals
    #--------------------------------------------------------------------------
    # internal methods
    #--------------------------------------------------------------------------
    def _parseHeaderOpticals(self, line):
        """Parse the optical count from the line.  Returns the count as an
        int, or None (after logging a warning) if the line doesn't match.
        """
        # parse the optical count
        match = re.match(r'^.+\s+(?P<opticals>\d+)\soptical\sunits.*$', line)
        if match is not None:
            return int(match.group('opticals'))
        logging.warning("Optical count could not be parsed from header.")
        return None
#------------------------------------------------------------------------------
# FilmscribeCutList
#------------------------------------------------------------------------------
class FilmscribeCutList(object):
    """Avid Filmscribe cutlist (.ctl) file parser.

    Parses the assemble list and, when optical events are present, the
    optical list, then links every optical to the event it belongs to.
    """
    #--------------------------------------------------------------------------
    # object
    #--------------------------------------------------------------------------
    def __init__(self, ctlPath):
        """Attempt to parse the ctl file.

        Raises ValueError if the file does not exist, contains no data,
        lists no tables or is malformed.
        """
        super(FilmscribeCutList, self).__init__()
        # make sure the path is valid
        if not os.path.exists(ctlPath):
            raise ValueError("Filmscribe .ctl file not found at '%s'." % \
                ctlPath)
        # read the whole file; plain 'r' is used since 'ru' is rejected as
        # an invalid mode by python 3
        with open(ctlPath, 'r') as ctlFile:
            data = ctlFile.read()
        # if no content found we got a problem houston
        if len(data.strip()) == 0:
            raise ValueError("Invalid .ctl file, no data found!")
        # cut the data up into lines, normalizing the line endings by hand;
        # '\r\n' is collapsed first so windows files don't end up with an
        # extra blank line after every line
        lines = data.replace('\r\n', '\n').replace('\r', '\n').split('\n')
        # wrap the lines up in a line manager to make our lives easier
        iterator = LineIterator(lines)
        # parse the table of contents, one line per table
        iterator.forwardToStartOfHeader()
        block = iterator.popBlock()
        tableCount = len(block)
        # expecting only two tables, the assembly and opticals
        if tableCount > 2:
            logging.warning("%d entries in toc, expecting " \
                "only 2." % tableCount)
        # only parse the first two tables
        tables = []
        for _ in range(min(2, tableCount)):
            iterator.forwardToStartOfHeader()
            tables.append(iterator.popTable())
        # without an assemble table there is nothing to parse
        if len(tables) == 0:
            raise ValueError("Invalid .ctl file, no tables listed in the " \
                "table of contents.")
        # parse the assembly table
        self._assembleList = AssembleTable(tables[0])
        # parse the optical table if opticals present and table available
        self._opticalList = None
        if (len(tables) > 1) and self._assembleList.hasOpticals:
            self._opticalList = OpticalTable(tables[1])
            # link up the opticals with their events
            events = self._assembleList.events
            for optical in self._opticalList.opticals:
                eventId = optical.eventId
                # attempt to get event by indexing
                event = events[eventId] if eventId < len(events) else None
                # use filtering if indexing fails; a real list is built
                # since filter() has no length on python 3
                if (event is None) or (event.id != eventId):
                    eventsWithId = [e for e in events if e.id == eventId]
                    if len(eventsWithId) != 1:
                        logging.warning("Could not find event %d linked to " \
                            "optical %d." % (eventId, optical.id))
                        continue
                    event = eventsWithId[0]
                # link optical w/ event
                event.linkOptical(optical)
    #--------------------------------------------------------------------------
    # properties
    #--------------------------------------------------------------------------
    @property
    def assembleList(self):
        return self._assembleList
    #--------------------------------------------------------------------------
    @property
    def opticalList(self):
        return self._opticalList
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment