A replicated cache in 60 lines using ConCache and Phoenix PubSub
defmodule MyApp.ReplicatedConCache do
  @moduledoc """
  Utility to replicate local ConCache operations via Phoenix.PubSub.

  Only those ConCache functions that we need are replicated.

  Prerequisites:

  - for all relevant cache ids, there is a ConCache instance running on
    each node connected to the PubSub system
  - these ConCache instances are all configured the same (TTL etc.)

  Usage:

  - start the ReplicatedConCache process, e.g. by adding it to the list
    of children of your OTP application
  - use ReplicatedConCache functions as a drop-in replacement for the
    corresponding ConCache functions

  Notes:

  - when a new Elixir node connects to the cluster, the existing ConCache
    entries are NOT replicated
  - the server implementation (which receives the replication messages
    via PubSub) is a single GenServer process; it is NOT optimized for
    high throughput
  """

  use GenServer

  alias Phoenix.PubSub

  require Logger

  @pubsub_system MyApp.PubSub
  @topic "replicated_con_cache"

  ##### Client implementation #####

  # `fetch` is local, `store` is replicated
  def fetch_or_store(cache_id, key, producer_fun) do
    ConCache.fetch_or_store(cache_id, key, fn ->
      with {:ok, value} <- producer_fun.() do
        broadcast_put(cache_id, key, value)
        {:ok, value}
      end
    end)
  end

  # `get` is local, `store` is replicated
  def get_or_store(cache_id, key, producer_fun) do
    ConCache.get_or_store(cache_id, key, fn ->
      value = producer_fun.()
      broadcast_put(cache_id, key, value)
      value
    end)
  end

  def delete(cache_id, key) do
    ConCache.delete(cache_id, key)
    broadcast_delete(cache_id, key)
  end

  defp broadcast_put(cache_id, key, value),
    do: broadcast_message({:put, node(), cache_id, key, value})

  defp broadcast_delete(cache_id, key),
    do: broadcast_message({:delete, node(), cache_id, key})

  defp broadcast_message(message) do
    log_debug("broadcast message #{inspect(message)}")
    PubSub.broadcast(@pubsub_system, @topic, message)
  end

  ##### Server implementation #####

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil)
  end

  def init(_) do
    log_debug("init - subscribing to PubSub '#{@pubsub_system}' topic '#{@topic}'")

    case PubSub.subscribe(@pubsub_system, @topic) do
      :ok ->
        log_debug("init - subscribed to PubSub")
        {:ok, nil}

      {:error, reason} ->
        {:stop, reason}
    end
  end

  def handle_info({:put, sender_node_name, cache_id, key, value} = message, _) do
    unless sender_node_name == node() do
      log_receive(message)
      ConCache.put(cache_id, key, value)
    end

    {:noreply, nil}
  end

  def handle_info({:delete, sender_node_name, cache_id, key} = message, _) do
    unless sender_node_name == node() do
      log_receive(message)
      ConCache.delete(cache_id, key)
    end

    {:noreply, nil}
  end

  ##### Shared implementation #####

  defp log_receive(message),
    do: log_debug("received message #{inspect(message)}")

  defp log_debug(message), do: Logger.debug("#{__MODULE__}: #{message}")
end
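For context, the prerequisites listed in the moduledoc could be wired up roughly as follows. This is a minimal sketch, not part of the gist: the application module, the cache id :global_cache, and the ConCache options are assumptions for illustration; only MyApp.PubSub and MyApp.ReplicatedConCache come from the code above.

# Sketch of an application supervision tree (hypothetical MyApp.Application).
# Assumed: a single cache id :global_cache and no TTL; adjust to your setup,
# but keep the ConCache configuration identical on every node.
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # PubSub must be running before ReplicatedConCache subscribes to it
      {Phoenix.PubSub, name: MyApp.PubSub},
      # one identically configured ConCache instance per node
      {ConCache, name: :global_cache, ttl_check_interval: false},
      # the GenServer that applies remote :put/:delete messages locally
      MyApp.ReplicatedConCache
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end

Once the tree is up, the module is used as a drop-in replacement for the corresponding ConCache calls, for example:

# `load_user/0` stands in for a hypothetical, possibly expensive producer function.
MyApp.ReplicatedConCache.get_or_store(:global_cache, :answer, fn -> 42 end)
MyApp.ReplicatedConCache.fetch_or_store(:global_cache, :current_user, fn -> {:ok, load_user()} end)
MyApp.ReplicatedConCache.delete(:global_cache, :answer)

The gist also includes a test module that exercises the client functions and the PubSub message handlers: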
defmodule MyApp.ReplicatedConCacheTest do
  use MyApp.DataCase, async: true

  alias MyApp.ReplicatedConCache
  alias Phoenix.PubSub

  describe "fetch_or_store/3" do
    test "stores the value in the local ConCache" do
      ReplicatedConCache.fetch_or_store(:global_cache, :key1, fn -> {:ok, :value1} end)

      assert_local_cache_value(:key1, :value1)
    end

    test "broadcasts a put message" do
      subscribe_pubsub()

      ReplicatedConCache.fetch_or_store(:global_cache, :key2, fn -> {:ok, :value1} end)

      assert_put_message_received(node(), :key2, :value1)
    end

    test "when a cache value already exists, stores and broadcasts nothing" do
      subscribe_pubsub()
      local_cache_put(:key3, :value1)

      ReplicatedConCache.fetch_or_store(:global_cache, :key3, fn -> {:ok, :value2} end)

      assert_local_cache_value(:key3, :value1)
      refute_put_message_received(node(), :key3, :value1)
    end
  end

  describe "get_or_store/3" do
    test "stores the value in the local ConCache" do
      ReplicatedConCache.get_or_store(:global_cache, :key4, fn -> :value1 end)

      assert_local_cache_value(:key4, :value1)
    end

    test "broadcasts a put message" do
      subscribe_pubsub()

      ReplicatedConCache.get_or_store(:global_cache, :key5, fn -> :value1 end)

      assert_put_message_received(node(), :key5, :value1)
    end

    test "when a cache value already exists, stores and broadcasts nothing" do
      subscribe_pubsub()
      local_cache_put(:key6, :value1)

      ReplicatedConCache.get_or_store(:global_cache, :key6, fn -> :value2 end)

      assert_local_cache_value(:key6, :value1)
      refute_put_message_received(node(), :key6, :value1)
    end
  end

  describe "delete/2" do
    test "deletes the value from the local ConCache" do
      local_cache_put(:key7, :value1)

      ReplicatedConCache.delete(:global_cache, :key7)

      refute_local_cache_value(:key7)
    end

    test "broadcasts a delete message" do
      subscribe_pubsub()

      ReplicatedConCache.delete(:global_cache, :key8)

      assert_delete_message_received(node(), :key8)
    end
  end

  describe "handle_info/2 for a put message" do
    test "stores the value in the local ConCache" do
      refute_local_cache_value(:key9)

      broadcast_pubsub({:put, :some_node_name, :global_cache, :key9, :value1})

      assert_local_cache_value(:key9, :value1)
    end

    test "when a local cache value already exists, overwrites it" do
      local_cache_put(:key10, :value1)

      broadcast_pubsub({:put, :some_node_name, :global_cache, :key10, :value2})

      assert_local_cache_value(:key10, :value2)
    end

    test "when the message is from the same node, does nothing" do
      broadcast_pubsub({:put, node(), :global_cache, :key11, :value1})

      refute_local_cache_value(:key11)
    end
  end

  describe "handle_info/2 for a delete message" do
    test "deletes the value from the local ConCache" do
      local_cache_put(:key12, :value1)

      broadcast_pubsub({:delete, :some_node_name, :global_cache, :key12})

      refute_local_cache_value(:key12)
    end

    test "when the message is from the same node, does nothing" do
      local_cache_put(:key13, :value1)

      broadcast_pubsub({:delete, node(), :global_cache, :key13})

      assert_local_cache_value(:key13, :value1)
    end
  end

  defp local_cache_put(key, value) do
    ConCache.put(:global_cache, key, value)
    assert_local_cache_value(key, value)
  end

  defp assert_local_cache_value(key, value),
    do: assert(ConCache.get(:global_cache, key) == value)

  defp refute_local_cache_value(key), do: refute(ConCache.get(:global_cache, key))

  defp assert_put_message_received(node, key, value),
    do: assert_received({:put, ^node, :global_cache, ^key, ^value})

  defp refute_put_message_received(node, key, value),
    do: refute_received({:put, ^node, :global_cache, ^key, ^value})

  defp assert_delete_message_received(node, key),
    do: assert_received({:delete, ^node, :global_cache, ^key})

  defp subscribe_pubsub,
    do: PubSub.subscribe(MyApp.PubSub, "replicated_con_cache")

  defp broadcast_pubsub(message) do
    PubSub.broadcast(MyApp.PubSub, "replicated_con_cache", message)
    # wait for local ConCache update
    Process.sleep(75)
  end
end
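These tests assume that MyApp.PubSub, a ConCache instance named :global_cache, and the ReplicatedConCache GenServer are already running when the suite starts (the handle_info tests broadcast a message and then sleep for 75 ms so the running server can apply it). One way to provide that, shown here only as a sketch under those assumptions and not taken from the gist, is to start any missing children from test/test_helper.exs:

# test/test_helper.exs (sketch). Skip any child that your application
# supervisor already starts in the test environment, otherwise the
# registered names will clash.
children = [
  {Phoenix.PubSub, name: MyApp.PubSub},
  {ConCache, name: :global_cache, ttl_check_interval: false},
  MyApp.ReplicatedConCache
]

{:ok, _pid} = Supervisor.start_link(children, strategy: :one_for_one)

ExUnit.start()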
Hej @btajudee, thanks for your feedback 😄 .
Regarding your question, I don't know if a cache is what you need. Storing the data in a database seems much more straightforward, even if it changes very rarely. In almost all cases the database is fast enough, so you don't need the extra complexity of a cache.
Hint: https://elixirforum.com/ is a good place to ask such questions; there are plenty of helpful people there.
Regards!
Many thanks for this excellent piece. I am new to both Elixir and caching, so I need a primer to get off the ground. My project needs to implement a cache: all my other OTP apps need to look up product names that rarely change within 5 years (hscode) and locations that are fixed, e.g. local government areas in a country. Once I load the products, I don't expect any writes again, and the same goes for the location data. My strategy is to prepare a CSV file for products and locations and save it to ETS instead of a database. Please advise whether I am doing the right thing, and which module would let me query just what I need from the cache, e.g. only the hscode and title for a product while skipping other fields such as description or unit. What I'm asking for is a link showing how to implement this idea. I have read through Elixir School, specifically on ETS, and many excellent contributions, but your article seems the most promising, though it leaves me with questions. Any help is welcome.