python
主页 > 脚本 > python >

Python多进程与多线程适用场景案例

2026-01-08 | 佚名 | 点击:

你想明确多进程和多线程各自的适用场景,核心是要结合任务类型、资源需求、数据共享等维度来判断——简单来说,IO密集型任务优先用多线程,CPU密集型任务必须用多进程,下面拆解具体场景和选型逻辑,附实战案例帮你落地。

一、多线程(threading):主打“IO密集型”场景

多线程的核心优势是“轻量、切换快、数据共享方便”,但受GIL限制无法利用多核,因此只适合“等待时间远大于计算时间”的任务(IO等待时线程释放GIL,其他线程可执行)。

1. 核心适用场景(附案例)

(1)网络相关任务(最典型)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

import threading

import time

import requests

def crawl(url):

    print(f"线程{threading.current_thread().name}:请求{url}")

    resp = requests.get(url)   网络IO等待(释放GIL)

    print(f"{url}:状态码{resp.status_code}")

if __name__ == "__main__":

    start = time.time()

    urls = [f"https://httpbin.org/get?num={i}" for i in range(10)]

    threads = [threading.Thread(target=crawl, args=(url,)) for url in urls]

    [t.start() for t in threads]

    [t.join() for t in threads]

    print(f"总耗时:{time.time()-start:.2f}秒")   约2秒(并发),单线程约10秒

(2)文件/数据库IO任务

1

2

3

4

5

6

7

8

9

10

11

12

from concurrent.futures import ThreadPoolExecutor

import os

def read_file(filepath):

    with open(filepath, "r", encoding="utf-8") as f:

        content = f.read()

    return f"{filepath}:字符数{len(content)}"

if __name__ == "__main__":

    files = [f for f in os.listdir() if f.endswith(".txt")][:10]

    with ThreadPoolExecutor(max_workers=5) as executor:

        results = executor.map(read_file, files)

    for res in results:

        print(res)

(3)需频繁数据共享的轻量任务

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

import threading

import time

count = 0

lock = threading.Lock()

def add_count():

    global count

    for _ in range(10000):

        with lock:   加锁避免数据竞争

            count += 1

if __name__ == "__main__":

    t1 = threading.Thread(target=add_count)

    t2 = threading.Thread(target=add_count)

    t1.start()

    t2.start()

    t1.join()

    t2.join()

    print(f"最终计数:{count}")   输出20000(准确)

2. 多线程避坑场景

二、多进程(multiprocessing):主打“CPU密集型”场景

多进程的核心优势是“突破GIL限制、利用多核CPU、进程隔离更稳定”,但启动/通信开销大,适合“计算时间占主导”的任务。

1. 核心适用场景(附案例)

(1)数据计算类任务(最典型)

1

2

3

4

5

6

7

8

9

10

11

from concurrent.futures import ProcessPoolExecutor

import time

import numpy as np

def matrix_calc(matrix):

    return np.dot(matrix, matrix)   纯CPU计算

if __name__ == "__main__":

    start = time.time()

    matrices = [np.random.rand(1000, 1000) for _ in range(10)]

    with ProcessPoolExecutor() as executor:

        executor.map(matrix_calc, matrices)

    print(f"总耗时:{time.time()-start:.2f}秒")   4核CPU约3秒,单进程约10秒

(2)批量独立任务处理

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

import multiprocessing

from PIL import Image

import os

def compress_img(filepath):

    img = Image.open(filepath)

    img.save(f"compressed_{filepath}", quality=50)   压缩图片(CPU密集)

    return f"{filepath}压缩完成"

if __name__ == "__main__":

    imgs = [f for f in os.listdir() if f.endswith((".jpg", ".png"))][:5]

    pool = multiprocessing.Pool(processes=4)

    results = pool.map(compress_img, imgs)

    pool.close()

    pool.join()

    for res in results:

        print(res)

(3)高稳定性要求的任务

2. 多进程避坑场景

三、选型速查表(新手直接套用)

任务特征 优先选择 核心原因
CPU密集(计算为主) 多进程 突破GIL,利用多核并行计算
IO密集(等待为主) 多线程 轻量并发,IO等待时释放GIL
需频繁共享数据 多线程 共享内存,通信成本低
任务独立、无数据共享 多进程 进程隔离,稳定性高
任务数量多、启动频繁 多线程 线程启动快,切换开销低
单个任务易崩溃 多进程 进程隔离,不影响其他任务
单机最大化算力 多进程 充分利用多核CPU

四、特殊场景:混合使用(线程+进程)

如果任务是“IO+CPU混合”(如爬取数据后立即计算),可结合二者优势:

示例:多进程(处理CPU计算)+ 多线程(处理IO爬取)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import requests

import numpy as np

 线程:爬取数据(IO)

def crawl_data(url):

    resp = requests.get(url)

    return resp.json()["data"]

 进程:处理数据(CPU)

def process_data(data_list):

    arr = np.array(data_list)

    return arr.mean()   数值计算

if __name__ == "__main__":

     1. 多线程爬取数据

    urls = [f"https://httpbin.org/get?data={i}" for i in range(10)]

    with ThreadPoolExecutor(max_workers=5) as t_executor:

        data = list(t_executor.map(crawl_data, urls))

     2. 多进程处理数据

    with ProcessPoolExecutor() as p_executor:

        result = p_executor.submit(process_data, data).result()

    print(f"数据均值:{result}")

原文链接:
相关文章
最新更新