RabbitMQ erlang "hello world"

官方文档:http://www.rabbitmq.com/tutorials/tutorial-one-python.htmlhtml

下文为翻译和erlang客户端的例子python

RabbitMQ是一个消息中间件,它主要的思想很是简单:接收和发送消息。你能够把RabbitMQ想象成一个邮局:当你把信件放入邮筒的时候你很是肯定邮递员会把你的信件投递给信件的接收者。在这个比喻里面,RabbitMQ是一个邮筒,邮局以及邮递员。缓存

RabbitMQ与邮局的主要区别在于,RabbitMQ不处理信件,取而代之的是它接收,存储而且发送二进制数据,也就是所谓的消息。服务器

RabbitMQ 与 messaging 一般状况下,使用一些术语。网络

  • 生产也就是发送的意思,发送消息的程序叫作生产者,咱们把它画成这样,使用"p";ide

     

     

  •    队列是邮箱的名字,它存在于RabbitMQ的内部,虽然消息在RabbitMQ和你的应用程序之间流转,可是消息只能被存储在一个队列里面。队列没有任何限制,只要你愿意,它能够存储任意多个消息,本质上,队列是一个没有限制的buffer。一个队列能够接收多个生产者的消息,多个消费者也能够从一个队列里面接收消息。队列通常用下图表示,上面是队列的名字:函数

  • 消费其实就是接收的意思。一个消费者就是一个等待接收消息的应用程序。消费者通常用下图表示:oop

值得注意的是,生产者,消费者和RabbitMQ不须要部署在同一台物理机上,事实上在不少应用里面他们也确实没有部署在同一台物理机上。ui

 


Hello Worldspa

咱们的"Hello world"程序不复杂 :发送一个消息,接收消息和打印到屏幕上。完成这些须要两个程序:一个发送消息程序,和一个接收并打印消息程序。

 总体的过程以下:

    

生产者发送消息到"hello"队列。消费者从这个队列接收消息。


Sending

咱们第一个程序 send.erl 将发送一个消息到队列。

1.咱们须要创建一个到RabbitMQ server的链接。

{ok, Connection} =amqp_connection:start(#amqp_params_network{host = "localhost"}),
{ok, Channel} = amqp_connection:open_channel(Connection),

咱们如今已经链接上了一个本地的RabbitMQ服务器,由于咱们给的参数是”localhost”。若是咱们想链接到不一样机器上的RabbitMQ服务器,咱们能够把”localhost”改为相应机器的名字或者IP地址。

 

2:在发送消息以前,咱们必须确保接收消息的队列是存在的。若是咱们把消息发送到一个不存在的队列,RabbitMQ会把咱们的消息看成垃圾,不予处理。让咱们来建立一个队列,而且把它命名为hello:

amqp_channel:call(Channel, #'queue.declare'{queue = <<"hello">>}),

如今咱们能够准备发送一个消息。第一个消息仅仅包含一个"Hello world!"字符串。咱们想要发送到咱们的hello 队列。

3:在RabbitMQ里面,消息是不能被直接发送到队列里面去的,在消息发送到队列以前必需要先通过一个exchange”,如今咱们不讨论exchange的具体细节,如今咱们只须要知道怎么使用一个由空字符串标志的默认exchange,这个exchange很是特殊---它容许咱们指定一个具体的消息要投递的队列。队列的名字在参数routing_key里面指定:

amqp_channel:cast(Channel,
                      #'basic.publish'{
                        exchange = <<"">>,
                        routing_key = <<"hello">>},
                      #amqp_msg{payload = <<"Hello World!">>}),

在退出程序以前,咱们须要确认刷新网络缓存区,咱们的消息真正的发送到RabbitMQ。 咱们能够关闭链接来达到这个目的。

ok = amqp_channel:close(Channel),
ok = amqp_connection:close(Connection),

 


Receiving

咱们的第二个程序时receive1.erl,将会接收消息从队列并在屏幕上打印出来。

 

 构造消费者应用程序的步骤:

1:创建一个到RabbitMQ服务器的链接,与生产者的步骤同样:

{ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

2:确保消息队列的存在。咱们使用queue_declare来建立队列。queue_declare能够被调用屡次,可是只有一个调用会建立queue:

amqp_channel:call(Channel, #'queue.declare'{queue = <<"hello">>})

    你可能会问,为何咱们要重复地建立队列---咱们已经在生产者那边建立了这个队列。由于你不能保证生产者和消费者谁先启动,因此咱们在两边都对队列进行建立操做。

   

 3:从队列接收消息要相对来讲复杂些。接收消息经过在队列上绑定一个回调函数来实现,无论何时咱们接收到一个消息,这个回调函数都会被调用。在hello world这个例子中,回调函数的做用就是把收到的消息打印出来:

amqp_channel:subscribe(Channel, #'basic.consume'{queue = <<"hello">>,
                                                     no_ack = true}, self()),
loop(Channel) ->
    receive
        {#'basic.deliver'{}, #amqp_msg{payload = Body}} ->
            io:format(" [x] Received ~p~n", [Body]),
            loop(Channel)
    end.

    4:而后咱们告诉RabbitMQ收到消息时,给本身发送消息。 

  receive
        #'basic.consume_ok'{} -> ok
    end,

完整代码

send.erl (发送消息)

-module(send).
-compile([export_all]).
-include_lib("amqp_client/include/amqp_client.hrl").

main(_) ->
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    amqp_channel:call(Channel, #'queue.declare'{queue = <<"hello">>}),

    amqp_channel:cast(Channel,
                      #'basic.publish'{
                        exchange = <<"">>,
                        routing_key = <<"hello">>},
                      #amqp_msg{payload = <<"Hello World!">>}),
    io:format(" [x] Sent 'Hello World!'~n"),
    ok = amqp_channel:close(Channel),
    ok = amqp_connection:close(Connection),
    ok.

receive1.erl(接收消息)

-module(receive1).
-compile([export_all]).

-include_lib("amqp_client/include/amqp_client.hrl").

main(_) ->
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    amqp_channel:call(Channel, #'queue.declare'{queue = <<"hello">>}),
    io:format(" [*] Waiting for messages. To exit press CTRL+C~n"),

    amqp_channel:subscribe(Channel, #'basic.consume'{queue = <<"hello">>,
                                                     no_ack = true}, self()),
    receive
        #'basic.consume_ok'{} -> ok
    end,
    loop(Channel).


loop(Channel) ->
    receive
        {#'basic.deliver'{}, #amqp_msg{payload = Body}} ->
            io:format(" [x] Received ~p~n", [Body]),
            loop(Channel)
    end.

erlang 客户端:

http://www.rabbitmq.com/erlang-client-user-guide.html

相关文章
相关标签/搜索