"""Multi Process Learning.

Small, runnable demonstrations of process-based concurrency:

* ``test_process``       - start and join a single ``multiprocessing.Process``.
* ``test_process_pool``  - fan tasks out to a ``multiprocessing.Pool``.
* ``test_subprocess``    - drive an external program through ``subprocess``.
* ``test_process_rw``    - pass data between processes with a ``Queue``.
"""

__author__ = 'gitkong'

import functools
import os
import random
import subprocess
import time
from multiprocessing import Pool, Process, Queue

# Value placed on a queue to tell ``read`` that no more items will arrive.
_STOP = None

# os.fork() only exists on Unix/Linux; it is unavailable on Windows, where
# multi-processing has to go through the ``multiprocessing`` package instead.
#
# print('Process (%s) start...' % os.getpid())
# pid = os.fork()
# if pid == 0:
#     print('i am child process (%s) and my parent is %s' % (os.getpid(), os.getppid()))
# else:
#     print('i %s just created a child process %s' % (os.getpid(), pid))


# ----- Cross-platform multi-processing with the ``multiprocessing`` package

def run_proc(name):
    """Print the given name together with this process's pid and parent pid."""
    print('Run child process %s (%s), parent:%s' % (name, os.getpid(), os.getppid()))


def test_process():
    """Start one child process running ``run_proc`` and wait for it to end."""
    print('Parent process %s' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start')
    p.start()
    # A process cannot be closed while it is still running; call join() or
    # terminate() first.
    # p.close()
    p.join()
    print('Child process end')


# ----- Managing many child processes with a Pool

def long_time_task(name):
    """Sleep for a random time of up to three seconds and report the duration."""
    print('Run task %s %s...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %.2f seconds' % (name, end - start))


def test_process_pool():
    """Run five ``long_time_task`` calls on a pool of three worker processes."""
    print('Parent process %s' % os.getpid())
    # The argument is the number of worker processes, i.e. how many tasks can
    # execute at the same time.
    p = Pool(3)
    results = [p.apply_async(long_time_task, args=(i,)) for i in range(5)]
    print('Waiting for all subprocesses done...')
    # join() on a Pool waits for every worker to finish. close() must be
    # called before join(); after close() no further tasks can be submitted.
    p.close()
    p.join()
    # An exception raised inside a worker is only re-raised when the result
    # is fetched, so fetch every result instead of dropping failures silently.
    for result in results:
        result.get()
    print('All subprocesses done.')


# ----- Child programs via ``subprocess``

def test_subprocess():
    """Run ``nslookup`` interactively, feeding its commands through stdin."""
    # print('$ nslookup www.python.org')
    # r = subprocess.call(['nslookup', 'www.python.org'])
    # print('Exit code:%s' % r)

    # This child process needs input on stdin.
    print('$ nslookup')
    p = subprocess.Popen(['nslookup'],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
    # The tool's output encoding depends on the platform; replace bytes that
    # are not valid UTF-8 rather than raising UnicodeDecodeError.
    print(output.decode('utf-8', errors='replace'),
          err.decode('utf-8', errors='replace'))
    print('Exit code:%s' % p.returncode)


# ----- Inter-process communication

def logTextDecorator(msg):
    """Return a decorator that logs ``msg`` and the current pid on each call.

    The decorated function is then called with exactly the arguments it was
    given and its return value is passed through unchanged.
    """
    def logDecorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            print('Process to %s: %s' % (msg, os.getpid()))
            return func(*args, **kw)
        return wrapper
    return logDecorator


@logTextDecorator('write')
def write(q):
    """Put 'A', 'B' and 'C' on ``q``, pausing a random sub-second time after each."""
    for value in ['A', 'B', 'C']:
        print('Put %s to queue' % value)
        q.put(value)
        time.sleep(random.random())


@logTextDecorator('read')
def read(q):
    """Print every item taken from ``q`` until the ``None`` stop marker arrives."""
    while True:
        value = q.get(True)
        if value is _STOP:
            # The producer side is finished; return so the process can be
            # joined normally instead of having to be terminated.
            break
        print('Get %s from queue' % value)


def test_process_rw():
    """Run a writer and a reader process that communicate through a Queue."""
    # The parent creates the Queue and hands it to both children.
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(group=None, target=read, name=None, args=(q,))
    # Start the children.
    pw.start()
    pr.start()
    # Wait for the writer, then tell the reader to stop. Terminating a
    # process that is using a Queue can corrupt the queue and may cut the
    # reader off before it has printed the last item.
    pw.join()
    q.put(_STOP)
    pr.join()


if __name__ == '__main__':
    # test_process()
    # test_process_pool()
    # test_subprocess()
    test_process_rw()
# Sign in to make a reply
# gitKong