RabbitMQ学习:RabbitMQ的六种工做模式之简单和工做模式(三)

上一篇:RabbitMQ学习:RabbitMQ的基本概念及RabbitMQ使用场景(二) --- http://www.javashuo.com/article/p-rcygqhuj-nw.htmljava


RabbitMQ的六种工做模式

首先开启虚拟机上的rabbitmq服务器apache

# 启动服务
systemctl start rabbitmq-server

1、简单模式

RabbitMQ是一个消息中间件,你能够想象它是一个邮局。当你把信件放到邮箱里时,可以确信邮递员会正确地递送你的信件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。api

  • 发送消息的程序是生产者数组

  • 队列就表明一个邮箱。虽然消息会流经RbbitMQ和你的应用程序,但消息只能被存储在队列里。队列存储空间只受服务器内存和磁盘限制,它本质上是一个大的消息缓冲区。多个生产者能够向同一个队列发送消息,多个消费者也能够从同一个队列接收消息.服务器

  • 消费者等待从队列接收消息maven

建立Rabbitmq-demo 的测试项

一、pom.xml

添加 slf4j 依赖, 和 rabbitmq amqp 依赖tcp

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.qile</groupId>
  <artifactId>rabbitmq</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
  <dependencies>
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>5.4.3</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.8.0-alpha2</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.8.0-alpha2</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.8.0</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

2. 生产者发送消息--HelloWorld

package rabbitmq.simple;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Test1 {

	public static void main(String[] args) throws IOException, TimeoutException {
		
		/**
		 * 1. 创建链接
		 * 2. 建立队列:helloworld
		 * 3. 向队列发送数据
		 */
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		/*
		 * 与rabbitmq服务器创建链接,
		 * rabbitmq服务器端使用的是nio,会复用tcp链接,
		 * 并开辟多个信道与客户端通讯
		 * 以减轻服务器端创建链接的开销
		 */
		Connection con = f.newConnection();
		//建立通道
		Channel c = con.createChannel();
		
		/*
		 * 声明队列,会在rabbitmq中建立一个队列
		 * 若是已经建立过该队列,就不能再使用其余参数来建立
		 * 
		 * 参数含义:
		 *   -queue:      队列名称
		 *   -durable:    队列持久化,true表示RabbitMQ重启后队列仍存在
		 *   -exclusive:  排他,true表示限制仅当前链接可用
		 *   -autoDelete: 当最后一个消费者断开后,是否删除队列
		 *   -arguments:  其余参数
		 */
    	c.queueDeclare("helloworld",false,false,false,null);
		
    	/*
		 * 发布消息
		 * 这里把消息向默认交换机发送.
		 * 默认交换机隐含与全部队列绑定,routing key即为队列名称
		 * 
		 * 参数含义:
		 * 	-exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null 
		 * 	-routingKey: 对于默认交换机,路由键就是目标队列名称
		 * 	-props: 其余参数,例如头信息
		 * 	-body: 消息内容byte[]数组
		 */
    	c.basicPublish("","helloworld",null, 
				("Hello World!" + System.currentTimeMillis()).getBytes());
		System.out.println("消息已发出");
		
		c.close();
		con.close();
	}
}

这时Run as 获得
ide

在rabbitmq客户端有:

以后编写消费者接受消息学习

三、消费者接收消息

package rabbitmq.simple;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test2 {

	public static void main(String[] args) throws IOException, TimeoutException {
		
		/**
		 * 1. 创建链接
		 * 2. 建立队列:helloworld
		 * 3. 向队列发送数据
		 */
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		Connection con = f.newConnection();  //建立链接
		Channel c = con.createChannel();     //建立通道
		
		//定义队列,服务器没有这个队列会建立,如有什么都不作
    	c.queueDeclare("helloworld",false,false,false,null);
		
    	//收到消息后用来处理消息的回调对象
    	DeliverCallback deliverCallback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				byte[] a = message.getBody();
				String msg = new String(a);
				System.out.println("收到" + msg);
			}
    	};
    	
    	//消费者取消时的回调对象
    	CancelCallback cancelCallback = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
				
			}
		};
		
    	//开始消费数据
    	c.basicConsume("helloworld",true,deliverCallback,cancelCallback);
	}

}

此时,在以前所积累的两条消息将会在你程序运转之时,显示出来,这是再去运转生产者,将会直接显示出发送的数据
测试

2、工做模式

工做队列(即任务队列)背后的主要思想是避免当即执行资源密集型任务,而且必须等待它完成。相反,咱们将任务安排在稍后完成。

咱们将任务封装为消息并将其发送到队列。后台运行的工做进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发

使用任务队列的一个优势是可以轻松地并行工做。若是咱们正在积压工做任务,咱们能够添加更多工做进程,这样就能够轻松扩展。

一、生产者发送消息

这里模拟耗时任务,发送的消息中,每一个点使工做进程暂停一秒钟,例如"Hello…"将花费3秒钟来处理

package rabbitmq.work;

import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Test1 {
	public static void main(String[] args) throws Exception {
		/**
		 * 1. 创建链接
		 * 2. 建立队列:helloworld
		 * 3. 向队列发送数据
		 */
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		Connection c = f.newConnection();  //建立链接
		Channel ch = c.createChannel();  //建立通道
		//参数:queue,durable,exclusive,autoDelete,arguments
		ch.queueDeclare("helloworld", false,false,false,null);

		/**
		 * 模拟耗时消息
		 * 发送的字符串中,有一个点字符,消费者处理的时候就暂停1秒
		 */
		//循环输入消息发送到rabbitmq
		while (true) {
			System.out.print("输入消息: ");
			String msg = new Scanner(System.in).nextLine();
			//若是输入的是"exit"则结束生产者进程
			if ("exit".equals(msg)) {
				break;
			}
			//参数:exchage,routingKey,props,body
			ch.basicPublish("", "helloworld", null, msg.getBytes());
			System.out.println("消息已发送: "+msg);
		}

		c.close();
	}
}

二、消费者接收消息

package rabbitmq.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test2 {
	public static void main(String[] args) throws Exception {
		
		/**
		 * 1. 创建链接
		 * 2. 建立队列:helloworld
		 * 3. 向队列发送数据
		 */
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		Connection c = f.newConnection();  //建立链接
		Channel ch = c.createChannel();  //建立通道
		
		ch.queueDeclare("helloworld",false,false,false,null);
		System.out.println("等待接收数据");
		
		//收到消息后用来处理消息的回调对象
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String msg = new String(message.getBody(), "UTF-8");
				System.out.println("收到: "+msg);

				//遍历字符串中的字符,每一个点使进程暂停一秒
				for (int i = 0; i < msg.length(); i++) {
					if (msg.charAt(i)=='.') {
						try {
							Thread.sleep(1000);
						} catch (InterruptedException e) {
						}
					}
				}
				System.out.println("处理结束");
			}
		};
		
		//消费者取消时的回调对象
		CancelCallback cancel = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		ch.basicConsume("helloworld", true, callback, cancel);
	}
}

3.运行测试

运行:

  • 一个生产者
  • 两个消费者

生产者发送多条消息
如:1,2,3,4,5,...两个消费者分别收到:

  • 消费者一:1,3,5,...
  • 消费者二:2,4,...

rabbtimq在全部消费者中轮询分布消息,把消息均匀发送给全部消费者。

4.消息确认

一个消费者接收消息后,在消息没有彻底处理完时就挂掉了,那么这时会发生什么呢?

就如今的代码来讲,rabbitmq把消息发送给消费者后,会当即删除消息,那么消费者挂掉后,它没来得及处理的消息就会丢失

若是生产者发送如下消息:

    1…

    2

    3

    4

    5

    两个消费者分别收到:

        消费者一: 1…, 3, 5
        消费者二: 2, 4

    当消费者一收到全部消息后,要话费7秒时间来处理第一条消息,这期间若是关闭该消费者,那么1未处理完成,3,5则没有被处理

咱们并不想丢失任何消息, 若是一个消费者挂掉,咱们想把它的任务消息派发给其余消费者

为了确保消息不会丢失,rabbitmq支持消息确认(回执)。当一个消息被消费者接收到而且执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你能够把这条消息删除了。

若是一个消费者没有返回消息确认就挂掉了(信道关闭,链接关闭或者TCP连接丢失),rabbitmq就会明白,这个消息没有被处理完成rabbitmq就会把这条消息从新放入队列,若是在这时有其余的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其余的消费者,这样就确保了没有消息会丢失。

这里不存在消息超时, rabbitmq只在消费者挂掉时从新分派消息, 即便消费者花很是久的时间来处理消息也能够

手动消息确认默认是开启的,前面的例子咱们经过autoAck=ture把它关闭了。咱们如今要把它设置为false,而后工做进程处理完意向任务时,发送一个消息确认(回执)。

package rabbitmq.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test2 {
	public static void main(String[] args) throws Exception {
		
		/**
		 * 1. 创建链接
		 * 2. 建立队列:helloworld
		 * 3. 向队列发送数据
		 */
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		Connection c = f.newConnection();  //建立链接
		Channel ch = c.createChannel();  //建立通道
		
		//声明队列
		ch.queueDeclare("helloworld",false,false,false,null);
		System.out.println("等待接收数据");
		
		//收到消息后用来处理消息的回调对象
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String msg = new String(message.getBody(), "UTF-8");
				System.out.println("收到: "+msg);
				for (int i = 0; i < msg.length(); i++) {
					if (msg.charAt(i)=='.') {
						try {
							Thread.sleep(1000);
						} catch (InterruptedException e) {
						}
					}
				}
				System.out.println("处理结束");
				//发送回执
				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
			}
		};
		
		//消费者取消时的回调对象
		CancelCallback cancel = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		//autoAck设置为false,则须要手动确认发送回执
		ch.basicConsume("helloworld", false, callback, cancel);
	}
}

使用以上代码,就算杀掉一个正在处理消息的工做进程也不会丢失任何消息,工做进程挂掉以后,没有确认的消息就会被自动从新传递。

忘记确认(ack)是一个常见的错误, 这样后果是很严重的, 因为未确认的消息不会被释放, rabbitmq会吃掉愈来愈多的内存

可使用下面命令打印工做队列中未确认消息的数量

rabbitmqctl list_queues name messages_ready messages_unacknowledged

当处理消息时异常中断, 能够选择让消息重回队列从新发送. nack 操做能够是消息重回队列, 可使用 basicNack() 方法:

// requeue为true时重回队列, 反之消息被丢弃或被发送到死信队列
c.basicNack(tag, multiple, requeue)

5.合理地分发

rabbitmq会一次把多个消息分发给消费者, 这样可能形成有的消费者很是繁忙, 而其它消费者空闲. 而rabbitmq对此一无所知, 仍然会均匀的分发消息

咱们可使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者

6.消息持久化

当rabbitmq关闭时, 咱们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据

要求rabbitmq不丢失数据要作以下两点: 把队列和消息都设置为可持久化(durable)

队列设置为可持久化, 能够在定义队列时指定参数durable为true

//第二个参数是持久化参数durable
ch.queueDeclare("helloworld", true, false, false, null);

因为以前咱们已经定义过队列"hello"是不可持久化的, 对已存在的队列, rabbitmq不容许对其定义不一样的参数, 不然会出错, 因此这里咱们定义一个不一样名字的队列"task_queue"

//定义一个新的队列,名为 task_queue
//第二个参数是持久化参数 durable
ch.queueDeclare("task_queue", true, false, false, null);

生产者和消费者代码都要修改

这样即便rabbitmq从新启动, 队列也不会丢失. 如今咱们再设置队列中消息的持久化, 使用MessageProperties.PERSISTENT_TEXT_PLAIN参数

//第三个参数设置消息持久化
ch.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            msg.getBytes());

下面是"工做模式"最终完成的生产者和消费者代码

7.生产者代码

package rabbitmq.work;

import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class Test3 {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		
		//第二个参数设置队列持久化
		ch.queueDeclare("task_queue", true,false,false,null);

		while (true) {
			System.out.print("输入消息: ");
			String msg = new Scanner(System.in).nextLine();
			if ("exit".equals(msg)) {
				break;
			}
			
			//第三个参数设置消息持久化
			ch.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8"));
			System.out.println("消息已发送: "+msg);
		}

		c.close();
	}
}

8.消费者代码

package rabbitmq.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test4 {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		
		//定义一个新的队列,名为 task_queue
		//设定第二个参数是持久化参数 durable为true
		ch.queueDeclare("task_queue",true,false,false,null);
		
		System.out.println("等待接收数据");
		
		ch.basicQos(1); //一次只接收一条消息
		
		//收到消息后用来处理消息的回调对象
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String msg = new String(message.getBody(), "UTF-8");
				System.out.println("收到: "+msg);
				for (int i = 0; i < msg.length(); i++) {
					if (msg.charAt(i)=='.') {
						try {
							Thread.sleep(1000);
						} catch (InterruptedException e) {
						}
					}
				}
				System.out.println("处理结束");
				//发送回执
				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
			}
		};
		
		//消费者取消时的回调对象
		CancelCallback cancel = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		//autoAck设置为false,则须要手动确认发送回执
		ch.basicConsume("task_queue", false, callback, cancel);
	}
}

9.总结