# Install notebook dependencies (Livebook-style script).
# NOTE(review): :kino_bumblebee points at a hard-coded local path
# ("/Users/samrat/code/kino_bumblebee") — this only works on that machine;
# switch to a hex release before sharing this notebook.
# NOTE(review): :kino is pinned to the git main branch with override: true,
# presumably for an unreleased feature — confirm, and pin a tag/ref if possible.
Mix.install(
[
{:kino_live_audio, "~> 0.1"},
{:nx, "~> 0.7.1"},
{:kino_bumblebee, path: "/Users/samrat/code/kino_bumblebee"},
{:exla, ">= 0.0.0"},
{:kino, [env: :prod, git: "https://github.com/livebook-dev/kino.git", override: true]}
],
# Run all Nx computations on the EXLA (XLA) backend by default.
config: [nx: [default_backend: EXLA.Backend]]
)
# Fetch the Whisper "tiny" checkpoint pieces from Hugging Face: model
# weights, audio featurizer, tokenizer, and generation config.
whisper_repo = {:hf, "openai/whisper-tiny"}

{:ok, model_info} = Bumblebee.load_model(whisper_repo)
{:ok, featurizer} = Bumblebee.load_featurizer(whisper_repo)
{:ok, tokenizer} = Bumblebee.load_tokenizer(whisper_repo)
{:ok, generation_config} = Bumblebee.load_generation_config(whisper_repo)

# Cap generation at 100 new tokens per transcribed chunk.
generation_config = Bumblebee.configure(generation_config, max_new_tokens: 100)
# Build the streaming speech-to-text serving. Passing the options as a
# bound keyword list is identical to trailing keywords at the call site.
serving_opts = [
  # Compile for single-item batches (one recording at a time).
  compile: [batch_size: 1],
  # Split long audio into 10-second chunks for transcription.
  chunk_num_seconds: 10,
  # Emit per-segment timestamps alongside the text.
  timestamps: :segments,
  # Return results lazily as an enumerable of chunks.
  stream: true,
  defn_options: [compiler: EXLA]
]

serving =
  Bumblebee.Audio.speech_to_text_whisper(
    model_info,
    featurizer,
    tokenizer,
    generation_config,
    serving_opts
  )
# UI controls for configuring the live-audio capture.
# NOTE(review): chunk size and sample rate are free-text inputs parsed to
# integers downstream; Kino.Input.number would be stricter, but switching
# would change what Kino.Input.read/1 returns for existing readers.
chunk_size = Kino.Input.text("Chunk Size", default: "1")
sample_rate = Kino.Input.text("Sample Rate", default: "16000")

unit =
  Kino.Input.select(
    "Unit",
    # Fixed user-facing label typo: "Miliseconds" -> "Milliseconds".
    [samples: "Samples", s: "Seconds", ms: "Milliseconds", mu: "Microseconds"],
    default: :s
  )

# Lay the three controls out side by side and render them.
top_row = Kino.Layout.grid([sample_rate, chunk_size, unit], columns: 3)
Kino.Layout.grid([top_row])
# Read the control values and start the live microphone capture widget.
# Each text input holds an integer-looking string; take the integer prefix.
read_int = fn input ->
  input |> Kino.Input.read() |> Integer.parse() |> elem(0)
end

liveAudio =
  KinoLiveAudio.new(
    chunk_size: read_int.(chunk_size),
    unit: Kino.Input.read(unit),
    sample_rate: read_int.(sample_rate)
  )
# File-upload path: an audio input resampled to the featurizer's expected
# rate, a submit form wrapping it, and a frame to stream transcription into.
audio_input = Kino.Input.audio("Audio", sampling_rate: featurizer.sampling_rate)
form = Kino.Control.form([audio: audio_input], submit: "Run")
frame = Kino.Frame.new()
# On every form submit: decode the uploaded audio to a mono f32 tensor,
# run it through the Whisper serving, and stream timestamped segments
# into the frame as they arrive.
Kino.listen(form, fn %{data: %{audio: audio}} ->
  if audio do
    # Raw PCM f32 file -> {frames, channels} tensor -> mono by averaging channels.
    samples =
      audio.file_ref
      |> Kino.Input.file_path()
      |> File.read!()
      |> Nx.from_binary(:f32)
      |> Nx.reshape({:auto, audio.num_channels})
      |> Nx.mean(axes: [1])

    IO.inspect(samples)
    Kino.Frame.render(frame, Kino.Text.new("(Start of transcription)", chunk: true))

    # stream: true makes Nx.Serving.run/2 return a lazy enumerable of chunks.
    serving
    |> Nx.Serving.run(samples)
    |> Enum.each(fn chunk ->
      # Render a segment timestamp (whole seconds) as "HH:MM:SS".
      to_mark = fn seconds ->
        seconds |> round() |> Time.from_seconds_after_midnight() |> Time.to_string()
      end

      start_mark = to_mark.(chunk.start_timestamp_seconds)
      end_mark = to_mark.(chunk.end_timestamp_seconds)
      text = "\n#{start_mark}-#{end_mark}: #{chunk.text}"
      Kino.Frame.append(frame, Kino.Text.new(text, chunk: true))
    end)

    Kino.Frame.append(frame, Kino.Text.new("\n(End of transcription)", chunk: true))
  end
end)

Kino.Layout.grid([form, frame], boxed: true, gap: 16)
# frame = Kino.Frame.new()
# liveAudio
# |> Kino.Control.stream()
# |> Kino.listen({Nx.broadcast(0.0, {2, 1, 64}), Nx.broadcast(0.0, {2, 1, 64})}, fn
# %{event: :audio_chunk, chunk: data}, {hn, cn} ->
# audio = Nx.tensor(data)
# |> Nx.stack()
# |> Nx.reshape({:auto, 1})
# |> Nx.mean(axes: [1])
# for chunk <- Nx.Serving.run(serving, audio) do
# IO.inspect(chunk)
# [start_mark, end_mark] =
# for seconds <- [chunk.start_timestamp_seconds, chunk.end_timestamp_seconds] do
# seconds |> round() |> Time.from_seconds_after_midnight() |> Time.to_string()
# end
# text = "
# #{start_mark}-#{end_mark}: #{chunk.text}"
# Kino.Frame.append(frame, Kino.Text.new(text, chunk: true))
# end
# {:cont, {hn, cn}}
# end)
# Kino.Layout.grid([frame], boxed: true, gap: 16)
# Live-microphone path: lazily transform incoming audio-chunk events into
# mono f32 tensors and feed them to the streaming serving.
#
# Fix 1: the original used Enum.map/2 here. Kino.Control.stream/1 yields an
# infinite event stream, so Enum.map blocked forever and `audio_stream` was
# never bound — Stream.map/2 keeps the pipeline lazy so Nx.Serving.run/2 can
# pull chunks on demand.
audio_stream =
  liveAudio
  |> Kino.Control.stream()
  |> Stream.map(fn %{chunk: data} ->
    # NOTE(review): assumes every event carries a :chunk key (list of sample
    # lists) — confirm against KinoLiveAudio's event shape.
    Nx.tensor(data)
    |> Nx.stack()
    |> Nx.reshape({:auto, 1})
    |> Nx.mean(axes: [1])
  end)

frame = Kino.Frame.new()

# Fix 2: render the frame *before* entering the consumer loop below — the
# loop blocks on the infinite stream, so a trailing expression would never
# be reached and appended text would be invisible.
Kino.render(frame)

# Consume transcription chunks as they are produced and append their text.
for chunk <- Nx.Serving.run(serving, audio_stream) do
  IO.inspect(chunk)
  Kino.Frame.append(frame, Kino.Text.new(chunk.text, chunk: true))
end