Python 基础Technical Deep Dive
Python 异步编程:asyncio 详解
发布时间2025/11/12
分类Python 基础
预计阅读9 分钟
作者吴长龙
*
异步编程是现代 Python 的核心技能。本文详细介绍 asyncio、async/await、并发任务管理,以及在实际项目中的应用。
01.内容
# Python 异步编程:asyncio 详解
异步编程是现代 Python 开发的核心技能,尤其是在处理 I/O 密集型任务(如 API 调用、文件操作、网络请求)时。
02.1. 异步编程基础
1.1 什么是异步编程
python snippetpython
# 同步代码:一步步执行
import time
def sync_task():
print("开始任务1")
time.sleep(1) # 阻塞等待
print("任务1完成")
print("开始任务2")
time.sleep(1) # 阻塞等待
print("任务2完成")
# 执行时间:2秒
sync_task()python snippetpython
# 异步代码:并发执行
import asyncio
async def async_task(name, delay):
print(f"开始{name}")
await asyncio.sleep(delay) # 非阻塞,等待其他任务
print(f"{name}完成")
async def main():
# 并发执行两个任务
await asyncio.gather(
async_task("任务1", 1),
async_task("任务2", 1)
)
# 执行时间:1秒
asyncio.run(main())1.2 async/await 语法
python snippetpython
# 定义异步函数
async def fetch_data():
print("开始获取数据...")
await asyncio.sleep(1) # 等待异步操作完成
return {"data": "hello"}
# 调用异步函数
async def main():
result = await fetch_data()
print(result)
asyncio.run(main())03.2. 核心概念
2.1 await 关键字
python snippetpython
import asyncio
async def task1():
await asyncio.sleep(1)
return "结果1"
async def task2():
await asyncio.sleep(0.5)
return "结果2"
async def main():
# 顺序执行
r1 = await task1() # 等1秒
r2 = await task2() # 再等0.5秒
# 并发执行
r1, r2 = await asyncio.gather(task1(), task2()) # 只需1秒2.2 asyncio.gather 并发执行
python snippetpython
async def main():
# 多个任务并发执行
results = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
)
for result in results:
print(result)2.3 asyncio.create_task 任务调度
python snippetpython
async def main():
# 创建任务(不立即执行)
task1 = asyncio.create_task(fetch_data())
task2 = asyncio.create_task(process_data())
# 执行其他操作
print("准备就绪")
# 等待任务完成
result1 = await task1
result2 = await task204.3. 实际应用
3.1 异步 HTTP 请求
python snippetpython
import aiohttp
import asyncio
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://api.github.com/users/octocat",
"https://api.github.com/users/torvalds",
"https://api.github.com/users/",
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
responses = await asyncio.gather(*tasks)
for url, response in zip(urls, responses):
print(f"{url}: {len(response)} bytes")
asyncio.run(main())3.2 异步数据库操作
python snippetpython
import asyncio
import aiosqlite
async def query_user(db, user_id):
async with db.execute(
"SELECT * FROM users WHERE id = ?", (user_id,)
) as cursor:
return await cursor.fetchone()
async def main():
async with aiosqlite.connect("app.db") as db:
# 并发查询多个用户
users = await asyncio.gather(
query_user(db, 1),
query_user(db, 2),
query_user(db, 3),
)
print(users)
asyncio.run(main())3.3 异步文件操作
python snippetpython
import aiofiles
import asyncio
async def read_file(filename):
async with aiofiles.open(filename, mode='r') as file:
return await file.read()
async def write_file(filename, content):
async with aiofiles.open(filename, mode='w') as file:
await file.write(content)
async def main():
# 并发读写
await asyncio.gather(
write_file("file1.txt", "Hello"),
write_file("file2.txt", "World"),
)
content = await read_file("file1.txt")
print(content)
asyncio.run(main())05.4. 任务管理
4.1 Task 控制
python snippetpython
async def long_task():
for i in range(5):
print(f"任务进度: {i}/4")
await asyncio.sleep(1)
return "完成"
async def main():
task = asyncio.create_task(long_task())
# 等待一段时间后取消
await asyncio.sleep(2)
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
print("任务已取消")4.2 等待多个任务
python snippetpython
async def main():
# 等待任意一个任务完成
done, pending = await asyncio.wait(
[asyncio.create_task(task1()),
asyncio.create_task(task2())],
return_when=asyncio.FIRST_COMPLETED
)
print(f"完成: {done}")
print(f"等待: {pending}")4.3 超时控制
python snippetpython
import asyncio
async def slow_task():
await asyncio.sleep(10)
return "完成"
async def main():
try:
# 设置超时
result = await asyncio.wait_for(slow_task(), timeout=5)
except asyncio.TimeoutError:
print("任务超时")
asyncio.run(main())06.5. 异步迭代器与生成器
5.1 异步生成器
python snippetpython
async def async_range(start, end, step=1):
"""异步生成器"""
current = start
while current < end:
yield current
await asyncio.sleep(0.1) # 模拟异步操作
current += step
async def main():
async for num in async_range(0, 10):
print(num)
asyncio.run(main())5.2 异步列表推导式
python snippetpython
async def main():
# 异步列表推导式
results = [await task() for task in tasks] # 顺序执行
# 使用 gather 并发
results = await asyncio.gather(*[task() for task in tasks])07.6. 异步上下文管理器
python snippetpython
class AsyncContextManager:
async def __aenter__(self):
await asyncio.sleep(0.1) # 异步初始化
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await asyncio.sleep(0.1) # 异步清理
async def main():
async with AsyncContextManager() as cm:
print("在上下文中")
asyncio.run(main())08.7. 与同步代码混合
7.1 在同步函数中调用异步
python snippetpython
import asyncio
async def async_func():
await asyncio.sleep(1)
return "异步结果"
# 方法1:asyncio.run (推荐)
def sync_wrapper():
result = asyncio.run(async_func())
return result
# 方法2:在已有事件循环中运行
def sync_wrapper2():
loop = asyncio.new_event_loop()
try:
result = loop.run_until_complete(async_func())
finally:
loop.close()
return result7.2 在异步函数中调用同步
python snippetpython
import asyncio
import time
def sync_function():
time.sleep(1) # 同步阻塞
return "同步结果"
async def main():
# 方法1:使用 run_in_executor
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, sync_function)
# 方法2:使用 to_thread (Python 3.9+)
result = await asyncio.to_thread(sync_function)
print(result)
asyncio.run(main())09.8. 实战案例:并发 API 调用
python snippetpython
import aiohttp
import asyncio
from typing import List, Dict
async def fetch_user(session: aiohttp.ClientSession, user_id: int) -> Dict:
"""获取单个用户信息"""
url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
async with session.get(url) as response:
return await response.json()
async def fetch_all_users(user_ids: List[int]) -> List[Dict]:
"""并发获取所有用户"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_user(session, uid) for uid in user_ids]
return await asyncio.gather(*tasks)
async def main():
user_ids = range(1, 11) # 1-10
import time
start = time.time()
users = await fetch_all_users(user_ids)
print(f"获取 {len(users)} 个用户")
print(f"耗时: {time.time() - start:.2f}秒")
asyncio.run(main())10.总结
| 同步 | 异步 |
|---|---|
time.sleep() | await asyncio.sleep() |
requests.get() | await session.get() |
open() | async with aiofiles.open() |
for item in items: | async for item in items: |
ThreadPoolExecutor | asyncio.gather() |
异步编程让 Python 可以高效处理 I/O 密集型任务,是构建高性能 AI 应用的必备技能。下一篇文章我们将介绍 Python 类型提示,让代码更健壮。