分布式消息通讯(ActiveMQ)

分布式消息通讯(ActiveMQ)

应用场景

异步通讯

应用解耦

流量削峰

 

# ActiveMQ安装html

  1. 下载java

    http://activemq.apache.org/mysql

  2. 压缩包上传到Linux系统linux

    apache-activemq-5.15.9-bin.tar.gzspring

  3. 解压缩sql

    tar -zxvf apache.activemq-5.15.0-bin.tar.gz数据库

  4. 启动apache

    bin目录下: ./activemq startvim

    ####查看是否启动命令:./activemq statuscentos

  5. 进入管理后台(默认后台管理端口8161;默认openwire端口61616)

    http://127.0.0.1:8161/admin admin admin

  6. 错误处理

    查看日志:./activemq console

    • hostname不合法

      ERROR | Failed to start Apache ActiveMQ (localhost, ID:VM_0_5_centos-46296-1554189350972-0:1)
     java.net.URISyntaxException: Illegal character in hostname at index 7: ws://VM_0_5_centos:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600
             at java.net.URI$Parser.fail(URI.java:2848)[:1.8.0_201]
             at java.net.URI$Parser.parseHostname(URI.java:3387)[:1.8.0_201]
             at java.net.URI$Parser.parseServer(URI.java:3236)[:1.8.0_201]
             at java.net.URI$Parser.parseAuthority(URI.java:3155)[:1.8.0_201]
             at java.net.URI$Parser.parseHierarchical(URI.java:3097)[:1.8.0_201]
             at java.net.URI$Parser.parse(URI.java:3053)[:1.8.0_201]
             at java.net.URI.<init>(URI.java:673)[:1.8.0_201]
             at

    解决办法:

     编辑文件:vim /etc/hostname,文件内直接填写新的主机名(能够尝试只用这一步)
     
     若是还不行,执行下面的:
     编辑文件:vim /etc/sysconfig/network
     改HOSTNAME=xxxxxxx为 HOSTNAME=wly 保存
     改映射: Vim /etc/hosts
     127.0.0.1 new-hostname.domainname ******
     星号为原来的主机名,现只将*****改为为wly 保存
     
     重启linux机器:reboot

     

JMS概念和规范

java消息服务(java message service)

MOM(message oriented middleware)面向消息中间件

转载:http://www.javashuo.com/article/p-rpbcvyhr-ho.html

JMS是什么   JMS 全称:Java Message Service,Java消息服务,是Java EE中的一个技术。

JMS规范

  JMS定义了Java 中访问消息中间件的接口,并无给予实现,实现JMS接口的消息中间件成为JMS Provider,例如:Active MQ

JMS Provider

  实现JMS接口和规范的消息中间件

JMS message

  JMS的消息,JMS消息由三部分组成:消息头、消息属性、消息体

  消息头包含消息的识别消息和路由消息,消息头包含一些标准的属性以下:

   (1)JMSDestination: 消息发送的目的地,主要是指Queue和Topic,由send方法设置.

   (2)JMSDeliveryMode:传送模式。有两种:持久模式非持久模式。一条持久性的消息应该被传输"一次仅仅一次",这就意味着若是JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复以后再次传递。一条非持久的消息最多会传递一次,这意味着服务器出现故障,该消息将永远丢失。由send方法设置

   (3)JMSExpiration:消息过时时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT的时间值。若是timeToLive值等于零,则JMSExpiration被设置为零,表示该消息永不过时。若是发送后,在消息过时时间以后消息尚未被发送到目的地,则该消息被清除。由send方法设置

   (4)JMSPriority:消息优先级,从0-9十个级别,0-4是普通消息,5-9是加急消息。JMS不要求JMS Provider严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达,默认是4级。由send方法设置

   (5)JMSMessageID:惟一识别每一个消息的标识,由JMS Provider产生。由send方法设置

   (6)JMSTimestamp:一个JMS Provider在调用send()方法时自动设置,它是消息被发送和消费者实际接收的时间差。由客户端设置

   (7)JMSCorrelationID:用来链接到另一个消息,典型的应用是在回复消息中链接到原消息。在大多数状况下,JMSCorrelationID用于将一条消息标记为对JMSMessageID标示的上一条消息的应答,不过,JMSCorrelationID能够是任何值,不只仅是JMSMessageID。由客户端设置

   (8)JMSType: 消息类型的标识符,由客户端设置

   (9)JMSReplyTo: 提供本消息回复消息的目的地址,由客户端设置

   (10)JMSRedelivered:若是一个客户端收到一个设置了JMSRedelivered属性的消息,则表示可能客户端曾经在早些时候收到过该消息,但并无签收(acknowledged)。若是该消息被从新传送,JMSRedelivered=true 不然 JMSRedelivered=flase 。由JMS Provider设置

  消息体,JMS API定义了5种消息体格式,也叫消息类型,可使用不一样形式发送接收数据,并能够兼容现有的消息格式。

    包括:TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage

  消息属性,包含如下三种类型的属性:

    1.应用程序设置和添加的数据,好比:message.setStringProperty("userName",userName);

    2.JMS定义的属性,使用"JMSX"做为属性名的前缀, connection.getMetaData().getJMSXPropertyNames() 方法返回全部链接支持的JMSX属性的名字。

    3.JMS供应商特定的属性

