Chapter 16: Concurrent Programming
This chapter goes beyond the book because the OCaml ecosystem has two phases of concurrency support:
pre-OCaml 5 – cooperative concurrency as the typical concurrency model, with heavy reliance on libraries
post-OCaml 5 – better primitives for multicore support
We’ll need to learn both.
Fundamentals #
references to pepper in soon:
- 2017 paper introducing multicore ocaml and how to use it (ref)
- tarides parallelism tutorial (ref) – also how the whole thing happened, the journey to ocaml multicore (ref)
- ocaml manual on parallel programming (https://ocaml.org/manual/5.4/parallelism.html)
Understanding Monads #
Lwt relies on the monadic pattern, which is beautiful for clear composition. The OCaml docs on monads are an excellent reference for this.
Monad Signature Using examples: Maybe, Writer, Lwt #
The docs first lay out the structure and its practical implications before formalising the monad laws. We follow that approach.
Monadic structures should be seen as structures that follow the monad design pattern.
|  | meaning |
|---|---|
| return | puts a value in the box; has a trivial effect (in terms of computation) when it does |
| bind | gives the ability to compose and sequence operations |
We use case studies to see this design pattern:
Maybe monad / Option monad / Error monad
Suppose we have the basic arithmetic functions; we want to wrap them such that if there's any error (e.g. division by zero) we return None instead of throwing an exception.
A pedestrian solution that isn't quite monadic yet:
We have some ways to make this feel monadic. We could re-implement the existing operators, but that gives us a ridiculous amount of boilerplate in the form of pattern matches. It works, but the effort is excessive:
let plus_opt (x:int option) (y:int option) : int option =
  match x,y with
  | None, _ | _, None -> None
  | Some a, Some b -> Some (Stdlib.( + ) a b)

let ( + ) = plus_opt

let minus_opt (x:int option) (y:int option) : int option =
  match x,y with
  | None, _ | _, None -> None
  | Some a, Some b -> Some (Stdlib.( - ) a b)

let ( - ) = minus_opt

let mult_opt (x:int option) (y:int option) : int option =
  match x,y with
  | None, _ | _, None -> None
  | Some a, Some b -> Some (Stdlib.( * ) a b)

let ( * ) = mult_opt

let div_opt (x:int option) (y:int option) : int option =
  match x,y with
  | None, _ | _, None -> None
  | Some a, Some b -> if b = 0 then None else Some (Stdlib.( / ) a b)

let ( / ) = div_opt
Code Snippet 2: Pattern-matched, non-monadic solution
If we try to deduplicate all the unnecessary boilerplate, then we achieve something like this:
let propagate_none (op : int -> int -> int option) (x : int option) (y : int option) =
  match x, y with
  | None, _ | _, None -> None
  | Some a, Some b -> op a b

let wrap_output (op : int -> int -> int) (x : int) (y : int) : int option =
  Some (op x y)

let ( + ) = propagate_none (wrap_output Stdlib.( + ))
let ( - ) = propagate_none (wrap_output Stdlib.( - ))
let ( * ) = propagate_none (wrap_output Stdlib.( * ))

let div (x : int) (y : int) : int option =
  if y = 0 then None
  else wrap_output Stdlib.( / ) x y

let ( / ) = propagate_none div
The upgrade functionality can be generic and defined like so:
let upgrade : (int -> int option) -> (int option -> int option) =
  fun (op : int -> int option) (x : int option) -> (x >>= op)

(* -- without the annotations: *)
let upgrade op x = x >>= op
Code Snippet 3: A higher-order upgrade function helps deduplicate
So we could just use the return and >>= (bind) functions to reimplement the whole thing:
let ( + ) (x : int option) (y : int option) : int option =
  x >>= fun a ->
  y >>= fun b ->
  return (Stdlib.( + ) a b)

let ( - ) (x : int option) (y : int option) : int option =
  x >>= fun a ->
  y >>= fun b ->
  return (Stdlib.( - ) a b)

let ( * ) (x : int option) (y : int option) : int option =
  x >>= fun a ->
  y >>= fun b ->
  return (Stdlib.( * ) a b)

let ( / ) (x : int option) (y : int option) : int option =
  x >>= fun a ->
  y >>= fun b ->
  if b = 0 then None else return (Stdlib.( / ) a b)
Linking this to the monadic design pattern from our pedestrian approach:
- wrap_output is actually the return function: it upgrades a value from int to int option by wrapping
- modifying the fns is bind: we had to upgrade functions whose inputs were of type int to instead accept inputs of type int option – this upgrades the operation
We can deduplicate the common parts of the monadic approach further by having a generic upgrade function:
let upgrade_binary op x y =
  x >>= fun a ->
  y >>= fun b ->
  op a b

let return_binary op x y = return (op x y)

let ( + ) = upgrade_binary (return_binary Stdlib.( + ))
let ( - ) = upgrade_binary (return_binary Stdlib.( - ))
let ( * ) = upgrade_binary (return_binary Stdlib.( * ))
let ( / ) = upgrade_binary div
Code Snippet 4: A deduplicated approach to creating the maybe monad
Mental model for the bind, e.g. in x >>= fun a -> y >>= fun b -> op a b:
- take x and extract from it the value a
- take y and extract from it b
- then use a and b to construct a wrapped value using the operation op
Here's how we would implement the monad signature for the maybe monad:
module Maybe : Monad = struct
  type 'a t = 'a option

  let return x = Some x

  let (>>=) m f =
    match m with
    | None -> None
    | Some x -> f x
end
Code Snippet 5: Implementation for the monad signature
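To see the pattern paying off, here's a hypothetical safe-division pipeline chained through return and >>= (safe_div is a name invented for this sketch; return and >>= are redefined locally so the snippet is self-contained):

```ocaml
(* option-monad primitives, as above *)
let return x = Some x
let ( >>= ) m f = match m with None -> None | Some x -> f x

(* safe division: the only operation that can actually fail *)
let safe_div x y = if y = 0 then None else Some (x / y)

(* (100 / 5) / 2 succeeds; any division by zero collapses the chain to None *)
let ok  = safe_div 100 5 >>= fun a -> safe_div a 2   (* Some 10 *)
let bad = safe_div 100 0 >>= fun a -> safe_div a 2   (* None *)
```

Note how the failure case needs no explicit pattern matching at the call site: >>= short-circuits on None for us.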
Writer Monad
Suppose we want the effect of loggability (a log-writer that allows us to still compose the original routines).
intuiting the monadic approach
We first consider approaches that don't have the monad pattern within. We want to make inc and dec loggable below:
let inc x = x + 1;;
let dec x = x - 1;;

let inc_log x = (x + 1, Printf.sprintf "Called inc on %i; " x);;
let dec_log x = (x - 1, Printf.sprintf "Called dec on %i; " x);;

(* using a composition operator: *)
let ( >> ) f g x = x |> f |> g;;

(* we can compose like so: *)
let id = inc >> dec;;

(* PROBLEM: the pattern isn't monadic, so we get an error from this: *)
(* let id = inc_log >> dec_log *)

let dec_log_upgraded (x, s) =
  (x - 1, Printf.sprintf "%s; Called dec on %i; " s x);;
let inc_log_upgraded (x, s) =
  (x + 1, Printf.sprintf "%s; Called inc on %i; " s x);;

let id = dec_log >> inc_log_upgraded;;
Code Snippet 6: Non-monadic approach to log-writer impl
We try to use a few helpers to reduce the boilerplate, to get a more monadic pattern:
let log (name : string) (f : int -> int) : int -> int * string =
  fun x -> (f x, Printf.sprintf "Called %s on %i; " name x);;

let loggable (name : string) (f : int -> int) : int * string -> int * string =
  fun (x, s1) ->
    let (y, s2) = log name f x in
    (y, s1 ^ s2);;

let inc' : int * string -> int * string = loggable "inc" inc;;
let dec' : int * string -> int * string = loggable "dec" dec;;
let id'  : int * string -> int * string = inc' >> dec';;

let e x = (x, "");;  (* -- for empty-logging *)

id' (e 5)
(* returns: (5, "Called inc on 5; Called dec on 6; ") *)
Code Snippet 7: Intuitive monadic pattern form
- e is return: the trivial effect of putting a value into the metaphorical box, with an empty log message
- loggable uses bind, >>=: we need code that handles pattern matching against pairs and string concatenation
module Writer : Monad = struct
  type 'a t = 'a * string

  let return x = (x, "")

  let ( >>= ) m f =
    let (x, s1) = m in
    let (y, s2) = f x in
    (y, s1 ^ s2)

  (* extra: loggable implemented using >>= *)
  let loggable (name : string) (f : int -> int) : int * string -> int * string =
    fun m ->
      m >>= fun x ->
      log name f x
end
Code Snippet 8: Writer monad for logging effects
With familiarity with the >>= binding operator and the monadic style of programming, our code can now concentrate on the 'a in the type 'a Writer.t instead of being fixed to string – the writer monad automatically keeps our code generic so long as we use return and >>=.
Lwt monad
- it's a promise-based library that satisfies this monadic interface to compose / chain promises together
- bind registers callbacks
- the type 'a is the "something more" within the box: the values that get produced asynchronously when the promise is resolved (instead of immediately)
- what's abstracted away from us:
  - how 'a t and return create references
  - the mechanism of callback registration, hidden within bind, well-encapsulated
Monad Laws #
The laws are invariants that govern the behaviour of the monad "data structure". Strictly, the specifications belong to particular monads, like the writer monad or the Lwt monad, but there are still some generic laws.
context: compose monad operator (>=>)
To compose monadic functions.
let compose f g x =
f x >>= fun y ->
g y
let ( >=> ) = compose
compose function implementation
The idea here is that if we have 2 functions that we wish to compose:
f : 'a -> 'b t
g : 'b -> 'c t
then compose f g : 'a -> 'c t supports the flow where the composed fn takes a value of type 'a, applies f to it, extracts the 'b out of the result, applies g to it, and returns that value of type 'c t.
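A small sketch of this flow in the option monad (parse and recip are hypothetical helpers invented for illustration):

```ocaml
(* option-monad primitives *)
let return x = Some x
let ( >>= ) m f = match m with None -> None | Some x -> f x

(* Kleisli composition *)
let compose f g x = f x >>= fun y -> g y
let ( >=> ) = compose

(* parse : string -> int option, recip : int -> int option *)
let parse s = int_of_string_opt s
let recip n = if n = 0 then None else Some (100 / n)

(* composed fn : string -> int option *)
let pipeline = parse >=> recip

let a = pipeline "4"   (* Some 25 *)
let b = pipeline "0"   (* None: division guarded *)
let c = pipeline "x"   (* None: parse failure propagates *)
```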
| Law | w/o compose | w compose |
|---|---|---|
| [1] Left Identity | return x >>= f \(\iff\) f x | return >=> f \(\iff\) f |
| [2] Right Identity | m >>= return \(\iff\) m | f >=> return \(\iff\) f |
| [3] Associative | (m >>= f) >>= g \(\iff\) m >>= (fun x -> f x >>= g) | (f>=>g)>=>h \(\iff\) f>=>(g>=>h) |
Law 1: Left-Identity
return >=> f behaves the same as f
Law 1 says that having the trivial effect on a value, then binding a function on it, is the same as just calling the function on the value.
- "same behaviour" means the two expressions both evaluate to the same value, or both enter an infinite loop, or both raise the same exception
- examples:
  - maybe monad: return x would be Some x, and >>= f would extract x and apply f to it
  - Lwt monad: return x would be a promise that is already resolved with x, and >>= f would register f as a callback to run on x
Law 2: Right-Identity
f >=> return behaves the same as f
Law 2 says that binding on the trivial effect is the same as not having the effect at all.
- examples:
  - maybe monad: m >>= return depends on 2 cases: whether m is Some x or None
    - m is Some x: binding extracts x, and return just re-wraps it with Some
    - m is None: binding just returns None
  - Lwt monad: binding on m registers return as a callback to be run on the contents of m after it is resolved, and return just takes those contents and puts them back into an already-resolved promise
Law 3: Associative
(f >=> g) >=> h behaves the same as f >=> (g >=> h)
Law 3 says that bind sequences effects correctly.
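The three laws are easy to sanity-check concretely for the maybe/option monad; this is a minimal sketch using the return and >>= definitions from earlier (f and g are arbitrary monadic functions chosen for the check):

```ocaml
let return x = Some x
let ( >>= ) m f = match m with None -> None | Some x -> f x

let f x = if x = 0 then None else Some (100 / x)
let g x = Some (x + 1)

let () =
  (* [1] left identity: return x >>= f  <=>  f x *)
  assert (return 5 >>= f = f 5);
  (* [2] right identity: m >>= return  <=>  m *)
  assert ((Some 5 >>= return) = Some 5);
  (* [3] associativity *)
  assert (((Some 5 >>= f) >>= g) = (Some 5 >>= fun x -> f x >>= g))
```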
Overview #
| Primitive | Era | Parallelism | Style | Use case |
|---|---|---|---|---|
| Thread | Pre-5 | No (GRL) | Preemptive | Blocking C FFI |
| Mutex / Condition | Both | — | Imperative | Thread synchronization |
| Lwt.t | Pre-5 (still used) | No | Monadic | I/O concurrency |
| Async.Deferred.t | Pre-5 | No | Monadic | I/O concurrency (Jane Street) |
| Domain | OCaml 5 | Yes | Imperative | CPU parallelism |
| Atomic | OCaml 5 | Yes | Lock-free | Shared mutable state |
| Effect | OCaml 5 | — | Direct-style | Scheduler building block |
| Domainslib.Task | OCaml 5 | Yes | Direct-style | Structured parallelism |
| Eio.Fiber | OCaml 5 | Per-domain | Direct-style | I/O concurrency |
Pre-OCaml 5 #
cooperative concurrency: primitives to use OS threads, limited by the Global Runtime Lock (GRL) such that only one thread could execute at a time \(\implies\) interleaving but not true simultaneous execution. (This is fascinating stuff – it begs to be compared against Python's recent journey of escaping the GIL and how concurrency primitives have evolved in Python itself.)
The reason the GRL was such a limiting factor was that the GC was not thread-safe; that's why it had to strictly enforce that only one thread could execute OCaml code at any one time.
viable escape-hatch: using C FFI calls that could release the lock
Using Thread for control of POSIX threads
- it's a wrapper over OS-level pthreads
- Preemptive: the scheduler may interrupt whenever
- Still can't parallelise: only a single thread executes at any time because of the GRL
- Useful for: blocking I/O that would release the lock, background thread coordinating a C library
- These were monitor-style primitives: Mutex, Condition, Semaphore in the stdlib – everything limited by the GRL
Lwt – lightweight threads, as a concurrency library
Key Idea:
Instead of blocking, every potentially-blocking operation returns a promise ('a Lwt.t). The scheduler (via an event loop) runs promises to completion cooperatively; a promise yields control explicitly. Lwt is infectious: calling a blocking function (e.g. Unix.read) within an Lwt promise will block the entire event loop.
Thread cancellation in Lwt is cooperative and fragile – Lwt.cancel raises Lwt.Canceled only at yield points.
That's why we should always clean up threads with Lwt.finalize or Lwt_switch.
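A minimal sketch of what this monadic style looks like in practice, assuming the lwt and lwt.unix packages are installed; the file names are hypothetical, and each >>= marks a point where the promise may yield to the event loop:

```ocaml
(* Read a file, then write an uppercased copy of it. *)
open Lwt.Infix

let uppercase_copy src dst =
  (* read the whole input file; this returns a 'string Lwt.t' promise *)
  Lwt_io.(with_file ~mode:Input src read) >>= fun text ->
  (* chain the write onto the read via bind *)
  Lwt_io.(with_file ~mode:Output dst (fun oc ->
    write oc (String.uppercase_ascii text)))

(* Lwt_main.run starts the event loop and blocks until the promise resolves *)
let () = Lwt_main.run (uppercase_copy "test.txt" "TEST.txt")
```

The same shape recurs in the Async examples below – only the names differ (Lwt_main.run vs the Async scheduler, >>= in both).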
Async – Jane Street's alternative
It's the same cooperative model, with different aesthetics and tradeoffs:
- Instead of Lwt.t, it uses Deferred.t
- Integrated with Core (Jane Street's stdlib alternative)
- The scheduler is more explicit – we have to start the event loop explicitly via Async.Scheduler.go ()
- Piping of streams is stronger because it's more composable (Pipe.t)
- Has better Clock primitives
Multi-processing as a parallelism workaround
Real CPU parallelism ended up requiring multiple OS processes communicating over Unix pipes or sockets
libs like Parmap helped to make this more ergonomic
Multicore OCaml >= 5 #
Has 2 orthogonal primitives, onto which the other libs are all built:
- Domains: for true parallelism
- Effects: for structured control flow
Domains – for parallelism
Each domain roughly maps to a core – it runs on its own OS thread, has its own local minor heap (with a local GC), and shares the major heap with the rest of the domains. Since there's no GRL, multiple domains can run OCaml code simultaneously.
let d = Domain.spawn (fun () ->
  (* runs in parallel on another core *)
  expensive_computation ())

let result = Domain.join d
Code Snippet 10: shape of how domains are used
Some nuances:
Domains are supposed to be heavyweight – we shouldn't be creating thousands of them (they're more like OS threads than goroutines). Pattern: spawn N domains at startup, one per core, then distribute the work using queues.
Ndomains @ startup, one per-core then distribute the work using queues.Data Races are possible
- memory model spec makes it such that non-atomic shared mutable state requires explicit synchronisation
- the Atomic module gives basic lock-free primitives: make, get, set, compare_and_set, fetch_and_add
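A minimal sketch (assuming OCaml >= 5) of why Atomic matters: two domains bumping a shared counter with Atomic.incr lose no updates, whereas plain ref increments could race:

```ocaml
let () =
  let counter = Atomic.make 0 in
  (* each domain performs 100_000 lock-free increments *)
  let work () = for _ = 1 to 100_000 do Atomic.incr counter done in
  let d1 = Domain.spawn work in
  let d2 = Domain.spawn work in
  Domain.join d1;
  Domain.join d2;
  (* atomic increments are never lost, so the total is exact *)
  assert (Atomic.get counter = 200_000);
  print_endline "ok"
```

With a plain int ref the final count would typically come out short, because read-modify-write on shared non-atomic state needs explicit synchronisation under the OCaml 5 memory model.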
GC is parallel and concurrent
- minor heap \(\implies\) most allocation is thread-local \(\implies\) great performance
Domainslib: for structured parallelism
It's what we reach for when we want to do CPU-bound parallel work. A work-stealing scheduler gives us load balancing automatically.
Feature-rich, offers:
- Task.pool: fixed-sized domain pool (with a work-stealing scheduler within it)
- Task.run: submits work to the pool
- Task.async / Task.await: futures within the pool
- Task.parallel_for: for parallel iteration
let pool = Task.setup_pool ~num_domains:3 ()

let result = Task.run pool (fun () ->
  let a = Task.async pool (fun () -> heavy_work_1 ()) in
  let b = Task.async pool (fun () -> heavy_work_2 ()) in
  Task.await pool a + Task.await pool b)

let () = Task.teardown_pool pool
Code Snippet 11: example of using Domainslib.Task
Effects – for concurrency
Key mechanism for delimited continuations (coroutines) without committing to a specific scheduler – it's how we can get Lwt-style concurrency without monadic wrapping.
Like a resumable exception: we perform an effect to signal something, and a handler above us in the call stack can resume the computation later – likely after some I/O is done.
We can write direct-style concurrent code (code that looks like ordinary, sequential code, in contrast with the monadic style we had to rely on before) and let the underlying scheduler use effects to manage fibers.
Effect handlers come in 2 flavours:
- Effect.Deep: the handler stays installed and handles all the effects the computation performs (full coroutines)
- Effect.Shallow: the handler handles one effect and must re-install itself (more explicit, slightly more efficient)
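A minimal sketch of a deep handler using the stdlib Effect API (assumes OCaml >= 5; Ask is a made-up effect for illustration):

```ocaml
open Effect
open Effect.Deep

(* declare a new effect: performing Ask yields an int *)
type _ Effect.t += Ask : int Effect.t

(* direct-style code: no monads, just 'perform' where a value is needed *)
let computation () = perform Ask + perform Ask

let () =
  let result =
    match_with computation ()
      { retc = (fun v -> v);          (* computation returned normally *)
        exnc = raise;                 (* re-raise exceptions *)
        effc = (fun (type a) (eff : a Effect.t) ->
          match eff with
          | Ask ->
            (* resume the suspended computation with the value 21 *)
            Some (fun (k : (a, _) continuation) -> continue k 21)
          | _ -> None) }
  in
  assert (result = 42)
```

A scheduler like Eio does essentially this, except the handler parks the continuation until I/O completes instead of resuming it immediately.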
Eio – I/O Library for OCaml 5
Intends to be the replacement for Lwt. Built entirely on Effects, it gives direct-style concurrency with structured concurrency guarantees.
Lwt, built entirely onEffects, gives direct-style concurrency with structured concurrency guarantees.Key ideas:
- Switch: an ownership scope for fibers. All fibers in a switch must finish before the switch exits – structured concurrency. (Rough idea of structured concurrency: concurrency that follows the shape of the code's scope. Tasks start and finish within a code block, and if we need to leave it, error propagation happens in a controlled fashion without any "dangling" background tasks.)
- Fiber: a lightweight coroutine. Within a switch, fibers run cooperatively; the scheduler switches between them at I/O points.
- Stream / Promise: communication primitives between fibers.
- Can be combined (Eio + Domainslib): we can use Eio for I/O concurrency within a domain and Domainslib for CPU parallelism across domains.
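A hypothetical hello-world sketch of those pieces (assumes the eio and eio_main packages; treat the exact entry points as an assumption of this sketch):

```ocaml
open Eio

let () =
  (* Eio_main.run starts the effects-based scheduler and passes in the env *)
  Eio_main.run @@ fun _env ->
  (* the switch is the ownership scope: both fibers below must finish
     before Switch.run returns *)
  Switch.run @@ fun _sw ->
  Fiber.both
    (fun () -> traceln "fiber a")
    (fun () -> traceln "fiber b")
```

Note there's no Deferred.t or 'a Lwt.t anywhere – the fibers are written in direct style, and suspension happens through effects under the hood.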
Pre-OCaml 5 #
Async #
Async (pre-OCaml 5) offers a hybrid model that aims to bring the best of both the threading and event-loop models of concurrency: more performant, and without the synchronisation woes of pre-emptive threads.
Some general approaches to concurrency
We first consider some general approaches to concurrency:
pre-emptive system threads
each task that may require simultaneous waiting is given an OS thread of its own so it can block without stopping the entire program e.g. Java, C#
complications:
significant memory and resource usage per thread
OS can arbitrarily interleave the execution of system threads
need synchronisation handled well: the programmer must protect shared state (e.g. with locks, guarded with conditionals) \(\implies\) error-prone
single-threaded programs running event loops
Event loop reacts to external events like timeouts or mouse-clicks by invoking a callback function that has been registered for that purpose. e.g. JavaScript, GUI toolkits.
- 1 task at a time, so no need to have complex sync mechanisms
complications:
- event-driven programs have an inverted control structure \(\implies\) our own control flow has to be threaded awkwardly through the system's event loop \(\implies\) possible callback hell
Async Basics #
Async is an explicit package of its own, designed to be an extension.
Comparing Async vs Core
(* ==== 1: sync example of reading something *)
open Core;;
#show In_channel.read_all;;
(* val read_all : string -> string *)
Out_channel.write_all "test.txt" ~data:"This is only a test.";;
(* - : unit = () *)
In_channel.read_all "test.txt";; (* -- NB: its type shows that it's a blocking operation *)
(* - : string = "This is only a test." *)
(* ==== 2: Async version: *)
#require "async";;
open Async;;
#show Reader.file_contents;;
(* val file_contents : string -> string Deferred.t *)
(* ==== 3: using deferred values: *)
let contents = Reader.file_contents "test.txt";;
(* val contents : string Deferred.t = <abstr> *)
Deferred.peek contents;;
(* - : string option = None *) (* -- NB: this indicates that it's not complete*)
(* -- on complete *)
contents;; (*COUNTER INTUITIVE -- only the case for utop where this case of spontaneous evaluation happens *)
(* - : string = "This is only a test." *) (* -- NB: contents is a deferred type (container -- string Deferred.t) but this expression returns string type (the contained type)*)
Deferred.peek contents;;
(* - : string option = Some "This is only a test." *) (* -- peeking after completion *)
Deferred Types as a representation for promises
Deferred types
Well-behaved Async functions are non-blocking. They return Deferred.t which is something of a placeholder – like a completable future / promise.
carrying out a non-blocking check:
Deferred.peek is a way of checking completion of the future. It always gives us an option; once the deferred is determined, we see the abstract type parameter concretised, e.g. for the type string Deferred.t.
GOTCHA: special behaviour within utop (repl)
If we directly refer to a deferred value from within utop (the repl), we get the contained value directly instead of the deferred itself. This is because within the repl this is a blocking operation (the scheduler starts if it hasn't) and we effectively await the deferred this way.
In scripted environments or within our own programs this won't happen, because deferred values never get spontaneously resolved. We would need to bind them with the bind, >>=, or let%bind constructs.
binding as a monadic primitive to sequence concurrent computations
sequencing concurrent computations via binding
We wish to await deferred computations, and for that we use bind as the sequencing operator, which has the sugar form >>=.
The function we bind to the deferred type gets invoked after the deferred is no longer empty, e.g. Writer.save gets called only after the file contents were read via Reader.file_contents.
Bind expects the function to wrap up its result in a deferred type. For that, Async has a function called return which takes an ordinary value and wraps it up in a deferred.
Now, bind and return form the monad functional pattern – this is one example of a monad.
#show Deferred.bind;;
(* val bind : 'a Deferred.t -> f:('a -> 'b Deferred.t) -> 'b Deferred.t *)

(* ==== using the sequencing operator (bind) *)
let uppercase_file filename =
  Deferred.bind (Reader.file_contents filename)
    ~f:(fun text -> Writer.save filename ~contents:(String.uppercase text));;
(* val uppercase_file : string -> unit Deferred.t = <fun> *)

uppercase_file "test.txt";;
(* - : unit = () *)
Reader.file_contents "test.txt";;
(* - : string = "THIS IS ONLY A TEST." *)

(* ==== using the syntax sugar >>= *)
let uppercase_file filename =
  Reader.file_contents filename
  >>= fun text ->
  Writer.save filename ~contents:(String.uppercase text);;
(* val uppercase_file : string -> unit Deferred.t = <fun> *)

(* ==== needing to wrap the bound function's result in a Deferred *)
let count_lines filename =
  Reader.file_contents filename
  >>= fun text ->
  return (List.length (String.split text ~on:'\n'));;
(* val count_lines : string -> int Deferred.t = <fun> *)

(* -- this is the negative example: it would never have worked
let count_lines filename =
  Reader.file_contents filename
  >>= fun text ->
  List.length (String.split text ~on:'\n');;

Line 4, characters 5-45:
Error: This expression has type int but an expression was expected of type
         'a Deferred.t
*)

(* ==== deferred map: *)
let count_lines filename =
  Reader.file_contents filename
  >>| fun text ->
  List.length (String.split text ~on:'\n');;
(* val count_lines : string -> int Deferred.t = <fun> *)

count_lines "/etc/hosts";;
(* - : int = 10 *)
sugar for the monadic bind
The bind-then-return pattern is used commonly enough that we have Deferred.map for it (and its sugar >>|).
How Scheduling is done
Scheduling
Async has a dedicated scheduler, which needs to be started explicitly in our own programs; utop handles this automatically for us.
NOTE: this is similar to Python's thread-based concurrency model.
Using Let syntax
The value that let syntax brings is purely syntactic – the compiled output is the same.
Just compare the bind and map based syntax differences using let syntax.
Why this is useful:
highlights the analogy between monadic bind and OCaml’s built-in let-binding \(\implies\) more uniform, readable code
Let_syntax works for any monad; opening the right Let_syntax module is how we choose which monad to use.
In the case of Async we implicitly have Deferred.Let_syntax opened – in some contexts, we may wish to do more explicit opens.
#require "ppx_let";;
(* ==== bind with let syntax *)
let count_lines filename =
let%bind text = Reader.file_contents filename in
return (List.length (String.split text ~on:'\n'));;
(* val count_lines : string -> int Deferred.t = <fun> *)
let uppercase_file filename =
Reader.file_contents filename
>>= fun text ->
Writer.save filename ~contents:(String.uppercase text);;
(* val uppercase_file : string -> unit Deferred.t = <fun> *)
(* ==== map with let syntax *)
let count_lines filename =
let%map text = Reader.file_contents filename in
List.length (String.split text ~on:'\n');;
(* val count_lines : string -> int Deferred.t = <fun> *)
let count_lines filename =
Reader.file_contents filename
>>| fun text ->
List.length (String.split text ~on:'\n');;
Ivars and Upon: incremental filling via binds
Ivar is about incremental filling of variables – we want to construct a deferred where we can programmatically decide when it gets filled in.
It's a lower-level feature than map, bind and return, but very useful for building synchronisation patterns that aren't otherwise well supported.
3 fundamental operations with an ivar:
(* ==== syntax intro: using ivars *)
let ivar = Ivar.create ();;
(*
val ivar : '_weak1 Ivar.t =
{Async_kernel__.Types.Ivar.cell = Async_kernel__Types.Cell.Empty}
*)
let def = Ivar.read ivar;;
(* val def : '_weak1 Deferred.t = <abstr> *)
Deferred.peek def;;
(* - : '_weak2 option = None *)
Ivar.fill ivar "Hello";;
(* - : unit = () *)
Deferred.peek def;;
(* - : string option = Some "Hello" *)
- create an ivar, using Ivar.create
- read the deferred that corresponds to the ivar in question, using Ivar.read
- fill the ivar, causing the deferred to be determined, using Ivar.fill
Here’s a custom delay-timeout synchronisation pattern, where:
- schedule receives an action in the form of a thunk that returns a deferred. Remember thunks are nullary functions (hence they just receive unit () as the arg)
- the caller of schedule will receive the contents of the deferred value returned by the thunk
- this is where we use the upon operator for callback registration. upon schedules a callback to be executed when the deferred it is passed is determined; the difference is that upon does NOT return a new deferred for the callback to fill. val upon : 'a Deferred.t -> ('a -> unit) -> unit
- our delayer uses a queue of thunks, where every call to schedule adds a thunk to the queue and schedules a job in the future to grab a thunk off the queue and run it
- waiting is done using after, which takes a time-span and returns a deferred that will be determined after that time-span – this is just the timeout behaviour we're familiar with from languages like JavaScript
- the queue of thunks is literally a task queue. Note how the jobs run in order (execution order) even though the callbacks scheduled by upon may run out of order (timing order). This part may be a little confusing:
  - even though upon seems "sequential" because it mentions "after delay", nothing forces those upon callbacks to respect order — only the explicit queue logic does
  - remember that upon schedules side-effects, not values
  - inside the scheduler, Async doesn't promise that the callbacks we registered with upon will execute in the same order they were enqueued. 2 independent deferreds might resolve in any order (depending on timing) \(\implies\) upon introduces temporal non-determinism
  - we make the order deterministic by using the queue, which reminds me of Python's ThreadPoolExecutor coupled with as_completed()
- the call to upon uses the queue such that jobs are dequeued in the same order they were enqueued, even though the internal Async callbacks (upon) may run in a different timing order:
Queue.enqueue t.jobs (fun () -> upon (thunk ()) (fun x -> Ivar.fill ivar x));
(* == it's this call that enforces the deterministic execution order *)
upon (after t.delay) (fun () ->
  let job = Queue.dequeue_exn t.jobs in
  job ());
(* ==== delayer interface: *)
module type Delayer_intf = sig
  type t
  val create : Time.Span.t -> t
  val schedule : t -> (unit -> 'a Deferred.t) -> 'a Deferred.t
end;;

(* ==== implementation *)
module Delayer : Delayer_intf = struct
  type t = { delay: Time.Span.t;
             jobs: (unit -> unit) Queue.t;
           }

  let create delay = { delay; jobs = Queue.create () }

  let schedule t thunk =
    let ivar = Ivar.create () in
    Queue.enqueue t.jobs (fun () ->
      upon (thunk ()) (fun x -> Ivar.fill ivar x));
    upon (after t.delay) (fun () ->
      let job = Queue.dequeue_exn t.jobs in
      job ());
    Ivar.read ivar
end;;
(* module Delayer : Delayer_intf *)
Try sticking to the higher level of abstraction for the concurrency primitives instead of the lower-level ones (ivars, upon) unless it's absolutely necessary for the thing we're trying to express.
Understanding bind in terms of Ivars and upon
This part is just to help us visualise the abstraction barrier part.
when we write let d' = Deferred.bind d ~f:
1. a new ivar i is created that will hold the final result of the computation; the deferred associated to this ivar is what is returned
2. a function is registered to be called when d is determined
3. that function, when invoked, calls function f with the value determined for d
4. another function is registered to be called when the deferred returned by f is determined (like step 2)
5. calling that function fills i, causing the corresponding deferred to be determined
(* ==== a custom demo implementation *)
let my_bind d ~f =
let i = Ivar.create () in
upon d (fun x -> upon (f x) (fun y -> Ivar.fill i y));
Ivar.read i;;
(*
val my_bind : 'a Deferred.t -> f:('a -> 'b Deferred.t) -> 'b Deferred.t =
<fun>
*)
Example: An Echo Server #
Objective: program that accepts connections from clients and spits back whatever is sent to it.
Initial helpers
Handling the client connection
We need a function that can copy data from input to output \(\implies\) this function implements the logic for handling a client connection.
bind marks the places where we wait:
- first, when we call Reader.read to read a block of input
- then, when it's complete: if a new block was returned, we write that block to the writer
- finally, we wait until the writer's buffers are flushed, then we recurse
This is implemented with pushback, which helps with memory discipline: if the process can't make progress writing, it stops reading. This pattern is necessary so that the program does NOT allocate unbounded amounts of memory keeping track of all the data it intends to write but hasn't yet been able to.
open Core
open Async

(** Copy data from the reader to the writer, using the provided buffer
    as scratch space *)
let rec copy_blocks buffer r w =
  match%bind Reader.read r buffer with
  | `Eof -> return () (* -- termination condition *)
  | `Ok bytes_read ->
    Writer.write w (Bytes.to_string buffer) ~len:bytes_read;
    (* -- this is what allows the pushback to happen: no further reading
       until the current buffer is done writing. *)
    let%bind () = Writer.flushed w in
    copy_blocks buffer r w
Setting up a server to receive client connections
We use Async's Tcp module and its collection of utilities for creating TCP clients and servers.
(** Starts a TCP server, which listens on the specified port, invoking
    copy_blocks every time a client connects. *)
let run () =
  let host_and_port =
    Tcp.Server.create
      ~on_handler_error:`Raise
      (Tcp.Where_to_listen.of_port 8765)
      (* -- the next arg is the client connection handler, the most
         important argument to the server create function *)
      (fun _addr r w ->
        let buffer = Bytes.create (16 * 1024) in
        copy_blocks buffer r w)
  in
  ignore (host_and_port : (Socket.Address.Inet.t, int) Tcp.Server.t Deferred.t)
Calling Tcp.Server.create gives us a Tcp.Server.t, a handle to the server that can be used to shut the server down.
- we aren't using that handle here, which is why ignore is used to suppress the unused-variable error
- of the args for server creation, the last one is the client connection handler, and it's the most important one
- there's no need to explicitly close down the client connections, because the server automatically shuts down a connection once the deferred returned by the handler is determined
Starting the Async scheduler

A common newbie error is to forget to start the scheduler.

```ocaml
(* Call [run], and then start the scheduler *)
let () =
  run ();
  never_returns (Scheduler.go ())
```

- without the scheduler, the program does NOTHING at all; even `printf` calls won't reach the terminal, which can be bewildering.
- even though we added no special code for handling multiple clients, the server already accepts connections and reads and writes data concurrently.
Tail-Calls and Chain of Deferreds
Async's implementation of `bind` is smart and essentially does a form of tail-call optimisation lifted to the `Deferred` monad.
One potential worry is that the chain of deferred calls (and the length of that chain) might cause memory problems, since it could take up an unbounded amount of memory as the echo process continues.
This whole chain of deferreds is automatically optimised: when the final deferred is determined (in the `copy_blocks` example, it's when we get `Eof`), the chain is flattened into a single deferred \(\implies\) there's no memory leak at all.
- intuition: the `bind` that creates the deferred is in tail position, and nothing is done to that deferred once it's created; it's simply returned as-is \(\implies\) this looks like a good candidate for tail-call optimisation.
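To see the pattern in isolation, here's a hedged sketch (not from the book) of an infinite loop in the `Deferred` monad whose `bind` is in tail position; the names `Clock.after` and `Time_float.Span.of_sec` assume a recent Core/Async (older versions spell it `Time.Span.of_sec`):

```ocaml
open Core
open Async

(* Sketch: an infinite monadic loop. The bind introduced by [let%bind]
   is in tail position and the resulting deferred is returned as-is,
   so the chain of deferreds can be collapsed instead of growing
   without bound. *)
let rec tick n =
  let%bind () = Clock.after (Time_float.Span.of_sec 1.) in
  printf "tick %d\n" n;
  tick (n + 1)

let () =
  don't_wait_for (tick 0);
  never_returns (Scheduler.go ())
```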
Functions that Never Return
When starting the scheduler, we wrapped `Scheduler.go` in `never_returns`.
- this is an explicit marker: it makes it clear to the invoker of `Scheduler.go` that the function never returns.
- by default, the return type of a function that never returns is inferred as `'a`. Since the function never returns, we're free to impute any type at all to its non-existent return value, which lets the call fit into any context within our program. We've seen this before in this book when learning about exceptions.
- `never_returns` is an alias of `Nothing.t`, which is the declared return type of `Scheduler.go`. `Nothing.t` is uninhabited, meaning there are no values of that type, so a function can't actually return a value of type `Nothing.t`, i.e. it's a function that never returns. To get this, we just need to add a type annotation:

```ocaml
let rec loop_forever () : Nothing.t = loop_forever ();;
(* val loop_forever : unit -> never_returns = <fun> *)

(* === helpful error if we omit the never_returns:

   let do_stuff n =
     let x = 3 in
     if n > 0 then Scheduler.go ();
     x + n;;

   Line 3, characters 19-34:
   Error: This expression has type never_returns
          but an expression was expected of type unit
          because it is in the result of a conditional with no else branch *)

(* === actually using never_returns properly *)
let do_stuff n =
  let x = 3 in
  if n > 0 then never_returns (Scheduler.go ());
  x + n;;
(* val do_stuff : int -> int = <fun> *)
```

We saw uninhabited types earlier, in a different context and use-case, when learning about narrowing without using GADTs.
Improving the Echo Server
Improvements list:
- add a proper CLI with `Command`
- add a flag to specify the port to listen on and a flag to make the server echo back the capitalised version of the input
- simplify the code using Async's `Pipe` interface
```ocaml
open Core
open Async

let run ~uppercase ~port =
  let host_and_port =
    Tcp.Server.create
      ~on_handler_error:`Raise
      (Tcp.Where_to_listen.of_port port)
      (fun _addr r w ->
        Pipe.transfer
          (Reader.pipe r)
          (Writer.pipe w)
          ~f:(if uppercase then String.uppercase else Fn.id))
  in
  ignore
    (host_and_port : (Socket.Address.Inet.t, int) Tcp.Server.t Deferred.t);
  (* returns a deferred that is never determined, indicating that
     the echo server doesn't shut down *)
  Deferred.never ()

let () =
  Command.async ~summary:"Start an echo server"
    (let%map_open.Command uppercase =
       flag "-uppercase" no_arg
         ~doc:" Convert to uppercase before echoing back"
     and port =
       flag "-port"
         (optional_with_default 8765 int)
         ~doc:" Port to listen on (default 8765)"
     in
     fun () -> run ~uppercase ~port)
  |> Command_unix.run
```

NOTEs:
- `Deferred.never` in the `run` function shows that the echo server doesn't ever shut down. `Deferred.never` returns a deferred that is never determined.
- Use of Async's `Pipe` is the key change in the code.
  - a `Pipe` is an async communication channel used for connecting different parts of the program
  - visualise it as a consumer/producer queue that uses deferreds to communicate when the pipe is ready to be read from or written to
  - we're using one of the many utility functions within the `Pipe` module, `Pipe.transfer`, to set up a process that takes data from a reader-pipe and moves it to a writer-pipe (type: `'a Pipe.Reader.t -> 'b Pipe.Writer.t -> f:('a -> 'b) -> unit Deferred.t`). `Reader.pipe` and `Writer.pipe` produce the two pipes that are connected here.
  - this still preserves the pushback: if the writer gets blocked, the writer's pipe will stop pulling data from the reader's pipe, and this prevents the reader from reading in more data.
  - the deferred returned by `Pipe.transfer` becomes determined once the reader has been closed and the last element has been transferred from the reader to the writer. The server shuts down that client connection once that deferred becomes determined.
Using Pipes
Pipes are created in connected read/write pairs:

```ocaml
let (r, w) = Pipe.create ();;
(* val r : '_weak3 Pipe.Reader.t = <abstr> *)
(* val w : '_weak3 Pipe.Writer.t = <abstr> *)
```

- the types of `r` and `w` are weakly polymorphic
Pipes have some internal slack: a number of slots to which something can be written before the write will block.
By default the slack is 0 – so the deferred returned by a write is determined only when the value is read out of the pipe.
```ocaml
let (r, w) = Pipe.create ();;
(* val r : '_weak4 Pipe.Reader.t = <abstr> *)
(* val w : '_weak4 Pipe.Writer.t = <abstr> *)

let write_complete = Pipe.write w "Hello World!";;
(* val write_complete : unit Deferred.t = <abstr> *)

(* reading from the pipe here determines the deferred
   for the write to the pipe: *)
Pipe.read r;;
(* - : [ `Eof | `Ok of string ] = `Ok "Hello World!" *)

write_complete;;
(* - : unit = () *)
```
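The slack is configurable. A minimal sketch, assuming Async's `Pipe.set_size_budget` (which sets how many elements the pipe buffers before writes block); this example is mine, not the book's:

```ocaml
open Core
open Async

(* Sketch: raise the pipe's size budget so a couple of writes can
   complete before any reader shows up. Assumes
   Pipe.set_size_budget : (_, _) Pipe.t -> int -> unit. *)
let demo () =
  let (r, w) = Pipe.create () in
  Pipe.set_size_budget r 2;
  (* with a budget of 2, these writes needn't wait for a reader *)
  let%bind () = Pipe.write w "a" in
  let%bind () = Pipe.write w "b" in
  match%map Pipe.read r with
  | `Eof -> ()
  | `Ok s -> printf "read: %s\n" s
```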
Example: Searching Definitions with DuckDuckGo #
Setup
Install the following libs:
- textwrap
- uri
- yojson
- cohttp
URI Handling
We need to do some URI generation to be able to query the DDG servers:
```ocaml
open Core
open Async

(** Generate a DuckDuckGo search URI from a query string *)
let query_uri query =
  let base_uri = Uri.of_string "http://api.duckduckgo.com/?format=json" in
  Uri.add_query_param base_uri ("q", [ query ])
```
Parsing JSON Strings
The HTTP response from DDG is JSON, so we need to parse it.
```ocaml
(** Extract the "Definition" or "Abstract" field from the
    DuckDuckGo results *)
let get_definition_from_json json =
  match Yojson.Safe.from_string json with
  | `Assoc kv_list ->
    let find key =
      match List.Assoc.find ~equal:String.equal kv_list key with
      | None | Some (`String "") -> None
      | Some s -> Some (Yojson.Safe.to_string s)
    in
    (match find "Abstract" with
     | Some _ as x -> x
     | None -> find "Definition")
  | _ -> None
```
Executing an HTTP Client Query
For the query dispatching:
```ocaml
(* Execute the DuckDuckGo search *)
let get_definition word =
  let%bind _, body = Cohttp_async.Client.get (query_uri word) in
  let%map string = Cohttp_async.Body.to_string body in
  word, get_definition_from_json string

(* ==== understanding the signature of Cohttp_async.Client.get *)
#require "cohttp-async";;
#show Cohttp_async.Client.get;;
(* val get :
     ?interrupt:unit Deferred.t ->
     ?ssl_config:Conduit_async.V2.Ssl.Config.t ->
     ?headers:Cohttp.Header.t ->
     Uri.t ->
     (Cohttp.Response.t * Cohttp_async.Body.t) Deferred.t *)
```

Understanding `Cohttp_async.Client.get`:
- required arg: the URI
- returns a deferred value containing a `Cohttp.Response.t` and a pipe reader through which the `Cohttp_async.Body.t` of the response will be streamed
- because the body isn't that large, we just read the entire body directly using `Body.to_string` (instead of chunking it)
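If the body were large, we could stream it instead of buffering. A hedged sketch (mine, not the book's), assuming `Cohttp_async.Body.to_pipe` exposes the body as a `string Pipe.Reader.t`:

```ocaml
open Core
open Async

(* Sketch: consume the response body chunk by chunk rather than
   buffering the whole thing with Body.to_string. *)
let stream_definition_body word =
  let%bind _resp, body = Cohttp_async.Client.get (query_uri word) in
  Pipe.iter_without_pushback
    (Cohttp_async.Body.to_pipe body)
    ~f:(fun chunk -> printf "got a chunk of %d bytes\n" (String.length chunk))
```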
Dispatching multiple searches in parallel
We use `Deferred.all` for our dispatcher. It preserves the order of the deferreds that are passed to it. Printing only happens after all the results have arrived.
```ocaml
(* === Print out a word/definition pair *)
let print_result (word, definition) =
  (* this is an async printf that goes through the async scheduler *)
  printf "%s\n%s\n\n%s\n\n" word
    (String.init (String.length word) ~f:(fun _ -> '-'))
    (match definition with
     | None -> "No definition found"
     | Some def ->
       String.concat ~sep:"\n" (Wrapper.wrap (Wrapper.make 70) def))

(* === Parallel dispatcher: *)
(* Run many searches in parallel, printing out the results after
   they're all done. *)
let search_and_print words =
  let%map results = Deferred.all (List.map words ~f:get_definition) in
  List.iter results ~f:print_result

(* ==== alternative: pipelined, printing in order of completion *)
(* Deferred.all_unit;;
   - : unit Deferred.t list -> unit Deferred.t = <fun> *)
(* Run many searches in parallel, printing out the results as you go *)
let search_and_print words =
  Deferred.all_unit
    (List.map words ~f:(fun word -> get_definition word >>| print_result))
```

Finally, a CLI interface for this:
```ocaml
let () =
  Command.async
    ~summary:"Retrieve definitions from duckduckgo search engine"
    (let%map_open.Command words = anon (sequence ("word" %: string)) in
     fun () -> search_and_print words)
  |> Command_unix.run
```
Exception Handling #
We should expect flakiness, especially when concurrently using external resources.
- Monitors
- Example: Handling exceptions with DuckDuckGo
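As a preview of those subsections, a hedged sketch of catching a failed lookup with Async's `Monitor.try_with`; treating any exception as "no definition" is my choice here for illustration, not necessarily the book's:

```ocaml
open Core
open Async

(* Sketch: wrap [get_definition] so that an exception (e.g. a DNS or
   connection failure) yields a fallback value instead of propagating
   up and killing the whole batch of queries. *)
let get_definition_safe word =
  match%map Monitor.try_with (fun () -> get_definition word) with
  | Ok result -> result
  | Error _exn -> (word, None) (* report the failure as a missing definition *)
```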