Fluent Python: Clear, Concise, and Effective Programming – Luciano Ramalho

Chapter 20. Concurrent Executors

  • concurrent.futures.Executor classes that encapsulate the pattern of “spawning a bunch of independent threads and collecting the results in a queue,” described by Michele Simionato.

    can be used with threads as well as processes

  • introduces futures, similar to promises in JS; futures are the low-level objects here

  • this chapter is more demo, less theoretical

What’s New in This Chapter #

Concurrent Web Downloads #

  • the concurrent scripts are about 5x faster

  • typically, well-implemented concurrent scripts can outpace sequential ones by a factor of 20x or more

  • TRICK: I didn’t know that the HTTPX library is the more modern, go-to alternative to the requests lib. HTTPX provides both async and sync APIs, while requests only provides sync versions.

  • for server-side, servers that may be hit by many clients, there is a difference between what concurrency primitive we use (threading vs coroutines):

    coroutines scale better because they use much less memory than threads, and also reduce the cost of context switching

A Sequential Download Script #

#!/usr/bin/env python3

"""Download flags of top 20 countries by population

Sequential version

Sample runs (first with new domain, so no caching ever)::

    $ ./flags.py
    BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
    20 downloads in 26.21s
    $ ./flags.py
    BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
    20 downloads in 14.57s


"""

# tag::FLAGS_PY[]
import time
from pathlib import Path
from typing import Callable

import httpx  # <1> non stdlib import, conventionally comes after stdlib imports

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()  # <2>

BASE_URL = 'https://www.fluentpython.com/data/flags'  # <3>
DEST_DIR = Path('downloaded')                         # <4>

def save_flag(img: bytes, filename: str) -> None:     # <5> saving bytes to file
    (DEST_DIR / filename).write_bytes(img)

def get_flag(cc: str) -> bytes:  # <6> downloads the thing, returns byte contents of the response
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    resp = httpx.get(url, timeout=6.1,       # <7> good to have timeouts if we are making blocking calls like in this demo
                     follow_redirects=True)  # <8>
    resp.raise_for_status()  # <9> prevents silent failures because of non 2XX responses
    return resp.content

def download_many(cc_list: list[str]) -> int:  # <10> sequential version, to be compared across the other examples
    for cc in sorted(cc_list):                 # <11> to observe that the order will be preserved
        image = get_flag(cc)
        save_flag(image, f'{cc}.gif')
        print(cc, end=' ', flush=True)         # <12> the flush is to flush the print buffer
    return len(cc_list)

def main(downloader: Callable[[list[str]], int]) -> None:  # <13> allows the downloader to be injectable, to be used for the other examples
    DEST_DIR.mkdir(exist_ok=True)                          # <14> create dir if necessary
    t0 = time.perf_counter()                               # <15>
    count = downloader(POP20_CC)
    elapsed = time.perf_counter() - t0
    print(f'\n{count} downloads in {elapsed:.2f}s')

if __name__ == '__main__':
    main(download_many)     # <16>
# end::FLAGS_PY[]

Downloading with concurrent.futures #

#!/usr/bin/env python3

"""Download flags of top 20 countries by population

ThreadPoolExecutor version

Sample run::

    $ python3 flags_threadpool.py
    DE FR BD CN EG RU IN TR VN ID JP BR NG MX PK ET PH CD US IR
    20 downloads in 0.35s

"""

# tag::FLAGS_THREADPOOL[]
from concurrent import futures

from flags import save_flag, get_flag, main  # <1> reusing things

def download_one(cc: str):  # <2> single downloader, this is what each worker will execute
    image = get_flag(cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list: list[str]) -> int:
    with futures.ThreadPoolExecutor() as executor:         # <3> ThreadPoolExecutor is the context manager here, exit method will be blocking until all threads are done
        res = executor.map(download_one, sorted(cc_list))  # <4> similar in style to the map builtin; returns a generator that we must iterate to get each function call's return value

    return len(list(res))                                  # <5>

if __name__ == '__main__':
    main(download_many)  # <6>
# end::FLAGS_THREADPOOL[]
  • The context manager is ThreadPoolExecutor, the executor.__exit__ method will call executor.shutdown(wait=True) and this is blocking until all the threads are done.
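A minimal sketch of that equivalence: the `with` block below is spelled out with an explicit `try/finally` calling `shutdown(wait=True)`, which is what `__exit__` does.

```python
from concurrent.futures import ThreadPoolExecutor

# a sketch of what the `with` block does, written out explicitly
executor = ThreadPoolExecutor(max_workers=2)
try:
    res = executor.map(str.upper, ['a', 'b', 'c'])
finally:
    executor.shutdown(wait=True)  # __exit__ calls this: blocks until all workers are done

upper = list(res)  # the generator can still be consumed after shutdown
print(upper)  # ['A', 'B', 'C']
```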

  • executor.map() is similar to the map builtin:

    the function is called concurrently from multiple threads

    it returns a generator that we need to iterate to retrieve the value returned by each function call

    any exception raised by a particular call surfaces when we retrieve that call’s value from the generator.
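A small sketch of that deferred-exception behavior, using a hypothetical `might_fail` worker: `executor.map()` itself raises nothing; the exception pops out mid-iteration, and the values after the failing call are lost.

```python
from concurrent.futures import ThreadPoolExecutor

def might_fail(n):
    if n == 2:
        raise ValueError('bad input')
    return n * 10

with ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(might_fail, [0, 1, 2, 3])  # no exception raised yet

collected = []
try:
    for value in results:  # the ValueError surfaces here, mid-iteration
        collected.append(value)
except ValueError:
    collected.append('error')

print(collected)  # [0, 10, 'error'] -- the result of might_fail(3) is lost
```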

  • concurrent.futures makes it easy for us to add concurrent execution atop legacy sequential code

  • Other useful args to ThreadPoolExecutor:

    • max_workers

      the default is max_workers = min(32, os.cpu_count() + 4); the extra workers beyond the CPU count are there to favor I/O-BOUND tasks

      Also, it will reuse idle workers instead of spawning new ones (like the Meeting Rooms II LeetCode question).
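A quick inspection sketch of that default (the formula matches CPython 3.8+; `_max_workers` is a private attribute, used here only for a peek, just like the proc_pool example later in this chapter):

```python
import os
from concurrent.futures import ThreadPoolExecutor

# default worker count in CPython 3.8+: min(32, cpu_count + 4)
expected = min(32, (os.cpu_count() or 1) + 4)

executor = ThreadPoolExecutor()  # no max_workers given
actual = executor._max_workers   # private attribute, inspected here only for the demo
executor.shutdown()

print(f'default max_workers here: {actual} (formula gives {expected})')
```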

Where Are the Futures? #

  • purpose: an instance of either Future class represents a deferred computation that may or may not have completed.

    like Promise in JS

    both async frameworks give us futures: concurrent.futures.Future and asyncio.Future

  • allows us to put them in queues and check if they’re done

    HOWEVER, it is the job of the concurrency framework to handle futures, WE DON’T create them directly. This is because a future represents something that will eventually run, so it must be scheduled to run and that’s the role of the framework

    e.g. Executor.submit(<callable>) does the scheduling and returns a Future

  • Who can change the state of a future?

    Only the concurrency framework, never the application code.

    We are NOT in control of the state of a future.

  • push/pull method to determine completion:

    pull: Future.done(), where the app logic keeps polling

    push: Future.add_done_callback() to register a callback that will be invoked when the future is done. NOTE: the callback callable will run in the same worker thread or process that ran the function wrapped in the future.

  • futures have a result()

    • when done, it works the same for both libs

    • when not done, it works differently for the two libs:

      concurrent.futures.Future: calling f.result() will block the caller’s thread until the result is ready (we can pass a timeout to avoid infinite blocking)

      asyncio.Future: f.result() never blocks; if the future isn’t done yet, it raises an exception, so in asyncio we await the future instead
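A compact sketch of the concurrent.futures side of this API (submit, done(), add_done_callback(), result() with a timeout), using a hypothetical `slow_add` task:

```python
import time
from concurrent import futures

def slow_add(a, b):  # hypothetical slow task
    time.sleep(0.1)
    return a + b

with futures.ThreadPoolExecutor() as executor:
    future = executor.submit(slow_add, 2, 3)  # the framework creates and schedules the Future

    print('done yet?', future.done())  # pull: poll the state (most likely False here)

    # push: register a callback; it runs in the worker that ran slow_add
    future.add_done_callback(lambda f: print('callback got:', f.result()))

    result = future.result(timeout=5)  # blocks the caller; the timeout avoids blocking forever

print('result:', result)
```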

  • demo:
      #!/usr/bin/env python3
    
      """Download flags of top 20 countries by population
    
      ThreadPoolExecutor example with ``as_completed``.
      """
      from concurrent import futures
    
      from flags import main
      from flags_threadpool import download_one
    
    
      # tag::FLAGS_THREADPOOL_AS_COMPLETED[]
      def download_many(cc_list: list[str]) -> int:
          cc_list = cc_list[:5]  # <1> smaller sample
          with futures.ThreadPoolExecutor(max_workers=3) as executor:  # <2> attempt to see pending futures in the output
              to_do: list[futures.Future] = []
              for cc in sorted(cc_list):  # <3>
                  future = executor.submit(download_one, cc)  # <4> schedules the  callable to be executed, returns a future representing this pending operation
                  to_do.append(future)  # <5> just storing it for inspection
                  print(f'Scheduled for {cc}: {future}')  # <6> we'll see something like this: Scheduled for BR: <Future at 0x100791518 state=running>
    
              for count, future in enumerate(futures.as_completed(to_do), 1):  # <7> yields futures as they are completed
                  res: str = future.result()  # <8> retrieving the result
                  print(f'{future} result: {res!r}')  # <9> will look something like this: IN <Future at 0x101807080 state=finished returned str> result: 'IN'
    
          return count
      # end::FLAGS_THREADPOOL_AS_COMPLETED[]
    
      if __name__ == '__main__':
          main(download_many)
    
    In this example, because we’re getting the futures from as_completed, calling future.result() will never block: as_completed only yields futures that are already done.

Launching Processes with concurrent.futures #

  • Both ProcessPoolExecutor and ThreadPoolExecutor implement the Executor interface

    this allows us to switch from thread-based to process-based concurrency using concurrent.futures

  • so we can use process-based primitives just like thread-based ones: we just instantiate a different pool executor

  • main use case for process-based concurrency is CPU-intensive jobs

    Using processes sidesteps the GIL, letting us use multiple CPU cores.

    Remember that processes use more memory and take longer to start than threads.

    The main use case for thread-based concurrency is I/O-intensive applications.
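A sketch of how small the switch is, with a hypothetical CPU-bound `fib` as the workload; the executor class is the only moving part:

```python
from concurrent.futures import ThreadPoolExecutor  # swap in ProcessPoolExecutor for CPU-bound work

def fib(n):  # hypothetical CPU-bound task
    return n if n < 2 else fib(n - 1) + fib(n - 2)

def run(executor_class):
    # the executor class is the ONLY thing that changes between the two versions
    with executor_class(max_workers=4) as executor:
        return list(executor.map(fib, [10, 15, 20]))

print(run(ThreadPoolExecutor))  # [55, 610, 6765]
# run(ProcessPoolExecutor) is the same call, but the worker function must be
# defined at module level (picklable) and the call guarded by
# `if __name__ == "__main__":` on platforms that spawn new processes
```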

Multicore Prime Checker Redux #

#!/usr/bin/env python3

"""
proc_pool.py: a version of the proc.py example from chapter 20,
but using `concurrent.futures.ProcessPoolExecutor`.
"""

# tag::PRIMES_POOL[]
import sys
from concurrent import futures  # <1> no need to import the lower level abstractions (multiprocessing, SimpleQueue)
from time import perf_counter
from typing import NamedTuple

from primes import is_prime, NUMBERS

class PrimeResult(NamedTuple):  # <2> we don't need the queues or the worker function anymore
    n: int
    flag: bool
    elapsed: float

def check(n: int) -> PrimeResult:
    t0 = perf_counter()
    res = is_prime(n)
    return PrimeResult(n, res, perf_counter() - t0)

def main() -> None:
    if len(sys.argv) < 2:
        workers = None      # <3> None lets ProcessPoolExecutor decide for us
    else:
        workers = int(sys.argv[1])

    executor = futures.ProcessPoolExecutor(workers)  # <4> build the executor first so we can access the actual number of workers selected
    actual_workers = executor._max_workers  # type: ignore  # <5>

    print(f'Checking {len(NUMBERS)} numbers with {actual_workers} processes:')

    t0 = perf_counter()

    numbers = sorted(NUMBERS, reverse=True)  # <6>
    with executor:  # <7> we use the executor as the context manager
        for n, prime, elapsed in executor.map(check, numbers):  # <8> yields the PrimeResult instances returned by check, in the same order as the numbers argument
            label = 'P' if prime else ' '
            print(f'{n:16}  {label} {elapsed:9.6f}s')

    time = perf_counter() - t0
    print(f'Total time: {time:.2f}s')

if __name__ == '__main__':
    main()
# end::PRIMES_POOL[]
  • iterating over executor.map() blocks until all child processes are done; results are yielded in the order the jobs were submitted.

    it blocks on each result in submission order, not on all at once; since the hardest number comes first, once its result arrives the rest print almost instantly because they already finished
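A sketch of that ordering guarantee (a thread pool is used for brevity; `Executor.map` behaves the same for both executor types):

```python
import time
from concurrent.futures import ThreadPoolExecutor  # same Executor.map contract as ProcessPoolExecutor

def work(delay):
    time.sleep(delay)
    return delay

with ThreadPoolExecutor(max_workers=3) as executor:
    # the slowest job is submitted first, so it finishes last...
    results = list(executor.map(work, [0.3, 0.2, 0.1]))

# ...yet results come back in submission order: the iterator blocks on the
# first result, and the later (already finished) ones arrive almost instantly
print(results)  # [0.3, 0.2, 0.1]
```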

Experimenting with Executor.map #

"""
Experiment with ``ThreadPoolExecutor.map``
"""
# tag::EXECUTOR_MAP[]
from time import sleep, strftime
from concurrent import futures

def display(*args):  # <1> just echoes back with timestamp
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)

def loiter(n):  # <2>
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10  # <3>

def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)  # <4> 3 threads
    results = executor.map(loiter, range(5))  # <5> the first 3 tasks will start immediately (nonblocking)
    display('results:', results)  # <6>
    display('Waiting for individual results:')
    for i, result in enumerate(results):  # <7>
        display(f'result {i}: {result}')

if __name__ == '__main__':
    main()
# end::EXECUTOR_MAP[]
  • the display output updates incrementally as the tasks run.

  • enumerate call in the for loop will implicitly invoke next(results), which in turn will invoke _f.result() on the (internal) _f future representing the first call, loiter(0)

    the _f.result() call will block until the future is done

  • iterating over the generator from Executor.map() blocks until the jobs are done.

    Alternatively, to process each result as soon as it’s ready, we can use Executor.submit and futures.as_completed

    TRICK: This is more flexible than executor.map because you can submit different callables and arguments, while executor.map is designed to run the same callable on the different arguments.

  • TRICK: we can pass futures to futures.as_completed such that the futures come from different pool executors (including different type of pool executors)
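A small sketch of both tricks at once: different callables submitted per task, and futures from two separate executors fed to a single `as_completed()` call (two thread pools here for brevity; mixing executor types works the same way).

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# submit() takes any callable with any arguments; as_completed() accepts
# futures from several executors at once (even of different types)
pool_a = ThreadPoolExecutor(max_workers=2)
pool_b = ThreadPoolExecutor(max_workers=2)

futs = [
    pool_a.submit(str.upper, 'hi'),   # different callables...
    pool_a.submit(len, 'hello'),      # ...and different arguments
    pool_b.submit(sum, [1, 2, 3]),    # a future from a second executor
]

done = {f.result() for f in as_completed(futs)}
pool_a.shutdown()
pool_b.shutdown()
print(done)  # {'HI', 5, 6}
```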

Downloads with Progress Display and Error Handling #

  • common functions; just a reference to the support code (flags2_common.py)
      """Utilities for second set of flag examples.
      """
    
      import argparse
      import string
      import sys
      import time
      from collections import Counter
      from enum import Enum
      from pathlib import Path
    
      DownloadStatus = Enum('DownloadStatus', 'OK NOT_FOUND ERROR')
    
      POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
                  'MX PH VN ET EG DE IR TR CD FR').split()
    
      DEFAULT_CONCUR_REQ = 1
      MAX_CONCUR_REQ = 1
    
      SERVERS = {
          'REMOTE': 'https://www.fluentpython.com/data/flags',
          'LOCAL':  'http://localhost:8000/flags',
          'DELAY':  'http://localhost:8001/flags',
          'ERROR':  'http://localhost:8002/flags',
      }
      DEFAULT_SERVER = 'LOCAL'
    
      DEST_DIR = Path('downloaded')
      COUNTRY_CODES_FILE = Path('country_codes.txt')
    
    
      def save_flag(img: bytes, filename: str) -> None:
          (DEST_DIR / filename).write_bytes(img)
    
    
      def initial_report(cc_list: list[str],
                         actual_req: int,
                         server_label: str) -> None:
          if len(cc_list) <= 10:
              cc_msg = ', '.join(cc_list)
          else:
              cc_msg = f'from {cc_list[0]} to {cc_list[-1]}'
          print(f'{server_label} site: {SERVERS[server_label]}')
          plural = 's' if len(cc_list) != 1 else ''
          print(f'Searching for {len(cc_list)} flag{plural}: {cc_msg}')
          if actual_req == 1:
              print('1 connection will be used.')
          else:
              print(f'{actual_req} concurrent connections will be used.')
    
    
      def final_report(cc_list: list[str],
                       counter: Counter[DownloadStatus],
                       start_time: float) -> None:
          elapsed = time.perf_counter() - start_time
          print('-' * 20)
          plural = 's' if counter[DownloadStatus.OK] != 1 else ''
          print(f'{counter[DownloadStatus.OK]:3} flag{plural} downloaded.')
          if counter[DownloadStatus.NOT_FOUND]:
              print(f'{counter[DownloadStatus.NOT_FOUND]:3} not found.')
          if counter[DownloadStatus.ERROR]:
              plural = 's' if counter[DownloadStatus.ERROR] != 1 else ''
              print(f'{counter[DownloadStatus.ERROR]:3} error{plural}.')
          print(f'Elapsed time: {elapsed:.2f}s')
    
    
      def expand_cc_args(every_cc: bool,
                         all_cc: bool,
                         cc_args: list[str],
                         limit: int) -> list[str]:
          codes: set[str] = set()
          A_Z = string.ascii_uppercase
          if every_cc:
              codes.update(a+b for a in A_Z for b in A_Z)
          elif all_cc:
              text = COUNTRY_CODES_FILE.read_text()
              codes.update(text.split())
          else:
              for cc in (c.upper() for c in cc_args):
                  if len(cc) == 1 and cc in A_Z:
                      codes.update(cc + c for c in A_Z)
                  elif len(cc) == 2 and all(c in A_Z for c in cc):
                      codes.add(cc)
                  else:
                      raise ValueError('*** Usage error: each CC argument '
                                       'must be A to Z or AA to ZZ.')
          return sorted(codes)[:limit]
    
    
      def process_args(default_concur_req):
          server_options = ', '.join(sorted(SERVERS))
          parser = argparse.ArgumentParser(
              description='Download flags for country codes. '
                          'Default: top 20 countries by population.')
          parser.add_argument(
              'cc', metavar='CC', nargs='*',
              help='country code or 1st letter (eg. B for BA...BZ)')
          parser.add_argument(
              '-a', '--all', action='store_true',
              help='get all available flags (AD to ZW)')
          parser.add_argument(
              '-e', '--every', action='store_true',
              help='get flags for every possible code (AA...ZZ)')
          parser.add_argument(
              '-l', '--limit', metavar='N', type=int, help='limit to N first codes',
              default=sys.maxsize)
          parser.add_argument(
              '-m', '--max_req', metavar='CONCURRENT', type=int,
              default=default_concur_req,
              help=f'maximum concurrent requests (default={default_concur_req})')
          parser.add_argument(
              '-s', '--server', metavar='LABEL', default=DEFAULT_SERVER,
              help=f'Server to hit; one of {server_options} '
                   f'(default={DEFAULT_SERVER})')
          parser.add_argument(
              '-v', '--verbose', action='store_true',
              help='output detailed progress info')
          args = parser.parse_args()
          if args.max_req < 1:
              print('*** Usage error: --max_req CONCURRENT must be >= 1')
              parser.print_usage()
              # "standard" exit status codes:
              # https://stackoverflow.com/questions/1101957/are-there-any-standard-exit-status-codes-in-linux/40484670#40484670
              sys.exit(2)  # command line usage error
          if args.limit < 1:
              print('*** Usage error: --limit N must be >= 1')
              parser.print_usage()
              sys.exit(2)  # command line usage error
          args.server = args.server.upper()
          if args.server not in SERVERS:
              print(f'*** Usage error: --server LABEL '
                    f'must be one of {server_options}')
              parser.print_usage()
              sys.exit(2)  # command line usage error
          try:
              cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
          except ValueError as exc:
              print(exc.args[0])
              parser.print_usage()
              sys.exit(2)  # command line usage error
    
          if not cc_list:
              cc_list = sorted(POP20_CC)[:args.limit]
          return args, cc_list
    
    
      def main(download_many, default_concur_req, max_concur_req):
          args, cc_list = process_args(default_concur_req)
          actual_req = min(args.max_req, max_concur_req, len(cc_list))
          initial_report(cc_list, actual_req, args.server)
          base_url = SERVERS[args.server]
          DEST_DIR.mkdir(exist_ok=True)
          t0 = time.perf_counter()
          counter = download_many(cc_list, base_url, args.verbose, actual_req)
          final_report(cc_list, counter, t0)
    

Error Handling in the flags2 Examples #

  • sequential version: uses the sync HTTPX client
      #!/usr/bin/env python3
    
      """Download flags of countries (with error handling).
    
      Sequential version
    
      Sample run::
    
          $ python3 flags2_sequential.py -s DELAY b
          DELAY site: http://localhost:8001/flags
          Searching for 26 flags: from BA to BZ
          1 concurrent connection will be used.
          --------------------
          17 flags downloaded.
          9 not found.
          Elapsed time: 13.36s
    
      """
    
      # tag::FLAGS2_BASIC_HTTP_FUNCTIONS[]
      from collections import Counter
      from http import HTTPStatus
    
      import httpx
      import tqdm  # type: ignore  # <1>
    
      from flags2_common import main, save_flag, DownloadStatus  # <2> get the commons
    
      DEFAULT_CONCUR_REQ = 1
      MAX_CONCUR_REQ = 1
    
      def get_flag(base_url: str, cc: str) -> bytes:
          url = f'{base_url}/{cc}/{cc}.gif'.lower()
          resp = httpx.get(url, timeout=3.1, follow_redirects=True)
          resp.raise_for_status()  # <3> raises if HTTP status code not in range(200, 300)
          return resp.content
    
      def download_one(cc: str, base_url: str, verbose: bool = False) -> DownloadStatus:
          try:
              image = get_flag(base_url, cc)
          except httpx.HTTPStatusError as exc:  # <4> handles the 404 errors specifically
              res = exc.response
              if res.status_code == HTTPStatus.NOT_FOUND:
                  status = DownloadStatus.NOT_FOUND  # <5> replaces it with an internal download status
                  msg = f'not found: {res.url}'
              else:
                  raise  # <6> re-propagate any other errors other than 404
          else:
              save_flag(image, f'{cc}.gif')
              status = DownloadStatus.OK
              msg = 'OK'
    
          if verbose:  # <7> verbosity flag
              print(cc, msg)
    
          return status
      # end::FLAGS2_BASIC_HTTP_FUNCTIONS[]
    
      # tag::FLAGS2_DOWNLOAD_MANY_SEQUENTIAL[]
      def download_many(cc_list: list[str],
                        base_url: str,
                        verbose: bool,
                        _unused_concur_req: int) -> Counter[DownloadStatus]:
          counter: Counter[DownloadStatus] = Counter()  # <1> to tally the download outcomes
          cc_iter = sorted(cc_list)  # <2>
          if not verbose:
              cc_iter = tqdm.tqdm(cc_iter)  # <3> tqdm returns an iterator yielding the items in cc_iter and also animating the progress bar
          for cc in cc_iter:
              try:
                  status = download_one(cc, base_url, verbose)  # <4> successive calls to the singular function
              except httpx.HTTPStatusError as exc:  # <5> non-404 HTTP errors are handled here
                  error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                  error_msg = error_msg.format(resp=exc.response)
              except httpx.RequestError as exc:  # <6>
                  error_msg = f'{exc} {type(exc)}'.strip()
              except KeyboardInterrupt:  # <7> managing keyboard interrupts
                  break
              else:  # <8> no exception was raised, so clear the error msg
                  error_msg = ''
    
              if error_msg:
                  status = DownloadStatus.ERROR  # <9> local status check based on the internal enum
              counter[status] += 1           # <10>
              if verbose and error_msg:      # <11>
                  print(f'{cc} error: {error_msg}')
    
          return counter  # <12>
      # end::FLAGS2_DOWNLOAD_MANY_SEQUENTIAL[]
    
      if __name__ == '__main__':
          main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
    

Using futures.as_completed #

  • threadpool

    Uses concurrent HTTP client based on futures.ThreadPoolExecutor to show error handling

      #!/usr/bin/env python3
    
      """Download flags of countries (with error handling).
    
      ThreadPool version
    
      Sample run::
    
          $ python3 flags2_threadpool.py -s ERROR -e
          ERROR site: http://localhost:8003/flags
          Searching for 676 flags: from AA to ZZ
          30 concurrent connections will be used.
          --------------------
          150 flags downloaded.
          361 not found.
          165 errors.
          Elapsed time: 7.46s
    
      """
    
      # tag::FLAGS2_THREADPOOL[]
      from collections import Counter
      from concurrent.futures import ThreadPoolExecutor, as_completed
    
      import httpx
      import tqdm  # type: ignore
    
      from flags2_common import main, DownloadStatus
      from flags2_sequential import download_one  # <1>
    
      DEFAULT_CONCUR_REQ = 30  # <2> defaults for max num of concurrent requests, size of threadpool
      MAX_CONCUR_REQ = 1000  # <3> max num concurrent reqs
    
    
      def download_many(cc_list: list[str],
                        base_url: str,
                        verbose: bool,
                        concur_req: int) -> Counter[DownloadStatus]:
          counter: Counter[DownloadStatus] = Counter()
          with ThreadPoolExecutor(max_workers=concur_req) as executor:  # <4>
              to_do_map = {}  # <5> maps each Future instance (representing one download) with the cc for error reporting
              for cc in sorted(cc_list):  # <6> submission order doesn't determine output order; that depends mostly on the timing of the HTTP responses
                  future = executor.submit(download_one, cc,
                                           base_url, verbose)  # <7> each submission does the scheduling and returns a Future
                  to_do_map[future] = cc  # <8>  Future instances are hashable
              done_iter = as_completed(to_do_map)  # <9> returns an iterator that yields futures as each task is done
              if not verbose:
                  done_iter = tqdm.tqdm(done_iter, total=len(cc_list))  # <10> wrap the iterator within the progress bar
              for future in done_iter:  # <11> iterates on futures as they are completed
                  try:
                      status = future.result()  # <12> result() could block, but not here: as_completed only yields futures that are already done
                  except httpx.HTTPStatusError as exc:  # <13> error handling
                      error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                      error_msg = error_msg.format(resp=exc.response)
                  except httpx.RequestError as exc:
                      error_msg = f'{exc} {type(exc)}'.strip()
                  except KeyboardInterrupt:
                      break
                  else:
                      error_msg = ''
    
                  if error_msg:
                      status = DownloadStatus.ERROR
                  counter[status] += 1
                  if verbose and error_msg:
                      cc = to_do_map[future]  # <14>
                      print(f'{cc} error: {error_msg}')
    
          return counter
    
    
      if __name__ == '__main__':
         main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
      # end::FLAGS2_THREADPOOL[]
    
    • NOTE: Future instances are hashable, which is why we can use them as keys in a dictionary

    • IDIOM: use a map to store futures to do follow up processing

      building a dict to map each future to other data that may be useful when the future is completed.
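A stripped-down sketch of the idiom, with a hypothetical `fetch` standing in for download_one: `as_completed` loses submission order, and the dict recovers the context each future came from.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(cc):  # hypothetical stand-in for download_one
    if cc == 'XX':
        raise KeyError(cc)
    return cc.lower()

outcomes = {}
with ThreadPoolExecutor(max_workers=3) as executor:
    # key: the Future; value: the context we'll need when it completes
    to_do_map = {executor.submit(fetch, cc): cc for cc in ['BR', 'XX', 'IN']}
    for future in as_completed(to_do_map):
        cc = to_do_map[future]  # recover the country code for error reporting
        try:
            outcomes[cc] = future.result()
        except KeyError:
            outcomes[cc] = 'error'

print(outcomes)  # {'BR': 'br', 'XX': 'error', 'IN': 'in'} (order may vary)
```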

  • asyncio

    Concurrent HTTPX client

      #!/usr/bin/env python3
    
      """Download flags of countries (with error handling).
    
      asyncio async/await version
    
      """
      # tag::FLAGS2_ASYNCIO_TOP[]
      import asyncio
      from collections import Counter
      from http import HTTPStatus
      from pathlib import Path
    
      import httpx
      import tqdm  # type: ignore
    
      from flags2_common import main, DownloadStatus, save_flag
    
      # low concurrency default to avoid errors from remote site,
      # such as 503 - Service Temporarily Unavailable
      DEFAULT_CONCUR_REQ = 5
      MAX_CONCUR_REQ = 1000
    
      async def get_flag(client: httpx.AsyncClient,  # <1>
                         base_url: str,
                         cc: str) -> bytes:
          url = f'{base_url}/{cc}/{cc}.gif'.lower()
          resp = await client.get(url, timeout=3.1, follow_redirects=True)   # <2>
          resp.raise_for_status()
          return resp.content
    
      async def download_one(client: httpx.AsyncClient,
                             cc: str,
                             base_url: str,
                             semaphore: asyncio.Semaphore,
                             verbose: bool) -> DownloadStatus:
          try:
              async with semaphore:  # <3>
                  image = await get_flag(client, base_url, cc)
          except httpx.HTTPStatusError as exc:  # <4>
              res = exc.response
              if res.status_code == HTTPStatus.NOT_FOUND:
                  status = DownloadStatus.NOT_FOUND
                  msg = f'not found: {res.url}'
              else:
                  raise
          else:
              await asyncio.to_thread(save_flag, image, f'{cc}.gif')  # <5>
              status = DownloadStatus.OK
              msg = 'OK'
          if verbose and msg:
              print(cc, msg)
          return status
      # end::FLAGS2_ASYNCIO_TOP[]
    
      # tag::FLAGS2_ASYNCIO_START[]
      async def supervisor(cc_list: list[str],
                           base_url: str,
                           verbose: bool,
                           concur_req: int) -> Counter[DownloadStatus]:  # <1>
          counter: Counter[DownloadStatus] = Counter()
          semaphore = asyncio.Semaphore(concur_req)  # <2>
          async with httpx.AsyncClient() as client:
              to_do = [download_one(client, cc, base_url, semaphore, verbose)
                       for cc in sorted(cc_list)]  # <3>
              to_do_iter = asyncio.as_completed(to_do)  # <4>
              if not verbose:
                  to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # <5>
              error: httpx.HTTPError | None = None  # <6>
              for coro in to_do_iter:  # <7>
                  try:
                      status = await coro  # <8>
                  except httpx.HTTPStatusError as exc:
                      error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                      error_msg = error_msg.format(resp=exc.response)
                      error = exc  # <9>
                  except httpx.RequestError as exc:
                      error_msg = f'{exc} {type(exc)}'.strip()
                      error = exc  # <10>
                  except KeyboardInterrupt:
                      break
                  else:
                      error = None
    
                  if error:
                      status = DownloadStatus.ERROR  # <11>
                      if verbose:
                          url = str(error.request.url)  # <12>
                          cc = Path(url).stem.upper()   # <13>
                          print(f'{cc} error: {error_msg}')
                  counter[status] += 1
    
          return counter
    
      def download_many(cc_list: list[str],
                        base_url: str,
                        verbose: bool,
                        concur_req: int) -> Counter[DownloadStatus]:
          coro = supervisor(cc_list, base_url, verbose, concur_req)
          counts = asyncio.run(coro)  # <14>
    
          return counts
    
      if __name__ == '__main__':
          main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
      # end::FLAGS2_ASYNCIO_START[]
    

Chapter Summary #

Further Reading #