Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save rlb3/a7bbd1081bc901d15acedec8e94e098a to your computer and use it in GitHub Desktop.
Save rlb3/a7bbd1081bc901d15acedec8e94e098a to your computer and use it in GitHub Desktop.
Getting Lazy with Dataflow Graphs in Elixir

Getting Lazy with Dataflow Graphs in Elixir

Intro

What do Tensorflow, Apache Airflow, Rule Engines, and Excel have in common?

Under the hood they all use DAGs to model data-flow dependencies of the program. Using graphs to model programs is great because you can modify the program at runtime. Lets talk about doing this in Elixir for great good.

Graph data structures have many use cases in computing. We'll focus on using Directed Acyclic Graphs (DAGs) for forward chaining dataflow models.

This notebook illustrates how to use dataflow graphs in Elixir tackle tricky problems like expert systems / rule engines, dynamic data pipelines and more.

What are Graphs?

graph png

A graph is

A graph data structure consists of a finite (and possibly mutable) set of vertices (also called nodes or points), together with a set of unordered pairs of these vertices for an undirected graph or a set of ordered pairs for a directed graph. These pairs are known as edges (also called links or lines), and for a directed graph are also known as edges but also sometimes arrows or arcs.

TLDR;

A graph is circles and lines. Nodes and connections. Vertices and edges.

What about DAGs then?

DAG stands for Directed Acyclic Graph

i.e. Circles with lines that have an arrow between them.

The "acyclic" means no "cycles" i.e. there cannot be a "loop back" where an arrow takes you back up the graph where you've already been.

DAGs / Dataflow Graphs are everywhere in computing

Bringing the program into the data

The common use case for DAGs in dataflow models is to represent parts of your program as data where it can be transorfmed and composed at runtime.

There are other ways we can represent a program at runtime like with a list of functions.

The advantage of modeling the program as data is it allows for deferred execution. The caller can choose how and when to execute the function(s).

Representation of program as data

"Using a datastructure to hold a program/instructions for later use"

funs = [
  fn num -> num + 1 end,
  fn num -> num * 2 end,
  fn num -> num - 1 end
]

Evaluation

"Using that program-as-data to do stuff at runtime"

Enum.map(funs, & &1.(2))
funs
|> Enum.map(& &1.(2))
|> Enum.sum()

Deps/Setup

Mix.install([
  {:kino, "~> 0.5.2"},
  {:libgraph, github: "bitwalker/libgraph", branch: "main"},
  {:req, "~> 0.2.0"},
  {:jason, "~> 1.1.0"},
  {:floki, "~> 0.32.0"}
])

Visualizations: Graph -> Mermaid

defmodule Kino.Mermaid do
  use Kino.JS

  def new(%Graph{} = graph) do
    graph
    |> to_mermaid()
    |> new()
  end

  def new(graph) do
    Kino.JS.new(__MODULE__, graph)
  end

  def to_mermaid(%Graph{} = graph) do
    Enum.reduce(
      Graph.edges(graph),
      "graph TD;",
      fn
        %{v1: v1, v2: v2}, acc ->
          acc <>
            "#{vertex_id(v1)}([#{vertex_name(v1)}])-->#{vertex_id(v2)}([#{vertex_name(v2)}]);"
      end
    )
  end

  defp vertex_name(:root), do: :root
  defp vertex_name(%{work: fun}), do: "step_#{fun_name(fun)}"
  defp vertex_name(%{check: fun}), do: "cond_#{fun_name(fun)}"
  defp vertex_name(fun) when is_function(fun), do: "fun_#{fun_name(fun)}-#{:erlang.phash2(fun)}"
  defp vertex_name(vertex) when is_atom(vertex), do: to_string(vertex)

  defp vertex_name(otherwise) do
    try do
      to_string(otherwise)
    catch
      _any -> :erlang.phash2(otherwise)
    end
  end

  defp vertex_id(thing) when is_atom(thing), do: to_string(thing)
  defp vertex_id(thing), do: :erlang.phash2(thing)

  defp fun_name(fun), do: Function.info(fun, :name) |> elem(1)

  asset "main.js" do
    """
    import "https://cdn.jsdelivr.net/npm/mermaid@8.13.3/dist/mermaid.min.js";

    mermaid.initialize({ startOnLoad: false });

    export function init(ctx, graph) {
      mermaid.render("graph1", graph, (svgSource, bindListeners) => {
        ctx.root.innerHTML = svgSource;
        bindListeners && bindListeners(ctx.root);
      });
    }
    """
  end
end

Making a DAG

g =
  Graph.new(type: :directed)
  |> Graph.add_edges([
    {:a, :b},
    {:a, :c},
    {:b, :d}
  ])

g
|> Kino.Mermaid.new()

Making a dataflow graph of lambdas

If functions are just data and graphs are data structures - what if we put functions in our graphs?

fun_1 = fn num -> num * 2 end
fun_2 = fn num -> num + 2 end
fun_3 = fn num -> num * 42 end
fun_4 = fn num -> num + 42 end

fun_dag =
  Graph.new(type: :directed)
  |> Graph.add_edges([
    {:root, fun_1},
    {fun_1, fun_2},
    {fun_1, fun_3},
    {fun_2, fun_4}
  ])

fun_dag
|> Kino.Mermaid.new()

Evaluation of a DAG

Now that we've got lambda functions in our dag - how to we evaluate it?

  1. start at the top and run that function with the input to get a new output
  2. find the next step(s) by traveling down the arrows (i.e. the out_neighbors)
  3. feed the output from the previous step into the next steps
  4. Profit

TLDR; "Travel down the arrows"

fun_dag_input = Kino.Input.number("Num")
input = Kino.Input.read(fun_dag_input)

Finding steps off of the root node

neighbors_of_root = Graph.out_neighbors(fun_dag, :root)

Building lazy "runnables" by pairing the function and its input to run with

first_runnables = Enum.map(neighbors_of_root, fn fun -> {fun, input} end)

Executing our runnables

first_result =
  first_runnables
  |> Enum.map(fn {fun, input} -> fun.(input) end)
  |> List.first()

Executing the next layer

fun_from_first_runnable = first_runnables |> List.first() |> elem(0)

neighbors_of_first_runnable =
  fun_dag
  |> Graph.out_neighbors(fun_from_first_runnable)
  |> Enum.map(fn fun -> {fun, first_result} end)
  |> Enum.map(fn {fun, input} ->
    fun.(input)
  end)

Executing the next layer in parallel

neighbors_of_first_runnable =
  fun_dag
  |> Graph.out_neighbors(fun_from_first_runnable)
  |> Task.async_stream(fn fun
    fun.(first_result)
  end)
  |> Enum.map(fn {:ok, result} -> result end)

Changing our DAG

fun_dag
|> Kino.Mermaid.new()
new_fun_dag = Graph.add_edge(fun_dag, fun_from_first_runnable, fn _num -> 42 end)

new_fun_dag
|> Kino.Mermaid.new()

What Else?

What other features would a DAG / Pipeline buider tool like this want?

Joins

Steps with more than 1 parent dependency.

Rules

Steps with constraints controlling when to execute.

State

Accumulating data to act on or control behavior.

Joins

join_example =
  Graph.new()
  |> Graph.add_edges([
    {:root, :step_a},
    {:root, :step_b},
    {:step_a, :step_c},
    {:step_b, :step_c}
  ])

join_example
|> Kino.Mermaid.new()

Here we can only execute step_c when both step_a and step_b have resolved.

This means we have to keep intermediate results somewhere in memory so we can grab the result of either a or b and use both to invoke step c.

Rules

A rule is a step with a constraint on when to execute it.

A function is kind of like a rule where fn :potato -> 42 end will only return 42 when given :potato.

The usual verbiage is the lhs and rhs or condition and reaction.

Rules engines are the OG AI solving business problems since the 60's.

Rules are common abstractions for the representation of expert knowledge.

They're handy because they're easily composed, evaluated, stored, and organized.

Resources to check out for rule based systems

Compiling and Evaluating Rules

Say we want to make a graph with our example rule from before:

fn :potato -> 42 end

We know we need to evaluate the lhs before the rhs and only evaluate the rhs if the lhs returns true.

We can do this by breaking out our one rule into the two pieces:

lhs = fn :potato -> true end
rhs = fn _any -> 42 end

Now we need to get these into our DAG and since the lhs has to run first - we'll make sure the arrow goes lhs -> rhs.

potato_rule =
  Graph.new()
  |> Graph.add_edges([
    {:is_potato?, 42}
  ])

potato_rule
|> Kino.Mermaid.new()

Composition of Rules

composed_rule =
  Graph.new()
  |> Graph.add_edges([
    {:root, :is_potato?},
    {:is_potato?, :boil_em_mash_em},
    {:root, :is_ham?},
    {:is_ham?, :it_is_delicious},
    {:root, :is_beet?},
    {:is_beet?, :good_in_salad}
  ])

composed_rule
|> Kino.Mermaid.new()

Now we could give our our Graph of Rules some inputs and get results like

> :ham
:it_is_delicious
> :potato
boil_em_mash_em
> :beet
:good_in_salad

Conditional Expansion

For Elixir and Erlang function calls - we're scoped to that module, function, and arity when evaluating. This has advantages because it means we can evaluate patterns top to bottom executing the RHS of whichever pattern matches first.

In these cases - outside of optimizing your pattern matches you don't really need or want conditional expansion.

However in the case of the evaluation of many rules where the conditions might overlap - its often useful to expand the expressions within the LHS into smaller conditions bound together in the graph.

Here's an example of the initial approach of expanding just the LHS and RHS.

lhs = fn input when (input == :potato and input != :ham) or input == "potato" -> true end
rhs = fn _any -> f2 end

But if we also bring in another rule such as

other_rule = fn :potato -> 42 end

We might be matching against :potato twice for the same input and do unecessary work.

So if we're composing many rules with like conditions for the same execution context we can do this by expanding each individual expression as its own condition.

fn input
   when (input == :potato and input != :ham) or
          input == "potato" ->
  42
end

# full expansion
lhs_1 = fn input when input == :potato -> true end
lhs_1 = fn input when input != :ham -> true end
lhs_or = fn input == "potato" -> true end
expanded_rule =
  Graph.new()
  |> Graph.add_edges([
    {:root, :lhs_1},
    {:root, :lhs_2},
    {:root, :other_path},
    {:lhs_1, :conjunction_2_and_1},
    {:lhs_2, :conjunction_2_and_1},
    {:conjunction_2_and_1, :rhs},
    {:other_path, :rhs}
  ])

expanded_rule
|> Kino.Mermaid.new()

State Machines

Joining ongoing expressions is stateful - sort of.

To ensure our step dependencies are resolved we have to hold its dependents in memory and wait until the final step can be executed.

A DAG of Lambda functions are easy to evaluate because they're stateless meaning our scheduler can blindly parallel map over steps keeping only the 1 step in context.

potato_lock =
  Dagger.state_machine(
    init: %{code: "potato", state: :locked, contents: "ham"},
    reducer: fn
      :lock, state ->
        %{state | state: :locked}

      {:unlock, input_code}, %{code: code, state: :locked} = state when input_code == code ->
        %{state | state: :unlocked}

      _input_code, %{state: :unlocked} ->
        state
    end,
    reactors: [
      fn %{state: :unlocked, contents: contents} -> contents end,
      fn %{state: :locked} -> {:error, :locked} end
    ]
  )
lock_state_machine =
  Graph.new()
  |> Graph.add_edges([
    {:root, :arity_check_1},
    {:arity_check_1, :is_lock_command?},
    {:arity_check_1, :is_unlock_command?},
    {:arity_check_1, :is_code_correct?},
    {:is_lock_command?, :lock_conjunction},
    {:is_unlock_command?, :lock_conjunction},
    {:is_code_correct?, :lock_conjunction},
    {:lock_conjunction, :lock_reducer},
    {:root, :reactor_lhs_clause_1_matched?},
    {:root, :reactor_lhs_clause_2_matched?},
    {:reactor_lhs_clause_1_matched?, :reactor_rhs_1},
    {:reactor_lhs_clause_2_matched?, :reactor_rhs_2}
  ])

Kino.Mermaid.new(lock_state_machine)

DIY Text Processing

defmodule TextProcessing do
  def tokenize(text) do
    text
    |> String.downcase()
    |> String.split(~R/[^[:alnum:]\-]/u, trim: true)
  end

  def count_words(list_of_words) do
    list_of_words
    |> Enum.reduce(Map.new(), fn word, map ->
      Map.update(map, word, 1, &(&1 + 1))
    end)
  end

  def count_uniques(word_count) do
    Enum.count(word_count)
  end

  def first_word(list_of_words) do
    List.first(list_of_words)
  end

  def last_word(list_of_words) do
    List.last(list_of_words)
  end
end
text = "boil em mash em stick em in a stew. Po Tay Toes."

text
|> TextProcessing.tokenize()
|> TextProcessing.count_words()
|> TextProcessing.count_uniques()
text
|> TextProcessing.tokenize()
|> TextProcessing.count_words()
text
|> TextProcessing.tokenize()
|> TextProcessing.first_word()
text
|> TextProcessing.tokenize()
|> TextProcessing.last_word()

We tokenized 3 times and counted words twice!

What if we made a Pipeline module so we can pipeline while we pipeline?

Pipe all the things

defmodule Fact do
  defstruct ~w(
    value
    runnable
  )a
end

defmodule Step do
  defstruct ~w(
    work
  )a

  def new(work), do: %__MODULE__{work: work}

  def run(%__MODULE__{work: work} = step, %Fact{value: input} = input_fact) do
    %Fact{value: apply(work, [input]), runnable: {step, input_fact}}
  end

  def run(%__MODULE__{work: work} = step, input) do
    %Fact{value: apply(work, [input]), runnable: {step, %Fact{value: input}}}
  end
end

defmodule Rule do
  defstruct ~w(
    lhs
    rhs
  )a

  def new(lhs, rhs) do
    %__MODULE__{lhs: lhs, rhs: rhs}
  end

  def check(%__MODULE__{} = rule, input) do
    apply(rule.lhs, [input])
  end

  def run(%__MODULE__{} = rule, input) do
    if check(rule, input) do
      apply(rule.rhs, input)
    end
  end

  def to_pipeline(%__MODULE__{} = rule) do
    Pipeline.new()
    |> Pipeline.add_step(Condition.new(rule.lhs), Step.new(rule.rhs))
  end
end

defmodule Condition do
  defstruct ~w(
    check
  )a

  def new(check), do: %__MODULE__{check: check}

  def run(%Condition{} = condition, %Fact{} = fact) do
    %Fact{value: run(condition.check, fact.value)}
  end

  def run(condition, input) do
    with true <- apply(condition.check, [input]) do
      true
    else
      _otherise ->
        false
    end
  end
end

defmodule Pipeline do
  defstruct ~w(
    flow
    facts
  )a

  def new() do
    %__MODULE__{
      flow: Graph.new(type: :directed) |> Graph.add_vertex(:root),
      facts: []
    }
  end

  def run({%Step{} = step, %Fact{} = fact} = _runnable) do
    Step.run(step, fact)
  end

  def add_step(%__MODULE__{flow: flow} = pipeline, step) do
    %__MODULE__{pipeline | flow: Graph.add_edge(flow, :root, step)}
  end

  def add_step(%__MODULE__{} = pipeline, fun) when is_function(fun) do
    add_step(pipeline, Step.new(fun))
  end

  def add_step(%__MODULE__{flow: flow} = pipeline, parent_step, child_step) do
    if Graph.has_vertex?(flow, parent_step) do
      %__MODULE__{pipeline | flow: Graph.add_edge(flow, parent_step, child_step)}
    else
      pipeline
      |> add_step(parent_step)
      |> add_step(parent_step, child_step)
    end
  end

  @doc """
  Merges the second pipeline into the first.
  """
  def merge(%__MODULE__{} = pipeline_1, %__MODULE__{} = pipeline_2) do
    new_flow =
      Enum.reduce(Graph.edges(pipeline_2.flow), pipeline_1.flow, fn edge, flow ->
        Graph.add_edge(flow, edge)
      end)

    %__MODULE__{pipeline_1 | flow: new_flow}
  end

  def next_steps(%__MODULE__{flow: flow}, step) do
    Graph.out_neighbors(flow, step)
  end

  def next_runnables(
        %__MODULE__{} = pipeline,
        %Fact{runnable: {previous_step, _previous_input}} = fact
      ) do
    pipeline
    |> next_steps(previous_step)
    |> Enum.map(fn step ->
      {step, fact}
    end)
  end

  @doc """
  Returns a list of runnables (`work, input` pairs).
  """
  def next_runnables(%__MODULE__{} = pipeline, some_input) do
    pipeline
    |> next_steps(:root)
    |> Enum.map(fn step ->
      {step, %Fact{value: some_input}}
    end)
  end
end
tokenize_step = Step.new(&TextProcessing.tokenize/1)
count_words_step = Step.new(&TextProcessing.count_words/1)
count_uniques_step = Step.new(&TextProcessing.count_uniques/1)
first_word_step = Step.new(&TextProcessing.first_word/1)
last_word_step = Step.new(&TextProcessing.last_word/1)

text_processing_pipeline =
  Pipeline.new()
  |> Pipeline.add_step(tokenize_step)
  |> Pipeline.add_step(tokenize_step, count_words_step)
  |> Pipeline.add_step(tokenize_step, first_word_step)
  |> Pipeline.add_step(tokenize_step, last_word_step)
  |> Pipeline.add_step(count_words_step, count_uniques_step)

text_processing_pipeline.flow
|> Kino.Mermaid.new()
next_runnables = Pipeline.next_runnables(text_processing_pipeline, text)
results_1 =
  next_runnables
  |> Enum.map(fn runnable ->
    Pipeline.run(runnable)
  end)
next_runnables =
  results_1
  |> Enum.flat_map(fn %Fact{} = fact ->
    Pipeline.next_runnables(text_processing_pipeline, fact)
  end)

results_2 =
  next_runnables
  |> Enum.map(&Pipeline.run/1)

Resources

Videos

Papers

Cool Projects

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment