Created November 15, 2023 16:44
Example of using FFMPEG and Deepgram to do multispeaker transcription
import asyncio
import os
from typing import Literal, NamedTuple, cast
import aiofiles
# Output formats accepted by merge_audio_to_multichannel; each one is a key of ``format_info``.
SupportedFormats = Literal['opus', 'wav', 'aac']
class FormatInfo(NamedTuple):
    """ffmpeg encoder settings for one output format."""

    # Encoder name passed to ffmpeg's ``-c:a`` option.
    codec: str
    # Additional ffmpeg arguments appended after ``-f <format>``.
    extra_params: list[str]
# Encoder and extra ffmpeg arguments to use for each supported output format.
format_info: dict[SupportedFormats, FormatInfo] = {
    # NOTE(review): mapping_family 255 is libopus' "independent streams, unspecified
    # channel layout" mode, presumably chosen so any channel count encodes -- confirm.
    'opus': FormatInfo('libopus', ['-b:a', '320K', '-mapping_family', '255']),
    'wav': FormatInfo('pcm_s16le', ['-ar', '16K']),
    'aac': FormatInfo('aac', ['-b:a', '160K']),
}
async def merge_audio_to_multichannel(input_byte_streams: list[bytes], format: SupportedFormats = "opus") -> bytes:
    """
    Merge multiple audio byte streams into one multichannel audio stream.

    Every input is written to a temporary ``.mp4`` file, the files are passed
    to ffmpeg as separate inputs, merged with ``amerge`` and reduced with
    ``pan`` to one output channel per input. The encoded result is read from
    ffmpeg's stdout. The temporary files are removed before returning, whether
    or not ffmpeg succeeded.

    Parameters:
        input_byte_streams (list): Between 2 and 8 input audio byte streams (MP4 format).
        format: Output format; selects the codec and extra ffmpeg arguments
            from ``format_info`` and is also passed to ffmpeg's ``-f`` option.

    Returns:
        The encoded multichannel audio produced by ffmpeg.

    Raises:
        ValueError: If fewer than 2 or more than 8 input streams are given.
        Exception: If ffmpeg exits with a non-zero code or writes no output.
    """
    if len(input_byte_streams) < 2:
        raise ValueError("Please provide at least two input byte stream.")
    if len(input_byte_streams) > 8:
        raise ValueError("At most 8 input streams can be merged")

    temp_file_names: list[str] = []
    try:
        # Create temporary files and write the byte streams to these files
        for byte_stream in input_byte_streams:
            async with aiofiles.tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
                # Record the path before writing so the file is still cleaned up if the write fails.
                temp_file_names.append(cast(str, temp_file.name))
                await temp_file.write(byte_stream)

        input_args: list[str] = []
        for f_name in temp_file_names:
            input_args.extend(["-i", f_name])

        # See the FFmpeg documentation on audio channel manipulation for the layout names.
        num_streams = len(input_byte_streams)
        if num_streams == 2:
            channel_layout = "stereo"
        elif num_streams == 8:
            channel_layout = "octagonal"
        else:
            channel_layout = f"{num_streams}.0"

        # "[0][1]...amerge=inputs=N,pan=<layout>|c0=c0|c1=c1|...[aout]"
        # NOTE(review): output channel i equals input i only if every input is mono -- confirm with callers.
        input_str = ''.join(f'[{c}]' for c in range(num_streams))
        pan_str = '|'.join([channel_layout, *[f'c{i}=c{i}' for i in range(num_streams)]])
        filter_complex_arg = f"{input_str}amerge=inputs={num_streams},pan={pan_str}[aout]"

        codec, extra_params = format_info[format]
        # NOTE(review): ffmpeg's ADTS muxer is named "adts"; verify that "-f aac" is accepted for output.
        command: list[str] = [
            "ffmpeg", *input_args, "-filter_complex", filter_complex_arg, "-map", "[aout]", "-c:a", codec, "-f", format, *extra_params, "-"
        ]

        # The trailing "-" makes ffmpeg write the encoded audio to stdout.
        p = await asyncio.create_subprocess_exec(*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
        p_out, p_err = await p.communicate()

        if p.returncode != 0 or len(p_out) == 0:
            raise Exception(
                "Decoding failed. ffmpeg returned error code: {0}\n\nOutput from ffmpeg/avlib:\n\n{1}".format(
                    p.returncode, p_err.decode(errors='ignore')
                )
            )

        return p_out
    finally:
        # Cleanup: Remove temporary files
        for f_name in temp_file_names:
            try:
                os.remove(f_name)
            except FileNotFoundError:
                # Already gone; nothing left to clean up for this path.
                pass
from typing import IO, Optional, TypedDict, Union, cast
import toolz
from deepgram import Deepgram
from deepgram._types import BufferSource, Paragraph, PrerecordedOptions, PrerecordedTranscriptionResponse
from audio_utils import SupportedFormats, merge_audio_to_multichannel
class ParagraphWithChannel(Paragraph):
    """A Deepgram ``Paragraph`` that additionally carries a ``channel`` field.

    Used as the element type when reading paragraphs from a multichannel
    transcription response.
    """

    # presumably the index of the audio channel the paragraph was spoken on -- TODO confirm
    channel: int
class Sentence(TypedDict):
    """One transcribed sentence, flattened out of a Deepgram paragraph."""

    # Sentence text as returned by Deepgram.
    text: str
    # Start and end of the sentence (presumably seconds from the start of the audio -- TODO confirm).
    start: float
    end: float
    # Value of the enclosing paragraph's ``speaker`` field, or None when absent.
    speaker_id: Optional[int]
    # Value of the enclosing paragraph's ``channel`` field, or None when absent.
    channel: Optional[int]
class DeepgramClient:
    """Wrapper around the Deepgram SDK for transcribing one or several audio recordings."""

    def __init__(self, api_key: str):
        """Create the underlying Deepgram SDK client from ``api_key``."""
        self.dg_client = Deepgram(api_key)

    async def transcribe_audio(
        self, buffer: Union[bytes, IO], mimetype: str = 'audio/mp4', dg_options: Optional[PrerecordedOptions] = None
    ) -> PrerecordedTranscriptionResponse:
        """Send one audio buffer to Deepgram's prerecorded transcription endpoint.

        Args:
            buffer: Audio content, as bytes or a file-like object.
            mimetype: MIME type reported to Deepgram for ``buffer``.
            dg_options: Extra options; these override the defaults
                (``smart_format``, ``paragraphs``, ``tier='nova'``).

        Returns:
            The response returned by the Deepgram SDK.
        """
        # None instead of a shared mutable ``{}`` default; caller options win over the defaults.
        options: PrerecordedOptions = {'smart_format': True, 'paragraphs': True, 'tier': 'nova', **(dg_options or {})}
        source = cast(BufferSource, {'buffer': buffer, 'mimetype': mimetype})
        return await self.dg_client.transcription.prerecorded(source, options)

    async def transcribe_meeting(self, audio_files: list[bytes], export_format: SupportedFormats = "opus") -> list[Sentence]:
        """Transcribe a meeting given one recording, or one recording per participant.

        A single file is transcribed with diarization enabled. Two to eight
        files are merged into one multichannel stream (one channel per file)
        and transcribed with ``multichannel`` enabled.

        Args:
            audio_files: Between 1 and 8 audio byte streams.
            export_format: Format used for the merged multichannel audio.

        Returns:
            All sentences of all returned paragraphs, in response order. Each
            carries the paragraph's ``speaker`` and ``channel`` values when present.

        Raises:
            ValueError: If ``audio_files`` is empty or holds more than 8 items.
        """
        if not audio_files:
            raise ValueError("Must provide at least one audio file")
        if len(audio_files) > 8:
            raise ValueError("Too many audio files, max 8")

        if len(audio_files) == 1:
            # One mixed recording: let Deepgram tell the speakers apart.
            transcription = await self.transcribe_audio(audio_files[0], mimetype='audio/mp4', dg_options={'diarize': True})
            paragraphs = cast(
                list[Paragraph],
                toolz.get_in(['results', 'channels', 0, 'alternatives', 0, 'paragraphs', 'paragraphs'], transcription, default=[]),
            )
        else:
            # One recording per participant: merge them into separate channels.
            combined_audio = await merge_audio_to_multichannel(audio_files, format=export_format)
            transcription = await self.transcribe_audio(combined_audio, mimetype=f'audio/{export_format}', dg_options={'multichannel': True})
            # NOTE(review): assumes multichannel responses expose paragraphs at results.paragraphs.paragraphs -- confirm.
            paragraphs = cast(list[ParagraphWithChannel], toolz.get_in(['results', 'paragraphs', 'paragraphs'], transcription, default=[]))

        return [{
            'text': s['text'],
            'start': s['start'],
            'end': s['end'],
            'speaker_id': p.get('speaker'),
            'channel': p.get('channel'),
        } for p in paragraphs for s in p['sentences']]
