jlouis' Ramblings

Musings on tech, software, and other things

Testing a Parallel map implementation

Erlang programs often contain a list comprehension which implements the ubiquitous serial map of functional programming:

map(F, Xs) ->
  [F(X) || X <- Xs].

If, however, the function given as F blocks, then the map function blocks as well. This is not desirable if the elements in Xs are truly independent: we could evaluate every element of the comprehension in parallel and collect the results afterwards. This is the purpose of the pmap function:

pmap(F, Es) ->
     Parent = self(),
     Running = [
           spawn_monitor(fun() -> Parent ! {self(), F(E)} end)
         || E <- Es],
     collect(Running, 5000).

The idea here is that we spawn each work unit as a separate process and attach a monitor to it. Once the computation is done, the worker sends the result back to the invoker of pmap/2 together with its own pid(). This monitor/pid pair acts like a future[0] which we can later use to collect the results from the running processes:

collect([], _Timeout) -> [];
collect([{Pid, MRef} | Next], Timeout) ->
    receive
      {Pid, Res} ->
        erlang:demonitor(MRef, [flush]),
        [{ok, Res} | collect(Next, Timeout)];
      {'DOWN', MRef, process, Pid, Reason} ->
        [{error, Reason} | collect(Next, Timeout)]
    after Timeout ->
      exit(pmap_timeout)
    end.

There are two cases of collection. Either there are no processes left to collect, in which case we can return the empty list. Or we have a pair of a PID and a monitor reference. In this case, we do a selective receive from the mailbox in which we expect a result tagged with the PID. This forces the order in which we receive answers to be the same as the order in which the worker processes were spawned.

Once we have the result, we remove the monitor, passing the ‘flush’ option to make sure any ‘DOWN’ message is wiped from our mailbox as well, should it already have made its way in there. Then we recursively collect the remaining results.

Two error paths exist, which we must handle. If the call F(E) in the worker process crashes, we must handle that error. Since we have a monitor on the process, the crash results in an asynchronous ‘DOWN’ message being sent to our mailbox. So we grab that and return it as an error.

Finally, if we are blocked for more than Timeout milliseconds, we regard the computation as a whole as an error and crash ourselves. Your mileage may vary here, but for my purpose this is an adequate assumption to make.
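
As a quick illustration of the interface, here is a hypothetical shell session (the module name dht_par is the one used in the test below; the result shape follows directly from collect/2 above):

1> dht_par:pmap(fun(X) -> X * X end, [1, 2, 3]).
[{ok,1},{ok,4},{ok,9}]
2> dht_par:pmap(fun(_) -> exit(oops) end, [a, b]).
[{error,oops},{error,oops}]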


Writing the function is only half the battle. How do we test it? There are three major properties of the parallel map function which make it fairly easy to write a test for:

  • It should behave the same as the serial version of a map, as written above.
  • The fact that we are concerned about blocking behavior is not important in the test.
  • It has the signature of a pure function. Even though its implementation internals are not pure, any user of the function can regard it as a purely functional implementation. Had this been OCaml or Standard ML, the module abstraction would be able to hide the fact that we are using parallel invocation behind the scenes.

However, there is also a property which makes it fairly hard to write a test: when running a test case, it is very likely we will get the same schedule every time we run it. So unless we block in the F function, we cannot test alternative schedules.

Blocking in a test case makes it slow. So it seems we must either forgo testing alternative schedules or slow down the tests. There is a way around this in Erlang, which is to start the VM with the +T flag. This flag perturbs the timing of the scheduler, and if we then run many tests back to back, we will eventually exercise alternative schedules.

In this exposition, we will explore another path: Quickcheck and PULSE[1]. We note that we can write a fully stateless test for the parallel map function, and then we can use PULSE to randomize the scheduler of Erlang. An added benefit of this approach is that PULSE automatically compresses time: if every process under its control is sleeping, it will “forward” time to the next point at which something interesting happens. We start out by writing a testing module:

-module(dht_par_eqc).
-compile(export_all).
-include_lib("eqc/include/eqc.hrl").
-include_lib("pulse/include/pulse.hrl").

And then we write down a crasher function:

crasher() ->
     ?LET(F, function1(int()),
       fun
         (-1) -> timer:sleep(6000); %% Fail by timing out
         (0) -> exit(err); %% Fail by crashing
         (X) -> F(X) %% Run normally
       end).

The idea of this function is that it generates a random function F and then alters it slightly: giving it -1 will make it sleep for 6000 ms, giving it 0 will make it crash, and for every other value it will behave as F.

We can also write down a function which explains the expected result of running the parallel map function:

expected_result(F, Xs) ->
      case [X || X <- Xs, X == -1] of
          [] ->
            [case X of
               0 -> {error, err};
               N -> {ok, F(N)} end
             || X <- Xs];
          [_|_] ->
              {'EXIT', pmap_timeout}
      end.

We look for a value of -1. If this value is present, we expect the result to be a pmap_timeout. Otherwise, we run through the Xs and analyze them one by one through a simple list comprehension, reflecting the responses from the crasher function.

The function here calculates the result as if the map had been run serially. This ensures we verify that our parallel version has the same behaviour and interface as the serial version, with the small change that the function F is run in a crash-resistant manner and errors are caught and transformed into terms.
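
To convince ourselves the model does what we think, we can poke at it in the shell. A hypothetical session (expected_result/2 is exported because of export_all; the doubling function is just an arbitrary stand-in for a generated F):

1> F = fun(X) -> X * 2 end.
2> dht_par_eqc:expected_result(F, [1, 0, 2]).
[{ok,2},{error,err},{ok,4}]
3> dht_par_eqc:expected_result(F, [1, -1, 2]).
{'EXIT',pmap_timeout}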

Now, we can write down the main property. The idea is to generate a list of inputs in which error cases are present, but fairly rare. Once generated, we use the ?PULSE macro to run the parallel map under control of PULSE. Once we have a Result, we can verify that the result matches the expected output:

prop_pmap() ->
      ?FORALL([F, Xs],
              [crasher(),
               list( frequency([
                       {10,0},
                       {1, -1},
                       {1000, nat()} ]) ) ],
        ?PULSE(Result, (catch dht_par:pmap(F, Xs)),
          begin
              Expected = expected_result(F, Xs),
              equals(Result, Expected)
          end)).
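
If we want to exercise just this property interactively rather than the whole module, something like the following should work, assuming the usual Quviq eqc entry points eqc:quickcheck/1 and eqc:testing_time/2:

eqc:quickcheck(eqc:testing_time(30, dht_par_eqc:prop_pmap())).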

We can use this to run numerous test cases. Here we run the checker for 2 minutes:

25> eqc:module({testing_time, 120}, dht_par_eqc).
prop_pmap: ................................................
.........................................................(x10)
...........................................................
.........................................(x100)............
...........................................................
.............................(x1000).......................
...........................................(x100).......
Time limit reached: 120.0 seconds.

OK, passed 77800 tests
[]

In two minutes, we have generated 77800 test cases, far more than we would have been able to with blocking calls in there. The power of PULSE shows itself: it realizes that the processes under its control cannot continue without time passing, and it automatically forwards time to the next event point.

[0] Note the notions of future and promise are not set entirely in stone. Here, a future is used in the meaning of a read-only value.

[1] PULSE is a scheduler randomizer. It will explore random schedules rather than use the standard Erlang scheduling. It transforms Erlang code such that it runs under a manager process which controls the order in which processes run. This allows you to weed out parallel bugs in the code base which are due to races in concurrent invocation.