Skip to content

Instantly share code, notes, and snippets.

@RedBlaze42
Created April 28, 2024 13:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RedBlaze42/10c29a02475d6d510e0c6d2a68608615 to your computer and use it in GitHub Desktop.
Save RedBlaze42/10c29a02475d6d510e0c6d2a68608615 to your computer and use it in GitHub Desktop.
import subprocess
import io
import cv2
class FfmpegVideo:
    """Encode a stream of JPEG frames into an H.264 video through ffmpeg.

    An ``ffmpeg`` child process is started on construction. Frames are written
    to its standard input as JPEG data (``image2pipe`` / ``mjpeg`` input) and
    encoded with ``libx264`` at 10 Mbit/s using the ``ultrafast`` preset.
    Call :meth:`stop` (or use the instance as a context manager) to finish
    the file.

    Attributes:
        output_path: Destination file handed to ffmpeg.
        closed: True once no more frames are accepted, either because
            :meth:`stop` was called or because the pipe to ffmpeg broke.
        ffmpeg_proc: The ``subprocess.Popen`` handle of the encoder.
    """

    def __init__(self, output_path, framerate=30):
        """Start the ffmpeg encoder process.

        Args:
            output_path: File ffmpeg writes to; an existing file is
                overwritten (``-y``). The path is passed to ffmpeg verbatim,
                without any shell interpretation.
            framerate: Frame rate used for both the input stream and the
                output video.

        Raises:
            OSError: If the ``ffmpeg`` executable cannot be started.
        """
        self.output_path = output_path
        self.closed = False
        # An argument list (no shell) keeps paths containing spaces or shell
        # metacharacters intact and prevents command injection.
        self.ffmpeg_proc = subprocess.Popen(
            self._build_command(output_path, framerate),
            stdin=subprocess.PIPE,
        )

    def __enter__(self):
        """Return the writer itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Finish the video when leaving the ``with`` block."""
        self.stop()

    @staticmethod
    def _build_command(output_path, framerate):
        """Return the ffmpeg argument list for the given output and rate."""
        rate = str(framerate)
        return [
            "ffmpeg",
            "-y",
            "-loglevel", "error",
            "-framerate", rate,
            "-f", "image2pipe",
            "-c:v", "mjpeg",
            "-i", "-",
            "-c:v", "libx264",
            "-b:v", "10M",
            "-preset", "ultrafast",
            "-r", rate,
            str(output_path),
        ]

    def add_image(self, data):
        """Append one JPEG-encoded frame.

        Frames are silently ignored once the writer is closed. If the pipe to
        ffmpeg is broken, a message is printed and the writer is marked as
        closed; :meth:`stop` should still be called to reap the process.

        Args:
            data: Bytes-like object holding a complete JPEG image.
        """
        if self.closed:
            return
        try:
            self.ffmpeg_proc.stdin.write(data)
        except (ValueError, BrokenPipeError):
            print(f"Broken pipe {self.output_path}")
            self.closed = True

    def add_pil_image(self, pil_image):
        """Append a frame given as an object with a PIL-style ``save`` method.

        Args:
            pil_image: Image object; it is serialised with
                ``pil_image.save(buffer, format="JPEG")``.
        """
        with io.BytesIO() as output:
            pil_image.save(output, format="JPEG")
            self.add_image(output.getvalue())

    def add_opencv_image(self, opencv_image):
        """Append a frame given as an OpenCV image array.

        Args:
            opencv_image: Image accepted by ``cv2.imencode``.

        Raises:
            ValueError: If OpenCV reports that JPEG encoding failed.
        """
        ok, buffer = cv2.imencode(".jpg", opencv_image)
        if not ok:
            raise ValueError("cv2.imencode failed to encode the frame as JPEG")
        self.add_image(buffer.tobytes())

    def stop(self):
        """Close ffmpeg's input and wait for the encoder to exit.

        Safe to call more than once. The process is also reaped when the
        writer was already marked closed by a broken pipe, so no child
        process is left behind.
        """
        self.closed = True
        stdin = self.ffmpeg_proc.stdin
        if stdin is not None and not stdin.closed:
            try:
                stdin.close()
            except (ValueError, BrokenPipeError):
                # ffmpeg already exited, so the final flush cannot be
                # delivered; the process still has to be waited on below.
                pass
        self.ffmpeg_proc.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment