进程相关知识
- 进程不能共享数据
- 进程会复制一份新的数据,数据具有新的控件存储
进程的概念
- 进程就是一个程序对各种资源的集合
- 多线程在根本上并不是真正意义上的并发,而是多个线程进行切换
- 多进程在本质上才是并发的
- 一个线程里面至少有一个线程
- 进程的第一个线程就是主线程
- 两个线程之间可以直接通信
进程和线程的区别
- 线程与进程没有可比性
- 创建一个线程比创建一个进程快,进程需要资源多
- 进程内部还是需要线程进行执行的
- 线程之间可以共享数据
- 进程之间是独立的
CPU密集型(多进程)
- 计算,深度学习训练
- 科学计算
- 内存版检索开房
IO密集型(多线程)
- 网络下载
- 网络等待
- 文件操作
创建一个进程
import multiprocessing
def gogogo():
time.sleep(10)
mylist = [x for x in range(10000000)]
print(hello)
print(os.getppid()) # 获取父进程编号
print(os.getpid()) # 获取当前进程编号
print(id(mylist))
if __name__=="__main__":
gogogo()
p = multiprocessing.Process(target=gogogo,args=())
p.start()
p.join() # 阻塞主进程
进程的一些方法
- os.getppid() # 获取父进程编号
- os.getpid() # 获取进程编号
进程锁
lock=multiprocessing.RLock()#创建一个锁
# Author:Luo
import time
import multiprocessing
import os
def TestGo(lock, title):
print(title) # 上半段并发
with lock: # 下半段,lock多个进程之间共享
time.sleep(2)
print(__name__) # 执行模块的名称
print("father pid", os.getppid()) # 获取父进程编号
print("self pid", os.getpid()) # 获取进程编号
if __name__ == "__main__":
lock = multiprocessing.RLock() # 创建一个锁
TestGo(lock, "王毅")
namelist = ["房久博", "王雪飞", "李培伦", "王涛"]
plist = [] # 进程列表
for name in namelist:
p = multiprocessing.Process(target=TestGo, args=(lock, name))
p.start()
plist.append(p)
for p in plist:
p.join()
进程间通信Pipe
import multiprocessing
import os
def go(conn): # 传递一个管道
print("子进程开始")
list1 = conn.recv() # 同步运行,没有数据发送会阻塞
print(os.getppid(), os.getpid(), list1)
conn.send(["1234"])
print("子进程结束")
if __name__ == "__main__":
conn_a, conn_b = multiprocessing.Pipe()
multiprocessing.Process(target=go, args=(conn_a,)).start()
conn_b.send(["abcd"])
list2 = conn_b.recv()
print(os.getppid(), os.getpid(), list2)
进程间共享数据Queue
import multiprocessing
import time
import os
def go(myQue):
print(myQue.get())
myQue.put("123")
if __name__ == "__main__":
myQue = multiprocessing.Queue()
myQue.put("abc")
myQue.put("abcd")
multiprocessing.Process(target=go, args=(myQue,)).start()
print(myQue.get())
进程间共享数据Value
import multiprocessing
def func(num):
num.value=12
if __name__=="__main__":
num=multiprocessing.Value("d",10)# d整数 多个进程共享
print(num.value)
p=multiprocessing.Process(target=func,args=(num,))
p.start()
p.join()
print(num.value)
进程间共享数据Array
import multiprocessing
def func(num):
num[2]=9999
if __name__=="__main__":
array=multiprocessing.Array("i",[1,2,3,4,5]) # i整数,不能增加元素
print(array[:])
p=multiprocessing.Process(target=func,args=(array,))
p.start()
p.join()
print(array[:])
进程间共享数据dict
进程间共享数据list
import multiprocessing
def func(mydict, mylist):
mylist.append(10)
mylist.append(11)
mylist.append(12)
mydict["王毅"]=175
if __name__=="__main__":
mydict=multiprocessing.Manager().dict()
mylist=multiprocessing.Manager().list(range(5))
mydict["王雪飞"]=180
print(mylist)
print(mydict)
p=multiprocessing.Process(target=func,args=(mydict, mylist))
p.start()
p.join()
print(mylist)
print(mydict)
进程池
import time
import multiprocessing
import os
def TestGo(title):
print(title) #上半段并发
time.sleep(5)
print(__name__) #执行模块的名称
print("father pid", os.getppid()) # 获取父进程编号
print("self pid", os.getpid()) # 获取进程编号
if __name__=="__main__":
namelist=["房久博","王雪飞","李培伦","王涛"]
pool=multiprocessing.Pool(1)#限制并发进程数量
for name in namelist:
pool.apply_async(TestGo,args=(name,))
#pool.apply()#同步
pool.close()#结束插入进程,开始执行
pool.join()#等待
print("完成")
分布式进程
Master
import multiprocessing
import multiprocessing.managers
import random,time,queue
task_queue=queue.Queue() #任务队列
result_queue=queue.Queue()#结果队列
def return_task(): #返回任务队列
return task_queue
def return_result(): #返回结果队列
return result_queue
class QueueManger(multiprocessing.managers.BaseManager):#继承,实现进程数据共享
pass
if __name__=="__main__":
multiprocessing.freeze_support() #初始化,开启分布式支持
QueueManger.register("get_task",callable=return_task)#注册共享,
QueueManger.register("get_result", callable=return_result)
#创建一个管理者,设置ip,端口,密码
manager=QueueManger(address=('10.0.147.185',8848),authkey=b'abc')
manager.start() #开启服务器管理
task,result=manager.get_task(),manager.get_result() #task任务,result结果
for i in range(5):
print("task加入数据",i)
task.put(i)
print("尝试开始取出结果")
for i in range(5):
r=result.get(timeout=50)
print("抓到数据",r)
manager.shutdown()#关闭服务器
print("完成")
Worker
import multiprocessing
import multiprocessing.managers
import random,time,queue
class QueueManger(multiprocessing.managers.BaseManager):#继承,实现进程数据共享
pass
if __name__=="__main__":
QueueManger.register("get_task")#注册共享,
QueueManger.register("get_result")
severip="10.0.147.185"
print("开始链接服务器")
manager=QueueManger(address=(severip,8848),authkey=b'abc')
manager.connect()#链接
print("服务器连接成功")
task=manager.get_task()
result=manager.get_result()#抓取任务队列,结果队列
for i in range(5):
try:
t=task.get(timeout=10)
print("取出数据",t)
result.put(10+t)#加工数据返回到结果队列
except :
print("队伍为空其他异常")
网友评论