@benwilson512
Last active May 9, 2018 17:47
defmodule Raven.Util.Transaction do
  use GenServer, restart: :temporary

  @moduledoc """
  Claim a given transaction handle.

  Blocks if other callers have already claimed the handle. `claim/3` returns
  `:ok` if the current process wins the claim; otherwise it returns
  `{:error, :already_taken}`.

  ### Example Usage

  This would be more ergonomic in the style of `Repo.transaction`, with a
  wrapping function, but the split-up explicit form we use here makes it easier
  to use with Commanded (for example from middleware).

  ```
  email = "foo@localhost.com"

  lookup_fun = fn ->
    User.find_by_email(email)
  end

  with :ok <- Transaction.claim(email, lookup_fun),
       {:ok, user} <- create_user(email: email) do
    :ok = Transaction.release(email)
    {:ok, user}
  else
    error ->
      :ok = Transaction.release(email)
      error
  end
  ```

  The idea here is that we ask for a claim with a specific handle; usually the
  handle is the value we want to have unique access to. The lookup fun is run
  from within the GenServer, and if it returns a truthy value then the value is
  already present and you get back `{:error, :already_taken}`. Running this from
  within the GenServer avoids the usual race condition around this check. If the
  value does not exist then you successfully claim the handle and get back `:ok`.

  With the claim in hand we go ahead and do whatever we want with the value. It
  is critical that whatever action we take be strongly consistent with respect
  to the check done by the lookup fun.

  After we have done what we want, we release the transaction handle. This is
  where the lookup function comes back in. The GenServer runs the lookup
  function again, and if it returns a truthy value then the transaction is
  considered to have been successful and any other processes waiting on a claim
  receive `{:error, :already_taken}`. If the lookup function returns a falsey
  value then the next process gets a shot.

  A release from a process that does not hold the claim is a no-op that returns
  `:ok`, which is why the example can call `release/2` in its `else` branch.
  """

  def claim(handle, lookup_fun, timeout \\ 15_000) do
    # One GenServer per handle, started on demand under the
    # `Raven.Util.Transactions` DynamicSupervisor. If one is already running
    # for this handle, reuse it.
    case DynamicSupervisor.start_child(
           Raven.Util.Transactions,
           {__MODULE__, handle}
         ) do
      {:error, {:already_started, pid}} ->
        GenServer.call(pid, {:claim, lookup_fun}, timeout)

      {:ok, pid} ->
        GenServer.call(pid, {:claim, lookup_fun}, timeout)
    end
  end

  def release(handle, timeout \\ 5_000) do
    GenServer.call({:global, handle}, :release, timeout)
  end

  defstruct [
    :current,
    :lookup_fun,
    claimants: []
  ]

  def start_link(handle) do
    GenServer.start_link(__MODULE__, [], name: {:global, handle})
  end

  def init(_) do
    state = %__MODULE__{}
    # If nobody actually makes a claim, exit after 5 seconds.
    Process.send_after(self(), :timeout, 5_000)
    {:ok, state}
  end

  # The caller already holds the claim.
  def handle_call({:claim, _fun}, {caller, _}, %{current: caller} = state) do
    {:reply, :ok, state}
  end

  # Nobody holds the claim: run the lookup inside the server to decide.
  def handle_call({:claim, fun}, {caller, _}, %{current: nil} = state) do
    if fun.() do
      # Re-arm the idle timeout so a server with no owner still shuts down.
      {:reply, {:error, :already_taken}, state, 5_000}
    else
      Process.monitor(caller)
      state = %{state | current: caller, lookup_fun: fun}
      {:reply, :ok, state}
    end
  end

  # Someone else holds the claim: queue the caller and reply later.
  def handle_call({:claim, _fun}, {caller, _} = claim, state) do
    Process.monitor(caller)
    # ++ is not the most efficient but the number of callers will generally be low.
    # consider fixing later.
    state = %{state | claimants: state.claimants ++ [claim]}
    {:noreply, state}
  end
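
  def handle_call(:release, {caller, _}, %{current: caller} = state) do
    case next(state) do
      {:done, state} -> {:reply, :ok, state, 5_000}
      {:next, state} -> {:reply, :ok, state}
    end
  end

  # A release from a process that does not hold the claim is a no-op.
  def handle_call(:release, _, %{current: nil} = state) do
    # No owner, so keep the idle timeout armed.
    {:reply, :ok, state, 5_000}
  end

  def handle_call(:release, _, state) do
    {:reply, :ok, state}
  end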

  # The process holding the claim died: treat it as a release.
  def handle_info({:DOWN, _, :process, pid, _}, %{current: pid} = state) do
    case next(state) do
      {:done, state} -> {:noreply, state, 5_000}
      {:next, state} -> {:noreply, state}
    end
  end

  def handle_info(:timeout, %{current: nil} = state) do
    {:stop, :normal, state}
  end

  def handle_info(:timeout, state) do
    {:noreply, state}
  end

  # A waiting (or former) claimant died: drop it from the queue.
  def handle_info({:DOWN, _, :process, dead_pid, _}, state) do
    claimants = for {pid, _} = claim <- state.claimants, pid != dead_pid, do: claim
    {:noreply, %{state | claimants: claimants}}
  end

  defp next(state) do
    if state.lookup_fun.() do
      for claim <- state.claimants do
        GenServer.reply(claim, {:error, :already_taken})
      end

      state = %{state | current: nil, claimants: []}
      {:done, state}
    else
      case next_claimant(%{state | current: nil}) do
        {:none, state} ->
          {:done, state}

        {:ok, next, state} ->
          GenServer.reply(next, :ok)
          {:next, state}
      end
    end
  end

  defp next_claimant(%{claimants: []} = state) do
    {:none, state}
  end

  defp next_claimant(%{claimants: [{pid, _} = next | rest]} = state) do
    {:ok, next, %{state | current: pid, claimants: rest}}
  end
end
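
# A minimal sketch, not part of the original gist, of the wrapping-function
# form that the moduledoc above says would be more ergonomic. The module name
# `Raven.Util.Transaction.Wrapper` and the function `with_claim/3` are
# hypothetical; only `Transaction.claim/3` and `Transaction.release/2` come
# from the module above. It assumes the work should run only when the claim
# is won, and that the handle should be released even if the work raises.
defmodule Raven.Util.Transaction.Wrapper do
  alias Raven.Util.Transaction

  # Runs `fun` while holding the claim on `handle` and returns its result,
  # or returns `{:error, :already_taken}` without running `fun`.
  def with_claim(handle, lookup_fun, fun) when is_function(fun, 0) do
    case Transaction.claim(handle, lookup_fun) do
      :ok ->
        try do
          fun.()
        after
          # Release on success, error, or exception.
          :ok = Transaction.release(handle)
        end

      {:error, :already_taken} = error ->
        error
    end
  end
end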
defmodule Raven.Ordering.Uniqueness do
  @behaviour Commanded.Middleware

  alias Commanded.Middleware.Pipeline
  alias Raven.Ordering.Commands.CreateStaff
  alias Raven.Util.Transaction
  alias Raven.Ordering

  def before_dispatch(%Pipeline{command: %CreateStaff{email: email}} = pipeline) do
    transaction =
      Transaction.claim(email, fn ->
        Ordering.get_by_email(Ordering.Staff, email)
      end)

    case transaction do
      :ok ->
        Pipeline.assign(pipeline, :transaction_handle, email)

      {:error, :already_taken} ->
        pipeline
        |> Pipeline.respond({:error, email: :already_taken})
        |> Pipeline.halt()
    end
  end

  def before_dispatch(pipeline) do
    pipeline
  end

  def after_dispatch(pipeline) do
    if handle = pipeline.assigns[:transaction_handle] do
      Transaction.release(handle)
    end

    pipeline
  end

  def after_failure(pipeline) do
    if handle = pipeline.assigns[:transaction_handle] do
      Transaction.release(handle)
    end

    pipeline
  end
end
@benwilson512
Author

Note the expectation for a dynamic supervisor named `Raven.Util.Transactions` on line 53 (the `DynamicSupervisor.start_child/2` call in `claim/3`); it could be adjusted according to use case.
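
For anyone wiring this up, here is a minimal sketch of one way to satisfy that expectation: start a `DynamicSupervisor` registered under the name `Raven.Util.Transactions` from the application's supervision tree. The `Raven.Application` module and the `Raven.Supervisor` name are assumptions for illustration and do not appear in the gist; only the `Raven.Util.Transactions` name comes from the code.

```elixir
defmodule Raven.Application do
  # Hypothetical application module; adapt to wherever your supervision tree lives.
  use Application

  def start(_type, _args) do
    children = [
      # `Transaction.claim/3` starts one GenServer per handle under this supervisor.
      {DynamicSupervisor, strategy: :one_for_one, name: Raven.Util.Transactions}
    ]

    # `Raven.Supervisor` is an assumed name for the top-level supervisor.
    Supervisor.start_link(children, strategy: :one_for_one, name: Raven.Supervisor)
  end
end
```

Because the transaction servers are declared with `restart: :temporary`, the supervisor will not restart them after they stop, which matches their stop-when-idle behaviour.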