>本工具的核心思想就是:赌。只有两个基础组件同时死亡,才会受到严重影响
。哦,断电除外。git
mq是个好东西,咱们都在用。这也决定了mq应该是高高高可用的。某团就由于这个组件,出了好几回生产事故,呵呵。github
大部分业务系统,要求的消息语义都是at least once
,即都会有重复消息,但保证不会丢。即便这样,依然有不少问题:redis
1、mq可用性没法保证。 mq的意外死亡,形成生产端发送失败。不少消息要经过扒取日志进行回放,成本高耗时长。api
2、mq阻塞业务正常进行。 mq卡顿或者网络问题,会形成业务线程卡在mq的发送方法上,正常业务进行不下去,形成灾难性的后果。微信
3、消息延迟。 mq死了就用不着说了,消息还没投胎就已死亡。消息延迟主要是客户端消费能力不强,或者是消费通道单一形成的。网络
使用组合存储来保证消息的可靠投递,就是okmq
。多线程
>注意:okmq注重的是可靠性。对于顺序性、事务等其余要素,不予考虑。固然,速度是必须的。socket
我即便用两套redis来模拟一些mq操做,都会比现有的一些解决方案要强。但这确定不是咱们须要的,由于redis的堆积能力太有限,内存占用率直线上升的感受并不太好。分布式
但咱们能够用redis来做为额外的发送确认机制。这个想法,在《使用多线程增长kafka消费能力》一文中曾经提到过,如今到了实现的时候了。ide
OkmqKafkaProducer producer = new ProducerBuilder() .defaultSerializer() .eanbleHa("redis") .any("okmq.redis.mode", "single") .any("okmq.redis.endpoint", "127.0.0.1:6379") .any("okmq.redis.poolConfig.maxTotal", 100) .servers("localhost:9092") .clientID("okMQProducerTest") .build(); Packet packet = new Packet(); packet.setTopic("okmq-test-topic"); packet.setContent("i will send you a msg"); producer.sendAsync(packet, null); producer.shutdown();
咱们按照数字标号来介绍:
一、 在消息发送到kafka以前,首先入库redis。因为后续回调须要用到一个惟一表示,咱们在packet包里添加了一个uuid。
二、 调用底层的api,进行真正的消息投递。
三、 经过监听kafka的回调,删除redis中对应的key。在这里能够获得某条消息确切的的ack时间。那么长时间没有删除的,就算是投递失败的消息。
四、 后台会有一个线程进行这些失败消息的遍历和从新投递。咱们叫作recovery。最复杂的也就是这一部分。对于redis来讲,会首先争抢一个持续5min的锁,而后遍历相关hashkey。
因此,对于以上代码,redis发出如下命令:
1559206423.395597 [0 127.0.0.1:62858] "HEXISTS" "okmq:indexhash" "okmq:5197354" 1559206423.396670 [0 127.0.0.1:62858] "HSET" "okmq:indexhash" "okmq:5197354" "" 1559206423.397300 [0 127.0.0.1:62858] "HSET" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e" "{\"content\":\"i will send you a msg104736623015238\",\"topic\":\"okmq-test-topic\",\"identify\":\"2b9b33fd-95fd-4cd6-8815-4c572f13f76e\",\"timestamp\":1559206423318}" 1559206423.676212 [0 127.0.0.1:62858] "HDEL" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e" 1559206428.327788 [0 127.0.0.1:62861] "SET" "okmq:recovery:lock" "01fb85a9-0670-40c3-8386-b2b7178d4faf" "px" "300000" 1559206428.337930 [0 127.0.0.1:62858] "HGETALL" "okmq:indexhash" 1559206428.341365 [0 127.0.0.1:62858] "HSCAN" "okmq:5197354" "0" 1559206428.342446 [0 127.0.0.1:62858] "HDEL" "okmq:indexhash" "okmq:5197354" 1559206428.342788 [0 127.0.0.1:62861] "GET" "okmq:recovery:lock" 1559206428.343119 [0 127.0.0.1:62861] "DEL" "okmq:recovery:lock"
1、mq可用性没法保证。
为何要要经过过后进行恢复呢?我把recovery机制带着不是更好么?经过对未收到ack的消息进行遍历,能够把这个过程作成自动化。
2、mq阻塞业务正常进行。
经过设置kafka的MAX_BLOCK_MS_CONFIG 参数,实际上是能够不阻塞业务的,但会丢失消息。我可使用其余存储来保证这些丢失的消息从新发送。
3、消息延迟。
mq死掉了,依然有其余备用通道进行正常服务。也有的团队采用双写mq双消费的方式来保证这个过程,也是被逼急了:)。若是kafka死掉了,业务会切换到备用通道进行消费。
若是你不想用redis,好比你先要用hbase,那也是很简单的。 但须要实现一个HA接口。
public interface HA { void close(); void configure(Properties properties); void preSend(Packet packet) throws HaException; void postSend(Packet packet) throws HaException; void doRecovery(AbstractProducer producer) throws HaException; }
使用以前,还须要注册一下你的插件。
AbstractProducer.register("log", "com.sayhiai.arch.okmq.api.producer.ha.Ha2SimpleLog");
okmq.ha.recoveryPeriod 恢复线程检测周期,默认5秒 okmq.redis.mode redis的集群模式,可选:single、sentinel、cluster okmq.redis.endpoint 地址,多个地址以,分隔 okmq.redis.connectionTimeout 链接超时 okmq.redis.soTimeout socket超时 okmq.redis.lockPx 分布式锁的持有时间,可默认,5min okmq.redis.splitMillis 间隔时间,redis换一个key进行运算,默认5min okmq.redis.poolConfig.* 兼容jedis的全部参数
一、进行了生产端的高可用抽象,实现了kafka的样例。
二、增长了SimpleLog的ping、pong日志实现。
三、增长了Redis的生产端备用通道。包含single、cluster、sentinel三种模式。
四、能够自定义其余备用通道。
五、兼容kakfa的全部参数设置。
一、实现ActiveMQ的集成。
二、实现消费者的备用通道集成。
三、增长嵌入式kv存储的生产者集成。
四、更精细的控制系统的行为。
五、加入开关和预热,避免新启动mq即被压垮。
六、redis分片机制,大型系统专用。
一、监控功能添加。
二、rest接口添加。
当你把参数ha设置为true,代表你已经收到如下的使用限制。反之,系统反应于原生无异。
使用限制: 本工具仅适用于非顺序性、非事务性的普通消息投递,且客户端已经作了幂等。一些订单系统、消息通知等业务,很是适合。若是你须要其余特性,请跳出此页面。
kafka死亡,或者redis单独死亡,消息最终都会被发出,仅当kafka与redis同时死亡,消息才会发送失败,并记录在日志文件里。
正常状况下,redis的使用容量极少极少。异常状况下,redis的容量有限,会迅速占满。redis的剩余时间就是你的StopWatch
,你必须在这个时间内恢复你的消息系统,必定要顶住哇。
系统目前处于1.0.0版本,正在线上小规模试用。工具小众,但适用于大部分应用场景。若是你正在寻求这样的解决方案,欢迎一块完善代码。
github地址:
https://github.com/sayhiai/okmq
也欢迎关注《小姐姐味道》微信公众号,进行交流。