Skip to content

Instantly share code, notes, and snippets.

@slashdotdash
Last active June 5, 2023 12:46
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save slashdotdash/1287f0c0735b49cca8afcc1b67665b42 to your computer and use it in GitHub Desktop.
Save slashdotdash/1287f0c0735b49cca8afcc1b67665b42 to your computer and use it in GitHub Desktop.
Building projections with Ecto using Commanded event handlers
defmodule Projections.Repo.Migrations.CreateProjectionVersions do
  @moduledoc """
  Creates the `projection_versions` table.

  Each row is keyed by `projection_name` and stores the id of the last event
  recorded for that projection in `last_seen_event_id`.
  """

  use Ecto.Migration

  def change do
    # The default `id` column is disabled: `projection_name` is the primary key.
    create table(:projection_versions, primary_key: false) do
      add :projection_name, :text, primary_key: true
      add :last_seen_event_id, :bigint

      # Called with parentheses: a bare `timestamps` is parsed as a variable
      # reference, which is deprecated and rejected by recent Elixir compilers.
      timestamps()
    end
  end
end
defmodule Projections.ExampleProjection do
  @moduledoc """
  Example read-model schema backed by the `examples` table, together with the
  event handler (`Projector`) that populates it.
  """

  # Fully qualified on purpose: no alias for `Projection` exists before this
  # line, so a bare `use Projection` would look for a top-level `Projection`.
  use Projections.Projection

  schema "examples" do
    field :name, :string

    timestamps()
  end

  defmodule Projector do
    @moduledoc """
    Event handler that inserts an `ExampleProjection` row for every
    `SomeEvent` it receives and ignores all other events.
    """

    @behaviour Commanded.Event.Handler

    # The enclosing module is not aliased automatically inside a nested module.
    alias Projections.ExampleProjection

    # Key of this projection's row in the `projection_versions` table.
    @projection_name "example"

    @doc """
    Handles a single event together with its metadata.

    For `SomeEvent`, the insert is queued on the multi supplied by
    `update_projection/3` and the result of that call is returned.
    Any other event returns `:ok` without touching the database.
    """
    def handle(%SomeEvent{name: name}, %{event_id: event_id}) do
      ExampleProjection.update_projection(@projection_name, event_id, fn multi ->
        Ecto.Multi.insert(multi, :example, %ExampleProjection{name: name})
      end)
    end

    # ignore all other events
    def handle(_event, _metadata), do: :ok
  end
end
defmodule Projections.Projection do
  @moduledoc false

  # `use Projections.Projection` turns the calling module into an Ecto schema
  # module, imports the changeset/query helpers, aliases the projection
  # modules, and injects `update_projection/3`.
  defmacro __using__(_) do
    quote do
      use Ecto.Schema

      import Ecto.Changeset
      import Ecto.Query

      alias Projections.{Repo, Projection, ProjectionVersion}

      @doc """
      Applies the changes for one event to this projection in a single transaction.

      The transaction runs three stages, in order:

        1. `:verify_projection_version` - loads the `ProjectionVersion` row for
           `projection_name` (inserting one with `last_seen_event_id: 0` when
           none exists) and fails with `:already_seen_event` unless the stored
           id is lower than `event_id`.
        2. `:projection_version` - records `event_id` as the last seen event.
        3. whatever operations `multi_fn` appends to the `Ecto.Multi` it is given.

      Returns `:ok` when the transaction commits and also when the event had
      already been seen, so handling the same event again is a no-op.
      Returns `{:error, reason}` when any other stage fails.
      """
      def update_projection(projection_name, event_id, multi_fn) do
        multi =
          Ecto.Multi.new()
          # NOTE(review): the one-argument callback is the Ecto 2.x form of
          # `Ecto.Multi.run/3`; newer Ecto releases document a two-argument
          # callback - confirm against the Ecto version in use.
          |> Ecto.Multi.run(:verify_projection_version, fn _ ->
            version =
              case Repo.get(ProjectionVersion, projection_name) do
                nil ->
                  Repo.insert!(%ProjectionVersion{
                    projection_name: projection_name,
                    last_seen_event_id: 0
                  })

                version ->
                  version
              end

            if version.last_seen_event_id < event_id do
              {:ok, %{version: version}}
            else
              {:error, :already_seen_event}
            end
          end)
          |> Ecto.Multi.update(
            :projection_version,
            ProjectionVersion.changeset(
              %ProjectionVersion{projection_name: projection_name},
              %{last_seen_event_id: event_id}
            )
          )

        # Let the caller append its own operations after the version bookkeeping.
        multi = multi_fn.(multi)

        case Repo.transaction(multi) do
          {:ok, _changes} ->
            :ok

          # A replayed event is not an error: the projection already contains it.
          {:error, :verify_projection_version, :already_seen_event, _changes_so_far} ->
            :ok

          {:error, _stage, reason, _changes_so_far} ->
            {:error, reason}
        end
      end
    end
  end
end
defmodule Projections.ProjectionVersion do
  @moduledoc """
  Schema for the `projection_versions` table.

  Rows are keyed by `projection_name` and hold the id of the last event
  recorded for that projection in `last_seen_event_id`.
  """

  use Ecto.Schema

  import Ecto.Changeset

  # The projection name is the primary key; there is no separate `id` column.
  @primary_key {:projection_name, :string, []}

  schema "projection_versions" do
    field :last_seen_event_id, :integer

    timestamps()
  end

  # Atoms (note the `a` modifier): `cast/3` expects atom field names.
  @required_fields ~w(last_seen_event_id)a

  @doc """
  Builds a changeset for `model`, casting only `last_seen_event_id` from `params`.

  `params` defaults to an empty map, the replacement Ecto recommends for the
  deprecated `:empty` atom.
  """
  def changeset(model, params \\ %{}) do
    cast(model, params, @required_fields)
  end
end
# Ecto repository used by the projection modules in this file.
# `otp_app: :example` names the OTP application that holds the repo's configuration.
defmodule Projections.Repo do
use Ecto.Repo, otp_app: :example
end
defmodule Projections.Supervisor do
  @moduledoc """
  Supervises the projections repo and the example projection's event handler.
  """

  use Supervisor

  @doc """
  Starts the supervisor; `init/1` receives `nil` as its argument.
  """
  def start_link, do: Supervisor.start_link(__MODULE__, nil)

  @doc """
  Builds the child specifications and supervises them with a
  `:one_for_one` strategy.
  """
  def init(_arg) do
    repo = worker(Projections.Repo, [])

    # projections
    example_projector =
      worker(
        Commanded.Event.Handler,
        ["ExampleProjection", Projections.ExampleProjection.Projector],
        id: :example_projection
      )

    supervise([repo, example_projector], strategy: :one_for_one)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment