
Chapter 16B: Parallel & Concurrent Programming in OCaml 5


OCaml 5 removed the Global Runtime Lock and introduced Domains as the unit of true parallelism. These notes cover Domain-based programming, Domainslib's work-stealing task pools, the relaxed memory model and DRF-SC guarantee, atomic variables for non-blocking communication, and blocking synchronisation with Mutex and Condition: a practical toolkit for parallel OCaml. We then go through Effects and Threads to complete the concurrent programming picture.

Figure 1: Parallel & Concurrent

Parallel Programming (via Domains) #

This section goes through the parallelism docs in the OCaml manual and overlaps a little with the parallelism paper, for which notes can be found here.

Domains #

Domains can be spawned with an associated lambda to execute: Domain.spawn (fun _ -> print_endline "I ran in parallel"). OCaml limits how many domains can be active at the same time (128 at the time of writing).

Spawned domains can be joined to retrieve their results: Domain.join waits for the target domain to terminate, so it is a blocking operation.
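A minimal sketch of the spawn/join lifecycle, using only the standard library's Domain module (OCaml 5). The helper sum_to is hypothetical and just stands in for some work; Domain.recommended_domain_count is a hint for how many domains are worth running on the current machine. The sketch also shows that an exception escaping the spawned function is not lost: it is re-raised by Domain.join.

```ocaml
(* Hypothetical helper: sum 1..n sequentially, used as per-domain work. *)
let sum_to n =
  let acc = ref 0 in
  for i = 1 to n do acc := !acc + i done;
  !acc

let () =
  (* A hint for how many domains this machine can usefully run. *)
  Printf.printf "recommended domains: %d\n" (Domain.recommended_domain_count ());
  (* The spawned domain starts running in parallel immediately. *)
  let d = Domain.spawn (fun () -> sum_to 1_000) in
  (* Meanwhile, the current domain keeps working. *)
  let here = sum_to 500 in
  (* Blocks until [d] terminates, then returns its result. *)
  let there = Domain.join d in
  Printf.printf "here = %d, there = %d\n" here there;
  (* An exception escaping the spawned function is re-raised by join. *)
  let failing = Domain.spawn (fun () -> failwith "boom") in
  match Domain.join failing with
  | () -> print_endline "unreachable"
  | exception Failure msg -> Printf.printf "join re-raised: %s\n" msg
```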

Gotcha: join is blocking, but spawn is already parallel #

Careful not to misread this: spawn already starts the work on the new domain. join only blocks the caller's control flow until the target domain terminates; it does not block the work running on the other domain. That's why running fib 42 on two domains (fib_twice.ml) takes roughly the same wall-clock time as a single call.

(* fib_twice.ml *)
let n = int_of_string Sys.argv.(1)

let rec fib n = if n < 2 then n else fib (n - 1) + fib (n - 2)

let main () =
  let d1 = Domain.spawn (fun _ -> fib n) in
  let d2 = Domain.spawn (fun _ -> fib n) in
  let r1 = Domain.join d1 in
  Printf.printf "fib(%d) = %d\n%!" n r1;
  let r2 = Domain.join d2 in
  Printf.printf "fib(%d) = %d\n%!" n r2

let _ = main ()
Code Snippet 1: fib_twice.ml for input = 42 takes roughly the same time as a single call to fib 42
$ ocamlopt -o fib_twice.exe fib_twice.ml
$ ./fib_twice.exe 42
fib(42) = 267914296
fib(42) = 267914296
$ hyperfine './fib_twice.exe 42'
Benchmark 1: ./fib_twice.exe 42
  Time (mean ± sd):     1.249 s ±  0.025 s    [User: 2.451 s, System: 0.012 s]
  Range (min … max):    1.221 s …  1.290 s    10 runs

Domainslib: for nested-parallel programming #

The idea is that the core compiler distribution only offers the low-level primitives for concurrent and parallel programming, so that higher-level libraries (such as Domainslib) can be developed and distributed outside the core compiler distribution.

(* fib_par2.ml *)
let num_domains = int_of_string Sys.argv.(1)
let n = int_of_string Sys.argv.(2)

let rec fib n = if n < 2 then n else fib (n - 1) + fib (n - 2)

module T = Domainslib.Task

let rec fib_par pool n =
  if n > 20 then begin
    let a = T.async pool (fun _ -> fib_par pool (n-1)) in
    let b = T.async pool (fun _ -> fib_par pool (n-2)) in
    T.await pool a + T.await pool b
  end else fib n

let main () =
  let pool = T.setup_pool ~num_domains:(num_domains - 1) () in
  let res = T.run pool (fun _ -> fib_par pool n) in
  T.teardown_pool pool;
  Printf.printf "fib(%d) = %d\n" n res

let _ = main ()
Code Snippet 2: parallel implementation of fib, using Domainslib
perf analysis
$ ocamlfind ocamlopt -package domainslib -linkpkg -o fib_par2.exe fib_par2.ml
$ ./fib_par2.exe 1 42
fib(42) = 267914296
$ hyperfine './fib.exe 42' './fib_par2.exe 2 42' \
            './fib_par2.exe 4 42' './fib_par2.exe 8 42'
Benchmark 1: ./fib.exe 42
  Time (mean ± sd):     1.217 s ±  0.018 s    [User: 1.203 s, System: 0.004 s]
  Range (min … max):    1.202 s …  1.261 s    10 runs

Benchmark 2: ./fib_par2.exe 2 42
  Time (mean ± sd):    628.2 ms ±   2.9 ms    [User: 1243.1 ms, System: 4.9 ms]
  Range (min … max):   625.7 ms … 634.5 ms    10 runs

Benchmark 3: ./fib_par2.exe 4 42
  Time (mean ± sd):    337.6 ms ±  23.4 ms    [User: 1321.8 ms, System: 8.4 ms]
  Range (min … max):   318.5 ms … 377.6 ms    10 runs

Benchmark 4: ./fib_par2.exe 8 42
  Time (mean ± sd):    250.0 ms ±   9.4 ms    [User: 1877.1 ms, System: 12.6 ms]
  Range (min … max):   242.5 ms … 277.3 ms    11 runs

Summary
  './fib_par2.exe 8 42' ran
    1.35 ± 0.11 times faster than './fib_par2.exe 4 42'
    2.51 ± 0.10 times faster than './fib_par2.exe 2 42'
    4.87 ± 0.20 times faster than './fib.exe 42'

Notes:

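  1. It's a good idea to keep things serial for small input sizes: below the cutoff (n <= 20 here), fib_par calls the serial fib, since creating tasks for tiny subproblems costs more than it saves.
  2. The task pool's lifecycle is ours to manage:
    1. setup_pool: configure the pool (num_domains - 1 new worker domains, because the domain that calls run also takes part)
    2. run: kick off the computation on the pool
    3. teardown_pool: finalise and shut the worker domains down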
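The serial cutoff can also be sketched without Domainslib, using raw domains. This is an illustration under assumptions, not the chapter's code: fib_par and its depth parameter are hypothetical. Because every Domain.spawn creates a new domain (and counts against the domain limit), this version spawns only in the top depth levels of the recursion and runs serially below that or below the input-size cutoff.

```ocaml
let rec fib n = if n < 2 then n else fib (n - 1) + fib (n - 2)

(* Hypothetical [fib_par]: spawn a domain for one branch only while
   [depth] > 0 and the input is large; otherwise fall back to the serial
   [fib]. With depth d this spawns at most 2^d - 1 extra domains. *)
let rec fib_par ~depth n =
  if depth = 0 || n <= 20 then fib n
  else begin
    let left = Domain.spawn (fun () -> fib_par ~depth:(depth - 1) (n - 1)) in
    (* The current domain computes the other branch itself. *)
    let right = fib_par ~depth:(depth - 1) (n - 2) in
    Domain.join left + right
  end

let () = Printf.printf "fib(30) = %d\n" (fib_par ~depth:2 30)
```

A task pool such as Domainslib's avoids the per-spawn cost by reusing a fixed set of worker domains, which is why the Domainslib fib_par can afford to create tasks at every level above its cutoff.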

Parallel Iteration Constructs #

Domainslib's Task.parallel_for is an ergonomic parallel loop construct:

comparison between serial and parallel using the spectral_norm benchmark
(* spectralnorm.ml *)
let n = try int_of_string Sys.argv.(1) with _ -> 32

let eval_A i j = 1. /. float((i+j)*(i+j+1)/2+i+1)

let eval_A_times_u u v =
  let n = Array.length v - 1 in
  for i = 0 to  n do
    let vi = ref 0. in
    for j = 0 to n do vi := !vi +. eval_A i j *. u.(j) done;
    v.(i) <- !vi
  done

let eval_At_times_u u v =
  let n = Array.length v - 1 in
  for i = 0 to n do
    let vi = ref 0. in
    for j = 0 to n do vi := !vi +. eval_A j i *. u.(j) done;
    v.(i) <- !vi
  done

let eval_AtA_times_u u v =
  let w = Array.make (Array.length u) 0.0 in
  eval_A_times_u u w; eval_At_times_u w v

let () =
  let u = Array.make n 1.0  and  v = Array.make n 0.0 in
  for _i = 0 to 9 do
    eval_AtA_times_u u v; eval_AtA_times_u v u
  done;

  let vv = ref 0.0  and  vBv = ref 0.0 in
  for i=0 to n-1 do
    vv := !vv +. v.(i) *. v.(i);
    vBv := !vBv +. u.(i) *. v.(i)
  done;
  Printf.printf "%0.9f\n" (sqrt(!vBv /. !vv))
Code Snippet 3: serial implementation

Looking at the loops in eval_A_times_u and eval_At_times_u, we realise that the iterations of the outer loop do not depend on each other and can be executed in parallel: each iteration of the outer loop body reads from u but writes only to its own location v.(i), so the writes go to disjoint memory locations.
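To see why the disjoint writes make this safe, here is a rough stdlib-only stand-in for parallel_for. The helper parallel_for_static is hypothetical: unlike Domainslib's work-stealing pool, it statically splits the inclusive range start..finish into one contiguous chunk per domain and spawns fresh domains on every call, so it only illustrates the idea.

```ocaml
(* Hypothetical helper: run [body i] for i in [start..finish] (inclusive),
   splitting the range into [num_domains] contiguous chunks.
   Requires num_domains >= 1. *)
let parallel_for_static ~num_domains ~start ~finish ~body =
  let len = finish - start + 1 in
  let chunk = (len + num_domains - 1) / num_domains in
  let run_chunk k () =
    let lo = start + (k * chunk) in
    let hi = min finish (lo + chunk - 1) in
    for i = lo to hi do body i done
  in
  (* Chunks 1..num_domains-1 go to new domains; chunk 0 runs here. *)
  let ds = List.init (num_domains - 1) (fun k -> Domain.spawn (run_chunk (k + 1))) in
  run_chunk 0 ();
  List.iter Domain.join ds

let eval_A i j = 1. /. float ((i + j) * (i + j + 1) / 2 + i + 1)

(* Each iteration i reads all of [u] but writes only v.(i):
   writes go to disjoint indices, so there is no data race. *)
let eval_A_times_u ~num_domains u v =
  let n = Array.length v - 1 in
  parallel_for_static ~num_domains ~start:0 ~finish:n ~body:(fun i ->
    let vi = ref 0. in
    for j = 0 to n do vi := !vi +. (eval_A i j *. u.(j)) done;
    v.(i) <- !vi)
```

Because each v.(i) is computed by exactly the same sequence of operations regardless of which domain runs it, the parallel result should be identical to the serial one (num_domains:1).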

(* spectralnorm_par.ml *)
let num_domains = try int_of_string Sys.argv.(1) with _ -> 1
let n = try int_of_string Sys.argv.(2) with _ -> 32

let eval_A i j = 1. /. float((i+j)*(i+j+1)/2+i+1)

module T = Domainslib.Task

let eval_A_times_u pool u v =
  let n = Array.length v - 1 in
  T.parallel_for pool ~start:0 ~finish:n ~body:(fun i ->
    let vi = ref 0. in
    for j = 0 to n do vi := !vi +. eval_A i j *. u.(j) done;
    v.(i) <- !vi
  )

let eval_At_times_u pool u v =
  let n = Array.length v - 1 in
  T.parallel_for pool ~start:0 ~finish:n ~body:(fun i ->
    let vi = ref 0. in
    for j = 0 to n do vi := !vi +. eval_A j i *. u.(j) done;
    v.(i) <- !vi
  )

let eval_AtA_times_u pool u v =
  let w = Array.make (Array.length u) 0.0 in
  eval_A_times_u pool u w; eval_At_times_u pool w v

let () =
  let pool = T.setup_pool ~num_domains:(num_domains - 1) () in
  let u = Array.make n 1.0  and  v = Array.make n 0.0 in
  T.run pool (fun _ ->
  for _i = 0 to 9 do
    eval_AtA_times_u pool u v; eval_AtA_times_u pool v u
  done);

  let vv = ref 0.0  and  vBv = ref 0.0 in
  for i=0 to n-1 do
    vv := !vv +. v.(i) *. v.(i);
    vBv := !vBv +. u.(i) *. v.(i)
  done;
  T.teardown_pool pool;
  Printf.printf "%0.9f\n" (sqrt(!vBv /. !vv))
Code Snippet 4: parallel implementation
-let n = Array.length v - 1 in
-  for i = 0 to  n do
-    let vi = ref 0. in
-    for j = 0 to n do vi := !vi +. eval_A i j *. u.(j) done;
-    v.(i) <- !vi
-  done
+let n = Array.length v - 1 in
+  T.parallel_for pool ~start:0 ~finish:n ~body:(fun i ->
+    let vi = ref 0. in
+    for j = 0 to n do vi := !vi +. eval_A i j *. u.(j) done;
+    v.(i) <- !vi
+  )
Code Snippet 5: switching to the parallel iterator is an almost mechanical change to the loop

Memory-model: The Easy Bits #

This section summarises the contents from this section of the OCaml manual.

Modern processors and compilers optimise code aggressively. These optimisations preserve the behaviour of sequential code but can cause surprising behaviour in parallel code.

OCaml also performs such optimisations, but it specifies a relaxed memory model that makes the behaviour of parallel code predictable. The memory model describes which values an OCaml program is allowed to witness when reading a memory location.

Recipes to Follow #

So we get recipes to follow that retain the simplicity of sequential reasoning:

  1. immutable values:

    can be freely shared between multiple domains and accessed in parallel

  2. mutable data structures (e.g. ref cells, arrays, mutable record fields)

    the programmer needs to ensure that data races are avoided, where a data race is a concurrent access to a non-atomic memory location without synchronisation AND at least one of the accesses is a write.

    So it matters whether the memory location is atomic or non-atomic:

    1. non-atomic locations (e.g. ref cells, arrays, mutable record fields): concurrent accesses need synchronisation

    2. atomic locations (created with the Atomic module): accesses to them never form a data race, so no extra synchronisation mechanism is needed

  3. Approaches to introduce synchronisation:

    1. using atomic variables
    2. using explicit synchronisation primitives: mutexes, semaphores, condition variables
  4. The DRF-SC (Data-Race-Freedom implies Sequential Consistency) guarantee: a data-race-free program only exhibits behaviours that can be explained by some interleaving of the operations from different domains

    In OCaml, the DRF-SC guarantee is modular: if a part of a program is data-race free, then the OCaml memory model ensures that this part has sequential consistency, even if other parts of the program have data races.
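The recipes can be sketched with the standard library alone. In this illustration (all function names are hypothetical), two domains read the same immutable list without any synchronisation (recipe 1), and a shared counter is made race-free either by making it atomic or by guarding a plain ref with a mutex (recipe 3).

```ocaml
let data = List.init 10_000 Fun.id

(* Recipe 1: both domains read the same immutable list; no sync needed. *)
let sum_if p = List.fold_left (fun acc x -> if p x then acc + x else acc) 0 data

let shared_immutable () =
  let d = Domain.spawn (fun () -> sum_if (fun x -> x mod 2 = 0)) in
  let odds = sum_if (fun x -> x mod 2 = 1) in
  Domain.join d + odds

(* Recipe 3a: an atomic counter; concurrent increments never race. *)
let atomic_count () =
  let c = Atomic.make 0 in
  let work () = for _ = 1 to 100_000 do Atomic.incr c done in
  let d = Domain.spawn work in
  work ();
  Domain.join d;
  Atomic.get c

(* Recipe 3b: a plain (non-atomic) ref, protected by a mutex. *)
let mutex_count () =
  let m = Mutex.create () and c = ref 0 in
  let work () =
    for _ = 1 to 100_000 do
      Mutex.lock m;
      incr c;
      Mutex.unlock m
    done
  in
  let d = Domain.spawn work in
  work ();
  (* join orders the other domain's increments before this read *)
  Domain.join d;
  !c

let () =
  Printf.printf "sum = %d, atomic = %d, mutex = %d\n"
    (shared_immutable ()) (atomic_count ()) (mutex_count ())
```

Replacing Atomic.incr with incr on a plain ref and dropping the mutex would reintroduce a data race, and increments could then be lost.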

Memory-model: The Hard Bits #

The rest of this section follows "Memory model: The Hard Bits" in the manual; the technical paper with the full details can be found here.

Sequential Consistency: simple model to help us reason #

Sequential consistency is a consistency model that helps us easily reason about the behaviour of our programs.

Under sequential consistency, the values observed by the program can be explained through some interleaving of the operations from different domains in the program; we just need to consider how the operations interleave.

elaboration on cases where sequential consistency model should make reasoning easier
let d1 a b =
  let r1 = !a * 2 in
  let r2 = !b in
  let r3 = !a * 2 in
  (r1, r2, r3)

let d2 b = b := 0

let main () =
  let a = ref 1 in
  let b = ref 1 in
  let h = Domain.spawn (fun _ ->
    let r1, r2, r3 = d1 a b in
    Printf.printf "r1 = %d, r2 = %d, r3 = %d\n" r1 r2 r3)
  in
  d2 b;
  Domain.join h
Code Snippet 6: simple example, where two functions write to the same mutable refs

Here, one possible observed behaviour is r1 = 2, r2 = 0, r3 = 2, if the write to b in d2 happens before the read of b in d1. Since a and b are different locations, this is still explained by an interleaving.

Now we complicate this further: let a and b be the same ref.

let d1 a b =
  let r1 = !a * 2 in
  let r2 = !b in
  let r3 = !a * 2 in
  (r1, r2, r3)

let d2 b = b := 0

let main () =
  let ab = ref 1 in
  let h = Domain.spawn (fun _ ->
    let r1, r2, r3 = d1 ab ab in
    assert (not (r1 = 2 && r2 = 0 && r3 = 2)))
  in
  d2 ab;
  Domain.join h
Code Snippet 7: all modifications done to the same ref cell

We might expect the assert statement to never fail since if r2 is 0 then the write in d2 occurred before the read of b in d1. Since a and b are aliases, the second read of a in d1 should also return 0, right?

  • Optimisation Mechanisms as a Source of Sequential Consistency Assertion Failure

    Why would the assertion fail? Because other mechanisms are at play:

    1. compiler optimisations as a source of reordering

      compiler optimisations may affect the program logic (e.g. common sub-expression elimination (CSE)). Such optimisations are fair, valid and necessary for good performance. They don't change the sequential meaning of the program, but they do have visible effects when the program runs in parallel.

      CSE breaks sequential reasoning \(\implies\) the observed behaviour may not be explainable by any interleaving of operations from different domains in the source program.

      example of how compiler might optimise using CSE
         let d1 a b =
           let r1 = !a * 2 in
           let r2 = !b in
           let r3 = r1 in (* CSE: !a * 2 ==> r1 *)
           (r1, r2, r3)
      
         let d2 b = b := 0
      
         let main () =
           let ab = ref 1 in
           let h = Domain.spawn (fun _ ->
             let r1, r2, r3 = d1 ab ab in
             assert (not (r1 = 2 && r2 = 0 && r3 = 2)))
           in
           d2 ab;
           Domain.join h
      Code Snippet 8: here, the compiler may replace the second !a * 2 (for r3) with the already-computed r1, so r3 = 2 even though r2 = 0
    2. hardware optimisations as a source of reordering

      Modern processors have complex, multi-level cache hierarchies, and cache coherence ensures that reads and writes to a single memory location respect sequential consistency.

      Problem: the guarantees are weaker for programs that operate on different memory locations. Writes made on one core may not yet have been published to the other cores.

      example of reordering across different memory locations (store buffering)
         let a = ref 0
         and b = ref 0
      
         let d1 () =
           a := 1;
           !b
      
         let d2 () =
           b := 1;
           !a
      
         let main () =
           let h = Domain.spawn d2 in
           let r1 = d1 () in
           let r2 = Domain.join h in
           assert (not (r1 = 0 && r2 = 0))
      Code Snippet 9: it's possible for the assertion here to fail

      The assertion may fail if the writes performed at a CPU core are not immediately published to all of the other cores. Since a and b are different memory locations, the reads of a and b may both witness the initial values and that’s why there might be an assertion failure.

      Why this happens: This behaviour can be explained if a load is allowed to be reordered before a preceding store to a different memory location.

      This reordering can happen due to the presence of in-core store-buffers on modern processors.

      Each core effectively has a FIFO buffer of pending writes to avoid the need to block while a write completes. The writes to a and b may be in the store-buffers of cores c1 and c2 running the domains d1 and d2, respectively. The reads of b and a running on the cores c1 and c2, respectively, will not see the writes if the writes have not propagated from the buffers to the main memory.
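For contrast, here is the same store-buffering shape with a and b made atomic. OCaml's Atomic operations are sequentially consistent, so the memory model rules out r1 = 0 && r2 = 0: one of the two writes comes first in the total order over atomic operations, and the other domain's later read must see it. The trial count of 200 is arbitrary, and a run that never trips the assertion is only a demonstration; the guarantee itself comes from the memory model.

```ocaml
(* One run of the store-buffering litmus test, with atomic locations. *)
let trial () =
  let a = Atomic.make 0 and b = Atomic.make 0 in
  let d1 () = (Atomic.set a 1; Atomic.get b) in
  let d2 () = (Atomic.set b 1; Atomic.get a) in
  let h = Domain.spawn d2 in
  let r1 = d1 () in
  let r2 = Domain.join h in
  (r1, r2)

let () =
  for _ = 1 to 200 do
    let r1, r2 = trial () in
    (* Forbidden for SC atomics: at least one read sees a write. *)
    assert (not (r1 = 0 && r2 = 0))
  done;
  print_endline "no (0, 0) outcome observed"
```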

Data race freedom implies sequential consistency #

OCaml's relaxed memory model precisely describes which orders are preserved by an OCaml program: the compiler and hardware are free to optimise the program as long as they respect the ordering guarantees of the memory model.

The memory model describes the conditions under which a program will only exhibit sequentially consistent behaviours — that’s what Data Race Freedom implies Sequential Consistency (DRF-SC) guarantee means.

precursor understanding to help us understand what DRF-SC guarantee means
  1. memory locations

    • classified as atomic memory locations and non-atomic memory locations by OCaml
    • non-atomic: ref cells, array fields, mutable record fields. Immutable objects are also non-atomic locations, with an initialising write but no further updates.
    • atomic: locations created using Atomic module.
  2. Happens-Before Relation

    | action type | meaning | examples |
    |---|---|---|
    | inter-domain actions | can be observed by, and influence, actions on other domains | reads and writes of atomic AND non-atomic locations, spawn and join of domains, operations on mutexes |
    | intra-domain actions | can neither be observed by nor influence the execution of other domains | evaluating an arithmetic expression, calling a function, etc. |

    A totally ordered list of the actions executed by the abstract machine is called an execution trace. Due to non-determinism, a given program may have several possible execution traces.

    The happens-before relation is an irreflexive, transitive relation that, for a given execution trace, captures the causality between actions in an OCaml program.

    It's the smallest transitive relation satisfying the following: x precedes y in happens-before order if

    1. x precedes y in program order.

      what is program order? it's the order in which a domain executes its actions

    2. x is a write to an atomic location and y is a subsequent read or write to that location in the execution trace.

      for atomic locations, compare_and_set, fetch_and_add, exchange, incr and decr are each considered both a read and a write.

    3. x is Domain.spawn f and y is the first action in the newly spawned domain executing f.

    4. x is the last action in a domain d, and y is Domain.join d.

    5. x is an unlock operation on a mutex, and y is any subsequent operation on that mutex in the execution trace.

  3. Data-race: a tight definition

    • two actions conflict if, in a trace, they access the same non-atomic location, at least one of them is a write, and neither of them is an initialising write to that location.

    • there's a data race if there exists some execution trace of the program with two conflicting actions and there isn't a happens-before relationship between the conflicting accesses.

    • a program with no data races is said to be correctly synchronised.

  4. DRF-SC

    • strong guarantee: a program without data races will only exhibit sequentially consistent behaviours

    • with this guarantee, we can use sequential reasoning (executing one inter-domain action after the other) to identify whether the program has a data race.

    • this means we don't need to reason about reorderings in order to determine whether a program has a data race.
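Rules 3 and 4 of happens-before can be seen in a small sketch (the function name is hypothetical): a plain, non-atomic ref is handed to a spawned domain and back, with Domain.spawn and Domain.join providing the happens-before edges. The parent does not touch the ref between spawn and join, so every pair of conflicting accesses is ordered, there is no data race, and DRF-SC lets us reason about it sequentially.

```ocaml
let spawn_join_handoff () =
  let cell = ref 0 in          (* plain, non-atomic ref *)
  cell := 1;                   (* rule 1: precedes the spawn in program order *)
  let child = Domain.spawn (fun () ->
    (* rule 3: the spawn happens-before the child's first action,
       so this read must see 1 *)
    let seen = !cell in
    cell := seen + 41;         (* the child's last write *)
    seen)
  in
  (* The parent does not access [cell] between spawn and join. *)
  let seen_by_child = Domain.join child in
  (* rule 4: the child's last action happens-before the join,
     so this read must see 42 *)
  (seen_by_child, !cell)

let () =
  let seen, final = spawn_join_handoff () in
  Printf.printf "child saw %d, parent saw %d\n" seen final
```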

Reasoning with DRF-SC #

For the code examples in this section, assume that d1 and d2 are called in parallel by a main function.

here's what the main function looks like

It just spawns the two domains, keeps handles to them, and then joins them.

let main () =
  let h1 = Domain.spawn d1 in
  let h2 = Domain.spawn d2 in
  ...
  ignore @@ Domain.join h1;
  ignore @@ Domain.join h2
simple example of a data-race
(* Has data race *)
let r = ref 0
let d1 () = r := 1
let d2 () = !r

Here r is a non-atomic ref, both domains access it, and d1's access is a write. Because there's no happens-before relationship between the two accesses, this is a data race, and the program may exhibit non-sequentially-consistent behaviours.

This is a clear-cut way of identifying data-races!

not data race: accessing disjoint array indices and fields of a record in parallel
(* --- disjoint array indices access: No data race *)
let a = [| 0; 1 |]
let d1 () = a.(0) <- 42
let d2 () = a.(1) <- 42


(* --- mutable record-fields access: No data race *)
type t = {
  mutable a : int;
  mutable b : int
}
let r = {a = 0; b = 1}
let d1 () = r.a <- 42
let d2 () = r.b <- 42
races on atomic locations don’t lead to a data-race
(*--- all are atomic accesses: No data race *)
let r = Atomic.make 0
let d1 () = Atomic.set r 1
let d2 () = Atomic.get r
  • use-case: message passing

    We can use atomic variables to implement non-blocking communication between domains.

    example of atomic variables used for inter-domain comms
    (* No data race *)
    let msg = ref 0
    let flag = Atomic.make false
    let d1 () =
      msg := 42; (* --- action a *)
      Atomic.set flag true (* --- action b *)
    let d2 () =
      if Atomic.get flag (* --- action c *) then
        !msg (* --- action d *)
      else 0
    Code Snippet 10: Here, msg is non-atomic but the actions don't lead to a data-race

    Here, actions a and d write and read the same non-atomic location, msg, so they are conflicting. We can show that whenever d happens, a and d are ordered by happens-before, so this program does NOT have a data race:

    1. a precedes b in program order, hence a happens-before b
    2. c precedes d in program order, hence c happens-before d
    3. CASE A: if d2 observes the atomic variable flag to be true, then b precedes c in happens-before order
      1. happens-before is transitive, so the conflicting actions a and d are in happens-before order.
    4. CASE B: if d2 observes flag to be false, then msg is NOT read, so there's no conflicting access in this execution trace \(\implies\) the program does not have a data race

    The next example does have a data race:

    (* Has data race *)
    let msg = ref 0
    let flag = Atomic.make false
    let d1 () =
      msg := 42; (* --- action a *)
      Atomic.set flag true (* --- action b *)
    let d2 () =
      ignore (Atomic.get flag); (*---action c *)
      !msg (* --- action d *)
    Code Snippet 11: version of the message-passing program that has a data race

    Now, there’s a possible trace where a and d are conflicting operations AND there’s no happens-before relationship between them so there’s a data-race:

    Atomic.get flag; (* c *)
    !msg; (* d *)
    msg := 42; (* a *)
    Atomic.set flag true (* b *)
    Code Snippet 1: conflicting trace without happens-before relationship \(\implies\) data-race
    extension: honing the intuition that made me think that this is a mutex-like behaviour

    The atomic flag does play an ordering role analogous to what a mutex does — but calling it “a sort of mutex” conflates two different things that mutexes provide, only one of which the flag gives you.

    A mutex provides two distinct guarantees:

    1. Mutual exclusion — only one thread inside the critical section at a time
    2. Happens-before edges — the release of a mutex happens-before the subsequent acquire of it by another thread

    The atomic flag here provides only the second one. Both domains run fully in parallel — there’s no exclusion. What the flag establishes is a conditional synchronisation point: if d2 observes flag = true, then the SC atomic operations guarantee that b (the write of flag) happens-before c (the read of flag), which combined with program order gives us a → b → c → d.

    The flag doesn't protect msg the way a mutex would. It creates a one-way, conditional ordering guarantee. The reason it works is specifically that SC atomics establish a total order over all atomic operations globally: any read that observes a particular write is ordered after that write in the happens-before relation.

    [ The More Precise Analogy ]

    What this pattern actually is in memory model literature is a release-acquire handshake (even though OCaml’s Atomic gives the stronger SC, which subsumes it):

    • d1 does its work then releases by writing the flag
    • d2 acquires by reading the flag and only proceeds if it observes the release

    This is exactly how std::atomic with memory_order_release / memory_order_acquire works in C++, and it's the canonical message-passing pattern. A mutex generalises this to be symmetric and reusable; under the hood, a mutex's unlock / lock pair is typically implemented using this kind of release-acquire on an atomic variable.

    So my intuition has the causality slightly backwards: it’s not that the flag acts like a mutex — it’s that mutexes are built from this kind of atomic flag pattern.

    [ Why The Second Example Breaks It ]

    The broken version makes this crisp. d2 ignores the value of the flag:

    ignore (Atomic.get flag);  (* c *)
    !msg                        (* d *)

    The SC total order still exists over the atomic operations — b and c are still totally ordered. But now regardless of which way they order, d always executes. If c happens before b in the SC order, then d runs concurrently with a — no happens-before between them, hence a race.

    The conditionality in the first example (if Atomic.get flag then) is load-bearing. It’s what makes Case B safe — the conflicting access d simply doesn’t happen if the ordering isn’t established. The flag by itself doesn’t prevent the race; the branch on the flag’s value is what eliminates the conflicting access in the unsafe trace.
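A variant of Code Snippet 10 makes the handshake deterministic: instead of giving up when the flag is false, the consumer keeps re-reading it until the producer's write is observed, so d always runs after a -> b -> c. The spin-wait is an assumption of this sketch, not part of the original example, and message_passing is a hypothetical name. Spinning burns CPU while waiting (Domain.cpu_relax is only a hint to the processor), so for longer waits the blocking primitives Mutex and Condition are a better fit.

```ocaml
let message_passing () =
  let msg = ref 0 in                 (* non-atomic payload *)
  let flag = Atomic.make false in    (* atomic "ready" flag *)
  let producer () =
    msg := 42;                       (* a *)
    Atomic.set flag true             (* b: publishes a *)
  in
  let consumer () =
    (* c: re-read the flag until the write b is observed *)
    while not (Atomic.get flag) do Domain.cpu_relax () done;
    !msg                             (* d: ordered after a via a -> b -> c -> d *)
  in
  let c = Domain.spawn consumer in
  let p = Domain.spawn producer in
  Domain.join p;
  Domain.join c

let () = Printf.printf "consumer read %d\n" (message_passing ())
```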

Local Data Race Freedom (LDRF-SC guarantee) #

Even for programs with data races, the OCaml memory model gives strong guarantees through local data race freedom sequential consistency (LDRF-SC). Read the paper for more info.

LDRF-SC says that the data-race-free parts of the program remain sequentially consistent. That is, even if the program has data races, those parts of the program that are disjoint from the parts with data races are amenable to sequential reasoning.

This is because the OCaml memory model is bounded both in space and in time:

  • bounded in space: data races in one location don’t affect unrelated locations
  • bounded in time: data races in the future don’t corrupt past behaviour
example
let snippet () =
  let c = ref 0 in
  c := 42;
  let a = !c in
  (a, c)
Code Snippet 12: actions on a newly allocated ref, c

Can read of c return a value that is NOT 42?

  • Yes, under the C++ and Java memory models. In C++, if a program has a data race, even in unrelated parts, its semantics is undefined.

  • So if this code were linked through the FFI to a library that had a data race, then under the C++ memory model the read may return any value \(\implies\) data races on unrelated locations can affect program behaviour \(\implies\) the C++ memory model is NOT bounded in space.

  • The Java memory model is bounded in space but not bounded in time; data races in the future can affect past behaviour:

    here’s an example:
      // Thread 1
      C c = new C();
      c.x = 42;
      a = c.x;
      g = c;
    
      // Thread 2
      g.x = 7;

    The read of c.x and the write of g in the first thread are done on separate memory locations.

    Hence, the Java memory model allows them to be reordered.

    As a result, the write in the second thread may occur before the read of c.x, and hence, c.x returns 7.

The OCaml version of the java example would be:

let g = ref None

let snippet () =
  let c = ref 0 in
  c := 42;
  let a = !c in
  (a, c)

let d1 () =
  let (a,c) = snippet () in
  g := Some c;
  a

let d2 () =
  match !g with
  | None -> ()
  | Some c -> c := 7
Code Snippet 13: this snippet has sequentially consistent behaviour because OCaml memory model is bounded in both Space and Time.

So there are data races on both g and c. Within snippet, consider just the first three instructions:

let c = ref 0 in
c := 42;
let a = !c in
...

The only memory location accessed here is c. Neither the data race in space (the race on g) nor the data race in time (the future race on c) affects these instructions, so the snippet has sequentially consistent behaviour, and the value returned by !c is 42.

The OCaml memory model guarantees that even for programs with data races, memory safety is preserved. While programs with data races may observe non-sequentially consistent behaviours, they will not crash.

An Operational View of the Memory Model #

The purpose of this section in the manual is to give us an intuitive mental model for reasoning about all of this, using an abstract state machine.

Trust me, the examples are great to follow. Skim them here.

Blocking Synchronisation #

Both domains and system threads (systhreads) can be synchronised using the Mutex, Condition and Semaphore modules; these are blocking synchronisation primitives.
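Semaphore is mentioned but not used in the stack example that follows, so here is a small sketch with Semaphore.Counting: a counting semaphore created with limit 2 lets at most two domains into the guarded region at once. The function max_concurrency and its parameters are hypothetical, and the atomic counters exist only to measure how many domains were inside at the same time.

```ocaml
(* Returns the largest number of domains observed inside the region. *)
let max_concurrency ~limit ~domains ~iters =
  let sem = Semaphore.Counting.make limit in
  let active = Atomic.make 0 in     (* domains currently inside *)
  let max_seen = Atomic.make 0 in   (* highest value of [active] observed *)
  let rec record_max v =
    let cur = Atomic.get max_seen in
    if v > cur && not (Atomic.compare_and_set max_seen cur v) then record_max v
  in
  let worker () =
    for _ = 1 to iters do
      Semaphore.Counting.acquire sem;  (* blocks while [limit] holders exist *)
      record_max (Atomic.fetch_and_add active 1 + 1);
      Atomic.decr active;
      Semaphore.Counting.release sem
    done
  in
  let ds = List.init domains (fun _ -> Domain.spawn worker) in
  List.iter Domain.join ds;
  Atomic.get max_seen

let () =
  Printf.printf "max concurrent holders: %d\n"
    (max_concurrency ~limit:2 ~domains:4 ~iters:1000)
```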

Writing a Concurrent Stack with Blocking Synchronisation #

Implementation of a concurrent stack; mutable contents stores stack elements, mutex controls access to contents, a nonempty Condition variable
module Blocking_stack : sig
  type 'a t
  val make : unit -> 'a t
  val push : 'a t -> 'a -> unit
  val pop  : 'a t -> 'a
end = struct
  type 'a t = {
    mutable contents: 'a list;
    mutex : Mutex.t;
    nonempty : Condition.t
  }

  let make () = {
    contents = [];
    mutex = Mutex.create ();
    nonempty = Condition.create ()
  }

  let push r v =
    Mutex.lock r.mutex;
    r.contents <- v::r.contents;
    Condition.signal r.nonempty;
    Mutex.unlock r.mutex

  let pop r =
    Mutex.lock r.mutex;
    let rec loop () =
      match r.contents with
      | [] ->
          Condition.wait r.nonempty r.mutex;
          loop ()
      | x::xs -> r.contents <- xs; x
    in
    let res = loop () in
    Mutex.unlock r.mutex;
    res
end
Code Snippet 14: mutable contents stores stack elements, mutex controls access to contents, a nonempty Condition variable

This should look familiar: the critical sections are wrapped in mutex lock and unlock calls. In push, the condition variable nonempty is signalled while the lock is held, to wake up a domain waiting on this condition; only if there are waiting domains will one of them be woken up, otherwise the signal does nothing.


So for the pop operation, we lock the mutex, and if the stack is empty, the calling domain waits on the condition variable nonempty using Condition.wait. wait atomically suspends the current domain and unlocks the mutex; when the domain is woken up again (i.e. when wait returns), it holds the lock on the mutex again. The domain then re-reads the contents of the stack: if the stack is now non-empty, it updates the contents to the tail of the old list (xs) and returns the head (x); if it is still empty (for example, another domain took the element first), it waits again.

Using a mutex here is sufficient synchronisation between the domains using the stack \(\implies\) there are no data races when multiple domains use the stack in parallel.
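A usage sketch for Blocking_stack: one consumer domain blocks in pop until the main domain pushes items. The module is repeated from Code Snippet 14 so that the block is self-contained, and produce_consume is a hypothetical driver.

```ocaml
module Blocking_stack : sig
  type 'a t
  val make : unit -> 'a t
  val push : 'a t -> 'a -> unit
  val pop : 'a t -> 'a
end = struct
  type 'a t = { mutable contents : 'a list; mutex : Mutex.t; nonempty : Condition.t }

  let make () = { contents = []; mutex = Mutex.create (); nonempty = Condition.create () }

  let push r v =
    Mutex.lock r.mutex;
    r.contents <- v :: r.contents;
    Condition.signal r.nonempty;
    Mutex.unlock r.mutex

  let pop r =
    Mutex.lock r.mutex;
    let rec loop () =
      match r.contents with
      | [] -> Condition.wait r.nonempty r.mutex; loop ()
      | x :: xs -> r.contents <- xs; x
    in
    let res = loop () in
    Mutex.unlock r.mutex;
    res
end

(* The consumer may start before anything is pushed; pop then blocks. *)
let produce_consume n =
  let s = Blocking_stack.make () in
  let consumer = Domain.spawn (fun () ->
    let total = ref 0 in
    for _ = 1 to n do total := !total + Blocking_stack.pop s done;
    !total)
  in
  for i = 1 to n do Blocking_stack.push s i done;
  Domain.join consumer

let () = Printf.printf "consumer total = %d\n" (produce_consume 1000)
```

Since the stack is LIFO, the consumer sees items in no particular order relative to the pushes, which is why the sketch only checks their sum.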

Interaction with systhreads #

Related notes on the pre-Multicore OCaml can be found in this section, “Working with System Threads” — useful to juxtapose the two.

Systhreads are pinned to the domain in which they are created, and only one systhread at a time can run OCaml code on a particular domain. Systhreads belonging to the same domain may still run C library or system code in parallel (as before OCaml 5). Systhreads belonging to different domains, however, may execute in parallel.

When using systhreads, the thread created for executing the computation given to Domain.spawn is also treated as a systhread.

Here we use a shared ref cell protected by a mutex to communicate between different systhreads running on two different domains. The systhread identifiers uniquely identify systhreads in the program; the ids are 0-indexed.

(* dom_thr.ml *)
let m = Mutex.create ()
let r = ref None (* protected by m *)

let task () =
  let my_thr_id = Thread.(id (self ())) in
  let my_dom_id :> int = Domain.self () in
  Mutex.lock m;
  begin match !r with
  | None ->
      Printf.printf "Thread %d running on domain %d saw initial write\n%!"
        my_thr_id my_dom_id
  | Some their_thr_id ->
      Printf.printf "Thread %d running on domain %d saw the write by thread %d\n%!"
        my_thr_id my_dom_id their_thr_id;
  end;
  r := Some my_thr_id;
  Mutex.unlock m

let task' () =
  let t = Thread.create task () in
  task ();
  Thread.join t

let main () =
  let d = Domain.spawn task' in
  task' ();
  Domain.join d

let _ = main ()
Code Snippet 16: Here, we create two domains in total, with two systhreads each (including the initial systhread for each of the domains)
$ ocamlopt -I +threads -I +unix unix.cmxa threads.cmxa -o dom_thr.exe dom_thr.ml
$ ./dom_thr.exe
Thread 1 running on domain 1 saw initial write
Thread 0 running on domain 0 saw the write by thread 1
Thread 2 running on domain 1 saw the write by thread 0
Thread 3 running on domain 0 saw the write by thread 2

Interactions with C bindings #

Prior to Multicore OCaml, the FFI / C-binding assumption was that, as long as the OCaml runtime lock (the GRL) was not released, it was safe to manipulate global C state (e.g. initialise a function-local static value). This assumption no longer holds once multiple domains execute in parallel.

In multicore OCaml, C code running on a domain may run in parallel with any C code running in other domains even if neither of them has released the “domain lock”.

Atomics #

Atomics are all about non-blocking synchronisation primitives. They are not a mechanism for suspending and waking up domains. Instead, they are compiled to the atomic read-modify-write primitives that the hardware provides.

Atomic features exist because of hardware support for atomicity.

example: Atomic vs non-atomic Counters in Parallel #

(* incr.ml *)
let twice_in_parallel f =
  let d1 = Domain.spawn f in
  let d2 = Domain.spawn f in
  Domain.join d1;
  Domain.join d2

let plain_ref n =
  let r = ref 0 in
  let f () = for _i=1 to n do incr r done in
  twice_in_parallel f;
  Printf.printf "Non-atomic ref count: %d\n" !r

let atomic_ref n =
  let r = Atomic.make 0 in
  let f () = for _i=1 to n do Atomic.incr r done in
  twice_in_parallel f;
  Printf.printf "Atomic ref count: %d\n" (Atomic.get r)

let main () =
  let n = try int_of_string Sys.argv.(1) with _ -> 1 in
  plain_ref n;
  atomic_ref n

let _ = main ()
Code Snippet 17: Non-atomic counter vs Atomic Counter used in Parallel
$ ocamlopt -o incr.exe incr.ml
$ ./incr.exe 1_000_000
Non-atomic ref count: 1187193
Atomic ref count: 2000000
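A related sketch follows. Issuing one atomic increment per iteration makes every domain contend on the same memory location. Instead, each domain can count into a private ref and publish its total once with Atomic.fetch_and_add. The function name count_in_parallel is hypothetical. Atomic.fetch_and_add returns the value held before the addition, which we ignore here.

```ocaml
let count_in_parallel ~(domains : int) ~(n : int) : int =
  let total = Atomic.make 0 in
  let worker () =
    let local = ref 0 in                  (* private to this domain *)
    for _i = 1 to n do incr local done;
    (* one atomic read-modify-write per domain instead of one per iteration *)
    ignore (Atomic.fetch_and_add total !local)
  in
  let ds = List.init domains (fun _ -> Domain.spawn worker) in
  List.iter Domain.join ds;
  Atomic.get total
```

Unlike the plain ref version, no updates can be lost: count_in_parallel ~domains:4 ~n:100_000 always returns 400_000.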

example: Atomic variables used for Message-Passing #

Atomic variables can be used for low-level sync between domains – e.g. message passing between domains:

let r = Atomic.make None

let sender () = Atomic.set r (Some "Hello")

let rec receiver () =
  match Atomic.get r with
  | None -> Domain.cpu_relax (); receiver ()
  | Some m -> print_endline m

let main () =
  let s = Domain.spawn sender in
  let d = Domain.spawn receiver in
  Domain.join s;
  Domain.join d

let _ = main ()

(*
Hello
val r : string option Atomic.t = <abstr>
val sender : unit -> unit = <fun>
val receiver : unit -> unit = <fun>
val main : unit -> unit = <fun>
*)
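Atomics also order the plain memory accesses around them. This minimal sketch assumes the message is kept in an ordinary (non-atomic) ref, with only a boolean flag being atomic. The sender writes the payload before the atomic set. The receiver reads the payload only after observing the flag through an atomic get. So the program is data-race-free, and by DRF-SC the receiver is guaranteed to see "Hello". The names payload and ready are our own.

```ocaml
let payload = ref ""              (* plain, non-atomic location *)
let ready = Atomic.make false     (* atomic flag guarding [payload] *)

let sender () =
  payload := "Hello";             (* 1. plain write of the message *)
  Atomic.set ready true           (* 2. publish it with an atomic write *)

let rec receiver () =
  if Atomic.get ready then !payload   (* flag seen => payload write visible *)
  else (Domain.cpu_relax (); receiver ())

let main () =
  let r = Domain.spawn receiver in
  let s = Domain.spawn sender in
  Domain.join s;
  Domain.join r
```

Swapping steps 1 and 2 in sender would break the guarantee, because the receiver could then see the flag before the payload is written.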

Lock-free stack #

The Atomic module can be used to implement non-blocking, lock-free data structures.

module Lockfree_stack : sig
  type 'a t
  val make : unit -> 'a t
  val push : 'a t -> 'a -> unit
  val pop  : 'a t -> 'a option
end = struct
  type 'a t = 'a list Atomic.t

  let make () = Atomic.make []

  let rec push r v =
    let s = Atomic.get r in
    if Atomic.compare_and_set r s (v::s) then ()
    else (Domain.cpu_relax (); push r v)

  let rec pop r =
    let s = Atomic.get r in
    match s with
    | [] -> None
    | x::xs ->
        if Atomic.compare_and_set r s xs then Some x
        else (Domain.cpu_relax (); pop r)
end
Code Snippet 18: A lock-free stack (Treiber Stack), with an atomic ref that holds a list

compare_and_set r seen v sets the value of r to v \(\iff\) its current value is physically equal to seen. The comparison and the update occur atomically.

If the comparison succeeded and the update happened \(\rightarrow\) it returns true. Otherwise it returns false and leaves r unchanged.

If compare_and_set fails, then some other domain updated the atomic ref between our Atomic.get and our compare_and_set \(\implies\) we call Domain.cpu_relax to back off for a short duration. This allows competing domains to make progress before we retry the failed operation.
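The get / compare_and_set / cpu_relax / retry loop used by push and pop generalises to any pure update of an atomic value. The helper update below is hypothetical; it is not part of the stdlib Atomic module. This sketch uses it to let two domains push onto a shared atomic list without losing any element.

```ocaml
(* Hypothetical helper, not part of the stdlib Atomic module.
   [f] may run several times under contention, so it must be pure. *)
let rec update (r : 'a Atomic.t) (f : 'a -> 'a) : unit =
  let seen = Atomic.get r in
  (* succeeds only if nobody replaced [seen] since we read it *)
  if not (Atomic.compare_and_set r seen (f seen)) then begin
    Domain.cpu_relax ();   (* back off briefly, then retry with a fresh read *)
    update r f
  end

let cas_demo () : int =
  let r = Atomic.make [] in
  let pusher () = for i = 1 to 10_000 do update r (fun l -> i :: l) done in
  let d1 = Domain.spawn pusher in
  let d2 = Domain.spawn pusher in
  Domain.join d1;
  Domain.join d2;
  List.length (Atomic.get r)
```

cas_demo () returns 20_000: every one of the 2 × 10_000 pushes lands, because a lost race only causes a retry.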

Structured Concurrency (via Effects, Effect-Handlers) #

In OCaml 5, concurrency primitives can be built on Effects and Effect-handlers, as an alternative to the older thread-based approaches. These notes cover content from KC Sivaramakrishnan’s Blog post and the OCaml Manual’s Effect Handlers chapter.

In doing this exploration, there was a need to be careful about what’s proposed vs what’s actually implemented. Even the implemented approaches have seen subsequent deprecations (e.g. multishot continuations were “deprecated” and moved to their own repo). This doesn’t invalidate the intuition behind many of the concepts discussed. It just makes it especially important to distinguish proposals from implementations. The manual is the canonical source of truth.

How KC Sivaramakrishnan’s blog post uses an older framing: “linear algebraic effects”

The post calls the system “linear algebraic effects” and ties the one-shot property to the word “linear.”

OCaml 5’s documentation and the 2021 PLDI paper (“Retrofitting Effect Handlers onto OCaml”) dropped the “linear” qualifier from the public framing. The one-shot property still exists but it’s described as a runtime enforcement rather than a type-system property. There is still no effect type system tracking which effects a function performs — this was described as “WIP” in 2015 and remains absent from shipped OCaml 5.

Overview and Motivation #

Effect handlers allow programmers to describe computations that perform effectful operations, whose meaning is described by handlers that enclose the computations.

Effect-handlers:

  • generalisation of exception handlers
  • allow non-local control-flow mechanisms to be expressed in a composable fashion e.g. resumable exceptions, lightweight threads, coroutines, generators, async I/O

Core tenet of programming with algebraic effects:

  • separate the expression of an effectful computation from its implementation (i.e. its interpretation by a handler).

The objective of OCaml’s multicore implementation is to provide a minimal set of functionality on top of which programmers can implement new concurrency primitives and schedulers as OCaml libraries.

the tradeoff involved if the multicore runtime bakes concurrency primitives and concurrent thread scheduler within it

The problem with other multicore-capable FP languages (GHC, F#, Manticore…) is that the concurrency primitives and the concurrent thread scheduler are baked into the runtime system.

  • the runtime system tends to grow in complexity (complex monolithic piece of software with extensive use of locks, condition variables, timers, thread pools …)

    \(\implies\) this makes it difficult to maintain existing concurrency libraries and add new ones.

  • lack of malleability \(\implies\) prevents devs from experimenting with custom concurrency libs and scheduling strategies — low ecosystem innovation

Basics of Effects and Effect Handlers #

In the first Xchg example, the handler resumes the continuation immediately, just for demonstration. Because continuations are pointers to the underlying fibres, we can also capture them and store them in data structures as part of more complex logic for working with delimited continuations. That’s what the concurrency extension of the example does.
  • an effect

    open Effect
    open Effect.Deep
    
    type _ Effect.t += Xchg: int -> int t
    let comp1 () = perform (Xchg 0) + perform (Xchg 1)
    Code Snippet 19: Xchg effect: an operation that is performed

    “the Xchg effect takes an integer param and, when performed, returns an integer”:

    • declaration happens by extending the variant type Effect.t (which is predefined and extensible)

      so, it’s a new constructor to that variant: Xchg: int -> int t

    • comp1 is a computation that performs the effect twice using the perform primitive

  • an effect-handler

    Computations have to be enclosed within effect-handlers — so comp1 () is run under an effect handler like so:

    try comp1 () with
    | effect (Xchg n), k -> continue k (n+1)
    (* - : int = 3 *)
    Code Snippet 20: Effect handler to handle Xchg effect — here we always return the successor of the offered value
    general shape of an effect handler
    try
      computation
    with
    | effect E1, k -> ...
    | effect E2, k -> ...
    | exn -> ...
    Code Snippet 21: shape of an effect-handler with multiple clauses (arms)

    effect is a keyword that marks a pattern as matching an effect (rather than an exception or a return value). So, the Xchg effect is handled with the delimited continuation bound to k. k represents the suspended computation between the point of perform and this handler.

    [ comparing effect-handlers to exception handlers ]

    • effect handlers are a generalisation of exception handlers
    • similarity: when computation performs the effect, the control jumps to the corresponding effect-handler AND unhandled effects are forwarded to the outer handler
    • difference: unlike exception handlers, the effect handler is also provided with the delimited continuation k
    walkthrough of what it means for k to represent “delimited continuation”

    k is a function representing “the rest of the computation” from perform to the handler boundary.

    | Component | Responsibility                   |
    |-----------|----------------------------------|
    | comp1     | requests effects                 |
    | handler   | interprets effects               |
    | k         | represents suspended computation |

    Let’s walkthrough this:

    1. a recap on the things we know so far:

      1. the try-with is the effect-handler. It pattern matches on the Xchg effect
      2. the computation we care about is comp1 (), which may emit effects. This is also what we will pause and continue. We can see comp1 as having effect boundaries. The let-bindings also pin down the order of the two performs, since OCaml leaves the evaluation order of the operands of + unspecified:

          let comp1 () =
            let a = perform (Xchg 0) in (*<--- the first perform happens, effect is emitted*)
            let b = perform (Xchg 1) in (*<--- the second perform happens*)
            a + b
        Code Snippet 22: rewritten version of comp1 that is easy to read the control flow
    2. we trace the execution of comp1 and see that when the first perform happens, OCaml captures the continuation k (\(k_{1}\)).

         k1 = fun v ->
           let a = v in
           let b = perform (Xchg 1) in
           a + b
      Code Snippet 23: This is what \(k_{1}\) ends up capturing

      v is the input to the continuation, and it is determined by how the effect is handled: | effect (Xchg n), k -> continue k (n+1). Since the first effect is performed with n = 0, the continuation \(k_{1}\) is resumed with v = 1.

    3. on continue, the control jumps back to the continuation function, the computation looks like:

         let a = 1 in
         let b = perform (Xchg 1) in (*<--- this is when the control jumps back to the effect handler*)
         a + b
      Code Snippet 24: resumed computation, before the 2nd effect is performed / emitted
    4. when the second perform happens, the control jumps back to the effect handler. so \(k_2\) is the remaining computation that gets captured:

         k2 = fun v ->
           let b = v in
           1 + b
      Code Snippet 25: 2nd continuation that gets captured
    5. the handler runs again with n = 1 so v = 2 and \(k_2\) eventually gives 1 + 2 = 3.

  • resuming the computation

    The effect handler can resume the computation.

    continue primitive resumes the suspended computation.

    So in the example, comp1 is the computation that performs Xchg 0 and Xchg 1. It receives the values 1 (from Xchg 0) and 2 (from Xchg 1), which sum to 3.

  • 2 versions: deep effect-handlers and shallow effect-handlers

    Effect handlers come in deep or shallow versions. Deep effect handlers monitor a computation until it terminates (normally or exceptionally) and handle all of the effects the computation performs in sequence.

    Shallow effect-handlers monitor a computation until either:

    • computation terminates OR
    • computation performs one effect – and the effect handler handles that single effect only.
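To see the deep/shallow difference concretely, this sketch runs the same comp1 under both kinds of handler. Instead of the effect pattern syntax, it uses the record-based Effect.Deep.try_with and Effect.Shallow APIs, which are available since OCaml 5.0. The function names run_deep and run_shallow are our own. The deep handler is installed once and keeps handling every Xchg. The shallow handler must be passed again on every continue_with.

```ocaml
open Effect

type _ Effect.t += Xchg : int -> int Effect.t

let comp1 () = perform (Xchg 0) + perform (Xchg 1)

(* Deep handler: installed once, it stays in place and handles every Xchg
   that comp1 performs until comp1 returns. *)
let run_deep (f : unit -> int) : int =
  Effect.Deep.try_with f ()
    { Effect.Deep.effc = (fun (type a) (eff : a Effect.t) ->
        match eff with
        | Xchg n ->
            Some (fun (k : (a, int) Effect.Deep.continuation) ->
              Effect.Deep.continue k (n + 1))
        | _ -> None) }

(* Shallow handler: it only covers the next effect, so the handler must be
   supplied again on every continue_with. [installs] counts those hand-overs. *)
let run_shallow (f : unit -> int) : int * int =
  let installs = ref 0 in
  let rec handler : (int, int) Effect.Shallow.handler =
    { Effect.Shallow.retc = (fun v -> v);
      exnc = raise;
      effc = (fun (type a) (eff : a Effect.t) ->
        match eff with
        | Xchg n ->
            Some (fun (k : (a, int) Effect.Shallow.continuation) ->
              incr installs;
              Effect.Shallow.continue_with k (n + 1) handler)
        | _ -> None) }
  in
  let result =
    Effect.Shallow.continue_with (Effect.Shallow.fiber f) () handler
  in
  (result, !installs)
```

Both compute 3. run_shallow comp1 returns (3, 2): the handler had to be handed over once per performed effect.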

Concurrency #

We extend the Xchg example to express message-passing concurrency between two concurrent computations that use the Xchg effect, wrapping each of them as a task.

This is so cool.

(* --- type of an Xchg effect *)
open Effect
open Effect.Deep

type _ Effect.t += Xchg: int -> int t

(* --- consider 2 computations: *)
let comp1 () = perform (Xchg 0) + perform (Xchg 1)
let comp2 () = perform (Xchg 21) * perform (Xchg 21)

(* --- A Task's Status: *)
type 'a status =
  | Complete of 'a
  | Suspended of {msg: int; cont: (int, 'a status) continuation}

(* --- defining a step function, an effect-handler *)
let step (f : unit -> 'a) () : 'a status =
  match f () with
  | v -> Complete v
  | effect (Xchg msg), cont -> Suspended {msg; cont}

(* --- a simple scheduler to run both tasks *)
let rec run_both a b =
  match a (), b () with
  | Complete va, Complete vb -> (va, vb)
  | Suspended {msg = m1; cont = k1},
    Suspended {msg = m2; cont = k2} ->
      run_both (fun () -> continue k1 m2)
               (fun () -> continue k2 m1)
  | _ -> failwith "Improper synchronization"

(* --- running the scheduler with the two computations *)
let result = run_both (step comp1) (step comp2)
(* val result : int * int = (42, 0) *)
Code Snippet 26: Complete example, with a simple scheduler to run 2 steppable tasks that exchange arguments with each other

Program execution looks something like this:

1. comp1 offers values 0 and 1
2. comp2 offers values 21 and 21
3. comp1 receives values 21 and 21, so it computes 21 + 21 = 42
4. comp2 receives values 0 and 1, so it computes 0 * 1 = 0
Table 1: pointers on the simple two-task scheduler implementation

| point of interest | elaboration |
|---|---|
| a task | a concurrent computation; it has a completion status, 'a status |
| task status, 'a status | either Complete, or Suspended with a msg and a cont (the suspended delimited continuation) |
| the step fn (the effect handler) | executes a single step of the computation, which either completes or suspends, and returns an 'a status. The computation may also perform other effects |
| who performs the Xchg effect? | it is handled by step but not performed by step; the computations comp1 and comp2 perform the Xchg effect |
| the scheduler run_both | runs the tasks to completion; communication between the two computations is programmed entirely inside the scheduler (m2 is injected into k1 and m1 into k2) |
| computations and effects | comp1 and comp2 by themselves don't assign any meaning to the Xchg effect; the "exchange" happens @ the scheduler, run_both |
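This sketch shows why run_both needs its catch-all arm. It re-expresses the same scheduler with Effect.Deep.match_with, so it also compiles on OCaml 5.0 to 5.2, where the effect pattern syntax is unavailable. It then runs comp1 against a hypothetical comp3 that offers only one value. On the second round the tasks fall out of lockstep: one is Suspended while the other is already Complete.

```ocaml
open Effect
open Effect.Deep

type _ Effect.t += Xchg : int -> int Effect.t

type 'a status =
  | Complete of 'a
  | Suspended of { msg : int; cont : (int, 'a status) continuation }

(* step, written with match_with instead of the [effect] pattern *)
let step (f : unit -> 'a) () : 'a status =
  match_with f ()
    { retc = (fun v -> Complete v);
      exnc = raise;
      effc = (fun (type b) (eff : b Effect.t) ->
        match eff with
        | Xchg msg ->
            Some (fun (cont : (b, 'a status) continuation) ->
              Suspended { msg; cont })
        | _ -> None) }

let rec run_both a b =
  match a (), b () with
  | Complete va, Complete vb -> (va, vb)
  | Suspended { msg = m1; cont = k1 }, Suspended { msg = m2; cont = k2 } ->
      run_both (fun () -> continue k1 m2) (fun () -> continue k2 m1)
  | _ -> failwith "Improper synchronization"

let comp1 () = perform (Xchg 0) + perform (Xchg 1)
let comp2 () = perform (Xchg 21) * perform (Xchg 21)
let comp3 () = perform (Xchg 5)   (* hypothetical: offers only ONE value *)

let ok = run_both (step comp1) (step comp2)
let mismatched =
  try ignore (run_both (step comp1) (step comp3)); "no error"
  with Failure msg -> msg
```

ok is (42, 0), matching the original snippet. mismatched is "Improper synchronization", because after one exchange comp3 has completed while comp1 still wants to exchange again.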

User-level Threads #

With effect-handlers in OCaml, we can implement user-level threads and their schedulers without needing to be provided with language primitives for user-level threads.

open Effect
open Effect.Deep

(* --- 1: user-level threading usually has Fork and Yield effects *)
type _ Effect.t += Xchg: int -> int t
    | Fork : (unit -> unit) -> unit t
    | Yield : unit t

(* --- 2: helpers to perform the effects [performers] *)
let fork f = perform (Fork f)
let yield () = perform Yield
let xchg v = perform (Xchg v)

(* --- 3: top-level, definition of scheduler *)
(* A concurrent round-robin scheduler *)
let run (main : unit -> unit) : unit =
  let exchanger : (int * (int, unit) continuation) option ref =
    ref None (* waiting exchanger *)
  in
  let run_q = Queue.create () in (* scheduler's mutable queue *)
  let enqueue k v =
    let task () = continue k v in
    Queue.push task run_q
  in
  let dequeue () =
    if Queue.is_empty run_q then () (* done *)
    else begin
      let task = Queue.pop run_q in
      task ()
    end
  in
  let rec spawn (f : unit -> unit) : unit =
    match f () with
    | () -> dequeue ()
    | exception e ->
        print_endline (Printexc.to_string e);
        dequeue ()
    | effect Yield, k -> enqueue k (); dequeue ()
    | effect (Fork f), k -> enqueue k (); spawn f
    | effect (Xchg n), k ->
        begin match !exchanger with
        | Some (n', k') -> exchanger := None; enqueue k' n; continue k n'
        | None -> exchanger := Some (n, k); dequeue ()
        end
  in
  spawn main

(* --- 4: Running a concurrent program *)
open Printf

let _ = run (fun _ ->
  fork (fun _ ->
    printf "[t1] Sending 0\n";
    let v = xchg 0 in
    printf "[t1] received %d\n" v);
  fork (fun _ ->
    printf "[t2] Sending 1\n";
    let v = xchg 1 in
    printf "[t2] received %d\n" v))

(* --- expected output:
[t1] Sending 0
[t2] Sending 1
[t2] received 0
[t1] received 1
*)
Code Snippet 27: Round-robin scheduler example with Xchg, Fork, Yield effects
Table 2: pointers on the round-robin scheduler implementation

| point of interest | elaboration |
|---|---|
| the Fork effect | takes a thunk (suspended computation) and returns unit to its performer |
| the Yield effect | unparameterised; returns unit when performed |
| convenience performers | fork, yield and xchg perform the effects |
| what enables round-robin scheduling? | the mutable FIFO queue run_q (the ready queue); tasks here are just thunks |
| the exchanger ref cell | holds a suspended task that is offering to exchange a value. Invariant: at any point in time, there is either 0 or 1 suspended task offering an exchange |
| the importance of spawn (the effect handler) | runs the given computation f in an effect handler. f may raise an exception (handled by printing it, then dequeue), may return unit (then dequeue), or may perform effects |
| effects handled by spawn | Yield suspends the current task and runs the next one in run_q; Fork suspends the current task and executes the new task f immediately via a tail call to spawn f; Xchg checks whether a task is waiting to exchange values, and otherwise makes the current task the waiting exchanger |
| implementing Fork | we run the new task first. This is arbitrary: we could have inserted f (the new task) into the ready queue and resumed k immediately |
| interleaved messages | the output shows that the messages from the 2 tasks are interleaved |
| direct-style concurrency | the run invocation makes no reference to effect handlers and has no monadic operations; it's written in direct style. Effect handlers keep user code in simple direct style and stay contained within the concurrency library implementation |
  • Resuming with an Exception

    There’s a simple discipline to follow to avoid blocked tasks:

    Every continuation must be eventually either continued or discontinued.

    If we look at the earlier implementation of dequeue, we notice that it's buggy. There may be nothing left to dequeue from run_q, in which case dequeue returns to its caller, BUT there could still be a waiting task stored @ exchanger. That task is then blocked forever, and any resources it holds are never released.

    let leaky_task () =
      fork (fun _ ->
        let oc = open_out "secret.txt" in
        Fun.protect ~finally:(fun _ -> close_out oc) (fun _ ->
          output_value oc (xchg 0)))
    Code Snippet 28: an example of a leaky task that may be blocked even if Fun.protect is used for the output channel

    It’s not enough that Fun.protect is used (to ensure that the oc is closed on both normal and exceptional return cases) because it doesn’t do anything if the task is just blocked.

    exception Improper_synchronization
    
    let dequeue () =
      if Queue.is_empty run_q then begin
        match !exchanger with
        | None -> () (* done *)
        | Some (n, k) ->
            exchanger := None;
            discontinue k Improper_synchronization
      end else (Queue.pop run_q) ()
    Code Snippet 29: using discontinue to ensure no forever-blocks – forces termination (with an exception)

    So in the blocked case, the dequeue function discontinues the blocked task with an Improper_synchronization exception. The exception is raised at the blocked xchg call, which causes the finally block to run, thereby closing the oc channel. From the user's point of view, it is as though the call xchg 0 raised the exception Improper_synchronization.

    So, effect handlers can:

    • resume a continuation with a value via continue
    • resume by raising an exception at the point of perform, via discontinue
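The fix can be exercised end to end with a cut-down version of the scheduler above. This sketch keeps only Fork and Xchg, omits Yield, and is written with Effect.Deep.match_with rather than effect patterns. The demo runs a hypothetical single task that offers an exchange nobody answers. Once the run queue drains, dequeue discontinues that task, and the Fun.protect finaliser still runs.

```ocaml
open Effect
open Effect.Deep

type _ Effect.t +=
  | Xchg : int -> int Effect.t
  | Fork : (unit -> unit) -> unit Effect.t

exception Improper_synchronization

let fork f = perform (Fork f)
let xchg v = perform (Xchg v)

(* Cut-down round-robin scheduler: Fork and Xchg only; tasks are thunks. *)
let run (main : unit -> unit) : unit =
  let exchanger : (int * (int, unit) continuation) option ref = ref None in
  let run_q : (unit -> unit) Queue.t = Queue.create () in
  let enqueue k v = Queue.push (fun () -> continue k v) run_q in
  let dequeue () =
    if Queue.is_empty run_q then begin
      match !exchanger with
      | None -> ()                       (* really done *)
      | Some (_, k) ->                   (* nobody will ever answer this task *)
          exchanger := None;
          discontinue k Improper_synchronization
    end else (Queue.pop run_q) ()
  in
  let rec spawn (f : unit -> unit) : unit =
    match_with f ()
      { retc = (fun () -> dequeue ());
        exnc = (fun e -> print_endline (Printexc.to_string e); dequeue ());
        effc = (fun (type a) (eff : a Effect.t) ->
          match eff with
          | Fork g ->
              Some (fun (k : (a, unit) continuation) -> enqueue k (); spawn g)
          | Xchg n ->
              Some (fun (k : (a, unit) continuation) ->
                match !exchanger with
                | Some (n', k') -> exchanger := None; enqueue k' n; continue k n'
                | None -> exchanger := Some (n, k); dequeue ())
          | _ -> None) }
  in
  spawn main

(* Hypothetical task: offers a value nobody takes, inside Fun.protect. *)
let blocked_task_cleaned_up () : bool =
  let released = ref false in
  run (fun () ->
    fork (fun () ->
      Fun.protect ~finally:(fun () -> released := true)
        (fun () -> ignore (xchg 0))));
  !released
```

blocked_task_cleaned_up () returns true. The exception surfaces at xchg 0, the finaliser runs, and the task's handler then prints the exception and moves on.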

Control Inversion #

| API style | who has control | example |
|---|---|---|
| push-based | the producer has control | List.iter: in List.iter f l, List.iter controls the traversal |
| pull-based | the consumer has control | the Seq module: a sequence, like a delayed list, is produced on demand by the consumer |

For flexibility, suppose we have a producer written in the push-based style, but we want to consume it in the pull-based style, because it's more natural and ergonomic for the consumer to be in control \(\implies\) that's why we might be interested in Control Inversion.

val invert : iter:(('a -> unit) -> unit) -> 'a Seq.t
Code Snippet 30: invert function type, takes in an iter (producer that pushes elements to consumer) and returns a Seq (consumer has the control)
(* --- a push-style producer over a fixed list *)
let lst_iter = Fun.flip List.iter [1; 2; 3];;

(* --- the classic push style from a producer *)
lst_iter (fun i -> Printf.printf "%d\n" i);;
(*
1
2
3
- : unit = ()
*)

(* --- pull style from inverting the control *)
let s = invert ~iter:lst_iter
let next = Seq.to_dispenser s;;
(* val s : int Seq.t = <fun> *)
(* val next : unit -> int option = <fun> *)
next ();;
(* - : int option = Some 1 *)
next ();;
(* - : int option = Some 2 *)
next ();;
(* - : int option = Some 3 *)
next ();;
(* - : int option = None *)
Code Snippet 31: comparing push-style and inverting into pull-style
  • Implementing Control Inversion

    let invert (type a) ~(iter : (a -> unit) -> unit) : a Seq.t =
      let module M = struct
        type _ Effect.t += Yield : a -> unit t
      end in
      let yield v = perform (M.Yield v) in
      fun () -> match iter yield with
      | () -> Seq.Nil
      | effect M.Yield v, k -> Seq.Cons (v, continue k)
    Code Snippet 32: yield function performs the Yield effect; Lambda fun () -> defers the action until first-demanded

    Once the first element of the sequence is demanded, the computation iter yield is executed under an effect handler. Every time iter pushes an element to the yield function, the computation is interrupted by the Yield effect. The Yield effect is handled by returning Seq.Cons (v, continue k) to the consumer, i.e. the element v together with the suspended computation k. From the consumer's point of view, continue k is just the tail of the sequence.

    When the consumer demands the next element, k is resumed. That's how iter yield makes progress, until it either yields another element or terminates normally (in which case Seq.Nil is returned to the consumer). The Seq returned by invert is an ephemeral sequence. It must be used at most once, and it must also be fully consumed (used at least once), so that the captured continuation is used linearly (one-shot).
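For compilers without the effect pattern syntax (OCaml 5.0 to 5.2), invert can be sketched with Effect.Deep.match_with instead. The usage below inverts a hypothetical push-style in-order traversal of a binary tree; tree and tree_iter are our own. It consumes the resulting ephemeral sequence exactly once, and fully, with List.of_seq.

```ocaml
open Effect
open Effect.Deep

let invert (type a) ~(iter : (a -> unit) -> unit) : a Seq.t =
  (* a fresh, local effect per call, so nested inversions cannot clash *)
  let module M = struct
    type _ Effect.t += Yield : a -> unit Effect.t
  end in
  let yield v = perform (M.Yield v) in
  fun () ->
    match_with iter yield
      { retc = (fun () -> Seq.Nil);              (* producer finished *)
        exnc = raise;
        effc = (fun (type b) (eff : b Effect.t) ->
          match eff with
          | M.Yield v ->
              (* hand v to the consumer; the tail resumes the producer *)
              Some (fun (k : (b, a Seq.node) continuation) ->
                Seq.Cons (v, continue k))
          | _ -> None) }

type tree = Leaf | Node of tree * int * tree

(* push-style in-order iterator: the producer drives the traversal *)
let rec tree_iter (f : int -> unit) (t : tree) : unit =
  match t with
  | Leaf -> ()
  | Node (l, x, r) -> tree_iter f l; f x; tree_iter f r

let t = Node (Node (Leaf, 1, Leaf), 2, Node (Leaf, 3, Leaf))
let in_order = List.of_seq (invert ~iter:(fun f -> tree_iter f t))
```

in_order is [1; 2; 3]. The consumer (List.of_seq) pulls elements one at a time, even though tree_iter was written in push style.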

Semantics #

  • Nesting Handlers
  • Fibres
  • Unhandled Effects
  • Linear Continuations

Shallow Handlers #

Quick Example #

A Simple Round-Robin Scheduler #

[ DEPRECATIONS: ] The OCaml manual is the source of truth for APIs in general. The blog post's code in this section already differs from what OCaml 5 ships:

  • type _ eff += as the declaration of an effect \(\rightarrow\) type _ Effect.t +=
  • continue k v \(\rightarrow\) Effect.Deep.continue k v
  • the handler pattern effect E k \(\rightarrow\) effect E, k (OCaml 5.3+), or the record-based Effect.Deep.match_with / try_with API
  • multishot continuations are not standard and have been deprecated from Multicore OCaml (it used to be possible to clone a one-shot continuation to mimic multishot ones; see here for why). They have been extracted into the ocaml-multicont library.

Here’s a basic interface for our scheduler:

(* Control operations on threads *)
val fork : (unit -> unit) -> unit
val yield : unit -> unit

(* Runs the scheduler. *)
val run : (unit -> unit) -> unit
Code Snippet 33: sched.mli — interface for our Round Robin scheduler

Performing an effectful computation is separate from its interpretation \(\implies\) interpretation is dynamically chosen based on the context in which an effect is performed.

Effectful actions for our Scheduler that we will declare:

  • spawning a new thread
  • yielding control to another thread

In the blog post, the predefined variant type for effects is 'a eff, where 'a is the return type of performing that effect. In OCaml 5 it is the extensible variant 'a Effect.t.

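(* --- blog-post syntax (Multicore OCaml prototype, not accepted by OCaml 5):
   type _ eff += Fork : (unit -> unit) -> unit eff | Yield : unit eff
   effect Fork  : (unit -> unit) -> unit
   effect Yield : unit                                                 *)

(* --- OCaml 5 *)
type _ Effect.t +=
  | Fork  : (unit -> unit) -> unit Effect.t
  | Yield : unit Effect.t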
Code Snippet 34: Declaring the Fork and Yield effects

Effects are performed with the primitive perform, which has type 'a eff -> 'a in the blog post ('a Effect.t -> 'a in OCaml 5). We now define the functions:

let fork f = perform (Fork f)
let yield () = perform Yield
Code Snippet 35: Functions fork and yield

We need an interpretation of what it means to perform fork and yield, for which we use effect-handlers.

open Effect
open Effect.Deep

let run main =
  let run_q = Queue.create () in
  let enqueue k = Queue.push k run_q in
  let dequeue () =
    if Queue.is_empty run_q then ()
    else continue (Queue.pop run_q) ()
  in
  let rec spawn f =
    match f () with
    | () -> dequeue ()
    | exception e ->
       print_string (Printexc.to_string e);
       dequeue ()
    | effect Yield, k ->
       enqueue k; dequeue ()
    | effect (Fork f), k ->
       enqueue k; spawn f
  in
  spawn main
Code Snippet 36: sched.ml: Effect-handlers used to give an interpretation on how to perform fork and yield

How this works:

  • the scheduler queue is a queue of delimited continuations, and we have queue-manipulation functions

The effect-handling is done like so here: spawn f evaluates f in a new thread of control, which may have the following cases for how it returns:

  1. return normally with () \(\implies\) we pop the scheduler queue and resume the resultant continuation using continue

    • continue k v is how the continuation (k : ('a, 'b) continuation) is resumed with a value v : 'a, returning a value of type 'b
  2. return exceptionally with e \(\implies\) same as the normal return case

  3. return effectfully, with the effect performed along with the delimited continuation, k

    The return type of the effect ('a) matches the argument type of the continuation, and the return type of the delimited continuation is 'b. Specifically, for the pattern effect e, k (written effect e k in the blog post), if the effect e has type 'a Effect.t ('a eff in the blog post), then the delimited continuation k has type ('a, 'b) continuation.

    1. [ fork effect ] effectful return of Fork f:

      we enqueue the continuation (k) and spawn a new thread of control for evaluating f. This is not the original f: it is the thunk carried by the Fork effect.

    2. [ yield effect ] effectful return of Yield:

      enqueue the current continuation (k) and resume some other saved continuation from the scheduler queue.

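The blog post's effect Yield k patterns (no comma) are Multicore OCaml prototype syntax. OCaml 5.0 to 5.2 ship no surface syntax for effect handlers, only the record-based API in Effect.Deep. OCaml 5.3 added effect E, k patterns (note the comma) to match and try. Using the record-based API, the handler inside spawn looks like this: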

Effect.Deep.match_with f ()
  { retc = (fun () -> dequeue ());
    exnc = (fun e -> print_string (Printexc.to_string e); dequeue ());
    effc = fun (type a) (eff : a Effect.t) ->
      match eff with
      | Yield -> Some (fun (k : (a, _) Effect.Deep.continuation) ->
          enqueue k; dequeue ())
      | Fork f -> Some (fun k ->
          enqueue k; spawn f)
      | _ -> None }
Code Snippet 37: desugared, idiomatic effect handler

So here’s a concurrent program to test out this scheduler:

let log = Printf.printf

let rec f id depth =
  log "Starting number %i\n%!" id;
  if depth > 0 then begin
    log "Forking number %i\n%!" (id * 2 + 1);
    Sched.fork (fun () -> f (id * 2 + 1) (depth - 1));
    log "Forking number %i\n%!" (id * 2 + 2);
    Sched.fork (fun () -> f (id * 2 + 2) (depth - 1))
  end else begin
    log "Yielding in number %i\n%!" id;
    Sched.yield ();
    log "Resumed number %i\n%!" id;
  end;
  log "Finishing number %i\n%!" id

let () = Sched.run (fun () -> f 0 2)
Code Snippet 38: concurrent.ml: spawns a binary tree of tasks in depth-first order
output of running this test
$ git clone https://github.com/kayceesrk/ocaml-eff-example
$ cd ocaml-eff-example
$ make
$ ./concurrent
Starting number 0
Forking number 1
Starting number 1
Forking number 3
Starting number 3
Yielding in number 3
Forking number 2
Starting number 2
Forking number 5
Starting number 5
Yielding in number 5
Forking number 4
Starting number 4
Yielding in number 4
Resumed number 3
Finishing number 3
Finishing number 0
Forking number 6
Starting number 6
Yielding in number 6
Resumed number 5
Finishing number 5
Finishing number 1
Resumed number 4
Finishing number 4
Finishing number 2
Resumed number 6
Finishing number 6
Code Snippet 2: output illustrates how tasks are forked and scheduled

Implementation #

Effect handlers were retrofitted onto an existing language and runtime, so their implementation faced some challenges of its own.

Fibres for Concurrency #

Main challenge: efficient management of delimited continuations.

Multicore OCaml uses fibers for this:

  • represent the unit of concurrency in the runtime system
  • small heap-allocated, dynamically resized stacks

Most continuations only need to be linear (one-shot), i.e. resumed only once, as opposed to multishot. (Reminder: multishot continuations could previously be emulated in Multicore OCaml by cloning a one-shot continuation, but this got deprecated.)

  • Capturing one-shot continuation requires no extra context vs fn-call

    It’s fast to capture a one-shot continuation \(\implies\) just need a pointer to the underlying fiber \(\implies\) no need for any allocation. Capturing a one-shot continuation requires no more context than needed for a normal function call because the calling convention for OCaml doesn’t use callee-save registers.

    OCaml doesn’t have linear types. Linear types are a type-system guarantee, enforced at compile time, about how many times a value can be used. Today's OCaml type system is neither affine (a value used at most once) nor linear (used exactly once). One-shot continuations are therefore only logically linear: the one-shot property is enforced @ runtime, by raising an exception the second time a continuation is resumed.

  • Continuations: delimited vs undelimited

    Delimited continuations enable complex nested and hierarchical schedulers to be expressed more naturally because they introduce parent-child relationships between fibers — similar to function invocation.

    Table 3: comparing undelimited continuation (runtimes like Manticore) vs delimited continuation (like in OCaml)

    | feature   | undelimited continuation                          | delimited continuation (OCaml)                        |
    |-----------|---------------------------------------------------|-------------------------------------------------------|
    | intuition | “Pause the whole universe and resume it anywhere” | “Pause this function and resume it inside its caller” |
    | scope     | whole program                                     | up to a handler                                       |
    | structure | flat / global                                     | nested / hierarchical                                 |
    | control   | unrestricted                                      | scoped                                                |
    | analogy   | like goto on steroids                             | like a function call                                  |
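The runtime enforcement of the one-shot property can be observed directly. In this sketch, the effect Ask and the function resume_twice are hypothetical. The handler keeps a second reference to the continuation and tries to resume it after it has already been resumed once. The second continue raises Effect.Continuation_already_resumed instead of running the computation again.

```ocaml
open Effect
open Effect.Deep

type _ Effect.t += Ask : int Effect.t

let resume_twice () : int * bool =
  let saved : (int, int) continuation option ref = ref None in
  let first =
    try_with (fun () -> perform Ask + 1) ()
      { effc = (fun (type a) (eff : a Effect.t) ->
          match eff with
          | Ask ->
              Some (fun (k : (a, int) continuation) ->
                saved := Some k;     (* keep a second handle on k *)
                continue k 41)       (* first, legal resume *)
          | _ -> None) }
  in
  let second_rejected =
    match !saved with
    | None -> false
    | Some k ->
        (match continue k 0 with    (* second resume of the same fibre *)
         | _ -> false
         | exception Continuation_already_resumed -> true)
  in
  (first, second_rejected)
```

resume_twice () returns (42, true). The first resume completes the computation (41 + 1), and the second is rejected at runtime rather than by the type checker.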

Running on Multiple Cores #

We’ve covered Domains earlier. Each domain, our unit of parallelism, runs on a separate system thread with its own small minor heap. All domains share a single major heap.

To distribute fibres amongst the available domains, work-sharing / work-stealing schedulers are run on each domain. For inter-domain communication and mutual exclusion, we rely on synchronisation mechanisms (atomics, mutexes, condition variables).

The blog post states an invariant that is no longer true: “multicore runtime has the invariant that there are no pointers between the domain local heaps”. The shipped multicore OCaml 5 uses the stop-the-world parallel minor collector, which does allow pointers between minor heaps (that’s why it needs the stop-the-world sync).
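Here is a drastically simplified illustration of work sharing, assuming plain thunks rather than fibres. Jobs sit in one shared Queue guarded by a Mutex, and every worker domain keeps taking the next job until the queue is empty. The name run_shared is hypothetical. Production work-stealing schedulers typically give each domain its own deque rather than contending on a single lock.

```ocaml
let run_shared ~(domains : int) (jobs : (unit -> int) list) : int =
  let q = Queue.of_seq (List.to_seq jobs) in   (* the shared job queue *)
  let m = Mutex.create () in                   (* protects [q] *)
  let take () =
    Mutex.lock m;
    let j = Queue.take_opt q in
    Mutex.unlock m;
    j
  in
  (* each worker runs jobs until the queue is empty, summing their results *)
  let worker () =
    let rec loop acc =
      match take () with
      | None -> acc
      | Some job -> loop (acc + job ())
    in
    loop 0
  in
  let ds = List.init domains (fun _ -> Domain.spawn worker) in
  List.fold_left (fun acc d -> acc + Domain.join d) 0 ds
```

How the 100 jobs in run_shared ~domains:3 (List.init 100 (fun i () -> i)) are split between domains varies from run to run, but the total is always 4950.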

TODO Illustrative Case Studies / Examples #

This section is about internalising the examples that can be found here.

TODO eio: Effects-based direct-style IO #

TODO Legacy / OS-level concurrency abstraction (via Threads) #