task_master.py
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import random,time,queue from multiprocessing.managers import BaseManager #发送任务队列 task_queue=queue.Queue() #接收结果队列 result_queue=queue.Queue() #继承BaseManager class QueueManager(BaseManager): pass def return_task_queue(): #global 用于函数内部,修改全局变量的值 global task_queue return task_queue def return_result_queue(): global result_queue return result_queue if __name__=='__main__': #将两个Queue注册到网络上,callable参数关联Queue对象 #!win10中callale不对lambda匿名函数做处理 QueueManager.register('get_task_queue',callable=return_task_queue) QueueManager.register('get_result_queue',callable=return_result_queue) #绑定端口5000,这5000怎么来的?两个文件中的端口一样就行!,设置验证码abc #通过QueueManager将Queue暴露出去 manager=QueueManager(address=('127.0.0.1',5000),authkey=b'abc') manager.start() task=manager.get_task_queue() result=manager.get_result_queue() #放10个任务进去 for i in range(10): n=random.randint(0,1000) print('Put task %d...'%n) #将数据放到任务队列 task.put(n) #取任务执行结果 print('Try get results...') for i in range(10): #从结果队列中取结果 #等待10是因为计算需要时间 r=result.get(timeout=10) print('REsult:%s'%r) #关闭 manager.shutdown() print('master end')
task_worker.py
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import time,sys,queue from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass if __name__=='__main__': QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') server_addr='127.0.0.1' print('Connect to server %s...'%server_addr) #端口设置和task_master.py中一样 m=QueueManager(address=(server_addr,5000),authkey=b'abc') #连接网络 m.connect() task=m.get_task_queue() result=m.get_result_queue() for i in range(10): try: #接收任务队列中的数据 n=task.get(timeout=1) print('Run task %d*%d'%(n,n)) r='%d*%d=%d'%(n,n,n*n) time.sleep(1) #放进结果队列 result.put(r) except Queue.Empty: print('task queue is empty') print('work done')
!!!运行时,要放在两个cmd窗口,分别运行两个.py文件
系统防火墙把5000端口打开,如果是两台机器,要不发不出去数据
Sign in to make a reply
浅言87780
task_master.py
task_worker.py
!!!运行时,要放在两个cmd窗口,分别运行两个.py文件