Created
November 11, 2023 07:13
-
-
Save damp11113/2aeda85c49e796f09941a6286bd0dde4 to your computer and use it in GitHub Desktop.
PyOgg Opus encoder with added set_bitrates and set_bandwidth methods
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ctypes | |
from typing import Optional, Union, ByteString | |
from . import opus | |
from .pyogg_error import PyOggError | |
class OpusEncoder:
    """Encodes PCM data into Opus frames.

    The number of channels, the sampling frequency and the application
    must be configured (via the corresponding ``set_*`` methods) before
    the underlying Opus encoder is created.  The encoder is created
    lazily by the first call to `encode()`, `set_bitrates()`,
    `set_bandwidth()` or `get_algorithmic_delay()`; after that point the
    channels, sampling frequency and application can no longer be
    changed.

    """

    # Maps the user-facing bandwidth names accepted by set_bandwidth()
    # to the names of the corresponding constants in the `opus` module.
    # Attribute names (rather than values) are stored so that a
    # constant is only looked up when it is actually requested.
    _BANDWIDTH_CONSTANT_NAMES = {
        "narrowband": "OPUS_BANDWIDTH_NARROWBAND",
        "mediumband": "OPUS_BANDWIDTH_MEDIUMBAND",
        "wideband": "OPUS_BANDWIDTH_WIDEBAND",
        "superwideband": "OPUS_BANDWIDTH_SUPERWIDEBAND",
        "fullband": "OPUS_BANDWIDTH_FULLBAND",
    }

    def __init__(self) -> None:
        self._encoder: Optional[ctypes.pointer] = None
        self._channels: Optional[int] = None
        self._samples_per_second: Optional[int] = None
        self._application: Optional[int] = None
        self._max_bytes_per_frame: Optional[opus.opus_int32] = None
        self._output_buffer: Optional[ctypes.Array] = None
        self._output_buffer_ptr: Optional[ctypes.pointer] = None

        # An output buffer of 4,000 bytes is recommended in
        # https://opus-codec.org/docs/opus_api-1.3.1/group__opus__encoder.html
        self.set_max_bytes_per_frame(4000)

    #
    # User visible methods
    #

    def set_channels(self, n: int) -> None:
        """Set the number of channels.

        n must be either 1 or 2.

        Raises PyOggError if n is invalid or if the encoder has
        already been created.

        """
        if self._encoder is not None:
            raise PyOggError(
                "Cannot change the number of channels after "+
                "the encoder was created. Perhaps "+
                "set_channels() was called after encode()?"
            )

        # Only mono and stereo are accepted; in particular zero
        # channels must be rejected here rather than later, when the
        # encoder is created.
        if n not in (1, 2):
            raise PyOggError(
                "Invalid number of channels in call to "+
                "set_channels()"
            )

        self._channels = n

    def set_sampling_frequency(self, samples_per_second: int) -> None:
        """Set the number of samples (per channel) per second.

        This must be one of 8000, 12000, 16000, 24000, or 48000.

        Regardless of the sampling rate and number of channels
        selected, the Opus encoder can switch to a lower audio
        bandwidth or number of channels if the bitrate selected is
        too low. This also means that it is safe to always use 48
        kHz stereo input and let the encoder optimize the
        encoding.

        Raises PyOggError if the frequency is not one of the accepted
        values or if the encoder has already been created.

        """
        if self._encoder is not None:
            raise PyOggError(
                "Cannot change the sampling frequency after "+
                "the encoder was created. Perhaps "+
                "set_sampling_frequency() was called after encode()?"
            )

        if samples_per_second not in [8000, 12000, 16000, 24000, 48000]:
            raise PyOggError(
                "Specified sampling frequency "+
                "({:d}) ".format(samples_per_second)+
                "was not one of the accepted values"
            )

        self._samples_per_second = samples_per_second

    def set_application(self, application: str) -> None:
        """Set the encoding mode.

        This must be one of 'voip', 'audio', or 'restricted_lowdelay'.

        'voip': Gives best quality at a given bitrate for voice
        signals. It enhances the input signal by high-pass
        filtering and emphasizing formants and
        harmonics. Optionally it includes in-band forward error
        correction to protect against packet loss. Use this mode
        for typical VoIP applications. Because of the enhancement,
        even at high bitrates the output may sound different from
        the input.

        'audio': Gives best quality at a given bitrate for most
        non-voice signals like music. Use this mode for music and
        mixed (music/voice) content, broadcast, and applications
        requiring less than 15 ms of coding delay.

        'restricted_lowdelay': configures low-delay mode that
        disables the speech-optimized mode in exchange for
        slightly reduced delay. This mode can only be set on a
        newly initialized encoder because it changes the codec
        delay.

        Raises PyOggError if the application is not one of the
        accepted values or if the encoder has already been created.

        """
        if self._encoder is not None:
            raise PyOggError(
                "Cannot change the application after "+
                "the encoder was created. Perhaps "+
                "set_application() was called after encode()?"
            )

        if application == "voip":
            self._application = opus.OPUS_APPLICATION_VOIP
        elif application == "audio":
            self._application = opus.OPUS_APPLICATION_AUDIO
        elif application == "restricted_lowdelay":
            self._application = opus.OPUS_APPLICATION_RESTRICTED_LOWDELAY
        else:
            raise PyOggError(
                "The application specification '{:s}' ".format(application)+
                "wasn't one of the accepted values."
            )

    def set_max_bytes_per_frame(self, max_bytes: int) -> None:
        """Set the maximum number of bytes in an encoded frame.

        Size of the output payload. This may be used to impose an
        upper limit on the instant bitrate, but should not be used
        as the only bitrate control; see `set_bitrates()` for
        requesting a target bitrate.

        Calling this method allocates a new output buffer of
        `max_bytes` bytes, replacing the one used by previous calls
        to `encode()`.

        """
        self._max_bytes_per_frame = opus.opus_int32(max_bytes)
        OutputBuffer = ctypes.c_ubyte * max_bytes
        self._output_buffer = OutputBuffer()
        self._output_buffer_ptr = (
            ctypes.cast(ctypes.pointer(self._output_buffer),
                        ctypes.POINTER(ctypes.c_ubyte))
        )

    def set_bitrates(self, bitrate: int) -> None:
        """Request a target bitrate from the Opus encoder.

        `bitrate` is passed unchanged to libopus as the argument of
        the OPUS_SET_BITRATE control request (see the libopus
        documentation for the accepted values).

        If the encoder has not been created yet it is created by this
        call, so the channels, sampling frequency and application
        must already have been set.

        Raises PyOggError if the encoder cannot be created or if
        libopus rejects the request.

        """
        # If we haven't already created an encoder, do so now
        if self._encoder is None:
            self._encoder = self._create_encoder()

        result = opus.opus_encoder_ctl(
            self._encoder,
            opus.OPUS_SET_BITRATE_REQUEST,
            bitrate
        )
        if result != opus.OPUS_OK:
            raise PyOggError(
                "Failed to set the bitrate of " +
                "the Opus encoder: " +
                opus.opus_strerror(result).decode("utf")
            )

    def set_bandwidth(self, bandwidth: str = "fullband") -> None:
        """Set the audio bandwidth used by the Opus encoder.

        `bandwidth` must be one of:

        narrowband:
            Narrowband typically refers to a limited range of
            frequencies suitable for voice communication.

        mediumband (unsupported in libopus 1.3+):
            Mediumband extends the frequency range compared to
            narrowband, providing better audio quality.

        wideband:
            Wideband offers an even broader frequency range,
            resulting in higher audio fidelity compared to
            narrowband and mediumband.

        superwideband:
            Superwideband extends the frequency range beyond
            wideband, further enhancing audio quality.

        fullband (default):
            Fullband provides the widest frequency range among the
            listed options, offering the highest audio quality.

        If the encoder has not been created yet it is created by this
        call, so the channels, sampling frequency and application
        must already have been set.

        Raises PyOggError if `bandwidth` is not one of the names
        above, if the encoder cannot be created, or if libopus
        rejects the request.

        """
        # Reject unknown names explicitly rather than silently
        # falling back to fullband, which would hide typos.
        constant_name = self._BANDWIDTH_CONSTANT_NAMES.get(bandwidth)
        if constant_name is None:
            raise PyOggError(
                "The bandwidth specification '{}' ".format(bandwidth)+
                "wasn't one of the accepted values."
            )
        reqband = getattr(opus, constant_name)

        # If we haven't already created an encoder, do so now
        if self._encoder is None:
            self._encoder = self._create_encoder()

        result = opus.opus_encoder_ctl(
            self._encoder,
            opus.OPUS_SET_BANDWIDTH_REQUEST,
            reqband
        )
        if result != opus.OPUS_OK:
            raise PyOggError(
                "Failed to set the bandwidth of " +
                "the Opus encoder: " +
                opus.opus_strerror(result).decode("utf")
            )

    def encode(self, pcm: Union[bytes, bytearray, memoryview]) -> memoryview:
        """Encodes PCM data into an Opus frame.

        `pcm` must be formatted as bytes-like, with each sample taking
        two bytes (signed 16-bit integers; interleaved left, then
        right channels if in stereo).

        If `pcm` is not writeable, a copy of the array will be made.

        The returned memoryview is a slice of this encoder's internal
        output buffer (no copy is made), so its contents are
        overwritten by the next call to `encode()`.  Copy the data
        (e.g. with `bytes(...)`) if it must outlive that call.

        Raises PyOggError if the encoder cannot be created, if the
        length of `pcm` does not correspond to a frame duration of
        2.5, 5, 10, 20, 40 or 60 ms, or if libopus reports an error.

        """
        # If we haven't already created an encoder, do so now
        if self._encoder is None:
            self._encoder = self._create_encoder()

        # Sanity checks also satisfy mypy type checking
        assert self._channels is not None
        assert self._samples_per_second is not None
        assert self._output_buffer is not None

        # Calculate the effective frame duration of the given PCM
        # data.  Calculate it in units of 0.1ms in order to avoid
        # floating point comparisons.
        bytes_per_sample = 2
        frame_size = (
            len(pcm) # bytes
            // bytes_per_sample
            // self._channels
        )
        frame_duration = (
            (10*frame_size)
            // (self._samples_per_second//1000)
        )

        # Check that we have a valid frame size
        if int(frame_duration) not in [25, 50, 100, 200, 400, 600]:
            raise PyOggError(
                "The effective frame duration ({:.1f} ms) "
                .format(frame_duration/10)+
                "was not one of the acceptable values."
            )

        # Create a ctypes object sharing the memory of the PCM data
        PcmCtypes = ctypes.c_ubyte * len(pcm)
        try:
            # Attempt to share the PCM memory

            # Unfortunately, as at 2020-09-27, the type hinting for
            # read-only and writeable buffer protocols was a
            # work-in-progress.  The following only works for writable
            # cases, but the method's parameters include a read-only
            # possibility (bytes), thus we ignore mypy's error.
            pcm_ctypes = PcmCtypes.from_buffer(pcm) # type: ignore[arg-type]
        except TypeError:
            # The data must be copied if it's not writeable
            pcm_ctypes = PcmCtypes.from_buffer_copy(pcm)

        # Create a pointer to the PCM data
        pcm_ptr = ctypes.cast(
            pcm_ctypes,
            ctypes.POINTER(opus.opus_int16)
        )

        # Create an int giving the frame size per channel
        frame_size_int = ctypes.c_int(frame_size)

        # Encode PCM
        result = opus.opus_encode(
            self._encoder,
            pcm_ptr,
            frame_size_int,
            self._output_buffer_ptr,
            self._max_bytes_per_frame
        )

        # Check for any errors; on success `result` is the number of
        # bytes written to the output buffer.
        if result < 0:
            raise PyOggError(
                "An error occurred while encoding to Opus format: "+
                opus.opus_strerror(result).decode("utf")
            )

        # Get memoryview of buffer so that the slice operation doesn't
        # copy the data.
        #
        # Unfortunately, as at 2020-09-27, the type hints for
        # memoryview do not include ctype arrays.  This is because
        # there is no currently accepted manner to label a class as
        # supporting the buffer protocol.  However, it's clearly a
        # work in progress.  For more information, see:
        # * https://bugs.python.org/issue27501
        # * https://github.com/python/typing/issues/593
        # * https://github.com/python/typeshed/pull/4232
        mv = memoryview(self._output_buffer) # type: ignore

        # Cast the memoryview to char
        mv = mv.cast('c')

        # Slice just the valid data from the memoryview
        valid_data_as_bytes = mv[:result]

        return valid_data_as_bytes

    def get_algorithmic_delay(self) -> int:
        """Gets the total samples of delay added by the entire codec.

        This can be queried by the encoder and then the provided
        number of samples can be skipped on from the start of the
        decoder's output to provide time aligned input and
        output. From the perspective of a decoding application the
        real data begins this many samples late.

        The decoder contribution to this delay is identical for all
        decoders, but the encoder portion of the delay may vary from
        implementation to implementation, version to version, or even
        depend on the encoder's initial configuration. Applications
        needing delay compensation should call this method rather than
        hard-coding a value.

        If the encoder has not been created yet it is created by this
        call.  Raises PyOggError if the encoder cannot be created or
        if libopus reports an error.

        """
        # If we haven't already created an encoder, do so now
        if self._encoder is None:
            self._encoder = self._create_encoder()

        # Obtain the algorithmic delay of the Opus encoder.  See
        # https://tools.ietf.org/html/rfc7845#page-27
        delay = opus.opus_int32()

        result = opus.opus_encoder_ctl(
            self._encoder,
            opus.OPUS_GET_LOOKAHEAD_REQUEST,
            ctypes.pointer(delay)
        )
        if result != opus.OPUS_OK:
            raise PyOggError(
                "Failed to obtain the algorithmic delay of "+
                "the Opus encoder: "+
                opus.opus_strerror(result).decode("utf")
            )
        delay_samples = delay.value
        return delay_samples

    #
    # Internal methods
    #

    def _create_encoder(self) -> ctypes.pointer:
        """Allocate and initialise an Opus encoder.

        Uses the application, sampling frequency and number of
        channels previously stored on this instance, and returns a
        pointer to the initialised encoder.

        Raises PyOggError if any of those settings is missing or if
        libopus fails to initialise the encoder.

        """
        # To create an encoder, we must first allocate resources for it.
        # We want Python to be responsible for the memory deallocation,
        # and thus Python must be responsible for the initial memory
        # allocation.

        # Check that the application has been defined
        if self._application is None:
            raise PyOggError(
                "The application was not specified before "+
                "attempting to create an Opus encoder. Perhaps "+
                "encode() was called before set_application()?"
            )
        application = self._application

        # Check that the sampling frequency has been defined
        if self._samples_per_second is None:
            raise PyOggError(
                "The sampling frequency was not specified before "+
                "attempting to create an Opus encoder. Perhaps "+
                "encode() was called before set_sampling_frequency()?"
            )

        # The frequency must be passed in as a 32-bit int
        samples_per_second = opus.opus_int32(self._samples_per_second)

        # Check that the number of channels has been defined
        if self._channels is None:
            raise PyOggError(
                "The number of channels were not specified before "+
                "attempting to create an Opus encoder. Perhaps "+
                "encode() was called before set_channels()?"
            )
        channels = self._channels

        # Obtain the number of bytes of memory required for the encoder
        size = opus.opus_encoder_get_size(channels)

        # Allocate the required memory for the encoder
        memory = ctypes.create_string_buffer(size)

        # Cast the newly-allocated memory as a pointer to an encoder.  We
        # could also have used opus.oe_p as the pointer type, but writing
        # it out in full may be clearer.
        encoder = ctypes.cast(memory, ctypes.POINTER(opus.OpusEncoder))

        # Initialise the encoder
        error = opus.opus_encoder_init(
            encoder,
            samples_per_second,
            channels,
            application
        )

        # Check that there hasn't been an error when initialising the
        # encoder
        if error != opus.OPUS_OK:
            raise PyOggError(
                "An error occurred while creating the encoder: "+
                opus.opus_strerror(error).decode("utf")
            )

        # Return our newly-created encoder
        return encoder
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment