搭建前端监控系统(五)Nodejs怎么搭建消息队列

  怎样定位前端线上问题,一直以来,都是很头疼的问题,由于它发生于用户的一系列操做以后。错误的缘由可能源于机型,网络环境,接口请求,复杂的操做行为等等,在咱们想要去解决的时候很难复现出来,天然也就没法解决。 固然,这些问题并不是不能克服,让咱们来一块儿看看如何去监控并定位线上的问题吧。 javascript

 

  这是搭建前端监控系统的第五章,主要是介绍如何处理日志高并发上传的状况,跟着我一步步作,你也能搭建出一个属于本身的前端监控系统。html

  若是感受有帮助,或者有兴趣,请关注 or Star Me 。 前端

 

  具体效果: 前端监控系统java

 

  随着监控日志搜集的内容愈来愈多,终于有一天,因为公司的一波推文,致使了日志的瞬间流量达到历史新高,以致于mysql的链接数过多,系统崩溃。 固然,做为日志上传的服务器,这个是必然会发生的状况,只是迟早的问题。 既然出现了并发问题,那么咱们就着手来处理吧。日志上传如何缓解高并发的状况呢?咱们分为三个小点来处理。mysql

  •   增长日志上传的时间间隔

  正如咱们所知,日志上传的时间间隔越长,用户在这个间隔内离开的概率就会越大,日志的漏传量就会增长,而后会致使日志的准确度下降。由于咱们的探针是安插在浏览器内的,用户随时都有可能关掉,因此,理论上讲间隔越短越好,但这并不现实。因此这个须要在服务器的承受能力和日志的准确率之间作个权衡。由具体状况而定git

  •   移除探针代码里冗余的参数,缩短参数名字的长度 

  另一点,每台服务器的硬盘有限,带宽有限,若是参数名字太长,参数内容冗余,对服务器的硬盘和带宽都是一种极大的浪费。虽然每条日志都不起眼,可是日志起量了之后,就是会是一笔很是庞大的开销。github

  •   Nodejs + RabbitMq 搭建消息队列,缓解瞬间并发量  

  对于一个前端来讲,要把消息队列搭建起来还确实费了一番周折。 web

  1)  ubantu16 安装RabbitMQ服务软件包,不少教程都要求安装erlang, 可是更新apt之后,直接执行安装命令,会自动安装erlang的核心组件的。(erlang始终没法成功安装,真心累。)sql

$ apt-get update
$ apt-get install rabbitmq-server // 安装
$ rabbitmq-plugins enable rabbitmq_management // 启动插件,浏览器才能访问

  正常状况下是直接成功的,直接访问ip端口号就能够打开了 http://IP:15672, 以下图:浏览器

  2)如今咱们须要一个有效的登陆名和密码,执行以下命令

$ rabbitmqctl add_user username password  // 设置用户名密码
$ rabbitmqctl set_user_tags username administrator  // 设置为管理员身份
$ rabbitmqctl set_permissions -p / username ".*" ".*" ".*"  //为用户设置读写等权限 

  OK, 如今咱们登陆进来就是这样的界面,如此消息队列服务咱们算是搭建完成了。

  3)消息服务启动了,那么如何存消息,如何取消息呢?以下图所示:

  

  我可以接触到的关于消息队列的应用场景实在有限,因此不能介绍更复杂的内容,大体的思惟逻辑如上图1:有消息进来,先存入消息队列里,另外一端再从队列去取出来,完成接下来的工做。从代码的角度来看如上图2:就是一个生产者和消费者的模式,生产者不停的向消息队列里生产消息,消费者在有须要的时候,从消息队列里取消息, 一旦完成消费,队列里便移除这个消息。消息的生产者和消费者互相没有感知,生产者产生过剩的消息都存放在消息队列里,由消费者慢慢消耗。以此来削峰填谷,达处处理高并发的目的。固然这都是个人浅显理解,可是也足以知足目前日志上传的需求了。

  OK、理论说完了,具体如何实现呢?

  

let amqp = require('amqplib');

module.exports = class RabbitMQ {
  constructor() {
    this.hosts = ["amqp://localhost"]; 
    this.index = 0;
    this.length = this.hosts.length;
    this.open = amqp.connect(this.hosts[this.index]);
  }
 // 消息生产者
  sendQueueMsg(queueName, msg, errCallBack) {
    let self = this;
    self.open
      .then(function (conn) {
        return conn.createChannel();
      })
      .then(function (channel) {
        return channel.assertQueue(queueName).then(function (ok) {
          return channel.sendToQueue(queueName, new Buffer.from(msg), {
            persistent: true
          });
        })
          .then(function (data) {
            if (data) {
              errCallBack && errCallBack("success");
              channel.close();
            }
          })
          .catch(function () {
            setTimeout(() => {
              if (channel) {
                channel.close();
              }
            }, 500)
          });
      })
      .catch(function () {
      // 这里尝试备用链接,我就一个,因此就处理了
      });
  }
 
// 消息消费者 receiveQueueMsg(queueName, receiveCallBack, errCallBack) { let self = this; self.open.then(function (conn) { return conn.createChannel(); }).then(function (channel) { return channel.assertQueue(queueName).then(function (ok) { return channel.consume(queueName, function (msg) { if (msg !== null) { let data = msg.content.toString(); channel.ack(msg); receiveCallBack && receiveCallBack(data); } }).finally(function () { }); }) }) .catch(function (e) { errCallBack(e) }); } }

 

消息队列测试:每隔5秒发送一条消息,每隔5秒取出一条消息,成功

var mq = new RabbitMQ()
setInterval(function () {
  mq.sendQueueMsg("queue1", "这是一个队列消息", function (err) {
    console.log(err)
  })
}, 5000)

setInterval(function () {
  mq.receiveQueueMsg("queue1", function (msg) {
    console.log(msg)
  }, function (error) {
    console.log(error)
  })
}, 5000)

 

RabbitMq消息队列使用中遇到的坑:

①   var mq = new RabbitMQ() 屡次建立RabbitMQ对象,致使connections, channels, memory 暴增,服务器很快挂掉

②  生产者的channel忘记close, 致使channel太多,服务器超负荷

③   消费者的channel被close掉了,永远只能接收到一条消息,消息队列很快爆掉

最后是消息队列运行的状态:

 

OK、通过了这么一番处理,咱们的日志上传应该可以承受住必定量的并发了,让咱们拭目以待吧。

 

上一章:搭建前端监控系统(四)接口请求异常监控篇

 

参考: 架构设计之NodeJS操做消息队列RabbitMQ  Ubuntu上安装和使用RabbitMQ

相关文章
相关标签/搜索