在Thread和Process中,應當優(yōu)選Process,因為Process更穩(wěn)定,而且,Process可以分布到多臺機器上,而Thread最多只能分布到同一臺機器的多個CPU上。
Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網絡通信。由于managers模塊封裝很好,不必了解網絡通信的細節(jié),就可以很容易地編寫分布式多進程程序。
舉個例子:如果我們已經有一個通過Queue通信的多進程程序在同一臺機器上運行,現(xiàn)在,由于處理任務的進程任務繁重,希望把發(fā)送任務的進程和處理任務的進程分布到兩臺機器上。怎么用分布式進程實現(xiàn)?
原有的Queue可以繼續(xù)使用,但是,通過managers模塊把Queue通過網絡暴露出去,就可以讓其他機器的進程訪問Queue了。
我們先看服務進程,服務進程負責啟動Queue,把Queue注冊到網絡上,然后往Queue里面寫入任務:
# task_master.py import random, time, queuefrom multiprocessing.managers import BaseManager# 發(fā)送任務的隊列:task_queue = queue.Queue()# 接收結果的隊列:result_queue = queue.Queue()# 從BaseManager繼承的QueueManager:class QueueManager(BaseManager): pass# 把兩個Queue都注冊到網絡上, callable參數(shù)關聯(lián)了Queue對象:QueueManager.register('get_task_queue', callable=lambda: task_queue)QueueManager.register('get_result_queue', callable=lambda: result_queue)# 綁定端口5000, 設置驗證碼'abc':manager = QueueManager(address=('', 5000), authkey=b'abc')# 啟動Queue:manager.start()# 獲得通過網絡訪問的Queue對象:task = manager.get_task_queue()result = manager.get_result_queue()# 放幾個任務進去:for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n)# 從result隊列讀取結果:print('Try get results...')for i in range(10): r = result.get(timeout=10) print('Result: %s' % r)# 關閉:manager.shutdown()
請注意,當我們在一臺機器上寫多進程程序時,創(chuàng)建的Queue可以直接拿來用,但是,在分布式多進程環(huán)境下,添加任務到Queue不可以直接對原始的task_queue進行操作,那樣就繞過了QueueManager的封裝,必須通過manager.get_task_queue()獲得的Queue接口添加。
然后,在另一臺機器上啟動任務進程(本機上啟動也可以):
# task_master.pyimport random, time, queuefrom multiprocessing.managers import BaseManager# 發(fā)送任務的隊列:task_queue = queue.Queue()# 接收結果的隊列:result_queue = queue.Queue()# 從BaseManager繼承的QueueManager:class QueueManager(BaseManager): pass# 把兩個Queue都注冊到網絡上, callable參數(shù)關聯(lián)了Queue對象:QueueManager.register('get_task_queue', callable=lambda: task_queue)QueueManager.register('get_result_queue', callable=lambda: result_queue)# 綁定端口5000, 設置驗證碼'abc':manager = QueueManager(address=('', 5000), authkey=b'abc')# 啟動Queue:manager.start()# 獲得通過網絡訪問的Queue對象:task = manager.get_task_queue()result = manager.get_result_queue()# 放幾個任務進去:for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n)# 從result隊列讀取結果:print('Try get results...')for i in range(10): r = result.get(timeout=10) print('Result: %s' % r)# 關閉:manager.shutdown()
新聞熱點
疑難解答