另外,RingBuffer 的 Size 过小,只有 64,严重影响 Disruptor 的表现;实际测试对比下来,应该设为 1024 或更大。
还有 BlockingCollection 里面的 while (!dataItems.IsCompleted) 写得也有问题:即便 BlockingCollection 的 Producer 在循环中一直做添加操作,BlockingCollection 的内部状态也并非一直处于添加状态,这样会导致添加循环还没做完,计时的循环却已经提前结束,使 BlockingCollection 测得的时间少于实际时间。
// Quoted fragment of the benchmark being criticised. dataItems, sw, results and length
// are declared outside this fragment.
//
// Consumer: started as a LongRunning task; keeps taking entries while
// dataItems.IsCompleted is false. No CompleteAdding() call is visible in this fragment.
Task.Factory.StartNew(() => {
while (!dataItems.IsCompleted)
{
ValueEntry ve = null;
try
{
ve = dataItems.Take();
// Elapsed ticks of the stopwatch indexed by this entry's Value, divided by
// ticks-per-microsecond to get microseconds.
// NOTE(review): Stopwatch.Frequency / (1000L * 1000L) is integer division, so the divisor
// is truncated and would be zero for a frequency below 1,000,000 - confirm on the target machine.
long microseconds = sw[ve.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
results[ve.Value] = microseconds;
//Console.WriteLine("elapsed microseconds = " + microseconds);
//Console.WriteLine("Event handled: Value = {0} (processed event {1}", ve.Value, ve.Value);
}
// InvalidOperationException is swallowed and the loop condition is re-evaluated.
// NOTE(review): presumably meant to absorb Take() failing once adding is completed - confirm.
catch (InvalidOperationException) { }
}
}, TaskCreationOptions.LongRunning);
// Producer: for each index, creates an entry carrying that index, restarts the matching
// stopwatch and only then adds the entry, so the measurement includes the Add call itself.
for (int i = 0; i < length; i++)
{
var valueToSet = i;
ValueEntry entry = new ValueEntry();
entry.Value = valueToSet;
sw[i].Restart();
dataItems.Add(entry);
//Console.WriteLine("Published entry {0}, value {1}", valueToSet, entry.Value);
//Thread.Sleep(1000);
}
然后重新修改了他的代码,实测 Disruptor 的速度是 BlockingCollection 的 10 倍(这里插一句题外话:Disruptor .NET 版本的速度全面快于 Java 版本,很多场景下比 Java 版本要快 10 倍;.NET 版是从 Java 移植过来的,实现也和 Java 保持一致,是哪些语言特性导致性能差异这么大呢?)。
Disruptor is 10x faster than BlockingCollection with multiple producers (10 parallel producers), and 2x faster than BlockingCollection with a single producer:
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;
using NUnit.Framework;
namespace DisruptorTest.Ds
{
/// <summary>
/// Payload passed from producers to the consumer in both benchmarks.
/// </summary>
public sealed class ValueEntry
{
    // Backing storage for Id.
    private int _id;

    /// <summary>
    /// Identifier written by the producer before the entry is handed over.
    /// </summary>
    internal int Id
    {
        get => _id;
        set => _id = value;
    }
}
/// <summary>
/// Disruptor event handler that deliberately does nothing with the events it receives,
/// so the benchmark measures hand-off cost rather than handler work.
/// </summary>
internal class MyHandler : IEventHandler<ValueEntry>
{
    /// <summary>
    /// Receives an event from the ring buffer and discards it.
    /// </summary>
    /// <param name="data">The published entry; not read.</param>
    /// <param name="sequence">Sequence number of the entry; not read.</param>
    /// <param name="endOfBatch">Batch-end flag supplied by the caller; not read.</param>
    public void OnEvent(ValueEntry data, long sequence, bool endOfBatch)
    {
        // Intentionally empty.
    }
}
/// <summary>
/// Throughput comparison between a Disruptor ring buffer and a bounded
/// <see cref="BlockingCollection{T}"/>. Each benchmark starts one consumer, runs
/// <c>producerCount</c> producer tasks that each publish <c>runCount</c> entries,
/// and prints the wall-clock time until every entry has been consumed.
/// </summary>
[TestFixture]
public class DisruptorPerformanceTest
{
    // Number of concurrent producer tasks. A value above 1 selects ProducerType.Multi.
    private readonly int producerCount = 10;

    // Number of entries each producer publishes.
    private readonly int runCount = 1000000;

    // Ring buffer size for the Disruptor and bounded capacity for the BlockingCollection.
    // Overwritten by TestBoth for every measured configuration.
    private int RingBufferAndCapacitySize = 1024;

    /// <summary>
    /// Runs both benchmarks back to back for a series of buffer sizes and prints a
    /// header line before each pair of measurements.
    /// </summary>
    [TestCase()]
    public async Task TestBoth()
    {
        for (int i = 0; i < 1; i++)
        {
            // NOTE(review): every listed size is a power of two; the Disruptor ring buffer
            // is understood to require that - confirm before adding other values.
            foreach (var rs in new int[] {64, 512, 1024, 2048 /*,4096,4096*2*/})
            {
                Console.WriteLine($"RingBufferAndCapacitySize:{rs}, producerCount:{producerCount}, runCount:{runCount} of {i}");
                RingBufferAndCapacitySize = rs;
                await DisruptorTest();
                await BlockingCollectionTest();
            }
        }
    }

    /// <summary>
    /// Measures how long the producers need to push all entries through a bounded
    /// <see cref="BlockingCollection{T}"/> while a single consumer drains it.
    /// </summary>
    /// <remarks>
    /// The consumer runs until the collection is marked complete AND empty, so the
    /// measured time includes consuming the entries still queued when the last
    /// producer finishes. The Disruptor benchmark shuts the disruptor down before
    /// stopping its stopwatch; this benchmark likewise stops timing only after the
    /// consumer has finished.
    /// </remarks>
    [TestCase()]
    public async Task BlockingCollectionTest()
    {
        using (var dataItems = new BlockingCollection<ValueEntry>(RingBufferAndCapacitySize))
        {
            // Blocking consumer: the enumeration ends only after CompleteAdding() has been
            // called and the last entry has been taken.
            var consumer = Task.Factory.StartNew(() =>
            {
                foreach (ValueEntry ignored in dataItems.GetConsumingEnumerable())
                {
                    // Entries are discarded; only throughput is measured.
                }
            }, TaskCreationOptions.LongRunning);

            // Start timing after the consumer is set up, as DisruptorTest does.
            var sw = Stopwatch.StartNew();

            var producers = new Task[producerCount];
            for (int t = 0; t < producerCount; t++)
            {
                producers[t] = Task.Run(() =>
                {
                    for (int i = 0; i < runCount; i++)
                    {
                        dataItems.Add(new ValueEntry { Id = i });
                    }
                });
            }

            await Task.WhenAll(producers);

            // Signal end of input so the consumer can drain the remainder and exit.
            dataItems.CompleteAdding();
            await consumer;

            sw.Stop();
            Console.WriteLine($"BlockingCollectionTest Time:{sw.ElapsedMilliseconds/1000d}");
        }
    }

    /// <summary>
    /// Measures how long the producers need to publish all entries through a Disruptor
    /// ring buffer handled by a single no-op <see cref="MyHandler"/>.
    /// </summary>
    [TestCase()]
    public async Task DisruptorTest()
    {
        var producerType = producerCount > 1 ? ProducerType.Multi : ProducerType.Single;
        var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(
            () => new ValueEntry(),
            RingBufferAndCapacitySize,
            TaskScheduler.Default,
            producerType,
            new BlockingWaitStrategy());
        disruptor.HandleEventsWith(new MyHandler());
        var ringBuffer = disruptor.Start();

        // StartNew already starts the stopwatch; no separate Start() call is needed.
        var sw = Stopwatch.StartNew();

        var producers = new Task[producerCount];
        for (int t = 0; t < producerCount; t++)
        {
            producers[t] = Task.Run(() =>
            {
                for (int i = 0; i < runCount; i++)
                {
                    // Claim a slot, fill the pre-allocated entry in place, then publish it.
                    long sequenceNo = ringBuffer.Next();
                    ringBuffer[sequenceNo].Id = i;
                    ringBuffer.Publish(sequenceNo);
                }
            });
        }

        await Task.WhenAll(producers);

        // NOTE(review): Shutdown() is understood to block until all published events
        // have been handled - confirm against the Disruptor-net version in use.
        disruptor.Shutdown();

        sw.Stop();
        Console.WriteLine($"DisruptorTest Time:{sw.ElapsedMilliseconds/1000d}s");
    }
}
}
-
RingBufferAndCapacitySize:64, producerCount:10, runCount:1000000 of 0
DisruptorTest Time:16.962s
BlockingCollectionTest Time:18.399
-
RingBufferAndCapacitySize:512, producerCount:10, runCount:1000000 of 0
DisruptorTest Time:6.101s
BlockingCollectionTest Time:19.526
-
RingBufferAndCapacitySize:1024, producerCount:10, runCount:1000000 of 0
DisruptorTest Time:2.928s
BlockingCollectionTest Time:20.25
-
RingBufferAndCapacitySize:2048, producerCount:10, runCount:1000000 of 0
DisruptorTest Time:2.448s
BlockingCollectionTest Time:20.649
-
RingBufferAndCapacitySize:64, producerCount:10, runCount:1000000 of 0
DisruptorTest Time:27.374s
BlockingCollectionTest Time:21.955
-
RingBufferAndCapacitySize:512, producerCount:10, runCount:1000000 of 0
DisruptorTest Time:5.011s
BlockingCollectionTest Time:20.127
-
RingBufferAndCapacitySize:1024, producerCount:10, runCount:1000000 of 0
DisruptorTest Time:2.877s
BlockingCollectionTest Time:22.656
-
RingBufferAndCapacitySize:2048, producerCount:10, runCount:1000000 of 0
DisruptorTest Time:2.384s
BlockingCollectionTest Time:23.567