Подтвердить что ты не робот

Обработка одного файла из нескольких процессов в python

У меня есть один большой текстовый файл, в котором я хочу обрабатывать каждую строку (выполнять некоторые операции) и хранить их в базе данных. Поскольку одна простая программа занимает слишком много времени, я хочу, чтобы она выполнялась через несколько процессов или потоков. Каждый поток/процесс должен читать РАЗЛИЧНЫЕ данные (разные строки) из этого единственного файла и выполнять некоторые операции с их частью данных (строк) и помещать их в базу данных, чтобы в конце концов у меня было обработано все данные и база данных сбрасывается с данными, которые мне нужны.

Но я не могу понять, как это сделать.

4b9b3361

Ответ 1

Что вы ищете, это образец производителя/потребителя

Пример базовой нити

Вот базовый пример, используя модуль потоковой передачи (вместо многопроцессорности)

import threading
import Queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start for workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()

Вы не будете делиться файловым объектом с потоками. Вы могли бы создать для них работу, предоставив queue с линией данных. Затем каждый поток будет поднимать его и обрабатывать, а затем возвращать в очередь.

В модуль многопроцессорный модуль добавлены дополнительные средства для обмена данными, такими как списки и специальный вид очереди. Существуют компромиссы для использования многопроцессорных процессов и потоков, и это зависит от того, связана ли ваша работа с привязкой процессора или привязана IO.

Базовый многопроцессорный пример. Пример использования

Вот действительно базовый пример многопроцессорного пула

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print results

Пул - это объект удобства, который управляет своими собственными процессами. Поскольку открытый файл может выполнять итерацию по его линиям, вы можете передать его на карту, которая будет перебирать его и доставлять строки функции рабочего. Map блокирует и возвращает весь результат при его завершении. Имейте в виду, что в слишком простом примере map будет потреблять ваш файл сразу, прежде чем выходить из работы. Поэтому имейте в виду, если он больше. Существуют более продвинутые способы разработки установки производителя/потребителя.

Ручной пул с перераспределением лимитов и строк

Это ручной пример Pool.map, но вместо того, чтобы потреблять целую итерабельность, вы можете установить размер очереди, чтобы вы только кормят его по частям так быстро, как он может обрабатывать. Я также добавил номера строк, чтобы вы могли отслеживать их и прибегать к ним, если хотите позже.

from multiprocessing import Process, Manager
import time
import itertools 

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal 
        if line == None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)

Ответ 2

Вот действительно глупый пример, который я приготовил:

import os.path
import multiprocessing

def newlinebefore(f,n):
    f.seek(n)
    c=f.read(1)
    while c!='\n' and n > 0:
        n-=1
        f.seek(n)
        c=f.read(1)

    f.seek(n)
    return n

filename='gpdata.dat'  #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)

#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)

#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.


with open(filename,'r') as f:
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))

end_byte=[i-1 for i in start_byte] [1:] + [None]

def process_piece(filename,start,end):
    with open(filename,'r') as f:
        f.seek(start+1)
        if(end is None):
            text=f.read()
        else: 
            nbytes=end-start+1
            text=f.read(nbytes)

    # process text here. createing some object to be returned
    # You could wrap text into a StringIO object if you want to be able to
    # read from it the way you would a file.

    returnobj=text
    return returnobj

def wrapper(args):
    return process_piece(*args)

filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)

pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)

#Now take your results and write them to the database.
print "".join(result)  #I just print it to make sure I get my file back ...

Трудная часть здесь заключается в том, чтобы мы разделили файл на символы новой строки, чтобы вы не пропустили ни одной строки (или только прочитали частичные строки). Затем каждый процесс считывает его часть файла и возвращает объект, который может быть помещен в базу данных по основному потоку. Конечно, вам может понадобиться сделать эту часть в кусках, чтобы вам не приходилось хранить всю информацию в памяти сразу. (это довольно легко выполнить - просто разделите список "args" на X-фрагменты и вызовите pool.map(wrapper,chunk) - см. здесь)

Ответ 3

хорошо разбить один большой файл на несколько меньших файлов и каждый из них обрабатывается в отдельных потоках.