Asynchronous Programming in Python: A Deep Dive
Asynchronous programming is essential for building high-performance applications that can handle multiple tasks efficiently. In this guide, we'll explore Python's asyncio library and learn how to write asynchronous code.
Understanding Asynchronous Programming
Asynchronous programming allows you to write concurrent code without using threads. Key concepts include:
- Coroutines
- Event Loops
- Tasks
- Futures
Getting Started with Asyncio
Let's start with a simple example:
import asyncio
async def hello(delay: float = 1.0) -> None:
    """Print "Hello", pause for *delay* seconds, then print "World".

    The ``await`` on the sleep yields control to the event loop instead of
    blocking the thread — the core idea this first example demonstrates.
    The delay is parameterized (default 1 s preserves the original timing).
    """
    print("Hello")
    await asyncio.sleep(delay)
    print("World")


async def main() -> None:
    """Entry-point coroutine: run the demo once."""
    await hello()


if __name__ == '__main__':
    # Guard so importing this example does not start the event loop.
    asyncio.run(main())
Coroutines and Tasks
Coroutines are the building blocks of async code:
import asyncio
import time
async def fetch_data(delay):
    """Simulate an I/O-bound fetch that takes *delay* seconds.

    Returns a string identifying which "fetch" produced it.
    """
    print(f"Starting fetch_data({delay})")
    await asyncio.sleep(delay)  # stand-in for real network latency
    print(f"Finished fetch_data({delay})")
    return f"Data from {delay}"


async def main():
    """Run two fetches concurrently and show the combined wall time."""
    # Create tasks: both coroutines start running as soon as the event loop
    # gets control, so their delays overlap instead of adding up (~2 s, not 3 s).
    task1 = asyncio.create_task(fetch_data(2))
    task2 = asyncio.create_task(fetch_data(1))

    # Wait for both tasks to complete.
    start_time = time.time()
    results = await asyncio.gather(task1, task2)
    end_time = time.time()

    print(f"Results: {results}")
    print(f"Total time: {end_time - start_time:.2f} seconds")


if __name__ == '__main__':
    # Guard so importing this example does not block for ~2 seconds.
    asyncio.run(main())
Async Context Managers
Create reusable async resources:
import asyncio
import aiohttp
class AsyncResource:
    """Toy async context manager whose setup/teardown each take *delay* seconds."""

    def __init__(self, delay: float = 1.0):
        # Generalizes the hard-coded 1-second sleeps; the default keeps the
        # original behavior, so AsyncResource() works unchanged.
        self.delay = delay

    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(self.delay)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Teardown runs even if the body raised (exc_* describe the error).
        print("Releasing resource")
        await asyncio.sleep(self.delay)

    async def process(self):
        """Pretend to do some work with the acquired resource."""
        print("Processing data")
        await asyncio.sleep(self.delay)


async def main():
    async with AsyncResource() as resource:
        await resource.process()


if __name__ == '__main__':
    # Guard so importing this example does not run the ~3-second demo.
    asyncio.run(main())
Real-World Example: Async Web Scraper
Let's build a web scraper that fetches multiple URLs concurrently:
import asyncio
import aiohttp
import bs4
from typing import List, Dict
import time
class AsyncWebScraper:
    """Async context manager that fetches and summarizes web pages.

    Owns a single aiohttp session for the lifetime of the ``async with``
    block so every request shares one connection pool.
    """

    def __init__(self):
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch_page(self, url: str) -> str:
        """Download one page and return its body as text."""
        async with self.session.get(url) as response:
            return await response.text()

    async def parse_page(self, html: str) -> Dict:
        """Extract title, h1 texts, and link count from raw HTML."""
        soup = bs4.BeautifulSoup(html, 'html.parser')
        return {
            'title': soup.title.string if soup.title else None,
            'h1_tags': [h1.text for h1 in soup.find_all('h1')],
            'links': len(soup.find_all('a')),
        }

    async def process_url(self, url: str) -> Dict:
        """Fetch and parse one URL; on failure return an error record instead."""
        try:
            page = await self.fetch_page(url)
            summary = await self.parse_page(page)
            summary['url'] = url
            return summary
        except Exception as e:
            # One bad URL must not sink the whole batch.
            return {'url': url, 'error': str(e)}

    async def scrape_urls(self, urls: List[str]) -> List[Dict]:
        """Process every URL concurrently and return results in input order."""
        pending = [self.process_url(u) for u in urls]
        return await asyncio.gather(*pending)
async def main():
    """Scrape a few well-known sites and print a short report for each."""
    urls = [
        'https://python.org',
        'https://github.com',
        'https://stackoverflow.com',
    ]

    start_time = time.time()
    async with AsyncWebScraper() as scraper:
        results = await scraper.scrape_urls(urls)
    end_time = time.time()

    print(f"Scraped {len(results)} pages in {end_time - start_time:.2f} seconds")

    for result in results:
        if 'error' in result:
            print(f"Error scraping {result['url']}: {result['error']}")
        else:
            print(f"\nURL: {result['url']}")
            print(f"Title: {result['title']}")
            print(f"H1 Tags: {result['h1_tags']}")
            print(f"Number of links: {result['links']}")


if __name__ == '__main__':
    asyncio.run(main())
Project: Async Task Queue
Let's build a task queue system that processes jobs asynchronously:
import asyncio
import uuid
from dataclasses import dataclass
from typing import Dict, List, Optional, Callable, Awaitable
from datetime import datetime
import logging
# Configure logging once at import time; INFO level surfaces the queue's
# lifecycle messages. Module-level logger follows the stdlib convention.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Task:
    """Record of one unit of work submitted to the queue.

    Lifecycle: status moves 'pending' -> 'running' -> 'completed' or 'failed'.
    """
    id: str  # uuid4 string assigned by add_task
    name: str  # handler key this task dispatches to
    payload: Dict  # arbitrary input passed to the handler
    status: str  # 'pending' | 'running' | 'completed' | 'failed'
    created_at: datetime  # when the task was enqueued
    started_at: Optional[datetime] = None  # set when a worker picks it up
    completed_at: Optional[datetime] = None  # set when it finishes (either outcome)
    result: Optional[Dict] = None  # handler return value on success
    error: Optional[str] = None  # stringified exception on failure
class AsyncTaskQueue:
    """In-memory asynchronous job queue backed by a fixed pool of workers.

    Jobs are dispatched by name to registered coroutine handlers; every
    submitted job is kept in ``self.tasks`` so callers can poll its status
    and result by id.
    """

    def __init__(self, max_workers: int = 3):
        self.max_workers = max_workers
        self.tasks: Dict[str, Task] = {}
        self.queue: asyncio.Queue = asyncio.Queue()
        self.workers: List[asyncio.Task] = []
        self.handlers: Dict[str, Callable[[Dict], Awaitable[Dict]]] = {}

    def register_handler(self, task_name: str, handler: Callable[[Dict], Awaitable[Dict]]):
        """Map a task name to the coroutine that will process its payloads."""
        self.handlers[task_name] = handler

    async def add_task(self, name: str, payload: Dict) -> str:
        """Enqueue a new job and return its generated id.

        Raises ValueError if no handler is registered under *name*.
        """
        if name not in self.handlers:
            raise ValueError(f"No handler registered for task type: {name}")
        task_id = str(uuid.uuid4())
        self.tasks[task_id] = Task(
            id=task_id,
            name=name,
            payload=payload,
            status='pending',
            created_at=datetime.now(),
        )
        await self.queue.put(task_id)
        logger.info(f"Added task {task_id} of type {name}")
        return task_id

    async def get_task(self, task_id: str) -> Optional[Task]:
        """Return the Task record for *task_id*, or None if unknown."""
        return self.tasks.get(task_id)

    async def process_task(self, task_id: str):
        """Run one job through its handler, recording timing and outcome."""
        record = self.tasks[task_id]
        handler = self.handlers[record.name]
        try:
            record.status = 'running'
            record.started_at = datetime.now()
            logger.info(f"Processing task {task_id}")
            record.result = await handler(record.payload)
            record.status = 'completed'
        except Exception as e:
            logger.error(f"Error processing task {task_id}: {e}")
            record.status = 'failed'
            record.error = str(e)
        finally:
            # Stamp the finish time whether the handler succeeded or not.
            record.completed_at = datetime.now()

    async def worker(self):
        """Loop forever pulling job ids off the queue and processing them."""
        while True:
            try:
                task_id = await self.queue.get()
                await self.process_task(task_id)
                self.queue.task_done()
            except Exception as e:
                # Log and keep the worker alive; cancellation is a
                # BaseException on modern Python, so cancel() still works.
                logger.error(f"Worker error: {e}")

    async def start(self):
        """Spawn the worker pool."""
        self.workers = [
            asyncio.create_task(self.worker())
            for _ in range(self.max_workers)
        ]
        logger.info(f"Started {self.max_workers} workers")

    async def stop(self):
        """Cancel every worker and wait for all of them to wind down."""
        for pending_worker in self.workers:
            pending_worker.cancel()
        await asyncio.gather(*self.workers, return_exceptions=True)
        logger.info("Stopped all workers")
# Example usage
async def example_handler(payload: Dict) -> Dict:
    """Demo handler: pretend to work for two seconds, then echo the payload."""
    await asyncio.sleep(2)  # simulate slow processing
    return {'processed': payload}
async def main():
    """Drive the queue: submit five demo jobs, wait for them, report results."""
    # Create and start the task queue.
    queue = AsyncTaskQueue(max_workers=3)
    queue.register_handler('example', example_handler)
    await queue.start()
    try:
        # Submit a handful of demo jobs.
        submitted = [
            await queue.add_task('example', {'data': f'task_{i}'})
            for i in range(5)
        ]

        # Poll until every job has reached a terminal state.
        while True:
            states = [(await queue.get_task(tid)).status for tid in submitted]
            if all(s in ('completed', 'failed') for s in states):
                break
            await asyncio.sleep(0.5)

        # Report each job's outcome.
        for task_id in submitted:
            task = await queue.get_task(task_id)
            print(f"\nTask {task_id}:")
            print(f"Status: {task.status}")
            print(f"Result: {task.result}")
            print(f"Error: {task.error}")
            print(f"Duration: {(task.completed_at - task.started_at).total_seconds():.2f}s")
    finally:
        # Always shut the workers down, even if something above failed.
        await queue.stop()


if __name__ == '__main__':
    asyncio.run(main())
Best Practices
- Error Handling
async def safe_operation():
    """Run a long operation with a 5-second cap, reporting failures.

    Fix: the original used a bare ``timeout(5)``, which is not defined
    anywhere in the standard library (it is the third-party async_timeout
    idiom). ``asyncio.timeout`` (Python 3.11+) is the stdlib equivalent.
    """
    try:
        async with asyncio.timeout(5):  # cancel the awaited work after 5 s
            await potentially_long_operation()
    except asyncio.TimeoutError:
        print("Operation timed out")
    except Exception as e:
        print(f"Operation failed: {e}")
- Resource Management
async def manage_resources():
    """Show that ``async with`` releases the resource even when work fails."""
    async with AsyncResource() as resource:
        try:
            await resource.process()
        except Exception:
            # __aexit__ still runs on the way out, so the resource is
            # released before the exception propagates to the caller.
            raise
- Concurrency Control
async def controlled_concurrency(semaphore=None):
    """Run ``limited_operation`` under a concurrency limit.

    Fix: the original created a fresh ``asyncio.Semaphore`` inside the
    function, so every call got its own — limiting nothing. To actually cap
    concurrency, create one semaphore and pass the same instance to all
    concurrent calls. If *semaphore* is omitted, a private 5-slot semaphore
    is created, preserving the original call signature and behavior.
    """
    if semaphore is None:
        # NOTE: a per-call semaphore cannot limit concurrency across calls;
        # this branch exists only for backward compatibility.
        semaphore = asyncio.Semaphore(5)
    async with semaphore:
        await limited_operation()
- Task Cancellation
async def handle_cancellation():
    """Run a long operation while cooperating with task cancellation."""
    try:
        await long_operation()
    except asyncio.CancelledError:
        # Clean up resources here, then re-raise so the cancellation
        # keeps propagating to whoever awaits this task.
        raise
Common Patterns
- Producer-Consumer
async def producer(queue, count=5, delay=1):
    """Put ``count`` integers (0..count-1) on *queue*, pausing ``delay``
    seconds between items.

    The defaults reproduce the original behavior (5 items, 1 s apart);
    the new parameters generalize the example for reuse and testing.
    """
    for i in range(count):
        await queue.put(i)
        await asyncio.sleep(delay)
async def consumer(queue):
    """Drain *queue* forever, marking each item done after handling it."""
    while True:
        job = await queue.get()
        print(f"Processed {job}")
        queue.task_done()  # lets queue.join() observe completion
- Fan-out/Fan-in
async def fan_out_fan_in(items):
    """Process every item concurrently (fan-out), then collect all the
    results (fan-in), preserving input order."""
    # Fan out: one coroutine per item, all driven together by gather.
    pending = [process_item(item) for item in items]
    # Fan in: await the whole batch; gather returns results in input order.
    return await asyncio.gather(*pending)
Conclusion
Asynchronous programming in Python enables:
- Efficient I/O operations
- Better resource utilization
- Improved application performance
- Scalable concurrent operations
Keep exploring asyncio and its ecosystem to build robust async applications.