Created
October 2, 2019 15:03
-
-
Save emcodem/8ec146ca0e7e602b9ea200ec29520e47 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aaf2 | |
import sys | |
import json | |
import urllib.parse | |
from timecode import Timecode | |
class TimelineFrame:
    """Relevant information about a single frame of the timeline.

    Attributes:
        url: locator of the media shown at this frame, or a marker string
            containing "FILLER" when nothing is shown.
        start_offset: value of the source clip's "StartTime" property, or
            None when no clip was resolved.
        left_neighbour: the TimelineFrame computed for the previous frame
            (None for the first one).
        framenum: position of this frame on the timeline.
        tracknum: physical track number inside the source file. Only
            meaningful for audio frames; one video output track is assumed.
    """

    def __init__(self, url, start_offset, left_neighbour, framenum, tracknum=0):
        self.url = url
        self.start_offset = start_offset
        self.left_neighbour = left_neighbour
        self.framenum = framenum
        self.tracknum = tracknum

    @property
    def track_num(self):
        """Alias of ``tracknum`` so both spellings refer to the same value."""
        return self.tracknum

    @track_num.setter
    def track_num(self, value):
        self.tracknum = value

    def __repr__(self):
        # left_neighbour is deliberately left out: it chains every previous
        # frame, so printing it would recurse through the whole timeline.
        return "TimelineFrame(url=%r, start_offset=%r, framenum=%r, tracknum=%r)" % (
            self.url,
            self.start_offset,
            self.framenum,
            self.tracknum,
        )
def frames_to_TC(frames, fps=24):
    """Format a frame count as a non-drop-frame ``HH:MM:SS:FF`` timecode.

    Args:
        frames: non-negative number of frames since 00:00:00:00.
        fps: frames per second of the timeline; defaults to 24.

    Returns:
        The timecode string. Hours are not wrapped at 24.

    Raises:
        ValueError: if ``fps`` is not positive.
    """
    if fps <= 0:
        raise ValueError("fps must be positive, got %r" % (fps,))
    # divmod keeps the arithmetic in integers; no float rounding on long timelines.
    total_seconds, f = divmod(frames, fps)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return "%02d:%02d:%02d:%02d" % (h, m, s, f)
def get_tracks(mob, type):
    """Collect the tracks of *mob* whose segment media kind equals *type*.

    Each slot in ``mob["Slots"]`` contributes according to its segment class:
    a NestedScope adds every Sequence found in its own "Slots", a Sequence is
    added as-is, and a SourceClip, a Selector (its ``selected`` member) or an
    EssenceGroup is added wrapped in a one-element list. Slots with any other
    segment class, or another media kind, are ignored.

    Returns a list mixing Sequence objects and one-element lists.
    """
    collected = []
    for slot in mob["Slots"]:
        seg = slot.segment
        if seg.media_kind != type:
            continue  # not the kind of track the caller asked for

        if isinstance(seg, aaf2.components.NestedScope):
            collected.extend(
                inner
                for inner in seg["Slots"]
                if isinstance(inner, aaf2.components.Sequence)
            )
        elif isinstance(seg, aaf2.components.Sequence):
            collected.append(seg)
        elif isinstance(seg, aaf2.components.SourceClip):
            collected.append([seg])
        elif isinstance(seg, aaf2.components.Selector):
            collected.append([seg.selected])
        elif isinstance(seg, aaf2.components.EssenceGroup):
            collected.append([seg])
    return collected
# returns a tuple: (url, physical_track_number)
def get_source_clip_url(item):
    """Resolve the media URL and physical track number behind a source clip.

    Every object yielded by ``item.walk()`` is tried in order: its "SourceID"
    is looked up in the mobs of the module-level AAF file ``f`` and the first
    locator's "URLString" of that mob's essence description is taken. The
    first object for which this works wins.

    Args:
        item: the clip to resolve; must expose "SourceMobSlotID", ``slot``
            and ``walk()``.

    Returns:
        A tuple ``(url, physical_track_number)``. The track number is 0 when
        the optional "PhysicalTrackNumber" property cannot be read.
        TODO: check whether the source track number must be found another way.

    Raises:
        Exception: if the clip has no slot or no URL could be resolved.
    """
    source_slot_id = item["SourceMobSlotID"].value
    if not item.slot:
        raise Exception("Cannot resolve url for item, not supported.")

    for sub_item in item.walk():
        try:
            mob = f.content.mobs.get(sub_item["SourceID"].value)
            url = mob["EssenceDescription"].value["Locator"].value[0]["URLString"].value
        except Exception:
            # This object does not lead to a locator; try the next one.
            continue
        try:
            phys_track_num = mob.slot_at(source_slot_id)["PhysicalTrackNumber"].value
        except Exception:
            phys_track_num = 0  # PhysicalTrackNumber is optional
        return url, phys_track_num

    # Could not resolve a url: print debug info. The dump is best-effort, it
    # touches the same properties that just failed and must not replace the
    # exception raised below.
    for sub_item in item.walk():
        try:
            print("-----------ITEM--------------")
            print(str(sub_item))
            print(sub_item.dump())
            print("-----------END OF ITEM----------")
            print("----------REF--------------")
            mob = f.content.mobs.get(sub_item["SourceID"].value)
            print(mob.dump())
            print("-------------END OF REF--------------")
            print(mob["EssenceDescription"].value["Locator"].value[0]["URLString"].value)
            print(mob.slot_at(source_slot_id)["PhysicalTrackNumber"].value)
        except Exception as err:
            print("(debug info incomplete: " + str(err) + ")")
    raise Exception("Cannot resolve url for item, not supported." + str(item))
def _component_at_frame(track, f_num):
    """Return the component of *track* that covers frame *f_num*, or None."""
    if not isinstance(track, list):
        return track.component_at_time(f_num)
    # Plain lists (single segments wrapped by get_tracks) have no
    # component_at_time(); lay their members end to end by "Length".
    position = 0
    for component in track:
        length = component["Length"].value
        if position <= f_num < position + length:
            return component
        position += length
    return None


def get_video_at_frame(f_num, v_tracks, last_frame):
    """Find what is visible at timeline frame *f_num*.

    The video tracks are checked in order until one has something to show:
    a SourceClip is resolved directly, an OperationGroup is searched for the
    first SourceClip it references, and a Filler (or nothing at all) moves on
    to the next track. Any other component type is reported as filler.

    Args:
        f_num: frame position on the timeline.
        v_tracks: video tracks as returned by get_tracks().
        last_frame: TimelineFrame of the previous frame, stored as the
            left neighbour of the result.

    Returns:
        A TimelineFrame whose ``url`` is either a media URL or a marker
        string containing "FILLER".

    Raises:
        Exception: if an OperationGroup references no SourceClip.
    """
    out_frame = TimelineFrame(None, None, last_frame, f_num)
    for v_track in v_tracks:
        c = _component_at_frame(v_track, f_num)
        if c is None or isinstance(c, aaf2.components.Filler):
            continue  # check other video tracks if they have something to show

        if isinstance(c, aaf2.components.SourceClip):
            out_frame.start_offset = c["StartTime"].value
            out_frame.url = get_source_clip_url(c)[0]
            return out_frame

        if isinstance(c, aaf2.components.OperationGroup):
            # TODO: OperationGroup is very complex, engineer all possibilities.
            # Only the group active at this frame is searched, not the whole track.
            for ref in c.walk_references():  # walk_references yields tuples
                if isinstance(ref[0], aaf2.components.SourceClip):
                    out_frame.url = get_source_clip_url(ref[0])[0]
                    out_frame.start_offset = ref[0]["StartTime"].value
                    return out_frame
            raise Exception("Fatal, Could not find SourceClip in Operationgroup for framenum " + str(f_num))

        # TODO: check if other types need to be supported
        out_frame.url = "FILLER " + str(type(c))
        return out_frame

    # No track had anything to show at this frame.
    out_frame.url = "FINAL FILLER"
    return out_frame
def get_audio_frames_at_time(framenum, a_track, tracknum, last_frame):
    """Find what one audio track plays at timeline frame *framenum*.

    Args:
        framenum: frame position on the timeline.
        a_track: the audio track to inspect; must offer component_at_time().
        tracknum: index of the track on the timeline (currently unused).
        last_frame: TimelineFrame of the previous frame on this track, stored
            as the left neighbour of the result.

    Returns:
        A TimelineFrame. For a SourceClip, or the first SourceClip referenced
        by an OperationGroup, ``url``, ``tracknum`` (physical track number in
        the source) and ``start_offset`` are filled in. For a Filler the url
        is the string "FILLER".

    Raises:
        Exception: if an OperationGroup references no SourceClip, or the
            component at this frame is of an unsupported type.
    """
    out_frame = TimelineFrame(None, None, last_frame, framenum)
    c = a_track.component_at_time(framenum)

    if isinstance(c, aaf2.components.SourceClip):
        out_frame.start_offset = c["StartTime"].value
        out_frame.url, out_frame.tracknum = get_source_clip_url(c)
        return out_frame

    if isinstance(c, aaf2.components.Filler):
        out_frame.url = "FILLER"
        return out_frame

    if isinstance(c, aaf2.components.OperationGroup):
        for ref in c.walk_references():  # walk_references yields tuples
            if isinstance(ref[0], aaf2.components.SourceClip):
                out_frame.url, out_frame.tracknum = get_source_clip_url(ref[0])
                out_frame.start_offset = ref[0]["StartTime"].value
                return out_frame
        raise Exception("Fatal, Could not find SourceClip in Operationgroup for framenum " + str(framenum))

    # Fail here with a clear message instead of handing None to the caller.
    raise Exception("Fatal, unexpected type in audio " + str(type(c)))
