Skip to content

Instantly share code, notes, and snippets.

@spscream
Created March 7, 2024 18:09
Show Gist options
  • Save spscream/a978129cf3d99bc2d1d5cb8b1eefe944 to your computer and use it in GitHub Desktop.
Save spscream/a978129cf3d99bc2d1d5cb8b1eefe944 to your computer and use it in GitHub Desktop.
defmodule SipTrunk.ParticipantPipeline.UDP.Endpoint do
@moduledoc """
Element that reads packets from a UDP socket and sends their payloads through the output pad.
And reads from input pad and sends through the same socket
"""
use Membrane.Endpoint
import Mockery.Macro
alias Membrane.{Buffer, RemoteStream}
alias Membrane.UDP.Socket
require Membrane.Logger
def_options local_port_no: [
spec: pos_integer(),
default: 5000,
description: "A UDP port number used when opening a receiving socket."
],
local_address: [
spec: :inet.socket_address(),
default: :any,
description: """
An IP Address on which the socket will listen. It allows to choose which
network interface to use if there's more than one.
"""
],
destination_address: [
spec: :inet.ip_address(),
description: "An IP Address that the packets will be sent to."
],
destination_port_no: [
spec: :inet.port_number(),
description: "A UDP port number of a target."
],
recv_buffer_size: [
spec: pos_integer(),
default: 1024 * 1024,
description: """
Size of the receive buffer. Packages of size greater than this buffer will be truncated
"""
],
pierce_nat_ctx: [
spec:
%{
uri: URI.t(),
address: :inet.ip_address(),
port: pos_integer()
}
| nil,
default: nil,
description: """
Context necessary to make an attempt to create client-side NAT binding
by sending an empty datagram from the `#{inspect(__MODULE__)}` to an arbitrary host.
* If left as `nil`, no attempt will ever be made.
* If filled in, whenever the pipeline switches playback to `:playing`,
one datagram (with an empty payload) will be sent from the opened socket
to the `:port` at `:address` provided via this option.
If `:address` is not present, it will be parsed from `:uri`.
Disclaimer: This is a potential vulnerability. Use with caution.
"""
]
def_input_pad :input,
accepted_format: _any,
availability: :on_request,
flow_control: :manual,
demand_unit: :buffers
def_output_pad :output,
accepted_format: %RemoteStream{type: :packetized},
flow_control: :push
@impl true
def handle_init(_context, %__MODULE__{} = opts) do
state = %{
local_socket: %Socket{
ip_address: opts.local_address,
port_no: opts.local_port_no,
sock_opts: [recbuf: opts.recv_buffer_size]
},
dst_socket: %Socket{
ip_address: opts.destination_address,
port_no: opts.destination_port_no
},
pierce_nat_ctx: opts.pierce_nat_ctx
}
{[], state}
end
@impl true
def handle_pad_added(Pad.ref(:input, _ref) = pad, _ctx, state) do
{[demand: pad], state}
end
@impl true
def handle_playing(_ctx, %{pierce_nat_ctx: nil} = state) do
{[stream_format: {:output, %RemoteStream{type: :packetized}}], state}
end
@impl true
def handle_playing(_ctx, %{pierce_nat_ctx: nat_ctx} = state) do
ip =
if is_nil(Map.get(nat_ctx, :address)),
do: parse_address(nat_ctx.uri),
else: nat_ctx.address
nat_ctx = Map.put(nat_ctx, :address, ip)
Socket.send(%Socket{ip_address: ip, port_no: nat_ctx.port}, state.local_socket, <<>>)
{[stream_format: {:output, %RemoteStream{type: :packetized}}],
%{state | pierce_nat_ctx: nat_ctx}}
end
@impl true
def handle_buffer(Pad.ref(:input, _ref) = pad, %Buffer{payload: payload}, _context, state) do
%{dst_socket: dst_socket, local_socket: local_socket} = state
case mockable(Socket).send(dst_socket, local_socket, payload) do
:ok -> {[demand: pad], state}
{:error, :eagain} ->
Membrane.Logger.warning("Get eagain on sending of UDP packet")
{[demand: pad], state}
{:error, cause} -> raise "Error sending UDP packet, reason: #{inspect(cause)}"
end
end
@impl true
def handle_parent_notification(
{:udp, _socket_handle, _addr, _port_no, _payload} = meta,
ctx,
state
) do
handle_info(meta, ctx, state)
end
@impl true
def handle_info(
{:udp, _socket_handle, address, port_no, payload},
%{playback: :playing},
state
) do
metadata =
Map.new()
|> Map.put(:udp_source_address, address)
|> Map.put(:udp_source_port, port_no)
|> Map.put(:arrival_ts, Membrane.Time.vm_time())
actions = [buffer: {:output, %Buffer{payload: payload, metadata: metadata}}]
{actions, state}
end
@impl true
def handle_info(
{:udp, _socket_handle, _address, _port_no, _payload},
_ctx,
state
) do
{[], state}
end
@impl true
def handle_setup(ctx, %{local_socket: %Socket{} = local_socket} = state) do
case mockable(Socket).open(local_socket) do
{:ok, socket} ->
notification = {:connection_info, socket.ip_address, socket.port_no}
Membrane.ResourceGuard.register(
ctx.resource_guard,
fn -> close_socket(socket) end,
tag: :udp_guard
)
{[notify_parent: notification], %{state | local_socket: socket}}
{:error, reason} ->
raise "Error opening UDP socket, reason: #{inspect(reason)}"
end
end
defp close_socket(%Socket{} = local_socket) do
mockable(Socket).close(local_socket)
end
defp parse_address(uri) do
hostname =
URI.parse(uri)
|> Map.get(:host)
|> to_charlist()
Enum.find_value([:inet, :inet6, :local], fn addr_family ->
case :inet.getaddr(hostname, addr_family) do
{:ok, address} -> address
{:error, _reason} -> false
end
end)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment