rabbitmq 使用

序言java

上篇写了rabbitmq的简单单机版安装,这篇写下他与springboot使用,简单写下demo传到码云了,须要能够去下。git

  1. 项目结构web

  2. pom.xml 注释掉amqp-client打开spring-boot-starter-amqp能够支持springboot的注解模式下边会写到spring

    <?xml version="1.0" encoding="UTF-8"?>
    	<project xmlns="http://maven.apache.org/POM/4.0.0"
    	  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	  <modelVersion>4.0.0</modelVersion>
    	  <parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>2.1.5.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	  </parent>
    	  <groupId>top.heliming.rabbitmq</groupId>
    	  <artifactId>rabbitmq-demo</artifactId>
    	  <version>0.0.1-SNAPSHOT</version>
    	  <name>rabbitmq-demo</name>
    	  <description>Demo project for Spring Boot</description>
    
    	  <properties>
    		<java.version>1.8</java.version>
    	  </properties>
    
    	  <dependencies>
    		<dependency>
    		  <groupId>org.apache.commons</groupId>
    		  <artifactId>commons-lang3</artifactId>
    		  <version>3.3.2</version>
    		</dependency>
    		<!--amqp依赖-->
    		<!--<dependency>-->
    		  <!--<groupId>org.springframework.boot</groupId>-->
    		  <!--<artifactId>spring-boot-starter-amqp</artifactId>-->
    		<!--</dependency>-->
    		<dependency>
    		  <groupId>com.rabbitmq</groupId>
    		  <artifactId>amqp-client</artifactId>
    		  <version>3.2.4</version>
    		</dependency>
    		<dependency>
    		  <groupId>org.springframework.boot</groupId>
    		  <artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		<!--lombok-->
    		<dependency>
    		  <groupId>org.projectlombok</groupId>
    		  <artifactId>lombok</artifactId>
    		  <version>1.16.20</version>
    		  <scope>provided</scope>
    		</dependency>
    		<dependency>
    		  <groupId>org.springframework.boot</groupId>
    		  <artifactId>spring-boot-starter-test</artifactId>
    		  <scope>test</scope>
    		</dependency>
    	  </dependencies>
    
    	  <build>
    		<finalName>rabbitmqsend</finalName>
    		<plugins>
    		  <plugin>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-maven-plugin</artifactId>
    		  </plugin>
    		</plugins>
    	  </build>
    
    	</project>
  3. 不用springboot监听注解的例子topic包下apache

    发送者:springboot

    package top.heliming.rabbitmq.rabbitmqdemo.topic;
    	import com.rabbitmq.client.Channel;
    	import com.rabbitmq.client.Connection;
    	import com.rabbitmq.client.MessageProperties;
    	import top.heliming.rabbitmq.rabbitmqdemo.util.ConnectionUtil;
    
    	/**
    	 * description: //订阅模式:生产者指定路由key SEND RECV2
    	 *
    	 * @author: heliming
    	 * @date:2019/06/07 上午 10:24
    	 */
    	public class Send {
    
    	  private final static String EXCHANGE_NAME = "topic_exchange_test";
    
    	  public static void main(String[] argv) throws Exception {
    		send();
    	  }
    
    	  public static void send() throws Exception{
    		// 获取到链接
    		Connection connection = ConnectionUtil.getConnection();
    		// 获取通道
    		Channel channel = connection.createChannel();
    		// 声明exchange,指定类型为topic
    		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
    		// 消息内容
    		String message = "新增商品 : id = 1001";
    		// 发送消息,而且指定routing key 为:insert ,表明新增商品
    		channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
    		System.out.println(" [商品服务:] Sent '" + message + "'");
    		String messageupdate = "修改商品 : id = 1001";
    		// 发送消息,而且指定routing key 为:insert ,表明新增商品
    		channel.basicPublish(EXCHANGE_NAME, "item.update", null, messageupdate.getBytes());
    		System.out.println(" [商品服务:] Sent '" + messageupdate + "'");
    		String messagedelete = "删除商品 : id = 1001";
    		// 发送消息,而且指定routing key 为:insert ,表明新增商品
    		channel.basicPublish(EXCHANGE_NAME, "item.delete", null, messagedelete.getBytes());
    		System.out.println(" [商品服务:] Sent '" + messagedelete + "'");
    		channel.close();
    		connection.close();
    	  }
    	}

    接受者:maven

    package top.heliming.rabbitmq.rabbitmqdemo.topic;
    	import com.rabbitmq.client.AMQP.BasicProperties;
    	import com.rabbitmq.client.Channel;
    	import com.rabbitmq.client.Connection;
    	import com.rabbitmq.client.DefaultConsumer;
    	import com.rabbitmq.client.Envelope;
    	import java.io.IOException;
    	import top.heliming.rabbitmq.rabbitmqdemo.util.ConnectionUtil;
    
    	/**
    	 * description: //指定路由key
    	 *
    	 * @author: heliming
    	 * @date:2019/06/07 上午 10:29
    	 */
    	public class Recv2 {
    	  private final static String QUEUE_NAME = "topic_exchange_queue_2";
    	  private final static String EXCHANGE_NAME = "topic_exchange_test";
    
    	  public static void main(String[] argv) throws Exception {
    		// 获取到链接
    		Connection connection = ConnectionUtil.getConnection();
    		// 获取通道
    		Channel channel = connection.createChannel();
    		// 声明队列
    		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    		// 绑定队列到交换机,同时指定须要订阅的routing key。订阅 insert、update、delete
    		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
    
    		// 定义队列的消费者
    		DefaultConsumer consumer = new DefaultConsumer(channel) {
    		  // 获取消息,而且处理,这个方法相似事件监听,若是有消息的时候,会被自动调用
    		  @Override
    		  public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
    			  byte[] body) throws IOException {
    			// body 即消息体
    			String msg = new String(body);
    			System.out.println(" [消费者2] received : " + msg + "!");
    		  }
    		};
    		// 监听队列,自动ACK
    		channel.basicConsume(QUEUE_NAME, true, consumer);
    	  }
    	}

    通用类util包 ConnectionUtil类ide

    package top.heliming.rabbitmq.rabbitmqdemo.util;
    
    	import com.rabbitmq.client.Connection;
    	import com.rabbitmq.client.ConnectionFactory;
    
    	/**
    	 * description: //TODO
    	 *
    	 * @author: heliming
    	 * @date:2019/06/06 下午 10:18
    	 */
    	public class ConnectionUtil {
    	  /**
    	   * 创建与RabbitMQ的链接
    	   * @return
    	   * @throws Exception
    	   */
    	  public static Connection getConnection() throws Exception {
    		//定义链接工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		//设置服务地址
    		factory.setHost("10.0.1.224");
    		//端口
    		factory.setPort(5672);
    		//设置帐号信息,用户名、密码、vhost
    		factory.setVirtualHost("/leyou");
    		factory.setUsername("leyou");
    		factory.setPassword("leyou");
    		// 经过工程获取链接
    		Connection connection = factory.newConnection();
    		return connection;
    	  }
    	}
  4. 启动发送者main函数 再启动接受者main函数这时建立了一个队列通道,而后再执行发送者函数 就能获取数据函数

    我建立的用户spring-boot

  5. spring注解方式 spring包下

    发送者

    package top.heliming.rabbitmq.rabbitmqdemo.topic;
    	import com.rabbitmq.client.Channel;
    	import com.rabbitmq.client.Connection;
    	import com.rabbitmq.client.MessageProperties;
    	import top.heliming.rabbitmq.rabbitmqdemo.util.ConnectionUtil;
    
    	/**
    	 * description: //订阅模式:生产者指定路由key
    	 *
    	 * @author: heliming
    	 * @date:2019/06/07 上午 10:24
    	 */
    	public class Send3 {
    
    	  private final static String EXCHANGE_NAME = "spring.test.exchange";
    
    	  public static void main(String[] argv) throws Exception {
    		send3();
    	  }
    
    	  public static void send3() throws Exception{
    		// 获取到链接
    		Connection connection = ConnectionUtil.getConnection();
    		// 获取通道
    		Channel channel = connection.createChannel();
    		// 声明exchange,指定类型为topic
    		// channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    		//交换机持久化
    		channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);
    		// 消息内容
    		String message = "新增商品 : id = 1001";
    		// 发送消息,而且指定routing key 为:insert ,表明新增商品
    		//消息持久化
    		channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    		System.out.println(" [商品服务:] Sent '" + message + "'");
    		String messageupdate = "修改商品 : id = 1001";
    		// 发送消息,而且指定routing key 为:insert ,表明新增商品
    		channel.basicPublish(EXCHANGE_NAME, "item.update", MessageProperties.PERSISTENT_TEXT_PLAIN, messageupdate.getBytes());
    		//消息持久化
    		System.out.println(" [商品服务:] Sent '" + messageupdate + "'");
    		String messagedelete = "删除商品 : id = 1001";
    		// 发送消息,而且指定routing key 为:insert ,表明新增商品
    		//消息持久化
    		channel.basicPublish(EXCHANGE_NAME, "item.delete", MessageProperties.PERSISTENT_TEXT_PLAIN, messagedelete.getBytes());
    		System.out.println(" [商品服务:] Sent '" + messagedelete + "'");
    		channel.close();
    		connection.close();
    	  }
    	}

    接受者

    package top.heliming.rabbitmq.rabbitmqdemo.spring;
    
    	import org.springframework.amqp.core.ExchangeTypes;
    	import org.springframework.amqp.rabbit.annotation.Exchange;
    	import org.springframework.amqp.rabbit.annotation.Queue;
    	import org.springframework.amqp.rabbit.annotation.QueueBinding;
    	import org.springframework.amqp.rabbit.annotation.RabbitListener;
    	import org.springframework.stereotype.Component;
    
    	/**
    	 * description: //TODO
    	 *
    	 * @author: heliming
    	 * @date:2019/06/07 下午 12:26
    	 */
    	@Component
    	public class Listener {
    
    	  @RabbitListener(bindings = @QueueBinding(
    		  value = @Queue(value = "spring.test.queue", durable = "true"),
    		  exchange = @Exchange(
    			  value = "spring.test.exchange",
    			  ignoreDeclarationExceptions = "true",
    			  type = ExchangeTypes.TOPIC
    		  ),
    		  key = {"#.#"}))
    	  public void listen(String msg){
    		System.out.println("接收到消息:" + msg);
    	  }
    	}

    访问:启动项目,执行发送者函数能够看到发送数据成功,若是不行,那就先执行发送者函数建立完队列,而后启动项目,而后执行发送者函数发送数据,最后就接受到数据了。

总结:

若是不建立rabbitmq用户直接用guest会不让其余机器访问的,具体能够修改角色限制权限或者新添加用户
用spring监听注解方式pom文件须要修改上边说过了。若是rabbitmq安装erlang出现问题大多数是root权限问题。
erlang和rabbitmq版本不一致,上篇中有查看版本兼容的连接。

码云地址连接:https://gitee.com/hexiaoming123/rabbitmqshiyong/

相关文章
相关标签/搜索