# Skip to content
#
# Instantly share code, notes, and snippets.
#
# @nickdichev-firework
# Last active January 20, 2023 01:06
defmodule FireworkDemoSwitcher do
  @moduledoc """
  Pipeline that switches between source videos at the times listed in `@cue_points`.

  `handle_init/1` links `:cut_and_merge` through `:compositor` to `:player` and schedules
  one `{:add_video, video_id}` message per cue point. Each such message spawns a new
  `VideoBin` child, links it to `:cut_and_merge`, and removes the child that was spawned
  for the previous cue point.

  State is `%{tick: non_neg_integer, timestamps: map}` where `tick` is the index of the
  next cue point to handle and `timestamps` maps a video id to the cue time (in seconds)
  at which that video was last switched away from.
  """

  use Membrane.Pipeline

  alias Membrane.Pad
  alias Membrane.Time
  alias Membrane.SDL.Player
  alias Membrane.VideoCutAndMerge
  alias Membrane.RawVideo
  alias Membrane.VideoCompositor
  alias Membrane.VideoCompositor.RustStructs.BaseVideoPlacement

  # NOTE(review): these are HTTP URLs, while the VideoBin published alongside this module
  # reads `url` with Membrane.File.Source (its Hackney source is commented out) — confirm
  # that the two revisions are meant to be used together.
  @bunny_url "http://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s_720x480.h264"
  @firework_url "redacted link to video from an s3 bucket"
  # @bunny_url "/Users/firework/Downloads/samples_big-buck-bunny_bun33s_720x480.h264"
  # @firework_url "/Users/firework/Downloads/test.h264"
  @video_urls %{bunny: @bunny_url, firework: @firework_url}

  # {id_of_video_to_play, num_of_seconds_to_switch}
  @cue_points [{:bunny, 0}, {:firework, 3}, {:bunny, 12}]
  # @cue_points [{:bunny, 0}, {:firework, 4}, {:bunny, 8}, {:firework, 15}, {:bunny, 18}]

  # Caps handed to the compositor.
  @full_video %RawVideo{
    width: 1440,
    height: 960,
    framerate: {25, 1},
    pixel_format: :I420,
    aligned: true
  }

  # Initial placement passed as a pad option on the compositor's `:init` input pad.
  @placement %BaseVideoPlacement{
    size: {720, 480},
    position: {0, 0},
    z_value: 0.0
  }

  # Only referenced by the commented-out placement update in `switch_from_previous/4`.
  @positions [{0, 0}, {720, 0}, {0, 480}, {720, 480}]

  # Builds the static part of the pipeline and schedules one `{:add_video, id}` message
  # per cue point, delayed by the cue point's number of seconds.
  @impl true
  def handle_init(_) do
    children = [
      cut_and_merge: VideoCutAndMerge,
      compositor: %VideoCompositor{caps: @full_video},
      player: Player
    ]

    links = [
      link(:cut_and_merge)
      |> via_in(Pad.ref(:input, :init), options: [initial_placement: @placement])
      |> to(:compositor),
      link(:compositor)
      |> to(:player)
    ]

    spec = %ParentSpec{children: children, links: links}

    Enum.each(@cue_points, fn {video_id, seconds} ->
      Process.send_after(self(), {:add_video, video_id}, :timer.seconds(seconds))
    end)

    {{:ok, [spec: spec, playback: :playing]}, %{tick: 0, timestamps: %{}}}
  end

  # Handles the cue point at index `tick`: spawns and links a VideoBin for `video_id`
  # and, for every cue point but the first, removes the previously spawned child.
  @impl true
  def handle_other({:add_video, video_id}, _ctx, %{tick: tick, timestamps: timestamps} = state) do
    # Messages are expected in cue-point order; a mismatch raises a MatchError here.
    {^video_id, seconds} = Enum.at(@cue_points, tick)
    {_next_id, next_seconds} = Enum.at(@cue_points, tick + 1, {nil, nil})

    # The interval starts at the value cached for this video (0 if it was never played)
    # and ends at the next cue time, or never for the last cue point.
    start_interval = timestamps |> Map.get(video_id, 0) |> Time.seconds()
    end_interval = if next_seconds, do: Time.seconds(next_seconds), else: :infinity

    stream = %VideoCutAndMerge.Stream{
      intervals: [{start_interval, end_interval}]
    }

    child_name = video_child_name(video_id, tick)

    spec = %ParentSpec{
      children: %{child_name => video_bin(video_id)},
      links: [
        link(child_name)
        |> via_in(Pad.ref(:input, child_name), options: [stream: stream])
        |> to(:cut_and_merge)
      ]
    }

    {actions, timestamps} = switch_from_previous(tick, seconds, spec, timestamps)

    {{:ok, actions}, %{state | tick: tick + 1, timestamps: timestamps}}
  end

  # Any other message (e.g. `{:end_of_playback, id}` sent by VideoBin) is only printed.
  def handle_other(msg, _ctx, state) do
    IO.inspect(msg)
    {:ok, state}
  end

  # Returns `{actions, timestamps}` for the cue point at `tick`.
  #
  # The first cue point has no predecessor, so nothing is removed or cached. Looking the
  # predecessor up only for `tick > 0` also avoids `Enum.at/2` with index -1, which would
  # return the last cue point.
  defp switch_from_previous(0, _seconds, spec, timestamps), do: {[spec: spec], timestamps}

  defp switch_from_previous(tick, seconds, spec, timestamps) when tick > 0 do
    previous_tick = tick - 1
    {previous_id, _prev_seconds} = Enum.at(@cue_points, previous_tick)
    previous_child_name = video_child_name(previous_id, previous_tick)

    # update_placement_message =
    #   {:update_placement,
    #    [
    #      {{Membrane.Pad, :input, :init},
    #       %{
    #         @placement
    #         | z_value: tick / 100,
    #           position: Stream.cycle(@positions) |> Enum.at(tick)
    #       }}
    #    ]}

    actions = [
      remove_child: [previous_child_name],
      # forward: [{:compositor, update_placement_message}],
      # playback: :playing,
      spec: spec
    ]

    # Cache the current second offset for when we switch back to `previous_id`.
    # NOTE(review): `seconds` is the cue time of the video starting now, not the amount
    # of `previous_id` that was actually played — confirm this is the intended offset.
    {actions, Map.put(timestamps, previous_id, seconds)}
  end

  # Builds the VideoBin options for `video_id`; raises if the id has no configured URL.
  defp video_bin(video_id) do
    %VideoBin{id: video_id, url: Map.fetch!(@video_urls, video_id), pid: self()}
  end

  # Child names combine the video id and the cue index, e.g. `:bunny_0`, so that the same
  # video can be spawned again under a fresh name.
  defp video_child_name(video_id, tick), do: :"#{video_id}_#{tick}"
end
defmodule FireworkDemoSwitcher do
  @moduledoc """
  Pipeline that switches between source videos at the times listed in `@cue_points`.

  `handle_init/1` links `:cut_and_merge` to `:player` and schedules one
  `{:add_video, video_id}` message per cue point. Each such message spawns a new
  `VideoBin` child, links it to `:cut_and_merge`, and removes the child that was spawned
  for the previous cue point.

  State is `%{tick: non_neg_integer, timestamps: map}` where `tick` is the index of the
  next cue point to handle and `timestamps` maps a video id to the cue time (in seconds)
  at which that video was last switched away from.
  """

  use Membrane.Pipeline

  alias Membrane.SDL.Player
  alias Membrane.Pad
  alias Membrane.Time
  alias Membrane.VideoCutAndMerge

  # @bunny_url "http://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s_720x480.h264"
  # @firework_url "redacted link to video from an s3 bucket"
  @bunny_url "/Users/firework/Downloads/samples_big-buck-bunny_bun33s_720x480.h264"
  @firework_url "/Users/firework/Downloads/test.h264"
  @video_urls %{bunny: @bunny_url, firework: @firework_url}

  # {id_of_video_to_play, num_of_seconds_to_switch}
  @cue_points [{:bunny, 0}, {:firework, 4}, {:bunny, 8}, {:firework, 15}, {:bunny, 18}]

  # Builds the static part of the pipeline and schedules one `{:add_video, id}` message
  # per cue point, delayed by the cue point's number of seconds.
  @impl true
  def handle_init(_) do
    children = [
      cut_and_merge: VideoCutAndMerge,
      player: Player
    ]

    links = [
      link(:cut_and_merge)
      |> to(:player)
    ]

    spec = %ParentSpec{children: children, links: links}

    Enum.each(@cue_points, fn {video_id, seconds} ->
      Process.send_after(self(), {:add_video, video_id}, :timer.seconds(seconds))
    end)

    {{:ok, [spec: spec, playback: :playing]}, %{tick: 0, timestamps: %{}}}
  end

  # Handles the cue point at index `tick`: spawns and links a VideoBin for `video_id`
  # and, for every cue point but the first, removes the previously spawned child.
  @impl true
  def handle_other({:add_video, video_id}, _ctx, %{tick: tick, timestamps: timestamps} = state) do
    # Messages are expected in cue-point order; a mismatch raises a MatchError here.
    {^video_id, seconds} = Enum.at(@cue_points, tick)
    {_next_id, next_seconds} = Enum.at(@cue_points, tick + 1, {nil, nil})

    # The interval starts at the value cached for this video (0 if it was never played)
    # and ends at the next cue time, or never for the last cue point.
    start_interval = timestamps |> Map.get(video_id, 0) |> Time.seconds()
    end_interval = if next_seconds, do: Time.seconds(next_seconds), else: :infinity

    stream = %VideoCutAndMerge.Stream{
      intervals: [{start_interval, end_interval}]
    }

    child_name = video_child_name(video_id, tick)

    spec = %ParentSpec{
      children: %{child_name => video_bin(video_id)},
      links: [
        link(child_name)
        |> via_in(Pad.ref(:input, child_name), options: [stream: stream])
        |> to(:cut_and_merge)
      ]
    }

    {actions, timestamps} = switch_from_previous(tick, seconds, spec, timestamps)

    {{:ok, actions}, %{state | tick: tick + 1, timestamps: timestamps}}
  end

  # Any other message (e.g. `{:end_of_playback, id}` sent by VideoBin) is only printed.
  def handle_other(msg, _ctx, state) do
    IO.inspect(msg)
    {:ok, state}
  end

  # Returns `{actions, timestamps}` for the cue point at `tick`.
  #
  # The first cue point has no predecessor, so nothing is removed or cached. Looking the
  # predecessor up only for `tick > 0` also avoids `Enum.at/2` with index -1, which would
  # return the last cue point.
  defp switch_from_previous(0, _seconds, spec, timestamps), do: {[spec: spec], timestamps}

  defp switch_from_previous(tick, seconds, spec, timestamps) when tick > 0 do
    previous_tick = tick - 1
    {previous_id, _prev_seconds} = Enum.at(@cue_points, previous_tick)
    previous_child_name = video_child_name(previous_id, previous_tick)

    actions = [remove_child: [previous_child_name], spec: spec]

    # Cache the current second offset for when we switch back to `previous_id`.
    # NOTE(review): `seconds` is the cue time of the video starting now, not the amount
    # of `previous_id` that was actually played — confirm this is the intended offset.
    {actions, Map.put(timestamps, previous_id, seconds)}
  end

  # Builds the VideoBin options for `video_id`; raises if the id has no configured URL.
  defp video_bin(video_id) do
    %VideoBin{id: video_id, url: Map.fetch!(@video_urls, video_id), pid: self()}
  end

  # Child names combine the video id and the cue index, e.g. `:bunny_0`, so that the same
  # video can be spawned again under a fresh name.
  defp video_child_name(video_id, tick), do: :"#{video_id}_#{tick}"
end
defmodule FireworkDemo.MixProject do
  use Mix.Project

  @app :firework_demo
  @version "0.1.0"
  @elixir_requirement "~> 1.14"

  # Mix project configuration for the demo application.
  def project do
    [
      app: @app,
      version: @version,
      elixir: @elixir_requirement,
      start_permanent: Mix.env() == :prod,
      deps: deps()
    ]
  end

  # OTP application configuration: only `:logger` is requested as an extra application.
  def application, do: [extra_applications: [:logger]]

  # Full dependency list: the Membrane packages first, then the inspection tools.
  defp deps, do: membrane_deps() ++ tooling_deps()

  # Membrane core and the plugins used by the demo.
  defp membrane_deps do
    [
      {:membrane_core, "~> 0.10.2"},
      {:membrane_sdl_plugin, "~> 0.14.0"},
      {:membrane_h264_ffmpeg_plugin, "~> 0.24.0"},
      {:membrane_realtimer_plugin, "~> 0.5.0"},
      # {:membrane_video_compositor_plugin, path: "./deps/membrane_video_compositor_plugin"},
      {:membrane_video_compositor_plugin,
       git: "https://github.com/membraneframework-labs/membrane_video_compositor_plugin.git",
       branch: "master"},
      {:membrane_hackney_plugin, "~> 0.8.0"},
      {:membrane_video_merger_plugin, "~> 0.6.0"},
      {:membrane_file_plugin, "~> 0.12.0"},
      {:membrane_ffmpeg_swscale_plugin, "~> 0.10.0"}
    ]
  end

  # Runtime inspection helpers.
  defp tooling_deps do
    [
      {:observer_cli, "~> 1.7"},
      {:recon, "~> 2.5"}
    ]
  end
end
defmodule VideoBin do
  @moduledoc """
  Bin that chains a file source, an H264 parser, a decoder, a scaler and a realtimer,
  and exposes the result on its `:output` pad.

  When the `:realtimer` child reaches end of stream, `{:end_of_playback, id}` is sent to
  the process given in the `pid` option.
  """

  use Membrane.Bin

  alias Membrane.Hackney
  alias Membrane.File
  alias Membrane.H264.FFmpeg
  alias Membrane.FFmpeg.SWScale.Scaler

  # `url` is passed as the `location` of `Membrane.File.Source` below, so with the current
  # source it has to be a local file path.
  def_options(
    pid: [spec: pid(), description: "PID of the parent pipeline"],
    id: [spec: atom(), description: "ID of the video"],
    url: [spec: String.t(), description: "URL to the video"]
  )

  def_output_pad :output, caps: :any, mode: :push

  # Spawns and links the processing chain. The state keeps only what is needed to notify
  # the parent later: the video id and the parent's pid.
  @impl true
  def handle_init(options) do
    children = %{
      # media_source: %Hackney.Source{
      #   location: options.url,
      #   hackney_opts: [follow_redirect: true],
      #   max_retries: 3
      # },
      media_source: %File.Source{location: options.url},
      parser: %FFmpeg.Parser{framerate: {25, 1}},
      decoder: FFmpeg.Decoder,
      scaler: %Scaler{output_width: 720, output_height: 480},
      realtimer: Membrane.Realtimer
    }

    links = [
      link(:media_source)
      |> to(:parser)
      |> to(:decoder)
      |> to(:scaler)
      |> to(:realtimer)
      |> to_bin_output(:output)
    ]

    spec = %ParentSpec{children: children, links: links}

    {{:ok, spec: spec}, %{id: options.id, parent: options.pid}}
  end

  # End of stream on the last child of the chain: tell the parent this video is done.
  @impl true
  def handle_element_end_of_stream({:realtimer, _pad}, _context, state) do
    send(state.parent, {:end_of_playback, state.id})
    {{:ok, []}, state}
  end

  # End of stream on any other child needs no reaction.
  def handle_element_end_of_stream({_child, _pad}, _context, state) do
    {{:ok, []}, state}
  end
end
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment