@colinbankier
Last active September 24, 2015 08:20

Concurrency: the reason we're here

A bit of background

In Elixir, all code runs inside processes. Processes are isolated from each other, run concurrently with one another, and communicate via message passing. Processes are not only the basis for concurrency in Elixir, but they also provide the means for building distributed and fault-tolerant programs.

Elixir’s processes should not be confused with operating system processes. Processes in Elixir are extremely lightweight in terms of memory and CPU (unlike threads in many other programming languages). Because of this, it is not uncommon to have tens or even hundreds of thousands of processes running simultaneously.
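To get a feel for how cheap processes are, here is a minimal sketch that spawns 20,000 idle processes and counts how many are alive. The `spawn_idle` helper and the count of 20,000 are made up for this illustration; they are not part of the workshop material.

```elixir
# Hypothetical helper: spawn `count` processes that just wait for a :stop message.
spawn_idle = fn count ->
  for _ <- 1..count do
    spawn(fn ->
      receive do
        :stop -> :ok
      end
    end)
  end
end

pids = spawn_idle.(20_000)

# Every process is blocked in receive, so all of them should still be alive.
alive_count = Enum.count(pids, &Process.alive?/1)
IO.puts("#{alive_count} processes alive")

# Tell every process to finish so nothing is left behind.
Enum.each(pids, &send(&1, :stop))
```

On a typical machine this should finish almost instantly, which is the point being made above.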

Processes

The most basic mechanism for creating processes is using the spawn/1 function:

spawn fn -> IO.puts "Hello from another process" end

You'll notice a PID was returned - this is the way you identify processes. We'll see a little later how these can be used to communicate with a process. The spawn function returns immediately and execution in the current process continues. Let's try that out:

IO.puts "About to spawn..."

spawn fn ->
  :timer.sleep 2000
  IO.puts "Hello from another process"
end

IO.puts "Spawned"

Running things in parallel

Let's pretend we need to run a few long-running database queries. Here is a function that fakes a long-running task:

run_query = fn query_arg ->
  :timer.sleep 2000
  "#{query_arg} result"
end

This defines an anonymous function that pretends to take a long time. If we run this query 5 times in sequence, it will take 10 seconds to get the results.

1..5 |> Enum.map(&run_query.("query #{&1}"))

Let's try to make this more performant by doing things concurrently. We'll define a function that runs our query asynchronously.

async_query = fn query_arg ->
  spawn(fn -> IO.puts(run_query.(query_arg)) end)
end

async_query.("query 1")

Notice that the query arg is passed to the new process via the closure mechanism. This data is actually deep copied to the new process, as processes can't share memory. Let's try running five queries in parallel.

1..5 |> Enum.each(&async_query.("query #{&1}"))

That's a five-fold performance increase right there :). You'll notice that because we fire these queries off asynchronously, we don't get the results in the calling process. For that, we can use message passing - the primary way to communicate between processes in Elixir.

Message passing

Processes in Elixir can't use shared data structures to share knowledge. It is this complete isolation that gives them their excellent fault-tolerant characteristics. Instead of sharing memory, processes communicate via asynchronous message passing. The content of a message can be any Elixir term.

You use the send function to send a message to a process, identified by a PID. On the other side, you use the receive function to, well, receive messages. Let's test this out by sending a message to ourselves. The self function returns the PID of the current process.

send(self(), "a message")

receive do
  message -> IO.inspect message
end

You can use pattern matching to handle specific messages:

send(self(), {:message, 1})

receive do
  {:message, id} ->
    IO.puts "received message #{id}"
end

If a message can't be matched, or there are no messages in the process mailbox, receive will block waiting. The following will lock your iex session because there are no messages in the mailbox, and you'll have to force kill it:

receive do
  message -> IO.inspect message
end

You can, however, specify an after block to handle this:

receive do
  message -> IO.inspect message
after
  5000 -> IO.puts "message not received"
end
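The matching and timeout behaviour described above can be combined in one small sketch. It sends two messages, receives only the one that matches, and then shows that the unmatched message is still waiting in the mailbox. The message shapes and variable names here are invented for the illustration.

```elixir
# Put two messages in our own mailbox, in this order.
send(self(), {:other, 1})
send(self(), {:message, 2})

# Only {:message, id} matches, so {:other, 1} is skipped and left in the mailbox.
first =
  receive do
    {:message, id} -> id
  after
    1000 -> :timeout
  end

# A catch-all pattern now picks up the message that was skipped.
# `after 0` returns immediately if the mailbox is empty.
leftover =
  receive do
    message -> message
  after
    0 -> :empty
  end

IO.inspect({first, leftover})
```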

Getting results from our parallel queries

Instead of just printing our query results, let's send them back to the calling process.

async_query = fn(query_arg) ->
  caller = self()

  spawn(fn ->
    send(caller, {:query_result, run_query.(query_arg)})
  end)
end

Let's fire off our queries again:

1..5 |> Enum.each(&async_query.("query #{&1}"))

And let's define a function to receive our results:

get_result = fn ->
  receive do
    {:query_result, result} -> result
  end
end

Now we can collect all our results into a single list:

results = 1..5 |> Enum.map(fn _ -> get_result.() end)

Notice the results arrive in a non-deterministic order, because it's not certain which of our queries will finish first. This is basically a bare-bones implementation of a parallel map. Here is a recap of the overall flow:

1..5 |>
Enum.map(&async_query.("query #{&1}")) |>
Enum.map(fn(_) -> get_result.() end)

Perhaps as a later exercise we can iterate on this idea to produce a result set in the same order as our initial queries.
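If you'd like to check the speed-up for yourself, the recap flow can be wrapped in Erlang's `:timer.tc`, which returns the elapsed time in microseconds along with the result. This is only a sketch: the sleep is shortened to 200 ms (an arbitrary choice for the demo), so five sequential queries would take about a second.

```elixir
# Same fake query as before, but with a shorter sleep for the demo.
run_query = fn query_arg ->
  :timer.sleep(200)
  "#{query_arg} result"
end

async_query = fn query_arg ->
  caller = self()
  spawn(fn -> send(caller, {:query_result, run_query.(query_arg)}) end)
end

get_result = fn ->
  receive do
    {:query_result, result} -> result
  end
end

# :timer.tc runs the function in the calling process and times it.
{micros, results} =
  :timer.tc(fn ->
    1..5
    |> Enum.map(&async_query.("query #{&1}"))
    |> Enum.map(fn _ -> get_result.() end)
  end)

IO.puts("#{length(results)} results in #{div(micros, 1000)} ms")
```

The elapsed time should be close to a single query's duration rather than five times that.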

Links and Monitors

In many cases, we need to be able to take some action when one of our processes dies. Let's see what happens by default when an error happens in one of our spawned processes:

spawn fn -> raise "oops" end

An error was logged, but our spawning process was unaffected - because processes are isolated. Commonly, we actually want a failure in a spawned process to propagate back to its parent. For this, we can use spawn_link instead.

spawn_link fn -> raise "oops" end

In this case our spawning process also terminated - it just happens that iex is kind to us and doesn't exit altogether, but if you look at the prompt you'll notice it has actually restarted.

Alternatively, a process can ask to receive a message when a linked process dies, instead of dying along with it. This is called 'trapping exits'.

Process.flag :trap_exit, true
spawn_link fn -> raise "oops" end
receive do
  msg -> IO.puts "Got message #{inspect msg}"
end

In practice, however, you'll rarely ever care about doing this. A little later we'll look at Supervisors which take care of this sort of thing for us in a much more convenient way.
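For reference, a trapped exit arrives as a `{:EXIT, pid, reason}` tuple, so you can match on it like any other message. Here is a small sketch that uses `exit(:boom)` instead of `raise` to keep the reason easy to read; the variable names are made up for the example.

```elixir
# Turn exit signals from linked processes into ordinary messages.
Process.flag(:trap_exit, true)

pid = spawn_link(fn -> exit(:boom) end)

# Match only the exit message that belongs to the process we just started.
exit_message =
  receive do
    {:EXIT, ^pid, reason} -> {:exited, reason}
  after
    1000 -> :timeout
  end

# Restore the default behaviour.
Process.flag(:trap_exit, false)

IO.inspect(exit_message)
```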

One notable aspect of links is that they are bidirectional - if the spawning process dies, the child process is taken down as well. This is important for cleaning up rogue processes when tearing down and restarting part of a system. Let's try something:

spawn fn ->
  spawn_link fn ->
    :timer.sleep 1000
    IO.puts "Hello from the inner process"
  end
  exit :boom
end

In this example, we never see our output. Why not? Because the linked process is taken down, before it can print anything, when the outer process exits.

Monitors, on the other hand, are one way. You can monitor a process to be notified of its exit, but if the monitoring process dies, the monitored process is unaffected. This is helpful for keeping an eye on processes the current process doesn't own.

spawn fn ->
  spawn_monitor fn ->
    :timer.sleep 1000
    IO.puts "Hello from the inner process"
  end
  exit :boom
end

This time, we should see our message printed out.
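The notification itself is a `{:DOWN, ref, :process, pid, reason}` message delivered to the monitoring process. A minimal sketch of receiving it, with names invented for the example:

```elixir
# spawn_monitor returns both the PID and the monitor reference.
{pid, ref} = spawn_monitor(fn -> exit(:boom) end)

# Pin the reference and PID so we only match the notification for this process.
down_reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1000 -> :timeout
  end

IO.inspect(down_reason)
```

Unlike a link, nothing happens to the monitoring process here; it just gets a message.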

Tasks

Spawning processes directly isn't too tedious for simple tasks, but Elixir provides the Task module to make it even more convenient.

One thing it does is wrap error messages: instead of getting an error directly from the Erlang VM when a process dies, Task shows an Elixir stack trace so you can more clearly see what actually went wrong. Tasks can also be used directly in supervision trees, which is not possible with raw processes without additional boilerplate code. Finally, Task provides some convenience functions like Task.async and Task.await.

Let's see how our previous example of message passing is simplified with Task. Recall this block of code to run our async queries:

async_query = fn(query_arg) ->
  caller = self()

  spawn(fn ->
    send(caller, {:query_result, run_query.(query_arg)})
  end)
end

get_result = fn ->
  receive do
    {:query_result, result} -> result
  end
end

async_query.("query 1")
get_result.()

Let's reimplement that with Task.

Task.async(fn -> run_query.("query 1") end) |> Task.await
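Piping straight into Task.await hides the fact that the caller is free to do other things while the task runs. A small sketch of that, assuming a shortened 100 ms query and an explicit 5 second timeout (both arbitrary choices for the example):

```elixir
run_query = fn query_arg ->
  :timer.sleep(100)
  "#{query_arg} result"
end

# Task.async returns a task struct straight away; the query runs in its own process.
task = Task.async(fn -> run_query.("query 1") end)

# The caller can get on with other work in the meantime.
other_work = Enum.sum(1..10)

# Task.await blocks until the result arrives, or exits after the timeout (in ms).
result = Task.await(task, 5_000)

IO.inspect({other_work, result})
```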

Stateful Server Processes

In Elixir lingo, a server process is a process that runs for a long time. It's a common pattern to use server processes to maintain state in your application. Let's explore how we would create such a thing from basic primitives.

Let's implement the next key-value store to take the world by storm. It's gonna be called Redix. Now let's create a file redix_server.ex with the following:

defmodule RedixServer do
  def start do
    initial_state = %{}
    spawn(fn -> loop(initial_state) end)
  end

  defp loop state do
    new_state = receive do
      {:get, key, caller} ->
        send caller, {:response, Map.get(state, key)}
        state
      {:put, key, value} ->
        Map.put(state, key, value)
    end

    loop new_state
  end
end

This shows the basic structure of a server - a start function that kicks off an infinite loop. Each iteration of the loop receives a message from the process mailbox, does something with it, and loops again. The loop function takes the current state of the process as a parameter. Let's try it out: start up iex again, and compile/load our file with the c function.

c "redix_server.ex"
redix = RedixServer.start
send redix, {:put, :foo, "bar"}
send redix, {:get, :foo, self()}
flush()

Yay! So we have maintained some state in a long-running process. Having users of our Redix library explicitly send and receive messages is a little inconvenient though, so it's common to wrap these details in interface functions. First, we'll register our process so we can refer to it easily. Update the start function to:

def start do
  initial_state = %{}
  spawn(fn -> loop(initial_state) end)
  |> Process.register(:redix_server)

  :ok
end

Also, add the following to our redix_server:

def put key, value do
  send :redix_server, {:put, key, value}
  :ok
end

def get key do
  send :redix_server, {:get, key, self()}
  receive do
    {:response, value} -> value
  end
end

Let's recompile our server and try our new interface from iex:

c "redix_server.ex"
RedixServer.start
RedixServer.put :baz, "blue"
RedixServer.get :baz

If it complains that it is already started, you can just unregister or kill it:

Process.unregister(:redix_server)
#or
Process.whereis(:redix_server) |> Process.exit(:bye)

There we go! We've implemented a stateful server from base concepts. Processes used in this way resemble objects in OO programming. There are a few key differences of course. Processes are concurrent - so multiple processes run in parallel.

Although the interface we implemented above exposes a kind of mutable state, internally the server state uses immutable data structures. Using this server is entirely thread-safe - we could have 1000 clients reading and writing values to our server, and never need to think about mutexes and locks.

A notable aspect of servers using this pattern is that a single server is in fact single-threaded - it processes messages from its mailbox in sequence. This is a good thing - and in Elixir systems this trait is used as an intentional synchronisation point. One example where this might be used is a database client library where a limited number of connections are to be maintained to a database server, rather than spawning an unlimited number of processes for each query. A pool of server processes with a limited size would be used to manage these connections. Not understanding this trait, however, can lead to unintentional bottlenecks being introduced into your system.
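To see the "no locks needed" claim in action, here is a sketch of a tiny counter server built the same way as our Redix server, hit by 1000 concurrent clients. The `CounterSketch` module and its message shapes are invented for this illustration. Each increment waits for an acknowledgement, so we know the server has handled it before we read the total.

```elixir
defmodule CounterSketch do
  # Start the server process with an initial count of 0.
  def start, do: spawn(fn -> loop(0) end)

  # Synchronous increment: returns once the server has handled the message.
  def increment(server) do
    send(server, {:increment, self()})

    receive do
      :incremented -> :ok
    end
  end

  def get(server) do
    send(server, {:get, self()})

    receive do
      {:count, count} -> count
    end
  end

  # Messages are handled one at a time, so no two increments can interleave.
  defp loop(count) do
    receive do
      {:increment, caller} ->
        send(caller, :incremented)
        loop(count + 1)

      {:get, caller} ->
        send(caller, {:count, count})
        loop(count)
    end
  end
end

counter = CounterSketch.start()

# 1000 client processes, each incrementing the counter once.
1..1000
|> Enum.map(fn _ -> Task.async(fn -> CounterSketch.increment(counter) end) end)
|> Enum.each(&Task.await/1)

final_count = CounterSketch.get(counter)
IO.puts("final count: #{final_count}")
```

Because the server handles one message at a time, no increment is lost, however many clients there are. The flip side is that every client queues behind the same mailbox, which is exactly the bottleneck mentioned above.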

Agent

Our Redix server hasn't become too complicated yet, but the pattern of storing state is common enough that Elixir provides a module to make things even more convenient: Agent. Let's reimplement our RedixServer, this time backed by Agent. Create a new file redix_server_agent.ex:

defmodule RedixServer do
  def start_link do
    Agent.start_link(fn -> %{} end, name: :redix_server)
  end

  def put key, value do
    Agent.update(:redix_server, &Map.put(&1, key, value))
  end

  def get key do
    Agent.get(:redix_server, &Map.get(&1, key))
  end
end

Easy eh? We don't need to worry about the infinite loop and all that jazz, Agent implements that pattern for us.

Compile the file and have a play with setting and getting some keys, like we did before:

c "redix_server_agent.ex"
RedixServer.start_link
RedixServer.put :baz, "blue"
RedixServer.get :baz
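Agent isn't limited to maps or to named processes. As a quick sketch, here is an unnamed Agent holding a plain integer, including `Agent.get_and_update`, which reads and changes the state in a single step. The counter and the numbers used are just for illustration.

```elixir
# Start an unnamed Agent whose state is the integer 0.
{:ok, agent} = Agent.start_link(fn -> 0 end)

# update/2 replaces the state with the function's return value.
:ok = Agent.update(agent, &(&1 + 1))

# get_and_update/2 returns {value_for_caller, new_state}.
previous = Agent.get_and_update(agent, fn count -> {count, count + 10} end)

# get/2 reads the state without changing it.
current = Agent.get(agent, & &1)

Agent.stop(agent)

IO.inspect({previous, current})
```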

GenServer

Sometimes we need to build server processes that give us more flexibility than what Agent provides. Agent is actually built on a construct called a gen_server. gen_server is one of the constructs provided by a library called OTP, which is part of the Erlang standard library.

OTP provides helper modules and functions, which it calls behaviours, that provide a lot of Erlang's amazing distributed capabilities. There are a few predefined behaviours:

  • gen_server: Generic implementation of a stateful server process
  • supervisor: Provides error handling and recovery in concurrent systems
  • application: Generic implementation of components and libraries
  • gen_event: Provides event-handling support
  • gen_fsm: Runs a finite state machine in a stateful server process

Each of these uses the building blocks we have explored so far to provide re-usable behaviours. Digging deeper into each of these is a topic for another day, but we'll briefly look at how to implement our RedixServer on top of GenServer.

defmodule RedixServer do
  use GenServer

  # Client interface

  def start_link do
    GenServer.start_link(RedixServer, %{}, name: :redix_server)
  end

  def put(key, value) do
    GenServer.cast(:redix_server, {:put, key, value})
  end

  def get(key) do
    GenServer.call(:redix_server, {:get, key})
  end

  # GenServer callbacks

  # Use the map passed to GenServer.start_link as the initial state.
  def init(initial_state) do
    {:ok, initial_state}
  end

  def handle_cast({:put, key, value}, state) do
    {:noreply, Map.put(state, key, value)}
  end

  def handle_call({:get, key}, _from, state) do
    {:reply, Map.get(state, key), state}
  end
end

You'll note this is a bit more verbose than the Agent version - that's why Agent exists, to hide this away. Note the pattern of having both interface functions for clients to use (get, put, start_link) and internal functions which are callbacks defined by the gen_server behaviour (init, handle_call, handle_cast). GenServer does a lot more than this underneath, such as support for distributed systems, customisable timeouts, and more.
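As one example of those customisable timeouts, `GenServer.call` takes an optional third argument, a timeout in milliseconds, and the caller exits if no reply arrives in time. The sketch below uses a made-up `SlowSketch` server that takes 200 ms to answer; the module name and the timings are assumptions for the demo.

```elixir
defmodule SlowSketch do
  use GenServer

  @impl true
  def init(state), do: {:ok, state}

  # Pretend each request takes 200 ms to answer.
  @impl true
  def handle_call(:slow, _from, state) do
    :timer.sleep(200)
    {:reply, :done, state}
  end
end

{:ok, server} = GenServer.start_link(SlowSketch, nil)

# A generous timeout: the reply arrives in time.
patient_result = GenServer.call(server, :slow, 1_000)

# A 50 ms timeout: the caller exits with a :timeout reason, which we catch here.
impatient_result =
  try do
    GenServer.call(server, :slow, 50)
  catch
    :exit, {:timeout, _details} -> :timed_out
  end

GenServer.stop(server)

IO.inspect({patient_result, impatient_result})
```

In real code you would normally let the timeout exit crash the caller rather than catching it. It is caught here only so the sketch can show both outcomes.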

Supervisors

Erlang and Elixir employ a different attitude to handling exceptions than other languages/platforms. We call it the "let it crash" philosophy. It may be counter-intuitive, but this is actually the key to building robust, self-healing systems. Instead of defensively handling every possible exception, we use supervisors to monitor and restart any processes that crash. Supervisor is one of the OTP behaviours that we discussed in the previous section.

Let's create a supervisor for our RedixServer. Create a file "redix_supervisor.ex"

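defmodule RedixSupervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # An explicit child spec, because RedixServer.start_link/0 takes no arguments.
    # (The older worker/2 and supervise/2 helpers from Supervisor.Spec are deprecated.)
    children = [
      %{id: RedixServer, start: {RedixServer, :start_link, []}}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end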

Here we use the Supervisor behaviour, and provide the required init callback. We add our RedixServer to the list of children to supervise, and tell it to start supervising with the 'one_for_one' strategy. What that means is basically: if a worker dies, restart just that worker. There are various restart strategies that can be used in different situations - you can read about them here: http://elixir-lang.org/docs/stable/elixir/#!Supervisor.html.

Let's play with our supervisor.

# If the redix server is already running, you'll have to stop it first
Process.whereis(:redix_server) |> Process.exit(:bye)

RedixSupervisor.start_link

# Redix server should be up and running
RedixServer.put :foo, "bar"
RedixServer.get :foo

# Let's look at its pid
Process.whereis(:redix_server)
# Now let's kill it by force
Process.whereis(:redix_server) |> Process.exit(:bye)
# And let's look at its pid again... you'll see it has a different number - it has been restarted
Process.whereis(:redix_server)

# And our redix server should still be working
RedixServer.put :baz, "bing"
RedixServer.get :baz

# You'll notice though, our previous state was lost
RedixServer.get :foo # => nil

Our supervisor restarted our server, with a fresh clean state. Clearly, this is not the place to store persistent state, but what if we did want to retain this state on a restart? Erlang/Elixir do provide a mechanism for this too, which we could use in our RedixServer, called ETS (Erlang Term Storage). You can find out more here: http://elixir-lang.org/getting-started/mix-otp/ets.html.
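As a taste of what ETS looks like, here is a minimal sketch of creating a table, storing a key-value pair and reading it back through the Erlang `:ets` module. The table name `:redix_sketch` is made up for the example, and this is not a full solution for persisting RedixServer state.

```elixir
# Create a table. :set means one value per key; :public lets any process read and write.
table = :ets.new(:redix_sketch, [:set, :public])

# Entries are tuples; by default the first element is the key.
true = :ets.insert(table, {:foo, "bar"})

# lookup/2 returns a list of matching tuples (empty if the key is absent).
found = :ets.lookup(table, :foo)
missing = :ets.lookup(table, :nope)

IO.inspect({found, missing})
```

One thing to keep in mind: an ETS table belongs to the process that created it and is deleted when that process dies. To survive a server crash, the table has to be owned by a process other than the server itself.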

Who watches the watchers?

You might be wondering - what happens if our supervisor crashes? A typical Elixir system is made up of a supervision tree - a hierarchy of supervisors that supervise other supervisors. Each of these generally looks after a particular part of the system, and so isolates failures within just that part of the system. The supervisors should also not contain any application logic - if the only thing they do is supervise other processes, there is no reason they should crash other than a catastrophic system failure (e.g. hardware failure) - in which case there's not much that can be done, except distribute your app over multiple machines. That's a topic for another day :)
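A tiny supervision tree can be sketched without defining any modules, by passing child specs straight to `Supervisor.start_link/2`. Here a root supervisor supervises a branch supervisor, which in turn supervises a single Agent. The ids `:branch_supervisor` and `:state_sketch` are invented for this illustration.

```elixir
# A worker: an Agent holding an empty map.
agent_spec = %{
  id: :state_sketch,
  start: {Agent, :start_link, [fn -> %{} end]}
}

# A supervisor that is itself a child of another supervisor.
branch_spec = %{
  id: :branch_supervisor,
  start: {Supervisor, :start_link, [[agent_spec], [strategy: :one_for_one]]},
  type: :supervisor
}

{:ok, root} = Supervisor.start_link([branch_spec], strategy: :one_for_one)

# The root sees one child, and that child is a supervisor.
root_counts = Supervisor.count_children(root)

# Find the branch supervisor's PID and ask it about its own children.
[{:branch_supervisor, branch, :supervisor, _modules}] = Supervisor.which_children(root)
branch_counts = Supervisor.count_children(branch)

IO.inspect({root_counts, branch_counts})
```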

Challenge

  1. When we were using processes to run our long-running queries in parallel, we got our results back in a random order. Use Task to implement a parallel map function, pmap, and ensure it returns the elements in the correct order.

  2. Modify your RedixServer to persist state between crashes, using ETS.

Credits

Material in this workshop stolen and modified from:
