Skip to content

Instantly share code, notes, and snippets.

@g-andrade
Created October 21, 2020 14:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save g-andrade/42ee10e5e1fc97c157ce0dc627cbf2b7 to your computer and use it in GitHub Desktop.
Save g-andrade/42ee10e5e1fc97c157ce0dc627cbf2b7 to your computer and use it in GitHub Desktop.
Duplicate socket async reference
#!/usr/bin/env escript
-module(duplicate_socket_async_ref).
-mode(compile).
-export([main/1]).
main([]) ->
{ok, ListenSocket, ListenPort} = create_listen_socket(),
spawn_link(fun () -> run_socket_acceptor(ListenSocket) end),
try_reproducing_the_issue(ListenPort, 0).
create_listen_socket() ->
{ok, Socket} = gen_tcp:listen(0, [{ifaddr, {127,0,0,1}},
{active, false},
{reuseaddr, true},
{linger, {false, 0}},
{recbuf, 16}]),
{ok, Port} = inet:port(Socket),
{ok, Socket, Port}.
run_socket_acceptor(ListenSocket) ->
case gen_tcp:accept(ListenSocket) of
{ok, ConnectionSocket} ->
handle_inbound_connection(ConnectionSocket),
run_socket_acceptor(ListenSocket);
{error, _} ->
run_socket_acceptor(ListenSocket)
end.
handle_inbound_connection(Socket) ->
case gen_tcp:recv(Socket, 0) of
{ok, _} ->
handle_inbound_connection(Socket);
{error, closed} ->
ok
end.
try_reproducing_the_issue(ListenPort, AttemptsSoFar) ->
UpdatedAttemptsSoFar = AttemptsSoFar + 1,
case attempt(ListenPort) of
{succeeded, FirstMsg, SecondMsg} ->
io:format("Succeeded after ~b attempt(s)~n"
"First message: ~p~n"
"Second message: ~p~n",
[UpdatedAttemptsSoFar,
FirstMsg, SecondMsg]);
failed ->
io:format("Failed (~b attempt(s) so far)~n", [UpdatedAttemptsSoFar]),
try_reproducing_the_issue(ListenPort, UpdatedAttemptsSoFar)
end.
attempt(ListenPort) ->
{ok, Socket} = connect(ListenPort),
ok = socket:setopt(Socket, socket, sndbuf, 16),
PacketLength = 128,
Packet = <<0:PacketLength/integer-unit:8>>,
Self = self(),
SocketCloserPid = spawn(fun () -> run_socket_closer(Self, Socket) end),
case socket:send(Socket, Packet, nowait) of
ok ->
_ = socket:close(Socket),
failed;
{ok, {_Unsent, {select_info, _, SelectRef}}} ->
await_socket_closer(SocketCloserPid),
await_occurence(Socket, SelectRef);
{select, {select_info, _, SelectRef}} ->
await_socket_closer(SocketCloserPid),
await_occurence(Socket, SelectRef);
{error, _} ->
_ = socket:close(Socket),
failed
end.
connect(ListenPort) ->
{ok, Socket} = socket:open(inet, stream, tcp),
BindAddr = #{family => inet, addr => {127,0,0,1}},
{ok, _BindPort} = socket:bind(Socket, BindAddr),
ok = socket:setopt(Socket, socket, linger, abort),
ConnectAddr = #{family => inet, addr => {127,0,0,1}, port => ListenPort},
ok = socket:connect(Socket, ConnectAddr),
{ok, Socket}.
run_socket_closer(Parent, Socket) ->
_ = socket:close(Socket),
_ = Parent ! {self(), done}.
await_socket_closer(Pid) ->
receive
{Pid, done} -> ok
end.
await_occurence(Socket, SelectRef) ->
case receive_socket_messages(Socket) of
[{'$socket', Socket, select, SelectRef} = FirstMsg,
{'$socket', Socket, abort, {SelectRef, closed}} = SecondMsg
] ->
{succeeded, FirstMsg, SecondMsg};
[{'$socket', Socket, _, _}] ->
failed
end.
receive_socket_messages(Socket) ->
receive
{'$socket', Socket, _, _} = Msg ->
[Msg | receive_socket_messages(Socket)]
after
0 -> []
end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment