Подтвердить что ты не робот

Как ускорить мой разрешенный матричный решатель?

Я пишу разреженный матричный решатель, используя метод Гаусса-Зейделя. По профилированию я решил, что около половины моего времени программы проводится внутри решателя. Критическая часть производительности выглядит следующим образом:

size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
for (size_t y = 1; y < d_ny - 1; ++y) {
    for (size_t x = 1; x < d_nx - 1; ++x) {
        d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
        ++ic; ++iw; ++ie; ++is; ++in;
    }
    ic += 2; iw += 2; ie += 2; is += 2; in += 2;
}

Все задействованные массивы имеют тип float. На самом деле это не массивы, а объекты с перегруженным оператором [], который (я думаю) должен быть оптимизирован, но определяется следующим образом:

inline float &operator[](size_t i) { return d_cells[i]; }
inline float const &operator[](size_t i) const { return d_cells[i]; }

Для d_nx = d_ny = 128, это может быть запущено около 3500 раз в секунду на Intel i7 920. Это означает, что тело внутреннего цикла работает 3500 * 128 * 128 = 57 миллионов раз в секунду. Поскольку задействована только какая-то простая арифметика, это поражает меня как низкое число для процессора с частотой 2,66 ГГц.

Возможно, это не ограничено мощностью процессора, а пропускной способностью памяти? Ну, один массив 128 * 128 float питается 65 кБ, поэтому все 6 массивов должны легко вписываться в кеш процессора L3 (что составляет 8 МБ). Предполагая, что ничего не кэшируется в регистрах, я подсчитываю 15 обращений к памяти во внутреннем цикле. В 64-битной системе это составляет 120 байт на итерацию, поэтому 57 миллионов * 120 байт = 6,8 ГБ/с. Кэш L3 работает на частоте 2,66 ГГц, поэтому он имеет тот же порядок величины. Я предполагаю, что память действительно является узким местом.

Чтобы ускорить это, я попытался выполнить следующее:

  • Скомпилируйте с помощью g++ -O3. (Ну, я делал это с самого начала.)

  • Параллелизация более 4 ядер с использованием OpenMP-прагм. Я должен перейти к алгоритму Якоби, чтобы избежать чтения и записи в тот же массив. Это требует, чтобы я делал в два раза больше итераций, что приводило к чистому результату примерно той же скорости.

  • Реализация деталей реализации тела цикла, например, с использованием указателей вместо индексов. Нет эффекта.

Какой лучший способ ускорить этот парень? Помогло бы оно переписать внутреннее тело в собрании (я должен был бы это узнать первым)? Должен ли я запускать это на графическом процессоре (что я знаю, как это сделать, но это такая проблема)? Любые другие яркие идеи?

(N.B. Я принимаю "нет" для ответа, как в: "это невозможно сделать значительно быстрее, потому что..." )

Обновить: по запросу, здесь полная программа:

#include <iostream>
#include <cstdlib>
#include <cstring>

using namespace std;

size_t d_nx = 128, d_ny = 128;
float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n;

void step() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
            d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}

void solve(size_t iters) {
    for (size_t i = 0; i < iters; ++i) {
        step();
    }
}

void clear(float *a) {
    memset(a, 0, d_nx * d_ny * sizeof(float));
}

int main(int argc, char **argv) {
    size_t n = d_nx * d_ny;
    d_x = new float[n]; clear(d_x);
    d_b = new float[n]; clear(d_b);
    d_w = new float[n]; clear(d_w);
    d_e = new float[n]; clear(d_e);
    d_s = new float[n]; clear(d_s);
    d_n = new float[n]; clear(d_n);
    solve(atoi(argv[1]));
    cout << d_x[0] << endl; // prevent the thing from being optimized away
}

Я компилирую и запускаю его следующим образом:

$ g++ -o gstest -O3 gstest.cpp
$ time ./gstest 8000
0

real    0m1.052s
user    0m1.050s
sys     0m0.010s

(Он делает 8000 вместо 3500 итераций в секунду, потому что моя "настоящая" программа также делает много других вещей, но она репрезентативная.)

Обновление 2: мне сказали, что унифицированные значения могут не быть репрезентативными, потому что значения NaN и Inf могут замедлять работу. Теперь очистка памяти в примере кода. Тем не менее, это не имеет никакого значения для меня в скорости выполнения.

4b9b3361

Ответ 1

Пара идей:

  • Используйте SIMD. Вы можете загружать 4 поплавки за раз из каждого массива в регистр SIMD (например, SSE на Intel, VMX на PowerPC). Недостатком этого является то, что некоторые из значений d_x будут "устаревшими", поэтому ваш коэффициент конвергенции будет страдать (но не так плохо, как итерация jacobi); трудно сказать, ускоряет ли его ускорение.

  • Используйте SOR. Он прост, не добавляет большого количества вычислений и может значительно улучшить коэффициент конвергенции даже при относительно консервативной релаксации (скажем, 1.5).

  • Используйте сопряженный градиент. Если это для этапа проекции моделирования текучей среды (т.е. Обеспечения несжимаемости), вы должны иметь возможность применять CG и получать гораздо лучшую скорость конвергенции. Хороший предобуславливатель помогает еще больше.

  • Используйте специализированный решатель. Если линейная система возникает из уравнения Пуассона, вы можете сделать даже лучше, чем сопряженный градиент, используя методы на основе FFT.

Если вы можете объяснить больше о том, как выглядит система, которую вы пытаетесь решить, я могу, возможно, дать еще несколько советов о # 3 и # 4.

Ответ 2

Я думаю, что мне удалось оптимизировать его, вот код, создать новый проект в VС++, добавить этот код и просто скомпилировать под "Release".

#include <iostream>
#include <cstdlib>
#include <cstring>

#define _WIN32_WINNT 0x0400
#define WIN32_LEAN_AND_MEAN
#include <windows.h>

#include <conio.h>

using namespace std;

size_t d_nx = 128, d_ny = 128;
float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n;

void step_original() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
            d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}
void step_new() {
    //size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    float
        *d_b_ic,
        *d_w_ic,
        *d_e_ic,
        *d_x_ic,
        *d_x_iw,
        *d_x_ie,
        *d_x_is,
        *d_x_in,
        *d_n_ic,
        *d_s_ic;

    d_b_ic = d_b;
    d_w_ic = d_w;
    d_e_ic = d_e;
    d_x_ic = d_x;
    d_x_iw = d_x;
    d_x_ie = d_x;
    d_x_is = d_x;
    d_x_in = d_x;
    d_n_ic = d_n;
    d_s_ic = d_s;

    for (size_t y = 1; y < d_ny - 1; ++y)
    {
        for (size_t x = 1; x < d_nx - 1; ++x)
        {
            /*d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];*/
            *d_x_ic = *d_b_ic
                - *d_w_ic * *d_x_iw - *d_e_ic * *d_x_ie
                - *d_s_ic * *d_x_is - *d_n_ic * *d_x_in;
            //++ic; ++iw; ++ie; ++is; ++in;
            d_b_ic++;
            d_w_ic++;
            d_e_ic++;
            d_x_ic++;
            d_x_iw++;
            d_x_ie++;
            d_x_is++;
            d_x_in++;
            d_n_ic++;
            d_s_ic++;
        }
        //ic += 2; iw += 2; ie += 2; is += 2; in += 2;
        d_b_ic += 2;
        d_w_ic += 2;
        d_e_ic += 2;
        d_x_ic += 2;
        d_x_iw += 2;
        d_x_ie += 2;
        d_x_is += 2;
        d_x_in += 2;
        d_n_ic += 2;
        d_s_ic += 2;
    }
}

void solve_original(size_t iters) {
    for (size_t i = 0; i < iters; ++i) {
        step_original();
    }
}
void solve_new(size_t iters) {
    for (size_t i = 0; i < iters; ++i) {
        step_new();
    }
}

void clear(float *a) {
    memset(a, 0, d_nx * d_ny * sizeof(float));
}

int main(int argc, char **argv) {
    size_t n = d_nx * d_ny;
    d_x = new float[n]; clear(d_x);
    d_b = new float[n]; clear(d_b);
    d_w = new float[n]; clear(d_w);
    d_e = new float[n]; clear(d_e);
    d_s = new float[n]; clear(d_s);
    d_n = new float[n]; clear(d_n);

    if(argc < 3)
        printf("app.exe (x)iters (o/n)algo\n");

    bool bOriginalStep = (argv[2][0] == 'o');
    size_t iters = atoi(argv[1]);

    /*printf("Press any key to start!");
    _getch();
    printf(" Running speed test..\n");*/

    __int64 freq, start, end, diff;
    if(!::QueryPerformanceFrequency((LARGE_INTEGER*)&freq))
        throw "Not supported!";
    freq /= 1000000; // microseconds!
    {
        ::QueryPerformanceCounter((LARGE_INTEGER*)&start);
        if(bOriginalStep)
            solve_original(iters);
        else
            solve_new(iters);
        ::QueryPerformanceCounter((LARGE_INTEGER*)&end);
        diff = (end - start) / freq;
    }
    printf("Speed (%s)\t\t: %u\n", (bOriginalStep ? "original" : "new"), diff);
    //_getch();


    //cout << d_x[0] << endl; // prevent the thing from being optimized away
}

Запустите его следующим образом:

app.exe 10000 o

app.exe 10000 n

"o" означает старый код, ваш.

"n" - мой, новый.

Мои результаты: Скорость (оригинал):

1515028

1523171

1495988

Скорость (новая):

966012

984110

1006045

Улучшение около 30%.

Логика: Вы используете счетчики индексов для доступа/управления. Я использую указатели. Во время работы точка останова на определенной строке кода калькуляции в отладчике VС++ и нажмите F8. Вы получите окно дизассемблера. Вы увидите созданные коды операций (код сборки).

В любом случае, посмотрите:

int * x =...;

x [3] = 123;

Это говорит ПК поставить указатель x в регистр (например, EAX). Добавьте его (3 * sizeof (int)). Только тогда установите значение 123.

Подход указателей намного лучше, чем вы можете понять, потому что мы сокращаем процесс добавления, на самом деле мы справляемся с этим сами, поэтому можем оптимизировать по мере необходимости.

Надеюсь, это поможет.

Поделитесь с сотрудниками stackoverflow.com: Отличный сайт, надеюсь, я давно об этом слышал!

Ответ 3

С одной стороны, здесь, похоже, проблема конвейерной обработки. Цикл читается из значения в d_x, которое только что было написано, но, видимо, оно должно дождаться завершения этой записи. Просто переставляя порядок вычислений, делая что-то полезное во время ожидания, он делает это почти в два раза быстрее:

d_x[ic] = d_b[ic]
                        - d_e[ic] * d_x[ie]
    - d_s[ic] * d_x[is] - d_n[ic] * d_x[in]
    - d_w[ic] * d_x[iw] /* d_x[iw] has just been written to, process this last */;

Это был Eamon Nerbonne, который понял это. У него много оборотов! Я бы никогда не догадался.

Ответ 4

Ответ Poni выглядит как правильный для меня.

Я просто хочу указать, что в этом типе проблемы вы часто получаете преимущества от локализации памяти. Прямо сейчас массивы b,w,e,s,n находятся в разных местах памяти. Если бы вы не смогли решить проблему в кеше L3 (в основном в L2), это было бы плохо, и решение такого рода было бы полезно:

size_t d_nx = 128, d_ny = 128;
float *d_x;

struct D { float b,w,e,s,n; };
D *d;

void step() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
            d_x[ic] = d[ic].b
                - d[ic].w * d_x[iw] - d[ic].e * d_x[ie]
                - d[ic].s * d_x[is] - d[ic].n * d_x[in];
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}
void solve(size_t iters) { for (size_t i = 0; i < iters; ++i) step(); }
void clear(float *a) { memset(a, 0, d_nx * d_ny * sizeof(float)); }

int main(int argc, char **argv) {
    size_t n = d_nx * d_ny;
    d_x = new float[n]; clear(d_x);
    d = new D[n]; memset(d,0,n * sizeof(D));
    solve(atoi(argv[1]));
    cout << d_x[0] << endl; // prevent the thing from being optimized away
}

Например, это решение на 1280x1280 немного меньше, чем в 2 раза быстрее, чем решение Poni (13s против 23s в моем тесте - ваша первоначальная реализация - это 22 с), тогда как в 128x128 он на 30% медленнее (7s против 10s- - ваш оригинал - 10 с).

(Итерации были увеличены до 80000 для базового футляра и 800 для случая размером 100x 1280x1280.)

Ответ 5

Я думаю, вы правы в том, что память является узким местом. Это довольно простой цикл с простой арифметикой для каждой итерации. ic, iw, т.е. есть, а в индексах, по-видимому, находятся на противоположных сторонах матрицы, поэтому я предполагаю, что там не хватает кеша.

Ответ 6

Я не эксперт по этому вопросу, но я видел, что есть несколько научных статей об улучшении использования кеша в Метод Гаусса-Зейделя.

Еще одна возможная оптимизация - использование варианта red-black, где точки обновляются двумя разметками в виде шахматной доски. Таким образом, все обновления в развертке независимы и могут быть распараллелены.

Ответ 7

Я предлагаю ввести некоторые предзадачные заявления, а также исследовать "ориентированный на данные дизайн":

void step_original() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    float dw_ic, dx_ic, db_ic, de_ic, dn_ic, ds_ic;
    float dx_iw, dx_is, dx_ie, dx_in, de_ic, db_ic;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
// Perform the prefetch
// Sorting these statements by array may increase speed;
//    although sorting by index name may increase speed too.
            db_ic = d_b[ic];
            dw_ic = d_w[ic];
            dx_iw = d_x[iw];
            de_ic = d_e[ic];
            dx_ie = d_x[ie];
            ds_ic = d_s[ic];
            dx_is = d_x[is];
            dn_ic = d_n[ic];
            dx_in = d_x[in];
// Calculate
            d_x[ic] = db_ic
                - dw_ic * dx_iw - de_ic * dx_ie
                - ds_ic * dx_is - dn_ic * dx_in;
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}

Это отличается от вашего второго метода, поскольку значения копируются в локальные временные переменные перед выполнением вычисления.