序言java
上篇写了rabbitmq的简单单机版安装,这篇写下他与springboot使用,简单写下demo传到码云了,须要能够去下。git
项目结构web
pom.xml 注释掉amqp-client打开spring-boot-starter-amqp能够支持springboot的注解模式下边会写到spring
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>top.heliming.rabbitmq</groupId> <artifactId>rabbitmq-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rabbitmq-demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.3.2</version> </dependency> <!--amqp依赖--> <!--<dependency>--> <!--<groupId>org.springframework.boot</groupId>--> <!--<artifactId>spring-boot-starter-amqp</artifactId>--> <!--</dependency>--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.2.4</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--lombok--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.20</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <finalName>rabbitmqsend</finalName> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
不用springboot监听注解的例子topic包下apache
发送者:springboot
package top.heliming.rabbitmq.rabbitmqdemo.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import top.heliming.rabbitmq.rabbitmqdemo.util.ConnectionUtil; /** * description: //订阅模式:生产者指定路由key SEND RECV2 * * @author: heliming * @date:2019/06/07 上午 10:24 */ public class Send { private final static String EXCHANGE_NAME = "topic_exchange_test"; public static void main(String[] argv) throws Exception { send(); } public static void send() throws Exception{ // 获取到链接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息内容 String message = "新增商品 : id = 1001"; // 发送消息,而且指定routing key 为:insert ,表明新增商品 channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes()); System.out.println(" [商品服务:] Sent '" + message + "'"); String messageupdate = "修改商品 : id = 1001"; // 发送消息,而且指定routing key 为:insert ,表明新增商品 channel.basicPublish(EXCHANGE_NAME, "item.update", null, messageupdate.getBytes()); System.out.println(" [商品服务:] Sent '" + messageupdate + "'"); String messagedelete = "删除商品 : id = 1001"; // 发送消息,而且指定routing key 为:insert ,表明新增商品 channel.basicPublish(EXCHANGE_NAME, "item.delete", null, messagedelete.getBytes()); System.out.println(" [商品服务:] Sent '" + messagedelete + "'"); channel.close(); connection.close(); } }
接受者:maven
package top.heliming.rabbitmq.rabbitmqdemo.topic; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import top.heliming.rabbitmq.rabbitmqdemo.util.ConnectionUtil; /** * description: //指定路由key * * @author: heliming * @date:2019/06/07 上午 10:29 */ public class Recv2 { private final static String QUEUE_NAME = "topic_exchange_queue_2"; private final static String EXCHANGE_NAME = "topic_exchange_test"; public static void main(String[] argv) throws Exception { // 获取到链接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机,同时指定须要订阅的routing key。订阅 insert、update、delete channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*"); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,而且处理,这个方法相似事件监听,若是有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [消费者2] received : " + msg + "!"); } }; // 监听队列,自动ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }
通用类util包 ConnectionUtil类ide
package top.heliming.rabbitmq.rabbitmqdemo.util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * description: //TODO * * @author: heliming * @date:2019/06/06 下午 10:18 */ public class ConnectionUtil { /** * 创建与RabbitMQ的链接 * @return * @throws Exception */ public static Connection getConnection() throws Exception { //定义链接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("10.0.1.224"); //端口 factory.setPort(5672); //设置帐号信息,用户名、密码、vhost factory.setVirtualHost("/leyou"); factory.setUsername("leyou"); factory.setPassword("leyou"); // 经过工程获取链接 Connection connection = factory.newConnection(); return connection; } }
启动发送者main函数 再启动接受者main函数这时建立了一个队列通道,而后再执行发送者函数 就能获取数据函数
我建立的用户spring-boot
spring注解方式 spring包下
发送者
package top.heliming.rabbitmq.rabbitmqdemo.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import top.heliming.rabbitmq.rabbitmqdemo.util.ConnectionUtil; /** * description: //订阅模式:生产者指定路由key * * @author: heliming * @date:2019/06/07 上午 10:24 */ public class Send3 { private final static String EXCHANGE_NAME = "spring.test.exchange"; public static void main(String[] argv) throws Exception { send3(); } public static void send3() throws Exception{ // 获取到链接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为topic // channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //交换机持久化 channel.exchangeDeclare(EXCHANGE_NAME, "topic",true); // 消息内容 String message = "新增商品 : id = 1001"; // 发送消息,而且指定routing key 为:insert ,表明新增商品 //消息持久化 channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [商品服务:] Sent '" + message + "'"); String messageupdate = "修改商品 : id = 1001"; // 发送消息,而且指定routing key 为:insert ,表明新增商品 channel.basicPublish(EXCHANGE_NAME, "item.update", MessageProperties.PERSISTENT_TEXT_PLAIN, messageupdate.getBytes()); //消息持久化 System.out.println(" [商品服务:] Sent '" + messageupdate + "'"); String messagedelete = "删除商品 : id = 1001"; // 发送消息,而且指定routing key 为:insert ,表明新增商品 //消息持久化 channel.basicPublish(EXCHANGE_NAME, "item.delete", MessageProperties.PERSISTENT_TEXT_PLAIN, messagedelete.getBytes()); System.out.println(" [商品服务:] Sent '" + messagedelete + "'"); channel.close(); connection.close(); } }
接受者
package top.heliming.rabbitmq.rabbitmqdemo.spring; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * description: //TODO * * @author: heliming * @date:2019/06/07 下午 12:26 */ @Component public class Listener { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "spring.test.queue", durable = "true"), exchange = @Exchange( value = "spring.test.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC ), key = {"#.#"})) public void listen(String msg){ System.out.println("接收到消息:" + msg); } }
访问:启动项目,执行发送者函数能够看到发送数据成功,若是不行,那就先执行发送者函数建立完队列,而后启动项目,而后执行发送者函数发送数据,最后就接受到数据了。
总结:
若是不建立rabbitmq用户直接用guest会不让其余机器访问的,具体能够修改角色限制权限或者新添加用户 用spring监听注解方式pom文件须要修改上边说过了。若是rabbitmq安装erlang出现问题大多数是root权限问题。 erlang和rabbitmq版本不一致,上篇中有查看版本兼容的连接。