Original article by Jusfr; please credit cnblogs when reposting.
Both Request and Response are described in length + content form; see A Guide To The Kafka Protocol.
Besides the fixed fields Size + ApiKey + ApiVersion + CorrelationId + ClientId, a Request carries the concrete request data in RequestMessage:
```
Request => Size ApiKey ApiVersion CorrelationId ClientId RequestMessage
  Size => int32
  ApiKey => int16
  ApiVersion => int16
  CorrelationId => int32
  ClientId => string
  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
```
Besides Size + CorrelationId, a Response carries the concrete response data in ResponseMessage:
```
Response => Size CorrelationId ResponseMessage
  Size => int32
  CorrelationId => int32
  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
```
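As a minimal sketch of the size-prefixed framing above (the `Framing` helper and its method names are hypothetical, not Chuye.Kafka's actual API), the fixed request header can be serialized like this; all integers are big-endian on the wire:

```csharp
using System;
using System.IO;

// Hypothetical helper illustrating Kafka's size-prefixed request header:
// Size ApiKey ApiVersion CorrelationId ClientId, all big-endian.
static class Framing {
    public static void WriteInt32BE(Stream s, Int32 value) {
        s.WriteByte((Byte)(value >> 24));
        s.WriteByte((Byte)(value >> 16));
        s.WriteByte((Byte)(value >> 8));
        s.WriteByte((Byte)value);
    }

    public static void WriteInt16BE(Stream s, Int16 value) {
        s.WriteByte((Byte)(value >> 8));
        s.WriteByte((Byte)value);
    }

    public static Byte[] FrameHeader(Int16 apiKey, Int16 apiVersion, Int32 correlationId, String clientId) {
        var body = new MemoryStream();
        WriteInt16BE(body, apiKey);
        WriteInt16BE(body, apiVersion);
        WriteInt32BE(body, correlationId);
        var idBytes = System.Text.Encoding.UTF8.GetBytes(clientId);
        WriteInt16BE(body, (Int16)idBytes.Length);   // Kafka string: int16 length + bytes
        body.Write(idBytes, 0, idBytes.Length);

        var framed = new MemoryStream();
        WriteInt32BE(framed, (Int32)body.Length);    // Size prefix excludes its own 4 bytes
        body.WriteTo(framed);
        return framed.ToArray();
    }
}
```

The RequestMessage body would follow the header inside the same frame, with Size covering both.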
Serializing a Request requires allocating memory, and the same goes for reading a Response out of the buffer.
MemoryStream is a dependable option: it grows automatically, but growth cannot avoid copying bytes, and frequently allocating sizeable chunks of memory hurts performance. An approximation of the growth logic:
```csharp
// init
Byte[] buffer = new Byte[4096];
Int32 offset = 0;

// write bytes
Byte[] bytePrepareCopy = /* from outside */;
if (bytePrepareCopy.Length > buffer.Length - offset) {
    Byte[] newBuffer = new Byte[buffer.Length * 2];
    Array.Copy(buffer, 0, newBuffer, 0, offset);
    buffer = newBuffer;
}
Array.Copy(bytePrepareCopy, 0, buffer, offset, bytePrepareCopy.Length);
offset += bytePrepareCopy.Length;
```
Array growth can be seen in the implementation of List&lt;T&gt;; the above is only a sketch and does not handle the case where (buffer.Length * 2 - offset) &lt; bytePrepareCopy.Length.
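That unhandled case can be covered by doubling in a loop until the pending bytes fit; a minimal sketch (the `EnsureCapacity` name is mine, not from the library):

```csharp
using System;

// Grow by doubling until `pending` bytes fit after `offset`, covering the
// case (buffer.Length * 2 - offset) < pending noted above.
static Byte[] EnsureCapacity(Byte[] buffer, Int32 offset, Int32 pending) {
    var capacity = buffer.Length;
    while (capacity - offset < pending) {
        capacity *= 2;                                // keep doubling until it fits
    }
    if (capacity == buffer.Length) {
        return buffer;                                // no growth needed
    }
    var newBuffer = new Byte[capacity];
    Array.Copy(buffer, 0, newBuffer, 0, offset);      // copy cost grows with offset
    return newBuffer;
}
```

Note that the copy on every growth step is exactly the cost the fixed-block design below is trying to avoid.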
Once an array grows past 4k, the growth cost becomes very high. If we stipulate that "requests and responses must not exceed 4k", then simulating MemoryStream's read and write behavior over recyclable (see below) fixed-length arrays yields a large performance gain.
KafkaStreamBinary (on github) uses a MemoryStream internally, while KafkaFixedBinary (on github) is the array-based implementation.
Anyone who has used Memcached will find BufferManager's approach familiar: to reduce the overhead of frequent allocation, memory is first carved into blocks; a requester receives whole blocks, which are then marked as allocated. Unlike Memcached, BufferManager expects the requester to return blocks once finished, so they can be handed out to other requests.
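The take/return contract can be reduced to a few lines; this is an ObjectPool-style sketch of the idea (an assumption of mine, not the BufferManager or Chuye.Kafka source):

```csharp
using System;
using System.Collections.Concurrent;

// Minimal pooled-block sketch: fixed-size blocks are handed out and the
// caller is expected to return them so they can be reused.
class SimpleBufferPool {
    private readonly Int32 _blockSize;
    private readonly ConcurrentBag<Byte[]> _free = new ConcurrentBag<Byte[]>();

    public SimpleBufferPool(Int32 blockSize) {
        _blockSize = blockSize;
    }

    public Byte[] Take() {
        Byte[] buffer;
        // Reuse a returned block when available, otherwise allocate a new one.
        return _free.TryTake(out buffer) ? buffer : new Byte[_blockSize];
    }

    public void Return(Byte[] buffer) {
        if (buffer != null && buffer.Length == _blockSize) {
            _free.Add(buffer);  // only pool blocks of the expected size
        }
    }
}
```

A block that is taken, returned, and taken again comes back without a fresh allocation, which is where the GC savings measured below come from.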
System.ServiceModel.Channels.BufferManager provides a solid implementation; typical usage looks like this:
```csharp
const Int32 size = 4096;
BufferManager bm = BufferManager.CreateBufferManager(maxBufferPoolSize: size * 32, maxBufferSize: size);
Byte[] buffer = bm.TakeBuffer(1024);
bm.ReturnBuffer(buffer);
```
Performance comparison against allocating by hand:
```csharp
const Int32 size = 4096;
BufferManager bm = BufferManager.CreateBufferManager(maxBufferPoolSize: size * 10, maxBufferSize: size);
var timer = new FunctionTimer();
timer.Push("BufferManager", () => {
    Byte[] buffer = bm.TakeBuffer(size);
    bm.ReturnBuffer(buffer);
});
timer.Push("new Byte[]", () => {
    Byte[] buffer = new Byte[size];
});
timer.Initialize();
timer.Execute(100000).Print();
```
Test results:
```
BufferManager
  Time Elapsed : 7ms
  CPU Cycles   : 17,055,523
  Memory cost  : 3,388
  Gen 0        : 2
  Gen 1        : 2
  Gen 2        : 2

new Byte[]
  Time Elapsed : 42ms
  CPU Cycles   : 113,437,539
  Memory cost  : 24
  Gen 0        : 263
  Gen 1        : 2
  Gen 2        : 2
```
Forcing business payloads to stay under 4k seems achievable, but scenarios that need more memory always exist, such as merging messages or batch consumption, and Chuye.Kafka, as a library, has to support them.
KafkaScalableBinary invents nothing new: internally it maintains a Dictionary&lt;Int32, Byte[]&gt; holding a series of byte arrays.
No memory is actually allocated at construction time; that only happens once writing begins:
```csharp
public KafkaScalableBinary()
    : this(4096) {
}

public KafkaScalableBinary(Int32 size) {
    if (size <= 0) {
        throw new ArgumentOutOfRangeException("size");
    }
    _lengthPerArray = size;
    _buffers = new Dictionary<Int32, Byte[]>(16);
}
```
When writing, integer division of the current position by the array length, _position / _lengthPerArray, locates the array to write into; if it does not exist yet, a new one is allocated:
```csharp
private Byte[] GetBufferForWrite() {
    var index = (Int32)(_position / _lengthPerArray);
    Byte[] buffer;
    if (!_buffers.TryGetValue(index, out buffer)) {
        if (_lengthPerArray >= 128) {
            buffer = ServiceProvider.BufferManager.TakeBuffer(_lengthPerArray);
        }
        else {
            buffer = new Byte[_lengthPerArray];
        }
        _buffers.Add(index, buffer);
    }
    return buffer;
}
```
Then the remainder of the current position modulo the array length, _position % _lengthPerArray, locates the target offset inside that array. Since the number of bytes to write may exceed the space left, a while loop keeps fetching (or allocating) the next array and writing the remaining bytes into it until done:
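The division/modulo pair is worth a concrete example; with _lengthPerArray = 4096, a logical position maps to an (array index, offset) pair like this (a standalone illustration of the arithmetic, not library code):

```csharp
using System;

const Int32 lengthPerArray = 4096;
Int64 position = 5000;

var index  = (Int32)(position / lengthPerArray);  // integer division selects the array
var offset = (Int32)(position % lengthPerArray);  // modulo selects the offset inside it

// index = 1, offset = 904: byte 5000 lives 904 bytes into the second 4096-byte block
Console.WriteLine("index={0}, offset={1}", index, offset);
```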
```csharp
public override void WriteByte(Byte[] buffer, int offset, int count) {
    if (buffer == null) {
        throw new ArgumentNullException("buffer");
    }
    if (buffer.Length == 0) {
        return;
    }
    if (buffer.Length < count) {
        throw new ArgumentOutOfRangeException();
    }
    checked {
        var left = count;                                            // bytes still to write
        while (left > 0) {
            var targetBuffer = GetBufferForWrite();                  // find the target array
            var targetOffset = (Int32)(_position % _lengthPerArray); // find the target offset
            var prepareCopy = left;                                  // try to write everything left
            if (prepareCopy > _lengthPerArray - targetOffset) {      // but the array may lack room,
                prepareCopy = _lengthPerArray - targetOffset;        // so write the smaller amount
            }
            Array.Copy(buffer, count - left, targetBuffer, targetOffset, prepareCopy); // copy bytes
            _position += prepareCopy;                                // advance the position
            left -= prepareCopy;                                     // shrink the remainder
            if (_position > _length) {                               // grow the total length
                _length = _position;
            }
        }
    }
}
```
Reading is similar: loop, locate the array to read from, and copy bytes until done; the difference is that the allocation logic is replaced by an exception:
```csharp
public override Int32 ReadBytes(Byte[] buffer, int offset, int count) {
    if (buffer == null) {
        throw new ArgumentNullException("buffer");
    }
    if (buffer.Length == 0) {
        return 0;
    }
    if (buffer.Length < count) {
        throw new ArgumentOutOfRangeException();
    }
    checked {
        var prepareRead = (Int32)(Math.Min(count, _length - _position)); // bytes available to read
        var left = prepareRead;                                          // bytes still to read
        while (left > 0) {
            var targetBuffer = GetBufferForRead();                       // find the target array
            var targetOffset = (Int32)(_position % _lengthPerArray);     // find the target offset
            var prepareCopy = left;                                      // try to read everything left
            if (prepareCopy > _lengthPerArray - targetOffset) {          // but the array may lack bytes,
                prepareCopy = _lengthPerArray - targetOffset;            // so read the smaller amount
            }
            Array.Copy(targetBuffer, targetOffset, buffer, prepareRead - left, prepareCopy); // copy bytes
            _position += prepareCopy;                                    // advance the position
            left -= prepareCopy;                                         // shrink the remainder
        }
        return prepareRead;
    }
}

private Byte[] GetBufferForRead() {
    var index = (Int32)(_position / _lengthPerArray);
    Byte[] buffer;
    if (!_buffers.TryGetValue(index, out buffer)) {
        throw new IndexOutOfRangeException();
    }
    return buffer;
}
```
On dispose, every internally held array is released:
```csharp
public override void Dispose() {
    foreach (var item in _buffers) {
        if (_lengthPerArray >= 128) {
            ServiceProvider.BufferManager.ReturnBuffer(item.Value);
        }
    }
    _buffers.Clear();
}
```
Writing out to a stream operates directly on the internal array list, which is highly efficient:
```csharp
public override void CopyTo(Stream destination) {
    foreach (var item in GetBufferAndSize()) {
        destination.Write(item.Key, 0, item.Value);
    }
}
```
Reading from a stream into the buffer behaves much like writing:
```csharp
public override void ReadFrom(Stream source, int count) {
    var left = count;
    var loop = 0;
    do {
        var targetBuffer = GetBufferForWrite();
        var targetOffset = (Int32)(_position % _lengthPerArray);
        var prepareCopy = left;
        if (prepareCopy > _lengthPerArray - targetOffset) {
            prepareCopy = _lengthPerArray - targetOffset;
        }
        var readed = source.Read(targetBuffer, targetOffset, prepareCopy);
        _position += readed;
        left -= readed;
        if (_position > _length) {
            _length = _position;
        }
        loop++;
    } while (left > 0);
}
```
One could actually derive a ScalableMemoryStream from MemoryStream and override its behavior, so that KafkaScalableBinary depends on MemoryStream rather than on a concrete implementation, which would be more in the spirit of design patterns; the basic logic has already been laid out above.
Two things came up during testing. First, **mono's BufferManager implementation has a thread-safety problem**, so Chuye.Kafka provides an ObjectPool-style BufferManager as a replacement. Second, the performance comparison between KafkaScalableBinary and ScalableStreamBinary is very unstable, though the former's frequent division/modulo and dictionary lookups are certainly a drag; I will keep tracking and optimizing it.
KafkaScalableBinary (on github), serialization design sketch: