Last active
July 1, 2024 10:00
-
-
Save joshprice/b854bd8b27d16451aa1e8f6b402ff55e to your computer and use it in GitHub Desktop.
An attempt at a Property Table Ash Data Layer leveraging the ETS data layer by delegation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Ash.DataLayer.PropertyTable do | |
require Ash.Query | |
import Ash.Expr | |
require Logger | |
# Spark DSL section definition for the `property_table` block. It declares two
# options: `private?` (boolean, default false) and `table` (atom).
@property_table %Spark.Dsl.Section{
  name: :property_table,
  schema: [
    private?: [
      type: :boolean,
      default: false,
      doc:
        "Sets the ets table protection to private, and scopes it to only this process. The table name will not be used directly if this is true, to allow multiple processes to use this resource separately."
    ],
    table: [
      type: :atom,
      doc: "The name of the table. Defaults to the resource name.\n"
    ]
  ],
  describe: "A section for configuring the property_table data layer\n",
  examples: [
    """
    property_table do
      # Used in testing
      private? true
    end
    """
  ]
}
@moduledoc """
An experimental PropertyTable-oriented Ash data layer that delegates to the ETS
(Erlang Term Storage) data layer, for testing and lightweight usage.

Remember, this does not have support for transactions! This is not recommended for production
use, especially in multi-user applications. It can, however, be great for prototyping.
"""
# Register the `property_table` DSL section and the pre-check verifier with Spark.
use Spark.Dsl.Extension,
  sections: [@property_table],
  verifiers: [Ash.DataLayer.Verifiers.RequirePreCheckWith]

# NOTE(review): presumably meant to inherit behaviour from the ETS data layer;
# confirm that Ash.DataLayer.Ets defines `__using__/1`, otherwise this line will
# fail at compile time.
use Ash.DataLayer.Ets

@behaviour Ash.DataLayer
# Every function below is forwarded, argument for argument, to Ash.DataLayer.Ets.
# NOTE(review): the arities and argument names here have not been checked against
# the Ash.DataLayer.Ets version in use; confirm that each target function exists.

# Capability checks and query construction/execution
defdelegate can?(action, resource), to: Ash.DataLayer.Ets
defdelegate resource_to_query(resource), to: Ash.DataLayer.Ets
defdelegate filter(query, opts), to: Ash.DataLayer.Ets
defdelegate sort(query, sort, opts), to: Ash.DataLayer.Ets
defdelegate limit(query, limit, opts), to: Ash.DataLayer.Ets
defdelegate offset(query, offset, opts), to: Ash.DataLayer.Ets
defdelegate run_query(resource, query, opts), to: Ash.DataLayer.Ets
defdelegate generic_get(resource, resource_identities, related), to: Ash.DataLayer.Ets

# Record writes
defdelegate create(resource, changeset), to: Ash.DataLayer.Ets
defdelegate update(resource, changeset), to: Ash.DataLayer.Ets
defdelegate destroy(resource, record), to: Ash.DataLayer.Ets

# Table management
defdelegate create_table(resource), to: Ash.DataLayer.Ets
defdelegate alter_table(resource, opts), to: Ash.DataLayer.Ets
defdelegate destroy_table(resource), to: Ash.DataLayer.Ets

# Transactions
defdelegate transaction(resource, opts, fun), to: Ash.DataLayer.Ets
defdelegate wrap_in_transaction(resource, opts, fun), to: Ash.DataLayer.Ets
defdelegate rollback(resource, value), to: Ash.DataLayer.Ets
defdelegate in_transaction?(resource), to: Ash.DataLayer.Ets
# Implementations of the functions that perform writes in ETS; the ETS writes are replaced with their PropertyTable equivalents.
# Dumps `record` to its native representation, stores it under `pkey` in the
# PropertyTable `table`, then reads the stored value back and casts it to a
# `resource` record.
#
# Returns the result of `cast_record/2` on success. A failing dump or put is
# returned unchanged, and `{:error, :not_found}` is returned when the value is
# missing on read-back.
defp put_or_insert_new(table, {pkey, record}, resource) do
  attributes = Ash.Resource.Info.attributes(resource)

  with {:ok, casted} <- dump_to_native(record, attributes),
       :ok <- PropertyTable.put(table, pkey, casted) do
    # PropertyTable.get/3 returns the stored value itself (or the default, nil),
    # not an `{:ok, value}` tuple, so match on the bare value.
    case PropertyTable.get(table, pkey) do
      nil -> {:error, :not_found}
      stored -> cast_record(stored, resource)
    end
  end
end
# Dumps and stores a batch of `{pkey, index, record}` tuples in `table`.
#
# Returns `:ok` when `return_records?` is false. When it is true, returns
# `{:ok, records}` where each record has been read back, cast to `resource`,
# and tagged with its `:bulk_create_index` metadata. Returns `{:error, reason}`
# if dumping, writing, reading back, or casting fails.
defp put_or_insert_new_batch(table, records, resource, return_records?) do
  attributes = Ash.Resource.Info.attributes(resource)

  with {:ok, batch, indices} <- dump_batch(records, attributes),
       :ok <- put_batch(table, batch) do
    if return_records? do
      read_back_batch(table, indices, resource)
    else
      :ok
    end
  end
end

# Dumps every record to its native form, halting on the first dump error.
# Both accumulators are built by prepending, so they come out in reverse input order.
defp dump_batch(records, attributes) do
  Enum.reduce_while(records, {:ok, [], []}, fn {pkey, index, record}, {:ok, batch, indices} ->
    case dump_to_native(record, attributes) do
      {:ok, casted} ->
        {:cont, {:ok, [{pkey, casted} | batch], [{pkey, index} | indices]}}

      {:error, error} ->
        {:halt, {:error, error}}
    end
  end)
end

# Writes every `{pkey, value}` pair and reports `:ok` only if all writes returned `:ok`.
defp put_batch(table, batch) do
  results = Enum.map(batch, fn {pkey, casted} -> PropertyTable.put(table, pkey, casted) end)

  if Enum.all?(results, &(&1 == :ok)) do
    :ok
  else
    {:error, :batch_insert_failed}
  end
end

# Reads each stored value back and casts it, halting on the first failure.
# Prepending while walking the reversed `indices` restores the input order.
defp read_back_batch(table, indices, resource) do
  Enum.reduce_while(indices, {:ok, []}, fn {pkey, index}, {:ok, acc} ->
    # PropertyTable.get/3 returns the bare value (nil when absent), not `{:ok, value}`.
    case PropertyTable.get(table, pkey) do
      nil ->
        {:halt, {:error, :not_found}}

      stored ->
        case cast_record(stored, resource) do
          {:ok, casted} ->
            {:cont, {:ok, [Ash.Resource.put_metadata(casted, :bulk_create_index, index) | acc]}}

          {:error, error} ->
            {:halt, {:error, error}}
        end
    end
  end)
end
# Stores `data` under `pkey` in `table`. Yields `{:ok, data}` when the put
# reports `:ok`; any other result is handed back to the caller untouched.
defp put_data(table, pkey, data) do
  with :ok <- PropertyTable.put(table, pkey, data) do
    {:ok, data}
  end
end
# Deletes the stored entry for `record` from the table of `resource`/`tenant`.
#
# The entry is keyed by the record's primary-key fields. When a filter is
# present, the stored value is read and cast first and is only deleted if it
# still matches the filter; otherwise a `StaleRecord` error is returned.
# Without a filter the entry is deleted unconditionally.
#
# Returns the result of `PropertyTable.delete/2` on success, or `{:error, reason}`.
defp do_destroy(resource, record, tenant, filter, domain) do
  with {:ok, table} <- wrap_or_create_table(resource, tenant) do
    pkey = Map.take(record, Ash.Resource.Info.primary_key(resource))

    if has_filter?(filter) do
      # PropertyTable.get/3 returns the bare value (nil when absent), not an
      # `{:ok, value}` tuple, so match on the value itself.
      case PropertyTable.get(table, pkey) do
        stored when is_map(stored) ->
          with {:ok, current} <- cast_record(stored, resource),
               {:ok, [_]} <- filter_matches([current], filter, domain, tenant) do
            PropertyTable.delete(table, pkey)
          else
            {:ok, []} ->
              {:error,
               Ash.Error.Changes.StaleRecord.exception(
                 resource: resource,
                 filter: filter
               )}

            {:error, error} ->
              {:error, error}
          end

        _missing ->
          # Nothing is stored under this key, so it cannot satisfy the filter.
          {:error,
           Ash.Error.Changes.StaleRecord.exception(
             resource: resource,
             filter: filter
           )}
      end
    else
      PropertyTable.delete(table, pkey)
    end
  end
end
@doc """
Stops the storage for a given resource/tenant (deleting all of the data).

The `tenant` is only taken into account when the resource uses the `:context`
multitenancy strategy. For private tables, every property is removed from the
table found in the process dictionary. Otherwise the `TableManager` process
registered for the table is sent a `:shutdown` exit.

Returns `:ok` when there is nothing to stop or after clearing a private table,
and `true` after signalling a running table manager.
"""
@spec stop(module(), term()) :: :ok | true
# sobelow_skip ["DOS.StringToAtom"]
def stop(resource, tenant \\ nil) do
  # Discard the tenant unless it actually scopes the table.
  tenant =
    if Ash.Resource.Info.multitenancy_strategy(resource) == :context do
      tenant
    end

  # NOTE(review): this presumably reads the `ets` DSL section rather than the
  # `property_table` section defined in this module; confirm it is intended.
  if Ash.DataLayer.Ets.Info.private?(resource) do
    case Process.get({:ash_ets_table, resource, tenant}) do
      nil ->
        :ok

      table ->
        # Remove each property individually via get_all/1 and delete/2.
        table
        |> PropertyTable.get_all()
        |> Enum.each(fn {property, _value} -> PropertyTable.delete(table, property) end)
    end
  else
    # `tenant` is already nil unless the strategy is :context, so a plain
    # truthiness check is enough here.
    table =
      if tenant do
        String.to_atom(to_string(tenant) <> to_string(resource))
      else
        resource
      end

    case Process.whereis(Module.concat(table, TableManager)) do
      nil -> :ok
      pid -> Process.exit(pid, :shutdown)
    end
  end
end
# Include any other necessary functions or helpers | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment