使用aws sqs 作缓冲队列(go)

背景

做为一位刚进公司的小白,参与到项目的第一个任务是为操做记录的存储增长消息队列,为何咱们要这么作呢?缘由以下:在现有系统中咱们直接将用户的操做记录增长到mongodb数据库中,可是在咱们的系统出现峰值的时候,发现mongodb受不了,为此咱们要作到削峰这个功能,按照惯例咱们想到了使用消息队列,同时因为咱们在项目中广泛采用aws的云服务,为此咱们采用了aws的消息队列。html

注意事项

  1. aws sqs 收费是按照请求次数收费因此要尽可能使用批量操做
  2. aws sqs 的消费上线是12000次,最多容许12000个在传递的数据
  3. aws sqs 容量无限大
  4. aws sqs 的批量操做的上限是10条数据(毕竟是按次数收费)
  5. aws sqs并行取数据的过程当中可能会出现重复,咱们利用数据库的ID来去重,注意咱们在生产id的时候使用mongodb本身的库来生成,缘由是依照mongodb生成的id比较均匀,存入的数据库中的树形结构也比较平衡,效率比较高

操做步骤

使用aws sqs和使用其余的消息队列基本步骤一致,aws sqs的官方已经给出了很是详尽的使用说明,尽可能参考官方文档,下面给出简单的操做步骤,以及示例代码,代码是用go写的,其余的语言能够参考go的官方文档git

  1. 配置aws sqs的链接信息
awsSqs := AwsSQS{}

creds := credentials.NewStaticCredentials("key", "secret", "")
sess := session.Must(session.NewSession(&aws.Config{
    Region:      aws.String("region"),
    Credentials: creds,
}))

awsSqs.svc = sqs.New(sess)
  1. 向aws sqs发送数据
// 将消息发送给队列
func (awsSqs *AwsSQS) SendMessage(record string, qURL string) *Error {
    _, err := awsSqs.svc.SendMessage(&sqs.SendMessageInput{
        MessageBody: aws.String(record),
        QueueUrl:    &qURL,
    })
    if err != nil {
        Errorf("Error Send Message to sqs: err = %v", err)
        return NewError(ErrorCodeInnerError, err.Error())
    }
    return nil
}
  1. 从aws sqs 获取数据
// 从队列中获取消息
func (awsSqs *AwsSQS) ReserveMessage(qURL string) (*sqs.ReceiveMessageOutput, *Error) {
    result, err := awsSqs.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
        QueueUrl:            &qURL,
        MaxNumberOfMessages: aws.Int64(10),
        WaitTimeSeconds:     aws.Int64(10),
    })

    if err != nil {
        Errorf("Error aws sqs ReceiveMessage : err=%v ", err)
        return nil, NewError(ErrorCodeInnerError, err.Error())
    }

    return result, nil
}
  1. 从aws sqs 删除数据
deleteMessageList := make([]*sqs.DeleteMessageBatchRequestEntry, 0)
deleteMessage := sqs.DeleteMessageBatchRequestEntry{Id: message.MessageId, ReceiptHandle: message.ReceiptHandle}
deleteMessageList = append(deleteMessageList, &deleteMessage)
// 将队列中的消息删除(批量删除)
func (awsSqs *AwsSQS) DeleteMessage(list []*sqs.DeleteMessageBatchRequestEntry, qURL string) *Error {
    // delete message
    _, err := awsSqs.svc.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
        QueueUrl: &qURL,
        Entries:  list,
    })

    if err != nil {
        Errorf("Delete Message error:error =%v", err)
        return NewError(ErrorCodeInnerError, err.Error())
    }
    return nil
}
  1. 在存储到moongodb的过程当中防止重复
// 自定义mongodb的_id,使用mongodb的库来生成id
id := bson.NewObjectId().Hex()
entity.id = id
type entity struct {
    Id                 string `bson:"_id,omitempty"`
}

参考资料

官方代码示例
aws 限制github

相关文章
相关标签/搜索