go操做RabbitMQ

RabbitMQ服务器安装
一、安装erlangmysql

wget https://www.rabbitmq.com/releases/erlang/erlang-18.2-1.el6.x86_64.rpm

二、安装RabbitMQgit

wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el6.noarch.rpm

三、 经常使用命令github

systemctl start rabbitmq-server 启动 rabbitmq stop 中止 rabbitmq-plugins list 插件命令 rabbitmq-plugins enable rabbitmq_management 安装管理插件 rabbitmq-plugins disable rabbitmq_management 卸载管理插件

四、 浏览器打开sql

端口号默认:15672<br/>
密码和用户名默认:guest
http://127.0.0.1:15672/#/
五、常见错误
错误提示:zsh: command not found: rabbitmq-plugins<br/>
解决办法:
第一种:export PATH=/usr/local/Cellar/rabbitmq/3.8.2/sbin/:$PATH<br/>
第二种:1: vim .bash_profile(前提是存在该文件,若是不存在,能够先建立mkdir .bash_profile,以后再执行vi编辑)<br/>
2:export PATH=/usr/local/Cellar/rabbitmq/3.8.2/sbin/sbin/:$PATH数据库

最后:source ~/.bash_profilevim

## RabbitMQ核心概念
### Virtual Hosts管理
像mysql拥有数据库的概念而且能够指定用户对库和表等操做的权限。那RabbitMQ呢?RabbitMQ也有相似的权限管理。在RabbitMQ中能够虚拟消息服务器VirtualHost,每一个VirtualHost至关于一个相对独立的RabbitMQ服务器,每一个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 至关于mysql的db。Virtual Name通常以/开头<br/>
一、建立Virtual Hosts:
Admin->Virtual Hosts->Add a new virtual host<br/>
二、建立用户:Admin->Users->Add a user<br/>
二、对用户进行受权,点击须要受权的vhosts->Permissions->Set permission浏览器

## RabbitMQ五种模式
url格式:amqp:// 帐号 密码@地址:端口号/vhost <br/>
一、Simple模式 最简单最经常使用的模式,一个消息只能被一个消费者消费<br/>bash

 

 

二、Work模式,一个消息只能被一个消费者消费服务器

 

 

三、Publish/Subscribe订阅模式,消息被路由投递给多个队列,一个消息被多个消费者获取,生产端不容许指定消费函数

 

 

四、Routing路由模式,一个消息被多个消费者获取,而且消息的目标队列能够被生产者指定

 

 

五、Topic话题模式,一个消息被多个消息获取,消息的目标queue可用BindKey以通配符,(#:一个或多个词,*:一个词)的方式指定。

 

 

 

示例代码

package RabbitMQ import ( "fmt"
    "github.com/streadway/amqp"
    "log" ) //amqp:// 帐号 密码@地址:端口号/vhost
const MQURL = "amqp://imoocuser:imoocuser@127.0.0.1:5672/imooc" type RabbitMQ struct { //链接
    conn *amqp.Connection //管道
    channel *amqp.Channel //队列名称
    QueueName string
    //交换机
    Exchange string
    //key Simple模式 几乎用不到
    Key string
    //链接信息
    Mqurl string } //建立RabbitMQ结构体实例
func NewRabbitMQ(queuename string, exchange string,key string) *RabbitMQ { rabbitmq := &RabbitMQ{QueueName:queuename,Exchange:exchange,Key:key,Mqurl:MQURL} var err error //建立rabbitmq链接
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "建立链接错误!") rabbitmq.channel,err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err,"获取channel失败") return rabbitmq } //断开channel和connection
func (r *RabbitMQ) Destory() { r.channel.Close() r.conn.Close() } //错误处理函数
func (r *RabbitMQ) failOnErr (err error,message string) { if err !=nil { log.Fatalf("%s:%s",message,err) panic(fmt.Sprintf("%s:%s",message, err)) } } //简单模式step:1。建立简单模式下RabbitMQ实例
func NewRabbitMQSimple(queueName string) * RabbitMQ { return NewRabbitMQ(queueName, "", "") } //订阅模式建立rabbitmq实例
func NewRabbitMQPubSub(exchangeName string) * RabbitMQ { //建立rabbitmq实例
    rabbitmq := NewRabbitMQ("", exchangeName, "") var err error //获取connection
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "failed to connecct rabbitmq!") //获取channel
    rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel!") return rabbitmq } //订阅模式生成
