
@hugobarauna
Last active January 24, 2024 18:47
Hydrate ETS from DETS using an init GenServer - Livebook Notebook - free sample from Elixir Patterns book

Hydrate ETS from DETS using an init GenServer

Mix.install([
  {:kino, "~> 0.6.2"},
  {:telemetry, "~> 1.2.1"}
])

This is a free sample from the Elixir Patterns book


If you want to know more about the book:

Introduction

We'll learn how to run initialization processes within our supervision trees. This technique is useful in a wide array of scenarios when you need to perform some unit of work as part of the supervision tree startup.

In this section, we'll be running our initialization jobs synchronously. You will need to lean on this technique when there is a dependency between your processes, and running your init job asynchronously would introduce race conditions.
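To make the ordering guarantee concrete, here is a minimal sketch with two hypothetical modules, ConfigLoader and ConfigReader, which are not part of the book's code. A supervisor starts its children one at a time, in list order, and waits for each child's init/1 to return before starting the next one. That means the second child can safely read whatever the first one stored during its own init/1. :persistent_term is used here only as a convenient shared store for the demo.

```elixir
# Hypothetical first child: does its "initialization job" synchronously in init/1
defmodule ConfigLoader do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  @impl true
  def init(_) do
    # Simulate slow initialization work; the supervisor waits for this to finish
    Process.sleep(200)
    :persistent_term.put({__MODULE__, :ready}, true)
    {:ok, nil}
  end
end

# Hypothetical second child: depends on the work done by ConfigLoader
defmodule ConfigReader do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  def saw_ready?, do: GenServer.call(__MODULE__, :saw_ready?)

  @impl true
  def init(_) do
    # ConfigLoader.init/1 has already returned by the time this runs,
    # so the flag is guaranteed to be present
    ready = :persistent_term.get({ConfigLoader, :ready}, false)
    {:ok, ready}
  end

  @impl true
  def handle_call(:saw_ready?, _from, ready), do: {:reply, ready, ready}
end

# Children are started in list order
{:ok, _sup} = Supervisor.start_link([ConfigLoader, ConfigReader], strategy: :one_for_one)

IO.puts("ConfigReader saw ready flag: #{ConfigReader.saw_ready?()}")
```

Suppose ConfigLoader did its work asynchronously instead, for example by sending itself a message from init/1 and doing the work in handle_info/2. ConfigReader could then start before the flag was written, which is exactly the race condition described above.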

Real-world application

This may seem like an esoteric technique, but we have used this approach in production when the need arose. Data that rarely changes but is read often is a good candidate for persisting to DETS and then loading into ETS when your application starts. That way, you access the data in its fastest form instead of making network requests to your database. You can also lean on something like Git LFS to store large DETS files alongside your code so that they are deployed together whenever the data changes.

Let's start by learning how to create DETS files with our data. Then we'll see how to load that data into ETS at application startup using an init GenServer.

Hydrate ETS from DETS Using an Init GenServer

Let's walk through how to create a DETS file so you know how to create them going forward. Like ETS, DETS is a key-value store, and the Erlang :dets module implements many of the same functions as the :ets module. This should make it feel familiar, and it should look similar to what you saw in Chapter 2.
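As a quick illustration of how closely the two APIs line up, the sketch below inserts and looks up the same tuple in an ETS table and in a DETS file. The table names and the temporary file path are made up for the demo. Note that the return values are not identical: :ets.insert/2 returns true, while :dets.insert/2 returns :ok.

```elixir
# Hypothetical file location for this demo
dets_path = Path.join(System.tmp_dir!(), "api_parity_demo.dets")

# Start from a clean file so repeated runs behave the same
File.rm(dets_path)

# In-memory ETS table: insert/2 returns true
ets_table = :ets.new(:api_parity_ets, [:set])
true = :ets.insert(ets_table, {{2023, "Pagani"}, ["Zonda"]})

# Disk-backed DETS table: insert/2 returns :ok
{:ok, dets_table} =
  :dets.open_file(:api_parity_dets, file: String.to_charlist(dets_path), type: :set)

:ok = :dets.insert(dets_table, {{2023, "Pagani"}, ["Zonda"]})

# Both lookups return a list of matching tuples
ets_result = :ets.lookup(ets_table, {2023, "Pagani"})
dets_result = :dets.lookup(dets_table, {2023, "Pagani"})

IO.inspect(ets_result, label: "ETS lookup")
IO.inspect(dets_result, label: "DETS lookup")

# Close the DETS file and clean up the demo file
:ok = :dets.close(dets_table)
File.rm(dets_path)
```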

# We define the path to our DETS file as a charlist. Often when we work with
# Erlang libraries, we'll need to send charlists instead of strings.
dets_file_path =
  __DIR__
  |> Path.join("./exotic_car_filter.dets")
  |> String.to_charlist()

# If the DETS file we specify does not exist, we create it. If it does
# exist, it is opened, and we can update its contents. If you want to clear the
# contents of a DETS file to ensure you are starting from a clean state, you can
# use the `:dets.delete_all_objects/1` function.
{:ok, dets_table} =
  :dets.open_file(
    :vehicles_to_track,
    file: dets_file_path,
    type: :set,
    auto_save: 10_000
  )

# ETS tables are limited only by available memory, and a single DETS file can
# hold up to 2 GB of data, so our little map of data should be no problem at all.
vehicles_to_track = %{
  2023 => %{
    "Bugatti" => ["Chiron", "Veyron"],
    "Ferrari" => ["488", "F90"],
    "Lamborghini" => ["Aventador", "Huracan"],
    "Pagani" => ["Huayra", "Zonda"],
    "Porsche" => ["918 Spyder", "911 GT3 RS"]
  },
  2022 => %{
    "Bugatti" => ["Chiron", "Veyron"],
    "Ferrari" => ["488", "F90"],
    "Lamborghini" => ["Aventador", "Huracan"],
    "Pagani" => ["Huayra", "Zonda"],
    "Porsche" => ["918 Spyder", "911 GT3 RS"]
  }
}

# We insert all of our key-value data into the DETS file.
Enum.each(vehicles_to_track, fn {year, makes_models} ->
  Enum.each(makes_models, fn {make, models} ->
    :ok = :dets.insert(dets_table, {{year, make}, MapSet.new(models)})
  end)
end)

# Here, we flush any pending writes to disk.
:ok = :dets.sync(dets_table)

# We close our handle to the DETS file so that its contents are fully written
# and the file is marked as properly closed. A DETS file that was not closed
# properly (for example, after an unexpected shutdown) has to be repaired the
# next time it is opened.
:ok = :dets.close(dets_table)

After running the above code, you should have a DETS file named exotic_car_filter.dets in the same directory as this notebook (the __DIR__ path used above). With that in place, we can create the GenServer module that will transfer all the contents of the DETS file into ETS. One thing to note since we are using ETS: an ETS table is deleted as soon as the process that owns it terminates, unless an heir is configured for the table. As a result, our init GenServer this time will not return :ignore from the init/1 callback. Instead, it keeps running so that our ETS table has a process that owns it.
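Here is a minimal sketch of that ownership rule, using a hypothetical table name. A throwaway process creates a named table, and once that process exits, :ets.whereis/1 (available since OTP 21) no longer finds the table.

```elixir
parent = self()

# A short-lived process that creates and owns a named ETS table
owner =
  spawn(fn ->
    :ets.new(:short_lived_table, [:set, :public, :named_table])
    send(parent, :table_created)

    # Stay alive (and keep owning the table) until told to stop
    receive do
      :stop -> :ok
    end
  end)

receive do
  :table_created -> :ok
end

IO.inspect(:ets.whereis(:short_lived_table) != :undefined,
  label: "Table exists while owner is alive"
)

# Monitor the owner so we know when it has actually exited
ref = Process.monitor(owner)
send(owner, :stop)

receive do
  {:DOWN, ^ref, :process, ^owner, _reason} -> :ok
end

# With its owner gone, the table has been deleted
IO.inspect(:ets.whereis(:short_lived_table), label: "Table after owner exits")
```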

defmodule ExoticCarLookup do
  # If this process terminates, we want it to restart, given that
  # the ETS table will also be deleted since it is tied to the
  # process.
  use GenServer

  @ets_table_name __MODULE__.ETS

  # +--------------------+
  # |     Public API     |
  # +--------------------+

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  # Other modules can use this function to determine
  # if the provided year+make+model is in the list of exotic
  # cars.
  def exotic_car?(year, make, model) do
    case :ets.lookup(@ets_table_name, {year, make}) do
      [{_year_make, models}] ->
        MapSet.member?(models, model)

      _ ->
        false
    end
  end

  # +---------------------+
  # |      Callbacks      |
  # +---------------------+

  @impl true
  def init(_) do
    # Create the ETS table before hydrating it
    :ets.new(
      @ets_table_name,
      [:set, :protected, :named_table, {:read_concurrency, true}]
    )

    # We call the `hydrate_ets/0` function in the `init/1` callback
    # so that the supervision tree is blocked until the hydration process
    # is complete. That way, we ensure that there are no race conditions
    # when our application starts receiving requests.
    hydrate_ets()

    # Instead of returning `:ignore` and terminating the process, we
    # allow the process to keep running since the ETS table will be tied
    # to the lifecycle of the process.
    {:ok, nil}
  end

  # +---------------------------+
  # |      Private Helpers      |
  # +---------------------------+

  # In this function, we hydrate our ETS table using a DETS file. We can
  # hydrate the ETS table from anywhere (CSV, database, HTTP call, etc.),
  # but it is useful to know how you can lean on DETS to restore ETS
  # tables.
  defp hydrate_ets do
    # This specifies where the DETS file is
    exotic_car_dets_file =
      __DIR__
      |> Path.join("./exotic_car_filter.dets")
      |> String.to_charlist()

    # Open the existing DETS file
    {:ok, dets_instance} =
      :dets.open_file(
        :exotic_car_dets_backup,
        file: exotic_car_dets_file,
        type: :set,
        access: :read
      )

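    # Copy all of the contents of the DETS file to the ETS table that
    # we created in the `init/1` callback so that all of our operations
    # take place in memory as opposed to on disk. `:dets.to_ets/2` returns
    # the ETS table on success or `{:error, reason}` on failure, so we match
    # on the table name to crash loudly if the copy does not succeed.
    @ets_table_name = :dets.to_ets(dets_instance, @ets_table_name)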

    # Close the DETS file, as we no longer need it
    :ok = :dets.close(dets_instance)
  end
end

The layout of this module looks similar to what we had in the previous Livebook. We have the exotic_car?/3 public function that other modules can use to determine whether the car in question is an exotic car. Granted, the implementation has switched from querying :persistent_term to querying :ets, but downstream consumers of this module need not be concerned with the implementation details. Then we have our init/1 callback, which creates an empty ETS table and then calls the hydrate_ets/0 private function, where the DETS file that we created previously is copied into our newly created ETS table. Finally, our init/1 callback returns an ok tuple with a nil state. We do not need to track any state in this GenServer, but we need it to stick around since it owns the ETS table where our data is held.
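The DETS-to-ETS copy at the heart of hydrate_ets/0 can also be tried on its own. This self-contained sketch writes one entry to a temporary DETS file (the names and path are hypothetical), reopens it read-only as hydrate_ets/0 does, and copies it into a fresh ETS table with :dets.to_ets/2, which returns the ETS table it was given on success.

```elixir
path = Path.join(System.tmp_dir!(), "to_ets_demo.dets")
File.rm(path)

# Write a single entry to a fresh DETS file, then close it
{:ok, dets} = :dets.open_file(:to_ets_demo_dets, file: String.to_charlist(path), type: :set)
:ok = :dets.insert(dets, {{2023, "Pagani"}, MapSet.new(["Huayra", "Zonda"])})
:ok = :dets.close(dets)

# Create the destination ETS table first, as init/1 does
ets = :ets.new(:to_ets_demo_ets, [:set, :protected, read_concurrency: true])

# Reopen the DETS file read-only and copy everything into ETS
{:ok, dets} =
  :dets.open_file(:to_ets_demo_dets,
    file: String.to_charlist(path),
    type: :set,
    access: :read
  )

^ets = :dets.to_ets(dets, ets)
:ok = :dets.close(dets)

# Lookups now happen purely in memory
[{_key, models}] = :ets.lookup(ets, {2023, "Pagani"})
IO.inspect(MapSet.member?(models, "Zonda"), label: "Zonda is exotic?")

File.rm(path)
```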

With that in place, we can wholesale copy the code that we had in the previous Livebook and fire up the supervision tree.

# Copied from the previous Livebook without change
defmodule ExoticCarSaleTracker do
  use GenServer

  # +--------------------+
  # |     Public API     |
  # +--------------------+

  # Start this process as a singleton process since we
  # only need one instance of it running and processing data.
  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def get_avg_price_stats do
    GenServer.call(__MODULE__, :get_avg_price_stats)
  end

  # We lean on `GenServer.cast/2` here since we don't want to block
  # the calling process in order to run our analytics processing.
  def track_sale(year, make, model, price) do
    GenServer.cast(__MODULE__, {:track_sale, {year, make, model, price}})
  end

  # +---------------------+
  # |      Callbacks      |
  # +---------------------+

  @impl true
  def init(_) do
    {:ok, %{}}
  end

  @impl true
  def handle_call(:get_avg_price_stats, _from, state) do
    reply =
      Enum.map(state, fn {year_make_model, {count, total_price}} ->
        {year_make_model, total_price / count}
      end)
      |> Map.new()

    {:reply, reply, state}
  end

  @impl true
  def handle_cast({:track_sale, {year, make, model, price}}, state) do
    vehicle = {year, make, model}

    # Only add the vehicle sale to the GenServer state if it is an
    # exotic vehicle that we are interested in tracking.
    new_state =
      if ExoticCarLookup.exotic_car?(year, make, model) do
        Map.update(state, vehicle, {1, price}, fn {count, total_price} ->
          {count + 1, total_price + price}
        end)
      else
        state
      end

    {:noreply, new_state}
  end
end

# Copied from the previous Livebook without change
defmodule ExoticCarSaleTrackerSupervisor do
  use Supervisor

  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_) do
    # To ensure that `ExoticCarSaleTracker` has access to all of
    # the necessary data in ETS, we need to start `ExoticCarLookup`
    # first.
    children = [
      ExoticCarLookup,
      ExoticCarSaleTracker
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

With all those pieces in place, we are ready to fire up the supervision tree and see how it performs. We'll actually use the same test code from the previous Livebook so you can see that the output lines up exactly with what we previously had.

mock_sales_list = [
  {2000, "Honda", "Accord", 22_000},
  {2019, "Acura", "Integra", 32_000},
  {2022, "Pagani", "Zonda", 4_181_000},
  {2023, "Lamborghini", "Huracan", 324_000},
  {2022, "Ferrari", "488", 450_000},
  {2018, "Mazda", "MX-5", 29_000},
  {2015, "Chevy", "Camaro", 65_000},
  {2023, "Lamborghini", "Huracan", 349_500},
  {2015, "Chevy", "Camaro", 62_000},
  {2022, "Ferrari", "488", 492_600}
]

# Stop the supervisor if it is already running
if is_pid(Process.whereis(ExoticCarSaleTrackerSupervisor)) do
  Supervisor.stop(ExoticCarSaleTrackerSupervisor)
end

# Start the supervisor
ExoticCarSaleTrackerSupervisor.start_link([])

# Mock out a list of sales that could be coming off of a
# stream of data
mock_sales_list
|> Enum.each(fn {year, make, model, price} ->
  ExoticCarSaleTracker.track_sale(year, make, model, price)
end)

ExoticCarSaleTracker.get_avg_price_stats()

As you can see, by using the same list of mock sales, we can generate the same output as before. The nice thing is that the ExoticCarSaleTracker GenServer didn't have to change in the slightest to accommodate the changed ExoticCarLookup implementation. So long as the public interface remains the same, the supervisor is able to start all the necessary components to work together harmoniously. This is one of the niceties of breaking down your problem space into self-contained components. As needed, you can adjust implementations, and downstream-dependent processes can continue to operate without needing to know the internal implementation details of other processes.
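To illustrate the point about stable interfaces, here is a hypothetical alternative backend, PersistentTermCarLookup, that stores the lookup data in :persistent_term but exposes the same exotic_car?/3 contract. It is an illustrative sketch, not the code from the previous Livebook. The load/1 function and the small data subset are assumptions made for the example.

```elixir
defmodule PersistentTermCarLookup do
  # Hypothetical alternative backend: same exotic_car?/3 contract,
  # different storage mechanism.
  @key {__MODULE__, :vehicles}

  # Flatten %{year => %{make => [model]}} into %{{year, make} => MapSet}
  def load(vehicles) do
    lookup =
      for {year, makes} <- vehicles, {make, models} <- makes, into: %{} do
        {{year, make}, MapSet.new(models)}
      end

    :persistent_term.put(@key, lookup)
  end

  # Same public interface as ExoticCarLookup.exotic_car?/3
  def exotic_car?(year, make, model) do
    case Map.fetch(:persistent_term.get(@key, %{}), {year, make}) do
      {:ok, models} -> MapSet.member?(models, model)
      :error -> false
    end
  end
end

PersistentTermCarLookup.load(%{2023 => %{"Lamborghini" => ["Aventador", "Huracan"]}})

IO.inspect(PersistentTermCarLookup.exotic_car?(2023, "Lamborghini", "Huracan"))
IO.inspect(PersistentTermCarLookup.exotic_car?(2023, "Honda", "Accord"))
```

A caller that relies only on exotic_car?/3 returning a boolean would not need to change if it were pointed at this module instead.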

How performant is this?

You may be wondering: "This setup does not look very complicated...how fast could it possibly be?" Let's put a number to that question and stress test this simple supervision tree. We'll create a list of 1,000,000 mock events and see how long our supervision tree takes to work through the data.

mock_sales_list = [
  {2000, "Honda", "Accord", 22_000},
  {2019, "Acura", "Integra", 32_000},
  {2022, "Pagani", "Zonda", 4_181_000},
  {2023, "Lamborghini", "Huracan", 324_000},
  {2022, "Ferrari", "488", 450_000},
  {2018, "Mazda", "MX-5", 29_000},
  {2015, "Chevy", "Camaro", 65_000},
  {2023, "Lamborghini", "Huracan", 349_500},
  {2015, "Chevy", "Camaro", 62_000},
  {2022, "Ferrari", "488", 492_600}
]

num_sale_events = 1_000_000

# Stop the supervisor if it is already running
if is_pid(Process.whereis(ExoticCarSaleTrackerSupervisor)) do
  Supervisor.stop(ExoticCarSaleTrackerSupervisor)
end

# Start the supervisor
ExoticCarSaleTrackerSupervisor.start_link([])

# Mock out a list of sales that could be coming off of a
# stream of data
{time_in_microseconds, output} =
  :timer.tc(fn ->
    # These events will pile up on the process message queue since they are
    # all casts
    mock_sales_list
    |> Stream.cycle()
    |> Enum.take(num_sale_events)
    |> Enum.each(fn {year, make, model, price} ->
      ExoticCarSaleTracker.track_sale(year, make, model, price)
    end)

    # This will be the last message that the process receives, and it is synchronous
    # since it is a call
    output_result = ExoticCarSaleTracker.get_avg_price_stats()

    # This will always return 0 since the final call makes sure that all of the
    # async messages are processed
    {:message_queue_len, message_queue_length} =
      ExoticCarSaleTracker
      |> Process.whereis()
      |> Process.info(:message_queue_len)

    IO.puts("Message queue length: #{message_queue_length}")

    output_result
  end)

# :timer.tc/1 reports the elapsed time in microseconds
time_in_ms = System.convert_time_unit(time_in_microseconds, :microsecond, :millisecond)

IO.puts("Worked through #{num_sale_events} mock sale events in #{time_in_ms}ms")
output

On my Mac Studio, I was able to work through 1,000,000 mock sales events in just over one second. That is just with one process working through sales events and an ETS table to check every mock sale event that flows through the process. To ensure that our timer is measuring correctly, we call ExoticCarSaleTracker.get_avg_price_stats/0 as part of the timing function. This will be the last message that the ExoticCarSaleTracker process receives, and it is also a call that blocks the calling process until a response is sent (or until the call times out). To make sure that we are not measuring the wrong things, we also have the log statement that checks to see how many messages are in the process mailbox after running through everything. This will always come back as zero since the synchronous call occurs right before it and ensures we work through all the mock data.
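The ordering guarantee that makes this measurement trustworthy can be checked in isolation. In the sketch below, one process sends 100,000 casts followed by a single call to the same GenServer. The OrderingDemo module is hypothetical. The BEAM delivers messages between a given pair of processes in the order they were sent, so the call is handled only after every cast. As a result, the reply reflects all of the casts and the mailbox is empty afterward.

```elixir
defmodule OrderingDemo do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, 0)

  @impl true
  def init(count), do: {:ok, count}

  # Asynchronous: the caller does not wait for this to run
  @impl true
  def handle_cast(:increment, count), do: {:noreply, count + 1}

  # Synchronous: the caller blocks until this reply arrives
  @impl true
  def handle_call(:get, _from, count), do: {:reply, count, count}
end

{:ok, pid} = OrderingDemo.start_link()

Enum.each(1..100_000, fn _ -> GenServer.cast(pid, :increment) end)

# The call is queued behind every cast sent above by this same process,
# so by the time it returns, all casts have been handled.
count = GenServer.call(pid, :get)
{:message_queue_len, queue_len} = Process.info(pid, :message_queue_len)

# prints "count=100000, queue_len=0"
IO.puts("count=#{count}, queue_len=#{queue_len}")
```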

As you can see, by using just the primitives available to us via Elixir, Erlang, and the BEAM, we can create very performant application components with very little code. No messy microservices, external key-value stores, or caching layers.
