Local/Distributed Caching
defmodule MyApp.AccountLookup.Cache do
  @moduledoc """
  Provides a cache that can be used for account lookups. This cache is backed by
  Cachex for local storage and pg2 for remote distribution. Keys are set to expire
  after 7-10 days (randomly distributed) in order to prevent stale data in our cache
  over a long time period.
  """
  use Cachex.DistributedCache

  @doc """
  Get the cache entry for a tenant and user guid
  """
  def get(tenant_id, user_guid) do
    get(key(tenant_id, user_guid))
  end

  @doc """
  Set the cache entry for a tenant and user guid. It will automatically be
  configured with a TTL and distributed to remote nodes.
  """
  def set(tenant_id, user_guid, value) do
    set(key(tenant_id, user_guid), value)
  end

  defp key(tenant_id, user_guid), do: "tenant:#{tenant_id}__guid:#{user_guid}"
end
defmodule Cachex.DistributedCache do
  defmacro __using__(_) do
    quote do
      use GenServer

      @cachex __MODULE__.Cachex
      @topic __MODULE__

      require Cachex.Spec

      def start_link(_) do
        GenServer.start_link(__MODULE__, [], name: __MODULE__)
      end

      def init(_) do
        {:ok, _pid} = Cachex.start_link(@cachex, stats: true)
        # Join the pg2 group so remote nodes can find and message this process
        :ok = :pg2.create(@topic)
        :ok = :pg2.join(@topic, self())
        {:ok, [], {:continue, :init}}
      end

      @doc """
      Get the cache entry for a key
      """
      def get(key) do
        Cachex.get(@cachex, key)
      end

      @doc """
      Set the cache entry for a key. It will automatically be
      configured with a TTL and distributed to remote nodes.
      """
      def set(key, value) do
        # TTL is a random duration between 7 and 10 days (in ms)
        expires_at = :timer.seconds(random_between_days(7, 10))

        Cachex.put(@cachex, key, value, ttl: expires_at)
        |> case do
          {:ok, true} ->
            update_remotes(key, value, expires_at)
            :ok

          rest ->
            rest
        end
      end

      @doc """
      Clear the cache across the entire cluster
      """
      def clear() do
        {:ok, _} = Cachex.clear(@cachex)
        clear_remotes()
      end

      # private

      defp random_between_days(day_a, day_b) when day_b > day_a do
        day_a_seconds = 60 * 60 * 24 * day_a
        day_b_seconds = 60 * 60 * 24 * day_b
        :rand.uniform(day_b_seconds - day_a_seconds) + day_a_seconds
      end

      defp update_remotes(cache_key, value, ttl) do
        Task.start(fn ->
          on_remotes(fn pid ->
            send(pid, {:broadcast, @topic, {:set, cache_key, value, ttl}})
          end)
        end)
      end

      defp clear_remotes() do
        on_remotes(fn pid ->
          send(pid, {:broadcast, @topic, {:clear}})
        end)
      end

      defp request_remote_dump() do
        [local_pid] = :pg2.get_local_members(@topic)

        on_remotes(fn pid ->
          send(pid, {:broadcast, @topic, {:send_dump_to_pid, local_pid}})
        end)
      end

      defp on_remotes(func) do
        @topic
        |> :pg2.get_members()
        |> Kernel.--(:pg2.get_local_members(@topic))
        |> Enum.each(func)
      end

      # callbacks

      def handle_continue(:init, state) do
        # Ask an existing node for a dump of its cache so this node starts warm
        request_remote_dump()
        {:noreply, state}
      end

      def handle_info({:broadcast, @topic, {:set, cache_key, value, ttl}}, state) do
        opts = Cachex.Spec.const(:notify_false) ++ [ttl: ttl]
        Cachex.put(@cachex, cache_key, value, opts)
        {:noreply, state}
      end

      def handle_info({:broadcast, @topic, {:send_dump_to_pid, pid}}, state) do
        Task.start(fn ->
          {:ok, export} = Cachex.export(@cachex)
          send(pid, {:broadcast, @topic, {:load_dump, export}})
        end)

        {:noreply, state}
      end

      def handle_info({:broadcast, @topic, {:load_dump, entries}}, state) do
        Cachex.Actions.MemoryLoad.execute(@cachex, entries)
        # Trigger GC after things have calmed down, due to the potentially large size of the cache binary
        Process.send_after(self(), :gc, 1_000)
        {:noreply, state}
      end

      def handle_info({:broadcast, @topic, {:clear}}, state) do
        {:ok, _} = Cachex.clear(@cachex)
        {:noreply, state}
      end

      def handle_info(:gc, state) do
        :erlang.garbage_collect()
        {:noreply, state}
      end
    end
  end
end
defmodule Cachex.Actions.MemoryLoad do
  @moduledoc false
  import Cachex.Spec

  def execute(cache, entries) do
    Cachex.execute(cache, fn worker ->
      Enum.each(entries, &import(worker, &1, now()))
    end)
  end

  # Private

  # Extracted from https://github.com/whitfin/cachex/blob/2b0687682622fbf99c741c0b9fe3bb1d36834ab4/lib/cachex/actions/load.ex
  # so the conditions it handles properly (entries without a TTL, already-expired entries) are handled here too.

  # Entries without a TTL are written as-is
  defp import(cache, entry(key: k, ttl: nil, value: v), _time),
    do: {:ok, true} = Cachex.put(cache, k, v, const(:notify_false))

  # Entries that have already expired are skipped
  defp import(_cache, entry(touched: t1, ttl: t2), time)
       when t1 + t2 < time,
       do: nil

  # Remaining entries keep whatever TTL they have left
  defp import(cache, entry(key: k, touched: t1, ttl: t2, value: v), time) do
    {:ok, true} =
      Cachex.put(
        cache,
        k,
        v,
        const(:notify_false) ++
          [
            ttl: t1 + t2 - time
          ]
      )
  end
end
Usage: | |
MyApp.AccountLookup.Cache is placed in the application's supervision tree, after which it can be used anywhere in the app.
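A minimal sketch of that supervisor wiring (the MyApp.Application module and the children list are illustrative, not part of the gist):

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Starts the GenServer defined by `use Cachex.DistributedCache`, which
      # starts the local Cachex instance and joins the pg2 group on init.
      MyApp.AccountLookup.Cache
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end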
MyApp.AccountLookup.Cache.get("1", "1") | |
MyApp.AccountLookup.Cache.set("1", "1", "2") | |
MyApp.AccountLookup.Cache.get("1", "1") | |
Enum.each((1..1_000_000), fn i -> | |
MyApp.AccountLookup.Cache.set(to_string(i), "1111-0000-111111-11111", i) | |
end) | |
MyApp.AccountLookup.Cache.get("50000", "1111-0000-111111-11111") |
Accompanying blog post: https://stephenbussey.com/2019/01/29/distributed-in-memory-caching-in-elixir.html