C++如何加速(使用x86 SIMD)批量可变长度整数编码/解码(可运行基准)

C++ how to speed up (with x86 SIMD) batch variable length integer encoding / decoding (runnable benchmark)

提问人:Huy Le 提问时间:10/27/2023 最后编辑:Huy Le 更新时间:11/15/2023 访问量:328

问:

我有这种编码方法,其工作原理是将 16x 的小块编码为打包成 8 个字节的 16 个标志半字节的小块,然后是每个输入的 1 个或更多字节的有效载荷:int64_tint64_t

  • 使用半字节(4 位)表示flag
  • 保留 1 个字节(1 个字节可以存储 2 个值的标志)flag
  • 存储在标志中(半字节中的高位)sign(value)
  • 存储在缓冲区中,字节数可变abs(value)
  • 在标志中存储所需的字节数。(半字节中下 3 位)

此示例基准测试已简化。我已经用实际数据测试了我的用例:它比 LEB128(又名 varint)和 lz4 更快,而且压缩效果也更好。因此,让我们专注于这种编码方法或类似的东西。

如何改进此代码?我最关心的是减压速度。如果可以提高速度,则可以对编码/解码格式(或其他格式)进行一些更改。我想知道在这种情况下是否可以使用 SIMD。

特别说明:

  • 大多数的长度为 <= 2 个字节。abs(value)
  • 在99%的情况下,连续4个可以用<= 7个字节来表示。同样,在99%的情况下,16个连续的字节可以用<=31字节来表示。abs(value)abs(value)
  • 2 个重要功能是和encodedecode_original
//  ##### from @aqrit's answer, tweaked by @petercordes #######

#include <cstddef>  // size_t
#include <cstdint> // uint64_t, int64_t
#include <immintrin.h> // lzcnt intrinsic
#include <cstring> // memcpy

unsigned char* encode_buf_bmi(size_t n, const int64_t* src, unsigned char* dst) {
    unsigned char* key_ptr = dst;
    size_t num_chunks = n >> 3;
    unsigned char* data_ptr = &dst[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
    while (num_chunks--) {
        uint64_t key = 0;
        for (size_t i = 0; i < 8; i++) {
            uint64_t v = (uint64_t)*src++;
            memcpy(data_ptr, &v, sizeof(v)); // assumes little endian.

            v = (v + v) ^ v; // clear redundant sign bits (and trash everything else)
            v = (v << 8) | v; // combine lengths of 7 and 8
            v = (_lzcnt_u64(v | 1) ^ 63) >> 3; // use BSR on intel... 
            data_ptr += v + ((v + 1) >> 3); // key of 7 is length of 8
            key |= v << (i * 3);
        }
        memcpy(key_ptr, &key, 3); // assumes little endian.  Probably faster with overlapping 4-byte stores, except for the last chunk to avoid stepping on data
        key_ptr += 3;
    }
    // TODO: tail loop...
    
    return data_ptr;  // end of used part of buffer
}
    
void decode_buf_bmi(size_t n, const unsigned char* src, int64_t* dst) {
    const unsigned char* key_ptr = src;
    size_t num_chunks = n >> 3;
    const unsigned char* data_ptr = &src[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
    if (n >= 19) { // if has at least 8 bytes of padding before data_ptr
        data_ptr -= sizeof(uint64_t); // left aligned the truncated val in register (little endian) 
        while (num_chunks--) {
            unsigned keys;
            //memcpy(&keys, key_ptr, 3);   // big slowdown with GCC13.2: like 258 ms per buffer on Skylake 3.9GHz,  n_msg = 100'000'000
        memcpy(&keys, key_ptr, 4);     // 138 ms per buffer with 4-byte loads.  (Clang 16: 138 ms per buffer either way.)
            key_ptr += 3;
            for (size_t i = 0; i < 8; i++) {
                uint64_t v;
                size_t k = keys & 0x07;
                size_t len = k + ((k + 1) >> 3); // k==7 is len=8
                uint64_t mask = (uint64_t)0 - ((k + 7) >> 3); // k ? -1 : 0
                size_t shift = (64 - (len * 8)) & (size_t)mask; 

                keys >>= 3;     
                data_ptr += len; 
                memcpy(&v, data_ptr, sizeof(v));
                v &= mask;
                *dst++ = (int64_t)v >> shift;
            }
        }
        data_ptr += sizeof(uint64_t);
    }

    // TODO: tail loop...
}


// ############## From @Soonts' answer ##############
#include <immintrin.h>
#include <stdint.h>

// Exclusive prefix sum of unsigned bytes
// When the sum of all bytes exceeds 0xFF, the output is garbage
// Which is fine here because our bytes are in [0..8] interval
inline __m128i exclusivePrefixSum( const __m128i src, size_t& sum )
{
    __m128i v = src;
    // https://en.wikipedia.org/wiki/Prefix_sum#/media/File:Hillis-Steele_Prefix_Sum.svg
    v = _mm_add_epi8( v, _mm_slli_si128( v, 1 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 2 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 4 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 8 ) );

    // So far we have computed inclusive prefix sum
    // Keep the last byte which is total sum of bytes in the vector
    uint16_t tmp = _mm_extract_epi16( v, 7 );
    tmp >>= 8;
    sum = tmp;

    // Subtract original numbers to make exclusive sum = byte offsets to load
    return _mm_sub_epi8( v, src );
}

// Load 4 uint64 numbers from ( rsi + offsets[ i ] ), keep the lowest bits[ i ] bits in the numbers
inline __m256i loadAndMask( const uint8_t* rsi, uint32_t offsets, __m128i bits )
{
    // Load 4 uint64_t numbers from the correct locations, without AVX2 gathers
    const int64_t* s0 = (const int64_t*)( rsi + (uint8_t)offsets );
    const int64_t* s1 = (const int64_t*)( rsi + (uint8_t)( offsets >> 8 ) );
    const int64_t* s2 = (const int64_t*)( rsi + (uint8_t)( offsets >> 16 ) );
    const int64_t* s3 = (const int64_t*)( rsi + ( offsets >> 24 ) );
    __m256i r = _mm256_setr_epi64x( *s0, *s1, *s2, *s3 );

    // Mask away the higher pieces in the loaded numbers
    __m256i shift = _mm256_cvtepu8_epi64( bits );
    const __m256i ones = _mm256_set1_epi32( -1 );
    __m256i mask = _mm256_sllv_epi64( ones, shift );
    return _mm256_andnot_si256( mask, r );
}

inline __m256i applySigns( __m256i v, __m128i signs )
{
    // Sign extend the masks from bytes to int64
    __m256i mask = _mm256_cvtepi8_epi64( signs );
    // Conditionally negate the numbers
    __m256i neg = _mm256_sub_epi64( _mm256_setzero_si256(), v );
    return _mm256_blendv_epi8( v, neg, mask );
}

class BlockDecoder
{
    // Load offsets, in bytes
    __m128i offsetBytes;
    // Payload bits per element, the bytes are in [ 0 .. 64 ] interval
    __m128i payloadBits;
    // 16 bytes with the 0x80 bit set when the corresponding input was negative; the rest of the bits are unused
    __m128i signs;
    // Count of payload bytes in the complete block
    size_t payloadBytes;

    // Decode the block header
    inline void loadHeader( const uint8_t* rsi )
    {
        // Load 8 bytes, and zero extend them into uint16_t
        const __m128i v = _mm_cvtepu8_epi16( _mm_loadu_si64( rsi ) );

        // Unpack lengths
        const __m128i seven = _mm_set1_epi8( 7 );
        const __m128i l4 = _mm_slli_epi16( v, 4 );
        __m128i lengths = _mm_or_si128( v, l4 );
        lengths = _mm_and_si128( lengths, seven );
        // Transform 7 into 8
        __m128i tmp = _mm_cmpeq_epi8( lengths, seven );
        lengths = _mm_sub_epi8( lengths, tmp );

        // Byte offsets to load 16 numbers, and count of payload bytes in the complete block
        offsetBytes = exclusivePrefixSum( lengths, payloadBytes );
        // Count of payload bits in the loaded numbers, lengths * 8
        payloadBits = _mm_slli_epi16( lengths, 3 );
        // Signs vector, we only use the highest 0x80 bit in these bytes
        signs = _mm_or_si128( _mm_slli_epi16( v, 8 ), l4 );
    }

    // Decode and store 4 numbers
    template<int slice>
    inline void decodeSlice( int64_t* rdi, const uint8_t* payload ) const
    {
        uint32_t off;
        __m128i bits, s;
        if constexpr( slice != 0 )
        {
            off = (uint32_t)_mm_extract_epi32( offsetBytes, slice );
            constexpr int imm = _MM_SHUFFLE( slice, slice, slice, slice );
            bits = _mm_shuffle_epi32( payloadBits, imm );
            s = _mm_shuffle_epi32( signs, imm );
        }
        else
        {
            off = (uint32_t)_mm_cvtsi128_si32( offsetBytes );
            // For the first slice of the block, the 4 lowest bytes are in the correct locations already
            bits = payloadBits;
            s = signs;
        }

        __m256i v = loadAndMask( payload, off, bits );
        v = applySigns( v, s );
        _mm256_storeu_si256( ( __m256i* )rdi, v );
    }

public:

    // Decode and store a block of 16 numbers, return pointer to the next block
    const uint8_t* decode( int64_t* rdi, const uint8_t* rsi )
    {
        loadHeader( rsi );
        rsi += 8;

        decodeSlice<0>( rdi, rsi );
        decodeSlice<1>( rdi + 4, rsi );
        decodeSlice<2>( rdi + 8, rsi );
        decodeSlice<3>( rdi + 12, rsi );

        return rsi + payloadBytes;
    }
};


//   ############ reference version and updated benchmark framework ###########
#include <iostream>
#include <string>
#include <cstring>
#include <algorithm>
#include <vector>
#include <chrono>
#include <iomanip>
using namespace std;

// https://en.wikipedia.org/wiki/Xorshift   IIRC, wikipedia used to have different constants for some PRNG, maybe some of these, vs. the upstream site; I just copied wikipedia
#include <stdint.h>
uint64_t rol64(uint64_t x, int k)
{
    return (x << k) | (x >> (64 - k));
}

struct xoshiro256ss_state {
    uint64_t s[4];
};

static xoshiro256ss_state rng_state; // init by main
uint64_t xoshiro256ss(struct xoshiro256ss_state *state)
{
    uint64_t *s = state->s;
    uint64_t const result = rol64(s[1] * 5, 7) * 9;
    uint64_t const t = s[1] << 17;

    s[2] ^= s[0];
    s[3] ^= s[1];
    s[1] ^= s[2];
    s[0] ^= s[3];

    s[2] ^= t;
    s[3] = rol64(s[3], 45);

    return result;
}


namespace {
    class MyTimer {
    std::chrono::time_point<std::chrono::system_clock> start;

public:
    void startCounter() {
        start = std::chrono::system_clock::now();
    }

    int64_t getCounterNs() {
        return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now() - start).count();
    }

    int64_t getCounterMs() {
        return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - start).count();
    }

    double getCounterMsPrecise() {
        return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now() - start).count()
                / 1000000.0;
    }
};
}

template <typename T>
void leb128_encode(uint8_t* data, T value, size_t &idx)
{
  bool more = true;
  bool negative = value < 0;
  constexpr int size = sizeof(value) * 8;

  while (more) {
    uint8_t byte = value & 0x7F;
    value >>= 7;
    if (negative) value |= (~0LL << (size - 7));

    if ((value == 0 && (byte & 0x40) == 0) || (value == -1 && (byte & 0x40) != 0)) {
      more = false;
    } else {
        byte |= 0x80;
    }

    data[idx++] = byte;
  }
}

template <typename T>
void leb128_decode(uint8_t* data, T &value, size_t &idx)
{
  value = 0;
  int shift = 0;
  constexpr int size = sizeof(value) * 8;
  uint8_t byte;

  do {
    byte = data[idx++];
    value |= int64_t(byte & 0x7F) << shift;
    shift += 7;
  } while (byte & 0x80);

  if (shift < size && (byte & 0x40)) {
    value |= (~0LL << shift);
  }
}

int64_t n_msg = 100'000'000;
constexpr int BLK_SIZE = 16;
uint8_t* buffer;

int64_t *original;
int64_t *decoded;
int64_t *dummy_array;

// encode 1 value using variable length encoding,
// storing the result in a flag
void encode(int64_t value, uint8_t &flag, size_t &idx)
{
  bool negative = value < 0;
  value = abs(value);

  uint8_t byte_cnt = 0;
  while (value > 0) {
    buffer[idx] = value & 0xFF; // lowest byte
    idx++;
    value >>= 8;
    byte_cnt++;
  }

  // special cases. Since we only have 3 bits to represent 9 values (0->8),
  // we choose to group case 7-8 together because they're rarest.  
  if (byte_cnt == 7) {
    buffer[idx] = 0;
    idx++;
  } else if (byte_cnt == 8) {
    // since we only have 3 bits, we use 7 to represent 8 bytes
    byte_cnt = 7;
  }

  flag = byte_cnt; // bit 0-2 represent length
  if (negative) flag |= 0b1000; // bit 3 represent sign (1 means negative)  
}

// returns compression ratio
double __attribute__ ((noinline)) encode_all(int type = 0)
{
  if (type == 0) {
    // Soonts version uses the same format as this reference version
    size_t idx = 0;
    // encode in blocks of 16 values
    for (size_t i = 0; i < n_msg; i += 16) {
      size_t flag_idx = idx;
      idx += 8; // first 8 bytes of a block are for sign/length flags
      for (int t = 0; t < BLK_SIZE; t += 2) {
        uint8_t low_flag, high_flag;
        encode(original[i + t], low_flag, idx);
        encode(original[i + t + 1], high_flag, idx);
        buffer[flag_idx + t / 2] = (high_flag << 4) | low_flag;
      }
    }
    return (double(idx) / (n_msg * 8));
  } else if (type == 1) {
    const unsigned char *end = encode_buf_bmi(n_msg, original, buffer);
    return (double(end - buffer) / (n_msg * 8));    
  } else if (type == 2) {
    size_t idx = 0;
    for (int i = 0; i < n_msg; i++) leb128_encode(buffer, original[i], idx);
    return (double(idx) / (n_msg * 8));
  }

  cout << "encode_all unknown type " << type << std::endl;
  exit(1);
}

template <typename T>
void extract_flag(uint8_t flag, T &low_sign, T &low_length, T &high_sign, T &high_length)
{
  low_sign = flag & 0b1000;
  low_length = flag & 0b0111;
  high_sign = (flag >> 4) & 0b1000;
  high_length = (flag >> 4) & 0b0111;
}

void __attribute__ ((noinline)) decode_original()
{
  static constexpr int64_t mult[] = {
    1, 1LL << 8, 1LL << 16, 1LL << 24, 1LL << 32, 1LL << 40, 1LL << 48, 1LL << 56
  };

  size_t idx = 0, num_idx = 0;
  for (size_t i = 0; i < n_msg; i += 16) {
    // first 8 bytes of a block are flags
    int signs[BLK_SIZE], lens[BLK_SIZE];
    for (int t = 0; t < BLK_SIZE; t += 2) {
      extract_flag(buffer[idx], signs[t], lens[t], signs[t + 1], lens[t + 1]);
      idx++;
    }
    for (int t = 0; t < BLK_SIZE; t++) if (lens[t] == 7) lens[t] = 8; // special case

    // decode BLK_SIZE values  
    for (int t = 0; t < BLK_SIZE; t++) {
      int64_t value = 0;
      for (int i = 0; i < lens[t]; i++) value += mult[i] * buffer[idx + i];
      if (signs[t]) value = -value;
      decoded[num_idx + t] = value;
      idx += lens[t];
    }
    num_idx += BLK_SIZE;
  }
}

void __attribute__ ((noinline)) decode_soonts()
{
  BlockDecoder dec;
  const uint8_t* rsi = buffer;
  int64_t* rdi = decoded;
  for( size_t i = 0; i < n_msg; i += 16 )
  {
      rsi = dec.decode( rdi, rsi );
      rdi += 16;
  }
}

void __attribute__ ((noinline)) decode_aqrit()
{
  decode_buf_bmi(n_msg, buffer, decoded);
}

void __attribute__ ((noinline)) decode_leb()
{
  size_t idx = 0;
  for (int i = 0; i < n_msg; i++) {
    leb128_decode(buffer, decoded[i], idx);
  }
}

//------------------
//------------------
//------------------  MAIN

void __attribute__ ((noinline)) gen_data()
{
  for (size_t i = 0; i < n_msg; i++) {
    uint64_t modifier = xoshiro256ss(&rng_state);  // low 32 bits decide magnitude range, high bits decide sign, to avoid correlation with %100 over the full thing.
    if ( uint32_t(modifier) % 100 == 0) {
      original[i] = int64_t(xoshiro256ss(&rng_state)) * 1'000'000'000;
    } else {
      original[i] = xoshiro256ss(&rng_state) % 70'000;
    }
    if ((modifier>>62) == 0)
      original[i] *= -1;
  }
}

void __attribute__ ((noinline)) check(const string &name)
{
  for (size_t i = 0; i < n_msg; i++) if (original[i] != decoded[i]) {
    cout << name << " wrong at " << i << " " << original[i] << " " << decoded[i] << "\n";
    exit(1);
  }
}

int64_t volatile dummy = 42;
constexpr int N_DUMMY = 8'000'000;
void __attribute__ ((noinline)) clear_cache()
{
#if 1
  for (int i = 0; i < N_DUMMY; i++) dummy_array[i] = dummy;
  std::sort(dummy_array, dummy_array + N_DUMMY);
  dummy = dummy_array[rand() % N_DUMMY];
#else
    asm("" ::: "memory");
#endif
}

using FuncPtr = void (*)();
int64_t __attribute__ ((noinline)) test(FuncPtr func, const string &name)
{
  clear_cache();
  MyTimer timer;
  timer.startCounter();
  func();
  int64_t cost = timer.getCounterNs();
  check(name);
  return cost;
}

void printTableRow(const std::vector<std::string>& columns, int columnWidth) {
    for (const std::string& column : columns) {
        std::cout << std::left << std::setw(columnWidth) << column << " | ";
    }
    std::cout << std::endl;
}

int main()
{
  // srand(time(NULL));
  rng_state.s[0] = time(nullptr);  // other elements left zero.  This is terrible but sufficient for our purposes

  MyTimer timer;
  buffer = new uint8_t[n_msg * 10];
  original = new int64_t[n_msg];
  decoded = new int64_t[n_msg];
  dummy_array = new int64_t[N_DUMMY]; // enough to flush cache
  
  memset(buffer, 0, n_msg * 10);
  memset(original, 0, n_msg * 8);
  memset(decoded, 0, n_msg * 8);
  memset(dummy_array, 0, N_DUMMY * 8);

  int n_test = 10;
  constexpr int L = 4;
  string names[L] = {"original", "soonts", "aqrit", "leb"};
  int64_t total_costs[L] = {0, 0, 0, 0};

  for (int t = 0; t < n_test; t++) {    
    gen_data();
    double ratio_original = encode_all(0);
    int64_t costs[L];

    costs[0] = test(decode_original, names[0]);
    costs[1] = test(decode_soonts, names[1]);

    double ratio_aqrit = encode_all(1);
    costs[2] = test(decode_aqrit, names[2]);

    double ratio_leb = encode_all(2);
    costs[3] = test(decode_leb, names[3]);
    
    if (t == 0) {
      cout << "Compression ratios: " << ratio_original << " " << ratio_aqrit << " " << ratio_leb << "\n";
    }

    cout << t << ": ";
    for (int i = 0; i < L; i++) {
      cout << (double(costs[i]) / 1'000'000) << " ";
      total_costs[i] += costs[i];
    }
    cout << "\n";
  }

  vector<string> headers = {"name", "cost (ms)", "cost/int64 (ns)", "cost/byte (ns)"};
  printTableRow(headers, 15);
  for (int i = 0; i < L; i++) {    
    double average_cost_ms = double(total_costs[i]) / n_test / 1'000'000;
    double cost_int64 = double(total_costs[i]) / (n_msg * n_test);
    double cost_byte = cost_int64 / 8;
    
    vector<string> row;
    row.push_back(names[i]);
    row.push_back(to_string(average_cost_ms));
    row.push_back(to_string(cost_int64));
    row.push_back(to_string(cost_byte));
    printTableRow(row, 15);
  }

  return 0;
}

编辑:我刚刚发现有人在 2017 年发现了几乎相同的编码/解码格式(但适用于 int32)。多么有趣的巧合,两个人在 2 个不同的时间独立“发明”了同样的东西:D

Soonts 的答案 (, , ) 代码的第 2 版在这里。它看起来更干净,并且不会读取超过输入缓冲区的末尾,但速度明显较慢,因此我保留了第一个版本。funcinl funcvpgather

  • func:此版本不需要解码器对象。force_inline = 0, use_gather = 0
  • inl func:force_inline = 1, use_gather = 0
  • vpgather:force_inline = 0, use_gather = 1

编译命令:g++ -o main test.cpp -std=c+=17 -O3 -mavx2 -mlzcnt -march=native

基准测试。 提供可衡量的性能提升(~8% 最佳情况)-march=native

解压缩 int64 的时间,平均超过 10 次运行:10^8

// AMD EPYC 75F3 (Zen 3 x86-64), 2950MHz
Compression ratios: 0.327437 0.369956
0: 447.651 65.0414 110.171 
1: 442.853 64.5952 105.635 
2: 434.715 64.1977 108.984 
3: 430.09 63.0572 104.074 
4: 424.451 64.6397 103.604 
5: 436.631 65.0076 104.04 
6: 429.59 64.1896 102.936 
7: 434.184 64.0522 104.035 
8: 430.223 69.3877 105.922 
9: 420.659 63.7519 105.563
// AMD EPYC 75F3 -march=native
name            | cost (ms)       | cost/int64 (ns) | cost/byte (ns)  | 
original        | 433.104635      | 4.331046        | 0.541381        | 
soonts          | 64.792034       | 0.647920        | 0.080990        | 
aqrit           | 105.496544      | 1.054965        | 0.131871        |
leb             | 438.236469      | 4.382365        | 0.547796        |
soonts (func)   | 68.403465       | 0.684035        | 0.085504        | 
soon (inl func) | 71.202335       | 0.712023        | 0.089003        |
soon (vpgather) | 78.308508       | 0.783085        | 0.097886        |

// AMD EPYC 75F3 without -march=native
name            | cost (ms)       | cost/int64 (ns) | cost/byte (ns)  | 
original        | 424.631426      | 4.246314        | 0.530789        | 
soonts          | 69.623498       | 0.696235        | 0.087029        | 
aqrit           | 109.895916      | 1.098959        | 0.137370        | 

// Intel(R) Xeon(R) Gold 6252N CPU (supposedly 2.3 GHz, I can't change/lock frequency on this borrowed machine)
name            | cost (ms)       | cost/int64 (ns) | cost/byte (ns)  | 
original        | 549.280441      | 5.492804        | 0.686601        | 
soonts          | 118.401718      | 1.184017        | 0.148002        | 
aqrit           | 151.811314      | 1.518113        | 0.189764        |
soon (func)     | 122.440794      | 1.224408        | 0.153051        |
soon (inl func) | 120.559447      | 1.205594        | 0.150699        |
soon (vpgather) | 115.704298      | 1.157043        | 0.144630        |
aqrit           | 151.811314      | 1.518113        | 0.189764        |
C++ 优化 编码 压缩 SIMD

评论

1赞 Peter Cordes 10/27/2023
你看过 varint 吗?使用 BMI2,它在 Zen 3 或更高版本或英特尔 Haswell 及更高版本上可以非常快。(Zen 2 及更早版本支持 BMI2,但微编码速度非常慢。此外,AVX-512 可能有助于 varint 解码,如果可用且速度快。为什么 varint 是一种高效的数据表示形式?pextpdeppextvpcompressb
1赞 Peter Cordes 10/27/2023
我有一些正在进行的使用 BMI2 或 AVX-512 VBMI 的 varint 编码/解码代码,我从未在 将 varint64 流解压缩到 AVX2 的 __m256i 的 qword 元素中发布。如果它对任何人都有用:godbolt.org/z/76bcanMTG .这些函数被大量注释,但我忘记了我测试过哪些。
2赞 Peter Cordes 10/27/2023
哦,是的,我认为 LEB128 看起来和 一样,使用 MSB 来指示是否有另一个字节。您的方案可能更适合 SIMD,或者至少适合在没有 fast 或 AVX-512VBMI 的情况下在 Zen 2 及更早版本的 CPU 上快速运行。快速可能会有所帮助,执行 64 位加载并使用该字段来决定将多少个高位归零,以及要使用的指针增量。嗯,这意味着负载使用延迟是循环携带的数据依赖关系关键路径的一部分,我在优化 varint 解码时遇到了同样的问题,尤其是在标量方面。varintpextbzhilength
2赞 Peter Cordes 10/27/2023
LEB128 大概代表 Little-Endian Base-128,即每个“数字”(字节)中的有用位数。它是一种任意精度的格式,与使用固定宽度计数的方案不同。如果您只需要对适合int64_t的数字进行编码,那么这个限制是可以的,尽管它的密度比 LEB128 低一些,在 LEB128 中,对于小数字,您可能只有 1 到 2 位的信令长度,而不是对于长度和符号的额外整字节。在这类事情中,速度/空间权衡并不罕见。
2赞 chtz 10/28/2023
关于“特别说明:大多数值的长度为 <= 2 个字节”:在这种情况下,“大多数”是什么意思?用超高效的代码处理这种情况并回退到通用代码就足够了吗?您是否也“大多数时间”有 4 个连续值<=2 个字节?<=2

答:

3赞 Soonts 10/29/2023 #1

这相对棘手,但仍然有可能对解码器进行矢量化。这是一个实现,在我配备 AMD Zen3 CPU 的计算机上,它的性能比您的参考版本高出 2.8 倍。

#include <immintrin.h>
#include <stdint.h>

// Exclusive prefix sum of unsigned bytes
// When the sum of all bytes exceeds 0xFF, the output is garbage
// Which is fine here because our bytes are in [0..8] interval
inline __m128i exclusivePrefixSum( const __m128i src, size_t& sum )
{
    __m128i v = src;
    // https://en.wikipedia.org/wiki/Prefix_sum#/media/File:Hillis-Steele_Prefix_Sum.svg
    v = _mm_add_epi8( v, _mm_slli_si128( v, 1 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 2 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 4 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 8 ) );

    // So far we have computed inclusive prefix sum
    // Keep the last byte which is total sum of bytes in the vector
    uint16_t tmp = _mm_extract_epi16( v, 7 );
    tmp >>= 8;
    sum = tmp;

    // Subtract original numbers to make exclusive sum = byte offsets to load
    return _mm_sub_epi8( v, src );
}

// Load 4 uint64 numbers from ( rsi + offsets[ i ] ), keep the lowest bits[ i ] bits in the numbers
inline __m256i loadAndMask( const uint8_t* rsi, uint32_t offsets, __m128i bits )
{
    // Load 4 uint64_t numbers from the correct locations, without AVX2 gathers
    const int64_t* s0 = (const int64_t*)( rsi + (uint8_t)offsets );
    const int64_t* s1 = (const int64_t*)( rsi + (uint8_t)( offsets >> 8 ) );
    const int64_t* s2 = (const int64_t*)( rsi + (uint8_t)( offsets >> 16 ) );
    const int64_t* s3 = (const int64_t*)( rsi + ( offsets >> 24 ) );
    __m256i r = _mm256_setr_epi64x( *s0, *s1, *s2, *s3 );

    // Mask away the higher pieces in the loaded numbers
    __m256i shift = _mm256_cvtepu8_epi64( bits );
    const __m256i ones = _mm256_set1_epi32( -1 );
    __m256i mask = _mm256_sllv_epi64( ones, shift );
    return _mm256_andnot_si256( mask, r );
}

inline __m256i applySigns( __m256i v, __m128i signs )
{
    // Sign extend the masks from bytes to int64
    __m256i mask = _mm256_cvtepi8_epi64( signs );
    // Conditionally negate the numbers
    __m256i neg = _mm256_sub_epi64( _mm256_setzero_si256(), v );
    return _mm256_blendv_epi8( v, neg, mask );
}

class BlockDecoder
{
    // Load offsets, in bytes
    __m128i offsetBytes;
    // Payload bits per element, the bytes are in [ 0 .. 64 ] interval
    __m128i payloadBits;
    // 16 bytes with the 0x80 bit set when the corresponding input was negative; the rest of the bits are unused
    __m128i signs;
    // Count of payload bytes in the complete block
    size_t payloadBytes;

    // Decode the block header
    inline void loadHeader( const uint8_t* rsi )
    {
        // Load 8 bytes, and zero extend them into uint16_t
        const __m128i v = _mm_cvtepu8_epi16( _mm_loadu_si64( rsi ) );

        // Unpack lengths
        const __m128i seven = _mm_set1_epi8( 7 );
        const __m128i l4 = _mm_slli_epi16( v, 4 );
        __m128i lengths = _mm_or_si128( v, l4 );
        lengths = _mm_and_si128( lengths, seven );
        // Transform 7 into 8
        __m128i tmp = _mm_cmpeq_epi8( lengths, seven );
        lengths = _mm_sub_epi8( lengths, tmp );

        // Byte offsets to load 16 numbers, and count of payload bytes in the complete block
        offsetBytes = exclusivePrefixSum( lengths, payloadBytes );
        // Count of payload bits in the loaded numbers, lengths * 8
        payloadBits = _mm_slli_epi16( lengths, 3 );
        // Signs vector, we only use the highest 0x80 bit in these bytes
        signs = _mm_or_si128( _mm_slli_epi16( v, 8 ), l4 );
    }

    // Decode and store 4 numbers
    template<int slice>
    inline void decodeSlice( int64_t* rdi, const uint8_t* payload ) const
    {
        uint32_t off;
        __m128i bits, s;
        if constexpr( slice != 0 )
        {
            off = (uint32_t)_mm_extract_epi32( offsetBytes, slice );
            constexpr int imm = _MM_SHUFFLE( slice, slice, slice, slice );
            bits = _mm_shuffle_epi32( payloadBits, imm );
            s = _mm_shuffle_epi32( signs, imm );
        }
        else
        {
            off = (uint32_t)_mm_cvtsi128_si32( offsetBytes );
            // For the first slice of the block, the 4 lowest bytes are in the correct locations already
            bits = payloadBits;
            s = signs;
        }

        __m256i v = loadAndMask( payload, off, bits );
        v = applySigns( v, s );
        _mm256_storeu_si256( ( __m256i* )rdi, v );
    }

public:

    // Decode and store a block of 16 numbers, return pointer to the next block
    const uint8_t* decode( int64_t* rdi, const uint8_t* rsi )
    {
        loadHeader( rsi );
        rsi += 8;

        decodeSlice<0>( rdi, rsi );
        decodeSlice<1>( rdi + 4, rsi );
        decodeSlice<2>( rdi + 8, rsi );
        decodeSlice<3>( rdi + 12, rsi );

        return rsi + payloadBytes;
    }
};

我对代码的测试很少,可以进一步提高性能。

如您所见,该实现需要 AVX2,并且是无分支的。

使用示例:

BlockDecoder dec;
const uint8_t* rsi = buffer;
int64_t* rdi = decoded;
for( size_t i = 0; i < n_msg; i += 16 )
{
    rsi = dec.decode( rdi, rsi );
    rdi += 16;
}

铌!通常,当流中的最后一个数字不使用最大 8 个字节时,实现会在编码缓冲区末尾加载几个字节。您可以填充压缩数据,或实现特殊情况来解码流的最后一个块,或调整编码器以始终发出 8 个字节作为最终块的最终编号。

P.S. 初始版本使用指令从内存中加载四个 int64 数字,使用基指针 + 来自另一个向量的四个字节偏移量。但是,在 AMD CPU 上,它比 4 个标量负载略慢。如果您以英特尔为目标,最好使用初始版本,请参阅编辑历史记录。_mm256_i32gather_epi64

评论

1赞 Soonts 10/29/2023
@PeterCordes 好的一点是,我发现在我的计算机上,4 个标量加载比单个收集略快,请参阅更新。vpgatherdq
1赞 Huy Le 10/30/2023
我在服务器 CPU (EPYC 75F3) 上进行了测试,差异是 6.1 倍,这是巨大的。如果可能的话,我稍后会在更多的 CPU 上进行测试
2赞 Soonts 10/30/2023
@HuyLe 请测试该版本: gist.github.com/Const-me/e897e4565b2c6a2e69d8b5d2c1457730 它使用来自另一个答案的正确移位方法,不再从编码缓冲区的末尾加载。还重构了 C++ 类。还有一个使用 AVX2 收集指令的宏。
2赞 Soonts 11/1/2023
@HuyLe Gathers 是意料之中的,但性能下降是一个谜。我知道两种可能的解释 (1) 你的 GCC 搞砸了我的非 OO 版本的代码生成,但对类做得很好 (2) 减少是由于 .在OO版本中,shift与值无关,仅取决于标头,该指令将与负载并行运行。然后我的 VC++ 搞砸了我的 OO 版本的代码生成,但第二个版本做得很好,所以在我的 Windows 计算机上,第二个版本稍微快一些。_mm256_srlv_epi64
1赞 Soonts 11/1/2023
@HuyLe 语言标准说它是未定义的,因为某些平台需要对齐并在未对齐的标量负载下崩溃。但是,有问题的代码需要 AVX2,即仅针对现代 AMD64 处理器构建,这些处理器在未对齐的 8 字节负载方面没有任何问题。我通常会忽略代码中的此类警告。 IMO 的可读性较差。至少在 vc++ 中,它大大减慢了调试构建的速度,因为未优化的构建会编译成函数调用。memcpymemcpy
2赞 aqrit 10/30/2023 #2

在大多数情况下,标量实现可以是无分支的。

从长远来看,用符号位填充控制位可能会花费位。 相反,这里的所有控制位都打包在流的开头(类似于 streamvbyte)。

之所以使用变量符号扩展,是因为它比 zigzag 用于字节标量实现。

概念验证假设:

  • 快速未对齐的内存访问(和 memcpy 优化)
  • 小端序记忆顺序
  • 算术右移
#include <cstddef>  // size_t
#include <cstdint> // uint64_t, int64_t
#include <immintrin.h> // lzcnt intrinsic
#include <cstring> // memcpy

unsigned char* encode(size_t n, const int64_t* src, unsigned char* dst) {
    unsigned char* key_ptr = dst;
    size_t num_chunks = n >> 3;
    unsigned char* data_ptr = &dst[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
    while (num_chunks--) {
        uint64_t key = 0;
        for (size_t i = 0; i < 8; i++) {
            uint64_t v = (uint64_t)*src++;
            memcpy(data_ptr, &v, sizeof(v)); // assumes little endian

            v = (v + v) ^ v; // clear redundant sign bits (and trash everything else)
            v = (v << 8) | v; // combine lengths of 7 and 8
            v = (_lzcnt_u64(v | 1) ^ 63) >> 3; // use BSR on intel... 
            data_ptr += v + ((v + 1) >> 3); // key of 7 is length of 8
            key |= v << (i * 3);
        }
        memcpy(key_ptr, &key, 3); // assumes little endian
        key_ptr += 3;
    }
    // TODO: tail loop...
    
    return dst;
}
    
void decode(size_t n, const unsigned char* src, int64_t* dst) {
    const unsigned char* key_ptr = src;
    size_t num_chunks = n >> 3;
    const unsigned char* data_ptr = &src[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
    if (n >= 19) { // if has at least 8 bytes of padding before data_ptr
        data_ptr -= sizeof(uint64_t); // left aligned the truncated val in register (little endian) 
        while (num_chunks--) {
            unsigned keys;
            memcpy(&keys, key_ptr, 3);
            key_ptr += 3;
            for (size_t i = 0; i < 8; i++) {
                uint64_t v;
                size_t k = keys & 0x07;
                size_t len = k + ((k + 1) >> 3); // k==7 is len=8
                uint64_t mask = (uint64_t)0 - ((k + 7) >> 3); // k ? -1 : 0
                size_t shift = (64 - (len * 8)) & (size_t)mask; 

                keys >>= 3;     
                data_ptr += len; 
                memcpy(&v, data_ptr, sizeof(v));
                v &= mask;
                *dst++ = (int64_t)v >> shift;
            }
        }
        data_ptr += sizeof(uint64_t);
    }

    // TODO: tail loop...
}

评论

0赞 Peter Cordes 10/30/2023
memcpy(&keys, key_ptr, 4);可能比 3 字节的 memcpy 更有效,尤其是那些使上字节未初始化的 memcpy。由于键位于开头,然后是同一缓冲区中的数据,因此我们知道只执行 dword load = 4 字节 memcpy 是安全的。此外,64 位移位计数相当于 ,并且比在 x86 上实现要减去的某个地方略便宜;x86 班次免费屏蔽计数。(ARM 从即时中反向减去,但使移位计数饱和而不是掩码,所以那里可能更糟。64 - cv >> (-c & 63)neg64rsb
0赞 Peter Cordes 10/30/2023
使用 OP 的基准测试,整个缓冲区的解码在带有 GCC13.2 的 3.9GHz Skylake 上以 258 毫秒的速度运行。或者使用 Clang 16.0,只需 138 毫秒(而 OP 的字节一次参考实现为 535 毫秒 GCC 或 381 毫秒,避免了位扫描)。将 更改为 4 字节加载可使 GCC 速度达到匹配 clang: godbolt.org/z/G684z56Mj All with .(或者没有 clang 的引导)。memcpy-O3 -march=skylake -Wa,-mbranches-within-32B-boundaries-Wa,
0赞 Peter Cordes 10/30/2023
您可以使用 C++20 代替 ,但 lzcnt 固有排除了在没有选项的情况下编译并获得 AMD 速度较慢的错误,或者 Intel 速度较慢而不是 。(错位负载的好技巧,使符号位已经在顶部,不需要先左移)。std::countl_zero_lzcnt_u64-mbsrsarsarx
1赞 Peter Cordes 10/30/2023
我想知道测试用例的特定选择是否会导致大小,如果符号位没有单独存储,通常需要额外的字节?我本来希望您的编码平均压缩得更好,压缩到输入大小的较小部分。我没有尝试其他分布,比如均匀分布。%70'000* 1 Bnlog2(abs(n))
1赞 Huy Le 10/30/2023
@PeterCordes 是的,这个数字和 1% 是故意的,并且是在我的用例中使用真实数据发现的。这就是为什么我选择存储符号位 + 无符号绝对值,而不是使用符号数字编码/解码70000* 1 Bn