Asynchronous Programming in Python: A Deep Dive

Asynchronous programming is essential for building high-performance applications that can handle multiple tasks efficiently. In this guide, we'll explore Python's asyncio library and learn how to write asynchronous code.

Understanding Asynchronous Programming

Asynchronous programming allows you to write concurrent code without using threads. Key concepts include:

  1. Coroutines
  2. Event Loops
  3. Tasks
  4. Futures

Getting Started with Asyncio

Let's start with a simple example:

import asyncio


async def hello():
    # Print the first word, then yield control to the event loop for a
    # second before printing the rest.
    print("Hello")
    await asyncio.sleep(1)
    print("World")


async def main():
    # Top-level coroutine: simply await the demo coroutine.
    await hello()


# asyncio.run creates an event loop, runs main() to completion, and closes it.
asyncio.run(main())

Coroutines and Tasks

Coroutines are the building blocks of async code:

import asyncio
import time


async def fetch_data(delay):
    """Simulate an I/O-bound fetch that takes `delay` seconds and return a label."""
    print(f"Starting fetch_data({delay})")
    await asyncio.sleep(delay)
    print(f"Finished fetch_data({delay})")
    return f"Data from {delay}"


async def main():
    """Run two fetches concurrently and report the wall-clock time."""
    # Wrapping the coroutines in tasks schedules them immediately, so the
    # two sleeps overlap: total time is max(2, 1), not 2 + 1 seconds.
    first = asyncio.create_task(fetch_data(2))
    second = asyncio.create_task(fetch_data(1))

    start_time = time.time()
    results = await asyncio.gather(first, second)
    end_time = time.time()

    print(f"Results: {results}")
    print(f"Total time: {end_time - start_time:.2f} seconds")


asyncio.run(main())

Async Context Managers

Create reusable async resources:

import asyncio

class AsyncResource:
    """Demo async context manager: acquire on enter, release on exit.

    Each step sleeps for one second to stand in for real asynchronous
    work such as opening and closing a network connection.
    """

    async def __aenter__(self):
        # Simulated asynchronous acquisition step.
        print("Acquiring resource")
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Runs even when the `async with` body raises, so the resource
        # is always released.
        print("Releasing resource")
        await asyncio.sleep(1)

    async def process(self):
        """Simulate doing work with the acquired resource."""
        print("Processing data")
        await asyncio.sleep(1)

async def main():
    async with AsyncResource() as resource:
        await resource.process()

asyncio.run(main())

Real-World Example: Async Web Scraper

Let's build a web scraper that fetches multiple URLs concurrently:

import asyncio
import aiohttp
import bs4
from typing import List, Dict
import time

class AsyncWebScraper:
    """Fetch and parse several pages concurrently over one shared HTTP session."""

    def __init__(self):
        # The aiohttp session is created lazily in __aenter__, so the
        # scraper must be used as an async context manager.
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch_page(self, url: str) -> str:
        """Download one page and return its body as text."""
        async with self.session.get(url) as response:
            return await response.text()

    async def parse_page(self, html: str) -> Dict:
        """Extract the title, h1 texts, and link count from raw HTML."""
        soup = bs4.BeautifulSoup(html, 'html.parser')
        page_title = soup.title.string if soup.title else None
        headings = [h1.text for h1 in soup.find_all('h1')]
        return {
            'title': page_title,
            'h1_tags': headings,
            'links': len(soup.find_all('a')),
        }

    async def process_url(self, url: str) -> Dict:
        """Fetch and parse one URL; on any failure return an error record."""
        try:
            data = await self.parse_page(await self.fetch_page(url))
        except Exception as e:
            # Report per-URL failures as data instead of failing the batch.
            return {'url': url, 'error': str(e)}
        data['url'] = url
        return data

    async def scrape_urls(self, urls: List[str]) -> List[Dict]:
        """Run process_url for every URL concurrently and collect results."""
        return await asyncio.gather(*(self.process_url(u) for u in urls))

async def main():
    """Scrape three well-known sites concurrently and print a summary."""
    urls = [
        'https://python.org',
        'https://github.com',
        'https://stackoverflow.com',
    ]

    started = time.time()
    async with AsyncWebScraper() as scraper:
        results = await scraper.scrape_urls(urls)
    elapsed = time.time() - started

    print(f"Scraped {len(results)} pages in {elapsed:.2f} seconds")

    for result in results:
        if 'error' in result:
            print(f"Error scraping {result['url']}: {result['error']}")
            continue
        print(f"\nURL: {result['url']}")
        print(f"Title: {result['title']}")
        print(f"H1 Tags: {result['h1_tags']}")
        print(f"Number of links: {result['links']}")

if __name__ == '__main__':
    asyncio.run(main())

Project: Async Task Queue

Let's build a task queue system that processes jobs asynchronously:

import asyncio
import uuid
from dataclasses import dataclass
from typing import Dict, List, Optional, Callable, Awaitable
from datetime import datetime
import logging

# Configure root logging once at import time so queue/worker messages are visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class Task:
    """One unit of work tracked by AsyncTaskQueue through its lifecycle."""
    id: str                                  # unique id (uuid4 string)
    name: str                                # task type; selects the registered handler
    payload: Dict                            # handler input
    status: str                              # 'pending' -> 'running' -> 'completed'/'failed'
    created_at: datetime
    started_at: Optional[datetime] = None    # set when a worker picks the task up
    completed_at: Optional[datetime] = None  # set when processing finishes (either outcome)
    result: Optional[Dict] = None            # handler return value on success
    error: Optional[str] = None              # stringified exception on failure

class AsyncTaskQueue:
    """In-memory async task queue.

    Register one handler per task type, enqueue jobs with add_task, and a
    fixed pool of worker tasks processes them concurrently. Task records
    (status, result, error, timestamps) are kept in `self.tasks`.
    """

    def __init__(self, max_workers: int = 3):
        self.max_workers = max_workers
        self.tasks: Dict[str, Task] = {}            # task_id -> Task record
        self.queue: asyncio.Queue = asyncio.Queue() # holds pending task ids
        self.workers: List[asyncio.Task] = []
        self.handlers: Dict[str, Callable[[Dict], Awaitable[Dict]]] = {}

    def register_handler(self, task_name: str, handler: Callable[[Dict], Awaitable[Dict]]):
        """Register a handler coroutine for a specific task type."""
        self.handlers[task_name] = handler

    async def add_task(self, name: str, payload: Dict) -> str:
        """Add a new task to the queue and return its generated id.

        Raises:
            ValueError: if no handler is registered for `name`.
        """
        if name not in self.handlers:
            raise ValueError(f"No handler registered for task type: {name}")

        task_id = str(uuid.uuid4())
        task = Task(
            id=task_id,
            name=name,
            payload=payload,
            status='pending',
            created_at=datetime.now()
        )

        self.tasks[task_id] = task
        await self.queue.put(task_id)

        logger.info(f"Added task {task_id} of type {name}")
        return task_id

    async def get_task(self, task_id: str) -> Optional[Task]:
        """Get the task record (status/result), or None for an unknown id."""
        return self.tasks.get(task_id)

    async def process_task(self, task_id: str):
        """Run the registered handler for one task, recording the outcome."""
        task = self.tasks[task_id]
        handler = self.handlers[task.name]

        try:
            task.status = 'running'
            task.started_at = datetime.now()

            logger.info(f"Processing task {task_id}")
            result = await handler(task.payload)

            task.status = 'completed'
            task.result = result

        except Exception as e:
            # Handler failures are recorded on the task record rather than
            # propagated, so one bad job cannot kill a worker.
            logger.error(f"Error processing task {task_id}: {e}")
            task.status = 'failed'
            task.error = str(e)

        finally:
            task.completed_at = datetime.now()

    async def worker(self):
        """Worker loop: pull task ids off the queue until cancelled."""
        while True:
            try:
                task_id = await self.queue.get()
                try:
                    await self.process_task(task_id)
                finally:
                    # Always mark the item done, even if process_task raised
                    # unexpectedly (e.g. an unknown id); otherwise
                    # queue.join() would wait forever on this item.
                    self.queue.task_done()
            except Exception as e:
                # CancelledError is a BaseException, so cancellation still
                # propagates and terminates the loop.
                logger.error(f"Worker error: {e}")

    async def start(self):
        """Start the pool of worker tasks."""
        self.workers = [
            asyncio.create_task(self.worker())
            for _ in range(self.max_workers)
        ]
        logger.info(f"Started {self.max_workers} workers")

    async def stop(self):
        """Cancel all workers and wait for them to finish."""
        for worker in self.workers:
            worker.cancel()

        # return_exceptions=True swallows the CancelledErrors raised by
        # the cancelled workers instead of re-raising them here.
        await asyncio.gather(*self.workers, return_exceptions=True)
        logger.info("Stopped all workers")

# Example usage
async def example_handler(payload: Dict) -> Dict:
    """Example task handler: pretend to work for two seconds, then echo the payload."""
    await asyncio.sleep(2)  # stand-in for real I/O-bound processing
    return {'processed': payload}

async def main():
    """Demo: start the queue, enqueue five jobs, poll until done, print results."""
    queue = AsyncTaskQueue(max_workers=3)
    queue.register_handler('example', example_handler)
    await queue.start()

    try:
        # Enqueue five example jobs and remember their ids.
        task_ids = [
            await queue.add_task('example', {'data': f'task_{i}'})
            for i in range(5)
        ]

        # Poll every half second until every task reaches a terminal state.
        while True:
            statuses = [
                (await queue.get_task(tid)).status for tid in task_ids
            ]
            if all(s in ('completed', 'failed') for s in statuses):
                break
            await asyncio.sleep(0.5)

        # Report each task's outcome and how long its handler ran.
        for tid in task_ids:
            task = await queue.get_task(tid)
            print(f"\nTask {tid}:")
            print(f"Status: {task.status}")
            print(f"Result: {task.result}")
            print(f"Error: {task.error}")
            print(f"Duration: {(task.completed_at - task.started_at).total_seconds():.2f}s")

    finally:
        # Shut the workers down even if the demo body raised.
        await queue.stop()

if __name__ == '__main__':
    asyncio.run(main())

Best Practices

  1. Error Handling
async def safe_operation():
    """Run a long operation with a 5-second timeout and targeted error handling.

    The original snippet used an undefined `timeout(5)` context manager;
    asyncio.wait_for is the version-portable stdlib equivalent.
    """
    try:
        # wait_for cancels the operation and raises TimeoutError if it
        # does not finish within 5 seconds.
        await asyncio.wait_for(potentially_long_operation(), timeout=5)
    except asyncio.TimeoutError:
        print("Operation timed out")
    except Exception as e:
        print(f"Operation failed: {e}")
  2. Resource Management
async def manage_resources():
    """Use an async context manager so cleanup runs no matter what."""
    async with AsyncResource() as resource:
        try:
            await resource.process()
        except Exception:
            # __aexit__ still runs, so the resource is released before
            # the exception propagates to the caller.
            raise
  3. Concurrency Control
async def controlled_concurrency(semaphore: asyncio.Semaphore = None):
    """Run limited_operation under a concurrency limit.

    A semaphore only limits concurrency when the SAME instance is shared
    by all concurrent callers; creating a fresh one inside each call (as
    the naive version does) limits nothing. Create the semaphore once and
    pass it to every call.
    """
    if semaphore is None:
        # Fallback for a lone call: at most 5 concurrent operations.
        semaphore = asyncio.Semaphore(5)
    async with semaphore:
        await limited_operation()
  4. Task Cancellation
async def handle_cancellation():
    """Canonical cancellation pattern: clean up, then always re-raise."""
    try:
        await long_operation()
    except asyncio.CancelledError:
        # Clean up resources here (close files, release locks, ...)
        raise  # Re-raise to propagate cancellation; swallowing it breaks task.cancel()

Common Patterns

  1. Producer-Consumer
async def producer(queue):
    """Put the integers 0-4 on the queue, one per second."""
    for item in range(5):
        await queue.put(item)
        await asyncio.sleep(1)

async def consumer(queue):
    """Consume items forever, acknowledging each with task_done()."""
    while True:
        item = await queue.get()
        print(f"Processed {item}")
        queue.task_done()
  2. Fan-out/Fan-in
async def fan_out_fan_in(items):
    """Process every item concurrently (fan-out) and gather results (fan-in)."""
    # Fan out: one coroutine per item...
    pending = [process_item(item) for item in items]
    # ...fan in: gather preserves input order in the result list.
    return await asyncio.gather(*pending)

Conclusion

Asynchronous programming in Python enables:

  • Efficient I/O operations
  • Better resource utilization
  • Improved application performance
  • Scalable concurrent operations

Keep exploring asyncio and its ecosystem to build robust async applications.

Further Reading