Подтвердить что ты не робот

Оптимизация кодирования переменной длины

У меня есть случай, когда мне нужно сжать много часто небольших значений. Таким образом, я сжимаю их с байтовой кодировкой переменной длины (ULEB128, чтобы быть конкретным):

size_t
compress_unsigned_int(unsigned int n, char* data)
{
  size_t size = 0;
  while (n  > 127)
  {
    ++size;
    *data++ = (n & 127)|128;
    n >>= 7;
  }
  *data++ = n;
  return ++size;
}

Есть ли более эффективный способ сделать это (возможно, используя SSE)?

Изменить: после этого сжатия результат сохраняется в data, беря size байты. Затем функция сжатия вызывается в следующем неподписанном int.

4b9b3361

Ответ 1

Если ваши значения unsigned int ограничены определенным диапазоном - скажем, 32 бита - вы можете развернуть цикл:

size_t
compress_unsigned_int(unsigned int n, char* data)
{
  size_t size;

  if (n < 0x00000080U) {
    size = 1;
    goto b1;
  }
  if (n < 0x00004000U) {
    size = 2;
    goto b2;
  }
  if (n < 0x00200000U) {
    size = 3;
    goto b3;
  }
  if (n < 0x10000000U) {
    size = 4;
    goto b4;
  }
  size = 5;

  *data++ = (n & 0x7f) | 0x80;
  n >>= 7;
b4:
  *data++ = (n & 0x7f) | 0x80;
  n >>= 7;
b3:
  *data++ = (n & 0x7f) | 0x80;
  n >>= 7;
b2:
  *data++ = (n & 0x7f) | 0x80;
  n >>= 7;
b1:
  *data = n;
  return size;
}

Ответ 2

Первое, что вы хотите сделать, это проверить любое возможное решение против вашего текущего кода.

Я думаю, вы можете попытаться избавиться от зависимостей данных, чтобы процессор мог работать больше в одно и то же время.

Каковы зависимости от данных? Когда данные передаются через вашу функцию, текущее значение n зависит от предыдущего значения n, которое зависит от значения до этого... представляет собой длинную цепочку зависимостей данных. В приведенном ниже коде n никогда не изменяется, поэтому процессор может "пропустить" и делать пару разных вещей одновременно, не дожидаясь вычисления нового n.

// NOTE: This code is actually incorrect, as caf noted.
// The byte order is reversed.
size_t
compress_unsigned_int(unsigned int n, char *data)
{
    if (n < (1U << 14)) {
        if (n < (1U << 7)) {
            data[0] = n;
            return 1;
        } else {
            data[0] = (n >> 7) | 0x80;
            data[1] = n & 0x7f;
            return 2;
        }
    } else if (n < (1U << 28)) {
        if (n < (1U << 21)) {
            data[0] = (n >> 14) | 0x80;
            data[1] = ((n >> 7) & 0x7f) | 0x80;
            data[2] = n & 0x7f;
            return 3;
        } else {
            data[0] = (n >> 21) | 0x80;
            data[1] = ((n >> 14) & 0x7f) | 0x80;
            data[2] = ((n >> 7) & 0x7f) | 0x80;
            data[3] = n & 0x7f;
            return 4;
        }
    } else {
        data[0] = (n >> 28) | 0x80;
        data[1] = ((n >> 21) & 0x7f) | 0x80;
        data[2] = ((n >> 14) & 0x7f) | 0x80;
        data[3] = ((n >> 7) & 0x7f) | 0x80;
        data[4] = n & 0x7f;
        return 5;
    }
}

Я тестировал производительность, выполняя ее в узком цикле с 0..UINT_MAX. В моей системе время выполнения:

(Lower is better)
Original: 100%
caf unrolled version: 79%
My version: 57%

Некоторая незначительная настройка может привести к лучшим результатам, но я сомневаюсь, что вы получите гораздо больше улучшения, если не пойдете на сборку. Если ваши целые числа имеют определенные диапазоны, то вы можете использовать профилирование, чтобы заставить компилятор добавить правильные предсказания ветвей в каждую ветвь. Это может дать вам несколько дополнительных процентных пунктов скорости. ( EDIT: Я получил 8% от переупорядочивания ветвей, но это извращенная оптимизация, потому что он полагается на то, что каждый номер 0... UINT_MAX появляется с равной частотой. Я не рекомендую это. )

SSE не поможет. SSE предназначен для одновременной работы с несколькими частями данных с одинаковой шириной, что значительно затрудняет получение SIMD для ускорения чего-либо с кодировкой переменной длины. (Это не обязательно невозможно, но это может быть невозможно, и вам нужно быть довольно умным, чтобы понять это.)

Ответ 3

Быстрая реализация в буферах протокола google:

http://code.google.com/p/protobuf/

Посмотрите на методы CodedOutputStream:: WriteVarintXXX.

Первый метод может быть переписан как:

char *start = data;
while (n>=0x80)
{
    *data++=(n|0x80);
    n>>=7;
}
*data++=n;
return data-start;

Согласно моему тесту, реализация буферов google лучшая, а затем другие реализации. Однако мой тест довольно искусственный, лучше проверить каждый подход в своем приложении и выбрать лучшее. Представленные оптимизации лучше работают с определенными значениями числа.

Вот код моего тестового приложения. (Примечание: я удалил код из compress_unsigned_int_google_buf. Вы можете найти реализацию в следующем файле из протокола буфера google: метод coded_stream.cc CodedOutputStream:: WriteVarint32FallbackToArrayInline)

size_t compress_unsigned_int(unsigned int n, char* data)
{
    size_t size = 0;
    while (n  > 127)
    {
        ++size;
        *data++ = (n & 127)|128;
        n >>= 7;
    }
    *data++ = n;
    return ++size;
}

size_t compress_unsigned_int_improved(unsigned int n, char* data)
{
    size_t size;

    if (n < 0x00000080U) {
        size = 1;
        goto b1;
    }
    if (n < 0x00004000U) {
        size = 2;
        goto b2;
    }
    if (n < 0x00200000U) {
        size = 3;
        goto b3;
    }
    if (n < 0x10000000U) {
        size = 4;
        goto b4;
    }
    size = 5;

    *data++ = (n & 0x7f) | 0x80;
    n >>= 7;
b4:
    *data++ = (n & 0x7f) | 0x80;
    n >>= 7;
b3:
    *data++ = (n & 0x7f) | 0x80;
    n >>= 7;
b2:
    *data++ = (n & 0x7f) | 0x80;
    n >>= 7;
b1:
    *data = n;
    return size;
}

size_t compress_unsigned_int_more_improved(unsigned int n, char *data)
{
    if (n < (1U << 14)) {
        if (n < (1U << 7)) {
            data[0] = n;
            return 1;
        } else {
            data[0] = (n >> 7) | 0x80;
            data[1] = n & 0x7f;
            return 2;
        }
    } else if (n < (1U << 28)) {
        if (n < (1U << 21)) {
            data[0] = (n >> 14) | 0x80;
            data[1] = ((n >> 7) & 0x7f) | 0x80;
            data[2] = n & 0x7f;
            return 3;
        } else {
            data[0] = (n >> 21) | 0x80;
            data[1] = ((n >> 14) & 0x7f) | 0x80;
            data[2] = ((n >> 7) & 0x7f) | 0x80;
            data[3] = n & 0x7f;
            return 4;
        }
    } else {
        data[0] = (n >> 28) | 0x80;
        data[1] = ((n >> 21) & 0x7f) | 0x80;
        data[2] = ((n >> 14) & 0x7f) | 0x80;
        data[3] = ((n >> 7) & 0x7f) | 0x80;
        data[4] = n & 0x7f;
        return 5;
    }
}

size_t compress_unsigned_int_simple(unsigned int n, char *data)
{
    char *start = data;
    while (n>=0x80)
    {
        *data++=(n|0x80);
        n>>=7;
    }
    *data++=n;
    return data-start;
}

inline size_t compress_unsigned_int_google_buf(unsigned int value, unsigned char* target) {

          // This implementation might be found in google protocol buffers

}



#include <iostream>
#include <Windows.h>
using namespace std;

int _tmain(int argc, _TCHAR* argv[])
{
    char data[20];
    unsigned char udata[20];
    size_t size = 0;
    __int64 timer;

    cout << "Plain copy: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        memcpy(data,&i,sizeof(i));
        size += sizeof(i);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    cout << "Original: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int(i,data);
    }

    cout << GetTickCount64() - timer << " Size: " << size << endl;

    cout << "Improved: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int_improved(i,data);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    cout << "More Improved: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int_more_improved(i,data);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    cout << "Simple: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int_simple(i,data);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    cout << "Google Buffers: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int_google_buf(i,udata);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    return 0;
}

На моей машине с компилятором Visual С++ у меня есть следующие результаты:

Обычная копия: 358 мс

Оригинал: 2497 мс

Улучшено: 2215 мс

Улучшено: 2231 мс

Простой: 2059 мс

Буферы Google: 968 мс

Ответ 4

После большего просмотра я нашел еще одну часто используемую реализацию в Sqlite3 (версия кода 3070900):

inline int sqlite3PutVarint(unsigned char *p, unsigned __int64 v){
  int i, j, n;
  unsigned char buf[10];
  if( v & (((unsigned __int64)0xff000000)<<32) ){
    p[8] = (unsigned char)v;
    v >>= 8;
    for(i=7; i>=0; i--){
      p[i] = (unsigned char)((v & 0x7f) | 0x80);
      v >>= 7;
    }
    return 9;
  }    
  n = 0;
  do{
    buf[n++] = (unsigned char)((v & 0x7f) | 0x80);
    v >>= 7;
  }while( v!=0 );
  buf[0] &= 0x7f;
  for(i=0, j=n-1; j>=0; j--, i++){
    p[i] = buf[j];
  }
  return n;
}

Существует также небольшая оптимизированная версия для 32-битного int:

int sqlite3PutVarint32(unsigned char *p, unsigned int v){

  if( (v & ~0x7f)==0 ){
    p[0] = v;
    return 1;
  }

  if( (v & ~0x3fff)==0 ){
    p[0] = (unsigned char)((v>>7) | 0x80);
    p[1] = (unsigned char)(v & 0x7f);
    return 2;
  }
  return sqlite3PutVarint(p, v);
}

Разочаровывает то, что реализация Sqlite выполняет самое худшее в моем тесте. Поэтому, если вы собираетесь использовать Sqlite, рассмотрите возможность замены реализации по умолчанию на оптимизированную.

Между тем я думаю о дальнейших возможных оптимизации.

Ответ 5

Вы можете сэкономить несколько операций, заменив size_t size=0;...++size;...;return size++; с помощью char* base=data;...;return data-base;

Ответ 6

Здесь моя оптимизация на языке ассемблера x86 (32 бит). Вы можете скомпилировать NASM и ссылку. Я не знаю, было ли это быстро или медленно, я просто получал удовольствие от кодирования:)

global compress_unsigned_int

;   bit fields:
;   31                              0
;    eeeedddddddcccccccbbbbbbbaaaaaaa


compress_unsigned_int:
    mov     eax, [esp+4]    ; n
    mov     ecx, [esp+8]    ; data

    cmp     eax, 00001111111111111111111111111111b
    jbe     out4b

    shld    edx, eax, 11
    shl     eax, 10
    shld    edx, eax, 8
    shl     eax, 7
    shld    edx, eax, 8
    shl     eax, 7
    shld    edx, eax, 8
    or      edx, 10000000100000001000000010000000b

    mov     [ecx], edx
    mov     eax, [esp+4]
    shr     eax, 28
    mov     [ecx+4], al

    mov     eax, 5
    jmp     exit

out4b:
    cmp     eax, 00000000000111111111111111111111b
    jbe     out3b

    shld    edx, eax, 11
    shl     eax, 10
    shld    edx, eax, 8
    shl     eax, 7
    shld    edx, eax, 8
    shl     eax, 7
    shld    edx, eax, 8
    or      edx, 00000000100000001000000010000000b

    mov     [ecx], edx

    mov     eax, 4
    jmp     exit

out3b:
    cmp     eax, 00000000000000000011111111111111b
    jbe     out2b

    shld    edx, eax, 25
    shl     eax, 24
    shld    edx, eax, 8

    mov     eax, edx

    or      edx, 00000000000000001000000010000000b

    mov     [ecx], dx
    shr     eax, 15
    mov     [ecx+2], al

    mov     eax, 3
    jmp     exit

out2b:
    cmp     eax, 00000000000000000000000001111111b
    jbe     out1b

    shld    edx, eax, 25
    shl     eax, 24
    shld    edx, eax, 8
    or      edx, 00000000000000000000000010000000b

    mov     [ecx], dx

    mov     eax, 2
    jmp     exit

out1b:
    mov     [ecx], al

    mov     eax, 1

exit:
    ret