To pick between multiprocessing and multithreading, the key is to weigh task type, resource needs, and data-sharing requirements. In short: IO-bound tasks should default to multithreading, while CPU-bound tasks call for multiprocessing. Below we break down the concrete scenarios and selection logic, with hands-on examples you can put into practice.
Multithreading's core strengths are being lightweight, fast to switch, and convenient for sharing data, but the GIL prevents it from using multiple cores. It therefore only suits tasks where waiting time far exceeds compute time: during an IO wait a thread releases the GIL, so other threads can run.
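To make the GIL limit concrete, here is a minimal sketch (the `burn` function and loop count are illustrative, not part of the original examples): pure CPU work on two threads finishes no faster than running it twice in a row.

```python
import threading
import time

def burn(n):
    # pure CPU loop: the GIL stays held, so threads cannot run this in parallel
    while n > 0:
        n -= 1

if __name__ == "__main__":
    N = 10_000_000

    start = time.time()
    burn(N)
    burn(N)
    print(f"sequential: {time.time()-start:.2f}s")

    start = time.time()
    t1 = threading.Thread(target=burn, args=(N,))
    t2 = threading.Thread(target=burn, args=(N,))
    t1.start(); t2.start()
    t1.join(); t2.join()
    print(f"two threads: {time.time()-start:.2f}s")  # roughly the same, sometimes slower
```

The IO-bound examples below are where threading does pay off.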
```python
import threading
import time
import requests

def crawl(url):
    print(f"thread {threading.current_thread().name}: requesting {url}")
    resp = requests.get(url)  # network IO wait (GIL is released here)
    print(f"{url}: status {resp.status_code}")

if __name__ == "__main__":
    start = time.time()
    urls = [f"https://httpbin.org/get?num={i}" for i in range(10)]
    threads = [threading.Thread(target=crawl, args=(url,)) for url in urls]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"total: {time.time()-start:.2f}s")  # ~2s concurrently; ~10s single-threaded
```
```python
from concurrent.futures import ThreadPoolExecutor
import os

def read_file(filepath):
    # file IO: the GIL is released while the thread blocks on the read
    with open(filepath, "r", encoding="utf-8") as f:
        content = f.read()
    return f"{filepath}: {len(content)} characters"

if __name__ == "__main__":
    files = [f for f in os.listdir() if f.endswith(".txt")][:10]
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = executor.map(read_file, files)
        for res in results:
            print(res)
```
```python
import threading

count = 0
lock = threading.Lock()

def add_count():
    global count
    for _ in range(10000):
        with lock:  # lock to avoid a data race on count
            count += 1

if __name__ == "__main__":
    t1 = threading.Thread(target=add_count)
    t2 = threading.Thread(target=add_count)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(f"final count: {count}")  # 20000 (correct)
```
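For contrast, a sketch of the same counter using processes instead of threads (assuming the standard `multiprocessing` start methods): each child gets its own copy of the global, so nothing is shared and the parent's value never changes.

```python
import multiprocessing

count = 0

def add_count():
    global count
    for _ in range(10000):
        count += 1
    print(f"child sees count={count}")  # 10000, but only in the child's own memory

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=add_count)
    p2 = multiprocessing.Process(target=add_count)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"parent sees count={count}")  # still 0: processes do not share globals
```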
Multiprocessing's core strengths are bypassing the GIL, using every CPU core, and greater stability through process isolation. The trade-off is heavier startup and communication overhead (a sketch of that overhead follows the examples below), so it suits tasks dominated by computation.
```python
from concurrent.futures import ProcessPoolExecutor
import time
import numpy as np

def matrix_calc(matrix):
    return np.dot(matrix, matrix)  # pure CPU computation

if __name__ == "__main__":
    start = time.time()
    matrices = [np.random.rand(1000, 1000) for _ in range(10)]
    with ProcessPoolExecutor() as executor:
        list(executor.map(matrix_calc, matrices))  # consume results so all tasks finish
    print(f"total: {time.time()-start:.2f}s")  # ~3s on a 4-core CPU; ~10s single-process
```
```python
import multiprocessing
import os
from PIL import Image

def compress_img(filepath):
    img = Image.open(filepath)
    # re-encode at lower quality (CPU-bound; quality mainly affects JPEG output)
    img.save(f"compressed_{filepath}", quality=50)
    return f"{filepath} compressed"

if __name__ == "__main__":
    imgs = [f for f in os.listdir() if f.endswith((".jpg", ".png"))][:5]
    pool = multiprocessing.Pool(processes=4)
    results = pool.map(compress_img, imgs)
    pool.close()
    pool.join()
    for res in results:
        print(res)
```
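Because processes share nothing by default, data must be moved between them explicitly. A minimal sketch of that communication cost using `multiprocessing.Queue` (the worker and payload here are illustrative): every object crossing the queue is pickled in one process and unpickled in the other, a serialization round-trip that threads avoid entirely.

```python
import multiprocessing

def worker(q):
    # the result is pickled here and unpickled in the parent: that
    # serialization round-trip is the communication overhead
    q.put(sum(range(1_000_000)))

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    print(f"received: {q.get()}")
    p.join()
```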
| Task characteristics | Prefer | Key reason |
|---|---|---|
| CPU-bound (compute-dominated) | Multiprocessing | Bypasses the GIL; parallel computation on multiple cores |
| IO-bound (wait-dominated) | Multithreading | Lightweight concurrency; GIL released during IO waits |
| Frequent data sharing needed | Multithreading | Shared memory; low communication cost |
| Independent tasks, no shared data | Multiprocessing | Process isolation; higher stability |
| Many tasks, frequent startup | Multithreading | Threads start quickly; low switching overhead |
| Individual tasks may crash | Multiprocessing | Process isolation; a crash doesn't affect other tasks |
| Maximize single-machine compute | Multiprocessing | Makes full use of all CPU cores |
If a task mixes IO and CPU work (e.g., crawl data, then immediately compute on it), you can combine the strengths of both:

Example: multiprocessing (for the CPU computation) + multithreading (for the IO crawling)
```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import requests
import numpy as np

# threads: crawl the data (IO-bound)
def crawl_data(url):
    resp = requests.get(url)
    return resp.json()["args"]["data"]  # httpbin echoes query params under "args"

# processes: crunch the data (CPU-bound)
def process_data(data_list):
    arr = np.array(data_list, dtype=float)  # convert the echoed strings to numbers
    return arr.mean()  # numeric computation

if __name__ == "__main__":
    # 1. crawl with a thread pool
    urls = [f"https://httpbin.org/get?data={i}" for i in range(10)]
    with ThreadPoolExecutor(max_workers=5) as t_executor:
        data = list(t_executor.map(crawl_data, urls))
    # 2. process with a process pool
    with ProcessPoolExecutor() as p_executor:
        result = p_executor.submit(process_data, data).result()
    print(f"mean of data: {result}")
```