To understand asyncio, you have to understand the difference between Blocking (Synchronous) and Non-blocking (Asynchronous) code.

  • Synchronous (Sync aka blocking): You are a waiter. You take an order to the kitchen and stand there staring at the chef until the food is ready. You can’t serve anyone else. The whole restaurant stops.
  • Asynchronous (Async aka non-blocking): You take an order to the kitchen, hand it over, and immediately go to the next table to take another order. You only return to the first table when the chef rings the bell.

Async is not necessarily “faster” (the chef still takes 10 minutes to cook), but it is more efficient because the waiter never sits idle.

Basic Vocabulary

  • Coroutine: Unlike a standard function, which runs from top to bottom as soon as you call it, a coroutine (defined with async def) is an awaitable object. Calling it doesn’t run the body yet; it just creates a “job” that is ready to be scheduled.
  • await: This is the Hand-off. It tells Python: “I am waiting for an external response (like a database query or network request). I am voluntarily giving up control of the CPU so you can run other tasks while I wait.”
  • Event Loop: This is the Scheduler. It’s a continuous loop that monitors all your “awaiting” tasks. When an external response finally arrives, the Event Loop marks that task as “ready” and resumes it from exactly where it left off.
import asyncio

async def brew_coffee():
    print("Starting coffee...")
    await asyncio.sleep(1) # Pause here! Waiter goes to do other things.
    print("Coffee ready!")
    return "Cappuccino"

# You run the entry point using asyncio.run()
if __name__ == "__main__":
    asyncio.run(brew_coffee())
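
To see the “calling it doesn’t run it yet” behavior from the vocabulary above, note that calling brew_coffee() only creates a coroutine object. A minimal sketch (body trimmed to just the return):

```python
import asyncio

async def brew_coffee():
    return "Cappuccino"

coro = brew_coffee()        # nothing runs yet -- this just creates the "job"
print(type(coro).__name__)  # coroutine

result = asyncio.run(coro)  # NOW the body actually executes
print(result)               # Cappuccino
```

If you create a coroutine object and never await it, Python warns you with “coroutine ... was never awaited” -- a useful signal that you forgot the hand-off.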

Common Mistake

A common mistake is thinking that just using async/await makes things happen at the same time. It doesn’t.

# WRONG: This takes 3 seconds total. It's just sync code with extra steps.
async def main():
    await fetch_data(1) # Waiter stands here for 1s
    await fetch_data(2) # Waiter stands here for 2s

To make things concurrent, you must schedule them as Tasks so the Event Loop knows to run them in the background.

# CORRECT: This takes 2 seconds total.
async def main():
    task1 = asyncio.create_task(fetch_data(1)) # Start cooking 1
    task2 = asyncio.create_task(fetch_data(2)) # Start cooking 2
    
    # Now we wait for both to finish
    result1 = await task1
    result2 = await task2
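
fetch_data above is left undefined; here is a minimal stand-in (an assumption, using asyncio.sleep to fake network latency, scaled down to 0.2s/0.4s) that lets you measure both versions side by side:

```python
import asyncio
import time

async def fetch_data(seconds):
    # Stand-in for a real network call: just wait, then return.
    await asyncio.sleep(seconds)
    return f"data after {seconds}s"

async def sequential():
    await fetch_data(0.2)  # the loop has nothing else to run, so we just wait
    await fetch_data(0.4)

async def concurrent():
    task1 = asyncio.create_task(fetch_data(0.2))  # starts immediately
    task2 = asyncio.create_task(fetch_data(0.4))  # starts immediately too
    await task1
    await task2

start = time.perf_counter()
asyncio.run(sequential())
sequential_time = time.perf_counter() - start  # ~0.6s (0.2 + 0.4)

start = time.perf_counter()
asyncio.run(concurrent())
concurrent_time = time.perf_counter() - start  # ~0.4s (they overlap)

print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

The concurrent version is bounded by the slowest task, not the sum of all tasks.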

Handling Groups: gather and TaskGroup

When you have hundreds of tasks, you don’t want to create hundreds of variables.

asyncio.gather

Think of gather like a teacher collecting homework from 30 students. If one student fails to turn it in, the teacher doesn’t set the school on fire, but simply marks that one as a “fail” and continues grading the rest.

Always use return_exceptions=True. Without it, the first exception propagates out of gather immediately (while the remaining tasks keep running in the background, unsupervised). With it, the error is simply returned in the results list as if it were a normal value, so all the other tasks finish and you can inspect every outcome.

import asyncio

async def fetch_api(id):
    if id == 2:
        raise ValueError("API Down!")
    await asyncio.sleep(1)
    return f"Data {id}"

async def main():
    # Bundling tasks together
    # return_exceptions=True means "Don't crash the whole party if one person trips"
    results = await asyncio.gather(
        fetch_api(1),
        fetch_api(2),
        fetch_api(3),
        return_exceptions=True 
    )

    print(results) 
    # Output: ['Data 1', ValueError('API Down!'), 'Data 3']

Best for: Tasks that are independent, like scraping 100 different websites. If one site is down, you still want the data from the other 99.
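
With many inputs you can unpack a list comprehension into gather instead of naming each task. A sketch reusing the fetch_api helper from above (five IDs stand in for the 100 websites):

```python
import asyncio

async def fetch_api(id):
    if id == 2:
        raise ValueError("API Down!")
    await asyncio.sleep(0.1)
    return f"Data {id}"

async def main():
    ids = range(1, 6)  # pretend these are 100 URLs
    results = await asyncio.gather(
        *[fetch_api(i) for i in ids],  # * unpacks the list into arguments
        return_exceptions=True,
    )
    # Keep only the successful results; exceptions are ordinary list items
    return [r for r in results if not isinstance(r, Exception)]

ok = asyncio.run(main())
print(ok)  # ['Data 1', 'Data 3', 'Data 4', 'Data 5']
```

gather preserves input order, so results[i] always corresponds to the i-th awaitable you passed in.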

asyncio.TaskGroup

TaskGroup (Python 3.11+) is stricter. It follows a “fail-fast” philosophy: if any task within the group raises an exception, the group immediately cancels all other remaining tasks and raises an ExceptionGroup.

# Using TaskGroup (Modern & Strict)
async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(fetch_api(1))
            t2 = tg.create_task(fetch_api(2)) # This one crashes!
            
        # This part is only reached if ALL tasks succeed
        print(t1.result(), t2.result())
    except ExceptionGroup as eg:
        print(f"Mission aborted! Errors: {eg}")

Best for: Tasks that depend on each other. If Task A fails and Task B only makes sense when Task A succeeds, TaskGroup ensures you don’t waste resources running a broken process.

Async, Threads, and Processes

How do you choose between Async, Threads, and Processes?

Asyncio

  • What it is: Runs on a single CPU thread. It uses an Event Loop to jump between tasks whenever one task is waiting for a response (like an API call).
  • Best for: High-volume I/O Bound tasks (Network requests, Database queries) where the libraries support async/await.
    • Example: You are downloading 100 files using httpx.
    • Solution: Asyncio. It’s the most lightweight and efficient.
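
The “100 files” case can be sketched with asyncio.sleep standing in for the network call (in real code you would await client.get(url) inside an async with httpx.AsyncClient() as client: block):

```python
import asyncio
import time

async def download(n):
    # Stand-in for: await client.get(url) with an httpx.AsyncClient
    await asyncio.sleep(0.1)
    return n

async def main():
    # All 100 "downloads" run concurrently on one thread
    return await asyncio.gather(*[download(i) for i in range(100)])

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(f"{len(results)} downloads in {elapsed:.2f}s")  # ~0.1s, not 10s
```

One thread, one event loop -- yet the total time is roughly that of a single download, because all 100 waits overlap.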

Threads

  • What it is: Spawns multiple threads within a single Process. They all share the same memory space.
  • The Catch: In Python, the Global Interpreter Lock (GIL) prevents two threads from executing Python bytecode at the exact same time. This means even with 10 threads, only one is “thinking” while the others are “waiting.”
  • Best for: I/O Bound tasks where you are forced to use synchronous libraries (like requests or boto3) that don’t support asyncio.
    • Example: You are downloading 100 files, but you must use the requests library (which is blocking/synchronous). If you just use async, the downloads will still happen one by one, because requests doesn’t know how to “pause.”
    • Solution: Threads. You use asyncio.to_thread() to run the blocking code in a separate thread so it doesn’t freeze your main loop.
import asyncio
import requests

def sync_download(url):
    # This library blocks the whole program!
    return requests.get(url).status_code

async def main():
    # We "offload" the blocking task to a separate thread
    result = await asyncio.to_thread(sync_download, "https://google.com")
    print(result)
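
The same offloading pattern can be verified without the network; time.sleep below is a stand-in for the blocking requests.get call:

```python
import asyncio
import time

def blocking_io(n):
    time.sleep(0.2)  # stand-in for requests.get -- blocks its own thread only
    return n

async def main():
    start = time.perf_counter()
    # Each blocking call gets its own thread, so they overlap instead of queueing
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 1),
        asyncio.to_thread(blocking_io, 2),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")  # [1, 2] in ~0.2s, not 0.4s
```

Without to_thread, the two time.sleep calls would run back to back and freeze the event loop for the whole 0.4 seconds.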

Processes

  • What it is: Spawns entirely separate Instances of Python. Each process has its own memory and its own GIL. This is true Parallelism. If you have an 8-core CPU, you can actually do 8 things at the exact same millisecond.
  • Best for: CPU Bound tasks. If you are doing heavy math, image processing, or data crunching, this is the only way to use your full CPU power.
from concurrent.futures import ProcessPoolExecutor
import asyncio

def heavy_math(n):
    # Imagine 2 seconds of heavy calculation here
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    
    # ProcessPoolExecutor spawns separate processes
    with ProcessPoolExecutor() as pool:
        # Each calculation runs on a different CPU core
        result = await loop.run_in_executor(pool, heavy_math, 10_000_000)
        print(result)

# The __main__ guard is required here: child processes re-import this module
if __name__ == "__main__":
    asyncio.run(main())

Controlling Traffic: Semaphore

If you try to download 10,000 images at once, the website will probably block your IP address (or your computer will crash). A Semaphore caps how many tasks can run a protected section at once; the rest wait their turn.

# Only allow 5 downloads at the same time
max_concurrent_requests = asyncio.Semaphore(5)

async def limited_download(url):
    async with max_concurrent_requests:
        await download_image(url)