A few days ago I wrote about doing a pile of work with concurrent.futures. Since then, I discovered a problem with the code: exceptions raised by the work function were silently ignored.
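The reason: a Future holds on to any exception raised by the work function, and nothing reports it unless you call .result() or .exception() on that future. A quick illustration of the failure mode (explode is just a stand-in for the work function):

import concurrent.futures as cf

def explode(n):
    raise ValueError(f"boom {n}")

with cf.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(explode, n) for n in range(3)]
    # No .result() or .exception() calls here, so the ValueErrors vanish silently.

# To surface them, ask each future afterward:
for fut in futures:
    exc = fut.exception()
    if exc is not None:
        print("work failed:", exc)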
Here’s the improved code that logs exceptions:
import concurrent.futures as cf
import logging

from tqdm import tqdm

log = logging.getLogger(__name__)  # `log` is assumed to be a module-level logger

def wait_first(futures):
    """
    Wait for the first future to complete.

    Returns:
        (done, not_done): two sets of futures.

    """
    return cf.wait(futures, return_when=cf.FIRST_COMPLETED)

def do_work(threads, argsfn, workfn):
    """
    Do a pile of work, maybe in threads, with a progress bar.

    Two callables are provided: `workfn` is the unit of work to be done,
    many times. Its arguments are provided by calling `argsfn`, which
    must produce a sequence of tuples. `argsfn` will be called a few
    times, and must produce the same sequence each time.

    Args:
        threads: the number of threads to use.
        argsfn: a callable that produces tuples, the arguments to `workfn`.
        workfn: a callable that does work.

    """
    total = sum(1 for _ in argsfn())
    with tqdm(total=total, smoothing=0.02) as progressbar:
        if threads:
            limit = 2 * threads
            not_done = set()

            def finish_some():
                nonlocal not_done
                done, not_done = wait_first(not_done)
                for done_future in done:
                    exc = done_future.exception()
                    if exc is not None:
                        log.error("Failed future:", exc_info=exc)
                progressbar.update(len(done))

            with cf.ThreadPoolExecutor(max_workers=threads) as executor:
                for args in argsfn():
                    # Don't submit everything at once: keep at most
                    # 2 * threads futures pending at a time.
                    while len(not_done) >= limit:
                        finish_some()
                    not_done.add(executor.submit(workfn, *args))
                while not_done:
                    finish_some()
        else:
            for args in argsfn():
                workfn(*args)
                progressbar.update(1)
This might also be the first time I’ve used “nonlocal” in real code...
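For the curious, calling it looks something like this; fetch_url and make_args are made-up names, just to show the shape that workfn and argsfn need:

import time

def fetch_url(url, timeout):
    # Hypothetical unit of work: anything that takes a while and might raise.
    time.sleep(0.1)

def make_args():
    # Called more than once, so it must produce the same sequence of tuples each time.
    return [(url, 30) for url in ["https://example.com/a", "https://example.com/b"]]

do_work(threads=8, argsfn=make_args, workfn=fetch_url)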
Comments
To be fair, I might have been needlessly concerned about that, but it is a difference between the two.
Have a look at aiostream.map (https://aiostream.readthedocs.io/en/stable/operators.html#aiostream.stream.map), which supports both.
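A rough sketch of what that looks like, based on aiostream's documented map/task_limit API (details may vary by version):

import asyncio
from aiostream import stream

async def work(x):
    await asyncio.sleep(0.1)
    return x * 2

async def main():
    xs = stream.iterate(range(10))
    ys = stream.map(xs, work, task_limit=4)  # task_limit bounds the concurrency
    async with ys.stream() as results:
        async for y in results:
            print(y)

asyncio.run(main())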
To do sync work you can use asgiref.sync.sync_to_async.
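Roughly, sync_to_async wraps a blocking callable so it can be awaited; a minimal sketch (blocking_work is a made-up name):

import asyncio
from asgiref.sync import sync_to_async

def blocking_work(n):
    # A synchronous, blocking unit of work.
    return n * 2

async def main():
    async_work = sync_to_async(blocking_work)
    results = await asyncio.gather(*(async_work(n) for n in range(10)))
    print(results)

asyncio.run(main())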