基于Redis实现分布式消息队列
原文地址:http://blog.csdn.net/stationxp/article/details/45731497)
一、为何须要消息队列?
当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就须要消息队列,做为抽象层,弥合双方的差别。
举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,须要未来不及处理的消息暂存一下,缓冲压力。
再举个例子:调远程系统下订单成本较高,且由于网络等因素,不稳定,攒一批一块儿发送。
再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开。1:00到4:00和ERP联通,和电商系统断开。
再举个例子,服务员点菜快,厨师作菜慢。
再举个例子,到银行办事的人多,提供服务的窗口少。
乖乖排队吧。
二、使用消息队列有什么好处?
2.一、提升系统响应速度
使用了消息队列,生产者一方,把消息往队列里一扔,就能够立马返回,响应用户了。无需等待处理结果。
处理结果可让用户稍后本身来取,如医院取化验单。也可让生产者订阅(如:留下手机号码或让生产者实现listener接口、加入监听队列),有结果了通知。得到约定将结果放在某处,无需通知。
2.二、提升系统稳定性
考虑电商系统下订单,发送数据给生产系统的状况。
电商系统和生产系统之间的网络有可能掉线,生产系统可能会因维护等缘由暂停服务。
若是不使用消息队列,电商系统数据发布出去,顾客没法下单,影响业务开展。
两个系统间不该该如此紧密耦合。应该经过消息队列解耦。同时让系统更健壮、稳定。
三、为何须要分布式?
3.一、多系统协做须要分布式
消息队列中的数据须要在多个系统间共享数据才能发挥价值。
因此必须提供分布式通讯机制、协同机制。
3.二、单系统内部署环境须要分布式
单系统内部,为了更好的性能、为了不单点故障,多为集群环境。
集群环境中,应用运行在多台服务器的多个JVM中;数据也保存在各类类型的数据库或非数据库的多个节点上。
为了知足多节点协做须要,须要提供分布式的解决方案。
四、分布式环境下须要解决哪些问题
4.一、并发问题
需进行良好的并发控制。确保“线程安全“。
不要出现一个订单被出货两次。不要出现顾客A下的单,发货发给了顾客B等状况。
4.二、简单的、统一的操做机制
需定义简单的,语义明确的,业务无关的,恰当稳妥的统一的访问方式。
4.三、容错
控制好单点故障,确保数据安全。
4.四、可横向扩展
可便捷扩容。
五、如何实现?
成熟的消息队列中间件产品太多了,族繁不及备载。
成熟产品通过验证,接口规范,可扩展性强。
结合事业环境因素、组织过程遗产、实施运维考虑、技术路线考虑、开发人员状况等缘由综合考虑,基于Redis本身作一个是最可行的选择。
一、消息队列需提供哪些功能?
在功能设计上,我崇尚奥卡姆剃刀法则。
对于消息队列,只须要两个方法: 生产 和 消费。
具体的业务场景是任务队列,代码设计以下:
public abstract class TaskQueue{
private final String name ;
public String getName(){return this.name;}html
public abstract void addTask(Serializable taskId); public abstract Serializable popTask();
}
同时支持多个队列,每一个队列都应该有个名字。final确保TaskQueue是线程安全的。TaskQueue的实现类也应该确保线程安全。
addTask向队列中添加一个任务。队列中仅保存任务的id,不存储任务的业务数据。
popTask从队列中取出一个任务来执行。
这种设计不是特别友好,由于她须要调用者自行保证任务执行成功,若是执行失败,自行确保从新把任务放回队列。 不管如何,这种机制是能够工做的。想一想奥卡姆剃刀法则,咱们先按照这个设计实现出来看看。
若是调用者把业务数据存在数据库中,业务数据中包含“状态“列,标识任务是否被执行,调用者须要自行管理这个状态,并控制事务。
popTask采用阻塞方式,仍是非阻塞方式呢?
若是采用阻塞方式,队列中没任务的时候,客户端不会断开链接,只是等。
通常状况下,客户端会有多个worker抢着干活儿,几条狼一块儿等一个肉包子,画面太美。链接是重要资源,若是一直没活儿干,先放回池里,也不错。
先采用非阻塞的方式吧,若是队列是空的,popTask返回null,当即返回。
二、后续可能提供的功能
2.一、引入Task生命周期概念
应用场景不一样,需求也不一样。
在严格的应用场景中,须要确保每一个Task执行“成功“了。
对于上面提到的popTask后无论的“模式“,这是另一种“运行模式“,两种模式能够并行存在。
在这种新模式下,Task状态有3种:新建立(new,刚调用addTask加到队列中)、正在执行(in-process,调用popTask后,调用finish前)、完成(done,执行OK了,调用finishTask后)。
调整后的代码以下:
public abstract class TaskQueue{redis
private final String name ; public String getName(){return this.name;} public abstract int getMode(); public abstract void addTask(Serializable taskId); public abstract Serializable popTask(); public abstract void finishTask(Serializable taskId);
}
2.二、增长批量取出任务的功能
popTask()一次取出一个任务,太磨叽了。
比如咱们要买5瓶水,开车去超市买,每去一次买1瓶,有点儿啥。
咱们须要一个一次取多个任务的方法。
public abstract class TaskQueue{
... ...
public abstract Serializable[] popTasks(long cnt);
}
2.三、增长阻塞等待机制
想象一种场景:
小明同窗,取出一个任务,发现干不了,放回队列,再去取,取出来发现仍是干不了,又放回去。反反复复。
小明童鞋肿么了?多是他干活须要网络,网络断了。多是他作任务须要写磁盘,磁盘满了。
若是小明像邻居家的孩子同样优秀,当他发现哪里不对的时候,他应该冷静下来,歇会儿。
但他万一不是呢?只有咱们能帮他了。
假如队列中有10000个待办任务。
这时候小明来了。他失败100次后,咱们应该拦他吗?不该该,除非他主动要求(在系统参数中配置)。5000次后呢?也不该该,除非他主动要求。咱们的原则是:咱们作的全部事情,对于调用者,都是能够预期的。
咱们能够在系统参数中要求调用者设置一个阀值N,若是不设置,默认为100。连续失败N次后,让调用者睡一下子,睡多长时间,让调用者配置。
假如咱们的底层实现中包含待办子队列、重作子队列和完成子队列(这种设计好复杂!pop的时候先pop重作,仍是先pop待办,复杂死了!希望不须要这样)。
待办子队列中有10000个任务。
在小明失败10000次后,全部的任务都在重作子队列了。这时候咱们应该拦他吗?
重作子队列要不要设置大小,超过以后,让下一个访问者等。
等的话就会涉及超时,超时后,任务也不能丢弃。
太复杂 了!设置一个连续失败次数的限制就够了!
2.四、考虑增长Task类
不保存任务的相关数据是基本原则,绝对不动摇。
增长Task类能够管理下生命周期,更有用的是,能够把Task自己设计成Listener,代码大概时这样的:
public abstract class Task{数据库
public Serializable getId(); public int getState(); pubic void doTask(); public void whenAdded(final TaskQueue tq); public void whenPoped(final TaskQueue tq); // public void whenFaild(final TaskQueue tq); public void whenFinished(final TaskQueue tq);
}
经过Task接口,咱们能够对调用过程进行更强势的管理(如进行事务控制),对调用者施加更强的控制,用户也能够得到更多的交互机会,同TaskQueue有更好的交互(如在whenFinished中作持久化工做)。
但这些真的有必要吗?是否是太侵入了?注解的方式会好些吗?
再考虑吧。
2.五、增长系统参数
貌似须要个Config类了,不爽!
原本想作一个很小很精致的小东西的,若是必须再加吧。
若是作的话,须要支持properties、注解设置、api方式设置、Spring注入式设置,烦。
次回预告:Redis自己机制和TaskQueue的契合。
基于Redis实现分布式消息队列(3)
一、Redis是什么鬼?
Redis是一个简单的,高效的,分布式的,基于内存的缓存工具。
假设好服务器后,经过网络链接(相似数据库),提供Key-Value式缓存服务。
简单,是Redis突出的特点。
简单能够保证核心功能的稳定和优异。
二、性能
性能方面:Redis是足够高效的。
和Memecached对比,在数据量较小大状况下,Redis性能更优秀。
数据量大到必定程度的时候,Memecached性能稍好。
简单结论:但整体上讲Redis性能已经足够好。
// Ref: Redis性能测试 http://www.cnblogs.com/lulu/archive/2013/06/10/3130878.html
原则:Value大小不要超过1390Byte。
经实验得知:
List操做和字符串操做性能至关,略差,几乎能够忽略。
使用Jedis自带pool,“每次从pool中取用完放回“ 和 “重用单个链接“ 相比,平均用时是3倍。这部分须要继续研究底层机制,采用更合理的实验方法进一步得到数据。
使用Jedis自带pool,性能上是知足当前访问量须要的,等有时间了再进一步深刻。
三、数据类型
Redis支持5种数据类型:字符串、Map、List、Set、Sorted Set。
List特别适合用于实现队列。提供的操做包括:
从左侧(或右侧)放入一个元素,从右侧(或左侧)取出一个元素,读取某个范围的元素,删除某个范围的元素。
Sorted Set中元素是惟一的,能够经过名字找。
Map能够高效地经过key找。
假如咱们须要实现finishTash(taskId),须要经过名字在队列中找元素,上面两个可能会用到。
四、原子操做
实现分布式队列首要问题是:不能出现并发问题。
Redis是底层是单线程的,命令执行是原子操做,支持事务,契合了咱们的需求。
Redis直接提供的命令都是原子操做,包括lpush、rpop、blpush、brpop等。
Redis支持事务。经过相似 begin…[cancel]…commit的语法,提供begin…commit之间的命令为原子操做的功能,之间命令对数据的改变对其余操做是不可见的。相似关系型数据库中的存储过程,同时提供了最高级别的事务隔离级别。
Redis支持脚本,每一个脚本的执行是原子性的。
作了一下并发测试:
写了个小程序,随机对List作push或pop操做,push的比pop的稍多。
记录每次处理的详细信息到数据库。
最后把List中数据都pop出来,详细记录每次pop详细信息。
统计push和pop是否相等,统计针对每条数据是否都有push和pop。
500并发,没有出现并发问题。
五、集群
实现分布式队列另外一个重要问题是:不能出现单点故障。
Redis支持Master-Slave数据复制,从服务器设置 slave-of master-ip:port 便可。
集群功能能够由客户端提供。
客户端使用哨兵,可自动切换主服务器。
因为队列操做都是写操做,从服务器主要目的是备份数据,保证数据安全。
若是想基于 sharding 作多master集群,能够结合 zookeeper 本身作。
Redis 3.0支持集群了,还没细看,应该是个好消息,等你们都用起来,没什么问题的话,能够考虑试试看。
若是 master 宕掉,怎么办?
“哨兵”会选出一个新的master来。产生过程当中,消息队列暂停服务。
最极端的状况,全部Redis都停了,当消息队列发现Redis中止响应时,对业务系统的请求应抛出异常,中止队列服务。
这样会影响业务,业务系统下订单、审批等操做会失败。若是能够接受,这是一种方案。
Redis整个集群宕掉,这种状况不多发生,若是真发生了,业务系统中止服务也是能够理解的。
若是想要在Redis整个集群宕掉的状况下,消息队列仍继续提供服务。
方法是这样的:
启用备用存储机制,能够是zookeeper、能够是关系型数据库、能够是另外可用的Memecached等。
本地内存存储是不可取的,首先,同步多个客户端虚拟机内存数据太复杂,至关于本身实现了一个Redis,其次,保证内存数据存储安全太复杂。
备用存储机制至关于实现了另一个版本的消息队列,逻辑一致,底层存储不一样。这个实现能够性能低一些,保证最基本的原则便可。
想要保证不出现并发问题,因为消息队列程序同时运行在多个虚拟机中,对象锁、方法锁无效。须要有一个独立于虚拟机的锁机制,zookeeper是个好选择。
将关系型数据库设置为最高级别的事务隔离级别,太傻了。除了zk有其余好办法吗?
Redis集群整个宕掉的同时Zookeeper也全军覆没怎么办?
这个问题是没有尽头的,提供了第二备用存储、第三备用存储、第四备用存储、…,理论上也会同时宕掉,那时候怎么办?
有钱任性的土豪能够继续,预算有限的状况,能作到哪步就作到哪步。
六、持久化
分布式队列的应用场景和缓存的应用场景是不同的。
若是有没来得及持久化的数据怎么办?
从业务系统的角度,已经成功发送给消息队列了。
消息队列也觉得Redis妥妥地收好了。
可Redis还没写到日记里,更没有及时通知小伙伴,挂了。多是断电了,多是进程被kill了。
后果会怎样?
已经执行过的任务会再次执行一遍。
已经放到队列中的任务,消失了。
标记为已经完成的任务,状态变为“进行中”了,而后又被执行了一遍。
后果不可接受。
分布式队列不容许丢数据。
从业务角度,哪怕丢1条数据也是没法接受的。
从运维角度,Redis丢数据后,若是能够及时发现并补救,也是能够接受的。
从架构角度,队列保存在Redis中,业务数据(包括任务状态)保存在关系型数据库中。
任务状态是从业务角度肯定的,消息队列不该该干涉。若是业务状态没有统一的规范和定义,从业务数据比对任务队列是否全面正确,就只能交给业务开发方来作。
从分工上来看,任务队列的目的是管理任务执行的状态,业务系统把这个职责交给了任务队列,业务系统自身的任务状态维护未必准确。
结论:任务队列不能推卸责任,不能丢数据是核心功能,不能打折扣。
采用 Master-Slave 数据复制模式,配置bgsave,追加存储到aof。
在从服务器上配置bgsave,不影响master性能。
队列操做都是写操做,master任务繁重,能让slave分担的持久化工做,就不要master作。
rdb和aof两种方法都用上,多重保险。
appendfsync设为always。// 单节点测性能,连续100000次算平均时间,和per second比对,性能损失不大。
性能会有些许损失,但任务执行为异步操做,无需用户同步等待,为了保证数据安全,这样是值得的。
当运维须要重启Master服务器的时候,采起这样的顺序: 小程序
将以上操做写成脚本,自动化执行,避免人为错误。
基于Redis实现分布式消息队列(4)
一、访问Redis的工具类
public class RedisManager {api
private static Pool<Jedis> pool;缓存
protected final static Logger logger = Logger.getLogger(RedisManager.class);安全
static{
try {
init();
} catch (Exception e) {
e.printStackTrace();
}
}服务器
public static void init() throws Exception {网络
Properties props = ConfigManager.getProperties("redis"); logger.debug("初始化Redis链接池。"); if(props==null){ throw new RuntimeException("没有找到redis配置文件"); } // 建立jedis池配置实例 JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); // 设置池配置项值 int poolMaxTotal = Integer.valueOf(props.getProperty("redis.pool.maxTotal").trim()); jedisPoolConfig.setMaxTotal(poolMaxTotal); int poolMaxIdle = Integer.valueOf(props.getProperty("redis.pool.maxIdle").trim()); jedisPoolConfig.setMaxIdle(poolMaxIdle); long poolMaxWaitMillis = Long.valueOf(props.getProperty("redis.pool.maxWaitMillis").trim()); jedisPoolConfig.setMaxWaitMillis(poolMaxWaitMillis); logger.debug(String.format("poolMaxTotal: %s , poolMaxIdle : %s , poolMaxWaitMillis : %s ", poolMaxTotal,poolMaxIdle,poolMaxWaitMillis)); // 根据配置实例化jedis池 String connectMode = props.getProperty("redis.connectMode"); String hostPortStr = props.getProperty("redis.hostPort"); logger.debug(String.format("host : %s ",hostPortStr)); logger.debug(String.format("mode : %s ",connectMode)); if(StringUtils.isEmpty(hostPortStr)){ throw new OptimusException("redis配置文件未配置主机-端口集"); } String[] hostPortSet = hostPortStr.split(","); if("single".equals(connectMode)){ String[] hostPort = hostPortSet[0].split(":"); pool = new JedisPool(jedisPoolConfig, hostPort[0], Integer.valueOf(hostPort[1].trim())); }else if("sentinel".equals(connectMode)){ Set<String> sentinels = new HashSet<String>(); for(String hostPort : hostPortSet){ sentinels.add(hostPort); } pool = new JedisSentinelPool("mymaster", sentinels, jedisPoolConfig); }
}架构
/**
/**
/**
/**
/**
/**
/**
}
用String类型描述任务,也能够考虑byte[],要求对每一个任务描述的数据尽量短。
三、队列的Redis实现类
/**
*/public class TaskQueueRedisImpl implements TaskQueue {
private final static int REDIS_DB_IDX = 9;
private final static Logger logger = Logger.getLogger(TaskQueueRedisImpl.class);
private final String name;
/**
/* (non-Javadoc)
/* (non-Javadoc)
}
和具体的队列过于紧耦合,但简单好用。
先跑起来再说。
五、向队列中添加任务的代码
TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
tq.pushTask(smsMessageId);
六、从队列中取出任务执行的代码
public class SmsSendTask{
protected final static Logger logger = Logger.getLogger(SmsSendTask.class);
protected static SmsSendService smsSendService = new SmsSendServiceUnicomImpl();