Python基础之多进程

文章目录

  • 1 多进程
    • 1.1 简介
    • 1.2 Linux下多进程
    • 1.3 multiprocessing
    • 1.4 Pool
    • 1.5 进程间通信
    • 1.6 分布式进程

1 多进程

1.1 简介

要让Python程序实现多进程(multiprocessing),我们先了解操作系统的相关知识。
Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

1.2 Linux下多进程

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

# multiprocessing.py
import os

print ('Process (%s) start...' % os.getpid())
pid = os.fork()
if pid==0:
    print ('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
运行结果如下:

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

由于Windows没有fork调用,上面的代码在Windows上无法运行。由于Mac系统是基于BSD(Unix的一种)内核,所以,在Mac下运行是没有问题的

有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。

1.3 multiprocessing

如果打算编写多进程的服务程序,Unix/Linux无疑是正确的选择。由于Windows没有fork调用,难道在Windows上无法用Python编写多进程的程序?
由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing模块就是跨平台版本的多进程模块。

multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print ('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print ('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print ('Process will start.')
    p.start()
    p.join()
    print 'Process end.'

执行结果如下:
Parent process 928.
Process will start.
Run child process test (929)...
Process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。
join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

1.4 Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

from multiprocessing import Pool
import os, time, random
def long_time_task(name):
    print ('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print ('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print ('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print 'All subprocesses done.'

执行结果如下:
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

代码解读:

  • 对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
  • 注意输出的结果,task 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的默认大小在电脑上是4,因此,最多同时执行4个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果改成:p = Pool(5),就可以同时跑5个进程。
  • 由于Pool的默认大小是CPU的核数,如果拥有8核CPU,那么要提交至少9个子进程才能看到上面的等待效果。

1.5 进程间通信

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Pythonmultiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。

我们以 Queue 为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    for value in ['A', 'B', 'C']:
        print ('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
    while True:
        value = q.get(True)
        print ('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

运行结果如下:

Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所以,如果multiprocessingWindows下调用失败了,要先考虑是不是pickle失败了。

1.6 分布式进程

Pythonmultiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。

举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?
原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。
我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:

# taskmanager.py

import random, time, Queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = Queue.Queue()
# 接收结果的队列:
result_queue = Queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey='abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 关闭:
manager.shutdown()

请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。
然后,在另一台机器上启动任务进程(本机上启动也可以):

# taskworker.py

import time, sys, Queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行taskmanager.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与taskmanager.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey='abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 处理结束:
print('worker exit.')

任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。

现在,可以试试分布式进程的工作效果了。先启动taskmanager.py服务进程:

$ python taskmanager.py
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...
taskmanager进程发送完任务后,开始等待result队列的结果。现在启动taskworker.py进程:

$ python taskworker.py 127.0.0.1
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

taskworker进程结束,在taskmanager进程中会继续打印出结果:

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

这个简单的Manager/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。

Queue对象存储在哪?注意到taskworker.py中根本没有创建Queue的代码,所以,Queue对象存储在taskmanager.py进程中:
在这里插入图片描述
Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue

authkey:是为了保证两台机器正常通信,不被其他机器恶意干扰。如果taskworker.py的authkey和taskmanager.py的authkey不一致,肯定连接不上。

注意Queue的作用是用来传递任务接收结果,每个任务的描述数据量要尽量小。比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/761771.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何在本地一键配置最强国产大模型

自从OpenAI的ChatGPT横空出世以来,国内外各类大语言模型(LLM)层出不穷,其中不乏Google的Gemini、Claude、文心一言等等。相较于竞争激烈的商业模型赛道,以Llama为代表的开源大模型的进步速度也十分惊人。 伴随着大语言…

ANSYS新能源汽车动力电池仿真应用案例

燃料电池是一种非燃烧过程的电化学能转换装置,将氢气(等燃料)和氧气的化学能连续不断地转换为电能,是发电设备而非储能设备。 根据电解质的不同,分为碱性燃料电池AFC、磷酸燃料电池PAFC、熔融碳酸盐燃料电池MCFC、固体…

微机原理 复习

第一章导论 1.3 冯诺依曼体系结构 (1)以二进制形式表示指令和数据 (2)程序和数据事先放在存储器中(预存储) (3)由运算器、控制器、输入设备和输出设备五大部件组成 字长、主频…

css实现一个三角形

实现不用方向的三角形可根据border进行设置。具体代码如下: .triangle-up {width: 0;height: 0;border-top: 10px solid transparent;border-left: 10px solid transparent;border-right: 10px solid transparent;border-bottom: 10px solid black;}.triangle-rig…

6-14题连接 - 高频 SQL 50 题基础版

目录 1. 相关知识点2. 例子2.6. 使用唯一标识码替换员工ID2.7- 产品销售分析 I2.8 - 进店却未进行过交易的顾客2.9 - 上升的温度2.10 - 每台机器的进程平均运行时间2.11- 员工奖金2.12-学生们参加各科测试的次数2.13-至少有5名直接下属的经理2.14 - 确认率 1. 相关知识点 left …

Redis Cluster 模式 的具体实施细节是什么样的?

概述 参考:What are Redis Cluster and How to setup Redis Cluster locally ? | by Rajat Pachauri | Medium Redis Cluster 的工作原理是将数据分布在多个节点上,同时确保高可用性和容错能力。以下是 Redis Cluster 运行方式的简要概述: …

Vue 快速入门案例

步骤一&#xff1a;引入vue.js文件 添加<script>标签并标明路径 步骤二&#xff1a;定义Vue对象 el Vue接管区域 data 定义数据模型 步骤三&#xff1a;编写视图层的展示 v-model 绑定数据模型 {{要展示的数据模型}} 运行效果 总结 文本框里的值&a…

欢太主题商店 官方资源提取与应用第三方资源方法一览

前言叠甲&#xff1a;支持正版&#xff0c;尊重他人劳动成果&#xff0c;反对盗版提取&#xff0c;不要传播提取版&#xff0c;我本人也在支持正版&#xff0c;但是最近懒得用主题&#xff0c;用一段时间的默认吧&#xff0c;如有主题开发者不满&#xff0c;请联系删除 &#x…

湖南省教育网络协会莅临麒麟信安调研教育网络数字化建设及教育信创发展情况

6月28日下午&#xff0c;湖南省教育网络协会理事长张智勇、秘书长刘志勇、副理事长黄旭、胡洪波、周中伟等协会相关负责人一行莅临麒麟信安&#xff0c;就湖南省教育网络数字化建设、教育信创工作等主题进行深入调研。麒麟信安副总裁王攀热情接待。 协会成员一行来到麒麟信安展…

让企业更进一步:AAA信用企业认证详解

AAA信用企业认证是企业在市场竞争中展示其信用和实力的重要方式&#xff0c;它不仅能够提升企业的公信力&#xff0c;还有助于企业在多方面获得竞争优势。以下是对AAA信用企业认证的详细解释&#xff1a; AAA信用企业认证的定义 AAA信用企业认证&#xff0c;又称3A认证&#…

《数据安全技术的数据分类分级规则》解析

数据安全技术的数据分类分级规则是一项国家标准&#xff0c;用于指导和规范数据分类与分级的方法和标准&#xff0c;以保障数据的安全性和保密性。该标准明确了数据分类与分级的基本原则&#xff0c;包括业务相关性、数据敏感性、风险可控性等。具体而言&#xff0c;数据分类应…

【UE5.1】Chaos物理系统基础——01 创建可被破坏的物体

目录 步骤 一、通过笔刷创建静态网格体 二、破裂静态网格体 三、“统一” 多层级破裂 四、“簇” 群集化的破裂 五、几何体集的材质 六、防止几何体集自动破碎 步骤 一、通过笔刷创建静态网格体 1. 可以在Quixel Bridge中下载两个纹理&#xff0c;用于表示石块的内外纹…

MySQL中的常用逻辑操作符

逻辑运算符在MySQL查询中扮演着重要角色&#xff0c;通过AND、OR、NOT等运算符的组合使用&#xff0c;可以提高查询的准确性和灵活性&#xff0c;确保查询结果满足业务需求。合理使用这些运算符还能优化查询性能&#xff0c;减少不必要的数据检索&#xff0c;并提高SQL语句的可…

SpringBoot创建一个初始化项目

提示&#xff1a;这一篇文章&#xff0c;主要是为了之后可以快速的去搭建项目&#xff0c;当然这篇博客&#xff0c;作者也会根据以后学习到的东西&#xff0c;慢慢去整理 文章目录 前言 搭建一个SpringBoot项目&#xff0c;目的是为了快速开发项目 项目列表 响应枚举类 /***…

AI奥林匹克竞赛:Claude-3.5-Sonnet对决GPT-4o,谁是最聪明的AI?

目录 实验设置 评估对象 评估方法 结果与分析 针对学科的细粒度分析 GPT-4o vs. Claude-3.5-Sonnet GPT-4V vs. Gemini-1.5-Pro 结论 AI技术日新月异&#xff0c;Anthropic公司最新发布的Claude-3.5-Sonnet因在知识型推理、数学推理、编程任务及视觉推理等任务上设立新…

网络攻防题录集

文章目录 第一章 网络攻防概述第二章 密码学第三章 网络协议脆弱性分析第四 自测题三第五章 自测题五第六章 自测题六第七章 自测题七第八章 自测题八第九章 自测题九第十章 自测题十第十一章 自测题十一第十二章 自测题十二第十三章 自测题十三 第一章 网络攻防概述 第一代安…

Anti-Canine Heartworm Antibody (Chicken) - HRP Conjugated

犬心丝虫&#xff08;学名Dirofilaria immitis&#xff09;是一种寄生丝虫&#xff0c;通过蚊子叮咬而传播。感染犬在早期阶段&#xff0c;大多不会出现症状。随着病情发展&#xff0c;将出现咳嗽、呼吸困难等症状&#xff0c;并伴有右心功能衰竭&#xff0c;最终全身衰弱或虚脱…

2008-2022年款哈弗维修手册和电路图线路图接线图资料更新

经过整理&#xff0c;2005-2022年款长城哈弗全系列已经更新至汽修帮手资料库内&#xff0c;覆盖市面上99%车型&#xff0c;包括维修手册、电路图、新车特征、车身钣金维修数据、全车拆装、扭力、发动机大修、发动机正时、保养、电路图、针脚定义、模块传感器、保险丝盒图解对照…

关于windows,wifi图标显示不了的解决方法

解决方法一&#xff08;解决了我的问题的方法&#xff09;&#xff1a; winr -->输入 regedit 打开注册表 --> 删除HKEY-CLASSES_ROOT\CLSID\{3d09c1ca-2bcc-40b7-b9bb-3f3ec143a87b} CLSID在下面仔细找&#xff0c;然后找到09开头那个删掉重启就可以了&#xff0c;可能…

工程师这几招降低电机EMI的方法,提高系统电磁兼容性能

通过在电机端子之间放置陶瓷电容器、工模滤波器或BDL滤波器均可抑制差模和共模噪声&#xff0c;以提高系统的EMC性能。工程师在本文详细介绍这几种降低电机EMI的方法。 EMC和EMI背景 电磁干扰(EMI)是系统上的电磁噪声的辐射或感应。与大多数电磁电路组件一样&#xff0c;直流…