Подтвердить что ты не робот

NumPy против многопроцессорности и mmap

Я использую модуль Python multiprocessing для параллельной обработки больших массивов numpy. Массивы преобразуются в память с использованием numpy.load(mmap_mode='r') в основном процессе. После этого multiprocessing.Pool() вызывает процесс (я полагаю).

Кажется, все работает нормально, за исключением того, что я получаю такие строки, как:

AttributeError (объект 'NoneType' не имеет атрибута 'tell' ",) в <bound method memmap.__del__ of memmap([ 0.57735026, 0.57735026, 0.57735026, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ], dtype=float32)> игнорируется

в журналах unittest. Тем не менее тесты проходят отлично.

Любая идея, что там происходит?

Использование Python 2.7.2, OS X, NumPy 1.6.1.


UPDATE:

После некоторой отладки я искал причину пути кода, который использовал (небольшой фрагмент) этот массив с памятью с памятью в качестве входа в вызов Pool.imap.

По-видимому, "проблема" связана с тем, как multiprocessing.Pool.imap передает свой вход в новые процессы: он использует pickle. Это не работает с mmap ed numpy массивами и чем-то внутри разрывов, что приводит к ошибке.

Я нашел этот ответ Робертом Керном, который, похоже, решает ту же проблему. Он предлагает создать специальный путь кода, когда вход imap поступает из массива с отображением памяти: память-сопоставление одного массива вручную в порожденном процессе.

Это было бы настолько сложно и уродливо, что я предпочел бы жить с ошибкой и дополнительными копиями памяти. Есть ли другой способ, который будет легче при изменении существующего кода?

4b9b3361

Ответ 1

Мой обычный подход (если вы можете жить с дополнительными копиями памяти) - это делать все IO в одном процессе, а затем отправлять вещи в пул рабочих потоков. Чтобы загрузить фрагмент memmapped массива в память, просто x = np.array(data[yourslice]) (data[yourslice].copy() на самом деле не делает этого, что может привести к некоторой путанице.).

Прежде всего, позвольте генерировать некоторые тестовые данные:

import numpy as np
np.random.random(10000).tofile('data.dat')

Вы можете воспроизвести свои ошибки примерно так:

import numpy as np
import multiprocessing

def main():
    data = np.memmap('data.dat', dtype=np.float, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    intervals = range(0, data.size, chunksize) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield data[start:stop]

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()

И если вы просто переключитесь на получение np.array(data[start:stop]), вы устраните проблему:

import numpy as np
import multiprocessing

def main():
    data = np.memmap('data.dat', dtype=np.float, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    intervals = range(0, data.size, chunksize) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield np.array(data[start:stop])

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()

Конечно, это делает дополнительную копию в памяти каждого фрагмента.

В конечном итоге вы, вероятно, обнаружите, что проще отказаться от memmapped файлов и перейти к чему-то вроде HDF. Это особенно актуально, если ваши данные многомерны. (Я рекомендовал бы h5py, но pyTables приятно, если ваши данные "похожи на таблицу".)

Удачи, во всяком случае!