Seneca is a Node.js microservices toolkit that makes a system easy to build and update continuously. Below we will go through the relevant techniques together, from the basics to hands-on practice.
A quick plug: after some simple integration work I put together the vastify framework, a lightweight Node.js microservices framework. If you are interested, take a look and feel free to star it; if you have questions or spot problems in the code, please leave a comment below this post.
"node": "^10.0.0" "npm": "^6.0.0" "pm2": "^2.10.3" "rabbitmq": "^3.7.5" "consul": "^1.1.0" "mongodb": "^3.6"
"bluebird": "^3.5.1" "koa": "^2.5.1" "koa-router": "^7.4.0" "seneca": "^3.4.3" "seneca-web": "^2.2.0" "seneca-web-adapter-koa2": "^1.1.0" "amqplib": "^0.5.2" "winston": "^2.4.2" "mongoose": "^5.1.2"
index.js (account-server/src/index.js)

const seneca = require('seneca')()

// Register an action pattern on the Seneca instance
seneca.add('cmd:login', (msg, done) => {
  const { username, pass } = msg
  if (username === 'asd' && pass === '123') {
    return done(null, { code: 1000 })
  }
  return done(null, { code: 2100 })
})

const Promise = require('bluebird')
const act = Promise.promisify(seneca.act, { context: seneca })

act({ cmd: 'login', username: 'asd', pass: '123' }).then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})

After running it:

{ code: 1000 }
{"kind":"notice","notice":"hello seneca k5i8j1cvw96h/1525589223364/10992/3.4.3/-","level":"info","seneca":"k5i8j1cvw96h/1525589223364/10992/3.4.3/-","when":1525589223563}
seneca.add
Adds an action pattern to the Seneca instance. It takes three parameters:
pattern: the pattern used to match JSON messages in Seneca, given as an object or a formatted string
sub_pattern: a sub-pattern with lower priority than the main pattern (optional)
action: the action function invoked when a message matches

seneca.act
Executes the matching action on the Seneca instance. Its parameters are:
msg: the JSON message
sub_pattern: a sub-message with lower priority than the main message (optional)
response: a callback used to receive the result of the service call

seneca.use
Adds a plugin to the Seneca instance. It takes two parameters (the plugin mechanism here differs somewhat from middleware):
func: the plugin function
options: the options the plugin needs (optional)

The core idea is pattern matching on JSON objects: the JSON object carries both the characteristics that one microservice uses to address another and the call parameters. This is somewhat similar to service discovery in Java microservices, except that patterns take the place of ip+port. So far patterns are fully capable of implementing service discovery; whether they turn out to be more flexible still remains to be explored.
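As a concrete illustration of seneca.use, here is a minimal sketch (not taken from the original project; the plugin name, pattern and options are made up) showing that a plugin is just a function which receives its options and registers patterns with this.add:

const seneca = require('seneca')()

// A plugin is a plain function; `options` is the second argument passed to seneca.use
function mathPlugin (options) {
  this.add({ role: 'math', cmd: 'sum' }, (msg, done) => {
    done(null, { answer: (msg.left + msg.right) * (options.factor || 1) })
  })
}

seneca.use(mathPlugin, { factor: 1 })

seneca.act({ role: 'math', cmd: 'sum', left: 1, right: 2 }, (err, res) => {
  if (err) return console.error(err)
  console.log(res) // { answer: 3 }
})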
Points to note
index.js (config-server/src/index.js)

const seneca = require('seneca')()

const config = {
  SUCCESS_NORMAL_RES: {
    code: 1000,
    desc: '服务端正常响应'
  }
}

// Expose the global configuration as a microservice listening on port 10011
seneca.add('$$target:config-server', (msg, done) => {
  return done(null, config)
}).listen(10011)

After running this script, you can open http://localhost:10011/act?cmd=config in a browser to fetch the global configuration.
OR

const seneca = require('seneca')()
const Promise = require('bluebird')
const act = Promise.promisify(seneca.act, { context: seneca })

// Point this Seneca instance at the service listening on port 10011;
// default$ supplies a fallback result when no pattern matches
seneca.client(10011)

act('$$target:config-server, default$:{msg:404}').then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})
noname-server
const seneca = require('seneca')()

seneca.add('$$target:account-server', (msg, done) => {
  done(null, { seneca: '666' })
})

seneca.listen(10015)

config-server (same as above)
call
const seneca = require('seneca')()
const Promise = require('bluebird')
const act = Promise.promisify(seneca.act, { context: seneca })

seneca.client({ port: '10011', pin: '$$target:account-server' })
seneca.client({ port: '10015', pin: '$$target:noname-server' })

act('$$target:account-server').then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})

act('$$target:noname-server').then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})
Integrating with Koa

const seneca = require('seneca')()
const Promise = require('bluebird')
const SenecaWeb = require('seneca-web')
const Koa = require('koa')
const Router = require('koa-router')
const app = new Koa()
const userModule = require('./modules/user.js')

// Initialize the user module
seneca.use(userModule.init)

// Initialize the seneca-web plugin and adapt it to koa
seneca.use(SenecaWeb, {
  context: Router(),
  adapter: require('seneca-web-adapter-koa2'),
  routes: [...userModule.routes]
})

// Export the routes to the koa app
seneca.ready(() => {
  app.use(seneca.export('web/context')().routes())
})

app.listen(3333)
The user module
const $module = 'module:user'
let userCount = 3

const REST_Routes = [
  {
    prefix: '/user',
    pin: `${$module},if:*`,
    map: {
      list: {
        GET: true,
        name: ''
      },
      load: {
        GET: true,
        name: '',
        suffix: '/:id'
      },
      edit: {
        PUT: true,
        name: '',
        suffix: '/:id'
      },
      create: {
        POST: true,
        name: ''
      },
      delete: {
        DELETE: true,
        name: '',
        suffix: '/:id'
      }
    }
  }
]

const db = {
  users: [{ id: 1, name: '甲' }, { id: 2, name: '乙' }, { id: 3, name: '丙' }]
}

function user (options) {
  this.add(`${$module},if:list`, (msg, done) => {
    done(null, db.users)
  })

  this.add(`${$module},if:load`, (msg, done) => {
    const { id } = msg.args.params
    done(null, db.users.find(v => Number(id) === v.id))
  })

  this.add(`${$module},if:edit`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const { name } = msg.args.body
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1, { id, name })
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })

  this.add(`${$module},if:create`, (msg, done) => {
    const { name } = msg.args.body
    db.users.push({ id: ++userCount, name })
    done(null, db.users)
  })

  this.add(`${$module},if:delete`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1)
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })
}

module.exports = {
  init: user,
  routes: REST_Routes
}
vscode-restclient (a VS Code REST client extension used to issue RESTful requests)

### 1
POST http://localhost:3333/user HTTP/1.1
Content-Type: application/json

{
  "name": "测试添加用户"
}

### delete
DELETE http://localhost:3333/user/2 HTTP/1.1

### PUT
PUT http://localhost:3333/user/2 HTTP/1.1
Content-Type: application/json

{
  "name": "测试修改用户信息"
}

### GET
GET http://localhost:3333/user HTTP/1.1

### GET
GET http://localhost:3333/user/3 HTTP/1.1
You can pass options to the Seneca constructor; the log property controls the log level.

Example 1: pass a string

require('seneca')({
  // quiet silent any all print standard test
  log: 'all'
})

Example 2: pass an object

require('seneca')({
  log: {
    // none debug+ info+ warn+
    level: 'debug+'
  },
  // When set to true, seneca's logger abbreviates fields such as senecaId,
  // senecaTag and actId in the output (usually to two characters)
  short: true
})

Example 2 is recommended, because the seneca-web-adapter-koa2 plugin logs at the debug level, which makes it convenient to record web API access logs.
Logger.js
const { createLogger, format, transports } = require('winston')
const { combine, timestamp, label, printf } = format

const logger = createLogger({
  level: 'info',
  format: combine(
    label({ label: 'microservices' }),
    timestamp(),
    printf(info => {
      return `${info.timestamp} [${info.label}] ${info.level}: ${info.message}`
    })
  ),
  transports: [
    new transports.Console()
  ]
})

// highest to lowest
const levels = {
  error: 0,
  warn: 1,
  info: 2,
  verbose: 3,
  debug: 4,
  silly: 5
}

module.exports = logger
Log output format

2018-05-17T14:43:28.330Z [microservices] info: 接收到rpc客户端的调用请求
2018-05-17T14:43:28.331Z [microservices] warn: warn message
2018-05-17T14:43:28.331Z [microservices] error: error message
producer.js
// Create an amqp peer
const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'taskQueue1'
    const msg = process.argv.slice(2).join(' ') || 'hello world'

    // To keep the queue from being lost when RabbitMQ exits or crashes and is
    // restarted, declare it with durable: true (the consumer script must also
    // declare the queue with durable: true)
    ch.assertQueue(q, { durable: true })

    // With persistent: true, my reading of the official docs is that messages not yet
    // fully delivered before a restart are picked up again afterwards. (This is not
    // fully reliable: synchronous IO is not performed for every message, so messages
    // may sit in a cache and only be written to disk at some suitable moment.)
    ch.sendToQueue(q, Buffer.from(msg), { persistent: true })

    setTimeout(() => { conn.close(); process.exit(0) }, 100)
  })
})

consumer.js

// Create an amqp peer
const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'taskQueue1'

    // To keep the queue from being lost when RabbitMQ exits or crashes and is
    // restarted, declare it with durable: true (matching the producer script)
    ch.assertQueue(q, { durable: true })
    ch.prefetch(1)

    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q)

    ch.consume(q, msg => {
      const secs = msg.content.toString().split('.').length - 1
      console.log(" [x] Received %s", msg.content.toString())
      setTimeout(() => {
        console.log(" [x] Done")
        ch.ack(msg)
      }, secs * 1000)
    })
    // The noAck option (false by default) controls whether the consumer must send an
    // ack after processing. If set to true, RabbitMQ does not care about the actual
    // processing result and marks the message as done as soon as it has been sent to
    // the consumer; otherwise RabbitMQ marks it done only after receiving the ack,
    // keeps waiting if no ack arrives, and re-dispatches the task if it receives a
    // nack or the consumer process exits
  })
})
Verification

Timeout: 60.0 seconds ...
Listing queues for vhost / ...

Listing bindings for vhost /...
        exchange        taskQueue1      queue   taskQueue1      []

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      1

 [*] Waiting for messages in taskQueue1. To exit press CTRL+C
 [x] Received hello world
 [x] Done

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      0
Key points
publisher.js
const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = 'logs'
    const msg = process.argv.slice(2).join(' ') || 'Hello World!'

    // ex is the (unique) exchange name, the mode is fanout,
    // and messages are not persisted
    ch.assertExchange(ex, 'fanout', { durable: false })

    // The second argument selects a specific binding; if empty, RabbitMQ decides
    ch.publish(ex, '', Buffer.from(msg))
    console.log(' [x] Send %s', msg)
  })

  setTimeout(() => { conn.close(); process.exit(0) }, 100)
})
subscriber.js
const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = 'logs'

    // ex -> the exchange is the carrier for published/subscribed messages
    // fanout -> the distribution mode: fanout, direct, topic or headers
    // durable: false trades a little reliability for performance, since messages
    // do not need to be persisted to disk
    ch.assertExchange(ex, 'fanout', { durable: false })

    // Use an anonymous queue (RabbitMQ generates a random queue name);
    // exclusive: true means the queue is deleted automatically when the
    // connection it lives on is closed
    ch.assertQueue('', { exclusive: true }, (err, q) => {
      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue)

      // Bind the queue to an exchange (listen for messages from that exchange);
      // the third argument is the binding key
      ch.bindQueue(q.queue, ex, '')

      // Consuming means subscribing to the exchange's messages and setting a handler.
      // Publish/subscribe is inherently non-reliable: a subscriber only receives
      // messages while it is subscribed, and the publisher does not care who the
      // subscribers are or how they handle the messages, so noAck is set to true
      ch.consume(q.queue, (msg) => {
        console.log(' [x] %s', msg.content.toString())
      }, { noAck: true })
    })
  })
})

Verification
rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app
(this clears the queues, exchanges, and bindings used in earlier tests)
node subscriber.js
[*] Waiting for messages in amq.gen-lgNW51IeEfj9vt1yjMUuaw. To exit press CTRL+C
rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
logs    fanout
rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue   amq.gen-jDbfwJR8TbSNJT2a2a83Og  []
logs    exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue   []
node publisher.js tasks.........
 [x] Send tasks.........   // publisher.js
 [x] tasks.........        // subscriber.js

Key points
exchange.js
module.exports = {
  name: 'ex1',
  type: 'direct',
  option: { durable: false },
  ranks: ['info', 'error', 'warning', 'severity']
}
direct-routing.js
const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    // Declare the direct exchange described in exchange.js
    ch.assertExchange(ex.name, ex.type, ex.option)
    setTimeout(() => { conn.close(); process.exit(0) }, 0)
  })
})
subscriber.js
const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks
    ranks.forEach(rank => {
      // Declare a non-anonymous queue
      ch.assertQueue(`${rank}-queue`, { exclusive: false }, (err, q) => {
        ch.bindQueue(q.queue, ex.name, rank)
        ch.consume(q.queue, msg => {
          console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString())
        }, { noAck: true })
      })
    })
  })
})
publisher.js
const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks
    ranks.forEach(rank => {
      ch.publish(ex.name, rank, Buffer.from(`${rank} logs...`))
    })
    setTimeout(() => { conn.close(); process.exit(0) }, 0)
  })
})

Verification
rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app
(this clears the queues, exchanges, and bindings used in earlier tests)
node direct-routing.js
rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.headers     headers
ex1     direct
amq.fanout      fanout
amq.rabbitmq.trace      topic
amq.topic       topic
        direct
amq.direct      direct
amq.match       headers
node subscriber.js
rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
severity-queue  0
error-queue     0
info-queue      0
warning-queue   0

Listing bindings for vhost /...
        exchange        error-queue     queue   error-queue     []
        exchange        info-queue      queue   info-queue      []
        exchange        severity-queue  queue   severity-queue  []
        exchange        warning-queue   queue   warning-queue   []
ex1     exchange        error-queue     queue   error   []
ex1     exchange        info-queue      queue   info    []
ex1     exchange        severity-queue  queue   severity        []
ex1     exchange        warning-queue   queue   warning []
node publisher.js
 [x] info: 'info logs...'
 [x] error: 'error logs...'
 [x] severity: 'severity logs...'
 [x] warning: 'warning logs...'

Key points
exchange.js
module.exports = {
  name: 'ex2',
  type: 'topic',
  option: { durable: false },
  ranks: ['info', 'error', 'warning', 'severity']
}
topic-routing.js
const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    ch.assertExchange(exchangeConfig.name, exchangeConfig.type, exchangeConfig.option)
    setTimeout(() => { conn.close(); process.exit(0) }, 0)
  })
})
subscriber.js
const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const args = process.argv.slice(2)
    const keys = (args.length > 0) ? args : ['anonymous.info']
    console.log(' [*] Waiting for logs. To exit press CTRL+C')
    keys.forEach(key => {
      ch.assertQueue('', { exclusive: true }, (err, q) => {
        console.log(` [x] Listen by routingKey ${key}`)
        ch.bindQueue(q.queue, exchangeConfig.name, key)
        ch.consume(q.queue, msg => {
          console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString())
        }, { noAck: true })
      })
    })
  })
})
publisher.js
const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const args = process.argv.slice(2)
    const key = (args.length > 1) ? args[0] : 'anonymous.info'
    const msg = args.slice(1).join(' ') || 'hello world'
    ch.publish(exchangeConfig.name, key, Buffer.from(msg))
    setTimeout(() => { conn.close(); process.exit(0) }, 0)
  })
})

Verification
rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app
(this clears the queues, exchanges, and bindings used in earlier tests)
node topic-routing.js
Listing exchanges for vhost / ...
amq.fanout      fanout
amq.rabbitmq.trace      topic
amq.headers     headers
amq.match       headers
ex2     topic
        direct
amq.topic       topic
amq.direct      direct
node subscriber.js "#.info" "*.error"
 [*] Waiting for logs. To exit press CTRL+C
 [x] Listen by routingKey #.info
 [x] Listen by routingKey *.error

 [x] account-server.info:'用户服务测试'
 [x] config-server.info:'配置服务测试'
 [x] config-server.error:'配置服务出错'
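The output above can be reproduced by publishing messages whose routing keys hit the two bindings. A minimal sketch (the routing keys and payloads simply mirror the output above; everything else follows publisher.js):

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    // 'account-server.info' and 'config-server.info' match the '#.info' binding,
    // 'config-server.error' matches the '*.error' binding
    ch.publish(exchangeConfig.name, 'account-server.info', Buffer.from('用户服务测试'))
    ch.publish(exchangeConfig.name, 'config-server.info', Buffer.from('配置服务测试'))
    ch.publish(exchangeConfig.name, 'config-server.error', Buffer.from('配置服务出错'))
    setTimeout(() => { conn.close(); process.exit(0) }, 0)
  })
})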
Key points

# matches zero or more words, while * matches exactly one word.

rpc_server.js
const amqp = require('amqplib/callback_api')
const logger = require('./Logger')

let connection = null

amqp.connect('amqp://localhost', (err, conn) => {
  connection = conn
  conn.createChannel((err, ch) => {
    const q = 'account_rpc_queue'
    ch.assertQueue(q, { durable: true })
    ch.prefetch(2)
    ch.consume(q, msg => {
      let data = {}
      let primitiveContent = msg.content.toString()
      try {
        data = JSON.parse(primitiveContent)
      } catch (e) {
        logger.error(new Error(e))
      }
      logger.info('接收到rpc客户端的调用请求')
      // Only handle requests whose correlationId matches, and send the result back
      // to the queue named by the replyTo property
      if (msg.properties.correlationId === '10abc') {
        logger.info(primitiveContent)
        const uid = Number(data.uid) || -1
        let r = getUserById(uid)
        ch.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(r)), { persistent: true })
        ch.ack(msg)
      } else {
        logger.info('不匹配的调用请求')
      }
    })
  })
})

function getUserById (uid) {
  let result = ''
  if (uid === +uid && uid > 0) {
    result = {
      state: 1000,
      msg: '成功',
      data: { uid: uid, name: '小强', sex: 1 }
    }
  } else {
    result = { state: 2000, msg: '传参格式错误' }
  }
  return result
}

process.on('SIGINT', () => {
  logger.warn('SIGINT')
  connection && connection.close()
  process.exit(0)
})
rpc_client.js
const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'account_rpc_queue'
    const callback = 'callback_queue'

    // Consume the callback queue to receive the RPC result
    ch.assertQueue(callback, { durable: true })
    ch.consume(callback, msg => {
      const result = msg.content.toString()
      console.log(`接收到回调的消息啦!`)
      console.log(result)
      ch.ack(msg)
      setTimeout(() => { conn.close(); process.exit(0) }, 0)
    })

    // Send the request, telling the server which correlationId to match and
    // which queue to reply to
    ch.assertQueue(q, { durable: true })
    const msg = { uid: 2 }
    ch.sendToQueue(q, Buffer.from(JSON.stringify(msg)), {
      persistent: true,
      correlationId: '10abc',
      replyTo: 'callback_queue'
    })
  })
})

Verification
node rpc_server.js
rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
account_rpc_queue       0
node rpc_client.js
CLI output of rpc_client

接收到回调的消息啦!
{"state":1000,"msg":"成功","data":{"uid":2,"name":"小强","sex":1}}

CLI output of rpc_server

接收到rpc客户端的调用请求
{ uid: 2 }
pm2 start app.js
-w --watch: watch the directory for changes and restart the app automatically when they occur
--ignore-watch: files to ignore while watching, e.g. pm2 start rpc_server.js --watch --ignore-watch="rpc_client.js"
-n --name: set the application name, useful for telling apps apart
-i --instances: set the number of application instances; 0 is equivalent to max
-f --force: force-start an application, typically used when an app with the same name is already running
-o --output <path>: path of the standard output log file
-e --error <path>: path of the error output log file
--env <path>: set environment variables, e.g. pm2 start rpc_server.js -w -i max -n s1 --ignore-watch="rpc_client.js" -e ./server_error.log -o ./server_info.log

In cluster mode (-i max), a -${index} suffix is automatically appended to each log file name so the files do not clash.
pm2 stop app_name|app_id
pm2 restart app_name|app_id
pm2 delete app_name|app_id
pm2 show app_name|app_id
OR pm2 describe app_name|app_id
pm2 list
pm2 monit
pm2 logs app_name|app_id --lines <n> --err
pm2 stop app_name|app_id
process.on('SIGINT', () => {
  logger.warn('SIGINT')
  connection && connection.close()
  process.exit(0)
})

Before the process ends, the program intercepts the SIGINT signal so that, right before it would be killed, it can disconnect from the database and release other held resources, then call process.exit() to exit gracefully. (If the process still has not exited after 1.6 s, a SIGKILL signal is sent to force it to terminate.)
ecosystem.config.js
const appCfg = {
  args: '',
  max_memory_restart: '150M',
  env: {
    NODE_ENV: 'development'
  },
  env_production: {
    NODE_ENV: 'production'
  },
  // source map
  source_map_support: true,
  // Do not merge log output; useful for clustered services
  merge_logs: false,
  // Commonly used for startup failures; time limit for the app to start listening
  listen_timeout: 5000,
  // Time limit for handling SIGINT: the process must exit within this many ms
  // after receiving the SIGINT signal
  kill_timeout: 2000,
  // Do not try to restart after an abnormal exit; let ops find the cause and retry
  autorestart: false,
  // Do not allow starting more than one process from the same script
  force: false,
  // Command queue executed after a pull/upgrade operation in the Keymetrics dashboard
  post_update: ['npm install'],
  // Watch for file changes
  watch: false,
  // Paths to ignore when watching for file changes
  ignore_watch: ['node_modules']
}

function GeneratePM2AppConfig ({
  name = '',
  script = '',
  error_file = '',
  out_file = '',
  exec_mode = 'fork',
  instances = 1,
  args = ''
}) {
  if (name) {
    return Object.assign({
      name,
      script: script || `${name}.js`,
      error_file: error_file || `${name}-err.log`,
      out_file: out_file || `${name}-out.log`,
      instances,
      exec_mode: instances > 1 ? 'cluster' : 'fork',
      args
    }, appCfg)
  } else {
    return null
  }
}

module.exports = {
  apps: [
    GeneratePM2AppConfig({
      name: 'client',
      script: './rpc_client.js'
    }),
    GeneratePM2AppConfig({
      name: 'server',
      script: './rpc_server.js',
      instances: 1
    })
  ]
}
pm2 start ecosystem.config.js
Pitfall guide: name the process file in the *.config.js format; otherwise you are on your own.

# Use a specific private key when ssh-ing to different remote hosts as different users
Host qingf.me
  User deploy
  IdentityFile ~/.ssh/qf_deployment_rsa
  # Set to no to skip the first-login (y/n) host key prompt
  StrictHostKeyChecking no

# Alias usage
Host deployment
  User deploy
  Hostname qingf.me
  IdentityFile ~/.ssh/qingf_deployment_rsa
  StrictHostKeyChecking no

Add a deploy property at the same level as the apps property above, as follows:

deploy: {
  production: {
    'user': 'deploy',
    'host': 'qingf.me',
    'ref': 'remotes/origin/master',
    'repo': 'https://github.com/Cecil0o0/account-server.git',
    'path': '/home/deploy/apps/account-server',
    // Lifecycle hook: runs after ssh-ing to the remote host, before the setup step
    'pre-setup': '',
    // Lifecycle hook: runs after the initial setup, i.e. after git pull
    'post-setup': 'ls -la',
    // Lifecycle hook: runs on the remote host before git fetch origin
    'pre-setup': '',
    // Lifecycle hook: runs on the remote host after git moves HEAD to the given ref
    'post-deploy': 'npm install && pm2 startOrRestart deploy/ecosystem.config.js --env production',
    // The following environment variables are injected into every app
    "env" : {
      "NODE_ENV": "test"
    }
  }
}
tip:please make git working directory clean first!
Then run the following two commands one after the other (pay attention to the path of the config file):
pm2 deploy <configuration_file> <environment> <command>
Commands:
  setup                run remote setup commands
  update               update deploy to the latest release
  revert [n]           revert to [n]th last deployment or 1
  curr[ent]            output current release commit
  prev[ious]           output previous release commit
  exec|run <cmd>       execute the given <cmd>
  list                 list previous deploy commits
  [ref]                deploy to [ref], the "ref" setting, or latest tag
How do you do custom log printing with Seneca's built-in log system?
Tip: start from a normal HTTP request, because testing shows that when a microservice initiates the act on its own, its seneca.fixedargs['tx$'] value is different.
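The original post gives no code for this, so here is only a rough sketch, assuming that this.log.* is available inside an action and that the tx$ transaction id mentioned above can be read from this.fixedargs; it is an illustration, not the project's actual logging scheme:

const seneca = require('seneca')({ log: { level: 'debug+' }, short: true })

seneca.add('module:user,if:list', function (msg, done) {
  // tx$ stays the same for every message triggered by one external HTTP request,
  // so logging it lets related log lines be grouped per request
  // (assumption: the transaction id is exposed via this.fixedargs, as in the tip above)
  this.log.info('list users called, tx$ = ' + this.fixedargs['tx$'])
  done(null, [])
})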
Consul is a distributed service registration and discovery tool for clusters, with advanced features such as health checks, hierarchical KV storage, and multi-datacenter support.
consul agent -dev -ui
{ "service": { // 服务名,稍后用于query服务 "name": "account-server", // 服务标签 "tags": ["account-server"], // 服务元信息 "meta": { "meta": "for my service" }, // 服务端口 "port": 3333, // 不容许标签覆盖 "enable_tag_override": false, // 脚本检测作health checks 与-enable-script-checks=true配合使用,有脚本模式、TCP模式、HTTP模式、TTL模式 "checks": [ { "http": "http://localhost:3333/user", "interval": "10s" } ] } }
curl http://localhost:8500/v1/catalog/service/account-server
[ { "ID": "e66eb1ff-460c-e63f-b4ac-0cb42daed19c", "Node": "haojiechen.local", "Address": "127.0.0.1", "Datacenter": "dc1", "TaggedAddresses": { "lan": "127.0.0.1", "wan": "127.0.0.1" }, "NodeMeta": { "consul-network-segment": "" }, "ServiceID": "account-server", "ServiceName": "account-server", "ServiceTags": [ "account-server" ], "ServiceAddress": "", "ServiceMeta": { "meta": "for my service" }, "ServicePort": 3333, "ServiceEnableTagOverride": false, "CreateIndex": 6, "ModifyIndex": 6 } ]
Start a server-mode agent on one of the nodes, as follows:

consul agent -server -bootstrap-expect=1 \
  -data-dir=/tmp/consul -node=agent-one -bind=valid extranet IP \
  -enable-script-checks=true -config-dir=/usr/local/etc/consul.d

Check the cluster members:
consul members
Node       Address                 Status  Type    Build  Protocol  DC   Segment
agent-one  valid extranet IP:8301  alive   server  1.1.0  2         dc1  <all>

Start a client-mode agent on another node, as follows:

consul agent \
  -data-dir=/tmp/consul -node=agent-two -bind=139.129.5.228 \
  -enable-script-checks=true -config-dir=/usr/local/etc/consul.d

Check the cluster members:
consul members
Node       Address             Status  Type    Build  Protocol  DC   Segment
agent-two  139.129.5.228:8301  alive   server  1.1.0  2         dc1  <all>

Join the cluster:
consul join 139.129.5.228
consul members
Node       Address                 Status  Type    Build  Protocol  DC   Segment
agent-one  valid extranet IP:8301  alive   server  1.1.0  2         dc1  <all>
agent-two  139.129.5.228:8301      alive   server  1.1.0  2         dc1  <all>
config.js
// Service registration and discovery
// https://github.com/silas/node-consul#catalog-node-services
'serverR&D': {
  consulServer: {
    type: 'consul',
    host: '127.0.0.1',
    port: 8500,
    secure: false,
    ca: [],
    defaults: {
      token: ''
    },
    promisify: true
  },
  bizService: {
    name: 'defaultName',
    id: 'defaultId',
    address: '127.0.0.1',
    port: 1000,
    tags: [],
    meta: {
      version: '',
      description: 'Register the cluster'
    },
    check: {
      http: '',
      // Interval between checks (e.g. 15s)
      interval: '10s',
      // Check timeout (e.g. 10s)
      timeout: '2s',
      // Timeout after which a service in the critical state is automatically deregistered
      deregistercriticalserviceafter: '30s',
      // Initial check status is passing
      status: 'passing',
      // Notes
      notes: '{"version":"111","microservice-port":1115}'
    }
  }
}
server-register.js
/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 11:26:49
 * @Description Microservice registration method
 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')
const logger = new (require('./logger'))().generateLogger()

// Register a service
function register ({ consulServer = {}, bizService = {} } = {}) {
  if (!bizService.name && isString(bizService.name)) throw new Error('name is invalid!')
  if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
  if (!bizService.host && isString(bizService.host)) throw new Error('host is invalid!')
  if (!bizService.meta.$$version) throw new Error('meta.$$version is invalid!')
  if (!bizService.meta.$$microservicePort) throw new Error('meta.$$microservicePort is invalid!')

  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  service.check.notes = JSON.stringify(bizService.meta)

  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // Check whether the host + port is already taken
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`该服务集群endpoint[${service.address}, ${service.port}]已被占用!`)
        }
      })
      // Register the cluster service
      consul.agent.service.register(service).then(() => {
        logger.info(`${bizService.name}服务已注册`)
        resolve(services)
      }).catch(err => {
        console.log(err)
      })
    }).catch(err => {
      throw new Error(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor () {
    this.register = register
  }
}

Once consul and mongodb are available in the runtime, clone the demo repository, cd into the project root, and run node src.
server-register.js
/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 13:58:22
 * @Description Microservice registration method
 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')
const logger = new (require('./logger'))().generateLogger()

// Definition of the service registration method
function register ({ consulServer = {}, bizService = {} } = {}) {
  if (!bizService.name && isString(bizService.name)) throw new Error('name is invalid!')
  if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
  if (!bizService.host && isString(bizService.host)) throw new Error('host is invalid!')
  if (!bizService.meta.$$version) throw new Error('meta.$$version is invalid!')
  if (!bizService.meta.$$microservicePort) throw new Error('meta.$$microservicePort is invalid!')

  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  service.check.notes = JSON.stringify(bizService.meta)

  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // Check whether the host + port is already taken
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`该服务集群endpoint[${service.address}, ${service.port}]已被占用!`)
        }
      })
      // Register the cluster service
      consul.agent.service.register(service).then(() => {
        logger.info(`${bizService.name}服务注册成功`)
        resolve(services)
      }).catch(err => {
        console.log(err)
      })
    }).catch(err => {
      throw new Error(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor () {
    this.register = register
  }
}
account-server/src/index.js
const vastify = require('vastify')
const version = require('../package.json').version
const microservicePort = 10015
const httpPort = 3333

// Register the service
vastify.ServerRegister.register({
  bizService: {
    name: 'account-server',
    host: '127.0.0.1',
    port: httpPort,
    meta: {
      $$version: version,
      $$microservicePort: microservicePort
    }
  }
})

Rework the earlier user module; to keep things short the code is not pasted here, see the Demo for details.
microRouting.js
/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 16:22:02
 * @Description Internal routing middleware for microservices; custom route-matching
 * strategies are not supported yet
 */
'use strict'

const Consul = require('consul')
const defaultConf = require('../config')
const { ObjectDeepSet, isNumber } = require('../helper/utils')
const { getServiceNameByServiceKey, getServiceIdByServiceKey } = require('../helper/consul')
const logger = new (require('../tools/logger'))().generateLogger()
const { IPV4_REGEX } = require('../helper/regex')

let services = {}
let consul = null

/**
 * @author Cecil0o0
 * @description Sync all available services and their checks from the consul service
 * center and assemble them into an object for easy lookup
 */
function syncCheckList () {
  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(allServices => {
      if (Object.keys(allServices).length > 0) {
        services = allServices
        consul.agent.check.list().then(checks => {
          Object.keys(checks).forEach(key => {
            allServices[getServiceIdByServiceKey(key)]['check'] = checks[key]
          })
          resolve(services)
        }).catch(err => {
          throw new Error(err)
        })
      } else {
        const errmsg = '未发现可用服务'
        logger.warn(errmsg)
        reject(errmsg)
      }
    }).catch(err => {
      throw new Error(err)
    })
  })
}

function syncRoutingRule (senecaInstance = {}, services = {}) {
  Object.keys(services).forEach(key => {
    let service = services[key]
    let name = getServiceNameByServiceKey(key)
    let $$addr = service.Address
    let $$microservicePort = ''
    let $$version = ''
    try {
      let base = JSON.parse(service.check.Notes)
      $$microservicePort = base.$$microservicePort
      $$version = base.$$version
    } catch (e) {
      logger.warn(`服务名为${name}。该服务check.Notes为非标准JSON格式,程序已忽略。请检查服务注册方式(请确保调用ServerRegister的register来注册服务)`)
    }
    if (IPV4_REGEX.test($$addr) && isNumber($$microservicePort)) {
      if (service.check.Status === 'passing') {
        senecaInstance.client({
          host: $$addr,
          port: $$microservicePort,
          pin: {
            $$version,
            $$target: name
          }
        })
      } else {
        logger.warn(`${name}@${$$version || '无'}服务处于critical,因此无法使用`)
      }
    } else {
      logger.warn(`主机(${$$addr})或微服务端口号(${$$microservicePort})有误,请检查`)
    }
  })
}

function startTimeInterval () {
  setInterval(syncCheckList, defaultConf.routing.servicesRefresh)
}

function microRouting (consulServer) {
  var self = this
  consul = Consul(ObjectDeepSet(defaultConf['serverR&D'].consulServer, consulServer))
  syncCheckList().then(services => {
    syncRoutingRule(self, services)
  })
}

module.exports = microRouting

Once you have consul and mongodb available in the runtime, test this together with the two demos, config-server and account-server.

[To be continued...]