Skip to content

Instantly share code, notes, and snippets.

@samiur
Created November 15, 2023 16:44
Show Gist options
  • Save samiur/534e97aa2a2c16028e86308c06ed0325 to your computer and use it in GitHub Desktop.
Save samiur/534e97aa2a2c16028e86308c06ed0325 to your computer and use it in GitHub Desktop.
Example of using FFmpeg and Deepgram to do multi-speaker transcription
import asyncio
import os
from typing import Literal, NamedTuple, cast
import aiofiles
# Output format names accepted by merge_audio_to_multichannel; each value is used
# as a key into format_info to pick the ffmpeg encoder settings.
SupportedFormats = Literal['opus', 'wav', 'aac']
class FormatInfo(NamedTuple):
    """ffmpeg encoding settings for one supported output format."""

    # Audio encoder name handed to ffmpeg's `-c:a` option.
    codec: str
    # Extra ffmpeg output arguments, appended after the `-f <format>` option.
    extra_params: list[str]
# Encoder settings for every supported output format; merge_audio_to_multichannel
# looks its `format` argument up here to build the ffmpeg command line.
format_info: dict[SupportedFormats, FormatInfo] = {
    # 320 kbit/s Opus; mapping family 255 is libopus' "independent streams with an
    # unspecified channel layout" mode (see the ffmpeg libopus encoder options).
    'opus': FormatInfo('libopus', ['-b:a', '320K', '-mapping_family', '255']),
    # Signed 16-bit little-endian PCM resampled to 16 kHz.
    'wav': FormatInfo('pcm_s16le', ['-ar', '16K']),
    # 160 kbit/s AAC using ffmpeg's built-in encoder.
    'aac': FormatInfo('aac', ['-b:a', '160K']),
}
def _channel_layout_name(num_streams: int) -> str:
    """Return the ffmpeg channel-layout name that carries ``num_streams`` channels."""
    # See https://trac.ffmpeg.org/wiki/AudioChannelManipulation#Listchannelnamesandstandardchannellayouts
    if num_streams == 2:
        return "stereo"
    if num_streams == 8:
        return "octagonal"
    return f"{num_streams}.0"


def _build_filter_graph(num_streams: int) -> str:
    """Return the ``-filter_complex`` graph merging inputs 0..n-1 into the ``[aout]`` pad."""
    input_pads = ''.join(f'[{index}]' for index in range(num_streams))
    pan_spec = '|'.join([_channel_layout_name(num_streams), *(f'c{index}=c{index}' for index in range(num_streams))])
    return f"{input_pads}amerge=inputs={num_streams},pan={pan_spec}[aout]"


async def merge_audio_to_multichannel(input_byte_streams: list[bytes], format: SupportedFormats = "opus") -> bytes:
    """
    Merge multiple audio byte streams into one multichannel audio stream using ffmpeg.

    Every input is written to a temporary ``.mp4`` file, the files are merged with
    ffmpeg's ``amerge`` filter, and the first N merged channels are mapped onto an
    N-channel layout with ``pan`` (N = number of inputs). The temporary files are
    removed before returning, whether or not ffmpeg succeeded.

    NOTE(review): one output channel per input only holds if every input is mono;
    with multi-channel inputs ``pan`` keeps just the first N merged channels.

    Parameters:
    input_byte_streams (list): List of input audio byte streams (MP4 format), 2 to 8 items.
    format (str): Output format; selects the ffmpeg encoder settings from ``format_info``.

    Returns:
    bytes: The encoded multichannel audio that ffmpeg wrote to stdout.

    Raises:
    ValueError: If fewer than 2 or more than 8 input streams are given.
    KeyError: If ``format`` has no entry in ``format_info``.
    RuntimeError: If ffmpeg exits with a non-zero status or produces no output.
    """
    num_streams = len(input_byte_streams)
    if num_streams < 2:
        raise ValueError("Please provide at least two input byte stream.")
    if num_streams > 8:
        raise ValueError("At most 8 input streams can be merged")

    # Resolve the encoder settings before touching the filesystem so an unknown
    # format fails fast instead of after the temp files have been written.
    codec, extra_params = format_info[format]
    # ffmpeg has no muxer called "aac" (that name is demuxer-only); ADTS-framed
    # AAC is written by the "adts" muxer. The other formats match their muxer name.
    muxer = "adts" if format == "aac" else format

    # Create temporary files and write the byte streams to these files
    temp_file_names: list[str] = []
    try:
        for byte_stream in input_byte_streams:
            async with aiofiles.tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
                # Record the path before writing: delete=False means nothing else
                # removes the file if the write below raises.
                temp_file_names.append(cast(str, temp_file.name))
                await temp_file.write(byte_stream)

        input_args: list[str] = []
        for f_name in temp_file_names:
            input_args.extend(["-i", f_name])

        command: list[str] = [
            "ffmpeg",
            *input_args,
            "-filter_complex", _build_filter_graph(num_streams),
            "-map", "[aout]",
            "-c:a", codec,
            "-f", muxer,
            *extra_params,
            "-",  # write the encoded audio to stdout
        ]
        p = await asyncio.create_subprocess_exec(*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
        p_out, p_err = await p.communicate()
        if p.returncode != 0 or not p_out:
            raise RuntimeError(
                "Decoding failed. ffmpeg returned error code: {0}\n\nOutput from ffmpeg/avlib:\n\n{1}".format(
                    p.returncode, p_err.decode(errors='ignore')
                )
            )
        return p_out
    finally:
        # Cleanup: remove temporary files. Best-effort per file, so one failed
        # removal neither strands the remaining files nor masks an error raised above.
        for f_name in temp_file_names:
            try:
                os.remove(f_name)
            except OSError:
                pass
from typing import IO, Optional, TypedDict, Union, cast
import toolz
from deepgram import Deepgram
from deepgram._types import BufferSource, Paragraph, PrerecordedOptions, PrerecordedTranscriptionResponse
from audio_utils import SupportedFormats, merge_audio_to_multichannel
class ParagraphWithChannel(Paragraph):
    """Deepgram ``Paragraph`` extended with a ``channel`` field, used for multichannel transcriptions."""

    # Presumably the zero-based index of the audio channel the paragraph came
    # from — TODO confirm against the Deepgram multichannel response.
    channel: int
class Sentence(TypedDict):
    """One transcribed sentence, as returned by ``DeepgramClient.transcribe_meeting``."""

    # Sentence text, copied from the Deepgram sentence's 'text' field.
    text: str
    # Copied from the sentence's 'start' / 'end' fields (presumably seconds
    # from the start of the audio — TODO confirm).
    start: float
    end: float
    # The enclosing paragraph's 'speaker' value, or None when the paragraph has none.
    speaker_id: Optional[int]
    # The enclosing paragraph's 'channel' value, or None when the paragraph has none.
    channel: Optional[int]
class DeepgramClient:
    """Async helper around the Deepgram SDK for transcribing meeting recordings."""

    # Largest number of separate recordings transcribe_meeting accepts.
    _MAX_AUDIO_FILES = 8

    def __init__(self, api_key: str):
        """Create the underlying Deepgram SDK client from ``api_key``."""
        self.dg_client = Deepgram(api_key)

    async def transcribe_audio(
        self, buffer: Union[bytes, IO], mimetype: str = 'audio/mp4', dg_options: Optional[PrerecordedOptions] = None
    ) -> PrerecordedTranscriptionResponse:
        """
        Send one audio buffer to Deepgram's prerecorded transcription endpoint.

        Parameters:
        buffer: Audio content, as bytes or a file-like object.
        mimetype: MIME type reported to Deepgram for ``buffer``.
        dg_options: Extra Deepgram options; they override the defaults
            (``smart_format``, ``paragraphs`` and the ``nova`` tier).

        Returns:
        The response returned by the Deepgram SDK.
        """
        # A None default avoids sharing one mutable dict between calls.
        options: PrerecordedOptions = {'smart_format': True, 'paragraphs': True, 'tier': 'nova', **(dg_options or {})}
        source = cast(BufferSource, {'buffer': buffer, 'mimetype': mimetype})
        return await self.dg_client.transcription.prerecorded(source, options)

    async def transcribe_meeting(self, audio_files: list[bytes], export_format: SupportedFormats = "opus") -> list[Sentence]:
        """
        Transcribe a meeting recorded as one or more audio files into a flat list of sentences.

        A single file is transcribed as-is with diarization requested. Two to eight
        files are first merged into one multichannel stream (encoded as
        ``export_format``) and transcribed with Deepgram's multichannel option.

        Parameters:
        audio_files: Between 1 and 8 audio byte streams.
        export_format: Encoding used for the merged audio when several files are given.

        Returns:
        One ``Sentence`` per sentence of every returned paragraph, in response order.

        Raises:
        ValueError: If ``audio_files`` is empty or holds more than 8 items.
        """
        # Validate up front so no request is prepared for an unusable input.
        if not audio_files:
            raise ValueError("Must provide at least one audio file")
        if len(audio_files) > self._MAX_AUDIO_FILES:
            raise ValueError("Too many audio files, max 8")

        if len(audio_files) == 1:
            # NOTE(review): 'diarize' is sent as the string 'true' while 'multichannel'
            # below is a bool — confirm which form the SDK expects.
            transcription = await self.transcribe_audio(audio_files[0], mimetype='audio/mp4', dg_options={'diarize': 'true'})
            paragraphs = cast(
                list[Paragraph],
                toolz.get_in(['results', 'channels', 0, 'alternatives', 0, 'paragraphs', 'paragraphs'], transcription, default=[]),
            )
        else:
            combined_audio = await merge_audio_to_multichannel(audio_files, format=export_format)
            transcription = await self.transcribe_audio(combined_audio, mimetype=f'audio/{export_format}', dg_options={'multichannel': True})
            # NOTE(review): paragraphs are read from the top-level 'results' here rather
            # than per channel — confirm against the multichannel response schema.
            paragraphs = cast(list[ParagraphWithChannel], toolz.get_in(['results', 'paragraphs', 'paragraphs'], transcription, default=[]))

        # Flatten paragraphs into sentences, carrying over the paragraph's speaker
        # and channel (None when the paragraph lacks the key).
        return [
            {
                'text': sentence['text'],
                'start': sentence['start'],
                'end': sentence['end'],
                'speaker_id': paragraph.get('speaker'),
                'channel': paragraph.get('channel'),
            }
            for paragraph in paragraphs
            for sentence in paragraph['sentences']
        ]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment