multiprocessing.Process에 전달 된 함수의 반환 값을 어떻게 복구 할 수 있습니까?
아래 예제 코드에서 함수의 반환 값을 복구하고 싶습니다 worker
. 이 작업을 어떻게 수행 할 수 있습니까? 이 값은 어디에 저장됩니까?
예제 코드 :
import multiprocessing
def worker(procnum):
'''worker function'''
print str(procnum) + ' represent!'
return procnum
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print jobs
산출:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
에 저장된 객체에서 관련 속성을 찾을 수 없습니다 jobs
.
공유 변수 를 사용 하여 통신하십시오. 예를 들면 다음과 같습니다.
import multiprocessing
def worker(procnum, return_dict):
'''worker function'''
print str(procnum) + ' represent!'
return_dict[procnum] = procnum
if __name__ == '__main__':
manager = multiprocessing.Manager()
return_dict = manager.dict()
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,return_dict))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print return_dict.values()
@sega_sai가 제안한 접근법이 더 낫다고 생각합니다. 그러나 실제로 코드 예제가 필요하므로 다음과 같이하십시오.
import multiprocessing
from os import getpid
def worker(procnum):
print 'I am number %d in process %d' % (procnum, getpid())
return getpid()
if __name__ == '__main__':
pool = multiprocessing.Pool(processes = 3)
print pool.map(worker, range(5))
반환 값을 인쇄합니다 :
I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]
map
(Python 2 내장)에 익숙하다면 너무 도전해서는 안됩니다. 그렇지 않으면 sega_Sai의 링크를 살펴보십시오 .
코드가 거의 필요하지 않습니다. 또한 프로세스를 재사용하는 방법에 유의하십시오.
이 예제는 multiprocessing.Pipe 인스턴스 목록을 사용 하여 임의의 수의 프로세스에서 문자열을 반환 하는 방법을 보여줍니다 .
import multiprocessing
def worker(procnum, send_end):
'''worker function'''
result = str(procnum) + ' represent!'
print result
send_end.send(result)
def main():
jobs = []
pipe_list = []
for i in range(5):
recv_end, send_end = multiprocessing.Pipe(False)
p = multiprocessing.Process(target=worker, args=(i, send_end))
jobs.append(p)
pipe_list.append(recv_end)
p.start()
for proc in jobs:
proc.join()
result_list = [x.recv() for x in pipe_list]
print result_list
if __name__ == '__main__':
main()
산출:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']
이 솔루션은 멀티 프로세싱 보다 적은 리소스 를 사용합니다.
- 파이프
- 적어도 하나의 자물쇠
- 버퍼
- 실
또는 멀티 프로세싱 을 사용 하는 SimpleQueue
- 파이프
- 적어도 하나의 자물쇠
이러한 각 유형의 소스를 보는 것이 매우 유익합니다.
어떤 이유로 든 어디서나이 작업을 수행하는 방법에 대한 일반적인 예를 찾을 수 없었습니다 Queue
(파이썬의 doc 예제는 여러 프로세스를 생성하지 않습니다). 그래서 10 번 시도한 후에 작업 한 내용은 다음과 같습니다.
def add_helper(queue, arg1, arg2): # the func called in child processes
ret = arg1 + arg2
queue.put(ret)
def multi_add(): # spawns child processes
q = Queue()
processes = []
rets = []
for _ in range(0, 100):
p = Process(target=add_helper, args=(q, 1, 2))
processes.append(p)
p.start()
for p in processes:
ret = q.get() # will block
rets.append(ret)
for p in processes:
p.join()
return rets
Queue
하위 프로세스의 리턴 값을 저장하는 데 사용할 수있는 블로킹 스레드 안전 큐입니다. 따라서 각 프로세스에 큐를 전달해야합니다. 여기에 덜 분명 뭔가는해야한다는 것입니다 get()
전에 대기열에서 ES 또는 다른 큐가 가득하고 차단 다.join
Process
객체 지향적 인 사람들을 위해 업데이트하십시오 (Python 3.4에서 테스트 됨).
from multiprocessing import Process, Queue
class Multiprocessor():
def __init__(self):
self.processes = []
self.queue = Queue()
@staticmethod
def _wrapper(func, queue, args, kwargs):
ret = func(*args, **kwargs)
queue.put(ret)
def run(self, func, *args, **kwargs):
args2 = [func, self.queue, args, kwargs]
p = Process(target=self._wrapper, args=args2)
self.processes.append(p)
p.start()
def wait(self):
rets = []
for p in self.processes:
ret = self.queue.get()
rets.append(ret)
for p in self.processes:
p.join()
return rets
# tester
if __name__ == "__main__":
mp = Multiprocessor()
num_proc = 64
for _ in range(num_proc): # queue up multiple tasks running `sum`
mp.run(sum, [1, 2, 3, 4, 5])
ret = mp.wait() # get all results
print(ret)
assert len(ret) == num_proc and all(r == 15 for r in ret)
Process
using 에서 가치를 얻는 방법을 찾고있는 다른 사람들을 위해 Queue
:
import multiprocessing
ret = {'foo': False}
def worker(queue):
ret = queue.get()
ret['foo'] = True
queue.put(ret)
if __name__ == '__main__':
queue = multiprocessing.Queue()
queue.put(ret)
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
print queue.get() # Prints {"foo": True}
p.join()
당신이 사용해야하는 것 같다 multiprocessing.Pool의 ) (지도 대신 클래스 등) (.apply .apply_async을 () 메서드를 사용
http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult
exit
내장을 사용하여 프로세스의 종료 코드를 설정할 수 있습니다 . exitcode
프로세스 의 속성 에서 얻을 수 있습니다 .
import multiprocessing
def worker(procnum):
print str(procnum) + ' represent!'
exit(procnum)
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
result = []
for proc in jobs:
proc.join()
result.append(proc.exitcode)
print result
산출:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
간단한 해결책 :
import multiprocessing
output=[]
data = range(0,10)
def f(x):
return x**2
def handler():
p = multiprocessing.Pool(64)
r=p.map(f, data)
return r
if __name__ == '__main__':
output.append(handler())
print(output[0])
산출:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
조약돌 패키지는 좋은 추상화 레버리지가 multiprocessing.Pipe
이 매우 간단합니다 :
from pebble import concurrent
@concurrent.process
def function(arg, kwarg=0):
return arg + kwarg
future = function(1, kwarg=1)
print(future.result())
예 : https://pythonhosted.org/Pebble/#concurrent-decorators
Python 3을 사용 concurrent.futures.ProcessPoolExecutor
하는 경우 편리한 추상화로 사용할 수 있습니다 .
from concurrent.futures import ProcessPoolExecutor
def worker(procnum):
'''worker function'''
print(str(procnum) + ' represent!')
return procnum
if __name__ == '__main__':
with ProcessPoolExecutor() as executor:
print(list(executor.map(worker, range(5))))
산출:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
함수에서 오류 코드를 가져와야했기 때문에 vartec의 답변을 약간 수정했습니다. (버텍 감사합니다! 대단한 트릭입니다)
This can also be done with a manager.list
but I think is better to have it in a dict and store a list within it. That way, way we keep the function and the results since we can't be sure of the order in which the list will be populated.
from multiprocessing import Process
import time
import datetime
import multiprocessing
def func1(fn, m_list):
print 'func1: starting'
time.sleep(1)
m_list[fn] = "this is the first function"
print 'func1: finishing'
# return "func1" # no need for return since Multiprocess doesnt return it =(
def func2(fn, m_list):
print 'func2: starting'
time.sleep(3)
m_list[fn] = "this is function 2"
print 'func2: finishing'
# return "func2"
def func3(fn, m_list):
print 'func3: starting'
time.sleep(9)
# if fail wont join the rest because it never populate the dict
# or do a try/except to get something in return.
raise ValueError("failed here")
# if we want to get the error in the manager dict we can catch the error
try:
raise ValueError("failed here")
m_list[fn] = "this is third"
except:
m_list[fn] = "this is third and it fail horrible"
# print 'func3: finishing'
# return "func3"
def runInParallel(*fns): # * is to accept any input in list
start_time = datetime.datetime.now()
proc = []
manager = multiprocessing.Manager()
m_list = manager.dict()
for fn in fns:
# print fn
# print dir(fn)
p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
p.start()
proc.append(p)
for p in proc:
p.join() # 5 is the time out
print datetime.datetime.now() - start_time
return m_list, proc
if __name__ == '__main__':
manager, proc = runInParallel(func1, func2, func3)
# print dir(proc[0])
# print proc[0]._name
# print proc[0].name
# print proc[0].exitcode
# here you can check what did fail
for i in proc:
print i.name, i.exitcode # name was set up in the Process line 53
# here will only show the function that worked and where able to populate the
# manager dict
for i, j in manager.items():
print dir(i) # things you can do to the function
print i, j
'programing tip' 카테고리의 다른 글
MySQL의 BLOB 열에 넣을 수있는 최대 데이터 길이는 얼마입니까? (0) | 2020.06.23 |
---|---|
jQuery로 DOM 요소를 파괴하는 방법? (0) | 2020.06.23 |
Visual Studio 2017에서 세로 점선 들여 쓰기 줄 제거 (0) | 2020.06.23 |
for 루프의 Python 루프 카운터 (0) | 2020.06.23 |
Git을 사용하면“LF는 CRLF로 대체됩니다”경고를 끄려면 어떻게합니까 (0) | 2020.06.23 |