Skip to content

Instantly share code, notes, and snippets.

@slashdotdash
Last active March 11, 2019 10:03
Show Gist options
  • Save slashdotdash/031a043796c0753240b8f1d740db53ed to your computer and use it in GitHub Desktop.
Save slashdotdash/031a043796c0753240b8f1d740db53ed to your computer and use it in GitHub Desktop.
Dealing with eventually consistent read model projections in Commanded

Dealing with eventually consistent read model projections in Commanded

Example

# Dispatch the command, asking for the resulting aggregate version, then block
# until the read model projection has reached at least that version.
#
# No `else` block is needed: when a clause does not match, `with` returns the
# non-matching value unchanged (e.g. `{:error, :timeout}` from the wait).
with {:ok, version} <- Router.dispatch(command, include_aggregate_version: true),
     {:ok, projection} <- wait_for_projection_version(ExampleProjection, uuid, version) do
  # ... safely use up-to-date read model projection at expected version
end

Uses the following helper function.

# Waits (via `Projections.wait_for/3`) until the projection identified by
# `schema` and `uuid` reports a `version` greater than or equal to `version`.
defp wait_for_projection_version(schema, uuid, version) do
  reached_version? = &(&1.version >= version)

  Projections.wait_for(schema, uuid, reached_version?)
end

You can use any predicate to wait for the read model to be updated, not just the version (e.g. fn projection -> projection.deleted end).


NOTE: This approach has been superseded by the command dispatch consistency guarantee feature added in Commanded v0.15.0

defmodule ExampleProjection do
  @moduledoc """
  Read model projection of examples, built from `ExampleCreated` and
  `ExampleRenamed` events.

  Every write also records the `stream_version` taken from the event metadata
  in the `version` field, so callers can compare a projection's version with
  the aggregate version returned by command dispatch.
  """

  use Commanded.Projections.Ecto, name: "ExampleProjection"

  @primary_key {:uuid, :string, []}

  schema "examples" do
    field :name, :string

    # Stream version of the last event applied to this row. Both `project`
    # blocks below write this field, so it must be declared on the schema
    # (and the "examples" table needs a matching integer column).
    field :version, :integer

    timestamps()
  end

  # NOTE(review): `multi` is not bound in this module; presumably it is the
  # `Ecto.Multi` made available by the `project` macro - confirm against the
  # version of the projections library in use.
  project %ExampleCreated{uuid: uuid, name: name}, %{stream_version: version} do
    Ecto.Multi.insert(multi, :example, %ExampleProjection{
      uuid: uuid,
      name: name,
      version: version
    })
  end

  project %ExampleRenamed{uuid: uuid, name: name}, %{stream_version: version} do
    # `returning: true` asks for the updated rows back so that `after_update/3`
    # can hand them to `Projections.publish_changes/1`.
    Ecto.Multi.update_all(
      multi,
      :example,
      example_query(uuid),
      [set: [name: name, version: version]],
      returning: true
    )
  end

  # Publish changes after each update
  def after_update(_event, _metadata, changes), do: Projections.publish_changes(changes)

  # Query selecting the projection row with the given uuid.
  defp example_query(uuid) do
    from e in ExampleProjection,
      where: e.uuid == ^uuid
  end
end
defmodule Projections do
  @moduledoc """
  Helpers for waiting on, and publishing updates to, read model projections.

  Waiters register themselves in the `Projections` registry under the key
  `{schema, uuid}`; `publish_changes/1` dispatches updated projections to the
  waiters registered under the matching key.
  """

  @doc """
  Wait until the read model identified by `schema` and `uuid` satisfies `predicate`.

  Two tasks are started under `Projections.TaskDispatcher`: one subscribes to
  published updates for `{schema, uuid}`, the other reads the current row via
  `Repo.get/2`. Whichever first produces a projection for which `predicate`
  returns a truthy value wins.

  `predicate` defaults to a function accepting any projection and `timeout`
  defaults to `5_000` milliseconds.

  Returns `{:ok, projection}`, or `{:error, :timeout}` when no matching
  projection arrives within `timeout`.
  """
  def wait_for(schema, uuid, predicate \\ fn _ -> true end, timeout \\ 5_000)

  def wait_for(schema, uuid, predicate, timeout) when is_function(predicate, 1) do
    # Tag replies with a reference unique to this call. Tagging with only the
    # schema would let a leftover reply from an earlier wait (both tasks may
    # answer, or an answer may arrive after the timeout) be mistaken for the
    # answer to a later wait on a different uuid of the same schema.
    ref = make_ref()
    reply_to = {self(), ref}

    tasks = [
      Task.Supervisor.async_nolink(Projections.TaskDispatcher, fn ->
        subscribe_and_wait(schema, uuid, predicate, reply_to, timeout)
      end),
      Task.Supervisor.async_nolink(Projections.TaskDispatcher, fn ->
        get_by(schema, uuid, predicate, reply_to)
      end)
    ]

    # wait for the first task to send the projection, or the timeout is reached
    reply =
      receive do
        {^ref, projection} -> {:ok, projection}
      after
        timeout -> {:error, :timeout}
      end

    shutdown(tasks)

    # Drop any additional replies for this call that are already queued so
    # they do not accumulate in the caller's mailbox.
    flush(ref)

    reply
  end

  @doc """
  Publish updated projection to interested subscribers

  Accepts the changes map of a projection transaction. The `:example` entry may
  be a single `ExampleProjection` struct (as produced by an insert) or a
  `{count, projections}` tuple with a list of rows (as produced by `update_all`
  with `returning: true`). Any other shape is ignored and `:ok` is returned.
  """
  # The struct must be matched explicitly here: a bare variable would also
  # match the `{count, projections}` tuple and make the next clause unreachable.
  def publish_changes(%{example: %ExampleProjection{} = projection}), do: publish(projection)

  def publish_changes(%{example: {_, projections}}) when is_list(projections),
    do: Enum.each(projections, &publish/1)

  def publish_changes(_changes), do: :ok

  defp publish(%ExampleProjection{uuid: uuid} = projection) do
    dispatch(ExampleProjection, uuid, projection)
  end

  # Send `projection` to every waiter registered for `{schema, uuid}` whose
  # predicate accepts it.
  defp dispatch(schema, uuid, projection) do
    Registry.dispatch(__MODULE__, {schema, uuid}, fn entries ->
      for {_pid, {predicate, {caller, ref}}} <- entries do
        if predicate.(projection), do: send(caller, {ref, projection})
      end
    end)
  end

  # Read the current projection and reply straight away when it already
  # satisfies the predicate; a missing row produces no reply.
  defp get_by(queryable, uuid, predicate, {caller, ref}) do
    with projection when not is_nil(projection) <- Repo.get(queryable, uuid) do
      if predicate.(projection), do: send(caller, {ref, projection})
    end
  end

  # Subscribe to notifications of read model updates and wait for the expected version
  defp subscribe_and_wait(schema, uuid, predicate, reply_to, timeout) do
    Registry.register(__MODULE__, {schema, uuid}, {predicate, reply_to})

    # Keep this process (and therefore its registry entry) around until the
    # timeout elapses or the caller shuts the task down.
    Process.sleep(timeout)
  end

  defp shutdown(tasks), do: Enum.each(tasks, &Task.shutdown/1)

  # Remove every already-delivered reply tagged with `ref` from the mailbox.
  defp flush(ref) do
    receive do
      {^ref, _projection} -> flush(ref)
    after
      0 -> :ok
    end
  end
end
defmodule App.Supervisor do
  @moduledoc """
  Supervises the processes the projection helpers rely on: a `Task.Supervisor`
  named `Projections.TaskDispatcher` and a duplicate-key `Registry` named
  `Projections`.
  """

  use Supervisor

  @doc """
  Starts the supervisor with no init argument.
  """
  def start_link, do: Supervisor.start_link(__MODULE__, nil)

  def init(_arg) do
    task_supervisor = supervisor(Task.Supervisor, [[name: Projections.TaskDispatcher]])

    # pub/sub projections registry
    registry = supervisor(Registry, [:duplicate, Projections])

    supervise([task_supervisor, registry], strategy: :one_for_one)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment