RabbitMQ从在阿里云安装到在spring boot中如何使用这一篇就能够了

//某个服务的具体状况
	ps -ef | grep XXX 
//杀死进程
	kill -9 进程ID,第一个
//查看内存
	free或者top
//查看磁盘使用状况
	df -l
//寻找文件
	find -name  xxx
//查看端口使用状况	
	 netstat -an | grep 15672

RabbitMQ安装:

1.更新
sudo agt-get update
2. 安装erlang环境
yum install erlang
3. 安装rabbitMQ环境
sudo apt-get install rabbitmq-server
4. 启用RabbitMQWeb管理插件

不启用你的http://ip+15672访问不到java

rabbitmq-plugins enable rabbitmq_management
5. 经常使用命令
启动、中止、重启、状态rabbitMq命令:

启动:sudo rabbitmq-server start
关闭: sudo rabbitmq-server stop
重启: sudo rabbitmq-server restart
查看状态:sudo rabbitmqctl status
6. 查看启动状态
root@yan:/# service rabbitmq-server status
● rabbitmq-server.service - RabbitMQ Messaging Server
   Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)
   Active: active (running) since Thu 2018-11-08 20:15:25 CST; 7min ago
  Process: 16987 ExecStop=/usr/sbin/rabbitmqctl stop (code=exited, status=0/SUCCESS)
  Process: 17074 ExecStartPost=/usr/lib/rabbitmq/bin/rabbitmq-server-wait (code=exited, status=0/SUCCESS)
 Main PID: 17073 (rabbitmq-server)
    Tasks: 70
   Memory: 39.6M
      CPU: 1.538s
   CGroup: /system.slice/rabbitmq-server.service
           ├─17073 /bin/sh /usr/sbin/rabbitmq-server
           ├─17084 /bin/sh -e /usr/lib/rabbitmq/bin/rabbitmq-server
           ├─17171 /usr/lib/erlang/erts-7.3/bin/epmd -daemon
           ├─17211 /usr/lib/erlang/erts-7.3/bin/beam -W w -A 64 -P 1048576 -K true -B i -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/ra
           ├─17312 inet_gethost 4
           └─17313 inet_gethost 4

Nov 08 20:15:23 yan systemd[1]: Stopped RabbitMQ Messaging Server.
Nov 08 20:15:23 yan systemd[1]: Starting RabbitMQ Messaging Server...
Nov 08 20:15:23 yan rabbitmq[17074]: Waiting for rabbit@yan ...
Nov 08 20:15:23 yan rabbitmq[17074]: pid is 17084 ...
Nov 08 20:15:25 yan systemd[1]: Started RabbitMQ Messaging Server.
lines 1-22/22 (END)
7 添加用户

添加admin用户,密码设置为admin。web

sudo rabbitmqctl add_user  admin  admin

赋予权限spring

sudo rabbitmqctl set_user_tags admin administrator

赋予virtual host中全部资源的配置、写、读权限以便管理其中的资源sql

sudo rabbitmqctl  set_permissions -p / admin '.*' '.*' '.*'
8. 访问:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TGzxSFXV-1581158404903)(http://phi7gkzsm.bkt.clouddn.com/rabbitmq.png)]xcode



spring boot整合rabbitMQ

一、配置pom包,主要是添加spring-boot-starter-amqp的支持

<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、配置文件

配置rabbitmq的安装地址、端口以及帐户信息安全

  1. 必定不要把端口号设置成15672,由于那个已经被占用了,因此你新设置一个5672springboot

  2. 同时添加到阿里云服务器安全组配置服务器

spring.application.name=Spring-boot-rabbitmq
spring.rabbitmq.host=101.200.55.12
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=你的密码

重启,并打开http://ip+15672就能看到一个链接信息了

如图有一个admin链接上了app

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rLYFcBwf-1581158404904)(http://phi7gkzsm.bkt.clouddn.com/rabbitmq%E8%BF%9E%E6%8E%A5.png)]dom

3. 新建一个rabbitMQ的包在你的项目下

新建以下几个文件

//队列配置

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {

    @Bean
    public Queue Queue() {
        return new Queue("hello");
    }
}
// 发送者
// rabbitTemplate是springboot 提供的默认实现

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "这是发送的信息 "+"---------------------" + new Date();
        System.out.println("=============================");
        System.out.println("Sender : " + context);
        System.out.println("=============================");
        this.rabbitTemplate.convertAndSend("hello", context);
    }
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("------------------1--------------------");
        System.out.println("Receiver : " + hello);
        System.out.println("-----------------1---------------------");
    }
}
// 新建一个controller测试

import cn.nxcoder.blog.rabbit.HelloSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class RabbitController {

    @Autowired
    private HelloSender helloSender;

	/* 一个生产者和一个消费者 */
    @PostMapping ("/rabbitHello")
    @ResponseBody
    public void hello(   ) {
        helloSender.send();
    }
}
//用postman本地测试
http://localhost:8088/rabbitHello
//测试结果


=============================
Sender : 这是发送的信息 ---------------------Fri Nov 09 15:09:01 CST 2018
=============================
15:09:01.188 [http-nio-8088-exec-1] INFO  c.n.blog.handler.LogAspectHandler - ======执行方法后,执行该方法======
15:09:01.188 [http-nio-8088-exec-1] INFO  c.n.blog.handler.LogAspectHandler - Result:null
------------------1--------------------
Receiver  : 这是发送的信息 ---------------------Fri Nov 09 15:09:01 CST 2018
-----------------1---------------------

以上最简单的MQ完成了


B: 一个生产者,多个消费者的状况

B:1

在我门的生产者里面新家一个sendMsg方法,该方法须要传参

看下面的,hello不能变,或者说是必须和你的消费者引用的名称要同样,这里我就没有改

this.rabbitTemplate.convertAndSend("hello", sendMsg);
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "这是发送的信息 "+"---------------------" 
        + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }

    public void sendMsg(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("Sender2 : " + sendMsg);
        this.rabbitTemplate.convertAndSend("hello", sendMsg);
    }

}
B:2

新加一个消费者,可是引用的仍是hello

@Component
@RabbitListener(queues = "hello")
public class HelloReceiverTwo {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver2 : " + hello);
    }
}

再看原来的消费者1,他们的@RabbitListener(queues = “hello”)

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver : " + hello);
    }
}
B:3

在controller里面新加一个一对多测试

@Controller
public class RabbitController {

    @Autowired
    private HelloSender helloSender;

    /* 一个生产者和一个消费者 */
    @PostMapping ("/rabbitHello")
    @ResponseBody
    public void hello(   ) {
        helloSender.send();
    }


    /** * 单生产者-多消费者 */
    @PostMapping("/oneToMany")
    @ResponseBody
    public void oneToMany() {
        for(int i=0;i<10;i++){
            helloSender.sendMsg("这是第二个生产者发送的消息:==="+i+"====个");
        }
    }
}
B:4

postman测试

http://localhost:8088/oneToMany

B:5

测试结果:

Sender2 : 这是第二个生产者发送的消息:===0====个Fri Nov 09 15:33:33 CST 2018
Sender2 : 这是第二个生产者发送的消息:===1====个Fri Nov 09 15:33:33 CST 2018
Sender2 : 这是第二个生产者发送的消息:===2====个Fri Nov 09 15:33:33 CST 2018
Sender2 : 这是第二个生产者发送的消息:===3====个Fri Nov 09 15:33:33 CST 2018
Sender2 : 这是第二个生产者发送的消息:===4====个Fri Nov 09 15:33:33 CST 2018
Sender2 : 这是第二个生产者发送的消息:===5====个Fri Nov 09 15:33:33 CST 2018
Sender2 : 这是第二个生产者发送的消息:===6====个Fri Nov 09 15:33:33 CST 2018
Sender2 : 这是第二个生产者发送的消息:===7====个Fri Nov 09 15:33:33 CST 2018
Sender2 : 这是第二个生产者发送的消息:===8====个Fri Nov 09 15:33:33 CST 2018
Sender2 : 这是第二个生产者发送的消息:===9====个Fri Nov 09 15:33:33 CST 2018
15:33:33.224 [http-nio-8088-exec-1] INFO  c.n.blog.handler.LogAspectHandler - ======执行方法后,执行该方法======
15:33:33.224 [http-nio-8088-exec-1] INFO  c.n.blog.handler.LogAspectHandler - Result:null
Receiver2  : 这是第二个生产者发送的消息:===1====个Fri Nov 09 15:33:33 CST 2018
Receiver  : 这是第二个生产者发送的消息:===0====个Fri Nov 09 15:33:33 CST 2018
Receiver2  : 这是第二个生产者发送的消息:===3====个Fri Nov 09 15:33:33 CST 2018
Receiver  : 这是第二个生产者发送的消息:===2====个Fri Nov 09 15:33:33 CST 2018
Receiver2  : 这是第二个生产者发送的消息:===5====个Fri Nov 09 15:33:33 CST 2018
Receiver  : 这是第二个生产者发送的消息:===4====个Fri Nov 09 15:33:33 CST 2018
Receiver2  : 这是第二个生产者发送的消息:===7====个Fri Nov 09 15:33:33 CST 2018
Receiver  : 这是第二个生产者发送的消息:===6====个Fri Nov 09 15:33:33 CST 2018
Receiver2  : 这是第二个生产者发送的消息:===9====个Fri Nov 09 15:33:33 CST 2018
Receiver  : 这是第二个生产者发送的消息:===8====个Fri Nov 09 15:33:33 CST 2018

以上一对多的就完成了


C:多个消费者和多个生产者
  1. 在刚才咱们实现了一个生产者和2个消费者

  2. 其实多对多更简单,只须要基于一对多把生产者copy一份就能够了,里面的东西不要变,换个名字

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class HelloSenderTwo {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "这是发送的信息 "+"
        ---------------------" + new Date();
        System.out.println("生产者2_Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }

    public void sendMsg(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("生产者2_Sender2 : " + sendMsg);
        this.rabbitTemplate.convertAndSend("hello", sendMsg);
    }

}

controller新加一个方法

import cn.nxcoder.blog.rabbit.HelloSender;
import cn.nxcoder.blog.rabbit.HelloSenderTwo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class RabbitController {

    @Autowired
    private HelloSender helloSender;
    @Autowired
    private HelloSenderTwo helloSenderTwo;

    /* 一个生产者和一个消费者 */
    @PostMapping ("/rabbitHello")
    @ResponseBody
    public void hello(   ) {
        helloSender.send();
    }


    /** * 单生产者-多消费者 */
    @PostMapping("/oneToMany")
    @ResponseBody
    public void oneToMany() {
        for(int i=0;i<10;i++){
            helloSender.sendMsg("这是第二个生产者发送的消息:==="+i+"====个");
        }
    }


    /** * 多生产者-多消费者 */
    @PostMapping("/manyToMany")
    @ResponseBody
    public void manyToMany() {
        for(int i=0;i<10;i++){
            helloSender.sendMsg("hellomsg:"+i+" ");
            helloSenderTwo.sendMsg("hellomsg:"+i+" ");
        }

    }
}
// postman测试
http://localhost:8088/manyToMany
//测试结果
Sender2 : hellomsg:0    Fri Nov 09 15:49:42 CST 2018
生产者2_Sender2 : hellomsg:0      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:1    Fri Nov 09 15:49:42 CST 2018
生产者2_Sender2 : hellomsg:1      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:2    Fri Nov 09 15:49:42 CST 2018
生产者2_Sender2 : hellomsg:2      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:3    Fri Nov 09 15:49:42 CST 2018
生产者2_Sender2 : hellomsg:3      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:4    Fri Nov 09 15:49:42 CST 2018
生产者2_Sender2 : hellomsg:4      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:5    Fri Nov 09 15:49:42 CST 2018
生产者2_Sender2 : hellomsg:5      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:6    Fri Nov 09 15:49:42 CST 2018
生产者2_Sender2 : hellomsg:6      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:7    Fri Nov 09 15:49:42 CST 2018
生产者2_Sender2 : hellomsg:7      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:8    Fri Nov 09 15:49:42 CST 2018
生产者2_Sender2 : hellomsg:8      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:9    Fri Nov 09 15:49:42 CST 2018
生产者2_Sender2 : hellomsg:9      Fri Nov 09 15:49:42 CST 2018
15:49:42.455 [http-nio-8088-exec-2] INFO  c.n.blog.handler.LogAspectHandler - ======执行方法后,执行该方法======
15:49:42.455 [http-nio-8088-exec-2] INFO  c.n.blog.handler.LogAspectHandler - Result:null
Receiver  : hellomsg:0    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:0      Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:1      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:1    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:2      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:2    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:3      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:3    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:4      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:4    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:5      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:5    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:6      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:6    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:7      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:7    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:8      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:8    Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:9    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:9      Fri Nov 09 15:49:42 CST 2018

C: 用实体类发送消息队列

大部分的状况是数据是用对象封装的,因此咱们来测试一下实体类

C.1 新建一个实体类并实现序列化接口(必须)

springboot完美的支持对象的发送和接收,不须要格外的配置。

实体类(必须实现序列化接口):

public class RabbitTest implements Serializable {

    private String name;
    private String pass;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getPass() {
        return pass;
    }
    public void setPass(String pass) {
        this.pass = pass;
    }
}

C.2 在咱们的生产者里面新建一个方法

须要更改一下他的名字

this.rabbitTemplate.convertAndSend("entityQueue", rabbitTest);

原来的生产者变成这样;

import cn.nxcoder.blog.entity.RabbitTest;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "这是发送的信息 "+"---------------------" + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }

    public void sendMsg(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("Sender2 : " + sendMsg);
        this.rabbitTemplate.convertAndSend("hello", sendMsg);
    }

    public void sendEntity() {
        RabbitTest rabbitTest =new RabbitTest();
        rabbitTest.setName("琬琬");
        rabbitTest.setPass("123456987");
        this.rabbitTemplate.convertAndSend("entityQueue", rabbitTest);
    }
}
C3 新建一个消费者指定他的名字为entityQueue

必须新建一个消费者,由于一个消费者只能有一个名字,刚才咱们新家的消费者名字都是hello

如今咱们给他定义一个新的名字entityQueue

import cn.nxcoder.blog.entity.RabbitTest;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "entityQueue")
public class EntityReceiver {
    @RabbitHandler
    public void process(RabbitTest rabbitTest) {
        System.out.println("rabbitTest receive : " + 
        rabbitTest.getName()+"/"+rabbitTest.getPass());
    }
}

注意:这样继续下去时会报错的

由于你一旦新定义一个名字,就必须往config文件中添加这个名字

如今咱们的配置类多了一个entityQueue

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue Queue() {
        return new Queue("hello");
    }

    @Bean
    public Queue entityQueue() {
        return new Queue("entityQueue");
    }
}

C4 controller测试

/** * 实体类传输测试 */
    @PostMapping("/entityTest")
    @ResponseBody
    public void userTest() {
        helloSender.sendEntity();
    }

C5 postman测试

 
//测试结果
rabbitTest receive  : 琬琬/123456987


topic ExChange 示例

  1. topic 是RabbitMQ中最灵活的一种方式,能够根据binding_key自由的绑定不一样的队列

  2. 首先对topic规则配置,这里使用两个队列来测试

  3. (也就是在Application类中建立和绑定的topic.message1和topic.message2两个队列)

    其中topic.message的bindting_key为

    “topic.message1”,topic.message2的binding_key为“topic.#”;

1. D 如今咱们的config文件中添加以下记录

//===============如下是验证topic Exchange的队列==========
    @Bean
    public Queue queueMessage() {
        return new Queue("topic.message1");
    }

    @Bean
    public Queue queueMessages() {
        return new Queue("topic.message2");
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    /** * 将队列topic.messages与exchange绑定,binding_key为topic.#,模糊匹配 * @param queueMessages * @param exchange * @return */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }

2. D 在咱们的生产者里面添加以下方法:

 

3. D 添加两个消费者

@Component
@RabbitListener(queues = "topic.message1")
public class topicMessageReceiver {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("topic.messageReceiver1 : " +msg);
    }

}
@Component
@RabbitListener(queues = "topic.message2")
public class topicMessageReceiverTwo {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("topic.messageReceiver2 : " +msg);
    }

}

4. D controller测试

/**
     * topic exchange类型rabbitmq测试
     */
    @PostMapping("/topicTest")
    @ResponseBody
    public void topicTest() {
        helloSender.sendTopic();
    }
测试:
http://localhost:8088/topicTest
//结果

sender1 : I am topic.mesaage msg======
sender2 : I am topic.mesaages msg########
16:50:10.455 [http-nio-8088-exec-2] INFO  c.n.blog.handler.LogAspectHandler - ======执行方法后,执行该方法======
16:50:10.455 [http-nio-8088-exec-2] INFO  c.n.blog.handler.LogAspectHandler - Result:null
topic.messageReceiver2  : I am topic.mesaage msg======
topic.messageReceiver2  : I am topic.mesaages msg########

六、fanout ExChange示例

Fanout 就是咱们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的全部队列都收到这个消息。

这里使用三个队列来测试(也就是在config类中建立和绑定的fanout.A、fanout.B、fanout.C)这三个队列都和config中建立的fanoutExchange转发器绑定。

6.1 添加config文件

//===============如下是验证Fanout Exchange的队列==========
    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }
    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }
    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }
    @Bean
    Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }
    //===============以上是验证Fanout Exchange的队列==========

6.2 添加生产者方法

public void sendFanout() {
        String msgString="fanoutSender :hello i am hzb";
        System.out.println(msgString);
        this.rabbitTemplate.convertAndSend("fanoutExchange","abcd.ee", msgString);
    }
6.3 添加三个消费者
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverC  : " + msg);
    }

}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverB  : " + msg);
    }

}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverA  : " + msg);
    }

}

6.4 controller方法

/**
     * fanout exchange类型rabbitmq测试
     */
    @PostMapping("/fanoutTest")
    @ResponseBody
    public void fanoutTest() {
        helloSender.sendFanout();
    }
6.5 测试
http://localhost:8088/fanoutTest
//结果
fanoutSender :hello i am hzb

17:35:14.911 [http-nio-8088-exec-1] INFO  c.n.blog.handler.LogAspectHandler - ======执行方法后,执行该方法======
17:35:14.911 [http-nio-8088-exec-1] INFO  
c.n.blog.handler.LogAspectHandler - Result:null

FanoutReceiverB  : fanoutSender :hello i am hzb
FanoutReceiverA  : fanoutSender :hello i am hzb
FanoutReceiverC  : fanoutSender :hello i am hzb

6.6 结果分析:

由以上结果可知:就算fanoutSender发送消息的时候,指定了routing_key为"abcd.ee",可是全部接收者都接受到了消息


七、带callback的消息发送

增长回调处理,这里再也不使用application.properties默认配置的方式,会在程序中显示的使用文件中的配置信息。该示例中没有新建队列和exchange,用的是第5节中的topic.messages队列和exchange转发器。消费者也是第5节中的topicMessagesReceiver

7.1 在application.properties中添加一些信息
添加;
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
//如今变为:
spring.application.name=Spring-boot-rabbitmq
spring.rabbitmq.host=101.200.55.12
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=你的密码
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
7.2 新增config文件
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;

public class RabbitConfig2 {

    @Value("${spring.rabbitmq.host}")
    private String addresses;

    @Value("${spring.rabbitmq.port}")
    private String port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.publisher-confirms}")
    private boolean publisherConfirms;

    @Bean
    public ConnectionFactory connectionFactory() {

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses+":"+port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        /** 若是要进行消息回调,则这里必需要设置为true */
        connectionFactory.setPublisherConfirms(publisherConfirms);
        return connectionFactory;
    }

    @Bean
    /** 由于要设置回调类,因此应是prototype类型,若是是singleton类型,则回调类为最后一次设置 */
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplatenew() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

}

7.3 新增生产者类:

import java.util.Date;
import java.util.UUID;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class CallBackSender implements  RabbitTemplate.ConfirmCallback{
    @Autowired
    private RabbitTemplate rabbitTemplatenew;
    public void send() {

        rabbitTemplatenew.setConfirmCallback(this);
        String msg="callbackSender : i am callback sender";
        System.out.println(msg );
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        System.out.println("callbackSender UUID: " + correlationData.getId());
        this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData);
    }

    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // TODO Auto-generated method stub
        System.out.println("callbakck confirm: " + correlationData.getId());
    }
}

7.4 用原来topic的消费者类,这里再贴一次

@Component
@RabbitListener(queues = "topic.message1")
public class topicMessageReceiver {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("topic.messageReceiver1  : " +msg);
    }

}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.message2")
public class topicMessageReceiverTwo {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("topic.messageReceiver2  : " +msg);
    }

}

7.5 controller测试

@PostMapping("/callback")
    @ResponseBody
    public void callbak() {
        callBackSender.send();
    }
http://localhost:8088/callback
//测试结果
callbackSender : i am callback sender
callbackSender UUID: 48be7d7e-69f8-4d9c-b264-191402dec3de
 
callbakck confirm: 48be7d7e-69f8-4d9c-b264-191402dec3de
topic.messageReceiver2  : callbackSender : i am callback sender

7.6 结果分析

从上面能够看出callbackSender发出的UUID,收到了回应,又传回来了。


到此,rabbitMQ先分析到这里,接下来咱们会用它作些高级的功能

有问题的能够联系

neuq_hcg@163.com