Python 并发学习讲义:多线程与异步从入门到实操
目标读者:有 Python 基础,但几乎没有并发经验。
学习目标:读完就能写出稳定可用的多线程和异步程序,并知道怎么排错和优化。
目录
- 先建立并发思维:为什么要并发
- 进程、线程、协程的区别
- GIL 与 I/O/CPU 任务:怎么选方案
- threading 基础:从 0 到可用
- threading 进阶:同步与通信
- 线程池 concurrent.futures
- asyncio 核心:事件循环、协程、Task
- asyncio 常用能力:超时、取消、异常处理
- aiohttp 与异步文件操作
- 生产实践:限流、重试、背压、资源清理、日志
- 性能测试与基准方法
- 7 天学习计划
- 3 个实战项目(由易到难)
- 常见坑与排查清单
- 面试常问(含回答要点)
- 总结:你的并发路线图
1. 先建立并发思维:为什么要并发
概念
并发的核心是:把等待时间利用起来。比如网络请求、磁盘读写、数据库调用,大量时间都在“等”。
为什么
- 单线程顺序执行:等待期间 CPU 空转
- 并发执行:A 在等网络时,B 可以继续干活
怎么做
- I/O 密集:优先异步(
asyncio)或线程池 - CPU 密集:优先多进程(
multiprocessing)
2. 进程、线程、协程的区别
| 维度 | 进程 | 线程 | 协程 |
|---|---|---|---|
| 调度者 | 操作系统 | 操作系统 | 事件循环(用户态) |
| 内存 | 独立 | 共享进程内存 | 共享线程内存 |
| 创建成本 | 高 | 中 | 低 |
| 适用 | CPU 密集隔离 | I/O 兼容库多 | 高并发 I/O |
| 典型库 | multiprocessing | threading | asyncio |
一句话理解:
- 进程:隔离好,重。
- 线程:共享内存方便,需同步。
- 协程:轻量高并发,但要 async 生态支持。
3. GIL 与 I/O/CPU 任务:怎么选方案
概念
GIL(Global Interpreter Lock)让 CPython 同一时刻只有一个线程执行 Python 字节码。
为什么
- CPU 密集线程不一定加速(可能更慢)
- I/O 密集线程/协程通常明显加速
快速决策
- CPU 密集:
multiprocessing/ProcessPoolExecutor - I/O 密集:
asyncio(优先)或ThreadPoolExecutor - 混合任务:主流程 asyncio,阻塞段用
to_thread或进程池
4. threading 基础:从 0 到可用
最小示例:启动多个线程
import threading
import time
def worker(name, delay):
print(f"[{name}] start")
time.sleep(delay)
print(f"[{name}] done")
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(f"T{i}", 1), daemon=False)
t.start()
threads.append(t)
for t in threads:
t.join() # 等待线程完成
print("all done")
重点:
start()真正启动join()等待结束daemon=True的线程随主线程退出,不适合关键任务
5. threading 进阶:同步与通信
5.1 Lock:保护共享资源
import threading
counter = 0
lock = threading.Lock()
def inc(n):
global counter
for _ in range(n):
with lock:
counter += 1
threads = [threading.Thread(target=inc, args=(100_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # 期望 400000
5.2 Queue:线程安全通信
import threading
import queue
import time
q = queue.Queue()
def producer():
for i in range(5):
q.put(i)
q.put(None) # 结束信号
def consumer():
while True:
item = q.get()
if item is None:
q.task_done()
break
print("consume", item)
time.sleep(0.2)
q.task_done()
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
q.join()
print("queue done")
5.3 Event / Condition / Semaphore(知道用途)
Event:一个开关,广播通知Condition:复杂等待条件Semaphore:控制并发数量(比如最多 10 个线程)
6. 线程池 concurrent.futures
为什么用线程池
- 不用自己管理线程生命周期
- 控制并发数简单
- 有 Future,结果/异常可追踪
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def fetch(url):
r = requests.get(url, timeout=5)
return url, r.status_code, len(r.text)
urls = ["https://example.com"] * 10
with ThreadPoolExecutor(max_workers=5) as ex:
futures = [ex.submit(fetch, u) for u in urls]
for fu in as_completed(futures):
try:
print(fu.result())
except Exception as e:
print("error:", e)
CPU 密集版本改成 ProcessPoolExecutor。
7. asyncio 核心:事件循环、协程、Task
7.1 协程与 await
import asyncio
async def work(i):
await asyncio.sleep(1)
return f"task-{i}"
async def main():
results = await asyncio.gather(*(work(i) for i in range(3)))
print(results)
asyncio.run(main())
7.2 Task:让协程并发执行
import asyncio
async def foo(name, sec):
await asyncio.sleep(sec)
return name
async def main():
t1 = asyncio.create_task(foo("A", 1))
t2 = asyncio.create_task(foo("B", 2))
print(await t1, await t2)
asyncio.run(main())
8. asyncio 常用能力:超时、取消、异常处理
超时控制
import asyncio
async def slow():
await asyncio.sleep(3)
return 42
async def main():
try:
val = await asyncio.wait_for(slow(), timeout=1)
print(val)
except asyncio.TimeoutError:
print("timeout")
asyncio.run(main())
取消任务
import asyncio
async def worker():
try:
while True:
await asyncio.sleep(0.5)
print("working...")
except asyncio.CancelledError:
print("cleanup before cancel")
raise
async def main():
t = asyncio.create_task(worker())
await asyncio.sleep(1.2)
t.cancel()
try:
await t
except asyncio.CancelledError:
print("cancelled")
asyncio.run(main())
gather 异常策略
gather(..., return_exceptions=False):任一异常会抛出return_exceptions=True:收集异常继续返回
9. aiohttp 与异步文件操作
9.1 aiohttp 并发抓取
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url, timeout=10) as resp:
text = await resp.text()
return url, resp.status, len(text)
async def main():
urls = ["https://example.com"] * 20
timeout = aiohttp.ClientTimeout(total=15)
async with aiohttp.ClientSession(timeout=timeout) as session:
results = await asyncio.gather(*(fetch(session, u) for u in urls), return_exceptions=True)
print(results[:3])
asyncio.run(main())
9.2 异步文件(aiofiles)
import asyncio
import aiofiles
async def write_file(path, content):
async with aiofiles.open(path, "w", encoding="utf-8") as f:
await f.write(content)
async def main():
await write_file("out.txt", "hello async file")
asyncio.run(main())
10. 生产实践:限流、重试、背压、资源清理、日志
10.1 限流(Semaphore)
import asyncio
import aiohttp
sem = asyncio.Semaphore(10) # 最多 10 并发
async def safe_fetch(session, url):
async with sem:
async with session.get(url) as r:
return await r.text()
10.2 重试(指数退避)
import asyncio
async def retry(coro_factory, retries=3, base=0.5):
for i in range(retries):
try:
return await coro_factory()
except Exception:
if i == retries - 1:
raise
await asyncio.sleep(base * (2 ** i))
10.3 背压
当生产速度 > 消费速度时,用有界队列限制内存膨胀:
q = asyncio.Queue(maxsize=1000)
10.4 资源清理
async with管理连接池/文件- 捕获
CancelledError做清理 - 程序退出时关闭 session / executor
10.5 日志与排错
- 打印 task id、请求 id、重试次数、耗时
- 统一结构化日志(json)更好检索
11. 性能测试与基准方法
测什么
- 吞吐(QPS)
- 平均耗时 / P95 / P99
- 错误率
怎么测
- 先做串行 baseline
- 再做并发版本
- 调整并发度(如 10/50/100)
- 找“拐点”(再加并发性能不升反降)
最小计时示例
import time
start = time.perf_counter()
# run workload
elapsed = time.perf_counter() - start
print(f"elapsed={elapsed:.3f}s")
12. 7 天学习计划
Day1:并发心智 + GIL
- 目标:能说清线程/协程/进程差别
- 练习:画一张“任务类型 -> 技术选型”图
Day2:threading 基础
- 练习:写 5 线程下载器(模拟 sleep)
Day3:threading 同步
- 练习:
Lock + Queue的生产者消费者
Day4:线程池
- 练习:
ThreadPoolExecutor批量请求 + 错误处理
Day5:asyncio 核心
- 练习:
create_task + gather + wait_for
Day6:aiohttp 实战
- 练习:异步爬取 + 限流 + 重试
Day7:整合与复盘
- 练习:完成一个小项目,写性能对比报告
13. 3 个实战项目(由易到难)
项目 1:并发网页状态检查器(简单)
- 输入 URL 列表
- 输出状态码、耗时、失败重试
- 技术:线程池 或 asyncio + aiohttp
项目 2:异步日志聚合器(中等)
- 多来源日志并发读取
- 异步写入汇总文件
- 技术:asyncio + queue + aiofiles
项目 3:高并发 API 拉取器(进阶)
- 需求:限流、重试、超时、熔断(可选)
- 输出:成功率、P95、错误明细
- 技术:asyncio + aiohttp + semaphore + metrics
14. 常见坑与排查清单
- 线程改了共享变量但结果不对
- 查:是否用
Lock
- 查:是否用
- async 函数没执行
- 查:有没有
await/asyncio.run
- 查:有没有
- 程序卡住不退出
- 查:是否有未消费队列、未关闭 session、未结束后台 task
- 并发越高越慢
- 查:是否打爆下游、网络瓶颈、CPU 打满、上下文切换过高
- 异常丢失
- 查:Task 是否被 await,
gather是否吞异常
- 查:Task 是否被 await,
15. 面试常问(含回答要点)
Q1:GIL 会让多线程失效吗?
A:对 CPU 密集,线程收益有限;对 I/O 密集,线程仍有效。
Q2:什么时候选 asyncio?
A:大量 I/O、高并发连接、可用 async 库时优先。
Q3:线程池和协程怎么选?
A:已有阻塞库优先线程池;全新 I/O 服务优先 asyncio。
Q4:如何保证并发程序稳定?
A:限流 + 超时 + 重试 + 取消 + 资源清理 + 监控日志。
Q5:如何做并发性能优化?
A:先基线、再逐级增并发、观察吞吐和延迟拐点、针对瓶颈优化。
16. 总结:你的并发路线图
学并发不是背 API,而是建立这条路径:
任务类型识别(I/O or CPU) → 选对并发模型 → 控制风险(限流/超时/重试) → 可观测(日志/指标) → 基准优化。
如果你按这份讲义完成 7 天计划,再做完 3 个项目,已经具备“能独立落地并发程序”的实操能力。
下一步建议:把你正在写的一个真实小脚本(比如批量请求、日志处理、文件下载)改造成并发版,并输出一份“改造前后性能对比”报告。这一步会让你从“会”变成“熟”。