Do a pile of work better

Saturday 22 August 2020

A few days ago I wrote about doing a pile of work with concurrent.futures. Since then, I discovered a problem with the code: exceptions raised by the work function were silently ignored.
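
To see what was going wrong, here's a minimal sketch (the fail function is just for illustration): a future returned by submit() captures any exception raised by the work function, and nothing surfaces it unless you call result() or exception() on the future.

import concurrent.futures as cf

def fail():
    # The exception is captured by the future, not re-raised here.
    raise RuntimeError("boom")

with cf.ThreadPoolExecutor() as executor:
    future = executor.submit(fail)

# The program carries on as if nothing happened.  The exception only
# surfaces if you ask the future for it:
print(future.exception())   # prints: boom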

Here’s the improved code that logs exceptions:

import concurrent.futures as cf
import logging

from tqdm import tqdm

log = logging.getLogger(__name__)

def wait_first(futures):
    """
    Wait for the first future to complete.

    Returns:
        (done, not_done): two sets of futures.

    """
    return cf.wait(futures, return_when=cf.FIRST_COMPLETED)

def do_work(threads, argsfn, workfn):
    """
    Do a pile of work, maybe in threads, with a progress bar.

    Two callables are provided: `workfn` is the unit of work to be done,
    many times.  Its arguments are provided by calling `argsfn`, which
    must produce a sequence of tuples.  `argsfn` will be called a few
    times, and must produce the same sequence each time.

    Args:
        threads: the number of threads to use.
        argsfn: a callable that produces tuples, the arguments to `workfn`.
        workfn: a callable that does work.

    """
    total = sum(1 for _ in argsfn())    # one pass over the args just to count them
    with tqdm(total=total, smoothing=0.02) as progressbar:
        if threads:
            limit = 2 * threads     # at most this many futures pending at once
            not_done = set()

            def finish_some():
                nonlocal not_done
                done, not_done = wait_first(not_done)
                for done_future in done:
                    exc = done_future.exception()
                    if exc is not None:
                        log.error("Failed future:", exc_info=exc)
                progressbar.update(len(done))

            with cf.ThreadPoolExecutor(max_workers=threads) as executor:
                for args in argsfn():
                    while len(not_done) >= limit:
                        finish_some()
                    not_done.add(executor.submit(workfn, *args))
                while not_done:
                    finish_some()
        else:
            for args in argsfn():
                workfn(*args)
                progressbar.update(1)
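
As a usage sketch (count_lines and file_args are invented names, not part of the post), the two callables might look like this:

import glob

def count_lines(path):
    # The unit of work: count the lines in one file.
    with open(path) as f:
        return sum(1 for _ in f)

def file_args():
    # Called more than once, so it must produce the same sequence each time.
    return ((path,) for path in sorted(glob.glob("*.py")))

do_work(threads=8, argsfn=file_args, workfn=count_lines)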

This might also be the first time I’ve used “nonlocal” in real code...
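
In case it's unfamiliar: nonlocal is what lets finish_some rebind not_done in the enclosing scope. A tiny standalone illustration:

def outer():
    pending = {1, 2, 3}

    def shrink():
        nonlocal pending   # without this, the assignment below would
        pending = set()    # create a new local instead of rebinding

    shrink()
    return pending         # the empty set: outer's name was rebound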

Comments

Isn't this exactly what ThreadPoolExecutor.map() is for? What are the advantages of this over the standard library? Memory efficiency when dealing with a large iterator?

ThreadPoolExecutor.map will create a list of all the function calls to be made. I didn't want to do that before any work started. This code generates the work items slowly, as the work progresses.

To be fair, I might have been needlessly concerned about that, but it is a difference between the two.
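
The difference is observable: Executor.map collects its input iterables immediately rather than lazily (as its documentation notes), so every call is submitted before any result is consumed. A small sketch:

import concurrent.futures as cf

def args():
    for i in range(5):
        print("producing", i)
        yield i

with cf.ThreadPoolExecutor(max_workers=1) as executor:
    # All five "producing" lines print during this call: the generator
    # is drained and every task submitted before map() returns.
    results = executor.map(lambda x: x, args())
    for result in results:
        pass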

You consume all the work to be done up front with:

total = sum(1 for _ in argsfn())

and don't have support for backpressure or chunking.

Have a look at aiostream.map, which supports both: https://aiostream.readthedocs.io/en/stable/operators.html#aiostream.stream.map

To run sync work from async code you can use:

import asyncio
import functools

async def _run_in_executor(executor, fn, /, *args, **kwargs):
    return await asyncio.get_running_loop().run_in_executor(
        executor, functools.partial(fn, *args, **kwargs)
    )

or asgiref.sync.sync_to_async.
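
And a hypothetical use of that helper with a plain ThreadPoolExecutor:

import asyncio
import concurrent.futures as cf

async def main():
    with cf.ThreadPoolExecutor(max_workers=4) as executor:
        # pow(2, 10) runs in a worker thread; the coroutine awaits it.
        print(await _run_in_executor(executor, pow, 2, 10))   # 1024

asyncio.run(main())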
