Event: 用于标识事件,用户将业务数据封装成事件存入到MemoryQueue中 EventListener: 事件回调接口,用于MemoryQueue接收到数据后的回调 事件在发送的时候,须要经过一个前缀来进行事件类型标识,这里有三种TaskPrefix、CommitTaskPrefix、ClearTaskPrefixbash
const (
// TaskPrefix 任务key前缀
TaskPrefix string = "task-"
// CommitTaskPrefix 提交任务key前缀
CommitTaskPrefix string = "commit-"
// ClearTaskPrefix 清除任务
ClearTaskPrefix string = "clear-"
)
// Event 事件类型
type Event struct {
Key string
Name string
Value interface{}
}
// EventListener 用于接收消息回调
type EventListener interface {
onEvent(event *Event)
}
复制代码
MemoryQueue内存消息队列,经过Push接口接收用户数据,经过AddListener来注册EventListener, 同时内部经过poll来从chan event取出数据分发给全部的Listener架构
// MemoryQueue 内存消息队列
type MemoryQueue struct {
done chan struct{}
queue chan Event
listeners []EventListener
wg sync.WaitGroup
}
// Push 添加数据
func (mq *MemoryQueue) Push(eventType, name string, value interface{}) {
mq.queue <- Event{Key: eventType + name, Name: name, Value: value}
mq.wg.Add(1)
}
// AddListener 添加监听器
func (mq *MemoryQueue) AddListener(listener EventListener) bool {
for _, item := range mq.listeners {
if item == listener {
return false
}
}
mq.listeners = append(mq.listeners, listener)
return true
}
// Notify 分发消息
func (mq *MemoryQueue) Notify(event *Event) {
defer mq.wg.Done()
for _, listener := range mq.listeners {
listener.onEvent(event)
}
}
func (mq *MemoryQueue) poll() {
for {
select {
case <-mq.done:
break
case event := <-mq.queue:
mq.Notify(&event)
}
}
}
// Start 启动内存队列
func (mq *MemoryQueue) Start() {
go mq.poll()
}
// Stop 中止内存队列
func (mq *MemoryQueue) Stop() {
mq.wg.Wait()
close(mq.done)
}
复制代码
Worker接收MemoryQueue里面的数据,而后在本地根据不一样类型来进行对应事件事件类型处理, 主要是经过事件的前缀来进行对应事件回调函数的选择app
// Worker 工做进程
type Worker struct {
name string
deferredTaskUpdates map[string][]Task
onCommit ConfigUpdateCallback
}
func (w *Worker) onEvent(event *Event) {
switch {
// 获取任务事件
case strings.Contains(event.Key, TaskPrefix):
w.onTaskEvent(event)
// 清除本地延迟队列里面的任务
case strings.Contains(event.Key, ClearTaskPrefix):
w.onTaskClear(event)
// 获取commit事件
case strings.Contains(event.Key, CommitTaskPrefix):
w.onTaskCommit(event)
}
}
复制代码
事件处理任务主要分为:onTaskClear(从本地清楚该数据)、onTaskEvent(数据存储本地延迟存储进行暂存)、onTaskCommit(事务提交)分布式
func (w *Worker) onTaskClear(event *Event) {
task, err := event.Value.(Task)
if !err {
// log
return
}
_, found := w.deferredTaskUpdates[task.Group]
if !found {
return
}
delete(w.deferredTaskUpdates, task.Group)
// 还能够继续中止本地已经启动的任务
}
// onTaskCommit 接收任务提交, 从延迟队列中取出数据而后进行业务逻辑处理
func (w *Worker) onTaskCommit(event *Event) {
// 获取以前本地接收的全部任务
tasks, found := w.deferredTaskUpdates[event.Name]
if !found {
return
}
// 获取配置
config := w.getTasksConfig(tasks)
if w.onCommit != nil {
w.onCommit(config)
}
delete(w.deferredTaskUpdates, event.Name)
}
// onTaskEvent 接收任务数据,此时须要丢到本地暂存不能进行应用
func (w *Worker) onTaskEvent(event *Event) {
task, err := event.Value.(Task)
if !err {
// log
return
}
// 保存任务到延迟更新map
configs, found := w.deferredTaskUpdates[task.Group]
if !found {
configs = make([]Task, 0)
}
configs = append(configs, task)
w.deferredTaskUpdates[task.Group] = configs
}
// getTasksConfig 获取task任务列表
func (w *Worker) getTasksConfig(tasks []Task) map[string]string {
config := make(map[string]string)
for _, t := range tasks {
config = t.updateConfig(config)
}
return config
}
复制代码
unc main() {
// 生成一个内存队列启动
queue := NewMemoryQueue(10)
queue.Start()
// 生成一个worker
name := "test"
worker := NewWorker(name, func(data map[string]string) {
for key, value := range data {
println("worker get task key: " + key + " value: " + value)
}
})
// 注册到队列中
queue.AddListener(worker)
taskName := "test"
// events 发送的任务事件
configs := []map[string]string{
map[string]string{"task1": "SendEmail", "params1": "Hello world"},
map[string]string{"task2": "SendMQ", "params2": "Hello world"},
}
// 分发任务
queue.Push(ClearTaskPrefix, taskName, nil)
for _, conf := range configs {
queue.Push(TaskPrefix, taskName, Task{Name: taskName, Group: taskName, Config: conf})
}
queue.Push(CommitTaskPrefix, taskName, nil)
// 中止队列
queue.Stop()
}
复制代码
输出ide
# go run main.go
worker get task key: params1 value: Hello world
worker get task key: task1 value: SendEmail
worker get task key: params2 value: Hello world
worker get task key: task2 value: SendMQ
复制代码
在分布式环境中,不少时候并不须要使用CP模型,更多时候是知足最终一致性便可函数
基于2PC和延迟队列的这种设计,主要是依赖于事件驱动的架构ui
在kafka connect中, 每次节点变化都会触发一次任务的重分配,因此延迟存储直接用的就是内存中的HashMap, 由于即便分配消息的主节点挂了,那就再触发一次事件,直接将HashMap里面的数据清掉,进行下一次事务便可,并不须要保证延迟存储里面的数据不丢,spa
因此方案因环境、需求不一样,能够作一些取舍,不必什么东西都去加一个CP模型的中间件进来,固然其实那样更简单设计
未完待续!3d
更多文章能够访问www.sreguide.com/