Python 并发实战讲义:多线程与异步从入门到上手
目标读者:有 Python 基础,但刚开始接触并发。
学完这份讲义后,你应该能:
- 判断任务该用线程还是异步;
- 写出可运行的线程/asyncio 程序;
- 处理超时、取消、重试、限流与排错;
- 独立完成小型并发项目。
目录
- 并发到底解决什么问题
- 进程、线程、协程:区别与选型
- GIL 与 CPU/I/O 任务的关系
threading基础与进阶concurrent.futures线程池实战asyncio核心模型(事件循环、Task、await)aiohttp与异步 I/O(网络/文件)- 生产实践:限流、重试、背压、资源清理、日志
- 性能测试与基准对比
- 7 天学习计划
- 3 个实战项目(由易到难)
- 常见坑与排查清单
- 面试高频问答
1) 并发到底解决什么问题
概念
并发不是“让代码更高级”,而是提升等待期利用率。
为什么
很多程序慢,不是 CPU 算不动,而是在等:
- 等网络响应
- 等磁盘 I/O
- 等数据库
怎么做
- I/O 密集:优先线程或异步
- CPU 密集:优先多进程(或 C 扩展)
最小示例:串行 vs 并发(I/O 模拟)
import time
from concurrent.futures import ThreadPoolExecutor
def io_task(i):
time.sleep(1)
return i
# 串行
start = time.time()
for i in range(5):
io_task(i)
print("serial:", round(time.time() - start, 2), "s")
# 并发线程
start = time.time()
with ThreadPoolExecutor(max_workers=5) as ex:
list(ex.map(io_task, range(5)))
print("threaded:", round(time.time() - start, 2), "s")
2) 进程、线程、协程:区别与选型
| 维度 | 进程 | 线程 | 协程(asyncio) |
|---|---|---|---|
| 内存隔离 | 强(独立) | 弱(共享) | 同线程内 |
| 切换成本 | 高 | 中 | 低 |
| 适合 | CPU 密集 | I/O 密集 | 高并发 I/O |
| 编程复杂度 | 中 | 中 | 中-高 |
选型口诀:
- CPU 打满:
multiprocessing - 少量 I/O + 生态简单:线程池
- 大量网络 I/O:
asyncio+ 异步客户端
3) GIL 与 CPU/I/O 任务
概念
CPython 有 GIL(全局解释器锁),同一时刻一个解释器进程里通常只有一个线程执行 Python 字节码。
为什么
- I/O 任务会释放 GIL,所以线程对 I/O 常有效。
- CPU 纯计算不释放 GIL,多线程提速不明显。
演示:CPU 密集线程通常不快
import time
from concurrent.futures import ThreadPoolExecutor
def cpu_task(n=20_000_00):
s = 0
for i in range(n):
s += i * i
return s
start = time.time()
[cpu_task() for _ in range(4)]
print("cpu serial:", round(time.time()-start, 2), "s")
start = time.time()
with ThreadPoolExecutor(max_workers=4) as ex:
list(ex.map(lambda _: cpu_task(), range(4)))
print("cpu threaded:", round(time.time()-start, 2), "s")
4) threading 基础与进阶
4.1 创建线程与 join
import threading
import time
def worker(name):
for i in range(3):
print(name, i)
time.sleep(0.3)
t1 = threading.Thread(target=worker, args=("A",))
t2 = threading.Thread(target=worker, args=("B",))
t1.start(); t2.start()
t1.join(); t2.join()
print("done")
4.2 共享数据与 Lock
import threading
counter = 0
lock = threading.Lock()
def add():
global counter
for _ in range(100000):
with lock:
counter += 1
threads = [threading.Thread(target=add) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # 预期 400000
4.3 生产者-消费者(queue.Queue)
import queue
import threading
import time
q = queue.Queue(maxsize=10)
def producer():
for i in range(20):
q.put(i)
q.put(None) # 结束信号
def consumer():
while True:
item = q.get()
if item is None:
break
print("consume", item)
time.sleep(0.1)
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
5) concurrent.futures 线程池实战
为什么
比手动管理线程更易用,适合批量任务。
submit + as_completed
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def task(i):
time.sleep(0.2)
if i == 3:
raise ValueError("boom")
return i * 10
with ThreadPoolExecutor(max_workers=4) as ex:
futures = [ex.submit(task, i) for i in range(6)]
for f in as_completed(futures):
try:
print("result:", f.result())
except Exception as e:
print("error:", e)
6) asyncio 核心:事件循环、协程、Task
6.1 最小协程
import asyncio
async def hello():
await asyncio.sleep(1)
return "hi"
async def main():
x = await hello()
print(x)
asyncio.run(main())
6.2 并发执行:create_task + gather
import asyncio
async def work(i):
await asyncio.sleep(1)
return i
async def main():
tasks = [asyncio.create_task(work(i)) for i in range(5)]
res = await asyncio.gather(*tasks)
print(res)
asyncio.run(main())
6.3 超时与取消
import asyncio
async def slow():
await asyncio.sleep(5)
async def main():
task = asyncio.create_task(slow())
try:
await asyncio.wait_for(task, timeout=1)
except asyncio.TimeoutError:
task.cancel()
print("timeout & cancelled")
asyncio.run(main())
6.4 异常处理
import asyncio
async def bad():
await asyncio.sleep(0.1)
raise RuntimeError("x")
async def main():
tasks = [bad(), bad()]
res = await asyncio.gather(*tasks, return_exceptions=True)
print(res) # [RuntimeError('x'), RuntimeError('x')]
asyncio.run(main())
7) aiohttp 与异步 I/O
7.1 并发抓取网页
import asyncio
import aiohttp
URLS = [
"https://example.com",
"https://httpbin.org/get",
"https://httpbin.org/uuid",
]
async def fetch(session, url):
async with session.get(url, timeout=10) as r:
return url, r.status, await r.text()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, u) for u in URLS]
for url, status, text in await asyncio.gather(*tasks):
print(url, status, len(text))
asyncio.run(main())
7.2 异步文件 I/O(aiofiles)
import asyncio
import aiofiles
async def main():
async with aiofiles.open("demo.txt", "w", encoding="utf-8") as f:
await f.write("hello async file\n")
async with aiofiles.open("demo.txt", "r", encoding="utf-8") as f:
print(await f.read())
asyncio.run(main())
8) 生产实践(最容易踩坑的地方)
8.1 限流(Semaphore)
import asyncio
sem = asyncio.Semaphore(5) # 同时最多 5 个请求
async def bounded_call(i):
async with sem:
await asyncio.sleep(0.2)
return i
8.2 重试(指数退避)
import asyncio
async def with_retry(coro_factory, retries=3, base=0.2):
for i in range(retries):
try:
return await coro_factory()
except Exception:
if i == retries - 1:
raise
await asyncio.sleep(base * (2 ** i))
8.3 背压
- 线程:用
queue.Queue(maxsize=N)控制生产速度。 - 异步:用
asyncio.Queue(maxsize=N),消费者跟不上时生产者会被阻塞。
8.4 资源清理
- 线程池、连接池必须
with/async with。 - 遇到取消要
finally里释放资源。
8.5 日志与排错
- 给每个任务带 request_id。
- 记录:开始时间、结束时间、耗时、异常类型、重试次数。
9) 性能测试与基准对比
建议三组对比:
- 串行
- 线程池
- asyncio
指标:
- 总耗时(wall time)
- 吞吐(requests/s)
- 失败率
- p95/p99 延迟
工具可用:
- 简单版:
time.perf_counter() - 进阶:
pytest-benchmark
10) 7 天学习计划(可执行)
Day 1
- 并发基础、GIL、任务类型判断
- 练习:串行 vs 线程 I/O 对比
Day 2
threading、Lock、Queue- 练习:多线程下载器(模拟)
Day 3
ThreadPoolExecutor- 练习:批量请求 + 异常聚合
Day 4
asyncio核心:协程/Task/gather- 练习:并发 sleep 与超时控制
Day 5
aiohttp+aiofiles- 练习:异步爬取 + 本地落盘
Day 6
- 限流、重试、取消、日志
- 练习:加重试与退避
Day 7
- 做一个完整项目 + 写复盘
11) 3 个实战项目(由易到难)
项目 1:并发 URL 状态检查器(入门)
- 输入 1000 个 URL
- 输出:状态码、耗时、失败原因
- 要求:线程池版本 + asyncio 版本各做一份
项目 2:异步日志聚合器(中阶)
- 多个数据源并发采集
- 入队、消费、写入数据库/文件
- 要求:限流 + 重试 + 背压
项目 3:迷你任务调度器(进阶)
- 支持任务超时、取消、优先级
- 输出任务运行报表
- 要求:可观测性(日志/统计)
12) 常见坑与排查清单
-
把 CPU 密集任务放在线程里
- 现象:速度没提升
- 处理:改多进程
-
忘记 await
- 现象:
coroutine was never awaited - 处理:检查每个协程调用点
- 现象:
-
连接没关
- 现象:句柄泄漏、进程变慢
- 处理:统一
with/async with
-
无限并发导致对方限流
- 处理:Semaphore + Retry + Jitter
-
取消后资源没释放
- 处理:
try/finally做清理
- 处理:
13) 面试高频问答(速记)
Q1:线程和协程本质区别?
线程是系统调度单位;协程是用户态调度,更轻量。
Q2:GIL 会让多线程失效吗?
对 CPU 密集通常是;对 I/O 密集不一定,线程依然有效。
Q3:什么时候用 asyncio?
高并发 I/O、网络请求量大、追求连接效率时。
Q4:如何处理异步超时和取消?
wait_for + cancel + finally 清理。
Q5:如何防止把下游服务打挂?
限流(Semaphore)、重试(指数退避)、背压(有界队列)。
结语:并发不是“快”,而是“稳且可控”
真正的并发能力不是会写 async,而是能在真实环境里做到:
- 任务不丢
- 失败可重试
- 服务不被打挂
- 出问题能快速定位
如果你按这份讲义完成 7 天计划 + 3 个项目,已经足够独立上手大多数 Python 并发开发任务。