Skip to content

Instantly share code, notes, and snippets.

@samrat
Last active April 3, 2024 19:32
Show Gist options
  • Save samrat/17b7423a1dbb1fa939594f263e6e4d6b to your computer and use it in GitHub Desktop.
Save samrat/17b7423a1dbb1fa939594f263e6e4d6b to your computer and use it in GitHub Desktop.
defmodule Wonderfluent.MembranePipeline do
  @moduledoc """
  Membrane pipeline that feeds audio into `Wonderfluent.DeepgramTranscribe`.

  The pipeline links, in order:

    1. `Wonderfluent.PubSubSource` (configured with the given `:channel` and `:module`),
    2. `Membrane.RawAudioParser`,
    3. a `Membrane.FFmpeg.SWResample.Converter` producing 16 kHz, mono, `:s16le` raw audio,
    4. `Wonderfluent.DeepgramTranscribe`,
    5. a `Membrane.Debug.Sink` that sends every buffer payload it receives to
       `:parent_pid` as `{:transcript, payload}` and prints it with `IO.inspect/2`.
  """

  use Membrane.Pipeline

  # Audio format expected by the transcription element; shared by the Deepgram
  # options and the converter output so the two cannot drift apart.
  @sample_rate 16_000
  @channels 1

  # File handed to the transcription element (opened in append mode).
  @transcript_path "transcription.txt"

  @doc """
  Starts the pipeline linked to the calling process.

  `opts` must be a map containing the `:module`, `:channel` and `:parent_pid` keys;
  a map lacking any of them fails the match and raises `FunctionClauseError`.

  Returns whatever `Membrane.Pipeline.start_link/2` returns.
  """
  def start_link(%{module: _module, channel: _channel, parent_pid: _parent_pid} = opts) do
    Membrane.Pipeline.start_link(__MODULE__, opts)
  end

  # Builds the children spec and keeps `parent_pid` as the pipeline state.
  @impl true
  def handle_init(_ctx, %{module: module, channel: channel, parent_pid: parent_pid}) do
    deepgram_opts = %{
      sample_rate: @sample_rate,
      channels: @channels,
      encoding: "linear16",
      diarize: false,
      interim_results: false
    }

    # The handle is opened by the pipeline process and passed down to the
    # transcription element. NOTE(review): nothing in this module closes it
    # explicitly — confirm that relying on process exit is intended.
    file = File.open!(@transcript_path, [:write, :append])

    structure = [
      child(:source, %Wonderfluent.PubSubSource{
        channel: channel,
        module: module
      })
      |> child(:parser, Membrane.RawAudioParser)
      |> child(:f32_to_s16_converter, %Membrane.FFmpeg.SWResample.Converter{
        output_stream_format: %Membrane.RawAudio{
          channels: @channels,
          sample_format: :s16le,
          sample_rate: @sample_rate
        }
      })
      |> child(:deepgram, %Wonderfluent.DeepgramTranscribe{
        deepgram_opts: deepgram_opts,
        file: file
      })
      |> child(%Membrane.Debug.Sink{
        handle_buffer: fn buffer ->
          # Forward the transcript to the process that started the pipeline,
          # then echo it to stdout for debugging.
          send(parent_pid, {:transcript, buffer.payload})
          IO.inspect(buffer.payload, label: "buffer.payload")
          # IO.puts(file, Jason.encode!(buffer.payload))
        end
      })
    ]

    {[spec: structure], %{parent_pid: parent_pid}}
  end
end
defmodule Wonderfluent.DeepgramTranscribe do
  @moduledoc """
  Membrane filter that forwards raw audio to a Deepgram streaming WebSocket and
  emits the transcripts it gets back.

  Input: 16 kHz, mono, `:s16le` `Membrane.RawAudio`. Every input buffer payload is
  sent as a binary WebSocket frame through the process started by
  `Wonderfluent.Deepgram.Streaming.start_link/1`.

  Output: a `Membrane.RemoteStream`. Each `{:transcript, transcript}` message
  received by this element is wrapped in a `Membrane.Buffer` and pushed on `:output`.
  Presumably those messages come from the streaming client, which is given this
  element's pid as `:parent_pid` — verify against `Wonderfluent.Deepgram.Streaming`.

  A `KeepAlive` text frame is sent every 10 seconds.
  """

  use Membrane.Filter

  require Membrane.Logger

  alias Wonderfluent.Deepgram

  @stream_format %Membrane.RawAudio{
    sample_format: :s16le,
    channels: 1,
    sample_rate: 16_000
  }

  # Interval between KeepAlive frames, in milliseconds.
  @keepalive_interval_ms 10_000
  @keepalive_frame {:text, ~s({"type":"KeepAlive"})}

  def_input_pad(:input,
    accepted_format: @stream_format
    # flow_control: :push
  )

  def_output_pad(:output,
    accepted_format: Membrane.RemoteStream
  )

  def_options(
    deepgram_opts: [
      spec: any(),
      description: "Options passed through to `Wonderfluent.Deepgram.Streaming.start_link/1`."
    ],
    file: [
      spec: any(),
      description: "File handle passed through to `Wonderfluent.Deepgram.Streaming.start_link/1`."
    ]
  )

  # Starts the streaming client, primes the stream with silence and schedules
  # the first KeepAlive. State holds the client pid and the file handle.
  @impl true
  def handle_init(_ctx, %{deepgram_opts: deepgram_opts, file: file}) do
    {:ok, ws_pid} =
      Deepgram.Streaming.start_link(%{
        deepgram_opts: deepgram_opts,
        file: file,
        parent_pid: self()
      })

    # Send 1s of silence to Deepgram. The VAD filter might result in no audio being sent,
    # and they require some audio at the start of the stream.
    # `Membrane.RawAudio.silence/2` takes a `Membrane.Time` duration, so the
    # interval must be built with `Membrane.Time.seconds/1`; a bare `1000`
    # would mean 1000 time units rather than one second.
    silence = Membrane.RawAudio.silence(@stream_format, Membrane.Time.seconds(1))
    send_frame(ws_pid, {:binary, silence})

    schedule_keepalive()
    {[], %{ws_pid: ws_pid, file: file}}
  end

  # Whatever the input format, downstream only sees an opaque remote stream.
  @impl true
  def handle_stream_format(:input, _stream_format, _ctx, state) do
    {[stream_format: {:output, %Membrane.RemoteStream{}}], state}
  end

  # Audio is not passed downstream; it is only relayed to the WebSocket client.
  @impl true
  def handle_buffer(:input, buffer, _ctx, %{ws_pid: ws_pid} = state) do
    send_frame(ws_pid, {:binary, buffer.payload})
    {[], state}
  end

  # Periodic KeepAlive: send the frame and re-arm the timer.
  @impl true
  def handle_info(:send_keepalive, _ctx, %{ws_pid: ws_pid} = state) do
    send_frame(ws_pid, @keepalive_frame)
    schedule_keepalive()
    {[], state}
  end

  # A transcript arrived: emit it as a buffer on the output pad.
  def handle_info({:transcript, transcript}, _ctx, state) do
    Membrane.Logger.info("DeepgramTranscribe received transcript: #{inspect(transcript)}")
    buffer = %Membrane.Buffer{payload: transcript}
    {[buffer: {:output, buffer}], state}
  end

  # Sends a frame through the WebSocket client, logging (instead of silently
  # dropping) any error tuple it returns. Always returns `:ok` so a failed send
  # does not crash the element.
  defp send_frame(ws_pid, frame) do
    case WebSockex.send_frame(ws_pid, frame) do
      :ok ->
        :ok

      {:error, reason} ->
        Membrane.Logger.error("DeepgramTranscribe failed to send frame: #{inspect(reason)}")
        :ok
    end
  end

  # Arms the timer that delivers the next `:send_keepalive` message to this process.
  defp schedule_keepalive do
    Process.send_after(self(), :send_keepalive, @keepalive_interval_ms)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment