Skip to content

Instantly share code, notes, and snippets.

@ChillFish8
Created July 28, 2023 22:05
Show Gist options
  • Save ChillFish8/1141d40252b770ca586271ce53277bd3 to your computer and use it in GitHub Desktop.
Save ChillFish8/1141d40252b770ca586271ce53277bd3 to your computer and use it in GitHub Desktop.
Fun
import asyncio
import os

# pygame prints its support banner while it is being imported, so the opt-out
# variable must already be in the environment before `import pygame` runs.
# Setting it after the import (as was done previously) has no effect.
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"

import shutil  # noqa: E402
import time  # noqa: E402
from queue import Queue  # noqa: E402
from threading import Thread  # noqa: E402

import cv2  # noqa: E402
import numpy  # noqa: E402
import pygame  # noqa: E402

# Seconds between two rendered frames (30 frames per second).
INTERVAL = 1/30
def stream_frames(queue: Queue):
    """Decode ``./video.webm`` and push one ASCII frame per video frame.

    Each decoded frame is resized to the terminal size (columns x lines),
    reduced to one value per pixel and converted to characters with
    ``collect_ascii``. A final ``None`` is always queued, whether decoding
    ended normally or with an error, so the consumer can tell that no more
    frames will arrive.

    Audio is not touched here: the mixer is initialised and the track loaded
    by ``play()`` before playback starts.

    Args:
        queue: Destination for the character arrays. ``put`` blocks while the
            queue is full, which throttles decoding.
    """
    cap = cv2.VideoCapture('./video.webm')
    try:
        # Sample the terminal once; every frame is rendered at this size.
        columns, lines = shutil.get_terminal_size()
        while True:
            success, image = cap.read()
            if not success:
                # End of the stream, or the file could not be opened/decoded.
                break
            resized_image = cv2.resize(
                image, (columns, lines), interpolation=cv2.INTER_NEAREST
            )
            # Every third value of the flattened frame, i.e. the first channel
            # of each pixel (assumes 3-channel frames from OpenCV).
            values = collect_ascii(resized_image.ravel()[0::3])
            queue.put(values)
    except Exception as e:
        # Best effort: report the problem and end the stream.
        print(e)
    finally:
        cap.release()
        queue.put(None)
class FrameRenderer:
    """Draws queued ASCII frames on the terminal, one every ``INTERVAL`` seconds.

    Frames are taken from a queue filled elsewhere; a ``None`` item marks the
    end of the stream and resolves the completion future. Scheduling is done
    with ``loop.call_at`` using absolute deadlines, so the time spent drawing a
    frame does not accumulate as drift.
    """

    def __init__(self, queue: Queue, fut: asyncio.Future):
        """Bind the renderer to its frame queue and completion future.

        Must be called while an event loop is running, because the running
        loop is captured for later scheduling.

        Args:
            queue: Source of frames (character arrays) ending with ``None``.
            fut: Future resolved with ``None`` once rendering is over.
        """
        self.frames = queue
        self.loop = asyncio.get_running_loop()
        self.complete = fut

    def begin(self, trigger_at: float):
        """Start the music and draw the first frame.

        Args:
            trigger_at: Deadline (on the loop's clock) this call was scheduled
                for; later frames are scheduled relative to it.
        """
        pygame.mixer.music.play()
        self.step(trigger_at)

    def step(self, trigger_at: float):
        """Draw the next frame, if any, and schedule the following step.

        Args:
            trigger_at: Deadline this step was scheduled for.

        Raises:
            SystemExit: If a ``KeyboardInterrupt`` arrives while drawing.
        """
        try:
            # This renderer is the only consumer of the queue in this file, so
            # checking empty() before get_nowait() is not racy.
            if self.frames.empty():
                # The producer has fallen behind. Try again on the next tick
                # instead of letting queue.Empty escape, which would stop the
                # rescheduling chain and leave the future pending forever.
                self._schedule_next(trigger_at)
                return

            frame = self.frames.get_nowait()
            if frame is None:
                self._finish()
                return

            # Reinterpret the array of single characters as one string.
            printed = frame.view(dtype=f'U{frame.size}').item()
            # Clear the screen and home the cursor with ANSI escapes rather
            # than spawning a `clear` process for every frame.
            print(f"\x1b[2J\x1b[H{printed}", flush=True)
            self._schedule_next(trigger_at)
        except KeyboardInterrupt:
            self._finish()
            raise SystemExit

    def _schedule_next(self, trigger_at: float):
        """Schedule ``step`` one frame interval after ``trigger_at``."""
        next_trigger_at = trigger_at + INTERVAL
        self.loop.call_at(next_trigger_at, self.step, next_trigger_at)

    def _finish(self):
        """Resolve the completion future unless it is already done."""
        if not self.complete.done():
            self.complete.set_result(None)
async def play():
    """Play the video as ASCII art in the terminal, with its soundtrack.

    Starts a background thread that decodes frames into a bounded queue,
    loads ``./badapple.mp3``, prints a three-second countdown and then hands
    over to a ``FrameRenderer``. Returns once the renderer resolves its
    completion future.
    """
    queue = Queue(maxsize=30 * 50)  # 50 seconds of leeway.
    # Daemon thread: if playback is aborted, a producer blocked on a full
    # queue must not keep the interpreter alive at shutdown.
    t = Thread(target=stream_frames, args=(queue,), daemon=True)
    t.start()

    pygame.mixer.init()
    pygame.mixer.music.load("./badapple.mp3")

    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    renderer = FrameRenderer(queue, fut)

    # call_at() deadlines are measured on the loop's own clock (loop.time()),
    # which is not guaranteed to share an origin with time.perf_counter().
    trigger_at = loop.time() + 3
    loop.call_at(trigger_at, renderer.begin, trigger_at)
    loop.call_soon(print, "Starting in 3")
    loop.call_at(trigger_at - 2, print, "Starting in 2")
    loop.call_at(trigger_at - 1, print, "Starting in 1")

    await fut
def collect_ascii(data):
    """Translate 8-bit brightness values into ASCII-art characters.

    Brightness bands and their characters:
    0-24 -> " ", 25-99 -> "+", 100-124 -> "#", 125-199 -> "@", 200-255 -> "M".

    Args:
        data: Values convertible to a ``uint8`` array.

    Returns:
        A ``U1`` array with the same shape as the converted input, holding
        one character per value.
    """
    levels = numpy.array(data, dtype=numpy.uint8)

    glyphs = numpy.array([" ", "+", "#", "@", "M"], dtype=numpy.dtype('U1'))
    # Number of consecutive brightness levels covered by each glyph; the
    # widths add up to 256, one table entry per possible uint8 value.
    band_widths = [25, 75, 25, 75, 56]
    palette = numpy.repeat(glyphs, band_widths)

    return palette[levels]
if __name__ == "__main__":
    # Script entry point: run the player coroutine on a fresh event loop and
    # block until playback has finished.
    player = play()
    asyncio.run(player)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment