  3. Fluent Python: Clear, Concise, and Effective Programming – Luciano Ramalho/

Chapter 21. Asynchronous Programming

  • async constructs

  • Objects supporting async constructs

    • includes other constructs enabled by the async/await keywords: async generator functions, async comprehensions, async genexps

      these aren’t tied to asyncio!

  • async libraries like asyncio

What’s New in This Chapter #

A Few Definitions #

  • native coroutines

    only defined using async def

    delegation from one native coroutine to another is done only with await, although a native coroutine is not required to delegate at all

  • classic coroutines

    actually a generator function that consumes data (data that is sent to it via my_coro.send(data) calls)

    can delegate to other classic coroutines using yield from. Ref “Meaning of yield from”

    no longer supported by asyncio and doesn’t support await keyword

  • generator-based coroutines (decorated using @types.coroutine)

    a decorated generator function (@types.coroutine), which makes the generator compatible with await keyword

    this is NOT supported by asyncio, but is used in low-level code in other frameworks like Curio and Trio

  • async generator (function)

    generator function defined with async def that uses yield in its body

    returns an async generator object that provides __anext__, which is a coroutine method to retrieve the next item.
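To make the last definition concrete, here is a minimal sketch of an async generator function. The names countdown and demo are made up for illustration and are not from the book.

```python
import asyncio
from collections.abc import AsyncIterator


async def countdown(start: int) -> AsyncIterator[int]:
    """Hypothetical async generator: `async def` plus `yield` in the body."""
    for n in range(start, 0, -1):
        await asyncio.sleep(0)  # let the event loop run other tasks
        yield n


async def demo() -> list[int]:
    agen = countdown(3)              # calling it returns an async generator object
    first = await agen.__anext__()   # __anext__ is a coroutine method, so we await it
    rest = [n async for n in agen]   # `async for` awaits __anext__ behind the scenes
    return [first, *rest]


if __name__ == '__main__':
    print(asyncio.run(demo()))  # → [3, 2, 1]
```

Nothing here depends on asyncio except the sleep and the runner; the async generator protocol itself belongs to the language.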

An asyncio Example: Probing Domains #

  • async operations are interleaved \(\implies\) the total time is practically the same as the time for the single slowest DNS response, instead of the sum of the times of all responses.
#!/usr/bin/env python3
import asyncio
import socket
from keyword import kwlist

MAX_KEYWORD_LEN = 4  # <1>


async def probe(domain: str) -> tuple[str, bool]:  # <2> returns tuple of domain name and bool
    loop = asyncio.get_running_loop()  # <3> have a ref to the =asyncio= event loop, so that we can use it
    try:
        await loop.getaddrinfo(domain, None)  # <4> await the coroutine; it returns a list of 5-part tuples, which we ignore: if the call returns at all, the DNS name resolved
    except socket.gaierror: # get addr info err
        return (domain, False)
    return (domain, True)


async def main() -> None:  # <5> has to be a coroutine so that we can use =await= within it
    names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)  # <6> gen
    domains = (f'{name}.dev'.lower() for name in names)  # <7> gen
    coros = [probe(domain) for domain in domains]  # <8> build list of coros
    for coro in asyncio.as_completed(coros):  # <9> generator that yields coroutines that return the results of the coros passed to it in the order they are completed (not order of submission), similar to =futures.as_completed=
        domain, found = await coro  # <10> this await is non-blocking because it's guarded by the as_completed above
        mark = '+' if found else ' '
        print(f'{mark} {domain}')


if __name__ == '__main__':
    asyncio.run(main())  # <11> starts the event loop, drives main() to completion, and returns only when the loop exits
  • loop.getaddrinfo() is the async version of socket.getaddrinfo()

    this returns a list of 5-part tuples of parameters for connecting to the given address using a socket

  • asyncio.get_running_loop is designed to be used from within coroutines.

    If no running event loops, then it raises a RuntimeError. The event loop should have already been started prior to execution reaching there.

  • for coro in asyncio.as_completed(coros):

    asyncio.as_completed(coros) is a generator that yields coroutines which return the results of the coros passed to it, in the order they complete (not the order of submission), similar to futures.as_completed

  • the await coro is non-blocking because it’s guarded by the as_completed above

    if coro raises an exception, then it gets re-raised here

  • event loop:

    • started using asyncio.run()

    • IDIOM: for scripts, the common pattern is to make the main function a coroutine as well. The main coroutine is driven with asyncio.run()
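A small sketch of the points above, using asyncio.sleep in place of real DNS lookups. slow_echo, main, and loop_is_required are hypothetical names, and the delays are arbitrary values chosen so that the completion order differs from the submission order.

```python
import asyncio


async def slow_echo(label: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for network I/O
    return label


async def main() -> list[str]:
    asyncio.get_running_loop()  # fine here: we are inside a running coroutine
    coros = [slow_echo('a', 0.15), slow_echo('b', 0.05), slow_echo('c', 0.10)]
    finished = []
    for coro in asyncio.as_completed(coros):
        finished.append(await coro)  # results arrive in completion order
    return finished


def loop_is_required() -> bool:
    """Return True if get_running_loop() fails outside of a running loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return True
    return False


if __name__ == '__main__':
    print(loop_is_required())   # called from plain synchronous code
    print(asyncio.run(main()))  # main is a coroutine driven by asyncio.run
```

The shortest delay finishes first, so 'b' comes out before 'c' and 'a' even though 'a' was submitted first.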

Guido’s Trick to Read Asynchronous Code #

  • squint and pretend the async and await keywords are not there. If you do that, you’ll realize that coroutines read like plain old sequential functions.

New Concept: Awaitable #

  • await expression:

    • uses the yield from implementation with an extra step of validating its argument

    • only accepts an awaitable

  • for \(\rightarrow\) iterables, await \(\rightarrow\) awaitables

  • from asyncio, we typically work with these awaitables:

    • a native coroutine object that we get by calling a native coroutine function e.g. coro() where coro is the coroutine function

    • asyncio.Task that we get when we pass a coroutine object to asyncio.create_task()

      • remember that coro_obj = coro(), so the overall call is usually asyncio.create_task(one_coro()); note the invocation of the native coroutine function

      • Whether to keep a handle to the task or not depends on whether we need to use it (e.g. to cancel the task or wait for it)

    • lower-level awaitables: (something we might encounter if we work with lower-level abstractions)

      • an obj with an __await__ method that returns an iterator (e.g. asyncio.Future; by the way, asyncio.Task is a subclass of asyncio.Future)

      • objs written in other langs that use the Python/C API with a tp_as_async.am_await function, returning an iterator (similar to the __await__ method)

      • soon to be deprecated: generator-based-coroutine objects
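The three kinds of awaitables listed above can be sketched side by side. The class Ready and the coroutine answer are hypothetical names invented for this example; Ready borrows the iterator from asyncio.sleep(0) only to keep the sketch short.

```python
import asyncio


async def answer() -> int:
    await asyncio.sleep(0)
    return 42


class Ready:
    """Hypothetical low-level awaitable: __await__ must return an iterator."""

    def __init__(self, value: int) -> None:
        self.value = value

    def __await__(self):
        # A generator is an iterator; delegate once to a real awaitable,
        # then return the value that `await Ready(...)` evaluates to.
        yield from asyncio.sleep(0).__await__()
        return self.value


async def main() -> tuple[int, int, int]:
    from_coro = await answer()             # 1. native coroutine object
    task = asyncio.create_task(answer())   # 2. Task: scheduled as soon as it is created
    from_task = await task                 #    keep the handle only because we wait on it
    from_obj = await Ready(7)              # 3. any object with __await__
    return from_coro, from_task, from_obj


if __name__ == '__main__':
    print(asyncio.run(main()))  # → (42, 42, 7)
```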

Downloading with asyncio and HTTPX #

#!/usr/bin/env python3

"""Download flags of top 20 countries by population

asyncio + httpx version

Sample run::

    $ python3 flags_asyncio.py
    EG VN IN TR RU ID US DE CN MX JP BD NG ET FR BR PH PK CD IR
    20 flags downloaded in 1.07s
"""
# tag::FLAGS_ASYNCIO_TOP[]
import asyncio

from httpx import AsyncClient  # <1> have to install httpx

from flags import BASE_URL, save_flag, main  # <2>

async def download_one(client: AsyncClient, cc: str):  # <3> has to be a native coro so that we can await on get_flag
    image = await get_flag(client, cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

async def get_flag(client: AsyncClient, cc: str) -> bytes:  # <4> needs the client to make the http request
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=6.1,
                                  follow_redirects=True)  # <5> client.get is a coroutine method; awaiting it yields an httpx.Response (the book calls it a ClientResponse). The network I/O is driven asynchronously by the =asyncio= event loop
    return resp.read()  # <6> returns the body as bytes; for a non-streamed request like this one, the body was already loaded into memory while we awaited get
# end::FLAGS_ASYNCIO_TOP[]

# tag::FLAGS_ASYNCIO_START[]
def download_many(cc_list: list[str]) -> int:    # <1> has to be a plain function, not a coroutine, because main (from the flags module) calls it directly
    return asyncio.run(supervisor(cc_list))      # <2> runs the event loop, which drives the supervisor(cc_list) coroutine object until it returns. supervisor is the coroutine function; calling it returns a coroutine object. This call blocks the calling thread (not the event loop) until the coroutine returns.

async def supervisor(cc_list: list[str]) -> int:
    async with AsyncClient() as client:          # <3> an async context manager (AsyncClient) is being used here
        to_do = [download_one(client, cc)
                 for cc in sorted(cc_list)]      # <4> build list of coros here
        res = await asyncio.gather(*to_do)       # <5> pass the awaitables so that they can be gathered after completion, so that we get a list of results. Gathers in the order of submission of the coros.

    return len(res)                              # <6> supervisor returns length of list

if __name__ == '__main__':
    main(download_many)
# end::FLAGS_ASYNCIO_START[]
  • asyncio directly supports TCP and UDP, without relying on external packages

  • res = await asyncio.gather(*to_do):

    Here, we pass the awaitables so that they can be gathered after completion, so that we get a list of results. Gathers in the order of submission of the coros.

  • AsyncClient is the async context manager used here: a context manager whose setup and teardown steps are coroutines (KIV)

    In this snippet of the get_flag coroutine:

      async def get_flag(client: AsyncClient, cc: str) -> bytes:  # <4> needs the client to make the http request
          url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
          resp = await client.get(url, timeout=6.1,
                                        follow_redirects=True)  # <5> client.get is a coroutine method; awaiting it yields an httpx.Response (the book calls it a ClientResponse). The network I/O is driven asynchronously by the =asyncio= event loop
          return resp.read()  # <6> returns the body as bytes; for a non-streamed request like this one, the body was already loaded into memory while we awaited get
    
    • Implicit delegation of coroutines via async context managers:

      the book describes the get method of an httpx.AsyncClient instance as returning a ClientResponse object that is also an asynchronous context manager.

      in practice, client.get(...) is an awaitable, and awaiting it yields an httpx.Response

      by the way, a Response can also be used inside a context manager when streaming (client.stream)! In that case, reading the body would have been an awaited I/O operation (e.g. resp.aread()) that may yield to the event loop again while it drains the response body stream from the socket

  • the await yields control flow to the event loop while the network I/O happens (DNS resolution, TCP connect, handshake, waiting for response headers). During that suspension, other tasks can run.

    so by the end of point 5, resp is a proper Response object and not a coroutine. The connection is ready.

  • LANG_LIMITATION: However, asyncio does not provide an asynchronous filesystem API at this time like Node.js does.

    there’s OS-level support for it (io_uring on Linux), but nothing that supports this for python’s stdlib/asyncio
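Going back to the asyncio.gather note above: a minimal sketch showing that gather returns results in submission order, regardless of which awaitable finishes first. The fetch coroutine and its delays are assumptions that stand in for the real downloads.

```python
import asyncio


async def fetch(label: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for an HTTP request
    return label


async def main() -> list[str]:
    # 'fast' completes first, but gather keeps the order of its arguments.
    return await asyncio.gather(fetch('slow', 0.10), fetch('fast', 0.01))


if __name__ == '__main__':
    print(asyncio.run(main()))  # → ['slow', 'fast']
```

This is the opposite trade-off from asyncio.as_completed: gather is convenient when results must line up with inputs, as_completed when you want to react to each result as soon as it is ready.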

The Secret of Native Coroutines: Humble Generators #

  • classic vs native coroutines: the native ones don’t rely on a visible .send() call or yield expressions

  • mechanistic model for async programs and how they drive async libraries:

    inline image

    Here, we see how in an async program:

    • a user’s function starts the event loop, scheduling an initial coroutine with asyncio.run

    • Each user’s coroutine drives the next with an await expression, which is when the control flow is yielded to the next coroutine

      this forms a channel that enables communication between a library like HTTPX and the event loop.

      await chain eventually reaches a low-level awaitable, which returns a generator that the event loop can drive in response to events such as timers or network I/O. The low-level awaitables and generators at the end of these await chains are implemented deep into the libraries, are not part of their APIs, and may be Python/C extensions.

  • await borrows most of its implementation from yield from (classic coroutines), which also makes .send calls to drive coroutines.

  • with functions like asyncio.gather and asyncio.create_task, you can start multiple concurrent await channels, enabling concurrent execution of multiple I/O operations driven by a single event loop, in a single thread.
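The await chain described above can be observed without asyncio at all. The sketch below is a toy, not how asyncio is implemented: drive is a hypothetical stand-in for an event loop, and pause plays the role of the low-level awaitable at the end of the chain, written here as a generator-based coroutine with @types.coroutine.

```python
import types


@types.coroutine
def pause(tag: str):
    """Hypothetical low-level awaitable: a generator the driver can resume."""
    reply = yield tag   # hands `tag` up the await chain to whoever calls .send()
    return reply


async def inner() -> str:
    got = await pause('need-io')
    return got.upper()


async def outer() -> str:
    return 'result: ' + await inner()


def drive(coro):
    """Toy stand-in for an event loop: drive one coroutine with .send()."""
    requests = []
    try:
        request = coro.send(None)        # run until the bottom of the chain yields
        while True:
            requests.append(request)
            request = coro.send('data')  # resume the whole chain with a "result"
    except StopIteration as stop:
        return requests, stop.value      # the return value of outer()


if __name__ == '__main__':
    print(drive(outer()))  # → (['need-io'], 'result: DATA')
```

The value yielded by pause travels through inner and outer untouched, which is the channel between library and event loop that the notes mention.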

The All-or-Nothing Problem #

  • had to replace I/O functions with their async versions so that they could be activated with await or asyncio.create_task

  • if no choice, have to delegate to separate thread/proc

    If you can’t rewrite a blocking function as a coroutine, you should run it in a separate thread or process
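A sketch of the fallback described above, assuming a blocking function that cannot be rewritten as a coroutine. blocking_io and ticker are hypothetical; time.sleep stands in for the blocking call, and asyncio.to_thread needs Python 3.9 or later.

```python
import asyncio
import time


def blocking_io(seconds: float) -> str:
    time.sleep(seconds)  # a blocking call we cannot turn into a coroutine
    return 'done'


async def ticker(counter: list[int]) -> None:
    """Count how often the event loop gets to run this coroutine."""
    while True:
        counter[0] += 1
        await asyncio.sleep(0.01)


async def main() -> tuple[str, int]:
    counter = [0]
    tick_task = asyncio.create_task(ticker(counter))
    # The blocking call runs in a worker thread, so the loop keeps ticking.
    result = await asyncio.to_thread(blocking_io, 0.2)
    tick_task.cancel()
    try:
        await tick_task
    except asyncio.CancelledError:
        pass
    return result, counter[0]


if __name__ == '__main__':
    print(asyncio.run(main()))
```

If blocking_io were called directly inside main, the ticker would not advance while it sleeps, because the only thread running the event loop would be stuck in time.sleep.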

Asynchronous Context Managers via async with #

  • asynchronous context managers: objects implementing the __aenter__ and __aexit__ methods as coroutines.
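As an illustration of the protocol, here is a minimal sketch of a class usable with async with. Connection is a made-up resource; the asyncio.sleep(0) calls stand in for real asynchronous setup and teardown.

```python
import asyncio


class Connection:
    """Hypothetical resource with asynchronous setup and teardown."""

    def __init__(self, log: list[str]) -> None:
        self.log = log

    async def __aenter__(self) -> 'Connection':
        await asyncio.sleep(0)   # e.g. open a socket
        self.log.append('open')
        return self              # bound to the name after `as`

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        await asyncio.sleep(0)   # e.g. flush and close
        self.log.append('close')
        return False             # do not swallow exceptions


async def main() -> list[str]:
    log: list[str] = []
    async with Connection(log) as conn:
        conn.log.append('use')
    return log


if __name__ == '__main__':
    print(asyncio.run(main()))  # → ['open', 'use', 'close']
```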

Enhancing the asyncio Downloader #

  • caution (asyncio vs threading): asyncio can send requests faster, so the HTTP server is more likely to suspect a DDoS attack.

Using asyncio.as_completed and a Thread #

#!/usr/bin/env python3

"""Download flags of countries (with error handling).

asyncio async/await version

"""
# tag::FLAGS2_ASYNCIO_TOP[]
import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path

import httpx
import tqdm  # type: ignore

from flags2_common import main, DownloadStatus, save_flag

# low concurrency default to avoid errors from remote site,
# such as 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

async def get_flag(client: httpx.AsyncClient,  # <1> similar to the sequential version, just that here it requires a client param
                   base_url: str,
                   cc: str) -> bytes:
    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True)   # <2> we await the coroutine from client.get()
    resp.raise_for_status()
    return resp.content

async def download_one(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool) -> DownloadStatus:
    try:
        async with semaphore:  # <3> semaphore as an asynchronous context manager so that the program as a whole is not blocked; only this coroutine is suspended when the semaphore counter is zero.
            image = await get_flag(client, base_url, cc)
    except httpx.HTTPStatusError as exc:  # <4> familiar error handling logic
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND
            msg = f'not found: {res.url}'
        else:
            raise
    else:
        await asyncio.to_thread(save_flag, image, f'{cc}.gif')  # <5> FileSystem I/O, don't let it block us by running it in a thread
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status
# end::FLAGS2_ASYNCIO_TOP[]

# tag::FLAGS2_ASYNCIO_START[]
async def supervisor(cc_list: list[str],
                     base_url: str,
                     verbose: bool,
                     concur_req: int) -> Counter[DownloadStatus]:  # <1> since it's a coroutine, it can't be invoked directly from main.
    counter: Counter[DownloadStatus] = Counter()
    semaphore = asyncio.Semaphore(concur_req)  # <2> creates the semaphore to be shared across the coros we will have
    async with httpx.AsyncClient() as client:
        to_do = [download_one(client, cc, base_url, semaphore, verbose)
                 for cc in sorted(cc_list)]  # <3> list of coro objs, one per call to download_one coro fn
        to_do_iter = asyncio.as_completed(to_do)  # <4> get an iter, receives in the order of completion, allows the iter to be wrapped by tqdm
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # <5> wrap iter w tqdm
        error: httpx.HTTPError | None = None  # <6> init error
        for coro in to_do_iter:  # <7> iter over completed coro objs
            try:
                status = await coro  # <8> this is a nonblocking await because implicitly guarded by the =as_completed=
            except httpx.HTTPStatusError as exc:
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
                error = exc  # <9> to preserve the exc
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
                error = exc  # <10> preserve the exc
            except KeyboardInterrupt:
                break
            else:
                error = None

            if error:
                status = DownloadStatus.ERROR  # <11> user internal error enum
                if verbose:
                    url = str(error.request.url)  # <12>
                    cc = Path(url).stem.upper()   # <13>
                    print(f'{cc} error: {error_msg}')
            counter[status] += 1

    return counter

def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int) -> Counter[DownloadStatus]:
    coro = supervisor(cc_list, base_url, verbose, concur_req)
    counts = asyncio.run(coro)  # <14> drives the event loop, passes coro to event loop and returns when the event loop ends.

    return counts

if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# end::FLAGS2_ASYNCIO_START[]
  • the asyncio.Semaphore is being used as an asynchronous context manager so that the program as a whole is not blocked; only this coroutine is suspended when the semaphore counter is zero.
  • notice how we delegate the file I/O in point 5 to a thread pool provided by asyncio using asyncio.to_thread; we just await it, which yields control to the event loop so that other coroutines can carry on

Throttling Requests with a Semaphore #

  • throwback to OS mods in school: a semaphore is a counting “mutex” \(\implies\) more flexibility than just a binary mutex lock.

  • we can share the semaphore between multiple coroutines with a configured max number in order to throttle our Network I/O

  • why? because we should avoid spamming a server with too many concurrent requests \(\implies\) we need to throttle the Network I/O

  • previously, we did the throttling in a coarse manner by setting the max_workers for the download_many in the demo code

Python’s Semaphores #

  • all the 3 different concurrency structures (threading, multiprocessing, asyncio) have their own semaphore classes

  • the initial value is set when the semaphore is created, e.g. semaphore = asyncio.Semaphore(concur_req); the semaphore is then passed to every coroutine that needs it to synchronize

  • semaphore decrements when we await on .acquire() coroutine, increments when we call release() method (non blocking, not a coroutine)

  • if not ready (count = 0), .acquire() suspends the awaiting coroutine until some other coroutine calls .release() on the same Semaphore, thus incrementing the counter.

  • asyncio.Semaphore used as an async context manager:

    • instead of using semaphore.acquire() and semaphore.release() directly, we can rely on the async context manager to acquire (the Semaphore.__aenter__ coroutine method awaits .acquire()) and release the semaphore (Semaphore.__aexit__ calls .release())

    • this guarantees that no more than concur_req instances of the get_flag coroutine will be active at any time
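The guarantee can be checked with a small experiment. This is a sketch with made-up names (worker, state) in which asyncio.sleep stands in for the HTTP request; it records the highest number of coroutines that were ever inside the async with block at the same time.

```python
import asyncio


async def worker(semaphore: asyncio.Semaphore, state: dict[str, int]) -> None:
    async with semaphore:            # awaits .acquire(); .release() runs on exit
        state['active'] += 1
        state['peak'] = max(state['peak'], state['active'])
        await asyncio.sleep(0.01)    # stands in for the throttled network I/O
        state['active'] -= 1


async def main(n_workers: int = 10, concur_req: int = 3) -> int:
    semaphore = asyncio.Semaphore(concur_req)  # one semaphore shared by all workers
    state = {'active': 0, 'peak': 0}
    await asyncio.gather(*(worker(semaphore, state) for _ in range(n_workers)))
    return state['peak']


if __name__ == '__main__':
    print(asyncio.run(main()))  # → 3
```

Ten workers are started at once, yet the peak never exceeds concur_req because the other seven are suspended in .acquire() until a slot is released.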

Making Multiple Requests for Each Download #

  • our objective now is to make 2 HTTP requests per country. In a sequential pattern, we would just make one call after the other. The async version isn’t directly the same.

    We can drive the asynchronous requests one after the other, sharing the local scope of the driving coroutine.

  • here’s the v3 using asyncio

    some changes:

    1. get_country is a new coroutine for the .json fetch

    2. in download_one, we now use await to delegate to get_flag and the new get_country

#!/usr/bin/env python3

"""Download flags of countries (with error handling).

asyncio async/await version

"""
# tag::FLAGS2_ASYNCIO_TOP[]
import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path

import httpx
import tqdm  # type: ignore

from flags2_common import main, DownloadStatus, save_flag

# low concurrency default to avoid errors from remote site,
# such as 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

async def get_flag(client: httpx.AsyncClient,  # <1>
                   base_url: str,
                   cc: str) -> bytes:
    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True)   # <2>
    resp.raise_for_status()
    return resp.content

# tag::FLAGS3_ASYNCIO_GET_COUNTRY[]
async def get_country(client: httpx.AsyncClient,
                      base_url: str,
                      cc: str) -> str:    # <1> returns string with country name
    url = f'{base_url}/{cc}/metadata.json'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True)
    resp.raise_for_status()
    metadata = resp.json()  # <2> is a python dict
    return metadata['country']  # <3> returns the country name
# end::FLAGS3_ASYNCIO_GET_COUNTRY[]

# tag::FLAGS3_ASYNCIO_DOWNLOAD_ONE[]
async def download_one(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool) -> DownloadStatus:
    try:
        async with semaphore:  # <1> hold the semaphore to =await= (delegate) to =get_flag=
            image = await get_flag(client, base_url, cc)
        async with semaphore:  # <2> hold the semaphore again to delegate to the next step
            country = await get_country(client, base_url, cc)
    except httpx.HTTPStatusError as exc:
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND
            msg = f'not found: {res.url}'
        else:
            raise
    else:
        filename = country.replace(' ', '_')  # <3>
        await asyncio.to_thread(save_flag, image, f'{filename}.gif')
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status
# end::FLAGS3_ASYNCIO_DOWNLOAD_ONE[]

# tag::FLAGS2_ASYNCIO_START[]
async def supervisor(cc_list: list[str],
                     base_url: str,
                     verbose: bool,
                     concur_req: int) -> Counter[DownloadStatus]:  # <1>
    counter: Counter[DownloadStatus] = Counter()
    semaphore = asyncio.Semaphore(concur_req)  # <2>
    async with httpx.AsyncClient() as client:
        to_do = [download_one(client, cc, base_url, semaphore, verbose)
                 for cc in sorted(cc_list)]  # <3>
        to_do_iter = asyncio.as_completed(to_do)  # <4>
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # <5>
        error: httpx.HTTPError | None = None  # <6>
        for coro in to_do_iter:  # <7>
            try:
                status = await coro  # <8>
            except httpx.HTTPStatusError as exc:
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
                error = exc  # <9>
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
                error = exc  # <10>
            except KeyboardInterrupt:
                break
            else:
                error = None  # reset, so an earlier failure is not counted again for later downloads

            if error:
                status = DownloadStatus.ERROR  # <11>
                if verbose:
                    url = str(error.request.url)  # <12>
                    cc = Path(url).stem.upper()   # <13>
                    print(f'{cc} error: {error_msg}')
            counter[status] += 1

    return counter

def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int) -> Counter[DownloadStatus]:
    coro = supervisor(cc_list, base_url, verbose, concur_req)
    counts = asyncio.run(coro)  # <14>

    return counts

if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# end::FLAGS2_ASYNCIO_START[]
  • NOTE: point 1 & 2 in download_one: it’s good practice to hold semaphores and locks for the shortest possible time.
  • One challenge is to know when you have to use await and when you can’t use it.

    The answer in principle is easy: you await coroutines and other awaitables, such as asyncio.Task instances.

    In reality the APIs can be confusingly named, e.g. on asyncio.StreamWriter, write() is a plain method while drain() is a coroutine

Delegating Tasks to Executors #

  • problem: unlike NodeJS where ALL I/O has async APIs, python doesn’t have async APIs for all I/O. Notably, File I/O is NOT async.

    This means that in our async code, file I/O can severely bottleneck performance if the main thread is blocked.

  • delegating to an executor is a good idea then

    • we can use asyncio.to_thread e.g. await asyncio.to_thread(save_flag, image, f'{cc}.gif')

      under the hood, it uses loop.run_in_executor, so the equivalent to the above statement would be:

          loop = asyncio.get_running_loop()  # gets a reference to the event loop
          await loop.run_in_executor(None, save_flag, image, f'{cc}.gif')
          # 1st arg: the Executor to use. None => default => ThreadPoolExecutor (always available in the asyncio event loop)
      

      CAUTION: run_in_executor forwards positional args only; use functools.partial if the function needs kwargs, or just use the newer asyncio.to_thread, which accepts kwargs.

    • IDIOM: this is a common pattern in async APIs:

      wrap blocking calls that are implementation details in coroutines using run_in_executor internally. That way, you provide a consistent interface of coroutines to be driven with await, and hide the threads you need to use for pragmatic reasons.

    • loop.run_in_executor’s explicit Executor allows us to use process-based approach for CPU-intensive tasks so that it’s a different python process and we avoid the GIL contention.

    • TRICK / IDIOM: prime the ProcessPoolExecutor in the supervisor and then pass it to the coroutines that need it to reduce the effect of the high startup costs

  • WARNING / LANG_LIMITATION: cancellation does not reach code that is already running in an executor.

    Using run_in_executor can produce hard-to-debug problems since cancellation doesn’t work the way one might expect. Coroutines that use executors give merely the pretense of cancellation: the underlying thread (if it’s a ThreadPoolExecutor) has no cancellation mechanism.

    For example, a long-lived thread that is created inside a run_in_executor call may prevent your asyncio program from shutting down cleanly:

    asyncio.run will wait for the executor to fully shut down before returning, and it will wait forever if the executor jobs don’t stop somehow on their own.

    My greybeard inclination is to want that function to be named run_in_executor_uncancellable.
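Returning to the caution about keyword arguments: a minimal sketch of both options. render is a hypothetical blocking function with a keyword-only parameter, and None selects the event loop's default thread pool executor.

```python
import asyncio
import functools


def render(text: str, *, upper: bool = False) -> str:
    """Hypothetical blocking function that needs a keyword argument."""
    return text.upper() if upper else text


async def main() -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    # run_in_executor forwards positional arguments only,
    # so bind the keyword argument with functools.partial first.
    call = functools.partial(render, 'flag', upper=True)
    via_executor = await loop.run_in_executor(None, call)
    # asyncio.to_thread (Python 3.9+) accepts keyword arguments directly.
    via_to_thread = await asyncio.to_thread(render, 'flag', upper=True)
    return via_executor, via_to_thread


if __name__ == '__main__':
    print(asyncio.run(main()))  # → ('FLAG', 'FLAG')
```

run_in_executor remains the one to reach for when an explicit executor is needed, for example a ProcessPoolExecutor for CPU-bound work.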

Writing asyncio Servers #

A FastAPI Web Service #

from pathlib import Path
from unicodedata import name

from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel

from charindex import InvertedIndex

STATIC_PATH = Path(__file__).parent.absolute() / 'static'  # <1> beautiful overloading of =/= for pathlib

app = FastAPI(  # <2> defines the ASGI app, params are for autogen docs
    title='Mojifinder Web',
    description='Search for Unicode characters by name.',
)

class CharName(BaseModel):  # <3> pydantic schema for runtime type checking
    char: str
    name: str

def init(app):  # <4> attach to app state for later use
    app.state.index = InvertedIndex()
    app.state.form = (STATIC_PATH / 'form.html').read_text()

init(app)  # <5>

@app.get('/search', response_model=list[CharName])  # <6> search endpoint, response_model uses the CharName pydantic model to describe the response format
async def search(q: str):  # <7> non-path params within the coro signature
    chars = sorted(app.state.index.search(q))
    return ({'char': c, 'name': name(c)} for c in chars)  # <8> an iterable of dicts compatible with the response_model schema => FastAPI can build the JSON response according to the response model that we supplied in the @app.get decorator

@app.get('/', response_class=HTMLResponse, include_in_schema=False)
def form():  # <9> can use regular functions to handle endpoints as well, not just coros
    return app.state.form

# no main function  # <10>
  • endpoint handlers can be coros or plain functions like we see here.

  • there’s no main function, it’s loaded and driven by the ASGI server (uvicorn).

  • we don’t have return type hints here because we allow the pydantic schema to do the job

    this is like schema casting when defining changesets in elixir

    the model is declared in this parameter instead of as a function return type annotation, because the path function may not actually return that response model; it may return a dict, a database object, or some other model, and FastAPI then uses response_model to perform the field limiting and serialization.

    response_model in FastAPI + Pydantic plays the role of both serialization and field-whitelisting: it takes arbitrary Python objects/dicts and produces clean, predictable outputs according to the model definition

by the way, the inverted index was implemented like so:

#!/usr/bin/env python

"""
Class ``InvertedIndex`` builds an inverted index mapping each word to
the set of Unicode characters which contain that word in their names.

Optional arguments to the constructor are ``first`` and ``last+1``
character codes to index, to make testing easier. In the examples
below, only the ASCII range was indexed.

The `entries` attribute is a `defaultdict` with uppercased single
words as keys::

    >>> idx = InvertedIndex(32, 128)
    >>> idx.entries['DOLLAR']
    {'$'}
    >>> sorted(idx.entries['SIGN'])
    ['#', '$', '%', '+', '<', '=', '>']
    >>> idx.entries['A'] & idx.entries['SMALL']
    {'a'}
    >>> idx.entries['BRILLIG']
    set()

The `.search()` method takes a string, uppercases it, splits it into
words, and returns the intersection of the entries for each word::

    >>> idx.search('capital a')
    {'A'}

"""

import sys
import unicodedata
from collections import defaultdict
from collections.abc import Iterator

STOP_CODE: int = sys.maxunicode + 1

Char = str
Index = defaultdict[str, set[Char]]


def tokenize(text: str) -> Iterator[str]:
    """return iterator of uppercased words"""
    for word in text.upper().replace('-', ' ').split():
        yield word


class InvertedIndex:
    entries: Index

    def __init__(self, start: int = 32, stop: int = STOP_CODE):
        entries: Index = defaultdict(set)
        for char in (chr(i) for i in range(start, stop)):
            name = unicodedata.name(char, '')
            if name:
                for word in tokenize(name):
                    entries[word].add(char)
        self.entries = entries

    def search(self, query: str) -> set[Char]:
        if words := list(tokenize(query)):
            found = self.entries[words[0]]
            return found.intersection(*(self.entries[w] for w in words[1:]))
        else:
            return set()


def format_results(chars: set[Char]) -> Iterator[str]:
    for char in sorted(chars):
        name = unicodedata.name(char)
        code = ord(char)
        yield f'U+{code:04X}\t{char}\t{name}'


def main(words: list[str]) -> None:
    if not words:
        print('Please give one or more words to search.')
        sys.exit(2)  # command line usage error
    index = InvertedIndex()
    chars = index.search(' '.join(words))
    for line in format_results(chars):
        print(line)
    print('─' * 66, f'{len(chars)} found')


if __name__ == '__main__':
    main(sys.argv[1:])

An asyncio TCP Server (no deps, just asyncio streams) #

  • this demo uses plain TCP to communicate with a telnet/netcat client, using asyncio directly without any external dependencies!
#!/usr/bin/env python3

# tag::TCP_MOJIFINDER_TOP[]
import asyncio
import functools
import sys
from asyncio.trsock import TransportSocket
from typing import cast

from charindex import InvertedIndex, format_results  # <1> formatting useful for TUI via CLI telnet session

CRLF = b'\r\n'
PROMPT = b'?> '

async def finder(index: InvertedIndex,          # <2> server expects a coro / function that only takes the reader and writer args; that's why we need to wrap it in a partial
                 reader: asyncio.StreamReader,
                 writer: asyncio.StreamWriter) -> None:
    client = writer.get_extra_info('peername')  # <3> remote client addr
    while True:  # <4> handles a dialog until we get a control char (see break stmt below)
        writer.write(PROMPT)  # can't await!  # <5> this is not a CORO, just a plain function
        await writer.drain()  # must await!  # <6> flushes writer buffer, it's a coro that's why needs to be driven with =await=
        data = await reader.readline()  # <7> coro that returns bytes
        if not data:  # <8> no bytes => client closed the connection ==> break the loop
            break
        try:
            query = data.decode().strip()  # <9> byte to string decoding
        except UnicodeDecodeError:  # <10> replace with a null char for simplicity (e.g. on a keyboard interrupt we get control bytes that can't be decoded into str)
            query = '\x00'
        print(f' From {client}: {query!r}')  # <11> log stmt
        if query:
            if ord(query[:1]) < 32:  # <12> kill loop if control or nullchar
                break
            results = await search(query, index, writer)  # <13> delegate to searching coro
            print(f'   To {client}: {results} results.')  # <14>

    writer.close()  # <15> close the writer stream
    await writer.wait_closed()  # <16> wait for the stream to close
    print(f'Close {client}.')  # <17> log
# end::TCP_MOJIFINDER_TOP[]

# tag::TCP_MOJIFINDER_SEARCH[]
async def search(query: str,  # <1> has to be a coro because we have to write to a StreamWriter and use its =.drain()= coro method
                 index: InvertedIndex,
                 writer: asyncio.StreamWriter) -> int:
    chars = index.search(query)  # <2> query inverted index
    lines = (line.encode() + CRLF for line  # <3> genexp gives char, name and CRLF
                in format_results(chars))
    writer.writelines(lines)  # <4> SURPRISE! this is NOT a coro
    await writer.drain()      # <5> SURPRISE! this is a coro
    status_line = f'{"─" * 66} {len(chars)} found'  # <6> status line to be written
    writer.write(status_line.encode() + CRLF)
    await writer.drain()
    return len(chars)
# end::TCP_MOJIFINDER_SEARCH[]

# tag::TCP_MOJIFINDER_MAIN[]
async def supervisor(index: InvertedIndex, host: str, port: int) -> None:
    server = await asyncio.start_server(    # <1> gets an instance of the server, creates and starts it so that it's ready to receive conns
        functools.partial(finder, index),   # <2> =client_connected_cb=, a cb that is either a fn/coro needs to be supplied a stream reader and stream writer
        host, port)                         # <3>

    socket_list = cast(tuple[TransportSocket, ...], server.sockets)  # <4> because typeshed type is outdated
    addr = socket_list[0].getsockname()
    print(f'Serving on {addr}. Hit CTRL-C to stop.')  # <5>
    await server.serve_forever()  # <6> suspends the supervisor. without this supervisor returns immediately

def main(host: str = '127.0.0.1', port_arg: str = '2323'):
    port = int(port_arg)
    print('Building index.')
    index = InvertedIndex()                         # <7> index gets built
    try:
        asyncio.run(supervisor(index, host, port))  # <8> starts the event loop that will drive the supervisor coro
    except KeyboardInterrupt:                       # <9> catch CTRL-C
        print('\nServer shut down.')

if __name__ == '__main__':
    main(*sys.argv[1:])
# end::TCP_MOJIFINDER_MAIN[]
  • IDIOM @ finder point number 2;

    Use functools.partial to bind that parameter and obtain a callable that takes the reader and writer. Adapting user functions to callback APIs is the most common use case for functools.partial

  • how multiple clients can be served at once:

    While the event loop is alive, a new instance of the finder coroutine will be started for each client that connects to the server.

  • how the keyboard interrupt works

    the interrupt signal causes a KeyboardInterrupt exception to be raised from within server.serve_forever, where supervisor is suspended.

    the event loop stops as well.

    the exception then propagates out into the main function that had been driving the event loop, where it is caught.

  • GOTCHA: StreamWriter.write is not a coro, StreamWriter.drain is a coro

    some of the I/O methods are coroutines and must be driven with await, while others are plain functions. For example, StreamWriter.write is a plain function because it only writes to a buffer. StreamWriter.drain, which flushes the buffer and performs the network I/O, is a coroutine, and so is StreamReader.readline. StreamWriter.writelines, however, is not!
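
Both points above can be checked without opening a socket. The sketch below is illustrative only: `handler` and the dict passed as its first argument are hypothetical stand-ins for `finder` and the index, and `inspect.iscoroutinefunction` is used to report which stream methods must be driven with `await`.

```python
import asyncio
import functools
import inspect


async def handler(index: dict, reader: object, writer: object) -> int:
    """Hypothetical stand-in for a three-argument connection handler."""
    return len(index)


# Bind the first argument. The resulting callable accepts only
# (reader, writer), the shape asyncio.start_server expects of its callback.
callback = functools.partial(handler, {'SIGN': {'#', '$'}})

# Report which stream I/O methods are coroutine functions.
methods = {
    'StreamWriter.write': asyncio.StreamWriter.write,
    'StreamWriter.writelines': asyncio.StreamWriter.writelines,
    'StreamWriter.drain': asyncio.StreamWriter.drain,
    'StreamReader.readline': asyncio.StreamReader.readline,
}
needs_await = {name: inspect.iscoroutinefunction(m) for name, m in methods.items()}
```

Calling `callback(reader, writer)` returns a coroutine object exactly as `handler(index, reader, writer)` would, so the server can drive it like any other handler.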

Asynchronous Iteration and Asynchronous Iterables and using async for #

  • async with \(\implies\) works with Async Context Managers

    async for \(\implies\) asynchronous iterables:

    • __aiter__ returns an async iterator, BUT __aiter__ is NOT a coro method, it’s a regular method

    an async iterator provides an __anext__ coro method that returns an awaitable, usually a coro object. Just like its sync counterpart, it is also expected to implement __aiter__, which trivially returns self

  • Remember the same point as for the sync versions: do NOT mix up iterables and iterators

  • example: aiopg async postgres driver :

    
      import aiopg

      # dsn is assumed to be a PostgreSQL connection string defined elsewhere

      async def go():
          pool = await aiopg.create_pool(dsn)

          async with pool.acquire() as conn:
              async with conn.cursor() as cur:  # the cursor is the async iterator here
                  await cur.execute("SELECT 1")
                  ret = []
                  async for row in cur:  # must NOT block the event loop while the cursor may be waiting for additional rows
                      ret.append(row)
                  assert ret == [(1,)]
    
    • By implementing the cursor as an asynchronous iterator, aiopg may yield to the event loop at each __anext__ call, and resume later when more rows arrive from PostgreSQL.
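
The protocol described above can be sketched with two small classes. This is a minimal illustration, not code from the book: `Countdown` and `CountdownIterator` are made-up names, and `asyncio.sleep(0)` stands in for real I/O such as waiting for rows.

```python
import asyncio


class Countdown:
    """Async iterable: __aiter__ is a regular method returning an async iterator."""

    def __init__(self, start: int) -> None:
        self.start = start

    def __aiter__(self) -> 'CountdownIterator':  # plain def, not async def
        return CountdownIterator(self.start)


class CountdownIterator:
    """Async iterator: __anext__ is a coroutine method."""

    def __init__(self, current: int) -> None:
        self.current = current

    def __aiter__(self) -> 'CountdownIterator':
        return self  # iterators return themselves, as in the sync protocol

    async def __anext__(self) -> int:
        if self.current <= 0:
            raise StopAsyncIteration  # async counterpart of StopIteration
        await asyncio.sleep(0)  # give control back to the event loop
        self.current -= 1
        return self.current + 1


async def collect(start: int) -> list[int]:
    countdown = Countdown(start)
    first = [n async for n in countdown]
    second = [n async for n in countdown]  # a fresh iterator on each pass
    return first + second
```

Keeping the iterable and the iterator in separate classes is what lets `countdown` be traversed twice: `asyncio.run(collect(3))` gives `[3, 2, 1, 3, 2, 1]`.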

Asynchronous Generator Functions #

Implementing and Using an async generator #

  • Implementing an Async Iterator

    • class-implementation for async iterator: implement a class with __anext__ and __aiter__

    • simpler way to implement an async iterator: as a generator function that is async \(\implies\) async generator

      write a function declared with async def and use yield in its body. This parallels how generator functions simplify the classic Iterator pattern.

  • Usage of async generators:

    • Async generators can be used with async for \(\Leftarrow\) driven by async for:

      • as a block statement
      • as async comprehensions
    • We can’t use typical for loops because async generators implement __aiter__ and NOT __iter__

  • Demo example

    
      import asyncio
      import socket
      from collections.abc import Iterable, AsyncIterator
      from typing import NamedTuple, Optional
    
    
      class Result(NamedTuple):  # <1> convenience: easier to read and debug
          domain: str
          found: bool
    
    
      OptionalLoop = Optional[asyncio.AbstractEventLoop]  # <2> typealias to clean up the hinting below
    
    
      async def probe(domain: str, loop: OptionalLoop = None) -> Result:  # <3>
          if loop is None: # no current event loop handle in scope
              loop = asyncio.get_running_loop()
          try:
              await loop.getaddrinfo(domain, None)
          except socket.gaierror:
              return Result(domain, False)
          return Result(domain, True)
    
    
      async def multi_probe(domains: Iterable[str]) -> AsyncIterator[Result]:  # <4> Async Generator function returns an async generator object, that's why it's typed like that
          loop = asyncio.get_running_loop()
          coros = [probe(domain, loop) for domain in domains]  # <5> list of probe coros
          for coro in asyncio.as_completed(coros):  # <6> this is a classic generator, that's why we can drive it using =for= and not =async for=
              result = await coro  # <7> awaiting suspends only this coroutine until the next result is ready; the event loop is never blocked
              yield result  # <8> this is what makes multi_probe an async generator
    
  • The result is yielded by multi_probe, which is what makes multi_probe an async generator

    Shortcut to the for loop:

    
        for coro in asyncio.as_completed(coros):
                yield await coro
    
  • TRICK: The .invalid top-level domain is reserved for testing.

    see elaboration here:

        The `.invalid` top-level domain (TLD) is reserved for testing and for examples or scenarios where a guaranteed-invalid domain is needed. It is reserved by the Internet Engineering Task Force (IETF) in [RFC 2606 (1999)](https://www.rfc-editor.org/rfc/rfc2606.html) and listed by the Internet Assigned Numbers Authority (IANA) as a special-use domain name.

        ### Key points on `.invalid` TLD reservation:

        - The `.invalid` TLD is never delegated in the global DNS root zone, which avoids conflicts with existing or future valid TLDs.
        - It is intended for tests, documentation, or example scenarios where domain names must be clearly invalid or guaranteed not to resolve.
        - Alongside `.invalid`, RFC 2606 reserves `.test`, `.example`, and `.localhost` for similar "safe" use.
        - Using `.invalid` in software or test settings helps catch or demonstrate domain resolution failures without accidentally affecting real domains.
        - Because of this reservation, a name under `.invalid` should never be expected to resolve on the public internet.

        ### Supporting references:

        - **RFC 2606 (Reserved Top Level DNS Names)** states:
          > "`.invalid` is intended for use in online construction of domain names that are sure to be invalid and which it is obvious at a glance are invalid."
        - [Wikipedia: .invalid](https://en.wikipedia.org/wiki/.invalid) also describes this reservation.

        ### Summary table

        | Domain       | Purpose                           | Delegated in global DNS? | Use Case                                        |
        |--------------|-----------------------------------|--------------------------|-------------------------------------------------|
        | `.invalid`   | Reserved for invalid/test domains | No                       | Testing, documentation, avoiding domain clashes |
        | `.test`      | Reserved for testing              | No                       | Test environments                               |
        | `.example`   | Reserved for examples             | No                       | Documentation and examples                      |
        | `.localhost` | Reserved for loopback services    | No                       | Localhost network reference                     |
    
    
  • Using the async generator:

    
        #!/usr/bin/env python3
        import asyncio
        import sys
        from keyword import kwlist
    
        from domainlib import multi_probe
    
    
        async def main(tld: str) -> None:
            tld = tld.strip('.')
            names = (kw for kw in kwlist if len(kw) <= 4)  # <1>
            domains = (f'{name}.{tld}'.lower() for name in names)  # <2>
            print('FOUND\t\tNOT FOUND')  # <3>
            print('=====\t\t=========')
            async for domain, found in multi_probe(domains):  # <4> async iterate over the async generator
                indent = '' if found else '\t\t'  # <5>
                print(f'{indent}{domain}')
    
    
        if __name__ == '__main__':
            if len(sys.argv) == 2:
                asyncio.run(main(sys.argv[1]))  # <6>
            else:
                print('Please provide a TLD.', f'Example: {sys.argv[0]} COM.BR')
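
To try the same pattern without touching the network, here is a minimal sketch under stated assumptions: `fake_probe` and `multi_fake_probe` are hypothetical stand-ins for `probe` and `multi_probe`, and `asyncio.sleep` replaces the DNS query so completion order is controlled by the made-up delays.

```python
import asyncio
from collections.abc import AsyncIterator, Iterable


async def fake_probe(name: str, delay: float) -> tuple[str, float]:
    """Hypothetical stand-in for probe(): sleeps instead of querying DNS."""
    await asyncio.sleep(delay)
    return name, delay


async def multi_fake_probe(
    jobs: Iterable[tuple[str, float]],
) -> AsyncIterator[tuple[str, float]]:
    coros = [fake_probe(name, delay) for name, delay in jobs]
    for coro in asyncio.as_completed(coros):  # plain for, not async for
        yield await coro  # the yield makes this an async generator


async def main() -> list[str]:
    jobs = [('slow', 0.2), ('fast', 0.01), ('medium', 0.1)]
    order = []
    async for name, _ in multi_fake_probe(jobs):  # driven by async for
        order.append(name)
    return order
```

Results arrive in completion order rather than submission order, so `asyncio.run(main())` gives `['fast', 'medium', 'slow']`.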
    

Async generators as context managers #

  • Generators (sync and async versions) have one extra use unrelated to iteration: they can be made into context managers.

  • We can use the @asynccontextmanager decorator within the contextlib module

    Similar to its sync counterpart @contextmanager

    
      import asyncio
      from contextlib import asynccontextmanager

      # download_webpage, update_stats and process are placeholders defined elsewhere

      @asynccontextmanager
      async def web_page(url):  # the function to be decorated has to be an async generator
          loop = asyncio.get_running_loop()
          data = await loop.run_in_executor(
              None, download_webpage, url)  # run in a separate thread in case this is a blocking function; keeps our event loop unblocked

          yield data  # this makes it an async generator

          await loop.run_in_executor(None, update_stats, url)

      # the async with statement below must itself run inside a coroutine
      async with web_page('google.com') as data:
          process(data)
    
    • Outcome

      similar to the sync version, all lines before the yield become the entry code: the __aenter__ coro method of the async context manager that the decorator builds. The yielded value of data is then bound to the data target variable after as in the async with statement below.

      All lines after the yield become the __aexit__ coro method. There, another possibly blocking call is delegated to the thread executor.
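
A runnable variant of the same idea, as a sketch only: `blocking_fetch` and `blocking_log` are hypothetical placeholders for `download_webpage` and `update_stats`, and the `events` list exists just to show the order of execution. Unlike the snippet above, this version wraps the `yield` in `try`/`finally` so the exit code also runs when the body raises.

```python
import asyncio
import time
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

events: list[str] = []


def blocking_fetch(key: str) -> str:
    """Hypothetical blocking call, standing in for download_webpage."""
    time.sleep(0.01)
    events.append('fetch')
    return f'data for {key}'


def blocking_log(key: str) -> None:
    """Hypothetical blocking call, standing in for update_stats."""
    time.sleep(0.01)
    events.append('log')


@asynccontextmanager
async def resource(key: str) -> AsyncIterator[str]:
    loop = asyncio.get_running_loop()
    # Everything before the yield plays the role of __aenter__.
    data = await loop.run_in_executor(None, blocking_fetch, key)
    try:
        yield data  # value bound to the `as` target
    finally:
        # Everything after the yield plays the role of __aexit__.
        await loop.run_in_executor(None, blocking_log, key)


async def main() -> str:
    async with resource('page') as data:
        events.append('body')
        return data
```

After `asyncio.run(main())`, `events` is `['fetch', 'body', 'log']`: the exit code runs even though the body leaves the block with `return`.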

Asynchronous generators versus native coroutines #

  • Similarities

    • async def for both
  • Differences

    • async generator has a yield in its body but not a native coroutine

    • async generator can ONLY have empty return statements BUT a native coro may return a value other than None

    • Async generators are NOT awaitable; they are async iterables, so they are driven by async for or async comprehensions

      meanwhile, native coros are awaitable. Therefore:

      • can be driven by await expressions

      • can be passed to asyncio functions that consume awaitables (e.g. create_task)
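
The differences listed above can be checked directly. A minimal sketch with made-up names (`coro`, `agen`, `BAD_SOURCE`); the `compile` call is only there to show that `return` with a value is rejected before the code ever runs.

```python
import asyncio
import inspect


async def coro() -> int:
    return 42  # a native coroutine may return a value


async def agen():
    yield 1
    yield 2
    return  # a bare return is the only kind allowed here


# `return <value>` inside an async generator is rejected at compile time.
BAD_SOURCE = 'async def bad():\n    yield 1\n    return 2\n'
try:
    compile(BAD_SOURCE, '<demo>', 'exec')
except SyntaxError:
    return_value_allowed = False
else:
    return_value_allowed = True


async def main() -> tuple[int, list[int], str]:
    value = await coro()               # coroutine objects are awaitable
    items = [n async for n in agen()]  # async generators are driven by async for
    error = ''
    try:
        await agen()                   # async generator objects are not awaitable
    except TypeError as exc:
        error = type(exc).__name__
    return value, items, error
```

`inspect.iscoroutinefunction(coro)` and `inspect.isasyncgenfunction(agen)` are both true, while `inspect.iscoroutinefunction(agen)` is false, which is a quick way to tell the two kinds of `async def` apart.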

Async Comprehensions and Async Generator Expressions #

Async generator expressions #

Here’s how we can define and use one:

gen_found = (name async for name, found in multi_probe(names) if found) # the async genexpr builds the async generator (async iterator) obj

async for name in gen_found: # driven by the async for
    print(name)
  • an asynchronous generator expression can be defined anywhere in your program, but it can only be consumed inside a native coroutine or asynchronous generator function.
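
A small sketch of that rule, with hypothetical names: `numbers` is a stand-in async generator (not `multi_probe`), `evens` is a plain function that only builds the async genexp, and `main` is the coroutine that consumes it.

```python
import asyncio
from collections.abc import AsyncIterator


async def numbers(limit: int) -> AsyncIterator[int]:
    """Hypothetical async generator standing in for multi_probe."""
    for n in range(limit):
        await asyncio.sleep(0)
        yield n


def evens(limit: int) -> AsyncIterator[int]:
    # A plain function (no async def) may build an async genexp in
    # current Python versions; nothing runs until it is iterated.
    return (n async for n in numbers(limit) if n % 2 == 0)


async def main() -> list[int]:
    found = []
    async for n in evens(7):  # consumption needs an async context
        found.append(n)
    return found
```

Here `asyncio.run(main())` gives `[0, 2, 4, 6]`.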

Async comprehensions #

  • we can have the usual kind of comprehensions done async! just need to make sure that it’s within an async context i.e. within an async def or within an async REPL console.

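  • async listcomps: result = [i async for i in aiter() if i % 2]. A listcomp that uses await, such as [await probe(name) for name in names], does a job similar to asyncio.gather(), but it awaits the coroutines one at a time and is less flexible: gather gives better control over exception handling (through its return_exceptions argument).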

  • async dictcomps: {name: found async for name, found in multi_probe(names)}

  • async setcomps: {name for name in names if (await probe(name)).found}

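    the extra parentheses are needed because attribute access (the . operator) binds more tightly than await; without them, .found would be looked up on the coroutine object before it is awaited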
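
The sketch below contrasts a listcomp that uses `await` with `asyncio.gather`, and shows the parenthesised `await` in a set comprehension. It is illustrative only: `fake_probe` is a hypothetical stand-in that invents its answer from the name, and the `log` lists exist just to record the order of events.

```python
import asyncio
from typing import NamedTuple


class Result(NamedTuple):
    name: str
    found: bool


async def fake_probe(name: str, log: list[str]) -> Result:
    """Hypothetical stand-in for probe(): no DNS, the answer is made up."""
    log.append(f'start {name}')
    await asyncio.sleep(0.01)
    log.append(f'end {name}')
    return Result(name, not name.endswith('.invalid'))


async def main() -> tuple[list[str], list[str], set[str]]:
    names = ['a.dev', 'b.invalid']

    seq_log: list[str] = []
    # Awaits one coroutine at a time: each probe ends before the next starts.
    sequential = [await fake_probe(n, seq_log) for n in names]

    con_log: list[str] = []
    # gather schedules all coroutines first, so both start before either ends.
    concurrent = await asyncio.gather(*(fake_probe(n, con_log) for n in names))
    assert sequential == concurrent  # same results, in the same order

    scratch: list[str] = []
    # Parentheses required: without them, .found would be looked up
    # on the coroutine object instead of on the awaited Result.
    found = {n for n in names if (await fake_probe(n, scratch)).found}
    return seq_log, con_log, found
```

The two logs make the difference visible: the listcomp log alternates start/end for each name, while the gather log begins with both starts.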

async Beyond asyncio: Curio #

  • async/await constructs are library agnostic

  • curio blogdom demo example:

    
      #!/usr/bin/env python3
      from curio import run, TaskGroup
      import curio.socket as socket
      from keyword import kwlist
    
      MAX_KEYWORD_LEN = 4
    
    
      async def probe(domain: str) -> tuple[str, bool]:  # <1> no need to receive event loop
          try:
              await socket.getaddrinfo(domain, None)  # <2> getaddrinfo is top-level fn of the curio.socket, it's not a method of a loop object like it is in asyncio
          except socket.gaierror:
              return (domain, False)
          return (domain, True)
    
      async def main() -> None:
          names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)
          domains = (f'{name}.dev'.lower() for name in names)
          async with TaskGroup() as group:  # <3> core concept in curio monitors and controls a group of tasks (coros)
              for domain in domains:
                  await group.spawn(probe, domain)  # <4> we spawn to start a coro, managed by a particular TaskGroup instance. Coro is wrapped by a Task within the TaskGroup
              async for task in group:  # <5> yields as it's completed, like =as_completed=
                  domain, found = task.result
                  mark = '+' if found else ' '
                  print(f'{mark} {domain}')
    
      if __name__ == '__main__':
          run(main())  # <6> sensible syntax
    
  • TaskGroup

    • Curio TaskGroup is an asynchronous context manager that replaces several ad hoc APIs and coding patterns in asyncio.

    • above we saw how we can just drive the group and we get things in the order of completion, analogous to asyncio.as_completed

    • we can also gather them all easily:

    
        async with TaskGroup(wait=all) as g:
                await g.spawn(coro1)
                await g.spawn(coro2)
    
  • TaskGroup as a support for structured concurrency:

    • adds a constraint to concurrent programming:

      a group of async tasks should have a single entry and single exit point.

      as an asynchronous context manager, a TaskGroup ensures that all tasks spawned inside are completed or cancelled, and any exceptions raised, upon exiting the enclosed block.

    • just like how structured programming advised against the use of GOTO statements

  • asyncio itself has had some support for structured concurrency since Python 3.11, e.g. asyncio.TaskGroup

  • Curio also provides a UniversalQueue that can be used to coordinate the work among threads, Curio coroutines, and asyncio coroutines.

Type Hinting Asynchronous Objects #

  • the return type of native coroutine == the type of result it spits out when you await on it

  • different from annotations for classic coroutines, where it’s the 3-paramed Generator type

  • 3 points about typing:

    • all the async types are covariant on the first type parameter, which is the type of the items yielded from these objects. This aligns with “producer” / output types being covariant.

    • AsyncGenerator and Coroutine are contravariant on the second to last parameter. That’s the type of the argument of the low-level .send() method that the event loop calls to drive them, i.e. an input type, and input types are contravariant.

    • AsyncGenerator has no return type

      when we saw typing.Generator, we realised that a generator can return a value by smuggling it inside StopIteration(value); that is how generators were made to work as classic coroutines and to support yield from

      No such thing for AsyncGenerator

      AsyncGenerator objects don’t return values, and are completely separate from native coroutine objects, which are annotated with typing.Coroutine
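
A short sketch of how these annotations look in practice. All function names here are made up for illustration; the generic types come from `collections.abc`.

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterator, Coroutine
from typing import Any


async def double(n: int) -> int:  # annotate the awaited result
    await asyncio.sleep(0)
    return n * 2


async def ticks(limit: int) -> AsyncIterator[int]:  # only the yielded type
    for n in range(limit):
        yield n


async def running_total() -> AsyncGenerator[int, int]:  # [YieldType, SendType]
    total = 0
    while True:
        received = yield total
        total += received


def make_coro(n: int) -> Coroutine[Any, Any, int]:  # last parameter: result type
    return double(n)


async def main() -> tuple[int, list[int], int]:
    doubled = await make_coro(21)
    listed = [n async for n in ticks(3)]
    gen = running_total()
    await gen.asend(None)       # prime the generator; it yields 0
    await gen.asend(5)
    total = await gen.asend(7)  # values sent in are the "input" type
    await gen.aclose()
    return doubled, listed, total
```

Note that `AsyncGenerator` takes two parameters where `Generator` takes three: there is no slot for a return type.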

How Async Works and How It Doesn’t #

Running Circles Around Blocking Calls #

  • I/O is god damn slow; if we use async in a disciplined manner, our servers can be high-performance

The Myth of I/O-Bound Systems #

  • there are “I/O bound functions” but no “I/O bound systems”

  • any nontrivial system will have CPU-bound functions, dealing with them is the key to success in async programming

Avoiding CPU-Bound Traps #

  • should have performance regression tests
  • important with async code, but also relevant to threaded Python code because of the GIL
  • we should not wait until we OBSERVE a slowdown: the performance hit from each bad pattern is usually too small for a human to notice, and by the time the accumulated slowdown is visible it’s too late

What to do if we see a CPU-hogging bottleneck: #

  • delegate task to a python proc pool
  • delegate task to external task queue
  • avoid GIL constraints: rewrite the code in Cython, C, Rust, or anything else that interfaces with the Python/C API
  • choose to do nothing

Chapter Summary #

  • don’t block the event loop, delegate to different processing unit (thread, proc, task queue)
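
A minimal sketch of that advice, assuming a blocking function we cannot rewrite. `blocking_work` is hypothetical (with `time.sleep` standing in for the slow call) and `ticker` is only there to show that the event loop keeps running in the meantime.

```python
import asyncio
import time


def blocking_work(seconds: float) -> str:
    """Hypothetical blocking call; time.sleep stands in for the slow part."""
    time.sleep(seconds)
    return 'done'


async def ticker(counter: list[int]) -> None:
    while True:
        counter[0] += 1
        await asyncio.sleep(0.01)


async def main() -> tuple[str, int]:
    counter = [0]
    tick_task = asyncio.create_task(ticker(counter))
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor. For CPU-bound
    # pure-Python work, a concurrent.futures.ProcessPoolExecutor could be
    # passed instead, so the GIL is not a bottleneck.
    result = await loop.run_in_executor(None, blocking_work, 0.2)
    tick_task.cancel()
    return result, counter[0]
```

If `blocking_work(0.2)` were called directly inside `main`, the ticker would be stuck at a single tick; delegated to the executor, it keeps ticking while the work runs.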

Further Reading #