TODO Chapter 16: Concurrent Programming with Async
The book introduces concurrent programming by discussing the problem of a program blocking while it waits on some sort of IO.
Async offers a hybrid model that aims to bring the best of both the threading model and the event-loop model of concurrency: good performance, without the synchronisation woes of pre-emptive threads.
We consider some approaches to concurrency:
pre-emptive system threads
each task that may require simultaneous waiting is given an OS thread of its own so it can block without stopping the entire program e.g. Java, C#
complications:
significant memory and resource usage per thread
OS can arbitrarily interleave the execution of system threads
need good synchronisation mechanisms: the programmer has to protect shared state (e.g. with locks and condition variables) \(\implies\) error-prone
single-threaded programs running event loops
Event loop reacts to external events like timeouts or mouse-clicks by invoking a callback function that has been registered for that purpose. e.g. JavaScript, GUI toolkits.
- 1 task at a time, so no need to have complex sync mechanisms
complications:
- event-driven programs have an inverted control structure \(\implies\) your own control flow has to be threaded awkwardly through the system's event loop \(\implies\) callback hell.
Async Basics
Async is a separate package (not part of OCaml's standard library), designed to be used as an extension.
Deferred types
Well-behaved Async functions are non-blocking. They return a Deferred.t, which is something of a placeholder, like a completable future/promise.
Carrying out a non-blocking check:
Deferred.peek is a way of checking whether the deferred has been determined without waiting for it. It always returns an option: None while the deferred is still empty, Some v once it holds a value. Inspecting the type of a deferred shows the type parameter concretised, e.g. Reader.file_contents returns a string Deferred.t.
GOTCHA: special behaviour within utop (REPL)
If we directly refer to a deferred value from within utop, we get the contained value instead of the deferred itself. This is because within the REPL this is a blocking operation (the scheduler is started if it hasn't been already), so we effectively await the deferred.
In scripted environments or within our own programs this won't happen: simply referring to a deferred never blocks and hands us its contents. We need to sequence on it with bind, >>= or let%bind.
Sequencing concurrent computations via binding
We wish to await deferred computations, and for that we use bind as the sequencing operator, which has the infix sugar >>=. The function we bind to the deferred is invoked only once the deferred is determined, e.g. Writer.save gets called only after the file contents have been read via Reader.file_contents.
bind expects the function we pass it to wrap its result in a deferred. For that, Async has a function called return, which takes an ordinary value and wraps it up in a deferred.
Together, bind and return form the monad pattern: Deferred is one example of a monad.
```ocaml
#show Deferred.bind;;
(* val bind : 'a Deferred.t -> f:('a -> 'b Deferred.t) -> 'b Deferred.t *)

(* ==== using the sequencing operator (bind) *)
let uppercase_file filename =
  Deferred.bind (Reader.file_contents filename)
    ~f:(fun text ->
      Writer.save filename ~contents:(String.uppercase text));;
(* val uppercase_file : string -> unit Deferred.t = <fun> *)

uppercase_file "test.txt";;
(* - : unit = () *)
Reader.file_contents "test.txt";;
(* - : string = "THIS IS ONLY A TEST." *)

(* ==== using the infix sugar >>= *)
let uppercase_file filename =
  Reader.file_contents filename
  >>= fun text ->
  Writer.save filename ~contents:(String.uppercase text);;
(* val uppercase_file : string -> unit Deferred.t = <fun> *)

(* ==== the function passed to bind must return a deferred, so wrap the result with return *)
let count_lines filename =
  Reader.file_contents filename
  >>= fun text ->
  return (List.length (String.split text ~on:'\n'));;
(* val count_lines : string -> int Deferred.t = <fun> *)

(* -- negative example: without return this never type-checks
let count_lines filename =
  Reader.file_contents filename
  >>= fun text ->
  List.length (String.split text ~on:'\n');;
Line 4, characters 5-45:
Error: This expression has type int but an expression was expected of type
         'a Deferred.t
*)

(* ==== Deferred.map, via its infix form >>| *)
let count_lines filename =
  Reader.file_contents filename
  >>| fun text ->
  List.length (String.split text ~on:'\n');;
(* val count_lines : string -> int Deferred.t = <fun> *)

count_lines "/etc/hosts";;
(* - : int = 10 *)
```
Binding and then immediately calling return is common enough that Async provides Deferred.map for it, with the infix form >>|.
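The claim that map can be derived from bind and return holds for any monad. The sketch below is a minimal illustration that assumes nothing from Async. It uses the standard library's option type as a stand-in monad, and a hypothetical Option_monad module defines return, bind, map and the >>= / >>| operators in the same shape as Async's.

```ocaml
(* A stand-in monad: option. Async's Deferred has the same shape,
   but its values are filled in later by the scheduler. *)
module Option_monad = struct
  let return x = Some x

  let bind o ~f = match o with None -> None | Some x -> f x

  (* map is derivable: bind, then wrap the plain result with return *)
  let map o ~f = bind o ~f:(fun x -> return (f x))

  let ( >>= ) o f = bind o ~f
  let ( >>| ) o f = map o ~f
end

open Option_monad

let safe_div x y = if y = 0 then None else Some (x / y)

(* >>=: the continuation itself must return an option *)
let halved = Some 10 >>= fun n -> safe_div n 2

(* >>|: the continuation returns a plain int; map wraps it *)
let incremented = Some 10 >>| fun n -> n + 1
```

Swapping the option monad for Deferred changes only when values become available, not how bind, return and map relate to each other.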
Scheduling
Async has a dedicated scheduler, which needs to be started explicitly in our own programs; utop handles this automatically for us.
NOTE: this is similar to Python's asyncio, where the event loop has to be started explicitly (e.g. with asyncio.run).
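A toy run queue in plain OCaml makes "the scheduler needs to be started" concrete. This is not Async's scheduler. The names schedule and go are hypothetical, and unlike Scheduler.go, this go returns once the queue is empty. It only shows that scheduling work merely enqueues it, and nothing runs until the loop is started.

```ocaml
(* Toy run queue: scheduling a job only enqueues it *)
let jobs : (unit -> unit) Queue.t = Queue.create ()

let schedule job = Queue.add job jobs

(* Drain the queue; a running job may schedule further jobs *)
let go () =
  while not (Queue.is_empty jobs) do
    let job = Queue.take jobs in
    job ()
  done

let log = ref []
let say s = log := s :: !log

(* Nothing is printed or logged yet: these calls only enqueue work *)
let () =
  schedule (fun () -> say "first");
  schedule (fun () -> say "second"; schedule (fun () -> say "third"))
```

Until go () is called, log stays empty, which is the toy analogue of an Async program that never calls Scheduler.go.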
Using Let Syntax
The value that the Let syntax brings is purely syntactic: let%bind and let%map desugar into the same bind and map calls, so the compiled output is the same.
Compare the two forms: let%bind corresponds to bind (>>=) and let%map corresponds to map (>>|).
Why this is useful:
highlights the analogy between monadic bind and OCaml’s built-in let-binding \(\implies\) more uniform, readable code
Let_syntax works for any monad: opening the right Let_syntax module is how we choose which monad to use.
In the case of Async, we implicitly have Deferred.Let_syntax opened; in some contexts, we may wish to do more explicit opens.
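ppx_let needs the Core/Async toolchain. As a runnable stand-in, the sketch below uses OCaml's built-in binding operators instead (let* and let+, available since OCaml 4.08). They follow the same idea: the syntax desugars into bind/map calls, and the module we open (here a hypothetical Option_syntax) picks the monad.

```ocaml
(* Built-in binding operators as a stand-in for ppx_let *)
module Option_syntax = struct
  let ( let* ) o f = Option.bind o f (* plays the role of let%bind *)
  let ( let+ ) o f = Option.map f o (* plays the role of let%map *)
end

let parse s = int_of_string_opt s

(* Explicit bind/map calls *)
let sum_explicit a b =
  Option.bind (parse a) (fun x -> Option.map (fun y -> x + y) (parse b))

(* The same computation with let-syntax; it desugars to the calls above *)
let sum_syntax a b =
  let open Option_syntax in
  let* x = parse a in
  let+ y = parse b in
  x + y
```

Both functions compute the same thing. Only the surface syntax differs, mirroring the point that Let_syntax does not change the compiled output.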
Ivars and Upon
An ivar is a write-once variable: it lets us construct a deferred where we can programmatically decide when it gets filled in.
It's a low-level feature compared to map, bind and return, mostly useful for building synchronisation patterns that aren't already well supported.
3 fundamental operations with an ivar:
- create: Ivar.create creates an empty ivar
- read: Ivar.read gives the deferred that corresponds to the ivar in question
- fill: Ivar.fill fills the ivar, causing that deferred to be determined
Here's a custom delay-timeout synchronisation pattern (a delayer), where:
- schedule receives an action in the form of a thunk that returns a deferred. Remember that thunks are nullary functions (hence they just receive unit, (), as the argument).
- the caller of schedule receives the contents of the deferred value returned by the thunk.
- this is where we use the upon operator for callback registration. upon schedules a callback to be executed when the deferred it is passed is determined. Unlike bind or map, upon does NOT return a new deferred for the callback to fill: val upon : 'a Deferred.t -> ('a -> unit) -> unit.
- our delayer uses a queue of thunks: every call to schedule adds a thunk to the queue and schedules a job in the future to grab a thunk off the queue and run it.
- the waiting is done using after, which takes a time-span and returns a deferred that is determined once that time-span has elapsed. This is the timeout behaviour we're familiar with from languages like JavaScript (setTimeout).
- the queue of thunks is literally a task queue. Note how the jobs run in enqueue order (execution order) even if the upon callbacks that trigger them run out of order (timing order). This part may be a little confusing: even though upon seems "sequential" because it mentions "after delay", nothing forces those upon callbacks to respect order; only the explicit queue logic does.
- remember that upon schedules side effects, not values.
- inside the scheduler, Async doesn't promise that the callbacks we registered with upon will execute in the same order they were registered. Two independent deferreds might be determined in any order (depending on timing) \(\implies\) upon introduces temporal non-determinism.
- we make the order deterministic by using the queue. This reminds me of Python's ThreadPoolExecutor, where as_completed() yields results in completion order while executor.map() yields them in submission order.
- each upon callback dequeues the head of the queue, so jobs are dequeued in the same order they were enqueued, even though the internal Async callbacks (upon) may run in a different timing order.
```ocaml
(* t is a record with fields delay (a time span) and jobs (a queue of unit -> unit thunks) *)
let schedule t thunk =
  let ivar = Ivar.create () in
  Queue.enqueue t.jobs (fun () ->
    upon (thunk ()) (fun x -> Ivar.fill ivar x));
  (* == it's this dequeue that enforces the deterministic execution order *)
  upon (after t.delay) (fun () ->
    let job = Queue.dequeue_exn t.jobs in
    job ());
  Ivar.read ivar
```
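The ordering argument can be checked without Async. In this stdlib-only sketch, the names jobs, timers, ran and schedule are hypothetical. Each schedule call enqueues a job and "arms a timer" that, like the upon (after t.delay) callback, simply runs whatever job is at the head of the queue. The timers are then fired in reverse order to simulate out-of-order timing.

```ocaml
let jobs : (unit -> unit) Queue.t = Queue.create ()
let timers : (unit -> unit) list ref = ref [] (* armed timer callbacks, newest first *)
let ran = ref []

let schedule name =
  Queue.add (fun () -> ran := name :: !ran) jobs;
  (* like upon (after t.delay) ...: the callback does not know which job
     it will run, it just takes the oldest one *)
  timers := (fun () -> (Queue.take jobs) ()) :: !timers

let () =
  List.iter schedule ["a"; "b"; "c"];
  (* !timers is in reverse arming order, so this fires the newest timer first *)
  List.iter (fun fire -> fire ()) !timers
```

Every timer callback is interchangeable, so the order in which they fire cannot change the order in which jobs run. The FIFO queue alone fixes it.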
Try to stick to the higher-level concurrency primitives (bind, map, return) instead of the lower-level ones (ivars, upon) unless the lower-level ones are absolutely necessary for what we're trying to express.
Understanding bind in terms of Ivars and upon
This part is just to help us visualise what sits behind the abstraction barrier.
When we write let d' = Deferred.bind d ~f:
1. a new ivar i is created to hold the final result of the computation; the deferred associated with this ivar is what is returned
2. a function is registered to be called when d is determined
3. that function, when invoked, calls f with the value determined for d
4. another function is registered to be called when the deferred returned by f is determined (like step 2)
5. calling that function fills i, causing the corresponding deferred to be determined
```ocaml
(* ==== a custom demo implementation *)
let my_bind d ~f =
  let i = Ivar.create () in
  upon d (fun x ->
    upon (f x) (fun y ->
      Ivar.fill i y));
  Ivar.read i;;
(* val my_bind : 'a Deferred.t -> f:('a -> 'b Deferred.t) -> 'b Deferred.t = <fun> *)
```
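The steps above can be exercised without Async using a toy ivar. This is a simplified model, not Async's implementation: here callbacks run immediately when an ivar is filled instead of being queued as scheduler jobs. The names create, read, fill, peek, upon and return are local stand-ins for Ivar.create, Ivar.read, Ivar.fill, Deferred.peek, upon and return.

```ocaml
(* Toy ivar: empty (with pending callbacks) or full *)
type 'a ivar = { mutable value : 'a option; mutable handlers : ('a -> unit) list }

(* In this toy, a deferred is just a read-only view of an ivar *)
type 'a deferred = 'a ivar

let create () = { value = None; handlers = [] }
let read (i : 'a ivar) : 'a deferred = i
let peek (d : 'a deferred) = d.value

(* Filling twice is an error, as with Async's ivars *)
let fill i x =
  match i.value with
  | Some _ -> failwith "fill: ivar already full"
  | None ->
    i.value <- Some x;
    let hs = List.rev i.handlers in
    i.handlers <- [];
    List.iter (fun h -> h x) hs

(* upon: register a side-effecting callback; returns unit, not a deferred *)
let upon d f =
  match d.value with
  | Some x -> f x
  | None -> d.handlers <- f :: d.handlers

let return x = let i = create () in fill i x; read i

(* Same shape as my_bind: steps 1 to 5 *)
let my_bind d ~f =
  let i = create () in
  upon d (fun x -> upon (f x) (fun y -> fill i y));
  read i
```

The resulting deferred stays empty until both d and the deferred returned by f have been filled.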
Example: An Echo Server
Objective: program that accepts connections from clients and spits back whatever is sent to it.
Initial helpers
Handling the client connection
We need a function that can copy data from input to output \(\implies\) this function handles the logic for handling a client connection.
bind marks the places where we wait:
- first, when we call Reader.read to read a block of input
- then, when that's complete and a new block was returned, we write that block to the writer
- finally, we wait until the writer's buffers are flushed, then we recurse.
This is implemented with pushback which helps with memory-discipline:
if the process can’t make progress writing, it will stop reading.
this pattern is necessary so that the program DOES NOT allocate unbounded amounts of memory to keep track of all the data that it intends to write but hasn't been able to write yet.
```ocaml
open Core
open Async

(** Copy data from the reader to the writer, using the provided buffer as scratch space *)
let rec copy_blocks buffer r w =
  match%bind Reader.read r buffer with
  | `Eof -> return () (* -- termination condition *)
  | `Ok bytes_read ->
    Writer.write w (Bytes.to_string buffer) ~len:bytes_read;
    (* -- this is what allows the pushback to happen: no further reading
       until the current buffer is done writing *)
    let%bind () = Writer.flushed w in
    copy_blocks buffer r w
```
Setting up a server to receive client connections
We use Async's Tcp module and its collection of utilities for creating TCP clients and servers.
```ocaml
(** Starts a TCP server, which listens on the specified port, invoking
    copy_blocks every time a client connects. *)
let run () =
  let host_and_port =
    Tcp.Server.create
      ~on_handler_error:`Raise
      (Tcp.Where_to_listen.of_port 8765)
      (* -- the next arg is the client connection handler, the most
         important argument to the server create function *)
      (fun _addr r w ->
        let buffer = Bytes.create (16 * 1024) in
        copy_blocks buffer r w)
  in
  ignore
    (host_and_port
      : (Socket.Address.Inet.t, int) Tcp.Server.t Deferred.t)
```
Calling Tcp.Server.create gives us (a deferred of) a Tcp.Server.t, a handle to the server that can be used to shut the server down. We aren't using that handle here, which is why ignore (with a type annotation) is used to discard it explicitly instead of leaving an unused variable.
Of the arguments to Tcp.Server.create, the last one is the client connection handler, and it's the most important one.
There's no need to explicitly close the client connections, because the server automatically shuts down a connection once the deferred returned by the handler is determined.
Starting the Async scheduler
A common newbie error is to forget to manually start the scheduler.
```ocaml
(* Call [run], and then start the scheduler *)
let () =
  run ();
  never_returns (Scheduler.go ())
```
Without the scheduler, the program does NOTHING at all; even printf calls won't reach the terminal, which can be bewildering.
Even though we didn't add any special code for handling multiple clients, the server can already accept multiple client connections and read and write their data concurrently.
Tail-Calls and Chain of Deferreds
Async is smart here: it essentially does a form of tail-call optimisation lifted to the Deferred monad.
One potential worry is that copy_blocks builds a chain of deferreds whose length is unbounded, so it could take up an unbounded amount of memory as the echo process continues.
Async optimises this case. The whole chain of deferreds becomes determined precisely when the final deferred is determined (in the copy_blocks example, when we get `Eof), so Async can safely replace them all with a single deferred \(\implies\) there's no memory leak at all.
- intuition: the bind that creates the deferred is in tail position and nothing is done to that deferred once it's created; it's simply returned as is \(\implies\) this looks like a good candidate for tail-call optimisation.
Functions that Never Return
When starting the scheduler, we wrapped the call to Scheduler.go in never_returns.
This is an explicit marker: it makes it clear to the invoker of Scheduler.go that the function never returns.
By default, the return type of a function that never returns is inferred as 'a. If a function never returns, we're free to impute any type at all to its non-existent return value, which lets it fit into any context within our program. We've seen this before in this book when learning about exceptions.
never_returns is an alias of Nothing.t, which is what the return type of Scheduler.go is defined as. Nothing.t is uninhabited, meaning there are no values of that type, so a function can't actually return a value of type Nothing.t, i.e. such a function never returns. To get this, we just need to add a type annotation:
```ocaml
let rec loop_forever () : Nothing.t = loop_forever ();;
(* val loop_forever : unit -> never_returns = <fun> *)

(* === helpful error if we omit the never_returns: *)
(*
let do_stuff n =
  let x = 3 in
  if n > 0 then Scheduler.go ();
  x + n;;
Line 3, characters 19-34:
Error: This expression has type never_returns
       but an expression was expected of type unit
       because it is in the result of a conditional with no else branch
*)

(* ==== actually using never_returns properly *)
let do_stuff n =
  let x = 3 in
  if n > 0 then never_returns (Scheduler.go ());
  x + n;;
(* val do_stuff : int -> int = <fun> *)
```
We had seen uninhabited types before, in a different context and use case, back when learning about narrowing without using GADTs.
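The same idea can be expressed in plain OCaml with an empty variant type (OCaml 4.07+). In this sketch, nothing and absurd are hypothetical stand-ins for Core's Nothing.t and never_returns. No value of type nothing exists, so absurd can be given any result type. The refutation case `_ -> .` tells the compiler that the match is unreachable.

```ocaml
(* An uninhabited type, in the spirit of Core's Nothing.t: no constructors *)
type nothing = |

(* No value of type nothing exists, so this can claim any result type *)
let absurd (x : nothing) : 'a = match x with _ -> .

(* The annotation documents that this never returns (it is a tail call, so it loops) *)
let rec loop_forever () : nothing = loop_forever ()

let do_stuff n =
  let x = 3 in
  (* absurd plays the role of Core's never_returns here *)
  if n > 0 then absurd (loop_forever ());
  x + n
```

Without absurd, the then-branch would have type nothing instead of unit and the conditional would be rejected, just like the never_returns error above.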
Improving the Echo Server
Improvements list:
- add a proper CLI with Command
- add a flag to specify the port to listen on, and a flag to make the server echo back the capitalised version of its input
- simplify the code using Async's Pipe interface
NOTEs:
- Deferred.never in the run function shows that the echo server never shuts down: Deferred.never returns a deferred that is never determined.
- Use of Async's Pipe is the key change in the code. A Pipe is an asynchronous communication channel used for connecting different parts of the program. Visualise it as a producer/consumer queue that uses deferreds to communicate when the pipe is ready to be read from or written to.
- we're using one of the many utility functions in the Pipe module, Pipe.transfer, to set up a process that takes data from a reader-pipe and moves it to a writer-pipe (type: 'a Pipe.Reader.t -> 'b Pipe.Writer.t -> f:('a -> 'b) -> unit Deferred.t). Reader.pipe and Writer.pipe are the two pipes that are connected here.
- this still preserves pushback: if the writer gets blocked, the writer's pipe stops pulling data from the reader's pipe, which prevents the reader from reading in more data.
- the deferred returned by Pipe.transfer becomes determined once the reader has been closed and the last element has been transferred from the reader to the writer. The server shuts down that client connection once that deferred becomes determined.
Using Pipes
They are created in connected read/write pairs:
```ocaml
let (r, w) = Pipe.create ();;
(* val r : '_weak3 Pipe.Reader.t = <abstr> *)
(* val w : '_weak3 Pipe.Writer.t = <abstr> *)
```
- r and w are weakly polymorphic
Pipes have some internal slack: a number of slots that can be written to before a write has to wait. In Async's Pipe module this slack is called the pipe's size budget.
By default the slack is 0, so the deferred returned by a write is determined only when the value is read out of the pipe.
```ocaml
let (r, w) = Pipe.create ();;
(* val r : '_weak4 Pipe.Reader.t = <abstr> *)
(* val w : '_weak4 Pipe.Writer.t = <abstr> *)
let write_complete = Pipe.write w "Hello World!";;
(* val write_complete : unit Deferred.t = <abstr> *)
Pipe.read r;;
(* -- reading from the pipe here determines the deferred for the write to the pipe: *)
(* - : [ `Eof | `Ok of string ] = `Ok "Hello World!" *)
write_complete;;
(* - : unit = () *)
```
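The zero-slack behaviour can be modelled with a toy pipe in plain OCaml. This is a simplified sketch, not Async's Pipe. A write's completion is represented by a callback (on_pushback_released, a hypothetical name) instead of a deferred, and that callback fires once the number of unread items is within size_budget. Also, read returns None on an empty pipe instead of waiting.

```ocaml
type 'a pipe = {
  items : 'a Queue.t;
  size_budget : int;
  mutable waiting : (unit -> unit) list; (* pending pushback callbacks *)
}

let create ?(size_budget = 0) () =
  { items = Queue.create (); size_budget; waiting = [] }

(* Release every pending writer once the pipe is back within budget *)
let check_pushback p =
  if Queue.length p.items <= p.size_budget then begin
    let ws = List.rev p.waiting in
    p.waiting <- [];
    List.iter (fun k -> k ()) ws
  end

(* The value is always enqueued; only the completion signal is delayed *)
let write p v ~on_pushback_released =
  Queue.add v p.items;
  p.waiting <- on_pushback_released :: p.waiting;
  check_pushback p

let read p =
  let v = Queue.take_opt p.items in
  check_pushback p;
  v
```

With the default budget of 0, the "write complete" signal only arrives after the value is read out, as in the utop session above. A budget of 1 lets a single write complete straight away.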
Example: Searching Definitions with DuckDuckGo
Setup
Install the following libs:
- textwrap
- uri
- yojson
- cohttp
URI Handling
We need to do some URI generation to be able to query the DDG servers.
Parsing JSON Strings
The HTTP response from DDG is JSON, so we need to parse it.
Executing an HTTP Client Query
Next comes the function that dispatches the query.
Understanding Cohttp_async.Client.get
required arg: a URI
returns a deferred value containing a Cohttp.Response.t and the response body (a Cohttp_async.Body.t), which is streamed through a pipe reader
because the body isn't that large, we just read the entire body directly using Body.to_string (instead of processing it in chunks)
Dispatching multiple searches in parallel
We use Deferred.all for our dispatcher. It preserves the order of the deferreds passed to it.
Printing only happens once all the results have arrived.
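The ordering guarantee of Deferred.all can be sketched without Async. In this toy model, all names are hypothetical, and a "deferred" is just a function that accepts a completion callback. all stores each result in the slot matching its input position and calls the final callback only when every input has completed. Results therefore come back in input order even when they arrive out of order.

```ocaml
(* Toy model: a "deferred" is a function that takes a completion callback *)
type 'a toy_deferred = ('a -> unit) -> unit

(* Like Deferred.all: results in input order, delivered only once all are in *)
let all (ds : 'a toy_deferred list) (k : 'a list -> unit) : unit =
  let n = List.length ds in
  if n = 0 then k []
  else begin
    let slots = Array.make n None in
    let remaining = ref n in
    List.iteri
      (fun i d ->
        d (fun v ->
            slots.(i) <- Some v;
            decr remaining;
            if !remaining = 0 then
              k (Array.to_list (Array.map Option.get slots))))
      ds
  end

(* A deferred completed by hand, to simulate responses arriving late *)
let manual () : 'a toy_deferred * ('a -> unit) =
  let callback = ref None in
  let d k = callback := Some k in
  let complete v = match !callback with Some k -> k v | None -> () in
  (d, complete)
```

For example, if three searches complete in the order third, first, second, the final callback still receives the results as first, second, third, and only after the last one arrives.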
Finally, we put a command-line interface in front of this.
Exception Handling
We should expect flakiness, especially when concurrently using external resources.