Golang RabbitMQ Demo

AMQP协议

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。服务器

RabbitMQ 就是 amqp 协议的Erlang的实现。网络

AMQP的模型架构的主要角色,生产者、消费者、交换器、队列。架构

生产者、消费者、服务节点

  • 生产者(Producter) 消息投递方
  • 消费者(Consumer) 消息接收方
  • 服务节点(Broker) 消息的服务节点,基本上能够简单的把一个broker当作一台消息服务器

2019-12-17 10-34-47 的屏幕截图.png

交换器、队列、绑定

绑定

Rabbitmq中须要路由键和绑定键联合使用才能使生产者成功投递到队列中去。异步

  • RoutingKey 生产者发送给交换器绑定的Key
  • BindingKey 交换器和队列绑定的Key

生产者将消息投递到交换器,经过交换器绑定的队列,最终投递到对应的队列中去。
2019-12-17 10-55-51 的屏幕截图.png性能

交换器

Rabbitmq共有4种交换器测试

  • fanout 把消息投递到全部与此交换器绑定的队列中
  • direct 把消息投递到 BindingKey 和 RoutingKey 彻底匹配的队列中
  • topic 规则匹配,BindingKey中存在两种特殊字符ui

    • *匹配零个或多个单词
    • #匹配一个单词
  • header 不依赖于RoutingKey而是经过消息体中的headers属性来进行匹配绑定,经过headers中的key和BindingKey彻底匹配,因为性能较差通常用的比较少。

基本使用

在Golang中建立rabbitmq 生产者基本步骤是:spa

  1. 链接Connection
  2. 建立Channel
  3. 建立或链接一个交换器
  4. 建立或链接一个队列
  5. 交换器绑定队列
  6. 投递消息
  7. 关闭Channel
  8. 关闭Connection

链接

// connection
connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
    panic(err)
}

// channel
channel, err := connection.Channel()
if err != nil {
    panic(err)
}

建立一个交换器

if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil {
    panic(err)
}

参数解析:code

  • name 交换机名称
  • kind 交换机类型
  • durable 持久化
  • autoDelete 是否自动删除
  • internal 是不是内置交换机
  • noWait 是否等待服务器确认
  • args 其它配置

参数说明要点:blog

  • autoDelete:

自动删除功能必需要在交换器曾经绑定过队列或者交换器的状况下,处于再也不使用的时候才会自动删除,
若是是刚刚建立的还没有绑定队列或者交换器的交换器或者早已建立只是未进行队列或者交换器绑定的交换器是不会自动删除的。

  • internal:

内置交换器是一种特殊的交换器,这种交换器不能直接接收生产者发送的消息,
只能做为相似于队列的方式绑定到另外一个交换器,来接收这个交换器中路由的消息,
内置交换器一样能够绑定队列和路由消息,只是其接收消息的来源与普通交换器不一样。

  • noWait

当noWait为true时,声明时无需等待服务器的确认。
该通道可能因为错误而关闭。 添加一个NotifyClose侦听器应对任何异常。

建立交换器还有一个差很少的方法(ExchangeDeclarePassive),他主要是假定交换已存在,并尝试链接到
不存在的交换将致使RabbitMQ引起异常,可用于检测交换的存在。

建立一个队列

if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil {
    panic(err)
}

参数解析:

  • name 队列名称
  • durable 持久化
  • autoDelete 自动删除
  • exclusive 排他
  • noWait 是否等待服务器确认
  • args Table

参数说明要点:

  • exclusive 排他

排他队列只对首次建立它的链接可见,排他队列是基于链接(Connection)可见的,而且该链接内的全部信道(Channel)均可以访问这个排他队列,在这个链接断开以后,该队列自动删除,因而可知这个队列能够说是绑到链接上的,对同一服务器的其余链接不可见。
同一链接中不容许创建同名的排他队列的
这种排他优先于持久化,即便设置了队列持久化,在链接断开后,该队列也会自动删除。
非排他队列不依附于链接而存在,同一服务器上的多个链接均可以访问这个队列。

  • autoDelete 设置是否自动删除。

为true则设置队列为自动删除。
自动删除的前提是:至少有一个消费者链接到这个队列,以后全部与这个队列链接的消费者都断开时,才会自动删除。
不能把这个参数错误地理解为:"当链接到此队列的全部客户端断开时,这个队列自动删除",由于生产者客户端建立这个队列,或者没有消费者客户端与这个队列链接时,都不会自动删除这个队列。

建立队列还有一个差很少的方法(QueueDeclarePassive),他主要是假定队列已存在,并尝试链接到
不存在的队列将致使RabbitMQ引起异常,可用于检测队列的存在。

绑定交换器和队列

