
Mastering Python AsyncIO for High-Performance Applications

Shiaondo Orkuma · AI Engineer & Full Stack Developer @ Hash Dynamics · 6 min read

Python's AsyncIO is a powerful library for writing concurrent code using the async/await syntax. Whether you're building web scrapers, API clients, or high-throughput web services, understanding AsyncIO is crucial for performance-critical, I/O-heavy Python applications.

Understanding AsyncIO Fundamentals

AsyncIO enables cooperative multitasking: tasks voluntarily yield control at await points, allowing other tasks to run. This makes it a great fit for I/O-bound operations, where your code spends most of its time waiting on the network or disk rather than the CPU.

Key Concepts

  • Event Loop: The heart of AsyncIO that manages and executes async tasks
  • Coroutines: Functions defined with async def that can be paused and resumed
  • Tasks: Wrapped coroutines that can be scheduled and managed
  • Futures: Objects representing the eventual result of an async operation
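
To make these concepts concrete, here is a minimal, illustrative sketch (the coroutine and its names are made up for this example): asyncio.run() starts the event loop, async def defines coroutines, and asyncio.create_task() wraps them as Tasks whose eventual results you await.

import asyncio

async def greet(name: str) -> str:
    # A coroutine: awaiting here pauses it and lets the event loop run other work
    await asyncio.sleep(0.1)
    return f"hello, {name}"

async def main():
    # Tasks wrap coroutines so the event loop can schedule them concurrently
    task_a = asyncio.create_task(greet("alice"))
    task_b = asyncio.create_task(greet("bob"))

    # Awaiting a Task yields its eventual result once the coroutine finishes
    print(await task_a, await task_b)

asyncio.run(main())  # Start the event loop and run main() to completion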

Basic AsyncIO Patterns

Simple Async Function

import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    """Fetch a single URL asynchronously"""
    async with session.get(url) as response:
        return await response.text()

async def fetch_multiple_urls(urls):
    """Fetch multiple URLs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Usage
async def main():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/3'
    ]

    start_time = time.time()
    results = await fetch_multiple_urls(urls)
    end_time = time.time()

    print(f"Fetched {len(results)} URLs in {end_time - start_time:.2f} seconds")

# Run the async function
asyncio.run(main())
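
Because the three requests run concurrently, the total time is roughly that of the slowest response (about three seconds here) rather than the six or so seconds sequential fetching would take.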

Producer-Consumer Pattern
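
In this pattern, producers push items onto an asyncio.Queue while consumers pull from it concurrently; a sentinel value (None here) signals that there is nothing left to process.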

import asyncio
import random
from asyncio import Queue

async def producer(queue: Queue, producer_id: int):
    """Produce items and put them in the queue"""
    for i in range(5):
        # Simulate work
        await asyncio.sleep(random.uniform(0.1, 0.5))

        item = f"item-{producer_id}-{i}"
        await queue.put(item)
        print(f"Producer {producer_id} produced {item}")

async def consumer(queue: Queue, consumer_id: int):
    """Consume items from the queue"""
    while True:
        item = await queue.get()

        if item is None:
            # Re-queue the sentinel so the other consumers also stop
            await queue.put(None)
            break

        # Simulate processing
        await asyncio.sleep(random.uniform(0.1, 0.3))
        print(f"Consumer {consumer_id} processed {item}")

        queue.task_done()

async def producer_consumer_example():
    queue = Queue(maxsize=10)

    # Start consumers as background tasks so they run while producers fill the queue
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]

    # Run the producers to completion
    await asyncio.gather(*(producer(queue, i) for i in range(2)))

    # Signal shutdown once all producers are done; the sentinel cascades to every consumer
    await queue.put(None)
    await asyncio.gather(*consumers)

asyncio.run(producer_consumer_example())
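
Note that the sentinel is queued only after every producer has finished. If each producer signaled completion on its own, a consumer could see a sentinel early and shut down while other producers were still adding items; running the consumers as background tasks and sending one cascading sentinel at the end avoids that race.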

Advanced AsyncIO Patterns

Rate Limiting with Semaphores
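
A Semaphore caps how many coroutines can hold it at once, which makes it a simple way to bound the number of in-flight requests even when you schedule many tasks with asyncio.gather().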

import asyncio
import aiohttp
from asyncio import Semaphore

class RateLimitedClient:
    def __init__(self, max_concurrent_requests: int = 10):
        self.semaphore = Semaphore(max_concurrent_requests)
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def fetch(self, url: str):
        async with self.semaphore:  # Limit concurrent requests
            async with self.session.get(url) as response:
                return await response.json()

async def rate_limited_requests():
    urls = [f'https://jsonplaceholder.typicode.com/posts/{i}'
            for i in range(1, 21)]

    async with RateLimitedClient(max_concurrent_requests=5) as client:
        tasks = [client.fetch(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        successful = [r for r in results if not isinstance(r, Exception)]
        print(f"Successfully fetched {len(successful)} out of {len(urls)} URLs")

asyncio.run(rate_limited_requests())

Circuit Breaker Pattern
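
A circuit breaker wraps calls to an unreliable dependency: after a run of failures it "opens" and fails fast without calling the service at all, then after a recovery timeout it lets a trial call through ("half-open") to decide whether to close again.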

import asyncio
import time
from enum import Enum
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage example
async def unreliable_service():
    """Simulate an unreliable service"""
    import random
    if random.random() < 0.7:  # 70% failure rate
        raise Exception("Service unavailable")
    return "Success!"

async def circuit_breaker_example():
    circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=5)

    for i in range(10):
        try:
            result = await circuit_breaker.call(unreliable_service)
            print(f"Call {i}: {result}")
        except Exception as e:
            print(f"Call {i}: Failed - {e}")

        await asyncio.sleep(1)

asyncio.run(circuit_breaker_example())
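
With a threshold of 3 failures and a 5-second recovery timeout, you will typically see the first few calls hit the (mostly failing) service, the next calls fail fast with "Circuit breaker is OPEN", and a trial call go through again once the timeout has elapsed.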

Database Operations with AsyncIO

Using asyncpg for PostgreSQL
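
asyncpg is a fast, asyncio-native PostgreSQL driver; the connection pool below hands out existing connections instead of opening a new one for every query.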

import asyncio
import asyncpg
from typing import List, Dict, Any, Optional

class AsyncDatabase:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None

    async def create_pool(self):
        self.pool = await asyncpg.create_pool(self.connection_string)

    async def close_pool(self):
        if self.pool:
            await self.pool.close()

    async def fetch_one(self, query: str, *args) -> Optional[Dict[str, Any]]:
        async with self.pool.acquire() as connection:
            row = await connection.fetchrow(query, *args)
            return dict(row) if row else None

    async def fetch_many(self, query: str, *args) -> List[Dict[str, Any]]:
        async with self.pool.acquire() as connection:
            rows = await connection.fetch(query, *args)
            return [dict(row) for row in rows]

    async def execute(self, query: str, *args) -> str:
        async with self.pool.acquire() as connection:
            return await connection.execute(query, *args)

    async def batch_insert(self, table: str, data: List[Dict[str, Any]]):
        if not data:
            return

        columns = list(data[0].keys())
        values = [[row[col] for col in columns] for row in data]

        query = f"""
            INSERT INTO {table} ({', '.join(columns)})
            VALUES ({', '.join(f'${i+1}' for i in range(len(columns)))})
        """

        async with self.pool.acquire() as connection:
            await connection.executemany(query, values)

# Usage example
async def database_operations():
    db = AsyncDatabase("postgresql://user:password@localhost/mydb")
    await db.create_pool()

    try:
        # Create table
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name VARCHAR(100),
                email VARCHAR(100),
                created_at TIMESTAMP DEFAULT NOW()
            )
        """)

        # Batch insert
        users_data = [
            {"name": "Alice", "email": "alice@example.com"},
            {"name": "Bob", "email": "bob@example.com"},
            {"name": "Charlie", "email": "charlie@example.com"}
        ]
        await db.batch_insert("users", users_data)

        # Fetch data
        users = await db.fetch_many("SELECT * FROM users ORDER BY id")
        for user in users:
            print(f"User: {user['name']} - {user['email']}")

    finally:
        await db.close_pool()

# asyncio.run(database_operations())

Web Scraping with AsyncIO

import asyncio
import aiohttp
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from typing import Set, List

class AsyncWebScraper:
    def __init__(self, max_concurrent: int = 10, delay: float = 1.0):
        self.max_concurrent = asyncio.Semaphore(max_concurrent)
        self.delay = delay
        self.visited_urls: Set[str] = set()

    async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> str:
        async with self.max_concurrent:
            try:
                async with session.get(url) as response:
                    if response.status == 200:
                        content = await response.text()
                        await asyncio.sleep(self.delay)  # Rate limiting
                        return content
                    return ""
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return ""

    def extract_links(self, html: str, base_url: str) -> List[str]:
        soup = BeautifulSoup(html, 'html.parser')
        links = []

        for link in soup.find_all('a', href=True):
            href = link['href']
            full_url = urljoin(base_url, href)

            # Only include HTTP(S) links from the same domain
            parsed = urlparse(full_url)
            if parsed.scheme in ['http', 'https'] and parsed.netloc == urlparse(base_url).netloc:
                links.append(full_url)

        return links

    async def scrape_recursive(self, start_url: str, max_depth: int = 2) -> List[str]:
        urls_to_visit = [(start_url, 0)]  # (url, depth)
        scraped_content = []

        async with aiohttp.ClientSession() as session:
            while urls_to_visit:
                current_batch = []

                # Process URLs in batches
                while urls_to_visit and len(current_batch) < 10:
                    url, depth = urls_to_visit.pop(0)

                    if url not in self.visited_urls and depth <= max_depth:
                        self.visited_urls.add(url)
                        current_batch.append((url, depth))

                if not current_batch:
                    break

                # Fetch all URLs in the current batch
                tasks = [self.fetch_page(session, url) for url, _ in current_batch]
                results = await asyncio.gather(*tasks)

                for (url, depth), content in zip(current_batch, results):
                    if content:
                        scraped_content.append(content)

                        # Extract links for the next level (if not at max depth)
                        if depth < max_depth:
                            links = self.extract_links(content, url)
                            for link in links:
                                if link not in self.visited_urls:
                                    urls_to_visit.append((link, depth + 1))

        return scraped_content

# Usage
async def scraping_example():
    scraper = AsyncWebScraper(max_concurrent=5, delay=0.5)
    content = await scraper.scrape_recursive("https://example.com", max_depth=2)
    print(f"Scraped {len(content)} pages")

# asyncio.run(scraping_example())

Performance Optimization Tips

1. Use Connection Pooling

# Good: Reuse connections
async with aiohttp.ClientSession() as session:
    tasks = [session.get(url) for url in urls]
    responses = await asyncio.gather(*tasks)

# Bad: Creating a new session for each request
# for url in urls:
#     async with aiohttp.ClientSession() as session:
#         async with session.get(url) as response:
#             ...
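
Reusing one ClientSession lets aiohttp keep TCP connections alive and pool them across requests, so repeated calls to the same host avoid paying the connection and TLS handshake cost every time.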

2. Batch Operations

# Good: Batch database operations
await db.executemany(query, batch_data)

# Bad: Individual operations
# for item in data:
#     await db.execute(query, item)

3. Use asyncio.gather() for Independent Tasks

# Good: Run independent tasks concurrently
results = await asyncio.gather(
    fetch_user_data(user_id),
    fetch_user_orders(user_id),
    fetch_user_preferences(user_id)
)

# Bad: Sequential execution
# user_data = await fetch_user_data(user_id)
# orders = await fetch_user_orders(user_id)
# preferences = await fetch_user_preferences(user_id)

Common Pitfalls and Solutions

1. Blocking Operations

# Bad: Blocking operation in an async function
async def bad_example():
    time.sleep(1)  # Blocks the entire event loop!
    return "done"

# Good: Use async sleep
async def good_example():
    await asyncio.sleep(1)  # Yields control to the event loop
    return "done"

2. Exception Handling

async def safe_fetch(session, url):
    try:
        async with session.get(url) as response:
            return await response.json()
    except asyncio.TimeoutError:
        print(f"Timeout for {url}")
        return None
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None

# Use return_exceptions=True to handle exceptions gracefully
results = await asyncio.gather(*tasks, return_exceptions=True)

Testing AsyncIO Code

import asyncio

import aiohttp
import pytest
import pytest_asyncio

@pytest.mark.asyncio
async def test_async_function():
    result = await my_async_function()
    assert result == expected_value

# For more complex testing
@pytest_asyncio.fixture
async def async_client():
    async with aiohttp.ClientSession() as session:
        yield session

@pytest.mark.asyncio
async def test_with_client(async_client):
    response = await async_client.get("https://api.example.com/data")
    assert response.status == 200
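
These examples assume the pytest-asyncio plugin is installed; it provides the asyncio marker and the async fixture support used above, and the API URL is just a placeholder.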

Conclusion

AsyncIO is a powerful tool for building high-performance Python applications. By understanding these patterns and best practices, you can:

  • Handle thousands of concurrent connections
  • Build responsive web applications
  • Create efficient data processing pipelines
  • Implement robust error handling and recovery

The key is to embrace the async/await paradigm and think in terms of cooperative multitasking rather than traditional threading models.

Next up: I'll cover building production-ready async web APIs with FastAPI!