Do a pile of work better

Saturday 22 August 2020

A few days ago I wrote about doing a pile of work with concurrent.futures. Since then, I discovered a problem with the code: exceptions raised by the work function were silently ignored.
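The failure mode is easy to reproduce: an exception raised inside a submitted function is stored on the Future and never surfaces unless something asks for it. A small sketch (the names here are mine, not from the original code):

```python
import concurrent.futures as cf

def boom():
    raise ValueError("this never gets printed on its own")

with cf.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(boom)

# The executor shuts down cleanly; the exception is parked on the
# future, and only appears if you call .exception() or .result().
exc = future.exception()
print(type(exc).__name__)  # ValueError
```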

Here’s the improved code that logs exceptions:

import concurrent.futures as cf
import logging

from tqdm import tqdm

log = logging.getLogger(__name__)

def wait_first(futures):
    """Wait for the first future to complete.

    Returns:
        (done, not_done): two sets of futures.

    """
    return cf.wait(futures, return_when=cf.FIRST_COMPLETED)

def do_work(threads, argsfn, workfn):
    """Do a pile of work, maybe in threads, with a progress bar.

    Two callables are provided: `workfn` is the unit of work to be done,
    many times.  Its arguments are provided by calling `argsfn`, which
    must produce a sequence of tuples.  `argsfn` will be called a few
    times, and must produce the same sequence each time.

    Args:
        threads: the number of threads to use.
        argsfn: a callable that produces tuples, the arguments to `workfn`.
        workfn: a callable that does work.

    """
    total = sum(1 for _ in argsfn())
    with tqdm(total=total, smoothing=0.02) as progressbar:
        if threads:
            limit = 2 * threads
            not_done = set()

            def finish_some():
                nonlocal not_done
                done, not_done = wait_first(not_done)
                for done_future in done:
                    exc = done_future.exception()
                    if exc is not None:
                        log.error("Failed future:", exc_info=exc)
                progressbar.update(len(done))

            with cf.ThreadPoolExecutor(max_workers=threads) as executor:
                for args in argsfn():
                    while len(not_done) >= limit:
                        finish_some()
                    not_done.add(executor.submit(workfn, *args))
                while not_done:
                    finish_some()
        else:
            for args in argsfn():
                workfn(*args)
                progressbar.update(1)

This might also be the first time I’ve used “nonlocal” in real code...
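For readers who haven't met it: `nonlocal` lets a nested function rebind a variable in its enclosing function's scope, which is exactly what `finish_some` needs in order to replace the `not_done` set. A minimal illustration:

```python
def make_counter():
    count = 0

    def bump():
        # Without `nonlocal`, assigning to count would make it a new
        # local variable, and `count += 1` would raise UnboundLocalError.
        nonlocal count
        count += 1
        return count

    return bump

bump = make_counter()
bump()
bump()
print(bump())  # prints 3
```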


Remi Rampin 3:44 AM on 24 Aug 2020

Isn't this exactly what is for? What are the advantages of this over the standard library, memory efficiency when dealing with a large iterator?

Ned Batchelder 12:01 PM on 24 Aug 2020

It will create a list of all the function calls to be made. I didn't want to do that before any work started. This code generates the work items slowly, as the work progresses.

To be fair, I might have been needlessly concerned about that, but it is a difference between the two.
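Ned's concern is checkable: the stdlib docs note that `Executor.map` collects its iterables immediately rather than lazily, so every future is submitted before any result is read. A quick sketch with a generator that records its own consumption:

```python
import concurrent.futures as cf

consumed = []

def args():
    for i in range(50):
        consumed.append(i)
        yield i

with cf.ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(str, args())
    # map() has already walked the whole generator and submitted
    # 50 futures, before a single result has been read.
    eagerly_consumed = len(consumed)

print(eagerly_consumed)  # 50
```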

Thomas Grainger 11:34 AM on 27 Aug 2020

you consume all the work to be done up front with:

total = sum(1 for _ in argsfn())

and don't have support for backpressure or chunking.

have a look at which supports both

to do sync work you can use:

async def _run_in_executor(executor, fn, /, *args, **kwargs):
    return await asyncio.get_running_loop().run_in_executor(
        executor, functools.partial(fn, *args, **kwargs)
    )

or asgiref.sync.sync_to_async
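A self-contained version of that helper, with a toy blocking function standing in for real work (the `work` function and its arguments are mine, for illustration):

```python
import asyncio
import functools

async def _run_in_executor(executor, fn, /, *args, **kwargs):
    # run_in_executor only passes positional arguments, so keyword
    # arguments are bound with functools.partial first.
    return await asyncio.get_running_loop().run_in_executor(
        executor, functools.partial(fn, *args, **kwargs)
    )

def work(x, factor=1):
    return x * factor  # stands in for blocking, CPU- or IO-bound work

async def main():
    # executor=None uses the event loop's default ThreadPoolExecutor.
    return await _run_in_executor(None, work, 21, factor=2)

result = asyncio.run(main())
print(result)  # 42
```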
