慕课网_《Java消息中间件》学习总结

时间:2017年07月22日星期六
说明:本文部份内容均来自慕课网。@慕课网:http://www.imooc.com
教学源码:无
学习源码:https://github.com/zccodere/s...java

第一章:课程介绍

1-1 课程安排

Java消息中间件(入门篇)git

为何须要使用消息中间件
消息中间件概述
JMS规范
JMS代码演练

Java消息中间件(拓展篇)github

ActiveMQ集群配置
消息中间件在大型系统中的最佳实践
使用其它消息中间件

1-2 使用消息中间件缘由

经过服务调用让其它系统感知事件发生web

系统之间高耦合
程序执行效率低

clipboard.png

经过消息中间件解耦服务调用spring

clipboard.png

生活中的案例apache

微信公众号
老师在黑板上写字
电视机
等等

消息中间件带来的好处vim

解耦:系统解耦
异步:异步执行
横向扩展 
安全可靠
顺序保证

横向扩展解释安全

当登陆系统,须要不少用户登陆。这些消息所有须要告知积分系统,去增长积分,而增长积分这个处理过程可能比较麻烦、比较耗时。这个时候,能够启动多台积分系统,来同时消费这个消息中间件里面的登陆消息,达到横向扩展的做用。

第二章:概述

2-1 消息中间件概述

什么是中间件服务器

非底层操做系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件

什么是消息中间件微信

关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统

示意图

clipboard.png

什么是JMS

Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通讯。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

什么是AMQP

AMQP(Advanced Message Queuing Protocol)是一个提供统一消息服务的应用层标准高级消息队列协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不一样产品,不一样的开发语言等条件的限制。

JMS和AMQP对比

clipboard.png

ActiveMQ

ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个彻底支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已是好久的事情了,可是JMS在当今J2EE应用中间件仍然扮演者特殊的地位。

ActiveMQ特性

多种语言和协议编写客户端。
    语言:Java、C、C++、C#、Ruby、Perl、Python、PHP
    应用协议:OpenWire、Stomp、REST、WS Notification、XMPP、AMQP
彻底支持JMS1.1和J2EE1.4规范(持久化、XA消息、事务)
虚拟主题、组合目的、镜像队列

RabbitMQ

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ特性

支持多种客户端
    如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等
AMQP的完整实现(vhost、Exchange、Binding、Routing Key等)
事务支持/发布确认
消息持久化

Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式、分区的、可高的分布式日志存储服务。它经过一种独一无二的设计提供了一个消息系统的功能。

Kafka特性

经过O(1)的磁盘数据结构提供消息的持久化,
    这种结构对于即便数以TB的消息存储也可以保持长时间的稳定性能
高吞吐量:即便是很是普通的硬件Kafka也能够支持每秒数百万的消息
Partition、Consumer Group

综合评价

clipboard.png

第三章:JMS规范

3-1 JMS规范

Java消息服务定义

Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通讯。

JMS相关概念

提供者:实现JMS规范的消息中间件服务器
客户端:发送或接收消息的应用程序
生产者/发布者:建立并发送消息的客户端
消费者/订阅者:接收并处理消息的客户端
消息:应用程序之间传递的数据内容
消息模式:在客户端之间传递消息的模式,JMS中定义了主题和队列两种模式

JMS消息模式:队列模式

客户端包括生产者和消费者
队列中的消息只能被一个消费者消费
消费者能够随时消费队列中的消息

队列模型示意图

clipboard.png

JMS消息模式:主题模型

客户端包括发布者和订阅者
主题中的消息被全部订阅者消费
消费者不能消费订阅以前就发送到主题中的消息

主题模型示意图

clipboard.png

JMS编码接口

ConnectionFactory:用于建立链接到消息中间件的链接工厂
Connection:表明了应用程序和消息服务器之间的通讯链路
Destination:指消息发布和接收的地点,包括队列和主题
Session:表示一个单线程的上下文,用于发送和接收消息
MessageConsumer:由会话建立,用户接收发送到目标的消息
MessageProducer:由会话建立,用于发送消息到目标
Message:是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体

JMS编码接口之间的关系

clipboard.png

第四章:使用ActiveMQ

4-1 Windows安装ActiveMQ

在Windows安装ActiveMQ

下载安装包
直接启动
使用服务启动

安装验证

访问地址:http://127.0.0.1:8161/
默认用户:admin
默认密码:admin

4-2 Linux安装ActiveMQ

在Linux安装ActiveMQ

下载并解压安装包
启动

启动验证

进入到bin目录,使用命令./activemq start启动服务
使用命令ps -ef |grep activemq查看进程是否存在
使用命令./activemq stop关闭服务

安装验证

访问地址:http://Linux主机IP:8161/
默认用户:admin
默认密码:admin

4-3 队列模式的消息演示

使用JMS接口规范链接ActiveMQ

建立生产者
建立消费者
建立发布者
建立订阅者

回顾JMS编码接口之间的关系

clipboard.png

代码演示

1.编写AppProducer类

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 生产者-队列模式
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppProducer {
    /** 指定ActiveMQ服务的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定队列的名称 */
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.启动链接
        connection.start();
        
        // 4.建立会话(第一个参数:是否在事务中处理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5. 建立一个目标
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // 6.建立一个生产者
        MessageProducer producer = session.createProducer(destination);
        
        for (int i = 0; i < 100; i++) {
            
            // 7.建立消息
            TextMessage textMessage = session.createTextMessage("test" + i);
            
            // 8.发布消息
            producer.send(textMessage);
            
            System.out.println("消息发送:" + textMessage.getText());
        }
        
        // 9.关闭链接
        connection.close();
    }
    
}

2.编写AppConsumer类

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 消费者-队列模式
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppConsumer {
    /** 指定ActiveMQ服务的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定队列的名称 */
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.启动链接
        connection.start();
        
        // 4.建立会话(第一个参数:是否在事务中处理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5.建立一个目标
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // 6.建立一个消费者
        MessageConsumer consumer = session.createConsumer(destination);
        
        // 7.建立一个监听器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收消息:" + textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接收消息异常:");
                    e.printStackTrace();
                }
            }
        });
        
        // 8.关闭链接
        //connection.close();
    }
    
}

4-4 主题模式的消息演示

代码演示

1.编写AppProducer类

package com.myimooc.jms.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 生产者-主题模式
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppProducer {
    /** 指定ActiveMQ服务的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定主题的名称 */
    private static final String TOPIC_NAME = "topic-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.启动链接
        connection.start();
        
        // 4.建立会话(第一个参数:是否在事务中处理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5. 建立一个目标
        Destination destination = session.createTopic(TOPIC_NAME);
        
        // 6.建立一个生产者
        MessageProducer producer = session.createProducer(destination);
        
        for (int i = 0; i < 100; i++) {
            
            // 7.建立消息
            TextMessage textMessage = session.createTextMessage("test" + i);
            
            // 8.发布消息
            producer.send(textMessage);
            
            System.out.println("消息发送:" + textMessage.getText());
        }
        
        // 9.关闭链接
        connection.close();
    }
    
}

2.编写AppConsumer类

package com.myimooc.jms.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 消费者-主题模式
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppConsumer {
    /** 指定ActiveMQ服务的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定主题的名称 */
    private static final String TOPIC_NAME = "topic-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.启动链接
        connection.start();
        
        // 4.建立会话(第一个参数:是否在事务中处理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5.建立一个目标
        Destination destination = session.createTopic(TOPIC_NAME);
        
        // 6.建立一个消费者
        MessageConsumer consumer = session.createConsumer(destination);
        
        // 7.建立一个监听器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收消息:" + textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接收消息异常:");
                    e.printStackTrace();
                }
            }
        });
        
        // 8.关闭链接
        //connection.close();
    }
    
}

4-5 Spring jms理论

使用Spring集成JMS链接ActiveMQ

ConnectionFactory:用于管理链接的链接工厂
JmsTemplate:用于发送和接收消息的模版类
MessageListener:消息监听器

ConnectionFactory

一个Spring为咱们提供的链接池
JmsTemplate每次发消息都会从新建立链接,会话和productor
Spring中提供了SingleConnectFactory和CachingConnectionFactory

JmsTemplate

是Spring提供的,只需向Spring容器内注册这个类就可使用JmsTemplate方便的操做jms
JmsTemplate类是线程安全的,能够在整个应用范围使用

MessageListener

实现一个onMessage方法,该方法只接收一个Message参数

4-6 Spring jms演示

代码演示

1.建立名为jmsspring的maven项目POM文件以下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.myimooc</groupId>
    <artifactId>jmsspring</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>jmsspring</name>
    <url>http://maven.apache.org</url>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.1.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2.完成后的目录结构以下

clipboard.png

源码请到个人github地址查看

3.测试

使用Postman向ProducerController发起请求,将消息发送出去

clipboard.png

对应的ConsumerTopicMessageListener 和 ConsumerMessageListener接收到消息

clipboard.png

第五章:大型系统最佳实践

5-1 ActiceMQ集群

为何要对消息中间件集群

实现高可用,以排除单点故障引发的服务中断
实现负载均衡,以提高效率为更多客户提供服务

集群方式

客户端集群:让多个消费者消费同一个队列
Broker cluster:多个Broker之间同步消息
Master Slave:实现高可用

ActiveMQ失效转移(failover)-客户端配置

容许当其中一台消息服务器宕机时,客户端在传输层上从新链接到其它消息服务器
语法:failover:(uri1,…,uriN)?transportOptions

transportOptions参数说明

randomize默认为true,表示在URI列表中选择URI链接时是否采用随机策略
initialReconnectDelay默认为10,单位毫秒,表示第一次尝试重连之间等待的时间
maxReconnectDelay默认为30000,单位毫秒,最长重连的时间间隔

Broker cluster集群配置-原理

clipboard.png

NetworkConnector(网络链接器)

网络链接器主要用于配置ActiveMQ服务器与服务器之间的网络通信方式,用于服务器透传消息
网络链接器分为静态链接器和动态链接器

静态链接器

clipboard.png

动态链接器

clipboard.png

5-2 ActiveMQ集群理论

ActiveMQ Master Slace集群方案

Share nothing storage master/slave(已过期,5.8+后移除)
Shared storage master/slave 共享存储
Replicated LevelDB Store基于负责的LevelDB Store

共享存储集群的原理

clipboard.png

clipboard.png

基于复制的LevelDB Store的原理

clipboard.png

两种集群方式对比

clipboard.png

三台服务器的完美集群方案

clipboard.png

5-3 ActiveMQ集群实践

ActiveMQ集群配置方案

clipboard.png

配置过程

1.节点准备

mkdir activemq建立目录
cp -rf apache-activemq-5.15.0 activemq/activemq-a
cp -rf apache-activemq-5.15.0 activemq/activemq-b
cp -rf apache-activemq-5.15.0 activemq/activemq-c
cd activemq
mkdir kahadb

2.配置a节点

cd activemq-a/
cd conf/
vim activemq.xml
    <networkConnectors>
              <networkConnector name="local_network" uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" />
    </networkConnectors>
vim jetty.xml:配置管理端口号,a节点使用默认端口,无须配置

3.配置b节点

vim activemq.xml
配置网络链接器
        <networkConnectors>
      <networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)" />
    </networkConnectors>
配置持久化存储路径
    <persistenceAdapter>
        <kahaDB directory="/studio/activemq/kahadb"/>
    </persistenceAdapter>
配置服务端口
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
vim jetty.xml
配置管理端口号
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
         <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8162"/>
</bean>

4.配置c节点

vim activemq.xml
配置网络链接器
        <networkConnectors>
      <networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)" />
    </networkConnectors>
配置持久化存储路径
    <persistenceAdapter>
        <kahaDB directory="/studio/activemq/kahadb"/>
    </persistenceAdapter>
配置服务端口
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
vim jetty.xml
配置管理端口号
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
         <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8163"/>
</bean>

5.启动服务
回到activemq目录,分别启动a,b,c三个节点

./activemq-a/bin/activemq start
./activemq-b/bin/activemq start
./activemq-c /bin/activemq start

检查是否都启动成功

ps -ef |grep activemq

检查是否对外提供服务,即端口是否被监听(占用)

netstat -anp |grep 61616
netstat -anp |grep 61617
netstat -anp |grep 61618

检查发现61618即c节点没有提供服务,可是c节点的进程是启动成功了的。由于b节点和c点击是master/slave配置,如今b节点获取到了共享文件夹的全部权,因此c节点正在等待得到资源,而且提供服务。即c节点在未得到资源以前,是不提供服务的。

测试,把b节点杀掉,看c节点能不能提供61618的服务

./activemq-b/bin/activemq stop
netstat -anp |grep 61618
./activemq-b/bin/activemq start
netstat -anp |grep 61617

检查发现,从新启动b节点后,b节点61617端口并无提供服务,是由于如今b节点成为了slave节点,而c节点成为了master节点。因此,如今b节点启动了,可是它并不对外提供服务。只有当c节点出现问题后,b节点才对外提供服务。

6.经过代码测试集群配置是否生效

生产者

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 生产者-队列模式-集群配置测试
 * @author ZhangCheng on 2017-07-25
 *
 */
public class AppProducerTest {
    /** failover 为状态转移的存在部分
     * 因a节点只做为消费者使用,因此这里不配置61616节点了。
     * */
    private static final String URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
    /** 指定队列的名称 */
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.启动链接
        connection.start();
        
        // 4.建立会话(第一个参数:是否在事务中处理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5. 建立一个目标
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // 6.建立一个生产者
        MessageProducer producer = session.createProducer(destination);
        
        for (int i = 0; i < 100; i++) {
            
            // 7.建立消息
            TextMessage textMessage = session.createTextMessage("test" + i);
            
            // 8.发布消息
            producer.send(textMessage);
            
            System.out.println("消息发送:" + textMessage.getText());
        }
        
        // 9.关闭链接
        connection.close();
    }
    
}

消费者

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 消费者-队列模式-集群配置测试
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppConsumerTest {
    /** failover 为状态转移的存在部分
     * */
    private static final String URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
    /** 指定队列的名称 */
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.启动链接
        connection.start();
        
        // 4.建立会话(第一个参数:是否在事务中处理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5.建立一个目标
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // 6.建立一个消费者
        MessageConsumer consumer = session.createConsumer(destination);
        
        // 7.建立一个监听器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收消息:" + textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接收消息异常:");
                    e.printStackTrace();
                }
            }
        });
        
        // 8.关闭链接
        //connection.close();
    }
    
}

运行生产者,而后到管理界面查看消息发送到了那里

http://127.0.0.1:8161
http://127.0.0.1:8162
http://127.0.0.1:8163

查看发现,8162没法访问,是由于b节点是slave节点,不提供服务,消息都发送到了c节点

把8163即c节点宕掉后,运行消费者,查看消息是否可以使用

./activemq-c/bin/activemq stop

5-4 企业级系统中的最佳实践

实际业务场景分析

clipboard.png

实际业务场景特色

子业务系统都有集群的可能性
同一个消息会广播给关注该类消息的全部子业务系统
同一类消息在集群中被负载消费
业务的发生和消息的发布最终一致性

须要解决的问题

不一样业务系统分别处理同一个消息,同一业务系统负载处理同类消息
解决消息发送时的一致性问题
解决消息处理的幂等性问题
基于消息机制创建事件总线

集群系统处理消息方案-使用JMS级联的解决方案

clipboard.png

集群系统处理消息方案-使用ActiveMQ的虚拟主题解决方案

发布者:将消息发布到一个主题中,主题名以VirtualTopic开头,如VirtualTopic.TEST
消费者:从队列中获取消息,在队列名中表名本身身份,如Consumer.A.VirtualTopic.TEST

解决消息发送时的一致性问题-使用JMS中XA系列接口保证强一致性

引入分布式事务
要求业务操做必须支持XA协议

解决消息发送时的一致性问题-使用消息表的本地事务解决方案

clipboard.png

解决消息发送时的一致性问题-使用内存日志的解决方案

clipboard.png

解决消息处理的幂等性问题

所谓幂等性问题,是指屡次执行所产生的影响(结果)与一次执行所产生的影响(结果)同样。好比:支付成功后,支付宝会发起屡次通知给业务系统,要求业务系统可以处理这些重复的消息,可是又不重复处理订单。若是在消息处理系统中保证幂等性,会增长系统复杂度,咱们能够统一处理幂等性后,再将消息发送给消息处理系统。

解决消息处理的幂等性问题-使用消息表的本地事务解决方案

clipboard.png

解决消息处理的幂等性问题-使用内存日志的解决方案

clipboard.png

基于消息机制的事件总线-什么是事件驱动架构

事件驱动架构(Event Driven Architecture,EDA)定义了一个设计和实现一个应用系统的方法学,在这个系统里事件可传输于松散耦合的组件和服务之间。特色:有事我叫你,没事别烦我

事件驱动架构模型

clipboard.png

该教师正在开发该事件总线的框架,github地址https://github.com/jovezhao/nest

第六章:使用其它消息中间件

6-1 使用其它消息中间件

分析须要作的事

解决各业务系统集群处理同一条消息
实现本身的消息提供者

经常使用消息中间件

ActiveMQ
RabbitMQ
Kafka

集成RabbitMQ

RabbitMQ:使用交换器绑定到队列

示意图

clipboard.png

RabbitMQ消息提供者源码解析

建立ConnectionFactory
建立Connection
建立Channel
定义Exchange
定义Queue而且绑定队列

集成Kafka

Kafka使用group.id分组消费者

配置消息者参数group.id相对时对消息进行负载处理
配置服务器partitions参数,控制同一个group.id下的consumer数量小于partitions
Kafka只保证同一个partition下的消息是有序的

第七章:课程总结

7-1 课程总结

clipboard.png

相关文章
相关标签/搜索