JMS producer

  消息生产者,建立和发送JMS消息的客户端应用

JMS consumer

  消息消费者,建立和处理JMS消息的客户端应用

JMS domains: 消息传递域

  JMS规范中定义了两种消息传递域: 点对点(point-to-point,简写成PTP);消息传递域和发布/订阅消息传递域(publish/subscribe,简写成pub/sub)

  1.点对点消息传递域的特色以下:

   a.每一个消息只能有一个消费者

   b.消息的生产者和消费者之间没有时间上的相关性。不管消费者在生产者发送消息的时候是否处于运行状态,它均可以提取消息。

  2.发布/订阅消息传递域的特色以下:

    a.每一个消息能够有多个消费者

    b.生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅以后发布的消息。JMS规范容许客户建立持久订阅(setClientID),这在必定程度上放松了时间上的相关性要求。持久订阅容许消费者消费它在未处于激活状态时发送的消息。

    

  3.在点对点消息传递域中,目的地被称为队列(queue);在发布/订阅消息传递域中,目的地被称为主题(topic)

 

Connection factory: 链接工厂,用来建立链接对象,以链接到JMS的provider

JMS Connection: 封装了客户与JMS提供者之间的一个虚拟的链接 JMS Session: 是生产和消费消息的一个单线程上下文 会话用于建立消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操做中。 Destination:消息发送到的目的地 Acknowledge:签收 Transaction:事务 JMS client: 用来收发消息的Java应用

JMS的可靠性机制:JMS消息以后被确认后,才会认为是被成功消费。

事务性会话,即设置为ture,消息会在commit后自动确认;

非事务性会话,即设置为false,在该模式下,消息是否被确认取决于建立会话时的应答模式。:

AUTO_ACKNOWLEDGE:当客户端成功receive后,消息自动确认

CLIENT_ACKNOWLEDGE:客户端手动确认(textMessage.acknowledge)

DUPS_OK_ACKNOWLEDGE:延迟确认(能够设置时间)。

 

ActiveMQ测试

编写一个测试类对ActiveMQ进行测试,首先得向pom文件中添加ActiveMQ相关的jar包:

      <dependency>  
          <groupId>org.apache.activemq</groupId>  
          <artifactId>activemq-all</artifactId>  
     </dependency>

queue的发送代码以下(生产者):

  public void testMQProducerQueue() throws Exception{
         //一、建立工厂链接对象,须要制定ip和端口号
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
         //二、使用链接工厂建立一个链接对象
         Connection connection = connectionFactory.createConnection();
         //三、开启链接
         connection.start();
         //四、使用链接对象建立会话(session)对象
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         //五、使用会话对象建立目标对象,包含queue和topic(一对一和一对多)
         Queue queue = session.createQueue("test-queue");
         //六、使用会话对象建立生产者对象
         MessageProducer producer = session.createProducer(queue);
         //七、使用会话对象建立一个消息对象
         TextMessage textMessage = session.createTextMessage("hello!test-queue");
         //八、发送消息
         producer.send(textMessage);
         //九、关闭资源
         producer.close();
         session.close();
         connection.close();
    }

接收代码(消费者):

     public void TestMQConsumerQueue() throws Exception{
         //一、建立工厂链接对象,须要制定ip和端口号
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
         //二、使用链接工厂建立一个链接对象
         Connection connection = connectionFactory.createConnection();
         //三、开启链接
         connection.start();
         //四、使用链接对象建立会话(session)对象
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         //五、使用会话对象建立目标对象,包含queue和topic(一对一和一对多)
         Queue queue = session.createQueue("test-queue");
         //六、使用会话对象建立生产者对象
         MessageConsumer consumer = session.createConsumer(queue);
         //七、向consumer对象中设置一个messageListener对象,用来接收消息
         consumer.setMessageListener(new MessageListener() {
 
             @Override
             public void onMessage(Message message) {
                 // TODO Auto-generated method stub
                 if(message instanceof TextMessage){
                     TextMessage textMessage = (TextMessage)message;
                     try {
                         System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                         // TODO Auto-generated catch block
                         e.printStackTrace();
                    }
                }
            }
        });
         //八、程序等待接收用户消息
         System.in.read();
         //九、关闭资源
         consumer.close();
         session.close();
         connection.close();
    }

而后当咱们运行queue发送的时候能够看到队列里已经有一条消息了,但没有发送出去: 而后在运行queue 的接收端,能够看到消息已经发出了:

 

broker

能够本地建立并运行。

 BrokerService brokerservice = new BrokerService();
 brokrService.setUseJmx(true);
 brokrService.addConnector("tcp://localhost:61616");
 brokrService.start();

FAQ

 Q:消息的发送策略
 A:持久化消息(默认)/非持久化消息
 PS:设置消息发送端发送持久化消息`异步方式`: connectionFactory.setUseAsyncSend(true);
    设置消息发送端发送非持久化消息(默认为异步方式):textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENCE)
    回执窗口大小设置:connectionFactory.setProducerWindowSize();
    若是须要对非持久化消息的每次发送的消息都得到broker的回执:connectionFactory.setAllwaysSyncSend();
 Q:comsumer获取消息时pull仍是push
 A:默认状况下,mq服务器(broker)采用异步方式向客户端主动推送消息(push)
 
    prefetchsize:预取消息数量(broker每次主动推送的消息数量)。Queue默认值是1000;topic持久化消息默认值是100,非持久化消息默认值是32766。若是设置为0,此时对于consumer来讲,就是pull模式。
Q:acknowledge为何可以在第5次把前面执行的消息确认掉?
A:源码deliveryMessage:存在以前执行完但没有确认的消息队列

#消息确认
ACK_TYPE:消息端和broker交换ack指令的时候,还须要告知broker ACK_TYPE
	REDELIVERED_ACK_TYPE:重发策略
	DELIVERED_ACK_TYPE:消息已经接受,可是还没有处理结束
	STANDARD_ACK_TYPE:消息处理成功

 

ActiveMQ+spring整合

service-jms.xml

实现MessageListener

ActiveMQ的传输协议

client与broker的通讯协议

支持的协议TCP(默认)、UDP、NIO、SSL、Http(s)、vm

ActiveMQ持久化存储

1.kahaDB(默认,基于文件的存储方式)

2.AMQ(基于文件的存储方式)

写入速度快,容易恢复

文件默认大小是32M

3.JDBC(基于数据库的存储)

<jdbcPersistenceAdapter dataSource="#mysqlDataSource"  createTablesOnStartup="true"/>

链接数据库成功后,会建立三张表ACTIVEMQ_ACKS、ACTIVEMQ_LOCK、ACTIVEMQ_MSGS

4.Memory(基于内存的存储)

5.LevelDB

5.8版本之后引入的持久化策略,一般用于集群配置

 

ActiveMQ网络链接

networkConnector:用来配置broker与broker之间的通讯链接

<!--静态网络链接-->
<networkConnectors>
	<networkConnectot uri="static://(tcp://192.168.1.1:61616,tcp://192.168.1.2:61616)"/>
</networkConnectors>

#双向链接:duplex
<!--丢失的消息:配置消息回流,解决该问题-->
<policyEntry queue=">" enableAudit="false">
	<networkBridgeFilterFactory>
        <conditionalNetwordBridgeFilterFactory replayWhenNoConsumers="true" />
    </networkBridgeFilterFactory>	
</policyEntry>

容错链接

 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://192.168.156.44:61616,tcp://192.168.156.45:61616")";
相关文章
相关标签/搜索