programing

어떻게 이렇게 빠르지?

newsource 2022. 8. 18. 23:43

어떻게 이렇게 빠르지?

는 어떻게 하고 있다numpy최적화된 C/C++ 코드와의 놀라운 비교에 따르면, Numpy의 속도를 재현하기에는 아직 멀었다.

예시를 검토해 주십시오.「 D 」 、 「 」 、 「 2 D 」 の [ a ]shape=(N, N) ★★★★★★★★★★★★★★★★★」dtype=float32N차원의 N개의 벡터 리스트를 나타내며, 나는 각 벡터 쌍 사이의 쌍별 차이를 계산하고 있다.「」를 사용합니다.numpy브로드캐스트는 다음과 같이 기술되어 있습니다.

def pairwise_sub_numpy( X ):
    return X - X[:, None, :]

「」를 사용합니다.timeit를 할 수 .N=512: 88로 하다

C/C++에서는, 순진한 실장에서는 다음과 같이 기술하고 있습니다.

#define X(i, j)     _X[(i)*N + (j)]
#define res(i, j, k)  _res[((i)*N + (j))*N + (k)]

float* pairwise_sub_naive( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    for (int i = 0; i < N; i++) {
        for (int j = 0; j < N; j++) {
            for (int k = 0; k < N; k++)
                res(i,j,k) = X(i,k) - X(j,k);
          }
    }
    return _res;
}

를 사용한 컴파일gcc.0 (7.3.0 인치)-O3 합니다.pairwise_sub_naive(X), 코드만 단순하면 numpy.

이제 본격적으로 행 벡터를 직접 인덱싱하여 몇 가지 작은 최적화를 추가합니다.

float* pairwise_sub_better( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));
    
    for (int i = 0; i < N; i++) {
        const float* xi = & X(i,0);

        for (int j = 0; j < N; j++) {
            const float* xj = & X(j,0);
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k++)
                 r[k] = xi[k] - xj[k];
        }
    }
    return _res;
}

속도는 195ms로 동일하게 유지되며, 이는 컴파일러가 그 정도를 계산할 수 있었다는 것을 의미합니다.이제 SIMD 벡터 명령을 사용합니다.

float* pairwise_sub_simd( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    // create caches for row vectors which are memory-aligned
    float* xi = (float*)aligned_alloc(32, N * sizeof(float));
    float* xj = (float*)aligned_alloc(32, N * sizeof(float));
    
    for (int i = 0; i < N; i++) {
        memcpy(xi, & X(i,0), N*sizeof(float));
        
        for (int j = 0; j < N; j++) {
            memcpy(xj, & X(j,0), N*sizeof(float));
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k += 256/sizeof(float)) {
                const __m256 A = _mm256_load_ps(xi+k);
                const __m256 B = _mm256_load_ps(xj+k);
                _mm256_store_ps(r+k, _mm256_sub_ps( A, B ));
            }
        }
    }
    free(xi); 
    free(xj);
    return _res;
}

이것에 의해, 작은 부스트(함수 콜 마다 194 ms 가 아니고 178 ms)만이 발생합니다.

그리고 닷 제품을 최적화하는 데 사용되는 것과 같은 "블록 단위" 접근 방식이 도움이 될지 궁금했습니다.

float* pairwise_sub_blocks( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    #define B 8
    float cache1[B*B], cache2[B*B];

    for (int bi = 0; bi < N; bi+=B)
      for (int bj = 0; bj < N; bj+=B)
        for (int bk = 0; bk < N; bk+=B) {
        
            // load first 8x8 block in the cache
            for (int i = 0; i < B; i++)
              for (int k = 0; k < B; k++)
                cache1[B*i + k] = X(bi+i, bk+k);

            // load second 8x8 block in the cache
            for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                cache2[B*j + k] = X(bj+j, bk+k);

            // compute local operations on the caches
            for (int i = 0; i < B; i++)
             for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                 res(bi+i,bj+j,bk+k) = cache1[B*i + k] - cache2[B*j + k];
         }
    return _res;
}

놀랍게도 이것은 지금까지의 방법 중 가장 느린 것입니다(1함수 호출당 258밀리초).

요약하자면, 최적화된 C++ 코드로 몇 가지 노력을 기울였지만, numpy가 쉽게 달성할 수 있는 88 ms/콜에 근접할 수 없습니다.왜 그런지 추측이라도?

는 ★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★numpy멀티캐스팅, 그리고 어쨌든 이런 종류의 조작은 멀티캐스팅이 아닙니다.

편집: Numpy 코드를 벤치마킹하기 위한 정확한 코드:

import numpy as np

def pairwise_sub_numpy( X ):
    return X - X[:, None, :]

N = 512
X = np.random.rand(N,N).astype(np.float32)

import timeit
times = timeit.repeat('pairwise_sub_numpy( X )', globals=globals(), number=1, repeat=5)
print(f">> best of 5 = {1000*min(times):.3f} ms")

C 코드의 완전한 벤치마크:

#include <stdio.h>
#include <string.h>
#include <xmmintrin.h>   // compile with -mavx -msse4.1
#include <pmmintrin.h>
#include <immintrin.h>
#include <time.h>


#define X(i, j)     _x[(i)*N + (j)]
#define res(i, j, k)  _res[((i)*N + (j))*N + (k)]

float* pairwise_sub_naive( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    for (int i = 0; i < N; i++) {
        for (int j = 0; j < N; j++) {
            for (int k = 0; k < N; k++)
                res(i,j,k) = X(i,k) - X(j,k);
          }
    }
    return _res;
}

float* pairwise_sub_better( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));
    
    for (int i = 0; i < N; i++) {
        const float* xi = & X(i,0);

        for (int j = 0; j < N; j++) {
            const float* xj = & X(j,0);
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k++)
                 r[k] = xi[k] - xj[k];
        }
    }
    return _res;
}

float* pairwise_sub_simd( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    // create caches for row vectors which are memory-aligned
    float* xi = (float*)aligned_alloc(32, N * sizeof(float));
    float* xj = (float*)aligned_alloc(32, N * sizeof(float));
    
    for (int i = 0; i < N; i++) {
        memcpy(xi, & X(i,0), N*sizeof(float));
        
        for (int j = 0; j < N; j++) {
            memcpy(xj, & X(j,0), N*sizeof(float));
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k += 256/sizeof(float)) {
                const __m256 A = _mm256_load_ps(xi+k);
                const __m256 B = _mm256_load_ps(xj+k);
                _mm256_store_ps(r+k, _mm256_sub_ps( A, B ));
            }
        }
    }
    free(xi); 
    free(xj);
    return _res;
}


float* pairwise_sub_blocks( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    #define B 8
    float cache1[B*B], cache2[B*B];

    for (int bi = 0; bi < N; bi+=B)
      for (int bj = 0; bj < N; bj+=B)
        for (int bk = 0; bk < N; bk+=B) {
        
            // load first 8x8 block in the cache
            for (int i = 0; i < B; i++)
              for (int k = 0; k < B; k++)
                cache1[B*i + k] = X(bi+i, bk+k);

            // load second 8x8 block in the cache
            for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                cache2[B*j + k] = X(bj+j, bk+k);

            // compute local operations on the caches
            for (int i = 0; i < B; i++)
             for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                 res(bi+i,bj+j,bk+k) = cache1[B*i + k] - cache2[B*j + k];
         }
    return _res;
}

int main() 
{
    const int N = 512;
    float* _x = (float*) malloc( N * N * sizeof(float) );
    for( int i = 0; i < N; i++)
      for( int j = 0; j < N; j++)
        X(i,j) = ((i+j*j+17*i+101) % N) / float(N);

    double best = 9e9;
    for( int i = 0; i < 5; i++)
    {
        struct timespec start, stop;
        clock_gettime(CLOCK_THREAD_CPUTIME_ID, &start);
        
        //float* res = pairwise_sub_naive( _x, N );
        //float* res = pairwise_sub_better( _x, N );
        //float* res = pairwise_sub_simd( _x, N );
        float* res = pairwise_sub_blocks( _x, N );

        clock_gettime(CLOCK_THREAD_CPUTIME_ID, &stop);

        double t = (stop.tv_sec - start.tv_sec) * 1e6 + (stop.tv_nsec - start.tv_nsec) / 1e3;    // in microseconds
        if (t < best) best = t;
        free( res );
    }
    printf("Best of 5 = %f ms\n", best / 1000);

    free( _x );
    return 0;
}

7.3 gcc 7.3.0을 사용한 컴파일gcc -Wall -O3 -mavx -msse4.1 -o test_simd test_simd.c

내 컴퓨터의 타이밍 요약:

실행 시간을
수치 88 밀리초
C++ 순진 194 밀리초
C++보다 우수 195밀리초
C++ SIMD 178 밀리초
C++ 차단됨 258 밀리초
C++ 차단됨(gcc 8.3.1) 217 밀리초

일부 코멘트에서 지적되었듯이 numpy는 구현에서 SIMD를 사용하며 계산 시 메모리를 할당하지 않습니다.구현에서 메모리 할당을 제거하고 계산 전에 모든 버퍼를 미리 할당하면 scaler 버전(최적화가 없는 버전)에서도 numpy에 비해 더 나은 시간을 얻을 수 있습니다.

또한 SIMD에 관해서도, 또 scaler보다 실장 퍼포먼스가 그다지 뛰어나지 않은 것은 메모리 액세스 패턴이 SIMD 사용에 적합하지 않기 때문입니다.메모 카피를 실행해, 서로 멀리 떨어진 장소에서 SIMD 레지스터에 로드합니다.예를 들어, 0행과 511행의 벡터를 채우는 등, CA에서 잘 동작하지 않는 경우가 있습니다.또는 SIMD 프리페처를 사용합니다.

SIMD 레지스터를 로드하는 방법에도 오류가 있습니다(계산하려는 내용을 정확히 이해한 경우). 256비트 SIMD 레지스터는 8개의 싱글소수점 부동소수점 숫자를 로드할 수 있지만, 루프에서 k를 "256/size of (크기 of (크기)"로 점프합니다. 즉 256/4 = 64; _x_res expective SIMD는 SIMICS이며, floative SIMD입니다.포인터는 인수로서 8 플로트마다 해당 행의 모든 요소를 읽는 대신 64 플로트마다 읽습니다.

이 계산은 액세스 패턴을 변경하는 것에 의해서 한층 더 최적화 할 수 있습니다.예를 들어 line0을 베이스로 반복할는 line0 - line1을 계산하지만 나중에 line1을 베이스로 반복할 때는 line1 - line0을 계산해야 합니다.line1 - line0은 기본적으로 각 회선에 대해 - (line0 - line1)입니다.e line0 이후는 이전 계산에서 많은 결과를 재사용할 수 있었다.대부분의 경우 SIMD 사용 또는 병렬화는 의미 있는 개선을 제공하기 위해 데이터에 액세스하거나 추론하는 방법을 변경해야 합니다.

이것은 당신의 초기 구현을 바탕으로 한 첫 번째 단계이며, 숫자보다 빠릅니다(OpenMP는 원래 방법이 아니기 때문에 신경 쓰지 마세요.순진하게 실행하려고 하면 어떻게 되는지 보고 싶었어요).

C++
Time scaler version: 55 ms
Time SIMD version: 53 ms
**Time SIMD 2 version: 33 ms**
Time SIMD 3 version: 168 ms
Time OpenMP version: 59 ms

Python numpy
>> best of 5 = 88.794 ms


#include <cstdlib>
#include <xmmintrin.h>   // compile with -mavx -msse4.1
#include <pmmintrin.h>
#include <immintrin.h>

#include <numeric>
#include <algorithm>
#include <chrono>
#include <iostream>
#include <cstring>

using namespace std;

float* pairwise_sub_naive (const float* input, float* output, int n) 
{
    for (int i = 0; i < n; i++) {
        for (int j = 0; j < n; j++) {
            for (int k = 0; k < n; k++)
                output[(i * n + j) * n + k] = input[i * n + k] - input[j * n + k];
          }
    }
    return output;
}

float* pairwise_sub_simd (const float* input, float* output, int n) 
{    
    for (int i = 0; i < n; i++) 
    {
        const int idxi = i * n;
        for (int j = 0; j < n; j++)
        {
            const int idxj = j * n;
            const int outidx = idxi + j;
            for (int k = 0; k < n; k += 8) 
            {
                __m256 A = _mm256_load_ps(input + idxi + k);
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(output + outidx * n + k, _mm256_sub_ps( A, B ));
            }
        }
    }
    
    return output;
}

float* pairwise_sub_simd_2 (const float* input, float* output, int n) 
{
    float* line_buffer = (float*) aligned_alloc(32, n * sizeof(float));

    for (int i = 0; i < n; i++) 
    {
        const int idxi = i * n;
        for (int j = 0; j < n; j++)
        {
            const int idxj = j * n;
            const int outidx = idxi + j;
            for (int k = 0; k < n; k += 8) 
            {
                __m256 A = _mm256_load_ps(input + idxi + k);
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(line_buffer + k, _mm256_sub_ps( A, B ));
            }
            memcpy(output + outidx * n, line_buffer, n);
        }
    }
    
    return output;
}

float* pairwise_sub_simd_3 (const float* input, float* output, int n) 
{    
    for (int i = 0; i < n; i++) 
    {
        const int idxi = i * n;
        for (int k = 0; k < n; k += 8) 
        {
            __m256 A = _mm256_load_ps(input + idxi + k);
            for (int j = 0; j < n; j++)
            {
                const int idxj = j * n;
                const int outidx = (idxi + j) * n;
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(output + outidx + k, _mm256_sub_ps( A, B     ));
             }
        }
    }

    return output;
}

float* pairwise_sub_openmp (const float* input, float* output, int n)
{
    int i, j;
    #pragma omp parallel for private(j)
    for (i = 0; i < n; i++) 
    {
        for (j = 0; j < n; j++)
        {
            const int idxi = i * n; 
            const int idxj = j * n;
            const int outidx = idxi + j;
            for (int k = 0; k < n; k += 8) 
            {
                __m256 A = _mm256_load_ps(input + idxi + k);
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(output + outidx * n + k, _mm256_sub_ps( A, B ));
            }
        }
    }
    /*for (i = 0; i < n; i++) 
    {
        for (j = 0; j < n; j++) 
        {
            for (int k = 0; k < n; k++)
            {
                output[(i * n + j) * n + k] = input[i * n + k] - input[j * n + k];
            }
        }
    }*/
    
    return output;
}

int main ()
{
    constexpr size_t n = 512;
    constexpr size_t input_size = n * n;
    constexpr size_t output_size = n * n * n;

    float* input = (float*) aligned_alloc(32, input_size * sizeof(float));
    float* output = (float*) aligned_alloc(32, output_size * sizeof(float));

    float* input_simd = (float*) aligned_alloc(32, input_size * sizeof(float));
    float* output_simd = (float*) aligned_alloc(32, output_size * sizeof(float));

    float* input_par = (float*) aligned_alloc(32, input_size * sizeof(float));
    float* output_par = (float*) aligned_alloc(32, output_size * sizeof(float));

    iota(input, input + input_size, float(0.0));
    fill(output, output + output_size, float(0.0));

    iota(input_simd, input_simd + input_size, float(0.0));
    fill(output_simd, output_simd + output_size, float(0.0));
    
    iota(input_par, input_par + input_size, float(0.0));
    fill(output_par, output_par + output_size, float(0.0));

    std::chrono::milliseconds best_scaler{100000};
    for (int i = 0; i < 5; ++i)
    {
        auto start = chrono::high_resolution_clock::now();
        pairwise_sub_naive(input, output, n);
        auto stop = chrono::high_resolution_clock::now();

        auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
        if (duration < best_scaler)
        {
            best_scaler = duration;
        }
    }
    cout << "Time scaler version: " << best_scaler.count() << " ms\n";

    std::chrono::milliseconds best_simd{100000};
for (int i = 0; i < 5; ++i)
{
    auto start = chrono::high_resolution_clock::now();
    pairwise_sub_simd(input_simd, output_simd, n);
    auto stop = chrono::high_resolution_clock::now();

    auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
     if (duration < best_simd)
    {
        best_simd = duration;
    }
}
cout << "Time SIMD version: " << best_simd.count() << " ms\n";

std::chrono::milliseconds best_simd_2{100000};
for (int i = 0; i < 5; ++i)
{
    auto start = chrono::high_resolution_clock::now();
    pairwise_sub_simd_2(input_simd, output_simd, n);
    auto stop = chrono::high_resolution_clock::now();

    auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
     if (duration < best_simd_2)
    {
        best_simd_2 = duration;
    }
}
cout << "Time SIMD 2 version: " << best_simd_2.count() << " ms\n";

std::chrono::milliseconds best_simd_3{100000};
for (int i = 0; i < 5; ++i)
{
    auto start = chrono::high_resolution_clock::now();
    pairwise_sub_simd_3(input_simd, output_simd, n);
    auto stop = chrono::high_resolution_clock::now();

    auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
     if (duration < best_simd_3)
    {
        best_simd_3 = duration;
    }
}
cout << "Time SIMD 3 version: " << best_simd_3.count() << " ms\n";

    std::chrono::milliseconds best_par{100000};
    for (int i = 0; i < 5; ++i)
    {
        auto start = chrono::high_resolution_clock::now();
        pairwise_sub_openmp(input_par, output_par, n);
        auto stop = chrono::high_resolution_clock::now();

        auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
         if (duration < best_par)
        {
            best_par = duration;
        }
    }
    cout << "Time OpenMP version: " << best_par.count() << " ms\n";

    cout << "Verification\n";
    if (equal(output, output + output_size, output_simd))
    {
        cout << "PASSED\n";
    }
    else
    {
        cout << "FAILED\n";
    }

    return 0;
}

편집: SIMD 구현의 두 번째 버전과 관련하여 잘못된 호출이 있었기 때문에 약간의 수정이 이루어졌습니다.

현재 알 수 있듯이 캐시의 참조 인접성 측면에서 두 번째 구현이 가장 빠르게 동작하기 때문에 두 번째 구현이 가장 빠릅니다.SIMD 구현의 예 2와 3은 메모리 액세스 패턴을 변경하여 SIMD 최적화의 성능에 어떤 영향을 미치는지 보여 줍니다.요약하면 (제 조언으로는 아직 완전하지 않다는 것을 알고 있습니다) 메모리의 액세스 패턴과 SIMD 유닛에 대한 부하와 저장에 유의하십시오.SIMD는 프로세서 코어 내의 다른 하드웨어 유닛이기 때문에 메모리로부터 레지스터를 로드하여 많은 수의 오퍼라를 실행하려고 할 때 데이터를 주고받을 때 문제가 발생합니다.데이터를 가능한 한 빨리 저장할 필요가 없습니다(물론 이 예에서는 데이터만 저장하면 됩니다).또, 사용 가능한 SIMD 레지스터의 수가 한정되어 있어 너무 많은 SIMD 레지스터를 로드하면 메인 메모리의 임시 장소에 「스필」해, 모든 이익을 무효로 하는 것에 주의해 주세요.SIMD 최적화는 진정한 균형입니다!

크로스 플랫폼 내장형 래퍼를 표준에 넣기 위한 노력이 있습니다(예전에는 클로즈드 소스 래퍼를 스스로 개발했습니다).완성과는 거리가 멀어도 검토할 가치가 있습니다(SIMD가 어떻게 기능하는지 정말 알고 싶다면 첨부 문서를 읽어보십시오).https://github.com/VcDevel/std-simd

이것은 @celakev가 올린 답변에 대한 보충 자료입니다.이제서야 무엇이 문제인지 알 수 있을 것 같습니다.문제는 계산을 수행하는 주요 함수에 메모리를 할당하는 것이 아닙니다.

실제로 시간이 걸렸던 것은 새로운 메모리에 액세스하는 것이었습니다.제 생각에는malloc은 가상 메모리에 하지 않는 할 까지 반환합니다.call은 실제 물리 메모리에 대응하지 않습니다.실제로 시간이 걸리는 것은 물리 메모리가 기능 코드로 액세스 되었을 때(OS 레벨이라고 생각합니다) 즉석에서 할당하는 프로세스입니다.

여기 증거가 있습니다.다음의 2개의 간단한 함수를 고려합니다.

#include <stdio.h>
#include <stdlib.h>
#include <time.h>

float* just_alloc( size_t N ) 
{
    return (float*) aligned_alloc( 32, sizeof(float)*N );
}

void just_fill( float* _arr, size_t N ) 
{
    for (size_t i = 0; i < N; i++)
        _arr[i] = 1;
}

#define Time( code_to_benchmark, cleanup_code ) \
    do { \
        double best = 9e9; \
        for( int i = 0; i < 5; i++) { \
            struct timespec start, stop; \
            clock_gettime(CLOCK_THREAD_CPUTIME_ID, &start); \
            code_to_benchmark; \
            clock_gettime(CLOCK_THREAD_CPUTIME_ID, &stop); \
            double t = (stop.tv_sec - start.tv_sec) * 1e3 + (stop.tv_nsec - start.tv_nsec) / 1e6; \
            printf("Time[%d] = %f ms\n", i, t); \
            if (t < best) best = t; \
            cleanup_code; \
        } \
        printf("Best of 5 for '" #code_to_benchmark "' = %f ms\n\n", best); \
    } while(0)

int main() 
{
    const size_t N = 512;

    Time( float* arr = just_alloc(N*N*N), free(arr) );
    
    float* arr = just_alloc(N*N*N);
    Time( just_fill(arr, N*N*N), ; );
    free(arr);

    return 0;
}

다음의 타이밍이 표시됩니다.이 타이밍은, 각 콜에 대해 상술합니다.

Time[0] = 0.000931 ms
Time[1] = 0.000540 ms
Time[2] = 0.000523 ms
Time[3] = 0.000524 ms
Time[4] = 0.000521 ms
Best of 5 for 'float* arr = just_alloc(N*N*N)' = 0.000521 ms

Time[0] = 189.822237 ms
Time[1] = 45.041083 ms
Time[2] = 46.331428 ms
Time[3] = 44.729433 ms
Time[4] = 42.241279 ms
Best of 5 for 'just_fill(arr, N*N*N)' = 42.241279 ms

보시다시피 메모리 할당은 매우 빠르지만 처음 메모리에 액세스할 때는 다른 시간보다 5배 느립니다.즉, 기본적으로 코드가 느린 이유는 물리적인 주소가 없는 새로운 메모리를 매번 재할당했기 때문입니다.(잘못했다면 정정해 주세요.그게 요점이라고 생각합니다!)

, 티에 a a a를 하고 싶었다.pairwiseIgen을 사용한 방법, 이것은 C++에 높은 수준의 대수 조작 기능을 제공하고 후드 아래에서 SIMD를 사용해야 합니다.눅눅한 느낌이야.

구현은 다음과 같습니다.

#include <iostream>
#include <vector>
#include <chrono>
#include <algorithm>
        
#include <Eigen/Dense>

auto pairwise_eigen(const Eigen::MatrixXf &input, std::vector<Eigen::MatrixXf> &output) {
        for (int k = 0; k < input.cols(); ++k)
                output[k] = input
                          // subtract matrix with repeated k-th column
                          - input.col(k) * Eigen::RowVectorXf::Ones(input.cols());

}

int main() {
        constexpr size_t n = 512;
        
        // allocate input and output 
        Eigen::MatrixXf input = Eigen::MatrixXf::Random(n, n);
        std::vector<Eigen::MatrixXf> output(n);

        std::chrono::milliseconds best_eigen{100000};
        for (int i = 0; i < 5; ++i) {
                auto start = std::chrono::high_resolution_clock::now();
                pairwise_eigen(input, output);
                auto end = std::chrono::high_resolution_clock::now();
         
                auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end-start);      
                if (duration < best_eigen)
                        best_eigen = duration;
        }

        std::cout << "Time Eigen version: " << best_eigen.count() << " ms\n";

        return 0;
}

시스템 상에서 @celavek가 제안하는 벤치마크 테스트의 전체 내용은 다음과 같습니다.

Time scaler version: 57 ms
Time SIMD version: 58 ms
Time SIMD 2 version: 40 ms
Time SIMD 3 version: 58 ms
Time OpenMP version: 58 ms

Time Eigen version: 76 ms

Numpy >> best of 5 = 118.489 ms

Whit Eigen은 Numpy에 관해서는 아직 눈에 띄게 개선되었지만 "raw" 구현과 비교하면 그다지 인상적이지 않습니다(확실히 오버헤드가 있습니다).추가 최적화는 출력 벡터를 입력 복사본과 함께 할당한 다음 각 벡터 엔트리에서 직접 빼는 것입니다. 단순히 다음 라인을 바꿉니다.

// inside the pairwise method
        for (int k = 0; k < input.cols(); ++k)
                output[k] -= input.col(k) * Eigen::RowVectorXf::Ones(input.cols());


// at allocation time
        std::vector<Eigen::MatrixXf> output(n, input);

이것에 의해, 베스트 5의 값이 60 밀리초로 내려갑니다.

언급URL : https://stackoverflow.com/questions/65888569/how-is-numpy-so-fast