updated December 21, 2023

Concurrency introduction with Python and AsyncIO

We'll see how to run Python tasks (functions) concurrently with the asyncio library.

Suppose you are going to make coffee and fry eggs for breakfast. Preparing the coffee ingredients takes 1 minute, and then you wait 5 minutes until the coffee is ready. The eggs take 1 minute to prepare and 3 minutes to fry.

You can make the coffee first and then fry the eggs:

from time import sleep, time

# One second of sleep() stands in for one minute of real time.

def make_coffee():
    print("coffee: prepare ingredients")
    sleep(1)
    print("coffee: waiting...")
    sleep(5)
    print("coffee: ready")

def fry_eggs():
    print("eggs: prepare ingredients")
    sleep(1)
    print("eggs: frying...")
    sleep(3)
    print("eggs: ready")

def main():
    start = time()
    make_coffee()
    fry_eggs()
    print(f"breakfast is ready in {time()-start} min")


main()

Let's save the code above in a file named main.py and run it with the python main.py command:

coffee: prepare ingredients
coffee: waiting...
coffee: ready
eggs: prepare ingredients
eggs: frying...
eggs: ready
breakfast is ready in 10.009082555770874 min

It took 6 + 4 = 10 min, as expected.

But breakfast can be ready sooner, in 6 minutes instead of 10, if you start frying the eggs while waiting for the coffee:

  • prepare the coffee ingredients, start making the coffee: 1 min
  • prepare the egg ingredients, start frying: 1 min
  • wait until the coffee and eggs are ready: 4 min
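
The arithmetic behind this plan can be checked with a short sketch. The constant names are made up for illustration, and the model assumes that preparation steps need your attention while waiting steps do not.

```python
# Durations in minutes, taken from the breakfast example.
COFFEE_PREP, COFFEE_WAIT = 1, 5
EGGS_PREP, EGGS_WAIT = 1, 3

# One after another: every step adds to the total.
sequential = COFFEE_PREP + COFFEE_WAIT + EGGS_PREP + EGGS_WAIT

# Overlapped: prepare the eggs while the coffee is brewing.
coffee_ready_at = COFFEE_PREP + COFFEE_WAIT          # minute 6
eggs_ready_at = COFFEE_PREP + EGGS_PREP + EGGS_WAIT  # minute 5
concurrent = max(coffee_ready_at, eggs_ready_at)

print(f"sequential: {sequential} min, concurrent: {concurrent} min")
# sequential: 10 min, concurrent: 6 min
```

The coffee finishes last, so it alone determines the total: the eggs fit entirely inside the coffee's waiting time.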

What's concurrency?

Definition 1. Tasks run concurrently if one task can start while another has started but not yet finished (it is in progress, possibly paused). In other words, their execution times overlap.

Definition 2. A task is called a coroutine (it can be run concurrently) if it has explicit points where it can be paused and resumed later. While a task is paused at such a point, waiting for a condition or a result, other tasks can run.
This is what happened in the example above, where we started frying the eggs while waiting for the coffee.

Starting other tasks at waiting points reduces the total execution time compared with starting each task only after the previous one has completed.

Examples of tasks that can run concurrently: a network request, paused once it is sent and resumed when the response arrives; a database client, paused once a query is sent to the database server and resumed when the results arrive.
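
A minimal sketch of such overlapping tasks, using the asyncio calls that the rest of this article introduces. The fake_request() helper and the "http" and "db" names are hypothetical; asyncio.sleep() merely simulates the time spent waiting for a server.

```python
import asyncio

async def fake_request(name, delay, log):
    # Hypothetical stand-in for a network or database call.
    log.append(f"{name}: sent")
    await asyncio.sleep(delay)  # pause point: other tasks may run here
    log.append(f"{name}: got response")

async def main():
    log = []  # order in which the steps actually happen
    await asyncio.gather(
        fake_request("http", 0.1, log),
        fake_request("db", 0.05, log),
    )
    return log

print(asyncio.run(main()))
# ['http: sent', 'db: sent', 'db: got response', 'http: got response']
```

The second request is sent before the first one gets its response, so the two execution times overlap in the sense of Definition 1.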

How to change a Python function to a coroutine

  1. Mark the function as a coroutine: add the async keyword to the function definition.
  2. Inside the function body, identify the calls to other coroutines and call them asynchronously (see item 3). Their definitions must be marked async as well, so that they too can be paused and resumed.
  3. Call the function: instead of executing the function and returning its result, the call immediately returns a special awaitable object (a coroutine object). This object is used to control the execution (start it, wait for completion, and so on) and to get the return value. One way to run the function and wait for it is to apply an await expression to the returned object: await func().
    The top-level function is run differently: asyncio.run(main())
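
The three steps can be sketched on a tiny example. The add() function is hypothetical and exists only to show that calling a coroutine function does not execute its body.

```python
import asyncio

async def add(a, b):        # step 1: "async def" marks a coroutine function
    await asyncio.sleep(0)  # step 2: awaiting inside creates a pause point
    return a + b

async def main():
    coro = add(2, 3)                     # step 3: nothing has run yet
    is_coro = asyncio.iscoroutine(coro)  # the call returned a coroutine object
    result = await coro                  # runs add() and waits for its value
    return is_coro, result

print(asyncio.run(main()))  # (True, 5)
```

Note that await is only allowed inside an async function, which is why the top-level main() is started with asyncio.run().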

Let's change the code above to run concurrently:

import asyncio
from time import sleep, time


async def make_coffee():  # 1
    print("coffee: prepare ingredients")
    sleep(1)  # blocking: you are busy while preparing
    print("coffee: waiting...")
    await asyncio.sleep(5)  # 2: pause, other tasks can run
    print("coffee: ready")

async def fry_eggs():  # 1
    print("eggs: prepare ingredients")
    sleep(1)  # blocking: you are busy while preparing
    print("eggs: frying...")
    await asyncio.sleep(3)  # 2: pause, other tasks can run
    print("eggs: ready")

async def main():  # 1
    start = time()
    await make_coffee()  # 3: run the coroutine and wait for it to finish
    await fry_eggs()
    print(f"breakfast is ready in {time()-start} min")


asyncio.run(main())  # start the event loop and run the top-level coroutine

If you run the code above, you'll see that the tasks still ran in sequence, as before:

coffee: prepare ingredients
coffee: waiting...
coffee: ready
eggs: prepare ingredients
eggs: frying...
eggs: ready
breakfast is ready in 10.012510776519775 min

Why? Because an await expression on a coroutine runs it and waits for it to complete before moving on to the next line.

How to run several functions concurrently

Let's start both tasks first with asyncio.create_task(), which schedules a coroutine for execution without waiting for its completion. Only the main() function needs to change:

async def main():
    start = time()
    coffee_task = asyncio.create_task(make_coffee())  # schedule execution
    eggs_task = asyncio.create_task(fry_eggs())  # schedule execution
    # wait for completion, both tasks are scheduled for execution already
    await coffee_task
    await eggs_task
    print(f"breakfast is ready in {time()-start} min")

Now the tasks run concurrently, as expected, and breakfast takes 6 simulated minutes (6 seconds of real time):

coffee: prepare ingredients
coffee: waiting...
eggs: prepare ingredients
eggs: frying...
eggs: ready
coffee: ready
breakfast is ready in 6.003154516220093 min

We can also wait for several coroutines with a single await asyncio.gather() call, without creating the tasks explicitly:

async def main():
    start = time()
    await asyncio.gather(make_coffee(), fry_eggs())
    print(f"breakfast is ready in {time()-start} min")

This call schedules the coroutines as tasks and waits for all of them to complete.

How to get the function return value

If a coroutine returns a value, the await expression evaluates to it:

result = await func_call()
task_res = await coffee_task

await asyncio.gather() returns a list of results, one item per awaitable, in the same order as the arguments. Example:

res1, res2, res3 = await asyncio.gather(func1(), func2(), func3())
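
A runnable sketch of all three ways to get a result. The return values and the shortened waits are assumptions made for the demo, not part of the breakfast code above.

```python
import asyncio

async def make_coffee():
    await asyncio.sleep(0.02)  # shortened wait, just for the demo
    return "coffee"

async def fry_eggs():
    await asyncio.sleep(0.01)
    return "eggs"

async def main():
    single = await make_coffee()            # plain await returns the value
    task = asyncio.create_task(fry_eggs())
    from_task = await task                  # awaiting a task returns it too
    # gather() returns a list ordered like its arguments,
    # not in the order the coroutines finish.
    both = await asyncio.gather(make_coffee(), fry_eggs())
    return single, from_task, both

print(asyncio.run(main()))
# ('coffee', 'eggs', ['coffee', 'eggs'])
```

Here fry_eggs() finishes first inside gather(), yet "coffee" still comes first in the result because it was passed first.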

Benefits and limitations of asyncio

  • Coroutines run in a single thread (the one running the asyncio event loop), so no more than one task is executing at any moment. This is what distinguishes concurrency from parallelism.
  • Each task's code reads linearly, so it is simpler than multithreaded code.
  • Concurrent execution is faster than sequential execution because waiting time is not wasted but used to run other tasks. This only helps coroutines that spend time waiting, such as network, database, or other I/O calls. It does not help CPU-intensive tasks that have no significant waiting.
  • It saves CPU time by switching tasks only at explicit points, instead of switching thread contexts at short, regular intervals.
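
Two of these points can be observed directly. In the sketch below, report_thread() and crunch() are hypothetical helpers: the first shows that every coroutine runs in the same thread, and the second shows that coroutines without any await inside cannot overlap.

```python
import asyncio
import threading

async def report_thread():
    await asyncio.sleep(0)        # give the other coroutines a turn
    return threading.get_ident()  # id of the thread running this code

async def crunch(name, log):
    # Hypothetical CPU-bound task: pure computation, no await inside.
    log.append(f"{name}: start")
    sum(i * i for i in range(100_000))
    log.append(f"{name}: end")

async def main():
    thread_ids = await asyncio.gather(*(report_thread() for _ in range(3)))
    log = []
    await asyncio.gather(crunch("a", log), crunch("b", log))
    return len(set(thread_ids)), log

print(asyncio.run(main()))
# (1, ['a: start', 'a: end', 'b: start', 'b: end'])
```

Task "b" cannot start until "a" has finished, because "a" never reaches a pause point. For work like this, asyncio gives no speedup.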