Created
March 11, 2016 18:03
-
-
Save noelbk/519a50c700b5d28e9d5f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# PipeTest - speed test for Elixir's basic binary processing
#
# Starts N clients connected by sockets. Each client reads from its
# socket, computes a checksum, and sends the bytes on to the next client.
#
# Result: Elixir's binary matching is the bottleneck
#
#   binary matching:               slowest, 67Mb/s
#   Enum.reduce over bin_to_list:  slow, 121Mb/s
#   crc32 BIF:                     fastest, 884Mb/s
#
# iex
# c("pipetest.ex")
# PipeTest.run(bytes: 10000000, num_clients: 8)
#
# Compare with straight C at http://github.com/noelbk/bklib/blob/master/sendlist_t.c
defmodule PipeTest do
  @moduledoc """
  Throughput test for basic binary processing.

  `run/1` writes a block of random bytes to a temp file, starts a chain of
  client processes connected by local TCP sockets, streams the file into the
  head of the chain, and reports how long the bytes take to pass through every
  client. Each client folds what it receives into a checksum (`sum_acc/2`) and
  forwards it to the next client.
  """

  @doc """
  Folds the bytes of `buf` into the running checksum `sum`.

  Exactly one implementation is selected here; change the call to compare
  binary matching, `Enum.reduce/3`, and the `:erlang.crc32/2` BIF.
  """
  def sum_acc(sum, buf) do
    # Binary matching like <<c, rest::binary>>; originally measured ~67Mb/s.
    sum_acc_binmatch(sum, buf)
    # Enum.reduce over :binary.bin_to_list/1; originally measured ~121Mb/s:
    # sum_acc_reduce(sum, buf)
    # :erlang.crc32/2 BIF; originally measured ~884Mb/s:
    # sum_acc_crc32(sum, buf)
    #
    # For reference, a C program was quoted at ~2Gb/s; see
    # http://github.com/noelbk/bklib/blob/master/sendlist_t.c
  end

  @doc "Adds the value of every byte in `buf` to `sum`, matching one byte at a time."
  def sum_acc_binmatch(sum, <<c, rest::binary>>) do
    # Recurse into this function directly, not through sum_acc/2. That keeps
    # it correct whichever implementation sum_acc/2 selects, and lets the
    # compiler reuse the binary match context instead of building a
    # sub-binary for every byte.
    # NOTE(review): the ~67Mb/s figure predates this change; re-measure.
    sum_acc_binmatch(sum + c, rest)
  end

  def sum_acc_binmatch(sum, <<>>), do: sum

  @doc "Adds the value of every byte in `buf` to `sum` using `Enum.reduce/3`."
  def sum_acc_reduce(sum, buf) do
    # Enum.reduce/3 calls the function with (element, accumulator).
    Enum.reduce(:binary.bin_to_list(buf), sum, fn byte, acc -> acc + byte end)
  end

  @doc """
  Continues the CRC-32 value `sum` over `buf` with the `:erlang.crc32/2` BIF.

  Originally measured at ~884Mb/s, against ~750Mb/s for single-threaded C and
  ~1.4Gb/s for multi-threaded C.
  """
  def sum_acc_crc32(sum, buf) do
    :erlang.crc32(sum, buf)
  end

  @doc """
  Body of a spawned client: connects to `port` on localhost and runs
  `client_recv/5`, forwarding to `next_sock` (when not `nil`). Closes both
  sockets once the upstream connection has closed.
  """
  def client(port, next_sock, parent) do
    {:ok, sock} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, packet: 0, active: false])
    client_recv(sock, next_sock, parent, 0, 0)
    :gen_tcp.close(sock)

    if not is_nil(next_sock) do
      :gen_tcp.close(next_sock)
    end
  end

  @doc """
  Reads from `sock` until it is closed, forwarding each chunk to `next_sock`
  (when not `nil`). It accumulates the byte count `len` and the checksum `sum`.

  On close it sends `{:client_done, "len=... sum=..."}` to `parent` and
  returns `:ok`. Any other receive error has no matching clause and crashes
  the process.
  """
  def client_recv(sock, next_sock, parent, len, sum) do
    case :gen_tcp.recv(sock, 0) do
      {:ok, buf} ->
        if not is_nil(next_sock) do
          :ok = :gen_tcp.send(next_sock, buf)
        end

        client_recv(sock, next_sock, parent, len + byte_size(buf), sum_acc(sum, buf))

      {:error, :closed} ->
        send(parent, {:client_done, "len=#{len} sum=#{sum}"})
        :ok
    end
  end

  @doc """
  Listens on `port` and starts `n` linked clients that connect to it.

  Each client forwards to the socket accepted for the client started before
  it. Returns `{:ok, sock}`, where `sock` is the accepted socket of the last
  client started (the head of the chain). The listening socket is closed once
  every client is connected.
  """
  def start(n, port \\ 5678) do
    listen_opts = [:binary, packet: 0, active: false, reuseaddr: true]
    {:ok, listen_sock} = :gen_tcp.listen(port, listen_opts)
    start(listen_sock, nil, n, port)
  end

  # Base case: all clients connected. Close the listener and return the
  # accepted socket of the most recently started client.
  def start(listen_sock, last_client_sock, 0, _port) do
    :ok = :gen_tcp.close(listen_sock)
    {:ok, last_client_sock}
  end

  # Recursive case: spawn a client that forwards to the previously accepted
  # socket, accept its connection, then continue with one fewer client.
  # The guard turns a negative count into an error instead of an endless loop.
  def start(listen_sock, last_client_sock, n, port) when n > 0 do
    me = self()
    spawn_link(fn -> client(port, last_client_sock, me) end)
    {:ok, client_sock} = :gen_tcp.accept(listen_sock)
    start(listen_sock, client_sock, n - 1, port)
  end

  @doc "Streams `n` bytes produced by `head -c n /dev/urandom` into `sock`."
  def urandom(sock, n) do
    cmd = "head -c #{n} /dev/urandom"
    port = Port.open({:spawn, cmd}, [:binary])
    Process.link(port)
    urandom_read(port, sock, n)
  end

  # All requested bytes forwarded (n can only go below zero if the port sent
  # more than asked). Ask the port to close by message rather than with
  # Port.close/1, which raises if the port has already closed on its own.
  def urandom_read(port, _sock, n) when n <= 0 do
    send(port, {self(), :close})
    :ok
  end

  def urandom_read(port, sock, n) do
    receive do
      {^port, {:data, data}} ->
        :ok = :gen_tcp.send(sock, data)
        urandom_read(port, sock, n - byte_size(data))

      {^port, :closed} ->
        :ok

      {^port, {:exit_status, status}} ->
        status
    end
  end

  @doc "Waits for `n` `{:client_done, msg}` messages, printing each one."
  def wait(0), do: :ok

  def wait(n) do
    receive do
      {:client_done, msg} ->
        IO.puts("client_done: #{msg}")
        wait(n - 1)
    end
  end

  @doc """
  Copies `bytes` bytes from the open `file` to `sock`.

  A failed read (including hitting EOF early) or a failed send raises a
  MatchError.
  """
  def cat(_file, _sock, 0), do: :ok

  def cat(file, sock, bytes) do
    {:ok, data} = :file.read(file, bytes)
    :ok = :gen_tcp.send(sock, data)
    cat(file, sock, bytes - byte_size(data))
  end

  @doc "Returns `:erlang.timestamp/0` as a float number of seconds."
  def timestamp do
    {megas, secs, micros} = :erlang.timestamp()
    megas * 1_000_000 + secs + micros / 1_000_000
  end

  @doc """
  Runs the benchmark and prints the elapsed time and aggregate throughput.

  Options:

    * `:num_clients` - number of clients in the chain (default 5);
      `:clients` is accepted as an alias
    * `:bytes` - number of random bytes to push through (default 1000)
  """
  def run(opts \\ []) do
    num_clients = Keyword.get(opts, :num_clients, Keyword.get(opts, :clients, 5))
    bytes = Keyword.get(opts, :bytes, 1000)
    tmp_path = "/tmp/pipetest.tmp"

    # head -c writes exactly `bytes` bytes. A single large dd block
    # (bs=N count=1) read from /dev/urandom may come back short, which would
    # make cat/3 hit EOF early.
    :os.cmd(~c"head -c #{bytes} /dev/urandom >#{tmp_path}")
    {:ok, file} = :file.open(tmp_path, [:read, :binary, :raw])
    {:ok, sock} = start(num_clients)

    # Start the clock before sending. Clients already process data while
    # cat/3 is still writing, so timing only from the end of the send would
    # miss that work and overstate throughput.
    t0 = timestamp()
    :ok = cat(file, sock, bytes)
    # Alternative source: :ok = urandom(sock, bytes)
    :ok = :gen_tcp.close(sock)
    :ok = wait(num_clients)
    dt = timestamp() - t0

    :ok = :file.close(file)

    # Aggregate rate across all clients, in millions of bytes per second.
    rate = num_clients * bytes / dt / 1000000
    IO.puts("sent #{bytes} bytes through #{num_clients} clients in #{dt} secs = #{rate}Mb/s")
  end

  @doc """
  Enables call tracing, including return values, for every function in this
  module, then runs `run/1` in a new process.
  """
  def dbg(opts \\ []) do
    :dbg.tracer()
    :dbg.p(:new, :call)
    :dbg.tpl(__MODULE__, :_, [{:_, [], [{:return_trace}]}])
    spawn(fn -> run(opts) end)
  end

  @doc "Profiles `run/1` with `:eprof` and prints the analysis."
  def eprof(opts \\ []) do
    :eprof.profile(fn -> run(opts) end)
    :eprof.analyze()
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment