Подтвердить что ты не робот

Безветровое K-средство (или другие оптимизации)

Примечание. Я хотел бы больше узнать о том, как подойти и придумать такие решения, а не само решение.

У меня есть очень критически важная функция в моей системе, которая появляется в качестве профилирующей точки доступа номер один в определенных контекстах. Он находится в середине итерации k-означает (уже многопоточность с использованием параллели для обработки поддиапазонов точек в каждом рабочем потоке).

ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i=0; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    const float dist = ...;
    if (dist < pt.min_dist) // <-- #1 hotspot
    {
        pt.min_dist = dist;
        pt.min_index = i;
    }
}

Любая экономия времени, затрачиваемого на обработку этого раздела кода, существенно возрастает, поэтому я часто занимаюсь этим много. Возможно, стоит поставить петлю центроида снаружи, например, и повторить параллельные точки для заданного центра тяжести. Количество кластерных точек здесь составляет миллионы, а число центроидов - в тысячах. Алгоритм применяется для нескольких итераций (часто до 10). Он не стремится к идеальной конвергенции/стабильности, а лишь к некоторому "разумному" приближению.

Любые идеи приветствуются, но то, что я действительно хочу обнаружить, - это сделать этот код безветристым, поскольку это позволит использовать SIMD-версию. Я не разработал такого рода умственные способности, чтобы легко понять, как придумать веткистые решения: мой мозг терпит неудачу там, как это было, когда я впервые был подвергнут рекурсии в первые дни, поэтому руководство по написанию нераспространения код и способы разработки соответствующего мышления для него также будут полезны.

Короче говоря, я ищу любые руководства и подсказки и предложения (не обязательно решения) о том, как оптимизировать этот код для микро-оптимизации. У него, скорее всего, есть место для алгоритмических улучшений, но мой блайнд всегда был в решениях по микрооптимизации (и мне любопытно узнать, как применять их более эффективно, не выходя за борт с ним). Он уже плотно многопоточен с короткой параллелью для логики, поэтому я довольно сильно подтолкнул в угол микро-оптимизации как одну из более быстрых задач, чтобы попробовать без более разумного алгоритма. Мы полностью можем изменить расположение памяти.

В ответ на алгоритмические предложения

О том, как смотреть на все это не так, пытаясь оптимизировать алгоритм O (knm), который можно явно улучшить на уровне алгоритма, я полностью согласен. Это подталкивает этот конкретный вопрос в несколько академическую и непрактичную сферу. Однако, если мне удастся дать анекдот, я исхожу из первоначального фона программирования высокого уровня - большой акцент на широкой, крупномасштабной точке зрения, безопасности и очень мало на деталях реализации на низком уровне. Недавно я переключил проекты на совершенно другой вид современного вкуса, и я изучаю всевозможные новые трюки от своих сверстников кэш-памяти, GPGPU, нераспространенных технологий, SIMD, специализированных распределителей памяти, которые фактически превосходят malloc ( но для конкретных сценариев) и т.д.

Здесь я пытаюсь догнать последние тенденции в производительности, и я неожиданно обнаружил, что те старые структуры данных, которые я часто предпочитал в 90-е годы, которые часто были связаны/структуры дерева, на самом деле значительно превосходят гораздо более наивный, грубый, микро-оптимизированный, параллельный код, применяющий настроенные инструкции по смежным блокам памяти. Это несколько разочаровывает в то же время, поскольку я чувствую, что теперь мы подгоняем алгоритмы больше для машины и сужаем возможности таким образом (особенно с GPGPU).

Самое забавное, что я считаю, что этот тип микро-оптимизированного, быстрого кода обработки массива намного проще в обслуживании, чем сложные алгоритмы и структуры данных, которые я использовал раньше. Для начала их проще обобщить. Кроме того, мои сверстники часто могут жаловаться на жалобы относительно определенного замедления в области, просто пощекотать параллель для и, возможно, некоторых SIMD и назвать это с приличной скоростью. Алгоритмические улучшения часто могут предлагать значительно больше, но скорость и неинтрузивность, при которых могут применяться эти микрооптимизации, мне хотелось бы узнать больше в этой области, поскольку чтение статей по лучшим алгоритмам может занять некоторое время (а также потребовать больше обширные изменения). Таким образом, я в последнее время прыгал на эту группу микро-оптимизации немного позже и, возможно, слишком много в этом конкретном случае, но мое любопытство больше связано с расширением диапазона возможных решений для любого сценария.

Демонтажные

Примечание. На самом деле, я очень плохо разбираюсь в сборке, поэтому я часто настраивал вещи больше в пробной и ошибочной форме, придумывая несколько образованных догадок о том, почему горячая точка, показанная в vtune, может быть узким местом, а затем попробовать вещи чтобы увидеть, улучшатся ли времена, предполагая, что у догадок есть какой-то намек на правду, если время действительно улучшилось или полностью пропустило отметку, если они этого не делают.

000007FEEE3FB8A1  jl          thread_partition+70h (7FEEE3FB780h) 
    {
        ClusterPoint& pt = points[j];
        pt.min_index = -1;
        pt.min_dist = numeric_limits<float>::max();
        for (int i = 0; i < num_centroids; ++i)
000007FEEE3FB8A7  cmp         ecx,r10d 
000007FEEE3FB8AA  jge         thread_partition+1F4h (7FEEE3FB904h) 
000007FEEE3FB8AC  lea         rax,[rbx+rbx*2] 
000007FEEE3FB8B0  add         rax,rax 
000007FEEE3FB8B3  lea         r8,[rbp+rax*8+8] 
        {
            const ClusterCentroid& cent = centroids[i];
            const float x = pt.pos[0] - cent.pos[0];
            const float y = pt.pos[1] - cent.pos[1];
000007FEEE3FB8B8  movss       xmm0,dword ptr [rdx] 
            const float z = pt.pos[2] - cent.pos[2];
000007FEEE3FB8BC  movss       xmm2,dword ptr [rdx+4] 
000007FEEE3FB8C1  movss       xmm1,dword ptr [rdx-4] 
000007FEEE3FB8C6  subss       xmm2,dword ptr [r8] 
000007FEEE3FB8CB  subss       xmm0,dword ptr [r8-4] 
000007FEEE3FB8D1  subss       xmm1,dword ptr [r8-8] 
            const float dist = x*x + y*y + z*z;
000007FEEE3FB8D7  mulss       xmm2,xmm2 
000007FEEE3FB8DB  mulss       xmm0,xmm0 
000007FEEE3FB8DF  mulss       xmm1,xmm1 
000007FEEE3FB8E3  addss       xmm2,xmm0 
000007FEEE3FB8E7  addss       xmm2,xmm1 

            if (dist < pt.min_dist)
// VTUNE HOTSPOT
000007FEEE3FB8EB  comiss      xmm2,dword ptr [rdx-8] 
000007FEEE3FB8EF  jae         thread_partition+1E9h (7FEEE3FB8F9h) 
            {
                pt.min_dist = dist;
000007FEEE3FB8F1  movss       dword ptr [rdx-8],xmm2 
                pt.min_index = i;
000007FEEE3FB8F6  mov         dword ptr [rdx-10h],ecx 
000007FEEE3FB8F9  inc         ecx  
000007FEEE3FB8FB  add         r8,30h 
000007FEEE3FB8FF  cmp         ecx,r10d 
000007FEEE3FB902  jl          thread_partition+1A8h (7FEEE3FB8B8h) 
    for (int j = *irange.first; j < *irange.last; ++j)
000007FEEE3FB904  inc         edi  
000007FEEE3FB906  add         rdx,20h 
000007FEEE3FB90A  cmp         edi,dword ptr [rsi+4] 
000007FEEE3FB90D  jl          thread_partition+31h (7FEEE3FB741h) 
000007FEEE3FB913  mov         rbx,qword ptr [irange] 
            }
        }
    }
}

Мы вынуждены ориентироваться на SSE 2 - немного позади в наше время, но пользовательская база фактически сработала один раз, когда мы предположили, что даже SSE 4 был в порядке как минимальное требование (у пользователя была прототипная машина Intel).

Обновление с автономным тестом: ~ 5.6 с

Я очень благодарен за любую помощь! Поскольку кодовая база довольно обширна, и условия для запуска этого кода сложны (системные события запускаются по нескольким потокам), это немного громоздко, чтобы делать экспериментальные изменения и анализировать их каждый раз. Поэтому я установил поверхностный тест на стороне как отдельное приложение, которое другие могут запускать и опробовать, чтобы я мог экспериментировать со всеми этими любезно предложенными решениями.

#define _SECURE_SCL 0
#include <iostream>
#include <fstream>
#include <vector>
#include <limits>
#include <ctime>
#if defined(_MSC_VER)
    #define ALIGN16 __declspec(align(16))
#else
    #include <malloc.h>
    #define ALIGN16 __attribute__((aligned(16)))
#endif

using namespace std;

// Aligned memory allocation (for SIMD).
static void* malloc16(size_t amount)
{
    #ifdef _MSC_VER
        return _aligned_malloc(amount, 16);
    #else
        void* mem = 0;
        posix_memalign(&mem, 16, amount);
        return mem;
    #endif
}
template <class T>
static T* malloc16_t(size_t num_elements)
{
    return static_cast<T*>(malloc16(num_elements * sizeof(T)));
}

// Aligned free.
static void free16(void* mem)
{
    #ifdef _MSC_VER
        return _aligned_free(mem);
    #else
        free(mem);
    #endif
}

// Test parameters.
enum {num_centroids = 512};
enum {num_points = num_centroids * 2000};
enum {num_iterations = 5};
static const float range = 10.0f;

class Points
{
public:
    Points(): data(malloc16_t<Point>(num_points))
    {
        for (int p=0; p < num_points; ++p)
        {
            const float xyz[3] =
            {
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX
            };
            init(p, xyz);
        }
    }
    ~Points()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].centroid = -1;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void associate(int n, int new_centroid)
    {
        data[n].centroid = new_centroid;
    }
    int centroid(int n) const
    {
        return data[n].centroid;
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }

private:
    Points(const Points&);
    Points& operator=(const Points&);
    struct Point
    {
        int centroid;
        float xyz[3];
    };
    Point* data;
};

class Centroids
{
public:
    Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids))
    {
        // Naive initial selection algorithm, but outside the 
        // current area of interest.
        for (int c=0; c < num_centroids; ++c)
            init(c, points[c]);
    }
    ~Centroids()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].count = 0;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void reset(int n)
    {
        data[n].count = 0;
        data[n].xyz[0] = 0.0f;
        data[n].xyz[1] = 0.0f;
        data[n].xyz[2] = 0.0f;
    }
    void sum(int n, const float* pt_xyz)
    {
        data[n].xyz[0] += pt_xyz[0];
        data[n].xyz[1] += pt_xyz[1];
        data[n].xyz[2] += pt_xyz[2];
        ++data[n].count;
    }
    void average(int n)
    {
        if (data[n].count > 0)
        {
            const float inv_count = 1.0f / data[n].count;
            data[n].xyz[0] *= inv_count;
            data[n].xyz[1] *= inv_count;
            data[n].xyz[2] *= inv_count;
        }
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }
    int find_nearest(const float* pt_xyz) const
    {
        float min_dist_squared = numeric_limits<float>::max();
        int min_centroid = -1;
        for (int c=0; c < num_centroids; ++c)
        {
            const float* cen_xyz = data[c].xyz;
            const float x = pt_xyz[0] - cen_xyz[0];
            const float y = pt_xyz[1] - cen_xyz[1];
            const float z = pt_xyz[2] - cen_xyz[2];
            const float dist_squared = x*x + y*y * z*z;

            if (min_dist_squared > dist_squared)
            {
                min_dist_squared = dist_squared;
                min_centroid = c;
            }
        }
        return min_centroid;
    }

private:
    Centroids(const Centroids&);
    Centroids& operator=(const Centroids&);
    struct Centroid
    {
        int count;
        float xyz[3];
    };
    Centroid* data;
};

// A high-precision real timer would be nice, but we lack C++11 and
// the coarseness of the testing here should allow this to suffice.
static double sys_time()
{
    return static_cast<double>(clock()) / CLOCKS_PER_SEC;
}

static void k_means(Points& points, Centroids& centroids)
{
    // Find the closest centroid for each point.
    for (int p=0; p < num_points; ++p)
    {
        const float* pt_xyz = points[p];
        points.associate(p, centroids.find_nearest(pt_xyz));
    }

    // Reset the data of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.reset(c);

    // Compute new position sum of each centroid.
    for (int p=0; p < num_points; ++p)
        centroids.sum(points.centroid(p), points[p]);

    // Compute average position of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.average(c);
}

int main()
{
    Points points;
    Centroids centroids(points);

    cout << "Starting simulation..." << endl;
    double start_time = sys_time();
    for (int i=0; i < num_iterations; ++i)
        k_means(points, centroids);
    cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl;
    cout << "# Points: " << num_points << endl;
    cout << "# Centroids: " << num_centroids << endl;

    // Write the centroids to a file to give us some crude verification
    // of consistency as we make changes.
    ofstream out("centroids.txt");
    for (int c=0; c < num_centroids; ++c)
        out << "Centroid " << c << ": " << centroids[c][0] << "," << centroids[c][1] << "," << centroids[c][2] << endl;
}

Я знаю об опасности поверхностного тестирования, но, поскольку он уже считается горячей точкой на предыдущих сеансах реального мира, я надеюсь, что это простительно. Меня также интересуют общие методы, связанные с микро-оптимизацией такого кода.

Я получил несколько разные результаты в профилировании этого. Время здесь немного более равномерно распределено внутри цикла, и я не уверен, почему. Возможно, это потому, что данные меньше (я пропустил элементы и вытащил член min_dist и сделал его локальной переменной). Точное соотношение между центроидами и точками также немного отличается, но, надеюсь, достаточно близко, чтобы перевести улучшения в исходный код. Он также однопоточный в этом поверхностном тесте, и разборка выглядит совсем по-другому, поэтому я могу рискнуть оптимизировать этот поверхностный тест без оригинала (риск, который я готов принять сейчас, так как я больше заинтересован в расширении моих знаний методов, которые могли бы оптимизировать эти случаи, а не решение для этого точного случая).

VTune

Обновление с предложением Yochai Timmer - ~ 12,5 секунд

О, я сталкиваюсь с проблемами микро-оптимизации без понимания сборки очень хорошо. Я заменил это:

        -if (min_dist_squared > dist_squared)
        -{
        -    min_dist_squared = dist_squared;
        -    pt.centroid = c;
        -}

При этом:

        +const bool found_closer = min_dist_squared > dist_squared;
        +pt.centroid = bitselect(found_closer, c, pt.centroid);
        +min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);

.. только для того, чтобы найти время, увеличенное с ~ 5.6 сек до ~ 12,5 сек. Тем не менее, это не его вина и не отнимает от ценности его решения - это мое за то, что он не понимает, что происходит на уровне машины и берет удары в темноте. Это, по-видимому, пропустило, и, по-видимому, я не был жертвой неправильного предсказания ветки, как я изначально думал. Тем не менее, его предлагаемое решение - замечательная и обобщенная функция, чтобы попробовать в таких случаях, и я благодарен за добавление его в свой набор подсказок и трюков. Теперь для раунда 2.

Решение Harold SIMD - 2.496 секунд (см. оговорку)

Это решение может быть потрясающим. После преобразования репрезентации кластера в SoA, я получаю время ~ 2,5 секунды с этим! К сожалению, похоже, какой-то сбой. Я получаю очень разные результаты для конечного результата, который предлагает более чем небольшие различия в точности, в том числе некоторые центроиды к концу со значениями 0 (подразумевая, что они не были найдены в поиске). Я пытаюсь пройти через SIMD-логику с помощью отладчика, чтобы узнать, что может быть - это может быть просто ошибкой транскрипции с моей стороны, но здесь код, если кто-то может обнаружить ошибку.

Если ошибка может быть исправлена ​​без замедления результатов, это улучшение скорости больше, чем я когда-либо себе представлял из чистой микро-оптимизации!

    // New version of Centroids::find_nearest (from harold solution):
    int find_nearest(const float* pt_xyz) const
    {
        __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
        __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x));
        __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y));
        __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z));
        __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                                _mm_mul_ps(ydif, ydif)), 
                                                _mm_mul_ps(zdif, zdif));
        __m128i index = min_index;
        for (int i=4; i < num_centroids; i += 4) 
        {
            xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i));
            ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i));
            zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i));
            __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                                _mm_mul_ps(ydif, ydif)), 
                                                _mm_mul_ps(zdif, zdif));
            __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
            min_dist = _mm_min_ps(min_dist, dist);
            min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                     _mm_andnot_si128(mask, min_index));
            index = _mm_add_epi32(index, _mm_set1_epi32(4));
        }

        ALIGN16 float mdist[4];
        ALIGN16 uint32_t mindex[4];
        _mm_store_ps(mdist, min_dist);
        _mm_store_si128((__m128i*)mindex, min_index);

        float closest = mdist[0];
        int closest_i = mindex[0];
        for (int i=1; i < 4; i++)
        {
            if (mdist[i] < closest) 
            {
                closest = mdist[i];
                closest_i = mindex[i];
            }
        }
        return closest_i;
    }

Решение Гарольда SIMD (исправлено) - ~ 2,5 с

После применения исправлений и тестирования их результаты не повреждены и будут корректно работать с аналогичными улучшениями в исходной кодовой базе!

Так как это касается святого грааля знаний, который я пытался понять лучше (бездисковый SIMD), я собираюсь наградить решение дополнительными реквизитами для более чем удвоения скорости операции. У меня есть домашнее задание, пытаясь понять это, поскольку моя цель состояла не только в том, чтобы смягчить эту точку доступа, но и в расширении моего личного понимания возможных решений для их решения.

Тем не менее, я благодарен за все вклады от алгоритмических предложений к действительно классному трюку bitselect! Хотел бы я принять все ответы. В какой-то момент я могу попробовать все из них, но теперь у меня есть домашняя работа, вырезанная в понимании некоторых из этих неарифметических операций SIMD.

int find_nearest_simd(const float* pt_xyz) const
{
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]);
    __m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]);
    __m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]);

    __m128 xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x));
    __m128 ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y));
    __m128 zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;
    for (int i=4; i < num_centroids; i += 4) 
    {
        xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i));
        ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i));
        zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                 _mm_andnot_si128(mask, min_index));
    }

    ALIGN16 float mdist[4];
    ALIGN16 uint32_t mindex[4];
    _mm_store_ps(mdist, min_dist);
    _mm_store_si128((__m128i*)mindex, min_index);

    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i=1; i < 4; i++)
    {
        if (mdist[i] < closest) 
        {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}
4b9b3361

Ответ 1

Слишком плохо, мы не можем использовать SSE4.1, но очень хорошо, SSE2. Я не тестировал это, просто скомпилировал его, чтобы увидеть, есть ли синтаксические ошибки и посмотреть, имеет ли смысл сборка (в основном это хорошо, хотя GCC разливает min_index даже с некоторыми регистрами xmm, которые не используются, не уверен, почему бывает)

int find_closest(float *x, float *y, float *z,
                 float pt_x, float pt_y, float pt_z, int n) {
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x));
    __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y));
    __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;
    for (int i = 4; i < n; i += 4) {
        xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x + i));
        ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y + i));
        zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                 _mm_andnot_si128(mask, min_index));
    }
    float mdist[4];
    _mm_store_ps(mdist, min_dist);
    uint32_t mindex[4];
    _mm_store_si128((__m128i*)mindex, min_index);
    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i = 1; i < 4; i++) {
        if (mdist[i] < closest) {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}

Как обычно, он ожидает, что указатели будут выровнены по 16. Кроме того, отступы должны быть с бесконечными точками (поэтому они никогда не приближаются к цели).

SSE 4.1 позволит вам заменить этот

min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                         _mm_andnot_si128(mask, min_index));

Под этим

min_index = _mm_blendv_epi8(min_index, index, mask);

Здесь версия asm, сделанная для vsyasm, немного протестировала (кажется, работает)

bits 64

section .data

align 16
centroid_four:
    dd 4, 4, 4, 4
centroid_index:
    dd 0, 1, 2, 3

section .text

global find_closest

proc_frame find_closest
    ;
    ;   arguments:
    ;       ecx: number of points (multiple of 4 and at least 4)
    ;       rdx -> array of 3 pointers to floats (x, y, z) (the points)
    ;       r8 -> array of 3 floats (the reference point)
    ;
    alloc_stack 0x58
    save_xmm128 xmm6, 0
    save_xmm128 xmm7, 16
    save_xmm128 xmm8, 32
    save_xmm128 xmm9, 48
[endprolog]
    movss xmm0, [r8]
    shufps xmm0, xmm0, 0
    movss xmm1, [r8 + 4]
    shufps xmm1, xmm1, 0
    movss xmm2, [r8 + 8]
    shufps xmm2, xmm2, 0
    ; pointers to x, y, z in r8, r9, r10
    mov r8, [rdx]
    mov r9, [rdx + 8]
    mov r10, [rdx + 16]
    ; reference point is in xmm0, xmm1, xmm2 (x, y, z)
    movdqa xmm3, [rel centroid_index]   ; min_index
    movdqa xmm4, xmm3                   ; current index
    movdqa xmm9, [rel centroid_four]     ; index increment
    paddd xmm4, xmm9
    ; calculate initial min_dist, xmm5
    movaps xmm5, [r8]
    subps xmm5, xmm0
    movaps xmm7, [r9]
    subps xmm7, xmm1
    movaps xmm8, [r10]
    subps xmm8, xmm2
    mulps xmm5, xmm5
    mulps xmm7, xmm7
    mulps xmm8, xmm8
    addps xmm5, xmm7
    addps xmm5, xmm8
    add r8, 16
    add r9, 16
    add r10, 16
    sub ecx, 4
    jna _tail
_loop:
    movaps xmm6, [r8]
    subps xmm6, xmm0
    movaps xmm7, [r9]
    subps xmm7, xmm1
    movaps xmm8, [r10]
    subps xmm8, xmm2
    mulps xmm6, xmm6
    mulps xmm7, xmm7
    mulps xmm8, xmm8
    addps xmm6, xmm7
    addps xmm6, xmm8
    add r8, 16
    add r9, 16
    add r10, 16
    movaps xmm7, xmm6
    cmpps xmm6, xmm5, 1
    minps xmm5, xmm7
    movdqa xmm7, xmm6
    pand xmm6, xmm4
    pandn xmm7, xmm3
    por xmm6, xmm7
    movdqa xmm3, xmm6
    paddd xmm4, xmm9
    sub ecx, 4
    ja _loop
_tail:
    ; calculate horizontal minumum
    pshufd xmm0, xmm5, 0xB1
    minps xmm0, xmm5
    pshufd xmm1, xmm0, 0x4E
    minps xmm0, xmm1
    ; find index of the minimum
    cmpps xmm0, xmm5, 0
    movmskps eax, xmm0
    bsf eax, eax
    ; index into xmm3, sort of
    movaps [rsp + 64], xmm3
    mov eax, [rsp + 64 + rax * 4]
    movaps xmm9, [rsp + 48]
    movaps xmm8, [rsp + 32]
    movaps xmm7, [rsp + 16]
    movaps xmm6, [rsp]
    add rsp, 0x58
    ret
endproc_frame

В С++:

extern "C" int find_closest(int n, float** points, float* reference_point);

Ответ 2

Вы можете использовать веткистый тернарный оператор, иногда называемый битселек (условие? true: false).
Просто используйте его для 2 участников, не выполнив ничего. Не беспокойтесь о дополнительных операциях, они ничто по сравнению с ветвлением if.

bitselect реализация:

inline static int bitselect(int condition, int truereturnvalue, int falsereturnvalue)
{
    return (truereturnvalue & -condition) | (falsereturnvalue & ~(-condition)); //a when TRUE and b when FALSE
}

inline static float bitselect(int condition, float truereturnvalue, float falsereturnvalue)
{
    //Reinterpret floats. Would work because it just a bit select, no matter the actual value
    int& at = reinterpret_cast<int&>(truereturnvalue);
    int& af = reinterpret_cast<int&>(falsereturnvalue);
    int res = (at & -condition) | (af & ~(-condition)); //a when TRUE and b when FALSE
    return  reinterpret_cast<float&>(res);
}

И ваша петля должна выглядеть так:

for (int i=0; i < num_centroids; ++i)
{
  const ClusterCentroid& cent = centroids[i];
  const float dist = ...;
  bool isSmaeller = dist < pt.min_dist;

  //use same value if not smaller
  pt.min_index = bitselect(isSmaeller, i, pt.min_index);
  pt.min_dist = bitselect(isSmaeller, dist, pt.min_dist);
}

Ответ 3

С++ - это язык высокого уровня. Ваше предположение о том, что поток управления в исходном коде С++ преобразуется в инструкции ветвления, является ошибочным. У меня нет определения некоторых типов из вашего примера, поэтому я сделал простую тестовую программу с аналогичными условными присвоениями:

int g(int, int);

int f(const int *arr)
{
    int min = 10000, minIndex = -1;
    for ( int i = 0; i < 1000; ++i )
    {
        if ( arr[i] < min )
        {
            min = arr[i];
            minIndex = i;
        }
    }
    return g(min, minIndex);
}

Обратите внимание, что использование undefined "g" заключается в том, чтобы исключить возможность удаления оптимизатора. Я перевел это с помощью g++ 4.9.2 с -O3 и -S в сборку x86_64 (даже не изменив значение по умолчанию для -march), и (не слишком неожиданным) результатом является то, что тело цикла не содержит ветвей

movl    (%rdi,%rax,4), %ecx
movl    %edx, %r8d
cmpl    %edx, %ecx
cmovle  %ecx, %r8d
cmovl   %eax, %esi
addq    $1, %rax

Кроме того, предположение о том, что разветвление обязательно быстрее, также может быть ошибочным, поскольку вероятность того, что новое расстояние "бьет" старое, уменьшает количество элементов, на которые вы смотрели. Это не монета. Трюк "битселек" был изобретен, когда компиляторы были гораздо менее агрессивны при создании сборки "как есть", чем сегодня. Я бы предпочел взглянуть на то, какую сборку создает ваш компилятор, прежде чем пытаться переработать код, чтобы компилятор мог лучше его оптимизировать или взяв результат в качестве основы для рукописной сборки. Если вы хотите изучить SIMD, я бы предложил попробовать подход "минимум минимумов" с уменьшенными зависимостями данных (в моем примере зависимости от "мин", вероятно, являются узким местом).

Ответ 4

Это может пойти в обоих направлениях, но я бы попробовал следующую структуру:

std::vector<float> centDists(num_centroids); //<-- one for each thread. 
for (size_t p=0; p<num_points; ++p) {
    Point& pt = points[p];
    for (size_t c=0; c<num_centroids; ++c) {
        const float dist = ...;
        centDists[c]=dist;
    }
    pt.min_idx it= min_element(centDists.begin(),centDists.end())-centDists.begin();    
}

Очевидно, вам придется перебирать два раза по памяти, что, вероятно, повредит отношение кеша к пропуску (вы также можете разбить его на поддиапазоны), но, с другой стороны, каждая из внутренних циклов должна быть легко векторизовать и развернуть - так что вам нужно только измерить, стоит ли это.

И даже если вы придерживаетесь своей версии, я бы попытался использовать локальные переменные, чтобы отслеживать минимальный индекс и расстояние и применять результаты, чтобы указать в конце.
Рациональным является то, что каждое чтение или запись на pt.min_dist эффективно выполняется с помощью указателя, который - в зависимости от оптимизации компилятора - может или не может снизить вашу производительность.

Еще одна важная вещь для векторизации - превратить массив Structs (в данном случае cententroids) в структуру массивов (например, один массив для каждой координаты точек), так как вам не нужны дополнительные команды сбора, чтобы загрузить данные для использования с инструкциями SIMD. См. Эрик Брумер для получения дополнительной информации по этой теме.

EDIT: Некоторые номера для моей системы (haswell, clang 3.5):
Я сделал короткий тест с вашим эталоном и моей системой, выше кода замедлил алгоритм примерно на 10% - по существу, ничто не могло быть векторизованным.

Однако, применяя преобразование AoS к SoA для ваших центроидов, расчет расстояний был векторизован, что привело к сокращению общей продолжительности выполнения около 40% по сравнению с вашей исходной структурой с применением преобразований AoS для SoA.

Ответ 5

Во-первых, я бы посоветовал, что перед тем, как попробовать какие-либо изменения кода, вы посмотрите на разборку в оптимизированной сборке. В идеале вы хотите посмотреть данные профилировщика на уровне сборки. Это может отображать различные вещи, например:

  • Компилятор, возможно, не сгенерировал действительную инструкцию ветвления.
  • Строка кода с узким местом может содержать гораздо больше инструкций, чем вы могли бы подумать - например, вычисление dist.

В дополнение к этому, стандартный трюк, который, когда вы говорите о вычислениях расстояний, часто требует квадратного корня. Вы должны сделать этот квадратный корень в конце процесса с минимальным квадратом.

SSE может обрабатывать четыре значения одновременно, без каких-либо ветвей, используя _ mm_min_ps. Если вам действительно нужна скорость, вы хотите использовать встроенные функции SSE (или AVX). Вот базовый пример:

  float MinimumDistance(const float *values, int count)
  {
    __m128 min = _mm_set_ps(FLT_MAX, FLT_MAX, FLT_MAX, FLT_MAX);
    int i=0;
    for (; i < count - 3; i+=4)
    {
        __m128 distances = _mm_loadu_ps(&values[i]);
        min = _mm_min_ps(min, distances);
    }
    // Combine the four separate minimums to a single value
    min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(2, 3, 0, 1)));
    min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(1, 0, 3, 2)));

    // Deal with the last 0-3 elements the slow way
    float result = FLT_MAX;
    if (count > 3) _mm_store_ss(&result, min);
    for (; i < count; i++)
    {
        result = min(values[i], result);
    }

    return result;
  }

Для лучшей производительности SSE вы должны убедиться, что нагрузки происходят по согласованным адресам. Вы можете обрабатывать первые несколько несогласованных элементов так же, как и последние несколько в коде выше, если это необходимо.

Другое, на что нужно обратить внимание - это пропускная способность памяти. Если есть несколько членов структуры ClusterCentroid, которые вы не используете в течение этого цикла, тогда вы будете считывать гораздо больше данных из памяти, чем вам действительно нужно, поскольку память читается в кусках размера строки кэша, каждая по 64 байта.

Ответ 6

Возможная микрооптимизация: Храните min_dist и min_index в локальных переменных. Компилятору, возможно, придется чаще писать в память так, как вы его написали; на некоторых архитектурах это может иметь большое влияние на производительность. См. мой ответ здесь для другого примера.

Предложение Адамса о выполнении 4 сравнений сразу также является хорошим.

Тем не менее, ваше лучшее ускорение будет связано с уменьшением количества центроидов, которые вы должны проверить. В идеале, постройте kd-дерево (или подобное) вокруг центроидов, затем запросите, чтобы найти ближайшую точку.

Если у вас нет кода дерева, лежащего вокруг, вот мой самый любимый поисковый запрос "бедный человек":

Sort the points by one coordinate, e.g. cent.pos[0]
Pick a starting index for the query point (pt)
Iterate forwards through the candidate points until you reach the end, OR when abs(pt.pos[0] - cent.pos[0]) > min_dist
Repeat the previous step going the opposite direction.

Дополнительное условие остановки для поиска означает, что вы должны пропустить достаточное количество очков; вы также не должны пропускать все точки ближе, чем те, которые вы уже нашли.

Итак, для вашего кода это выглядит примерно так:

// sort centroid by x coordinate.
min_index = -1;
min_dist = numeric_limits<float>::max();

// pick the start index. This works well if the points are evenly distributed.
float min_x = centroids[0].pos[0];
float max_x = centroids[num_centroids-1].pos[0];
float cur_x = pt.pos[0];
float t = (max_x - cur_x) / (max_x - min_x);
// TODO clamp t between 0 and 1
int start_index = int(t * float(num_centroids))

// Forward search
for (int i=start_index ; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    if (fabs(cent.pos[0] - pt.pos[0]) > min_i)
        // Everything to the right of this must be further min_dist, so break.
        // This is where the savings comes from!
        break; 
    const float dist = ...;
    if (dist < min_dist)
    {
        min_dist = dist;
        min_index = i;
    }
}

// Backwards search
for (int i=start_index ; i >= 0; --i)
{
    // same as above
}
pt.min_dist = min_dist
pt.min_index = min_index

(Обратите внимание, что это предполагает, что вы вычисляете расстояние между точками, но ваша сборка указывает на квадрат расстояния. Соответственно отрегулируйте условие разрыва).

При создании дерева или сортировке центроидов незначительные накладные расходы, но это должно быть компенсировано за счет ускорения вычислений в более крупном цикле (по количеству точек).