Consumer的高级特性linux
独有消费者:web
Consumer消息的异步分发:spring
Consumer的优先级:windows
管理持久化消息:
tomcat
持久化发送端:服务器
持久化接收端:session
运行程序,从界面上查看订阅者,20秒以后再刷新界面就能够看到这个不活跃的订阅者就会消失:多线程
消息分组:并发
前面讲了使用queue消息的时候,若是消息队列中有1,2,3,共3个消息,此时若是是并发的线程来拿消息进行消费,会出现消息顺序不一致的状况,为了防止这种状况使用独有消费者,可是会有性能上的问题,因此考虑使用消息分组。框架
发送端:
接收端:
运行接收端和发送端,从运行结果中能够看的出来,虽然有两个接收端,可是由于设置了分组,因此发送的全部消息只会给一个消费者。如今修改一下发送端,咱们让发送端发给不一样的组,那这些消息就会根据组的不一样被不一样的消费者消费,看运行的结果:
因此使用消息分组,一个组的消息就会给一个消费者,这一个组的消息不会被分发给不一样的消费者。不一样的组的消息会发给不一样的消费者。
实际上这个组用完就关闭了,因此这里的关闭其实是没有什么用的。
消息选择器(消息过滤):
使用上一节讲的消息分组的发送端和接收端。修改发送端:
修改接收端:
由于使用了过滤条件,因此运行程序以后能够看的出来是没有信息的。虽然消费者不接受不知足本身条件的消息,可是消息仍是在Activemq队列中的,若是如今修改一下接收端,直接只运行接收端,就能够接收到(原来的消息还在队列中):
从新传递消息策略:
如今来测试一下默认的重传次数,用之前写过的一个发送端:
先把接收端的session.commit注释掉。而后运行程序,经过测试能够发现,咱们运行一次发送端,运行n次接收端,由于把接收端的session.commit注释了,因此开启了不少次的接收端仍是能够接收到消息,这是由于接收端的session.commit注释了,因此即便消费端消费了消息,在消息队列中仍是不会清除掉已经消费的消息,仍是会存在,因此能够一直接收到消息:
可是消费端一直消费到第7次之后的时候就不能再接受了,由于默认的重传次数是6,超过7就不能接受了:
从web界面中能够看到那个DLQ队列:
如今来测试使用消息重传策略。修改消费端,设置的重传次数是3:
如今从新来测试,启动消费端,启动生产端,消费者一共接受了4次消息,再次启动消费者的时候就接受不到了,由于已经达到重传次数了。观察web界面中的死消息的队列的时候就能够看到多的消息了:
能够自动删除过时消息:
固然也能够为每个topic或者queue的目的地设置重传策略:
慢消费者处理:
在配置文件中能够看到默认是1000:
监控和管理Broker
第一种方式:这种方式就是咱们常说的web界面:
第二种方式:这是ActiveMQ5.9之后可使用的:
这里有不少能够查询或者使用的功能:
集成ActiveMQ和Tomcat
就是说把ActiveMQ嵌入到Tomcat中去跑,而不是做为一个单独的程序应用运行。其实通常的状况下是不但愿二者结合使用的,由于单独用ActiveMQ能够很方便的作修改和扩展,可是若是有要使用ActiveMQ和tomcat结合的需求的时候能够用。结合以后就但愿ActiveMQ是尽可能不要常常变化的了。如今来操做一下,修改里面的web.xml内容:
在tomcat中加入spring和xbean的包。下面的除了default包之外的spring和xbean的包都是须要加入到Linux上的tomcat中的,因此这些包在windows上指定一个位置处放好:
经过下面的命令将windows上的spring和xbean的包传入到linux的tomcat上(选中的是里面的jar包):
ActiveMQ的使用场景
4.做为RPC的替代:就是远程过程的调用。
若是A,B在同一个虚拟机中那么他们能够随便的通讯,可是若是不在同一个机器上,就要经过RPC框架。ActiveMQ就至关于RPC。好比A要调用B的方法,那么A就给ActiveMQ发一个消息,B就会执行相应的方法,而后A给ActiveMQ发消息说要获得返回结果,而后B就把结果告诉A。
ActiveMQ的优化
ActiveMQ结合业务
通常就是适合于高并发量的,插入修改等复杂业务逻辑的时候使用,若是是高并发的访问适合用memcache这些。上面的例子:若是生成订单,若是同时有很大的并发量要求生成订单,一个服务器压力是很大的,那么每一个客户那边反应就会很慢,因此使用ActiveMQ,将客户的生成订单的请求放在队列中,能够很快的放进去,而后若是并发量很大的时候能够有多个消费者从队列中拿请求作处理。因此这个时候对于客户来讲反应是很快的,真正处理业务逻辑的就是一个个的消费者了。
那ActiveMQ如何咱们的应用进行结合呢?
通常咱们使用这种异步的组件的时候仍是用第二种方式,方便之后扩展或其余的使用。第二种方式就是单独的启动ActiveMQ。就是将service中的一堆的处理逻辑做为Consumer。而后还要考虑用queueu仍是用topic,这里咱们用的是queue。固然咱们一个service中有不少的业务处理逻辑,因此能够搞多个消费者的线程一块儿处理。
如今有一个业务,接收消费者的Id,下订单:
如今使用ActiveMQ的消息:
service的方法就能够直接使用ActiveMQ消息了:
如今来写消费端。这里面就是当时具体的service中的业务逻辑,由于下面具体的业务逻辑中须要的uuid是从发送端发送过来的,因此接收端要首先接受一下:
消费者的一端能够用多线程的方式,若是有须要共享的资源就要加上锁,注意加锁是为哪一个加锁,不能随便使用this。
因此为了想在web应用启动的时候接收端就建立好,有两种方式,第一种是把接收端作成Lister(就是和配置Spring的监听器相似,在web.xml中把service配置好),另外一种方式是让web容器启动的时候就把spring相关的service都建立好,这样就把接收端也建立好了。
如今咱们来试试第二种方式,让咱们的接收端实现监听器,而且加上注解,让web容器建立的时候spring就把这个接收端建立,好:
修改接收端,去掉session的关闭:
不关闭链接这样服务器就是一直开着的,只要有消息就会进行接受。咱们这里就是把接收端作成一个服务了,是为了方便起见,固然也能够根据实际的业务须要作修改。
第一种方式:将接收端做为监听器
实现监听器接口,就要实现初始化和销毁的方法,初始化作的就是让接收端打开链接:
而后在web应用的web.xml中把监听器配置上,名字要对应,注意:咱们的接收端的Listener要放在spring的监听器的后面,由于只有spring建立成功了,才能建立它管理的接收端和service:
如今分析一下。咱们一共有3个部分,业务的service,发送端和接收端,咱们把接收端作成了监听器,不用咱们手动启动了。
由于咱们的接收端作成监听器了,每次启动web容器的时候都会启动监听器,因此接收端就是一直运行的,若是咱们把接收端的监听器先去掉,先不让她启动,看一下业务的速度:
这样咱们就只管发消息,这样速度明显就快了。根据具体的业务具体处理。