Подтвердить что ты не робот

Совместное использование непрерывных массивов numpy между процессами в python

В то время как я нашел множество ответов на вопросы, похожие на мои, я не верю, что это прямо адресовано здесь, - и у меня есть несколько дополнительных вопросов. Мотивация совместного использования непрерывных массивов numpy заключается в следующем:

  • Я использую сверточную нейронную сеть, запущенную на Caffe, для выполнения регрессии на изображениях на серии меток непрерывной ценности.
  • Для изображений требуется специальная предварительная обработка и расширение данных.
  • Ограничения (1) непрерывного характера меток (они являются поплавками) и (2) увеличение данных означает, что я предварительно обрабатываю данные в python, а затем обслуживаю его как непрерывные массивы numpy, используя -memory в слое Caffe.
  • Загрузка обучающих данных в память происходит сравнительно медленно. Я хотел бы распараллелить его так, чтобы:

(1) Питон, который я пишу, создает класс "обработчик данных", который создает два непрерывных массива numpy. (2) Рабочий процесс чередуется между этими массивами numpy, загружает данные с диска, выполняет предварительную обработку и вставляет данные в массив numpy. (3) Между тем, python Caffe wrappers отправляют данные из другого массива на GPU для запуска через сеть.

У меня есть несколько вопросов:

  • Можно ли выделить память в непрерывном массиве numpy, а затем обернуть его в объект общей памяти (я не уверен, что здесь "правильный" термин), используя что-то вроде класса Array из многопроцессорности python?

  • У массивов Numpy есть атрибут .ctypes, я полагаю, что это полезно для создания массивов разделяемой памяти из массива Array(), но не может точно определить, как их использовать.

  • Если совместно используемая память создается без массива numpy, она остается непрерывной? Если нет, есть ли способ обеспечить его непрерывность?

Можно ли сделать что-то вроде:

import numpy as np
from multiprocessing import Array
contArr = np.ascontiguousarray(np.zeros((n_images, n_channels, img_height, img_width)), dtype=np.float32)
sm_contArr = Array(contArr.ctypes.?, contArr?)

Затем создайте экземпляр рабочего с помощью

p.append(Process(target=some_worker_function, args=(data_to_load, sm_contArr)))
p.start()

Спасибо!

Изменить: я знаю, что есть несколько библиотек, которые имеют аналогичные функции в разных состояниях обслуживания. Я предпочел бы ограничивать это чистым питоном и numpy, но если это не возможно, я, конечно, захочу его использовать.

4b9b3361

Ответ 1

Wrap numpy ndarray вокруг многопроцессорной обработки RawArray()

Существует несколько способов совместного использования массивов numpy в памяти через процессы. Давайте посмотрим, как вы можете это сделать, используя модуль многопроцессорности.

Первое важное замечание состоит в том, что numpy предоставляет функцию np.frombuffer() для переноса интерфейса ndarray вокруг существующего объекта, который поддерживает протокол буфера (например, bytes(), bytearray(), array() и т.д.). Это создает массивы только для чтения из объектов только для чтения и записываемых массивов из записываемых объектов.

Мы можем объединить это с разделяемой памятью RawArray(), что обеспечивает многопроцессорность. Обратите внимание, что array() не работает для этой цели, поскольку это прокси-объект с блокировкой и напрямую не отображает интерфейс буфера. Конечно, это означает, что мы должны обеспечить правильную синхронизацию наших нумерованных RawArrays.

Существует одна сложная проблема, связанная с RawArrays, обернутыми ndarray: когда многопроцессорность отправляет такой массив между процессами - и действительно, он должен будет отправить наши массивы, как только они созданы, для обоих рабочих - он рассохнет, а затем раскроет их. К сожалению, это приводит к созданию копий ndarrays вместо совместного использования их в памяти.

Решение, хотя и немного уродливое, должно сохранять RawArrays как, пока они не будут переданы рабочим, а только обернуть их в ndarrays после запуска каждого рабочего процесса > .

Кроме того, было бы предпочтительнее передавать массивы, будь то простой RawArray или обернутый ndarray, непосредственно через multiprocessing.Queue, но это тоже не работает. RawArray не может быть помещен в такую ​​очередь, и обернутый ndarray был бы маринован и рассыпан, поэтому фактически копируется.

Обходной путь - отправить список всех предварительно выделенных массивов рабочим процессам и сообщить индексы в этот список по очереди. Это очень похоже на прохождение маркеров (индексов), и тот, кто имеет токен, может работать с соответствующим массивом.

Структура основной программы может выглядеть так:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import numpy as np
import queue

from multiprocessing import freeze_support, set_start_method
from multiprocessing import Event, Process, Queue
from multiprocessing.sharedctypes import RawArray


def create_shared_arrays(size, dtype=np.int32, num=2):
    dtype = np.dtype(dtype)
    if dtype.isbuiltin and dtype.char in 'bBhHiIlLfd':
        typecode = dtype.char
    else:
        typecode, size = 'B', size * dtype.itemsize

    return [RawArray(typecode, size) for _ in range(num)]


def main():
    my_dtype = np.float32

    # 125000000 (size) * 4 (dtype) * 2 (num) ~= 1 GB memory usage
    arrays = create_shared_arrays(125000000, dtype=my_dtype)
    q_free = Queue()
    q_used = Queue()
    bail = Event()

    for arr_id in range(len(arrays)):
        q_free.put(arr_id)  # pre-fill free queue with allocated array indices

    pr1 = MyDataLoader(arrays, q_free, q_used, bail,
                       dtype=my_dtype, step=1024)
    pr2 = MyDataProcessor(arrays, q_free, q_used, bail,
                          dtype=my_dtype, step=1024)

    pr1.start()
    pr2.start()

    pr2.join()
    print("\n{} joined.".format(pr2.name))

    pr1.join()
    print("{} joined.".format(pr1.name))


if __name__ == '__main__':
    freeze_support()

    # On Windows, only "spawn" is available.
    # Also, this tests proper sharing of the arrays without "cheating".
    set_start_method('spawn')
    main()

Это готовит список из двух массивов, две очереди - "свободную" очередь, в которой MyDataProcessor помещает индексы массивов, с которыми она выполняется, и из нее извлекает MyDataLoader, а также "использованную" очередь, в которой MyDataLoader помещает индексы легко заполненных массивов и MyDataProcessor извлекает их из - и multiprocessing.Event, чтобы начать согласованное спасение всех работников. На данный момент мы можем покончить с последним, поскольку у нас есть только один производитель и один потребитель массивов, но не больно быть готовым к большему количеству работников.

Затем мы предварительно заполняем "пустую" очередь всеми индексами наших RawArrays в списке и создаем экземпляр одного из каждого типа работников, передавая им необходимые объекты связи. Мы запускаем оба из них и просто ждем их до join().

Вот как выглядит MyDataProcessor, который потребляет индексы массива из "использованной" очереди и отправляет данные в какой-либо внешний черный ящик (debugio.output в примере):

class MyDataProcessor(Process):
    def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
        super().__init__()
        self.arrays = arrays
        self.q_free = q_free
        self.q_used = q_used
        self.bail = bail
        self.dtype = dtype
        self.step = step

    def run(self):
        # wrap RawArrays inside ndarrays
        arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]

        from debugio import output as writer

        while True:
            arr_id = self.q_used.get()
            if arr_id is None:
                break

            arr = arrays[arr_id]

            print('(', end='', flush=True)          # just visualizing activity
            for j in range(0, len(arr), self.step):
                writer.write(str(arr[j]) + '\n')
            print(')', end='', flush=True)          # just visualizing activity

            self.q_free.put(arr_id)

            writer.flush()

        self.bail.set()                     # tell loaders to bail out ASAP
        self.q_free.put(None, timeout=1)    # wake up loader blocking on get()

        try:
            while True:
                self.q_used.get_nowait()    # wake up loader blocking on put()
        except queue.Empty:
            pass

Первое, что он делает, - это обернуть полученные RawArrays в ndarrays с помощью "np.frombuffer()" и сохранить новый список, поэтому они могут использоваться как массивы numpy во время выполнения процесса, и не нужно их обертывать и снова.

Обратите внимание, что MyDataProcessor только записывает событие self.bail, он никогда не проверяет его. Вместо этого, если нужно сказать, что нужно выйти, он найдет знак None в очереди, а не индекс массива. Это делается для того, когда MyDataLoader не имеет больше доступных данных и запускает процедуру слежения, MyDataProcessor все равно может обрабатывать все допустимые массивы, находящиеся в очереди, без преждевременного выхода.

Вот как выглядит MyDataLoader:

class MyDataLoader(Process):
    def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
        super().__init__()
        self.arrays = arrays
        self.q_free = q_free
        self.q_used = q_used
        self.bail = bail
        self.dtype = dtype
        self.step = step

    def run(self):
        # wrap RawArrays inside ndarrays
        arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]

        from debugio import input as reader

        for _ in range(10):  # for testing we end after a set amount of passes
            if self.bail.is_set():
                # we were asked to bail out while waiting on put()
                return

            arr_id = self.q_free.get()
            if arr_id is None:
                # we were asked to bail out while waiting on get()
                self.q_free.put(None, timeout=1)  # put it back for next loader
                return

            if self.bail.is_set():
                # we were asked to bail out while we got a normal array
                return

            arr = arrays[arr_id]

            eof = False
            print('<', end='', flush=True)          # just visualizing activity
            for j in range(0, len(arr), self.step):
                line = reader.readline()
                if not line:
                    eof = True
                    break

                arr[j] = np.fromstring(line, dtype=self.dtype, sep='\n')

            if eof:
                print('EOF>', end='', flush=True)   # just visualizing activity
                break

            print('>', end='', flush=True)          # just visualizing activity

            if self.bail.is_set():
                # we were asked to bail out while we filled the array
                return

            self.q_used.put(arr_id)     # tell processor an array is filled

        if not self.bail.is_set():
            self.bail.set()             # tell other loaders to bail out ASAP
            # mark end of data for processor as we are the first to bail out
            self.q_used.put(None)

Он очень похож по структуре на другого работника. Причина, по которой она немного раздута, заключается в том, что она проверяет событие self.bail во многих точках, чтобы уменьшить вероятность застревания. (Он не является полностью надежным, так как существует небольшая вероятность того, что событие может быть установлено между проверкой и доступом к очереди. Если это проблема, нужно использовать некоторый примитив примитива синхронизации, который обеспечивает арбитражный доступ как к объединению Событий, так и к очереди.)

Он также обматывает полученные RawArrays в ndarrays в самом начале и считывает данные из внешнего черного ящика (debugio.input в примере).

Обратите внимание, что, играя с аргументами step= для обоих работников в функции main(), мы можем изменить соотношение количества чтения и записи (строго для целей тестирования) в производственной среде step= будет 1, чтение и запись всех членов массива numpy).

Увеличение обоих значений приводит к тому, что работники получают доступ только к нескольким значениям в массивах numpy, тем самым значительно ускоряя все, что показывает, что производительность не ограничена связью между рабочими процессами. Если бы мы помещали массивы numpy непосредственно в Очереди, копируя их вперед и назад между процессами целиком, увеличение размера шага не привело бы к значительному повышению производительности - оно оставалось бы медленным.

Для справки, вот модуль debugio, который я использовал для тестирования:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from ast import literal_eval
from io import RawIOBase, BufferedReader, BufferedWriter, TextIOWrapper


class DebugInput(RawIOBase):
    def __init__(self, end=None):
        if end is not None and end < 0:
            raise ValueError("end must be non-negative")

        super().__init__()
        self.pos = 0
        self.end = end

    def readable(self):
        return True

    def read(self, size=-1):
        if self.end is None:
            if size < 0:
                raise NotImplementedError("size must be non-negative")
            end = self.pos + size
        elif size < 0:
            end = self.end
        else:
            end = min(self.pos + size, self.end)

        lines = []
        while self.pos < end:
            offset = self.pos % 400
            pos = self.pos - offset
            if offset < 18:
                i = (offset + 2) // 2
                pos += i * 2 - 2
            elif offset < 288:
                i = (offset + 12) // 3
                pos += i * 3 - 12
            else:
                i = (offset + 112) // 4
                pos += i * 4 - 112

            line = str(i).encode('ascii') + b'\n'
            line = line[self.pos - pos:end - pos]
            self.pos += len(line)
            size -= len(line)
            lines.append(line)

        return b''.join(lines)

    def readinto(self, b):
        data = self.read(len(b))
        b[:len(data)] = data
        return len(data)

    def seekable(self):
        return True

    def seek(self, offset, whence=0):
        if whence == 0:
            pos = offset
        elif whence == 1:
            pos = self.pos + offset
        elif whence == 2:
            if self.end is None:
                raise ValueError("cannot seek to end of infinite stream")
            pos = self.end + offset
        else:
            raise NotImplementedError("unknown whence value")

        self.pos = max((pos if self.end is None else min(pos, self.end)), 0)
        return self.pos


class DebugOutput(RawIOBase):
    def __init__(self):
        super().__init__()
        self.buf = b''
        self.num = 1

    def writable(self):
        return True

    def write(self, b):
        *lines, self.buf = (self.buf + b).split(b'\n')

        for line in lines:
            value = literal_eval(line.decode('ascii'))
            if value != int(value) or int(value) & 255 != self.num:
                raise ValueError("expected {}, got {}".format(self.num, value))

            self.num = self.num % 127 + 1

        return len(b)


input = TextIOWrapper(BufferedReader(DebugInput()), encoding='ascii')
output = TextIOWrapper(BufferedWriter(DebugOutput()), encoding='ascii')