Skip to content

Instantly share code, notes, and snippets.

@sb8244
Last active May 23, 2024 07:29
Show Gist options
  • Save sb8244/371335946d444bd8c5786571cacef4d6 to your computer and use it in GitHub Desktop.
Save sb8244/371335946d444bd8c5786571cacef4d6 to your computer and use it in GitHub Desktop.
Local/Distributed Caching
defmodule MyApp.AccountLookup.Cache do
  @moduledoc """
  Cache for account lookups, addressed by tenant id and user guid.

  Local storage is handled by Cachex and writes are distributed to remote nodes
  through pg2 (both provided by `use Cachex.DistributedCache`). Entries expire
  after a randomly chosen 7-10 days so that stale data cannot linger in the
  cache indefinitely.
  """

  use Cachex.DistributedCache

  @doc """
  Fetches the cache entry stored for the given tenant and user guid.
  """
  def get(tenant_id, user_guid) do
    tenant_id
    |> key(user_guid)
    |> get()
  end

  @doc """
  Stores `value` for the given tenant and user guid.

  The TTL is applied automatically and the write is distributed to remote nodes.
  """
  def set(tenant_id, user_guid, value) do
    cache_key = key(tenant_id, user_guid)
    set(cache_key, value)
  end

  # Builds the flat string key under which a tenant/guid pair is cached.
  defp key(tenant_id, user_guid) do
    "tenant:#{tenant_id}__guid:#{user_guid}"
  end
end
defmodule Cachex.DistributedCache do
  @moduledoc """
  `use Cachex.DistributedCache` turns the calling module into a named GenServer
  that owns a local Cachex cache and mirrors writes to the same module running
  on other nodes.

  The using module gets:

    * `start_link/1` - starts the server registered under the module name
    * `get/1` - reads a key from the local cache
    * `set/2` - writes a key locally with a random 7-10 day TTL and broadcasts
      the write to every remote member of the pg2 group
    * `clear/0` - clears the local cache and asks every remote member to do the same

  On boot the server joins a pg2 group named after the module and asks the
  remote members for a dump of their cache so a fresh node starts warm.

  NOTE(review): `:pg2` was removed in OTP 24; running on newer runtimes requires
  migrating the group handling to `:pg`.
  """

  defmacro __using__(_) do
    quote do
      use GenServer

      @cachex __MODULE__.Cachex
      @topic __MODULE__

      require Cachex.Spec

      @doc """
      Starts the cache server, registered locally under the module name.
      """
      def start_link(_) do
        GenServer.start_link(__MODULE__, [], name: __MODULE__)
      end

      @impl true
      def init(_) do
        # The cache is linked to this server, so it lives and dies with it.
        {:ok, _pid} = Cachex.start_link(@cachex, stats: true)
        :ok = :pg2.create(@topic)
        :ok = :pg2.join(@topic, self())

        # Requesting the remote dump is deferred so init/1 returns quickly.
        {:ok, [], {:continue, :init}}
      end

      @doc """
      Get the cache entry for a key.

      Returns whatever `Cachex.get/2` returns for the local cache.
      """
      def get(key) do
        Cachex.get(@cachex, key)
      end

      @doc """
      Set the cache entry for a key. It will be automatically
      configured with a TTL and distributed to remote nodes.

      Returns `:ok` when the local write succeeded; any other result of
      `Cachex.put/4` is returned unchanged and nothing is broadcast.
      """
      def set(key, value) do
        # :timer.seconds/1 converts the random number of seconds to milliseconds.
        ttl = :timer.seconds(random_between_days(7, 10))

        case Cachex.put(@cachex, key, value, ttl: ttl) do
          {:ok, true} ->
            update_remotes(key, value, ttl)
            :ok

          other ->
            other
        end
      end

      @doc """
      Clear the cache across the entire cluster.

      The local cache is cleared synchronously; remote members are only sent a
      clear request. Returns `:ok`.
      """
      def clear() do
        {:ok, _} = Cachex.clear(@cachex)
        clear_remotes()
      end

      # private

      # Picks a random number of seconds strictly greater than `day_a` days and
      # at most `day_b` days (:rand.uniform/1 returns a value in 1..n).
      defp random_between_days(day_a, day_b) when day_b > day_a do
        day_a_seconds = 60 * 60 * 24 * day_a
        day_b_seconds = 60 * 60 * 24 * day_b
        :rand.uniform(day_b_seconds - day_a_seconds) + day_a_seconds
      end

      # Broadcasts a write to all remote members from a separate, unlinked task
      # so the caller of set/2 does not wait on the fan-out.
      defp update_remotes(cache_key, value, ttl) do
        Task.start(fn ->
          on_remotes(fn pid ->
            send(pid, {:broadcast, @topic, {:set, cache_key, value, ttl}})
          end)
        end)
      end

      # Asks every remote member to clear its local cache.
      defp clear_remotes() do
        on_remotes(fn pid ->
          send(pid, {:broadcast, @topic, {:clear}})
        end)
      end

      # Asks every remote member to send its cache export back to this server.
      # Only called from handle_continue/2, i.e. inside the server process that
      # joined the group in init/1, so self() is the local member; this avoids a
      # brittle single-element match on the local member list.
      defp request_remote_dump() do
        local_pid = self()

        on_remotes(fn pid ->
          send(pid, {:broadcast, @topic, {:send_dump_to_pid, local_pid}})
        end)
      end

      # Runs `func` for each group member that is not local to this node.
      defp on_remotes(func) do
        @topic
        |> :pg2.get_members()
        |> Kernel.--(:pg2.get_local_members(@topic))
        |> Enum.each(func)
      end

      # callbacks

      @impl true
      def handle_continue(:init, state) do
        request_remote_dump()
        {:noreply, state}
      end

      # A remote node wrote a key: apply the same write locally with the same TTL.
      # NOTE(review): const(:notify_false) presumably disables Cachex hook
      # notifications for replicated writes - confirm against Cachex.Spec.
      @impl true
      def handle_info({:broadcast, @topic, {:set, cache_key, value, ttl}}, state) do
        opts = Cachex.Spec.const(:notify_false) ++ [ttl: ttl]
        Cachex.put(@cachex, cache_key, value, opts)
        {:noreply, state}
      end

      # A freshly started node asked for our data: export and send it from a
      # task so the server stays responsive while the export is built.
      def handle_info({:broadcast, @topic, {:send_dump_to_pid, pid}}, state) do
        Task.start(fn ->
          {:ok, export} = Cachex.export(@cachex)
          send(pid, {:broadcast, @topic, {:load_dump, export}})
        end)

        {:noreply, state}
      end

      # A remote node answered our dump request: load its entries locally.
      def handle_info({:broadcast, @topic, {:load_dump, entries}}, state) do
        Cachex.Actions.MemoryLoad.execute(@cachex, entries)
        # Trigger GC after things have calmed down due to potential large size of cache binary
        Process.send_after(self(), :gc, 1_000)
        {:noreply, state}
      end

      def handle_info({:broadcast, @topic, {:clear}}, state) do
        {:ok, _} = Cachex.clear(@cachex)
        {:noreply, state}
      end

      def handle_info(:gc, state) do
        :erlang.garbage_collect()
        {:noreply, state}
      end

      # Defining handle_info/2 clauses replaces the default from `use GenServer`,
      # so without this clause any unexpected message would raise
      # FunctionClauseError and crash the server that started the cache.
      # Log and ignore instead.
      def handle_info(message, state) do
        :logger.warning("~p received unexpected message: ~p", [__MODULE__, message])
        {:noreply, state}
      end
    end
  end
end
defmodule Cachex.Actions.MemoryLoad do
  @moduledoc false

  import Cachex.Spec

  @doc """
  Writes every exported entry in `entries` into `cache`.

  All writes run inside a single `Cachex.execute/2` call. Entries whose TTL has
  already elapsed are skipped; the rest keep only their remaining lifetime.
  """
  def execute(cache, entries) do
    Cachex.execute(cache, fn worker ->
      Enum.each(entries, fn record ->
        load_entry(worker, record, now())
      end)
    end)
  end

  # Private

  # Adapted from https://github.com/whitfin/cachex/blob/2b0687682622fbf99c741c0b9fe3bb1d36834ab4/lib/cachex/actions/load.ex
  # so entries can be loaded from memory rather than through the upstream action.

  # Entry without a TTL: store it as-is, with no expiration.
  defp load_entry(cache, entry(key: key, ttl: nil, value: value), _current_time) do
    {:ok, true} = Cachex.put(cache, key, value, const(:notify_false))
  end

  # Entry whose lifetime ended before `current_time`: drop it.
  defp load_entry(_cache, entry(touched: touched, ttl: ttl), current_time)
       when touched + ttl < current_time do
    nil
  end

  # Live entry: store it with whatever is left of its original TTL.
  defp load_entry(
         cache,
         entry(key: key, touched: touched, ttl: ttl, value: value),
         current_time
       ) do
    remaining = touched + ttl - current_time
    opts = const(:notify_false) ++ [ttl: remaining]
    {:ok, true} = Cachex.put(cache, key, value, opts)
  end
end
Usage:
Add MyApp.AccountLookup.Cache to the application's supervision tree; it can then be used anywhere in the app:
# Read the entry for tenant "1" / guid "1" (nothing stored yet, assuming a fresh cache).
MyApp.AccountLookup.Cache.get("1", "1")
# Store "2" under that tenant/guid; set/3 also broadcasts the write to remote nodes.
MyApp.AccountLookup.Cache.set("1", "1", "2")
# Read the same key back from the local cache.
MyApp.AccountLookup.Cache.get("1", "1")
# Bulk-load one million entries (one tenant id per iteration, same guid) to exercise
# the cache and its distribution under volume.
Enum.each((1..1_000_000), fn i ->
MyApp.AccountLookup.Cache.set(to_string(i), "1111-0000-111111-11111", i)
end)
# Spot-check one of the bulk-loaded entries.
MyApp.AccountLookup.Cache.get("50000", "1111-0000-111111-11111")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment