| 
                            
                                  1.概述
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | """ 基础知识: 1.多任务:操作系统可以同时运行多个任务; 2.单核CPU执行多任务:操作系统轮流让各个任务交替执行; 3.一个任务即一个进程(process),如:打开一个浏览器,即启动一个浏览器进程; 4.在一个进程内,要同时干多件事,需要同时运行多个子任务,把进程内的子任务称为"线程(Thread)"; 5.每个进程至少做一件事,因此,一个进程至少有一个线程; 同时执行多任务的解决方案: a.启动多个进程,每个进程虽然只有一个线程,但多个进程可以一块执行多个任务; b.启动一个进程,在一个进程内启动多个线程,多个线程一块执行多个任务; c.启动多个进程,每个进程启动多个线程; 即多任务的实现方式: a.多进程模式; b.多线程模式; c.多进程+多线程模式; """ |  2.多进程
	
		
			# Demo: report the current process ID; the os.fork() sample stays disabled.
import os

current_pid = os.getpid()
print("Process (%s) start..." % current_pid)

# The fork() sample below only works on Linux/Unix/macOS, so it is kept
# as a comment instead of being executed:
#
# pid = os.fork()
# if pid == 0:
#     print("I am child process (%s) and my parent is %s." % (os.getpid(), os.getppid()))
# else:
#     print("I (%s) just created a child process (%s)." % (os.getpid(), pid))

print("Hello.")
	
		
			# multiprocessing: the cross-platform multi-process module.
# Save as process_test.py and run it from a shell: python process_test.py
from multiprocessing import Process
import os


def run_process(name):
    """Print the given task name together with the PID of the running process."""
    child_pid = os.getpid()
    print("Run child process %s (%s)..." % (name, child_pid))


if __name__ == "__main__":
    print("Parent process %s." % os.getpid())
    p = Process(target=run_process, args=("test",))
    print("Child process will start.")
    p.start()
    # join() waits for the child process to finish before continuing;
    # it is the basic way to synchronise processes.
    p.join()
    print("Child process end.")
 # 结果输出:Parent process 28340.
 Child process will start.
 Run child process test (31152)...
 Child process end.
 
	
		
			# Pool: create child processes in bulk through a process pool.
# Save as process.py and run it from a shell: python process.py
from multiprocessing import Pool
import os, time, random


def long_time_task(name):
    """Announce the task, sleep for a random time below 3 seconds, report the duration."""
    print('Run task %s (%s)...' % (name, os.getpid()))
    started_at = time.time()
    time.sleep(random.random() * 3)
    elapsed = time.time() - started_at
    print('Task %s runs %0.2f seconds.' % (name, elapsed))


if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    # Four pool workers for five tasks.
    p = Pool(4)
    for task_id in range(5):
        p.apply_async(long_time_task, args=(task_id,))
    print('Waiting for all subprocesses done...')
    # close() is called before join(), as in the original sequence.
    p.close()
    p.join()
    print('All subprocesses done.')
# 结果输出:Parent process 31576.
 Waiting for all subprocesses done...
 Run task 0 (20416)...
 Run task 1 (15900)...
 Run task 2 (24716)...
 Run task 3 (31148)...
 Task 2 runs 0.72 seconds.
 Run task 4 (24716)...
 Task 4 runs 1.03 seconds.
 Task 3 runs 1.82 seconds.
 Task 1 runs 2.73 seconds.
 Task 0 runs 2.82 seconds.
 All subprocesses done.
 3.子进程
	
		
			# subprocess module: start a child process and control its input and output.
# Save as subprocess_test.py. Do not name the file after the module itself
# (subprocess.py), otherwise the import fails.
import subprocess

command = ["nslookup", "www.python.org"]
print("$ nslookup www.python.org")
# call() runs the command and returns its exit code.
r = subprocess.call(command)
print("Exit code:", r)
 # 结果输出:$ nslookup www.python.org
 服务器:  cache-a.guangzhou.gd.cn
 Address:  202.96.128.86
 非权威应答:
 名称:    www.python.org
 Addresses:  2a04:4e42:1a::223
 151.101.72.223
 Exit code: 0
 
	
		
			# When the child process needs input, feed it through communicate().
import subprocess

print("$ nslookup")
p = subprocess.Popen(
    ["nslookup"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
# The option must be written without spaces around "=": nslookup answers
# "Unrecognized command: set q = mx" to the spaced form, so the MX query
# type was never actually selected.
output, err = p.communicate(b"set q=mx\npython.org\nexit\n")
# The captured bytes are decoded as GBK.
# NOTE(review): presumably chosen for a Chinese-locale Windows console;
# adjust the codec if nslookup emits a different encoding on your system.
print(output.decode("gbk"))
print("Exit code:", p.returncode)
# 结果输出:$ nslookup
 默认服务器:  cache-a.guangzhou.gd.cn
 Address:  202.96.128.86
 > Unrecognized command: set q = mx
 > 服务器:  cache-a.guangzhou.gd.cn
 Address:  202.96.128.86
 名称:    python.org
 Address:  138.197.63.241
 >
 Exit code: 0
 4.进程间通信
	
		
			# The parent creates two child processes: one writes to a Queue, one reads from it.
# Save as queue_test.py and run it from a shell: python queue_test.py
from multiprocessing import Process, Queue
import os, time, random


def write(q):
    """Put the letters W, I, L, L, A, R, D on ``q``, sleeping a random sub-second time after each."""
    print("Process to write:%s" % os.getpid())
    for letter in "WILLARD":
        print("Put %s to queue..." % letter)
        q.put(letter)
        time.sleep(random.random())


def read(q):
    """Print every value taken from ``q``; loops forever and never returns."""
    print("Process to read:%s" % os.getpid())
    while True:
        # Blocking get: waits until an item is available.
        print("Get %s from queue." % q.get(True))


if __name__ == "__main__":
    # The parent creates the Queue and hands it to both children.
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # Start the writer first, then the reader.
    pw.start()
    pr.start()
    # Wait for the writer to finish.
    pw.join()
    # The reader is an endless loop, so it cannot be joined and is
    # terminated forcibly instead.
    pr.terminate()
# 结果输出:Process to write:15720
 Process to read:21524
 Put W to queue...
 Get W from queue.
 Put I to queue...
 Get I from queue.
 Put L to queue...
 Get L from queue.
 Put L to queue...
 Get L from queue.
 Put A to queue...
 Get A from queue.
 Put R to queue...
 Get R from queue.
 Put D to queue...
 Get D from queue.
 5.多线程
	
		
			# Thread libraries: _thread and threading.
# Starting a thread: pass a function when creating a Thread instance, then
# call start() to begin execution.
# Every process starts one thread by default, called the main thread, and
# the main thread can start further threads.
# current_thread() returns the instance of the current thread.
# The main thread instance is named MainThread.
# A child thread's name is given at creation time; if omitted, threads are
# named automatically (Thread-1, Thread-2, ...).
import time, threading


def loop():
    """Print a counter from 1 to 5, one line per second, tagged with the thread name."""
    me = threading.current_thread().name
    print("Thread %s is running..." % me)
    for n in range(1, 6):
        print("Thread %s >>> %s" % (me, n))
        time.sleep(1)
    print("Thread %s ended." % me)


print("Thread %s is running..." % threading.current_thread().name)
thread1 = threading.Thread(target=loop, name="LoopThread")
thread1.start()
# Wait for the worker thread before printing the final line.
thread1.join()
print("Thread %s ended." % threading.current_thread().name)
# 结果输出:Thread MainThread is running...
 Thread LoopThread is running...
 Thread LoopThread >>> 1
 Thread LoopThread >>> 2
 Thread LoopThread >>> 3
 Thread LoopThread >>> 4
 Thread LoopThread >>> 5
 Thread LoopThread ended.
 Thread MainThread ended.
 6.Lock
	
		
			# Multi-process: each process holds its own copy of a variable; copies do not affect each other.
# Multi-thread: all variables are shared by all threads, so any thread can
# modify any variable.
# This script lets two threads modify the same variable without a lock;
# run it several times and the result is sometimes not 0.
import time, threading

balance = 0


def change_it(n):
    """Add ``n`` to the global ``balance`` and then subtract it again."""
    global balance
    # Two separate read-modify-write statements, deliberately unprotected.
    balance = balance + n
    balance = balance - n


def run_thread(n):
    """Call ``change_it(n)`` two million times."""
    # Threads run interleaved, so balance does not necessarily end at 0.
    for _ in range(2000000):
        change_it(n)


thread1 = threading.Thread(target=run_thread, args=(5,))
thread2 = threading.Thread(target=run_thread, args=(8,))
for worker in (thread1, thread2):
    worker.start()
for worker in (thread1, thread2):
    worker.join()
print(balance)
# Sample output:
# 5 (differs from run to run)
	
		
			# To keep balance correct, change_it() has to be guarded by a lock.
# While one thread executes change_it() it holds the lock; other threads
# cannot execute change_it() at the same time and have to wait until the
# lock is released and they acquire it themselves.
# The lock is created with threading.Lock().
import time, threading

balance = 0
lock = threading.Lock()


def change_it(n):
    """Add ``n`` to the global ``balance`` and then subtract it again."""
    global balance
    balance = balance + n
    balance = balance - n


def run_thread(n):
    """Call ``change_it(n)`` two million times, holding ``lock`` around each call."""
    for _ in range(2000000):
        # The with-statement acquires the lock and always releases it,
        # even if change_it() raises.
        with lock:
            change_it(n)


thread1 = threading.Thread(target=run_thread, args=(5,))
thread2 = threading.Thread(target=run_thread, args=(8,))
for worker in (thread1, thread2):
    worker.start()
for worker in (thread1, thread2):
    worker.join()
print(balance)
# Sample output:
# 0

# 7.ThreadLocal
	
		
			# In a multi-threaded program each thread has its own data.
# Using a thread's own local variables is better than using globals.
import threading

# Global ThreadLocal object; attributes set on it are per-thread.
local_school = threading.local()


def process_student():
    """Print a greeting for the student bound to the current thread."""
    thread_name = threading.current_thread().name
    # Reads the student associated with the calling thread.
    print("Hello,%s (in %s)" % (local_school.student, thread_name))


def process_thread(name):
    """Bind ``name`` as the current thread's student, then greet that student."""
    local_school.student = name
    process_student()


thread1 = threading.Thread(target=process_thread, args=("Willard",), name="Thread-1")
thread2 = threading.Thread(target=process_thread, args=("WenYu",), name="Thread-2")
for worker in (thread1, thread2):
    worker.start()
for worker in (thread1, thread2):
    worker.join()
# 结果输出:# Hello,Willard (in Thread-1)
 # Hello,WenYu (in Thread-2)
 8.进程VS线程
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | # 进程和线程优缺点: # 1.要实现多任务,会设计Master-Worker模式,Master负责分配任务,Worker负责执行任务, # 在多任务环境下,通常是一个Master,多个Worker; #     a.如果使用多进程实现Master-Worker,主进程即Master,其他进程即Worker; #     b.如果使用多线程实现Master-Worker,主线程即Master,其他线程即Worker; # 2.多进程优点:稳定性高,一个子进程崩溃不会影响主进程和其他子进程; # 3.多进程缺点:创建进程的代价大,操作系统能同时运行的进程数有限; # 4.多线程缺点:任何一个线程崩溃,可能直接造成整个进程崩溃; # 线程切换: # 1.依次完成任务的方式称为单任务模型,或批处理任务模型; # 2.任务1先做n分钟,切换到任务2做n分钟,再切换到任务3做n分钟,依此类推,称为多任务模型; # 计算密集型 VS IO密集型 # 1.计算密集型任务:要进行大量的计算,消耗CPU资源,如:对视频进行高清解码等; # 2.IO密集型任务:涉及到网络、磁盘IO的任务,均为IO密集型任务; # 3.IO密集型任务消耗CPU少,大部分时间在等待IO操作完成; # 异步IO # 1.事件驱动模型:用单进程单线程模型来执行多任务; # 2.Python语言中,单线程的异步编程模型称为协程; |  9.分布式进程
	
		
			# task_master_test.py -- run it from a shell: python task_master_test.py
"""Master side of a distributed task-queue example.

Scenario: a multi-process program that communicates through a Queue runs on
one machine, but the task-processing side is overloaded, so the process that
sends tasks and the process that handles them are to be spread over two
machines.
"""
import random
import time
import queue
from multiprocessing.managers import BaseManager

# Queue of tasks to be sent out.
task_queue = queue.Queue()
# Queue of results coming back.
result_queue = queue.Queue()


def return_task_queue():
    """Return the module-level queue that holds outgoing tasks."""
    return task_queue


def return_result_queue():
    """Return the module-level queue that collects results.

    This used to return ``task_queue``, which made both registered names
    resolve to the same queue; it now returns ``result_queue``.
    """
    return result_queue


class QueueManager(BaseManager):
    """BaseManager subclass on which the two queues are registered."""


if __name__ == "__main__":
    # Register both queues on the manager; ``callable`` ties each name to
    # the function that returns the queue object.
    QueueManager.register("get_task_queue", callable=return_task_queue)
    QueueManager.register("get_result_queue", callable=return_result_queue)
    # Bind to port 5000 on 127.0.0.1 with the auth key b"Willard".
    manager = QueueManager(address=("127.0.0.1", 5000), authkey=b"Willard")
    # Start the manager.
    manager.start()
    try:
        # Queue objects reached through the manager.
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        # Put ten random tasks in.
        for i in range(10):
            n = random.randint(0, 10000)
            print("Put task %d..." % n)
            task.put(n)
        # Read ten results back, waiting up to 10 seconds for each.
        print("Try get results...")
        for i in range(10):
            r = result.get(timeout=10)
            print("Result:%s" % r)
    finally:
        # Shut the manager down even if reading a result fails.
        manager.shutdown()
    print("Master Exit.")
	
		
			# task_worker_test.py -- run it from a shell: python task_worker_test.py
import time
import sys
import queue
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    """BaseManager subclass used to reach the queues exposed by the master."""


# Only the names are registered here; no callable is supplied on this side.
QueueManager.register("get_task_queue")
QueueManager.register("get_result_queue")

# Address of the server to connect to.
server_address = "127.0.0.1"
print("Connect to server %s..." % server_address)
# Port and auth key as given here; they have to match the server's.
m = QueueManager(address=(server_address, 5000), authkey=b"Willard")
# Connect over the network.
m.connect()
# Obtain the queue objects.
task = m.get_task_queue()
result = m.get_result_queue()

# Take up to ten tasks from the task queue and write each result to the
# result queue.
for i in range(10):
    try:
        n = task.get(timeout=1)
    except queue.Empty:
        # The exception lives in the lowercase ``queue`` module; the former
        # ``Queue.Empty`` named an undefined ``Queue`` and raised NameError
        # as soon as the task queue ran dry.
        print("Task queue is empty.")
        continue
    print("Run task %d * %d..." % (n, n))
    r = "%d * %d = %d" % (n, n, n * n)
    time.sleep(1)
    result.put(r)
print("Worker Exit.")
 |