task_master.py
import queue from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass task_queue = queue.Queue() result_queue = queue.Queue() def return_task_queue(): return task_queue def return_result_queue(): return result_queue QueueManager.register('get_task_queue', callable=return_task_queue) QueueManager.register('get_result_queue', callable=return_result_queue) if __name__ == '__main__': manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc') manager.start() task = manager.get_task_queue() result = manager.get_result_queue() for i in range(100): print('向队列中插入了数字%d' % i) task.put(i) print('尝试从结果队列中取出结果') try: for i in range(100): r = result.get(timeout=100) print("结果:%s" % r) except queue.Empty: print('结果队列为空') manager.shutdown() print("主服务关闭")
task_worker.py
import time, sys, queue from multiprocessing.managers import BaseManager import os import socket s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] class QueueManager(BaseManager): passQueueManager.register('get_task_queue') QueueManager.register('get_result_queue') server_addr = '127.0.0.1'print('正在连接主服务%s......' % server_addr) m = QueueManager(address=(server_addr, 5000), authkey=b'abc') m.connect() task = m.get_task_queue() result = m.get_result_queue() while True: try: n = task.get(timeout=1) print('从Queue取到了数字 %d' % n) print('正在执行运算 %d * %d' % (n, n)) r = '%d * %d = %d (from %s @ %s)' % (n, n, n * n, os.getpid(), ip) time.sleep(1) result.put(r) except queue.Empty: print('任务队列已空') break print('工作进程【%s】已结束' % os.getpid())
可以起多个task_worker进程来从task_queue消费数据,这就是简单的 一个生产者,多个消费者的模型
你这里能够区分任务队列和结果队列吗?我发现“任务队列已空”的报错无法触发。。
Sign in to make a reply
蝶殇/yl待月
task_master.py
task_worker.py
可以起多个task_worker进程来从task_queue消费数据,这就是简单的 一个生产者,多个消费者的模型