if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil {
    panic(err)
}

参数解析:

  • name 队列名称
  • key BindingKey 根据交换机类型来设定
  • exchange 交换机名称
  • noWait 是否等待服务器确认
  • args Table

绑定交换器

if err = channel.ExchangeBind("dest", "q1Key", "src", false, nil); err != nil {
    panic(err)
}

参数解析:

  • destination 目的交换器
  • key RoutingKey 路由键
  • source 源交换器
  • noWait 是否等待服务器确认
  • args Table 其它参数

生产者发送消息至交换器source中,交换器source根据路由键找到与其匹配的另外一个交换器destination,井把消息转发到destination中,进而存储在.destination绑定的队列queue中,某种程度上来讲destination交换器能够看做一个队列。如图:

2019-12-17 11-40-50 的屏幕截图.png

投递消息

if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{
    Timestamp:   time.Now(),
    ContentType: "text/plain",
    Body:        []byte("Hello Golang and AMQP(Rabbitmq)!"),
}); err != nil {
    panic(err)
}

参数解析:

  • exchange 交换器名称
  • key RouterKey
  • mandatory 是否为没法路由的消息进行返回处理
  • immediate 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
  • msg 消息体

参数说明要点:

  • mandatory

消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器以后没法路由到队列的状况对消息的处理方式,
设置为 true 表示将消息返回到生产者,不然直接丢弃消息。

  • immediate

参数告诉服务器至少将该消息路由到一个队列中,不然将消息返回给生产者。imrnediate参数告诉服务器,若是该消息关联的队列上有消费者,则马上投递:若是全部匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。

RabbitMQ 3.0版本开始去掉了对imrnediate参数的支持

消费信息

Rabbitmq消费方式共有2种,分别是推模式和拉模式

推模式是经过持续订阅的方式来消费信息,
Consume将信道(Channel)直为接收模式,直到取消队列的订阅为止。在接收模式期间,RabbitMQ会不断地推送消息给消费者。
推送消息的个数仍是会受到channel.Qos的限制

deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil)
if err != nil {
    panic(err)
}

若是ack设置为false则表示须要手动进行ack消费

v, ok := <-deliveries
if ok {
    // 手动ack确认
    // 注意: 这里只要调用了ack就是手动确认模式,
    // multiple 表示的是在此channel中先前全部未确认的deliveries都将被确认
    // 并非表示设置为false就不进行当前ack确认
    if err := v.Ack(true); err != nil {
        fmt.Println(err.Error())
    }
} else {
    fmt.Println("Channel close")
}

参数解析:

  • queue 队列名称
  • consumer 消息者名称
  • autoAck 是否确认消费
  • exclusive 排他
  • noLocal
  • noWait bool
  • args Table

参数说明要点:

  • noLocal

设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者

拉模式:
相对来讲比较简单,是由消费者主动拉取信息来消费,一样也须要进行ack确认消费

channel.Get(queue string, autoAck bool)

简单示例Demo

下面是一个简单示例,只是为了通讯测试,单条数据收发

func Connection() (*amqp.Connection) {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        panic(err)
    }
    return conn
}

func Sample() {
    var wg sync.WaitGroup
    wg.Add(1)
    go SampleConsumption(&wg)

    // 建立链接
    connection := Connection()
    defer connection.Close()

    // 开启 channel
    channel, err := connection.Channel()
    if err != nil {
        panic(err)
    }

    defer channel.Close()

    if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil {
        panic(err)
    }

    if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil {
        panic(err)
    }

    if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil {
        panic(err)
    }

    // mandatory true 未找到队列返回给消费者
    returnChan := make(chan amqp.Return,0)
    channel.NotifyReturn(returnChan)

    // Publish
    if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{
        Timestamp:   time.Now(),
        ContentType: "text/plain",
        Body:        []byte("Hello Golang and AMQP(Rabbitmq)!"),
    }); err != nil {
        panic(err)
    }

    //for v := range returnChan{
    //    fmt.Printf("Return %#v\n",v)
    //}
    
    wg.Wait()
}

func SampleConsumption(wg *sync.WaitGroup) {
    connection := Connection()
    defer connection.Close()

    channel, err := connection.Channel()
    if err != nil {
        panic(err)
    }

    defer channel.Close()
    
    deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil)
    if err != nil {
        panic(err)
    }

    // 这里只取一条,由于product只发一条
    v, ok := <-deliveries
    if ok {
        if err := v.Ack(true); err != nil {
            fmt.Println(err.Error())
        }
    } else {
        fmt.Println("Channel close")
    }
    wg.Done()
}

来源:https://blog.crcms.cn/2019/09/29/go-ioc/

相关文章
相关标签/搜索