# Created November 3, 2022 07:39
import time
from typing import List, Optional, Tuple, Union

import ffmpeg
import numpy as np
import torch
import torch.nn.functional as F
import whisper
from whisper.audio import SAMPLE_RATE, N_FRAMES, HOP_LENGTH, pad_or_trim, log_mel_spectrogram
from whisper.decoding import DecodingOptions, DecodingResult
from whisper.model import Whisper
from whisper.tokenizer import LANGUAGES, TO_LANGUAGE_CODE, get_tokenizer
from whisper.utils import exact_div, format_timestamp, optional_int, optional_float, str2bool, write_txt, write_vtt, write_srt
# Whisper model used by every per-user transcriber in this module ("tiny" checkpoint);
# it is loaded once, as a side effect of importing the module.
model = whisper.load_model("tiny")
def load_audio(buffer: bytearray, seek, sr: int = 16000):
    """
    Open an audio input and read it as a mono waveform, resampling as necessary.

    Parameters
    ----------
    buffer
        The input handed to ``ffmpeg.input`` as its source.
        NOTE(review): annotated as ``bytearray``, but it is passed where
        ffmpeg-python takes a filename/URL and the caller in this file names
        it ``path`` -- confirm the intended type.
    seek
        Start position forwarded to ffmpeg as the ``ss`` input option, so
        decoding begins there instead of at the start of the input
        (presumably seconds -- confirm against the caller).
    sr: int
        The sample rate to resample the audio to if necessary.

    Returns
    -------
    A NumPy array containing the audio waveform, in float32 dtype, scaled
    from 16-bit PCM by dividing by 32768.

    Raises
    ------
    RuntimeError
        If the ffmpeg subprocess fails; the message carries ffmpeg's stderr.
    """
    try:
        # This launches a subprocess to decode audio while down-mixing and
        # resampling as necessary. Requires the ffmpeg CLI and the
        # `ffmpeg-python` package to be installed.
        out, _ = (
            ffmpeg.input(buffer, threads=0, ss=seek)
            .output("-", format="s16le", acodec="pcm_s16le", ac=1, ar=sr)
            .run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

    return np.frombuffer(out, np.int16).flatten().astype(np.float32) / 32768.0
def _user_transcriber(user):
    """
    Build the pair of closures that transcribe one user's audio.

    Parameters
    ----------
    user
        Identifier stored in every transcript entry produced for this user.

    Returns
    -------
    (internal, history)
        ``internal(path, seek)`` re-transcribes the current window of the
        audio at ``path`` and returns the user's full history;
        ``history()`` returns that same history without transcribing.
        Each history entry is ``(text, user, start, end)``, where ``start``
        and ``end`` are the segment times shifted by the creation time of
        this transcriber plus the window offset.
    """
    window = 30  # window length in the units of ``seek`` (presumably seconds)
    current_h = []  # segments of the window currently being transcribed
    global_h = []  # segments of windows already finished
    chain_start_ts = int(time.time())

    def internal(path, seek):
        nonlocal current_h, global_h
        if seek % window == 0:
            # NOTE(review): the body of this branch was missing in the source.
            # Committing the finished window here is the reconstruction that
            # makes `global_h` (otherwise never written) meaningful -- confirm.
            global_h = global_h + current_h
            current_h = []

        # Always decode from the start of the window that contains `seek`.
        offset = (seek // window) * window
        audio = load_audio(path, offset)
        if audio.shape[0] == 0:
            # Nothing decoded for this window yet: no segments to report.
            current_h = []
        else:
            ts = chain_start_ts + offset
            segments = model.transcribe(audio, language='ru', fp16=False)['segments']
            current_h = [
                (s['text'], user, s['start'] + ts, s['end'] + ts)
                for s in segments
            ]
        return global_h + current_h

    def history():
        return global_h + current_h

    return internal, history
def _merge_history(transcribers):
history = []
for _, hist in transcribers.values():
r= sorted(history, key=lambda t: t[2])
return r
# Registry of per-user transcribers: user -> (transcribe_fn, history_fn).
transcribers = {}


def clear():
    """Forget every registered per-user transcriber and its history."""
    # NOTE(review): the body of this function was missing in the source;
    # resetting the registry is the presumed intent -- confirm.
    # Cleared in place so that other references to the dict stay valid.
    transcribers.clear()
def history() -> List[tuple]:
    """
    Return the combined transcript of all registered users.

    Returns
    -------
    The entries of every registered transcriber's history, merged by
    ``_merge_history`` (sorted by each entry's third field). Entries are
    ``(text, user, start, end)`` tuples, not plain strings.
    """
    return _merge_history(transcribers)
def add2hist_transcribed(buffer: bytearray, user, seek):
    """
    Transcribe a user's audio at position ``seek`` and record it in the history.

    Parameters
    ----------
    buffer
        Audio input forwarded unchanged to the user's transcriber.
        NOTE(review): annotated as ``bytearray``, but the transcriber treats
        it as a path -- confirm the intended type.
    user
        Key identifying whose transcriber to use; a transcriber is created
        and registered on the first call for a given user.
    seek
        Position in the audio forwarded unchanged to the transcriber.
    """
    handler = transcribers.get(user)
    if handler is None:
        # First audio from this user: create and register their transcriber.
        handler = _user_transcriber(user)
        transcribers[user] = handler
    trnscrb, _ = handler
    trnscrb(buffer, seek)
