# task_master.py import random, time, queue from multiprocessing.managers import BaseManager # 发送任务的队列: task_queue = queue.Queue() # 接收结果的队列: result_queue = queue.Queue() # 从BaseManager继承的QueueManager: class QueueManager(BaseManager): pass # 把两个Queue都注册到网络上, callable参数关联了Queue对象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 绑定端口5000, 设置验证码'abc': manager = QueueManager(address=('', 5000), authkey=b'abc') # 启动Queue: manager.start() # 获得通过网络访问的Queue对象: task = manager.get_task_queue() result = manager.get_result_queue() print('Try to put tasks...') # 放几个任务进去: for i in range(10): n = random.randint(0, 10000) print(f'Put Task [{n}]') task.put(n) # 从result队列读取结果: print('Try to get results...') try: while True: r = result.get(timeout=20) print(f'Get Result: [{r}]') except queue.Empty: print('Result queue is empty.') # 关闭: manager.shutdown() print('master exit.')
# task_worker.py import time, sys, queue from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是运行task_master.py的机器: server_addr = '127.0.0.1' print(f'Connect to server ({server_addr})...') # 端口和验证码注意保持与task_master.py设置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 从网络连接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10): try: n = task.get(timeout=1) print(f'Run Task {n} * {n}...') r = f'{n} * {n} = {n*n}' time.sleep(1) result.put(r) except queue.Empty: print('Task queue is empty.') # 处理结束: print('worker exit.')
**(spyder) [owen@owen-pc Processing_Threading]$ python task_master.py ** Try to put tasks... Put Task [3229] Put Task [2717] Put Task [6338] Put Task [8219] Put Task [8967] Put Task [3763] Put Task [7069] Put Task [4611] Put Task [7177] Put Task [5552] Try to get results... Get Result: [3229 * 3229 = 10426441] Get Result: [2717 * 2717 = 7382089] Get Result: [6338 * 6338 = 40170244] Get Result: [8219 * 8219 = 67551961] Get Result: [8967 * 8967 = 80407089] Get Result: [3763 * 3763 = 14160169] Get Result: [7069 * 7069 = 49970761] Get Result: [4611 * 4611 = 21261321] Get Result: [7177 * 7177 = 51509329] Get Result: [5552 * 5552 = 30824704] Result queue is empty. master exit.
**(spyder) [owen@owen-pc Processing_Threading]$ python task_master.py **
Try to put tasks...
Put Task [3229]
Put Task [2717]
Put Task [6338]
Put Task [8219]
Put Task [8967]
Put Task [3763]
Put Task [7069]
Put Task [4611]
Put Task [7177]
Put Task [5552]
Try to get results...
Get Result: [3229 * 3229 = 10426441]
Get Result: [2717 * 2717 = 7382089]
Get Result: [6338 * 6338 = 40170244]
Get Result: [8219 * 8219 = 67551961]
Get Result: [8967 * 8967 = 80407089]
Get Result: [3763 * 3763 = 14160169]
Get Result: [7069 * 7069 = 49970761]
Get Result: [4611 * 4611 = 21261321]
Get Result: [7177 * 7177 = 51509329]
Get Result: [5552 * 5552 = 30824704]
Result queue is empty.
master exit.
========================================================================
**(spyder) [owen@owen-pc Processing_Threading]$ ** python task_worker.py Connect to server (127.0.0.1)... Run Task 3229 * 3229... Run Task 2717 * 2717... Run Task 6338 * 6338... Run Task 8219 * 8219... Run Task 8967 * 8967... Run Task 3763 * 3763... Run Task 7069 * 7069... Run Task 4611 * 4611... Run Task 7177 * 7177... Run Task 5552 * 5552... worker exit.
**(spyder) [owen@owen-pc Processing_Threading]$ ** python task_worker.py
Connect to server (127.0.0.1)...
Run Task 3229 * 3229...
Run Task 2717 * 2717...
Run Task 6338 * 6338...
Run Task 8219 * 8219...
Run Task 8967 * 8967...
Run Task 3763 * 3763...
Run Task 7069 * 7069...
Run Task 4611 * 4611...
Run Task 7177 * 7177...
Run Task 5552 * 5552...
worker exit.
Sign in to make a reply
HIT_Owen
========================================================================