Примечание. Я хотел бы больше узнать о том, как подойти и придумать такие решения, а не само решение.
У меня есть очень критически важная функция в моей системе, которая появляется в качестве профилирующей точки доступа номер один в определенных контекстах. Он находится в середине итерации k-означает (уже многопоточность с использованием параллели для обработки поддиапазонов точек в каждом рабочем потоке).
ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i=0; i < num_centroids; ++i)
{
const ClusterCentroid& cent = centroids[i];
const float dist = ...;
if (dist < pt.min_dist) // <-- #1 hotspot
{
pt.min_dist = dist;
pt.min_index = i;
}
}
Любая экономия времени, затрачиваемого на обработку этого раздела кода, существенно возрастает, поэтому я часто занимаюсь этим много. Возможно, стоит поставить петлю центроида снаружи, например, и повторить параллельные точки для заданного центра тяжести. Количество кластерных точек здесь составляет миллионы, а число центроидов - в тысячах. Алгоритм применяется для нескольких итераций (часто до 10). Он не стремится к идеальной конвергенции/стабильности, а лишь к некоторому "разумному" приближению.
Любые идеи приветствуются, но то, что я действительно хочу обнаружить, - это сделать этот код безветристым, поскольку это позволит использовать SIMD-версию. Я не разработал такого рода умственные способности, чтобы легко понять, как придумать веткистые решения: мой мозг терпит неудачу там, как это было, когда я впервые был подвергнут рекурсии в первые дни, поэтому руководство по написанию нераспространения код и способы разработки соответствующего мышления для него также будут полезны.
Короче говоря, я ищу любые руководства и подсказки и предложения (не обязательно решения) о том, как оптимизировать этот код для микро-оптимизации. У него, скорее всего, есть место для алгоритмических улучшений, но мой блайнд всегда был в решениях по микрооптимизации (и мне любопытно узнать, как применять их более эффективно, не выходя за борт с ним). Он уже плотно многопоточен с короткой параллелью для логики, поэтому я довольно сильно подтолкнул в угол микро-оптимизации как одну из более быстрых задач, чтобы попробовать без более разумного алгоритма. Мы полностью можем изменить расположение памяти.
В ответ на алгоритмические предложения
О том, как смотреть на все это не так, пытаясь оптимизировать алгоритм O (knm), который можно явно улучшить на уровне алгоритма, я полностью согласен. Это подталкивает этот конкретный вопрос в несколько академическую и непрактичную сферу. Однако, если мне удастся дать анекдот, я исхожу из первоначального фона программирования высокого уровня - большой акцент на широкой, крупномасштабной точке зрения, безопасности и очень мало на деталях реализации на низком уровне. Недавно я переключил проекты на совершенно другой вид современного вкуса, и я изучаю всевозможные новые трюки от своих сверстников кэш-памяти, GPGPU, нераспространенных технологий, SIMD, специализированных распределителей памяти, которые фактически превосходят malloc ( но для конкретных сценариев) и т.д.
Здесь я пытаюсь догнать последние тенденции в производительности, и я неожиданно обнаружил, что те старые структуры данных, которые я часто предпочитал в 90-е годы, которые часто были связаны/структуры дерева, на самом деле значительно превосходят гораздо более наивный, грубый, микро-оптимизированный, параллельный код, применяющий настроенные инструкции по смежным блокам памяти. Это несколько разочаровывает в то же время, поскольку я чувствую, что теперь мы подгоняем алгоритмы больше для машины и сужаем возможности таким образом (особенно с GPGPU).
Самое забавное, что я считаю, что этот тип микро-оптимизированного, быстрого кода обработки массива намного проще в обслуживании, чем сложные алгоритмы и структуры данных, которые я использовал раньше. Для начала их проще обобщить. Кроме того, мои сверстники часто могут жаловаться на жалобы относительно определенного замедления в области, просто пощекотать параллель для и, возможно, некоторых SIMD и назвать это с приличной скоростью. Алгоритмические улучшения часто могут предлагать значительно больше, но скорость и неинтрузивность, при которых могут применяться эти микрооптимизации, мне хотелось бы узнать больше в этой области, поскольку чтение статей по лучшим алгоритмам может занять некоторое время (а также потребовать больше обширные изменения). Таким образом, я в последнее время прыгал на эту группу микро-оптимизации немного позже и, возможно, слишком много в этом конкретном случае, но мое любопытство больше связано с расширением диапазона возможных решений для любого сценария.
Демонтажные
Примечание. На самом деле, я очень плохо разбираюсь в сборке, поэтому я часто настраивал вещи больше в пробной и ошибочной форме, придумывая несколько образованных догадок о том, почему горячая точка, показанная в vtune, может быть узким местом, а затем попробовать вещи чтобы увидеть, улучшатся ли времена, предполагая, что у догадок есть какой-то намек на правду, если время действительно улучшилось или полностью пропустило отметку, если они этого не делают.
000007FEEE3FB8A1 jl thread_partition+70h (7FEEE3FB780h)
{
ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i = 0; i < num_centroids; ++i)
000007FEEE3FB8A7 cmp ecx,r10d
000007FEEE3FB8AA jge thread_partition+1F4h (7FEEE3FB904h)
000007FEEE3FB8AC lea rax,[rbx+rbx*2]
000007FEEE3FB8B0 add rax,rax
000007FEEE3FB8B3 lea r8,[rbp+rax*8+8]
{
const ClusterCentroid& cent = centroids[i];
const float x = pt.pos[0] - cent.pos[0];
const float y = pt.pos[1] - cent.pos[1];
000007FEEE3FB8B8 movss xmm0,dword ptr [rdx]
const float z = pt.pos[2] - cent.pos[2];
000007FEEE3FB8BC movss xmm2,dword ptr [rdx+4]
000007FEEE3FB8C1 movss xmm1,dword ptr [rdx-4]
000007FEEE3FB8C6 subss xmm2,dword ptr [r8]
000007FEEE3FB8CB subss xmm0,dword ptr [r8-4]
000007FEEE3FB8D1 subss xmm1,dword ptr [r8-8]
const float dist = x*x + y*y + z*z;
000007FEEE3FB8D7 mulss xmm2,xmm2
000007FEEE3FB8DB mulss xmm0,xmm0
000007FEEE3FB8DF mulss xmm1,xmm1
000007FEEE3FB8E3 addss xmm2,xmm0
000007FEEE3FB8E7 addss xmm2,xmm1
if (dist < pt.min_dist)
// VTUNE HOTSPOT
000007FEEE3FB8EB comiss xmm2,dword ptr [rdx-8]
000007FEEE3FB8EF jae thread_partition+1E9h (7FEEE3FB8F9h)
{
pt.min_dist = dist;
000007FEEE3FB8F1 movss dword ptr [rdx-8],xmm2
pt.min_index = i;
000007FEEE3FB8F6 mov dword ptr [rdx-10h],ecx
000007FEEE3FB8F9 inc ecx
000007FEEE3FB8FB add r8,30h
000007FEEE3FB8FF cmp ecx,r10d
000007FEEE3FB902 jl thread_partition+1A8h (7FEEE3FB8B8h)
for (int j = *irange.first; j < *irange.last; ++j)
000007FEEE3FB904 inc edi
000007FEEE3FB906 add rdx,20h
000007FEEE3FB90A cmp edi,dword ptr [rsi+4]
000007FEEE3FB90D jl thread_partition+31h (7FEEE3FB741h)
000007FEEE3FB913 mov rbx,qword ptr [irange]
}
}
}
}
Мы вынуждены ориентироваться на SSE 2 - немного позади в наше время, но пользовательская база фактически сработала один раз, когда мы предположили, что даже SSE 4 был в порядке как минимальное требование (у пользователя была прототипная машина Intel).
Обновление с автономным тестом: ~ 5.6 с
Я очень благодарен за любую помощь! Поскольку кодовая база довольно обширна, и условия для запуска этого кода сложны (системные события запускаются по нескольким потокам), это немного громоздко, чтобы делать экспериментальные изменения и анализировать их каждый раз. Поэтому я установил поверхностный тест на стороне как отдельное приложение, которое другие могут запускать и опробовать, чтобы я мог экспериментировать со всеми этими любезно предложенными решениями.
#define _SECURE_SCL 0
#include <iostream>
#include <fstream>
#include <vector>
#include <limits>
#include <ctime>
#if defined(_MSC_VER)
#define ALIGN16 __declspec(align(16))
#else
#include <malloc.h>
#define ALIGN16 __attribute__((aligned(16)))
#endif
using namespace std;
// Aligned memory allocation (for SIMD).
static void* malloc16(size_t amount)
{
#ifdef _MSC_VER
return _aligned_malloc(amount, 16);
#else
void* mem = 0;
posix_memalign(&mem, 16, amount);
return mem;
#endif
}
template <class T>
static T* malloc16_t(size_t num_elements)
{
return static_cast<T*>(malloc16(num_elements * sizeof(T)));
}
// Aligned free.
static void free16(void* mem)
{
#ifdef _MSC_VER
return _aligned_free(mem);
#else
free(mem);
#endif
}
// Test parameters.
enum {num_centroids = 512};
enum {num_points = num_centroids * 2000};
enum {num_iterations = 5};
static const float range = 10.0f;
class Points
{
public:
Points(): data(malloc16_t<Point>(num_points))
{
for (int p=0; p < num_points; ++p)
{
const float xyz[3] =
{
range * static_cast<float>(rand()) / RAND_MAX,
range * static_cast<float>(rand()) / RAND_MAX,
range * static_cast<float>(rand()) / RAND_MAX
};
init(p, xyz);
}
}
~Points()
{
free16(data);
}
void init(int n, const float* xyz)
{
data[n].centroid = -1;
data[n].xyz[0] = xyz[0];
data[n].xyz[1] = xyz[1];
data[n].xyz[2] = xyz[2];
}
void associate(int n, int new_centroid)
{
data[n].centroid = new_centroid;
}
int centroid(int n) const
{
return data[n].centroid;
}
float* operator[](int n)
{
return data[n].xyz;
}
private:
Points(const Points&);
Points& operator=(const Points&);
struct Point
{
int centroid;
float xyz[3];
};
Point* data;
};
class Centroids
{
public:
Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids))
{
// Naive initial selection algorithm, but outside the
// current area of interest.
for (int c=0; c < num_centroids; ++c)
init(c, points[c]);
}
~Centroids()
{
free16(data);
}
void init(int n, const float* xyz)
{
data[n].count = 0;
data[n].xyz[0] = xyz[0];
data[n].xyz[1] = xyz[1];
data[n].xyz[2] = xyz[2];
}
void reset(int n)
{
data[n].count = 0;
data[n].xyz[0] = 0.0f;
data[n].xyz[1] = 0.0f;
data[n].xyz[2] = 0.0f;
}
void sum(int n, const float* pt_xyz)
{
data[n].xyz[0] += pt_xyz[0];
data[n].xyz[1] += pt_xyz[1];
data[n].xyz[2] += pt_xyz[2];
++data[n].count;
}
void average(int n)
{
if (data[n].count > 0)
{
const float inv_count = 1.0f / data[n].count;
data[n].xyz[0] *= inv_count;
data[n].xyz[1] *= inv_count;
data[n].xyz[2] *= inv_count;
}
}
float* operator[](int n)
{
return data[n].xyz;
}
int find_nearest(const float* pt_xyz) const
{
float min_dist_squared = numeric_limits<float>::max();
int min_centroid = -1;
for (int c=0; c < num_centroids; ++c)
{
const float* cen_xyz = data[c].xyz;
const float x = pt_xyz[0] - cen_xyz[0];
const float y = pt_xyz[1] - cen_xyz[1];
const float z = pt_xyz[2] - cen_xyz[2];
const float dist_squared = x*x + y*y * z*z;
if (min_dist_squared > dist_squared)
{
min_dist_squared = dist_squared;
min_centroid = c;
}
}
return min_centroid;
}
private:
Centroids(const Centroids&);
Centroids& operator=(const Centroids&);
struct Centroid
{
int count;
float xyz[3];
};
Centroid* data;
};
// A high-precision real timer would be nice, but we lack C++11 and
// the coarseness of the testing here should allow this to suffice.
static double sys_time()
{
return static_cast<double>(clock()) / CLOCKS_PER_SEC;
}
static void k_means(Points& points, Centroids& centroids)
{
// Find the closest centroid for each point.
for (int p=0; p < num_points; ++p)
{
const float* pt_xyz = points[p];
points.associate(p, centroids.find_nearest(pt_xyz));
}
// Reset the data of each centroid.
for (int c=0; c < num_centroids; ++c)
centroids.reset(c);
// Compute new position sum of each centroid.
for (int p=0; p < num_points; ++p)
centroids.sum(points.centroid(p), points[p]);
// Compute average position of each centroid.
for (int c=0; c < num_centroids; ++c)
centroids.average(c);
}
int main()
{
Points points;
Centroids centroids(points);
cout << "Starting simulation..." << endl;
double start_time = sys_time();
for (int i=0; i < num_iterations; ++i)
k_means(points, centroids);
cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl;
cout << "# Points: " << num_points << endl;
cout << "# Centroids: " << num_centroids << endl;
// Write the centroids to a file to give us some crude verification
// of consistency as we make changes.
ofstream out("centroids.txt");
for (int c=0; c < num_centroids; ++c)
out << "Centroid " << c << ": " << centroids[c][0] << "," << centroids[c][1] << "," << centroids[c][2] << endl;
}
Я знаю об опасности поверхностного тестирования, но, поскольку он уже считается горячей точкой на предыдущих сеансах реального мира, я надеюсь, что это простительно. Меня также интересуют общие методы, связанные с микро-оптимизацией такого кода.
Я получил несколько разные результаты в профилировании этого. Время здесь немного более равномерно распределено внутри цикла, и я не уверен, почему. Возможно, это потому, что данные меньше (я пропустил элементы и вытащил член min_dist
и сделал его локальной переменной). Точное соотношение между центроидами и точками также немного отличается, но, надеюсь, достаточно близко, чтобы перевести улучшения в исходный код. Он также однопоточный в этом поверхностном тесте, и разборка выглядит совсем по-другому, поэтому я могу рискнуть оптимизировать этот поверхностный тест без оригинала (риск, который я готов принять сейчас, так как я больше заинтересован в расширении моих знаний методов, которые могли бы оптимизировать эти случаи, а не решение для этого точного случая).
Обновление с предложением Yochai Timmer - ~ 12,5 секунд
О, я сталкиваюсь с проблемами микро-оптимизации без понимания сборки очень хорошо. Я заменил это:
-if (min_dist_squared > dist_squared)
-{
- min_dist_squared = dist_squared;
- pt.centroid = c;
-}
При этом:
+const bool found_closer = min_dist_squared > dist_squared;
+pt.centroid = bitselect(found_closer, c, pt.centroid);
+min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);
.. только для того, чтобы найти время, увеличенное с ~ 5.6 сек до ~ 12,5 сек. Тем не менее, это не его вина и не отнимает от ценности его решения - это мое за то, что он не понимает, что происходит на уровне машины и берет удары в темноте. Это, по-видимому, пропустило, и, по-видимому, я не был жертвой неправильного предсказания ветки, как я изначально думал. Тем не менее, его предлагаемое решение - замечательная и обобщенная функция, чтобы попробовать в таких случаях, и я благодарен за добавление его в свой набор подсказок и трюков. Теперь для раунда 2.
Решение Harold SIMD - 2.496 секунд (см. оговорку)
Это решение может быть потрясающим. После преобразования репрезентации кластера в SoA, я получаю время ~ 2,5 секунды с этим! К сожалению, похоже, какой-то сбой. Я получаю очень разные результаты для конечного результата, который предлагает более чем небольшие различия в точности, в том числе некоторые центроиды к концу со значениями 0 (подразумевая, что они не были найдены в поиске). Я пытаюсь пройти через SIMD-логику с помощью отладчика, чтобы узнать, что может быть - это может быть просто ошибкой транскрипции с моей стороны, но здесь код, если кто-то может обнаружить ошибку.
Если ошибка может быть исправлена без замедления результатов, это улучшение скорости больше, чем я когда-либо себе представлял из чистой микро-оптимизации!
// New version of Centroids::find_nearest (from harold solution):
int find_nearest(const float* pt_xyz) const
{
__m128i min_index = _mm_set_epi32(3, 2, 1, 0);
__m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x));
__m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y));
__m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z));
__m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i index = min_index;
for (int i=4; i < num_centroids; i += 4)
{
xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i));
ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i));
zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i));
__m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
min_dist = _mm_min_ps(min_dist, dist);
min_index = _mm_or_si128(_mm_and_si128(index, mask),
_mm_andnot_si128(mask, min_index));
index = _mm_add_epi32(index, _mm_set1_epi32(4));
}
ALIGN16 float mdist[4];
ALIGN16 uint32_t mindex[4];
_mm_store_ps(mdist, min_dist);
_mm_store_si128((__m128i*)mindex, min_index);
float closest = mdist[0];
int closest_i = mindex[0];
for (int i=1; i < 4; i++)
{
if (mdist[i] < closest)
{
closest = mdist[i];
closest_i = mindex[i];
}
}
return closest_i;
}
Решение Гарольда SIMD (исправлено) - ~ 2,5 с
После применения исправлений и тестирования их результаты не повреждены и будут корректно работать с аналогичными улучшениями в исходной кодовой базе!
Так как это касается святого грааля знаний, который я пытался понять лучше (бездисковый SIMD), я собираюсь наградить решение дополнительными реквизитами для более чем удвоения скорости операции. У меня есть домашнее задание, пытаясь понять это, поскольку моя цель состояла не только в том, чтобы смягчить эту точку доступа, но и в расширении моего личного понимания возможных решений для их решения.
Тем не менее, я благодарен за все вклады от алгоритмических предложений к действительно классному трюку bitselect
! Хотел бы я принять все ответы. В какой-то момент я могу попробовать все из них, но теперь у меня есть домашняя работа, вырезанная в понимании некоторых из этих неарифметических операций SIMD.
int find_nearest_simd(const float* pt_xyz) const
{
__m128i min_index = _mm_set_epi32(3, 2, 1, 0);
__m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]);
__m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]);
__m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]);
__m128 xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x));
__m128 ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y));
__m128 zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z));
__m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i index = min_index;
for (int i=4; i < num_centroids; i += 4)
{
xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i));
ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i));
zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i));
__m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
index = _mm_add_epi32(index, _mm_set1_epi32(4));
__m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
min_dist = _mm_min_ps(min_dist, dist);
min_index = _mm_or_si128(_mm_and_si128(index, mask),
_mm_andnot_si128(mask, min_index));
}
ALIGN16 float mdist[4];
ALIGN16 uint32_t mindex[4];
_mm_store_ps(mdist, min_dist);
_mm_store_si128((__m128i*)mindex, min_index);
float closest = mdist[0];
int closest_i = mindex[0];
for (int i=1; i < 4; i++)
{
if (mdist[i] < closest)
{
closest = mdist[i];
closest_i = mindex[i];
}
}
return closest_i;
}