Skip to content

Instantly share code, notes, and snippets.

@pedrovhb
Last active March 16, 2023 18:40
Show Gist options
  • Save pedrovhb/a8d8249a0008b3a0a80aff32f943d729 to your computer and use it in GitHub Desktop.
Save pedrovhb/a8d8249a0008b3a0a80aff32f943d729 to your computer and use it in GitHub Desktop.
from __future__ import annotations
from dataclasses import dataclass
from typing import Tuple, List, Union
import cairo
from abc import ABC, abstractmethod
@dataclass
class HasPosition(ABC):
x: float
y: float
@dataclass(kw_only=True)
class Shape(HasPosition, ABC):
angle: float = 0
scale: float = 1
color: Tuple[float, float, float] = (0, 0, 0)
pivot_x: float = 0
pivot_y: float = 0
layer: int = 0
@abstractmethod
def draw(self, ctx: cairo.Context):
pass
def translate(self, dx: float, dy: float):
self.x += dx
self.y += dy
def rotate(self, d_angle: float):
self.angle += d_angle
def set_scale(self, scale_x: float, scale_y: float = None):
if scale_y is None:
scale_y = scale_x
self.scale *= scale_x
self.scale *= scale_y
@dataclass
class Group(Shape):
elements: List[Union[Shape, 'Group']] = None
def __post_init__(self):
if self.elements is None:
self.elements = []
def add_element(self, element: Union[Shape, 'Group']):
self.elements.append(element)
def draw(self, ctx: cairo.Context):
ctx.save()
ctx.translate(self.x, self.y)
ctx.rotate(self.angle)
ctx.scale(self.scale, self.scale)
for element in self.elements:
element.draw(ctx)
ctx.restore()
@dataclass
class Circle(Shape):
radius: float
def draw(self, ctx: cairo.Context):
ctx.save()
ctx.translate(self.x, self.y)
ctx.rotate(self.angle)
ctx.scale(self.scale, self.scale)
ctx.set_source_rgb(*self.color)
ctx.arc(self.pivot_x, self.pivot_y, self.radius, 0, 2 * 3.14159)
ctx.fill()
ctx.restore()
@dataclass
class Rectangle(Shape):
width: float
height: float
def draw(self, ctx: cairo.Context):
ctx.save()
ctx.translate(self.x, self.y)
ctx.rotate(self.angle)
ctx.scale(self.scale, self.scale)
ctx.set_source_rgb(*self.color)
ctx.rectangle(self.pivot_x, self.pivot_y, self.width, self.height)
ctx.fill()
ctx.restore()
@dataclass
class Arrow(Shape):
length: float
def draw(self, ctx: cairo.Context):
ctx.set_source_rgb(*self.color)
ctx.save()
ctx.translate(self.x, self.y)
ctx.rotate(self.angle)
ctx.scale(self.scale, self.scale)
ctx.move_to(0, 0)
ctx.line_to(self.length, 0)
ctx.line_to(self.length - 10, -5)
ctx.move_to(self.length, 0)
ctx.line_to(self.length - 10, 5)
ctx.set_line_width(2)
ctx.stroke()
ctx.restore()
class Canvas:
def __init__(self, width: int, height: int):
self.width = width
self.height = height
self.surface = cairo.ImageSurface(cairo.FORMAT_RGB24, width, height)
self.ctx = cairo.Context(self.surface)
self.shapes: List[Shape] = []
def add_shape(self, shape: Shape):
self.shapes.append(shape)
def draw(self):
sorted_shapes = sorted(self.shapes, key=lambda shape: shape.layer)
for shape in sorted_shapes:
shape.draw(self.ctx)
def get_rgb24_blob(self) -> bytes:
self.draw()
self.surface.flush()
return self.surface.get_data()
# def chunker():
# for i in range(len(data)//4):
# chunk = data[i*4:i*4+3]
# yield chunk
# return b"".join(chunker())
def write_to_png(self) -> None:
self.draw()
self.surface.flush()
self.surface.write_to_png('shapes.png')
canvas = Canvas(640, 480)
circle = Circle(100, 100, 30, color=(0, 0, 1))
rectangle = Rectangle(200, 200, 100, 50, color=(0.117, 0.117, 1))
arrow = Arrow(300, 300, 50, angle=3.14159 * 0.5, color=(1, 0, 0))
canvas.add_shape(circle)
canvas.add_shape(rectangle)
canvas.add_shape(arrow)
rgb24_blob = canvas.get_rgb24_blob()
# Save the blob to a file
with open('output.rgb24', 'wb') as f:
f.write(rgb24_blob)
canvas.write_to_png()
import time
# n_runs = 1000
# t = time.perf_counter()
#
# with open("t.rgb", "wb") as fd:
# for i in range(n_runs):
# canvas = Canvas(640, 480)
#
# circle = Circle(100, i/500, 30, color=(0, 0, 1), layer=1)
# rectangle = Rectangle(200, 200, 100, 50, color=(0.117, 0.117, 1), layer=2)
#
# group = Group(i, 300)
# group.add_element(Circle(0, 0, 30, color=(0, 1, 1)))
# group.add_element(Circle(0, 100, 30, color=(1, 1, 0)))
# group.add_element(Circle(0, 200, 30, color=(0.5, 1, 0)))
# group2 = Group(300, 300)
# group2.add_element(Circle(0, 0, 30, color=(i/1000, 1, 1)))
# group2.add_element(Circle(0, 100, 20, color=(1, 1, 0)))
# group2.add_element(Circle(0, 200, 50, color=(0.5, 1, 0)))
#
# group2.rotate(-0.3)
# group2.set_scale(0.5*((10+i)/500))
#
# canvas.add_shape(group)
# canvas.add_shape(group2)
# group.rotate(3)
# blob = canvas.get_rgb24_blob()
# fd.write(blob)
# print(i)
# t2 = time.perf_counter()
# print(f"{(t2 - t) / n_runs:.5f} secs per run")
# # then - ffmpeg -y -pix_fmt rgb0 -format rawvideo -s 640x480 -i t.rgb out.mp4
#
# canvas.write_to_png()
import asyncio
import shlex
class FFmpegEncoder:
def __init__(self, output_file_path: str, width: int, height: int, pix_fmt: str = "rgb0"):
self.output_file_path = output_file_path
self.width = width
self.height = height
self.pix_fmt = pix_fmt
self.process = None
async def _initialize(self):
loop = asyncio.get_running_loop()
cmd = (
f"ffmpeg -y -f rawvideo -pix_fmt {self.pix_fmt} -s {self.width}x{self.height} -i - "
f"-c:v libx264 -pix_fmt yuv420p {shlex.quote(self.output_file_path)}"
)
self.process = await asyncio.create_subprocess_exec(
*shlex.split(cmd),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
# loop=loop,
)
async def encode_frame(self, frame: bytes):
if self.process is None:
await self._initialize()
if self.process.stdin:
self.process.stdin.write(frame)
await self.process.stdin.drain()
async def finish_encoding(self):
if self.process is None:
return
if self.process.stdin:
self.process.stdin.close()
await self.process.wait()
async def main(output_file_path: str):
# Initialize the FFmpegEncoder instance with the same width and height as the input video
encoder = FFmpegEncoder(output_file_path, 640, 480)
n_runs = 1000
t = time.perf_counter()
queue = asyncio.Queue()
async def do_encode():
while True:
frame = await queue.get()
if frame is None:
break
else:
await encoder.encode_frame(frame)
encode_task = asyncio.create_task(do_encode())
for i in range(n_runs):
canvas = Canvas(640, 480)
circle = Circle(100, i/500, 30, color=(0, 0, 1), layer=1)
rectangle = Rectangle(200, 200, 100, 50, color=(0.117, 0.117, 1), layer=2)
group = Group(i, 300)
group.add_element(Circle(0, 0, 30, color=(0, 1, 1)))
group.add_element(Circle(0, 100, 30, color=(1, 1, 0)))
group.add_element(Circle(0, 200, 30, color=(0.5, 1, 0)))
group2 = Group(300, 300)
group2.add_element(Circle(0, 0, 30, color=(i/1000, 1, 1)))
group2.add_element(Circle(0, 100, 20, color=(1, 1, 0)))
group2.add_element(Circle(0, 200, 50, color=(0.5, 1, 0)))
group2.rotate(-0.3)
group2.set_scale(0.5*((10+i)/500))
canvas.add_shape(group)
canvas.add_shape(group2)
group.rotate(3)
blob = canvas.get_rgb24_blob()
await queue.put(blob)
await queue.put(None)
await encode_task
t2 = time.perf_counter()
print(f"{(t2 - t) / n_runs:.5f} secs per run")
if __name__ == "__main__":
output_file_path = "file.mp4"
asyncio.run(main(output_file_path))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment