Python 基础Technical Deep Dive

Python 异步编程:asyncio 详解

发布时间2025/11/12
分类Python 基础
预计阅读9 分钟
作者吴长龙
*

异步编程是现代 Python 的核心技能。本文详细介绍 asyncio、async/await、并发任务管理,以及在实际项目中的应用。

01.内容

# Python 异步编程:asyncio 详解

异步编程是现代 Python 开发的核心技能,尤其是在处理 I/O 密集型任务(如 API 调用、文件操作、网络请求)时。

02.1. 异步编程基础

1.1 什么是异步编程

python snippetpython
# 同步代码:一步步执行
import time

def sync_task():
    print("开始任务1")
    time.sleep(1)  # 阻塞等待
    print("任务1完成")
    
    print("开始任务2")
    time.sleep(1)  # 阻塞等待
    print("任务2完成")

# 执行时间:2秒
sync_task()
python snippetpython
# 异步代码:并发执行
import asyncio

async def async_task(name, delay):
    print(f"开始{name}")
    await asyncio.sleep(delay)  # 非阻塞,等待其他任务
    print(f"{name}完成")

async def main():
    # 并发执行两个任务
    await asyncio.gather(
        async_task("任务1", 1),
        async_task("任务2", 1)
    )

# 执行时间:1秒
asyncio.run(main())

1.2 async/await 语法

python snippetpython
# 定义异步函数
async def fetch_data():
    print("开始获取数据...")
    await asyncio.sleep(1)  # 等待异步操作完成
    return {"data": "hello"}

# 调用异步函数
async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())

03.2. 核心概念

2.1 await 关键字

python snippetpython
import asyncio

async def task1():
    await asyncio.sleep(1)
    return "结果1"

async def task2():
    await asyncio.sleep(0.5)
    return "结果2"

async def main():
    # 顺序执行
    r1 = await task1()  # 等1秒
    r2 = await task2()  # 再等0.5秒
    
    # 并发执行
    r1, r2 = await asyncio.gather(task1(), task2())  # 只需1秒

2.2 asyncio.gather 并发执行

python snippetpython
async def main():
    # 多个任务并发执行
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    
    for result in results:
        print(result)

2.3 asyncio.create_task 任务调度

python snippetpython
async def main():
    # 创建任务(不立即执行)
    task1 = asyncio.create_task(fetch_data())
    task2 = asyncio.create_task(process_data())
    
    # 执行其他操作
    print("准备就绪")
    
    # 等待任务完成
    result1 = await task1
    result2 = await task2

04.3. 实际应用

3.1 异步 HTTP 请求

python snippetpython
import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://api.github.com/users/octocat",
        "https://api.github.com/users/torvalds",
        "https://api.github.com/users/",
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        responses = await asyncio.gather(*tasks)
        
        for url, response in zip(urls, responses):
            print(f"{url}: {len(response)} bytes")

asyncio.run(main())

3.2 异步数据库操作

python snippetpython
import asyncio
import aiosqlite

async def query_user(db, user_id):
    async with db.execute(
        "SELECT * FROM users WHERE id = ?", (user_id,)
    ) as cursor:
        return await cursor.fetchone()

async def main():
    async with aiosqlite.connect("app.db") as db:
        # 并发查询多个用户
        users = await asyncio.gather(
            query_user(db, 1),
            query_user(db, 2),
            query_user(db, 3),
        )
        print(users)

asyncio.run(main())

3.3 异步文件操作

python snippetpython
import aiofiles
import asyncio

async def read_file(filename):
    async with aiofiles.open(filename, mode='r') as file:
        return await file.read()

async def write_file(filename, content):
    async with aiofiles.open(filename, mode='w') as file:
        await file.write(content)

async def main():
    # 并发读写
    await asyncio.gather(
        write_file("file1.txt", "Hello"),
        write_file("file2.txt", "World"),
    )
    
    content = await read_file("file1.txt")
    print(content)

asyncio.run(main())

05.4. 任务管理

4.1 Task 控制

python snippetpython
async def long_task():
    for i in range(5):
        print(f"任务进度: {i}/4")
        await asyncio.sleep(1)
    return "完成"

async def main():
    task = asyncio.create_task(long_task())
    
    # 等待一段时间后取消
    await asyncio.sleep(2)
    
    if not task.done():
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            print("任务已取消")

4.2 等待多个任务

python snippetpython
async def main():
    # 等待任意一个任务完成
    done, pending = await asyncio.wait(
        [asyncio.create_task(task1()),
         asyncio.create_task(task2())],
        return_when=asyncio.FIRST_COMPLETED
    )
    
    print(f"完成: {done}")
    print(f"等待: {pending}")

4.3 超时控制

python snippetpython
import asyncio

async def slow_task():
    await asyncio.sleep(10)
    return "完成"

async def main():
    try:
        # 设置超时
        result = await asyncio.wait_for(slow_task(), timeout=5)
    except asyncio.TimeoutError:
        print("任务超时")

asyncio.run(main())

06.5. 异步迭代器与生成器

5.1 异步生成器

python snippetpython
async def async_range(start, end, step=1):
    """异步生成器"""
    current = start
    while current < end:
        yield current
        await asyncio.sleep(0.1)  # 模拟异步操作
        current += step

async def main():
    async for num in async_range(0, 10):
        print(num)

asyncio.run(main())

5.2 异步列表推导式

python snippetpython
async def main():
    # 异步列表推导式
    results = [await task() for task in tasks]  # 顺序执行
    
    # 使用 gather 并发
    results = await asyncio.gather(*[task() for task in tasks])

07.6. 异步上下文管理器

python snippetpython
class AsyncContextManager:
    async def __aenter__(self):
        await asyncio.sleep(0.1)  # 异步初始化
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0.1)  # 异步清理

async def main():
    async with AsyncContextManager() as cm:
        print("在上下文中")

asyncio.run(main())

08.7. 与同步代码混合

7.1 在同步函数中调用异步

python snippetpython
import asyncio

async def async_func():
    await asyncio.sleep(1)
    return "异步结果"

# 方法1:asyncio.run (推荐)
def sync_wrapper():
    result = asyncio.run(async_func())
    return result

# 方法2:在已有事件循环中运行
def sync_wrapper2():
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(async_func())
    finally:
        loop.close()
    return result

7.2 在异步函数中调用同步

python snippetpython
import asyncio
import time

def sync_function():
    time.sleep(1)  # 同步阻塞
    return "同步结果"

async def main():
    # 方法1:使用 run_in_executor
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, sync_function)
    
    # 方法2:使用 to_thread (Python 3.9+)
    result = await asyncio.to_thread(sync_function)
    
    print(result)

asyncio.run(main())

09.8. 实战案例:并发 API 调用

python snippetpython
import aiohttp
import asyncio
from typing import List, Dict

async def fetch_user(session: aiohttp.ClientSession, user_id: int) -> Dict:
    """获取单个用户信息"""
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    async with session.get(url) as response:
        return await response.json()

async def fetch_all_users(user_ids: List[int]) -> List[Dict]:
    """并发获取所有用户"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_user(session, uid) for uid in user_ids]
        return await asyncio.gather(*tasks)

async def main():
    user_ids = range(1, 11)  # 1-10
    
    import time
    start = time.time()
    
    users = await fetch_all_users(user_ids)
    
    print(f"获取 {len(users)} 个用户")
    print(f"耗时: {time.time() - start:.2f}秒")

asyncio.run(main())

10.总结

同步异步
time.sleep()await asyncio.sleep()
requests.get()await session.get()
open()async with aiofiles.open()
for item in items:async for item in items:
ThreadPoolExecutorasyncio.gather()

异步编程让 Python 可以高效处理 I/O 密集型任务,是构建高性能 AI 应用的必备技能。下一篇文章我们将介绍 Python 类型提示,让代码更健壮。