一、Python編程中,使用多進程完成多任務(wù)的方法
Python提供了multiprocessing模塊,可以用于在單個計算機上創(chuàng)建多個進程,從而實現(xiàn)多任務(wù)處理。下面是一個簡單的例子,展示了如何使用multiprocessing模塊創(chuàng)建多進程來完成多任務(wù)。
代碼:
import multiprocessingimport time# 跳舞任務(wù)def dance(): for i in range(5): print("跳舞中...") time.sleep(0.2)# 唱歌任務(wù)def sing(): for i in range(5): print("唱歌中...") time.sleep(0.2)if __name__ == '__main__': # 創(chuàng)建跳舞的子進程 # group: 表示進程組,目前只能使用None # target: 表示執(zhí)行的目標(biāo)任務(wù)名(函數(shù)名、方法名) # name: 進程名稱, 默認是Process-1, ..... dance_process = multiprocessing.Process(target=dance, name="myprocess1") sing_process = multiprocessing.Process(target=sing) # 啟動子進程執(zhí)行對應(yīng)的任務(wù) dance_process.start() sing_process.start()
執(zhí)行結(jié)果:
唱歌中...跳舞中...唱歌中...跳舞中...唱歌中...跳舞中...唱歌中...跳舞中...唱歌中...跳舞中...
二、進程有哪些注意點
1、進程之間不共享全局變量
代碼:
import multiprocessingimport time# 定義全局變量g_list = list()# 添加數(shù)據(jù)的任務(wù)def add_data(): for i in range(5): g_list.append(i) print("add:", i) time.sleep(0.2) # 代碼執(zhí)行到此,說明數(shù)據(jù)添加完成 print("add_data:", g_list)def read_data(): print("read_data", g_list)if __name__ == '__main__': # 創(chuàng)建添加數(shù)據(jù)的子進程 add_data_process = multiprocessing.Process(target=add_data) # 創(chuàng)建讀取數(shù)據(jù)的子進程 read_data_process = multiprocessing.Process(target=read_data) # 啟動子進程執(zhí)行對應(yīng)的任務(wù) add_data_process.start() # 主進程等待添加數(shù)據(jù)的子進程執(zhí)行完成以后程序再繼續(xù)往下執(zhí)行,讀取數(shù)據(jù) add_data_process.join() read_data_process.start() print("main:", g_list) # 總結(jié): 多進程之間不共享全局變量
執(zhí)行結(jié)果:
add: 0
add: 1
add: 2
add: 3
add: 4
add_data: [0, 1, 2, 3, 4]
main: []
read_data []
創(chuàng)建子進程會對主進程資源進行拷貝,也就是說子進程是主進程的一個副本,好比是一對雙胞胎,之所以進程之間不共享全局變量,是因為操作的不是同一個進程里面的全局變量,只不過不同進程里面的全局變量名字相同而已。
2、主進程會等待所有的子進程執(zhí)行結(jié)束再結(jié)束
假如我們現(xiàn)在創(chuàng)建一個子進程,這個子進程執(zhí)行完大概需要2秒鐘,現(xiàn)在讓主進程執(zhí)行0.5秒鐘就退出程序,查看一下執(zhí)行結(jié)果,示例代碼如下:
import multiprocessingimport time# 定義進程所需要執(zhí)行的任務(wù)def task(): for i in range(10): print("任務(wù)執(zhí)行中...") time.sleep(0.2)if __name__ == '__main__': # 創(chuàng)建子進程 sub_process = multiprocessing.Process(target=task) sub_process.start() # 主進程延時0.5秒鐘 time.sleep(0.5) print("over") exit() # 總結(jié):主進程會等待所有的子進程執(zhí)行完成以后程序再退出
執(zhí)行結(jié)果:
任務(wù)執(zhí)行中...任務(wù)執(zhí)行中...任務(wù)執(zhí)行中...over任務(wù)執(zhí)行中...任務(wù)執(zhí)行中...任務(wù)執(zhí)行中...任務(wù)執(zhí)行中...任務(wù)執(zhí)行中...任務(wù)執(zhí)行中...任務(wù)執(zhí)行中...
說明:通過上面代碼的執(zhí)行結(jié)果,我們可以得知:主進程會等待所有的子進程執(zhí)行結(jié)束再結(jié)束。
假如我們就讓主進程執(zhí)行0.5秒鐘,子進程就銷毀不再執(zhí)行,那怎么辦呢?我們可以設(shè)置守護主進程或者在主進程退出之前讓子進程銷毀:
守護主進程:守護主進程就是主進程退出子進程銷毀不再執(zhí)行子進程銷毀:子進程執(zhí)行結(jié)束保證主進程正常退出的示例代碼:
import multiprocessingimport time# 定義進程所需要執(zhí)行的任務(wù)def task(): for i in range(10): print("任務(wù)執(zhí)行中...") time.sleep(0.2)if __name__ == '__main__': # 創(chuàng)建子進程 sub_process = multiprocessing.Process(target=task) # 設(shè)置守護主進程,主進程退出子進程直接銷毀,子進程的生命周期依賴與主進程 # sub_process.daemon = True sub_process.start() time.sleep(0.5) print("over") # 讓子進程銷毀 sub_process.terminate() exit() # 總結(jié):主進程會等待所有的子進程執(zhí)行完成以后程序再退出 # 如果想要主進程退出子進程銷毀,可以設(shè)置守護主進程或者在主進程退出之前讓子進程銷毀
執(zhí)行結(jié)果:
任務(wù)執(zhí)行中...任務(wù)執(zhí)行中...任務(wù)執(zhí)行中...over
三、Python多線程的通信
進程是系統(tǒng)獨立調(diào)度核分配系統(tǒng)資源(CPU、內(nèi)存)的基本單位,進程之間是相互獨立的,每啟動一個新的進程相當(dāng)于把數(shù)據(jù)進行了一次克隆,子進程里的數(shù)據(jù)修改無法影響到主進程中的數(shù)據(jù),不同子進程之間的數(shù)據(jù)也不能共享,這是多進程在使用中與多線程最明顯的區(qū)別。但是難道Python多進程中間難道就是孤立的嗎?當(dāng)然不是,python也提供了多種方法實現(xiàn)了多進程中間的通信和數(shù)據(jù)共享(可以修改一份數(shù)據(jù))。
1、進程對列Queue
Queue在多線程中也說到過,在生成者消費者模式中使用,是線程安全的,是生產(chǎn)者和消費者中間的數(shù)據(jù)管道,那在python多進程中,它其實就是進程之間的數(shù)據(jù)管道,實現(xiàn)進程通信。
from multiprocessing import Process,Queuedef fun1(q,i): print('子進程%s 開始put數(shù)據(jù)' %i) q.put('我是%s 通過Queue通信' %i)if __name__ == '__main__': q = Queue() process_list = [] for i in range(3): p = Process(target=fun1,args=(q,i,)) #注意args里面要把q對象傳給我們要執(zhí)行的方法,這樣子進程才能和主進程用Queue來通信 p.start() process_list.append(p) for i in process_list: p.join() print('主進程獲取Queue數(shù)據(jù)') print(q.get()) print(q.get()) print(q.get()) print('結(jié)束測試')
結(jié)果:
子進程0 開始put數(shù)據(jù)子進程1 開始put數(shù)據(jù)子進程2 開始put數(shù)據(jù)主進程獲取Queue數(shù)據(jù)我是0 通過Queue通信我是1 通過Queue通信我是2 通過Queue通信結(jié)束測試Process finished with exit code 0
上面的代碼結(jié)果可以看到我們主進程中可以通過Queue獲取子進程中put的數(shù)據(jù),實現(xiàn)進程間的通信。
2、管道Pipe
管道Pipe和Queue的作用大致差不多,也是實現(xiàn)進程間的通信。
from multiprocessing import Process, Pipedef fun1(conn): print('子進程發(fā)送消息:') conn.send('你好主進程') print('子進程接受消息:') print(conn.recv()) conn.close()if __name__ == '__main__': conn1, conn2 = Pipe() #關(guān)鍵點,pipe實例化生成一個雙向管 p = Process(target=fun1, args=(conn2,)) #conn2傳給子進程 p.start() print('主進程接受消息:') print(conn1.recv()) print('主進程發(fā)送消息:') conn1.send("你好子進程") p.join() print('結(jié)束測試')
結(jié)果:
主進程接受消息:子進程發(fā)送消息:子進程接受消息:你好主進程主進程發(fā)送消息:你好子進程結(jié)束測試Process finished with exit code 0
上面可以看到主進程和子進程可以相互發(fā)送消息。
3、Managers
Queue和Pipe只是實現(xiàn)了數(shù)據(jù)交互,并沒實現(xiàn)數(shù)據(jù)共享,即一個進程去更改另一個進程的數(shù)據(jù)。那么就要用到Managers。
from multiprocessing import Process, Managerdef fun1(dic,lis,index): dic[index] = 'a' dic['2'] = 'b' lis.append(index) #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9] #print(l)if __name__ == '__main__': with Manager() as manager: dic = manager.dict()#注意字典的聲明方式,不能直接通過{}來定義 l = manager.list(range(5))#[0,1,2,3,4] process_list = [] for i in range(10): p = Process(target=fun1, args=(dic,l,i)) p.start() process_list.append(p) for res in process_list: res.join() print(dic) print(l)
結(jié)果:
{0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a', 5: 'a', 7: 'a', 6: 'a', 8: 'a', 9: 'a'}[0, 1, 2, 3, 4, 0, 3, 1, 2, 4, 5, 7, 6, 8, 9]
可以看到主進程定義了一個字典和一個列表,在子進程中,可以添加和修改字典的內(nèi)容,在列表中插入新的數(shù)據(jù),實現(xiàn)進程間的數(shù)據(jù)共享,即可以共同修改同一份數(shù)據(jù)。
延伸閱讀1:Python協(xié)程簡介
協(xié)程,又稱微線程,纖程,英文名Coroutine。協(xié)程的作用是在執(zhí)行函數(shù)A時可以隨時中斷去執(zhí)行函數(shù)B,然后中斷函數(shù)B繼續(xù)執(zhí)行函數(shù)A(可以自由切換)。但這一過程并不是函數(shù)調(diào)用,這一整個過程看似像多線程,然而協(xié)程只有一個線程執(zhí)行。