programing tip

multiprocessing.Process에 전달 된 함수의 반환 값을 어떻게 복구 할 수 있습니까?

itbloger 2020. 6. 23. 07:55
반응형

multiprocessing.Process에 전달 된 함수의 반환 값을 어떻게 복구 할 수 있습니까?


아래 예제 코드에서 함수의 반환 값을 복구하고 싶습니다 worker. 이 작업을 어떻게 수행 할 수 있습니까? 이 값은 어디에 저장됩니까?

예제 코드 :

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

산출:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

에 저장된 객체에서 관련 속성을 찾을 수 없습니다 jobs.


공유 변수사용 하여 통신하십시오. 예를 들면 다음과 같습니다.

import multiprocessing

def worker(procnum, return_dict):
    '''worker function'''
    print str(procnum) + ' represent!'
    return_dict[procnum] = procnum


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print return_dict.values()

@sega_sai가 제안한 접근법이 더 낫다고 생각합니다. 그러나 실제로 코드 예제가 필요하므로 다음과 같이하십시오.

import multiprocessing
from os import getpid

def worker(procnum):
    print 'I am number %d in process %d' % (procnum, getpid())
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print pool.map(worker, range(5))

반환 값을 인쇄합니다 :

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

map(Python 2 내장)에 익숙하다면 너무 도전해서는 안됩니다. 그렇지 않으면 sega_Sai의 링크를 살펴보십시오 .

코드가 거의 필요하지 않습니다. 또한 프로세스를 재사용하는 방법에 유의하십시오.


이 예제는 multiprocessing.Pipe 인스턴스 목록을 사용 하여 임의의 수의 프로세스에서 문자열을 반환 하는 방법을 보여줍니다 .

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

산출:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

이 솔루션은 멀티 프로세싱 보다 적은 리소스 를 사용합니다.

  • 파이프
  • 적어도 하나의 자물쇠
  • 버퍼

또는 멀티 프로세싱 을 사용 하는 SimpleQueue

  • 파이프
  • 적어도 하나의 자물쇠

이러한 각 유형의 소스를 보는 것이 매우 유익합니다.


어떤 이유로 든 어디서나이 작업을 수행하는 방법에 대한 일반적인 예를 찾을 수 없었습니다 Queue(파이썬의 doc 예제는 여러 프로세스를 생성하지 않습니다). 그래서 10 번 시도한 후에 작업 한 내용은 다음과 같습니다.

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue하위 프로세스의 리턴 값을 저장하는 데 사용할 수있는 블로킹 스레드 안전 큐입니다. 따라서 각 프로세스에 큐를 전달해야합니다. 여기에 덜 분명 뭔가는해야한다는 것입니다 get()전에 대기열에서 ES 또는 다른 큐가 가득하고 차단 다.joinProcess

객체 지향적 인 사람들을 위해 업데이트하십시오 (Python 3.4에서 테스트 됨).

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)

Processusing 에서 가치를 얻는 방법을 찾고있는 다른 사람들을 위해 Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    print queue.get()  # Prints {"foo": True}
    p.join()

당신이 사용해야하는 것 같다 multiprocessing.Pool의 ) (지도 대신 클래스 등) (.apply .apply_async을 () 메서드를 사용

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult


exit내장을 사용하여 프로세스의 종료 코드를 설정할 수 있습니다 . exitcode프로세스 속성 에서 얻을 수 있습니다 .

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

산출:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

간단한 해결책 :

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())

print(output[0])

산출:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

조약돌 패키지는 좋은 추상화 레버리지가 multiprocessing.Pipe이 매우 간단합니다 :

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

예 : https://pythonhosted.org/Pebble/#concurrent-decorators


Python 3을 사용 concurrent.futures.ProcessPoolExecutor하는 경우 편리한 추상화로 사용할 수 있습니다 .

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

산출:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

함수에서 오류 코드를 가져와야했기 때문에 vartec의 답변을 약간 수정했습니다. (버텍 감사합니다! 대단한 트릭입니다)

This can also be done with a manager.list but I think is better to have it in a dict and store a list within it. That way, way we keep the function and the results since we can't be sure of the order in which the list will be populated.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # return "func1"  # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'
    # return "func2"

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # if fail wont join the rest because it never populate the dict
    # or do a try/except to get something in return.
    raise ValueError("failed here")
    # if we want to get the error in the manager dict we can catch the error
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it fail horrible"
        # print 'func3: finishing'
        # return "func3"


def runInParallel(*fns):  # * is to accept any input in list
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print fn
        # print dir(fn)
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # 5 is the time out

    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # print dir(proc[0])
    # print proc[0]._name
    # print proc[0].name
    # print proc[0].exitcode

    # here you can check what did fail
    for i in proc:
        print i.name, i.exitcode  # name was set up in the Process line 53

    # here will only show the function that worked and where able to populate the 
    # manager dict
    for i, j in manager.items():
        print dir(i)  # things you can do to the function
        print i, j

참고URL : https://stackoverflow.com/questions/10415028/how-can-i-recover-the-return-value-of-a-function-passed-to-multiprocessing-proce

반응형