Chapter 16A: Cooperative Concurrency with Lwt


Before OCaml 5, cooperative concurrency via Lwt was the dominant model for I/O-heavy OCaml systems. These notes trace the monadic plumbing of Lwt promises, the event-loop scheduler, and cancellation semantics — worked through tutorials such as the Mirage tutorial with annotated solutions and an analysis of how tail-call behaviour interacts with the monadic bind.

Figure 1: Cooperative Concurrency, as witnessed in the Japanese Parliament (retrieved from r/AccidentalRenaissance)
There’s some ecosystem context to internalise, which can be found here. Pre-OCaml 5, there were two approaches to cooperative concurrency, exposed via Async and Lwt. They overlap quite a bit and have similar primitives. Async notes can be found here.

Complexity from a Variety of I/O #

Complexity comes not just from I/O itself; it’s the variety of I/O a program may do that makes things complicated:

  1. Kernel Syscalls
  2. IO with graphics engine
  3. Network IO for coordination
  4. User Input IO

General Approaches of Handling I/O Complexity #

There’s a few general approaches of handling that complexity:

  1. main loop (event loop) with all components integrated into this main loop

    This has a problem: it’s complicated to write async sequential code

    e.g. GUI interfaces freezing and not redrawing because they’re waiting for some blocking part of the code to complete / yield

  2. pre-emptive system threads

    this also has a key problem: resource overheads – we should limit the number of concurrent threads because of said overheads

  3. promise-based approach: alternative by Lwt

    Here are the key characteristics of promises:

    1. a reference that will be filled in asynchronously
    2. Functions returning a promise have no extra overhead: no new stack frame, proc… \(\implies\) it’s just a normal, fast function call
    3. great composability – promise-chaining
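A quick sketch of that composability (assuming the lwt and lwt.unix packages are available): each step returns a promise, and `>>=` chains them without any extra stack machinery.

```ocaml
open Lwt.Infix

(* two already-resolved promises, chained with bind *)
let sum : int Lwt.t =
  Lwt.return 1 >>= fun a ->
  Lwt.return 2 >>= fun b ->
  Lwt.return (a + b)

(* Lwt_main.run drives the event loop until the promise resolves *)
let () = assert (Lwt_main.run sum = 3)
```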

Drawing Parallels between Lwt vs Fibres #

When reading the paper, “Retrofitting Concurrency into OCaml” (see notes here), there was a short intro to fibers that felt very similar to my understanding of Lwt.

…support for fibers that run in parallel, which are language level lightweight threads implemented as runtime managed stack segments. The implementation of fibers is similar to lightweight threads in Haskell GHC and Goroutines in the Go language. While the details of the language support for fibers is beyond the scope of the paper…

So my intuition was that they feel really similar and overlap a lot. The real difference is the existence of an actual stack.

Table 1: Direct comparison between Lwt vs multicore OCaml's Fibers

|  | Lwt | Fibers |
| --- | --- | --- |
| Suspended execution unit | Heap closure chain | Actual saved (segmented) stack |
| Who manages continuations | The programmer, via >>= | The runtime |
| Code style | Monadic (let*, >>=) | Direct-style code |
| Stack at yield point | Doesn’t exist | Saved in memory |

A fiber is a concrete suspended call stack. When a fiber yields, the runtime literally saves its register state and stack pointer, and restores them on resume. The stack exists as a real memory object the runtime manages. That’s what “runtime-managed stack segments” means.

The programmer writes direct-style code:

(* fiber-style: looks like normal sequential code *)
let line = read_line ()  (* suspends here, stack saved *)
print_endline line       (* resumes here, stack restored *)

Lwt has no suspended stacks. In the snippet below, the fun line -> Lwt_io.printl line is a heap-allocated closure. When read_line () resolves, the event loop finds this closure and calls it. There is no stack to resume — the continuation was manually reified into a function object by us, the programmer, via >>=.

This is precisely why Lwt is monadic and infectious. The monad is the programmer’s way of manually encoding what a fiber runtime would do automatically with a real stack. You’re building a chain of heap closures that simulate what resuming a stack would give you.
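To make that concrete, here is a toy model (emphatically not Lwt’s actual implementation) of a promise as a heap cell holding callbacks: bind either runs the continuation immediately or parks it on the heap, and a later resolve fires it.

```ocaml
(* A toy promise: a mutable cell that is either resolved or holds
   the continuations waiting on it *)
type 'a state = Resolved of 'a | Pending of ('a -> unit) list ref
type 'a promise = 'a state ref

let make () : 'a promise = ref (Pending (ref []))

let bind (p : 'a promise) (f : 'a -> unit) =
  match !p with
  | Resolved v -> f v                 (* already done: call immediately *)
  | Pending cbs -> cbs := f :: !cbs   (* reify the continuation on the heap *)

let resolve (p : 'a promise) (v : 'a) =
  match !p with
  | Resolved _ -> ()
  | Pending cbs ->
     p := Resolved v;
     List.iter (fun f -> f v) !cbs    (* the "event loop" firing callbacks *)

let () =
  let p = make () in
  let result = ref 0 in
  bind p (fun v -> result := v + 1);  (* continuation lives on the heap *)
  resolve p 41;                       (* later: resolution runs it *)
  assert (!result = 42)
```

There is no suspended stack anywhere in this model, which is exactly the point: the closure passed to `bind` is the whole continuation.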

Lwt is closer to JavaScript Promises or Python asyncio: cooperative concurrency built on an event loop with heap-allocated continuations. The “lightweight” part is real (no stack overhead per promise), but it is achieved by making the programmer do the continuation-passing manually.

Fibers (and Eio’s fibers in OCaml 5) are closer to Go goroutines: the runtime genuinely saves and restores stacks, so you get direct-style code for free. The OCaml 5 fiber implementation described in that paper is exactly what makes Eio possible: we can write let line = read_line () without >>= because there’s an actual stack to suspend.

Punchline: Lwt simulates fibers at the cost of monadic syntax. OCaml 5 fibers make the simulation unnecessary by providing the real thing at the runtime level.

Mirage Lwt Tutorial #

The code snippets for this can be found here. I’ll just attempt the exercises without actually running them.

Basics #

The Mirage folks consider “promises” and “threads” to be the same.

The monadic pattern is the foundation here. Beyond syntactic sugar, we have clear examples of how to convert sync code to Lwt-threaded code:

let x =
  let a = get_input "Enter a" in
  let b = get_input "Enter b" in
  a + b
Code Snippet 1: Sync version of getting inputs
open Lwt.Infix

(* x is a thread/promise now (that's why the output is wrapped in a return ) *)
let x =
  get_input_lwt "Enter a" >>= fun a ->
  get_input_lwt "Enter b" >>= fun b ->
  Lwt.return (a + b);;
Code Snippet 2: Async version, using binds: >>=

Promise/Thread Composition #

There are 3 types of composition to pick from:

  1. join : an all-or-nothing strategy — waits for the whole list of promises to resolve
  2. choose : waits for at least one promise to resolve and short-circuits on that
    • if several have already resolved by the time choose is called, it makes a random selection
  3. pick : does the same short-circuiting but also cancels the promises other than the winner
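A small sketch of join vs pick (assuming lwt.unix; the 10-second sleep is an arbitrary stand-in for a slow promise that loses the race):

```ocaml
open Lwt.Infix

(* join: all-or-nothing, resolves once every promise in the list has *)
let all_done = Lwt.join [ Lwt.return (); Lwt.return () ]

(* pick: first resolved wins; the losing sleep gets cancelled *)
let first =
  Lwt.pick [ (Lwt_unix.sleep 10.0 >|= fun () -> "slow");
             Lwt.return "fast" ]

let () =
  Lwt_main.run all_done;
  assert (Lwt_main.run first = "fast")
```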

Main Event Loop #

Lwt’s core is an event loop – we typically start it manually using Lwt_main.run.

The reason threads/promises are cheap in Lwt is because of this event-loop approach instead of using pre-emptive threads. Sleeping/pausing allows the scheduler to wake others up.
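A minimal example of starting the loop manually (outside Mirage, using Lwt_unix.sleep): Lwt_main.run blocks until its argument resolves, then returns the value.

```ocaml
open Lwt.Infix

(* a promise that resolves ~10ms after the event loop starts running *)
let program = Lwt_unix.sleep 0.01 >|= fun () -> "done"

let () = assert (Lwt_main.run program = "done")
```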

Mutexes for cooperation #

Typically we can avoid mutexes when using Lwt – this is because of the cooperative concurrency model.

In Lwt, a thread executes serially until it explicitly yields (most commonly via >>=); for this reason, Lwt threads are said to be cooperative.

There are dangers with cooperative threading when the threads don’t cooperate: we can still get race conditions across yield points, and an expression that takes a long time to compute with no cooperation point will hang the whole program.

We can explicitly introduce a cooperation point by yielding using Lwt.yield (or sleeping, or pausing).
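For example, a CPU-bound loop can be made cooperative by pausing every so often (a sketch; the chunk size of 1000 is arbitrary):

```ocaml
open Lwt.Infix

(* counts to n, yielding to the scheduler every 1000 steps *)
let rec count_to n acc =
  if acc >= n then Lwt.return acc
  else if acc mod 1000 = 0 then
    Lwt.pause () >>= fun () -> count_to n (acc + 1)  (* cooperation point *)
  else
    count_to n (acc + 1)

let () = assert (Lwt_main.run (count_to 5000 0) = 5000)
```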

Lwt_mutex is available for locking data-structures.

Spawning Background Threads #

These are threads whose output we disregard; we just send them to the background. It’s still possible to get exceptions reported (by keeping a handle to the call).

let spawn () =
  Lwt.async (fun () ->
    Mirage_sleep.ns (Duration.of_sec 10) >|= fun () ->
    Logs.info (fun m -> m "Tails")
  )
Code Snippet 3: Spawning thread without waiting for result

We have to configure an exception handler, Lwt.async_exception_hook, which Lwt.async will report to when exceptions bubble up.
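A sketch of installing the hook (it’s a mutable ref of type (exn -> unit) ref; the default handler prints the exception and terminates the process):

```ocaml
let hit = ref false

let () =
  (* replace the default hook, which would print and exit *)
  Lwt.async_exception_hook := (fun exn ->
    hit := true;
    prerr_endline ("async failure: " ^ Printexc.to_string exn));
  (* a background promise that fails immediately *)
  Lwt.async (fun () -> Lwt.fail Exit);
  Lwt_main.run (Lwt.pause ())  (* let the event loop tick *)

let () = assert !hit
```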

Error handling, use result #

When errors are expected, use result type:

(*-- just monadic wrapping: *)
let ok x = Lwt.return (Ok x)

let (>>*=) m f =
  m >>= function
  | Error _ as e -> Lwt.return e
  | Ok x -> f x

let example () =
  read_arg () >>*= fun a ->
  read_arg () >>*= fun b ->
  ok (a + b)
Code Snippet 4: Helpers that make it easier to use the result type for expected outcomes

Unexpected failures, use raise or fail #

For bugs, use raise or Lwt.fail (the latter being more explicit). Lwt will still catch raised exceptions and turn them into failed promises automatically if we don’t.
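The two spellings behave the same inside a bind: an exception raised by the callback is caught by Lwt and becomes a failed promise, so the choice is mostly about explicitness.

```ocaml
open Lwt.Infix

(* raise inside the callback: bind catches it and fails the promise *)
let via_raise = Lwt.return () >>= fun () -> raise Exit

(* Lwt.fail: the explicit spelling of the same thing *)
let via_fail = Lwt.return () >>= fun () -> Lwt.fail Exit

let () =
  assert (Lwt.state via_raise = Lwt.Fail Exit);
  assert (Lwt.state via_fail = Lwt.Fail Exit)
```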

Catching Exceptions Lwt.catch #

If we really need to, we could catch exceptions like so :

Lwt.catch
  (fun () -> foo x)
  (function
   | Error_you_want_to_catch -> (* handle error here *) Lwt.return ()
   | ex -> Lwt.fail ex  (* pass others on *))

(* --- the sync equivalent version looks something like this: *)
try foo x
with
| Error_you_want_to_catch -> (* handle error here *) ()

Finalizing for cleanups #

Sometimes we want to be extra sure that cleanup runs (e.g. on a fatal failure):

let r = Resource.alloc () in
Lwt.finalize
  (fun () -> use r)
  (fun () -> Resource.free r)

(* --- can consider using a with_* function for this to make it more ergonomic *)
with_resource (fun r -> use r)
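A runnable sketch of that with_* idea; the Resource module here is hypothetical, defined only so the example is self-contained:

```ocaml
(* Hypothetical resource with an alloc/free pair, for illustration only *)
module Resource = struct
  type t = { mutable freed : bool }
  let alloc () = { freed = false }
  let free r = r.freed <- true
end

(* the with_* wrapper: free runs whether use succeeds or fails *)
let with_resource use =
  let r = Resource.alloc () in
  Lwt.finalize
    (fun () -> use r)
    (fun () -> Resource.free r; Lwt.return_unit)

let () =
  let seen = ref None in
  let v = Lwt_main.run (with_resource (fun r -> seen := Some r; Lwt.return 1)) in
  assert (v = 1);
  match !seen with
  | Some r -> assert r.Resource.freed
  | None -> assert false
```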

User-defined threads for external proc interaction #

These are useful when interacting with external processes or libraries that don’t directly support Lwt.

let invoke_remote msg =
  let t, waker = Lwt.wait () in
  let id = new_id () in
  on_response_to id (fun resp -> Lwt.wakeup waker resp);
  send_request id msg;
  t
Code Snippet 5: User-defined thread, using Lwt.wait

Cancelling a thread using Lwt.cancel #

let timeout delay t =
  Mirage_sleep.ns delay >|= fun () -> Lwt.cancel t
Code Snippet 6: example of cancelling a thread

TODO: the notes on lwt manual are better for this.

Challenge 1: Sleep and Join #

Now write a program that spins off two threads, each of which sleeps for some amount of time, say 1 and 2 seconds and then one prints “Heads”, the other “Tails”. After both have finished, it prints “Finished” and exits.

To sleep for some number of nanoseconds use the function Mirage_sleep.ns declared in the interface Mirage_sleep, and to print to the console use Logs.info. Note that Mirage_sleep is a Mirage-specific module; if you are using Lwt in another context, use Lwt_unix.sleep and Lwt_io.write. (You will also need to manually start the main event loop with Lwt_main.run.)

For convenience, you’ll likely want to also use the Duration library, which provides handy functions for converting between seconds, milliseconds, nanoseconds, and other units of time.

We compose the threads using join since both need to complete. Then there’s a sequential print.

note:

  • naming convention as per the Lwt manual: *_s is sequential and *_p is parallel
  • also, I’ve used the Lwt_* specific modules for syscalls and IO instead of wrapping them manually in promises.
let challenge_1 () =
  let make_s t l =
    ( t |> Float.of_int |> Lwt_unix.sleep )
        >>= fun () -> Lwt_io.printl l
  in
  Lwt.join [
      make_s 1 "Heads";
      make_s 2 "Tails"
    ]
  >>= fun () -> Lwt_io.printl "Finished";;

challenge_1 ();;
Code Snippet 7: My solution to Challenge 1, works using utop

The model solution from the Mirage tutorial gives:

open Lwt.Infix

let start () =
  Lwt.join
    [
      ( Mirage_sleep.ns (Duration.of_sec 1) >|= fun () ->
        Logs.info (fun m -> m "Heads") );
      ( Mirage_sleep.ns (Duration.of_sec 2) >|= fun () ->
        Logs.info (fun m -> m "Tails") );
    ]
  >|= fun () -> Logs.info (fun m -> m "Finished")
Code Snippet 8: Model solution from mirage os tutorial

Note: they use the infix >|= because it performs Lwt.map, as opposed to the bind I’m doing. The >|= operator (“map”) is similar to >>= but automatically wraps the result of the function you provide in return.
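The equivalence can be checked directly: p >|= f behaves like p >>= fun x -> Lwt.return (f x).

```ocaml
open Lwt.Infix

let via_map  = Lwt.return 20 >|= fun x -> x * 2                 (* Lwt.map  *)
let via_bind = Lwt.return 20 >>= fun x -> Lwt.return (x * 2)    (* Lwt.bind *)

let () =
  assert (Lwt_main.run via_map = 40);
  assert (Lwt_main.run via_bind = 40)
```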

Challenge 2: Looping Echo Server #

Write an echo server that reads from a dummy input generator and, for each line it reads, writes it to the console. The server should stop after reading 10 lines.

Hint: it’s easier to convert a program to use Lwt if we write loops in a functional style (using tail recursion) rather than using special syntax (e.g. while and for).

This is about getting the structure right so that when Lwt inserts async boundaries, the recursion is safe. For genuinely async operations, the event loop handles the rest. For sync ones we rely on TCO, which OCaml does for the tail-call itself — except that Lwt’s bind adds intermediate frames that OCaml can’t eliminate.

Random.self_init ();
let start_server () =
  let s_len = 20 in
  let read_random_line () = Lwt.return (String.init s_len (fun _ -> Random.ascii ())) in
  let rec aux = function
    | 0 -> Lwt.return ()
    | n ->
       read_random_line () >>= fun line ->
       Lwt_io.printl line >>= fun _ ->
       aux (n - 1)
  in
  aux 10;;
Code Snippet 9: My solution, works in utop

My solution, compared with the model solution:

  1. fun _ -> on Lwt_io.printl discards the unit result — fun () -> is more precise

  2. read_random_line using Lwt.return makes it synchronous — the promise is already resolved before it’s even returned. The model solution’s read_line is genuinely async (it sleeps for a random duration).

    This distinction matters for our questions on TCO below.

open Lwt.Infix

let read_line () =
  Mirage_sleep.ns (Duration.of_ms (Randomconv.int ~bound:2500 Mirage_crypto_rng.generate))
  >|= fun () -> String.make (Randomconv.int ~bound:20 Mirage_crypto_rng.generate) 'a'

let start () =
  let rec echo_server = function
    | 0 -> Lwt.return ()
    | n ->
        read_line () >>= fun s ->
        Logs.info (fun m -> m "%s" s);
        echo_server (n - 1)
  in
  echo_server 10
Code Snippet 10: Provided solution
| promise type | stack behaviour | why? |
| --- | --- | --- |
| Genuine async (I/O, sleep) | Constant stack | The event loop structurally breaks the chain and resets the stack each tick, so no stack accumulates and TCO is irrelevant |
| Sync (already-resolved Lwt.return) | Stack grows with n | bind calls the callback immediately on the same stack |

Technically, we will have TCO within each closure (i.e. fun _ -> aux (n - 1) parts) but Lwt.bind will not be calling the callback in tail position (it’s a regular function call internally) – so with large enough n, we could stack overflow.

illustrations of stack growth
Event loop tick: invokes callback -> aux 10 -> returns pending promise
Event loop tick: invokes callback -> aux 9  -> returns pending promise
Event loop tick: invokes callback -> aux 8  -> returns pending promise
...
Code Snippet 1: genuinely async will use new stack each time
aux 10
  └─ bind (sync) -> callback -> Lwt_io.printl >>= fun _ ->
       aux 9
         └─ bind (sync) -> callback -> Lwt_io.printl >>= fun _ ->
              aux 8
                └─ ...
Code Snippet 2: already-resolved promise will cause the growth of the stack
open Lwt.Infix

let start_server () =
  let s_len = 20 in
  let read_random_line () =
    (* ℹ️ simulates the async boundary *)
    Lwt.pause () >|= fun () ->
    String.init s_len (fun _ -> Random.ascii ())
  in
  let rec aux = function
    | 0 ->
       Lwt.return_unit
    | n ->
       read_random_line () >>= fun line ->
       Lwt_io.printl line >>= fun () ->
       aux (n - 1)
  in
  aux 10;;
Code Snippet 11: My improved version with simulated async boundary
cleaner version, with the let* binding operator
open Lwt.Syntax

let start_server () =
  let s_len = 20 in

  let read_random_line () =
    Lwt.pause () >|= fun () ->
    String.init s_len (fun _ -> Random.ascii ())
  in

  let rec loop n =
    match n with
    | 0 -> Lwt.return_unit
    | _ ->
        let* line = read_random_line () in
        let* () = Lwt_io.printl line in
        loop (n - 1)
  in

  loop 10
Code Snippet 12: Looks structurally the same as before

Challenge 3: Timeouts #

This timeout function does not allow one to use the result returned by the thread t.

Modify the timeout function so that it returns either None if t has not yet returned after delay seconds or Some v if t returns v within delay seconds.

In order to achieve this behaviour it is possible to use the function Lwt.state that, given a thread, returns the state it is in, either Sleep, Return or Fail.

let timeout delay t =
  Lwt_unix.sleep delay >>= fun () ->
  match Lwt.state t with
  | Lwt.Fail ex -> Lwt.fail ex
  | Lwt.Return v -> Lwt.return (Some v)
  | Lwt.Sleep ->
     Lwt.cancel t;
     Lwt.return None
Code Snippet 13: Solution; always waits the full delay seconds, even when t returns sooner

Challenge 4: Better timeouts #

In a typical use of a timeout, if t returns before the timeout has expired, one would want the timeout to be cancelled right away.

The next challenge is to modify the timeout function to return Some v right after t returns.

Of course if the timeout does expire then it should cancel t and return None.

let timeout delay t =
  let time_out = Lwt_unix.sleep delay in
  let t_on_expire = time_out >|= fun () -> None in
  let t_immediate_return = t >|= fun v -> Some v in

  Lwt.pick [
      t_on_expire;
      t_immediate_return
  ]
Code Snippet 14: My solution
let timeout delay t =
  let tmout = Mirage_sleep.ns delay in
  Lwt.pick [
    (tmout >|= fun () -> None);
    (t >|= fun v -> Some v);
  ]
Code Snippet 15: model solution

Note:

  1. Lwt.pick calls Lwt.cancel on the losers internally, but that’s alright because it’s abstracted away from us. It’s trickier if we call Lwt.cancel manually (e.g. on an arbitrary promise we don’t own that’s in the middle of a non-atomic operation)

  2. we should use the cancel function sparingly because it works by throwing an unexpected exception into the middle of executing code. A cancel that occurs while the thread happens to be performing an uncancellable operation will be silently ignored.

    There’s a clear failure mode here:

       (* Dangerous: cancel called on arbitrary external promise *)
       let p = some_library_operation () in
       Lwt.cancel p  (* may interrupt mid-write, leave resources uncleaned *)
    
       (* Safe: Lwt.pick cancels promises it controls *)
       Lwt.pick [p1; p2]  (* cancels loser at its next yield point *)
    Code Snippet 16: Failure mode from manual use of Lwt.cancel
    Table 2: Mental model for which approach to use when

    | Use | When |
    | --- | --- |
    | Lwt.pick / Lwt.cancel | Cooperative cancellation via exception; works alright when the code doesn’t need cleanup |
    | Lwt_switch | A structured cancellation contract; requires the callee to explicitly register cleanup hooks via Lwt_switch.add_hook |

  3. [ Idiom ]: It’s safer to use Lwt_switch. This means that cancellation will only happen at well-defined points, although it does require explicit support from the code being cancelled.

    If we have a function that only responds to cancel, we should wrap it in a function that takes a switch and cancels it when the switch is turned off.

       let with_timeout ~sw delay f =
         Lwt_switch.add_hook (Some sw) (fun () -> (* signal f to stop *) Lwt.return ());
         f sw
    Code Snippet 17: Wrapper pattern as a standard idiom

  4. Use Lwt.pick or Lwt_switch; avoid calling Lwt.cancel manually because there’s a clear failure mode when we kill an arbitrary thread in the middle of a non-atomic operation.

  5. Lwt.pick is safe for racing independent futures; for long-running stateful operations, prefer Lwt_switch so cancellation happens at explicit checkpoints rather than at arbitrary yield points.

TODO lwt from cookbook #

DO this: https://ocaml.org/cookbook/create-and-await-promises/lwt

TODO Pending tasks #