这边就先不介绍消息队列的优劣,主要列了一下它的三种核心的场景。node
解耦react
异步git
削峰github
点对点: Work Queueweb
发布-订阅:Publish/Subscribe算法
目前咱们项目应用到的场景:typescript
目前咱们使用RabbitMq, 主要使用点对点的消费模式。session
削峰 , 异步:架构
咱们这些场景若是用 Kafka
该如何实现?app
官网的描述是这几句:
Apache Kafka® is a distributed streaming platform**. What exactly does that mean?**
A streaming platform has three key capabilities:
- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant durable way.
- Process streams of records as they occur.
Kafka 是一个流处理平台
一个流处理平台有三个关键的特色:
Kafka is generally used for two broad classes of applications:
- Building real-time streaming data pipelines that reliably get data between systems or applications
- Building real-time streaming applications that transform or react to the streams of data
Kafka 主要应用在两个类应用中
Producer: 生产者,发送信息的服务端
Consumer:消费者,订阅消息的客户端
Broker:消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker能够组成一个Kafka集群
Topic: 主题,能够理解成队列
ConsumerGroup:消费者组,一个 ConsumerGoup 里面包括多个 Consumer,每一个 ConsumerGoup 里面只有一个 Consumer 能够消费一个 Topic。基于这个特性,每一个 ConsumerGoup 里面只存一个 Consumer 能够实现广播;全部 Consumer 都存在于同一个 ConsumerGoup 内则能够实现单播。
Partition:基于 Kafka 的拓展性,有可能一个很大的 Topic 会存在于不一样的 Broker 里面。这时一个 Topic 里面就会存在多个 Partition,Partition 是一个有序的队列,Partition 上每一个消息会有一个顺序的 id —— Offset。可是,值得注意的是,Kafka 会保证 Partition 的顺序性,而没有保证 Topic 的顺序性。
Offset:Kafka 的存储文件都是offset顺序存储的,以 offset.kafka 来命名。例如第一个就是 0000.kafka, 第 n 个文件就是 n-1.kafka
Zookeerper:管理多个 Kafka 节点,具备管理集群配置的功能
单个消费者的实现,应用场景是只有一个消费者节点 须要消费该消息。
图例:
Producer:
// Producer.ts
import * as kafka from 'kafka-node'
const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
const producer = new kafka.HighLevelProducer(client)
producer.on('ready', function () {
console.log('Kafka Producer is connected and ready.')
})
// For this demo we just log producer errors to the console.
producer.on('error', function (error) {
console.error(error)
})
const sendRecord = (objData, cb) => {
const buffer = Buffer.from(JSON.stringify(objData))
// Create a new payload
const record = [
{
topic: 'webevents.dev',
messages: buffer,
attributes: 1 /* Use GZip compression for the payload */
}
]
// Send record to Kafka and log result/error
producer.send(record, cb)
}
let times = 0
setInterval(() => {
sendRecord({
msg: `this is message ${++times}!`
}, (err, data) => {
if (err) {
console.log(`err: ${err}`)
}
console.log(`data: ${JSON.stringify(data)}`)
})
}, 1000)
复制代码
Consumer代码:
// Consumer.ts
import * as kafka from 'kafka-node'
const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
const topics = [
{
topic: 'webevents.dev'
}
]
const options = {
autoCommit: true,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024 * 1024
// encoding: 'buffer'
}
// { autoCommit: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 };
const consumer = new kafka.Consumer(client, topics, options)
consumer.on('message', function (message) {
// Read string into a buffer.
console.info(`[message]:==:>${JSON.stringify(message)}`)
const buf = new Buffer(String(message.value), 'binary')
const decodedMessage = JSON.parse(buf.toString())
console.log('decodedMessage: ', decodedMessage)
})
consumer.on('error', function (err) {
console.log('error', err)
})
process.on('SIGINT', function () {
consumer.close(true, function () {
process.exit()
})
})
复制代码
当个人服务是多节点,如何保证同一个消息只被其中一个节点消费呢。 这个时候就须要把每一个节点当作同一个 ConsumerGroup里的不一样 Consumer。
图例:
Producer 同上
Consumer:
// Consumer1
import * as kafka from 'kafka-node'
const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
const offset = new kafka.Offset(client)
import * as bluebird from 'bluebird'
const consumerGoupOptions = {
kafkaHost: 'localhost:9092',
groupId: 'ExampleTestGroup',
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
} as any
const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer1'}, consumerGoupOptions), ['test'])
export default consumer
// 处理消息
consumer.on('message', async function (message) {
console.info('i am consumer1!')
// Read string into a buffer.
console.info(`[message]:==:>${JSON.stringify(message)}`)
// const buf = new Buffer(String(message.value), 'binary')
const decodedMessage = message // JSON.parse(buf.toString())
await bluebird.delay(1000)
console.log('decodedMessage: ', decodedMessage)
})
// 消息处理错误
consumer.on('error', function (err) {
console.log('error', err)
})
consumer.on('offsetOutOfRange', function (topic) {
console.info(`[offsetOutOfRange]:==:>${topic}`)
topic.maxNum = 2
offset.fetch([topic], function (err, offsets) {
if (err) {
return console.error(err)
}
let min = Math.min.apply(null, offsets[topic.topic][topic.partition])
consumer.setOffset(topic.topic, topic.partition, min)
})
})
process.on('SIGINT', function () {
consumer.close(true, function () {
console.log('consumer colse!')
process.exit()
})
})
复制代码
// Consumer2
import * as kafka from 'kafka-node'
const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'})
const offset = new kafka.Offset(client)
import * as bluebird from 'bluebird'
const consumerGoupOptions = {
kafkaHost: 'localhost:9092',
groupId: 'ExampleTestGroup',
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
} as any
const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer2'}, consumerGoupOptions), ['test'])
export default consumer
// 处理消息
consumer.on('message', async function (message) {
console.info('i am consumer2!')
// Read string into a buffer.
console.info(`[message]:==:>${JSON.stringify(message)}`)
// const buf = new Buffer(String(message.value), 'binary')
const decodedMessage = message // JSON.parse(buf.toString())
await bluebird.delay(1000)
console.log('decodedMessage: ', decodedMessage)
})
// 消息处理错误
consumer.on('error', function (err) {
console.log('error', err)
})
consumer.on('offsetOutOfRange', function (topic) {
console.info(`[offsetOutOfRange]:==:>${topic}`)
topic.maxNum = 2
offset.fetch([topic], function (err, offsets) {
if (err) {
return console.error(err)
}
let min = Math.min.apply(null, offsets[topic.topic][topic.partition])
consumer.setOffset(topic.topic, topic.partition, min)
})
})
process.on('SIGINT', function () {
consumer.close(true, function () {
console.log('consumer colse!')
process.exit()
})
})
复制代码
执行以后,发现了一个问题:同一个 ConsumerGroup 的不一样 Consumer 没有均匀消费数据, 会出现一段时间,只有一个 Consumer 消费, 而另外一个 Conumser 不消费的状况。
为何呢?
这里就须要知道消费端的均衡算法
算法以下:
1.A=(partition数量/同分组消费者总个数) 2.M=对上面所获得的A值小数点第一位向上取整 3.计算出该消费者拉取数据的patition合集:
Ci = [P(M*i ),P((i + 1) * M -1)]
Partition 数量为 1 , 由于只有一个 broker
同分组消费者总个数: 2
A = 1 / 2
M = roundUp (A) = 1
C0 = [P(0), P(0
]`
C1 = [P(1), P(1)]
因此,若是不是 C0 消费者不可用, C1 一直都不会去消费 Partition0 里面的消息
结论是,若是非多 Kafka 节点的话, 单纯增长同一消费组里的消费者, 并不能作到均衡消费数据的状况。
有其余方法能够实现吗?
有的, 咱们能够从 Producer 里面入手,分发消息时固定 Topic 对应 固定的消费者节点。
Producer:
// Producer
// ...
const sendRecord = (objData, cb) => {
const partition = Date.now() % 2 === 0 ? 0 : 1
const buffer = Buffer.from(JSON.stringify(objData) + '_' + partition)
// Create a new payload
const record = [
{
topic: `test${partition}`, // 这里用了随机方法分配 topic
messages: buffer,
attributes: 1, /* Use GZip compression for the payload */
key: `key_${partition}`
}
]
// Send record to Kafka and log result/error
console.info(`[record]:==:>${JSON.stringify(record)}`)
producer.send(record, cb)
}
// ...
复制代码
Consumer:
// Consumer1
// ...
const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer1'}, consumerGoupOptions), ['test0', 'test1']) // 这里须要优先输入 须要消费的 topic, 次要消费的 topic 也要写上,以防另外一节点重启时, 消息没及时消费
// ...
复制代码
// Consumer2
// ...
const consumer = new kafka.ConsumerGroup(Object.assign({id: 'consumer2'}, consumerGoupOptions), ['test1', 'test2']) // 这里须要优先输入 须要消费的 topic, 次要消费的 topic 也要写上,以防另外一节点重启时, 消息没及时消费
// ...
复制代码
Kafka
设计上:队列消息不删除,不一样 ConsumerGroup均可以publish-subscribe,同一 ConsumerGroup 里面只有一个 Consumer 能消费同一个 Topic
延迟消费:不支持: Consumer 开启后, 会自动获取 Producer 生产对应 Topic 的消息, 若想 Consumer 暂时不消费消息, 须要中断 Consumer 的服务
负载均衡:从集群上看, 即便其中一个 Broker 挂了,其余 Broker上的 partition 都会存在副本集,kafka 仍然能够正常运行。从 ConsumerGroup 上看,即便其中的Consumer 挂了, 同一 ConsumerGroup 的其余 Consumer 仍然能够消费其Topic 的消息,而不须要担忧服务中断。
实际上:Kafka 作点对点队列,有点浪费。只用一个 ConsumerGroup,并无发挥 Kafka 的优点。可是 Kafka 这种很方便就能拓展成发布-订阅模式,消费端创建另一个 ConsumerGroup,就能够为另外一个服务启用。