Skip to content

Instantly share code, notes, and snippets.

@zblanco
Last active September 26, 2023 15:30
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save zblanco/342b2fa1a510d1c41da2dcba3a3b6bf5 to your computer and use it in GitHub Desktop.
Save zblanco/342b2fa1a510d1c41da2dcba3a3b6bf5 to your computer and use it in GitHub Desktop.
Dagger Demos 5/3/22

Dagger Demos

Summary

Dagger is a tool for modeling your workflows as data that can be composed together at runtime.

Dagger constructs can be integrated into a Dagger.Workflow and evaluated lazily in concurrent contexts.

Dagger Workflows are a decorated dataflow graph (a DAG - "directed acyclic graph") of your code that can model your rules, pipelines, and state machines.

Basic data flow dependencies such as in a pipeline are modeled as %Step{} structs (nodes/vertices) in the graph with directed edges (arrows) between steps.

Steps can be thought of as a basic input -> output lambda function.

As Facts are fed through a workflow, the relevant steps are traversed and activated as needed, producing more Facts.

Beyond steps, Dagger has support for Rules, Joins, and State Machines for conditional, fan in/out flow, and stateful evaluation.

Dagger is targeting use cases for rule based expert systems, state machines, dynamic data pipelines, and more.

Warning!

This library is alpha software and a Work in Progress

These APIs are not stable and are likely to change. Use at your own risk.

Make it work (You are here) -> Make it Right -> Make it beautiful

Deps / Setup

# Livebook dependencies: Kino for rendering widgets, Dagger (the
# stateful-eval branch) for workflow construction, and Expreso for
# boolean expression parsing.
Mix.install([
  {:kino, "~> 0.5.2"},
  {:dagger, github: "zblanco/dagger", branch: "zw/stateful-eval"},
  {:expreso, "~> 0.1.1"}
])
defmodule Kino.Mermaid do
  @moduledoc """
  A Kino widget that renders `Graph` structs and `Dagger.Workflow`s as
  Mermaid flowchart diagrams inside Livebook.
  """
  use Kino.JS
  alias Dagger.Workflow

  @doc """
  Builds a Mermaid Kino.

  Accepts a `Graph`, a `Dagger.Workflow`, or an already-encoded Mermaid
  source string (any other term is passed straight to `Kino.JS.new/2`).
  """
  def new(%Graph{} = graph) do
    graph
    |> to_mermaid()
    |> new()
  end

  def new(%Workflow{} = wrk) do
    wrk
    |> workflow_to_mermaid()
    |> new()
  end

  def new(graph) do
    Kino.JS.new(__MODULE__, graph)
  end

  @doc """
  Builds a Mermaid Kino from `graph` using a custom `encoder` function
  to produce the Mermaid source.
  """
  def new(graph, encoder) do
    graph
    |> encoder.()
    |> new()
  end

  @doc """
  Encodes a workflow's `:flow` graph as a top-down (`graph TD`) Mermaid
  flowchart, emitting one `id([name])-->id([name]);` segment per edge.
  """
  def workflow_to_mermaid(%Workflow{} = workflow) do
    workflow.flow
    |> Graph.edges()
    |> Enum.reduce("graph TD;", fn edge, acc ->
      v1_name = vertex_name(edge.v1)
      v2_name = vertex_name(edge.v2)

      v1_id = vertex_id(edge.v1)
      v2_id = vertex_id(edge.v2)

      acc <> "#{v1_id}([#{v1_name}])-->#{v2_id}([#{v2_name}]);"
    end)
  end

  defp vertex_id(%Workflow.Root{}), do: :root
  defp vertex_id(step), do: step.hash

  # Human-readable node label for each kind of workflow vertex.
  defp vertex_name(vertex) do
    case vertex do
      %Workflow.Root{} -> "root"
      %Workflow.Condition{} = c -> "cond-#{c.hash}"
      %Workflow.Conjunction{} = con -> "con-#{con.hash}"
      # Hyphens are stripped from step names — presumably because they
      # conflict with Mermaid node syntax (NOTE(review): confirm).
      %Workflow.Step{} = s -> "step-#{s.hash}#{s.name |> String.replace("-", "")}"
      step -> step.hash
    end
  end

  # Edge labels for `to_mermaid/1`. Clause order matters: the specific
  # label shapes must precede the `%{label: label}` catch-all. The
  # original had two tuple clauses AFTER the catch-all ({:root, hash}
  # and {step_name, hash}) which were unreachable dead code — the
  # generic `{step, hash}` clause below already produces identical
  # output for both shapes, so they have been removed.
  defp edge_label(%{label: nil, v1: v1, v2: v2}) when is_atom(v1) and is_atom(v2),
    do: "\"#{v1}-#{v2}\""

  defp edge_label(%{label: {%Workflow.Root{}, hash}}), do: "root-#{hash}"
  defp edge_label(%{label: {step, hash}}), do: "\"#{step}-#{hash}\""
  defp edge_label(%{label: %Workflow.Root{}}), do: "root"
  defp edge_label(%{label: label}), do: "\"#{label}\""

  @doc """
  Encodes a `Graph` as Mermaid source. Directed graphs render with
  `-->` arrows; undirected graphs with `---` links.
  """
  def to_mermaid(%Graph{type: :directed} = graph) do
    Enum.reduce(Graph.edges(graph), "graph TD;", fn %{v1: v1, v2: v2} = edge, acc ->
      v1_vertex_label = vertex_label(v1)
      v2_vertex_label = vertex_label(v2)

      acc <>
        "#{v1_vertex_label}([#{v1_vertex_label}])--\"#{edge_label(edge)}\"-->#{v2_vertex_label}([#{v2_vertex_label}]);"
    end)
  end

  def to_mermaid(%Graph{type: :undirected} = graph) do
    Enum.reduce(Graph.edges(graph), "graph TD;", fn %{v1: v1, v2: v2} = edge, acc ->
      acc <> "#{v1}([#{v1}])-- \"#{edge_label(edge)}\" ---#{v2}([#{v2}]);"
    end)
  end

  defp vertex_label(%Workflow.Fact{hash: hash}), do: "fact#{hash}"
  defp vertex_label(%Workflow.Root{}), do: :root
  defp vertex_label(%{hash: hash}), do: hash
  defp vertex_label(otherwise), do: otherwise

  asset "main.js" do
    """
    import "https://cdn.jsdelivr.net/npm/mermaid@8.13.3/dist/mermaid.min.js";

    mermaid.initialize({ startOnLoad: false });

    export function init(ctx, graph) {
      mermaid.render("graph1", graph, (svgSource, bindListeners) => {
        ctx.root.innerHTML = svgSource;
        bindListeners && bindListeners(ctx.root);
      });
    }
    """
  end
end

Dataflow Pipelines

require Dagger
import Dagger
alias Dagger.Workflow
# Steps may be nested as {parent, [children]} tuples: each child step
# receives the output fact of its parent. Note the two top-level steps
# fan out from the workflow root.
nested_step_workflow =
  workflow(
    name: "a test workflow with dependent steps",
    steps: [
      {step(fn x -> x * x end, name: "squarifier"),
       [
         step(fn x -> x * -1 end, name: "negator"),
         step(fn x -> x * 2 end, name: "doubler")
       ]},
      {step(fn x -> x * 2 end, name: "doubler"),
       [
         {step(fn x -> x * 2 end, name: "doubler"),
          [
            step(fn x -> x * 2 end, name: "doubler"),
            step(fn x -> x * -1 end, name: "negator")
          ]}
       ]}
    ]
  )

# Render the workflow's dataflow graph as a Mermaid diagram.
nested_step_workflow
|> Kino.Mermaid.new()
defmodule TextProcessing do
  @moduledoc """
  Word-level text processing helpers used as steps in the text
  processing pipeline examples.
  """

  @doc """
  Downcases `text` and splits it into words.

  Words are runs of alphanumeric characters and hyphens; every other
  character acts as a separator and is discarded (`trim: true` drops
  empty fragments).
  """
  def tokenize(text) do
    text
    |> String.downcase()
    |> String.split(~R/[^[:alnum:]\-]/u, trim: true)
  end

  @doc """
  Counts occurrences of each word in `list_of_words`, returning a
  `word => count` map.
  """
  def count_words(list_of_words) do
    # Enum.frequencies/1 is the stdlib equivalent of the previous
    # manual reduce + Map.update accumulation.
    Enum.frequencies(list_of_words)
  end

  @doc """
  Returns the number of distinct words in a `word => count` map
  (one map entry per unique word).
  """
  def count_uniques(word_count) do
    Enum.count(word_count)
  end

  @doc "Returns the first word of the list, or `nil` when empty."
  def first_word(list_of_words) do
    List.first(list_of_words)
  end

  @doc "Returns the last word of the list, or `nil` when empty."
  def last_word(list_of_words) do
    List.last(list_of_words)
  end
end
# Pipeline: "tokenize" fans out into word counting (which feeds unique
# counting), first-word, and last-word extraction.
text_processing_pipeline =
  workflow(
    name: "basic text processing example",
    steps: [
      {step(name: "tokenize", work: &TextProcessing.tokenize/1),
       [
         {step(name: "count words", work: &TextProcessing.count_words/1),
          [
            step(name: "count unique words", work: &TextProcessing.count_uniques/1)
          ]},
         step(name: "first word", work: &TextProcessing.first_word/1),
         step(name: "last word", work: &TextProcessing.last_word/1)
       ]}
    ]
  )

text_processing_pipeline
|> Kino.Mermaid.new()

Eager evaluation

# Eagerly plan the input fact, react until no runnables remain, then
# collect the raw reaction values produced along the way.
text_processing_pipeline
|> Workflow.plan_eagerly("anybody want a peanut?")
|> Workflow.react_until_satisfied()
|> Workflow.raw_reactions()

Lazy Evaluation

# Lazily plan the input fact and list the next runnable {step, fact}
# pairs without executing them.
text_processing_pipeline
|> Workflow.plan("anybody want a peanut?")
|> Workflow.next_runnables()

Stateless Rules

alias Dagger.Workflow.Rule

Rules can be built from complex guard clauses and pattern matches

# A rule whose condition is a multi-part guard clause; the reaction
# string is returned when the guard matches.
rule_from_guard_logic =
  Dagger.rule(
    fn
      term
      when (term in [:potato, "potato"] and
              term != "tomato") or
             (binary_part(term, 0, 4) == "pota" or
                (is_atom(term) and term == :potato)) ->
        "boil em mash em stick em in a stew. Po Tay Toes."
    end,
    name: "rule from guard"
  )

rule_from_guard_logic.workflow
|> Kino.Mermaid.new()
# check/2 evaluates only the rule's condition; run/2 also executes the reaction.
Rule.check(rule_from_guard_logic, "potato")
Rule.run(rule_from_guard_logic, "potato")

Rules can be made from captured functions

defmodule Potatoes do
  @moduledoc "Potato-recognition predicate used as a rule condition in the examples."

  # NOTE(review): the `is_` prefix is conventionally reserved for
  # guard-safe macros; the name is kept as-is because callers capture
  # `&Potatoes.is_potato?/1`.
  @doc """
  Returns `true` for the atom `:potato` or the string `"potato"`.

  Falls back to `false` for any other term so the predicate can be
  invoked with arbitrary input instead of raising a
  `FunctionClauseError` (the original had no fallback clause).
  """
  def is_potato?(:potato), do: true
  def is_potato?("potato"), do: true
  def is_potato?(_other), do: false
end
# A rule built from a captured predicate condition and a constant reaction.
potato_rule =
  rule(
    condition: &Potatoes.is_potato?/1,
    reaction: "it is a valid potato"
  )

potato_rule.workflow
|> Kino.Mermaid.new()

Rules can be composed from a list of boolean functions

# A rule whose condition is a list of boolean functions, all of which
# must pass for the reaction to fire.
rule_from_list_of_conditions =
  Dagger.rule(
    condition: [
      # when this is true the rest are also always true (how to identify generic cases like this to reduce conditional evaluations when possible?)
      fn term -> term == "potato" end,
      # generic guard - this check should end up child to the root in most cases
      &is_binary/1,
      # stronger check than is_binary or not is_integer but still unnecessary if is_potato? or anonymous variation passes
      fn term -> String.length(term) == 6 end,
      # weaker check that just filters out integers - lower priority - can be avoided in most cases
      fn term when not is_integer(term) -> true end,
      # captured function with 2 patterns; fixed from `&Examples.is_potato?/1` —
      # the module defined above is `Potatoes`, `Examples` does not exist here.
      &Potatoes.is_potato?/1
    ],
    reaction: "potato",
    name: "rule from list of conditions"
  )

rule_from_list_of_conditions.workflow
|> Kino.Mermaid.new()

Workflows can be composed of many "flowable" pieces during construction

# Workflows accept lists of rules at construction time; each rule's
# condition/reaction subgraph is merged into the shared flow.
composed_workflow =
  Dagger.workflow(
    name: "composition of rules example",
    rules: [
      Dagger.rule(
        fn
          :potato -> "potato!"
        end,
        name: "is it a potato?"
      ),
      Dagger.rule(
        fn item when is_integer(item) and item > 41 and item < 43 ->
          Enum.random(1..10)
        end,
        name: "when 42 random"
      )
    ]
  )

Kino.Mermaid.new(composed_workflow)

Or merged together under protocols

# Convert each rule to a workflow via the Dagger.Flowable protocol,
# then merge them all into a single fresh workflow.
rules_to_compose = [
  rule_from_guard_logic,
  potato_rule,
  rule_from_list_of_conditions
]

rule_composition_workflow =
  rules_to_compose
  |> Enum.map(&Dagger.Flowable.to_workflow/1)
  |> Enum.reduce(Workflow.new("rule composition workflow"), fn rule_wrk, acc ->
    Workflow.merge(acc, rule_wrk)
  end)

rule_composition_workflow
|> Kino.Mermaid.new()

Joins, State and Causal/Mergeable Memory

Basic pipeline memory structure

# Three independent steps, each fed by the same root input fact.
wrk =
  Dagger.workflow(
    name: "test",
    steps: [
      Dagger.step(fn num -> num * 2 end),
      Dagger.step(fn num -> num * 3 end),
      Dagger.step(fn num -> num * 4 end)
    ]
  )

# Plan the input fact eagerly, then execute a single reaction cycle.
wrk =
  wrk
  |> Workflow.plan_eagerly(10)
  |> Workflow.react()

# Inspect the causal memory graph built during execution.
wrk.memory
|> Kino.Mermaid.new()

Match phase memory structure for a rule with many conditions

# A rule with three guard conditions (integer, > 41, < 43) used to
# inspect memory during the match phase.
some_rule =
  Dagger.rule(fn item when is_integer(item) and item > 41 and item < 43 -> "fourty two" end,
    name: "rule1"
  )

some_rule.workflow
|> Kino.Mermaid.new()
# 45 fails the conditions: memory records the match phase only.
some_rule.workflow
|> Workflow.plan_eagerly(45)
|> Map.get(:memory)
|> Kino.Mermaid.new()
# 42 satisfies all conditions: react/1 runs the reaction as well.
some_rule.workflow
|> Workflow.plan_eagerly(42)
|> Workflow.react()
|> Map.get(:memory)
|> Kino.Mermaid.new()

Our memory structure is a graph centering on %Fact{} vertices.

From our fact vertices we draw labeled edges to steps (in this case just hashes of the step).

Different kinds of vertices within a flow such as %Condition{} or %Conjunction{} have different implementations of the Dagger.Workflow.Activation protocol.

The activation protocol enforces a state monad / token approach where implementations always return a new %Workflow{}.

In the case of concurrently activated steps for a given workflow the trick is ensuring that workflows, even those who've gone through a lifecycle of reactions, can be merged.

These labeled edges represent causal relationships of interactions which have occurred so far in the workflow's execution.

Merging these edges means ensuring that if one workflow has executed a step another hasn't, the labels are set to the most advanced workflow's progress.

See Forward Chaining With State Monad or Retex for more.

import Dagger

# One parent step fanning out to two children; react/2 runs the
# workflow against the input fact 2, leaving the children runnable.
wrk =
  workflow(
    name: "merge test",
    steps: [
      {step(fn num -> num + 1 end),
       [
         step(fn num -> num + 2 end),
         step(fn num -> num + 4 end)
       ]}
    ]
  )
  |> Workflow.react(2)

# Take two of the runnable {step, fact} pairs produced by the parent step.
[{step_1, fact_1}, {step_2, fact_2} | _] = Workflow.next_runnables(wrk)

# Activate each runnable against the *same* starting workflow to
# simulate concurrent execution on separate copies.
new_wrk_1 = Workflow.Activation.activate(step_1, wrk, fact_1)
new_wrk_2 = Workflow.Activation.activate(step_2, wrk, fact_2)
new_wrk_1.memory
|> Kino.Mermaid.new()
new_wrk_2.memory
|> Kino.Mermaid.new()
# Merging reconciles the two divergent memory graphs into one workflow.
merged_wrk = Workflow.merge(new_wrk_1, new_wrk_2)

merged_wrk.memory
|> Kino.Mermaid.new()

~ The Future ~

  • State machines (Soon tm)
  • 0.1.0 Hex release (eventually)
  • Sagas
  • CQRS / ES / DDD models:
    • Command / Event Handlers, Aggregates, Process Managers
  • API for describing pure vs impure steps
  • Consistent hashing, identity of functions:
    • Merkles and Trees for aggregate identities
    • Identity by contract where any step that conforms is ☑
  • Property based tests to find the many bugs I haven't thought to test yet
  • Contracts
  • Pluggable evaluation / compilation?
  • The "it depends" runtime bits i.e. "the fun parts"
    • Dynamic Process Topologies
    • Single node / Dynamic Supervisor + Registry
    • Distributed workflow execution
    • Oban, Broadway, Flow adapters/convenience
  • UI / Builders
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment