api-rest(先扔一个git地址,这里是源码:https://github.com/burning0xb...)
项目主要分为这几个部分html
require('babel-core/register') require('./server');
const app = new Koa(); // 建立基础路由 const baseRouter = new BaseRouter(); // 使用session(整合redis) app.use(session({ key: 'burning:session', store: new RedisStore(), maxAge: config.maxAge }));
// 加载中间件依次为body体格式化,日志与跨域 app.use(bodyParser()); app.use(Logger()); app.use(convert(cors()));
// 加载基础路由中登陆过滤 app.use(async (ctx, next) => { if (baseRouter.requireLogin(ctx)) { await next(); } });
// 加载全部路由 app .use(router.routes()) .use(router.allowedMethods());
// 服务启动 app.listen(config.port, () => { logger.info(`server is running port ${config.port}`); });
const rule = new schedule.RecurrenceRule(); rule.minute = [0, 20, 40]; // 任务规则 schedule.scheduleJob(rule, () => { getAccessToken().then((res) => { console.log(res); global.wechatToken = res.access_token; }) });
// 定义路由前缀 const router = koaRouter({ prefix: '/api' });
// 初始化rabbitMQ 客户端后再去加载全部的路由 new Client().then((res) => { logger.info('rabbitMQ is ready'); global.MQ = res.RabbitSend; }).then(() => { for (let _router in routers) { if (_router !== '') { routers[_router](router, upload); console.log(`${_router} 加载成功 ?`); } } const userHandler = new UserHandler(global.MQ); rollUserList(userHandler); });
// 这里去加载一个新的定时任务(每晚去更新关注用户的数据) async function rollUserList(userHandler) { schedule.scheduleJob('0 0 1 * * *', async () => { const userList = await wechatApi.getUserList(); if (userList.data) { console.log(`关注用户总数 ${userList.total} 人 开始更新用户信息`); userList.data.openid.map(async (openid) => { await util.sleep(1); const userInfo = await wechatApi.getUserInfo(openid); // 这里是重点,调用远端的服务去持久化用户信息 userHandler.saveWechatUser(userInfo); }) } }); }
import { UserHandler } from '../controller'; function userRouter(router, upload) { // 给controller绑定MQ对象 const userHandler = new UserHandler(global.MQ); // get请求 router.get('/user/getUserList', async (ctx, next) => { const user = await userHandler.getUserList(ctx); // respnose返回 ctx.body = user; }) } export default userRouter;
import BaseHandler from './BaseHandler'; export default class UserHandler extends BaseHandler { constructor(server) { super(); // 这里初始化MQ对象 this.server = server; } /** * [getUserList description] * @param {[type]} ctx [description] * @return {Promise} [description] */ async getUserList(ctx) { // 获取解析过的请求体 const body = ctx.request.body; // 构造远端服务请求体 const content = { class: 'user', // 远端服务的类名 func: 'getUserList', // 远端服务的方法 content: {} // 消息内容 }; // 建立新的远端服务发送对象 const server = this.initServer(this.server); // 发送并取得返回结果 const res = await server.send(content); return res; } }
initServer(server) { return server(global.ch, global.ok); }
import amqp from 'amqplib/callback_api'; import RabbitSend from './RabbitSend'; import config from '../config.json'; export default class Client { constructor() { // 建立mq链接,on_connect为回调函数 return new Promise((resolve, reject) => { amqp.connect('amqp://' + config.rabbitMq_user + ':' + config.rabbitMq_password + '@' + config.rabbitMq_host + ':' + config.rabbitMq_port, this.on_connect.bind(this, resolve)); }).catch((err) => { console.log(err); }); } // 最后会返回一个new 对象,也就是说每init一次就会new一次 init(ch, ok) { const server = new RabbitSend(ch, ok) return server; } // 失败处理函数 bail(err) { console.error(err); } init_client(resolve, RabbitSend) { resolve({ RabbitSend: RabbitSend }); } on_connect(resolve, err, conn) { if (err !== null) return this.bail(err); // 建立信道 conn.createChannel((err, ch) => { if (err !== null) return this.bail(err); // 通道建立成功后咱们经过通道对象的assertQueue方法来监听空队列,并设置durable持久化为true。 ch.assertQueue('', { exclusive: true }, (err, ok) => { if (err !== null) return this.bail(err); global.ch = ch; global.ok = ok; this.init_client(resolve, (ch, ok) => { return this.init(ch, ok); }); }); }); } }
import config from '../config.json'; import uuid from 'node-uuid'; // 这里就是将要去发送消息的对象 export default class RabbitSend { constructor(ch, ok) { this.ch = ch; this.ok = ok; this.ramdom = Date.now(); } mabeAnswer(msg) { // 若是返回的消息ID再发送的消息队列中,就去处理 if (global.msgQueue.includes(msg.properties.correlationId)) { console.log(msg.content.toString()); const index = global.msgQueue.indexOf(msg.properties.correlationId); global.msgQueue.splice(index, 1); // resove返回的消息 global.resolveRabbit[msg.properties.correlationId].resolve({ finalRes: JSON.parse(msg.content.toString()) }); // 从待处理队列中删除 delete global.resolveRabbit[msg.properties.correlationId]; } else { // 若是指定消息的promise对象还存在那么就移除不然直接输出没有对应的MQ if (global.resolveRabbit[msg.properties.correlationId]) { global.resolveRabbit[msg.properties.correlationId].reject({ err: 'Unexpected message' }); delete global.resolveRabbit[msg.properties.correlationId]; } else { console.log('未找到对应的MQ'); } } } // 当控制器去掉用send的时候会触发到这里 send(content, type) { console.log(' [x] Requesting is ', content); let queue = config.MQ_QUEUE_COMMON; // let queue = config.MQ_QUEUE_COMMON_TEST; // 根据type去区分要调用的queue,默认为config.MQ_QUEUE_COMMON switch (type) { case 'log': queue = config.MQ_QUEUE_LOG; break; case 'pay': queue = config.MQ_QUEUE_PAY; break; default: queue = config.MQ_QUEUE_COMMON; // queue = config.MQ_QUEUE_COMMON_TEST; break; } // 返回一个带结果的promise对象 return new Promise((resolve, reject) => { // 这里去声明消息的ID const correlationId = uuid(); // 将此ID压入消息队列中 global.msgQueue.push(correlationId); // 标识当前的promise对象 global.resolveRabbit[correlationId] = { resolve: resolve, reject: reject }; // 建立消费者监听指定queue,noAck: true不作应答 this.ch.consume(this.ok.queue, (msg) => { // 返回的结果处理函数 this.mabeAnswer(msg); }, { noAck: true }); // 发送到指定queue,指明应答的queue以及消息ID this.ch.sendToQueue(queue, new Buffer(JSON.stringify(content)), { replyTo: this.ok.queue, correlationId: correlationId }); }).catch((err) => { console.log(err); }); } }
{ "port": 8888, "rabbitMq_host": "主机IP", "rabbitMq_port": "端口", "rabbitMq_user": "用户名", "rabbitMq_password": "密码", "MQ_QUEUE_COMMON": "js_server", "MQ_QUEUE_COMMON_TEST": "server_test", "MQ_QUEUE_LOG": "log", "MQ_QUEUE_PAY": "pay", "MQ_QUEUE_PAY_TEST": "pay_test", "maxAge": 1800000, "redis_maxAge": 1800 }
common-service (git地址:https://github.com/burning0xb... )node
import amqp from 'amqplib/callback_api'; import { logger, logger_date } from './src/log4j'; import config from './config'; import route from './route'; logger.info('server started'); function bail(err, conn) { logger.error(err); } function on_connect(err, conn) { if (err !== null) return bail(err); process.once('SIGINT', () => { conn.close(); }); var q = config.rabbitMq_queue.logic01 /* 测试mq */ // var q = config.rabbitMq_queue.logic02 // 建立信道 conn.createChannel((err, ch) => { logger_date.info('rabbitMQ createChannel'); // 监听指定的queue ch.assertQueue(q, {durable: true}); // 设置公平调度,这里是指mq不会向一个繁忙的队列推送超过1条消息。 ch.prefetch(1); // 建立消费者监听Q,reply为接收处理函数, noAck: false作出应答 ch.consume(q, reply, { noAck: false }, (err) => { if (err !== null) return bail(err, conn); logger.info(' [x] Awaiting RPC requests'); }); function reply(msg) { logger.info('request content is ' + msg.content.toString()); // 接收的消息内容 const request = JSON.parse(msg.content.toString()); // 这里定义返回函数 const cb = (response) => { ch.sendToQueue(msg.properties.replyTo, new Buffer(JSON.stringify(response)), { correlationId: msg.properties.correlationId }); ch.ack(msg); }; try { // 查找api网关发送的消息中指定的方法 const func = request.class && request.func ? route[request.class][request.func] : null; if (func) { // 调用指定方法 func(cb, request.content); } else { cb({ err: 'method not allowed' }); } } catch(err) { console.log(err); cb({ code: 500, err: 'server error' }); } } }); } // 建立链接 amqp.connect('amqp://' + config.rabbitMq_user + ':' + config.rabbitMq_password + '@' + config.rabbitMq_host + ':' + config.rabbitMq_port, on_connect); logger_date.info('rabbitMQ connect success'); logger.warn('don`t kill this process');
import { User } from './src/server/user'; const user = new User(); const route = { user }; export default route;
import { AttentionUser } from '../../model'; import dbStorage from '../../config/dbStorage'; import moment from 'moment'; import autobind from 'autobind-decorator' // 绑定this import { logger } from '../../log4j'; @autobind export default class User { constructor() { } /** * [getUserList 获取用户信息] * @param {Function} cb [description] * @param {[type]} info [description] * @return {Promise} [description] */ async getUserList(cb, info) { logger.warn(`this is moment format ${moment().format('YYYY-MM-DD hh:mm:ss')}`); // 用orm模型去分页查询数据 const attentionUser = await AttentionUser.findAndCount({ limit: 10, offset: 0 }); cb({ code: '00000', attentionUser }); } /** * [unsubscribe 取消关注] * @method unsubscribe * @param {Function} cb [description] * @param {[type]} info [description] * @return {Promise} [description] */ async unsubscribe(cb, info) { // 开启事务 const t = await dbStorage.transaction(); try { const res = await AttentionUser.update({ IS_DISPLAY: 'N', UPDATE_TIME: new Date() }, { where: { OPENID: info.openid } }, { transaction: t }); // 提交事务 t.commit(); cb({ code: '00000', res }) } catch (err) { // 回滚事务 t.rollback(); console.log(err); cb({ code: '00001', err: err }); } } }
"build": "sequelize-auto -o ./entity_model -d 数据库 -h 主机IP -u 用户名 -p 端口 -x 密码 -e mysql -a ./src/model/config.json"
{ "database": "数据库", "username": "用户名", "password": "密码", "host": "主机", "port": 3306 }
至此,咱们完成了api网关到一个简单服务的通讯,在从此的课程中会逐步搭建各种微服务,好比支付服务,物流服务,订单服务,还有其余一些框架的搭建。若是各位看官对笔者的文章感兴趣的话,但愿关注下笔者的我的公众号,你们一块儿交流探讨,若有写的不对的地方,还但愿各位指正,笔者深感荣幸,最后感谢你们的阅读。mysql