你想明确多进程和多线程各自的适用场景,核心是要结合任务类型、资源需求、数据共享等维度来判断——简单来说,IO密集型任务优先用多线程,CPU密集型任务必须用多进程,下面拆解具体场景和选型逻辑,附实战案例帮你落地。
一、多线程(threading):主打“IO密集型”场景
多线程的核心优势是“轻量、切换快、数据共享方便”,但受GIL限制无法利用多核,因此只适合“等待时间远大于计算时间”的任务(IO等待时线程释放GIL,其他线程可执行)。
1. 核心适用场景(附案例)
(1)网络相关任务(最典型)
- 爬虫(批量爬取网页、接口数据)、API调用、消息队列消费、网络监控;
- 案例:多线程爬取10个网页(IO等待占90%)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import threading
import time
import requests
def crawl(url):
print(f"线程{threading.current_thread().name}:请求{url}")
resp = requests.get(url) 网络IO等待(释放GIL)
print(f"{url}:状态码{resp.status_code}")
if __name__ == "__main__":
start = time.time()
urls = [f"https://httpbin.org/get?num={i}" for i in range(10)]
threads = [threading.Thread(target=crawl, args=(url,)) for url in urls]
[t.start() for t in threads]
[t.join() for t in threads]
print(f"总耗时:{time.time()-start:.2f}秒") 约2秒(并发),单线程约10秒
|
(2)文件/数据库IO任务
- 批量读写本地文件、数据库查询/插入(如MySQL/Redis操作)、日志采集;
- 特点:这类任务大部分时间在“等磁盘/数据库响应”,线程切换开销可忽略;
- 案例:多线程读取10个大文本文件
|
1
2
3
4
5
6
7
8
9
10
11
12
|
from concurrent.futures import ThreadPoolExecutor
import os
def read_file(filepath):
with open(filepath, "r", encoding="utf-8") as f:
content = f.read()
return f"{filepath}:字符数{len(content)}"
if __name__ == "__main__":
files = [f for f in os.listdir() if f.endswith(".txt")][:10]
with ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(read_file, files)
for res in results:
print(res)
|
(3)需频繁数据共享的轻量任务
- 实时统计(如统计程序运行状态、实时计数)、GUI程序后台任务(如界面刷新+数据加载);
- 特点:线程共享进程内存,无需复杂通信,加锁即可保证数据安全;
- 案例:多线程实时计数(共享变量)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import threading
import time
count = 0
lock = threading.Lock()
def add_count():
global count
for _ in range(10000):
with lock: 加锁避免数据竞争
count += 1
if __name__ == "__main__":
t1 = threading.Thread(target=add_count)
t2 = threading.Thread(target=add_count)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"最终计数:{count}") 输出20000(准确)
|
2. 多线程避坑场景
- ? 绝对不要用多线程处理CPU密集任务(如矩阵运算、数据加密):GIL会让多线程变成“伪并发”,效率甚至低于单线程;
- ? 不要创建过多线程(如上千个):线程切换开销累积,会导致程序卡顿。
二、多进程(multiprocessing):主打“CPU密集型”场景
多进程的核心优势是“突破GIL限制、利用多核CPU、进程隔离更稳定”,但启动/通信开销大,适合“计算时间占主导”的任务。
1. 核心适用场景(附案例)
(1)数据计算类任务(最典型)
- 矩阵运算、数值模拟、数据挖掘(如批量特征计算)、密码破解、AI模型训练(单机多进程);
- 案例:多进程计算10个大矩阵的乘法
|
1
2
3
4
5
6
7
8
9
10
11
|
from concurrent.futures import ProcessPoolExecutor
import time
import numpy as np
def matrix_calc(matrix):
return np.dot(matrix, matrix) 纯CPU计算
if __name__ == "__main__":
start = time.time()
matrices = [np.random.rand(1000, 1000) for _ in range(10)]
with ProcessPoolExecutor() as executor:
executor.map(matrix_calc, matrices)
print(f"总耗时:{time.time()-start:.2f}秒") 4核CPU约3秒,单进程约10秒
|
(2)批量独立任务处理
- 图片/视频处理(如批量压缩图片、视频转码)、文件格式转换(如CSV转Excel)、批量数据分析;
- 特点:任务间无依赖、可独立执行,进程隔离避免一个任务崩溃影响全局;
- 案例:多进程批量压缩图片
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import multiprocessing
from PIL import Image
import os
def compress_img(filepath):
img = Image.open(filepath)
img.save(f"compressed_{filepath}", quality=50) 压缩图片(CPU密集)
return f"{filepath}压缩完成"
if __name__ == "__main__":
imgs = [f for f in os.listdir() if f.endswith((".jpg", ".png"))][:5]
pool = multiprocessing.Pool(processes=4)
results = pool.map(compress_img, imgs)
pool.close()
pool.join()
for res in results:
print(res)
|
(3)高稳定性要求的任务
- 批量执行第三方脚本/工具、不稳定的任务(如调用易崩溃的接口);
- 特点:单个进程崩溃不会导致整个程序挂掉,容错性更高。
2. 多进程避坑场景
- ? 不要用多进程处理高频小任务(如每秒执行一次的定时检测):进程启动/通信开销会抵消并行优势;
- ? 不要用多进程处理需频繁数据共享的任务:进程间通信(Queue/Pipe)开销大,效率远低于线程。
三、选型速查表(新手直接套用)
| 任务特征 |
优先选择 |
核心原因 |
| CPU密集(计算为主) |
多进程 |
突破GIL,利用多核并行计算 |
| IO密集(等待为主) |
多线程 |
轻量并发,IO等待时释放GIL |
| 需频繁共享数据 |
多线程 |
共享内存,通信成本低 |
| 任务独立、无数据共享 |
多进程 |
进程隔离,稳定性高 |
| 任务数量多、启动频繁 |
多线程 |
线程启动快,切换开销低 |
| 单个任务易崩溃 |
多进程 |
进程隔离,不影响其他任务 |
| 单机最大化算力 |
多进程 |
充分利用多核CPU |
四、特殊场景:混合使用(线程+进程)
如果任务是“IO+CPU混合”(如爬取数据后立即计算),可结合二者优势:
示例:多进程(处理CPU计算)+ 多线程(处理IO爬取)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import requests
import numpy as np
线程:爬取数据(IO)
def crawl_data(url):
resp = requests.get(url)
return resp.json()["data"]
进程:处理数据(CPU)
def process_data(data_list):
arr = np.array(data_list)
return arr.mean() 数值计算
if __name__ == "__main__":
1. 多线程爬取数据
urls = [f"https://httpbin.org/get?data={i}" for i in range(10)]
with ThreadPoolExecutor(max_workers=5) as t_executor:
data = list(t_executor.map(crawl_data, urls))
2. 多进程处理数据
with ProcessPoolExecutor() as p_executor:
result = p_executor.submit(process_data, data).result()
print(f"数据均值:{result}")
|
|