Created
November 15, 2023 16:44
-
-
Save samiur/534e97aa2a2c16028e86308c06ed0325 to your computer and use it in GitHub Desktop.
Example of using FFmpeg and Deepgram to do multi-speaker transcription
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import os | |
from typing import Literal, NamedTuple, cast | |
import aiofiles | |
# Names of the output audio formats accepted by the merge step; these are
# also the keys of the per-format encoder table.
SupportedFormats = Literal["opus", "wav", "aac"]
class FormatInfo(NamedTuple):
    """Encoder settings used when exporting audio in one output format."""

    # Encoder name handed to ffmpeg's ``-c:a`` option.
    codec: str
    # Extra ffmpeg command-line arguments appended after the format option.
    extra_params: list[str]
# Encoder name and extra ffmpeg arguments for each supported output format.
format_info: dict[SupportedFormats, FormatInfo] = {
    "opus": FormatInfo(codec="libopus", extra_params=["-b:a", "320K", "-mapping_family", "255"]),
    "wav": FormatInfo(codec="pcm_s16le", extra_params=["-ar", "16K"]),
    "aac": FormatInfo(codec="aac", extra_params=["-b:a", "160K"]),
}
# ffmpeg muxer names that differ from the public format name: ffmpeg has no
# muxer called "aac"; raw AAC streams are written by the "adts" muxer.
_MUXER_OVERRIDES: dict[str, str] = {"aac": "adts"}


def _build_merge_filter(num_streams: int) -> str:
    """Return the ffmpeg filter graph that routes input i to output channel i."""
    # See https://trac.ffmpeg.org/wiki/AudioChannelManipulation#Listchannelnamesandstandardchannellayouts
    if num_streams == 2:
        channel_layout = "stereo"
    elif num_streams == 8:
        channel_layout = "octagonal"
    else:
        channel_layout = f"{num_streams}.0"
    input_labels = "".join(f"[{i}]" for i in range(num_streams))
    pan_spec = "|".join([channel_layout, *(f"c{i}=c{i}" for i in range(num_streams))])
    return f"{input_labels}amerge=inputs={num_streams},pan={pan_spec}[aout]"


