@JacobTDC
Last active March 3, 2024 21:11
Flipper Zero Video Converter
#!/usr/bin/env python3
import argparse
import locale
import math
import struct
import subprocess
import sys
import textwrap
from fractions import Fraction
from pathlib import Path
import ffmpeg
# set locale to user default
locale.setlocale(locale.LC_ALL, '')
# just some "constants"
BUNDLE_SIGNATURE = "BND!VID"
BUNDLE_VERSION = 1
SCREEN_WIDTH = 128
SCREEN_HEIGHT = 64
# a class to make sure a valid scale is given in args
class VideoScale:
    def __init__(self, scale):
        self.width, self.height = map(int, scale.split('x'))
        if not (1 <= self.width <= SCREEN_WIDTH and 1 <= self.height <= SCREEN_HEIGHT):
            raise argparse.ArgumentTypeError(f"{scale} is not in range 1x1 to {SCREEN_WIDTH:d}x{SCREEN_HEIGHT:d}")

    def __str__(self):
        return f'{self.width:d}x{self.height:d}'

# a function to make sure a valid threshold is given in args
def Threshold(t):
    t = int(t)
    if not (0 <= t <= 256):
        raise argparse.ArgumentTypeError(f"{t:d} is not in range 0 to 256")
    return t

# a function to make sure a valid bayer_scale is given in args
def BayerScale(s):
    s = int(s)
    if not (0 <= s <= 5):
        raise argparse.ArgumentTypeError(f"{s:d} is not in range 0 to 5")
    return s

# python uses half round even, but we need to use half round up
# in order to get an accurate frame count, because that's what
# ffmpeg uses when changing frame rates
def half_round_up(fraction):
    if fraction % 1 < Fraction(1, 2):
        return math.floor(fraction)
    return math.ceil(fraction)

# setup the argument parser
parser = argparse.ArgumentParser(
    description="A utility to convert videos to a format playable on the Flipper Zero.")
parser_exclusive_1 = parser.add_mutually_exclusive_group()
parser.add_argument('source',
                    type=Path,
                    help="the source file; must contain a video and audio stream")
parser.add_argument('output',
                    type=Path,
                    help="the resulting bundle")
parser_exclusive_1.add_argument('-d', '--dither',
                                choices=["bayer",
                                         "heckbert",
                                         "floyd_steinberg",
                                         "sierra2",
                                         "sierra2_4a",
                                         "sierra3",
                                         "burkes",
                                         "atkinson",
                                         "none"],
                                default="sierra3",
                                metavar='ALGORITHM',
                                help="the dithering algorithm to use, or 'none' to disable; for a list of options, see FFmpeg's 'paletteuse'; defaults to 'sierra3'")
parser.add_argument('--bayer-scale',
                    type=BayerScale,
                    dest='bayer_scale',
                    help="used with '-d/--dither bayer' to define the scale of the pattern (how much crosshatch is visible), from 0 to 5; defaults to '2'")
parser_exclusive_1.add_argument('-t', '--threshold',
                                type=Threshold,
                                help="the threshold to apply when converting to black and white, from 0 to 256; cannot be used with dithering")
parser.add_argument('-f', '--frame-rate',
                    type=Fraction,
                    dest='frame_rate',
                    help="the desired video frame rate, may be a fraction; defaults to source frame rate")
parser.add_argument('-s', '--scale',
                    type=VideoScale,
                    dest='scale',
                    help=f"the desired video size, cannot be larger than {SCREEN_WIDTH:d}x{SCREEN_HEIGHT:d}; default best fit")
parser.add_argument('-r', '--sample-rate',
                    type=int,
                    dest='sample_rate',
                    help="the desired audio sample rate; defaults to the source sample rate")
parser.add_argument('-q', '--quiet',
                    action='count',
                    default=0,
                    help="don't output info to stdout, use twice to silence warnings")
args = parser.parse_args()
# only allow '--bayer-scale' to be used with bayer dithering
if args.bayer_scale is not None and args.dither != 'bayer':
    parser.error("--bayer-scale can only be used with '-d/--dither bayer'")
# get media information
video_index = None
audio_index = None
ffprobe_result = ffmpeg.probe(args.source, count_packets=None)
for stream in ffprobe_result['streams']:
    if stream['disposition']['default']:
        if stream['codec_type'] == 'video' and video_index is None:
            source_frame_count = int(stream['nb_read_packets'])
            source_width = int(stream['width'])
            source_height = int(stream['height'])
            source_frame_rate = Fraction(stream['r_frame_rate'])
            video_index = stream['index']
        if stream['codec_type'] == 'audio' and audio_index is None:
            source_sample_rate = int(stream['sample_rate'])
            audio_index = stream['index']
# display an error if video or audio is missing
if video_index is None:
    parser.error("source file does not contain a default video stream")
if audio_index is None:
    parser.error("source file does not contain a default audio stream")
# get the video dimensions before padding
if args.scale is None:
    # default: maintain aspect ratio and scale to fit screen
    scale_factor = max(source_width / SCREEN_WIDTH, source_height / SCREEN_HEIGHT)
    pre_pad_width = math.floor(source_width / scale_factor)
    frame_height = math.floor(source_height / scale_factor)
else:
    # user defined dimensions
    pre_pad_width = args.scale.width
    frame_height = args.scale.height
# get width after padding (rounded up to the next multiple of 8; a width
# that is already a multiple of 8 is left unchanged) and final frame size
frame_width = math.ceil(pre_pad_width / 8) * 8
frame_size = int(frame_width * frame_height / 8)
# determine sample and frame rates
sample_rate = args.sample_rate or source_sample_rate
frame_rate = args.frame_rate or source_frame_rate
# calculate new frame count
frame_count = half_round_up(source_frame_count * frame_rate / source_frame_rate)
# calculate audio chunk size
audio_chunk_size = (source_frame_count * sample_rate) / (source_frame_rate * frame_count)
# used to calculate which samples to drop to prevent desync
audio_sample_drop_rate = audio_chunk_size % 1
audio_chunk_size = int(audio_chunk_size)
# estimate the final size, used later to check for errors
estimated_file_size = int(
    (((frame_width * frame_height) / 8) + audio_chunk_size)
    * frame_count + len(BUNDLE_SIGNATURE) + 11)
# print final bundle info
if args.quiet < 1:
    print(textwrap.dedent(f'''\
        Frame rate: {float(frame_rate):g} fps
        Frame count: {frame_count:d} frames
        Video scale (before padding): {pre_pad_width:d}x{frame_height:d}
        Video scale (after padding): {frame_width:d}x{frame_height:d}
        Audio sample rate: {sample_rate:d} Hz
        Audio chunk size: {audio_chunk_size:d} bytes
        Estimated file size: {estimated_file_size:n} bytes
        '''))
    if frame_count > source_frame_count:
        print(f"{frame_count - source_frame_count:d} frames will be duplicated\n")
    if frame_count < source_frame_count:
        print(f"{source_frame_count - frame_count:d} frames will be dropped\n")
if args.quiet < 2:
    if frame_rate > 30:
        print("warning: frame rate is greater than maximum recommended 30 fps\n")
    if sample_rate > 48000:
        print("warning: sample rate is greater than maximum recommended 48 kHz\n")
# open the output file for writing
output = open(args.output, 'wb')
# specify the input file
input = ffmpeg.input(args.source)
audio_process = (
    input[str(audio_index)]
    # output raw 8-bit audio
    .output('pipe:',
            format='u8',
            acodec='pcm_u8',
            ac=1,
            ar=sample_rate)
    # only display errors
    .global_args('-v', 'error')
    .run_async(pipe_stdout=True)
)
scaled_video = (
    input[str(video_index)]
    # scale the video
    .filter('scale', pre_pad_width, frame_height)
    # convert to grayscale
    .filter('format', 'gray')
    # set the frame rate
    .filter('fps', frame_rate)
)
if args.threshold is not None:
    # convert to black and white with threshold
    video_input = scaled_video.filter('maskfun',
                                      low=args.threshold - 1,
                                      high=args.threshold - 1,
                                      sum=256,
                                      fill=255)
else:
    # the palette used for dithering
    palette = ffmpeg.filter([
        ffmpeg.input('color=c=black:r=1:d=1:s=8x16', f='lavfi'),
        ffmpeg.input('color=c=white:r=1:d=1:s=8x16', f='lavfi')
    ], 'hstack', 2)
    # convert to black and white with dithering
    if args.dither == 'bayer' and args.bayer_scale is not None:
        # if a bayer_scale was provided
        video_input = ffmpeg.filter([scaled_video, palette],
                                    'paletteuse',
                                    new='true',
                                    dither=args.dither,
                                    bayer_scale=args.bayer_scale)
    else:
        video_input = ffmpeg.filter([scaled_video, palette],
                                    'paletteuse',
                                    new='true',
                                    dither=args.dither)
video_process = (
    video_input
    # pad the width to make sure it is a multiple of 8
    .filter('pad', frame_width, frame_height, -1, 0, 'white')
    # output raw video data, one bit per pixel, inverted, and
    # disable dithering (we've already handled it)
    .output('pipe:',
            sws_dither='none',
            format='rawvideo',
            pix_fmt='monow')
    # only display errors
    .global_args('-v', 'error')
    .run_async(pipe_stdout=True)
)
# header format:
#   signature (char[7] / 7s): "BND!VID"
#   version (uint8 / B): 1
#   frame_count (uint32 / I)
#   audio_chunk_size (uint16 / H): sample_rate / frame_rate
#   sample_rate (uint16 / H)
#   frame_height (uint8 / B)
#   frame_width (uint8 / B)
header = struct.pack(f'<{len(BUNDLE_SIGNATURE):d}sBIHHBB',
                     BUNDLE_SIGNATURE.encode('utf8'),
                     BUNDLE_VERSION,
                     frame_count,
                     audio_chunk_size,
                     sample_rate,
                     frame_height,
                     frame_width)
# write the header to the file
output.write(header)
bytes_written = len(header)
# the number of audio samples that need to be dropped
drop_samples = audio_sample_drop_rate
dropped_samples = 0
for frame_num in range(1, frame_count + 1):
    # print current progress every 10 seconds of video
    if args.quiet < 1 and (
            frame_num % math.floor(frame_rate * 10) == 0 or
            frame_num == 1 or
            frame_num == frame_count):
        print(f"Processing frame {frame_num:>{len(str(frame_count))}d} / {frame_count:d}: {frame_num / frame_count:>7.2%}")
    # read a single frame and audio chunk
    frame = video_process.stdout.read(frame_size)
    audio_chunk = audio_process.stdout.read(audio_chunk_size)
    # reverse the bit-order of each byte in the frame
    frame_data = bytearray()
    for byte in frame:
        frame_data.append(int(f'{byte:08b}'[::-1], 2))
    # calculate and drop samples; prevents desync
    drop_samples += audio_sample_drop_rate
    audio_process.stdout.read(int(drop_samples))
    dropped_samples += int(drop_samples)
    drop_samples %= 1
    # write frame and audio data
    output.write(frame_data)
    output.write(audio_chunk)
    bytes_written += len(frame_data) + len(audio_chunk)
# close the file descriptor
output.close()
# wait for ffmpeg processes to finish
video_process.wait()
audio_process.wait()
if args.quiet < 1:
    print()
    if dropped_samples > 0:
        print(f"{dropped_samples:n} audio samples were dropped to prevent desync\n")
    print(f"{bytes_written:n} bytes written to {args.output}\n")
if args.quiet < 2:
    if bytes_written != estimated_file_size:
        print("warning: number of bytes written does not match estimated file size, something may have gone wrong\n")
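
For anyone who wants to sanity-check a bundle produced by the script, here is a minimal sketch of reading the header back. The field order and sizes come straight from the `struct.pack` call above; the helper name `parse_bundle_header` and the returned dict keys are my own assumptions and are not part of the gist or the player.

```python
import struct

# Same layout as the converter's struct.pack call: little-endian,
# 7-byte signature, then B, I, H, H, B, B (18 bytes in total).
HEADER_FORMAT = '<7sBIHHBB'
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)


def parse_bundle_header(data):
    """Unpack the leading header bytes of a bundle (hypothetical helper)."""
    if len(data) < HEADER_SIZE:
        raise ValueError("not enough data for a bundle header")
    (signature, version, frame_count, audio_chunk_size,
     sample_rate, frame_height, frame_width) = struct.unpack(
        HEADER_FORMAT, data[:HEADER_SIZE])
    if signature != b'BND!VID':
        raise ValueError("missing BND!VID signature")
    return {
        'version': version,
        'frame_count': frame_count,
        'audio_chunk_size': audio_chunk_size,
        'sample_rate': sample_rate,
        'frame_height': frame_height,
        'frame_width': frame_width,
    }
```

After the header, each frame record should be `frame_width * frame_height // 8` bytes of video followed by `audio_chunk_size` bytes of audio, which is what the write loop above emits.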
@JacobTDC
Author

Replying to @jose1711:

> please add error handling for videos which contain no audio. thanks.

So, because of the way the video player works (it's basically just a WAV player that also displays pictures), it requires an audio stream. The best I can do is have it throw an error if no audio stream is detected, or possibly generate an empty audio stream, but for now I'll probably just do the former.
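
A rough sketch of what the "generate an empty audio stream" option could look like, in case it is useful later. It assumes the silent data would be substituted for the chunk that the script normally reads from ffmpeg's `pcm_u8` output; the helper name `silent_audio_chunk` is hypothetical and nothing like it exists in the script yet.

```python
def silent_audio_chunk(chunk_size):
    """Return chunk_size bytes of silence for unsigned 8-bit PCM (hypothetical helper)."""
    # Unsigned 8-bit PCM is centred on 128, so silence is 0x80, not 0x00.
    return bytes([0x80]) * chunk_size
```

A source without audio would also need a sample rate chosen by the user or a fixed default, since there is no source sample rate to fall back on.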

> some mkv videos may contain png images (thumbnails) while codec_type is set to video. example:
>
> ...
>
> the script should either detect and ignore some streams as for instance this is what i get in r_frame_rate field for such stream: 90000/1.

I'll see what I can do about that. It shouldn't be too hard to just ignore a stream, right?

@jose1711

Thank you.

> It shouldn't be too hard to just ignore a stream, right?

Yeah, perhaps.. if there are multiple streams, then take the first audio and first video one - although it may still have some caveats such as if the one with the thumbnail happens to be the first one.

@JacobTDC
Author

JacobTDC commented Mar 1, 2024

> > It shouldn't be too hard to just ignore a stream, right?
>
> Yeah, perhaps.. if there are multiple streams, then take the first audio and first video one - although it may still have some caveats such as if the one with the thumbnail happens to be the first one.

It looks like there is always a default video and audio stream, so I should be able to just select those. I'm working on it now.
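
Roughly, selecting the default streams could be sketched like this. It works on the `streams` list from ffprobe's JSON output; `pick_default_stream` is a hypothetical name, and the extra check on the `attached_pic` disposition flag is an assumption about how thumbnail streams show up, not something confirmed against the sample file from this thread.

```python
def pick_default_stream(streams, codec_type):
    """Return the index of the first default stream of the given type, or None."""
    for stream in streams:
        if stream.get('codec_type') != codec_type:
            continue
        disposition = stream.get('disposition', {})
        # Assumption: cover art and thumbnails carry the attached_pic flag,
        # so they are skipped even if they are also marked as default.
        if disposition.get('attached_pic'):
            continue
        if disposition.get('default'):
            return stream['index']
    return None


# Example with a thumbnail listed before the real video stream.
example_streams = [
    {'index': 0, 'codec_type': 'video',
     'disposition': {'default': 1, 'attached_pic': 1}},
    {'index': 1, 'codec_type': 'video',
     'disposition': {'default': 1, 'attached_pic': 0}},
    {'index': 2, 'codec_type': 'audio',
     'disposition': {'default': 1, 'attached_pic': 0}},
]
video_index = pick_default_stream(example_streams, 'video')
audio_index = pick_default_stream(example_streams, 'audio')
```

Returning `None` rather than raising lets the caller report a missing stream through `parser.error`, as the script already does.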

@JacobTDC
Author

JacobTDC commented Mar 1, 2024

Changes

As requested by @jose1711, the script now uses the default streams for files that have multiple audio or video streams. If you find any issues with the implementation, please let me know, and feel free to provide a sample file that replicates the issue.

I also removed the loudnorm filter, as it lowers the audio too much for some videos.

Next on my to-do list is to add an option to select which streams the user would like to encode.
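
As a starting point, the stream selection option might look something like the sketch below. The flag names `--video-stream` and `--audio-stream` and the helper `resolve_stream` are placeholders I made up for illustration; none of this is implemented in the script.

```python
import argparse


def build_stream_parser():
    """Build a parser with hypothetical stream selection options."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('--video-stream',
                        type=int,
                        dest='video_stream',
                        default=None,
                        help="index of the video stream to encode; defaults to the default stream")
    parser.add_argument('--audio-stream',
                        type=int,
                        dest='audio_stream',
                        default=None,
                        help="index of the audio stream to encode; defaults to the default stream")
    return parser


def resolve_stream(requested, default_index):
    """Prefer the user's choice, falling back to the detected default stream."""
    # Compare against None explicitly: stream index 0 is valid but falsy.
    return default_index if requested is None else requested
```

The explicit `is None` check matters here because `requested or default_index` would silently ignore a request for stream 0.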