def _concat_path(url):
    """Turn a percent-encoded file URL into the plain path written to ffconcat."""
    return urllib.parse.unquote(url).replace("file:///", "").replace("file:", "")


if __name__ == "__main__":
    # Usage: <script> <file.aaf>
    # Writes c:\temp\V0.txt (video) and c:\temp\A<n>.txt (one per audio track)
    # as ffconcat lists and prints a matching ffmpeg command line.
    if len(sys.argv) < 2:
        sys.exit("usage: " + sys.argv[0] + " <file.aaf>")

    # NOTE: `f` and the loop variable `framenum` must stay module-level names;
    # lookup helpers in this file read them as globals.
    with aaf2.open(sys.argv[1]) as f:
        for mob in f.content.compositionmobs():  # timeline, most likely just one piece in the aaf file!
            # GET ALL TRACKS IN TIMELINE
            video_tracks = get_tracks(mob, "Picture")
            audio_tracks = get_tracks(mob, "Sound")
            if not video_tracks:
                print("No video tracks found in composition, skipping")
                continue
            first_track = video_tracks[0]
            if isinstance(first_track, list):
                # single segments are wrapped in a one-element list by get_tracks
                first_track = first_track[0]
            sequence_length = first_track["Length"].value
            print("Sequence length " + str(sequence_length))

            # GET ALL "VISIBLE" VIDEO FRAMES
            # TODO: check if we can just walk through timeline and see for each frame what file is there...
            video_frames = []
            last_frame = None
            for framenum in range(sequence_length):
                last_frame = get_video_at_frame(framenum, video_tracks, last_frame)
                video_frames.append(last_frame)

            # GET ALL AUDIO FRAMES separately for each audio track
            audio_frames = []
            for trck_idx, a_track in enumerate(audio_tracks):
                track_frames = []
                last_frame = None
                for framenum in range(sequence_length):
                    last_frame = get_audio_frames_at_time(framenum, a_track, trck_idx, last_frame)
                    track_frames.append(last_frame)
                audio_frames.append(track_frames)

            # VIDEO OUTPUT: one "file" entry each time the visible source changes
            with open("c:\\temp\\V0.txt", "w") as filev:
                filev.write("ffconcat version 1.0\n")
                last_url = None
                for frame in video_frames:
                    if frame.url != last_url and "FILLER" not in frame.url:
                        filev.write("file '" + _concat_path(frame.url) + "'\n")
                        print("New File at Pos: " + str(frame.framenum) + " Origin: " + str(frame.start_offset) + " " + frame.url)
                    # todo: count duration while the url stays the same
                    last_url = frame.url

            # AUDIO OUTPUT: one output file per audio track
            for track_idx, track_frames in enumerate(audio_frames):
                with open("c:\\temp\\A" + str(track_idx) + ".txt", "w") as filea:
                    filea.write("ffconcat version 1.0\n")
                    last_url = None
                    for position, frame in enumerate(track_frames):
                        # Filler is judged on the audio frame itself, not on
                        # the video frame at the same position.
                        if frame.url != last_url and "FILLER" not in frame.url:
                            filea.write("file '" + _concat_path(frame.url) + "'\n")
                            tc1 = Timecode('25', '00:00:00;00')  # TODO: GET FRAMERATE FROM FILE (OR TIMELINE)?
                            # NOTE(review): inpoint is the timeline position here;
                            # confirm whether the clip's start_offset was intended.
                            tc1.frames = position
                            filea.write("inpoint " + str(tc1) + "\n")
                            print("New AFile Framenum: " + str(frame.framenum) + " Origin: " + str(frame.start_offset) + " " + frame.url + " tracknum: " + str(frame.tracknum))
                        last_url = frame.url

            print("ffmpeg -safe 0 -i \"c:\\temp\\V0.txt\" -map 0:v? -map 1:a? -map 2:a? c:\\temp\\out.mp4")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Unfinished attempt to parse the timeline out of an AAF file (where effects should already be rendered) in order to create a concat file for ffmpeg encoding from it. The project was stopped because of the unexpected complexity and the missing documentation; it is also hard to test.