Created
October 7, 2015 01:09
-
-
Save noelbk/3201a6a82006f1b276e0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# PipeTest - speed test for Elixir's basic binary processing | |
# | |
# Starts N clients connected by sockets. Each client reads from | |
# socket, computes checksum, and sends bytes to next client. | |
# | |
# Result: Elixir's binary matching is the bottleneck | |
# | |
# binary matching: slowest, 67Mb/sa | |
# Enum reduce over bin_to_list: slow: 121Mb/sa | |
# crc32 bif: fastest 884Mb/s | |
# | |
defmodule PipeTest do | |
# slowest: 67Mb/s | |
def sum_acc_binmatch(sum, <<c, rest::binary>>) do | |
sum_acc(sum+c, rest) | |
end | |
def sum_acc_binmatch(sum, <<>>) do | |
sum | |
end | |
# slow: 121Mb/s | |
def sum_acc_reduce(sum, buf) do | |
Enum.reduce(:binary.bin_to_list(buf), sum, fn acc, x -> acc + x end) | |
end | |
# fastest: 884Mb/s | |
# note: single-threaded C is ~750Mb/s | |
# note: multi-threaded C is ~1.4Gb/s | |
def sum_acc_crc32(sum, buf) do | |
:erlang.crc32(sum, buf) | |
end | |
def sum_acc(sum, buf) do | |
#sum_acc_crc32(sum, buf) | |
sum_acc_binmatch(sum, buf) | |
#sum_acc_reduce(sum, buf) | |
end | |
# spawned client process connects to port, reads, sends to | |
# next_sock, then sends {:client_done, msg} to parent | |
def client(port, next_sock, parent) do | |
{:ok, sock} = :gen_tcp.connect({127,0,0,1}, port, [:binary, packet: 0, active: false]) | |
client_recv(sock, next_sock, parent, 0, 0) | |
:gen_tcp.close(sock) | |
if not is_nil(next_sock) do | |
:gen_tcp.close(next_sock) | |
end | |
# debug | |
#IO.puts("client done") | |
end | |
# receive from sock, send to next_sock, send len and sum to parent when done | |
def client_recv(sock, next_sock, parent, len, sum) do | |
case :gen_tcp.recv(sock, 0) do | |
{:ok, buf} -> | |
# debug | |
#IO.puts("client_recv got len=#{byte_size(buf)} to sock=#{inspect next_sock}") | |
if not is_nil(next_sock) do | |
:ok = :gen_tcp.send(next_sock, buf) | |
end | |
client_recv(sock, next_sock, parent, len+byte_size(buf), sum_acc(sum, buf)) | |
{:error, :closed} -> | |
msg = {:client_done, "len=#{len} sum=#{sum}"} | |
# debug | |
#IO.puts("client_recv got eof, sending to parent=#{inspect parent}, msg=#{inspect msg}") | |
send(parent, msg) | |
# debug | |
#IO.puts("client_recv done") | |
:ok | |
end | |
end | |
# start n clients on port | |
def start(n, port \\ 5678) do | |
{:ok, listen_sock} = :gen_tcp.listen(port, [:binary, packet: 0, active: false, reuseaddr: true]) | |
start(listen_sock, nil, n, port) | |
end | |
# end case of start. returns the last spawned client's sock | |
def start(listen_sock, last_client_sock, 0, _port) do | |
# debug | |
#IO.puts("closing listen_sock=#{inspect listen_sock}") | |
:ok = :gen_tcp.close(listen_sock) | |
{:ok, last_client_sock} | |
end | |
# end case of start. returns the last spawned client's sock | |
def start(listen_sock, last_client_sock, n, port) do | |
me = self | |
spawn_link fn -> client(port, last_client_sock, me) end | |
{:ok, client_sock} = :gen_tcp.accept(listen_sock) | |
start(listen_sock, client_sock, n-1, port) | |
end | |
def urandom(sock, n) do | |
cmd = "head -c #{n} /dev/urandom" | |
port = Port.open({:spawn, cmd}, [:binary]) | |
Process.link port | |
# debug | |
#IO.puts("urandom spawned port=#{inspect port} from cmd=#{cmd}") | |
urandom_read(port, sock, n) | |
end | |
def urandom_read(port, _sock, 0) do | |
send port, {self(), :close} | |
#Port.close(port) | |
:ok | |
end | |
def urandom_read(port, sock, n) do | |
receive do | |
{^port, {:data, data}} -> | |
# debug | |
#IO.puts("urandom_read sending len=#{byte_size(data)} to sock=#{inspect sock}") | |
:ok = :gen_tcp.send(sock, data) | |
urandom_read(port, sock, n - byte_size(data)) | |
{^port, :closed} -> | |
:ok | |
{^port, {:exit_status, status}} -> | |
status | |
end | |
end | |
# end case | |
def wait(0) do | |
:ok | |
end | |
# wait for n clients to return | |
def wait(n) do | |
# debug | |
#IO.puts "wait(#{n})" | |
receive do | |
{:client_done, msg} -> | |
IO.puts "client_done: #{msg}" | |
wait(n-1) | |
end | |
end | |
def cat(_file, _sock, 0) do | |
:ok | |
end | |
def cat(file, sock, bytes) do | |
{:ok, data} = :file.read(file, bytes) | |
:ok = :gen_tcp.send(sock, data) | |
cat(file, sock, bytes - byte_size(data)) | |
end | |
def timestamp do | |
{megas, secs, micros} = :erlang.timestamp | |
megas*1000000 + secs + micros/1000000 | |
end | |
def run(opts \\ []) do | |
num_clients = Keyword.get(opts, :num_clients, 5) | |
bytes = Keyword.get(opts, :bytes, 1000) | |
tmp_path = "/tmp/pipetest.tmp" | |
:os.cmd(String.to_char_list("dd if=/dev/urandom bs=#{bytes} count=1 >#{tmp_path}")) | |
{:ok, file} = :file.open(tmp_path, [:read, :binary, :raw]) | |
{:ok, sock} = start(num_clients) | |
t0 = timestamp | |
:ok = cat(file, sock, bytes) | |
#:ok = urandom(sock, bytes) | |
# debug | |
#IO.puts("urandom done") | |
:ok = :gen_tcp.close(sock) | |
:ok = wait(num_clients) | |
dt = timestamp - t0 | |
IO.puts "run: #{num_clients * bytes / dt / 1000000}Mb/s" | |
end | |
def dbg(opts \\ []) do | |
:dbg.tracer | |
:dbg.p(:new, :call) | |
:dbg.tpl(PipeTest, :_, []) | |
:dbg.tpl(PipeTest, :_, [{:_, [], [{:return_trace}]}]) | |
spawn fn -> run(opts) end | |
end | |
def eprof(opts \\ []) do | |
:eprof.profile fn -> run(opts) end | |
:eprof.analyze | |
end | |
end | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment