Skip to content

Instantly share code, notes, and snippets.

@astery
Last active September 25, 2017 12:28
Show Gist options
  • Save astery/4312e38394409c168d976b03ddfe1a9e to your computer and use it in GitHub Desktop.
Save astery/4312e38394409c168d976b03ddfe1a9e to your computer and use it in GitHub Desktop.
Projection waiting example
# EventBus is just event_handler which retranslates event_store events and allow to send new events which not stored in es
# dispatch - is a wrapper above commanded dispatch which allows to handle both types of commands: commanded and non commanded commands.
defmodule VOK.User.CommandHandlers.CreateUserSyncHandler do
import VOK.User.WaitHelpers
alias VOK.Events.{
UserCreated,
RoleGranted,
RoleGrantFailed,
UserProjectionUpdated, # this event not stored in event_store
}
def handle(%VOK.Commands.CreateUserSync{user_id: user_id, roles: nil} = command, %{dispatch: dispatch} = ctx) do
EventBus.subscribe(%UserCreated{user_id: user_id})
EventBus.subscribe(%UserProjectionUpdated{user_id: user_id})
result =
with :ok <- dispatch.(struct(VOK.Commands.CreateUser, Map.from_struct(command)), ctx),
do: wait_for_user_creation(user_id)
EventBus.unsubscribe()
result
end
def handle(%VOK.Commands.CreateUserSync{user_id: user_id, roles: roles} = command, %{dispatch: dispatch} = ctx) do
EventBus.subscribe(%UserCreated{user_id: user_id})
EventBus.subscribe(%RoleGranted{user_id: user_id})
EventBus.subscribe(%RoleGrantFailed{user_id: user_id})
EventBus.subscribe(%UserProjectionUpdated{user_id: user_id})
result =
with :ok <- dispatch.(struct(VOK.Commands.CreateUser, Map.from_struct(command)), ctx),
:ok <- wait_for_user_creation(user_id),
do: wait_for_roles(user_id, roles)
EventBus.unsubscribe()
result
end
end
defmodule VOK.User.WaitHelpers do
alias VOK.Events.{
UserCreated,
UserUpdated,
UserUpdateFailed,
RoleGranted,
RoleGrantFailed,
RoleRevoked,
RoleRevokeFailed,
UserProjectionUpdated, # this event not stored in event_store
}
def wait_for_roles(user_id, roles) do
Enum.reduce_while(roles, :ok, fn role, acc ->
with :ok <- wait_for_role_update(user_id) do
{:cont, :ok}
else
error -> {:halt, error}
end
end)
end
def wait_for_user_creation(user_id) do
receive do
{:event, %UserCreated{}} -> wait_for_projection_update(user_id)
after 500 ->
{:error, VOK.Error.new("timeout reached")}
end
end
def wait_for_user_update(user_id) do
receive do
{:event, %UserUpdated{roles_changed: roles_changed}} ->
with :ok <- wait_for_projection_update(user_id), do: {:ok, roles_changed}
{:event, %UserUpdateFailed{reason: reason}} -> {:error, VOK.Error.new(reason)}
after 500 ->
{:error, VOK.Error.new("timeout reached")}
end
end
def wait_for_role_update(user_id) do
receive do
{:event, %RoleGranted{}} -> wait_for_projection_update(user_id)
{:event, %RoleGrantFailed{reason: reason}} -> {:error, VOK.Error.new("role grant failed: #{reason}")}
{:event, %RoleRevoked{}} -> wait_for_projection_update(user_id)
{:event, %RoleRevokeFailed{reason: reason}} -> {:error, VOK.Error.new("role revoke failed: #{reason}")}
after 500 ->
{:error, VOK.Error.new("timeout reached")}
end
end
def wait_for_projection_update(user_id) do
receive do
{:event, %UserProjectionUpdated{user_id: user_id}} -> :ok
after 500 ->
{:error, VOK.Error.new("timeout reached")}
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment