Skip to content

Instantly share code, notes, and snippets.

@ChillFish8
Last active July 28, 2023 22:35
Show Gist options
  • Save ChillFish8/01983d43956fe95cb5b0c819e8bfcc37 to your computer and use it in GitHub Desktop.
Save ChillFish8/01983d43956fe95cb5b0c819e8bfcc37 to your computer and use it in GitHub Desktop.
Smoother
import asyncio
import os
import shutil
import time
from queue import Empty, Queue
from threading import Thread

# pygame prints its support banner while it is being imported, so the opt-out
# variable has to be in the environment before the import below runs.
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"

import cv2
import numpy
import pygame
# Seconds between two consecutive frame deadlines (one thirtieth of a second).
INTERVAL = 1.0 / 30.0
def stream_frames(queue: Queue):
    """Decode ``./video.webm`` into ASCII frames and push them onto *queue*.

    Every decoded frame is shrunk to the current terminal size, reduced to its
    first colour channel and mapped to characters with ``collect_ascii``.
    A final ``None`` is always enqueued so the consumer can tell that the
    stream is over, whether decoding finished normally or failed part-way.

    Audio is deliberately not touched here: ``play()`` initialises the mixer
    and loads the track on the main thread, and repeating that from this
    worker thread would only race with it.

    Args:
        queue: Thread-safe queue receiving one character array per frame,
            followed by a single ``None`` sentinel. ``put`` blocks while the
            queue is full, which throttles decoding to the consumer's pace.
    """
    cap = cv2.VideoCapture('./video.webm')
    # One query gives a consistent (columns, lines) pair even if the terminal
    # is resized while we are starting up.
    width, height = shutil.get_terminal_size()
    try:
        while True:
            success, image = cap.read()
            if not success:
                # Normal end of stream (or unreadable file): nothing to resize.
                break
            resized_image = cv2.resize(
                image, (width, height), interpolation=cv2.INTER_NEAREST
            )
            # Every third value of the flattened pixel data, i.e. the first
            # channel of each pixel (assumes 3-channel frames).
            queue.put(collect_ascii(resized_image.ravel()[0::3]))
    except Exception as e:
        # Best effort, as before: report the problem and end the stream
        # instead of killing the thread without notifying the consumer.
        print(e)
    finally:
        cap.release()
        queue.put(None)
class FrameRenderer:
    """Draw queued ASCII frames on the terminal at a fixed cadence.

    Frames are taken from a thread-safe queue; a ``None`` item marks the end
    of the stream. Each call to :meth:`step` schedules the following call on
    the event loop ``INTERVAL`` seconds after the deadline it was given, so
    the time spent drawing a frame does not push later deadlines back.

    Attributes:
        frames: Queue the frames are read from.
        loop: Event loop that was running when the renderer was created.
        complete: Future resolved with ``None`` once playback is over.
        counter: Number of times :meth:`step` has run.
    """

    def __init__(self, queue: Queue, fut: asyncio.Future):
        """Bind the renderer to *queue*, *fut* and the running event loop.

        Must be called while an event loop is running.
        """
        self.frames = queue
        self.loop = asyncio.get_running_loop()
        self.complete = fut
        self.counter = 0

    def begin(self, trigger_at: float):
        """Start the soundtrack and render the first frame.

        Args:
            trigger_at: Loop-clock deadline this call was scheduled for; it
                is the base from which later frame deadlines are derived.
        """
        pygame.mixer.music.play()
        self.step(trigger_at)

    def step(self, trigger_at: float):
        """Render one frame and schedule the next step.

        If the producer has not delivered a frame yet, the screen is left
        untouched and the next step is still scheduled, so playback resumes
        as soon as frames arrive instead of stalling for good.

        Args:
            trigger_at: Loop-clock deadline this call was scheduled for.

        Raises:
            SystemExit: After a ``KeyboardInterrupt`` during rendering, once
                the music has been stopped and the future resolved.
        """
        try:
            self.counter += 1
            try:
                frame = self.frames.get_nowait()
            except Empty:
                # Decoder is behind: skip this tick but keep the chain alive.
                self._schedule_next(trigger_at)
                return
            if frame is None:
                # End-of-stream sentinel from the producer.
                self._finish()
                return
            # "cls" only exists on Windows; use "clear" everywhere else.
            os.system("cls" if os.name == "nt" else "clear")
            # Schedule before printing so a slow terminal cannot delay the
            # deadline of the following frame.
            self._schedule_next(trigger_at)
            # Reinterpret the array of 1-character strings as one string of
            # ``frame.size`` characters, without a Python-level join.
            printed = frame.view(dtype=f'U{frame.size}').item()
            print(printed, flush=True)
        except KeyboardInterrupt:
            pygame.mixer.music.stop()
            self._finish()
            # Same effect as the site-provided exit(), without needing it.
            raise SystemExit from None

    def _schedule_next(self, trigger_at: float):
        """Schedule :meth:`step` one ``INTERVAL`` after *trigger_at*."""
        next_trigger_at = trigger_at + INTERVAL
        self.loop.call_at(next_trigger_at, self.step, next_trigger_at)

    def _finish(self):
        """Resolve the completion future unless it is already done."""
        if not self.complete.done():
            self.complete.set_result(None)
async def play():
    """Play the video as ASCII art in the terminal together with its audio.

    A background thread decodes frames into a bounded queue while this
    coroutine prints a three-second countdown, then hands control to a
    ``FrameRenderer`` and waits until it reports that playback is over.
    """
    # Bounded buffer: 30 * 5 frames, i.e. 5 seconds of leeway at 30 fps.
    queue = Queue(maxsize=30 * 5)
    # Daemon thread: if playback is aborted, the producer may be blocked on a
    # full queue and must not keep the interpreter from exiting.
    t = Thread(target=stream_frames, args=(queue,), daemon=True)
    t.start()

    pygame.mixer.init()
    pygame.mixer.music.load("./badapple.mp3")

    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    renderer = FrameRenderer(queue, fut)

    # call_at() deadlines are measured on the loop's own clock (loop.time());
    # that clock is not guaranteed to share an epoch with
    # time.perf_counter(), so deadlines must be derived from loop.time().
    trigger_at = loop.time() + 3
    loop.call_at(trigger_at - 3, print, "Starting in 3")
    loop.call_at(trigger_at - 2, print, "Starting in 2")
    loop.call_at(trigger_at - 1, print, "Starting in 1")
    loop.call_at(trigger_at, renderer.begin, trigger_at)

    await fut
def _build_ascii_lookup():
    """Build the 256-entry brightness-to-character table used by collect_ascii."""
    table = numpy.empty(256, dtype=numpy.dtype('U1'))
    table[0:25] = " "
    table[25:100] = "+"
    table[100:125] = "#"
    table[125:200] = "@"
    table[200:] = "M"
    # Shared between calls, so guard against accidental in-place edits.
    table.setflags(write=False)
    return table


# Built once at import time instead of once per frame.
_ASCII_LOOKUP = _build_ascii_lookup()


def collect_ascii(data):
    """Map 8-bit intensity values to ASCII characters.

    Values 0-24 become ``" "``, 25-99 ``"+"``, 100-124 ``"#"``, 125-199
    ``"@"`` and 200-255 ``"M"``.

    Args:
        data: Array-like of values convertible to ``uint8``.

    Returns:
        A new ``numpy`` array of dtype ``U1`` with the same shape as *data*,
        holding one character per input value.
    """
    # asarray avoids copying input that is already a uint8 array; the fancy
    # index below always allocates a fresh, writable result.
    arr = numpy.asarray(data, dtype=numpy.uint8)
    return _ASCII_LOOKUP[arr]
if __name__ == "__main__":
    try:
        asyncio.run(play())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop playback early; exit quietly
        # instead of dumping a traceback.
        pass
    finally:
        # music.stop() raises when the mixer was never initialised (for
        # example if start-up failed early), which would hide the real
        # error; only stop the music if there is a mixer to talk to.
        if pygame.mixer.get_init():
            pygame.mixer.music.stop()
        pygame.mixer.quit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment