Asynchronous Programming in Python

This guide covers asynchronous programming in Python, from basic concepts to advanced patterns and best practices.

Introduction

What is Asynchronous Programming?

Asynchronous programming lets a program start an operation and continue with other work while that operation is waiting, instead of blocking until it completes. In asyncio, tasks are interleaved on a single thread rather than run in parallel: while one task waits on I/O, another task runs. This puts otherwise idle waiting time to use and often reduces total execution time for I/O-heavy workloads.

When to Use Async

  • I/O-bound operations (network requests, file operations)
  • Web applications and APIs
  • Database operations
  • Long-running tasks that involve waiting
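
The benefit for waiting-heavy work can be illustrated with a small timing comparison. This is a minimal sketch: `fake_io`, `run_sequentially`, `run_concurrently`, and `timed` are hypothetical helpers, and `asyncio.sleep` stands in for a real network or database call.

```python
import asyncio
import time

async def fake_io(delay: float) -> float:
    """Stand-in for an I/O-bound call such as a network request."""
    await asyncio.sleep(delay)
    return delay

async def run_sequentially(delays):
    # Each call finishes before the next one starts.
    return [await fake_io(d) for d in delays]

async def run_concurrently(delays):
    # All calls wait at the same time on one event loop.
    return await asyncio.gather(*(fake_io(d) for d in delays))

async def timed(coro):
    # Await the coroutine and report how long it took.
    start = time.perf_counter()
    result = await coro
    return result, time.perf_counter() - start

if __name__ == "__main__":
    delays = [0.1, 0.1, 0.1]
    _, seq = asyncio.run(timed(run_sequentially(delays)))
    _, conc = asyncio.run(timed(run_concurrently(delays)))
    print(f"sequential: {seq:.2f}s, concurrent: {conc:.2f}s")
```

The concurrent version should take roughly as long as the slowest single call, while the sequential version takes roughly the sum of all delays.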

Async vs Threading

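  • Async: Single-threaded, event-loop based; tasks switch cooperatively at await points
  • Threading: Multiple OS threads scheduled preemptively; in standard CPython builds the GIL lets only one thread execute Python bytecode at a time
  • Use Async When: I/O-bound operations, many concurrent tasks
  • Use Threading When: Blocking I/O or libraries without async support; for CPU-bound work that needs true parallelism, use multiprocessing instead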
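
The two models can also be combined. The sketch below assumes Python 3.9+ (for `asyncio.to_thread`) and uses a hypothetical `blocking_call` to stand in for a library without async support; `heartbeat` is likewise illustrative and only shows that the event loop keeps running while the worker thread blocks.

```python
import asyncio
import time

def blocking_call(seconds: float) -> str:
    """Hypothetical blocking function, e.g. a legacy client library."""
    time.sleep(seconds)  # blocks the thread that runs it
    return "done"

async def heartbeat(ticks: list, n: int = 3) -> None:
    # Keeps appending while the blocking call runs elsewhere.
    for i in range(n):
        ticks.append(i)
        await asyncio.sleep(0.02)

async def main():
    ticks = []
    # to_thread runs the blocking call in a worker thread,
    # so the event loop stays free to run heartbeat().
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_call, 0.1),
        heartbeat(ticks),
    )
    return result, ticks

if __name__ == "__main__":
    print(asyncio.run(main()))
```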

Core Concepts

Event Loop

The event loop is the heart of every asyncio application. It manages the execution of coroutines and callbacks.

import asyncio

async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Run the event loop
asyncio.run(main())
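
Besides coroutines, the loop also runs plain callbacks. The following sketch shows one way to observe that, using `loop.call_soon` and `loop.call_later`; the `events` list and its labels are illustrative only.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    events = []

    # Callbacks are queued on the loop; they run once main() yields control.
    loop.call_soon(events.append, "call_soon")
    loop.call_later(0.05, events.append, "call_later")

    events.append("before await")
    await asyncio.sleep(0.1)  # yield to the loop so the callbacks can run
    events.append("after await")
    return events

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Nothing scheduled on the loop runs until the current coroutine reaches an `await`, which is why "before await" is recorded first.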

Coroutines

Coroutines are functions defined with async def. They can pause at an await expression and yield control back to the event loop, which resumes them when the awaited operation completes.

async def my_coroutine():
    print("Starting")
    await asyncio.sleep(1)
    print("Finished")
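
A detail that often surprises newcomers is that calling a coroutine function does not run its body; it only creates a coroutine object. The sketch below illustrates this with a hypothetical `greet` coroutine.

```python
import asyncio
import inspect

async def greet(name: str) -> str:
    await asyncio.sleep(0)  # suspension point: control returns to the event loop
    return f"Hello, {name}"

async def main():
    coro = greet("World")                # nothing has run yet
    is_coro = inspect.iscoroutine(coro)  # it is just a coroutine object
    result = await coro                  # now the body executes
    return is_coro, result

if __name__ == "__main__":
    print(asyncio.run(main()))
```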

Tasks

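Tasks wrap coroutines and schedule them to run concurrently on the event loop. asyncio.create_task() returns immediately; the wrapped coroutine starts running the next time the current coroutine yields control.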

async def main():
    # Create tasks
    task1 = asyncio.create_task(my_coroutine())
    task2 = asyncio.create_task(my_coroutine())

    # Wait for tasks to complete
    await task1
    await task2
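
To see that both tasks make progress while `main` awaits only the first one, the sketch below records start and end events in a shared list. The `worker` coroutine and the `log` list are assumptions made for illustration.

```python
import asyncio

async def worker(name, delay, log):
    log.append(f"{name} start")
    await asyncio.sleep(delay)
    log.append(f"{name} end")
    return name

async def main():
    log = []
    # Both tasks are scheduled immediately, in creation order.
    slow = asyncio.create_task(worker("slow", 0.1, log))
    fast = asyncio.create_task(worker("fast", 0.02, log))

    # Awaiting "slow" first does not stop "fast" from finishing earlier.
    results = [await slow, await fast]
    return results, log

if __name__ == "__main__":
    print(asyncio.run(main()))
```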

Basic Usage

Creating Async Functions

async def fetch_data():
    # Simulate I/O operation
    await asyncio.sleep(1)
    return "Data"

async def process_data():
    data = await fetch_data()
    return data.upper()

Running Multiple Tasks

async def main():
    # Create multiple tasks
    tasks = [
        asyncio.create_task(fetch_data()),
        asyncio.create_task(fetch_data()),
        asyncio.create_task(fetch_data())
    ]

    # Wait for all tasks to complete
    results = await asyncio.gather(*tasks)
    return results
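
`asyncio.gather` returns results in the order the awaitables were passed in, not the order they finished. When results should be handled as they arrive, `asyncio.as_completed` is one alternative. The sketch below contrasts the two, using a hypothetical `fetch` coroutine with artificial delays.

```python
import asyncio

async def fetch(name, delay):
    await asyncio.sleep(delay)  # stand-in for real I/O
    return name

def make_jobs():
    # Fresh coroutine objects each time; a coroutine can only be awaited once.
    return [fetch("a", 0.09), fetch("b", 0.01), fetch("c", 0.05)]

async def main():
    # gather: results follow the input order.
    in_input_order = await asyncio.gather(*make_jobs())

    # as_completed: results follow the completion order.
    in_completion_order = []
    for next_done in asyncio.as_completed(make_jobs()):
        in_completion_order.append(await next_done)

    return in_input_order, in_completion_order

if __name__ == "__main__":
    print(asyncio.run(main()))
```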

Error Handling

async def safe_operation():
    try:
        result = await risky_operation()
        return result
    except Exception as e:
        print(f"Error occurred: {e}")
        return None
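
When several operations run together, one failure should not necessarily discard the other results. A possible approach, sketched below, is `asyncio.gather(..., return_exceptions=True)`, which returns exceptions as values instead of raising the first one. The `may_fail` coroutine is hypothetical.

```python
import asyncio

async def may_fail(n):
    await asyncio.sleep(0.01)
    if n % 2:
        raise ValueError(f"item {n} failed")
    return n

async def main():
    results = await asyncio.gather(
        *(may_fail(n) for n in range(4)),
        return_exceptions=True,  # exceptions come back as list items
    )
    # Separate successful values from captured exceptions.
    ok = [r for r in results if not isinstance(r, Exception)]
    errors = [str(r) for r in results if isinstance(r, Exception)]
    return ok, errors

if __name__ == "__main__":
    print(asyncio.run(main()))
```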

Advanced Patterns

Task Management

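async def main():
    # Create tasks
    tasks = [asyncio.create_task(fetch_data()) for _ in range(3)]

    # Wait for the first task to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    # Cancel the remaining tasks
    for task in pending:
        task.cancel()

    # Let the cancelled tasks finish unwinding before moving on
    await asyncio.gather(*pending, return_exceptions=True)

    # Results of whichever tasks finished first
    return [task.result() for task in done]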
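
Cancellation is delivered to the task as an `asyncio.CancelledError` raised at its current `await`. The sketch below shows one way a coroutine might clean up and then re-raise; `worker` and the `log` list are illustrative assumptions.

```python
import asyncio

async def worker(log):
    try:
        await asyncio.sleep(10)  # cancelled long before this completes
    except asyncio.CancelledError:
        log.append("cleanup")    # release resources here
        raise                    # re-raise so the task is marked cancelled

async def main():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.01)    # give the worker a chance to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Swallowing `CancelledError` instead of re-raising it would leave the task looking as if it had completed normally, which is rarely what the caller expects.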

Timeouts

async def main():
    try:
        # Set timeout for operation
        result = await asyncio.wait_for(
            long_operation(),
            timeout=5.0
        )
        return result
    except asyncio.TimeoutError:
        print("Operation timed out")
        return None
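
When the timeout expires, `asyncio.wait_for` cancels the wrapped operation before raising. The following sketch makes that visible with a hypothetical `slow_operation` that records its own cancellation in a `state` dictionary.

```python
import asyncio

async def slow_operation(state):
    try:
        await asyncio.sleep(1)
        return "finished"
    except asyncio.CancelledError:
        state["cancelled"] = True  # wait_for cancelled us on timeout
        raise

async def main():
    state = {"cancelled": False}
    try:
        result = await asyncio.wait_for(slow_operation(state), timeout=0.05)
    except asyncio.TimeoutError:
        result = None
    return result, state["cancelled"]

if __name__ == "__main__":
    print(asyncio.run(main()))
```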

Semaphores

async def main():
    # Limit concurrent operations
    sem = asyncio.Semaphore(3)

    async def bounded_operation():
        async with sem:
            return await operation()

    tasks = [bounded_operation() for _ in range(10)]
    return await asyncio.gather(*tasks)
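
To check that the semaphore really caps concurrency, the sketch below counts how many operations are inside the `async with` block at once. The `active` and `peak` counters and the `limit` and `jobs` parameters are assumptions added for illustration.

```python
import asyncio

async def main(limit=3, jobs=10):
    sem = asyncio.Semaphore(limit)
    active = 0  # operations currently holding the semaphore
    peak = 0    # highest value of `active` observed

    async def bounded_operation(i):
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for real work
            active -= 1
            return i

    results = await asyncio.gather(*(bounded_operation(i) for i in range(jobs)))
    return results, peak

if __name__ == "__main__":
    print(asyncio.run(main()))
```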

Integration with Libraries

HTTP Requests (aiohttp)

import asyncio

import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ["http://example.com", "http://example.org"]
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

Database Operations (aiomysql)

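import aiomysql

async def fetch_data():
    pool = await aiomysql.create_pool(
        host='localhost',
        user='user',
        password='password',
        db='database'
    )

    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # my_table is a placeholder; TABLE itself is a reserved word in MySQL
                await cur.execute("SELECT * FROM my_table")
                return await cur.fetchall()
    finally:
        # Close the pool and wait for its connections to shut down
        pool.close()
        await pool.wait_closed()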

File Operations (aiofiles)

import aiofiles

async def read_file(filename):
    async with aiofiles.open(filename, mode='r') as f:
        return await f.read()

async def write_file(filename, content):
    async with aiofiles.open(filename, mode='w') as f:
        await f.write(content)

Best Practices

  1. Error Handling
       • Always handle exceptions in async functions
       • Use proper error propagation
       • Implement cleanup in finally blocks

  2. Resource Management
       • Use async context managers
       • Properly close connections and files
       • Implement proper cleanup

  3. Performance
       • Use appropriate concurrency limits
       • Monitor memory usage
       • Profile async operations

  4. Code Organization
       • Keep async functions focused
       • Use proper naming conventions
       • Document async behavior

Common Patterns

Async Context Managers

import contextlib

@contextlib.asynccontextmanager
async def db_management():
    try:
        await stop_database()
        yield
    finally:
        await start_database()
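
A self-contained variant of the same pattern is sketched below. `managed_resource` and the `log` list are hypothetical; the point is that the `finally` branch runs even when the body of the `async with` block raises.

```python
import asyncio
import contextlib

@contextlib.asynccontextmanager
async def managed_resource(log):
    log.append("acquire")
    try:
        yield "resource"       # value bound by "as" in the async with
    finally:
        log.append("release")  # runs on normal exit and on error

async def main():
    log = []
    try:
        async with managed_resource(log) as res:
            log.append(f"using {res}")
            raise RuntimeError("boom")
    except RuntimeError:
        log.append("error handled")
    return log

if __name__ == "__main__":
    print(asyncio.run(main()))
```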

Async Iterators

class AsyncIterator:
    def __init__(self, max_rows=100):
        self._current_row = 0
        self._max_rows = max_rows

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._current_row < self._max_rows:
            row = (self._current_row, await coroutine())
            self._current_row += 1
            return row
        raise StopAsyncIteration
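
An object implementing `__aiter__` and `__anext__` is consumed with `async for` or an async comprehension. The sketch below uses a hypothetical `Countdown` class so that it runs without any external coroutine.

```python
import asyncio

class Countdown:
    """Hypothetical async iterator that counts down from `start` to 1."""

    def __init__(self, start):
        self._n = start

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._n <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # stand-in for awaiting real I/O
        value = self._n
        self._n -= 1
        return value

async def main():
    # An async comprehension drives __anext__ until StopAsyncIteration.
    return [n async for n in Countdown(3)]

if __name__ == "__main__":
    print(asyncio.run(main()))
```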

Async Generators

async def record_streamer(max_rows):
    current_row = 0
    while current_row < max_rows:
        row = (current_row, await coroutine())
        current_row += 1
        yield row
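
An async generator is consumed the same way as an async iterator, with less boilerplate. The sketch below assumes a hypothetical `ticker` generator and shows both an `async for` loop and a filtered async comprehension.

```python
import asyncio

async def ticker(limit):
    for i in range(limit):
        await asyncio.sleep(0)  # stand-in for awaiting real I/O
        yield i

async def main():
    # Plain async for loop over the generator.
    total = 0
    async for i in ticker(4):
        total += i

    # Async comprehension with a filter.
    evens = [i async for i in ticker(6) if i % 2 == 0]
    return total, evens

if __name__ == "__main__":
    print(asyncio.run(main()))
```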