RabbitMQ能够实现消息发布和订阅,这其中须要exchange,消息的发布者,消费者,消息队列和Exchange关系以下图java
消息的生产者发出消息,而后通过Exchange转换,消息队列(queue)须要绑定Exchange,Exchange把消息发送到各个消息队列中,而后,各个消费者从消息队列中取到发布者发布的消息。ide
利用Java做为客户端,具体代码以下:code
发布者代码rabbitmq
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * * 用rabbitMQ实现发布订阅 * 发布类 * Created by wangtf on 2015/11/17. */ public class Emitlog { private static final String EXCHANGE_NAME = "logs"; public static void main(String args[]) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); String msg = "各单位请注意"; channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); System.out.println("[send] msg: " +msg); channel.close(); connection.close(); } }
消息接受类队列
import com.rabbitmq.client.*; import java.io.IOException; /** * RabbitMQ实现发布订阅的功能 * 订阅类 * Created by wangtf on 2015/11/17. */ public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.print("[*] waiting for message"); DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("[x] receive message :" + message); } }; channel.basicConsume(queueName,true,consume); } }
分别运行ReceiveLogs和Emitlog,能获得以下结果:get
【Emitlog】
[send] msg: 各单位请注意
Process finished with exit code 0
【ReceiveLogs】
[*] waiting for message[x] receive message :各单位请注意
消息队列