# Gist by noelbk (gist id 519a50c700b5d28e9d5f), created March 11, 2016.
# PipeTest - speed test for Elixir's basic binary processing
#
# Starts N clients connected by sockets. Each client reads from
# socket, computes checksum, and sends bytes to next client.
#
# Result: Elixir's binary matching is the bottleneck
#
# binary matching: slowest, 67Mb/s
# Enum.reduce over :binary.bin_to_list: slow, 121Mb/s
# :erlang.crc32 BIF: fastest, 884Mb/s
#
# (Mb/s is total bytes processed by all clients / 10^6, per second.)
#
# iex
# c("pipetest.ex")
# PipeTest.run(bytes: 10000000, num_clients: 8)
#
# compare with straight C at http://github.com/noelbk/bklib/blob/master/sendlist_t.c
defmodule PipeTest do
  @moduledoc """
  Throughput benchmark for simple per-byte binary processing.

  `run/1` starts a chain of client processes connected by TCP sockets on
  localhost. Each client reads from its socket, folds the bytes into a
  running checksum with `sum_acc/2`, and forwards them to the next client
  in the chain. Once every client has seen end-of-stream, the elapsed time
  is used to report the aggregate throughput.
  """

  @default_port 5678

  @doc """
  Folds the bytes of `buf` into the running checksum `sum`.

  This is the function the benchmark measures. Leave exactly one of the
  `sum_acc_*` calls below uncommented to select the string-iteration
  strategy.
  """
  def sum_acc(sum, buf) do
    # slowest: binary matching like sum(<<c, rest::binary>>)
    # measured at 67Mb/s (when sum_acc_binmatch/2 still recursed through
    # this dispatcher instead of into itself)
    sum_acc_binmatch(sum, buf)
    # better: Elixir's Enum.reduce
    # measured at 121Mb/s, around 2x faster than binmatch
    # sum_acc_reduce(sum, buf)
    # fastest: the :erlang.crc32 BIF
    # measured at 884Mb/s, over 10x faster than binmatch
    # sum_acc_crc32(sum, buf)
    # A C program reportedly does this at ~2Gb/s (~30x faster than binmatch);
    # see http://github.com/noelbk/bklib/blob/master/sendlist_t.c
    # NOTE(review): this disagrees with the ~750Mb/s / ~1.4Gb/s C figures
    # quoted at sum_acc_crc32/2 -- confirm which measurement is current.
  end

  @doc """
  Adds every byte of `buf` to `sum` using binary pattern matching.

  The clause recurses directly into itself rather than through `sum_acc/2`,
  so the result never depends on which strategy `sum_acc/2` currently
  selects. A direct self tail call over `rest` is also the shape the Erlang
  Efficiency Guide describes for reusing the binary match context, so this
  may now run faster than the 67Mb/s recorded above.
  """
  def sum_acc_binmatch(sum, <<byte, rest::binary>>), do: sum_acc_binmatch(sum + byte, rest)
  def sum_acc_binmatch(sum, <<>>), do: sum

  @doc """
  Adds every byte of `buf` to `sum` via `Enum.reduce/3` over a byte list.

  Measured at 121Mb/s.
  """
  def sum_acc_reduce(sum, buf) do
    buf
    |> :binary.bin_to_list()
    # Enum.reduce/3 passes (element, accumulator) to the function.
    |> Enum.reduce(sum, fn byte, acc -> acc + byte end)
  end

  @doc """
  Continues the CRC-32 `sum` over `buf` using the `:erlang.crc32/2` BIF.

  The result is a CRC-32, not a byte sum, so it is not comparable with the
  other `sum_acc_*` variants. Measured at 884Mb/s. For reference:
  single-threaded C is ~750Mb/s and multi-threaded C is ~1.4Gb/s.
  """
  def sum_acc_crc32(sum, buf), do: :erlang.crc32(sum, buf)

  @doc """
  Body of a spawned client process.

  Connects to `port` on localhost and relays everything it reads to
  `next_sock`. When `next_sock` is `nil`, this is the last client and the
  data is only checksummed. At end-of-stream it sends
  `{:client_done, msg}` to `parent`, then closes both sockets so that
  end-of-stream propagates down the chain.
  """
  def client(port, next_sock, parent) do
    {:ok, sock} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, packet: 0, active: false])
    client_recv(sock, next_sock, parent, 0, 0)
    :gen_tcp.close(sock)
    close_next(next_sock)
  end

  @doc """
  Receive loop for a client.

  Reads from `sock`, forwards each chunk to `next_sock` (if any), and
  accumulates the byte count `len` and checksum `sum`. When the peer closes
  the connection, it reports `"len=... sum=..."` to `parent` as
  `{:client_done, msg}` and returns `:ok`. Any other receive error crashes
  the (linked) process.
  """
  def client_recv(sock, next_sock, parent, len, sum) do
    case :gen_tcp.recv(sock, 0) do
      {:ok, buf} ->
        forward(next_sock, buf)
        client_recv(sock, next_sock, parent, len + byte_size(buf), sum_acc(sum, buf))

      {:error, :closed} ->
        send(parent, {:client_done, "len=#{len} sum=#{sum}"})
        :ok
    end
  end

  # Sends `buf` downstream; the last client in the chain has no socket.
  defp forward(nil, _buf), do: :ok
  defp forward(next_sock, buf), do: :ok = :gen_tcp.send(next_sock, buf)

  # Closes the downstream socket, if there is one.
  defp close_next(nil), do: :ok
  defp close_next(next_sock), do: :gen_tcp.close(next_sock)

  @doc """
  Starts a chain of `n` clients that connect to a listener on `port`.

  Returns `{:ok, sock}`, where `sock` is the accepted socket of the last
  client spawned, i.e. the head of the pipeline to which data should be
  written.
  """
  def start(n, port \\ @default_port) when is_integer(n) and n >= 0 do
    {:ok, listen_sock} =
      :gen_tcp.listen(port, [:binary, packet: 0, active: false, reuseaddr: true])

    start(listen_sock, nil, n, port)
  end

  @doc """
  Recursive worker for `start/2`.

  Each step spawns a client that forwards to `last_client_sock`, then
  accepts its connection. Once `n` reaches 0, it closes the listener and
  returns the last accepted socket. Negative `n` raises
  `FunctionClauseError` instead of spawning clients forever.
  """
  def start(listen_sock, last_client_sock, 0, _port) do
    :ok = :gen_tcp.close(listen_sock)
    {:ok, last_client_sock}
  end

  def start(listen_sock, last_client_sock, n, port) when n > 0 do
    me = self()
    spawn_link(fn -> client(port, last_client_sock, me) end)
    {:ok, client_sock} = :gen_tcp.accept(listen_sock)
    start(listen_sock, client_sock, n - 1, port)
  end

  @doc """
  Streams `n` bytes from `/dev/urandom` (via `head -c`) into `sock`.

  `n` must be a non-negative integer because it is interpolated into a
  shell command.
  """
  def urandom(sock, n) when is_integer(n) and n >= 0 do
    cmd = "head -c #{n} /dev/urandom"
    port = Port.open({:spawn, cmd}, [:binary])
    Process.link(port)
    urandom_read(port, sock, n)
  end

  @doc """
  Relays data messages from `port` to `sock` until `n` bytes have been sent.

  When `n` bytes have been sent, the port is asked to close and `:ok` is
  returned. The port's `{port, :closed}` reply is not consumed here.
  """
  def urandom_read(port, _sock, n) when n <= 0 do
    send(port, {self(), :close})
    :ok
  end

  def urandom_read(port, sock, n) do
    receive do
      {^port, {:data, data}} ->
        :ok = :gen_tcp.send(sock, data)
        urandom_read(port, sock, n - byte_size(data))

      {^port, :closed} ->
        :ok

      # NOTE(review): only delivered when the port is opened with
      # :exit_status, which urandom/2 does not do.
      {^port, {:exit_status, status}} ->
        status
    end
  end

  @doc """
  Blocks until `n` `{:client_done, msg}` messages arrive, printing each one.
  """
  def wait(0), do: :ok

  def wait(n) when n > 0 do
    receive do
      {:client_done, msg} ->
        IO.puts("client_done: #{msg}")
        wait(n - 1)
    end
  end

  @doc """
  Reads `bytes` bytes from the raw binary `file` and writes them to `sock`.

  Crashes with a `MatchError` if the file ends early.
  """
  def cat(_file, _sock, 0), do: :ok

  def cat(file, sock, bytes) when bytes > 0 do
    {:ok, data} = :file.read(file, bytes)
    :ok = :gen_tcp.send(sock, data)
    cat(file, sock, bytes - byte_size(data))
  end

  @doc """
  Current OS time in seconds, as a float, built from `:erlang.timestamp/0`.
  """
  def timestamp do
    {megas, secs, micros} = :erlang.timestamp()
    megas * 1_000_000 + secs + micros / 1_000_000
  end

  @doc """
  Runs the benchmark and prints the aggregate throughput.

  Options:
    * `:num_clients` - number of clients in the chain (default 5).
      `:clients` is accepted as an alias; `:num_clients` wins if both are
      given.
    * `:bytes` - number of random bytes pushed through the chain
      (default 1000).

  Raises `ArgumentError` if `num_clients` is not a positive integer, or if
  `bytes` is not a non-negative integer.
  """
  def run(opts \\ []) do
    num_clients = Keyword.get(opts, :num_clients, Keyword.get(opts, :clients, 5))
    bytes = Keyword.get(opts, :bytes, 1000)

    if not (is_integer(num_clients) and num_clients > 0) do
      raise ArgumentError, "num_clients must be a positive integer, got: #{inspect(num_clients)}"
    end

    # bytes is interpolated into a shell command below, so only integers pass.
    if not (is_integer(bytes) and bytes >= 0) do
      raise ArgumentError, "bytes must be a non-negative integer, got: #{inspect(bytes)}"
    end

    tmp_path = "/tmp/pipetest.tmp"
    # head -c writes exactly `bytes` bytes; `dd bs=N count=1` copies a single
    # read(), which may come back short for large N.
    :os.cmd(String.to_charlist("head -c #{bytes} /dev/urandom >#{tmp_path}"))
    {:ok, file} = :file.open(tmp_path, [:read, :binary, :raw])
    {:ok, sock} = start(num_clients)

    # Start the clock before pushing data. gen_tcp sends can block while the
    # clients drain their sockets, so timing only after cat/3 would miss most
    # of the transfer. (The time also includes reading the temp file.)
    t0 = timestamp()
    :ok = cat(file, sock, bytes)
    # :ok = urandom(sock, bytes)
    :ok = :gen_tcp.close(sock)
    :ok = wait(num_clients)
    dt = timestamp() - t0
    :ok = :file.close(file)

    rate = num_clients * bytes / dt / 1000000
    IO.puts("sent #{bytes} bytes through #{num_clients} clients in #{dt} secs = #{rate}Mb/s")
  end

  @doc """
  Runs `run/1` in a new process with call tracing, including return values,
  enabled for every function in this module.
  """
  def dbg(opts \\ []) do
    :dbg.tracer()
    :dbg.p(:new, :call)
    # A later tpl/3 for the same functions would replace this match spec, so
    # a single call with return_trace is enough.
    :dbg.tpl(__MODULE__, :_, [{:_, [], [{:return_trace}]}])
    spawn(fn -> run(opts) end)
  end

  @doc """
  Profiles `run/1` with `:eprof` and prints the analysis.
  """
  def eprof(opts \\ []) do
    :eprof.profile(fn -> run(opts) end)
    :eprof.analyze()
  end
end
# (End of gist; the page's comment section is omitted.)