Do a pile of work better

Saturday 22 August 2020

A few days ago I wrote about doing a pile of work with concurrent.futures. Since then, I discovered a problem with the code: exceptions raised by the work function were silently ignored.

Here’s the improved code that logs exceptions:

import concurrent.futures as cf
import logging

from tqdm import tqdm

log = logging.getLogger(__name__)

def wait_first(futures):
    """
    Wait for the first future to complete.

    Returns:
        (done, not_done): two sets of futures.

    """
    return cf.wait(futures, return_when=cf.FIRST_COMPLETED)

def do_work(threads, argsfn, workfn):
    """
    Do a pile of work, maybe in threads, with a progress bar.

    Two callables are provided: `workfn` is the unit of work to be done,
    many times.  Its arguments are provided by calling `argsfn`, which
    must produce a sequence of tuples.  `argsfn` will be called a few
    times, and must produce the same sequence each time.

    Args:
        threads: the number of threads to use.
        argsfn: a callable that produces tuples, the arguments to `workfn`.
        workfn: a callable that does work.

    """
    total = sum(1 for _ in argsfn())
    with tqdm(total=total, smoothing=0.02) as progressbar:
        if threads:
            limit = 2 * threads
            not_done = set()

            def finish_some():
                nonlocal not_done
                done, not_done = wait_first(not_done)
                for done_future in done:
                    exc = done_future.exception()
                    if exc is not None:
                        log.error("Failed future:", exc_info=exc)
                progressbar.update(len(done))

            with cf.ThreadPoolExecutor(max_workers=threads) as executor:
                for args in argsfn():
                    # Keep no more than `limit` futures in flight at once.
                    while len(not_done) >= limit:
                        finish_some()
                    not_done.add(executor.submit(workfn, *args))
                # All the work is submitted; drain the remaining futures.
                while not_done:
                    finish_some()
        else:
            # No threads: just do the work serially.
            for args in argsfn():
                workfn(*args)
                progressbar.update(1)
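For example, here's how you might use it to fetch a pile of URLs. The fetch function and URLs here are just illustrative, not part of the original code:

import requests

def fetch(url):
    # The unit of work: fetch one URL.
    requests.get(url, timeout=10)

def make_args():
    # Called a few times; returns the same sequence of argument tuples
    # each time, generated lazily.
    return ((f"https://example.com/page/{n}",) for n in range(100))

do_work(threads=8, argsfn=make_args, workfn=fetch)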

This might also be the first time I’ve used “nonlocal” in real code...
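Without "nonlocal", the assignment in finish_some would make not_done a new local variable, and reading it on that same line would raise UnboundLocalError. A minimal illustration of the difference:

def outer():
    state = set()
    def update():
        nonlocal state               # rebinds `state` in outer's scope
        state = state | {len(state)}
    update()
    update()
    return state

assert outer() == {0, 1}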


Isn't this exactly what executor.map is for? What are the advantages of this over the standard library: memory efficiency when dealing with a large iterator?
executor.map will create a list of all the function calls to be made. I didn't want to do that before any work started. This code generates the work items slowly, as the work progresses.

To be fair, I might have been needlessly concerned about that, but it is a difference between the two.
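A quick way to see that difference (assuming the executor.map reading above): the documentation says map() collects its input iterables immediately rather than lazily, so the whole argument sequence is consumed before any result is read.

import concurrent.futures as cf

def produce_args():
    # Imagine this generator is expensive, or very long.
    for i in range(5):
        print(f"producing {i}")
        yield i

def work(i):
    return i * i

with cf.ThreadPoolExecutor(max_workers=2) as executor:
    # All five "producing" lines print on this call, before any result
    # is read, because map() consumes the iterable eagerly.
    results = executor.map(work, produce_args())
    print(list(results))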
you consume all the work to be done up front with:
total = sum(1 for _ in argsfn())
and don't have support for backpressure or chunking
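One way to avoid that up-front pass, sketched here rather than taken from the post: tqdm also works without a known total, at the cost of the percentage and ETA display.

from tqdm import tqdm

def do_work_uncounted(argsfn, workfn):
    # No total given: tqdm just counts completions and shows a rate.
    with tqdm(smoothing=0.02) as progressbar:
        for args in argsfn():       # argsfn is consumed only once
            workfn(*args)
            progressbar.update(1)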

have a look at which supports both

to do sync work you can use:

import asyncio
import functools

async def _run_in_executor(executor, fn, /, *args, **kwargs):
    return await asyncio.get_running_loop().run_in_executor(
        executor, functools.partial(fn, *args, **kwargs)
    )

or asgiref.sync.sync_to_async
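To illustrate the backpressure point, a hypothetical asyncio counterpart to do_work could bound the number of in-flight calls with a semaphore:

import asyncio

async def do_pile(argsfn, workfn, limit):
    # Hypothetical sketch: at most `limit` calls run at once.  (The
    # coroutine objects are still created eagerly; a queue would be
    # needed to also throttle argsfn itself.)
    sem = asyncio.Semaphore(limit)

    async def one(args):
        async with sem:
            # Run the sync workfn in the default thread pool.
            return await asyncio.get_running_loop().run_in_executor(
                None, workfn, *args
            )

    await asyncio.gather(*(one(args) for args in argsfn()))

# e.g. asyncio.run(do_pile(lambda: ((i,) for i in range(100)), print, 8))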
