熟悉TPL Dataflow博文的朋友可能记得这是个单体程序,使用TPL Dataflow 处理工做流任务, 在使用Docker部署的过程当中, 有一个问题一直没法回避:html
在单体程序部署的瞬间(服务不可用)会有少许流量没法处理;更糟糕的状况下,迭代部署的这个版本有问题,上线后没法运做, 更多的流量没有获得处理。git
背负神圣使命(巨大压力)的程序猿心生一计, 为什么不将单体程序改为分布式:增长服务ReceiverApp只接收数据,服务WebApp只处理数据。github
消息队列和订阅发布做为老生常谈的两个知识点被反复说起,按照JMS的规范, 官方称为点对点(point to point,message queue) 和 订阅发布(publish/subscribe,channel / topic )web
消息生产者生产消息发送到Message Queue中,而后消费者从队列中取出消息并消费。redis
队列会保留消息,直到他们被消费或超时;docker
① MQ支持多消费者,每一个消息只能被一个消费者处理编程
② 消息发送者和消费者在时间上没有依赖性,当发送者发送消息以后, 无论消费者有没有在运行(甚至无论有没有消费者),都不会影响到消息被发送到队列api
③ 通常消费者在消费以后须要向队列应答成功安全
若是但愿发送的消息都被处理,或只能被处理一次,你应该使用p2p模型。数据结构
消息生产者将消息发布到Channel,同时有多个消息消费者(订阅)该消息。和点对点方式不一样,发布到 特定通道的消息会被通道订阅者实时接收。
通道 只有暂存机制,发布的消息只能被当前订阅者收到。
①每一个消息能够有多个消费者
②发布者和消费者 有时间上依赖性, 针对某topic的订阅者,必须先建立相应订阅,才能消费消息
将消息发布到通道中,而不关注订阅者是谁;订阅者可收听本身感兴趣的多个通道(造成Topic), 也不关注发布者是谁。
③ 故若是没有消费者,发布的消息将得不处处理;
若是但愿广播的消息被实时接收,应该采用发布-订阅模型。
Redis 内置的List数据结构亦能造成轻量级MQ的效果,Redis 原生支持发布/订阅 模型。
如上所述, Pub/Sub 模型 在订阅者宕机的时候,发布的消息得不处处理,故此模型不能用于 强业务的 数据接收和处理。
本次采用的消息队列模型:
须要关注Redis 两个命令( 左进右出,右进左出同理):
LPUSH & RPOP/BRPOP
Brpop 中的B 表示 “Block”, 是一个rpop命令的阻塞版本:若指定List没有新元素,在给定时间内,该命令会阻塞当前redis客户端链接,直到超时返回nil
本次使用 ASPNetCore 完成RedisMQ的实践,引入Redis国产第三方开源库CSRedisCore.
不使用著名的StackExchange.Redis 组件库的缘由:
以前一直使用StackExchange.Redis, 参考了不少资料,作了不少优化,并未彻底解决RedisTimeoutException问题
StackExchange.Redis基于其多路复用的链接机制,不支持阻塞式命令, 故采用了 CSRedisCore,该库强调了API 与Redis官方命令一致,很容易上手
生产者使用LPush 命令向Redis List数据结构写入消息。
------------------截取自Startup.cs-------------------------
public void ConfigureServices(IServiceCollection services)
{
// Redis客户端要定义成单例, 否则在大流量并发收数的时候, 会形成redis client来不及释放。另外一方面也确认api控制器不是单例模式,
var csredis = new CSRedisClient(Configuration.GetConnectionString("redis")+",name=receiver");
RedisHelper.Initialization(csredis);
services.AddSingleton(csredis);
services.AddMvc();
}
------------------截取自数据接收Controller------------------- [Route("batch")] [HttpPost] public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs) { if (!ModelState.IsValid) throw new ArgumentException("Http Body Payload Error."); var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}"; eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs); if (eqidPairs != null && eqidPairs.Any()) RedisHelper.LPush(redisKey, eqidPairs.ToArray()); await Task.CompletedTask; }
根据以上RedisMQ思路,事件消费方式是拉取pull,故须要轮询Redis List数据结构,这里使用ASPNetCore内置的BackgroundService后台服务类实现后台轮询消费任务。
public class BackgroundJob : BackgroundService { private readonly IEqidPairHandler _eqidPairHandler; private readonly CSRedisClient[] _cSRedisClients; private readonly IConfiguration _conf; private readonly ILogger _logger; public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[] csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory) { _eqidPairHandler = eqidPairHandler; _cSRedisClients = csRedisClients; _conf = conf; _logger = loggerFactory.CreateLogger(nameof(BackgroundJob)); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("Service starting"); if (_cSRedisClients[0] == null) { _cSRedisClients[0] = new CSRedisClient(_conf.GetConnectionString("redis") + ",defaultDatabase=" + 0); } RedisHelper.Initialization(_cSRedisClients[0]); while (!stoppingToken.IsCancellationRequested) { var key = $"eqidpair:{DateTime.Now.ToString("yyyyMMdd")}"; var eqidpair = RedisHelper.BRPop(5, key); if (eqidpair != null) await _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair)); // 强烈建议不管如何休眠一段时间,防止突发大流量致使webApp进程CPU满载,自行根据场景设置合理休眠时间 await Task.Delay(10, stoppingToken); } _logger.LogInformation("Service stopping"); } }
最后依照引言中的部署原理图,将Nginx,Receiver, WebApp使用docker-compose工具容器化
根据docker-compsoe up命令的用法,若容器正在运行且对应的Service Configuration或Image并未改变,该容器不会被ReCreate;
docker-compose up指令只会重建(Service或Image变动)的容器。
If there are existing containers for a service, and the service’s configuration or image was changed after the container’s creation,
docker-compose up
picks up the changes by stopping and recreating the containers (preserving mounted volumes). To prevent Compose from picking up changes, use the--no-recreate
flag.
作一次上线测试验证,修改docker-compose.yml文件Web app的容器服务,docker-compose up;
仅数据处理程序WebApp容器被重建:
Nice,分布式改造上线,效果很明显,如今能够放心安全的迭代Web App数据处理程序。
码甲拙见,若有问题请下方留言大胆斧正;码字+Visio制图,均为原创,看官请不吝好评+关注, ~。。~
本文欢迎转载,请转载页面明显位置注明原做者及原文连接。