Skip to content

Instantly share code, notes, and snippets.

@mprymek
Last active October 23, 2019 17:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mprymek/0308d05d79237211dd732436f7887cd6 to your computer and use it in GitHub Desktop.
Save mprymek/0308d05d79237211dd732436f7887cd6 to your computer and use it in GitHub Desktop.
# Simplest possible actor - just prints received messages.
defmodule SimplestActor do
def run() do
receive do
{from, msg} ->
IO.puts("[SimplestActor] Got message from #{inspect(from)}: #{inspect(msg)}")
end
run()
end
end
IO.puts("[main] My PID: #{inspect(self())}")
# spawn new actor
a1 = spawn(fn -> SimplestActor.run() end)
IO.puts("[main] Agent a1 runs with #{inspect(a1)}")
# send some message to it
send(a1, {self(), 1})
# wait a while
:timer.sleep(100)
send(a1, {self(), 2})
:timer.sleep(100)
# Simple request-response stateless actor - receives integer, adds 1 to it and sends result back.
defmodule StatelessActor do
def run() do
receive do
{from, msg} ->
IO.puts("[StatelessActor] Got message from #{inspect(from)}: #{inspect(msg)}")
send(from, msg+1)
end
run()
end
end
IO.puts("[main] My PID: #{inspect(self())}")
# spawn new actor
a1 = spawn(fn -> StatelessActor.run() end)
IO.puts("[main] Agent a1 runs with #{inspect(a1)}")
# send some request to it
send(a1, {self(), 1})
# wait for reply
receive do
x ->
IO.puts("[main] Got reply from actor a1: #{inspect(x)}")
end
send(a1, {self(), 2})
receive do
x ->
IO.puts("[main] Got reply from actor a1: #{inspect(x)}")
end
# Simple request-response statefull actor implementing ChitChat protocol.
#
# This implementation is buggy - it can't serve more than one client.
# ChitChat protocol:
#
# A -> B: :hello
# B -> A: :hello
# A -> B: :bye
# [end of session]
defmodule ChitChatActor2 do
@name "ChitChatActor2"
def run() do
run(:start)
end
defp run(:start) do
receive do
{from, msg=:hello} ->
IO.puts("[#{@name}] Got message from #{inspect(from)}: #{inspect(msg)}")
send(from, :hello)
run(:hello_received)
end
end
defp run(:hello_received) do
receive do
{from, msg=:bye} ->
IO.puts("[#{@name}] Got message from #{inspect(from)}: #{inspect(msg)}")
IO.puts("[#{@name}] Session closed")
run(:start)
end
end
end
IO.puts("[main] My PID: #{inspect(self())}")
# spawn new actor
a1 = spawn(fn -> ChitChatActor2.run() end)
IO.puts("[main] Agent a1 runs with #{inspect(a1)}")
# send :hello
send(a1, {self(), :hello})
# wait for :hello
receive do
msg ->
IO.puts("[main] Got reply from actor a1: #{inspect(msg)}")
case msg do
:hello ->
:ok
_ ->
raise RuntimeError, message: "Unexpected message from a1: #{inspect(msg)}"
end
end
# send :bye
send(a1, {self(), :bye})
# wait a while
:timer.sleep(100)
# send :hello
send(a1, {self(), :hello})
# wait for :hello
receive do
msg ->
IO.puts("[main] Got reply from actor a1: #{inspect(msg)}")
case msg do
:hello ->
:ok
_ ->
raise RuntimeError, message: "Unexpected message from a1: #{inspect(msg)}"
end
end
# send :illegal_message
send(a1, {self(), :illegal_message})
# wait a while
:timer.sleep(100)
# Now, we will exploit ChitChatActor1 bug...
cc_client = fn ->
name = "ccc #{inspect self()}"
IO.puts("[#{name}] Starting")
# send :hello
send(a1, {self(), :hello})
# wait for :hello
receive do
:hello -> :ok
after 500 ->
raise RuntimeError, message: "Timeout!"
end
# wait a while
:timer.sleep(100)
# send :bye
send(a1, {self(), :bye})
# wait a while
:timer.sleep(100)
IO.puts("[#{name}] Everything OK")
end
spawn(cc_client)
spawn(cc_client)
:timer.sleep(1000)
# Simple request-response statefull actor implementing ChitChat protocol.
#
# This implementation fixes previous implementation bug by receiving messages
# selectively.
# It's still buggy because it does not identify message sender, but at least it
# pretends to work properly ;)
defmodule ChitChatActor2 do
@name "ChitChatActor2"
def run() do
run(:start)
end
defp run(:start) do
receive do
{from, msg=:hello} ->
IO.puts("[#{@name}] Got message from #{inspect(from)}: #{inspect(msg)}")
send(from, :hello)
run(:hello_received)
end
end
defp run(:hello_received) do
receive do
{from, msg=:bye} ->
IO.puts("[#{@name}] Got message from #{inspect(from)}: #{inspect(msg)}")
IO.puts("[#{@name}] Session closed")
run(:start)
end
end
end
# spawn new actor
a1 = spawn(fn -> ChitChatActor2.run() end)
IO.puts("[main] Agent a1 runs with #{inspect(a1)}")
# Now, we will try to exploit ChitChatActor1 bug but it looks like it's not there...
cc_client = fn ->
name = "ccc #{inspect self()}"
IO.puts("[#{name}] Starting")
# send :hello
send(a1, {self(), :hello})
# wait for :hello
receive do
:hello -> :ok
after 500 ->
raise RuntimeError, message: "Timeout!"
end
# wait a while
:timer.sleep(100)
# send :bye
send(a1, {self(), :bye})
# wait a while
:timer.sleep(100)
IO.puts("[#{name}] Everything OK")
end
spawn(cc_client)
spawn(cc_client)
:timer.sleep(1000)
# Simple request-response statefull actor implementing ChitChat protocol.
#
# This is correct implementation of ChitChat protocol using explicit session store.
#
# But it's still buggy:
# - session is not cleaned up if the client dies unexpectedly
# - we should not rely on PIDs; clients should identify sessions using make_ref()
defmodule ChitChatActor3 do
@name "ChitChatActor3"
def run() do
run(%{})
end
defp run(sessions) do
sessions =
receive do
{from, msg} ->
IO.puts("[#{@name}] Got message from #{inspect(from)}: #{inspect(msg)}")
session_id = from
session = sessions |> Map.get(session_id)
case handle_message(from, msg, session_id, session) do
:session_closed ->
sessions |> Map.delete(session_id)
{:session, session} ->
sessions |> Map.put(session_id, session)
end
end
run(sessions)
end
defp handle_message(from, :hello, session_id, nil) do
IO.puts("[#{@name}] New session #{inspect(session_id)}")
send(from, :hello)
{:session, :hello_received}
end
defp handle_message(_from, :bye, session_id, :hello_received) do
IO.puts("[#{@name}] session #{inspect(session_id)} closed")
:session_closed
end
defp handle_message(from, msg, _session_id, session) do
IO.puts("[#{@name}] unexpected mesage from #{inspect(from)}: session=#{inspect(session)} msg=#{inspect(msg)}")
:session_closed
end
end
# spawn new actor
a1 = spawn(fn -> ChitChatActor3.run() end)
IO.puts("[main] Agent a1 runs with #{inspect(a1)}")
# Now, we will try to exploit ChitChatActor1 bug but it looks like it's not there...
cc_client = fn ->
name = "ccc #{inspect self()}"
IO.puts("[#{name}] Starting")
# send :hello
send(a1, {self(), :hello})
# wait for :hello
receive do
:hello -> :ok
after 500 ->
raise RuntimeError, message: "Timeout!"
end
# wait a while
:timer.sleep(100)
# send :bye
send(a1, {self(), :bye})
# wait a while
:timer.sleep(100)
IO.puts("[#{name}] Everything OK")
end
spawn(cc_client)
spawn(cc_client)
:timer.sleep(1000)
# Simple request-response statefull actor implementing ChitChat protocol.
#
# More "elixiric" implementation using protocol-agnostic proxy and separate actor for every session.
#
# BUGS: still uses PIDs to identify sessions
defmodule ChitChatHandler do
def run(parent, id) do
IO.puts("[CCH #{inspect(id)}] Starting")
loop({parent, id, :start})
end
defp loop({parent, id, state}) do
receive do
{:message, from, msg} ->
case handle_message(id, state, from, msg) do
:quit ->
send(parent, {:handler, :quit, id})
:ok
state ->
loop({parent, id, state})
end
end
end
defp handle_message(id, :start, from, :hello) do
IO.puts("[CCH #{inspect(id)}] got :hello from #{inspect(from)}")
send(from, :hello)
:hello_received
end
defp handle_message(id, :hello_received, from, :bye) do
IO.puts("[CCH #{inspect(id)}] got :bye from #{inspect(from)}")
:quit
end
defp handle_message(id, state, from, msg) do
IO.puts("[CCH #{inspect(id)}] unexpected mesage from #{inspect(from)}: state=#{inspect(state)} msg=#{inspect(msg)}")
:quit
end
end
defmodule Proxy do
@name "Proxy"
def run(handler_mod) do
loop(handler_mod, %{})
end
defp loop(handler_mod, handlers) do
handlers =
receive do
{from, msg} ->
IO.puts("[#{@name}] Got message from #{inspect(from)}: #{inspect(msg)}")
session_id = from
case handlers |> Map.get(session_id) do
nil ->
me = self()
IO.puts("[#{@name}] Creating handler #{handler_mod} for session #{inspect(session_id)}")
handler = spawn_link(fn -> handler_mod.run(me, session_id) end)
send(handler, {:message, from, msg})
handlers = handlers |> Map.put(session_id, handler)
IO.puts("[#{@name}] handlers = #{inspect(handlers)}")
handlers
handler ->
send(handler, {:message, from, msg})
handlers
end
{:handler, :quit, session_id} ->
IO.puts("[#{@name}] Deleting handler for session #{inspect(session_id)}")
handlers = handlers |> Map.delete(session_id)
IO.puts("[#{@name}] handlers = #{inspect(handlers)}")
handlers
end
loop(handler_mod, handlers)
end
end
# spawn new actor
a1 = spawn(fn -> Proxy.run(ChitChatHandler) end)
IO.puts("[main] Agent a1 runs with #{inspect(a1)}")
# Now, we will try to exploit ChitChatActor1 bug but it looks like it's not there...
cc_client = fn ->
name = "ccc #{inspect self()}"
IO.puts("[#{name}] Starting")
# send :hello
send(a1, {self(), :hello})
# wait for :hello
receive do
:hello -> :ok
after 500 ->
raise RuntimeError, message: "Timeout!"
end
# wait a while
:timer.sleep(100)
# send :bye
send(a1, {self(), :bye})
# wait a while
:timer.sleep(100)
IO.puts("[#{name}] Everything OK")
end
spawn(cc_client)
spawn(cc_client)
:timer.sleep(1000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment