Skip to content

Instantly share code, notes, and snippets.

@justfoolingaround
Created July 11, 2024 13:49
Show Gist options
  • Save justfoolingaround/2412b9df7494acd73488f3ddf8aa5d62 to your computer and use it in GitHub Desktop.
Save justfoolingaround/2412b9df7494acd73488f3ddf8aa5d62 to your computer and use it in GitHub Desktop.
A seekable OPUS discord.AudioSource sub-class
"""
Author: KR (@justfoolingaround)
A reasonably safe implementation of a discord.AudioSource that
buffers the transcoded Opus packets in memory so playback can seek.
Because the entire stream is held in memory, callers must reject
live streams and very large files before constructing the source
(see SIZE_UPPER_LIMIT and DURATION_UPPER_LIMIT).
"""
import json
import math
import subprocess
import threading
from functools import cached_property
from typing import List, Optional

from discord.oggparse import OggStream
from discord.player import AudioSource
class SeekableOpusSource(AudioSource):
    """An Opus ``AudioSource`` that buffers every packet in memory so it can seek.

    ffmpeg transcodes ``src`` to an Ogg/Opus stream on its stdout; a background
    thread splits that stream into packets and appends them to ``packets``.
    ``read()`` hands the packets out one at a time and ``seek()`` simply moves
    the read index, which is why the whole stream is kept in memory.

    Class attributes:
        SIZE_UPPER_LIMIT: largest accepted source size, in bytes.
        DURATION_UPPER_LIMIT: longest accepted source duration, in seconds.
        FRAME_SIZE: seconds of audio assumed per packet when converting
            between packet indexes and timestamps.
    """

    SIZE_UPPER_LIMIT = 50 * 1024 * 1024
    DURATION_UPPER_LIMIT = 2 * 60 * 60
    FRAME_SIZE = 20 / 1000

    def __init__(
        self,
        src: str,
        codec_name: str,
        codec_type: str,
        duration: float,
        size: int,
        bitrate: int,
        probe_score: int,
        *,
        target_bitrate: int = 128000,
        ffmpeg_options: Optional[List[str]] = None,
    ):
        """Validate the source limits and start transcoding/buffering.

        Args:
            src: input path or URL handed to ffmpeg's ``-i``.
            codec_name: codec name of the source (stored only).
            codec_type: codec type of the source (stored only).
            duration: source duration in seconds.
            size: source size in bytes.
            bitrate: source bitrate (stored only).
            probe_score: ffprobe's probe score (stored only).
            target_bitrate: value passed to ffmpeg's ``-b:a``.
            ffmpeg_options: extra arguments inserted right after the input.

        Raises:
            ValueError: if ``size`` or ``duration`` exceeds the class limits.
        """
        # Everything cleanup() and the buffer thread touch is created first,
        # so neither can hit a missing attribute if construction fails early
        # or the thread runs before __init__ returns.
        self.proc = None
        self.closed_event = threading.Event()
        self.packet_pos = 0
        self.packets = []
        self._packets_changed = threading.Condition()
        self._buffering_done = False
        self._buffer_worker = None

        if size > self.SIZE_UPPER_LIMIT:
            raise ValueError("File size exceeds the upper limit")
        if duration > self.DURATION_UPPER_LIMIT:
            raise ValueError("Duration exceeds the upper limit")

        self.src = src
        self.codec_name = codec_name
        self.codec_type = codec_type
        self.size = size
        self.probe_score = probe_score
        self.duration = duration
        self.bitrate = bitrate

        self.proc = subprocess.Popen(
            [
                "ffmpeg",
                "-hide_banner",
                "-loglevel",
                "error",
                "-i",
                self.src,
                *(ffmpeg_options if ffmpeg_options else []),
                "-f",
                "opus",
                "-ar",
                "48000",
                "-b:a",
                str(target_bitrate),
                "-",
            ],
            stdout=subprocess.PIPE,
        )

        # Started last: the thread reads self.proc and self.closed_event.
        self._buffer_worker = threading.Thread(target=self.__buffer_thread)
        self._buffer_worker.start()

    @cached_property
    def total_packet_count(self):
        """Number of packets expected for ``duration``, rounded up."""
        return math.ceil(self.duration / SeekableOpusSource.FRAME_SIZE)

    def read(self):
        """Return the next packet, or ``b""`` when there is nothing more to play.

        Blocks (without spinning) until the packet at the current position has
        been buffered. Returns ``b""`` once the position reaches
        ``total_packet_count``, or if buffering has ended or the source was
        closed before that packet arrived.
        """
        with self._packets_changed:
            while True:
                if self.packet_pos >= self.total_packet_count:
                    return b""
                if self.packet_pos < len(self.packets):
                    break
                # The estimate from `duration` can exceed what ffmpeg really
                # produced; stop instead of waiting for a packet that will
                # never come.
                if self._buffering_done or self.closed_event.is_set():
                    return b""
                self._packets_changed.wait()

            packet = self.packets[self.packet_pos]
            self.packet_pos += 1
            return packet

    def cleanup(self) -> None:
        """Stop buffering, terminate ffmpeg and release its output pipe."""
        self.closed_event.set()
        # Wake a read() that may be waiting for a packet.
        with self._packets_changed:
            self._packets_changed.notify_all()

        if self.proc is not None:
            self.proc.kill()
            self.proc.wait()

            worker = self._buffer_worker
            if worker is not None and worker is not threading.current_thread():
                # With ffmpeg gone the pipe hits EOF, so this is normally quick.
                worker.join(timeout=1)

            # Only close the pipe once nothing can still be reading from it.
            worker_finished = worker is None or not worker.is_alive()
            if worker_finished and self.proc.stdout is not None:
                self.proc.stdout.close()

        return super().cleanup()

    def is_opus(self) -> bool:
        """Report that ``read()`` yields Opus packets."""
        return True

    def seek(self, seconds: float):
        """Move the read position to ``seconds``, clamped to the valid range.

        The target packet index is clamped to ``[0, total_packet_count]``;
        seeking to or past the end makes the next ``read()`` return ``b""``.
        """
        target = round(seconds / SeekableOpusSource.FRAME_SIZE)
        with self._packets_changed:
            self.packet_pos = min(max(target, 0), self.total_packet_count)
            # Let a waiting read() re-evaluate against the new position.
            self._packets_changed.notify_all()

    @property
    def pos(self):
        """Current playback position in seconds."""
        return self.packet_pos * SeekableOpusSource.FRAME_SIZE

    @property
    def buffer_pos(self):
        """Amount of audio buffered so far, in seconds."""
        return len(self.packets) * SeekableOpusSource.FRAME_SIZE

    def __buffer_thread(self):
        """Collect packets from ffmpeg's stdout until EOF, close, or main-thread exit."""
        main_thread = threading.main_thread()
        try:
            for packet in OggStream(self.proc.stdout).iter_packets():
                if not main_thread.is_alive() or self.closed_event.is_set():
                    break
                with self._packets_changed:
                    self.packets.append(packet)
                    self._packets_changed.notify_all()
        finally:
            # Always mark completion (even on a parse error) so read() cannot
            # wait forever for packets that will never arrive.
            with self._packets_changed:
                self._buffering_done = True
                self._packets_changed.notify_all()

    @classmethod
    def from_probe(cls, src: str, *, target_bitrate=128000, ffmpeg_options=None):
        """Build a source from metadata reported by ``ffprobe``.

        Probes the first audio stream of ``src`` and the container, then
        forwards the values (and the keyword options) to the constructor.
        The stream duration is preferred; the container duration is used when
        the stream does not report one. A missing bitrate or probe score is
        passed on as ``0``.

        Raises:
            ValueError: if ffprobe yields no usable output, no audio stream,
                or no duration/size, or if the constructor rejects the values.
        """
        cmd = [
            "ffprobe",
            "-v",
            "error",
            "-select_streams",
            "a:0",
            "-show_entries",
            "stream=codec_type,codec_name,duration"
            ":format=duration,size,bit_rate,probe_score",
            "-of",
            "json",
            src,
        ]
        # run() drains the pipe while waiting, unlike wait() followed by read().
        completed = subprocess.run(cmd, stdout=subprocess.PIPE)

        # Keyed JSON output avoids depending on the order ffprobe prints fields
        # in, and on how many streams the input has.
        try:
            info = json.loads(completed.stdout.decode("utf-8"))
        except ValueError as err:
            raise ValueError(f"ffprobe returned no usable output for {src!r}") from err

        streams = info.get("streams") or []
        if not streams:
            raise ValueError(f"ffprobe found no audio stream in {src!r}")
        stream = streams[0]
        container = info.get("format") or {}

        duration = stream.get("duration", container.get("duration"))
        size = container.get("size")
        if duration is None or size is None:
            raise ValueError(f"ffprobe could not determine duration/size of {src!r}")

        return cls(
            src,
            codec_name=stream.get("codec_name", ""),
            codec_type=stream.get("codec_type", ""),
            duration=float(duration),
            size=int(size),
            bitrate=int(container.get("bit_rate", 0)),
            probe_score=int(container.get("probe_score", 0)),
            target_bitrate=target_bitrate,
            ffmpeg_options=ffmpeg_options,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment