Skip to content

Instantly share code, notes, and snippets.

@samrat

samrat/README.md Secret

Created April 3, 2024 12:22
Show Gist options
  • Save samrat/14e29e15e6190d48473e1678a49c3339 to your computer and use it in GitHub Desktop.
Save samrat/14e29e15e6190d48473e1678a49c3339 to your computer and use it in GitHub Desktop.

Problem: children in a Membrane pipeline get started twice

[debug] <0.874.0>/ Starting children: [%Membrane.ChildEntry{name: {Membrane.PortAudio.Source, #Reference<0.2383238512.2415132675.126993>}, module: Membrane.PortAudio.Source, options: %Membrane.PortAudio.Source{endpoint_id: :default, portaudio_buffer_size: 256, latency: :high, sample_format: :f32le, sample_rate: 16000, channels: 1}, component_type: :element, pid: nil, clock: nil, sync: nil, spec_ref: #Reference<0.2383238512.2415132675.127072>, group: nil, initialized?: false, ready?: false, terminating?: false}, %Membrane.ChildEntry{name: :debug, module: Membrane.Debug.Sink, options: %Membrane.Debug.Sink{handle_buffer: #Function<0.39588851/1 in Wonderfluent.MembranePipeline.handle_init/2>, handle_event: &Membrane.Debug.Sink.noop/1, handle_stream_format: &Membrane.Debug.Sink.noop/1}, component_type: :element, pid: nil, clock: nil, sync: nil, spec_ref: #Reference<0.2383238512.2415132675.127072>, group: nil, initialized?: false, ready?: false, terminating?: false}] in children group: nil
[debug] <0.867.0>/ Starting children: [%Membrane.ChildEntry{name: {Membrane.PortAudio.Source, #Reference<0.2383238512.2415132675.126993>}, module: Membrane.PortAudio.Source, options: %Membrane.PortAudio.Source{endpoint_id: :default, portaudio_buffer_size: 256, latency: :high, sample_format: :f32le, sample_rate: 16000, channels: 1}, component_type: :element, pid: nil, clock: nil, sync: nil, spec_ref: #Reference<0.2383238512.2415132675.127050>, group: nil, initialized?: false, ready?: false, terminating?: false}, %Membrane.ChildEntry{name: :debug, module: Membrane.Debug.Sink, options: %Membrane.Debug.Sink{handle_buffer: #Function<0.39588851/1 in Wonderfluent.MembranePipeline.handle_init/2>, handle_event: &Membrane.Debug.Sink.noop/1, handle_stream_format: &Membrane.Debug.Sink.noop/1}, component_type: :element, pid: nil, clock: nil, sync: nil, spec_ref: #Reference<0.2383238512.2415132675.127050>, group: nil, initialized?: false, ready?: false, terminating?: false}] in children group: nil

I suspect the problem is with how I start the pipeline in conversation.ex(in the init function) but I can't tell what the issue is.

When I try starting the same pipeline directly in a livebook cell(as below), there aren't duplicated frames:

import Membrane.ChildrenSpec
alias Membrane.RCPipeline

spec = [
      child(%Membrane.PortAudio.Source{channels: 1, sample_format: :f32le, sample_rate: 16_000})
      |> child(:debug, %Membrane.Debug.Sink{
        handle_buffer: fn buffer ->
          # Membrane.Logger.info("MembranePipeline received buffer: #{inspect(buffer)}")
          IO.inspect(buffer.payload, label: "buffer.payload")
        end
      })
    ]

pipeline = RCPipeline.start_link!()
RCPipeline.exec_actions(pipeline, spec: spec)
defmodule WonderfluentWeb.Socket.Conversation do
@moduledoc """
Implements a WebSocket API for a conversational agent.
Possible states:
- `:closed` - doing nothing
- `:waiting` - waiting for voice activity
- `:listening` - listening to voice activity
- `:transcribing` - actively transcribing a message
- `:replying` - pushing audio
We open in a closed state, until we receive a message to kick off
the conversation from the user. That message can contain conversation
parameters to include the prompt, and other settings.
"""
alias Wonderfluent.Elevenlabs.Websocket
require Logger
@behaviour Phoenix.Socket.Transport
@stream_format %Membrane.RawAudio{
sample_format: :s16le,
channels: 1,
sample_rate: 16_000
}
@impl true
def child_spec(_opts) do
# We won't spawn any process, so let's ignore the child spec
:ignore
end
@impl true
def connect(_connect_opts) do
# Callback to retrieve relevant data from the connection.
# The map contains options, params, transport and endpoint keys.
{:ok, %{}}
end
@impl true
def init(_state) do
IO.inspect("init conversation")
sample_rate = 16_000
channels = 1
{:ok, _sup_pid, pipeline_pid} =
Wonderfluent.MembranePipeline.start_link(%{
module: Wonderfluent.PubSub,
channel: "audio",
parent_pid: self()
})
{:ok,
%{
mode: :closed,
last_audio_buffer: "",
accumulated_audio_buffer: "",
transcription_pid: nil,
reply_pid: nil,
tts_pid: nil,
chat: [],
pipeline_pid: pipeline_pid
}}
end
end
defmodule Wonderfluent.MembranePipeline do
use Membrane.Pipeline
def start_link(%{module: module, channel: channel, parent_pid: parent_pid} = opts) do
Membrane.Pipeline.start_link(__MODULE__, opts)
end
@impl true
def handle_init(_ctx, _init_arg) do
structure = [
child(%Membrane.PortAudio.Source{channels: 1, sample_format: :f32le, sample_rate: 16_000})
|> child(:debug, %Membrane.Debug.Sink{
handle_buffer: fn buffer ->
Membrane.Logger.info("MembranePipeline received buffer: #{inspect(buffer)}")
IO.inspect(buffer.payload, label: "buffer.payload")
end
})
]
IO.inspect("starting pipeline")
pipeline = Membrane.RCPipeline.start_link!()
IO.inspect("exec_actions")
Membrane.RCPipeline.exec_actions(pipeline, spec: structure)
{[spec: structure], %{}}
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment