virtual hosts至关于mysql的dbjava
通常以/开头mysql
须要对用户进行受权sql
P:消息生产者编程
红色:队列服务器
C:消息消费者架构
包含三个对象:生产者、队列、消费者异步
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtil { /** * 获取MQ的链接 * @return */ public static Connection getConnection() throws IOException, TimeoutException { //定义一个链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); //AMQP的端口 factory.setPort(5672); //vhost factory.setVirtualHost("/vhost_mmr"); factory.setUsername("rabbit"); factory.setPassword("123456"); Connection connection = factory.newConnection(); return connection; } }
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //从链接中获取一个通道 Channel channel = connection.createChannel(); //建立队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello world!"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("---send msg :" + msg); channel.close(); connection.close(); } }
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Receive { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //建立channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("msg receive : " + msg); } }; channel.basicConsume(QUEUE_NAME, consumer); } }
耦合性高,生产者一一对应消费者,若是须要多个消费者消费队列中的消息,此时简单队列就无能为力了。ide
队列名变动,源码须要同时变动搜索引擎
一个生产者将消息放入队列中,能够有多个消费者进行消费spa
为何会出现工做队列?
Simple队列:是一一对应的,实际开发中,生产者改善消息是绝不费力的,而消费者通常须要跟业务相结合,消费者接收到消息以后就须要处理,可能须要花费时间,此时队列就会积压不少消息。
生产消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //从链接中获取一个通道 Channel channel = connection.createChannel(); //建立队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { String msg = "hello " + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("---send msg :" + msg); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } channel.close(); connection.close(); } }
消费者1
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv1 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //建立channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }; boolean ack = true; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
消费者2
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //建立channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv1 : " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }; boolean ack = true; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
现象:
消费者1和消费者2处理的消息是同样多的,这种分发方式称为轮询分发(round-robin),无论谁忙或者谁闲,都不会多给或者少给。任务均分。
保证一次发送给消费者的消息不超过一条
/** * 每一个消费者发送确认消息以前,消息队列不发送下一个消息给消费者,消费者一次只处理一个消息 * * 限制发送给同一个消费者不得超过一条消息 */ int preFetchCount = 1; channel.basicQos(preFetchCount);
使用公平分发,必须关闭自动应答ack,改成手动
channel.basicAck(envelope.getDeliveryTag(), false); boolean ack = false;//自动应答改成false channel.basicConsume(QUEUE_NAME, ack, consumer);
生产消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //从链接中获取一个通道 Channel channel = connection.createChannel(); //建立队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); /** * 每一个消费者发送确认消息以前,消息队列不发送下一个消息给消费者,消费者一次只处理一个消息 * * 限制发送给同一个消费者不得超过一条消息 */ int preFetchCount = 1; channel.basicQos(preFetchCount); for (int i = 0; i < 50; i++) { String msg = "hello " + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("---send msg :" + msg); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } channel.close(); connection.close(); } }
消费消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //建立channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv1 : " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自动应答改成false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
boolean ack = false;//自动应答改成false channel.basicConsume(QUEUE_NAME, ack, consumer);
ack = true时为自动确认模式,一旦rabbitMQ将消息分发给消费者,该消息就会在内存中删除;这种状况下,若是杀死正在处理消息的消费者,会丢失正在处理的消息;
ack = false时为手动回执(消息应答)模式,若是有一个消费者挂掉,就会将会给其余消费者,rabbitMQ支持消息应答,消费者发送一个消息应答,告诉rabbitMQ这个消息已经被处理,而后rabbitMQ就删除内存中的消息;
消息应答默认打开,即为false;
因为消息在内存中存储,若是rabbitMQ挂掉,消息仍然会丢失。
boolean durable = false; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
durable控制的属性就是消息的持久化。
已经声明好的队列,若是durable已经为false了,就没法修改成true,rabbitMQ不容许从新定义(不一样参数)一个已存在的队列
解读:
一、一个生产者,多个消费者;
二、每一个消费者都有本身的队列;
三、生产者没有直接把消息发送到队列,而是发送至交换机(eXchange)
四、每一个队列都要绑定到交换机上
五、生产者发送的消息,通过交换机,到达队列,就能实现一个消息被多个消费者消费
生产消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String msg = "hello ps"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); System.out.println("Send " + msg); channel.close(); connection.close(); } }
消息哪去了?丢失了!由于交换机没有存储能力,在rabbitMQ中,只有队列有存储能力。此时并无完成队列绑定到交换机,因此数据丢失了。
消费消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv1 { private static final String QUEUE_NAME = "test_ps_fanout_email"; private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自动应答改成false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
不一样的队列作不一样的事情。
一方面接收生产者的消息,另外一方面向队列推送消息
rabbitMQ提供了四种Exchange:fanout,direct,topic,header header模式在实际使用中较少。
将路由键和某模式进行匹配
任何发送到Topic Exchange的消息都会被转发到全部关心RouteKey中指定话题的Queue上
生产者
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String msg = "hello direct"; //指定路由键 String routingKey = "warning"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("send msg:" + msg); channel.close(); connection.close(); } }
消费者1
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_direct"; private static final String QUEUE_NAME = "test_queue_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //绑定队列与交换机时,指定路由键 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自动应答改成false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
消费者2
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_direct"; private static final String QUEUE_NAME = "test_queue_direct_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //绑定队列与交换机时,指定路由键 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv2 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自动应答改成false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
# 匹配一个或多个
* 匹配一个
生产者
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明exchange,指定模式为topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String msg = "商品...."; String routingKey = "goods.delete"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("send msg:" + msg); channel.close(); connection.close(); } }
消费者1
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Created by wangbin on 2018/6/26. */ public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自动应答改成false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
消费者2
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Created by wangbin on 2018/6/26. */ public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv2 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自动应答改成false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
其中,消费者1绑定路由键为goods.#
,消费者2绑定路由键为goods.add
。当生产者发送的消息路由键为goods.add
时,两个消费者都会收到消息并处理;当生产者发送的消息路由键为goods.update
时,只有消费者1能够接收到消息。
在rabbitMQ中,能够经过持久化数据解决rabbitMQ服务器异常的数据丢失问题。
问题:生产者将消息发送出去以后,消息到底有没有到达rabbitMQ服务器;默认状况是不知道消息已到达的
两种方式:
用于将当前channel设置成transaction模式
用于提交事务
回滚事务
生产者发送消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_queue_tx"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello tx msg!"; try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); channel.txCommit(); } catch (IOException e) { channel.txRollback(); System.out.println("发生异常,事务已回滚"); } } }
事务机制会下降rabbitMQ的吞吐量。
生产者将信道设置成confirm模式,一旦信道进入confirm模式,全部在该信道上面发布的消息都将会被指派一个惟一的ID(从1开始),一旦消息被投递到全部匹配的队列以后,broker就会发送一个确认给生产者(包含消息的惟一ID),这就使得生产者知道消息已经正确到达目的队列了,若是消息和队列是可持久化的,那么确认消息会在将消息写入磁盘以后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也能够设置basic.ack的multiple域,表示到这个序列号以前的全部消息都已经获得了处理;
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就能够在等信道返回确认的同时继续发送下一条消息,当消息最终获得确认以后,生产者应用即可以经过回调方法来处理该确认消息,若是RabbitMQ由于自身内部错误致使消息丢失,就会发送一条nack消息,生产者应用程序一样能够在回调方法中处理该nack消息。
编程模式:
一、普通,发一条
二、批量,发一批
三、异步confirm模式,提供一个回调方法