async def merge_audio_to_multichannel(input_byte_streams: list[bytes], format: SupportedFormats = "opus") -> bytes:
    """
    Merge multiple audio byte streams into one multichannel audio stream.

    Each input is written to a temporary ``.mp4`` file, the files are merged
    by an ``ffmpeg`` subprocess so that input ``i`` becomes channel ``i`` of
    the output, and the encoded result is read back from ffmpeg's stdout.
    The temporary files are removed before returning, on success or failure.

    Parameters:
        input_byte_streams (list): List of input audio byte streams (MP4 format).
            Between 2 and 8 streams are accepted.
        format: Output format; selects the codec and extra encoder arguments
            from ``format_info``.

    Returns:
        The encoded multichannel audio as bytes.

    Raises:
        ValueError: If fewer than 2 or more than 8 streams are given.
        KeyError: If ``format`` has no entry in ``format_info``.
        RuntimeError: If ffmpeg exits with a non-zero status or produces no output.
    """
    if len(input_byte_streams) < 2:
        raise ValueError("Please provide at least two input byte stream.")
    if len(input_byte_streams) > 8:
        raise ValueError("At most 8 input streams can be merged")

    # Resolve the encoder settings first so an unknown format fails before
    # any temporary file is created.
    codec, extra_params = format_info[format]
    muxer = _MUXER_OVERRIDES.get(format, format)

    # Create temporary files and write the byte streams to these files
    temp_file_names: list[str] = []
    try:
        for byte_stream in input_byte_streams:
            async with aiofiles.tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
                # Record the path before writing so that a failed write still
                # gets its file removed in the ``finally`` block.
                temp_file_names.append(cast(str, temp_file.name))
                await temp_file.write(byte_stream)

        input_args: list[str] = []
        for f_name in temp_file_names:
            input_args.extend(["-i", f_name])

        command: list[str] = [
            "ffmpeg", *input_args,
            "-filter_complex", _build_merge_filter(len(input_byte_streams)),
            "-map", "[aout]",
            "-c:a", codec,
            "-f", muxer,
            *extra_params,
            "-",
        ]

        # stdin is detached so ffmpeg cannot consume or block on the parent's stdin.
        p = await asyncio.create_subprocess_exec(
            *command,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        p_out, p_err = await p.communicate()

        if p.returncode != 0 or len(p_out) == 0:
            raise RuntimeError(
                "Decoding failed. ffmpeg returned error code: {0}\n\nOutput from ffmpeg/avlib:\n\n{1}".format(
                    p.returncode, p_err.decode(errors='ignore')
                )
            )

        return p_out
    finally:
        # Cleanup: Remove temporary files. Best effort: a failure to delete one
        # file must not mask the real error or stop the remaining deletions.
        for f_name in temp_file_names:
            try:
                os.remove(f_name)
            except OSError:
                pass
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import IO, Optional, TypedDict, Union, cast | |
import toolz | |
from deepgram import Deepgram | |
from deepgram._types import BufferSource, Paragraph, PrerecordedOptions, PrerecordedTranscriptionResponse | |
from audio_utils import SupportedFormats, merge_audio_to_multichannel | |
class ParagraphWithChannel(Paragraph):
    """A Deepgram ``Paragraph`` extended with a ``channel`` entry."""

    # Channel the paragraph belongs to (presumably the index of the audio
    # channel in a multichannel request; confirm against the Deepgram response).
    channel: int
class Sentence(TypedDict):
    """One transcribed sentence with its timing and speaker/channel attribution."""

    # The sentence text.
    text: str
    # Start and end of the sentence (presumably seconds from the start of the
    # audio; confirm against the transcription response schema).
    start: float
    end: float
    # Speaker label for the sentence, or None when none is available.
    speaker_id: Optional[int]
    # Channel for the sentence, or None when none is available.
    channel: Optional[int]
class DeepgramClient:
    """Wrapper around the Deepgram SDK for transcribing one or several audio recordings."""

    def __init__(self, api_key: str):
        """Create the underlying Deepgram SDK client from ``api_key``."""
        self.dg_client = Deepgram(api_key)

    async def transcribe_audio(
        self, buffer: Union[bytes, IO], mimetype: str = 'audio/mp4', dg_options: Optional[PrerecordedOptions] = None
    ) -> PrerecordedTranscriptionResponse:
        """
        Send one audio buffer to Deepgram's prerecorded transcription endpoint.

        Parameters:
            buffer: Audio content, as bytes or a file-like object.
            mimetype: MIME type reported to Deepgram for ``buffer``.
            dg_options: Extra Deepgram options; these override the defaults
                (``smart_format``, ``paragraphs``, ``tier='nova'``) on conflict.

        Returns:
            The transcription response returned by the Deepgram SDK.
        """
        # ``None`` instead of a shared ``{}`` default avoids the mutable-default pitfall.
        options: PrerecordedOptions = {'smart_format': True, 'paragraphs': True, 'tier': 'nova', **(dg_options or {})}
        source = cast(BufferSource, {'buffer': buffer, 'mimetype': mimetype})
        return await self.dg_client.transcription.prerecorded(source, options)

    async def transcribe_meeting(self, audio_files: list[bytes], export_format: SupportedFormats = "opus") -> list[Sentence]:
        """
        Transcribe a meeting recorded as one or several audio files.

        A single file is transcribed with diarization enabled. Two to eight
        files are first merged into one multichannel stream (one channel per
        file) and transcribed with the multichannel option.

        Parameters:
            audio_files: Audio byte streams, between 1 and 8 of them.
            export_format: Format used for the merged multichannel audio.

        Returns:
            A flat list of sentences taken from the returned paragraphs, each
            carrying the paragraph's ``speaker`` and ``channel`` when present.

        Raises:
            ValueError: If no files or more than 8 files are given.
        """
        if not audio_files:
            raise ValueError("Must provide at least one audio file")
        if len(audio_files) > 8:
            raise ValueError("Too many audio files, max 8")

        if len(audio_files) == 1:
            transcription = await self.transcribe_audio(audio_files[0], mimetype='audio/mp4', dg_options={'diarize': True})
            paragraphs = cast(
                list[Paragraph],
                toolz.get_in(['results', 'channels', 0, 'alternatives', 0, 'paragraphs', 'paragraphs'], transcription, default=[]),
            )
        else:
            combined_audio = await merge_audio_to_multichannel(audio_files, format=export_format)
            transcription = await self.transcribe_audio(
                combined_audio, mimetype=f'audio/{export_format}', dg_options={'multichannel': True}
            )
            paragraphs = cast(
                list[ParagraphWithChannel], toolz.get_in(['results', 'paragraphs', 'paragraphs'], transcription, default=[])
            )

        # Flatten paragraphs into sentences; speaker/channel live on the paragraph.
        return [
            {
                'text': s['text'],
                'start': s['start'],
                'end': s['end'],
                'speaker_id': p.get('speaker'),
                'channel': p.get('channel'),
            }
            for p in paragraphs
            for s in p['sentences']
        ]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment