DirectX11 With Windows SDK--27 计算着色器:双调排序

前言

上一章咱们用一个比较简单的例子来尝试使用计算着色器,可是在看这一章内容以前,你还须要了解下面的内容:html

章节
26 计算着色器:入门
深刻理解与使用缓冲区资源(结构化缓冲区/有类型缓冲区)
Visual Studio图形调试器详细使用教程(编程捕获部分)

这一章咱们继续用一个计算着色器的应用实例做为切入点,进一步了解相关知识。git

DirectX11 With Windows SDK完整目录github

Github项目源码算法

欢迎加入QQ群: 727623616 能够一块儿探讨DX11,以及有什么问题也能够在这里汇报。编程

线程标识符

对于线程组(大小(ThreadDimX, ThreadDimY, ThreadDimZ))中的每个线程,它们都有一个惟一的线程ID值。咱们可使用系统值SV_GroupThreadID来取得,它的索引范围为(0, 0, 0)(ThreadDimX - 1, ThreadDimY - 1, ThreadDimZ - 1)缓存

而对于整个线程组来讲,因为线程组集合也是在3D空间中排布,它们也有一个惟一的线程组ID值。咱们可使用系统值SV_GroupID来取得,线程组的索引范围取决于调用ID3D11DeviceContext::Dispatch时提供的线程组(大小(GroupDimX, GroupDimY, GroupDimZ)),范围为(0, 0, 0)(GroupDimX - 1, GroupDimY - 1, GroupDimZ - 1)安全

紧接着就是系统值SV_GroupIndex,它是单个线程组内的线程三维索引的一维展开。若已知线程组的大小为(ThreadDimX, ThreadDimY, ThreadDimZ),则能够肯定SV_GroupIndexSV_GroupThreadID知足下面关系:网络

SV_GroupIndex = SV_GroupThreadID.z * ThreadDimX * ThreadDimY + SV_GroupThreadID.y * ThreadDimX + SV_GroupThreadID.x;

最后就是系统值SV_DispatchThreadID,线程组中的每个线程在ID3D11DeviceContext::Dispatch提供的线程组集合中都有其惟一的线程ID值。若已知线程组的大小为 (ThreadDimX, ThreadDimY, ThreadDimZ),则能够肯定SV_DispatchThreadIDSV_GroupThreadIDSV_GroupID知足如下关系:dom

SV_DispatchThreadID.xyz = SV_GroupID.xyz * float3(ThreadDimX, ThreadDimY, ThreadDimZ) + SV_GroupThreadID.xyz;

共享内存和线程同步

在一个线程组内,容许设置一片共享内存区域,使得当前线程组内的全部线程均可以访问当前的共享内存。一旦设置,那么每一个线程都会各自配备一份共享内存。共享内存的访问速度很是快,就像寄存器访问CPU缓存那样。函数

共享内存的声明方式以下:

groupshared float4 g_Cache[256];

对于每一个线程组来讲,它所容许分配的总空间最大为32kb(即8192个标量,或2048个向量)。内部线程一般应该使用SV_ThreadGroupID来写入共享内存,这样以保证每一个线程不会出现重复写入操做,而读取共享内存通常是线程安全的。

分配太多的共享内存会致使性能问题。假如一个多处理器支持32kb的共享内存,而后你的计算着色器须要20kb的共享内存,这意味着一个多处理器只适合处理一个线程组,由于剩余的共享内存不足以给新的线程组运行,这也会限制GPU的并行运算,当该线程组由于某些缘由须要等待,会致使当前的多处理器处于闲置状态。所以保证一个多处理器至少可以处理两个或以上的线程组(好比每一个线程组分配16kb如下的共享内存),以尽量减小该多处理器的闲置时间。

如今来考虑下面的代码:

Texture2D g_Input : register(t0);
RWTexture2D<float4> g_Output : register(u0);

groupshared float4 g_Cache[256];

[numthreads(256, 1, 1)]
void CS(uint3 GTid : SV_GroupThreadID,
    uint3 DTid : SV_DispatchThreadID)
{
    // 将纹理像素值缓存到共享内存
    g_Cache[GTid.x] = g_Input[DTid.xy];
    
    // 取出共享内存的值进行计算
    
    // 注意!!相邻的两个线程可能没有完成对纹理的采样
    // 以及存储到共享内存的操做
    float left = g_Cache[GTid.x - 1];
    float right = g_Cache[GTid.x + 1];
    
    // ...
}

由于多个线程同时运行,同一时间各个线程当前执行的指令有所误差,有的线程可能已经完成了共享内存的赋值操做,有的线程可能还在进行纹理采样操做。若是当前线程正在读取相邻的共享内存片断,结果将是未定义的。为了解决这个问题,咱们必须在读取共享内存以前让当前线程等待线程组内其它的全部线程完成写入操做。这里咱们可使用GroupMemoryBarrierWithGroupSync函数:

Texture2D g_Input : register(t0);
RWTexture2D<float4> g_Output : register(u0);

groupshared float4 g_Cache[256];

[numthreads(256, 1, 1)]
void CS(uint3 GTid : SV_GroupThreadID,
    uint3 DTid : SV_DispatchThreadID)
{
    // 将纹理像素值缓存到共享内存
    g_Cache[GTid.x] = g_Input[DTid.xy];
    
    // 等待全部线程完成写入
    GroupMemoryBarrierWithGroupSync();
    
    // 如今读取操做是线程安全的,能够开始进行计算
    float left = g_Cache[GTid.x - 1];
    float right = g_Cache[GTid.x + 1];
    
    // ...
}

双调排序

双调序列

所谓双调序列(Bitonic Sequence),是指由一个非严格递增序列X(容许相邻两个数相等)和非严格递减序列Y构成的序列,好比序列\((5, 3, 2, 1, 4, 6, 6, 12)\)

定义:一个序列\(a_1 , a_2, ..., a_n\)是双调序列,须要知足下面条件:

  1. 存在一个\(a_k(1 <= k <= n)\),使得\(a_1 >= ... >= a_k <= ... <= a_n\)成立,或者\(a_1 <= ... <= a_k >= ... >= a_n\)成立;
  2. 序列循环移位后仍可以知足条件(1)

Batcher归并网络

Batcher归并网络是由一系列Batcher比较器组成的,Batcher比较器是指在两个输入端给定输入值x和y,再在两个输出端输出最大值\(max(x, y)\)和最小值\(min(x, y)\)

双调归并网络

双调归并网络是基于Batch定理而构建的。该定理是说将任意一个长为2n的双调序列分为等长的两半X和Y,将X中的元素与Y中的元素按原序比较,即\(a_i\)\(a_{i+n}(i <= n)\)比较,将较大者放入MAX序列,较小者放入MIN序列。则获得的MAX序列和MIN序列仍然是双调序列,而且MAX序列中的任意一个元素不小于MIN序列中的任意一个元素。

根据这个原理,咱们能够将一个n元素的双调序列经过上述方式进行比较操做来获得一个MAX序列和一个MIN序列,而后对这两个序列进行递归处理,直到序列不可再分割为止。最终归并获得的为一个有序序列。

这里咱们用一张图来描述双调排序的全过程:

其中箭头方向指的是两个数交换后,箭头段的数为较大值,圆点段的数为较小值。

咱们能够总结出以下规律:

  1. 每一趟排序结束会产生连续的双调序列,除了最后一趟排序会产生咱们所须要的单调序列
  2. 对于2^k个元素的任意序列,须要进行k趟排序才能产生单调序列
  3. 对于由\(2^{k-1}\)个元素的单调递增序列和\(2^{k-1}\)个元素的单调递减序列组成的双调序列,须要进行k趟交换才能产生2^k个元素的单调递增序列
  4. 在第n趟排序中的第m趟交换,若两个比较数中较小的索引值为i,那么与之进行交换的数索引为\(i+2^{n-m}\)

双调排序的空间复杂度为\(O(n)\),时间复杂度为\(O(n{(lg(n))}^2)\),看起来比\(O(nlg(n))\)系列的排序算法慢上一截,可是得益于GPU的并行计算,能够看做同一时间内有n个线程在运行,使得最终的时间复杂度能够降为\(O({(lg(n))}^2)\),效率又上了一个档次。

须要注意的是,双调排序要求排序元素的数目为\(2^k, (k>=1)\),若是元素个数为\(2^k < n < 2^{k+1}\),则须要填充数据到\(2^{k+1}\)个。若须要进行升序排序,则须要填充足够的最大值;若须要进行降序排序,则须要填充足够的最小值。

排序核心代码实现

本HLSL实现参考了directx-sdk-samples,虽然里面的实现看起来比较简洁,可是理解它的算法实现费了我很多的时间。我的以本身可以理解的形式对它的实现进行了修改,所以这里以我这边的实现版原本讲解。

首先是排序须要用到的资源和常量缓冲区,定义在BitonicSort.hlsli

// BitonicSort.hlsli
Buffer<uint> g_Input : register(t0);
RWBuffer<uint> g_Data : register(u0);

cbuffer CB : register(b0)
{
    uint g_Level;        // 2^须要排序趟数
    uint g_DescendMask;  // 降低序列掩码
    uint g_MatrixWidth;  // 矩阵宽度(要求宽度>=高度且都为2的倍数)
    uint g_MatrixHeight; // 矩阵高度
}

而后是核心的排序算法:

// BitonicSort_CS.hlsl
#include "BitonicSort.hlsli"

#define BITONIC_BLOCK_SIZE 512

groupshared uint shared_data[BITONIC_BLOCK_SIZE];

[numthreads(BITONIC_BLOCK_SIZE, 1, 1)]
void CS(uint3 Gid : SV_GroupID,
    uint3 DTid : SV_DispatchThreadID,
    uint3 GTid : SV_GroupThreadID,
    uint GI : SV_GroupIndex)
{
    // 写入共享数据
    shared_data[GI] = g_Data[DTid.x];
    GroupMemoryBarrierWithGroupSync();
    
    // 进行排序
    for (uint j = g_Level >> 1; j > 0; j >>= 1)
    {
        uint smallerIndex = GI & ~j;
        uint largerIndex = GI | j;
        bool isDescending = (bool) (g_DescendMask & DTid.x);
        bool isSmallerIndex = (GI == smallerIndex);
        uint result = ((shared_data[smallerIndex] <= shared_data[largerIndex]) == (isDescending == isSmallerIndex)) ?
            shared_data[largerIndex] : shared_data[smallerIndex];
        GroupMemoryBarrierWithGroupSync();

        shared_data[GI] = result;
        GroupMemoryBarrierWithGroupSync();
    }
    
    // 保存结果
    g_Data[DTid.x] = shared_data[GI];
}

能够看到,咱们实际上能够将递归过程转化成迭代来实现。

如今咱们先从核心排序算法讲起,因为受到线程组的线程数目、共享内存大小限制,这里定义一个线程组包含512个线程,即一个线程组最大容许排序的元素数目为512。共享内存在这是用于临时缓存中间排序的结果。

首先,咱们须要将数据写入共享内存中:

// 写入共享数据
shared_data[GI] = g_Data[DTid.x];
GroupMemoryBarrierWithGroupSync();

接着就是要开始递归排序的过程,其中g_Level的含义为单个双调序列的长度,它也说明了须要对该序列进行\(lg(g_Level)\)趟递归交换。

在一个线程中,咱们仅知道该线程对应的元素,但如今咱们还须要作两件事情:

  1. 找到须要与该线程对应元素进行Batcher比较的另外一个元素
  2. 判断当前线程对应元素与另外一个待比较元素相比,是较小索引仍是较大索引

这里用到了位运算的魔法。先举个例子,当前j为4,则待比较两个元素的索引分别为2和6,这两个索引值的区别在于索引2(二进制010)和索引6(二进制110),前者二进制第三位为0,后者二进制第三位为1.

但只要咱们知道上述其中的一个索引,就能够求出另外一个索引。较小索引值的索引能够经过屏蔽二进制的第三位获得,而较大索引值的索引能够经过按位或运算使得第三位为1来获得:

uint smallerIndex = GI & ~j;
uint largerIndex = GI | j;
bool isSmallerIndex = (GI == smallerIndex);

而后就是判断当前元素是位于当前趟排序完成后的递增序列仍是递减序列,好比序列\((4, 6, 4, 3, 5, 7, 2, 1)\),如今要进行第二趟排序,那么先后4个数将分别生成递增序列和递减序列,咱们能够设置g_DescendMask的值为4(二进制100),这样二进制索引范围在100到111的值(对应十进制4-7)处在递减序列,若是这个双调序列长度为16,那么索引4-7和12-15的两段序列均可以经过g_DescendMask来判断出处在递减序列:

bool isDescending = (bool) (g_DescendMask & DTid.x);

最后就是要肯定当前线程对应的共享内存元素须要获得较小值,仍是较大值了。这里又以一个双调序列\((2, 5, 7, 4)\)为例,待比较的两个元素为5和4,当前趟排序会将它变为单调递增序列,即所处的序列为递增序列,当前线程对应的元素为5,shared_data[smallerIndex] <= shared_data[largerIndex]的比较结果为>,那么它将拿到(较小值)较大索引的值。通过第一趟交换后将变成\((2, 4, 7, 5)\),第二趟交换就不讨论了。

根据对元素所处序列、元素当前索引和比较结果的讨论,能够产生出八种状况:

所处序列 当前索引 比较结果 取值结果
递减 小索引 <= (较大值)较大索引的值
递减 大索引 <= (较小值)较大索引的值
递增 小索引 <= (较小值)较小索引的值
递增 大索引 <= (较大值)较小索引的值
递减 小索引 > (较大值)较小索引的值
递减 大索引 > (较小值)较小索引的值
递增 小索引 > (较小值)较大索引的值
递增 大索引 > (较大值)较大索引的值

显然现有的变量判断较大/较小索引值比判断较大值/较小值容易得多。上述结果表能够整理成下面的代码:

uint result = ((shared_data[smallerIndex] <= shared_data[largerIndex]) == (isDescending == isSmallerIndex)) ?
    shared_data[largerIndex] : shared_data[smallerIndex];
GroupMemoryBarrierWithGroupSync();

shared_data[GI] = result;
GroupMemoryBarrierWithGroupSync();

在C++中,如今有以下资源和着色器:

ComPtr<ID3D11Buffer> m_pConstantBuffer;             // 常量缓冲区
ComPtr<ID3D11Buffer> m_pTypedBuffer1;               // 有类型缓冲区1
ComPtr<ID3D11Buffer> m_pTypedBuffer2;               // 有类型缓冲区2
ComPtr<ID3D11Buffer> m_pTypedBufferCopy;            // 用于拷贝的有类型缓冲区
ComPtr<ID3D11UnorderedAccessView> m_pDataUAV1;      // 有类型缓冲区1对应的无序访问视图
ComPtr<ID3D11UnorderedAccessView> m_pDataUAV2;      // 有类型缓冲区2对应的无序访问视图
ComPtr<ID3D11ShaderResourceView> m_pDataSRV1;       // 有类型缓冲区1对应的着色器资源视图
ComPtr<ID3D11ShaderResourceView> m_pDataSRV2;       // 有类型缓冲区2对应的着色器资源视图

而后就是对512个元素进行排序的部分代码(size为2的次幂):

void GameApp::SetConstants(UINT level, UINT descendMask, UINT matrixWidth, UINT matrixHeight);

//
// GameApp::GPUSort
//

m_pd3dImmediateContext->CSSetShader(m_pBitonicSort_CS.Get(), nullptr, 0);
m_pd3dImmediateContext->CSSetUnorderedAccessViews(0, 1, m_pDataUAV1.GetAddressOf(), nullptr);

// 按行数据进行排序,先排序level <= BLOCK_SIZE 的全部状况
for (UINT level = 2; level <= size && level <= BITONIC_BLOCK_SIZE; level *= 2)
{
    SetConstants(level, level, 0, 0);
    m_pd3dImmediateContext->Dispatch((size + BITONIC_BLOCK_SIZE - 1) / BITONIC_BLOCK_SIZE, 1, 1);
}

给更多的数据排序

上述代码容许咱们对元素个数为2到512的序列进行排序,但缓冲区的元素数目必须为2的次幂。因为在CS4.0中,一个线程组最多容许一个线程组包含768个线程,这意味着双调排序仅容许在一个线程组中对最多512个元素进行排序。

接下来咱们看一个例子,假若有一个16元素的序列,然而线程组仅容许包含最多4个线程,那咱们将其放置在一个4x4的矩阵内:

而后对矩阵转置:

能够看到,经过转置后,列数据变换到行数据的位置,这样咱们就能够进行跨度更大的交换操做了。处理完大跨度的交换后,咱们再转置回来,处理行数据便可。

如今假定咱们已经对行数据排完序,下图演示了剩余的排序过程:

可是在线程组容许最大线程数为4的状况下,经过二维矩阵最多也只能排序16个数。。。。也许能够考虑三维矩阵转置法,这样就能够排序64个数了哈哈哈。。。

不过还有一个状况咱们要考虑,就是元素数目不为(2x2)的倍数,没法构成一个方阵,但咱们也能够把它变成对两个方阵转置。这时矩阵的宽是高的两倍:

因为元素个数为32,它的最大索引跨度为16,转置后的索引跨度为2,不会越界到另外一个方阵进行比较。可是当g_Level到32时,此时进行的是单调排序,g_DescendMask也必须设为最大值32(而不是4),避免产生双调序列。

经过下面的转置算法,使得本来只能排序512个数的算法如今能够排序最大262144个数了。

负责转置的着色器实现以下:

// MatrixTranspose_CS.hlsl
#include "BitonicSort.hlsli"

#define TRANSPOSE_BLOCK_SIZE 16

groupshared uint shared_data[TRANSPOSE_BLOCK_SIZE * TRANSPOSE_BLOCK_SIZE];

[numthreads(TRANSPOSE_BLOCK_SIZE, TRANSPOSE_BLOCK_SIZE, 1)]
void CS(uint3 Gid : SV_GroupID,
    uint3 DTid : SV_DispatchThreadID,
    uint3 GTid : SV_GroupThreadID,
    uint GI : SV_GroupIndex)
{
    uint index = DTid.y * g_MatrixWidth + DTid.x;
    shared_data[GI] = g_Input[index];
    GroupMemoryBarrierWithGroupSync();
    
    uint2 outPos = DTid.yx % g_MatrixHeight + DTid.xy / g_MatrixHeight * g_MatrixHeight;
    g_Data[outPos.y * g_MatrixWidth + outPos.x] = shared_data[GI];
}

最后是GPU排序用的函数:

#define BITONIC_BLOCK_SIZE 512

#define TRANSPOSE_BLOCK_SIZE 16

void GameApp::GPUSort()
{
    UINT size = (UINT)m_RandomNums.size();

    m_pd3dImmediateContext->CSSetShader(m_pBitonicSort_CS.Get(), nullptr, 0);
    m_pd3dImmediateContext->CSSetUnorderedAccessViews(0, 1, m_pDataUAV1.GetAddressOf(), nullptr);

    // 按行数据进行排序,先排序level <= BLOCK_SIZE 的全部状况
    for (UINT level = 2; level <= size && level <= BITONIC_BLOCK_SIZE; level *= 2)
    {
        SetConstants(level, level, 0, 0);
        m_pd3dImmediateContext->Dispatch((size + BITONIC_BLOCK_SIZE - 1) / BITONIC_BLOCK_SIZE, 1, 1);
    }
    
    // 计算相近的矩阵宽高(宽>=高且须要都为2的次幂)
    UINT matrixWidth = 2, matrixHeight = 2;
    while (matrixWidth * matrixWidth < size)
    {
        matrixWidth *= 2;
    }
    matrixHeight = size / matrixWidth;

    // 排序level > BLOCK_SIZE 的全部状况
    ComPtr<ID3D11ShaderResourceView> pNullSRV;
    for (UINT level = BITONIC_BLOCK_SIZE * 2; level <= size; level *= 2)
    {
        // 若是达到最高等级,则为全递增序列
        if (level == size)
        {
            SetConstants(level / matrixWidth, level, matrixWidth, matrixHeight);
        }
        else
        {
            SetConstants(level / matrixWidth, level / matrixWidth, matrixWidth, matrixHeight);
        }
        // 先进行转置,并把数据输出到Buffer2
        m_pd3dImmediateContext->CSSetShader(m_pMatrixTranspose_CS.Get(), nullptr, 0);
        m_pd3dImmediateContext->CSSetShaderResources(0, 1, pNullSRV.GetAddressOf());
        m_pd3dImmediateContext->CSSetUnorderedAccessViews(0, 1, m_pDataUAV2.GetAddressOf(), nullptr);
        m_pd3dImmediateContext->CSSetShaderResources(0, 1, m_pDataSRV1.GetAddressOf());
        m_pd3dImmediateContext->Dispatch(matrixWidth / TRANSPOSE_BLOCK_SIZE, 
            matrixHeight / TRANSPOSE_BLOCK_SIZE, 1);

        // 对Buffer2排序列数据
        m_pd3dImmediateContext->CSSetShader(m_pBitonicSort_CS.Get(), nullptr, 0);
        m_pd3dImmediateContext->Dispatch(size / BITONIC_BLOCK_SIZE, 1, 1);

        // 接着转置回来,并把数据输出到Buffer1
        SetConstants(matrixWidth, level, matrixWidth, matrixHeight);
        m_pd3dImmediateContext->CSSetShader(m_pMatrixTranspose_CS.Get(), nullptr, 0);
        m_pd3dImmediateContext->CSSetShaderResources(0, 1, pNullSRV.GetAddressOf());
        m_pd3dImmediateContext->CSSetUnorderedAccessViews(0, 1, m_pDataUAV1.GetAddressOf(), nullptr);
        m_pd3dImmediateContext->CSSetShaderResources(0, 1, m_pDataSRV2.GetAddressOf());
        m_pd3dImmediateContext->Dispatch(matrixWidth / TRANSPOSE_BLOCK_SIZE,
            matrixHeight / TRANSPOSE_BLOCK_SIZE, 1);

        // 对Buffer1排序剩余行数据
        m_pd3dImmediateContext->CSSetShader(m_pBitonicSort_CS.Get(), nullptr, 0);
        m_pd3dImmediateContext->Dispatch(size / BITONIC_BLOCK_SIZE, 1, 1);
    }
}

最后是std::sort和双调排序(使用NVIDIA GTX 850M)的比较结果:

元素数目 CPU排序(s) GPU排序(s) GPU排序+写回内存总时(s)
512 0.000020 0.000003 0.000186
1024 0.000043 0.000007 0.000226
2048 0.000102 0.000009 0.000310
4096 0.000245 0.000009 0.000452
8192 0.000512 0.000010 0.000771
16384 0.001054 0.000012 0.000869
32768 0.002114 0.000012 0.001819
65536 0.004448 0.000014 0.002625
131072 0.009600 0.000015 0.005130
262144 0.021555 0.000021 0.008983
524288 0.048621 0.000030 0.015652
1048576 0.111219 0.000044 0.040927
2097152 0.257118 0.000072 0.188662
4194304 0.444995 0.000083 0.300240
8388608 0.959173 0.000124 0.465746
16777216 2.178453 0.000177 0.915898
33554432 5.869439 0.000361 2.269989

能够初步看到双调排序的排序用时比较稳定,而快速排序明显随元素数目增加而变慢。固然,若是GPU排序的数据量再更大一些的话,能够看到时间的明显增加。

可是!若是你用GPU排序后须要取回到CPU,由于GPU->CPU的速度比较慢,而且须要等待资源结束占用才能开始取出,故须要排较大数目的数据(约10000以上)才有明显效益

DirectX11 With Windows SDK完整目录

Github项目源码

欢迎加入QQ群: 727623616 能够一块儿探讨DX11,以及有什么问题也能够在这里汇报。

相关文章
相关标签/搜索