TODO Chapter 16: Concurrent Programming with Async

The book introduces concurrent programming through the problem of programs that sit blocked while waiting on some form of I/O.

Async offers a hybrid model that aims to combine the best of the threading model and the event-loop model of concurrency: more performant, and without the synchronisation woes of pre-emptive threads.

We consider some approaches to concurrency:

  1. pre-emptive system threads

    each task that may require simultaneous waiting is given an OS thread of its own so it can block without stopping the entire program e.g. Java, C#

    complications:

    1. significant memory and resource usage per thread

    2. OS can arbitrarily interleave the execution of system threads

      this needs careful synchronisation: the programmer has to protect shared state (e.g. with locks and condition variables) \(\implies\) error-prone

  2. single-threaded programs running event loops

    Event loop reacts to external events like timeouts or mouse-clicks by invoking a callback function that has been registered for that purpose. e.g. JavaScript, GUI toolkits.

    1. 1 task at a time, so no need to have complex sync mechanisms

    complications:

    1. event-driven programs have an inverted control structure \(\implies\) your own control flow has to be threaded awkwardly through the system’s event loop \(\implies\) callback hell.
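To make the inverted control structure concrete, here is a toy event loop. It is only a sketch: it uses the OCaml standard library (no Core or Async), and the names pending, schedule, run_loop and log are made up for illustration. Note how the second step of the first task cannot simply follow the first step; it has to be registered as another callback.

```ocaml
(* Toy single-threaded event loop: a FIFO queue of pending callbacks.
   Standard library only; all names here are hypothetical. *)
let pending : (unit -> unit) Queue.t = Queue.create ()
let log : string list ref = ref []

(* Register a callback to be run by the loop later. *)
let schedule f = Queue.add f pending

(* Run callbacks one at a time until none are left. *)
let run_loop () =
  while not (Queue.is_empty pending) do
    (Queue.take pending) ()
  done

let () =
  schedule (fun () ->
    log := "read request" :: !log;
    (* The continuation of this task must be handed back to the loop. *)
    schedule (fun () -> log := "write response" :: !log));
  schedule (fun () -> log := "handle timeout" :: !log);
  run_loop ();
  List.iter print_endline (List.rev !log)
```

Only one callback runs at a time, so no locking is needed, but the "read, then write" logic is split across two callbacks with an unrelated callback running in between.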

Async Basics

Async is a separate library that has to be loaded explicitly; it is designed as an extension of Core, so we open Async after Core.

(* ==== 1: sync example of reading something *)
open Core;;
#show In_channel.read_all;;
(* val read_all : string -> string *)
Out_channel.write_all "test.txt" ~data:"This is only a test.";;
(* - : unit = () *)
In_channel.read_all "test.txt";; (* -- NB: its type shows that it's a blocking operation *)
(* - : string = "This is only a test." *)

(* ==== 2: Async version: *)
#require "async";;
open Async;;
#show Reader.file_contents;;
(* val file_contents : string -> string Deferred.t *)

(* ==== 3: using deferred values: *)
let contents = Reader.file_contents "test.txt";;
(* val contents : string Deferred.t = <abstr> *)
Deferred.peek contents;;
(* - : string option = None *) (* -- NB: this indicates that it's not complete*)


(* -- on complete  *)
contents;; (* COUNTER-INTUITIVE -- this spontaneous evaluation only happens in utop *)
(* - : string = "This is only a test." *) (* -- NB: contents has a deferred type (the container, string Deferred.t), but this expression prints as a string (the contained type) *)
Deferred.peek contents;;
(* - : string option = Some "This is only a test." *) (* -- peeking after completion*)

Deferred types

Well-behaved Async functions are non-blocking. They return Deferred.t which is something of a placeholder – like a completable future / promise.

  • carrying out a non-blocking check:

    Deferred.peek checks whether the deferred is determined without blocking. It always returns an option: None while the deferred is still empty, Some v once it holds a value. The type parameter carries over, so peeking at a string Deferred.t gives a string option.

  • GOTCHA: special behaviour within utop (repl)

    If we refer to a deferred value directly in utop (the REPL), we get the contained value back rather than the deferred itself. This is because utop blocks on the deferred (starting the scheduler if it isn’t already running), so we effectively await it.

    In scripts or in our own programs this won’t happen: deferred values never get resolved spontaneously. We have to sequence on them with bind, >>= or let%bind.

  • sequencing concurrent computations via binding

    We wish to await deferred computations and for that we use bind as the sequencing operator, which has a sugar-form >>=.

    The function we bind to the deferred type will get invoked after the deferred is no longer empty e.g. Writer.save gets called only after the file contents were read via Reader.file_contents.

    Bind expects that the function wraps up the result in a deferred type. For that, Async has a function called return which takes an ordinary value and wraps it up in a deferred.

    Together, bind and return form the monad pattern; Deferred is one example of a monad.

      #show Deferred.bind;;
      (* val bind : 'a Deferred.t -> f:('a -> 'b Deferred.t) -> 'b Deferred.t *)
    
      (* ==== using the sequencing operator (bind) *)
      let uppercase_file filename =
        Deferred.bind (Reader.file_contents filename)
          ~f:(fun text ->
             Writer.save filename ~contents:(String.uppercase text));;
      (* val uppercase_file : string -> unit Deferred.t = <fun> *)
      uppercase_file "test.txt";;
      (* - : unit = () *)
      Reader.file_contents "test.txt";;
      (* - : string = "THIS IS ONLY A TEST." *)
    
    
      (* ==== using the syntax sugar >>= *)
      let uppercase_file filename =
        Reader.file_contents filename
        >>= fun text ->
        Writer.save filename ~contents:(String.uppercase text);;
      (* val uppercase_file : string -> unit Deferred.t = <fun> *)
    
      (* === needing to wrap the bind function in an async result (wrapped as a Deferred) *)
      let count_lines filename =
        Reader.file_contents filename
        >>= fun text ->
        return (List.length (String.split text ~on:'\n'));;
      (* val count_lines : string -> int Deferred.t = <fun> *)
    
      (* -- this is the negative example: it would never have worked
    
      let count_lines filename =
        Reader.file_contents filename
        >>= fun text ->
        List.length (String.split text ~on:'\n');;
      Line 4, characters 5-45:
      Error: This expression has type int but an expression was expected of type
               'a Deferred.t
    
      *)
    
    
      (* ==== deferred map: *)
      let count_lines filename =
        Reader.file_contents filename
        >>| fun text ->
        List.length (String.split text ~on:'\n');;
      (* val count_lines : string -> int Deferred.t = <fun> *)
      count_lines "/etc/hosts";;
      (* - : int = 10 *)
    
  • sugar for the monadic bind

    calling bind and then return is a common enough pattern that we have Deferred.map for it (and its sugar, >>|).
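The relationship between map, bind and return can be sketched as follows. my_map is a hypothetical name used only for illustration (real code should use Deferred.map or >>|), and the sketch assumes Core and Async are available as in the rest of these notes.

```ocaml
open Core
open Async

(* Hypothetical re-implementation, for illustration only: map is bind
   with the result of f wrapped back up by return. *)
let my_map (d : 'a Deferred.t) ~(f : 'a -> 'b) : 'b Deferred.t =
  Deferred.bind d ~f:(fun x -> return (f x))

(* count_lines from the notes, written with my_map instead of >>| *)
let count_lines filename =
  my_map (Reader.file_contents filename) ~f:(fun text ->
    List.length (String.split text ~on:'\n'))
```

Because f passed to my_map returns a plain value, the "forgot to call return" type error from the negative example cannot occur here.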


Scheduling

  • Async has a dedicated scheduler, which needs to be started explicitly for our own programs.

    utop will handle this automatically for us.

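    NOTE: this is similar to the event loop in Python’s asyncio, which also has to be started explicitly (e.g. with asyncio.run), rather than to Python’s thread-based concurrency model.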
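A minimal sketch of what starting the scheduler explicitly looks like in a standalone program. It assumes Core and Async are installed and that sec is in scope as it is in the book's examples; main is a made-up name.

```ocaml
open Core
open Async

(* A deferred computation: wait 0.1s, then print. Building this value
   does not run anything by itself. *)
let main () : unit Deferred.t =
  after (sec 0.1) >>| fun () -> print_endline "timer fired"

let () =
  (* Register the work and ask Async to exit once it is done. *)
  don't_wait_for (main () >>| fun () -> shutdown 0);
  (* Without this line the timer should never fire: nothing would be
     driving the event loop. *)
  never_returns (Scheduler.go ())
```

In utop the same main () could simply be evaluated at the prompt, since utop runs the scheduler for us.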

Using Let Syntax

The value of Let syntax is purely syntactic: it expands to the same bind and map calls, so the compiled output is the same.

Compare the bind- and map-based versions below with their let-syntax equivalents.

Why this is useful:

  1. highlights the analogy between monadic bind and OCaml’s built-in let-binding \(\implies\) more uniform, readable code

  2. Let_syntax works for any monad, opening the right Let_syntax module is how we choose the correct monad to use.

    in the case of Async we implicitly have Deferred.Let_syntax opened – in some contexts, we may wish to do more explicit opens.

#require "ppx_let";;

(* ==== bind with let syntax *)
let count_lines filename =
  let%bind text = Reader.file_contents filename in
  return (List.length (String.split text ~on:'\n'));;
(* val count_lines : string -> int Deferred.t = <fun> *)

let uppercase_file filename =
  Reader.file_contents filename
  >>= fun text ->
  Writer.save filename ~contents:(String.uppercase text);;
(* val uppercase_file : string -> unit Deferred.t = <fun> *)

(* ==== map with let syntax *)
let count_lines filename =
  let%map text = Reader.file_contents filename in
  List.length (String.split text ~on:'\n');;
(* val count_lines : string -> int Deferred.t = <fun> *)

let count_lines filename =
  Reader.file_contents filename
  >>| fun text ->
  List.length (String.split text ~on:'\n');;

Ivars and Upon

An ivar (the “i” stands for incremental) is a cell that starts empty and that we fill explicitly: it lets us construct a deferred where we decide programmatically when it gets filled in.

It’s a low-level feature compared to map, bind and return, and it is very useful for building synchronisation patterns that aren’t already well supported.

3 fundamental operations with an ivar:

(* ==== syntax intro: using ivars *)
let ivar = Ivar.create ();;
(*
val ivar : '_weak1 Ivar.t =
  {Async_kernel__.Types.Ivar.cell = Async_kernel__Types.Cell.Empty}
  *)
let def = Ivar.read ivar;;
(* val def : '_weak1 Deferred.t = <abstr> *)
Deferred.peek def;;
(* - : '_weak2 option = None *)
Ivar.fill ivar "Hello";;
(* - : unit = () *)
Deferred.peek def;;
(* - : string option = Some "Hello" *)
  1. create: make a new, empty ivar, using Ivar.create
  2. read: get the deferred that corresponds to the ivar in question, using Ivar.read
  3. fill: fill the ivar, causing the corresponding deferred to become determined, using Ivar.fill

Here’s a custom delay-timeout synchronisation pattern, where:

  1. schedule receives an action in the form of a thunk that returns a deferred. Remember that thunks are nullary functions (they take just unit, (), as their argument).

    the caller of schedule gets back a deferred that is determined with the contents of the deferred returned by the thunk

    this is where we use the upon operator for callback registration. Like bind, upon schedules a callback to be executed when the deferred it is passed is determined; the difference is that upon does NOT return a new deferred for the callback to fill: val upon : 'a Deferred.t -> ('a -> unit) -> unit.

  2. our delayer uses a queue of thunks: every call to schedule adds a thunk to the queue and schedules a job in the future to grab a thunk off the queue and run it.

    the waiting is done with after, which takes a time span and returns a deferred that is determined once that time span has elapsed. This is the timeout behaviour that we’re familiar with from languages like JavaScript.

    the queue of thunks is literally a task queue. Note that the thunks are started in the order they were enqueued, even if the upon callbacks that trigger them were to fire in a different order. This part may be a little confusing:

    • Even though upon seems “sequential” because it is registered “after the delay”, nothing forces the upon callbacks to respect order; only the explicit queue logic does.

    • remember that upon schedules side effects, not values

    • upon callbacks run when their deferreds are determined, not in the order they were registered. Two independent deferreds may be determined in either order (depending on timing) \(\implies\) relying on upon alone makes the order timing-dependent.

    • we make the order deterministic by using the queue, which is loosely reminiscent of the way Python’s ThreadPoolExecutor.map yields results in submission order, as opposed to as_completed(), which yields them in completion order.

      each upon (after t.delay) callback takes the oldest job off the queue, so jobs are dequeued in the same order they were enqueued, even though the internal Async callbacks (upon) may run in a different timing order.

           Queue.enqueue t.jobs (fun () ->
             upon (thunk ()) (fun x -> Ivar.fill ivar x));
           (* == it's this call that enforces the deterministic execution order*)
           upon (after t.delay) (fun () ->
             let job = Queue.dequeue_exn t.jobs in
             job ());
      
(* ==== delayed interface: *)
module type Delayer_intf = sig
  type t
  val create : Time.Span.t -> t
  val schedule : t -> (unit -> 'a Deferred.t) -> 'a Deferred.t
end;;
(*
module type Delayer_intf =
  sig
    type t
    val create : Time.Span.t -> t
    val schedule : t -> (unit -> 'a Deferred.t) -> 'a Deferred.t
  end
*)


(* ===== implementation  *)
module Delayer : Delayer_intf = struct
  type t = { delay: Time.Span.t;
             jobs: (unit -> unit) Queue.t;
           }

  let create delay =
    { delay; jobs = Queue.create () }

  let schedule t thunk =
    let ivar = Ivar.create () in
    Queue.enqueue t.jobs (fun () ->
      upon (thunk ()) (fun x -> Ivar.fill ivar x));
    upon (after t.delay) (fun () ->
      let job = Queue.dequeue_exn t.jobs in
      job ());
    Ivar.read ivar
end;;
(* module Delayer : Delayer_intf *)
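The point about upon and ordering can be checked with a small standalone sketch. It assumes Core and Async are installed and that sec is in scope as in the book's examples; the delays are arbitrary values chosen for illustration.

```ocaml
open Core
open Async

let () =
  (* Registered first, but its deferred is determined later. *)
  upon (after (sec 0.2)) (fun () ->
    print_endline "registered 1st, 0.2s timer");
  (* Registered second, but its deferred is determined sooner. *)
  upon (after (sec 0.1)) (fun () ->
    print_endline "registered 2nd, 0.1s timer");
  (* Stop the program once both callbacks have had time to run. *)
  upon (after (sec 0.3)) (fun () -> shutdown 0);
  never_returns (Scheduler.go ())
```

The second callback should print first, since callbacks fire when their deferreds become determined rather than in registration order. In Delayer every timer uses the same t.delay, and the queue is what ties each timer to the oldest outstanding job.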

Try sticking to the higher-level concurrency primitives (map, bind, return) instead of the lower-level ones (ivars, upon) unless they are absolutely necessary for the thing we’re trying to express.

  • Understanding bind in terms of Ivars and upon

    This part is just to help us see what sits underneath the abstraction barrier.

    When we write let d' = Deferred.bind d ~f, the following happens:

    1. new ivar i is created that will hold the final result of the computation, the deferred that is associated to this ivar is what is returned

    2. a function is registered to be called when d is determined

    3. that function when invoked, calls function f with the value determined for d

    4. another function is registered to be called when the deferred returned by f is determined (like step 2)

    5. calling that function fills i, causing the corresponding deferred to be determined

    (* ==== a custom demo implementation *)
    let my_bind d ~f =
      let i = Ivar.create () in
      upon d (fun x -> upon (f x) (fun y -> Ivar.fill i y));
      Ivar.read i;;
    (*
    val my_bind : 'a Deferred.t -> f:('a -> 'b Deferred.t) -> 'b Deferred.t =
      <fun>
      *)
    

Example: An Echo Server

Objective: program that accepts connections from clients and spits back whatever is sent to it.

Initial helpers

  • Handling the client connection

    We need a function that can copy data from input to output \(\implies\) this function handles the logic for handling a client connection.

    bind marks the places that we wait:

    1. first, when we call Reader.read to read a block of input
    2. then when it’s complete, if a new block was returned, then we write that block to the writer
    3. finally we wait until writer buffers are flushed, then we recurse.

    This is implemented with pushback which helps with memory-discipline:

    • if the process can’t make progress writing, it will stop reading.

    • this pattern is necessary so that the program DOES NOT allocate unbounded amounts of memory to keep track of all the data that it intends to write but hasn’t been able to write yet.

    open Core
    open Async
    
    (** Copy data from the reader to the writer, using the provided buffer
       as scratch space *)
    let rec copy_blocks buffer r w =
      match%bind Reader.read r buffer with
      | `Eof -> return () (* -- termination condition*)
      | `Ok bytes_read ->
        Writer.write w (Bytes.to_string buffer) ~len:bytes_read;
        let%bind () = Writer.flushed w in (* -- this is what allows the pushback to happen: no further reading until the current buffer is done writing.*)
        copy_blocks buffer r w
    
  • Setting up a server to receive client connections

    We use Async’s Tcp module and its collection of utilities for creating TCP clients and servers.

    (** Starts a TCP server, which listens on the specified port, invoking
        copy_blocks every time a client connects. *)
    let run () =
      let host_and_port =
        Tcp.Server.create
          ~on_handler_error:`Raise
          (Tcp.Where_to_listen.of_port 8765)
          (* -- next arg is the client connection handler, the most important argument to the server create function*)
          (fun _addr r w ->
            let buffer = Bytes.create (16 * 1024) in
            copy_blocks buffer r w)
      in
      ignore
        (host_and_port
          : (Socket.Address.Inet.t, int) Tcp.Server.t Deferred.t)
    
    • Calling Tcp.Server.create gives us (a deferred holding) a Tcp.Server.t, a handle to the server that can be used to shut the server down.

      we don’t use that handle here, which is why ignore is used to discard it explicitly; the type annotation documents exactly which value we are throwing away.

    • of the arguments to server creation, the last one is the client connection handler, and it’s the most important one.

    • there’s no need to close client connections explicitly, because the server automatically shuts down the connection once the deferred returned by the handler is determined.

  • Starting the Async scheduler

    A common newbie error is to forget to manually start the scheduler.

    (* Call [run], and then start the scheduler *)
    let () =
      run ();
      never_returns (Scheduler.go ())
    
    • without the scheduler, the program does NOTHING at all, even printf calls won’t reach the terminal so it can be bewildering.

    • though we don’t add any special code for handling multiple clients, the server can already connect, read and write for several clients concurrently.

Tail-Calls and Chain of Deferreds

Async is smart about this: it essentially does a form of tail-call optimisation, lifted to the Deferred monad. This is special handling inside the Async library, not something the OCaml compiler does.

One potential worry is that the chain of deferreds built up by the recursive calls (and the length of that chain) may cause memory allocation problems, since it could take up an unbounded amount of memory as the echo process continues.

The whole chain of deferreds becomes determined exactly when the final deferred is determined (in the copy_blocks example, when we get `Eof), so Async can safely replace the chain with a single deferred \(\implies\) there’s no memory leak at all.

  • intuition: the bind that creates the deferred is in tail-position and nothing is done to that deferred once it’s created; it’s simply returned as is \(\implies\) this looks like a good candidate for tail call optimisation.

Functions that Never Return

When starting the scheduler, we wrapped the call to Scheduler.go in never_returns.

  • this is an explicit marker: it makes it clear to the caller of Scheduler.go that the function never returns.

  • ordinarily, the return type of a function that never returns is inferred as 'a. Since there is no return value, we’re free to impute any type at all to it, which lets the call fit into any context within our program. We’ve seen this before in this book when learning about exceptions (e.g. raise). The downside is that the type checker won’t flag a call that we mistakenly expect to return.

  • the type never_returns is an alias of Nothing.t, and it is the declared return type of Scheduler.go.

    Nothing.t is uninhabited, meaning there are no values of that type, so a function can’t actually return a value of type Nothing.t; it can only fail to return. To give one of our own functions this type, we just need to add a type annotation:

      let rec loop_forever () : Nothing.t = loop_forever ();;
      (* val loop_forever : unit -> never_returns = <fun> *)
    
      (* === helpful error if we omit the never_returns: *)
      (*
    
        let do_stuff n =
        let x = 3 in
        if n > 0 then Scheduler.go ();
        x + n;;
    
    
    
      Line 3, characters 19-34:
      Error: This expression has type never_returns
             but an expression was expected of type unit
             because it is in the result of a conditional with no else branch
    
        *)
    
      (* ==== actually using the never_returns properly *)
      let do_stuff n =
        let x = 3 in
        if n > 0 then never_returns (Scheduler.go ());
        x + n;;
      (* val do_stuff : int -> int = <fun> *)
    

    We saw uninhabited types before, in a different context and use-case, when learning about narrowing without using GADTs.

Improving the Echo Server

Improvements list:

  1. add a proper CLI with Command
  2. add a flag to specify the port to listen on and a flag to make the server echo back the capitalised version of input
  3. simplify the code using Async’s Pipe interface
open Core
open Async

let run ~uppercase ~port =
  let host_and_port =
    Tcp.Server.create
      ~on_handler_error:`Raise
      (Tcp.Where_to_listen.of_port port)
      (fun _addr r w ->
        Pipe.transfer
          (Reader.pipe r)
          (Writer.pipe w)
          ~f:(if uppercase then String.uppercase else Fn.id))
  in
  ignore
    (host_and_port
      : (Socket.Address.Inet.t, int) Tcp.Server.t Deferred.t);
  Deferred.never () (* --- returns a deferred that is never determined, indicating that the echo server doesn't shut down*)

let () =
  Command.async
    ~summary:"Start an echo server"
    (let%map_open.Command uppercase =
       flag
         "-uppercase"
         no_arg
         ~doc:" Convert to uppercase before echoing back"
     and port =
       flag
         "-port"
         (optional_with_default 8765 int)
         ~doc:" Port to listen on (default 8765)"
     in
     fun () -> run ~uppercase ~port)
  |> Command_unix.run

NOTEs:

  1. Deferred.never in the run function shows that the echo server doesn’t ever shut down. Deferred.never returns a deferred that is never determined.
  2. Use of Async’s Pipe is the key change in the code.
    • A Pipe is an asynchronous communication channel used for connecting different parts of the program

    • visualise this as a consumer/producer queue that uses deferreds to communicate when the pipe is ready to be read from or written to

    • we’re using one of the many utility functions within the Pipe module:

      • Pipe.transfer sets up a process that takes data from a reader-pipe and moves it to a writer-pipe (type: 'a Pipe.Reader.t -> 'b Pipe.Writer.t -> f:('a -> 'b) -> unit Deferred.t)
      • Reader.pipe and Writer.pipe provide the two pipes that are connected here.
    • this still preserves the pushback: if the writer gets blocked, the writer’s pipe will stop pulling data from the reader’s pipe, and this prevents the reader from reading in more data.

      the deferred returned by Pipe.transfer becomes determined once the reader has been closed and the last element has been transferred from the reader to the writer. The server shuts down that client connection once that deferred becomes determined.

  • Using Pipes

    1. they are created in connected read/write pairs:

         let (r,w) = Pipe.create ();;
         (* val r : '_weak3 Pipe.Reader.t = <abstr> *)
         (* val w : '_weak3 Pipe.Writer.t = <abstr> *)
      
      • the r and w are weakly polymorphic
    2. Pipes have some internal slack: a number of slots to which something can be written before the write will block.

      By default the slack is 0, so the deferred returned by a write is determined only when the value is read out of the pipe.

         let (r,w) = Pipe.create ();;
         (* val r : '_weak4 Pipe.Reader.t = <abstr> *)
         (* val w : '_weak4 Pipe.Writer.t = <abstr> *)
         let write_complete = Pipe.write w "Hello World!";;
         (* val write_complete : unit Deferred.t = <abstr> *)
         Pipe.read r;; (* -- reading from the pipe here determines the deferred for the write to the pipe *)
         (* - : [ `Eof | `Ok of string ] = `Ok "Hello World!" *)
         write_complete;;
         (* - : unit = () *)
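To tie Pipe.transfer to an explicitly created pipe pair, here is a small sketch that pushes a list of strings through a transfer and collects the output. It assumes Core and Async are installed; shout is a made-up name, and Pipe.of_list and Pipe.to_list are used in place of the socket reader and writer from the echo server.

```ocaml
open Core
open Async

(* Push a list through Pipe.transfer and collect what comes out. *)
let shout (words : string list) : string list Deferred.t =
  let input = Pipe.of_list words in
  let r, w = Pipe.create () in
  (* transfer is determined once [input] is closed and drained; we then
     close [w] ourselves so that the consumer of [r] sees the end. *)
  don't_wait_for
    (Pipe.transfer input w ~f:String.uppercase >>| fun () -> Pipe.close w);
  (* to_list reads from [r] until the pipe is closed. *)
  Pipe.to_list r

let () =
  don't_wait_for
    (shout [ "hello"; "world" ]
     >>| fun words ->
     List.iter words ~f:print_endline;
     shutdown 0);
  never_returns (Scheduler.go ())
```

With zero slack on the (r, w) pair, each write made by the transfer waits until Pipe.to_list has read the value, which is the same pushback the echo server relies on.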
      

Example: Searching Definitions with DuckDuckGo

Setup

Install the following libs:

  1. textwrap
  2. uri
  3. yojson
  4. cohttp (the code below uses its Async backend, cohttp-async)

URI Handling

We need to do some URI generation to be able to query the DDG servers:

open Core
open Async

(** Generate a DuckDuckGo search URI from a query string *)
let query_uri query =
  let base_uri =
    Uri.of_string "http://api.duckduckgo.com/?format=json"
  in
  Uri.add_query_param base_uri ("q", [ query ])

Parsing JSON Strings

HTTP response from DDG is in JSON, so we’d need to parse it.

(** Extract the "Definition" or "Abstract" field from the DuckDuckGo
   results *)
let get_definition_from_json json =
  match Yojson.Safe.from_string json with
  | `Assoc kv_list ->
    let find key =
      match List.Assoc.find ~equal:String.equal kv_list key with
      | None | Some (`String "") -> None
      | Some s -> Some (Yojson.Safe.to_string s)
    in
    (match find "Abstract" with
    | Some _ as x -> x
    | None -> find "Definition")
  | _ -> None

Executing an HTTP Client Query

For the query dispatching:

(* Execute the DuckDuckGo search *)
let get_definition word =
  let%bind _, body = Cohttp_async.Client.get (query_uri word) in
  let%map string = Cohttp_async.Body.to_string body in
  word, get_definition_from_json string



(* ==== understanding the signature for Cohttp_async.Client.get *)
#require "cohttp-async";;
#show Cohttp_async.Client.get;;
(*
val get :
  ?interrupt:unit Deferred.t ->
  ?ssl_config:Conduit_async.V2.Ssl.Config.t ->
  ?headers:Cohttp.Header.t ->
  Uri.t -> (Cohttp.Response.t * Cohttp_async.Body.t) Deferred.t
*)

Understanding Cohttp_async.Client.get

  1. required arg: URI

  2. returns a deferred value containing a Cohttp.Response.t (which we ignore) and a Cohttp_async.Body.t, which acts as a pipe reader through which the body of the response is streamed

    because the body isn’t that large, we just read the entire body in one go using Body.to_string (instead of consuming it chunk by chunk)

Dispatching multiple searches in parallel

We use Deferred.all for our dispatcher. It takes a list of deferreds and returns a single deferred holding the list of results, in the same order as the deferreds that were passed to it.

As a result, printing only happens once all the results have arrived.

(* === Print out a word/definition pair *)
let print_result (word, definition) =
  printf (* -- this is an async printf that goes through the async scheduler *)
    "%s\n%s\n\n%s\n\n"
    word
    (String.init (String.length word) ~f:(fun _ -> '-'))
    (match definition with
    | None -> "No definition found"
    | Some def ->
      String.concat ~sep:"\n" (Wrapper.wrap (Wrapper.make 70) def))


(* === Parallel dispatcher: *)
(* Run many searches in parallel, printing out the results after
   they're all done. *)
let search_and_print words =
  let%map results = Deferred.all (List.map words ~f:get_definition) in
  List.iter results ~f:print_result


(* ==== alternative: retrieve it pipelined, in order of completion *)
(* Run many searches in parallel, printing out the results as you
   go *)
(*
    Deferred.all_unit;;
    - : unit Deferred.t list -> unit Deferred.t = <fun>
*)
let search_and_print words =
  Deferred.all_unit
    (List.map words ~f:(fun word ->
         get_definition word >>| print_result))
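The difference between the two dispatchers can be seen without any network access. The sketch below assumes Core and Async are installed and that sec is in scope as in the book's examples; it stands in for get_definition with a hypothetical delayed helper whose delays are arbitrary.

```ocaml
open Core
open Async

(* Hypothetical stand-in for a search: yields [v] after [seconds]. *)
let delayed seconds v = after (sec seconds) >>| fun () -> v

(* Deferred.all: results come back in the order of the input list. *)
let ordered () : string list Deferred.t =
  Deferred.all [ delayed 0.2 "slow"; delayed 0.1 "fast" ]

(* Deferred.all_unit: each result is printed as soon as it arrives. *)
let in_completion_order () : unit Deferred.t =
  Deferred.all_unit
    [ delayed 0.2 "slow" >>| print_endline
    ; delayed 0.1 "fast" >>| print_endline
    ]

let () =
  don't_wait_for
    (ordered ()
     >>= fun results ->
     List.iter results ~f:print_endline;
     in_completion_order () >>| fun () -> shutdown 0);
  never_returns (Scheduler.go ())
```

The first pair should print as slow, fast (list order) and the second as fast, slow (completion order). In both cases the two timers run concurrently, so each phase takes about 0.2s rather than 0.3s.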

Finally, a CLI for this:

let () =
  Command.async
    ~summary:"Retrieve definitions from duckduckgo search engine"
    (let%map_open.Command words =
       anon (sequence ("word" %: string))
     in
     fun () -> search_and_print words)
  |> Command_unix.run

Exception Handling

We should expect flakiness, especially when concurrently using external resources.

Monitors

Example: Handling exceptions with DuckDuckGo

Timeouts, Cancellation, and Choices

Working with System Threads

Multicore OCaml

Thread-Safety and Locking

TODO Chapter 17: Testing

TODO Chapter 18: Handling JSON Data