Discuss / Python / 小结

小结

Topic source

zx_sunrise

#1 Created at ... [Delete] [Delete and Lock User]

Master端主要是设置任务及参数 task.put() -> 等待并获取任务结果 result.get(param) Worker端主要是获取任务及参数 task.get(param) -> 执行task -> 返回任务结果 result.put(value)

Windows下task_master.py需要修改后才能运行

白隐1

#2 Created at ... [Delete] [Delete and Lock User]

F:\work>master.py Traceback (most recent call last): File "F:\work\master.py", line 19, in <module> manager.start() File "C:\ProgramData\Anaconda3\lib\multiprocessing\managers.py", line 543, in start self._process.start() File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 112, in s tart self._popen = self.Popen(self) File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 322, in Popen return Popen(process_obj) File "C:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in init reduction.dump(process_obj, to_child) File "C:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump ForkingPickler(file, protocol).dump(obj) _pickle.PicklingError: Can't pickle <function <lambda> at 0x0000000001D1C1E0>: a ttribute lookup <lambda> on main failed

F:\work>Traceback (most recent call last): File "<string>", line 1, in <module> File "C:\ProgramData\Anaconda3\lib\multiprocessing\spawn.py", line 99, in spaw n_main new_handle = reduction.steal_handle(parent_pid, pipe_handle) File "C:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 82, in steal_handle _winapi.PROCESS_DUP_HANDLE, False, source_pid) OSError: [WinError 87] 参数错误。

F:\work>

请问windows7系统下如何修改啊,运行错误。

白隐1

#3 Created at ... [Delete] [Delete and Lock User]

修改下有用了,Windows下可执行

#task_master.py import random,time,queue

from multiprocessing.managers import BaseManager

from multiprocessing import freeze_support

task_queue = queue.Queue()

result_queue = queue.Queue()

def get_task(): return task_queue

def get_result(): return result_queue

class QueueManager(BaseManager): pass

def test1():

QueueManager.register('get_task',callable = get_task)
QueueManager.register('get_result',callable = get_result)

manager = QueueManager(address=('127.0.0.1',5002),authkey=b'abc')

manager.start()

try:
    task = manager.get_task()
    result = manager.get_result()

    for i in range(10):
          n = random.randint(0,10000)
          print('Put task %d..'%n)
          task.put(n)

    print('try get results...')

    for i in range(10):
         r = result.get(timeout=100)
         print('Result:%s'%r)
except:
      print('Manager error')    

finally:
    manager.shutdown()

if name == 'main':
freeze_support() test1() print('master exit...')

#!/usr/bin/env python3

-- coding : utf-8 --

#task_worker.py import time,sys,queue

from multiprocessing.managers import BaseManager

class QueueManager(BaseManager): pass

QueueManager.register('get_task') QueueManager.register('get_result')

server_addr = '127.0.0.1' print('connect to server %s' % server_addr)

m = QueueManager(address=(server_addr,5002),authkey=b'abc')

try: m.connect() except: print('connect error!!') sys.exit()

task = m.get_task() result = m.get_result()

for i in range(10): try: n=task.get(timeout=1) print('run task %d %d...' % (n,n)) r = '%d %d =%d'% (n,n,n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.')

print('worker exit...')


  • 1

Reply