🐍 Python · Mar 2026 · 3 reference files · 22 min read

python-async-patterns

Patterns, recipes, and pitfall fixes for Python async/await programming — asyncio internals, aiohttp HTTP clients and servers, concurrency primitives (Semaphore, Queue, Lock, TaskGroup), event loop lifecycle management, sync↔async bridging, and debugging. Everything you need to write correct, fast, and maintainable async Python code.

asyncio · aiohttp · async/await · coroutines · Task · TaskGroup · Semaphore · Queue · WebSocket · SSE · event-loop · debugging

Why Async? The Core Idea

Traditional Python code is synchronous: while one I/O operation is waiting (HTTP request, file read, database query), the entire thread is blocked. Async Python flips this model — instead of blocking, the event loop suspends the waiting coroutine and runs other work, achieving high concurrency on a single thread.

3 reference files · 10+ pitfalls covered · Python 3.7+ minimum version

The skill is split into three reference files. Each covers a specific surface area of Python async programming:

Reference File | What It Covers
references/asyncio.md | Core asyncio — Tasks, gather, queues, locks, semaphores, generators, event loop lifecycle
references/aiohttp.md | aiohttp client + server — sessions, connection pooling, streaming, middleware, WebSockets, SSE
references/pitfalls.md | Common errors, anti-patterns, debugging, sync↔async bridging

The One Correct Entry Point

Every async Python program starts the same way. Use asyncio.run() — introduced in Python 3.7 — as the single top-level entry point. Never call get_event_loop().run_until_complete() in new code.

Python — universal boilerplate
import asyncio
import aiohttp

async def main() -> None:
    # --- your async logic here ---
    pass

if __name__ == "__main__":
    asyncio.run(main())          # Python 3.7+ preferred entry point

Three rules to internalize

Never call asyncio.get_event_loop().run_until_complete() in new code.
Never create a bare asyncio.get_event_loop() outside an async context.
Always use asyncio.run() as the single top-level entry point.
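A minimal runnable illustration of these rules — calling asyncio.run() from inside an already-running loop fails, while awaiting works (the inner/outer names are just for this demo):

```python
import asyncio

async def inner() -> int:
    return 42

async def outer() -> int:
    # ❌ asyncio.run() inside a running loop raises RuntimeError
    coro = inner()
    try:
        asyncio.run(coro)
    except RuntimeError as e:
        print(f"RuntimeError: {e}")
        coro.close()               # avoid a "never awaited" warning
    # ✓ inside async code, just await
    return await inner()

print(asyncio.run(outer()))        # 42
```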

Coroutines, Tasks, and Futures

Understanding the difference between a coroutine object, a Task, and a Future is fundamental. Many bugs come from calling an async def function without await — which creates a coroutine object but never runs it.

Term | What It Is | Awaitable?
Coroutine function | async def function | No — calling it returns a coroutine object
Coroutine object | The pending execution | Yes
Task | Coroutine scheduled on the loop | Yes
Future | Low-level result placeholder | Yes
Python — coroutines vs tasks
import asyncio

async def greet(name: str) -> str:
    await asyncio.sleep(0.1)   # yields control to the event loop
    return f"Hello, {name}"

async def main():
    # Await directly — simplest form
    result = await greet("world")

    # Schedule as a background Task
    task = asyncio.create_task(greet("background"))
    # ... do other work concurrently ...
    result = await task           # get result / propagate exception

Fan-Out Concurrency: gather vs TaskGroup

Running multiple coroutines concurrently is one of the most common async patterns. Python offers two main tools: the older asyncio.gather() (works on all 3.7+ versions) and the modern asyncio.TaskGroup (Python 3.11+, structured concurrency).

Python — asyncio.gather (all Python 3.7+)
async def main():
    # All three run concurrently — total time ≈ max(each), not sum
    a, b, c = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )

# Prevent one failure from killing all — return_exceptions=True
results = await asyncio.gather(
    risky_task(1),
    risky_task(2),
    risky_task(3),
    return_exceptions=True,        # exceptions become values, not raises
)
for r in results:
    if isinstance(r, Exception):
        print(f"Task failed: {r}")
    else:
        process(r)
Python — asyncio.TaskGroup (Python 3.11+, recommended)
async def main():
    async with asyncio.TaskGroup() as tg:
        task_a = tg.create_task(fetch_user(1))
        task_b = tg.create_task(fetch_user(2))
        task_c = tg.create_task(fetch_user(3))
    # All tasks guaranteed to finish before this line

    print(task_a.result(), task_b.result(), task_c.result())

# Handle multiple exception types with except*
try:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(risky(i)) for i in range(10)]
except* ValueError as eg:
    for exc in eg.exceptions:
        print(f"ValueError: {exc}")

gather vs TaskGroup — when to use which

Use gather when targeting Python < 3.11 or when you need return_exceptions=True behavior. Use TaskGroup for all new Python 3.11+ code — it guarantees structured cancellation (if one task fails, all sibling tasks are cancelled and awaited before the error propagates), which prevents silent resource leaks.

Rate Limiting with Semaphore

When running hundreds of concurrent tasks (e.g., scraping URLs), you need to limit how many run at once to avoid overloading the target server or your own connection pool. asyncio.Semaphore is the standard tool.

Python — Semaphore + aiohttp: rate-limited HTTP
import asyncio, aiohttp

async def fetch(session, url: str, sem: asyncio.Semaphore) -> dict:
    async with sem:                    # blocks if limit reached
        async with session.get(url) as resp:
            return await resp.json()

async def fetch_all(urls: list[str], concurrency: int = 10) -> list[dict]:
    sem = asyncio.Semaphore(concurrency)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, sem) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)
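To see the cap working without any network calls, here is a self-contained sketch (the active/peak counters are purely illustrative) that records the highest number of jobs inside the semaphore at once:

```python
import asyncio

active = 0
peak = 0

async def job(sem: asyncio.Semaphore, i: int) -> int:
    global active, peak
    async with sem:
        active += 1
        peak = max(peak, active)       # record concurrency high-water mark
        await asyncio.sleep(0.01)      # simulated I/O
        active -= 1
    return i

async def main():
    sem = asyncio.Semaphore(3)
    results = await asyncio.gather(*(job(sem, i) for i in range(10)))
    print(f"peak concurrency: {peak}")  # never exceeds 3
    return results

asyncio.run(main())
```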

Producer / Consumer with Queue

For streaming pipelines where producers generate work faster than consumers can process it, asyncio.Queue provides built-in backpressure via maxsize. The task_done() / join() pair lets you wait until all work is fully processed.

Python — producer / consumer pattern
import asyncio
from typing import Iterable

async def producer(queue: asyncio.Queue, items: Iterable):
    for item in items:
        await queue.put(item)
    await queue.put(None)              # sentinel to signal done

async def consumer(queue: asyncio.Queue, worker_id: int):
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)      # re-broadcast sentinel
            break
        print(f"Worker {worker_id} processed {item}")
        queue.task_done()

async def main():
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)  # backpressure at 50 items
    NUM_WORKERS = 5

    await asyncio.gather(
        producer(queue, range(100)),
        *[consumer(queue, i) for i in range(NUM_WORKERS)],
    )
Queue Class | Behaviour
asyncio.Queue | FIFO (default)
asyncio.LifoQueue | LIFO (stack)
asyncio.PriorityQueue | Smallest item first
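As an alternative to the sentinel shutdown above, the task_done()/join() pair can drive shutdown instead — a sketch with processing simulated by a zero-length sleep:

```python
import asyncio

async def worker(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0)     # stand-in for real processing
            print(f"processed {item}")
        finally:
            queue.task_done()          # mark item complete even on error

async def main():
    queue: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(queue)) for _ in range(3)]

    for i in range(10):
        await queue.put(i)

    await queue.join()                 # blocks until every item is task_done()
    for w in workers:                  # workers loop forever; cancel them
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.run(main())
```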

Cancellation & Timeouts

Async code must handle cancellation gracefully. When a Task is cancelled, an asyncio.CancelledError is injected at the next await point. Always re-raise CancelledError after any cleanup — never swallow it.

Python — cancellation, timeouts, shield
import asyncio

# --- asyncio.timeout() — Python 3.11+ (preferred) ---
async def main():
    try:
        async with asyncio.timeout(10.0):
            result = await slow_operation()
    except TimeoutError:
        print("operation timed out")

# --- asyncio.wait_for() — all versions ---
try:
    result = await asyncio.wait_for(slow_operation(), timeout=10.0)
except asyncio.TimeoutError:
    print("timed out")

# --- Graceful cleanup in a coroutine ---
async def worker():
    try:
        while True:
            await do_work()
    except asyncio.CancelledError:
        await cleanup()            # run cleanup BEFORE re-raising
        raise                      # always re-raise CancelledError

# --- Shield: protect critical work from external cancellation ---
async def main():
    task = asyncio.create_task(important_cleanup())
    try:
        await asyncio.shield(task)
    except asyncio.CancelledError:
        await task                 # still wait for cleanup to finish
        raise                      # then re-raise, per the rule above
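A self-contained run of the cleanup-then-re-raise pattern (do_work is replaced by a short sleep for the demo):

```python
import asyncio

async def worker():
    try:
        while True:
            await asyncio.sleep(0.01)  # stand-in for do_work()
    except asyncio.CancelledError:
        print("cleanup ran")           # cleanup happens first
        raise                          # always re-raise

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("worker cancelled")

asyncio.run(main())
```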

aiohttp — HTTP Client Patterns

The golden rule of aiohttp: one session per application, not one per request. Creating a ClientSession per request leaks connections and defeats connection pooling entirely.

Python — correct aiohttp session usage
import aiohttp, asyncio

# ❌ WRONG — new session per call, leaks connections
async def fetch_wrong(url):
    async with aiohttp.ClientSession() as session:
        ...

# ✓ CORRECT — shared session injected into each call
async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as resp:
        resp.raise_for_status()
        return await resp.json()

async def main(urls: list[str]):
    timeout = aiohttp.ClientTimeout(total=30, connect=5)
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)

    async with aiohttp.ClientSession(
        timeout=timeout,
        connector=connector,
        raise_for_status=True,
    ) as session:
        results = await asyncio.gather(*[fetch(session, u) for u in urls])
Python — retry with exponential backoff
async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    retries: int = 3,
    backoff: float = 1.0,
) -> dict:
    for attempt in range(retries):
        try:
            async with session.get(url) as resp:
                resp.raise_for_status()
                return await resp.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == retries - 1:
                raise
            wait = backoff * (2 ** attempt)
            await asyncio.sleep(wait)

aiohttp — HTTP Server & WebSocket

aiohttp ships with a full async HTTP server. Route handlers are coroutines that receive a Request and return a Response. Middleware follows the same async pattern and can be stacked for logging, auth, and error handling.

Python — aiohttp server: basic setup with middleware
from aiohttp import web
import time, logging

# --- Request handlers ---
async def handle_get(request: web.Request) -> web.Response:
    name = request.match_info.get("name", "World")
    return web.json_response({"message": f"Hello, {name}"})

async def handle_post(request: web.Request) -> web.Response:
    body = await request.json()
    return web.json_response({"received": body}, status=201)

# --- Middleware: logging ---
@web.middleware
async def logging_middleware(request: web.Request, handler) -> web.Response:
    start = time.monotonic()
    response = await handler(request)
    elapsed = time.monotonic() - start
    logging.info(f"{request.method} {request.path} → {response.status} ({elapsed:.3f}s)")
    return response

# --- App factory ---
def create_app() -> web.Application:
    app = web.Application(middlewares=[logging_middleware])
    app.router.add_get("/hello/{name}", handle_get)
    app.router.add_post("/items", handle_post)
    return app

if __name__ == "__main__":
    web.run_app(create_app(), host="0.0.0.0", port=8080)
Python — WebSocket server with broadcast
from aiohttp import web
import aiohttp

clients: set[web.WebSocketResponse] = set()

async def ws_handler(request: web.Request) -> web.WebSocketResponse:
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    clients.add(ws)
    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                # broadcast to all other connected clients
                for client in clients - {ws}:
                    if not client.closed:
                        await client.send_str(msg.data)
    finally:
        clients.discard(ws)
    return ws

Async Generators & Context Managers

Async generators allow lazy streaming of data across await boundaries — perfect for paginated APIs, large file reads, or live data feeds. Pair with async context managers for automatic resource cleanup.

Python — async generator + context manager
import asyncio
from contextlib import asynccontextmanager

# --- Async generator: paginated API ---
async def paginated_fetch(base_url: str):
    page = 1
    while True:
        data = await fetch_page(base_url, page)
        if not data:
            return
        for item in data:
            yield item             # one item at a time
        page += 1

# Consume
async def main():
    async for item in paginated_fetch("https://api.example.com/items"):
        process(item)

    # Async comprehension
    items = [item async for item in paginated_fetch("https://api.example.com/items")]

# --- Async context manager ---
@asynccontextmanager
async def managed_resource(name: str):
    resource = await acquire(name)
    try:
        yield resource
    finally:
        await release(resource)   # guaranteed cleanup

async with managed_resource("db") as db:
    await db.query("SELECT 1")
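Since fetch_page above is left undefined, here is a fully self-contained variant with an in-memory stand-in for the API (the PAGES dict is fabricated for the demo):

```python
import asyncio

# Hypothetical in-memory "API": two pages of data, then an empty page
PAGES = {1: ["a", "b"], 2: ["c"], 3: []}

async def fetch_page(page: int) -> list[str]:
    await asyncio.sleep(0)             # simulate I/O latency
    return PAGES.get(page, [])

async def paginate():
    page = 1
    while True:
        data = await fetch_page(page)
        if not data:
            return                     # ends the async iteration
        for item in data:
            yield item
        page += 1

async def main():
    items = [item async for item in paginate()]
    print(items)                       # ['a', 'b', 'c']

asyncio.run(main())
```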

Sync ↔ Async Bridging

Real-world code often mixes sync and async. Running a blocking function from async code, or calling an async function from sync code, each requires a specific approach. Using the wrong one will either block the event loop or raise a RuntimeError.

SYNC → ASYNC (calling sync from async): asyncio.to_thread() — Python 3.9+
Offloads any blocking function to a thread pool without blocking the event loop. Use it for requests.get(), file I/O, and other blocking libraries. For CPU-bound work, use a ProcessPoolExecutor instead.

ASYNC → SYNC (calling async from sync): asyncio.run() — simplest, no existing loop
Creates a new event loop, runs the coroutine, and closes the loop. Do not call it from inside an already-running async context.

CROSS-THREAD: asyncio.run_coroutine_threadsafe()
When a running event loop lives on another thread, use this to safely schedule a coroutine from a sync thread. It returns a concurrent.futures.Future — call .result(timeout=30) to block until done.
Python — bridging examples
import asyncio, functools

# Sync → Async: offload blocking function to thread
async def main():
    result = await asyncio.to_thread(blocking_function, arg1, arg2)

    # With keyword args
    fn = functools.partial(blocking_function, key="value")
    result = await asyncio.to_thread(fn)

    # CPU-bound: use ProcessPoolExecutor
    from concurrent.futures import ProcessPoolExecutor
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, heavy_cpu_func, data)

# Async → Sync: call from regular function
def sync_caller():
    result = asyncio.run(my_async_function())   # creates + runs + closes loop
    return result

# Cross-thread: schedule on running loop from another thread
future = asyncio.run_coroutine_threadsafe(my_coro(), existing_loop)
result = future.result(timeout=30)
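A complete cross-thread round trip, with the loop running on a daemon thread (add and run_loop_forever are demo helpers, not library APIs):

```python
import asyncio
import threading

async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)             # prove we really ran on the loop
    return a + b

def run_loop_forever(loop: asyncio.AbstractEventLoop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

loop = asyncio.new_event_loop()
t = threading.Thread(target=run_loop_forever, args=(loop,), daemon=True)
t.start()

# Schedule from the main (sync) thread onto the background loop
future = asyncio.run_coroutine_threadsafe(add(2, 3), loop)
print(future.result(timeout=5))        # 5

loop.call_soon_threadsafe(loop.stop)   # shut the loop down cleanly
t.join(timeout=5)
```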

10 Common Pitfalls — Quick Reference

The references/pitfalls.md file covers each of these in detail with root cause, minimal reproduction, and the correct fix.

# | Error / Symptom | Root Cause | Fix
P1 | coroutine was never awaited | Called async def without await | Add await
P2 | no running event loop | get_event_loop() outside async context | Use asyncio.run()
P3 | cannot run nested event loop | asyncio.run() inside running loop | await directly; use nest_asyncio in Jupyter
P4 | Task destroyed but pending | Task GC'd before completing | Keep reference; graceful shutdown
P5 | Everything hangs | Blocking sync call in async fn | asyncio.to_thread() / run_in_executor
P6 | Silent exception in Task | Exception never retrieved | add_done_callback with task.result()
P7 | Data corruption under concurrency | Shared state mutated across await | asyncio.Lock()
P8 | Unclosed client session | Session not closed on exception | Always use async with ClientSession()
P9 | Race condition "fixed" by sleep(0) | Masking a real concurrency bug | Use Lock or Event properly
P10 | Shared mutable default argument | Python default arg evaluated once | Use None sentinel, create inside fn
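For P6 specifically, the add_done_callback fix might look like this (log_exception is an illustrative helper name, not a stdlib function):

```python
import asyncio

def log_exception(task: asyncio.Task) -> None:
    # Retrieve the result so the exception is never silently dropped (P6)
    try:
        task.result()
    except asyncio.CancelledError:
        pass                           # cancellation is not an error here
    except Exception as exc:
        print(f"task {task.get_name()} failed: {exc!r}")

async def boom():
    raise RuntimeError("oops")

async def main():
    task = asyncio.create_task(boom(), name="boom")
    task.add_done_callback(log_exception)
    await asyncio.sleep(0.01)          # give the task time to finish

asyncio.run(main())
# prints: task boom failed: RuntimeError('oops')
```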

Debugging Async Code

asyncio ships with a built-in debug mode that warns on slow callbacks, logs unawaited coroutines, and detects coroutines awaited on the wrong thread. Enable it during development — it costs some performance but catches hard-to-reproduce bugs early.

Python — debugging tools
import asyncio, logging

# Enable debug mode (warns on slow callbacks > 100ms, logs unawaited coroutines)
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main(), debug=True)
# Or via env: PYTHONASYNCIODEBUG=1 python myscript.py

# Inspect all running tasks
async def print_tasks():
    for task in asyncio.all_tasks():
        print(f"  {task.get_name()}: {task.get_coro().__name__}")
        task.print_stack()

# Loop blocking watchdog — warns if event loop blocked > 100ms
def loop_watchdog(loop: asyncio.AbstractEventLoop, threshold: float = 0.1):
    import time
    last_tick = time.monotonic()

    def on_tick():
        nonlocal last_tick
        now = time.monotonic()
        blocked = now - last_tick
        if blocked > threshold:
            print(f"WARNING: event loop blocked for {blocked:.3f}s")
        last_tick = now
        loop.call_later(0.05, on_tick)

    loop.call_soon(on_tick)
Python — pytest-asyncio for testing
# install: pip install pytest-asyncio

# test_fetch.py
import pytest
import pytest_asyncio
import aiohttp

@pytest.mark.asyncio
async def test_fetch():
    async with aiohttp.ClientSession() as session:
        result = await fetch(session, "https://httpbin.org/get")
    assert result["url"] == "https://httpbin.org/get"

# Async fixture
@pytest_asyncio.fixture
async def client_session():
    async with aiohttp.ClientSession() as session:
        yield session

Python Version Compatibility Matrix

Feature | Min Version
asyncio.run() | 3.7
asyncio.create_task() | 3.7
asyncio.to_thread() | 3.9
anext() / aiter() builtins | 3.10
asyncio.TaskGroup | 3.11
asyncio.timeout() context manager | 3.11
ExceptionGroup / except* | 3.11

Performance tip: uvloop

Drop-in replacement for the default event loop — typically 2–4× faster for I/O-heavy workloads. Install: pip install uvloop, then at the very top of your entry point: asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()). Requires Python 3.8+ on Linux/macOS (not available on Windows).

AI Skill File


This .skill file contains 3 comprehensive reference files — asyncio, aiohttp, and pitfalls — ready to use with Claude or any other AI tool as expert context for Python async programming.

asyncio reference · aiohttp client + server · 10 pitfall fixes · sync↔async bridging · debug tools