In Elixir, all code runs inside processes. Processes are isolated from each other, run concurrently, and communicate via message passing. Processes are not only the basis for concurrency in Elixir; they also provide the means for building distributed and fault-tolerant programs.
Elixir’s processes should not be confused with operating system processes. Processes in Elixir are extremely lightweight in terms of memory and CPU (unlike threads in many other programming languages). Because of this, it is not uncommon to have tens or even hundreds of thousands of processes running simultaneously.
The most basic mechanism for creating processes is the spawn/1 function:
spawn fn -> IO.puts "Hello from another process" end
You'll notice a PID was returned - this is the way you identify processes. We'll see a little later how these can be used to communicate with a process. The spawn function returns immediately and execution in the current process continues. Let's try that out:
IO.puts "About to spawn..."
spawn fn ->
  :timer.sleep 2000
  IO.puts "Hello from another process"
end
IO.puts "Spawned"
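To get a feel for how lightweight processes are, here is a small sketch (not part of the original workshop code) that spawns 100,000 processes, each parked in a receive until told to stop. The count is an arbitrary choice that should sit comfortably below the VM's default process limit, and the variable names are illustrative.

```elixir
# Spawn 100_000 processes; each one blocks in receive until it gets :stop
pids =
  for _ <- 1..100_000 do
    spawn(fn ->
      receive do
        :stop -> :ok
      end
    end)
  end

# Every spawn call returned a PID, and all of the processes are still waiting
alive = Enum.count(pids, &Process.alive?/1)
IO.puts("#{alive} processes alive")

# Tell them all to finish so they don't linger
Enum.each(pids, &send(&1, :stop))
```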
Let's pretend we need to run a few long-running database queries. Here is a function that fakes a long-running task:
run_query = fn query_arg ->
  :timer.sleep 2000
  "#{query_arg} result"
end
This defines an anonymous function that pretends to take a long time. If we run this query 5 times, it will take 10 seconds to get the results:
1..5 |> Enum.map(&run_query.("query #{&1}"))
Let's try to make this more performant by doing things concurrently. We'll define a function that runs our query asynchronously.
async_query = fn query_arg ->
  spawn(fn -> IO.puts(run_query.(query_arg)) end)
end
async_query.("query 1")
Notice that the query arg is passed to the new process via the closure mechanism. This data is actually deep copied to the new process, as processes can't share memory. Let's try running five queries in parallel.
1..5 |> Enum.each(&async_query.("query #{&1}"))
That's a 5 fold performance increase right there :). You'll notice that because we fire these queries off asynchronously, we don't get the results in the calling process. For that, we can use message passing - the primary way to communicate between processes in Elixir.
Processes in Elixir can't use shared data structures to share knowledge. It is this complete isolation that gives them their excellent fault-tolerant characteristics. Instead of sharing memory, processes communicate via asynchronous message passing. The content of a message can be any Elixir term.
You use the send function to send a message to a process, identified by a PID. On the other side, you use the receive construct to, well, receive messages. Let's test this out by sending a message to ourselves. The self function returns the PID of the current process.
send(self(), "a message")
receive do
  message -> IO.inspect message
end
You can use pattern matching to handle specific messages:
send(self(), {:message, 1})
receive do
  {:message, id} ->
    IO.puts "received message #{id}"
end
If a message can't be matched, or there are no messages in the process mailbox, receive will block waiting. The following will lock up your iex session because there are no messages in the mailbox, and you'll have to force kill it:
receive do
  message -> IO.inspect message
end
You can, however, specify an after block to handle this:
receive do
  message -> IO.inspect message
after 5000 -> IO.puts "message not received"
end
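One consequence of pattern matching in receive is that messages which don't match stay in the mailbox rather than being discarded. A minimal sketch, using made-up message shapes and Process.info/2 to peek at the mailbox:

```elixir
# Two messages arrive; only the second matches the pattern below
send(self(), {:other, 1})
send(self(), {:message, 2})

id =
  receive do
    {:message, id} -> id
  after
    1000 -> :timeout
  end

IO.inspect(id)

# The unmatched message is still waiting in the mailbox
{:messages, remaining} = Process.info(self(), :messages)
IO.inspect(remaining)
```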
Instead of just printing our query results, let's send them back to the calling process.
async_query = fn query_arg ->
  caller = self()
  spawn(fn ->
    send(caller, {:query_result, run_query.(query_arg)})
  end)
end
Let's fire off our queries again:
1..5 |> Enum.each(&async_query.("query #{&1}"))
And let's define a function to receive our results:
get_result = fn ->
  receive do
    {:query_result, result} -> result
  end
end
Now we can collect all our results into a single list:
results = 1..5 |> Enum.map(fn _ -> get_result.() end)
Notice the results arrive in a non-deterministic order, because it's not certain which of our queries will finish first. This is basically a bare-bones implementation of a parallel map. Here is a recap of the overall flow:
1..5 |>
Enum.map(&async_query.("query #{&1}")) |>
Enum.map(fn(_) -> get_result.() end)
Perhaps as a later exercise we can iterate on this idea to produce a result set in the same order as our initial queries.
In many cases, we need to be able to take some action when one of our processes dies. Let's see what happens by default when an error happens in one of our spawned processes:
spawn fn -> raise "oops" end
An error was logged, but our spawning process was unaffected - because processes are isolated. Commonly, we actually want a failure in a spawned process to propagate back to its parent. For this, we can use spawn_link instead.
spawn_link fn -> raise "oops" end
In this case our spawning process also terminated - it just happens that iex is kind to us and doesn't exit altogether, but if you look at the prompt you'll notice it has actually restarted.
Alternatively, you can receive a message when a linked process dies instead of being taken down with it. This is called 'trapping exits'.
Process.flag :trap_exit, true
spawn_link fn -> raise "oops" end
receive do
  msg -> IO.puts "Got message #{inspect msg}"
end
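When exits are trapped, the notification arrives as an ordinary message of the form {:EXIT, pid, reason}, so it can be pattern matched like any other. A small sketch, where the :boom reason and the one-second timeout are arbitrary choices:

```elixir
Process.flag(:trap_exit, true)

# The linked process exits straight away with reason :boom
pid = spawn_link(fn -> exit(:boom) end)

reason =
  receive do
    {:EXIT, ^pid, reason} -> reason
  after
    1000 -> :timeout
  end

IO.inspect(reason)

# Restore the default so later crashes in linked processes propagate again
Process.flag(:trap_exit, false)
```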
In practice, however, you'll rarely ever care about doing this. A little later we'll look at Supervisors which take care of this sort of thing for us in a much more convenient way.
One notable aspect of links is that both processes are linked - if the spawning process dies, the child process is taken down as well. This is important to clean up rogue processes when tearing down and restarting part of a system. Let's try something:
spawn fn ->
  spawn_link fn ->
    :timer.sleep 1000
    IO.puts "Hello from the inner process"
  end
  exit :boom
end
In this example, we never see our output. Why not? Because the linked process is taken down before it can print anything when the outer function exits.
Monitors, on the other hand, are one way. You can monitor a process to be notified of its exit, but if the monitoring process dies, the monitored process is unaffected. This is helpful for keeping an eye on processes the current process doesn't own.
spawn fn ->
  spawn_monitor fn ->
    :timer.sleep 1000
    IO.puts "Hello from the inner process"
  end
  exit :boom
end
This time, we should see our message printed out.
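The example above shows that the monitored process survives, but not the notification itself. spawn_monitor returns both a PID and a monitor reference, and when the monitored process exits, the monitoring process receives a {:DOWN, ref, :process, pid, reason} message. A sketch, with an arbitrary :boom reason and timeout:

```elixir
# spawn_monitor returns the new PID together with a monitor reference
{pid, ref} = spawn_monitor(fn -> exit(:boom) end)

reason =
  receive do
    # Pin pid and ref so we only match the notification for this monitor
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1000 -> :timeout
  end

IO.inspect(reason)
```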
Spawning processes directly isn't too tedious for simple tasks, but Elixir provides the Task module to make it even more convenient.
One thing it does is wrap error messages: instead of getting an error directly from the Erlang VM when a process dies, Task shows an Elixir stack trace so you can more clearly see what actually went wrong. Tasks can also be used directly in supervision trees, which is not possible with raw processes without additional boilerplate code. Finally, Task provides some convenience functions like Task.async and Task.await.
Let's see how our previous example of message passing is simplified with Task.
Recall this block of code to run our async queries:
async_query = fn query_arg ->
  caller = self()
  spawn(fn ->
    send(caller, {:query_result, run_query.(query_arg)})
  end)
end
get_result = fn ->
  receive do
    {:query_result, result} -> result
  end
end
async_query.("query 1")
get_result.()
Let's reimplement that with Task.
Task.async(fn -> run_query.("query 1") end) |> Task.await
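Task.async returns straight away with a task struct, so the caller is free to do other work before collecting the result. The sketch below redefines run_query with a shorter sleep so that it stands alone; the explicit 5_000 ms timeout passed to Task.await matches the default and is shown only to make it visible.

```elixir
# A faster stand-in for the run_query function defined earlier
run_query = fn query_arg ->
  :timer.sleep(100)
  "#{query_arg} result"
end

task = Task.async(fn -> run_query.("query 1") end)

# The caller could do other work here while the query runs

# Blocks until the task replies, or exits the caller after 5 seconds
result = Task.await(task, 5_000)
IO.puts(result)
```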
In Elixir lingo, a server process is a process that runs for a long time. It's a common pattern to use server processes to maintain state in your application. Let's explore how we would create such a thing from basic primitives.
Let's implement the next key-value store to take the world by storm. It's gonna be called Redix.
Now let's create a file redix_server.ex with the following:
defmodule RedixServer do
  def start do
    initial_state = %{}
    spawn(fn -> loop(initial_state) end)
  end
  defp loop(state) do
    new_state = receive do
      {:get, key, caller} ->
        send caller, {:response, Map.get(state, key)}
        state
      {:put, key, value} ->
        Map.put(state, key, value)
    end
    loop(new_state)
  end
end
This shows the basic structure of a server - a start function that kicks off an infinite loop. Each iteration of the loop receives a message from the process mailbox, does something with it, and loops again.
The loop function takes as a parameter the current state of the process. Let's try it out:
Start up iex again, and compile/load our file with the c function.
c "redix_server.ex"
redix = RedixServer.start
send redix, {:put, :foo, "bar"}
send redix, {:get, :foo, self()}
flush()
Yay! So we have maintained some state in a long-running process. Having users of our Redix library explicitly send and receive messages is a little inconvenient though, so it's common to wrap these details in interface functions. First, we'll register our process so we can refer to it easily. Update the start function to:
def start do
  initial_state = %{}
  spawn(fn -> loop(initial_state) end)
  |> Process.register(:redix_server)
  :ok
end
Also, add the following to our redix_server:
def put(key, value) do
  send :redix_server, {:put, key, value}
  :ok
end
def get(key) do
  send :redix_server, {:get, key, self()}
  receive do
    {:response, value} -> value
  end
end
Let's recompile our server and try our new interface from iex:
c "redix_server.ex"
RedixServer.start
RedixServer.put :baz, "blue"
RedixServer.get :baz
If it complains that it is already started, you can just unregister or kill it:
Process.unregister(:redix_server)
#or
Process.whereis(:redix_server) |> Process.exit(:bye)
There we go! We've implemented a stateful server from base concepts. Processes used in this way resemble objects in OO programming. There are a few key differences of course. Processes are concurrent - so multiple processes run in parallel.
Although the interface we implemented above exposes a kind of mutable state, internally the server state uses immutable data structures. Using this server is entirely thread-safe - we could have 1000 clients reading and writing values to our server, and never need to think about mutexes and locks.
A notable aspect of servers using this pattern is that a single server is in fact single-threaded - it processes messages from its mailbox in sequence. This is a good thing - and in Elixir systems this trait is used as an intentional synchronisation point. One example where this might be used is a database client library where a limited number of connections are to be maintained to a database server, rather than spawning an unlimited number of processes for each query. A pool of server processes with a limited size would be used to manage these connections. Not understanding this trait, however, can lead to unintentional bottlenecks being introduced into your system.
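To see the synchronisation point in action, here is a sketch of a hypothetical counter server (CounterSketch is not part of Redix) hit by 1000 concurrent clients. Each client waits for an acknowledgement, so we know its increment has been handled before we read the total. No locks are involved; the mailbox serialises the updates.

```elixir
defmodule CounterSketch do
  def start, do: spawn(fn -> loop(0) end)

  # Messages are handled one at a time, so count is never updated concurrently
  defp loop(count) do
    receive do
      {:increment, caller} ->
        send(caller, :incremented)
        loop(count + 1)

      {:get, caller} ->
        send(caller, {:count, count})
        loop(count)
    end
  end
end

counter = CounterSketch.start()
parent = self()

# 1000 client processes, each incrementing once and waiting for the ack
for _ <- 1..1000 do
  spawn(fn ->
    send(counter, {:increment, self()})

    receive do
      :incremented -> send(parent, :client_done)
    end
  end)
end

# Wait until every client has been acknowledged
for _ <- 1..1000 do
  receive do
    :client_done -> :ok
  end
end

send(counter, {:get, self()})

total =
  receive do
    {:count, n} -> n
  end

IO.puts("total: #{total}")
```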
Our Redix server didn't get too complicated yet, but the pattern of storing state is common enough that Elixir provides a module to make things even more convenient: Agent.
Let's reimplement our RedixServer, this time backed by Agent.
Create a new file redix_server_agent.ex:
defmodule RedixServer do
  def start_link do
    Agent.start_link(fn -> %{} end, name: :redix_server)
  end
  def put(key, value) do
    Agent.update(:redix_server, &Map.put(&1, key, value))
  end
  def get(key) do
    Agent.get(:redix_server, &Map.get(&1, key))
  end
end
Easy eh? We don't need to worry about the infinite loop and all that jazz, Agent implements that pattern for us.
Compile the file and have a play with setting and getting some keys, like we did before:
c "redix_server_agent.ex"
RedixServer.start_link
RedixServer.put :baz, "blue"
RedixServer.get :baz
Sometimes we need to build server processes that give us more flexibility than what Agent provides. Agent is actually built on a construct called a gen_server. gen_server is one of the constructs provided by a library called OTP, which is part of the Erlang standard library.
OTP provides helper modules and functions, which it calls behaviours, that provide a lot of Erlang's amazing distributed capabilities. There are a few predefined behaviours:
- gen_server: Generic implementation of a stateful server process
- supervisor: Provides error handling and recovery in concurrent systems
- application: Generic implementation of components and libraries
- gen_event: Provides event-handling support
- gen_fsm: Runs a finite state machine in a stateful server process (superseded by gen_statem in newer OTP releases)
Each of these uses the building blocks we have explored so far to provide re-usable behaviours. Digging deeper into each of these is a topic for another day, but we'll briefly look at how to implement our RedixServer on top of GenServer.
defmodule RedixServer do
  use GenServer
  # Interface functions for clients
  def start_link do
    GenServer.start_link(RedixServer, %{}, name: :redix_server)
  end
  def put(key, value) do
    GenServer.cast(:redix_server, {:put, key, value})
  end
  def get(key) do
    GenServer.call(:redix_server, {:get, key})
  end
  # GenServer callbacks
  # The empty map passed to start_link becomes the initial state
  def init(state) do
    {:ok, state}
  end
  def handle_cast({:put, key, value}, state) do
    {:noreply, Map.put(state, key, value)}
  end
  def handle_call({:get, key}, _from, state) do
    {:reply, Map.get(state, key), state}
  end
end
You'll note this is a bit more verbose than the Agent version - that's why Agent exists, to hide this away. Note the pattern of having both interface functions for clients to use (get, put, start_link) and internal functions which are callbacks defined by the gen_server behaviour (init, handle_call, handle_cast). GenServer does a lot more than this underneath, such as support for distributed systems, customisable timeouts, and more.
Erlang and Elixir employ a different attitude to handling exceptions than other languages/platforms. We call it the "let it crash" philosophy. It may be counter-intuitive, but this is actually the key to building robust, self-healing systems. Instead of defensively handling every possible exception, we use supervisors to monitor and restart any processes that crash. Supervisor is one of the OTP behaviours that we discussed in the previous section.
Let's create a supervisor for our RedixServer. Create a file "redix_supervisor.ex"
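defmodule RedixSupervisor do
  use Supervisor
  def start_link do
    Supervisor.start_link(__MODULE__, :ok)
  end
  def init(:ok) do
    children = [
      # Child spec map: start the worker by calling RedixServer.start_link()
      %{id: RedixServer, start: {RedixServer, :start_link, []}}
    ]
    Supervisor.init(children, strategy: :one_for_one)
  end
end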
Here we use the Supervisor behaviour and provide the required init callback. We add our RedixServer to the list of children to supervise, and tell it to start supervising with a strategy of 'one_for_one'. What that means is basically: if a worker dies, restart just that worker.
There are various restart strategies that can be used in different situations - you can read about them here: http://elixir-lang.org/docs/stable/elixir/#!Supervisor.html.
Let's play with our supervisor.
# If the redix server is already running, you'll have to stop it first
Process.whereis(:redix_server) |> Process.exit(:bye)
RedixSupervisor.start_link
# Redix server should be up and running
RedixServer.put :foo, "bar"
RedixServer.get :foo
# Let's look at its pid
Process.whereis(:redix_server)
# Now let's kill it by force
Process.whereis(:redix_server) |> Process.exit(:bye)
# And let's look at its pid again...you'll see it has a different number - it has been restarted
Process.whereis(:redix_server)
# And our redix server should still be working
RedixServer.put :baz, "bing"
RedixServer.get :baz
# You'll notice though, our previous state was lost
RedixServer.get :foo # => nil
Our supervisor restarted our server, with a fresh clean state. Clearly, this is not the place to store persistent state, but what if we did want to retain this state on a restart? Erlang/Elixir do provide a mechanism for this too, which we could use in our RedixServer, called ETS (Erlang Term Storage). You can find out more here: http://elixir-lang.org/getting-started/mix-otp/ets.html.
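As a taste of what ETS looks like, here is a minimal sketch of the basic calls; the table name :redix_sketch and the stored keys are made up for illustration. Note that a table belongs to the process that created it and is deleted when that process exits, so for state to survive a server crash the table has to be owned by some other process.

```elixir
# Without :named_table the table is addressed by the returned reference;
# :public lets processes other than the owner read and write it
table = :ets.new(:redix_sketch, [:set, :public])

# Entries are tuples; the first element is the key by default
:ets.insert(table, {:foo, "bar"})

# lookup returns a list of matching tuples, or [] when the key is absent
[{:foo, value}] = :ets.lookup(table, :foo)
missing = :ets.lookup(table, :nope)

IO.inspect({value, missing})
```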
You might be wondering - what happens if our supervisor crashes? A typical Elixir system is made up of a supervision tree - a hierarchy of supervisors that supervise other supervisors. These generally look after a particular part of the system, and so isolate failures within just that part of the system. The supervisors should also not contain any application logic - if the only thing they do is supervise other processes, there is no reason they should crash other than a catastrophic system failure (e.g. hardware failure) - in which case there's not much that can be done, except distribute your app over multiple machines. That's a topic for another day :)
- When we were using processes to run our long-running queries in parallel, we got our results back in a random order. Use Task to implement a parallel map function, pmap, that returns the elements in the correct order.
- Modify your RedixServer to persist state between crashes, using ETS.
Material in this workshop stolen and modified from:
- Elixir Getting Started Guides: http://elixir-lang.org/getting-started/introduction.html
- Elixir In Action: https://www.manning.com/books/elixir-in-action
- Programming Elixir: https://pragprog.com/book/elixir/programming-elixir