Подтвердить что ты не робот

Как достичь максимальной производительности процессора с помощью продукта Dot?

Проблема

Я изучаю HPC, в частности, используя множитель в качестве моего проекта (см. мои другие сообщения в профиле). Я достигаю хороших результатов в тех, но недостаточно. Я делаю шаг назад, чтобы понять, насколько хорошо я могу это сделать с помощью вычисления точечного продукта.

Dot Product vs. Matrix Multiplication

Точечный продукт проще и позволит мне тестировать концепции HPC без учета упаковки и других связанных с этим проблем. Блокировка кеша все еще является проблемой, которая формирует мой второй вопрос.

Алгоритм

Умножьте n соответствующие элементы в двух массивах double A и B и суммируйте их. A double dot product в сборке - это всего лишь серия movapd, mulpd, addpd. Развернутые и организованные с умом, возможно иметь группы movapd/mulpd/addpd, которые работают с различными регистрами xmm и, следовательно, независимы, оптимизируя конвейерную обработку. Конечно, оказывается, что это не имеет значения, так как мой процессор имеет внеуровневое исполнение. Также обратите внимание, что для повторной компоновки требуется отслоение последней итерации.

Другие предположения

Я не пишу код для общих продуктов точек. Код предназначен для определенных размеров, и я не обрабатываю случайные случаи. Это просто для проверки концепций HPC и для просмотра того, какой тип использования ЦП я могу достичь.

Результаты

Скомпилирован с gcc -std=c99 -O2 -m32 -mincoming-stack-boundary=2 -msse3 -mfpmath=sse,387 -masm=intel. Я на другом компьютере, чем обычно. Этот компьютер имеет i5 540m, который может получить 2.8 GHz * 4 FLOPS/cycle/core = 11.2 GFLOPS/s per core после двухступенчатого Intel Turbo Boost (оба ядра находятся прямо сейчас, поэтому он получает только 2 шага... 4-ступенчатое повышение возможно, если я отключу одно ядро), 32-разрядная LINPACK получает около 9,5 GFLOPS/с, когда она запускается с одним потоком.

       N   Total Gflops/s         Residual
     256         5.580521    1.421085e-014
     384         5.734344   -2.842171e-014
     512         5.791168    0.000000e+000
     640         5.821629    0.000000e+000
     768         5.814255    2.842171e-014
     896         5.807132    0.000000e+000
    1024         5.817208   -1.421085e-013
    1152         5.805388    0.000000e+000
    1280         5.830746   -5.684342e-014
    1408         5.881937   -5.684342e-014
    1536         5.872159   -1.705303e-013
    1664         5.881536    5.684342e-014
    1792         5.906261   -2.842171e-013
    1920         5.477966    2.273737e-013
    2048         5.620931    0.000000e+000
    2176         3.998713    1.136868e-013
    2304         3.370095   -3.410605e-013
    2432         3.371386   -3.410605e-013

Вопрос 1

Как я могу сделать лучше, чем это? Я даже не приближаюсь к максимальной производительности. Я оптимизировал код сборки на небесах. Дальнейшее разворачивание может увеличить его чуть больше, но меньшая разворачиваемость, по-видимому, ухудшает производительность.

Вопрос 2

Когда n > 2048, вы можете увидеть снижение производительности. Это связано с тем, что мой кеш L1 составляет 32 КБ, а когда n = 2048 и A и B являются double, они занимают весь кеш. Все больше, и они передаются из памяти.

Я попытался заблокировать кеш (не показан в источнике), но, возможно, я сделал это неправильно. Может ли кто-нибудь предоставить какой-либо код или объяснить, как заблокировать точечный продукт для кеша?

Исходный код

    #include <stdio.h>
    #include <time.h>
    #include <stdlib.h>
    #include <string.h>
    #include <x86intrin.h>
    #include <math.h>
    #include <omp.h>
    #include <stdint.h>
    #include <windows.h>

    // computes 8 dot products
#define KERNEL(address) \
            "movapd     xmm4, XMMWORD PTR [eax+"#address"]      \n\t" \
            "mulpd      xmm7, XMMWORD PTR [edx+48+"#address"]   \n\t" \
            "addpd      xmm2, xmm6                              \n\t" \
            "movapd     xmm5, XMMWORD PTR [eax+16+"#address"]   \n\t" \
            "mulpd      xmm4, XMMWORD PTR [edx+"#address"]      \n\t" \
            "addpd      xmm3, xmm7                              \n\t" \
            "movapd     xmm6, XMMWORD PTR [eax+96+"#address"]   \n\t" \
            "mulpd      xmm5, XMMWORD PTR [edx+16+"#address"]   \n\t" \
            "addpd      xmm0, xmm4                              \n\t" \
            "movapd     xmm7, XMMWORD PTR [eax+112+"#address"]  \n\t" \
            "mulpd      xmm6, XMMWORD PTR [edx+96+"#address"]   \n\t" \
            "addpd      xmm1, xmm5                              \n\t"

#define PEELED(address) \
            "movapd     xmm4, XMMWORD PTR [eax+"#address"]      \n\t" \
            "mulpd      xmm7, [edx+48+"#address"]               \n\t" \
            "addpd      xmm2, xmm6                  \n\t" \
            "movapd     xmm5, XMMWORD PTR [eax+16+"#address"]   \n\t" \
            "mulpd      xmm4, XMMWORD PTR [edx+"#address"]      \n\t" \
            "addpd      xmm3, xmm7                  \n\t" \
            "mulpd      xmm5, XMMWORD PTR [edx+16+"#address"]   \n\t" \
            "addpd      xmm0, xmm4                  \n\t" \
            "addpd      xmm1, xmm5                  \n\t"

inline double 
__attribute__ ((gnu_inline))        
__attribute__ ((aligned(64))) ddot_ref(
    int n,
    const double* restrict A,
    const double* restrict B)
{
    double sum0 = 0.0;
    double sum1 = 0.0;
    double sum2 = 0.0;
    double sum3 = 0.0;
    double sum;
    for(int i = 0; i < n; i+=4) {
        sum0 += *(A + i  ) * *(B + i  );
        sum1 += *(A + i+1) * *(B + i+1);
        sum2 += *(A + i+2) * *(B + i+2);
        sum3 += *(A + i+3) * *(B + i+3);
    }
    sum = sum0+sum1+sum2+sum3;
    return(sum);
}

inline double 
__attribute__ ((gnu_inline))        
__attribute__ ((aligned(64))) ddot_asm
(   int n,
    const double* restrict A,
    const double* restrict B)
{

        double sum;

            __asm__ __volatile__
        (
            "mov        eax, %[A]                   \n\t"
            "mov        edx, %[B]                   \n\t"
            "mov        ecx, %[n]                   \n\t"
            "pxor       xmm0, xmm0                  \n\t"
            "pxor       xmm1, xmm1                  \n\t"
            "pxor       xmm2, xmm2                  \n\t"
            "pxor       xmm3, xmm3                  \n\t"
            "movapd     xmm6, XMMWORD PTR [eax+32]  \n\t"
            "movapd     xmm7, XMMWORD PTR [eax+48]  \n\t"
            "mulpd      xmm6, XMMWORD PTR [edx+32]  \n\t"
            "sar        ecx, 7                      \n\t"
            "sub        ecx, 1                      \n\t" // peel
            "L%=:                                   \n\t"
            KERNEL(64   *   0)
            KERNEL(64   *   1)
            KERNEL(64   *   2)
            KERNEL(64   *   3)
            KERNEL(64   *   4)
            KERNEL(64   *   5)
            KERNEL(64   *   6)
            KERNEL(64   *   7)
            KERNEL(64   *   8)
            KERNEL(64   *   9)
            KERNEL(64   *   10)
            KERNEL(64   *   11)
            KERNEL(64   *   12)
            KERNEL(64   *   13)
            KERNEL(64   *   14)
            KERNEL(64   *   15)
            "lea        eax, [eax+1024]             \n\t"
            "lea        edx, [edx+1024]             \n\t"
            "                                       \n\t"
            "dec        ecx                         \n\t"
            "jnz        L%=                         \n\t" // end loop
            "                                       \n\t"
            KERNEL(64   *   0)
            KERNEL(64   *   1)
            KERNEL(64   *   2)
            KERNEL(64   *   3)
            KERNEL(64   *   4)
            KERNEL(64   *   5)
            KERNEL(64   *   6)
            KERNEL(64   *   7)
            KERNEL(64   *   8)
            KERNEL(64   *   9)
            KERNEL(64   *   10)
            KERNEL(64   *   11)
            KERNEL(64   *   12)
            KERNEL(64   *   13)
            KERNEL(64   *   14)
            PEELED(64   *   15)
            "                                       \n\t"
            "addpd      xmm0, xmm1                  \n\t" // summing result
            "addpd      xmm2, xmm3                  \n\t"
            "addpd      xmm0, xmm2                  \n\t" // cascading add
            "movapd     xmm1, xmm0                  \n\t" // copy xmm0
            "shufpd     xmm1, xmm0, 0x03            \n\t" // shuffle
            "addsd      xmm0, xmm1                  \n\t" // add low qword
            "movsd      %[sum], xmm0                \n\t" // mov low qw to sum
            : // outputs
            [sum]   "=m"    (sum)
            : // inputs
            [A] "m" (A),
            [B] "m" (B), 
            [n] "m" (n)
            : //register clobber
            "memory",
            "eax","ecx","edx","edi",
            "xmm0","xmm1","xmm2","xmm3","xmm4","xmm5","xmm6","xmm7"
            );
        return(sum);
}

int main()
{
    // timers
    LARGE_INTEGER frequency, time1, time2;
    double time3;
    QueryPerformanceFrequency(&frequency);
    // clock_t time1, time2;
    double gflops;

    int nmax = 4096;
    int trials = 1e7;
    double sum, residual;
    FILE *f = fopen("soddot.txt","w+");

    printf("%16s %16s %16s\n","N","Total Gflops/s","Residual");
    fprintf(f,"%16s %16s %16s\n","N","Total Gflops/s","Residual");

    for(int n = 256; n <= nmax; n += 128 ) {
        double* A = NULL;
        double* B = NULL;
        A = _mm_malloc(n*sizeof(*A), 64); if (!A) {printf("A failed\n"); return(1);}
        B = _mm_malloc(n*sizeof(*B), 64); if (!B) {printf("B failed\n"); return(1);}

        srand(time(NULL));

        // create arrays
        for(int i = 0; i < n; ++i) {
            *(A + i) = (double) rand()/RAND_MAX;
            *(B + i) = (double) rand()/RAND_MAX;
        }

        // warmup
        sum = ddot_asm(n,A,B);

        QueryPerformanceCounter(&time1);
        // time1 = clock();
        for (int count = 0; count < trials; count++){
            // sum = ddot_ref(n,A,B);
            sum = ddot_asm(n,A,B);
        }
        QueryPerformanceCounter(&time2);
        time3 = (double)(time2.QuadPart - time1.QuadPart) / frequency.QuadPart;
        // time3 = (double) (clock() - time1)/CLOCKS_PER_SEC;
        gflops = (double) (2.0*n*trials)/time3/1.0e9;
        residual = ddot_ref(n,A,B) - sum;
        printf("%16d %16f %16e\n",n,gflops,residual);
        fprintf(f,"%16d %16f %16e\n",n,gflops,residual);

        _mm_free(A);
        _mm_free(B);
    }
    fclose(f);
    return(0); // successful completion
}

EDIT: объяснение сборки

Точечный продукт является просто повторной суммой произведений двух чисел: sum += a[i]*b[i]. sum должен быть инициализирован до 0 до первой итерации. Векторизованный, вы можете сделать 2 суммы за раз, которые должны быть суммированы в конце: [sum0 sum1] = [a[i] a[i+1]]*[b[i] b[i+1]], sum = sum0 + sum1. В сборке Intel (Intel) это 3 шага (после инициализации):

pxor   xmm0, xmm0              // accumulator [sum0 sum1] = [0 0]
movapd xmm1, XMMWORD PTR [eax] // load [a[i] a[i+1]] into xmm1
mulpd  xmm1, XMMWORD PTR [edx] // xmm1 = xmm1 * [b[i] b[i+1]]
addpd  xmm0, xmm1              // xmm0 = xmm0 + xmm1

В этот момент у вас нет ничего особенного, компилятор может придумать это. Обычно вы можете получить лучшую производительность, достаточно развернуть код, чтобы использовать все доступные xmm регистры (8 регистров в 32-битном режиме). Поэтому, если вы разворачиваете его 4 раза, что позволяет использовать все 8 регистров xmm0 через xmm7. У вас будет 4 аккумулятора и 4 регистра для хранения результатов movapd и addpd. Опять же, компилятор может придумать это. Реальная часть мышления пытается создать способ конвейерного кода, т.е. Каждая команда в группе MOV/MUL/ADD работает с разными регистрами, так что все три команды выполняются одновременно (как правило, в случае с большинство процессоров). Это как вы избили компилятор. Таким образом, вам нужно создать 4-кратный развернутый код, чтобы сделать это, что может потребовать загрузки векторов досрочно и отслаивания первой или последней итерации. Это то, что KERNEL(address). Для удобства я сделал макрос 4x развернутого конвейерного кода. Таким образом, я могу легко развернуть его в 4 раза, просто изменив address. Каждый KERNEL вычисляет 8 точечных продуктов.

4b9b3361

Ответ 1

Чтобы ответить на ваш общий вопрос, вы не можете достичь максимальной производительности с помощью точечного продукта.

Проблема заключается в том, что ваш процессор может выполнять одну 128-битную нагрузку за такт, а для создания точечного продукта вам нужны две 128-разрядные нагрузки за такт.

Но это хуже, чем при больших n. Ответ на ваш второй вопрос заключается в том, что точечный продукт связан с памятью, а не вычисляется, и поэтому он не может распараллеливаться для больших n с быстрыми ядрами. Это объясняется здесь лучше why-vectorizing-the-loop-does-not-have-performance-improvement. Это большая проблема с распараллеливанием с быстрыми ядрами. Мне потребовалось некоторое время, чтобы понять это, но очень важно учиться.

На самом деле существует несколько базовых алгоритмов, которые могут в полной мере воспользоваться распараллеливанием на быстрых ядрах. В терминах BLAS-алгоритмов это только алгоритмы уровня 3 (O (n ^ 3)), такие как матричное умножение, которые действительно выигрывают от распараллеливания. Ситуация лучше на медленных ядрах, например. с графическими процессорами и Xeon Phi, поскольку расхождение между скоростью памяти и скоростью ядра намного меньше.

Если вы хотите найти алгоритм, который может приблизиться к пиковым выбросам для небольших попыток, например, скалярного * вектора или суммы скалярного * вектора. Первый случай должен делать одну нагрузку, один мульт и один магазин каждый такт, а второй случай - один мульти, один добавить и один нагрузка на каждый такт.

Я тестировал следующий код на Core 2 Duo [email protected] в Knoppix 7.3 32-бит. Я получаю около 75% пика для скалярного продукта и 75% от пика для суммы скалярного произведения. Флоп/цикл для скалярного произведения равен 2 и для суммы скалярного произведения это 4.

Скомпилирован с g++ -msse2 -O3 -fopenmp foo.cpp -ffast-math

#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
#include <x86intrin.h>

void scalar_product(double * __restrict a, int n) {
    a = (double*)__builtin_assume_aligned (a, 64);
    double k = 3.14159;
    for(int i=0; i<n; i++) {
        a[i] = k*a[i]; 
    }
}

void scalar_product_SSE(double * __restrict a, int n) {
    a = (double*)__builtin_assume_aligned (a, 64);
    __m128d k = _mm_set1_pd(3.14159);    
    for(int i=0; i<n; i+=8) {
        __m128d t1 = _mm_load_pd(&a[i+0]);
        _mm_store_pd(&a[i],_mm_mul_pd(k,t1));
        __m128d t2 = _mm_load_pd(&a[i+2]);
        _mm_store_pd(&a[i+2],_mm_mul_pd(k,t2));
        __m128d t3 = _mm_load_pd(&a[i+4]);
        _mm_store_pd(&a[i+4],_mm_mul_pd(k,t3));
        __m128d t4 = _mm_load_pd(&a[i+6]);
        _mm_store_pd(&a[i+6],_mm_mul_pd(k,t4));
    }
}

double scalar_sum(double * __restrict a, int n) {
    a = (double*)__builtin_assume_aligned (a, 64);
    double sum = 0.0;    
    double k = 3.14159;
    for(int i=0; i<n; i++) {
        sum += k*a[i]; 
    }
    return sum;
}

double scalar_sum_SSE(double * __restrict a, int n) {
    a = (double*)__builtin_assume_aligned (a, 64);
    __m128d sum1 = _mm_setzero_pd();
    __m128d sum2 = _mm_setzero_pd();
    __m128d sum3 = _mm_setzero_pd();
    __m128d sum4 = _mm_setzero_pd();
    __m128d k = _mm_set1_pd(3.14159);   
    for(int i=0; i<n; i+=8) {
        __m128d t1 = _mm_load_pd(&a[i+0]);
        sum1 = _mm_add_pd(_mm_mul_pd(k,t1),sum1);
        __m128d t2 = _mm_load_pd(&a[i+2]);
        sum2 = _mm_add_pd(_mm_mul_pd(k,t2),sum2);
        __m128d t3 = _mm_load_pd(&a[i+4]);
        sum3 = _mm_add_pd(_mm_mul_pd(k,t3),sum3);
        __m128d t4 = _mm_load_pd(&a[i+6]);
        sum4 = _mm_add_pd(_mm_mul_pd(k,t4),sum4);      
    }
    double tmp[8];
    _mm_storeu_pd(&tmp[0],sum1);
    _mm_storeu_pd(&tmp[2],sum2);
    _mm_storeu_pd(&tmp[4],sum3);
    _mm_storeu_pd(&tmp[6],sum4);
    double sum = 0;
    for(int i=0; i<8; i++) sum+=tmp[i];
    return sum;
}

int main() {
    //_MM_SET_FLUSH_ZERO_MODE(_MM_FLUSH_ZERO_ON);
    //_mm_setcsr(_mm_getcsr() | 0x8040);
    double dtime, peak, flops, sum;
    int repeat = 1<<18;
    const int n = 2048;
    double *a = (double*)_mm_malloc(sizeof(double)*n,64);
    double *b = (double*)_mm_malloc(sizeof(double)*n,64);
    for(int i=0; i<n; i++) a[i] = 1.0*rand()/RAND_MAX;

    dtime = omp_get_wtime();
    for(int r=0; r<repeat; r++) {
        scalar_product_SSE(a,n);
    }
    dtime = omp_get_wtime() - dtime;
    peak = 2*2.67;
    flops = 1.0*n/dtime*1E-9*repeat;
    printf("time %f, %f, %f\n", dtime,flops, flops/peak);

    //for(int i=0; i<n; i++) a[i] = 1.0*rand()/RAND_MAX;
    //sum = 0.0;    
    dtime = omp_get_wtime();
    for(int r=0; r<repeat; r++) {
        scalar_sum_SSE(a,n);
    }
    dtime = omp_get_wtime() - dtime;
    peak = 2*2*2.67;
    flops = 2.0*n/dtime*1E-9*repeat;
    printf("time %f, %f, %f\n", dtime,flops, flops/peak);
    //printf("sum %f\n", sum);

}