Python 并发实战讲义:多线程与异步从入门到上手

目标读者:有 Python 基础,但刚开始接触并发。

学完这份讲义后,你应该能:

  1. 判断任务该用线程还是异步;
  2. 写出可运行的线程/asyncio 程序;
  3. 处理超时、取消、重试、限流与排错;
  4. 独立完成小型并发项目。

目录

  1. 并发到底解决什么问题
  2. 进程、线程、协程:区别与选型
  3. GIL 与 CPU/I/O 任务的关系
  4. threading 基础与进阶
  5. concurrent.futures 线程池实战
  6. asyncio 核心模型(事件循环、Task、await)
  7. aiohttp 与异步 I/O(网络/文件)
  8. 生产实践:限流、重试、背压、资源清理、日志
  9. 性能测试与基准对比
  10. 7 天学习计划
  11. 3 个实战项目(由易到难)
  12. 常见坑与排查清单
  13. 面试高频问答

1) 并发到底解决什么问题

概念

并发不是“让代码更高级”,而是提升等待期利用率

为什么

很多程序慢,不是 CPU 算不动,而是在等:

  • 等网络响应
  • 等磁盘 I/O
  • 等数据库

怎么做

  • I/O 密集:优先线程或异步
  • CPU 密集:优先多进程(或 C 扩展)

最小示例:串行 vs 并发(I/O 模拟)

import time
from concurrent.futures import ThreadPoolExecutor


def io_task(i):
    time.sleep(1)
    return i

# 串行
start = time.time()
for i in range(5):
    io_task(i)
print("serial:", round(time.time() - start, 2), "s")

# 并发线程
start = time.time()
with ThreadPoolExecutor(max_workers=5) as ex:
    list(ex.map(io_task, range(5)))
print("threaded:", round(time.time() - start, 2), "s")

2) 进程、线程、协程:区别与选型

维度进程线程协程(asyncio)
内存隔离强(独立)弱(共享)同线程内
切换成本
适合CPU 密集I/O 密集高并发 I/O
编程复杂度中-高

选型口诀

  • CPU 打满:multiprocessing
  • 少量 I/O + 生态简单:线程池
  • 大量网络 I/O:asyncio + 异步客户端

3) GIL 与 CPU/I/O 任务

概念

CPython 有 GIL(全局解释器锁),同一时刻一个解释器进程里通常只有一个线程执行 Python 字节码。

为什么

  • I/O 任务会释放 GIL,所以线程对 I/O 常有效。
  • CPU 纯计算不释放 GIL,多线程提速不明显。

演示:CPU 密集线程通常不快

import time
from concurrent.futures import ThreadPoolExecutor


def cpu_task(n=20_000_00):
    s = 0
    for i in range(n):
        s += i * i
    return s

start = time.time()
[cpu_task() for _ in range(4)]
print("cpu serial:", round(time.time()-start, 2), "s")

start = time.time()
with ThreadPoolExecutor(max_workers=4) as ex:
    list(ex.map(lambda _: cpu_task(), range(4)))
print("cpu threaded:", round(time.time()-start, 2), "s")

4) threading 基础与进阶

4.1 创建线程与 join

import threading
import time


def worker(name):
    for i in range(3):
        print(name, i)
        time.sleep(0.3)


t1 = threading.Thread(target=worker, args=("A",))
t2 = threading.Thread(target=worker, args=("B",))

t1.start(); t2.start()
t1.join(); t2.join()
print("done")

4.2 共享数据与 Lock

import threading

counter = 0
lock = threading.Lock()


def add():
    global counter
    for _ in range(100000):
        with lock:
            counter += 1

threads = [threading.Thread(target=add) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)  # 预期 400000

4.3 生产者-消费者(queue.Queue

import queue
import threading
import time

q = queue.Queue(maxsize=10)


def producer():
    for i in range(20):
        q.put(i)
    q.put(None)  # 结束信号


def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        print("consume", item)
        time.sleep(0.1)

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

5) concurrent.futures 线程池实战

为什么

比手动管理线程更易用,适合批量任务。

submit + as_completed

from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def task(i):
    time.sleep(0.2)
    if i == 3:
        raise ValueError("boom")
    return i * 10

with ThreadPoolExecutor(max_workers=4) as ex:
    futures = [ex.submit(task, i) for i in range(6)]
    for f in as_completed(futures):
        try:
            print("result:", f.result())
        except Exception as e:
            print("error:", e)

6) asyncio 核心:事件循环、协程、Task

6.1 最小协程

import asyncio


async def hello():
    await asyncio.sleep(1)
    return "hi"


async def main():
    x = await hello()
    print(x)

asyncio.run(main())

6.2 并发执行:create_task + gather

import asyncio


async def work(i):
    await asyncio.sleep(1)
    return i


async def main():
    tasks = [asyncio.create_task(work(i)) for i in range(5)]
    res = await asyncio.gather(*tasks)
    print(res)

asyncio.run(main())

6.3 超时与取消

import asyncio


async def slow():
    await asyncio.sleep(5)


async def main():
    task = asyncio.create_task(slow())
    try:
        await asyncio.wait_for(task, timeout=1)
    except asyncio.TimeoutError:
        task.cancel()
        print("timeout & cancelled")

asyncio.run(main())

6.4 异常处理

import asyncio


async def bad():
    await asyncio.sleep(0.1)
    raise RuntimeError("x")


async def main():
    tasks = [bad(), bad()]
    res = await asyncio.gather(*tasks, return_exceptions=True)
    print(res)  # [RuntimeError('x'), RuntimeError('x')]

asyncio.run(main())

7) aiohttp 与异步 I/O

7.1 并发抓取网页

import asyncio
import aiohttp


URLS = [
    "https://example.com",
    "https://httpbin.org/get",
    "https://httpbin.org/uuid",
]


async def fetch(session, url):
    async with session.get(url, timeout=10) as r:
        return url, r.status, await r.text()


async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, u) for u in URLS]
        for url, status, text in await asyncio.gather(*tasks):
            print(url, status, len(text))

asyncio.run(main())

7.2 异步文件 I/O(aiofiles

import asyncio
import aiofiles


async def main():
    async with aiofiles.open("demo.txt", "w", encoding="utf-8") as f:
        await f.write("hello async file\n")

    async with aiofiles.open("demo.txt", "r", encoding="utf-8") as f:
        print(await f.read())

asyncio.run(main())

8) 生产实践(最容易踩坑的地方)

8.1 限流(Semaphore)

import asyncio

sem = asyncio.Semaphore(5)  # 同时最多 5 个请求


async def bounded_call(i):
    async with sem:
        await asyncio.sleep(0.2)
        return i

8.2 重试(指数退避)

import asyncio


async def with_retry(coro_factory, retries=3, base=0.2):
    for i in range(retries):
        try:
            return await coro_factory()
        except Exception:
            if i == retries - 1:
                raise
            await asyncio.sleep(base * (2 ** i))

8.3 背压

  • 线程:用 queue.Queue(maxsize=N) 控制生产速度。
  • 异步:用 asyncio.Queue(maxsize=N),消费者跟不上时生产者会被阻塞。

8.4 资源清理

  • 线程池、连接池必须 with / async with
  • 遇到取消要 finally 里释放资源。

8.5 日志与排错

  • 给每个任务带 request_id。
  • 记录:开始时间、结束时间、耗时、异常类型、重试次数。

9) 性能测试与基准对比

建议三组对比:

  1. 串行
  2. 线程池
  3. asyncio

指标:

  • 总耗时(wall time)
  • 吞吐(requests/s)
  • 失败率
  • p95/p99 延迟

工具可用:

  • 简单版:time.perf_counter()
  • 进阶:pytest-benchmark

10) 7 天学习计划(可执行)

Day 1

  • 并发基础、GIL、任务类型判断
  • 练习:串行 vs 线程 I/O 对比

Day 2

  • threadingLockQueue
  • 练习:多线程下载器(模拟)

Day 3

  • ThreadPoolExecutor
  • 练习:批量请求 + 异常聚合

Day 4

  • asyncio 核心:协程/Task/gather
  • 练习:并发 sleep 与超时控制

Day 5

  • aiohttp + aiofiles
  • 练习:异步爬取 + 本地落盘

Day 6

  • 限流、重试、取消、日志
  • 练习:加重试与退避

Day 7

  • 做一个完整项目 + 写复盘

11) 3 个实战项目(由易到难)

项目 1:并发 URL 状态检查器(入门)

  • 输入 1000 个 URL
  • 输出:状态码、耗时、失败原因
  • 要求:线程池版本 + asyncio 版本各做一份

项目 2:异步日志聚合器(中阶)

  • 多个数据源并发采集
  • 入队、消费、写入数据库/文件
  • 要求:限流 + 重试 + 背压

项目 3:迷你任务调度器(进阶)

  • 支持任务超时、取消、优先级
  • 输出任务运行报表
  • 要求:可观测性(日志/统计)

12) 常见坑与排查清单

  1. 把 CPU 密集任务放在线程里

    • 现象:速度没提升
    • 处理:改多进程
  2. 忘记 await

    • 现象:coroutine was never awaited
    • 处理:检查每个协程调用点
  3. 连接没关

    • 现象:句柄泄漏、进程变慢
    • 处理:统一 with/async with
  4. 无限并发导致对方限流

    • 处理:Semaphore + Retry + Jitter
  5. 取消后资源没释放

    • 处理:try/finally 做清理

13) 面试高频问答(速记)

Q1:线程和协程本质区别?

线程是系统调度单位;协程是用户态调度,更轻量。

Q2:GIL 会让多线程失效吗?

对 CPU 密集通常是;对 I/O 密集不一定,线程依然有效。

Q3:什么时候用 asyncio?

高并发 I/O、网络请求量大、追求连接效率时。

Q4:如何处理异步超时和取消?

wait_for + cancel + finally 清理。

Q5:如何防止把下游服务打挂?

限流(Semaphore)、重试(指数退避)、背压(有界队列)。


结语:并发不是“快”,而是“稳且可控”

真正的并发能力不是会写 async,而是能在真实环境里做到:

  • 任务不丢
  • 失败可重试
  • 服务不被打挂
  • 出问题能快速定位

如果你按这份讲义完成 7 天计划 + 3 个项目,已经足够独立上手大多数 Python 并发开发任务。