python
主页 > 脚本 > python >

python进程间通信的项目实践

2023-03-12 | 佚名 | 点击:

进程间通信表示进程之间的数据交换。 为了开发并行应用程序,需要在进程间交换数据。 下图显示了多个子过程之间同步的各种通信机制 -

在这里插入图片描述

各种通信机制

队列

队列可以用于多进程程序。 多处理模块的Queue类与Queue.Queue类相似。 因此,可以使用相同的API。 Multiprocessing.Queue提供了进程间通信的线程和进程安全FIFO(先进先出)机制。

例子

下面是一个简单的例子,从python官方文档多处理了解Queue类的多处理概念

1

2

3

4

5

6

7

8

9

10

11

12

from multiprocessing import Process, Queue

import queue

import random

def f(q):

   q.put([42, None, 'hello'])

def main():

   q = Queue()

   p = Process(target = f, args = (q,))

   p.start()

   print (q.get())

if __name__ == '__main__':

   main()

执行上面示例代码,得到以下结果 -

[42, None, 'hello']

管道

它是一种数据结构,用于在多进程程序中的进程之间进行通信。Pipe()函数返回一对由管道连接的连接对象,默认情况下是双工(双向)。 它的工作原理如下 -
它返回一对代表管道两端的连接对象。
每个对象都有两个方法 - send()和recv(),以在进程之间进行通信。

例子

下面是一个简单的例子,摘自python官方文档多处理,以理解Pipe()函数的多进程概念

1

2

3

4

5

6

7

8

9

10

11

12

from multiprocessing import Process, Pipe

 

def f(conn):

   conn.send([42, None, 'hello'])

   conn.close()

 

if __name__ == '__main__':

   parent_conn, child_conn = Pipe()

   p = Process(target = f, args = (child_conn,))

   p.start()

   print (parent_conn.recv())

   p.join()

执行上面代码,得到以下结果 -

[42, None, 'hello']

管理器

Manager是一类多处理模块,它提供了一种协调所有用户之间共享信息的方式。管理器对象控制服务器进程,该进程管理共享对象并允许其他进程操纵它们。 换句话说,管理器提供了一种方法来创建可以在不同进程之间共享的数据。 以下是Manager对象的不同属性 -

例子

以下是使用管理器对象在服务器进程中创建列表记录,然后在该列表中添加新记录的示例。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

import multiprocessing

 

def print_records(records):

   for record in records:

      print("Name: {0}\nScore: {1}\n".format(record[0], record[1]))

 

def insert_record(record, records):

   records.append(record)

      print("A New record is added\n")

 

if __name__ == '__main__':

   with multiprocessing.Manager() as manager:

 

      records = manager.list([('Computers', 1), ('Histoty', 5), ('Hindi',9)])

      new_record = ('English', 3)

 

      p1 = multiprocessing.Process(target = insert_record, args = (new_record, records))

      p2 = multiprocessing.Process(target = print_records, args = (records,))

      p1.start()

      p1.join()

      p2.start()

      p2.join()

执行上面代码,得到以下结果 -

A New record is added

Name: Computers
Score: 1

Name: Histoty
Score: 5

Name: Hindi
Score: 9

Name: English
Score: 3

管理器命名空间的概念

Manager类带有名称空间的概念,这是一种在多个进程间共享多个属性的快速方法。 命名空间不具有任何可以调用的公共方法,但它们具有可写的属性。

例子

以下Python脚本示例如何使用命名空间在主进程和子进程之间共享数据 -

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

import multiprocessing

 

def Mng_NaSp(using_ns):

 

   using_ns.x +=5

   using_ns.y *= 10

 

if __name__ == '__main__':

   manager = multiprocessing.Manager()

   using_ns = manager.Namespace()

   using_ns.x = 1

   using_ns.y = 1

 

   print ('before', using_ns)

   p = multiprocessing.Process(target = Mng_NaSp, args = (using_ns,))

   p.start()

   p.join()

   print ('after', using_ns)

执行上面示例代码,得到以下结果 -

before Namespace(x = 1, y = 1)
after Namespace(x = 6, y = 10)

Ctypes数组和值

Multiprocessing模块提供了Array和Value对象,用于将数据存储在共享内存映射中。 Array是从共享内存分配的Array和Value是从共享内存分配的ctypes对象。
Multiprocessing模块导入Process,Value,Array。

例子

下面的Python脚本是一个从python文档中获取的例子,它利用Ctypes Array和Value在进程间共享一些数据。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

def f(n, a):

   n.value = 3.1415927

   for i in range(len(a)):

   a[i] = -a[i]

 

if __name__ == '__main__':

   num = Value('d', 0.0)

   arr = Array('i', range(10))

 

   p = Process(target = f, args = (num, arr))

   p.start()

   p.join()

   print (num.value)

   print (arr[:])

执行上面示例代码,得到以下结果 -

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

顺序进程(CSP)

CSP用于说明系统与具有并行模型的其他系统的交互。 CSP是通过消息传递编写并发或编程的框架,因此它对于描述并发是有效的

Python PyCSP库

要实现在CSP中找到的核心原语,Python有一个名为PyCSP的库。 它使实现非常简短和易读,因此可以非常容易地理解它。 以下是PyCSP的基本流程网络 -

在这里插入图片描述

在上面的PyCSP过程网络中,有两个过程 - 进程1和进程2。这些过程通过传递消息通过两个通道 - 通道1和通道2进行通信

安装PyCSP

通过以下命令来安装Python的PyCSP库 -

1

pip install PyCSP

例子

下面的Python脚本是一个简单的例子,它可以并行运行两个进程。 它是在PyCSP库的帮助下完成的

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

from pycsp.parallel import *

import time

@process

def P1():

   time.sleep(1)

   print('P1 exiting')

@process

def P2():

   time.sleep(1)

   print('P2 exiting')

def main():

   Parallel(P1(), P2())

   print('Terminating')

if __name__ == '__main__':

   main()

在上面的脚本中,已经创建了两个函数,即P1和P2,然后用@process进行装饰,将它们转换为进程。执行上面代码后,得到以下输出结果 -

P2 exiting
P1 exiting
Terminating

原文链接:https://blog.csdn.net/weixin_36058228/article/details/129419325
相关文章
最新更新