做为一位刚进公司的小白,参与到项目的第一个任务是为操做记录的存储增长消息队列,为何咱们要这么作呢?缘由以下:在现有系统中咱们直接将用户的操做记录增长到mongodb数据库中,可是在咱们的系统出现峰值的时候,发现mongodb受不了,为此咱们要作到削峰这个功能,按照惯例咱们想到了使用消息队列,同时因为咱们在项目中广泛采用aws的云服务,为此咱们采用了aws的消息队列。html
使用aws sqs和使用其余的消息队列基本步骤一致,aws sqs的官方已经给出了很是详尽的使用说明,尽可能参考官方文档,下面给出简单的操做步骤,以及示例代码,代码是用go写的,其余的语言能够参考go的官方文档git
awsSqs := AwsSQS{} creds := credentials.NewStaticCredentials("key", "secret", "") sess := session.Must(session.NewSession(&aws.Config{ Region: aws.String("region"), Credentials: creds, })) awsSqs.svc = sqs.New(sess)
// 将消息发送给队列 func (awsSqs *AwsSQS) SendMessage(record string, qURL string) *Error { _, err := awsSqs.svc.SendMessage(&sqs.SendMessageInput{ MessageBody: aws.String(record), QueueUrl: &qURL, }) if err != nil { Errorf("Error Send Message to sqs: err = %v", err) return NewError(ErrorCodeInnerError, err.Error()) } return nil }
// 从队列中获取消息 func (awsSqs *AwsSQS) ReserveMessage(qURL string) (*sqs.ReceiveMessageOutput, *Error) { result, err := awsSqs.svc.ReceiveMessage(&sqs.ReceiveMessageInput{ QueueUrl: &qURL, MaxNumberOfMessages: aws.Int64(10), WaitTimeSeconds: aws.Int64(10), }) if err != nil { Errorf("Error aws sqs ReceiveMessage : err=%v ", err) return nil, NewError(ErrorCodeInnerError, err.Error()) } return result, nil }
deleteMessageList := make([]*sqs.DeleteMessageBatchRequestEntry, 0) deleteMessage := sqs.DeleteMessageBatchRequestEntry{Id: message.MessageId, ReceiptHandle: message.ReceiptHandle} deleteMessageList = append(deleteMessageList, &deleteMessage) // 将队列中的消息删除(批量删除) func (awsSqs *AwsSQS) DeleteMessage(list []*sqs.DeleteMessageBatchRequestEntry, qURL string) *Error { // delete message _, err := awsSqs.svc.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{ QueueUrl: &qURL, Entries: list, }) if err != nil { Errorf("Delete Message error:error =%v", err) return NewError(ErrorCodeInnerError, err.Error()) } return nil }
// 自定义mongodb的_id,使用mongodb的库来生成id id := bson.NewObjectId().Hex() entity.id = id
type entity struct { Id string `bson:"_id,omitempty"` }