Skip to content

Instantly share code, notes, and snippets.

@cgrothaus
Last active December 13, 2023 09:22
Show Gist options
  • Save cgrothaus/4cfc2c65c5af146abae84db75c560444 to your computer and use it in GitHub Desktop.
Save cgrothaus/4cfc2c65c5af146abae84db75c560444 to your computer and use it in GitHub Desktop.
A replicated cache in 60 lines using ConCache and Phoenix PubSub
defmodule MyApp.ReplicatedConCache do
@moduledoc """
Utility to replicate local ConCache operations via Phoenix.PubSub.
Only those ConCache functions that we need are replicated.
Prerequisites:
- for all relevant cache ids, there is a ConCache instance running on
each node connected to the PubSub system
- these ConCache instances are all configured same (TTL etc.)
Usage:
- start up the ReplicatedConCache process, e.g. by adding it to the list
of children of your OTP application
- use ReplicatedConCache functions as a drop-in replacement of ConCache
functions
Notes:
- when a new elixir node connects to a cluster, the existing ConCache
entries are NOT replicated
- the server implementation (which receives the replication messages
via PubSub) is a single GenServer process, it is NOT optimized for
high throughput
"""
use GenServer
alias Phoenix.PubSub
require Logger
@pubsub_system MyApp.PubSub
@topic "replicated_con_cache"
##### Client implementation #####
# `fetch` is local, `store` is replicated
def fetch_or_store(cache_id, key, producer_fun) do
ConCache.fetch_or_store(cache_id, key, fn ->
with {:ok, value} <- producer_fun.() do
broadcast_put(cache_id, key, value)
{:ok, value}
end
end)
end
# `get` is local, `store` is replicated
def get_or_store(cache_id, key, producer_fun) do
ConCache.get_or_store(cache_id, key, fn ->
value = producer_fun.()
broadcast_put(cache_id, key, value)
value
end)
end
def delete(cache_id, key) do
ConCache.delete(cache_id, key)
broadcast_delete(cache_id, key)
end
defp broadcast_put(cache_id, key, value),
do: broadcast_message({:put, node(), cache_id, key, value})
defp broadcast_delete(cache_id, key),
do: broadcast_message({:delete, node(), cache_id, key})
defp broadcast_message(message) do
log_debug("broadcast message #{message |> inspect()}")
PubSub.broadcast(@pubsub_system, @topic, message)
end
##### Server implementation #####
def start_link(_) do
GenServer.start_link(__MODULE__, nil)
end
def init(_) do
log_debug("init - subscribing to PubSub '#{@pubsub_system}' topic '#{@topic}'")
case PubSub.subscribe(@pubsub_system, @topic) do
:ok ->
log_debug("init - subscribed to PubSub")
{:ok, nil}
{:error, reason} ->
{:stop, reason}
end
end
def handle_info({:put, sender_node_name, cache_id, key, value} = message, _) do
unless sender_node_name == node() do
log_receive(message)
ConCache.put(cache_id, key, value)
end
{:noreply, nil}
end
def handle_info({:delete, sender_node_name, cache_id, key} = message, _) do
unless sender_node_name == node() do
log_receive(message)
ConCache.delete(cache_id, key)
end
{:noreply, nil}
end
##### Shared implementation #####
defp log_receive(message),
do: log_debug("received message #{message |> inspect()}")
defp log_debug(message), do: Logger.debug("#{__MODULE__}: #{message}")
end
defmodule MyApp.ReplicatedConCacheTest do
use MyApp.DataCase, async: true
alias MyApp.ReplicatedConCache
alias Phoenix.PubSub
describe "fetch_or_store/3" do
test "stores the value in the local ConCache" do
ReplicatedConCache.fetch_or_store(:global_cache, :key1, fn -> {:ok, :value1} end)
assert_local_cache_value(:key1, :value1)
end
test "broadcasts a put message" do
subscribe_pubsub()
ReplicatedConCache.fetch_or_store(:global_cache, :key2, fn -> {:ok, :value1} end)
assert_put_message_received(node(), :key2, :value1)
end
test "when a cache value already exists, stores and broadcasts nothing" do
subscribe_pubsub()
local_cache_put(:key3, :value1)
ReplicatedConCache.fetch_or_store(:global_cache, :key3, fn -> {:ok, :value2} end)
assert_local_cache_value(:key3, :value1)
refute_put_message_received(node(), :key3, :value1)
end
end
describe "get_or_store/3" do
test "stores the value in the local ConCache" do
ReplicatedConCache.get_or_store(:global_cache, :key4, fn -> :value1 end)
assert_local_cache_value(:key4, :value1)
end
test "broadcasts a put message" do
subscribe_pubsub()
ReplicatedConCache.get_or_store(:global_cache, :key5, fn -> :value1 end)
assert_put_message_received(node(), :key5, :value1)
end
test "when a cache value already exists, stores and broadcasts nothing" do
subscribe_pubsub()
local_cache_put(:key6, :value1)
ReplicatedConCache.get_or_store(:global_cache, :key6, fn -> :value2 end)
assert_local_cache_value(:key6, :value1)
refute_put_message_received(node(), :key6, :value1)
end
end
describe "delete/2" do
test "deletes the value from the local ConCache" do
local_cache_put(:key7, :value1)
ReplicatedConCache.delete(:global_cache, :key7)
refute_local_cache_value(:key7)
end
test "broadcasts a delete message" do
subscribe_pubsub()
ReplicatedConCache.delete(:global_cache, :key8)
assert_delete_message_received(node(), :key8)
end
end
describe "handle_info/2 for a put message" do
test "stores the value in the local ConCache" do
refute_local_cache_value(:key9)
broadcast_pubsub({:put, :some_node_name, :global_cache, :key9, :value1})
assert_local_cache_value(:key9, :value1)
end
test "when a local cache value already exists, overwrites it" do
local_cache_put(:key10, :value1)
broadcast_pubsub({:put, :some_node_name, :global_cache, :key10, :value2})
assert_local_cache_value(:key10, :value2)
end
test "when the message is from the same node, does nothing" do
broadcast_pubsub({:put, node(), :global_cache, :key11, :value1})
refute_local_cache_value(:key11)
end
end
describe "handle_info/2 for a delete message" do
test "deletes the value from the local ConCache" do
local_cache_put(:key12, :value1)
broadcast_pubsub({:delete, :some_node_name, :global_cache, :key12})
refute_local_cache_value(:key12)
end
test "when the message is from the same node, does nothing" do
local_cache_put(:key13, :value1)
broadcast_pubsub({:delete, node(), :global_cache, :key13})
assert_local_cache_value(:key13, :value1)
end
end
defp local_cache_put(key, value) do
ConCache.put(:global_cache, key, value)
assert_local_cache_value(key, value)
end
defp assert_local_cache_value(key, value), do: assert(ConCache.get(:global_cache, key) == value)
defp refute_local_cache_value(key), do: refute(ConCache.get(:global_cache, key))
defp assert_put_message_received(node, key, value),
do: assert_received({:put, ^node, :global_cache, ^key, ^value})
defp refute_put_message_received(node, key, value),
do: refute_received({:put, ^node, :global_cache, ^key, ^value})
defp assert_delete_message_received(node, key),
do: assert_received({:delete, ^node, :global_cache, ^key})
defp subscribe_pubsub,
do: PubSub.subscribe(MyApp.PubSub, "replicated_con_cache")
defp broadcast_pubsub(message) do
PubSub.broadcast(MyApp.PubSub, "replicated_con_cache", message)
# wait for local ConCache update
Process.sleep(75)
end
end
@btajudeen
Copy link

Many thanks for this excellent paper. But I am new to both elixir and caching. I need a primer to get off the ground. My project need to implement cache . All my other OTP apps need to do a lookup for product name that rarely change-in 5 years (hscode) and location that's determined -eg local government areas in a country. Once I load the product, I don't see any write again, same for the location data. My strategy is to prepare a csv file for product and location and save to ETS instead of a database. Advice if am doing the right thing...and module to allow me query what I seek from the cache..eg I might just require the hscode for a produce and the title< while skipping other fields such as description or unit. My request is a link to how to implement my idea. I have read through Elixir school specifically on ETS and read many excellent contribution but your article seem to a headway but it leaves with questions. Any help is welcome

@cgrothaus
Copy link
Author

Hej @btajudee, thanks for your feedback 😄 .

Regarding your question, I don't know if a cache is what you need. Storing the data in a database seems much more straightforward, even if it changes very rarely. In almost all cases the database is fast enough, so you don't need the extra complexity of a cache.

Hint: https://elixirforum.com/ is a good place to ask such question, there are plenty of helpful people there.

Regards!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment