Подтвердить что ты не робот

Производительность numpy.searchsorted плоха на структурированных массивах

Извините заранее, если я злоупотребляю любыми условиями, не стесняйтесь исправить это.

У меня есть отсортированный массив с dtype '<f16, |S30'. Когда я использую searchsorted в своем первом поле, он работает очень медленно (около 0,4 секунды для 3 миллионов элементов). Это намного длиннее, чем bisect принимает, чтобы сделать то же самое в простом списке кортежей Python.

%timeit a['f0'].searchsorted(400.)
1 loops, best of 3: 398 ms per loop

Однако, если я скопирую float-часть в другую, отдельный массив, поиск будет быстрее, чем bisect:

b = a['f0'].copy()

%timeit b.searchsorted(400.)
1000000 loops, best of 3: 945 ns per loop

Мои вопросы:

  • Я делаю что-то неправильно или это регрессия в NumPy?
  • Есть ли способ обойти это без дублирования данных?
4b9b3361

Ответ 1

Я помню это некоторое время назад. Если я правильно помню, я думаю, что searchsorted делает временную копию данных, когда данные не смежны. Если у меня будет время позже, я взгляну на код, чтобы подтвердить, что происходит (или, может быть, кто-то, знакомый с кодом, может подтвердить это).

В то же время, если вы не хотите реструктурировать свой код, чтобы избежать использования структурированного массива, лучше всего использовать bisect_left(a['f0'], 400.). На моей машине он в 8 раз медленнее, чем поиск в массиве смежных объектов, но на 1000 раз быстрее, чем поиск в несмежном массиве.

In [5]: a = np.arange((6e6)).view([('f0', float), ('f1', float)])

In [6]: timeit a['f0'].searchsorted(400.)
10 loops, best of 3: 51.1 ms per loop

In [7]: timeit a['f0'].copy()
10 loops, best of 3: 51 ms per loop

In [8]: timeit bisect_left(a['f0'], 400.)
10000 loops, best of 3: 52.8 us per loop

In [9]: f0 = a['f0'].copy()

In [10]: timeit f0.searchsorted(400.)
100000 loops, best of 3: 7.85 us per loop

Ответ 2

Вот какой код, чтобы проиллюстрировать размер проблемы (начиная с 11 мая 2015 года) и как ее исправить.

import numpy as np
import bisect
import timeit
from random import randint

dtype = np.dtype([ ('pos','<I'),('sig','<H') ])             # my data is unsigned 32bit, and unsigned 16bit
data1 = np.fromfile('./all2/840d.0a9b45e8c5344abf6ac761017e93b5bb.2.1bp.binary', dtype)

dtype2 = np.dtype([('pos',np.uint32),('sig',np.uint32)])    # convert data to both unsigned 32bit
data2 = data1.astype(dtype2)

data3 = data2.view(('uint32', len(data2.dtype.names)))    # convert above to a normal array (not structured array)

print data1.dtype.descr # [('pos', '<u4'), ('sig', '<u2')]
print data2.dtype.descr # [('pos', '<u4'), ('sig', '<u4')]
print data3.dtype.descr # [('', '<u4')]

print data1.nbytes  # 50344494
print data2.nbytes  # 67125992
print data3.nbytes  # 67125992

print data1['pos'].max() # 2099257376
print data2['pos'].max() # 2099257376
print data3[:,0].max()   # 2099257376

def b1():   return bisect.bisect_left(data1['pos'],           randint(100000000,200000000))
def b2():   return bisect.bisect_left(data2['pos'],           randint(100000000,200000000))
def b3():   return bisect.bisect_left(data3[:,0],             randint(100000000,200000000))
def ss1():  return np.searchsorted(data1['pos'],              randint(100000000,200000000))
def ss2():  return np.searchsorted(data2['pos'],              randint(100000000,200000000))
def ss3():  return np.searchsorted(data3[:,0],                randint(100000000,200000000))

def ricob1():   return bisect.bisect_left(data1['pos'], np.uint32(randint(100000000,200000000)))
def ricob2():   return bisect.bisect_left(data2['pos'], np.uint32(randint(100000000,200000000)))
def ricob3():   return bisect.bisect_left(data3[:,0],   np.uint32(randint(100000000,200000000)))
def ricoss1():  return np.searchsorted(data1['pos'],    np.uint32(randint(100000000,200000000)))
def ricoss2():  return np.searchsorted(data2['pos'],    np.uint32(randint(100000000,200000000)))
def ricoss3():  return np.searchsorted(data3[:,0],      np.uint32(randint(100000000,200000000)))

print timeit.timeit(b1,number=300)  # 0.0085117816925
print timeit.timeit(b2,number=300)  # 0.00826191902161
print timeit.timeit(b3,number=300)  # 0.00828003883362
print timeit.timeit(ss1,number=300) # 6.57477498055
print timeit.timeit(ss2,number=300) # 5.95308589935
print timeit.timeit(ss3,number=300) # 5.92483091354

print timeit.timeit(ricob1,number=300)  # 0.00120902061462
print timeit.timeit(ricob2,number=300)  # 0.00120401382446
print timeit.timeit(ricob3,number=300)  # 0.00120711326599
print timeit.timeit(ricoss1,number=300) # 4.39265394211
print timeit.timeit(ricoss2,number=300) # 0.00116586685181
print timeit.timeit(ricoss3,number=300) # 0.00108909606934

Update! Таким образом, благодаря комментариям Rico, кажется, что установка типа для числа, которое вы хотите найти, сортировки /bisect действительно импортируется! Тем не менее, в структурированном массиве с 32-битным и 16-битным ints он все еще медленный (хотя нет, где почти так же медленно, как раньше)