Why Async? The Core Idea
Traditional Python code is synchronous: while one I/O operation is waiting (HTTP request, file read, database query), the entire thread is blocked. Async Python flips this model — instead of blocking, the event loop suspends the waiting coroutine and runs other work, achieving high concurrency on a single thread.
The skill is split into three reference files. Each covers a specific surface area of Python async programming:
| Reference File | What It Covers |
|---|---|
| references/asyncio.md | Core asyncio — Tasks, gather, queues, locks, semaphores, generators, event loop lifecycle |
| references/aiohttp.md | aiohttp client + server — sessions, connection pooling, streaming, middleware, WebSockets, SSE |
| references/pitfalls.md | Common errors, anti-patterns, debugging, sync↔async bridging |
The One Correct Entry Point
Every async Python program starts the same way. Use asyncio.run() — introduced in Python 3.7 — as the single top-level entry point. Never call get_event_loop().run_until_complete() in new code.
```python
import asyncio

async def main() -> None:
    # --- your async logic here ---
    pass

if __name__ == "__main__":
    asyncio.run(main())  # Python 3.7+ preferred entry point
```

Three rules to internalize
Never call asyncio.get_event_loop().run_until_complete() in new code.
Never create a bare asyncio.get_event_loop() outside an async context.
Always use asyncio.run() as the single top-level entry point.
Coroutines, Tasks, and Futures
Understanding the difference between a coroutine object, a Task, and a Future is fundamental. Many bugs come from calling an async def function without await — which creates a coroutine object but never runs it.
| Term | What It Is | Awaitable? |
|---|---|---|
| Coroutine function | async def function | No — calling it returns a coroutine object |
| Coroutine object | The pending execution | Yes |
| Task | Coroutine scheduled on the loop | Yes |
| Future | Low-level result placeholder | Yes |
```python
import asyncio

async def greet(name: str) -> str:
    await asyncio.sleep(0.1)  # yields control to the event loop
    return f"Hello, {name}"

async def main():
    # Await directly — simplest form
    result = await greet("world")

    # Schedule as a background Task
    task = asyncio.create_task(greet("background"))
    # ... do other work concurrently ...
    result = await task  # get result / propagate exception
```

Fan-Out Concurrency: gather vs TaskGroup
Running multiple coroutines concurrently is one of the most common async patterns. Python offers two main tools: the older asyncio.gather() (works on all 3.7+ versions) and the modern asyncio.TaskGroup (Python 3.11+, structured concurrency).
```python
async def main():
    # All three run concurrently — total time ≈ max(each), not sum
    a, b, c = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )

    # Prevent one failure from killing all — return_exceptions=True
    results = await asyncio.gather(
        risky_task(1),
        risky_task(2),
        risky_task(3),
        return_exceptions=True,  # exceptions become values, not raises
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Task failed: {r}")
        else:
            process(r)
```

```python
async def main():
    async with asyncio.TaskGroup() as tg:
        task_a = tg.create_task(fetch_user(1))
        task_b = tg.create_task(fetch_user(2))
        task_c = tg.create_task(fetch_user(3))
    # All tasks guaranteed to finish before this line
    print(task_a.result(), task_b.result(), task_c.result())

    # Handle multiple exception types with except*
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(risky(i)) for i in range(10)]
    except* ValueError as eg:
        for exc in eg.exceptions:
            print(f"ValueError: {exc}")
```

gather vs TaskGroup — when to use which
Use gather when targeting Python < 3.11 or when you need return_exceptions=True behavior. Use TaskGroup for all new Python 3.11+ code — it guarantees structured cancellation (if one task fails, all sibling tasks are cancelled and awaited before the error propagates), which prevents silent resource leaks.
Rate Limiting with Semaphore
When running hundreds of concurrent tasks (e.g., scraping URLs), you need to limit how many run at once to avoid overloading the target server or your own connection pool. asyncio.Semaphore is the standard tool.
```python
import asyncio
import aiohttp

async def fetch(session, url: str, sem: asyncio.Semaphore) -> dict:
    async with sem:  # blocks if limit reached
        async with session.get(url) as resp:
            return await resp.json()

async def fetch_all(urls: list[str], concurrency: int = 10) -> list[dict]:
    sem = asyncio.Semaphore(concurrency)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, sem) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)
```

Producer / Consumer with Queue
For streaming pipelines where producers generate work faster than consumers can process it, asyncio.Queue provides built-in backpressure via maxsize. The task_done() / join() pair lets you wait until all work is fully processed.
```python
import asyncio
from typing import Iterable

async def producer(queue: asyncio.Queue, items: Iterable):
    for item in items:
        await queue.put(item)
    await queue.put(None)  # sentinel to signal done

async def consumer(queue: asyncio.Queue, worker_id: int):
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # re-broadcast sentinel for other workers
            break
        print(f"Worker {worker_id} processed {item}")
        queue.task_done()

async def main():
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)  # backpressure at 50 items
    NUM_WORKERS = 5
    await asyncio.gather(
        producer(queue, range(100)),
        *[consumer(queue, i) for i in range(NUM_WORKERS)],
    )
```

| Queue Class | Behaviour |
|---|---|
| asyncio.Queue | FIFO (default) |
| asyncio.LifoQueue | LIFO (stack) |
| asyncio.PriorityQueue | Smallest item first |
Cancellation & Timeouts
Async code must handle cancellation gracefully. When a Task is cancelled, an asyncio.CancelledError is injected at the next await point. Always re-raise CancelledError after any cleanup — never swallow it.
```python
import asyncio

# --- asyncio.timeout() — Python 3.11+ (preferred) ---
async def main():
    try:
        async with asyncio.timeout(10.0):
            result = await slow_operation()
    except TimeoutError:
        print("operation timed out")

# --- asyncio.wait_for() — all versions ---
async def main_wait_for():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=10.0)
    except asyncio.TimeoutError:
        print("timed out")

# --- Graceful cleanup in a coroutine ---
async def worker():
    try:
        while True:
            await do_work()
    except asyncio.CancelledError:
        await cleanup()  # run cleanup BEFORE re-raising
        raise  # always re-raise CancelledError

# --- Shield: protect critical work from external cancellation ---
async def main_shield():
    task = asyncio.create_task(important_cleanup())
    try:
        await asyncio.shield(task)
    except asyncio.CancelledError:
        await task  # still wait for cleanup to finish
```

aiohttp — HTTP Client Patterns
The golden rule of aiohttp: one session per application, not one per request. Creating a ClientSession per request leaks connections and defeats connection pooling entirely.
```python
import asyncio
import aiohttp

# ❌ WRONG — new session per call, leaks connections
async def fetch_wrong(url):
    async with aiohttp.ClientSession() as session:
        ...

# ✓ CORRECT — shared session injected into each call
async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as resp:
        resp.raise_for_status()
        return await resp.json()

async def main(urls: list[str]):
    timeout = aiohttp.ClientTimeout(total=30, connect=5)
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
    async with aiohttp.ClientSession(
        timeout=timeout,
        connector=connector,
        raise_for_status=True,
    ) as session:
        results = await asyncio.gather(*[fetch(session, u) for u in urls])
```

```python
async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    retries: int = 3,
    backoff: float = 1.0,
) -> dict:
    for attempt in range(retries):
        try:
            async with session.get(url) as resp:
                resp.raise_for_status()
                return await resp.json()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            if attempt == retries - 1:
                raise
            wait = backoff * (2 ** attempt)  # exponential backoff
            await asyncio.sleep(wait)
```

aiohttp — HTTP Server & WebSocket
aiohttp ships with a full async HTTP server. Route handlers are coroutines that receive a Request and return a Response. Middleware follows the same async pattern and can be stacked for logging, auth, and error handling.
```python
from aiohttp import web
import time, logging

# --- Request handlers ---
async def handle_get(request: web.Request) -> web.Response:
    name = request.match_info.get("name", "World")
    return web.json_response({"message": f"Hello, {name}"})

async def handle_post(request: web.Request) -> web.Response:
    body = await request.json()
    return web.json_response({"received": body}, status=201)

# --- Middleware: logging ---
@web.middleware
async def logging_middleware(request: web.Request, handler) -> web.Response:
    start = time.monotonic()
    response = await handler(request)
    elapsed = time.monotonic() - start
    logging.info(f"{request.method} {request.path} → {response.status} ({elapsed:.3f}s)")
    return response

# --- App factory ---
def create_app() -> web.Application:
    app = web.Application(middlewares=[logging_middleware])
    app.router.add_get("/hello/{name}", handle_get)
    app.router.add_post("/items", handle_post)
    return app

if __name__ == "__main__":
    web.run_app(create_app(), host="0.0.0.0", port=8080)
```

```python
import aiohttp
from aiohttp import web

clients: set[web.WebSocketResponse] = set()

async def ws_handler(request: web.Request) -> web.WebSocketResponse:
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    clients.add(ws)
    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                # broadcast to all other connected clients
                for client in clients - {ws}:
                    if not client.closed:
                        await client.send_str(msg.data)
    finally:
        clients.discard(ws)
    return ws
```

Async Generators & Context Managers
Async generators allow lazy streaming of data across await boundaries — perfect for paginated APIs, large file reads, or live data feeds. Pair with async context managers for automatic resource cleanup.
```python
import asyncio
from contextlib import asynccontextmanager

# --- Async generator: paginated API ---
async def paginated_fetch(base_url: str):
    page = 1
    while True:
        data = await fetch_page(base_url, page)
        if not data:
            return
        for item in data:
            yield item  # one item at a time
        page += 1

# Consume
async def main():
    async for item in paginated_fetch("https://api.example.com/items"):
        process(item)

    # Async comprehension
    items = [item async for item in paginated_fetch("https://api.example.com/items")]

# --- Async context manager ---
@asynccontextmanager
async def managed_resource(name: str):
    resource = await acquire(name)
    try:
        yield resource
    finally:
        await release(resource)  # guaranteed cleanup

async def use_resource():
    async with managed_resource("db") as db:
        await db.query("SELECT 1")
```

Sync ↔ Async Bridging
Real-world code often mixes sync and async. Running a blocking function from async code, or calling an async function from sync code, each requires a specific approach. Using the wrong one will either block the event loop or raise a RuntimeError.
| Scenario | Tool | Notes |
|---|---|---|
| Blocking call from async code | asyncio.to_thread() (Python 3.9+) | For requests.get(), file I/O, heavy CPU libraries. For CPU-bound work use ProcessPoolExecutor instead. |
| Async call from sync code | asyncio.run() | Simplest — no existing loop required. |
| Coroutine from another thread | asyncio.run_coroutine_threadsafe() | Returns a concurrent.futures.Future — call .result(timeout=30) to block until done. |

```python
import asyncio
import functools
from concurrent.futures import ProcessPoolExecutor

# Sync → Async: offload blocking function to thread
async def main():
    result = await asyncio.to_thread(blocking_function, arg1, arg2)

    # With keyword args
    fn = functools.partial(blocking_function, key="value")
    result = await asyncio.to_thread(fn)

    # CPU-bound: use ProcessPoolExecutor
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, heavy_cpu_func, data)

# Async → Sync: call from regular function
def sync_caller():
    result = asyncio.run(my_async_function())  # creates + runs + closes loop
    return result

# Cross-thread: schedule on running loop from another thread
def from_other_thread(existing_loop):
    future = asyncio.run_coroutine_threadsafe(my_coro(), existing_loop)
    return future.result(timeout=30)
```

10 Common Pitfalls — Quick Reference
The references/pitfalls.md file covers each of these in detail with root cause, minimal reproduction, and the correct fix.
| # | Error / Symptom | Root Cause | Fix |
|---|---|---|---|
| P1 | coroutine was never awaited | Called async def without await | Add await |
| P2 | no running event loop | get_event_loop() outside async context | Use asyncio.run() |
| P3 | cannot run nested event loop | asyncio.run() inside running loop | await directly; use nest_asyncio in Jupyter |
| P4 | Task destroyed but pending | Task GC'd before completing | Keep reference; graceful shutdown |
| P5 | Everything hangs | Blocking sync call in async fn | asyncio.to_thread() / run_in_executor |
| P6 | Silent exception in Task | Exception never retrieved | add_done_callback with task.result() |
| P7 | Data corruption under concurrency | Shared state mutated across await | asyncio.Lock() |
| P8 | Unclosed client session | Session not closed on exception | Always use async with ClientSession() |
| P9 | Race condition "fixed" by sleep(0) | Masking a real concurrency bug | Use Lock or Event properly |
| P10 | Shared mutable default argument | Python default arg evaluated once | Use None sentinel, create inside fn |
Debugging Async Code
asyncio ships with a built-in debug mode that warns on slow callbacks, logs coroutines that were never awaited, and raises when non-thread-safe loop methods are called from the wrong thread. Enable it during development — it costs some performance but catches hard-to-reproduce bugs early.
```python
import asyncio
import logging

# Enable debug mode (warns on slow callbacks > 100ms, logs unawaited coroutines)
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main(), debug=True)

# Or via env: PYTHONASYNCIODEBUG=1 python myscript.py

# Inspect all running tasks
async def print_tasks():
    for task in asyncio.all_tasks():
        print(f"  {task.get_name()}: {task.get_coro().__name__}")
        task.print_stack()

# Loop blocking watchdog — warns if event loop blocked > 100ms
def loop_watchdog(loop: asyncio.AbstractEventLoop, threshold: float = 0.1):
    import time
    last_tick = time.monotonic()

    def on_tick():
        nonlocal last_tick
        now = time.monotonic()
        blocked = now - last_tick
        if blocked > threshold:
            print(f"WARNING: event loop blocked for {blocked:.3f}s")
        last_tick = now
        loop.call_later(0.05, on_tick)  # re-schedule itself

    loop.call_soon(on_tick)
```

Testing with pytest-asyncio

```shell
pip install pytest-asyncio
```

```python
import aiohttp
import pytest
import pytest_asyncio

# Test
@pytest.mark.asyncio
async def test_fetch():
    async with aiohttp.ClientSession() as session:
        result = await fetch(session, "https://httpbin.org/get")
        assert result["url"] == "https://httpbin.org/get"

# Async fixture
@pytest_asyncio.fixture
async def client_session():
    async with aiohttp.ClientSession() as session:
        yield session
```

Python Version Compatibility Matrix
| Feature | Min Version |
|---|---|
| asyncio.run() | 3.7 |
| asyncio.create_task() | 3.7 |
| asyncio.to_thread() | 3.9 |
| anext() / aiter() builtins | 3.10 |
| asyncio.TaskGroup | 3.11 |
| asyncio.timeout() context manager | 3.11 |
| ExceptionGroup / except* | 3.11 |
Performance tip: uvloop
Drop-in replacement for the default event loop — typically 2–4× faster for I/O-heavy workloads. Install: pip install uvloop, then at the very top of your entry point: asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()). Requires Python 3.8+ on Linux/macOS (not available on Windows).
Download python-async-patterns Skill
This .skill file contains 3 comprehensive reference files — asyncio, aiohttp, and pitfalls — ready to use with Claude or any other AI tool as expert context for Python async programming.