Подтвердить что ты не робот

Создание массива NumPy для общих процессов

Я прочитал немало вопросов о SO о совместном использовании массивов, и это кажется простым для простых массивов, но я застреваю, пытаясь заставить его работать для массива, который у меня есть.

import numpy as np
data=np.zeros(250,dtype='float32, (250000,2)float32')

Я попытался преобразовать это в общий массив, пытаясь как-то сделать mp.Array принять data, я также попытался создать массив с использованием ctypes как такового:

import multiprocessing as mp
data=mp.Array('c_float, (250000)c_float',250)

Единственный способ, которым мне удалось заставить работать мой код, - это не передавать данные в функцию, а передавать закодированную строку, которая должна быть несжатой/декодированной, но это будет в конечном итоге в n (число строк) процессов, вызываемых, которые кажутся излишними, Моя желаемая реализация основана на нарезке списка двоичных строк на x (количество процессов) и передаче этого фрагмента, data и index для процессов, которые работают, за исключением того, что data изменяется локально, поэтому вопрос о как сделать его общедоступным, любой пример, работающий с настраиваемым (вложенным) массивом numpy, уже будет большой помощью.

PS: Этот вопрос является продолжением многопроцессорной обработки Python

4b9b3361

Ответ 1

Обратите внимание, что вы можете начать с массива сложного типа dtype:

In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')

и рассматривать его как массив однородного типа dtype:

In [5]: data2 = data.view('float32')

а затем преобразуйте его обратно в сложный тип dtype:

In [7]: data3 = data2.view('float32, (250000,2)float32')

Изменение dtype - очень быстрая операция; это не влияет на базовые данные, только то, как NumPy интерпретирует его. Таким образом, изменение типа dtype практически безрезультатно.

Итак, то, что вы читали о массивах с простыми (однородными) типами, может быть легко применено к вашему сложному dtype с трюком выше.


Приведенный ниже код заимствует многие идеи из J.F. Ответ Себастьяна здесь.

import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64


def decode(arg):
    chunk, counter = arg
    print len(chunk), counter
    for x in chunk:
        peak_counter = 0
        data_buff = base64.b64decode(x)
        buff_size = len(data_buff) / 4
        unpack_format = ">%dL" % buff_size
        index = 0
        for y in struct.unpack(unpack_format, data_buff):
            buff1 = struct.pack("I", y)
            buff2 = struct.unpack("f", buff1)[0]
            with shared_arr.get_lock():
                data = tonumpyarray(shared_arr).view(
                    [('f0', '<f4'), ('f1', '<f4', (250000, 2))])
                if (index % 2 == 0):
                    data[counter][1][peak_counter][0] = float(buff2)
                else:
                    data[counter][1][peak_counter][1] = float(buff2)
                    peak_counter += 1
            index += 1
        counter += 1


def pool_init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument


def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())


def numpy_array(shared_arr, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors = mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        chunk_size = int(len(peaks) / processors)
        map_parameters = []
        for i in range(processors):
            counter = i * chunk_size
            # WARNING: I removed -1 from (i + 1)*chunk_size, since the right
            # index is non-inclusive. 
            chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode, map_parameters)

if __name__ == '__main__':
    shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
    peaks = ...
    numpy_array(shared_arr, peaks)

Если вы можете гарантировать, что различные процессы, выполняющие назначения

if (index % 2 == 0):
    data[counter][1][peak_counter][0] = float(buff2)
else:
    data[counter][1][peak_counter][1] = float(buff2)

никогда не конкурируют, чтобы изменять данные в тех же местах, тогда я считаю, что вы действительно можете отказаться от блокировки

with shared_arr.get_lock():

но я недостаточно понимаю ваш код, чтобы точно знать, поэтому, чтобы быть в безопасности, я включил замок.

Ответ 2

from multiprocessing import Process, Array
import numpy as np
import time
import ctypes

def fun(a):
    a[0] = -a[0]
    while 1:
        time.sleep(2)
        #print bytearray(a.get_obj())
        c=np.frombuffer(a.get_obj(),dtype=np.float32)
        c.shape=3,3
        print 'haha',c


def main():
    a = np.random.rand(3,3).astype(np.float32)
    a.shape=1*a.size
    #a=np.array([[1,3,4],[4,5,6]])
    #b=bytearray(a)
    h=Array(ctypes.c_float,a)
    print "Originally,",h

    # Create, start, and finish the child process
    p = Process(target=fun, args=(h,))
    p.start()
    #p.join()
    a.shape=3,3
    # Print out the changed values
    print 'first',a
    time.sleep(3)
    #h[0]=h[0]+1
    print 'main',np.frombuffer(h.get_obj(), dtype=np.float32)



if __name__=="__main__":
    main()