Skip to content

Instantly share code, notes, and snippets.

@sb8244 sb8244/caching.ex
Last active Nov 17, 2019

Embed
What would you like to do?
Local/Distributed Caching
defmodule MyApp.AccountLookup.Cache do
@moduledoc """
Provides a cache that can be used for account lookups. This cache is backed by
Cachex for local storage and pg2 for remote distribution. Keys are set to expire
after 7-10 days (randomly distributed) in order to prevent stale data in our cache
over a long time period.
"""
use Cachex.DistributedCache
@doc """
Get the cache entry for a tenant and user guid
"""
def get(tenant_id, user_guid) do
get(key(tenant_id, user_guid))
end
@doc """
Set the cache entry for a tenant and user guid. It will be automatically
configured with a TTL and distribute to remote nodes.
"""
def set(tenant_id, user_guid, value) do
set(key(tenant_id, user_guid), value)
end
defp key(tenant_id, user_guid), do: "tenant:#{tenant_id}__guid:#{user_guid}"
end
defmodule Cachex.DistributedCache do
defmacro __using__(_) do
quote do
use GenServer
@cachex __MODULE__.Cachex
@topic __MODULE__
require Cachex.Spec
def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
def init(_) do
{:ok, _pid} = Cachex.start_link(@cachex, stats: true)
:ok = :pg2.create(@topic)
:ok = :pg2.join(@topic, self())
{:ok, [], {:continue, :init}}
end
@doc """
Get the cache entry for a key
"""
def get(key) do
Cachex.get(@cachex, key)
end
@doc """
Set the cache entry for a key. It will be automatically
configured with a TTL and distribute to remote nodes.
"""
def set(key, value) do
expires_at = :timer.seconds(random_between_days(7, 10))
Cachex.put(@cachex, key, value, ttl: expires_at)
|> case do
{:ok, true} ->
update_remotes(key, value, expires_at)
:ok
rest ->
rest
end
end
@doc """
Clear the cache across the entire cluster
"""
def clear() do
{:ok, _} = Cachex.clear(@cachex)
clear_remotes()
end
# private
defp random_between_days(day_a, day_b) when day_b > day_a do
day_a_seconds = 60 * 60 * 24 * day_a
day_b_seconds = 60 * 60 * 24 * day_b
:rand.uniform(day_b_seconds - day_a_seconds) + day_a_seconds
end
defp update_remotes(cache_key, value, ttl) do
Task.start(fn ->
on_remotes(fn pid ->
send(pid, {:broadcast, @topic, {:set, cache_key, value, ttl}})
end)
end)
end
defp clear_remotes() do
on_remotes(fn pid ->
send(pid, {:broadcast, @topic, {:clear}})
end)
end
defp request_remote_dump() do
[local_pid] = :pg2.get_local_members(@topic)
on_remotes(fn pid ->
send(pid, {:broadcast, @topic, {:send_dump_to_pid, local_pid}})
end)
end
defp on_remotes(func) do
@topic
|> :pg2.get_members()
|> Kernel.--(:pg2.get_local_members(@topic))
|> Enum.each(func)
end
# callbacks
def handle_continue(:init, state) do
request_remote_dump()
{:noreply, state}
end
def handle_info({:broadcast, @topic, {:set, cache_key, value, ttl}}, state) do
opts = Cachex.Spec.const(:notify_false) ++ [ttl: ttl]
Cachex.put(@cachex, cache_key, value, opts)
{:noreply, state}
end
def handle_info({:broadcast, @topic, {:send_dump_to_pid, pid}}, state) do
Task.start(fn ->
{:ok, export} = Cachex.export(@cachex)
send(pid, {:broadcast, @topic, {:load_dump, export}})
end)
{:noreply, state}
end
def handle_info({:broadcast, @topic, {:load_dump, entries}}, state) do
Cachex.Actions.MemoryLoad.execute(@cachex, entries)
# Trigger GC after things have calmed down due to potential large size of cache binary
Process.send_after(self(), :gc, 1_000)
{:noreply, state}
end
def handle_info({:broadcast, @topic, {:clear}}, state) do
{:ok, _} = Cachex.clear(@cachex)
{:noreply, state}
end
def handle_info(:gc, state) do
:erlang.garbage_collect()
{:noreply, state}
end
end
end
end
defmodule Cachex.Actions.MemoryLoad do
@moduledoc false
import Cachex.Spec
def execute(cache, entries) do
Cachex.execute(cache, fn worker ->
Enum.each(entries, &import(worker, &1, now()))
end)
end
# Private
# Extracted from https://github.com/whitfin/cachex/blob/2b0687682622fbf99c741c0b9fe3bb1d36834ab4/lib/cachex/actions/load.ex
# in order to avoid any conditions that they handle properly.
defp import(cache, entry(key: k, ttl: nil, value: v), _time),
do: {:ok, true} = Cachex.put(cache, k, v, const(:notify_false))
defp import(_cache, entry(touched: t1, ttl: t2), time)
when t1 + t2 < time,
do: nil
defp import(cache, entry(key: k, touched: t1, ttl: t2, value: v), time) do
{:ok, true} =
Cachex.put(
cache,
k,
v,
const(:notify_false) ++
[
ttl: t1 + t2 - time
]
)
end
end
Usage:
MyApp.AccountLookup.Cache is placed in the application supervisor, then can be used in the app.
MyApp.AccountLookup.Cache.get("1", "1")
MyApp.AccountLookup.Cache.set("1", "1", "2")
MyApp.AccountLookup.Cache.get("1", "1")
Enum.each((1..1_000_000), fn i ->
MyApp.AccountLookup.Cache.set(to_string(i), "1111-0000-111111-11111", i)
end)
MyApp.AccountLookup.Cache.get("50000", "1111-0000-111111-11111")
@sb8244

This comment has been minimized.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.