파일에 안전하게 쓰는 Python 다중 처리
나는 많은 하위 문제를 포함하는 큰 수치 문제를 해결하려고 노력하고 있으며 Python의 다중 처리 모듈 (특히 Pool.map)을 사용하여 서로 다른 독립 하위 문제를 서로 다른 코어로 분할하고 있습니다. 각 하위 문제는 많은 하위 하위 문제를 계산하는 것과 관련이 있으며, 아직 프로세스에 의해 계산되지 않은 경우 파일에 저장하여 이러한 결과를 효과적으로 메모하고 그렇지 않으면 계산을 건너 뛰고 파일에서 결과를 읽습니다.
파일에 동시성 문제가 있습니다. 다른 프로세스는 때때로 하위 하위 문제가 아직 계산되었는지 확인하고 (결과가 저장 될 파일을 찾아서) 계산되지 않았는지 확인하고, 계산을 실행합니다. 그런 다음 결과를 같은 파일에 동시에 쓰십시오. 이와 같은 충돌을 작성하지 않으려면 어떻게해야합니까?
@ GP89가 좋은 해결책을 언급했습니다. 큐를 사용하여 파일에 대한 쓰기 권한 만있는 전용 프로세스에 쓰기 작업을 보냅니다. 다른 모든 작업자는 읽기 전용 액세스 권한이 있습니다. 이것은 충돌을 제거합니다. 다음은 apply_async를 사용하는 예이지만 맵에서도 작동합니다.
import multiprocessing as mp
import time
fn = 'c:/temp/temp.txt'
def worker(arg, q):
'''stupidly simulates long running process'''
start = time.clock()
s = 'this is a test'
txt = s
for i in range(200000):
txt += s
done = time.clock() - start
with open(fn, 'rb') as f:
size = len(f.read())
res = 'Process' + str(arg), str(size), done
q.put(res)
return res
def listener(q):
'''listens for messages on the q, writes to file. '''
with open(fn, 'w') as f:
while 1:
m = q.get()
if m == 'kill':
f.write('killed')
break
f.write(str(m) + '\n')
f.flush()
def main():
#must use Manager queue here, or will not work
manager = mp.Manager()
q = manager.Queue()
pool = mp.Pool(mp.cpu_count() + 2)
#put listener to work first
watcher = pool.apply_async(listener, (q,))
#fire off workers
jobs = []
for i in range(80):
job = pool.apply_async(worker, (i, q))
jobs.append(job)
# collect results from the workers through the pool result queue
for job in jobs:
job.get()
#now we are done, kill the listener
q.put('kill')
pool.close()
pool.join()
if __name__ == "__main__":
main()
It looks to me that you need to use Manager
to temporarily save your results to a list and then write the results from the list to a file. Also, use starmap
to pass the object you want to process and the managed list. The first step is to build the parameter to be passed to starmap
, which includes the managed list.
from multiprocessing import Manager
from multiprocessing import Pool
import pandas as pd
def worker(row, param):
# do something here and then append it to row
x = param**2
row.append(x)
if __name__ == '__main__':
pool_parameter = [] # list of objects to process
with Manager() as mgr:
row = mgr.list([])
# build list of parameters to send to starmap
for param in pool_parameter:
params.append([row,param])
with Pool() as p:
p.starmap(worker, params)
From this point you need to decide how you are going to handle the list. If you have tons of RAM and a huge data set feel free to concatenate using pandas. Then you can save of the file very easily as a csv or a pickle.
df = pd.concat(row, ignore_index=True)
df.to_pickle('data.pickle')
df.to_csv('data.csv')
참고URL : https://stackoverflow.com/questions/13446445/python-multiprocessing-safely-writing-to-a-file
'programing tip' 카테고리의 다른 글
정규식 오류-반복 할 사항 없음 (0) | 2020.12.13 |
---|---|
컴퓨터가 절전 모드가되면 setTimeout은 어떻게됩니까? (0) | 2020.12.13 |
SCRIPT438 : 개체가 속성 또는 메서드 IE를 지원하지 않습니다. (0) | 2020.12.12 |
Cygwin 32 비트에서 Cygwin 64 비트로 전환하는 것이 좋습니까? (0) | 2020.12.12 |
CSS 전달 최적화 : CSS 로딩을 연기하는 방법? (0) | 2020.12.12 |