Building projections with Ecto using Commanded event handlers
defmodule Projections.Repo.Migrations.CreateProjectionVersions do
  @moduledoc """
  Creates the `projection_versions` table.

  Each row is keyed by a projection's name and records, in
  `last_seen_event_id`, the id of the last event stored for that projection.
  """

  use Ecto.Migration

  def change do
    # The projection name is the primary key, so the default `id` column is disabled.
    create table(:projection_versions, primary_key: false) do
      add :projection_name, :text, primary_key: true
      add :last_seen_event_id, :bigint

      # Parentheses are required: a bare `timestamps` is ambiguous with a
      # variable reference and is not accepted by recent Elixir compilers.
      timestamps()
    end
  end
end
defmodule Projections.ExampleProjection do
  @moduledoc """
  Example read-model projection stored in the `examples` table.

  `use Projections.Projection` sets up the Ecto schema support and defines
  `update_projection/3`, which the nested `Projector` calls for every event
  it projects.
  """

  # Fully qualified: no alias for `Projection` is in scope at this point.
  use Projections.Projection

  schema "examples" do
    field :name, :string

    timestamps()
  end

  defmodule Projector do
    @moduledoc """
    Commanded event handler that inserts an `examples` row for each `SomeEvent`.
    """

    @behaviour Commanded.Event.Handler

    # `defmodule Projections.ExampleProjection` does not alias its own short
    # name, so it must be aliased explicitly before use below.
    alias Projections.ExampleProjection

    @projection_name "example"

    # Inserts a row named after the event, in the same transaction that
    # records `event_id` as the last event seen by this projection.
    def handle(%SomeEvent{name: name}, %{event_id: event_id}) do
      ExampleProjection.update_projection(@projection_name, event_id, fn multi ->
        Ecto.Multi.insert(multi, :example, %ExampleProjection{name: name})
      end)
    end

    # ignore all other events
    def handle(_event, _metadata), do: :ok
  end
end
defmodule Projections.Projection do
  @moduledoc false

  # Injects Ecto schema support plus `update_projection/3` into the using module.
  defmacro __using__(_) do
    quote do
      use Ecto.Schema

      import Ecto.Changeset
      import Ecto.Query

      alias Projections.{Repo, Projection, ProjectionVersion}

      # Runs the projection changes for one event inside a single transaction.
      #
      #   * `projection_name` - primary key of the row in `projection_versions`
      #   * `event_id`        - id of the event being projected
      #   * `multi_fn`        - one-arity function that receives the `Ecto.Multi`
      #                         and returns it with the projection's own
      #                         operations appended
      #
      # Returns `:ok` when the transaction commits, and also when the event was
      # already seen (its id is not greater than the stored
      # `last_seen_event_id`), in which case nothing is written. Any other
      # failure is returned as `{:error, reason}`.
      def update_projection(projection_name, event_id, multi_fn) do
        multi =
          Ecto.Multi.new()
          # NOTE(review): the one-arity callback matches the Ecto 2
          # `Ecto.Multi.run/3` contract; Ecto 3 passes `(repo, changes)` - confirm
          # against the Ecto version in use.
          |> Ecto.Multi.run(:verify_projection_version, fn _ ->
            version =
              case Repo.get(ProjectionVersion, projection_name) do
                # First event for this projection: start tracking from 0.
                nil ->
                  Repo.insert!(%ProjectionVersion{
                    projection_name: projection_name,
                    last_seen_event_id: 0
                  })

                version ->
                  version
              end

            if version.last_seen_event_id < event_id do
              {:ok, %{version: version}}
            else
              # Aborts the whole multi so the event is not applied twice.
              {:error, :already_seen_event}
            end
          end)
          |> Ecto.Multi.update(
            :projection_version,
            ProjectionVersion.changeset(
              %ProjectionVersion{projection_name: projection_name},
              %{last_seen_event_id: event_id}
            )
          )

        multi = multi_fn.(multi)

        case Repo.transaction(multi) do
          {:ok, _changes} ->
            :ok

          # An already-seen event is treated as success, not as an error.
          {:error, :verify_projection_version, :already_seen_event, _changes_so_far} ->
            :ok

          {:error, _stage, reason, _changes_so_far} ->
            {:error, reason}
        end
      end
    end
  end
end
defmodule Projections.ProjectionVersion do
  @moduledoc """
  Ecto schema for the `projection_versions` table.

  Rows are keyed by `projection_name` and hold the `last_seen_event_id`
  recorded for that projection.
  """

  use Ecto.Schema

  import Ecto.Changeset
  import Ecto.Query

  @primary_key {:projection_name, :string, []}

  schema "projection_versions" do
    field :last_seen_event_id, :integer

    timestamps()
  end

  # Atom keys: `cast/3` in current Ecto rejects a list of strings.
  @required_fields ~w(last_seen_event_id)a

  @doc """
  Builds a changeset that casts `last_seen_event_id` from `params` and
  requires it to be present.

  `params` defaults to an empty map; the legacy `:empty` sentinel is no
  longer accepted by `Ecto.Changeset.cast/3`.
  """
  def changeset(model, params \\ %{}) do
    model
    |> cast(params, @required_fields)
    |> validate_required(@required_fields)
  end
end
defmodule Projections.Supervisor do
  @moduledoc """
  Supervises the Ecto repo and the event handler for the example projection.
  """

  use Supervisor

  # Starts the supervisor; no arguments are needed, so `nil` is handed to `init/1`.
  def start_link, do: Supervisor.start_link(__MODULE__, nil)

  # Builds the child specifications and the `:one_for_one` supervision spec.
  def init(_args) do
    repo = worker(Projections.Repo, [])

    # projections
    example_projector =
      worker(
        Commanded.Event.Handler,
        ["ExampleProjection", Projections.ExampleProjection.Projector],
        id: :example_projection
      )

    supervise([repo, example_projector], strategy: :one_for_one)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment