Skip to content

Instantly share code, notes, and snippets.

@joshprice
Last active July 1, 2024 10:00
Show Gist options
  • Save joshprice/b854bd8b27d16451aa1e8f6b402ff55e to your computer and use it in GitHub Desktop.
An attempt at a Property Table Ash Data Layer leveraging the ETS data layer by delegation
defmodule Ash.DataLayer.PropertyTable do
  @moduledoc """
  A PropertyTable-backed Ash data layer, for testing and lightweight usage.

  Read/query behaviour is delegated to `Ash.DataLayer.Ets`; the ETS write path
  is replaced with `PropertyTable` operations. Like the ETS layer, this has no
  transaction support and is not recommended for production use, especially in
  multi-user applications. It can, however, be great for prototyping.
  """

  require Ash.Query
  import Ash.Expr
  require Logger

  # DSL section exposed through `use Spark.Dsl.Extension` below; must be
  # defined before the `use` call so the attribute is available at expansion.
  @property_table %Spark.Dsl.Section{
    name: :property_table,
    describe: """
    A section for configuring the property_table data layer
    """,
    examples: [
      """
      property_table do
        # Used in testing
        private? true
      end
      """
    ],
    schema: [
      private?: [
        type: :boolean,
        default: false,
        doc:
          "Sets the ets table protection to private, and scopes it to only this process. The table name will not be used directly if this is true, to allow multiple processes to use this resource separately."
      ],
      table: [
        type: :atom,
        doc: """
        The name of the table. Defaults to the resource name.
        """
      ]
    ]
  }

  use Spark.Dsl.Extension,
    sections: [@property_table],
    verifiers: [Ash.DataLayer.Verifiers.RequirePreCheckWith]

  use Ash.DataLayer.Ets

  @behaviour Ash.DataLayer

  # Delegate query/introspection operations to Ash.DataLayer.Ets.
  defdelegate can?(action, resource), to: Ash.DataLayer.Ets
  defdelegate filter(query, opts), to: Ash.DataLayer.Ets
  defdelegate limit(query, limit, opts), to: Ash.DataLayer.Ets
  defdelegate offset(query, offset, opts), to: Ash.DataLayer.Ets
  defdelegate sort(query, sort, opts), to: Ash.DataLayer.Ets
  defdelegate run_query(resource, query, opts), to: Ash.DataLayer.Ets
  defdelegate create(resource, changeset), to: Ash.DataLayer.Ets
  defdelegate update(resource, changeset), to: Ash.DataLayer.Ets
  defdelegate destroy(resource, record), to: Ash.DataLayer.Ets
  defdelegate resource_to_query(resource), to: Ash.DataLayer.Ets
  defdelegate create_table(resource), to: Ash.DataLayer.Ets
  defdelegate destroy_table(resource), to: Ash.DataLayer.Ets
  defdelegate transaction(resource, opts, fun), to: Ash.DataLayer.Ets
  defdelegate rollback(resource, value), to: Ash.DataLayer.Ets
  defdelegate in_transaction?(resource), to: Ash.DataLayer.Ets
  defdelegate wrap_in_transaction(resource, opts, fun), to: Ash.DataLayer.Ets
  defdelegate generic_get(resource, resource_identities, related), to: Ash.DataLayer.Ets
  defdelegate alter_table(resource, opts), to: Ash.DataLayer.Ets

  # Implementations of the write paths, with ETS writes replaced by
  # PropertyTable equivalents.
  #
  # NOTE(review): `dump_to_native/2`, `cast_record/2`, `has_filter?/1`,
  # `filter_matches/4` and `wrap_or_create_table/2` are not defined in this
  # file — presumably brought into scope by `use Ash.DataLayer.Ets`; confirm
  # they are actually importable from there.
  #
  # NOTE(review): `PropertyTable.get/2` returns the stored value itself (or
  # `nil` when absent), NOT an `{:ok, value}` tuple — see the property_table
  # hex docs. The clauses below therefore branch on `nil` vs. a stored map;
  # the original `{:ok, record}` patterns could never match a stored record.

  # Writes a single {pkey, record} pair, then reads it back and casts it to
  # the resource. Returns {:ok, record} | {:error, reason}.
  defp put_or_insert_new(table, {pkey, record}, resource) do
    attributes = Ash.Resource.Info.attributes(resource)

    case dump_to_native(record, attributes) do
      {:ok, casted} ->
        case PropertyTable.put(table, pkey, casted) do
          :ok ->
            case PropertyTable.get(table, pkey) do
              # Should not happen immediately after a successful put, but
              # surface it explicitly rather than returning a bare map.
              nil -> {:error, :not_found}
              stored -> cast_record(stored, resource)
            end

          error ->
            error
        end

      other ->
        other
    end
  end

  # Batch variant of put_or_insert_new/3. Dumps every record first (failing
  # fast on the first bad record), writes them all, then optionally reads the
  # records back with their :bulk_create_index metadata attached.
  defp put_or_insert_new_batch(table, records, resource, return_records?) do
    attributes = Ash.Resource.Info.attributes(resource)

    records
    |> Enum.reduce_while({:ok, [], []}, fn {pkey, index, record}, {:ok, acc, indices} ->
      case dump_to_native(record, attributes) do
        {:ok, casted} ->
          {:cont, {:ok, [{pkey, casted} | acc], [{pkey, index} | indices]}}

        {:error, error} ->
          {:halt, {:error, error}}
      end
    end)
    |> case do
      {:ok, batch, indices} ->
        results = Enum.map(batch, fn {pkey, casted} -> PropertyTable.put(table, pkey, casted) end)

        if Enum.all?(results, &(&1 == :ok)) do
          if return_records? do
            Enum.reduce_while(indices, {:ok, []}, fn {pkey, index}, {:ok, acc} ->
              case PropertyTable.get(table, pkey) do
                nil ->
                  {:halt, {:error, :not_found}}

                stored ->
                  case cast_record(stored, resource) do
                    {:ok, casted} ->
                      {:cont,
                       {:ok, [Ash.Resource.put_metadata(casted, :bulk_create_index, index) | acc]}}

                    {:error, error} ->
                      {:halt, {:error, error}}
                  end
              end
            end)
          else
            :ok
          end
        else
          {:error, :batch_insert_failed}
        end

      other ->
        other
    end
  end

  # Unconditional write of already-dumped data. PropertyTable.put/3 returns
  # :ok on success; keep the error clause defensively.
  defp put_data(table, pkey, data) do
    case PropertyTable.put(table, pkey, data) do
      :ok -> {:ok, data}
      error -> error
    end
  end

  # Destroys a record, re-checking any destroy filter against the currently
  # stored value first so a concurrent change surfaces as a StaleRecord error.
  defp do_destroy(resource, record, tenant, filter, domain) do
    with {:ok, table} <- wrap_or_create_table(resource, tenant) do
      pkey = Map.take(record, Ash.Resource.Info.primary_key(resource))

      if has_filter?(filter) do
        case PropertyTable.get(table, pkey) do
          # Record already gone: treat the same as a filter miss.
          nil ->
            {:error,
             Ash.Error.Changes.StaleRecord.exception(resource: resource, filter: filter)}

          stored when is_map(stored) ->
            with {:ok, record} <- cast_record(stored, resource),
                 {:ok, [_]} <- filter_matches([record], filter, domain, tenant) do
              # PropertyTable.delete/2 returns :ok.
              PropertyTable.delete(table, pkey)
            else
              {:ok, []} ->
                {:error,
                 Ash.Error.Changes.StaleRecord.exception(resource: resource, filter: filter)}

              {:error, error} ->
                {:error, error}
            end
        end
      else
        PropertyTable.delete(table, pkey)
      end
    end
  end

  @doc "Stops the storage for a given resource/tenant (deleting all of the data)"
  # sobelow_skip ["DOS.StringToAtom"]
  def stop(resource, tenant \\ nil) do
    # Only context-based multitenancy scopes storage by tenant.
    tenant =
      if Ash.Resource.Info.multitenancy_strategy(resource) == :context do
        tenant
      end

    if Ash.DataLayer.Ets.Info.private?(resource) do
      # Private tables live in the process dictionary of the owning process.
      case Process.get({:ash_ets_table, resource, tenant}) do
        nil -> :ok
        table -> PropertyTable.clear(table)
      end
    else
      table =
        if tenant && Ash.Resource.Info.multitenancy_strategy(resource) == :context do
          # Dynamic atom creation (see sobelow_skip above): assumes tenants
          # form a small, bounded set.
          String.to_atom(to_string(tenant) <> to_string(resource))
        else
          resource
        end

      # NOTE(review): assumes the PropertyTable supervisor is registered under
      # <table>.TableManager — confirm against where the table is started.
      name = Module.concat(table, TableManager)

      case Process.whereis(name) do
        nil -> :ok
        pid -> Process.exit(pid, :shutdown)
      end
    end
  end

  # Include any other necessary functions or helpers
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment