RabbitMQ入门(4)--路由

###路由 ###(使用Java客户端) 在先前的指南中,咱们创建了一个简单的日志系统。咱们能够将咱们的日志信息广播到多个接收者。 在这部分的指南中,咱们将要往其中添加一个功能-让仅仅订阅一个消息的子集成为可能。例如,咱们能够直接将关键的错误信息指向到日志文件(保存在爱硬盘空间),同时依旧能打印全部日志信息到平台上。java

###绑定python

在以前的例子里咱们已经建立绑定。你能够回顾下代码:git

channel.queueBind(queueName, EXCHANGE_NAME, "");

A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange. 一个绑定是一个交换所和一个队列之间的关系。这个很容易理解为:这个队列是对这 交换机的消息感兴趣。github

绑定能够带上额外的路由关键字参数。为了消除对basic_publish参数的迷惑,咱们将会将它称之为绑定关键字。如下是咱们如何经过一个关键字建立一个绑定:算法

channel.queueBind(queueName, EXCHANGE_NAME, "black");

这绑定关键字的意义取决于 交换机类型。这fanout 交换机,咱们以前使用的那个,仅仅忽略它的值。shell

###直接交换 咱们当前的日志系统将全部消息广播到全部消费者。咱们想扩展它,让其容许依据其严格的规则过滤消息。例如咱们可能想让一个往硬盘中写日志消息的程序仅仅接收关键的错误,而不是将硬盘空间浪费在警告和信息的日志消息上。 咱们使用fanout类型的交换机,那个不会给咱们太多的灵活性-它仅仅能胜任没头脑的广播。windows

咱们可使用direct类型的交换机来替代。一个direct交换机背后的路由算法是简单的-一个消息将会进入那些队列的绑定关键字与消息中路由关键字匹配的队列中。this

为了说明那个,考虑接下来结构: direct-exchange.png 在这个结构里,咱们看见了这direct类型的交换机绑定了两个队列。第一个队列装有orange绑定关键字,这第二个有两个绑定,一个是black绑定关键字而且另外一个是green关键之。 在这个结构里,发送到交换机里的消息,其中消息中带路由关键字为orange将要路由到队列Q1上,消息中带路由关键字为blackgreen将路由到队列Q2上。全部其余类型的消息会被丢弃。 ###多种绑定 direct-exchange-multiple.png 将一个绑定关键字绑定到货个队列上是十分合法的。在咱们例子中使用绑定关键字blackXQ1绑定在一块儿。既然那样,这direct类型的交换机与fanout类型类似,一样会广播这消息到全部符合的队列中。一个路由关键位balck的关键字将会被传递到Q1Q2。 ###发出日志 咱们将会为咱们的日志系统使用这个模型。使用direct类型的交换机来代替fanout类型,发送消息。因为这路由关键字咱们能够严格的记录。接收程序经过这种方式能够严格接收它想接收的。让咱们首先关注发布日志。 总之,咱们首先须要建立个交换机。rest

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

咱们准备发送一个消息:日志

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

为了简化这个事情,咱们保证这severityinof,warning,error中的一个。 ###订阅 接收消息如先前那样工做,有一个例外,咱们会把每个咱们感兴趣的severity建立一个新的绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){    
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

###把它们放在一块儿 python-four.png EmitLogDirect.java类的代码:

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    //..
}

ReceiveLogsDirect.java类的代码:

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for(String severity : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

如日常那样编译(看指南第一部分,编译和类路径的建议)。为了方便,当咱们运行实例是,咱们如今使用一个环境变量$CP(在windows环境上是%CP%)表示类路径。 若是你想仅保存warningerror记录不包含info记录信息到文件里,打开一个控制平台并输入:

$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

若是你想在你的屏幕上看全部的日志信息,打开一个新的终端并键入:

$ java -cp $CP ReceiveLogsDirect info warning error
 [*] Waiting for logs. To exit press CTRL+C

例如,为了发布一个错误日志信息,仅须要键入:

$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'

EmitLogDirect.java sourceReceiveLogsDirect.java source的全部源代码。

阅览指南第五部分,查看如何根据一个模式来监听消息。

相关文章
相关标签/搜索