Skip to content

Instantly share code, notes, and snippets.

@jb3
Last active April 26, 2022 22:31
Show Gist options
  • Save jb3/a4bc32d237975cd78cedc7e2ab594ae5 to your computer and use it in GitHub Desktop.
Save jb3/a4bc32d237975cd78cedc7e2ab594ae5 to your computer and use it in GitHub Desktop.
Crash file detector for Discord
import asyncio
import sys
import pprint
# Event loop that drives the whole script. It is created explicitly because
# calling asyncio.get_event_loop() with no running loop relies on implicit
# loop creation, which is deprecated and is an error on the newest Python
# releases. Registering it as the current loop keeps older asyncio helpers
# that look up "the current loop" working.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# List of files to check
FILES = [
    "/Users/joseph/Movies/coursework_update.mp4",
    "/Users/joseph/Movies/coursework_video.mp4",
    "/Users/joseph/Movies/2020-06-09 18-34-35.mkv",
    "/Users/joseph/video.mp4",
]
def _split_frame_fields(raw_line):
    """Extract ``(width, height, pix_fmt)`` from one ffprobe CSV row.

    Returns ``None`` for rows that cannot be a frame record (blank lines or
    an unexpected number of columns) so the caller can skip them instead of
    failing on tuple unpacking.
    """
    text = raw_line.decode(errors="replace").strip()
    if not text:
        return None
    fields = text.split(",")
    if len(fields) == 4:
        # time, width, height, pix_fmt -- the layout requested from ffprobe.
        return tuple(fields[1:])
    if len(fields) == 3:
        # Same layout without the leading timestamp column.
        return tuple(fields)
    return None


async def task(path, *, ffprobe="/usr/local/bin/ffprobe"):
    """Scan every video frame of *path* for mid-stream size/format changes.

    A video whose frame width, height or pixel format changes part-way
    through is reported as unsafe (such files are intended to crash the
    Discord client).

    Args:
        path: Path of the video file handed to ffprobe.
        ffprobe: Path of the ffprobe executable to run. Defaults to the
            location this script has always used.

    Returns:
        A dict with the keys ``safe`` (bool), ``filename`` (the given path),
        ``scanned`` (number of frame rows parsed), ``dimensions``
        (``[width, height]`` of the first frame, as the strings ffprobe
        printed, or ``[None, None]`` if no frame was read) and ``format``
        (pixel format of the first frame, or ``None``).
    """
    proc = await asyncio.create_subprocess_exec(
        # Call ffprobe, a video probing tool
        ffprobe,
        # Show errors
        "-v", "error",
        # Return each frame as a CSV entry including the time, width,
        # height and pixel format
        "-show_entries", "frame=pkt_pts_time,width,height,pix_fmt",
        # Only process video
        "-select_streams", "v",
        # Set the output format to CSV
        "-of", "csv=p=0",
        # Provide the path to the video
        path,
        stdout=asyncio.subprocess.PIPE,
    )

    # Width, height and pixel format of the first frame; every later frame
    # is compared against these reference values.
    w, h, fmt = None, None, None
    scanned_frames = 0
    safe = True

    # Iterate over every frame row ffprobe prints
    async for raw_line in proc.stdout:
        parsed = _split_frame_fields(raw_line)
        if parsed is None:
            # Not a frame record: skip it rather than abort the whole scan.
            continue
        frame_w, frame_h, frame_fmt = parsed
        scanned_frames += 1

        # First frame seen: remember its properties as the reference.
        if w is None:
            w, h, fmt = frame_w, frame_h, frame_fmt

        # If the height, width or pixel format differ from the reference
        # frame then the video is unsafe.
        if w != frame_w or h != frame_h or fmt != frame_fmt:
            safe = False
            print(f"ERR! Frame is different size/format in {path}")

    # Wait for the process to exit
    await proc.wait()

    # Return some video metadata and whether it is safe or not
    return {
        "safe": safe,
        "filename": path,
        "scanned": scanned_frames,
        "dimensions": [w, h],
        "format": fmt,
    }
async def main():
    """Probe every path in ``FILES`` concurrently and pretty-print each report.

    One ``task`` coroutine is started per file; the reports are printed in
    the same order as ``FILES`` once all of them have finished.
    """
    # gather() schedules each coroutine as a task on the loop that is
    # running main(), so this works no matter which loop drives it and does
    # not depend on a module-level loop object.
    results = await asyncio.gather(*(task(path) for path in FILES))

    # Print out each result in a pretty format
    for result in results:
        pprint.pprint(result)
# Start the program: drive main() to completion, then close the event loop
# so its resources are released even if a scan raised.
try:
    loop.run_until_complete(main())
finally:
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment