TPL Dataflow .Net 数据流组件,了解一下?

回顾上文

  做为单体程序,依赖的第三方服务虽很少,可是2C的程序仍是有很多内容可讲; 做为一个常规互联网系统,无外乎就是接受请求、处理请求,输出响应。html

因为业务渐渐增加,数据处理的过程会愈来愈复杂和冗长,【连贯高效的处理数据】 愈来愈被看重,  .Net 提供了TPL  Dataflow组件使咱们更高效的实现基于数据流和 流水线操做的代码redis

    下图是单体程序中 数据处理的用例图。编程

 

 程序中用到的TPL Dataflow 组件,Dataflow是微软前几年给出的数据处理库, 是由不一样的处理块组成,可将这些块组装成一个处理管道,"块"对应处理管道中的"阶段", 可类比AspNetCore 中Middleware 和pipeline.。api

  • TPL Dataflow库为消息传递和并行化CPU密集型和I / O密集型应用程序提供了编程基础,这些应用程序具备高吞吐量和低延迟。它还可让您明确控制数据的缓冲方式并在系统中移动。数组

  • 为了更好地理解数据流编程模型,请考虑从磁盘异步加载图像并建立这些图像的应用程序。
    •   传统的编程模型一般使用回调和同步对象(如锁)来协调任务和访问共享数据, 从宏观看传统模型: 任务是一步步紧接着完成的缓存

    •   经过使用数据流编程模型,您能够建立在从磁盘读取图像时处理图像的数据流对象。在数据流模型下,您能够声明数据在可用时的处理方式以及数据之间的依赖关系。因为运行时管理数据之间的依赖关系,所以一般能够避免同步访问共享数据的要求。此外,因为运行时调度基于数据的异步到达而工做,所以数据流能够经过有效地管理底层线程来提升响应性和吞吐量。    也就是说: 你定义的是任务内容和任务之间的依赖,不关注数据何时流到这个任务 。并发

  •    须要注意的是:TPL Dataflow 非分布式数据流,消息在进程内传递,   使用nuget引用 System.Threading.Tasks.Dataflow 包。

TPL Dataflow 核心概念

 1.  Buffer & Block

TPL Dataflow 内置的Block覆盖了常见的应用场景,固然若是内置块不能知足你的要求,你也能够自定“块”。app

Block能够划分为下面3类:异步

  • Buffering Only    【Buffer不是缓存Cache的概念, 而是一个缓冲区的概念】async

  • Execution

  • Grouping 

使用以上块混搭处理管道, 大多数的块都会执行一个操做,有些时候须要将消息分发到不一样Block,这时可以使用特殊类型的缓冲块给管道“”分叉”。

2. Execution Block

  可执行的块有两个核心组件:
  • 输入、输出消息的缓冲区(通常称为Input,Output队列)

  • 在消息上执行动做的委托

  消息在输入和输出时可以被缓冲:当Func委托的运行速度比输入的消息速度慢时,后续消息将在到达时进行缓冲;当下一个块的输入缓冲区中没有容量时,将在输出时缓冲。

每一个块咱们能够配置:

  • 缓冲区的总容量, 默认无上限

  • 执行操做委托的并发度, 默认状况下块按照顺序处理消息,一次一个。

咱们将块连接在一块儿造成一个处理管道,生产者将消息推向管道。

TPL Dataflow有一个基于pull的机制(使用Receive和TryReceive方法),但咱们将在管道中使用块链接和推送机制。

  • TransformBlock(Execution category)-- 由输入输出缓冲区和一个Func<TInput, TOutput>委托组成,消费的每一个消息,都会输出另一个,你可使用这个Block去执行输入消息的转换,或者转发输出的消息到另一个Block。

  • TransformManyBlock (Execution category) -- 由输入输出缓冲区和一个Func<TInput, IEnumerable<TOutput>>委托组成, 它为输入的每一个消息输出一个 IEnumerable<TOutput>

  • BroadcastBlock (Buffering category)-- 由只容纳1个消息的缓冲区和Func<T, T>委托组成。缓冲区被每一个新传入的消息所覆盖,委托仅仅为了让你控制怎样克隆这个消息,不作消息转换。

            该块能够连接到多个块(管道的分叉),虽然它一次只缓冲一条消息,但它必定会在该消息被覆盖以前将该消息转发到连接块(连接块还有缓冲区)。

  • ActionBlock (Execution category)-- 由缓冲区和Action<T>委托组成,他们通常是管道的结尾,他们再也不给其余块转发消息,他们只会处理输入的消息。

  • BatchBlock (Grouping category)-- 告诉它你想要的每一个批处理的大小,它将累积消息,直到它达到那个大小,而后将它做为一组消息转发到下一个块。

  还有一下其余的Block类型:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,咱们暂时不会深刻。

3. Pipeline Chain React

  当输入缓冲区达到上限容量,为其供货的上游块的输出缓冲区将开始填充,当输出缓冲区已满时,该块必须暂停处理,直到缓冲区有空间,这意味着一个Block的处理瓶颈可能致使全部前面的块的缓冲区被填满。

  可是不是全部的块变满时,都会暂停,BroadcastBlock 有容许1个消息的缓冲区,每一个消息都会被覆盖, 所以若是这个广播块不能将消息转发到下游,则在下个消息到达的时候消息将丢失,这在某种意义上是一种限流(比较生硬).

编程实践

    将按照上图实现TPL Dataflow 

①  定义Dataflow  pipeline
        public EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache, IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory)
        {
            _httpClient = httpClientFactory.CreateClient("bce-request");
            _redisDB0 = redisCache[0];
            _redisDB = redisCache;
            _logger = loggerFactory.CreateLogger(nameof(EqidPairHandler));
            var option = new DataflowLinkOptions { PropagateCompletion = true };

            publisher = _redisDB.RedisConnection.GetSubscriber();
            _eqid2ModelTransformBlock = new TransformBlock<EqidPair, EqidModel>
              (
                   // redis piublih 没有作在TransformBlock fun里面, 由于publih失败可能影响后续的block传递
                   eqidPair => EqidResolverAsync(eqidPair),
                   new ExecutionDataflowBlockOptions
                   {
                       MaxDegreeOfParallelism = con.GetValue<int>("MaxDegreeOfParallelism")
                   }
              );
            // https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline
            _logBatchBlock = new LogBatchBlock<EqidModel>(logConfig, loggerFactory);
            _logPublishBlock = new ActionBlock<EqidModel>(x => PublishAsync(x) );

            _broadcastBlock = new BroadcastBlock<EqidModel>(x => x); // 由只容纳一个消息的缓存区和拷贝函数组成
            _broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option);
            _broadcastBlock.LinkTo(_logPublishBlock, option);
            _eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option);
        }
public class LogBatchBlock<T> : ILogDestination<T> where T : IModelBase
    {
        private readonly string _dirPath;
        private readonly Timer _triggerBatchTimer;
        private readonly Timer _openFileTimer;
        private DateTime? _nextCheckpoint;
        private TextWriter _currentWriter;
        private readonly LogHead _logHead;
        private readonly object _syncRoot = new object();
        private readonly ILogger _logger;
        private readonly BatchBlock<T> _packer;
        private readonly ActionBlock<T[]> batchWriterBlock;
        private readonly TimeSpan _logFileIntervalTimeSpan;

        /// <summary>
        /// Generate  request log file.
        /// </summary>
        public LogBatchBlock(LogConfig logConfig, ILoggerFactory loggerFactory)
        {
            _logger = loggerFactory.CreateLogger<LogBatchBlock<T>>();

            _dirPath = logConfig.DirPath;
            if (!Directory.Exists(_dirPath))
            {
                Directory.CreateDirectory(_dirPath);
            }
            _logHead = logConfig.LogHead;

            _packer = new BatchBlock<T>(logConfig.BatchSize);
            batchWriterBlock = new ActionBlock<T[]>(models => WriteToFile(models));     
            _packer.LinkTo(batchWriterBlock, new DataflowLinkOptions { PropagateCompletion = true });

            _triggerBatchTimer = new Timer(state =>
            {
                _packer.TriggerBatch();
            }, null, TimeSpan.Zero, TimeSpan.FromSeconds(logConfig.Period));

            _logFileIntervalTimeSpan = TimeSpan.Parse(logConfig.LogFileInterval);
            _openFileTimer = new Timer(state =>
            {
                AlignCurrentFileTo(DateTime.Now);
            }, null, TimeSpan.Zero, _logFileIntervalTimeSpan);
        }

        public ITargetBlock<T> InputBlock => _packer;

        private void AlignCurrentFileTo(DateTime dt)
        {
            if (!_nextCheckpoint.HasValue)
            {
                OpenFile(dt);
            }
            if (dt >= _nextCheckpoint.Value)
            {
                CloseFile();
                OpenFile(dt);
            }
        }

        private void OpenFile(DateTime now, string fileSuffix = null)
        {
            string filePath = null;
            try
            {
                var currentHour = now.Date.AddHours(now.Hour);
                _nextCheckpoint = currentHour.Add(_logFileIntervalTimeSpan);
                int hourConfiguration = _logFileIntervalTimeSpan.Hours;
                int minuteConfiguration = _logFileIntervalTimeSpan.Minutes;
                filePath = $"{_dirPath}/u_ex{now.ToString("yyMMddHH")}{fileSuffix}.log";

                var appendHead = !File.Exists(filePath);
                if (filePath != null)
                {
                    var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write);
                    var sw = new StreamWriter(stream, Encoding.Default);
                    if (appendHead)
                    {
                        sw.Write(GenerateHead());
                    }
                    _currentWriter = sw;
                    _logger.LogDebug($"{currentHour} TextWriter has been created.");
                }

            }
            catch (UnauthorizedAccessException ex)
            {
                _logger.LogWarning("I/O error or specific type of scecurity error,{0}", ex);
                throw;
            }
            catch (Exception e)
            {
                if (fileSuffix == null)
                {
                    _logger.LogWarning($"OpenFile failed:{e.StackTrace.ToString()}:{e.Message}.", e.StackTrace);
                    OpenFile(now, $"-{Guid.NewGuid()}");
                }
                else
                {
                    _logger.LogError($"OpenFile failed after retry: {filePath}", e);
                    throw;
                }
            }
        }

        private void CloseFile()
        {
            if (_currentWriter != null)
            {
                _currentWriter.Flush();
                _currentWriter.Dispose();
                _currentWriter = null;
                _logger.LogDebug($"{DateTime.Now} TextWriter has been disposed.");
            }
            _nextCheckpoint = null;
        }

        private string GenerateHead()
        {
            StringBuilder head = new StringBuilder();
            head.AppendLine("#Software: " + _logHead.Software)
                .AppendLine("#Version: " + _logHead.Version)
                .AppendLine($"#Date: {DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss")}")
                .AppendLine("#Fields: " + _logHead.Fields);
            return head.ToString();
        }

        private void WriteToFile(T[] models)
        {
            try
            {
                lock (_syncRoot)
                {
                    var flag = false;
                    foreach (var model in models)
                    {
                        if (model == null)
                            continue;
                        flag = true;
                        AlignCurrentFileTo(model.ServerLocalTime);
                        _currentWriter.WriteLine(model.ToString());
                    }
                    if (flag)
                        _currentWriter.Flush();
                }
            }
            catch (Exception ex)
            {
                _logger.LogError("WriteToFile Error : {0}", ex.Message);
            }
        }

        public bool AcceptLogModel(T model)
        {
            return _packer.Post(model);
        }

        public string GetDirPath()
        {
            return _dirPath;
        }

        public async Task CompleteAsync()
        {
            _triggerBatchTimer.Dispose();
            _openFileTimer.Dispose();
            _packer.TriggerBatch();
            _packer.Complete();
            await InputBlock.Completion;
            lock (_syncRoot)
            {
                CloseFile();
            }
        }
    }
仿IIS日志存储代码

② 异常处理

  上述程序在部署时就遇到相关的坑位,在测试环境_eqid2ModelTransformBlock 内Func委托稳定执行,程序并未出现异样;

  部署到生产以后, 该Pipeline会运行一段时间就中止工做,一直很困惑, 后来经过监测_eqid2ModelTransformBlock.Completion 属性,该块提早进入“完成态”   :   程序在执行某次Func委托时报错,Block提早进入完成态

TransfomrBlock.Completion 一个Task对象,当TPL Dataflow再也不处理消息而且能保证再也不处理消息的时候,就被定义为完成态, Task对象的TaskStatus枚举值将标记此Block进入完成态的真实缘由

- TaskStatus.RanToCompletion       根据Block定义的任务成功完成

- TaskStatus.Fault                            由于未处理的异常 致使"过早的完成"

- TaskStatus.Cancled                       由于取消操做 致使 "过早的完成"

  咱们须要当心处理异常, 通常状况下咱们使用try、catch包含全部的执行代码以确保全部的异常都被处理。

 

    可将TPL Dataflow 作为进程内消息队列,本文只是一个入门参考,更多复杂用法仍是看官网, 你须要记住的是, 这是一个.Net 进程内数据流组件, 能让你专一于流程。

 

做者: JulianHuang

感谢您的认真阅读,若有问题请大胆斧正;以为有用,请下方或加关注。

本文欢迎转载,但请保留此段声明,且在文章页面明显位置注明本文的做者及原文连接。

相关文章
相关标签/搜索