Chapter 20. Concurrent Executors
`concurrent.futures.Executor` classes encapsulate the pattern of “spawning a bunch of independent threads and collecting the results in a queue,” described by Michele Simionato. They can be used with threads as well as with processes.
introduces futures, similar to JS promises. Futures are the low-level objects here.
this chapter is more demo, less theoretical
What’s New in This Chapter #
Concurrent Web Downloads #
the concurrent scripts are about 5x faster
typically when well done, concurrent scripts can outpace sequential ones by a factor of 20x or more
TRICK: I didn’t know that the HTTPX library is more modern and the go-to vs the `requests` lib. HTTPX gives both async and sync functions, but `requests` only gives sync versions.
for server-side code, i.e. servers that may be hit by many clients, there is a difference between which concurrency primitive we use (threading vs coroutines):
coroutines scale better because they use much less memory than threads, and also reduce the cost of context switching
A Sequential Download Script #
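A minimal sketch of the sequential pattern (not the book’s actual flags.py: it uses stdlib `urllib.request` instead of HTTPX so it needs no third-party deps, and `flag_url` is a made-up helper):

```python
from urllib.request import urlopen

BASE_URL = 'https://www.fluentpython.com/data/flags'  # the book's flag server

def flag_url(cc: str) -> str:
    # hypothetical helper: build the URL for one country's flag GIF
    return f'{BASE_URL}/{cc}/{cc}.gif'.lower()

def get_flag(cc: str) -> bytes:
    with urlopen(flag_url(cc)) as resp:  # blocking call, one request at a time
        return resp.read()

def download_many(cc_list: list[str]) -> int:
    # sequential: total time is roughly the SUM of all request latencies
    for cc in sorted(cc_list):
        image = get_flag(cc)  # (the book saves the bytes with save_flag; omitted here)
        print(cc, end=' ', flush=True)
    print()
    return len(cc_list)

if __name__ == '__main__':
    download_many(['BR', 'CN', 'IN'])
```

Nothing else happens while each request is in flight, which is what the concurrent versions below fix.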
Downloading with concurrent.futures #
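A hedged sketch of the `executor.map` pattern; `fake_download` is a made-up stand-in that sleeps instead of hitting the network, so the speedup is observable offline:

```python
import time
from concurrent import futures

def fake_download(cc: str) -> str:
    time.sleep(0.2)  # stand-in for network I/O; the GIL is released while sleeping
    return cc

def download_many(cc_list: list[str]) -> list[str]:
    with futures.ThreadPoolExecutor(max_workers=len(cc_list)) as executor:
        # map() returns a generator; iterating it yields results in
        # submission order, blocking as needed for each one
        results = executor.map(fake_download, sorted(cc_list))
        return list(results)

t0 = time.perf_counter()
res = download_many(['BR', 'CN', 'ID', 'IN', 'US'])
elapsed = time.perf_counter() - t0
print(res, f'{elapsed:.2f}s')  # roughly 0.2s total, not 1.0s: the sleeps overlap
```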
The context manager is `ThreadPoolExecutor`; the `executor.__exit__` method will call `executor.shutdown(wait=True)`, which blocks until all the threads are done.
`executor.map()` is similar to the `map` builtin, but the function is called concurrently from multiple threads
it returns a generator that we need to iterate to retrieve the value returned by each function call
any exception raised by a particular call will also be re-raised here, when we retrieve its value.
`concurrent.futures` makes it easy for us to add concurrent execution atop legacy sequential code
Other useful args to `ThreadPoolExecutor`: `max_workers`
the default is `max_workers = min(32, os.cpu_count() + 4)`; the extra workers are for I/O-BOUND tasks
Also it will try to reuse idle workers instead of spawning new ones. (lmao meeting rooms II leetcode question be like)
Where Are the Futures? #
purpose: an instance of either Future class represents a deferred computation that may or may not have completed.
like Promise in JS
both frameworks give us futures: `concurrent.futures.Future` and `asyncio.Future`
both allow us to put them in queues and check whether they’re done
HOWEVER, it is the job of the concurrency framework to handle futures, WE DON’T create them directly. This is because a future represents something that will eventually run, so it must be scheduled to run and that’s the role of the framework
e.g. `Executor.submit(<callable>)` does the scheduling and returns a `Future`
Who can change the state of a future?
Only the concurrency framework, never the application code.
We are NOT in control of the state of a future.
push/pull method to determine completion:
pull: `Future.done()`, where the app logic keeps polling
push: `Future.add_done_callback()` to register a callback that will be invoked when the future is done. NOTE: the callback callable will run in the same worker thread or process that ran the function wrapped in the future.
futures have a `result()` method:
when done, it works the same for both libs
when not done, it works differently for the two libs:
`concurrent.futures.Future`: calling `f.result()` will block the caller’s thread until the result is ready (we can pass a timeout to avoid infinite blocking)
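A small self-contained demo of both the pull style (`result` with a timeout) and the push style (`add_done_callback`); `slow` is a made-up function:

```python
import time
from concurrent import futures

def slow() -> int:
    time.sleep(0.5)  # simulate work
    return 42

executor = futures.ThreadPoolExecutor()
fut = executor.submit(slow)  # the framework creates the Future, not us

# push: register a callback; it runs in the worker thread that ran slow()
fut.add_done_callback(lambda f: print('callback saw result:', f.result()))

# pull: result() with a timeout raises TimeoutError if not done in time
try:
    fut.result(timeout=0.1)
except futures.TimeoutError:
    print('not ready after 0.1s')

print(fut.result())  # blocks until the worker finishes, then prints 42
executor.shutdown()
```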
- demo:

```python
#!/usr/bin/env python3
"""Download flags of top 20 countries by population

ThreadPoolExecutor example with ``as_completed``.
"""
from concurrent import futures

from flags import main
from flags_threadpool import download_one


# tag::FLAGS_THREADPOOL_AS_COMPLETED[]
def download_many(cc_list: list[str]) -> int:
    cc_list = cc_list[:5]  # <1> smaller sample
    with futures.ThreadPoolExecutor(max_workers=3) as executor:  # <2> attempt to see pending futures in the output
        to_do: list[futures.Future] = []
        for cc in sorted(cc_list):  # <3>
            future = executor.submit(download_one, cc)  # <4> schedules the callable to be executed, returns a future representing this pending operation
            to_do.append(future)  # <5> just storing it for inspection
            print(f'Scheduled for {cc}: {future}')  # <6> we'll see something like: Scheduled for BR: <Future at 0x100791518 state=running>

        for count, future in enumerate(futures.as_completed(to_do), 1):  # <7> yields futures as they are completed
            res: str = future.result()  # <8> retrieving the result
            print(f'{future} result: {res!r}')  # <9> e.g.: <Future at 0x101807080 state=finished returned str> result: 'IN'

    return count
# end::FLAGS_THREADPOOL_AS_COMPLETED[]


if __name__ == '__main__':
    main(download_many)
```

In this example, because we’re getting the futures from `as_completed`, calling `future.result()` will never block.
Launching Processes with concurrent.futures #
Both `ProcessPoolExecutor` and `ThreadPoolExecutor` implement the `Executor` interface
this allows us to switch from thread-based to process-based concurrency using `concurrent.futures`
so we can use process-based primitives just like thread-based ones; we just have to instantiate a different pool executor
main use case for process-based concurrency is CPU-intensive jobs
Using processes lets us go around the GIL and use multiple CPU cores
Remember processes use more memory and take longer to start than threads
Main use case for thread-based concurrency is I/O-intensive applications.
Multicore Prime Checker Redux #
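A sketch in the spirit of the chapter’s multicore prime checker (this simple trial-division `is_prime` and the sample numbers are stand-ins, not the book’s code):

```python
import math
from concurrent import futures

def is_prime(n: int) -> bool:
    # simple trial division up to sqrt(n); CPU-bound on large inputs
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True

def main(numbers: list[int]) -> list[tuple[int, bool]]:
    # one worker per CPU core by default; each check runs in its own
    # process, so the GIL is not a bottleneck
    with futures.ProcessPoolExecutor() as executor:
        return list(zip(numbers, executor.map(is_prime, numbers)))

if __name__ == '__main__':  # required: child processes re-import this module
    print(main([2, 97, 1_000_003, 1_000_033]))
```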
the use of `executor.map()`: iterating over its results blocks until all the jobs are done, and it preserves the order in which they were spawned.
blocking overall but not individually; that’s why, once the first result arrives, the rest often return almost instantly
Experimenting with Executor.map #
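A reconstruction from memory of the chapter’s `loiter` demo (details may differ from the book’s listing): three workers handle five jobs, so the display updates as workers free up.

```python
from time import sleep, strftime
from concurrent import futures

def display(*args):
    # prefix each line with a timestamp so the interleaving is visible
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)

def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t' * n, n, n))  # indent by n tabs to see which job prints
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t' * n, n))
    return n * 10

def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))
    display('results:', results)  # a generator, not a list: map returned immediately
    display('Waiting for individual results:')
    for i, result in enumerate(results):
        display(f'result {i}: {result}')

if __name__ == '__main__':
    main()
```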
the display will be seen to get updated incrementally.
the enumerate call in the for loop will implicitly invoke `next(results)`, which in turn will invoke `_f.result()` on the (internal) `_f` future representing the first call, `loiter(0)`
the `_f.result()` call will block until the future is done; consuming all of `Executor.map()`’s results therefore blocks until all the jobs are done.
Alternatively, to make it more JIT, we can use `Executor.submit` and `futures.as_completed`
TRICK: this is more flexible than `executor.map` because you can submit different callables and arguments, while `executor.map` is designed to run the same callable on the different arguments.
TRICK: we can pass futures to `futures.as_completed` such that the futures come from different pool executors (including different types of pool executors)
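A sketch of that flexibility: different callables with different arguments, submitted to two different executors, all fed to a single `as_completed` call (`square` and `shout` are made-up functions):

```python
import time
from concurrent import futures

def square(n):
    time.sleep(0.1)
    return n * n

def shout(word):
    time.sleep(0.05)
    return word.upper()

pool_a = futures.ThreadPoolExecutor(max_workers=2)
pool_b = futures.ThreadPoolExecutor(max_workers=2)  # could also be a ProcessPoolExecutor

to_do = [pool_a.submit(square, 3),    # different callables...
         pool_a.submit(shout, 'hi'),  # ...and different arguments...
         pool_b.submit(square, 4)]    # ...from a different executor

results = set()
for fut in futures.as_completed(to_do):  # yields each future as it finishes
    results.add(fut.result())

pool_a.shutdown()
pool_b.shutdown()
print(results)  # all three results; set display order is arbitrary
```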
Downloads with Progress Display and Error Handling #
- common functions
just a reference on the support code
```python
"""Utilities for second set of flag examples.
"""

import argparse
import string
import sys
import time
from collections import Counter
from enum import Enum
from pathlib import Path

DownloadStatus = Enum('DownloadStatus', 'OK NOT_FOUND ERROR')

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

SERVERS = {
    'REMOTE': 'https://www.fluentpython.com/data/flags',
    'LOCAL': 'http://localhost:8000/flags',
    'DELAY': 'http://localhost:8001/flags',
    'ERROR': 'http://localhost:8002/flags',
}
DEFAULT_SERVER = 'LOCAL'

DEST_DIR = Path('downloaded')
COUNTRY_CODES_FILE = Path('country_codes.txt')


def save_flag(img: bytes, filename: str) -> None:
    (DEST_DIR / filename).write_bytes(img)


def initial_report(cc_list: list[str],
                   actual_req: int,
                   server_label: str) -> None:
    if len(cc_list) <= 10:
        cc_msg = ', '.join(cc_list)
    else:
        cc_msg = f'from {cc_list[0]} to {cc_list[-1]}'
    print(f'{server_label} site: {SERVERS[server_label]}')
    plural = 's' if len(cc_list) != 1 else ''
    print(f'Searching for {len(cc_list)} flag{plural}: {cc_msg}')
    if actual_req == 1:
        print('1 connection will be used.')
    else:
        print(f'{actual_req} concurrent connections will be used.')


def final_report(cc_list: list[str],
                 counter: Counter[DownloadStatus],
                 start_time: float) -> None:
    elapsed = time.perf_counter() - start_time
    print('-' * 20)
    plural = 's' if counter[DownloadStatus.OK] != 1 else ''
    print(f'{counter[DownloadStatus.OK]:3} flag{plural} downloaded.')
    if counter[DownloadStatus.NOT_FOUND]:
        print(f'{counter[DownloadStatus.NOT_FOUND]:3} not found.')
    if counter[DownloadStatus.ERROR]:
        plural = 's' if counter[DownloadStatus.ERROR] != 1 else ''
        print(f'{counter[DownloadStatus.ERROR]:3} error{plural}.')
    print(f'Elapsed time: {elapsed:.2f}s')


def expand_cc_args(every_cc: bool,
                   all_cc: bool,
                   cc_args: list[str],
                   limit: int) -> list[str]:
    codes: set[str] = set()
    A_Z = string.ascii_uppercase
    if every_cc:
        codes.update(a+b for a in A_Z for b in A_Z)
    elif all_cc:
        text = COUNTRY_CODES_FILE.read_text()
        codes.update(text.split())
    else:
        for cc in (c.upper() for c in cc_args):
            if len(cc) == 1 and cc in A_Z:
                codes.update(cc + c for c in A_Z)
            elif len(cc) == 2 and all(c in A_Z for c in cc):
                codes.add(cc)
            else:
                raise ValueError('*** Usage error: each CC argument '
                                 'must be A to Z or AA to ZZ.')
    return sorted(codes)[:limit]


def process_args(default_concur_req):
    server_options = ', '.join(sorted(SERVERS))
    parser = argparse.ArgumentParser(
        description='Download flags for country codes. '
                    'Default: top 20 countries by population.')
    parser.add_argument(
        'cc', metavar='CC', nargs='*',
        help='country code or 1st letter (eg. B for BA...BZ)')
    parser.add_argument(
        '-a', '--all', action='store_true',
        help='get all available flags (AD to ZW)')
    parser.add_argument(
        '-e', '--every', action='store_true',
        help='get flags for every possible code (AA...ZZ)')
    parser.add_argument(
        '-l', '--limit', metavar='N', type=int,
        help='limit to N first codes', default=sys.maxsize)
    parser.add_argument(
        '-m', '--max_req', metavar='CONCURRENT', type=int,
        default=default_concur_req,
        help=f'maximum concurrent requests (default={default_concur_req})')
    parser.add_argument(
        '-s', '--server', metavar='LABEL', default=DEFAULT_SERVER,
        help=f'Server to hit; one of {server_options} '
             f'(default={DEFAULT_SERVER})')
    parser.add_argument(
        '-v', '--verbose', action='store_true',
        help='output detailed progress info')
    args = parser.parse_args()
    if args.max_req < 1:
        print('*** Usage error: --max_req CONCURRENT must be >= 1')
        parser.print_usage()
        # "standard" exit status codes:
        # https://stackoverflow.com/questions/1101957/are-there-any-standard-exit-status-codes-in-linux/40484670#40484670
        sys.exit(2)  # command line usage error
    if args.limit < 1:
        print('*** Usage error: --limit N must be >= 1')
        parser.print_usage()
        sys.exit(2)  # command line usage error
    args.server = args.server.upper()
    if args.server not in SERVERS:
        print(f'*** Usage error: --server LABEL '
              f'must be one of {server_options}')
        parser.print_usage()
        sys.exit(2)  # command line usage error
    try:
        cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
    except ValueError as exc:
        print(exc.args[0])
        parser.print_usage()
        sys.exit(2)  # command line usage error

    if not cc_list:
        cc_list = sorted(POP20_CC)[:args.limit]
    return args, cc_list


def main(download_many, default_concur_req, max_concur_req):
    args, cc_list = process_args(default_concur_req)
    actual_req = min(args.max_req, max_concur_req, len(cc_list))
    initial_report(cc_list, actual_req, args.server)
    base_url = SERVERS[args.server]
    DEST_DIR.mkdir(exist_ok=True)
    t0 = time.perf_counter()
    counter = download_many(cc_list, base_url, args.verbose, actual_req)
    final_report(cc_list, counter, t0)
```
Error Handling in the flags2 Examples #
- sequential version
Uses a sequential HTTPX client
```python
#!/usr/bin/env python3
"""Download flags of countries (with error handling).

Sequential version

Sample run::

    $ python3 flags2_sequential.py -s DELAY b
    DELAY site: http://localhost:8002/flags
    Searching for 26 flags: from BA to BZ
    1 concurrent connection will be used.
    --------------------
    17 flags downloaded.
    9 not found.
    Elapsed time: 13.36s
"""

# tag::FLAGS2_BASIC_HTTP_FUNCTIONS[]
from collections import Counter
from http import HTTPStatus

import httpx
import tqdm  # type: ignore  # <1>

from flags2_common import main, save_flag, DownloadStatus  # <2> get the commons

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1


def get_flag(base_url: str, cc: str) -> bytes:
    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    resp = httpx.get(url, timeout=3.1, follow_redirects=True)
    resp.raise_for_status()  # <3> raises if HTTP status code not in range(200, 300)
    return resp.content


def download_one(cc: str, base_url: str, verbose: bool = False) -> DownloadStatus:
    try:
        image = get_flag(base_url, cc)
    except httpx.HTTPStatusError as exc:  # <4> handles the 404 errors specifically
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND  # <5> replaces it with an internal download status
            msg = f'not found: {res.url}'
        else:
            raise  # <6> re-propagate any errors other than 404
    else:
        save_flag(image, f'{cc}.gif')
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose:  # <7> verbosity flag
        print(cc, msg)
    return status
# end::FLAGS2_BASIC_HTTP_FUNCTIONS[]

# tag::FLAGS2_DOWNLOAD_MANY_SEQUENTIAL[]
def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  _unused_concur_req: int) -> Counter[DownloadStatus]:
    counter: Counter[DownloadStatus] = Counter()  # <1> to tally the download outcomes
    cc_iter = sorted(cc_list)  # <2>
    if not verbose:
        cc_iter = tqdm.tqdm(cc_iter)  # <3> tqdm returns an iterator yielding the items in cc_iter while animating the progress bar
    for cc in cc_iter:
        try:
            status = download_one(cc, base_url, verbose)  # <4> successive calls to the singular function
        except httpx.HTTPStatusError as exc:  # <5> the non-404 HTTP errors handled here
            error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
            error_msg = error_msg.format(resp=exc.response)
        except httpx.RequestError as exc:  # <6>
            error_msg = f'{exc} {type(exc)}'.strip()
        except KeyboardInterrupt:  # <7> managing keyboard interrupts
            break
        else:  # <8> clear the error msg if no error came down
            error_msg = ''

        if error_msg:
            status = DownloadStatus.ERROR  # <9> local status check based on the internal enum
        counter[status] += 1  # <10>
        if verbose and error_msg:  # <11>
            print(f'{cc} error: {error_msg}')

    return counter  # <12>
# end::FLAGS2_DOWNLOAD_MANY_SEQUENTIAL[]

if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
```
Using futures.as_completed #
threadpool
Uses a concurrent HTTP client based on `futures.ThreadPoolExecutor` to show error handling:

```python
#!/usr/bin/env python3
"""Download flags of countries (with error handling).

ThreadPool version

Sample run::

    $ python3 flags2_threadpool.py -s ERROR -e
    ERROR site: http://localhost:8003/flags
    Searching for 676 flags: from AA to ZZ
    30 concurrent connections will be used.
    --------------------
    150 flags downloaded.
    361 not found.
    165 errors.
    Elapsed time: 7.46s
"""

# tag::FLAGS2_THREADPOOL[]
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed

import httpx
import tqdm  # type: ignore

from flags2_common import main, DownloadStatus
from flags2_sequential import download_one  # <1>

DEFAULT_CONCUR_REQ = 30  # <2> default for max num of concurrent requests, i.e. size of the thread pool
MAX_CONCUR_REQ = 1000  # <3> max num concurrent reqs


def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int) -> Counter[DownloadStatus]:
    counter: Counter[DownloadStatus] = Counter()
    with ThreadPoolExecutor(max_workers=concur_req) as executor:  # <4>
        to_do_map = {}  # <5> maps each Future instance (representing one download) to its cc, for error reporting
        for cc in sorted(cc_list):  # <6> response order depends on the timing of the HTTP responses more than anything
            future = executor.submit(download_one, cc, base_url, verbose)  # <7> each submission does the scheduling and returns a Future
            to_do_map[future] = cc  # <8> Future instances are hashable
        done_iter = as_completed(to_do_map)  # <9> returns an iterator that yields futures as each task is done
        if not verbose:
            done_iter = tqdm.tqdm(done_iter, total=len(cc_list))  # <10> wrap the iterator within the progress bar
        for future in done_iter:  # <11> iterates on futures as they are completed
            try:
                status = future.result()  # <12> could block, but NOT here: as_completed only yields finished futures
            except httpx.HTTPStatusError as exc:  # <13> error handling
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
            except KeyboardInterrupt:
                break
            else:
                error_msg = ''

            if error_msg:
                status = DownloadStatus.ERROR
            counter[status] += 1
            if verbose and error_msg:
                cc = to_do_map[future]  # <14>
                print(f'{cc} error: {error_msg}')

    return counter


if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# end::FLAGS2_THREADPOOL[]
```

NOTE: `Future` instances are hashable; that’s why we can use them as keys within a dictionary.
IDIOM: use a map to store futures for follow-up processing: build a dict mapping each future to other data that may be useful when the future is completed.
asyncio
Concurrent HTTPX client
```python
#!/usr/bin/env python3
"""Download flags of countries (with error handling).

asyncio async/await version
"""
# tag::FLAGS2_ASYNCIO_TOP[]
import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path

import httpx
import tqdm  # type: ignore

from flags2_common import main, DownloadStatus, save_flag

# low concurrency default to avoid errors from remote site,
# such as 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000


async def get_flag(client: httpx.AsyncClient,  # <1>
                   base_url: str,
                   cc: str) -> bytes:
    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True)  # <2>
    resp.raise_for_status()
    return resp.content


async def download_one(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool) -> DownloadStatus:
    try:
        async with semaphore:  # <3>
            image = await get_flag(client, base_url, cc)
    except httpx.HTTPStatusError as exc:  # <4>
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND
            msg = f'not found: {res.url}'
        else:
            raise
    else:
        await asyncio.to_thread(save_flag, image, f'{cc}.gif')  # <5>
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status
# end::FLAGS2_ASYNCIO_TOP[]

# tag::FLAGS2_ASYNCIO_START[]
async def supervisor(cc_list: list[str],
                     base_url: str,
                     verbose: bool,
                     concur_req: int) -> Counter[DownloadStatus]:  # <1>
    counter: Counter[DownloadStatus] = Counter()
    semaphore = asyncio.Semaphore(concur_req)  # <2>
    async with httpx.AsyncClient() as client:
        to_do = [download_one(client, cc, base_url, semaphore, verbose)
                 for cc in sorted(cc_list)]  # <3>
        to_do_iter = asyncio.as_completed(to_do)  # <4>
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # <5>
        error: httpx.HTTPError | None = None  # <6>
        for coro in to_do_iter:  # <7>
            try:
                status = await coro  # <8>
            except httpx.HTTPStatusError as exc:
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
                error = exc  # <9>
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
                error = exc  # <10>
            except KeyboardInterrupt:
                break
            else:
                error = None

            if error:
                status = DownloadStatus.ERROR  # <11>
                if verbose:
                    url = str(error.request.url)  # <12>
                    cc = Path(url).stem.upper()  # <13>
                    print(f'{cc} error: {error_msg}')
            counter[status] += 1

    return counter


def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int) -> Counter[DownloadStatus]:
    coro = supervisor(cc_list, base_url, verbose, concur_req)
    counts = asyncio.run(coro)  # <14>

    return counts


if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# end::FLAGS2_ASYNCIO_START[]
```