Skip to content

Instantly share code, notes, and snippets.

@xorvo
Last active April 11, 2024 17:23
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save xorvo/983232cd0f6204a57a6be7613cd1e493 to your computer and use it in GitHub Desktop.
Save xorvo/983232cd0f6204a57a6be7613cd1e493 to your computer and use it in GitHub Desktop.
Elixir HLS Server Example

HLS Server Demo - My TV Station

Mix.install([
  {:plug_cowboy, "~> 2.6"}
])

Background Story

Welcome to the world of television broadcasting! In this demo, we'll be building a high-quality live streaming server for a brand new TV station that's just starting up. The station, called 'ElixirTV', is a cutting-edge network that aims to revolutionize the way we consume TV content.

The founders of ElixirTV are a group of passionate tech enthusiasts who believe that the future of TV is in live streaming. They want to create a platform that allows viewers to watch their favorite shows and events, live and in real-time, from anywhere in the world. But they need your help to make it happen!

As the lead developer of ElixirTV, you've been tasked with building the server that will power the network's live streaming capabilities. Your challenge is to create a robust, scalable solution that can handle a large number of simultaneous viewers, without sacrificing video quality or performance.

Requirements

Technical Requirements

  • We will need to generate a live stream based on a set of regular mp4 files
  • We will append new videos or override the video list on the fly.
  • Loop through the videos to form an endless stream of videos.

Here are the main components of this service

  • Transcoder converts regular videos into VOD HLS format. Powered by FFmpeg.
  • Scheduler rotates HLS segments in a circular queue.
  • HLS Server for serving the HLS ready content over HTTP.
graph LR
  scheduler(Scheduler)
  server(Server)
  live(Live Stream)
  video(Video Files)
  transcoder(Transcoder)


  video --> transcoder
  transcoder --> scheduler
  scheduler --> server
  server --> live

Transcoder

Requirements

  • Input: mp4 files
  • Output: HLS files (x3 resolutions)
    • Playlists playlist.m3u8
    • Segments segment_xxx.ts
graph LR
  sv(MP4 Video)
  transcode[Transcode]
  scale[Scale]
  
  sv --> transcode
  transcode --> scale
  scale --> 720P
  scale --> 1080P
  scale --> 2160P
  720P --> hls_v0
  1080P --> hls_v1
  2160P --> hls_v2

FFmpeg

FFmpeg is a powerful open source video processing tool. https://ffmpeg.org/

defmodule Transcoder do
  @moduledoc """
  Converts regular video files into multi-variant VOD HLS output by invoking FFmpeg.

  Every input file is rendered into the variants listed in `@variants`. Playlists
  and segments are written to `hls_out/<video_id>/<variant>/`, relative to the
  current working directory.
  """

  # NOTE: `audio_bitrate` is currently not used when building the command; every
  # audio stream is encoded with the fixed `-b:a 128k` option below.
  @variants [
    %{
      name: "0",
      resolution: "1280x720",
      bitrate: 2800,
      h264_profile: "main",
      encoding_level: "3.1",
      audio_bitrate: 128
    },
    %{
      name: "1",
      resolution: "1920x1080",
      bitrate: 5000,
      h264_profile: "high",
      encoding_level: "4.0",
      audio_bitrate: 320
    },
    %{
      name: "2",
      resolution: "3840x2160",
      bitrate: 14000,
      h264_profile: "high10",
      encoding_level: "5.1",
      audio_bitrate: 640
    }
  ]

  @doc """
  Transcodes every path in `video_paths`, one after another.

  Returns a list with one `transcode_video/1` result per input, in input order.
  """
  def transcode_videos(video_paths) when is_list(video_paths) do
    Enum.map(video_paths, &transcode_video/1)
  end

  @doc """
  Transcodes a single video file into HLS.

  Returns `{:ok, info}` where `info` contains `:video_id`, `:video_path`,
  `:command` (a printable rendering of the FFmpeg invocation),
  `:video_output_dir` and `:command_output`, or `{:error, %{video_path: _, error: _}}`
  when FFmpeg is missing or exits with a non-zero status.

  Raises if the output directory cannot be created.
  """
  def transcode_video(video_path) when is_binary(video_path) do
    video_path
    |> assign_video_id()
    |> determine_command(video_path)
    |> execute()
  end

  # Builds "<file name without extension>-<unix seconds>".
  defp assign_video_id(video_filename) do
    ext_name = Path.extname(video_filename)
    file_basename = Path.basename(video_filename, ext_name)
    ts_suffix = DateTime.utc_now() |> DateTime.to_unix()
    "#{file_basename}-#{ts_suffix}"
  end

  # Creates the output directory and assembles the FFmpeg argument list.
  defp determine_command(video_id, video_path) do
    output_dir = "hls_out/#{video_id}"
    # mkdir_p! also creates the `hls_out` parent, which does not exist on a first run.
    File.mkdir_p!(output_dir)

    hls_opts = [
      ["-f", "hls"],
      ["-hls_time", "6"],
      ["-hls_segment_filename", "#{output_dir}/%v/segment_%03d.ts"],
      ["-hls_playlist_type", "vod"],
      ["-hls_flags", "independent_segments"],
      ["-var_stream_map", "v:0,a:0 v:1,a:1 v:2,a:2"],
      ["-master_pl_name", "#{output_dir}/master.m3u8"],
      ["#{output_dir}/%v/playlist.m3u8"]
    ]

    # The arguments are passed to FFmpeg as a list (no shell involved), so file
    # names containing spaces or shell metacharacters are handled verbatim.
    args =
      ["-i", video_path] ++
        determine_variant_opts() ++
        ~w(-c:a aac -b:a 128k -ac 2) ++
        ~w(-map 0:v:0 -map 0:a:0 -map 0:v:0 -map 0:a:0 -map 0:v:0 -map 0:a:0) ++
        List.flatten(hls_opts)

    %{
      args: args,
      command: printable_command(args),
      video_id: video_id,
      video_path: video_path,
      output_dir: output_dir
    }
  end

  # Per-variant video encoding options, as a flat argument list.
  defp determine_variant_opts do
    Enum.flat_map(@variants, fn variant ->
      [width, height] = String.split(variant.resolution, "x")

      List.flatten([
        ["-filter:v", "scale=w=#{width}:h=#{height}:force_original_aspect_ratio=decrease"],
        ["-c:v:#{variant.name}", "libx264"],
        ["-b:v:#{variant.name}", "#{variant.bitrate}k"],
        ["-s:v:#{variant.name}", variant.resolution],
        ["-preset", "medium"],
        ["-profile:v:#{variant.name}", variant.h264_profile],
        ["-level:v:#{variant.name}", variant.encoding_level],
        # to force the segment length to 6 seconds
        ["-force_key_frames", "expr:gte(t,n_forced*6)"],
        ["-sc_threshold", "0"]
      ])
    end)
  end

  # Renders the invocation as a copy-pasteable shell command (for logs/results only).
  defp printable_command(args) do
    Enum.map_join(["ffmpeg" | args], " ", &shell_quote/1)
  end

  defp shell_quote(arg) do
    if arg =~ ~r{\A[\w@%+=:,./-]+\z} do
      arg
    else
      "'" <> String.replace(arg, "'", "'\\''") <> "'"
    end
  end

  # Runs FFmpeg and converts its exit status into a tagged tuple.
  defp execute(%{args: args, video_path: video_path} = job) do
    case System.find_executable("ffmpeg") do
      nil ->
        # System.cmd/3 would raise here; report it like any other failed run
        # (127 is the conventional "command not found" status).
        failure(video_path, 127, "ffmpeg executable not found in PATH")

      ffmpeg ->
        case System.cmd(ffmpeg, args, stderr_to_stdout: true) do
          {output, 0} -> success(job, output)
          {error_output, error_code} -> failure(video_path, error_code, error_output)
        end
    end
  end

  defp success(job, output) do
    IO.puts("Transcoding for #{job.video_path} completed successfully.")

    {:ok,
     %{
       video_id: job.video_id,
       video_path: job.video_path,
       command: job.command,
       video_output_dir: job.output_dir,
       command_output: output
     }}
  end

  defp failure(video_path, error_code, error_output) do
    IO.puts("Transcoding for #{video_path} failed with error code: #{error_code}.")
    IO.puts("Error output: #{error_output}")
    {:error, %{video_path: video_path, error: error_output}}
  end
end

Command Example

# Example invocation: encodes ${NAME}.mp4 into three H.264 variants
# (640x360, 1280x720, 1920x1080) with AAC audio and writes VOD HLS playlists
# plus 6-second segments under hls_out/${VIDEO_ID}/<variant>/.
# NAME and VIDEO_ID are shell variables that must be set before running it.
ffmpeg -i ${NAME}.mp4 \
 -filter:v scale=w=640:h=360:force_original_aspect_ratio=decrease \
 -c:v:0 libx264 -b:v:0 800k -s:v:0 640x360 -preset medium -profile:v:0 baseline \
 -level:v:0 3.0 -force_key_frames "expr:gte(t,n_forced*6)" -sc_threshold 0 \
 -filter:v scale=w=1280:h=720:force_original_aspect_ratio=decrease \
 -c:v:1 libx264 -b:v:1 2800k -s:v:1 1280x720 -preset medium -profile:v:1 main \
 -level:v:1 3.1 -force_key_frames "expr:gte(t,n_forced*6)" -sc_threshold 0 \
 -filter:v scale=w=1920:h=1080:force_original_aspect_ratio=decrease \
 -c:v:2 libx264 -b:v:2 5000k -s:v:2 1920x1080 -preset medium -profile:v:2 high \
 -level:v:2 4.0 -force_key_frames "expr:gte(t,n_forced*6)" -sc_threshold 0 \
 -c:a aac -b:a 128k -ac 2   -map 0:v:0 -map 0:a:0 -map 0:v:0 -map 0:a:0 -map 0:v:0 -map 0:a:0 \
 -f hls -hls_time 6 -hls_playlist_type vod -hls_flags independent_segments \
 -hls_segment_filename "hls_out/${VIDEO_ID}/%v/segment_%03d.ts" \
 -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2" -master_pl_name hls_out/${VIDEO_ID}/master.m3u8 \
 hls_out/${VIDEO_ID}/%v/playlist.m3u8

Data Schema

Let's define some data structures that we'll use for passing data between modules.

defmodule Segment do
  @moduledoc """
  One HLS media segment belonging to a single variant of a video.

  Fields (all default to `nil`):

    * `:video_id` - id of the video the segment belongs to
    * `:variant` - name of the variant the segment was encoded for
    * `:filename` - file name of the segment as listed in the variant playlist
    * `:duration_ms` - segment duration in milliseconds
    * `:first?` - `true` when this is the first segment of its video
  """

  defstruct video_id: nil,
            variant: nil,
            filename: nil,
            duration_ms: nil,
            first?: nil
end
defmodule Video do
  @moduledoc """
  A transcoded video that can be loaded into the scheduler.

  Fields (all default to `nil`):

    * `:id` - the video id
    * `:segments` - segments of the video (presumably per variant; not populated
      by the code in this notebook)
    * `:location` - directory that holds the video's HLS output
  """

  defstruct id: nil, segments: nil, location: nil
end

Playlist Encoder / Decoder

graph LR;
  segments(List of Segments)
  playlist(HLS Playlist)

  segments --encode--> playlist
  playlist --decode--> segments
defmodule Playlist do
  @moduledoc """
  Encode/decode HLS media playlists.

  `parse_segments/2` reads a variant playlist from disk into `Segment` structs;
  `generate/1` renders a list of segments back into playlist text.
  """

  @headers [
    "#EXTM3U",
    "#EXT-X-VERSION:3",
    "#EXT-X-TARGETDURATION:6"
  ]

  @doc """
  Takes a `Video` and a variant name.
  Returns the list of segments found in `<location>/<variant>/playlist.m3u8`.

  The first segment of the list is marked with `first?: true`. Raises if the
  playlist file cannot be read or an `#EXTINF` line is not followed by a URI line.
  """
  def parse_segments(%{location: video_dir, id: video_id}, variant) do
    [video_dir, variant, "playlist.m3u8"]
    |> Path.join()
    |> File.stream!()
    # trim first so blank lines and trailing line breaks cannot disturb the pairing below
    |> Stream.map(&String.trim/1)
    |> Stream.filter(&segment_line?/1)
    # after filtering, lines alternate: "#EXTINF:<duration>,..." then the segment URI
    |> Stream.chunk_every(2)
    |> Stream.with_index()
    |> Enum.map(fn {["#EXTINF:" <> duration, filename], index} ->
      {duration_s, _} = Float.parse(duration)

      %Segment{
        video_id: video_id,
        variant: variant,
        filename: filename,
        duration_ms: duration_s * 1000,
        # mark the first segment
        first?: index == 0
      }
    end)
  end

  # Keeps only `#EXTINF` tags and URI lines; blank lines, comments and all
  # other tags are dropped.
  defp segment_line?(""), do: false
  defp segment_line?("#EXTINF" <> _), do: true
  defp segment_line?("#" <> _), do: false
  defp segment_line?(_uri), do: true

  @doc """
  Takes a `{segments, media_sequence_number}` tuple.
  Returns `{:ok, playlist}` where `playlist` is the media playlist text listing
  those segments, starting at the given media sequence number.

  Segments flagged `first?: true` are preceded by an `#EXT-X-DISCONTINUITY` tag.
  """
  def generate({segments, media_sequence_number}) do
    msq_line = "#EXT-X-MEDIA-SEQUENCE:#{media_sequence_number}"
    segment_lines = Enum.flat_map(segments, &generate_segment_lines/1)
    playlist = [@headers, [msq_line], segment_lines] |> Enum.concat() |> Enum.join("\n")
    {:ok, playlist}
  end

  defp generate_segment_lines(%{first?: true} = segment) do
    ["#EXT-X-DISCONTINUITY" | media_lines(segment)]
  end

  defp generate_segment_lines(segment), do: media_lines(segment)

  defp media_lines(segment) do
    [
      # The trailing comma is part of the tag format: #EXTINF:<duration>,[<title>]
      "#EXTINF:#{segment.duration_ms / 1000},",
      "#{segment.video_id}/#{segment.filename}"
    ]
  end
end

Scheduler

Core module for maintaining live stream state.

API

  • Importing videos to the program queue Scheduler.load(videos)
  • Retrieving the current live playlist Scheduler.get_playlist(variant)

Behavior

graph LR
  scheduler(Scheduler)
  server(Server)
  player(Player)

  hls1 --> scheduler
  hls2 --> scheduler
  hls3 --> scheduler
  scheduler --> server
  server --> player

Data Structure

%{
  "variant_name" => %{
    # upcoming segments, head of the queue first
    segment_buffer: ["segment_1", "segment_2", "segment_3"],
    # unix timestamp in milliseconds - start of the head of the buffer queue
    current_buffer_start_ts: 1_684_983_730_000,
    # index in program_queue of the most recently buffered program
    program_cursor: 0,
    # value used in media playlist header
    current_global_sequence_number: 0,
    program_queue: [
      # one list of segments per video
    ]
  }
}
defmodule Scheduler do
  @moduledoc """
  Handles the server state: one rolling segment buffer per variant.

  The state is a map keyed by variant name. Each value holds:

    * `:segment_buffer` - upcoming segments, head first
    * `:current_buffer_start_ts` - timestamp (ms) at which the head segment starts
    * `:program_cursor` - index in `:program_queue` of the most recently buffered
      program (`-1` before anything has been buffered)
    * `:current_global_sequence_number` - number of segments rotated out so far;
      used as the playlist's media sequence number
    * `:program_queue` - list of programs, each a list of segments

  A `:rotate` message is processed roughly once per second. It drops segments
  whose play time has passed and tops the buffer up from the program queue.
  """
  use GenServer

  @variants ["0", "1", "2"]
  @min_buffer_size 15
  @playlist_segment_count 10
  @rotate_interval_ms 1000

  # Client API

  @doc """
  Starts the scheduler, registered under the module name.
  """
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Returns `{:ok, playlist}` with the current live media playlist of `variant`.

  `variant` must be one of the known variant names (`"0"`, `"1"`, `"2"`).
  """
  def get_playlist(variant) when variant in @variants do
    GenServer.call(__MODULE__, {:get_current_segments, variant, @playlist_segment_count})
    |> Playlist.generate()
  end

  @doc """
  Loads `videos` into the program queue of every variant and returns `:ok`.

  Options:

    * `:mode` - `:replace` (default) swaps the whole queue, `:append` adds the
      videos to the end of the existing queue
  """
  def load(videos, opts \\ []) do
    GenServer.call(__MODULE__, {:load, videos, opts})
  end

  # Server

  @impl true
  def init(_) do
    schedule_next()

    init_state =
      for v <- @variants, into: %{} do
        {v,
         %{
           segment_buffer: [],
           current_buffer_start_ts: now(),
           # -1 so that the first program buffered is index 0, for :append as
           # well as :replace loads
           program_cursor: -1,
           current_global_sequence_number: 0,
           program_queue: []
         }}
      end

    {:ok, init_state}
  end

  @impl true
  def handle_info(:rotate, state) do
    current_ts = now()
    new_state = Map.new(state, &rotate_variant_buffer(&1, current_ts))

    schedule_next()

    {:noreply, new_state}
  end

  @impl true
  def handle_call({:load, videos, opts}, _from, state) do
    mode = Keyword.get(opts, :mode, :replace)

    # Each variant parses its own playlist files, so the variants are loaded concurrently.
    new_state =
      state
      |> Task.async_stream(&load_variant(&1, videos, mode))
      |> Stream.map(fn {:ok, x} -> x end)
      |> Enum.into(%{})

    {:reply, :ok, new_state}
  end

  def handle_call({:get_current_segments, variant, count}, _from, state) do
    sub_state = state[variant]
    segments = Enum.slice(sub_state.segment_buffer, 0, count)
    media_sequence_number = sub_state.current_global_sequence_number
    {:reply, {segments, media_sequence_number}, state}
  end

  @doc """
  Schedules the next `:rotate` message for the calling process.
  """
  def schedule_next do
    Process.send_after(self(), :rotate, @rotate_interval_ms)
  end

  # Helpers

  @doc """
  Returns the current UTC time as a unix timestamp in milliseconds.
  """
  def now do
    DateTime.utc_now() |> DateTime.to_unix(:millisecond)
  end

  # Nothing is buffered, so no play time is elapsing: keep the buffer clock pinned
  # to "now". Otherwise segments loaded after an idle period would be measured
  # against a stale start time and expire in bulk on the following rotation.
  defp rotate_variant_buffer({v, %{segment_buffer: []} = sub_state}, current_ts) do
    {v, maybe_load_next_program(%{sub_state | current_buffer_start_ts: current_ts})}
  end

  defp rotate_variant_buffer({v, sub_state}, current_ts) do
    {popped, rest} =
      pop_expired_segments(
        sub_state.segment_buffer,
        current_ts,
        sub_state.current_buffer_start_ts
      )

    {v,
     %{sub_state | segment_buffer: rest}
     |> update_sequence_number(popped)
     |> update_buffer_start_ts(popped)
     |> maybe_load_next_program()}
  end

  # Splits the buffer into {segments that finished before `now`, remaining segments}.
  defp pop_expired_segments([], _, _) do
    {[], []}
  end

  defp pop_expired_segments([segment | rest] = segments, now, start_ts) do
    end_ts = start_ts + segment.duration_ms

    if end_ts < now do
      {popped, rest} = pop_expired_segments(rest, now, end_ts)
      {[segment | popped], rest}
    else
      {[], segments}
    end
  end

  # Appends the next program (wrapping around the queue) once the buffer runs low.
  defp maybe_load_next_program(
         %{program_queue: [_ | _] = program_queue, segment_buffer: buffer} = sub_state
       )
       when length(buffer) <= @min_buffer_size do
    next_cursor = rem(sub_state.program_cursor + 1, length(program_queue))
    next_program = Enum.at(program_queue, next_cursor)

    %{
      sub_state
      | segment_buffer: Enum.concat(buffer, next_program),
        program_cursor: next_cursor
    }
  end

  defp maybe_load_next_program(sub_state), do: sub_state

  defp update_sequence_number(sub_state, popped) do
    # each popped segment will increase current_global_sequence by 1
    new_sequence_number = sub_state.current_global_sequence_number + Enum.count(popped)
    %{sub_state | current_global_sequence_number: new_sequence_number}
  end

  # Moves the buffer clock forward by the total duration of the popped segments.
  defp update_buffer_start_ts(sub_state, popped) do
    time_shift_ms = popped |> Enum.map(& &1.duration_ms) |> Enum.sum()
    new_buffer_start_ts = sub_state.current_buffer_start_ts + time_shift_ms
    %{sub_state | current_buffer_start_ts: new_buffer_start_ts}
  end

  # :replace swaps the queue and rewinds the cursor; segments that are already
  # buffered are left in place.
  defp load_variant({v, sub_state}, videos, :replace) do
    new_videos = Enum.map(videos, &Playlist.parse_segments(&1, v))
    {v, %{sub_state | program_queue: new_videos, program_cursor: -1}}
  end

  defp load_variant({v, sub_state}, videos, :append) do
    new_videos = Enum.map(videos, &Playlist.parse_segments(&1, v))
    {v, %{sub_state | program_queue: Enum.concat(sub_state.program_queue, new_videos)}}
  end
end

Server

HLS Server Routes Summary

  • GET /playlist.m3u8 => static master playlist
  • GET /:variant/playlist.m3u8 => live variant playlist
  • GET /:variant/:video_id/:media_segment_name.ts => serves video files from local file system
defmodule Server do
  @moduledoc """
  HTTP front end of the live stream.

  Routes:

    * `GET /playlist.m3u8` - static master playlist
    * `GET /:variant/playlist.m3u8` - live media playlist of a variant
    * `GET /:variant/:video_id/:segment_id.ts` - segment file from `./hls_out`

  Every other request, as well as unknown variants and missing segments, is
  answered with `404 Not Found`.
  """
  use Plug.Router
  use Plug.ErrorHandler

  @variants ["0", "1", "2"]
  @playlist_content_type "application/vnd.apple.mpegurl"

  plug(Plug.Logger)
  plug(:match)
  plug(:dispatch)

  get "/playlist.m3u8" do
    conn
    |> put_resp_header("content-type", @playlist_content_type)
    |> send_resp(200, """
    #EXTM3U
    #EXT-X-VERSION:3
    #EXT-X-STREAM-INF:BANDWIDTH=2960000,RESOLUTION=1280x720
    0/playlist.m3u8
    #EXT-X-STREAM-INF:BANDWIDTH=5360000,RESOLUTION=1920x1080
    1/playlist.m3u8
    #EXT-X-STREAM-INF:BANDWIDTH=13000000,RESOLUTION=3840x2160
    2/playlist.m3u8
    """)
  end

  get "/:variant/playlist.m3u8" do
    # Scheduler.get_playlist/1 only accepts known variants; anything else is a 404
    # rather than a crash.
    if variant in @variants do
      {:ok, playlist} = Scheduler.get_playlist(variant)

      conn
      |> put_resp_header("content-type", @playlist_content_type)
      |> send_resp(200, playlist)
    else
      send_resp(conn, 404, "Not Found")
    end
  end

  get "/:variant/:video_id/:segment_id.ts" do
    path = "./hls_out/#{video_id}/#{variant}/#{segment_id}.ts"

    # The path parameters come straight from the URL, so they must not be able
    # to point outside of ./hls_out.
    safe? = Enum.all?([variant, video_id, segment_id], &safe_path_component?/1)

    if safe? and File.regular?(path) do
      conn
      |> put_resp_header("content-type", "video/MP2T")
      |> send_file(200, path)
    else
      send_resp(conn, 404, "Not Found")
    end
  end

  match _ do
    send_resp(conn, 404, "Not Found")
  end

  # A single path component: not empty, no directory navigation, no separators.
  defp safe_path_component?(component) do
    component not in ["", ".", ".."] and
      not String.contains?(component, ["/", "\\", <<0>>])
  end
end

Run

Make sure ffmpeg is installed and available from your system shell.

Start a new iex session in a terminal. Note: the directory in which you start that iex session will be used as the working directory of this server.

iex --sname test --cookie mycookie

Connect this livebook session to that running iex session as an attached node.

Put the mp4 files in the working directory, then run the snippets below.

defmodule MediaLoader do
  @moduledoc """
  Higher-level API that wraps the transcoding and scheduling steps.
  """

  @doc """
  Transcodes `video_files` and loads every successfully transcoded video into
  the scheduler. Failed transcodes are left out.

  `opts` are passed on to `Scheduler.load/2`.
  """
  def load_files(video_files, opts \\ []) do
    videos =
      video_files
      |> Transcoder.transcode_videos()
      |> prepare_load_files()

    Scheduler.load(videos, opts)
  end

  # Keeps the successful transcoding results and turns each into a `Video`.
  defp prepare_load_files(transcoding_results) do
    Enum.flat_map(transcoding_results, fn
      {:ok, %{video_id: id, video_output_dir: dir}} -> [%Video{id: id, location: dir}]
      _failed -> []
    end)
  end

  @doc """
  Loads already transcoded videos, identified by their ids, into the scheduler.

  The HLS output of each video is expected under `hls_out/<video_id>`.
  `opts` are passed on to `Scheduler.load/2`.
  """
  def load_videos(video_ids, opts \\ []) do
    videos = prepare_load_videos(video_ids)
    Scheduler.load(videos, opts)
  end

  # Builds a `Video` per id, pointing at its output directory.
  defp prepare_load_videos(video_ids) do
    Enum.map(video_ids, fn id ->
      %Video{id: id, location: "hls_out/#{id}"}
    end)
  end
end
defmodule Demo do
  @moduledoc """
  Boots the demo: the HTTP server on port 4000 plus the scheduler.
  """

  @doc """
  Starts a `:one_for_one` supervisor named `Server.Supervisor` that runs the
  Cowboy HTTP listener for `Server` and the `Scheduler`.

  Returns the result of `Supervisor.start_link/2`.
  """
  def start do
    http_listener = {Plug.Cowboy, scheme: :http, plug: Server, port: 4000}
    supervisor_opts = [strategy: :one_for_one, name: Server.Supervisor]

    Supervisor.start_link([http_listener, Scheduler], supervisor_opts)
  end
end

# Boot the HTTP server and the scheduler.
Demo.start()

# Transcode the mp4 files below and replace the program queue with them.
video_files = ["video_0.mp4", "video_1.mp4", "video_2.mp4", "video_3.mp4"]

MediaLoader.load_files(video_files, mode: :replace)

# video_ids = ~w[
#   video_0-1685148754
#   video_1-1685148761
#   video_2-1685148768
#   video_3-1685148776
# ]

# MediaLoader.load_videos(video_ids, mode: :replace)

Loading Videos

Run the elixir snippet below to load video files into the server. You can choose between different loading modes which will have different effects on the queue.

Visit http://localhost:4000/playlist.m3u8

# Load videos that were transcoded earlier, identified by their ids.
video_ids = [
  "lg-uhd-paris-1684820748",
  "lg-uhd-walking-in-the-air-saint-petersburg-1684820902"
]

MediaLoader.load_videos(video_ids, mode: :replace)

# Transcode the mp4 files below and load the results.
video_files = ["video_0.mp4", "video_1.mp4", "video_2.mp4", "video_3.mp4"]

MediaLoader.load_files(video_files, mode: :replace)
# # For terminating the server
# pid = Process.whereis(Server.Supervisor)
# Process.exit(pid, :normal)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment