Last active
October 13, 2018 13:30
-
-
Save mjaric/b4bf2026d639956f134ce665ddbce2d9 to your computer and use it in GitHub Desktop.
Ring facade for extreme_system apps
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule RingFacade do | |
@moduledoc """ | |
A hash ring facade calulates to which node and facade in ring command will be sent. | |
## Example | |
iex> params = %{"id" => 1, "first_name" => "Foo", "last_name" => "Baz", "email" => "foo.baz@example.com"} | |
iex> RingFacade.call(:create_user, params, routing_key: 1, timout: 5_000) | |
{:ok, :created} | |
""" | |
require Logger | |
@max_retries 2 | |
@ring :other_app | |
@facade OtherApp.Facade | |
def call(cmd, params, opts \\ []) do | |
timeout = Keyword.get(opts, :timeout, 15_000) | |
routing_key = Keyword.get(opts, :routing_key, nil) | |
case get_ring_node(@ring, routing_key) do | |
nil -> | |
{:error, :facade_unavailable} | |
node when is_atom(node) -> | |
msg = %{ | |
routing_key: routing_key, | |
command: cmd, | |
timeout: timeout, | |
params: params | |
} | |
call_ring_node(@ring, node, @facade, msg, timeout) | |
{:error, {:invalid_ring, :no_nodes}} -> | |
{:error, :facade_unavailable} | |
{:error, :node_unreacable} -> | |
{:error, :facade_unavailable} | |
end | |
end | |
@spec get_ring_node(ring :: Atom.t(), routing_key :: term | nil) :: Atom.t() | nil | |
def get_ring_node(ring, routing_key \\ nil) do | |
topology = Application.get_env(:libcluster, :topologies, []) | |
cluster_nodes = Node.list() | |
nodes = | |
topology | |
|> pop_in([ring, :config, :hosts]) | |
|> elem(0) | |
|> List.wrap() | |
|> intersect(cluster_nodes) | |
Logger.warn(inspect(nodes)) | |
ring = HashRing.new() | |
ring = HashRing.add_nodes(ring, nodes) | |
if routing_key == nil do | |
Enum.random(nodes) | |
else | |
HashRing.key_to_node(ring, routing_key) | |
end | |
end | |
defp call_ring_node( | |
ring, | |
node, | |
facade, | |
msg, | |
attempt \\ 1 | |
) do | |
timeout = Map.get(msg, :timeout) || 15_000 | |
try do | |
GenServer.call({facade, node}, {msg.command, msg.params}, timeout) | |
catch | |
:exit, {{:nodedown, node_name}, _} when attempt < @max_retries -> | |
Logger.debug(fn -> "Node #{node_name} is reported as down, retrying..." end) | |
case get_ring_node(ring, msg.routing_key) do | |
{:ok, node} -> | |
call_ring_node(ring, node, facade, msg, attempt + 1) | |
{:error, :node_unreacable} -> | |
{:error, :facade_unavailable} | |
end | |
:exit, reason -> | |
if attempt < @max_retries do | |
Logger.warn(fn -> | |
[ | |
"Attempt ##{inspect(attempt)} fail to call remote facade, exited with reson: ", | |
inspect(reason), | |
" Retrying..." | |
] | |
|> IO.iodata_to_binary() | |
end) | |
call_ring_node(ring, node, facade, msg, attempt + 1) | |
else | |
Logger.error(""" | |
Attempt ##{inspect(attempt)} failed to call remote facade, exited with reson: | |
#{inspect(reason)} | |
""") | |
{:error, :facade_unavailable} | |
end | |
end | |
end | |
defp intersect(list, list2) do | |
list2 = List.wrap(list2) | |
list = List.wrap(list) | |
Enum.filter(list, &(&1 in list2)) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment