
Chapter 16: Concurrent Programming


These notes go beyond the scope of the concurrent programming chapter in the book (Real World OCaml, 2nd edition) because, in the OCaml ecosystem, the journey of concurrency and parallelism has two phases. Pre-OCaml 5 used cooperative concurrency as the typical concurrency model, with heavy reliance on libraries. Multicore OCaml (version 5 onwards) then changed things up (notes found here). We need to learn both to appreciate each phase.

Figure 1: The Ganges River forms an extensive delta where it empties into the Bay of Bengal. The delta is largely covered with a swamp forest known as the Sunderbans, which is home to the Royal Bengal Tiger. (retrieved)

Fundamentals: Understanding Monads #

Lwt relies on the monadic pattern, which is beautiful for clear composition. The OCaml docs on monads are an excellent reference for this.

Monad Signature Using examples: Maybe, Writer, Lwt #

The docs first lay out the structural and application implications before formalising the monad laws. We follow that approach.

Monadic structures should be seen as structures that follow the monad design pattern.

module type Monad = sig
  type 'a t
  val return : 'a -> 'a t
  val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t
end
Code Snippet 1: Monad Signature, Satisfied by Monad Structures
| Function | Meaning |
|---|---|
| return | puts a value in the box; doing so has only a trivial effect (in terms of computation) |
| bind (>>=) | gives the ability to compose and sequence operations |

We use case studies to see this design pattern:

Maybe monad / Option monad / Error monad #

Suppose we have the basic arithmetic functions and want to wrap them such that any error (e.g. division by 0) returns None instead of raising an exception.

pedestrian solution that isn’t quite monadic yet

We have some ways to make this feel monadic. We could re-implement the existing operators, which gives us a ridiculous amount of boilerplate in the form of pattern matches and such. It works, but we can see the excessive effort needed:

let plus_opt (x:int option) (y:int option) : int option =
  match x,y with
  | None, _ | _, None -> None
  | Some a, Some b -> Some (Stdlib.( + ) a b)

let ( + ) = plus_opt

let minus_opt (x:int option) (y:int option) : int option =
  match x,y with
  | None, _ | _, None -> None
  | Some a, Some b -> Some (Stdlib.( - ) a b)

let ( - ) = minus_opt

let mult_opt (x:int option) (y:int option) : int option =
  match x,y with
  | None, _ | _, None -> None
  | Some a, Some b -> Some (Stdlib.( * ) a b)

let ( * ) = mult_opt

let div_opt (x:int option) (y:int option) : int option =
  match x,y with
  | None, _ | _, None -> None
  | Some a, Some b ->
    if b=0 then None else Some (Stdlib.( / ) a b)

let ( / ) = div_opt
Code Snippet 2: Pattern-matched, non-monadic solution

If we try to deduplicate all the unnecessary boilerplate, then we will achieve something like this:

let propagate_none
  (op : int -> int -> int option) (x : int option) (y : int option)
=
  match x, y with
  | None, _ | _, None -> None
  | Some a, Some b -> op a b


let wrap_output (op : int -> int -> int) (x : int) (y : int) : int option =
  Some (op x y)

let ( + ) = propagate_none (wrap_output Stdlib.( + ))
let ( - ) = propagate_none (wrap_output Stdlib.( - ))
let ( * ) = propagate_none (wrap_output Stdlib.( * ))

let div (x : int) (y : int) : int option =
  if y = 0 then None else wrap_output Stdlib.( / ) x y

let ( / ) = propagate_none div

The upgrade functionality can be made generic and defined like so, assuming >>= is the bind of the option monad (implemented in Code Snippet 5):

let upgrade : (int -> int option) -> (int option -> int option) =
  fun (op : int -> int option) (x : int option) -> (x >>= op)

 (* -- without the annotations:  *)
let upgrade op x = x >>= op
Code Snippet 3: A higher-order, upgrade function helps deduplicate

So we could just use return and >>= (bind) functions to reimplement the whole thing:

let ( + ) (x : int option) (y : int option) : int option =
  x >>= fun a ->
  y >>= fun b ->
  return (Stdlib.( + ) a b)

let ( - ) (x : int option) (y : int option) : int option =
  x >>= fun a ->
  y >>= fun b ->
  return (Stdlib.( - ) a b)

let ( * ) (x : int option) (y : int option) : int option =
  x >>= fun a ->
  y >>= fun b ->
  return (Stdlib.( * ) a b)

let ( / ) (x : int option) (y : int option) : int option =
  x >>= fun a ->
  y >>= fun b ->
  if b = 0 then None else return (Stdlib.( / ) a b)

Linking this to the monadic design pattern from our pedestrian approach:

| Pedestrian approach | Monadic counterpart | What it does |
|---|---|---|
| wrap_output | return | Upgrades a value from int to int option by wrapping it |
| modifying the functions | bind | Upgrades functions whose inputs were of type int to accept inputs of type int option instead; this upgrades the operation |

We can deduplicate the common parts of the monadic approach further by having a generic upgrade function:

let upgrade_binary op x y =
  x >>= fun a ->
  y >>= fun b ->
  op a b

let return_binary op x y = return (op x y)

let ( + ) = upgrade_binary (return_binary Stdlib.( + ))
let ( - ) = upgrade_binary (return_binary Stdlib.( - ))
let ( * ) = upgrade_binary (return_binary Stdlib.( * ))
let ( / ) = upgrade_binary div
Code Snippet 4: A deduplicated approach to creating the maybe monad

Mental model for the bind e.g. in x >>= fun a -> y >>= fun b -> op a b :

  1. take x and extract from it the value a
  2. take y and extract from it the value b
  3. then use a and b to construct a return value using the operation op

Here’s how we would implement the monad signature for the maybe monad

module Maybe : Monad = struct
  type 'a t = 'a option

  let return x = Some x

  let (>>=) m f =
    match m with
    | None -> None
    | Some x -> f x
end
Code Snippet 5: Implementation for the monad signature
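
One consequence of sealing the module as `Maybe : Monad` is that `'a Maybe.t` becomes abstract, so client code can no longer write `None` or pattern match on a result. The sketch below is one possible way around that, assuming we are happy to expose the representation with a `with type` constraint. The names `Maybe_exposed` and `safe_div` are made up for this illustration and are not part of the original notes.

```ocaml
module type Monad = sig
  type 'a t
  val return : 'a -> 'a t
  val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t
end

(* Hypothetical variant of Maybe that keeps the option representation visible. *)
module Maybe_exposed : Monad with type 'a t = 'a option = struct
  type 'a t = 'a option

  let return x = Some x

  let ( >>= ) m f =
    match m with
    | None -> None
    | Some x -> f x
end

open Maybe_exposed

(* Division that reports failure as None instead of raising Division_by_zero. *)
let safe_div (a : int) (b : int) : int option =
  if b = 0 then None else return (a / b)

(* (100 / 5) / 2: every step succeeds, so the chain produces a value. *)
let ok = safe_div 100 5 >>= fun q -> safe_div q 2

(* The first division fails, so the second one is never attempted. *)
let failed = safe_div 100 0 >>= fun q -> safe_div q 2
```

Here `ok` and `failed` can be inspected as ordinary option values, which would not type-check against the fully sealed `Maybe` module.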

Writer Monad #

Suppose we want the effect of loggability (a log-writer that allows us to still compose the original routines).

intuiting the monadic approach

We first consider approaches that don’t use the monad pattern. We want to make inc and dec below loggable:

let inc x = x + 1;;
let dec x = x - 1;;

let inc_log x = (x + 1, Printf.sprintf "Called inc on %i; " x);;
let dec_log x = (x - 1, Printf.sprintf "Called dec on %i; " x);;

(* using a composition operator: *)
let ( >> ) f g x = x |> f |> g;;

 (* we can compose like so:  *)
let id = inc >> dec;;

(* PROBLEM: the pattern isn't monadic, so we get an error from this:  *)
(* let id = inc_log >> dec_log *)

let dec_log_upgraded (x, s) =
  (x - 1, Printf.sprintf "%s; Called dec on %i; " s x);;

let inc_log_upgraded (x, s) =
  (x + 1, Printf.sprintf "%s; Called inc on %i; " s x);;

let id = dec_log >> inc_log_upgraded;;
Code Snippet 6: Non-monadic approach to log-writer impl

We try to use a few helpers to reduce the boilerplate, to get a more monadic pattern :

let log (name : string) (f : int -> int) : int -> int * string =
  fun x -> (f x, Printf.sprintf "Called %s on %i; " name x);;

let loggable (name : string) (f : int -> int) : int * string -> int * string =
  fun (x, s1) ->
    let (y, s2) = log name f x in
    (y, s1 ^ s2);;

let inc' : int * string -> int * string =
  loggable "inc" inc;;

let dec' : int * string -> int * string =
  loggable "dec" dec;;

let id' : int * string -> int * string =
  inc' >> dec';;

let e x = (x, "") ;;  (* -- for empty-logging*)

id' (e 5)  (*returns: (5, "Called inc on 5; Called dec on 6; ")*)
Code Snippet 7: intuitive monadic pattern form
| Helper | Monadic counterpart | Role |
|---|---|---|
| e | return | trivial effect of putting a value into the metaphorical box, with an empty log message |
| loggable | bind (>>=) | handles the pattern matching against pairs and the string concatenation |
module Writer : Monad = struct
  type 'a t = 'a * string

  let return x = (x, "")

  let ( >>= ) m f =
    let (x, s1) = m in
    let (y, s2) = f x in
    (y, s1 ^ s2)

  (* extra: Loggable implemented using >>= *)
  let loggable (name : string) (f : int -> int) : int * string -> int * string =
    fun m ->
    m >>= fun x ->
    log name f x
end
Code Snippet 8: Writer monad for logging effects

Once we are familiar with the >>= operator and the monadic style of programming, our code can concentrate on the 'a in the type 'a Writer.t instead of on the strings. The writer monad takes care of threading the log for us, so long as we use return and >>=.
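
To see the log being threaded without any manual string handling, here is a small usage sketch. It assumes an unsealed copy of the writer module (so the final pair can be inspected) and re-declares `log`, `inc` and `dec` from the earlier snippets to stay self-contained.

```ocaml
(* Writer with its representation left visible so the result can be inspected. *)
module Writer = struct
  type 'a t = 'a * string

  let return x = (x, "")

  (* Run f on the payload and append its log to the log accumulated so far. *)
  let ( >>= ) (x, s1) f =
    let (y, s2) = f x in
    (y, s1 ^ s2)
end

open Writer

(* Turn a plain int function into one that also emits a log entry. *)
let log (name : string) (f : int -> int) (x : int) : int Writer.t =
  (f x, Printf.sprintf "Called %s on %i; " name x)

let inc x = x + 1
let dec x = x - 1

(* Sequence inc then dec; >>= concatenates the two log entries for us. *)
let result = return 5 >>= log "inc" inc >>= log "dec" dec
(* result = (5, "Called inc on 5; Called dec on 6; ") *)
```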

Lwt monad #

  1. it’s a promise-based library that satisfies this monadic interface to compose / chain promises together
    • bind registers callbacks
    • the type 'a is the “something more” within the box: the values that get produced asynchronously when the promise is resolved (instead of immediately)
  2. what’s abstracted away from us:
    • how 'a t and return create references
    • the mechanism of callback registration, hidden within bind and well encapsulated
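
The two hidden pieces listed above (a mutable reference behind `'a t`, and callback registration inside bind) can be sketched with a toy promise. This is a simplified model written for these notes, not Lwt's actual implementation; the names `promise`, `make`, `resolve` and `on_resolve` are hypothetical.

```ocaml
(* A toy promise, NOT Lwt's real implementation: a mutable cell that is
   either pending (holding registered callbacks) or resolved with a value. *)
type 'a state =
  | Pending of ('a -> unit) list
  | Resolved of 'a

type 'a promise = { mutable state : 'a state }

(* An already-resolved promise: the "trivial effect". *)
let return (x : 'a) : 'a promise = { state = Resolved x }

(* A fresh promise that nobody has resolved yet. *)
let make () : 'a promise = { state = Pending [] }

(* Resolve a pending promise and run every callback registered so far. *)
let resolve (p : 'a promise) (x : 'a) : unit =
  match p.state with
  | Resolved _ -> invalid_arg "promise already resolved"
  | Pending callbacks ->
    p.state <- Resolved x;
    List.iter (fun cb -> cb x) (List.rev callbacks)

(* Run cb now if the value is available, otherwise remember it for later. *)
let on_resolve (p : 'a promise) (cb : 'a -> unit) : unit =
  match p.state with
  | Resolved x -> cb x
  | Pending callbacks -> p.state <- Pending (cb :: callbacks)

(* bind: register a callback on p that runs f and forwards f's result. *)
let ( >>= ) (p : 'a promise) (f : 'a -> 'b promise) : 'b promise =
  let result = make () in
  on_resolve p (fun x -> on_resolve (f x) (fun y -> resolve result y));
  result

let input : int promise = make ()
let output = input >>= fun n -> return (n * 2)
let before = output.state (* nothing has run yet, so this is still pending *)
let () = resolve input 21
let after = output.state (* the registered callback has now fired *)
```

Client code only ever touches `return` and `>>=`; the mutation and the callback list stay behind the abstraction, which is the encapsulation the list above describes.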

Monad Laws #

The monad laws are invariants that govern the behaviour of the monad “data-structure”. Precise specifications belong to particular monads such as the writer monad or the Lwt monad.

There are still some generic laws that every monad should obey.

context: compose monad operator (>=>)

To compose monadic functions.

let compose f g x =
  f x >>= fun y ->
  g y

let ( >=> ) = compose
Code Snippet 9: compose function implementation

The idea here is that if we have 2 functions that we wish to compose:

  • f: 'a -> 'b t
  • g: 'b -> 'c t

then we have compose f g : 'a -> 'c t. The composed function takes a value of type 'a, applies f to it, extracts the 'b out of the result, applies g to it, and returns the resulting value of type 'c t.

| Law | Without compose | With compose |
|---|---|---|
| [1] Left Identity | return x >>= f \(\iff\) f x | return >=> f \(\iff\) f |
| [2] Right Identity | m >>= return \(\iff\) m | f >=> return \(\iff\) f |
| [3] Associative | (m >>= f) >>= g \(\iff\) m >>= (fun x -> f x >>= g) | (f >=> g) >=> h \(\iff\) f >=> (g >=> h) |

Law 1: Left-Identity return >=> f behaves the same as f #

Law 1 says that having the trivial effect on a value, then binding a function on it, is the same as just calling the function on the value.

  • same behaviour as in two expressions that will both evaluate to the same value, or they enter an infinite loop or they both raise the same exception
  • examples:
    • maybe monad: return x would be Some x, and >>= f would extract x and apply f to it
    • Lwt monad: return x would be a promise that is already resolved with x, and >>= f would register f as a callback to run on x.

Law 2: Right-Identity f >=> return behaves the same as f #

Law 2 says that binding on the trivial effect is the same as just not having the effect.

  • examples :
    • maybe monad: m >>= return would depend upon the 2 cases: whether m is Some x or None.

      1. m is Some x : binding would extract x, and return would just re-wrap it with Some.
      2. m is None binding would just return None.
    • Lwt monad: binding on m would register return as a callback to be run on the contents of m after it is resolved, and return would just take those contents and put them back into an already resolved promise.

Law 3: Associative (f>=>g)>=>h behaves the same as f>=>(g>=>h) #

Law 3 says that bind sequences effects correctly: regrouping a chain of binds does not change the result.
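
The three laws can be spot-checked on the maybe monad with a handful of sample values. This is a sanity check on a few inputs rather than a proof; `half` and `pred_pos` are hypothetical helper functions introduced only for the check.

```ocaml
(* The maybe monad, written out directly on option values. *)
let return x = Some x
let ( >>= ) m f = match m with None -> None | Some x -> f x
let ( >=> ) f g x = f x >>= g

(* Two sample monadic functions, used only for this check. *)
let half x = if x mod 2 = 0 then Some (x / 2) else None
let pred_pos x = if x > 0 then Some (x - 1) else None

let samples = [ None; Some 0; Some 3; Some 8 ]

(* Law 1: return x >>= f behaves like f x. *)
let left_identity =
  List.for_all (fun x -> (return x >>= half) = half x) [ 0; 3; 8 ]

(* Law 2: m >>= return behaves like m. *)
let right_identity =
  List.for_all (fun m -> (m >>= return) = m) samples

(* Law 3: regrouping the binds does not change the result. *)
let associativity =
  List.for_all
    (fun m ->
      ((m >>= half) >>= pred_pos) = (m >>= fun x -> half x >>= pred_pos))
    samples

(* Law 3 again, phrased with the compose operator. *)
let compose_associativity =
  List.for_all
    (fun x ->
      ((half >=> pred_pos) >=> half) x = (half >=> (pred_pos >=> half)) x)
    [ 0; 3; 8 ]
```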

Overview #

Table 1: Comparison of Concurrency Primitives across Eras
| Primitive | Era | Parallelism | Style | Use case |
|---|---|---|---|---|
| Thread | Pre-5 | No (GRL) | Preemptive | Blocking C FFI |
| Mutex / Condition | Both | | Imperative | Thread synchronization |
| Lwt.t | Pre-5 (still used) | No | Monadic | I/O concurrency |
| Async.Deferred.t | Pre-5 | No | Monadic | I/O concurrency (Jane Street) |
| Domain | OCaml 5 | Yes | Imperative | CPU parallelism |
| Atomic | OCaml 5 | Yes | Lock-free | Shared mutable state |
| Effect | OCaml 5 | | Direct-style | Scheduler building block |
| Domainslib.Task | OCaml 5 | Yes | Direct-style | Structured parallelism |
| Eio.Fiber | OCaml 5 | Per-domain | Direct-style | I/O concurrency |

Pre-OCaml 5 #

  • cooperative concurrency: the primitives for using OS threads were limited by the Global Runtime Lock (GRL), such that only one thread could execute at one time \(\implies\) interleaving but not true simultaneous execution. (This is fascinating stuff: it begs to be compared against Python’s recent journey of escaping the GIL and how concurrency primitives have evolved in Python itself.)

    The GRL was such a limiting factor because the GC was not thread-safe, so the runtime had to enforce strictly that only one thread could execute OCaml code at any one time.

  • viable escape-hatch: using C FFI calls that could release the lock

Using Thread for control of POSIX threads #

  1. it’s a wrapper on OS-level pthreads
  2. Preemptive: the scheduler may interrupt whenever
  3. Still can’t parallelise because only a single thread executes at any time, due to the GRL
  4. Useful for: blocking I/O that would release the lock, background thread coordinating a C library
  5. These were monitor-style primitives: Mutex, Condition, Semaphore in stdlib – everything limited by the GRL

Lwt - lightweight threads, as a concurrency library #

  1. Key Idea:

    Instead of blocking, every potentially-blocking operation shall return a promise ('a Lwt.t). The scheduler (via an event-loop) runs promises to completion cooperatively; a promise yields control explicitly.

  2. Lwt is infectious: calling a blocking function (e.g. Unix.read) within a Lwt promise will block the entire event loop.

  3. Thread cancellation in Lwt is cooperative and fragile: Lwt.cancel raises Lwt.Canceled only at yield points.

    That’s why we should always clean up threads with Lwt.finalize or Lwt_switch.

Async – Jane Street’s alternative #

It’s the same cooperative model, with different aesthetics and tradeoffs:

  • Instead of Lwt.t, it uses Deferred.t
  • Integrated with Core (Jane Street’s stdlib alternative)
  • Scheduler is more explicit – we have to start the event loops explicitly via Async.Scheduler.go ()
  • Piping capability of streams is stronger because it’s more composable (Pipe.t)
  • Has better Clock primitives

Multi-processing as a parallelism workaround #

  • Real CPU parallelism ended up requiring multiple OS processes communicating over Unix pipes or sockets

  • libs like Parmap helped to make this more ergonomic

Multicore OCaml >= 5 #

Has 2 orthogonal primitives, onto which the other libs are all built:

  • Domains: for true parallelism
  • Effects: for structured control flow

Domains – for parallelism #

A domain maps roughly to a core. Each domain runs on its own OS thread (a 1:1 mapping, enforced by the runtime), which the OS scheduler then assigns to a CPU core. Each domain has its own local minor heap (with a local GC) and shares a major heap with the rest of the domains. Since there’s no GRL, multiple domains can run OCaml code simultaneously on separate cores.

The “one domain per core” is more of a programming convention, not a runtime guarantee — in practice, spawning \(N\) domains for \(N\) cores is the recommended pattern to maximise parallel throughput while avoiding context-switching overhead.

let d = Domain.spawn (fun () ->
  (* runs in parallel on another core *)
  expensive_computation ()
)
let result = Domain.join d
Code Snippet 10: shape of how domains are used

Some nuances :

  1. Domains are supposed to be heavyweight, so we shouldn’t be creating thousands of them (they are more like OS threads than goroutines). Pattern: spawn N domains at startup, one per core, then distribute the work using queues.

  2. Data Races are possible

    • memory model spec makes it such that non-atomic shared mutable state requires explicit synchronisation
  3. Atomic module gives basic lock-free primitives

    • make, get, set, compare_and_set, fetch_and_add
  4. GC is parallel and concurrent

    • minor heap \(\implies\) most allocation is thread-local \(\implies\) great performance
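
As a minimal sketch of the points above about shared state and the Atomic module, the following spawns a few domains that all update one counter. It assumes OCaml 5 or later, where Domain and Atomic are both in the standard library; the worker count and iteration count are arbitrary choices for the illustration.

```ocaml
(* Requires OCaml >= 5.0 (Domain and Atomic live in the standard library). *)

(* Shared mutable state, wrapped in an Atomic so concurrent updates are safe. *)
let counter = Atomic.make 0

(* Each worker bumps the shared counter n times without taking a lock. *)
let worker n () =
  for _ = 1 to n do
    ignore (Atomic.fetch_and_add counter 1)
  done

(* Spawn four domains that all hammer on the same counter in parallel. *)
let domains = List.init 4 (fun _ -> Domain.spawn (worker 10_000))

(* Wait for every domain to finish before reading the final value. *)
let () = List.iter Domain.join domains

let total = Atomic.get counter
```

Replacing the Atomic with a plain `int ref` would make this a data race: increments from different domains could overwrite each other and the final total would no longer be predictable.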
  • Domainslib: for structured parallelism; nested-parallel programming

    It’s what we reach for when we want to do CPU-bound parallel-work. Work-stealing scheduler allows us to get load-balancing automatically.

    Two key mechanisms that it offers:

    1. async/await mechanism for spawning parallel tasks and awaiting their results
    2. provisioning of parallel iterators

    Feature-rich, offers:

    • Task.pool: fixed-sized domain pool (with a work-stealing scheduler within it)
    • Task.run: submits work to the pool
    • Task.async / Task.await: futures within the pool
    • Task.parallel_for: for parallel iteration
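    open Domainslib

    (* heavy_work_1 and heavy_work_2 stand for CPU-bound functions of type unit -> int *)
    let pool = Task.setup_pool ~num_domains:3 ()
    let result = Task.run pool (fun () ->
      let a = Task.async pool (fun () -> heavy_work_1 ()) in
      let b = Task.async pool (fun () -> heavy_work_2 ()) in
      Task.await pool a + Task.await pool b
    )
    (* a separate binding, so the call is not parsed as extra arguments to Task.run *)
    let () = Task.teardown_pool pool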
    Code Snippet 11: example of using Domainslib.Task

Effects – for structured concurrency #

Key mechanism for delimited continuations (coroutines) without committing to a specific scheduler – it’s how we can get Lwt-style concurrency without monadic wrapping.

Like a resumable exception, we perform an effect to signal something, and a handler above us in the call-stack can resume the computation later – likely after some I/O is done.

effects let us define our own concurrency model

Instead of baking concurrency into the runtime (like in OS threads) or encoding it in monads (like in Lwt / Async), we can choose to write our own:

  • schedulers
  • fibers (lightweight threads)
  • async/await systems
  • generators
  • backtracking engines

All of which will be within user-space.

(Cooperative) Concurrency requires us to handle the following actions:

  • suspending work
  • resuming it later
  • interleaving multiple tasks

So in effects-terms:

  1. a fiber does some work
  2. hits an effect like Yield or Await
  3. the scheduler captures its continuation
  4. runs something else
  5. resumes later

We can write direct-style concurrent code (code that looks like ordinary, sequential code, in contrast with the monadic style that we had to rely on before) and let the underlying scheduler use effects to manage fibers.
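
The five steps can be sketched as a tiny round-robin scheduler built on a single user-defined effect. This assumes OCaml 5 or later and uses the standard library's Effect and Effect.Deep modules; the `Yield` effect and the names `run`, `say`, `trace` and `fiber` are made up for this illustration and are not taken from any particular library.

```ocaml
(* Requires OCaml >= 5.0. *)
open Effect
open Effect.Deep

(* A user-defined effect meaning "let another fiber run for a while". *)
type _ Effect.t += Yield : unit Effect.t

let yield () = perform Yield

(* Trace of what ran, so the interleaving can be inspected afterwards. *)
let trace : string list ref = ref []
let say msg = trace := msg :: !trace

(* Round-robin scheduler: run each fiber until it yields or finishes. *)
let run (fibers : (unit -> unit) list) : unit =
  let ready : (unit -> unit) Queue.t = Queue.create () in
  let run_next () =
    match Queue.take_opt ready with
    | Some task -> task ()
    | None -> ()
  in
  let spawn (f : unit -> unit) : unit =
    match_with f ()
      { retc = (fun () -> run_next ()); (* fiber finished: pick the next one *)
        exnc = raise;
        effc =
          (fun (type a) (eff : a Effect.t) ->
            match eff with
            | Yield ->
              Some
                (fun (k : (a, unit) continuation) ->
                  (* Park the suspended fiber, then switch to another one. *)
                  Queue.add (fun () -> continue k ()) ready;
                  run_next ())
            | _ -> None) }
  in
  List.iter (fun f -> Queue.add (fun () -> spawn f) ready) fibers;
  run_next ()

(* Direct-style fiber body: no monads, just a call to yield in the middle. *)
let fiber name () =
  say (name ^ "1");
  yield ();
  say (name ^ "2")

let () = run [ fiber "A"; fiber "B" ]

let order = List.rev !trace
(* order = ["A1"; "B1"; "A2"; "B2"] *)
```

The fiber bodies read like ordinary sequential code, while the interleaving is decided entirely by the handler in `run`. Swapping in a different scheduling policy would only require changing how the `ready` queue is used.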

Effect handlers come in 2 flavours:

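  1. Effect.Deep: the handler stays installed when the continuation is resumed, so it also handles every later effect the computation performs (convenient for schedulers and coroutines)

  2. Effect.Shallow: the handler covers only the first effect, so a handler must be supplied again each time the continuation is resumed (more explicit)

In both flavours a continuation can be resumed at most once.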

This was a brief outline. Notes on Structured Concurrency with Effects can be seen here.
  • Eio – I/O Library for OCaml 5

    Intends to be the replacement for Lwt, built entirely on Effects, gives direct-style concurrency with structured concurrency guarantees.

    Key ideas:

    1. Switch: an ownership scope for fibers. All fibers in a switch must finish before the switch exits. It’s structured concurrency. (Rough idea of structured concurrency: concurrency that follows the shape of the code’s scope. Tasks start and finish within a code block, and if we need to leave it, error propagation happens in a controlled fashion without any “dangling” background tasks.)

    2. Fiber: a lightweight coroutine. Within a switch, Fibers run cooperatively; scheduler switches between them at I/O points

    3. Stream / Promise: communication primitives between Fibers.

    4. Can be combined (Eio + Domainslib): we can use Eio for I/O concurrency within a domain and we can use Domainslib for CPU parallelism across domains.

Async for Cooperative Concurrency #

There’s some ecosystem context to internalise, which can be found here. Pre-OCaml 5, there are two approaches to cooperative concurrency, exposed via Async and Lwt. They overlap quite a bit and have similar primitives. Lwt notes can be found here.

Async offers a hybrid model that aims to bring the best of both the threading model and the event-loop model of concurrency: more performant, and without the synchronisation woes of pre-emptive threads.

Some general approaches to concurrency

We first consider some general approaches to concurrency:

  1. pre-emptive system threads

    each task that may require simultaneous waiting is given an OS thread of its own so it can block without stopping the entire program e.g. Java, C#

    complications:

    1. significant memory and resource usage per thread

    2. OS can arbitrarily interleave the execution of system threads

      This calls for good synchronisation mechanisms: the programmer needs to protect shared state (e.g. with locks and condition variables) \(\implies\) error-prone

  2. single-threaded programs running event loops

    Event loop reacts to external events like timeouts or mouse-clicks by invoking a callback function that has been registered for that purpose. e.g. JavaScript, GUI toolkits.

    1. 1 task at a time, so no need to have complex sync mechanisms

    complications:

    1. event-driven programs have an inverted control structure \(\implies\) your own control flow has to be threaded awkwardly through the system’s event loop \(\implies\) possible callback hell.

Async Basics #

Async is a package of its own, designed as an extension to Core.

Comparing Async vs Core #

(* ==== 1: sync example of reading something *)
open Core;;
#show In_channel.read_all;;
(* val read_all : string -> string *)
Out_channel.write_all "test.txt" ~data:"This is only a test.";;
(* - : unit = () *)
In_channel.read_all "test.txt";; (* -- NB: its type shows that it's a blocking operation *)
(* - : string = "This is only a test." *)

(* ==== 2: Async version: *)
#require "async";;
open Async;;
#show Reader.file_contents;;
(* val file_contents : string -> string Deferred.t *)

(* ==== 3: using deferred values: *)
let contents = Reader.file_contents "test.txt";;
(* val contents : string Deferred.t = <abstr> *)
Deferred.peek contents;;
(* - : string option = None *) (* -- NB: this indicates that it's not complete*)


(* -- on complete  *)
contents;; (*COUNTER INTUITIVE -- only the case for utop where this case of spontaneous evaluation happens *)
(* - : string = "This is only a test." *) (* -- NB: contents is a deferred type (container --  string Deferred.t) but this expression returns string type (the contained type)*)
Deferred.peek contents;;
(* - : string option = Some "This is only a test." *) (* -- peeking after completion*)

Deferred Types as a representation of promises #

Well-behaved Async functions are non-blocking. They return Deferred.t which is something of a placeholder – like a completable future / promise.

  • carrying out a non-blocking check:

    Deferred.peek is a way of checking whether the future has completed. It always gives us an option whose parameter is the concrete content type, e.g. peeking a string Deferred.t yields a string option.

  • GOTCHA: utop (repl) special behaviour

    If we directly refer to a deferred value from within utop (repl), we get the contained value instead of the deferred itself. This is because utop blocks until the deferred is determined (starting the scheduler if it hasn’t already), so we effectively await the deferred this way.

    In scripted environments or within our own programs, this won’t happen because deferred values will never get spontaneously resolved. We would need to bind them with bind, >>=, let%bind constructs.

Binding as a monadic, sequencing primitive for concurrent computations #

This section was written before doing a review of monads. It’s better to go through the monad section first and then come back to this one.

We wish to await deferred computations and for that we use bind as the sequencing operator, which has a sugar-form >>=.

The function we bind to the deferred type will get invoked after the deferred is no longer empty e.g. Writer.save gets called only after the file contents were read via Reader.file_contents.

Bind expects that the function wraps up the result in a deferred type. For that, Async has a function called return which takes an ordinary value and wraps it up in a deferred.

Now, bind and return form the monadic pattern: Deferred is one example of a monad.

#show Deferred.bind;;
(* val bind : 'a Deferred.t -> f:('a -> 'b Deferred.t) -> 'b Deferred.t *)

(* ==== using the sequencing operator (bind) *)
let uppercase_file filename =
  Deferred.bind (Reader.file_contents filename)
    ~f:(fun text ->
       Writer.save filename ~contents:(String.uppercase text));;
(* val uppercase_file : string -> unit Deferred.t = <fun> *)
uppercase_file "test.txt";;
(* - : unit = () *)
Reader.file_contents "test.txt";;
(* - : string = "THIS IS ONLY A TEST." *)


(* ==== using the syntax sugar >>= *)
let uppercase_file filename =
  Reader.file_contents filename
  >>= fun text ->
  Writer.save filename ~contents:(String.uppercase text);;
(* val uppercase_file : string -> unit Deferred.t = <fun> *)

(* === needing to wrap the bind function in an async result (wrapped as a Deferred) *)
let count_lines filename =
  Reader.file_contents filename
  >>= fun text ->
  return (List.length (String.split text ~on:'\n'));;
(* val count_lines : string -> int Deferred.t = <fun> *)

(* -- this is the negative example: it would never have worked

let count_lines filename =
  Reader.file_contents filename
  >>= fun text ->
  List.length (String.split text ~on:'\n');;
Line 4, characters 5-45:
Error: This expression has type int but an expression was expected of type
         'a Deferred.t

*)


(* ==== deferred map: *)
let count_lines filename =
  Reader.file_contents filename
  >>| fun text ->
  List.length (String.split text ~on:'\n');;
(* val count_lines : string -> int Deferred.t = <fun> *)
count_lines "/etc/hosts";;
(* - : int = 10 *)

Scheduling in Async #

Async has a dedicated scheduler, which needs to be started explicitly for our own programs.

utop will handle this automatically for us.

This is similar to Python’s asyncio scheduler. Async is an event-loop model more analogous to Python’s asyncio, not Python’s threading module. Python’s thread model has a GIL (comparable to OCaml’s old GRL) but uses preemptive scheduling between threads. Async’s single-threaded cooperative scheduler differs from that.

Using Let syntax #

The value that Let syntax brings is purely syntactic; the compiled output is the same.

Just compare the bind and map based syntax differences using let syntax.

Why this is useful:

  1. highlights the analogy between monadic bind and OCaml’s built-in let-binding \(\implies\) more uniform, readable code — it’s direct-style programming

  2. Let_syntax works for any monad, opening the right Let_syntax module is how we choose the correct monad to use.

    in the case of Async we implicitly have Deferred.Let_syntax opened – in some contexts, we may wish to do more explicit opens.

#require "ppx_let";;

(* ==== bind with let syntax *)
let count_lines filename =
  let%bind text = Reader.file_contents filename in
  return (List.length (String.split text ~on:'\n'));;
(* val count_lines : string -> int Deferred.t = <fun> *)

let uppercase_file filename =
  Reader.file_contents filename
  >>= fun text ->
  Writer.save filename ~contents:(String.uppercase text);;
(* val uppercase_file : string -> unit Deferred.t = <fun> *)

(* ==== map with let syntax *)
let count_lines filename =
  let%map text = Reader.file_contents filename in
  List.length (String.split text ~on:'\n');;
(* val count_lines : string -> int Deferred.t = <fun> *)

let count_lines filename =
  Reader.file_contents filename
  >>| fun text ->
  List.length (String.split text ~on:'\n');;

Incremental-filling using Ivars and Upon #

Ivar is about incremental filling of variables: we want to construct a deferred where we can programmatically decide when it gets filled in.

It’s a low-level feature compared to map, bind and return. It’s very useful for building synchronisation patterns that aren’t well supported otherwise.

3 fundamental operations with an ivar:

(* ==== syntax intro: using ivars *)
let ivar = Ivar.create ();;
(*
val ivar : '_weak1 Ivar.t =
  {Async_kernel__.Types.Ivar.cell = Async_kernel__Types.Cell.Empty}
  *)
let def = Ivar.read ivar;;
(* val def : '_weak1 Deferred.t = <abstr> *)
Deferred.peek def;;
(* - : '_weak2 option = None *)
Ivar.fill ivar "Hello";;
(* - : unit = () *)
Deferred.peek def;;
(* - : string option = Some "Hello" *)
  1. create an ivar, use Ivar.create
  2. read: the deferred that corresponds to the ivar in question, using Ivar.read
  3. fill: fills the ivar, causing the deferred to be determined, using Ivar.fill

A custom delay-timeout synchronisation pattern built on these operations is worked through further below.

Try sticking to the higher level of abstraction for the concurrency primitives instead of the lower level ones (ivars, upon) unless absolutely necessary for the thing we’re trying to express.
  • Understanding bind in terms of Ivars and upon

    This part is just to help us visualise the abstraction barrier part.

    when we write let d' = Deferred.bind d ~f:

    1. new ivar i is created that will hold the final result of the computation, the deferred that is associated to this ivar is what is returned

    2. a function is registered to be called when d is determined

    3. that function when invoked, calls function f with the value determined for d

    4. another function is registered to be called when the deferred returned by f is determined (like step 2)

    5. calling that function fills i, causing the corresponding deferred to be determined

    (* ==== a custom demo implementation *)
    let my_bind d ~f =
      let i = Ivar.create () in
      upon d (fun x -> upon (f x) (fun y -> Ivar.fill i y));
      Ivar.read i;;
    (*
    val my_bind : 'a Deferred.t -> f:('a -> 'b Deferred.t) -> 'b Deferred.t =
      <fun>
      *)
  • A Custom Delay-Timeout Sync Pattern

    (* ==== delayed interface: *)
    module type Delayer_intf = sig
      type t
      val create : Time.Span.t -> t
      val schedule : t -> (unit -> 'a Deferred.t) -> 'a Deferred.t
    end;;
    (*
    module type Delayer_intf =
      sig
        type t
        val create : Time.Span.t -> t
        val schedule : t -> (unit -> 'a Deferred.t) -> 'a Deferred.t
      end
    *)
    
    
    (* ===== implementation  *)
    module Delayer : Delayer_intf = struct
      type t = { delay: Time.Span.t;
                 jobs: (unit -> unit) Queue.t;
               }
    
      let create delay =
        { delay; jobs = Queue.create () }
    
      let schedule t thunk =
        let ivar = Ivar.create () in
        Queue.enqueue t.jobs (fun () ->
          upon (thunk ()) (fun x -> Ivar.fill ivar x));
        upon (after t.delay) (fun () ->
          let job = Queue.dequeue_exn t.jobs in
          job ());
        Ivar.read ivar
    end;;
    (* module Delayer : Delayer_intf *)
    Code Snippet 12: the Delayed interface
    1. schedule receives an action in the form of a thunk that returns a deferred. Remember that thunks are nullary functions: they take only unit () as their argument.

      the caller of schedule receives a deferred that is filled with the value of the deferred returned by the thunk

      this is where we use the upon operator for callback registration. upon schedules a callback to run when the deferred it is passed becomes determined; unlike bind, upon does NOT return a new deferred for the callback to fill. val upon : 'a Deferred.t -> ('a -> unit) -> unit.

    2. our delayer uses a queue of thunks, where every call to schedule adds a thunk to the queue and schedules a job in the future to grab a thunk off the queue and run it.

      waiting is done with after, which takes a time span and returns a deferred that becomes determined once that time span has elapsed – this is the timeout behaviour that we’re familiar with from languages like JavaScript.

      the queue of thunks is literally a task queue. Note that the jobs start in the order they were enqueued, even if the callbacks registered with upon were to run in a different order. This part may be a little confusing:

      • Even though upon seems “sequential” because it mentions “after delay”, nothing forces those upon callbacks to respect order — only the explicit queue logic does.

      • remember that upon schedules side-effects, not values

      • inside the scheduler, Async doesn’t promise that the callbacks we registered with upon will execute in the same order they were registered. Two independent deferreds might become determined in any order (depending on timing) \(\implies\) upon introduces temporal non-determinism.

      • we make the order deterministic by using the queue, which reminds me of python’s ThreadPoolExecutor coupled with as_completed().

        the callback passed to upon takes its job from the queue, so jobs are dequeued in the same order they were enqueued, even though the internal Async callbacks (upon) may run in a different timing order.

             Queue.enqueue t.jobs (fun () ->
               upon (thunk ()) (fun x -> Ivar.fill ivar x));
             (* == it's this call that enforces the deterministic execution order*)
             upon (after t.delay) (fun () ->
               let job = Queue.dequeue_exn t.jobs in
               job ());
        Code Snippet 13: the call to upon
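
The create / read / fill / upon mechanics used by the Delayer can be sketched without Async at all. What follows is a toy model under stated assumptions, not Async’s implementation: Toy_ivar and its functions are hypothetical names, and callbacks here run synchronously at fill time, whereas Async hands them to its scheduler.

```ocaml
(* Toy model only: the module and its functions are hypothetical, and
   there is no scheduler, so callbacks run directly inside [fill]. *)
module Toy_ivar = struct
  type 'a state =
    | Empty of ('a -> unit) list (* callbacks still waiting, newest first *)
    | Full of 'a

  type 'a t = { mutable state : 'a state }

  let create () = { state = Empty [] }

  (* Plays the role of [Deferred.peek]: inspect without waiting. *)
  let peek t =
    match t.state with
    | Full x -> Some x
    | Empty _ -> None

  (* Plays the role of [upon]: register a callback and return unit,
     not a new deferred. *)
  let upon t f =
    match t.state with
    | Full x -> f x
    | Empty fs -> t.state <- Empty (f :: fs)

  (* Plays the role of [Ivar.fill]: store the value, then run the
     waiting callbacks in registration order. This toy rejects a
     second fill. *)
  let fill t x =
    match t.state with
    | Full _ -> invalid_arg "Toy_ivar.fill: already full"
    | Empty fs ->
      t.state <- Full x;
      List.iter (fun f -> f x) (List.rev fs)
end

let log = ref []

let () =
  let iv = Toy_ivar.create () in
  assert (Toy_ivar.peek iv = None);
  Toy_ivar.upon iv (fun s -> log := ("first: " ^ s) :: !log);
  Toy_ivar.upon iv (fun s -> log := ("second: " ^ s) :: !log);
  Toy_ivar.fill iv "Hello";
  assert (Toy_ivar.peek iv = Some "Hello");
  List.iter print_endline (List.rev !log)
```

The point of the sketch is that upon only registers a side effect: nothing is returned for the caller to wait on, which is why the Delayer needs its own ivar to hand a result back.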

Example: An Echo Server #

Objective: program that accepts connections from clients and spits back whatever is sent to it.

Initial helpers #

  • Handling the client connection

    We need a function that can copy data from input to output \(\implies\) this function handles the logic for handling a client connection.

    bind marks the places that we wait:

    1. first, when we call Reader.read to read a block of input
    2. then when it’s complete, if a new block was returned, then we write that block to the writer
    3. finally we wait until writer buffers are flushed, then we recurse.

    This is implemented with pushback which helps with memory-discipline:

    • if the process can’t make progress writing, it will stop reading.

    • without this pattern, the program could allocate unbounded amounts of memory, because it would have to keep track of all the data that it intends to write but hasn’t been able to write yet.

    open Core
    open Async
    
    (** Copy data from the reader to the writer, using the provided buffer
       as scratch space *)
    let rec copy_blocks buffer r w =
      match%bind Reader.read r buffer with
      | `Eof -> return () (* -- termination condition*)
      | `Ok bytes_read ->
        Writer.write w (Bytes.to_string buffer) ~len:bytes_read;
        let%bind () = Writer.flushed w in (* -- this is what allows the pushback to happen: no further reading until the current buffer is done writing.*)
        copy_blocks buffer r w
  • Setting up a server to receive client connections

    We use Async’s Tcp module and its collection of utilities for creating TCP clients and servers.

    (** Starts a TCP server, which listens on the specified port, invoking
        copy_blocks every time a client connects. *)
    let run () =
      let host_and_port =
        Tcp.Server.create
          ~on_handler_error:`Raise
          (Tcp.Where_to_listen.of_port 8765)
          (* -- next arg is the client connection handler, the most important argument to the server create function*)
          (fun _addr r w ->
            let buffer = Bytes.create (16 * 1024) in
            copy_blocks buffer r w)
      in
      ignore
        (host_and_port
          : (Socket.Address.Inet.t, int) Tcp.Server.t Deferred.t)
    • Calling Tcp.Server.create gives us a Tcp.Server.t, a handle to the server that can be used to shut the server down.

      we aren’t using the handle here, which is why ignore (with a type annotation) is used to discard it explicitly

    • out of the args for server creation, the last one is the client connection handler and it’s the most important one.

    • there’s no need to close client connections explicitly, because the server automatically shuts down a connection once the deferred returned by the handler is determined.

  • Starting the Async scheduler

    A common newbie error is to forget to manually start the scheduler.

    (* Call [run], and then start the scheduler *)
    let () =
      run ();
      never_returns (Scheduler.go ())
    • without the scheduler, the program does NOTHING at all, even printf calls won’t reach the terminal so it can be bewildering.

    • though we haven’t added any special code for handling multiple clients, the server can already accept connections and read and write data for many clients concurrently.

Tail-Calls and Chain of Deferreds #

Async is smart here: its implementation of bind essentially does a form of tail-call optimisation lifted to the Deferred monad.

One potential worry we might have had is that the chain of deferreds grows with every iteration of the loop, and this could take up an unbounded amount of memory as the echo process continues.

Async has a special optimisation for exactly this case: the chain is collapsed so that, once the final deferred is determined (in the copy_blocks example, when we get Eof), it behaves like a single deferred and the intermediate ones do not pile up \(\implies\) there’s no memory leak at all.

  • intuition: the bind that creates the deferred is in tail-position and nothing is done to that deferred once it’s created; it’s simply returned as is \(\implies\) this looks like a good candidate for tail call optimisation.

Functions that Never Return #

When starting the scheduler, we wrapped the call to Scheduler.go in never_returns.

  • this is an explicit marker – this makes it clear to the invoker of Scheduler.go that the function never returns.

  • typically, a function that never returns is inferred to have return type 'a. Since no value is ever returned, we’re free to impute any type at all to the non-existent return value, which lets the function fit into any context within our program. We’ve seen this before in this book when learning about exceptions.

  • the type never_returns is an alias of Nothing.t, and it is what the return type of Scheduler.go is defined as. The function never_returns consumes a value of that type so that the call fits wherever another type is expected.

    Nothing.t is uninhabited, meaning there are no values of that type, so a function whose return type is Nothing.t can’t actually return. To mark our own function this way, we just need to add a type annotation:

      let rec loop_forever () : Nothing.t = loop_forever ();;
      (* val loop_forever : unit -> never_returns = <fun> *)
    
      (* === helpful error if we omit the never_returns: *)
      (*
    
        let do_stuff n =
        let x = 3 in
        if n > 0 then Scheduler.go ();
        x + n;;
    
    
    
      Line 3, characters 19-34:
      Error: This expression has type never_returns
             but an expression was expected of type unit
             because it is in the result of a conditional with no else branch
    
        *)
    
      (* ==== actually using the never_returns properly *)
      let do_stuff n =
        let x = 3 in
        if n > 0 then never_returns (Scheduler.go ());
        x + n;;
      (* val do_stuff : int -> int = <fun> *)

    We saw uninhabited types earlier in a different context and use-case, when learning about narrowing without using GADTs.
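
To see why an uninhabited type can stand in for “never returns”, here is a minimal library-free sketch. It does not use Core’s Nothing.t: the names nothing, unreachable, loop_forever and get_ok are hypothetical and defined only for illustration.

```ocaml
(* A variant with no constructors: no value of this type can be built. *)
type nothing = |

(* Since no [nothing] value exists, the match needs no real cases; the
   refutation case [_ -> .] asks the compiler to check that claim. The
   result can therefore be given any type. *)
let unreachable (n : nothing) : 'a = match n with _ -> .

(* The annotation is accepted because the body never produces a value.
   Defined but deliberately not called here. *)
let rec loop_forever () : nothing = loop_forever ()

(* A result whose error side is uninhabited can only ever be [Ok]. *)
let get_ok (r : ('a, nothing) result) : 'a =
  match r with
  | Ok x -> x
  | Error n -> unreachable n

let () = Printf.printf "%d\n" (get_ok (Ok 42))
```

In this sketch unreachable plays the same role as the never_returns function in the notes above: it turns a value that cannot exist into whatever type the surrounding expression needs.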

Improving the Echo Server #

Improvements list:

  1. add a proper CLI with Command
  2. add a flag to specify the port to listen on and a flag to make the server echo back the capitalised version of input
  3. simplify the code using Async’s Pipe interface

open Core
open Async

let run ~uppercase ~port =
  let host_and_port =
    Tcp.Server.create
      ~on_handler_error:`Raise
      (Tcp.Where_to_listen.of_port port)
      (fun _addr r w ->
        Pipe.transfer
          (Reader.pipe r)
          (Writer.pipe w)
          ~f:(if uppercase then String.uppercase else Fn.id))
  in
  ignore
    (host_and_port
      : (Socket.Address.Inet.t, int) Tcp.Server.t Deferred.t);
  Deferred.never () (* --- returns a deferred that is never determined, indicating that the echo server doesn't shut down*)

let () =
  Command.async
    ~summary:"Start an echo server"
    (let%map_open.Command uppercase =
       flag
         "-uppercase"
         no_arg
         ~doc:" Convert to uppercase before echoing back"
     and port =
       flag
         "-port"
         (optional_with_default 8765 int)
         ~doc:" Port to listen on (default 8765)"
     in
     fun () -> run ~uppercase ~port)
  |> Command_unix.run

NOTEs:

  1. Deferred.never in the run function shows that the echo server doesn’t ever shut down. Deferred.never returns a deferred that is never determined.
  2. Use of Async’s Pipe is the key change in the code.
    • a Pipe is an asynchronous communication channel used for connecting different parts of the program

    • visualise this as a consumer/producer queue that uses deferred for communicating when the pipe is ready to be read from or written into

    • we’re using one of the many utility functions within the Pipe module:

      • Pipe.transfer to set up a process that takes data from a reader-pipe and moves it to a writer-pipe (type: - : 'a Pipe.Reader.t -> 'b Pipe.Writer.t -> f:('a -> 'b) -> unit Deferred.t = <fun>)
      • Reader.pipe and Writer.pipe are the two pipes that are connected here.
    • this still preserves the pushback: if the writer gets blocked, the writer’s pipe will stop pulling data from the reader’s pipe and this prevents the reader from reading in more data.

      the deferred returned by Pipe.transfer becomes determined once the reader has been closed and the last element has been transferred from the reader to the writer. So the server shuts down that client connection once that deferred becomes determined.

  • Using Pipes

    1. they are created in connected read/write pairs:

         let (r,w) = Pipe.create ();;
         (* val r : '_weak3 Pipe.Reader.t = <abstr> *)
         (* val w : '_weak3 Pipe.Writer.t = <abstr> *)
      • the r and w are weakly polymorphic
    2. Pipes have some internal slack: a number of slots to which something can be written before the write will block.

      By default the slack is 0 – so the deferred returned by a write is determined only when the value is read out of the pipe.

         let (r,w) = Pipe.create ();;
         (* val r : '_weak4 Pipe.Reader.t = <abstr> *)
         (* val w : '_weak4 Pipe.Writer.t = <abstr> *)
         let write_complete = Pipe.write w "Hello World!";;
         (* val write_complete : unit Deferred.t = <abstr> *)
         Pipe.read r;; (* -- reading from the pipe here determines the deferred returned by the write to the pipe: *)
         (* - : [ `Eof | `Ok of string ] = `Ok "Hello World!" *)
         write_complete;;
         (* - : unit = () *)
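
The slack behaviour described above can be modelled with a plain queue. This is a toy sketch under stated assumptions, not Async’s Pipe: toy_pipe, create, write, read and settle are hypothetical names, and a bool ref stands in for the unit Deferred.t that a write returns.

```ocaml
(* Toy model of a pipe with slack; nothing here is Async's API. *)
type 'a toy_pipe = {
  slack : int; (* how many items may sit unread *)
  buffer : 'a Queue.t;
  mutable pending : bool ref list; (* acknowledgements not yet granted *)
}

let create ?(slack = 0) () = { slack; buffer = Queue.create (); pending = [] }

(* Grant every pending acknowledgement once the backlog fits in the slack. *)
let settle p =
  if Queue.length p.buffer <= p.slack then begin
    List.iter (fun ack -> ack := true) p.pending;
    p.pending <- []
  end

(* Enqueue a value; the returned flag becomes true when the writer may
   proceed. *)
let write p x =
  Queue.add x p.buffer;
  let ack = ref false in
  p.pending <- ack :: p.pending;
  settle p;
  ack

(* Take the oldest value, if any, and release writers if there is room. *)
let read p =
  let x = Queue.take_opt p.buffer in
  settle p;
  x

let () =
  let p = create () in
  let write_complete = write p "Hello World!" in
  assert (not !write_complete); (* slack 0: nobody has read yet *)
  assert (read p = Some "Hello World!");
  assert !write_complete (* the read released the writer *)
```

With slack 1 in this model, the first write is acknowledged immediately, because one unread item still fits within the slack.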

Example: Searching Definitions with DuckDuckGo #

Setup #

Install the following libs:

  1. textwrap
  2. uri
  3. yojson
  4. cohttp

URI Handling #

We’d need to do some URI generation to be able to query the DDG servers:

open Core
open Async

(** Generate a DuckDuckGo search URI from a query string *)
let query_uri query =
  let base_uri =
    Uri.of_string "http://api.duckduckgo.com/?format=json"
  in
  Uri.add_query_param base_uri ("q", [ query ])

Parsing JSON Strings #

HTTP response from DDG is in JSON, so we’d need to parse it.

(** Extract the "Definition" or "Abstract" field from the DuckDuckGo
   results *)
let get_definition_from_json json =
  match Yojson.Safe.from_string json with
  | `Assoc kv_list ->
    let find key =
      match List.Assoc.find ~equal:String.equal kv_list key with
      | None | Some (`String "") -> None
      | Some s -> Some (Yojson.Safe.to_string s)
    in
    (match find "Abstract" with
    | Some _ as x -> x
    | None -> find "Definition")
  | _ -> None
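
The lookup order above (try "Abstract" first, fall back to "Definition", treat empty strings as missing) can be isolated in a library-free sketch. The json type below is a hypothetical stand-in for the small subset of Yojson values involved, and first_non_empty is an illustrative name. One difference to keep in mind: the original returns the JSON-encoded text of the field via Yojson.Safe.to_string, so a string value keeps its surrounding quotes, whereas this sketch returns the raw string.

```ocaml
(* Hypothetical stand-in for the few JSON shapes this lookup cares about. *)
type json =
  [ `Null
  | `String of string
  | `Assoc of (string * json) list
  ]

(* Return the first key in [keys] that maps to a non-empty string. *)
let first_non_empty (fields : (string * json) list) (keys : string list) =
  List.find_map
    (fun key ->
      match List.assoc_opt key fields with
      | None | Some (`String "") -> None (* missing or empty: try next key *)
      | Some (`String s) -> Some s
      | Some _ -> None (* non-string values are ignored in this sketch *))
    keys

let () =
  let fields = [ "Abstract", `String ""; "Definition", `String "a word" ] in
  match first_non_empty fields [ "Abstract"; "Definition" ] with
  | Some s -> print_endline s
  | None -> print_endline "No definition found"
```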

Executing an HTTP Client Query #

For the query dispatching:

(* Execute the DuckDuckGo search *)
let get_definition word =
  let%bind _, body = Cohttp_async.Client.get (query_uri word) in
  let%map string = Cohttp_async.Body.to_string body in
  word, get_definition_from_json string



(* ==== understanding the signature for cohttp-async.Client.get *)
#require "cohttp-async";;
#show Cohttp_async.Client.get;;
(*
val get :
  ?interrupt:unit Deferred.t ->
  ?ssl_config:Conduit_async.V2.Ssl.Config.t ->
  ?headers:Cohttp.Header.t ->
  Uri.t -> (Cohttp.Response.t * Cohttp_async.Body.t) Deferred.t
*)

Understanding Cohttp_async.Client.get

  1. required arg: URI

  2. returns a deferred value containing a Cohttp.Response.t (which we ignore here) and a Cohttp_async.Body.t, through which the body of the response is streamed

    because the body isn’t that large, we just read the entire body directly using Body.to_string (instead of chunking it)

Dispatching multiple searches in parallel #

We use Deferred.all for our dispatcher. It takes a list of deferreds and returns a single deferred holding the list of results, in the same order as the deferreds that were passed to it.

Printing therefore happens only once all the results have arrived.

(* === Print out a word/definition pair *)
let print_result (word, definition) =
  printf (* -- this is an async printf that goes through the async scheduler *)
    "%s\n%s\n\n%s\n\n"
    word
    (String.init (String.length word) ~f:(fun _ -> '-'))
    (match definition with
    | None -> "No definition found"
    | Some def ->
      String.concat ~sep:"\n" (Wrapper.wrap (Wrapper.make 70) def))


(* === Parallel dispatcher: *)
(* Run many searches in parallel, printing out the results after
   they're all done. *)
let search_and_print words =
  let%map results = Deferred.all (List.map words ~f:get_definition) in
  List.iter results ~f:print_result


(* ==== alternative: retrieve it pipelined, in order of completion *)
(* Run many searches in parallel, printing out the results as you
   go *)
(*
    Deferred.all_unit;;
    - : unit Deferred.t list -> unit Deferred.t = <fun>
*)
let search_and_print words =
  Deferred.all_unit
    (List.map words ~f:(fun word ->
         get_definition word >>| print_result))

Finally, a CLI for this:

let () =
  Command.async
    ~summary:"Retrieve definitions from duckduckgo search engine"
    (let%map_open.Command words =
       anon (sequence ("word" %: string))
     in
     fun () -> search_and_print words)
  |> Command_unix.run

Exception Handling #

We should expect flakiness, especially when concurrently using external resources.

Monitors #

Example: Handling exceptions with DuckDuckGo #

Timeouts, Cancellation, and Choices #

Omitted this part, just didn’t get to it 🤷🏽

Working with System Threads #

Related notes can be found in this section, “Interaction with Systhreads” in Multicore OCaml — useful to juxtapose the two.

Earlier, there was a comparison between using cooperative threading model with an event-loop (Async) vs pre-emptive system threads, and we looked at their tradeoffs.

Kernel-level threads (whose interleaving is controlled by the operating system) still have their uses. Because the OS pre-empts them, the onus of synchronisation falls on the programmer, via locks and other primitives.

The Global Runtime Lock (GRL) applies to the use of OCaml’s system threads: the runtime has a single lock that a thread must hold in order to run OCaml code \(\implies\) at most one thread can be holding it at a time \(\implies\) there’s no physical parallelism.

comparison for usefulness:

  1. async:
    • built around non-blocking I/O, an event loop, with monadic / continuation style composition \(\implies\) code had to be written in an async style – with the blocking operations being explicit regions
  2. System Threads:
    1. Blocking Sys-calls Utility: some OS sys calls are blocking and have no non-blocking alternative, so we can’t run them directly within a system like Async without the whole program being blocked.

      • thread pool is maintained by Async for running these calls
    2. FFI utility: to deal with non-OCaml libraries that have their own event loop – or need their own threads

      • better to delegate it to another thread for this case – the OCaml code on the foreign thread is part of the comms to the main program (KIV FFI)
    3. Interoperation with Compute-Intensive OCaml Code:

      • if there’s a long-running computation that doesn’t call bind or map then it blocks the Async runtime until it’s done
      • an approach: chunk up the calculation and do it map-reduce style: smaller pieces that are separated by binds for explicit yielding
      • alternative approach: run the code in a separate thread. Async’s In_thread module gives a bunch of utils for this – this is like spawning worker threads.
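
The chunking approach can be illustrated without Async. The sketch below is a toy under stated assumptions: a stdlib Queue stands in for the event loop, each call to schedule marks the spot where a bind would sit in Async code, and every name (jobs, schedule, run_until_idle, sum_chunked, heartbeat) is hypothetical.

```ocaml
(* Toy run queue standing in for an event loop; not Async's scheduler. *)
let jobs : (unit -> unit) Queue.t = Queue.create ()
let schedule job = Queue.add job jobs

let run_until_idle () =
  while not (Queue.is_empty jobs) do
    let job = Queue.pop jobs in
    job ()
  done

let trace = Buffer.create 64 (* 'c' = one chunk ran, 'h' = one heartbeat *)
let finished = ref false
let result = ref 0

(* Sum the integers lo..hi, at most [chunk] of them per turn, then put
   the rest of the work back on the queue so other jobs can run. *)
let rec sum_chunked ~chunk ~lo ~hi ~acc ~on_done =
  if lo > hi
  then on_done acc
  else begin
    let stop = min hi (lo + chunk - 1) in
    let total = ref acc in
    for i = lo to stop do
      total := !total + i
    done;
    Buffer.add_char trace 'c';
    let acc = !total in
    schedule (fun () -> sum_chunked ~chunk ~lo:(stop + 1) ~hi ~acc ~on_done)
  end

(* A heartbeat that re-enqueues itself until the computation finishes. *)
let rec heartbeat () =
  if not !finished
  then begin
    Buffer.add_char trace 'h';
    schedule heartbeat
  end

let () =
  schedule (fun () ->
    sum_chunked ~chunk:25 ~lo:1 ~hi:100 ~acc:0 ~on_done:(fun total ->
      result := total;
      finished := true));
  schedule heartbeat;
  run_until_idle ();
  (* prints: sum = 5050, trace = chchchch *)
  Printf.printf "sum = %d, trace = %s\n" !result (Buffer.contents trace)
```

The alternating trace shows the heartbeat getting a turn between chunks. If the whole sum ran in a single job, the heartbeat could not run until the sum had finished.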

Multicore OCaml #

Notes for multicore OCaml can be found here and a dive into the GC design to retrofit thread safe GC can be found here.

For now, the interop between Async and system threads needs some attention. It is tricky because:

  1. Tricky: scheduling is OS-controlled if we delegate to OS threads

    Elaboration

    An example for using system threads on blocking routines, to show that Async and system thread interop can be tricky

       let log_delays thunk =
         let start = Time.now () in
         let print_time () =
           let diff = Time.diff (Time.now ()) start in
           printf "%s, " (Time.Span.to_string diff)
         in
         let d = thunk () in
         Clock.every (sec 0.1) ~stop:d print_time;
         let%bind () = d in
         printf "\nFinished at: ";
         print_time ();
         printf "\n";
         Writer.flushed (force Writer.stdout);;
       (* val log_delays : (unit -> unit Deferred.t) -> unit Deferred.t = <fun> *)
    Code Snippet 14: Function with a deferred-returning thunk. Runs thunk then prints heartbeat.

    when applied to a simple timeout deferred thunk, it prints the heartbeats. But if instead of a clock event we wait on a busy loop to finish, then the whole thing is blocked: no heartbeat is printed until the loop ends.

    
       (* --- non-blocking time event *)
       log_delays (fun () -> after (sec 0.5));;
       (* the response would be something like:
       37.670135498046875us, 100.65722465515137ms, 201.19547843933105ms, 301.85389518737793ms, 402.58693695068359ms,
       Finished at: 500.67615509033203ms,
       - : unit = ()
        *)
    
    
       (* -- for the blocking example *)
       let busy_loop () =
         let x = ref None in
         for i = 1 to 100_000_000 do x := Some i done;;
       (*
       val busy_loop : unit -> unit = <fun>
       *)
    
    
       log_delays (fun () -> return (busy_loop ()));;
       (*Finished at: 874.99594688415527ms,
       - : unit = ()*)
    Code Snippet 15: Non-blocking clock event vs blocking busy event

    This won’t be blocking anymore if we offload the thread scheduling to the OS:

       log_delays (fun () -> In_thread.run busy_loop);;
       (*
         31.709671020507812us, 107.50102996826172ms, 207.65542984008789ms, 307.95812606811523ms, 458.15873146057129ms, 608.44659805297852ms, 708.55593681335449ms, 808.81166458129883ms,
    
         Finished at: 840.72136878967285ms,
       - : unit = ()
       *)
    Code Snippet 16: Delegating to OS Threads, including the scheduling

    At the same time, the scheduling is no longer cooperative (we no longer get a heartbeat at clean 100ms intervals) because system threads are scheduled by the OS.

  2. Tricky runtime consequence: scheduling behaviour differs between native vs bytecode runs

    Elaboration of different scheduling behaviour: run as bytecode vs run as native code

    For native code, it has to run up to certain runtime checkpoints before yielding to another OCaml thread – allocation being one of the main opportunities for doing that

    For bytecode, it has its own interpreter-level scheduling behaviour so it’s possible to yield even for a non-allocating loop and other threads can have a chance to run

    1. So if loop never allocates then it can keep hogging runtime in native code and starve other OCaml threads.

      • native code relies more directly on runtime lock handoff points – out of which allocation is a key one – so if no allocation then it will hog runtime in native code
        • allocation (of memory) is also a synchronization point, when the runtime checks if it should let another thread proceed – that’s why code that allocates tends to be more interleavable
    2. it’s a runtime consequence: the same program behaves differently when run as a bytecode executable than when run as a native-code executable.

Thread-Safety and Locking #

  1. Mutable data structures need explicit synchronisation, else they’ll behave non-deterministically when multiple concurrent threads access them

    examples of issues:

    1. runtime exceptions
    2. corrupted data structures
  2. even things like lazy values can be faulty when multiple threads access them

  3. Mutex packages available:

    • Mutex: part of the stdlib
    • Nano_mutex: efficient alternative, exploits the locking done by the OCaml runtime to avoid needing to create an OS-level mutex (~20x faster to create Nano_mutex.t vs Mutex.t and ~40x faster to acquire the mutex)
      • seems like this could be a stale piece of information, correct @ the point the book was written — since Nano_mutex hasn’t been actively maintained
  4. Conditions that make interoperability between Async and threads easier :

    1. no shared mutable state between various threads involved

    2. computations executed by In_thread.run do not make any calls to the Async library

      • this can be violated by foreign threads, which may acquire the Async lock using calls from the Thread_safe module within Async so that Async computations can be run safely – this is complex though, and needs careful deliberation before use.