Discuss / Python / homework

homework

Topic source

gitKong

#1 Created at ... [Delete] [Delete and Lock User]
'Multi Process Learning'

__author__ = 'gitkong'

import os, time, random, functools
from multiprocessing import Process, Queue
from multiprocessing import Pool
import subprocess

# os.fork() only works on Unix/Linux, not on Windows; for cross-platform multi-process code use the multiprocessing module
# print('Process (%s) start...' % os.getpid())

# pid = os.fork()
# if pid == 0:
#     print('i am child process (%s) and my parent is %s' % (os.getpid(), os.getppid()))
# else:
#     print('i %s just created a child process %s' % (os.getpid(), pid))


#  ----- Cross-platform multi-process support via the multiprocessing module

def run_proc(name):
    """Print the task name together with this process's PID and parent PID.

    Args:
        name: Label identifying the child task; formatted with ``%s``.
    """
    own_pid, parent_pid = os.getpid(), os.getppid()
    message = 'Run child process %s (%s), parent:%s' % (name, own_pid, parent_pid)
    print(message)

def test_process():
    """Start a single child process running ``run_proc`` and wait for it.

    Prints progress messages before the child starts and after it exits.
    """
    # Use %-formatting here: passing the PID as a second print() argument
    # would emit the literal '%s' placeholder followed by the PID.
    print('Parent process %s' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start')
    p.start()
    # A Process cannot be close()d while it is still running; join() (or
    # terminate()) has to come first, so we just wait for the child here.
    p.join()
    print('Child process end')

# ----- Managing a large number of child processes with a process pool (Pool)

def long_time_task(name):
    """Simulate a slow job: sleep for a random time and report how long it took.

    Args:
        name: Label for the task, included in both printed messages.
    """
    print('Run task %s %s...' % (name, os.getpid()))
    began = time.time()
    # random.random() is in [0, 1), so the pause is under three seconds.
    pause = random.random() * 3
    time.sleep(pause)
    elapsed = time.time() - began
    print('Task %s runs %.2f seconds' % (name, elapsed))

def test_process_pool():
    """Run five ``long_time_task`` jobs on a pool of three worker processes.

    Submits the jobs asynchronously, then blocks until every worker has
    finished.
    """
    # Use %-formatting here: passing the PID as a second print() argument
    # would emit the literal '%s' placeholder followed by the PID.
    print('Parent process %s' % os.getpid())
    # The argument is the number of worker processes, i.e. how many tasks
    # can run at the same time (not the machine's CPU core count).
    p = Pool(3)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    # join() on a Pool waits for all workers to finish. close() must be
    # called before join(); after close() no further tasks can be submitted.
    p.close()
    p.join()
    print('All subprocesses done.')

# ----- Child processes via the subprocess module

def test_subprocess():
    """Drive an interactive ``nslookup`` session through pipes.

    Feeds a short command script to the child's stdin, then prints its
    decoded stdout/stderr and its exit code.
    """
    # Non-interactive alternative, kept for reference:
    # print('$ nslookup www.python.org')
    # r = subprocess.call(['nslookup', 'www.python.org'])
    # print('Exit code:%s' % r)

    # The child process needs input, so all three streams are piped.
    print('$ nslookup')
    p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
    # Decode both streams (the original printed stderr as a raw bytes repr);
    # errors='replace' keeps the demo from crashing on non-UTF-8 console output.
    print(output.decode('utf-8', errors='replace'), err.decode('utf-8', errors='replace'))
    # Use %-formatting: a comma here would print the literal '%s'.
    print('Exit code:%s' % p.returncode)

# ----- Inter-process communication
def logTextDecorator(msg):
    """Build a decorator that logs ``msg`` and the current PID before each call.

    Args:
        msg: Text inserted into the 'Process to <msg>: <pid>' log line.

    Returns:
        A decorator. The decorated function keeps its metadata (via
        ``functools.wraps``), receives its arguments unchanged, and its
        return value is passed through.
    """
    def logDecorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            # Forward arguments untouched. Requiring a positional first
            # parameter here would break zero-argument and keyword-only calls.
            print('Process to %s: %s' % (msg, os.getpid()))
            return func(*args, **kw)
        return wrapper
    return logDecorator


@logTextDecorator('write')
def write(q):
    """Producer: put 'A', 'B' and 'C' on ``q``, pausing briefly after each.

    Args:
        q: Queue-like object; only its ``put`` method is used.
    """
    for item in ('A', 'B', 'C'):
        print('Put %s to queue' % item)
        q.put(item)
        # Random pause of less than one second between items.
        pause = random.random()
        time.sleep(pause)

@logTextDecorator('read')
def read(q):
    """Consumer: print every value taken from ``q``, forever.

    This loop never returns on its own; the caller has to stop the process
    running it from outside.

    Args:
        q: Queue-like object; only its blocking ``get`` is used.
    """
    while True:
        # Blocks until an item is available.
        item = q.get(True)
        print('Get %s from queue' % item)

def test_process_rw():
    """Run a writer and a reader process that communicate through a Queue.

    The writer finishes on its own; the reader loops forever and is stopped
    with terminate() once the writer is done.
    """
    # The parent creates the Queue and hands it to both children.
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # Start both child processes.
    pw.start()
    pr.start()
    # Wait for the writer to finish. The reader is an infinite loop, so
    # join() alone would never return for it; it has to be terminated.
    pw.join()
    pr.terminate()
    # Reap the terminated reader so it does not linger as a zombie process.
    pr.join()

if __name__ == '__main__':
    # Script entry point: exactly one demo is enabled at a time; uncomment
    # one of the calls below to run a different demo instead.
    # test_process()
    # test_process_pool()
    # test_subprocess()
    test_process_rw()

  • 1

Reply