func (r *RabbitMQ) PublishPub(message string) { //尝试建立交换机,不存在建立
    err := r.channel.ExchangeDeclare( //交换机名称
 r.Exchange, //交换机类型 广播类型
        "fanout", //是否持久化
        true, //是否字段删除
        false, //true表示这个exchange不能够被client用来推送消息,仅用来进行exchange和exchange之间的绑定
        false, //是否阻塞 true表示要等待服务器的响应
        false, nil, ) r.failOnErr(err, "failed to declare an excha" + "nge") //2 发送消息
    err = r.channel.Publish( r.Exchange, "", false, false, amqp.Publishing{ //类型
            ContentType:"text/plain", //消息
            Body:[]byte(message), }, ) } //订阅模式消费端代码
func (r * RabbitMQ) RecieveSub() { //尝试建立交换机,不存在建立
    err := r.channel.ExchangeDeclare( //交换机名称
 r.Exchange, //交换机类型 广播类型
        "fanout", //是否持久化
        true, //是否字段删除
        false, //true表示这个exchange不能够被client用来推送消息,仅用来进行exchange和exchange之间的绑定
        false, //是否阻塞 true表示要等待服务器的响应
        false, nil, ) r.failOnErr(err, "failed to declare an excha" + "nge") //2试探性建立队列,建立队列
    q, err := r.channel.QueueDeclare( "",//随机生产队列名称
        false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") //绑定队列到exchange中
    err = r.channel.QueueBind( q.Name, //在pub/sub模式下,这里的key要为空
        "", r.Exchange, false, nil, ) //消费消息
    message, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range message { log.Printf("Received a message:%s,", d.Body) } }() fmt.Println("退出请按 Ctrl+C") <- forever } //话题模式 建立RabbitMQ实例
func NewRabbitMQTopic(exchagne string, routingKey string) *RabbitMQ { //建立rabbitmq实例
    rabbitmq := NewRabbitMQ("", exchagne, routingKey) var err error rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err,"failed to connect rabbingmq!") rabbitmq.channel,err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel") return rabbitmq } //话题模式发送信息
func (r * RabbitMQ) PublishTopic(message string) { //尝试建立交换机,不存在建立
    err := r.channel.ExchangeDeclare( //交换机名称
 r.Exchange, //交换机类型 话题模式
        "topic", //是否持久化
        true, //是否字段删除
        false, //true表示这个exchange不能够被client用来推送消息,仅用来进行exchange和exchange之间的绑定
        false, //是否阻塞 true表示要等待服务器的响应
        false, nil, ) r.failOnErr(err, "topic failed to declare an excha" + "nge") //2发送信息
    err = r.channel.Publish( r.Exchange, //要设置
 r.Key, false, false, amqp.Publishing{ //类型
            ContentType:"text/plain", //消息
            Body:[]byte(message), }, ) } //话题模式接收信息 //要注意key //其中* 用于匹配一个单词,#用于匹配多个单词(能够是零个) //匹配 表示匹配imooc.* 表示匹配imooc.hello,可是imooc.hello.one须要用imooc.#才能匹配到
func (r *RabbitMQ) RecieveTopic() { //尝试建立交换机,不存在建立
    err := r.channel.ExchangeDeclare( //交换机名称
 r.Exchange, //交换机类型 话题模式
        "topic", //是否持久化
        true, //是否字段删除
        false, //true表示这个exchange不能够被client用来推送消息,仅用来进行exchange和exchange之间的绑定
        false, //是否阻塞 true表示要等待服务器的响应
        false, nil, ) r.failOnErr(err, "failed to declare an excha" + "nge") //2试探性建立队列,建立队列
    q, err := r.channel.QueueDeclare( "",//随机生产队列名称
        false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") //绑定队列到exchange中
    err = r.channel.QueueBind( q.Name, //在pub/sub模式下,这里的key要为空
 r.Key, r.Exchange, false, nil, ) //消费消息
    message, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range message { log.Printf("Received a message:%s,", d.Body) } }() fmt.Println("退出请按 Ctrl+C") <- forever } //路由模式 建立RabbitMQ实例
func NewRabbitMQRouting(exchagne string, routingKey string) *RabbitMQ { //建立rabbitmq实例
    rabbitmq := NewRabbitMQ("", exchagne, routingKey) var err error rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err,"failed to connect rabbingmq!") rabbitmq.channel,err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel") return rabbitmq } //路由模式发送信息
func (r * RabbitMQ) PublishRouting(message string) { //尝试建立交换机,不存在建立
    err := r.channel.ExchangeDeclare( //交换机名称
 r.Exchange, //交换机类型 广播类型
        "direct", //是否持久化
        true, //是否字段删除
        false, //true表示这个exchange不能够被client用来推送消息,仅用来进行exchange和exchange之间的绑定
        false, //是否阻塞 true表示要等待服务器的响应
        false, nil, ) r.failOnErr(err, "failed to declare an excha" + "nge") //发送信息
    err = r.channel.Publish( r.Exchange, //要设置
 r.Key, false, false, amqp.Publishing{ //类型
            ContentType:"text/plain", //消息
            Body:[]byte(message), }, ) } //路由模式接收信息
func (r *RabbitMQ) RecieveRouting() { //尝试建立交换机,不存在建立
    err := r.channel.ExchangeDeclare( //交换机名称
 r.Exchange, //交换机类型 广播类型
        "direct", //是否持久化
        true, //是否字段删除
        false, //true表示这个exchange不能够被client用来推送消息,仅用来进行exchange和exchange之间的绑定
        false, //是否阻塞 true表示要等待服务器的响应
        false, nil, ) r.failOnErr(err, "failed to declare an excha" + "nge") //2试探性建立队列,建立队列
    q, err := r.channel.QueueDeclare( "",//随机生产队列名称
        false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") //绑定队列到exchange中
    err = r.channel.QueueBind( q.Name, //在pub/sub模式下,这里的key要为空
 r.Key, r.Exchange, false, nil, ) //消费消息
    message, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range message { log.Printf("Received a message:%s,", d.Body) } }() fmt.Println("退出请按 Ctrl+C") <- forever } //简单模式Step:二、简单模式下生产代码
func (r *RabbitMQ) PublishSimple (message string) { //一、申请队列,若是队列存在就跳过,不存在建立 //优势:保证队列存在,消息能发送到队列中
    _, err := r.channel.QueueDeclare( //队列名称
 r.QueueName, //是否持久化
        false, //是否为自动删除 当最后一个消费者断开链接以后,是否把消息从队列中删除
        false, //是否具备排他性 true表示本身可见 其余用户不能访问
        false, //是否阻塞 true表示要等待服务器的响应
        false, //额外数学系
 nil, ) if err != nil { fmt.Println(err) } //2.发送消息到队列中
 r.channel.Publish( //默认的Exchange交换机是default,类型是direct直接类型
 r.Exchange, //要赋值的队列名称
 r.QueueName, //若是为true,根据exchange类型和routkey规则,若是没法找到符合条件的队列那么会把发送的消息返回给发送者
        false, //若是为true,当exchange发送消息到队列后发现队列上没有绑定消费者,则会把消息还给发送者
        false, //消息
 amqp.Publishing{ //类型
            ContentType:"text/plain", //消息
            Body:[]byte(message), }) } func (r *RabbitMQ) ConsumeSimple() { //一、申请队列,若是队列存在就跳过,不存在建立 //优势:保证队列存在,消息能发送到队列中
    _, err := r.channel.QueueDeclare( //队列名称
 r.QueueName, //是否持久化
        false, //是否为自动删除 当最后一个消费者断开链接以后,是否把消息从队列中删除
        false, //是否具备排他性
        false, //是否阻塞
        false, //额外数学系
 nil, ) if err != nil { fmt.Println(err) } //接收消息
    msgs, err := r.channel.Consume( r.QueueName, //用来区分多个消费者
        "", //是否自动应答
        true, //是否具备排他性
        false, //若是设置为true,表示不能同一个connection中发送的消息传递给这个connection中的消费者
        false, //队列是否阻塞
        false, nil, ) if err!=nil { fmt.Println(err) } forever := make(chan bool) //启用协程处理
 go func() { for d := range msgs { //实现咱们要处理的逻辑函数
            log.Printf("Received a message:%s",d.Body) //fmt.Println(d.Body)
 } }() log.Printf("【*】warting for messages, To exit press CCTRAL+C") <- forever }

测试

//Simple模式 发送者
rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple") rabbitmq.PublishSimple("hello imooc!") //接收者
rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple") rabbitmq.ConsumeSimple() //订阅模式发送者
rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct") for i :=0; i<=100 ; i++ { rabbitmq.PublishPub("订阅模式生产第" + strconv.Itoa(i) + "条数据") fmt.Println(i) time.Sleep(1 * time.Second) } //接收者
rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct") rabbitmq.RecieveSub() //路由模式发送者
imoocOne := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one") imoocTwo := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_two") for i :=0; i<=10; i++ { imoocOne.PublishRouting("hello imooc one!" + strconv.Itoa(i)) imoocTwo.PublishRouting("hello imooc two!" + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Println(i) } //接收者
rabbitmq := RabbitMQ.NewRabbitMQRouting("exImooc", "imooc_one") rabbitmq.RecieveRouting() //Topic模式发送者
imoocOne := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three") imoocTwo := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.four") for i :=0; i<=10; i++ { imoocOne.PublishTopic("hello imooc topic three!" + strconv.Itoa(i)) imoocTwo.PublishTopic("hello imooc topic four!" + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Println(i) } //Topic接收者
rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "#") rabbitmq.RecieveTopic()
相关文章
相关标签/搜索