Background tasks in Python 3.5

One of the recurring questions with asyncio is "How do I execute one or two operations asynchronously in an otherwise synchronous application?"

Say, for example, I have the following code:

>>> import itertools, time
>>> def ticker():
...     for i in itertools.count():
...         print(i)
...         time.sleep(1)
...
>>> ticker()
0
1
2
3
^CTraceback (most recent call last):
 File "<stdin>", line 1, in <module>
 File "<stdin>", line 4, in ticker
KeyboardInterrupt

With the native coroutine syntax coming in Python 3.5, I can change that synchronous code into event-driven asynchronous code easily enough:

import asyncio, itertools
async def ticker():
    for i in itertools.count():
        print(i)
        await asyncio.sleep(1)

But how do I arrange for that ticker to start running in the background? What's the coroutine equivalent of appending & to a shell command?

It turns out it looks something like this:

import asyncio
def schedule_coroutine(target, *, loop=None):
    """Schedules target coroutine in the given event loop

    If not given, *loop* defaults to the current thread's event loop

    Returns the scheduled task.
    """
    if asyncio.iscoroutine(target):
        return asyncio.ensure_future(target, loop=loop)
    raise TypeError("target must be a coroutine, "
                    "not {!r}".format(type(target)))

Update: This post originally suggested a combined "run_in_background" helper function that handle both scheduling coroutines and calling arbitrary callables in a background thread or process. On further reflection, I decided that was unhelpfully conflating two different concepts, so I replaced it with separate "schedule_coroutine" and "call_in_background" helpers

So now I can do:

>>> import itertools
>>> async def ticker():
...     for i in itertools.count():
...         print(i)
...         await asyncio.sleep(1)
...
>>> ticker1 = schedule_coroutine(ticker())
>>> ticker1
<Task pending coro=<ticker() running at <stdin>:1>>

But how do I run that for a while? The event loop won't run unless the current thread starts it running and either stops when a particular event occurs, or when explicitly stopped. Another helper function covers that:

def run_in_foreground(task, *, loop=None):
    """Runs event loop in current thread until the given task completes

    Returns the result of the task.
    For more complex conditions, combine with asyncio.wait()
    To include a timeout, combine with asyncio.wait_for()
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    return loop.run_until_complete(asyncio.ensure_future(task, loop=loop))

And then I can do:

>>> run_in_foreground(asyncio.sleep(5))
0
1
2
3
4

Here we can see the background task running while we wait for the foreground task to complete. And if I do it again with a different timeout:

>>> run_in_foreground(asyncio.sleep(3))
5
6
7

We see that the background task picked up again right where it left off the first time.

We can also single step the event loop with a zero second sleep (the ticks reflect the fact there was more than a second delay between running each command):

>>> run_in_foreground(asyncio.sleep(0))
8
>>> run_in_foreground(asyncio.sleep(0))
9

And start a second ticker to run concurrently with the first one:

>>> ticker2 = schedule_coroutine(ticker())
>>> ticker2
<Task pending coro=<ticker() running at <stdin>:1>>
>>> run_in_foreground(asyncio.sleep(0))
0
10

The asynchronous tickers will happily hang around in the background, ready to resume operation whenever I give them the opportunity. If I decide I want to stop one of them, I can cancel the corresponding task:

>>> ticker1.cancel()
True
>>> run_in_foreground(asyncio.sleep(0))
1
>>> ticker2.cancel()
True
>>> run_in_foreground(asyncio.sleep(0))

But what about our original synchronous ticker? Can I run that as a background task? It turns out I can, with the aid of another helper function:

def call_in_background(target, *, loop=None, executor=None):
    """Schedules and starts target callable as a background task

    If not given, *loop* defaults to the current thread's event loop
    If not given, *executor* defaults to the loop's default executor

    Returns the scheduled task.
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    if callable(target):
        return loop.run_in_executor(executor, target)
    raise TypeError("target must be a callable, "
                    "not {!r}".format(type(target)))

However, I haven't figured out how to reliably cancel a task running in a separate thread or process, so for demonstration purposes, we'll define a variant of the synchronous version that stops automatically after 5 ticks rather than ticking indefinitely:

import itertools, time
def tick_5_sync():
    for i in range(5):
        print(i)
        time.sleep(1)
    print("Finishing")

The key difference between scheduling a callable in a background thread and scheduling a coroutine in the current thread, is that the callable will start executing immediately, rather than waiting for the current thread to run the event loop:

>>> threaded_ticker = call_in_background(tick_5_sync); print("Starts immediately!")
0
Starts immediately!
>>> 1
2
3
4
Finishing

That's both a strength (as you can run multiple blocking IO operations in parallel), but also a significant weakness - one of the benefits of explicit coroutines is their predictability, as you know none of them will start doing anything until you start running the event loop.

Comments

Comments powered by Disqus