%% -------------------------------------------------------------------
%%
%% Copyright (c) 2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(foo).
-ifdef(TEST).
-ifdef(PROPER).
-compile(export_all).
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(QC_FMT(Fmt, Args),
        io:format(user, Fmt, Args)).
-define(QC_OUT(P),
        proper:on_output(fun(Str, Args) -> ?QC_FMT(Str, Args) end, P)).
-spec test() -> any().
-spec basic_prop_test_() -> tuple().
gen_workflow() ->
    %% 10 of 11 times, use gen_seq().
    %% 1 of 11 times, use gen_farm().
    list(frequency([{10, gen_seq()},
                    { 1, gen_farm()}])).
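%% For illustration only: one concrete value this generator can produce
%% (funs shown symbolically; an actual shell run would print them as
%% #Fun<foo...> terms):
%%
%%   [{bp_seq, 2, fun prepend/3, x},
%%    {bp_seq, 2, fun duplicate/3, 3},
%%    {bp_farm, 2, [{bp_seq, 2, fun prepend/3, x}], 1}]
%%
%% i.e. two sequence stages followed by a one-worker farm whose inner
%% workflow is a single prepend stage.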
gen_seq() ->
    %% 9 of 10 times, use gen_seq_prepend()
    %% 1 of 10 times, use gen_seq_duplicate()
    frequency([{9, gen_seq_prepend()},
               {1, gen_seq_duplicate()}]).
gen_seq_prepend() ->
    %% tuple format:
    %% {'bp_seq', #-in-flight-credits, computation fun, initial state data}
    %% 'bp_seq' is the tag for a Basho Pipe sequence stage.
    %% In-flight credits are roughly the amount of buffer permitted between
    %% stages of the pipeline before backpressure will start slowing
    %% things down.
    %%
    %% In this test, prepend() prepends a single thingie to each
    %% input item, and the items are always lists. The end of the test
    %% calculates the list lengths at the sink, so what exactly we
    %% add to the list doesn't matter; we'll add the atom 'x'.
    %% We could be prepending/appending stuff to a string instead,
    %% but it's easier & lazier to work with lists.
    {bp_seq, 2, fun prepend/3, x}.
gen_seq_duplicate() ->
    %% The duplicate() function uses its state to determine how many
    %% times it should emit the incoming item. We hardcode the # of
    %% duplicates to 3. This is a magic constant that appears elsewhere,
    %% i.e., in the math:pow/2 calls.
    {bp_seq, 2, fun duplicate/3, 3}.
%% We can easily create a workflow with too many processes for a
%% default Erlang VM when using a simple 'rebar eunit' command. So we
%% don't use too many workers, and the two constants for the 'div'
%% operations below are chosen to keep the total process count small.
gen_n_workers() ->
    choose(1, 2).
%% The use of ?SIZED() and the integer argument to gen_farm_workflow()
%% are hacks to keep QuickCheck from creating "big" farms. Without
%% this control, we can end up with nested
%% farms-of-farms-of-farms-of-farms so deep that we exponentially
%% create so many Erlang processes that we can crash a default VM
%% instance. Otherwise, ignore the complexity. ^_^
gen_farm() ->
    ?SIZED(Size, gen_farm_workflow(Size div 4)).
gen_farm_workflow(0) ->
    %% tuple format:
    %% {'bp_farm', #-in-flight-credits, inner workflow, #-of-worker-processes}
    %% 'bp_farm' is the tag for a Basho Pipe farm stage.
    {bp_farm, 2, gen_workflow(), gen_n_workers()};
gen_farm_workflow(X) ->
    gen_farm_workflow(X div 11).
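%% Worked example (illustrative): with Size = 20, gen_farm() calls
%% gen_farm_workflow(20 div 4) = gen_farm_workflow(5), and 5 div 11 = 0,
%% so the recursion reaches the base clause after one step and builds the
%% bp_farm tuple there.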
gen_inputs() ->
    %% The intermediate choices of X & X2 are hacks to get list
    %% lengths that are longer than QuickCheck's typical max of its
    %% built-in list generator, which is approximately 40. We send
    %% empty lists down the pipe.
    ?LET(X, nat(),
         ?LET(X2, oneof([X, X*X]),
              lists:duplicate(X2, []))).
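%% Worked example (illustrative): if nat() picks X = 30, then X2 is either
%% 30 or 900, so the generated input is a list of either 30 or 900 empty
%% lists -- far beyond the ~40-element ceiling mentioned above.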
basic_prop() ->
    ?FORALL({WorkFlow0, Inputs}, {gen_workflow(), gen_inputs()},
    ?IMPLIES(
      begin
          Flat = flatten(WorkFlow0),
          NumDuplicateOps = length([x || duplicate <- Flat]),
          ExpectOutputs = length(Inputs) * math:pow(3, NumDuplicateOps),
          %% Really, let's not spend lots of time on a single test case.
          %% This is a kludge, but a very helpful one when you discover
          %% you're occasionally spending 4 minutes executing a single
          %% test case....
          ExpectOutputs < 2*1000
      end,
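      %% Worked example of the cutoff above (illustrative numbers, not from
      %% a real run): with 5 inputs and 2 duplicate stages, ExpectOutputs is
      %% 5 * 3^2 = 45.0, well under 2000, so the case runs; with 7 duplicate
      %% stages it would be 5 * 3^7 = 10935.0 and ?IMPLIES would discard the
      %% case rather than execute it.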
      begin
          MyRef = make_ref(),
          Sink = {bp_sink, 2, fun sink/3, {self(),MyRef}},
          WorkFlow = WorkFlow0 ++ [Sink],
          Flat = flatten(WorkFlow0),
          NumPrependOps = length([x || prepend <- Flat]),
          NumDuplicateOps = length([x || duplicate <- Flat]),
          NumFarmOps = length([x || farm <- Flat]),
          NumInputs = length(Inputs),
          {Source, Workers} = skel:bp_do(WorkFlow, Inputs),
          Res = receive
                    {sink_final_result, MyRef, Val} ->
                        Val
                end,
          DeadFun = fun(Pid) -> not is_process_alive(Pid) end,
          %% All items (i.e. lists) received by the sink should
          %% be *identical*.
          SinkGotZeroOrOne = case lists:usort(Res) of
                                 [] -> true;
                                 [_] -> true;
                                 _ -> false
                             end,
          ExpectOutputs = NumInputs * math:pow(3, NumDuplicateOps),
          %% I honestly don't recall what the magic 7 constant here is
          %% for when examining that 1st list in the sink's output....
          %% (Note: length(NumDuplicateOps) always fails as a guard test
          %% because NumDuplicateOps is an integer, so the middle clause
          %% below can never be selected.)
          ListLenGood = if Res == [] ->
                                true;
                           length(hd(Res)) > 7, length(NumDuplicateOps) > 0 ->
                                false;
                           true ->
                                length(hd(Res)) == NumPrependOps
                        end,
          %% Apparently I had problems where Erlang process
          %% death wasn't happening quickly enough from the last
          %% test, which interfered with this test (probably by
          %% hitting the VM's max process limit). So, I baked
          %% into the test's correctness that all the damn
          %% workers had better be dead.
          timer:sleep(1), % give death a chance
          AllDead = lists:all(DeadFun, [Source|Workers]),
          ?WHENFAIL(
             ?QC_FMT("Flat ~w\n# prepends ~w\n# dups ~w\n# inputs ~w\n# outputs ~w\n",
                     [Flat, NumPrependOps, NumDuplicateOps, NumInputs, length(Res)]),
             %% It's extremely valuable to review basic properties of
             %% the test data: does it have the expected # of operations,
             %% mix of operations, etc.?
             %% Example of successful output (which is when you'll see
             %% the output from measure()):
             %%
             %% num_inputs:    Count: 97 Min: 0 Max: 900  Avg: 67.165  StdDev: 147.85  Total: 6515
             %% num_outputs:   Count: 97 Min: 0 Max: 7803 Avg: 350.258 StdDev: 1180.08 Total: 33975
             %% num_prepend:   Count: 97 Min: 0 Max: 38   Avg: 3.928   StdDev: 5.810   Total: 381
             %% num_duplicate: Count: 97 Min: 0 Max: 38   Avg: 3.928   StdDev: 5.810   Total: 381
             %% num_farm:      Count: 97 Min: 0 Max: 7    Avg: 0.546   StdDev: 1.22    Total: 53
             measure(num_inputs, length(Inputs),
             measure(num_outputs, length(Res),
             measure(num_prepend, NumPrependOps,
             measure(num_duplicate, NumDuplicateOps,
             measure(num_farm, NumFarmOps,
             conjunction([
                          {uniques, SinkGotZeroOrOne},
                          {individual_list_length, ListLenGood},
                          {expect_outputs, length(Res) == ExpectOutputs},
                          {all_dead, AllDead}
                         ])))))))
      end)).
%% Basho Pipe can call a computation stage (a "pipe fitting") in 4
%% different cases:
%% 1. bp_init: pipe initialization time
%% 2. bp_eoi: pipe end-of-input, no more data will be received by this fitting
%% 3. bp_work: operate on the data item received by this fitting
%% 4. bp_continue: resume computation on a prior bp_work item.
%% The sink fitting accumulates input items until 'bp_eoi', when it
%% sends that accumulator to its parent pid.
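%% For quick reference, the return shapes used by the fittings below (this
%% is just what this module uses; the full fitting protocol may allow more):
%%   {ok, State}                    -> no output, keep the new state
%%   {done, [Out], State}           -> emit Out, finished with this item
%%   {continue, [Out], Cont, State} -> emit Out, then expect a bp_continue
%%                                     call with continuation data Cont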
sink(bp_init, {_,_}=MyParent, _Ignore) ->
    {ok, {[], MyParent}};
sink(bp_eoi, _, {Acc, {MyParentPid,Ref}}=State) ->
    MyParentPid ! {sink_final_result, Ref, Acc},
    {ok, State};
sink(bp_work, Data, {Acc, MyParent}) ->
    {ok, {[Data|Acc], MyParent}}.
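%% Illustrative sink trace (derived from the clauses above): with parent
%% {Pid, Ref} and two [x] items flowing in,
%%   bp_init -> {ok, {[], {Pid,Ref}}}
%%   bp_work -> {ok, {[[x]], {Pid,Ref}}}
%%   bp_work -> {ok, {[[x],[x]], {Pid,Ref}}}
%%   bp_eoi  -> Pid ! {sink_final_result, Ref, [[x],[x]]}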
%% Each input item is a list: we prepend a single thing to the list.
prepend(bp_init, Data, _Ignore) ->
    {ok, Data};
prepend(bp_eoi, _, S) ->
    {ok, S};
prepend(bp_work, List, S) ->
    Res = [S|List],
    {done, [Res], S}.
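%% Illustrative results, straight from the clauses above:
%%   prepend(bp_work, [],  x) -> {done, [[x]],   x}
%%   prepend(bp_work, [x], x) -> {done, [[x,x]], x}
%% so each prepend stage an item passes through grows its list length by
%% one, which is what the individual_list_length check in basic_prop()
%% relies on.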
%% Basho Pipe has the option to emit multiple things in a single
%% bp_work call, but I elected to use the bp_continue continuation
%% scheme to always emit just a single item at a time. {shrug}
duplicate(bp_init, Max, _Ignore) ->
    {ok, Max};
duplicate(bp_eoi, _, Max) ->
    {ok, Max};
duplicate(bp_work, X, Max) ->
    Res = X,
    {continue, [Res], {2,X}, Max};
duplicate(bp_continue, {NextInt, X}, Max) ->
    Emit = X,
    if NextInt == Max ->
            {done, [Emit], Max};
       true ->
            {continue, [Emit], {NextInt+1, X}, Max}
    end.
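%% Illustrative trace with Max = 3 (derived from the clauses above):
%%   duplicate(bp_work,     Item,      3) -> {continue, [Item], {2, Item}, 3}
%%   duplicate(bp_continue, {2, Item}, 3) -> {continue, [Item], {3, Item}, 3}
%%   duplicate(bp_continue, {3, Item}, 3) -> {done,     [Item], 3}
%% i.e. each input item is emitted exactly 3 times, matching the
%% math:pow(3, NumDuplicateOps) bookkeeping in basic_prop().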
basic_prop_test_() ->
    Time = case os:getenv("EQC_TIME") of
               false -> 2;
               T -> list_to_integer(T)
           end,
    {timeout, Time+120,
     fun() -> true = proper:quickcheck(?QC_OUT(basic_prop()))
     end}.
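%% For reference (assuming the module was compiled with both TEST and
%% PROPER defined), the property can also be run straight from an Erlang
%% shell, e.g.:
%%   1> proper:quickcheck(foo:basic_prop(), 500).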
%%%%%
%% We take a workflow and flatten it to a single simple description.
%% If we encounter a farm, then we'll step aside to flatten out its
%% description (recursively) before resuming processing.
%%
%% Example output: [farm,prepend,farm,prepend,prepend,prepend,duplicate,prepend]
flatten(WorkFlow) when is_list(WorkFlow) ->
    lists:flatten([flatten(X) || X <- WorkFlow]);
flatten({bp_seq, _, _, _}=Tuple) ->
    %% !@#$! can't use fun func/arity in a guard.....
    Prepend = gen_seq_prepend(),
    Duplicate = gen_seq_duplicate(),
    if Tuple == Prepend ->
            prepend;
       Tuple == Duplicate ->
            duplicate
    end;
flatten({bp_farm,_,WorkFlow,_}) ->
    [farm|flatten(WorkFlow)].
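%% Illustrative example (derived from the clauses above):
%%   flatten([{bp_seq, 2, fun prepend/3, x},
%%            {bp_farm, 2, [{bp_seq, 2, fun duplicate/3, 3}], 1}])
%%   -> [prepend, farm, duplicate]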
-endif. % PROPER
-endif. % TEST