Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import time
import psutil
import multiprocessing as mp
from multiprocessing import Process
def f(thread, duty, freq, q):
p = psutil.Process()
p.cpu_affinity([thread])
while True:
while not q.empty():
duty = float(q.get())
t0 = time.time()
while time.time() - t0 < duty / freq:
pass
t1 = time.time()
dest = t0 + 1 / freq
delay = dest - t1
if delay > 0:
time.sleep(delay)
if __name__ == '__main__':
p = psutil.Process()
threads = p.cpu_affinity()
workers = []
ctx = mp.get_context('spawn')
bpp = 4
width = 16
height = 4
fps = 2.0
gain = 1.0
for thread in threads:
q = ctx.Queue()
duty = 1
# Uncomment to fade from 0 to 100% from left to right
# duty = (thread % width) / (width - 1)
p = Process(target=f, args=(thread, duty, fps, q))
p.start()
workers.append({"p": p, "q": q})
# ffmpeg -i input.mp4 -vf scale=16:4,hue=s=0 -r 2 -c:v rawvideo -pix_fmt rgb32 out.rgb
# scale=width:height
# -r : fps
f = open("out.rgb", "rb")
video = f.read()
f.close()
frames = len(video) // (bpp * width * height)
for frame in range(frames):
for j, worker in enumerate(workers):
pixel = video[frame * bpp * width * height + j * bpp] / 256. * gain
worker["q"].put(1.0 - pixel)
time.sleep(1 / fps)
for worker in workers:
worker["p"].kill()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.