Skip to content

Instantly share code, notes, and snippets.

@kbeckmann
Last active January 3, 2024 13:59
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save kbeckmann/41254cc559ee4917913e522cc529a4e5 to your computer and use it in GitHub Desktop.
Save kbeckmann/41254cc559ee4917913e522cc529a4e5 to your computer and use it in GitHub Desktop.
import time
import psutil
import multiprocessing as mp
from multiprocessing import Process
def f(thread, duty, freq, q):
    """Generate a PWM-style CPU load on a single logical CPU, forever.

    The calling process is pinned to CPU ``thread``. Then, once per period
    of ``1 / freq`` seconds, it busy-waits for ``duty / freq`` seconds and
    sleeps for whatever is left of the period.

    Args:
        thread: Index of the logical CPU to pin this process to.
        duty: Initial fraction of each period to spend busy-waiting.
        freq: Number of periods per second.
        q: Queue delivering new duty values. Every pending value is
            converted with ``float()``; the most recent one wins.

    This function never returns; the caller is expected to kill the process.
    """
    psutil.Process().cpu_affinity([thread])
    period = 1 / freq

    while True:
        # Drain everything that is pending so only the newest duty applies.
        while not q.empty():
            duty = float(q.get())

        # Use the monotonic clock: wall-clock adjustments (NTP, manual
        # changes) must not stretch or shrink the busy/sleep intervals.
        start = time.monotonic()
        busy_time = duty / freq

        # Deliberate spin: burning CPU is the purpose of this worker.
        while time.monotonic() - start < busy_time:
            pass

        # Sleep for the rest of the period, if the busy part left any.
        remaining = start + period - time.monotonic()
        if remaining > 0:
            time.sleep(remaining)
if __name__ == '__main__':
    # Play a raw video as per-CPU load: one worker process is started per
    # CPU in this process's affinity list, and each frame sets every
    # worker's duty cycle from one pixel of that frame.
    ctx = mp.get_context('spawn')

    # Video geometry and timing; these must match the ffmpeg command below.
    bpp = 4  # bytes per pixel (rgb32)
    width = 16
    height = 4
    fps = 2.0
    gain = 1.0

    # ffmpeg -i input.mp4 -vf scale=16:4,hue=s=0 -r 2 -c:v rawvideo -pix_fmt rgb32 out.rgb
    # scale=width:height
    # -r : fps
    # Load the video before starting any worker, so a missing or unreadable
    # file fails fast instead of leaving busy-looping processes behind.
    with open("out.rgb", "rb") as video_file:
        video = video_file.read()

    frame_size = bpp * width * height
    frames = len(video) // frame_size

    threads = psutil.Process().cpu_affinity()
    workers = []
    try:
        for thread in threads:
            q = ctx.Queue()
            duty = 1
            # Uncomment to fade from 0 to 100% from left to right
            # duty = (thread % width) / (width - 1)
            # Create the process from the same context as its queue.
            proc = ctx.Process(target=f, args=(thread, duty, fps, q))
            proc.start()
            workers.append({"p": proc, "q": q})

        # Only the first width * height workers have a pixel to follow; on
        # machines with more CPUs the rest keep their initial duty. Without
        # this limit the index below would run past the end of the frame.
        pixel_workers = workers[:width * height]

        for frame in range(frames):
            frame_start = frame * frame_size
            for j, worker in enumerate(pixel_workers):
                # Only the first byte of each pixel is sampled; presumably
                # all colour channels are equal because of hue=s=0 above.
                pixel = video[frame_start + j * bpp] / 256. * gain
                # Duty is inverted: the brighter the pixel, the lower the load.
                worker["q"].put(1.0 - pixel)
            time.sleep(1 / fps)
    finally:
        # Workers loop forever, so always stop and reap them, even when
        # playback is interrupted or fails.
        for worker in workers:
            worker["p"].kill()
            worker["p"].join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment