在消息队列使用场景中,有时须要同时下发多条消息,但如今的消息队列好比kafka只支持单条消息的事务保证,不能保证多条消息,今天说的这个方案就时kafka内部的一个子项目中基于2PC和延迟更新来实现分布式事务bash
2PC俗称两阶段提交,经过将一个操做分为两个阶段:准备阶段和提交阶段来尽量保证操做的原子执行(实际上不可能,你们有个概念先)架构
延迟更新实际上是一个很经常使用的技术手段,简单来讲,当某个操做条件不知足时,经过必定手段将数据暂存,等条件知足时在进行执行app
实现也蛮简单的, 在原来的业务消息以后再添加一条事务消息(事务消息能够经过相似惟一ID来关联到以前提交的消息), worker未消费到事物提交的消息,就会一直将消息放在本地延迟存储中,只有当接收到事物提交消息,才会进行业务逻辑处理分布式
MemoryQuue: 用于模拟消息队列,接收事件分发事件 Worker: 模拟具体业务服务,接收消息,存入本地延迟更新存储,或者提交事务触发业务回调ide
Event: 用于标识事件,用户将业务数据封装成事件存入到MemoryQueue中 EventListener: 事件回调接口,用于MemoryQueue接收到数据后的回调 事件在发送的时候,须要经过一个前缀来进行事件类型标识,这里有三种TaskPrefix、CommitTaskPrefix、ClearTaskPrefix函数
const ( // TaskPrefix 任务key前缀 TaskPrefix string = "task-" // CommitTaskPrefix 提交任务key前缀 CommitTaskPrefix string = "commit-" // ClearTaskPrefix 清除任务 ClearTaskPrefix string = "clear-" ) // Event 事件类型 type Event struct { Key string Name string Value interface{} } // EventListener 用于接收消息回调 type EventListener interface { onEvent(event *Event) }
MemoryQueue内存消息队列,经过Push接口接收用户数据,经过AddListener来注册EventListener, 同时内部经过poll来从chan event取出数据分发给全部的Listenerui
// MemoryQueue 内存消息队列 type MemoryQueue struct { done chan struct{} queue chan Event listeners []EventListener wg sync.WaitGroup } // Push 添加数据 func (mq *MemoryQueue) Push(eventType, name string, value interface{}) { mq.queue <- Event{Key: eventType + name, Name: name, Value: value} mq.wg.Add(1) } // AddListener 添加监听器 func (mq *MemoryQueue) AddListener(listener EventListener) bool { for _, item := range mq.listeners { if item == listener { return false } } mq.listeners = append(mq.listeners, listener) return true } // Notify 分发消息 func (mq *MemoryQueue) Notify(event *Event) { defer mq.wg.Done() for _, listener := range mq.listeners { listener.onEvent(event) } } func (mq *MemoryQueue) poll() { for { select { case <-mq.done: break case event := <-mq.queue: mq.Notify(&event) } } } // Start 启动内存队列 func (mq *MemoryQueue) Start() { go mq.poll() } // Stop 中止内存队列 func (mq *MemoryQueue) Stop() { mq.wg.Wait() close(mq.done) }
Worker接收MemoryQueue里面的数据,而后在本地根据不一样类型来进行对应事件事件类型处理, 主要是经过事件的前缀来进行对应事件回调函数的选择设计
// Worker 工做进程 type Worker struct { name string deferredTaskUpdates map[string][]Task onCommit ConfigUpdateCallback } func (w *Worker) onEvent(event *Event) { switch { // 获取任务事件 case strings.Contains(event.Key, TaskPrefix): w.onTaskEvent(event) // 清除本地延迟队列里面的任务 case strings.Contains(event.Key, ClearTaskPrefix): w.onTaskClear(event) // 获取commit事件 case strings.Contains(event.Key, CommitTaskPrefix): w.onTaskCommit(event) } }
事件处理任务主要分为:onTaskClear(从本地清楚该数据)、onTaskEvent(数据存储本地延迟存储进行暂存)、onTaskCommit(事务提交)3d
func (w *Worker) onTaskClear(event *Event) { task, err := event.Value.(Task) if !err { // log return } _, found := w.deferredTaskUpdates[task.Group] if !found { return } delete(w.deferredTaskUpdates, task.Group) // 还能够继续中止本地已经启动的任务 } // onTaskCommit 接收任务提交, 从延迟队列中取出数据而后进行业务逻辑处理 func (w *Worker) onTaskCommit(event *Event) { // 获取以前本地接收的全部任务 tasks, found := w.deferredTaskUpdates[event.Name] if !found { return } // 获取配置 config := w.getTasksConfig(tasks) if w.onCommit != nil { w.onCommit(config) } delete(w.deferredTaskUpdates, event.Name) } // onTaskEvent 接收任务数据,此时须要丢到本地暂存不能进行应用 func (w *Worker) onTaskEvent(event *Event) { task, err := event.Value.(Task) if !err { // log return } // 保存任务到延迟更新map configs, found := w.deferredTaskUpdates[task.Group] if !found { configs = make([]Task, 0) } configs = append(configs, task) w.deferredTaskUpdates[task.Group] = configs } // getTasksConfig 获取task任务列表 func (w *Worker) getTasksConfig(tasks []Task) map[string]string { config := make(map[string]string) for _, t := range tasks { config = t.updateConfig(config) } return config }
unc main() { // 生成一个内存队列启动 queue := NewMemoryQueue(10) queue.Start() // 生成一个worker name := "test" worker := NewWorker(name, func(data map[string]string) { for key, value := range data { println("worker get task key: " + key + " value: " + value) } }) // 注册到队列中 queue.AddListener(worker) taskName := "test" // events 发送的任务事件 configs := []map[string]string{ map[string]string{"task1": "SendEmail", "params1": "Hello world"}, map[string]string{"task2": "SendMQ", "params2": "Hello world"}, } // 分发任务 queue.Push(ClearTaskPrefix, taskName, nil) for _, conf := range configs { queue.Push(TaskPrefix, taskName, Task{Name: taskName, Group: taskName, Config: conf}) } queue.Push(CommitTaskPrefix, taskName, nil) // 中止队列 queue.Stop() }
输出code
# go run main.go worker get task key: params1 value: Hello world worker get task key: task1 value: SendEmail worker get task key: params2 value: Hello world worker get task key: task2 value: SendMQ
在分布式环境中,不少时候并不须要使用CP模型,更多时候是知足最终一致性便可
基于2PC和延迟队列的这种设计,主要是依赖于事件驱动的架构
在kafka connect中, 每次节点变化都会触发一次任务的重分配,因此延迟存储直接用的就是内存中的HashMap, 由于即便分配消息的主节点挂了,那就再触发一次事件,直接将HashMap里面的数据清掉,进行下一次事务便可,并不须要保证延迟存储里面的数据不丢,
因此方案因环境、需求不一样,能够作一些取舍,不必什么东西都去加一个CP模型的中间件进来,固然其实那样更简单
未完待续!
更多文章能够访问http://www.sreguide.com/