%% Process communication programming
-module(channels).
-compile(export_all).
%% @doc Create a new channel.
%% Spawns a process running channel/1 and returns the handle
%% {channel, Pid, Ref}; Ref tags every message belonging to this channel.
make() ->
    Tag = make_ref(),
    {channel, spawn(?MODULE, channel, [Tag]), Tag}.
%% @doc Channel process loop.
%% Waits for a read request tagged with Ref, then hands the requesting
%% process the next enqueued value or the close notification.
channel(Ref) ->
    receive
        {read, Ref, Reader} -> serve_reader(Ref, Reader)
    end.

%% Answer one pending read: reply with the next enqueued value and go back
%% to waiting for reads, or reply `closed' and let the process finish.
serve_reader(Ref, Reader) ->
    receive
        {enqueue, Ref, Item} ->
            Reader ! {Ref, {ok, Item}},
            channel(Ref);
        {closed, Ref} ->
            Reader ! {Ref, closed}
    end.
%% @doc Put Value on the channel by sending an `enqueue' message to its
%% process. Returns the message that was sent.
enqueue({channel, Pid, Ref}, Value) ->
    erlang:send(Pid, {enqueue, Ref, Value}).
%% @doc Read the next value from the channel, blocking until one arrives.
%% Returns {ok, Value}, or `closed' when the channel has been closed or its
%% process is gone. Throws {channel, Reason} if the channel process went
%% down for any other reason.
read({channel, Pid, Ref}) ->
    Pid ! {read, Ref, self()},
    Monitor = erlang:monitor(process, Pid),
    Result =
        receive
            {Ref, closed} -> closed;
            {Ref, {ok, Value}} -> {ok, Value};
            {'DOWN', Monitor, process, Pid, Reason} -> check_close_reason(Reason)
        end,
    %% `flush' removes a 'DOWN' message that may already have been delivered
    %% (the channel replies `closed' and then exits), so nothing is left
    %% behind in the caller's mailbox.
    erlang:demonitor(Monitor, [flush]),
    Result.

%% Map the exit reason of the channel process to a read result: the listed
%% reasons mean "closed"; any other reason is thrown as {channel, Reason}.
check_close_reason(closed) -> closed;
check_close_reason(normal) -> closed;
check_close_reason(noproc) -> closed;
check_close_reason(Reason) -> throw({channel, Reason}).
%% @doc Close the channel by sending a `closed' message to its process.
%% Returns the message that was sent.
close({channel, Pid, Ref}) ->
    erlang:send(Pid, {closed, Ref}).
%% @doc Call the zero-arity fun F exactly Times times, then return ok.
%% Times must be a non-negative integer; anything else fails with
%% function_clause instead of counting down past zero forever.
dotimes(_F, 0) ->
    ok;
dotimes(F, Times) when is_integer(Times), Times > 0 ->
    F(),
    dotimes(F, Times - 1).
%% @doc Read one value from Ch and apply Callback to it.
%% Returns {ok, CallbackResult}, or `closed' when the channel is closed.
doapply(Ch, Callback) ->
    Outcome = read(Ch),
    case Outcome of
        closed -> closed;
        {ok, Value} -> {ok, Callback(Value)}
    end.
%% @doc Apply Callback to every value read from Ch until the channel is
%% closed, then return ok. Callback results are discarded.
doall(Ch, Callback) ->
    Step = doapply(Ch, Callback),
    case Step of
        closed -> ok;
        {ok, _Ignored} -> doall(Ch, Callback)
    end.
%% @doc Spawn a process that forwards every value of Origin to Target and
%% closes Target once Origin is closed. Returns the pid of that process.
stream_to(Origin, Target) ->
    Forward = fun(El) -> enqueue(Target, El) end,
    Pump = fun() ->
        ok = doall(Origin, Forward),
        close(Target)
    end,
    spawn(Pump).
%% @doc Return a new channel carrying F(El) for every El read from Ch.
%% The new channel is closed once Ch is closed.
map(F, Ch) ->
    Upstream = make(),
    Transform = fun(El) -> enqueue(Upstream, F(El)) end,
    Worker = fun() ->
        ok = doall(Ch, Transform),
        close(Upstream)
    end,
    spawn(Worker),
    Upstream.
%% @doc Return a new channel carrying only the values of Ch for which
%% Pred returns true. The new channel is closed once Ch is closed.
filter(Pred, Ch) ->
    Upstream = make(),
    %% Pred must return a boolean; any other result is a case_clause error.
    Keep = fun(El) ->
        case Pred(El) of
            false -> ok;
            true -> enqueue(Upstream, El)
        end
    end,
    Worker = fun() ->
        doall(Ch, Keep),
        close(Upstream)
    end,
    spawn(Worker),
    Upstream.
%% @doc Read Ch until it is closed and return all values in arrival order.
to_list(Ch) ->
    to_list(Ch, []).

%% Accumulator variant: Elems holds the values read so far, newest first.
to_list(Ch, Elems) ->
    Next = read(Ch),
    case Next of
        closed -> lists:reverse(Elems);
        {ok, Value} -> to_list(Ch, [Value | Elems])
    end.
%% @doc Return a new channel that delivers the elements of Elems in order
%% and is then closed. Feeding happens in a separate process.
from_list(Elems) ->
    Ch = make(),
    Push = fun(El) -> enqueue(Ch, El) end,
    Feeder = fun() ->
        lists:foreach(Push, Elems),
        close(Ch)
    end,
    spawn(Feeder),
    Ch.
%% @doc Return a new channel carrying at most Count values read from Ch.
%% The new channel is closed after Count values have been forwarded or as
%% soon as Ch is closed, whichever comes first. A Count of zero or less
%% yields a channel that is closed without any values.
take(Count, Ch) ->
    Upstream = make(),
    %% xxx: this implementation is leaky, because Ch can still be read
    %% from outside this function (there is no way to restrict a channel
    %% to a single reader).
    spawn(fun() ->
        ok = take_loop(Count, Ch, Upstream),
        close(Upstream)
    end),
    Upstream.

%% Forward up to Remaining values from Ch to Upstream. Stops as soon as Ch
%% reports `closed' rather than issuing further reads that cannot succeed.
take_loop(Remaining, Ch, Upstream) when is_integer(Remaining), Remaining > 0 ->
    case read(Ch) of
        {ok, Value} ->
            enqueue(Upstream, Value),
            take_loop(Remaining - 1, Ch, Upstream);
        closed ->
            ok
    end;
take_loop(Remaining, _Ch, _Upstream) when is_integer(Remaining) ->
    ok.
%% @doc Return a new channel carrying the values of Ch after the first
%% Count reads have been discarded.
drop(Count, Ch) ->
    Upstream = make(),
    %% xxx: this implementation is leaky, because Ch can still be read
    %% from outside this function (there is no way to restrict a channel
    %% to a single reader).
    Discard = fun() -> read(Ch) end,
    Worker = fun() ->
        dotimes(Discard, Count),
        stream_to(Ch, Upstream)
    end,
    spawn(Worker),
    Upstream.
%% @doc Duplicate Ch: return two new channels that each receive every
%% value read from Ch. Both are closed once Ch is closed.
fork(Ch) ->
    Up1 = make(),
    Up2 = make(),
    Broadcast = fun(El) ->
        enqueue(Up1, El),
        enqueue(Up2, El)
    end,
    Worker = fun() ->
        ok = doall(Ch, Broadcast),
        close(Up1),
        close(Up2)
    end,
    spawn(Worker),
    {Up1, Up2}.
%% @doc Merge a list of channels into one by folding zip/2 over it.
%% The fold is seeded with the first channel of the list. Seeding it with a
%% fresh channel that nobody closes would keep the merged channel open
%% forever, because zip/2 only closes its output after both of its inputs
%% have closed. An empty list yields a channel that reports `closed' on the
%% first read.
zip([]) ->
    Empty = make(),
    close(Empty),
    Empty;
zip([First | Rest]) ->
    lists:foldl(fun zip/2, First, Rest).
%% @doc Merge two channels: return a new channel that receives the values
%% of both Ch1 and Ch2 and is closed after both inputs have closed.
zip(Ch1, Ch2) ->
    Upstream = make(),
    Merger = fun() ->
        ok = zip_read_loop(Ch1, Ch2, Upstream),
        close(Upstream)
    end,
    spawn(Merger),
    Upstream.
%% Forward every remaining value of Ch to Upstream and return ok once Ch is
%% closed. Goes through read/1 so that a channel process which is already
%% gone is reported as `closed' via the monitor instead of blocking forever
%% on a reply that will never come.
zip_read_loop(Ch, Upstream) ->
    case read(Ch) of
        {ok, Value} ->
            enqueue(Upstream, Value),
            zip_read_loop(Ch, Upstream);
        closed ->
            ok
    end.
%% Merge loop over two channels: forwards values from either channel to
%% Upstream until one of them reports `closed', then continues with the
%% two-argument zip_read_loop on the other channel.
%%
%% Each iteration sends a read request to BOTH channels but consumes only
%% ONE reply, so replies to earlier requests may still be waiting in this
%% process's mailbox and are picked up by later iterations.
%% NOTE(review): no monitor is set on either channel process here, so this
%% presumably blocks forever if neither process ever replies - confirm.
zip_read_loop({channel, Pid1, Ref1} = Ch1, {channel, Pid2, Ref2} = Ch2, Upstream) ->
Pid1 ! {read, Ref1, self()},
Pid2 ! {read, Ref2, self()},
receive
%% One side is closed: drain the other one on its own.
{Ref1, closed} -> zip_read_loop(Ch2, Upstream);
{Ref2, closed} -> zip_read_loop(Ch1, Upstream);
%% A value tagged with any reference is forwarded as-is.
{_, {ok, Value}} ->
enqueue(Upstream, Value),
zip_read_loop(Ch1, Ch2, Upstream)
end.
%% @doc Placeholder: ignores its argument and returns ok.
chain(_Channels) ->
    ok.
%% Example: Google-style distributed search,
%% from Rob Pike's talk about Go channels
%% @doc Build a fake search replica for Kind.
%% The returned fun takes a channel and a query, and spawns a process that
%% sleeps for a random 11..110 ms and then enqueues {Kind, Query}.
fake_search(Kind) ->
    fun(Ch, Query) ->
        spawn(fun() ->
            %% rand seeds itself per process. The deprecated `random'
            %% module starts every fresh process from the same default
            %% seed, which would make all replicas sleep equally long.
            timer:sleep(rand:uniform(100) + 10),
            enqueue(Ch, {Kind, Query})
        end)
    end.
%% @doc Start every replica on a fresh channel and return the first read
%% result from that channel, i.e. the answer of the fastest replica.
fastest(Query, Replicas) ->
    Ch = make(),
    Launch = fun(Replica) -> Replica(Ch, Query) end,
    lists:foreach(Launch, Replicas),
    read(Ch).
%% @doc Return a fun that, given a list of replica kinds, spawns a process
%% which races those replicas and enqueues the fastest result on Ch.
collector(Ch, Query) ->
    fun(Kinds) ->
        Race = fun() ->
            Searches = lists:map(fun fake_search/1, Kinds),
            Winner = fastest(Query, Searches),
            enqueue(Ch, Winner)
        end,
        spawn(Race)
    end.
%% @doc Run the fake web, image and video searches concurrently and return
%% the list of the three results, one per replica group.
google(Query) ->
    Ch = make(),
    ReplicaGroups = [
        [web1, web2],
        [image1, image2],
        [video1, video2, video3]
    ],
    lists:foreach(collector(Ch, Query), ReplicaGroups),
    Results = take(3, Ch),
    to_list(Results).
%% ===================================================================
%% Unit tests
%% ===================================================================
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
%% Enqueue every element on Ch, pausing Timeout ms before each one.
%% Leaves the channel open and returns ok.
enqueue_infinite_helper(Ch, Timeout, Elems) ->
    lists:foreach(
        fun(El) ->
            timeout_helper(Timeout),
            enqueue(Ch, El)
        end,
        Elems
    ).
%% Enqueue every element on Ch, pausing Timeout ms before each one,
%% then close the channel.
enqueue_list_helper(Ch, Timeout, Elems) ->
    lists:foreach(
        fun(El) ->
            timeout_helper(Timeout),
            enqueue(Ch, El)
        end,
        Elems
    ),
    close(Ch).
%% Sleep for Time milliseconds; a Time of exactly 0 skips the sleep.
timeout_helper(Time) ->
    case Time of
        0 -> ok;
        _ -> timer:sleep(Time)
    end.
%% to_list/1 returns all enqueued values in order once the channel closes.
convert_to_list_test() ->
    Items = [1,2,3,4,5],
    Ch = make(),
    spawn(?MODULE, enqueue_list_helper, [Ch, 0, Items]),
    ?assertEqual([1,2,3,4,5], to_list(Ch)).
%% Same as convert_to_list_test, but the reader starts 10 ms after the
%% writer was spawned.
convert_to_list_read_timer_test() ->
    Items = [1,2,3,4,5],
    Ch = make(),
    spawn(?MODULE, enqueue_list_helper, [Ch, 0, Items]),
    timer:sleep(10),
    ?assertEqual([1,2,3,4,5], to_list(Ch)).
%% from_list/1 followed by to_list/1 gives back the original list.
convert_from_list_test() ->
    Ch = from_list([1,2,3]),
    ?assertEqual([1,2,3], to_list(Ch)).
%% stream_to/2 forwards every value and closes the target channel.
stream_to_other_channel_test() ->
    Ch = make(),
    Upstream = make(),
    spawn(?MODULE, enqueue_list_helper, [Ch, 0, [1,2,3,4,5]]),
    stream_to(Ch, Upstream),
    ?assertEqual([1,2,3,4,5], to_list(Upstream)).
%% take/2 stops after the requested number of values.
take_from_long_channel_test() ->
    Ch = make(),
    spawn(?MODULE, enqueue_list_helper, [Ch, 0, lists:seq(1,10)]),
    Taken = take(2, Ch),
    ?assertEqual([1,2], to_list(Taken)).
%% take/2 with a count larger than the channel yields everything available.
take_from_small_channel_test() ->
    Ch = make(),
    spawn(?MODULE, enqueue_list_helper, [Ch, 0, lists:seq(1,3)]),
    Taken = take(20, Ch),
    ?assertEqual([1,2,3], to_list(Taken)).
%% drop/2 discards the first values and passes the rest through.
drop_from_long_channel_test() ->
    Ch = make(),
    spawn(?MODULE, enqueue_list_helper, [Ch, 0, lists:seq(1,30)]),
    Rest = drop(25, Ch),
    ?assertEqual([26,27,28,29,30], to_list(Rest)).
%% map/2 applies the fun to every value.
map_test() ->
    Ch = from_list(lists:seq(1,5)),
    Double = fun(El) -> El * 2 end,
    ?assertEqual([2,4,6,8,10], to_list(map(Double, Ch))).
%% filter/2 keeps only the values accepted by the predicate.
filter_test() ->
    Ch = from_list(lists:seq(1,5)),
    IsSmall = fun(El) -> El < 3 end,
    ?assertEqual([1,2], to_list(filter(IsSmall, Ch))).
%% zip/2 delivers all values of both inputs; only the count is checked.
zip_test() ->
    Ch1 = from_list([1,2]),
    Ch2 = from_list([3,4,5]),
    Merged = to_list(zip(Ch1, Ch2)),
    ?assertEqual(5, length(Merged)).
-endif.
%%
%% todo:
%% [done] 1. end of stream mark / close channel functionality
%% 2. timeouts processing for each message
%% [done] 3. take / drop functions
%% 4. consume-all function / flush messages functionality
%% 5. fan-in and fan-out example
%% 6. fork functionality
%% [done] 7. map / filter transducers
%% 8. complex example with google search functionality
%% 9. use next instead of read
%%
%% @doc Return a channel on which a background process enqueues
%% {Name, 1} .. {Name, 5}, sleeping 100 ms before each, and then closes it.
boring(Name) ->
    Ch = make(),
    Say = fun(I) ->
        timer:sleep(100),
        enqueue(Ch, {Name, I})
    end,
    spawn(fun() ->
        lists:foreach(Say, lists:seq(1,5)),
        close(Ch)
    end),
    Ch.
%% @doc Demo: print every message produced by the "Joe" boring channel.
main() ->
    Joe = boring("Joe"),
    Print = fun({Name, I}) -> io:format("~p: ~p~n", [Name, I]) end,
    doall(Joe, Print).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.