Skip to main content
  1. Readings/
  2. Books/
  3. Real World OCaml: Functional Programming for the Masses/

Chapter 16: Concurrent Programming

·· 4555 words· 19–31 min read
This is still a WIP, I’m currently doing a deep-dive into multicore ocaml before and after…

This chapter goes beyond the book because in the OCaml ecosystem, there’s two phases to using async:

  1. pre-OCaml 5 – cooperative concurrency as the typical concurrency model, with heavy reliance on libraries

  2. post-OCaml 5 – better primitives for multicore support

We’ll need to learn both.

Fundamentals #

references to pepper in soon:

  1. 2017 paper introducing multicore ocaml and how to use it (ref)
  2. tarides parallelism tutorial (ref) – also how the whole thing happened, the journey to ocaml multicore (ref)
  3. ocaml manual on parallel programming ([[https://ocaml.org/manual/5.4/parallelism.html

][ref]])

Understanding Monads #

Lwt relies on the monadic pattern, which is beautiful for clear composition. The OCaml docs on monads is an excellent reference for this.

Monad Signature Using examples: Maybe, Writer, Lwt #

The docs first outlay the structural and application implications before formalising monad laws. We follow that approach.

Monadic structures should be seen as structures that follow the monad design pattern.

1
2
3
4
5
module type Monad = sig
  type 'a t
  val return : 'a -> 'a t
  val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t
end
Code Snippet 1: Monad Signature, Satisfied by Monad Structures
meaning
returnputs value in box, has a trivial effect (in-terms of computation) when it does
bindgives the ability to compose and sequence operations

We use case studies to see this design pattern:

  • Maybe monad / Option monad / Error monad

    Supposed the basic arithmetic functions, we want to wrap them such that if there’s any error (e.g. div by 0) then we return None instead of throwing an exception.

    pedestrian solution that isn’t quite monadic yet

    We have some ways to make this feel monadic. We could re-implement the existing operators and that will give us ridiculous amount of boilerplate in the form of pattern-matches and such. It will work but we see the excessive effort needed for doing this:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    
    let plus_opt (x:int option) (y:int option) : int option =
      match x,y with
      | None, _ | _, None -> None
      | Some a, Some b -> Some (Stdlib.( + ) a b)
    
    let ( + ) = plus_opt
    
    let minus_opt (x:int option) (y:int option) : int option =
      match x,y with
      | None, _ | _, None -> None
      | Some a, Some b -> Some (Stdlib.( - ) a b)
    
    let ( - ) = minus_opt
    
    let mult_opt (x:int option) (y:int option) : int option =
      match x,y with
      | None, _ | _, None -> None
      | Some a, Some b -> Some (Stdlib.( * ) a b)
    
    let ( * ) = mult_opt
    
    let div_opt (x:int option) (y:int option) : int option =
      match x,y with
      | None, _ | _, None -> None
      | Some a, Some b ->
        if b=0 then None else Some (Stdlib.( / ) a b)
    
    let ( / ) = div_opt
    Code Snippet 2: Pattern-matched, non-monadic solution

    If we try to deduplicate all the unnecessary boilerplate, then we will achieve something like this:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    
    let propagate_none
      (op : int -> int -> int option) (x : int option) (y : int option)
    =
      match x, y with
      | None, _ | _, None -> None
      | Some a, Some b -> op a b
    
    
    let wrap_output (op : int -> int -> int) (x : int) (y : int) : int option =
      Some (op x y)
    
    let ( + ) = propagate_none (wrap_output Stdlib.( + ))
    let ( - ) = propagate_none (wrap_output Stdlib.( - ))
    let ( * ) = propagate_none (wrap_output Stdlib.( * ))
    
    let div (x : int) (y : int) : int option =
      if y = 0 then None else wrap_output Stdlib.( / ) x y
    
    let ( / ) = propagate_none div

    The upgrade functionality can be generic and defined like so :

    let upgrade : (int -> int option) -> (int option -> int option) =
      fun (op : int -> int option) (x : int option) -> (x >>= op)
    
     (* -- without the annotations:  *)
    let upgrade op x = x >>= op
    Code Snippet 3: A higher-order, upgrade function helps deduplicate

    So we could just use return and >>= (bind) functions to reimplement the whole thing:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    
    let ( + ) (x : int option) (y : int option) : int option =
      x >>= fun a ->
      y >>= fun b ->
      return (Stdlib.( + ) a b)
    
    let ( - ) (x : int option) (y : int option) : int option =
      x >>= fun a ->
      y >>= fun b ->
      return (Stdlib.( - ) a b)
    
    let ( * ) (x : int option) (y : int option) : int option =
      x >>= fun a ->
      y >>= fun b ->
      return (Stdlib.( * ) a b)
    
    let ( / ) (x : int option) (y : int option) : int option =
      x >>= fun a ->
      y >>= fun b ->
      if b = 0 then None else return (Stdlib.( / ) a b)

    linking this to the monadic design pattern from our pedestrian approach:

    wrap_outputactually the return functionUpgrading a value from int to int option by wrapping
    modifying the fnsbindHad to upgrade functions whose inputs were of type int to instead accept inputs of type int option – this upgrades the operation

    We can deduplicate the common parts of the monadic approach further by having a generic upgrade function:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    let upgrade_binary op x y =
      x >>= fun a ->
      y >>= fun b ->
      op a b
    
    let return_binary op x y = return (op x y)
    
    let ( + ) = upgrade_binary (return_binary Stdlib.( + ))
    let ( - ) = upgrade_binary (return_binary Stdlib.( - ))
    let ( * ) = upgrade_binary (return_binary Stdlib.( * ))
    let ( / ) = upgrade_binary div
    Code Snippet 4: A deduplicated approach to creating the maybe monad

    Mental model for the bind e.g. in x >>= fun a -> y >>= fun b -> op a b :

    1. take x and extract form it the value a
    2. take y and extract from it b
    3. then use a and b to construct a written value using the operation op

    Here’s how we would implement the monad signature for the maybe monad

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    
    module Maybe : Monad = struct
      type 'a t = 'a option
    
      let return x = Some x
    
      let (>>=) m f =
        match m with
        | None -> None
        | Some x -> f x
    end
    Code Snippet 5: Implementation for the monad signature
  • Writer Monad

    Suppose we want the effect of loggability (a log-writer that allows us to still compose the original routines).

    intuiting the monadic approach

    We first consider approaches that don’t have the monad pattern within, we want to make inc and dec loggable below:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    
    let inc x = x + 1;;
    let dec x = x - 1;;
    
    let inc_log x = (x + 1, Printf.sprintf "Called inc on %i; " x);;
    let dec_log x = (x - 1, Printf.sprintf "Called dec on %i; " x);;
    
    (* using a composition operator: *)
    let ( >> ) f g x = x |> f |> g;;
    
     (* we can compose like so:  *)
    let id = inc >> dec;;
    
    (* PROBLEM: the pattern isn't monadic, so we get an error from this:  *)
    (* let id = inc_log >> dec_log *)
    
    let dec_log_upgraded (x, s) =
      (x - 1, Printf.sprintf "%s; Called dec on %i; " s x);;
    
    let inc_log_upgraded (x, s) =
      (x + 1, Printf.sprintf "%s; Called inc on %i; " s x);;
    
    let id = dec_log >> inc_log_upgraded;;
    Code Snippet 6: Non-monadic approach to log-writer impl

    We try to use a few helpers to reduce the boilerplate, to get a more monadic pattern :

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    let log (name : string) (f : int -> int) : int -> int * string =
      fun x -> (f x, Printf.sprintf "Called %s on %i; " name x);;
    
    let loggable (name : string) (f : int -> int) : int * string -> int * string =
      fun (x, s1) ->
        let (y, s2) = log name f x in
        (y, s1 ^ s2);;
    
    let inc' : int * string -> int * string =
      loggable "inc" inc;;
    
    let dec' : int * string -> int * string =
      loggable "dec" dec;;
    
    let id' : int * string -> int * string =
      inc' >> dec';;
    
    let e x = (x, "") ;;  (* -- for empty-logging*)
    
    id' (e 5)  (*returns: (5, "Called inc on 5; Called dec on 6; ")*)
    Code Snippet 7: intuitive monadic pattern form
    ereturntrivial effect of putting value into the metaphorical box, with empty log msg
    loggableuses bind, >>=We need code that handles pattern matching against pairs and string concat
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    module Writer : Monad = struct
      type 'a t = 'a * string
    
      let return x = (x, "")
    
      let ( >>= ) m f =
        let (x, s1) = m in
        let (y, s2) = f x in
        (y, s1 ^ s2)
    
      (* extra: Loggable implemented using >>= *)
      let loggable (name : string) (f : int -> int) : int * string -> int * string =
        fun m ->
        m >>= fun x ->
        log name f x
    end
    Code Snippet 8: Writer monad for logging effects

    Familiarity with the >>= binding operator use and the monadic style of programming, our code can now concentrate on 'a in the type 'a Writer.t instead of fixed to string – the writer monad automatically keeps our code generic so long as we use return and >>=.

  • Lwt monad

    1. it’s a promise-based library, satisfies this monadic interface to compose / chain promises togther
      • bind registers callbacks
      • the type 'a is the “something more” within the box, which are the values that get produces asynchronously when the promise is resolved (instead of immediately)
    2. what’s abstracted away from us:
      • how 'a t and return creates references
      • the mechanism of callback registry just hidden within bind, well-encapsulated

Monad Laws #

Refers to invariants that govern the behaviours of the monad “data-structure”. Specifications are technically related to particular monads like the writer monad or Lwt monad.

There’s some generic laws still.

context: compose monad operator (>=>)

To compose monadic functions.

let compose f g x =
  f x >>= fun y ->
  g y

let ( >=> ) = compose
Code Snippet 9: compose function implementation

The idea here is that if we have 2 functions that we wish to compose:

  • f: 'a -> 'b t
  • g: 'b -> 'c t

then we have compose f g : 'a -> 'c t then we ’re supporting the flow where composed fn takes a value of 'a, applies f to it, extracts out 'b out of the result, applies g to it, returns that value of type 'c t.

Laww/o composew compose
[1] Left Identityreturn x >>= f \(\iff\) f xreturn >=> f \(\iff\) f
[2] Right Identitym >>= return \(\iff\) mf >=> return \(\iff\) f
[3] Associative(m >>= f) >>= g \(\iff\) m >>= (fun x -> f x >>= g)(f>=>g)>=>h \(\iff\) f>=>(g>=>h)
  • Law 1: Left-Identity return >=> f behaves the same as f

    Law 1 says that having the trivial effect on a value, then binding a function on it, is the same as just calling the function on the value.

    • same behaviour as in two expressions that will both evaluate to the same value, or they enter an infinite loop or they both raise the same exception
    • examples:
      • maybe monad: return x would be Some x, and >>= f would extract x and apply f to it
      • Lwt monad: return x would be a promise that is already resolved with x, and >>= f would register f as a callback to run on x.
  • Law 2: Right-Identity f >=> return behaves the same as f

    Law 2 says that binding on the trivial effect is the same as just not having the effect.

    • examples :
      • maybe monad: m >>= return would depend upon the 2 cases: whether m is Some x or None.

        1. m is Some x : binding would extract x, and return would just re-wrap it with Some.
        2. m is None binding would just return None.
      • Lwt monad: binding on m would register return as a callback to be run on the contents of m after it is resolved, and return would just take those contents and put them back into an already resolved promise.

  • Law 3: Associative (f>=>g)>=>h behaves the sames as f>=>(g>=>h)

    Law 3 says that bind sequences effects correctly

Overview #

Table 1: Comparison of Concurrency Primitives across Eras
PrimitiveEraParallelismStyleUse case
ThreadPre-5No (GRL)PreemptiveBlocking C FFI
Mutex / ConditionBothImperativeThread synchronization
Lwt.tPre-5 (still used)NoMonadicI/O concurrency
Async.Deferred.tPre-5NoMonadicI/O concurrency (Jane Street)
DomainOCaml 5YesImperativeCPU parallelism
AtomicOCaml 5YesLock-freeShared mutable state
EffectOCaml 5Direct-styleScheduler building block
Domainslib.TaskOCaml 5YesDirect-styleStructured parallelism
Eio.FiberOCaml 5Per-domainDirect-styleI/O concurrency

Pre-OCaml 5 #

  • cooperative concurrency: primitives to use OS-threads limited by the Global Runtime Lock This is fascinating stuff – this begs to be compared against Python’s recent journey of escaping the GIL and how concurrency primitives have evolved in Python itself. such that only one thread could execute at one time \(\implies\) interleaving but not true simultaneous execution

    The reason the GRL was such a limiting factor was because the GC was not thread-safe, that’s why it needed to enforce strictly that only one thread could execute OCaml code at any one time.

  • viable escape-hatch: using C FFI calls that could release the lock

  • Using Thread for control of POSIX threads

    1. it’s a wrapper on OS-level pthreads
    2. Preemptive: the scheduler may interrupt whenever
    3. Still can’t parallelise becuase only single thread exec at any time because of the GRL
    4. Useful for: blocking I/O that would release the lock, background thread coordinating a C library
    5. These were monitor-style primitives: Mutex, Condition, Semaphore in stdlib – everything limited by the GRL
  • Lwt - lightweight threads, as a concurrency library

    1. Key Idea:

      Instead of blocking, every potentially-blocking operation shall return a promise ('a Lwt.t). The scheduler (via an event-loop) runs promises to completion cooperatively; a promise yields control explicitly.

    2. Lwt is infectious: calling a blocking function (e.g. Unix.read) within a Lwt promise will block the entire event loop.

    3. Thread cancellation isn Lwt is cooperative and fragile – so Lwt.cancel raises Lwt.Canceled only @ yield points.

      that’s why we should always clean up threads with Lwt.finalize or Lwt_switch

  • Async – Jane Street’s alternative

    It’s the same cooperative model, with different aesthetics and tradeoffs:

    • Instead of Lwt.t, it uses Deferred.t
    • Integrated with Core (JS’s stdlib alternative)
    • Scheduler is more explicit – we have to start the event loops explicitly via Async.Scheduler.go ()
    • Piping capability of streams is stronger because it’s more composable (Pipe.t)
    • Has better Clock primitives
  • Multi-processing as a parallelism workaround

    • Real CPU parallelism ended up requiring multiple OS processes communicating over Unix pipes or sockets

    • libs like Parmap helped to make this more ergonomic

Multicore OCaml >= 5 #

Has 2 orthogonal primitives, onto which the other libs are all built:

  • Domains: for true parallelism
  • Effects: for structured control flow
  • Domains – for parallelism

    ~ maps to a core – each domain runs on its own OS thread, has its own local minor heap (with a local GC) and it shares a major heap with the rest of the domains. Since there’s no GRL, multiple domains can run OCaml code simultaneously.

    let d = Domain.spawn (fun () ->
      (* runs in parallel on another core *)
      expensive_computation ()
    )
    let result = Domain.join d
    Code Snippet 10: shape of how domains are used

    Some nuances :

    1. Domains are supposed to be heavyweight – we shouldn’t be creating thousands of them (more like Go threads than goroutines). Pattern: N domains @ startup, one per-core then distribute the work using queues.

    2. Data Races are possible

      • memory model spec makes it such that non-atomic shared mutable state requires explicit synchronisation
    3. Atomic module gives basic lock-free primitives

      • make, get, set, compare_and_set, fetch_and_add
    4. GC is parallel and concurrent

      • minor heap \(\implies\) most allocation is thread-local \(\implies\) great performance
    • Domainslib: for structured parallelism

      It’s what we reach for when we want to do CPU-bound parallel-work. Work-stealing scheduler allows us to get load-balancing automatically.

      Feature-rich, offers:

      • Task.pool: fixed-sized domain pool (with a work-stealing scheduler within it)
      • Task.run: submits work to the pool
      • Task.async / Task.await: futures within the pool
      • Task.parallel_for: for parallel iteration
      1
      2
      3
      4
      5
      6
      7
      
      let pool = Task.setup_pool ~num_domains:3 ()
      let result = Task.run pool (fun () ->
        let a = Task.async pool (fun () -> heavy_work_1 ()) in
        let b = Task.async pool (fun () -> heavy_work_2 ()) in
        Task.await pool a + Task.await pool b
      )
      Task.teardown_pool pool
      Code Snippet 11: example of using Domainslib.Task
  • Effects – for concurrency

    Key mechanism for delimited continuations (coroutines) without committing to a specific scheduler – it’s how we can get Lwt-style concurrency without monadic wrapping.

    Like a resumable exception, we perform an effect to signal something, and a handler above us in the call-stack can resume the computation later – likely after some I/O is done.

    We can write direct-style concurrent code Code that looks like ordinary, sequential code. Contrast with the monadic-style that we had to be reliant on before. and let the underlying scheduler use effects to manage fibers.

    Effect handlers come in 2 flavours:

    1. Effect.Deep : handler resumes the continuation multiple times (full-coroutines)

    2. Effect.Shallow: handle resumes once and must re-install itself (more explicit, slightly more efficient)

    • Eio – I/O Library for OCaml 5

      Intends to be the replacement for Lwt, built entirely on Effects, gives direct-style concurrency with structured concurrency guarantees.

      Key ideas:

      1. Switch: an ownership scope for fibers. All fibers in a switch must finish before the switch exits. It’s structured concurrency Rough idea of structured concurrency: concurrency that follows the shape of the code’s scope. This means we should have tasks started and finished within a code-block and if we need to leave it, error-propagation happens in a controlled fashion without any “dangling” background tasks.

      2. Fiber: a lightweight coroutine. Within a switch, Fibers run cooperatively; scheduler switches between them at I/O points

      3. Stream / Promise: communication primitives between Fibers.

      4. Can be combined (Eio + Domainslib): we can use Eio for I/O concurrency within a domain and we can use Domainslib for CPU parallelism across domains.

Pre-Ocaml 5 #

Async #

Pre-OCaml5 offers a hybrid model that aims to bring the best of both the threading model and event-loop model of concurrency. More performant + without the sync woes of pre-emptive threads.

Some general approaches to concurrency

We first consider some general approaches to concurrency:

  1. pre-emptive system threads

    each task that may require simultaneous waiting is given an OS thread of its own so it can block without stopping the entire program e.g. Java, C#

    complications:

    1. significant memory and resource usage per thread

    2. OS can arbitrarily interleave the execution of system threads

      need good sync mechanisms handled: so, programmer needs to protect shared resource state (e.g. with locks, guarded with conditionals) \(\implies\) error-prone

  2. single-threaded programs running event loops

    Event loop reacts to external events like timeouts or mouse-clicks by invoking a callback function that has been registered for that purpose. e.g. JavaScript, GUI toolkits.

    1. 1 task at a time, so no need to have complex sync mechanisms

    complications:

    1. the control structure of event-driven programs has inverted control structure \(\implies\) your own control flow has to be threaded awkwardly through the system’s event loop \(\implies\) possible callback hell.

Async Basics #

Async is an explicit package of its own, designed to be an extension.

Comparing Async vs Core
(* ==== 1: sync example of reading something *)
open Core;;
#show In_channel.read_all;;
(* val read_all : string -> string *)
Out_channel.write_all "test.txt" ~data:"This is only a test.";;
(* - : unit = () *)
In_channel.read_all "test.txt";; (* -- NB: it's type shows that it's a blocking operation*)
(* - : string = "This is only a test." *)

(* ==== 2: Async version: *)
#require "async";;
open Async;;
#show Reader.file_contents;;
(* val file_contents : string -> string Deferred.t *)

(* ==== 3: using deferred values: *)
let contents = Reader.file_contents "test.txt";;
(* val contents : string Deferred.t = <abstr> *)
Deferred.peek contents;;
(* - : string option = None *) (* -- NB: this indicates that it's not complete*)


(* -- on complete  *)
contents;; (*COUNTER INTUITIVE -- only the case for utop where this case of spontaneous evaluation happens *)
(* - : string = "This is only a test." *) (* -- NB: contents is a deferred type (container --  string Deferred.t) but this expression returns string type (the contained type)*)
Deferred.peek contents;;
(* - : string option = Some "This is only a test." *) (* -- peeking after completion*)
Deferred Types as a representation for promises

Deferred types

Well-behaved Async functions are non-blocking. They return Deferred.t which is something of a placeholder – like a completable future / promise.

  • carrying out a non-blocking check:

    Deferred.peek is a way of checking completion of the future. It will always give us an optional, if we inspect it, we should see the abstract parameter concretised e.g. type string Deferred.t

  • GOTCHA: special behaviour within utop (repl)

    If we directly refer to a deferred value from within utop (repl), we’re going to get the contained value directly instead of itself. This is because within a repl, this is a blocking operation (the scheduler starts if it hasn’t) and we effectively await the deferred this way.

    in scripted environments or within our own programs, this won’t happen because deferred values will never get spontaneously resolved. We would need to bind them with bind, >>=, let%bind constructs.

binding as a monadic primitive to sequence concurrent computations
  • sequencing concurrent computations via binding

    We wish to await deferred computations and for that we use bind as the sequencing operator, which has a sugar-form >>=.

    The function we bind to the deferred type will get invoked after the deferred is no longer empty e.g. Writer.save gets called only after the file contents were read via Reader.file_contents.

    Bind expects that the function wraps up the result in a deferred type. For that, Async has a function called return which takes an ordinary value and wraps it up in a deferred.

    now, bind and return form the monad functional pattern. – it’s one example of a monad.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    
      #show Deferred.bind;;
      (* val bind : 'a Deferred.t -> f:('a -> 'b Deferred.t) -> 'b Deferred. *)
    
      (* ==== using the sequencing operator (bind) *)
      let uppercase_file filename =
        Deferred.bind (Reader.file_contents filename)
          ~f:(fun text ->
             Writer.save filename ~contents:(String.uppercase text));;
      (* val uppercase_file : string -> unit Deferred.t = <fun> *)
      uppercase_file "test.txt";;
      (* - : unit = () *)
      Reader.file_contents "test.txt";;
      (* - : string = "THIS IS ONLY A TEST." *)
    
    
      (* ==== using the syntax sugar >>= *)
      let uppercase_file filename =
        Reader.file_contents filename
        >>= fun text ->
        Writer.save filename ~contents:(String.uppercase text);;
      (* val uppercase_file : string -> unit Deferred.t = <fun> *)
    
      (* === needing to wrap the bind function in an async result (wrapped as a Deferred) *)
      let count_lines filename =
        Reader.file_contents filename
        >>= fun text ->
        return (List.length (String.split text ~on:'\n'));;
      val count_lines : string -> int Deferred.t = <fun>
    
      (* -- this is the negative example: it would never have worked
    
      let count_lines filename =
        Reader.file_contents filename
        >>= fun text ->
        List.length (String.split text ~on:'\n');;
      Line 4, characters 5-45:
      Error: This expression has type int but an expression was expected of type
               'a Deferred.t
    
      *)
    
    
      (* ==== deferred map: *)
      let count_lines filename =
        Reader.file_contents filename
        >>| fun text ->
        List.length (String.split text ~on:'\n');;
      (* val count_lines : string -> int Deferred.t = <fun> *)
      count_lines "/etc/hosts";;
      (* - : int = 10 *)
  • sugar for the monadic bind

    bind and return monadic pattern used commonly enough that we have Deferred.map for this (and it’s sugar >>|).

    Unknown element.
How Scheduling is done

Scheduling

  • Async has a dedicated scheduler, which needs to be started explicitly for our own programs.

    utop will handle this automatically for us.

    NOTE: this is similar to python’s thread based concurrency model.

Using Let syntax

The value that the Let syntax brings is from a syntax perspective, the compiled output is the same.

Just compare the bind and map based syntax differences using let syntax.

Why this is useful:

  1. highlights the analogy between monadic bind and OCaml’s built-in let-binding \(\implies\) more uniform, readable code

  2. Let_syntax works for any monad, opening the right Let_syntax module is how we choose the correct monad to use.

    in the case of Async we implicitly have Deferred.Let_syntax opened – in some contexts, we may wish to do more explicit opens.

#require "ppx_let";;

(* ==== bind with let syntax *)
let count_lines filename =
  let%bind text = Reader.file_contents filename in
  return (List.length (String.split text ~on:'\n'));;
(* val count_lines : string -> int Deferred.t = <fun> *)

let uppercase_file filename =
  Reader.file_contents filename
  >>= fun text ->
  Writer.save filename ~contents:(String.uppercase text);;
(* val uppercase_file : string -> unit Deferred.t = <fun> *)

(* ==== map with let syntax *)
let count_lines filename =
  let%map text = Reader.file_contents filename in
  List.length (String.split text ~on:'\n');;
(* val count_lines : string -> int Deferred.t = <fun> *)

let count_lines filename =
  Reader.file_contents filename
  >>| fun text ->
  List.length (String.split text ~on:'\n');;
Ivars and Upon: incremental filling via binds

Ivar is about incremental filling of variables – we want to construct a deferred where we can programmatically decide when it gets filled in.

It’s low-level feature compared to map, bind and return. Very useful for building synchronisation patterns that isn’t well-supported.

3 fundamental operations with an ivar:

(* ==== syntax intro: using ivars *)
let ivar = Ivar.create ();;
(*
val ivar : '_weak1 Ivar.t =
  {Async_kernel__.Types.Ivar.cell = Async_kernel__Types.Cell.Empty}
  *)
let def = Ivar.read ivar;;
(* val def : '_weak1 Deferred.t = <abstr> *)
Deferred.peek def;;
(* - : '_weak2 option = None *)
Ivar.fill ivar "Hello";;
(* - : unit = () *)
Deferred.peek def;;
(* - : string option = Some "Hello" *)
  1. create an ivar, use Ivar.create
  2. read: the deferred that corresponds to the ivar in question, using Ivar.read
  3. fill: fills the ivar, causing the deferred to be determined, using Ivar.fill

Here’s a custom delay-timeout synchronisation pattern, where:

  1. schedule receives an action in the form of a thunk that returns a deferred. Remember thunks are nullary functions (hence just receive unit () as the arg)

    the caller of schedule will receive the contents of deferred value returned by the thunk

    this is where we use the the upon operator for callback registration. upon schedules a callback to be executed when the deferred it is passed is determined, the difference being that upon will NOT return a new deferred for the callback to fill. val upon : 'a Deferred.t -> ('a -> unit) -> unit.

  2. our delayer uses a queue of thunks, where every call to schedule adds a thunk to the queue and schedules a job in the future to grab a thunk off the queue and run it.

    waiting done using after that takes a time-span and returns a deferred that will be determined after that time-span – this is just timeout behaviour that we’re familiar with in languages like JavaScript.

    the queue of thunks is literally a task queue. note how they run in order (execution order) even if they are scheduled by upon are run out of order (timing order). This part may be a little confusing:

    • Even though upon seems “sequential” because it mentions “after delay”, nothing forces those upon callbacks to respect order — only the explicit queue logic does.

    • remember that upon schedules side-effects, not values

    • inside the scheduler, Async doesn’t promise that the callbacks we registered with upon will execute in the same order they were enqueued. 2 independent deferred might resolve in any order (depending on timing) \(\implies\) upon introduces temporal non-determinism.

    • we make the order deterministic by using the queue, which reminds me of python’s ThreadPoolExecutor coupled with as_completed().

      the call to upon uses queue such that jobs are dequeued in the same order they were enqueued, even through the internal Async callbacks (upon) may be run in different timing order.

           Queue.enqueue t.jobs (fun () ->
             upon (thunk ()) (fun x -> Ivar.fill ivar x));
           (* == it's this call that enforces the deterministic execution order*)
           upon (after t.delay) (fun () ->
             let job = Queue.dequeue_exn t.jobs in
             job ());
           (* ==== delayed interface: *)
           module type Delayer_intf = sig
             type t
             val create : Time.Span.t -> t
             val schedule : t -> (unit -> 'a Deferred.t) -> 'a Deferred.t
           end;;
           (*
           module type Delayer_intf =
             sig
               type t
               val create : Time.Span.t -> t
               val schedule : t -> (unit -> 'a Deferred.t) -> 'a Deferred.t
             end
           *)
      
      
           (* ===== implementation  *)
           module Delayer : Delayer_intf = struct
             type t = { delay: Time.Span.t;
                        jobs: (unit -> unit) Queue.t;
                      }
      
             let create delay =
               { delay; jobs = Queue.create () }
      
             let schedule t thunk =
               let ivar = Ivar.create () in
               Queue.enqueue t.jobs (fun () ->
                 upon (thunk ()) (fun x -> Ivar.fill ivar x));
               upon (after t.delay) (fun () ->
                 let job = Queue.dequeue_exn t.jobs in
                 job ());
               Ivar.read ivar
           end;;
           (* module Delayer : Delayer_intf *)

      Try sticking to the higher level of abstraction for the concurrency primitives instead of the lower level ones (ivars, upon) unless absolutely necessary for the thing we’re trying to express

Understanding bind in terms of Ivars and upon

This part is just to help us visualise the abstraction barrier part.

when we write let d' = Deferred.bind d ~f:

  1. new ivar i is created that will hold the final result of the computation, the deferred that is associated to this ivar is what is returned

  2. a function is registered to be called when d is determined

  3. that function when invoked, calls function f with the value determined for d

  4. another function is registered to be called when the deferred returned by f is determined (like step 2)

  5. calling that function fills i, causing the corresponding deferred to be determined

(* ==== a custom demo implementation *)
let my_bind d ~f =
  let i = Ivar.create () in
  upon d (fun x -> upon (f x) (fun y -> Ivar.fill i y));
  Ivar.read i;;
(*
val my_bind : 'a Deferred.t -> f:('a -> 'b Deferred.t) -> 'b Deferred.t =
  <fun>
  *)

Example: An Echo Server #

Objective: program that accepts connections from clients and spits back whatever is sent to it.

  • Initial helpers

    • Handling the client connection

      We need a function that can copy data from input to output \(\implies\) this function handles the logic for handling a client connection.

      bind marks the places that we wait:

      1. first, when we call Reader.read to read a block of input
      2. then when it’s complete, if a new block was returned, then we write that block to the writer
      3. finally we wait until writer buffers are flushed, then we recurse.

      This is implemented with pushback which helps with memory-discipline:

      • if the process can’t make progress writing, it will stop reading.

      • this pattern is necessary so that the program DOES NOT allocate unbounded amounts of memory and keeps track of all the data that it intends to write but hasn’t been able to write yet.

      open Core
      open Async
      
      (** Copy data from the reader to the writer, using the provided buffer
         as scratch space *)
      let rec copy_blocks buffer r w =
        match%bind Reader.read r buffer with
        | `Eof -> return () (* -- termination condition*)
        | `Ok bytes_read ->
          Writer.write w (Bytes.to_string buffer) ~len:bytes_read;
          let%bind () = Writer.flushed w in (* -- this is what allows the pushback to happen: no further reading until the current buffer is done writing.*)
          copy_blocks buffer r w
    • Setting up a server to receive client connections

      We use Async’s Tcp module and its collection of utilities for creating TCP clients and servers.

      (** Starts a TCP server, which listens on the specified port, invoking
          copy_blocks every time a client connects. *)
      let run () =
        let host_and_port =
          Tcp.Server.create
            ~on_handler_error:`Raise
            (Tcp.Where_to_listen.of_port 8765)
            (* -- next arg is the client connection handler, the most important argument to the server create function*)
            (fun _addr r w ->
              let buffer = Bytes.create (16 * 1024) in
              copy_blocks buffer r w)
        in
        ignore
          (host_and_port
            : (Socket.Address.Inet.t, int) Tcp.Server.t Deferred.t)
      • Calling Tcp.Server.create gives us a Tcp.Server.t, a handle to the server that can be used to shut the server down.

        we aren’t using those args here, that’s why ignore is used to supress the unused variables errors

      • out of the args for server creation, the last one is the client connection handler and it’s the most important one.

      • there’s no need for an explicit closing down of the lcient connections because the server will automatically shut down the connection once the deferred returned by the handler is determined.

    • Starting the Async scheduler

      A common newbie error is to forget to manually start the scheduler.

      (* Call [run], and then start the scheduler *)
      let () =
        run ();
        never_returns (Scheduler.go ())
      • without the scheduler, the program does NOTHING at all, even printf calls won’t reach the terminal so it can be bewildering.

      • though we don’t really add special code for handling multiple clients, the server can already do the concurrently connecting, reading and writing of data.

  • Tail-Calls and Chain of Deferreds

    The OCaml compiler is smart and essentially does a form of tail-call optimisation lifted to the Deferred monad.

    One potential worry we might have had is that the chain of deferred calls (and the length of the chain) may result in memory allocation problems since this possibly could take up and unbounded amount of memory as the echo process continues.

    The compiler will automatically optimise this whole chain of deferreds and identify when the final deferred is determined (in the copy_blocks example, it’s when we get Eof) and just flatten it into a single deferred \(\implies\) there’s no memory leak at all.

    • intuition: the bind that creates the deferred is in tail-position and nothing is done to that deferred once it’s created; it’s simply returned as is \(\implies\) this looks like a good candidate for tail call optimisation.
  • Functions that Never Return

    When starting the scheduler, we wrapped Scheduler.go around a never_returns.

    • this is an explicit marker – this makes it clear to the invoker of Scheduler.go that the function never returns.

    • by default the return type is inferred as 'a. If a function never returns, we’re free to impute any type at all to its non-existent return value so to allow that function (which never returns) to fit into any context within our program – we do this. We’ve seen this before in this book when learning about exceptions.

    • never_returns is an alias of Nothing.t, which is what the return type of Scheduler.go is defined as.

      Nothing.t is uninhabited, meaning there’s no values of that type, so a function can’t actually return a value of type Nothing.t i.e. a function that never returns. To do this, we just need to add type annotation:

        let rec loop_forever () : Nothing.t = loop_forever ();;
        (* val loop_forever : unit -> never_returns = <fun> *)
      
        (* === helpful error if we omit the never_returns: *)
        (*
      
          let do_stuff n =
          let x = 3 in
          if n > 0 then Scheduler.go ();
          x + n;;
      
      
      
        Line 3, characters 19-34:
        Error: This expression has type never_returns
               but an expression was expected of type unit
               because it is in the result of a conditional with no else branch
      
          *)
      
        (* ==== actually using the never_returns properly *)
        let do_stuff n =
          let x = 3 in
          if n > 0 then never_returns (Scheduler.go ());
          x + n;;
        (* val do_stuff : int -> int = <fun> *)

      We had seen uninhabited types earlier before in a different context and use-case back when learning about narrowing without using GADTs

  • Improving the Echo Server

    Improvements list:

    1. add a proper CLI with Command
    2. add a flag to specify the port to listen on and a flag to make the server echo back the capitalised version of input
    3. simplify the code using Asyn’s Pipe interface
    open Core
    open Async
    
    let run ~uppercase ~port =
      let host_and_port =
        Tcp.Server.create
          ~on_handler_error:`Raise
          (Tcp.Where_to_listen.of_port port)
          (fun _addr r w ->
            Pipe.transfer
              (Reader.pipe r)
              (Writer.pipe w)
              ~f:(if uppercase then String.uppercase else Fn.id))
      in
      ignore
        (host_and_port
          : (Socket.Address.Inet.t, int) Tcp.Server.t Deferred.t);
      Deferred.never () (* --- returns a deferred that is never determined, indicating that the echo server doesn't shut down*)
    
    let () =
      Command.async
        ~summary:"Start an echo server"
        (let%map_open.Command uppercase =
           flag
             "-uppercase"
             no_arg
             ~doc:" Convert to uppercase before echoing back"
         and port =
           flag
             "-port"
             (optional_with_default 8765 int)
             ~doc:" Port to listen on (default 8765)"
         in
         fun () -> run ~uppercase ~port)
      |> Command_unix.run

    NOTEs:

    1. Deferred.never in the run function shows that the echo server doesn’t ever shut down. Deferred.never returns a deferred that is never determined.
    2. Use of Aync’s Pipe is the key change in the code.
      • Pipe is an async comms channels used for connecting different parts of the program

      • visualise this as a consumer/producer queue that uses deferred for communicating when the pipe is ready to be read from or written into

      • we’re using one of the many utility functions within the Pipe module:

        • Pipe.transfer to set up a process that takes data from a reader-pipe and moves it to a writer-pipe (type: - : 'a Pipe.Reader.t -> 'b Pipe.Writer.t -> f:('a -> 'b) -> unit Deferred.t = <fun>)
        • Reader.pipe and Writer.pipe are the two pipes that are connected here.
      • this still preserves the pushback: if the writer gets blocked, the writer’s pipe will stop pulling data from the reader’s pipe and this prevents the reader from reading in more data.

        the deferred by Pipe.transfer becomes determined once the reader has been closed and the last element is transferred from the reader to the writer. So the server shuts down that client connection once that deferred becomes determined.

    • Using Pipes

      1. they are created in connected read/write pairs:

           let (r,w) = Pipe.create ();;
           (* val r : '_weak3 Pipe.Reader.t = <abstr> *)
           (* val w : '_weak3 Pipe.Writer.t = <abstr> *)
        • the r and w are weakly polymorphic
      2. Pipes have some internal slack: a number of slits to which something can be written before the write will block.

        By default the slack is 0 – so the deferred returned by a write is determined only when the value is read out of the pipe.

           let (r,w) = Pipe.create ();;
           (* val r : '_weak4 Pipe.Reader.t = <abstr> *)
           (* val w : '_weak4 Pipe.Writer.t = <abstr> *)
           let write_complete = Pipe.write w "Hello World!";;
           (* val write_complete : unit Deferred.t = <abstr> *)
           Pipe.read r;; (* -- so reading from the pie here will determine the deferred for the write  to the pipe:*)
           (* - : [ `Eof | `Ok of string ] = `Ok "Hello World!" *)
           write_complete;;
           (* - : unit = () *)

Example: Searching Definitions with DuckDuckGo #

  • Setup

    Install the following libs:

    1. textwrap
    2. uri
    3. yojason
    4. cohttp
  • URI Handling

    We’d need to do some UI generation to be able to query the DDG servers:

    open Core
    open Async
    
    (** Generate a DuckDuckGo search URI from a query string *)
    let query_uri query =
      let base_uri =
        Uri.of_string "http://api.duckduckgo.com/?format=json"
      in
      Uri.add_query_param base_uri ("q", [ query ])
  • Parsing JSON Strings

    HTTP response from DDG is in JSON, so we’d need to parse it.

    (** Extract the "Definition" or "Abstract" field from the DuckDuckGo
       results *)
    let get_definition_from_json json =
      match Yojson.Safe.from_string json with
      | `Assoc kv_list ->
        let find key =
          match List.Assoc.find ~equal:String.equal kv_list key with
          | None | Some (`String "") -> None
          | Some s -> Some (Yojson.Safe.to_string s)
        in
        (match find "Abstract" with
        | Some _ as x -> x
        | None -> find "Definition")
      | _ -> None
  • Executing an HTTP Client Query

    For the query dispatching:

    (* Execute the DuckDuckGo search *)
    let get_definition word =
      let%bind _, body = Cohttp_async.Client.get (query_uri word) in
      let%map string = Cohttp_async.Body.to_string body in
      word, get_definition_from_json string
    
    
    
    (* ==== understanding the signature for cohttp-async.Client.get *)
    #require "cohttp-async";;
    #show Cohttp_async.Client.get;;
    (*
    val get :
      ?interrupt:unit Deferred.t ->
      ?ssl_config:Conduit_async.V2.Ssl.Config.t ->
      ?headers:Cohttp.Header.t ->
      Uri.t -> (Cohttp.Response.t * Cohttp_async.Body.t) Deferred.t
    *)

    Understanding Cohttp_async.Client.get

    1. required arg: URI

    2. returns a deferred value containing a Cohttp.Response.t and a Pipe Reader through which the Cohttp_async.Body.t of the request will be streamed

      because the body isn’t that large, we just read the entire body directly using Body.to_string (instead of chunking it)

  • Dispatching multiple searches in parallel

    We use Deferred.all for our dispatcher. It preserves the order of the deferred that are passed to it.

    Printing also only happens until all the results arrive.

    (* === Print out a word/definition pair *)
    let print_result (word, definition) =
      printf (* -- this is an async printf that goes through the async scheduler *)
        "%s\n%s\n\n%s\n\n"
        word
        (String.init (String.length word) ~f:(fun _ -> '-'))
        (match definition with
        | None -> "No definition found"
        | Some def ->
          String.concat ~sep:"\n" (Wrapper.wrap (Wrapper.make 70) def))
    
    
    (* === Parallel dispatcher: *)
    (* Run many searches in parallel, printing out the results after
       they're all done. *)
    let search_and_print words =
      let%map results = Deferred.all (List.map words ~f:get_definition) in
      List.iter results ~f:print_result
    
    
    (* ==== alternative: retrieve it pipelined, in order of completion *)
    (* Run many searches in parallel, printing out the results as you
       go *)
    (*
        Deferred.all_unit;;
        - : unit Deferred.t list -> unit Deferred.t = <fun>
    *)
    let search_and_print words =
      Deferred.all_unit
        (List.map words ~f:(fun word ->
             get_definition word >>| print_result))

    Finally, a CLI interface for this :

    let () =
      Command.async
        ~summary:"Retrieve definitions from duckduckgo search engine"
        (let%map_open.Command words =
           anon (sequence ("word" %: string))
         in
         fun () -> search_and_print words)
      |> Command_unix.run

Exception Handling #

We should expect flakiness, especially when concurrently using external resources.

  • Monitors
  • Example: Handling exceptions with DuckDuckGo

Timeouts, Cancellation, and Choices #

Working with System Threads #

Multicore OCaml #

Thread-Safety and Locking #