Discuss / Python / 稍微改了点点

稍微改了点点

Topic source

task_master.py

import queue
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass

task_queue = queue.Queue()
result_queue = queue.Queue()


def return_task_queue():
    return task_queue


def return_result_queue():
    return result_queue


QueueManager.register('get_task_queue', callable=return_task_queue)
QueueManager.register('get_result_queue', callable=return_result_queue)

if __name__ == '__main__':
    manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
    manager.start()

    task = manager.get_task_queue()
    result = manager.get_result_queue()

    for i in range(100):
        print('向队列中插入了数字%d' % i)
        task.put(i)

    print('尝试从结果队列中取出结果')
    try:
        for i in range(100):
            r = result.get(timeout=100)
            print("结果:%s" % r)
    except queue.Empty:
        print('结果队列为空')

    manager.shutdown()
    print("主服务关闭")

task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager
import os
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]


class QueueManager(BaseManager):
    passQueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

server_addr = '127.0.0.1'print('正在连接主服务%s......' % server_addr)
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
m.connect()
task = m.get_task_queue()
result = m.get_result_queue()

while True:
    try:
        n = task.get(timeout=1)
        print('从Queue取到了数字 %d' % n)
        print('正在执行运算 %d * %d' % (n, n))
        r = '%d * %d = %d  (from %s @ %s)' % (n, n, n * n, os.getpid(), ip)
        time.sleep(1)
        result.put(r)
    except queue.Empty:
        print('任务队列已空')
        break

print('工作进程【%s】已结束' % os.getpid())

可以起多个task_worker进程来从task_queue消费数据,这就是简单的   一个生产者,多个消费者的模型

Lucky Cedric

#2 Created at ... [Delete] [Delete and Lock User]

你这里能够区分任务队列和结果队列吗?我发现“任务队列已空”的报错无法触发。。


  • 1

Reply