programing tip

파일에 안전하게 쓰는 Python 다중 처리

itbloger 2020. 12. 12. 10:13
반응형

파일에 안전하게 쓰는 Python 다중 처리


나는 많은 하위 문제를 포함하는 큰 수치 문제를 해결하려고 노력하고 있으며 Python의 다중 처리 모듈 (특히 Pool.map)을 사용하여 서로 다른 독립 하위 문제를 서로 다른 코어로 분할하고 있습니다. 각 하위 문제는 많은 하위 하위 문제를 계산하는 것과 관련이 있으며, 아직 프로세스에 의해 계산되지 않은 경우 파일에 저장하여 이러한 결과를 효과적으로 메모하고 그렇지 않으면 계산을 건너 뛰고 파일에서 결과를 읽습니다.

파일에 동시성 문제가 있습니다. 다른 프로세스는 때때로 하위 하위 문제가 아직 계산되었는지 확인하고 (결과가 저장 될 파일을 찾아서) 계산되지 않았는지 확인하고, 계산을 실행합니다. 그런 다음 결과를 같은 파일에 동시에 쓰십시오. 이와 같은 충돌을 작성하지 않으려면 어떻게해야합니까?


@ GP89가 좋은 해결책을 언급했습니다. 큐를 사용하여 파일에 대한 쓰기 권한 만있는 전용 프로세스에 쓰기 작업을 보냅니다. 다른 모든 작업자는 읽기 전용 액세스 권한이 있습니다. 이것은 충돌을 제거합니다. 다음은 apply_async를 사용하는 예이지만 맵에서도 작동합니다.

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.clock()
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s 
    done = time.clock() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()    
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs: 
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
   main()

It looks to me that you need to use Manager to temporarily save your results to a list and then write the results from the list to a file. Also, use starmap to pass the object you want to process and the managed list. The first step is to build the parameter to be passed to starmap, which includes the managed list.

from multiprocessing import Manager
from multiprocessing import Pool  
import pandas as pd

def worker(row, param):
    # do something here and then append it to row
    x = param**2
    row.append(x)

if __name__ == '__main__':
    pool_parameter = [] # list of objects to process
    with Manager() as mgr:
        row = mgr.list([])

        # build list of parameters to send to starmap
        for param in pool_parameter:
            params.append([row,param])

        with Pool() as p:
            p.starmap(worker, params)

From this point you need to decide how you are going to handle the list. If you have tons of RAM and a huge data set feel free to concatenate using pandas. Then you can save of the file very easily as a csv or a pickle.

        df = pd.concat(row, ignore_index=True)

        df.to_pickle('data.pickle')
        df.to_csv('data.csv')

참고URL : https://stackoverflow.com/questions/13446445/python-multiprocessing-safely-writing-to-a-file

반응형