Skip to content

Instantly share code, notes, and snippets.

@MaxXSoft
Last active November 18, 2023 08:14
Show Gist options
  • Save MaxXSoft/9b24bca1edb3cd08349e8e798a5d989c to your computer and use it in GitHub Desktop.
Save MaxXSoft/9b24bca1edb3cd08349e8e798a5d989c to your computer and use it in GitHub Desktop.
MaxXing's PV tools.
#!/usr/bin/env python3
'''
Tool for making datamosh style videos with ffmpeg.
Written by MaxXing, licensed under GPL-v3.
'''
import subprocess
import os
import shutil
import sys
import argparse
import atexit
import re
import random
# Regex for extracting the frame number from a log line: it captures the
# digits that sit between `] n:` and the next `.` (e.g. the `12` in `] n:12.`).
FRAME_NO = re.compile(r'\] n:(\d+?)\.')
def eprint(*args, **kwargs):
    '''
    Forwards all positional and keyword arguments to `print`,
    with the output directed to `stderr`.
    '''
    print(*args, **kwargs, file=sys.stderr)
# Parse command line arguments.
# One positional argument (the input video) plus four optional switches.
parser = argparse.ArgumentParser(description='A datamosh video generator.')
parser.add_argument(
    'input',
    type=str,
    help='the input video file',
)
parser.add_argument(
    '-o', '--output',
    type=str,
    default='output.mov',
    help='the output video file, default to `output.mov`',
)
parser.add_argument(
    '-d', '--work_dir',
    type=str,
    default='datamosh',
    help='the working directory, default to `datamosh`',
)
parser.add_argument(
    '-f', '--force',
    action='store_true',
    help='remove the working directory if it already exists',
)
parser.add_argument(
    '-r', '--rm',
    action='store_true',
    help='remove the working directory before exit',
)
args = parser.parse_args()
# Create working directory.
# With `--force`, an existing directory is wiped first so the run starts clean;
# otherwise an existing directory (and whatever it contains) is reused.
if args.force:
    if os.path.exists(args.work_dir):
        shutil.rmtree(args.work_dir)
os.makedirs(args.work_dir, exist_ok=True)
# Setup exit handler.
@atexit.register
def cleanup() -> None:
    '''
    Removes the working directory at interpreter exit when `--rm` was given.
    '''
    if not args.rm:
        return
    if os.path.exists(args.work_dir):
        shutil.rmtree(args.work_dir)
# Generate XVID.
# Transcode the input to an Xvid AVI inside the working directory, unless a
# previous run already left one there.
xvid = f'{args.work_dir}/xvid.avi'
if not os.path.exists(xvid):
    try:
        # `check=True` stops the script when ffmpeg fails instead of carrying
        # on with a missing or broken intermediate file.
        subprocess.run(['ffmpeg', '-i', args.input, '-vcodec', 'libxvid',
                        '-q:v', '1', '-g', '1000', '-qmin', '1', '-qmax', '1',
                        '-flags', 'qpel+mv4', '-an', '-y', xvid], check=True)
    except subprocess.CalledProcessError:
        # Do not leave a partial file behind: the existence check above would
        # make the next run reuse it.
        if os.path.exists(xvid):
            os.remove(xvid)
        raise
# Extract frames.
# Each frame of the Xvid stream is copied (no re-encoding) into its own
# `f_NNNNN.raw` file, numbered from 0.
frames_dir = f'{args.work_dir}/frames'
# Always start from an empty directory so frames from an earlier run are not
# mixed with the new ones.
if os.path.exists(frames_dir):
    shutil.rmtree(frames_dir)
os.makedirs(frames_dir)
# `check=True` aborts the script if ffmpeg fails, rather than continuing with
# an empty or incomplete frame directory.
subprocess.run(['ffmpeg', '-i', xvid, '-vcodec', 'copy',
                '-start_number', '0', f'{frames_dir}/f_%05d.raw'], check=True)
# Get name of all frames.
# Paths of every extracted frame, in file-name order.
frames = [f'{frames_dir}/{name}' for name in sorted(os.listdir(frames_dir))]
# Find I-frames.
# Run ffmpeg with debug logging and a `select` filter for I-frames, capturing
# stdout and stderr together so the log can be scanned below.
process = subprocess.Popen(
    ['ffmpeg', '-i', xvid, '-vf', "select='eq(pict_type,PICT_TYPE_I)'",
     '-vsync', '2', '-f', 'null', os.devnull, '-loglevel', 'debug', '-hide_banner'],
    stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout, _ = process.communicate()
if process.returncode != 0:
    # Without this check a failed probe would yield an empty I-frame list and
    # the script would silently produce an un-moshed video.
    raise subprocess.CalledProcessError(process.returncode, process.args, output=stdout)
# The log may contain bytes that are not valid UTF-8 (e.g. file metadata);
# replace them instead of crashing, since only ASCII markers are searched for.
output = stdout.decode('utf-8', errors='replace')
iframes = []
for line in output.splitlines():
    if 'pict_type:I' not in line:
        continue
    found = FRAME_NO.search(line)
    # Skip lines that mention an I-frame but carry no `] n:<number>.` field.
    if found:
        iframes.append(int(found.group(1)))
# Remove I-frames.
# Frame 0 is never deleted, so the stream keeps its first frame.
for index in iframes:
    if index == 0:
        continue
    iframe_path = f'{frames_dir}/f_{index:05}.raw'
    if os.path.exists(iframe_path):
        os.remove(iframe_path)
# Find P-frames.
# Everything still on disk after the removal above, minus the first entry in
# name order, forms the pool used for random replacement frames.
surviving = sorted(os.listdir(frames_dir))
surviving.pop(0)
pframes = [f'{frames_dir}/{name}' for name in surviving]
# Fill removed frames with other P-frames.
# Every missing frame is replaced by a copy of the nearest later frame that
# exists; if there is none, a random frame from the P-frame pool is used.
for index, target in enumerate(frames):
    if os.path.exists(target):
        continue
    for later in range(index + 1, len(frames)):
        candidate = f'{frames_dir}/f_{later:05}.raw'
        if os.path.exists(candidate):
            shutil.copy(candidate, target)
            break
    else:
        # No later frame exists on disk: fall back to a random P-frame.
        pframe = random.choice(pframes)
        eprint(f'Warning: picked frame {pframe} for frame {target}')
        shutil.copy(pframe, target)
# Assemble frames.
# Concatenate the bytes of every frame file, in order, into a single file.
frames_avi = f'{args.work_dir}/frames.avi'
with open(frames_avi, 'wb') as assembled:
    for frame_path in frames:
        with open(frame_path, 'rb') as frame_file:
            shutil.copyfileobj(frame_file, assembled)
# Generate output.
# Re-encode the assembled stream to H.264. `check=True` makes the script fail
# (non-zero exit) when ffmpeg cannot produce the output, instead of exiting
# as if everything had worked.
subprocess.run(['ffmpeg', '-i', frames_avi, '-vcodec', 'h264', args.output], check=True)
#!/usr/bin/env python3
from mido import MidiFile, tick2second, MidiTrack
'''
Tool for generating Apple Motion 5 time markers from MIDI files.
Written by MaxXing, licensed under GPL-v3.
'''
# XML template for a single time marker. The `{time}` and `{resolution}`
# placeholders are filled in with `str.format` by `secs_to_markers`.
TIME_MARKER = ''' <timemarker>
<inpoint>{time} {resolution} 1 0</inpoint>
<color>0</color>
<type>0</type>
</timemarker>'''
def get_tempo(mid: MidiFile) -> int:
    '''
    Returns the tempo carried by the first `set_tempo` message found
    when scanning the tracks of the given MIDI file in order.

    Raises `RuntimeError` if no track contains a `set_tempo` message.
    '''
    messages = (msg for track in mid.tracks for msg in track)
    for msg in messages:
        if msg.type == 'set_tempo':
            return msg.tempo
    raise RuntimeError('tempo not found')
def track_to_secs(track: MidiTrack, ticks_per_beat: int, tempo: int) -> list[float]:
    '''
    Converts the given MIDI track to a list of time markers in seconds.

    The delta time of every message is accumulated into an absolute tick
    count, and one entry is produced for each `note_on` message that has a
    non-zero velocity. The single `tempo` value is applied to the whole track.
    '''
    secs = []
    ticks = 0
    for m in track:
        ticks += m.time
        # By MIDI convention a `note_on` with velocity 0 is a note-off, so it
        # must not produce a marker (it would mark where a note ends).
        if m.type == 'note_on' and m.velocity > 0:
            secs.append(tick2second(ticks, ticks_per_beat, tempo))
    return secs
def secs_to_markers(secs: list[float], fps: int) -> str:
    '''
    Renders the given list of seconds as a `<timemarkerset>` XML fragment.

    Each entry becomes one `TIME_MARKER` whose time is the truncated
    frame index `int(s * fps)` and whose resolution is `fps`.
    '''
    markers = [
        TIME_MARKER.format(time=int(sec * fps), resolution=fps)
        for sec in secs
    ]
    body = '\n'.join(markers)
    return '<timemarkerset>\n' + body + '\n</timemarkerset>'
def midi_to_markers(file: str, fps: int) -> str:
    '''
    Builds the time marker XML for every track of the given MIDI file.

    Each track contributes an XML comment holding the track name followed by
    its `<timemarkerset>`; the pieces are concatenated and the surrounding
    whitespace is stripped from the final string.
    '''
    pieces = []
    with MidiFile(file) as mid:
        tempo = get_tempo(mid)
        for track in mid.tracks:
            pieces.append(f'<!-- {track.name} -->\n')
            secs = track_to_secs(track, mid.ticks_per_beat, tempo)
            pieces.append(secs_to_markers(secs, fps) + '\n')
    return ''.join(pieces).strip()
if __name__ == '__main__':
    import sys

    # At least the MIDI file path and the frame rate must be given.
    if len(sys.argv) < 3:
        # Usage errors go to stderr so they never end up in redirected XML output.
        print(f'usage: {sys.argv[0]} MIDI_FILE FPS', file=sys.stderr)
        # `sys.exit` is always available; the bare `exit` helper is only
        # installed by the `site` module.
        sys.exit(1)
    file = sys.argv[1]
    fps = int(sys.argv[2])
    print(midi_to_markers(file, fps))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment