Python 并发学习讲义:多线程与异步从入门到实操

目标读者:有 Python 基础,但几乎没有并发经验。
学习目标:读完就能写出稳定可用的多线程和异步程序,并知道怎么排错和优化。


目录

  1. 先建立并发思维:为什么要并发
  2. 进程、线程、协程的区别
  3. GIL 与 I/O/CPU 任务:怎么选方案
  4. threading 基础:从 0 到可用
  5. threading 进阶:同步与通信
  6. 线程池 concurrent.futures
  7. asyncio 核心:事件循环、协程、Task
  8. asyncio 常用能力:超时、取消、异常处理
  9. aiohttp 与异步文件操作
  10. 生产实践:限流、重试、背压、资源清理、日志
  11. 性能测试与基准方法
  12. 7 天学习计划
  13. 3 个实战项目(由易到难)
  14. 常见坑与排查清单
  15. 面试常问(含回答要点)
  16. 总结:你的并发路线图

1. 先建立并发思维:为什么要并发

概念

并发的核心是:把等待时间利用起来。比如网络请求、磁盘读写、数据库调用,大量时间都在“等”。

为什么

  • 单线程顺序执行:等待期间 CPU 空转
  • 并发执行:A 在等网络时,B 可以继续干活

怎么做

  • I/O 密集:优先异步(asyncio)或线程池
  • CPU 密集:优先多进程(multiprocessing

2. 进程、线程、协程的区别

维度进程线程协程
调度者操作系统操作系统事件循环(用户态)
内存独立共享进程内存共享线程内存
创建成本
适用CPU 密集隔离I/O 兼容库多高并发 I/O
典型库multiprocessingthreadingasyncio

一句话理解:

  • 进程:隔离好,重。
  • 线程:共享内存方便,需同步。
  • 协程:轻量高并发,但要 async 生态支持。

3. GIL 与 I/O/CPU 任务:怎么选方案

概念

GIL(Global Interpreter Lock)让 CPython 同一时刻只有一个线程执行 Python 字节码。

为什么

  • CPU 密集线程不一定加速(可能更慢)
  • I/O 密集线程/协程通常明显加速

快速决策

  • CPU 密集:multiprocessing / ProcessPoolExecutor
  • I/O 密集:asyncio(优先)或 ThreadPoolExecutor
  • 混合任务:主流程 asyncio,阻塞段用 to_thread 或进程池

4. threading 基础:从 0 到可用

最小示例:启动多个线程

import threading
import time

def worker(name, delay):
    print(f"[{name}] start")
    time.sleep(delay)
    print(f"[{name}] done")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(f"T{i}", 1), daemon=False)
    t.start()
    threads.append(t)

for t in threads:
    t.join()  # 等待线程完成

print("all done")

重点:

  • start() 真正启动
  • join() 等待结束
  • daemon=True 的线程随主线程退出,不适合关键任务

5. threading 进阶:同步与通信

5.1 Lock:保护共享资源

import threading

counter = 0
lock = threading.Lock()

def inc(n):
    global counter
    for _ in range(n):
        with lock:
            counter += 1

