/* 首先咱们在server.js中添加一个队列监听数组global.readyListener,用来记录已经consume的queue */ import schedule from 'node-schedule'; global.msgQueue = []; global.resolveRabbit = {}; global.readyListener = [];
/* RabbitSent.js中修改以下代码, 防止rabbit重复监听 */ if (!global.readyListener.includes(queue)) { global.readyListener.push(queue); this.ch.consume(this.ok.queue, (msg) => { this.mabeAnswer(msg); }, { noAck: true }); }
/* 添加订单类型的queue */ switch (type) { case 'order': queue = config.MQ_QUEUE_ORDER; break; case 'pay': queue = config.MQ_QUEUE_PAY; break; default: queue = config.MQ_QUEUE_COMMON; // queue = config.MQ_QUEUE_COMMON_TEST; break; }
/* 添加获取用户列表的路由 */ router.get('/user/getUserList', async (ctx, next) => { const user = await userHandler.getUserList(ctx); ctx.body = user; })
/* 添加对应的控制器 */ /* 这里咱们要注意,咱们在掉这个api的时候同时发起了两个远端请求,一个为原始获取用户列表,另外一个为获取订单列表 */ /** * [getUserList description] * @param {[type]} ctx [description] * @return {Promise} [description] */ async getUserList(ctx) { const body = ctx.request.body; const content = { class: 'user', func: 'getUserList', content: {} }; const server = this.initServer(this.server); const res1 = await server.send(content); const content2 = { class: 'common', func: 'getOrderList', content: {} }; const server2 = this.initServer(this.server); const res2 = await server2.send(content2, 'order'); return { res1, res2 }; }
/* 这里改动比较大,增长了发送消息的方法 */ import amqp from 'amqplib/callback_api'; import { logger, logger_date } from './src/log4j'; import config from './config'; import route from './route'; import { RabbitSend } from './rabbitMQ'; import { Cache } from './util'; import packages from './package.json'; logger.info('server started'); global.msgQueue = []; global.resolveRabbit = {}; global.readyListener = []; function bail(err, conn) { logger.error(err); } // 初始化mq的发送promise function initServer(ch, ok) { const server = new RabbitSend(ch, ok) return server; } // 声明监听queue function assertQueue(ch, q) { return new Promise((resolve, reject) => { ch.assertQueue(q, {durable: true}, (err, ok) => { if (err !== null) return bail(err); global.ch = ch; global.ok = ok; global.server = initServer; resolve(); }); }); } // 发送mq的方法,这里就简单的传送的方法中,后续会提出到基础类,用继承的方式实现 function mq() { return global.server(global.ch, global.ok); } // 删除已使用的queue,这里解释下缘由 // 考虑到分布式节点,同一个服务可能会启动多个,这里用uuid去标记不一样节点的queue,每次从新启动的时候删除上次启动时rabbitmq-server端保留的queue 避免无用堆积 function delQueues(ch) { Cache.getCache(`${packages.name}-mq`).then((res) => { if (res) { logger.warn('================ start clear mq queues ================='); Cache.destroy(`${packages.name}-mq`); const queues = res.rabbitmq_queues.queues; queues.map((key) => { ch.checkQueue(key, (err, ok) => { if (ok.queue === key) { logger.warn(`================== delete queue ${key} ==================`); ch.deleteQueue(key); } }); }); } }); } function on_connect(err, conn) { if (err !== null) return bail(err); process.once('SIGINT', () => { conn.close(); }); var q = config.rabbitMq_queue.logic01 /* 测试mq */ // var q = config.rabbitMq_queue.logic02 // 压入本地已监听队列中 global.readyListener.push(q); conn.createChannel((err, ch) => { logger_date.info('rabbitMQ createChannel'); delQueues(ch); assertQueue(ch, q).then(() => { ch.prefetch(1); ch.consume(q, reply, { noAck: false }, (err) => { if (err !== null) return bail(err, conn); logger.info(' [x] Awaiting RPC requests'); }); function reply(msg) { logger.info('request content is ' + msg.content.toString()); const request = JSON.parse(msg.content.toString()); // 声明返回消息的queue,以及消息id和返回体 const cb = (response) => { ch.sendToQueue(msg.properties.replyTo, new Buffer(JSON.stringify(response)), { correlationId: msg.properties.correlationId }); ch.ack(msg); }; try { const func = request.class && request.func ? route[request.class][request.func] : null; if (func) { // 这里传入发送对象 func(cb, request.content, mq); } else { cb({ err: 'method not allowed' }); } } catch(err) { console.log(err); cb({ code: 500, err: 'server error' }); } } }); }); } amqp.connect('amqp://' + config.rabbitMq_user + ':' + config.rabbitMq_password + '@' + config.rabbitMq_host + ':' + config.rabbitMq_port, on_connect); logger_date.info('rabbitMQ connect success'); logger.warn('don`t kill this process');
/* 这里的send方法与api-rest的有所不一样 */ send(content, type) { console.log(' [x] Requesting is ', content); let queue = config.MQ_QUEUE_ORDER; switch (type) { case 'log': queue = config.MQ_QUEUE_ORDER; break; case 'pay': queue = config.MQ_QUEUE_ORDER; break; default: queue = config.MQ_QUEUE_ORDER; break; } return new Promise(async (resolve, reject) => { const correlationId = uuid(); console.log('========= mq loading =========='); global.msgQueue.push(correlationId); global.resolveRabbit[correlationId] = { resolve: resolve, reject: reject }; // 避免重复监听,不重复取redis,下降开销 if (!global.readyListener.includes(queue)) { global.readyListener.push(queue); // 若是是第一次监听该通道,从redis里取一下,查看是否存在指定uuid队列,存在的话直接用,不存在的话去assert一个队列,不持久化,而后监听该队列,一切初始化完成后进行发送 const _c = await Cache.getCache(`${packages.name}-mq`); console.log(_c); if (_c && _c.rabbitmq_queues && _c.rabbitmq_queues.queues) { const queues = _c.rabbitmq_queues.queues; if (queues.includes(`${queue}-${this.pid}`)) { console.log(`========= use old mq queue ${queue}-${this.pid} ==========`); this.ch.consume(`${queue}-${this.pid}`, (msg) => { this.mabeAnswer(msg); }, { noAck: true }); } else { queues.push(`${queue}-${this.pid}`); console.log(`========= use new mq queue ${queue}-${this.pid} ==========`); Cache.setCache(`${packages.name}-mq`, { rabbitmq_queues: { queues }, }); this.ch.assertQueue(`${queue}-${this.pid}`, {durable: false}, (err, ok) => { if (err) return; this.ch.consume(`${queue}-${this.pid}`, (msg) => { this.mabeAnswer(msg); }, { noAck: true }); }); } } else { console.log('========== 初始化mq队列 =========='); Cache.setCache(`${packages.name}-mq`, { rabbitmq_queues: { queues: [`${queue}-${this.pid}`] }, }); this.ch.assertQueue(`${queue}-${this.pid}`, {durable: false}, (err, ok) => { if (err) return; this.ch.consume(`${queue}-${this.pid}`, (msg) => { this.mabeAnswer(msg); }, { noAck: true }); }); } } console.log(`============= use queue ${queue}-${this.pid} ==============`); this.ch.sendToQueue(queue, new Buffer(JSON.stringify(content)), { replyTo: `${queue}-${this.pid}`, correlationId: correlationId }) }).catch((err) => { console.log(err); }); }
/* 由api-rest发起的微服务调用会被分发到下面这个方法中 */ async getUserList(cb, info, mq) { logger.warn(`this is moment format ${moment().format('YYYY-MM-DD hh:mm:ss')}`); const content = { class: 'common', func: 'getOrderList', content: {} }; // 这里模拟返回用户列表,而后再次调用远端的订单服务,去拉取订单列表 const res = await mq().send(content); cb({ code: '00000', users: [], order: res }); }
/* 该服务在其余方法上与common服务相似,不作赘述,监听的queue不一样 */ module.exports = Object.assign({ rabbitMq_host: '192.168.41.144', rabbitMq_port: '5672', rabbitMq_user: 'admin', rabbitMq_password: 'wangrui1994', // server_host: '106.14.77.183', server_host: '127.0.0.1', server_port: 8889, rabbitMq_queue: { logic01: 'jslight-service-order', logic02: 'jslight-service-order-test' } });
/* 这里为上一步common服务调用的订单列表服务 */ async getOrderList(cb, info, mq) { logger.warn(`this is moment format ${moment().format('YYYY-MM-DD hh:mm:ss')}`); const content = { class: 'address', func: 'getUserAddress', content: {} }; // 再次调用远端帐户服务中的地址列表,最后将订单和地址所有返回 const address = await mq().send(content, 'account'); cb({ code: '00000', order: [{ orderId: Date.now(), price: 200 }], address }); }
/* 监听帐户queue */ module.exports = Object.assign({ rabbitMq_host: '192.168.41.144', rabbitMq_port: '5672', rabbitMq_user: 'admin', rabbitMq_password: 'wangrui1994', // server_host: '106.14.77.183', server_host: '127.0.0.1', server_port: 8889, rabbitMq_queue: { logic01: 'jslight-service-account', logic02: 'jslight-service-account-test' } });
/* 这里返回地址列表 */ async getUserAddress(cb, info, mq) { logger.warn(`this is moment format ${moment().format('YYYY-MM-DD hh:mm:ss')}`); cb({ code: '00000', address: { province: '上海', city: '上海', country: '徐汇区' } }); }
首先api网关发起远端微服务调用,两个分支,一个为调用common服务,另外一个为调用order服务node
分支一 common服务git
分支二 order服务github
整合后redis
至此微服务间的调用整合完成,咱们来看一下控制台的输出json
api-rest
咱们能够看到两个远端调用的返回值api
node-service-common
咱们能够看到common服务调用了order服务,而且使用监听了jslight-service-order-23be5586-2411-450d-9523-e0093401830d队列,最后获得了远端的返回值数组
node-service-order
订单服务接收到了api网关和common服务发起的调用,请求account服务,而且获得了返回promise
node-service-account
帐户服务接收到两个由订单服务发起的请求,并获得返回值async
如下为postman的最终返回结果分布式
{ "res1": { "finalRes": { "code": "00000", "users": [], "order": { "finalRes": { "code": "00000", "order": [ { "orderId": 1512366914453, "price": 200 } ], "address": { "finalRes": { "code": "00000", "address": { "province": "上海", "city": "上海", "country": "徐汇区" } } } } } } }, "res2": { "finalRes": { "code": "00000", "order": [ { "orderId": 1512366914501, "price": 200 } ], "address": { "finalRes": { "code": "00000", "address": { "province": "上海", "city": "上海", "country": "徐汇区" } } } } } }
好了,本节为你们演示了微服务间的调用,其中还有不少优化须要去作,笔者但愿各位可以本身完成。以上是本篇的全部内容,欢迎各位关注个人我的公众号,提出您的宝贵意见并相互交流学习。