threads = [threading.Thread(target=inc, args=(100_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()

print(counter)  # 期望 400000

5.2 Queue:线程安全通信

import threading
import queue
import time

q = queue.Queue()

def producer():
    for i in range(5):
        q.put(i)
    q.put(None)  # 结束信号

def consumer():
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            break
        print("consume", item)
        time.sleep(0.2)
        q.task_done()

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
q.join()
print("queue done")

5.3 Event / Condition / Semaphore(知道用途)

  • Event:一个开关,广播通知
  • Condition:复杂等待条件
  • Semaphore:控制并发数量(比如最多 10 个线程)

6. 线程池 concurrent.futures

为什么用线程池

  • 不用自己管理线程生命周期
  • 控制并发数简单
  • 有 Future,结果/异常可追踪
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def fetch(url):
    r = requests.get(url, timeout=5)
    return url, r.status_code, len(r.text)

urls = ["https://example.com"] * 10

with ThreadPoolExecutor(max_workers=5) as ex:
    futures = [ex.submit(fetch, u) for u in urls]
    for fu in as_completed(futures):
        try:
            print(fu.result())
        except Exception as e:
            print("error:", e)

CPU 密集版本改成 ProcessPoolExecutor


7. asyncio 核心:事件循环、协程、Task

7.1 协程与 await

import asyncio

async def work(i):
    await asyncio.sleep(1)
    return f"task-{i}"

async def main():
    results = await asyncio.gather(*(work(i) for i in range(3)))
    print(results)

asyncio.run(main())

7.2 Task:让协程并发执行

import asyncio

async def foo(name, sec):
    await asyncio.sleep(sec)
    return name

async def main():
    t1 = asyncio.create_task(foo("A", 1))
    t2 = asyncio.create_task(foo("B", 2))
    print(await t1, await t2)

asyncio.run(main())

8. asyncio 常用能力:超时、取消、异常处理

超时控制

import asyncio

async def slow():
    await asyncio.sleep(3)
    return 42

async def main():
    try:
        val = await asyncio.wait_for(slow(), timeout=1)
        print(val)
    except asyncio.TimeoutError:
        print("timeout")

asyncio.run(main())

取消任务

import asyncio

async def worker():
    try:
        while True:
            await asyncio.sleep(0.5)
            print("working...")
    except asyncio.CancelledError:
        print("cleanup before cancel")
        raise

async def main():
    t = asyncio.create_task(worker())
    await asyncio.sleep(1.2)
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        print("cancelled")

asyncio.run(main())

gather 异常策略

  • gather(..., return_exceptions=False):任一异常会抛出
  • return_exceptions=True:收集异常继续返回

9. aiohttp 与异步文件操作

9.1 aiohttp 并发抓取

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url, timeout=10) as resp:
        text = await resp.text()
        return url, resp.status, len(text)

async def main():
    urls = ["https://example.com"] * 20
    timeout = aiohttp.ClientTimeout(total=15)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        results = await asyncio.gather(*(fetch(session, u) for u in urls), return_exceptions=True)
    print(results[:3])

asyncio.run(main())

9.2 异步文件(aiofiles)

import asyncio
import aiofiles

async def write_file(path, content):
    async with aiofiles.open(path, "w", encoding="utf-8") as f:
        await f.write(content)

async def main():
    await write_file("out.txt", "hello async file")

asyncio.run(main())

10. 生产实践:限流、重试、背压、资源清理、日志

10.1 限流(Semaphore)

import asyncio
import aiohttp

sem = asyncio.Semaphore(10)  # 最多 10 并发

async def safe_fetch(session, url):
    async with sem:
        async with session.get(url) as r:
            return await r.text()

10.2 重试(指数退避)

import asyncio

async def retry(coro_factory, retries=3, base=0.5):
    for i in range(retries):
        try:
            return await coro_factory()
        except Exception:
            if i == retries - 1:
                raise
            await asyncio.sleep(base * (2 ** i))

10.3 背压

当生产速度 > 消费速度时,用有界队列限制内存膨胀:

q = asyncio.Queue(maxsize=1000)

10.4 资源清理

  • async with 管理连接池/文件
  • 捕获 CancelledError 做清理
  • 程序退出时关闭 session / executor

10.5 日志与排错

  • 打印 task id、请求 id、重试次数、耗时
  • 统一结构化日志(json)更好检索

11. 性能测试与基准方法

测什么

  • 吞吐(QPS)
  • 平均耗时 / P95 / P99
  • 错误率

怎么测

  1. 先做串行 baseline
  2. 再做并发版本
  3. 调整并发度(如 10/50/100)
  4. 找“拐点”(再加并发性能不升反降)

最小计时示例

import time

start = time.perf_counter()
# run workload
elapsed = time.perf_counter() - start
print(f"elapsed={elapsed:.3f}s")

12. 7 天学习计划

Day1:并发心智 + GIL

  • 目标:能说清线程/协程/进程差别
  • 练习:画一张“任务类型 -> 技术选型”图

Day2:threading 基础

  • 练习:写 5 线程下载器(模拟 sleep)

Day3:threading 同步

  • 练习:Lock + Queue 的生产者消费者

Day4:线程池

  • 练习:ThreadPoolExecutor 批量请求 + 错误处理

Day5:asyncio 核心

  • 练习:create_task + gather + wait_for

Day6:aiohttp 实战

  • 练习:异步爬取 + 限流 + 重试

Day7:整合与复盘

  • 练习:完成一个小项目,写性能对比报告

13. 3 个实战项目(由易到难)

项目 1:并发网页状态检查器(简单)

  • 输入 URL 列表
  • 输出状态码、耗时、失败重试
  • 技术:线程池 或 asyncio + aiohttp

项目 2:异步日志聚合器(中等)

  • 多来源日志并发读取
  • 异步写入汇总文件
  • 技术:asyncio + queue + aiofiles

项目 3:高并发 API 拉取器(进阶)

  • 需求:限流、重试、超时、熔断(可选)
  • 输出:成功率、P95、错误明细
  • 技术:asyncio + aiohttp + semaphore + metrics

14. 常见坑与排查清单

  1. 线程改了共享变量但结果不对
    • 查:是否用 Lock
  2. async 函数没执行
    • 查:有没有 await / asyncio.run
  3. 程序卡住不退出
    • 查:是否有未消费队列、未关闭 session、未结束后台 task
  4. 并发越高越慢
    • 查:是否打爆下游、网络瓶颈、CPU 打满、上下文切换过高
  5. 异常丢失
    • 查:Task 是否被 await,gather 是否吞异常

15. 面试常问(含回答要点)

Q1:GIL 会让多线程失效吗?

A:对 CPU 密集,线程收益有限;对 I/O 密集,线程仍有效。

Q2:什么时候选 asyncio?

A:大量 I/O、高并发连接、可用 async 库时优先。

Q3:线程池和协程怎么选?

A:已有阻塞库优先线程池;全新 I/O 服务优先 asyncio。

Q4:如何保证并发程序稳定?

A:限流 + 超时 + 重试 + 取消 + 资源清理 + 监控日志。

Q5:如何做并发性能优化?

A:先基线、再逐级增并发、观察吞吐和延迟拐点、针对瓶颈优化。


16. 总结:你的并发路线图

学并发不是背 API,而是建立这条路径:

任务类型识别(I/O or CPU) → 选对并发模型 → 控制风险(限流/超时/重试) → 可观测(日志/指标) → 基准优化。

如果你按这份讲义完成 7 天计划,再做完 3 个项目,已经具备“能独立落地并发程序”的实操能力。

下一步建议:把你正在写的一个真实小脚本(比如批量请求、日志处理、文件下载)改造成并发版,并输出一份“改造前后性能对比”报告。这一步会让你从“会”变成“熟”。