简介:今天直接开门见山,先来介绍一下我今天所带来的东西。没错,看标题想必你们已经想到了 —— Leaf-segment数据库获取ID
方案。这个方案已经喜闻乐见了,美团早就进行了开源,不过他是由java
来实现的,因此最近为了学习这一方面知识,我用go
本身实现了一下,目前本身验证是没有发现什么bug
,等待你们的检验,发现bug
可及时反馈(提mr或加我vx均可)。代码已收录到个人我的仓库——[go-算法系列(go-algorithm)](https://github.com/asong2020/...。html
欢迎
Star
,感谢各位~~~。java注:下文
leaf-segment
数据库方案设计直接参考美团(摘取部分)。详细请参阅:https://tech.meituan.com/2017...mysql
CREATE TABLE `leaf_alloc` ( `id` int(11) NOT NULL AUTO_INCREMENT, `biz_tag` varchar(128) NOT NULL DEFAULT '', `max_id` bigint(20) NOT NULL DEFAULT '1', `step` int(11) NOT NULL, `description` varchar(256) DEFAULT NULL, `update_time` bigint(20) NOT NULL DEFAULT '0', PRIMARY KEY (`id`), UNIQUE KEY (`biz_tag`) ) ENGINE=InnoDB;
也能够直接使用我已经生成好的SQL
文件(已在工程项目中)。各个字段的介绍我会在后文代码实现部分进行解析,这里就不一一解析了。git
// 1 新建文件目录 $ mdkir asong.cloud $ cd asong.cloud // 2 获取项目 $ git clone git@github.com:asong2020/go-algorithm.git // 3 进入项目目录 $ cd go-go-algorithm/leaf // 4 运行 $ go run main.go //运行
URI: POST http://localhost:8080/api/leaf Param(json): { "biz_tag": "test_create_one", "max_id": 1, // 能够不传 默认为1 "step": 2000, // 能够不传 默认为200 "descprition": "test api one" }
curl --location --request POST 'http://localhost:8080/api/leaf' \ --header 'Content-Type: application/json' \ --data-raw '{ "biz_tag": "test_create_one", "descprition": "test api one" }'
URI: PUT http://localhost:8080/api/leaf/init/cache Param(json): { "biz_tag": "test_create" }
curl --location --request PUT 'http://localhost:8080/api/leaf/init/cache' \ --header 'Content-Type: application/json' \ --data-raw '{ "biz_tag": "test_create" }'
URI: GET http://localhost:8080/api/leaf Param: ?biz_tag=test_create
curl --location --request GET 'http://localhost:8080/api/leaf?biz_tag=test_create'
step
URI: PUT http://localhost:8080/api/leaf/step Param(json): { "step": 10000, "biz_tag": "test_create" }
curl --location --request PUT 'http://localhost:8080/api/leaf/step' \ --header 'Content-Type: application/json' \ --data-raw '{ "step": 10000, "biz_tag": "test_create" }'
在复杂分布式系统中,每每须要对大量的数据和消息进行惟一标识。一个可以生成全局惟一ID的系统是很是必要的。好比某宝,业务分布普遍,这么多业务对数据分库分表后须要有一个惟一ID来标识一条数据或消息,数据库的自增ID显然不能知足需求;因此,咱们能够总结一下业务系统对ID号的要求有哪些呢?github
上述123对应三类不一样的场景,3和4需求仍是互斥的,没法使用同一个方案知足。golang
本文只讲述场景3的方案,即leaf-segment
。场景4能够用雪花算法
实现,这个我以前实现过了,有兴趣的童鞋能够参考一下。传送门:https://github.com/asong2020/...面试
leaf-sement
是在使用数据库生成方案上作的改进。这里先抛砖引玉一下,看一下数据库生成方案是怎样实现的。算法
以MySQL举例,利用给字段设置auto_increment_increment
和auto_increment_offset
来保证ID自增,每次业务使用下列SQL读写MySQL获得ID号。sql
begin; REPLACE INTO Tickets64 (stub) VALUES ('a'); SELECT LAST_INSERT_ID(); commit;
这种方案的优缺点以下:shell
优势:
缺点:
对于MySQL性能问题,可用以下方案解决:在分布式系统中咱们能够多部署几台机器,每台机器设置不一样的初始值,且步长和机器数相等。好比有两台机器。设置步长step为2,TicketServer1的初始值为1(1,3,5,7,9,11…)、TicketServer2的初始值为2(2,4,6,8,10…)。这是Flickr团队在2010年撰文介绍的一种主键生成策略(Ticket Servers: Distributed Unique Primary Keys on the Cheap )。以下所示,为了实现上述方案分别设置两台机器对应的参数,TicketServer1从1开始发号,TicketServer2从2开始发号,两台机器每次发号以后都递增2。
TicketServer1: auto-increment-increment = 2 auto-increment-offset = 1 TicketServer2: auto-increment-increment = 2 auto-increment-offset = 2
假设咱们要部署N台机器,步长需设置为N,每台的初始值依次为0,1,2…N-1那么整个架构就变成了以下图所示:
这种架构貌似可以知足性能的需求,但有如下几个缺点:
Leaf-Segment
数据库方案Leaf-Segment
数据库方案是在上面的数据库生成方案上作的改进。
作了以下改变: - 原方案每次获取ID都得读写一次数据库,形成数据库压力大。改成利用proxy server批量获取,每次获取一个segment(step决定大小)号段的值。用完以后再去数据库获取新的号段,能够大大的减轻数据库的压力。 - 各个业务不一样的发号需求用biz_tag字段来区分,每一个biz-tag的ID获取相互隔离,互不影响。若是之后有性能需求须要对数据库扩容,不须要上述描述的复杂的扩容操做,只须要对biz_tag分库分表就行。
数据库表设计以下:
CREATE TABLE `leaf_alloc` ( `id` int(11) NOT NULL AUTO_INCREMENT, `biz_tag` varchar(128) NOT NULL DEFAULT '', `max_id` bigint(20) NOT NULL DEFAULT '1', `step` int(11) NOT NULL, `description` varchar(256) DEFAULT NULL, `update_time` bigint(20) NOT NULL DEFAULT '0', PRIMARY KEY (`id`), UNIQUE KEY (`biz_tag`) ) ENGINE=InnoDB;
这里我依旧使用了一个自增主键,不过没什么用,能够忽略。biz_tag用来区分业务(因此我把它设置成了惟一索引),max_id表示该biz_tag目前所被分配的ID号段的最大值,step表示每次分配的号段长度。原来获取ID每次都须要写数据库,如今只须要把step设置得足够大,好比1000。那么只有当1000个号被消耗完了以后才会去从新读写一次数据库。读写数据库的频率从1减少到了1/step,大体架构以下图所示:
test_tag在第一台Leaf机器上是1~1000的号段,当这个号段用完时,会去加载另外一个长度为step=1000的号段,假设另外两台号段都没有更新,这个时候第一台机器新加载的号段就应该是3001~4000。同时数据库对应的biz_tag这条数据的max_id会从3000被更新成4000,更新号段的SQL语句以下:
Begin UPDATE table SET max_id=max_id+step WHERE biz_tag=xxx SELECT tag, max_id, step FROM table WHERE biz_tag=xxx Commit
这种模式有如下优缺点:
优势:
缺点:
对于第二个缺点,Leaf-segment作了一些优化,简单的说就是:
Leaf 取号段的时机是在号段消耗完的时候进行的,也就意味着号段临界点的ID下发时间取决于下一次从DB取回号段的时间,而且在这期间进来的请求也会由于DB号段没有取回来,致使线程阻塞。若是请求DB的网络和DB的性能稳定,这种状况对系统的影响是不大的,可是假如取DB的时候网络发生抖动,或者DB发生慢查询就会致使整个系统的响应时间变慢。
为此,咱们但愿DB取号段的过程可以作到无阻塞,不须要在DB取号段的时候阻塞请求线程,即当号段消费到某个点时就异步的把下一个号段加载到内存中。而不须要等到号段用尽的时候才去更新号段。这样作就能够很大程度上的下降系统的TP999指标。详细实现以下图所示:
采用双buffer的方式,Leaf服务内部有两个号段缓存区segment。当前号段已下发10%时,若是下一个号段未更新,则另启一个更新线程去更新下一个号段。当前号段所有下发完后,若是下个号段准备好了则切换到下个号段为当前segment接着下发,循环往复。
终于到本文的重点了,下面就给你们讲解一下我是怎么实现的。
这里先贴一下个人代码架构,具体以下:
leaf ├── common -- common包,放的是一些client初始的代码 ├── conf -- conf包,配置文件 ├── config -- config包,读取配置文件代码部分 ├── dao -- dao包,DB操做部分 ├── handler -- hanler包,路由注册即API代码实现部分 ├── images -- 本文的图片文件 ├── model -- model包,db模型或其余模型 ├── service -- service包,逻辑实现部分 ├── wire -- wire包,依赖绑定 ├── leaf_svr.go -- main运行先置条件 └── main.go -- main函数
在咱们实现以前,确定要分析一波,咱们要作什么,怎么作?我老大常常跟我说的一句话:"先把需求分析明白了,再动手,返工反而是浪费时间"。
map
来存,使用biz_tag
来做为key
,由于它具备惟一性,而且能很快的定位。因此咱们能够定义一个这样的结构体做为全局ID分发器:// 全局分配器 // key: biz_tag value: SegmentBuffer type LeafSeq struct { cache sync.Map }
这里考虑到并发操做,因此使用sync.map
,由于他是并发安全的。
ID
怎么存,接下来咱们就要思考,咱们存什么样的结构比较合适。这里我决定直接实现"双buffer优化"的方案。这里我准备本身定义struct
,而后用切片来模拟双buffer。因此能够设计以下结构:// 号段 type LeafSegment struct { Cursor uint64 // 当前发放位置 Max uint64 // 最大值 Min uint64 // 开始值即最小值 InitOk bool // 是否初始化成功 }
字段说明:首先要有一个字段来记录当前数据发放到什么位置了,Cursor
就是来作这个的。其次咱们还要把范围固定住,也是这个号段的开始和结束,也就是min
、max
字段的做用。最后咱们还要考虑一个问题,既然咱们使用的双buffer
,也就是说咱们并不能肯定下一段buffer
是否可用,因此加了一个initOK
字段来进行代表。
id
有序的发放出去,因此能够设计以下结构:type LeafAlloc struct { Key string // 也就是`biz_tag`用来区分业务 Step int32 // 记录步长 CurrentPos int32 // 当前使用的 segment buffer光标; 总共两个buffer缓存区,循环使用 Buffer []*LeafSegment // 双buffer 一个做为预缓存做用 UpdateTime time.Time // 记录更新时间 方便长时间不用进行清理,防止占用内存 mutex sync.Mutex // 互斥锁 IsPreload bool // 是否正在预加载 Waiting map[string][]chan byte // 挂起等待 }
字段介绍:key
也就是咱们的biz_tag
,能够用它快速从map
中定义数据。Step
记录当前号段的步长,由于步长是能够动态改变的,因此这里须要记录一下。currentPos
这个彻底是记录当前使用buffer
,由于是双buffer
,因此须要定位。buffer
这个不用介绍你们也应该知道,就是缓存池。Update_time
这个字段大多人可能想不到为何会有这个,咱们的号段如今都存到内存当中了,那么当咱们的业务变多了之后,那么内存就会越占越多,因此咱们须要记录他的更新时间,这样咱们可使用一个定时器按期去清除长时间不使用的号段,节省内存。IsPreload
这个字段是给预加载使用,当前缓冲区的号段使用了90%后,咱们就会去预加载下一段缓存池,为了防止屡次重复加载,因此使用该字段作标识。waiting
这里我使用的是map+chan
的结合,这里的做用就是当咱们当前缓存池的号段消耗的比较快或者预加载失败了,就会致使如今没有缓存池可用,因此咱们能够去等待一会,由于如今有可能正在作预加载,这样能够保持系统的高可用,若是超时仍未等到预加载成功则返回失败,下一次调用便可。
基本思想就是这样啦,下面咱们就分块看一下代码。
我这我的写代码,爱从DB
层开始,也就是把我须要的CRUD
操做都提早写好并测试,这里就不贴每段代码的实现了,要不代码量有点大,而且也没有必要,直接介绍一下有哪些方法就能够了。
func (l *LeafDB) Create(ctx context.Context, leaf *model.Leaf) error {} func (l *LeafDB) Get(ctx context.Context, bizTag string, tx *sql.Tx) (*model.Leaf, error) {} func (l *LeafDB) UpdateMaxID(ctx context.Context, bizTag string, tx *sql.Tx) error {} func (l *LeafDB) UpdateMaxIdByCustomStep(ctx context.Context, step int32, bizTag string, tx *sql.Tx) error {} func (l *LeafDB) GetAll(ctx context.Context) ([]*model.Leaf, error) {} func (l *LeafDB) UpdateStep(ctx context.Context, step int32, bizTag string) error {
leaf
方法。step
更新DB到下一个号段step
先贴出个人代码:
func (l *LeafDao) NextSegment(ctx context.Context, bizTag string) (*model.Leaf, error) { // 开启事务 tx, err := l.sql.Begin() defer func() { if err != nil { l.rollback(tx) } }() if err = l.checkError(err); err != nil { return nil, err } err = l.db.UpdateMaxID(ctx, bizTag, tx) if err = l.checkError(err); err != nil { return nil, err } leaf, err := l.db.Get(ctx, bizTag, tx) if err = l.checkError(err); err != nil { return nil, err } // 提交事务 err = tx.Commit() if err = l.checkError(err); err != nil { return nil, err } return leaf, nil } func (l *LeafDao) checkError(err error) error { if err == nil { return nil } if message, ok := err.(*mysql.MySQLError); ok { fmt.Printf("it's sql error; str:%v", message.Message) } return errors.New("db error") } func (l *LeafDao) rollback(tx *sql.Tx) { err := tx.Rollback() if err != sql.ErrTxDone && err != nil { fmt.Println("rollback error") } }
实现其实很简单,也就是先更新一下数据库中的号段,而后再取出来就能够了,这里为了保证数据的一致性和防止屡次更新DB致使号段丢失,因此使用了事务,没有什么特别的点,看一下代码就能懂了。
这里我把DB中的号段初始化到内存这一步和获取ID
合到一块儿来讲吧,由于在获取ID时会有兜底策略进行初始化。先看初始化部分代码:
// 第一次使用要初始化也就是把DB中的数据存到内存中,非必须操做,直接使用的话有兜底策略 func (l *LeafService) InitCache(ctx context.Context, bizTag string) (*model.LeafAlloc, error) { leaf, err := l.dao.NextSegment(ctx, bizTag) if err != nil { fmt.Printf("initCache failed; err:%v\n", err) return nil, err } alloc := model.NewLeafAlloc(leaf) alloc.Buffer = append(alloc.Buffer, model.NewLeafSegment(leaf)) _ = l.leafSeq.Add(alloc) return alloc, nil }
这里步骤主要分两步:
map
以后是咱们来看一下咱们是如何获取id
的,这里步骤比较多了,也是最核心的地方了。
先看主流程:
func (l *LeafService) GetID(ctx context.Context, bizTag string) (uint64, error) { // 先去内存中看一下是否已经初始了,未初始化则开启兜底策略初始化一下。 l.mutex.Lock() var err error seqs := l.leafSeq.Get(bizTag) if seqs == nil { // 不存在初始化一下 seqs, err = l.InitCache(ctx, bizTag) if err != nil { return 0, err } } l.mutex.Unlock() var id uint64 id, err = l.NextID(seqs) if err != nil { return 0, err } l.leafSeq.Update(bizTag, seqs) return id, nil }
主要分为三步:
id
这里最终要的就是第二步,获取id
,这里我先把代码贴出来,而后细细讲解一下:
func (l *LeafService) NextID(current *model.LeafAlloc) (uint64, error) { current.Lock() defer current.Unlock() var id uint64 currentBuffer := current.Buffer[current.CurrentPos] // 判断当前buffer是不是可用的 if current.HasSeq() { id = atomic.AddUint64(¤t.Buffer[current.CurrentPos].Cursor, 1) current.UpdateTime = time.Now() } // 当前号段已下发10%时,若是下一个号段未更新加载,则另启一个更新线程去更新下一个号段 if currentBuffer.Max-id < uint64(0.9*float32(current.Step)) && len(current.Buffer) <= 1 && !current.IsPreload { current.IsPreload = true cancel, _ := context.WithTimeout(context.Background(), 3*time.Second) go l.PreloadBuffer(cancel, current.Key, current) } // 第一个buffer的segment使用完成 切换到下一个buffer 并移除如今的buffer if id == currentBuffer.Max { // 判断第二个buffer是否准备好了(由于上面开启协程去更新下一个号段会出现失败),准备好了切换 currentPos 永远是0 无论怎么切换 if len(current.Buffer) > 1 && current.Buffer[current.CurrentPos+1].InitOk { current.Buffer = append(current.Buffer[:0], current.Buffer[1:]...) } // 若是没准备好,直接返回就行了,由于如今已经分配id了, 后面会进行补偿 } // 有id直接返回就能够了 if current.HasID(id) { return id, nil } // 当前buffer已经没有id可用了,此时补偿线程必定正在运行,咱们等待一会 waitChan := make(chan byte, 1) current.Waiting[current.Key] = append(current.Waiting[current.Key], waitChan) // 释放锁 等待让其余客户端进行走前面的步骤 current.Unlock() timer := time.NewTimer(500 * time.Millisecond) // 等待500ms最多 select { case <-waitChan: case <-timer.C: } current.Lock() // 第二个缓冲区仍未初始化好 if len(current.Buffer) <= 1 { return 0, errors.New("get id failed") } // 切换buffer current.Buffer = append(current.Buffer[:0], current.Buffer[1:]...) if current.HasSeq() { id = atomic.AddUint64(¤t.Buffer[current.CurrentPos].Cursor, 1) current.UpdateTime = time.Now() } return id, nil }
这里我以为用文字描述不清楚,因此我画了个图,不知道大家能不能看懂,能够对照着代码来看,这样是最清晰的,有问题欢迎留言讨论:
预加载的流程也被我画上去了,预加载的步骤主要有三个:
buffer
中,留着备用.代码实现以下:
func (l *LeafService) PreloadBuffer(ctx context.Context, bizTag string, current *model.LeafAlloc) error { for i := 0; i < MAXRETRY; i++ { leaf, err := l.dao.NextSegment(ctx, bizTag) if err != nil { fmt.Printf("preloadBuffer failed; bizTag:%s;err:%v", bizTag, err) continue } segment := model.NewLeafSegment(leaf) current.Buffer = append(current.Buffer, segment) // 追加 l.leafSeq.Update(bizTag, current) current.Wakeup() break } current.IsPreload = false return nil } func (l *LeafAlloc) Wakeup() { l.mutex.Lock() defer l.mutex.Unlock() for _, waitChan := range l.Waiting[l.Key] { close(waitChan) } l.Waiting[l.Key] = l.Waiting[l.Key][:0] }
到这里,全部核心代码都已经实现了,还差最后一步,就是清缓存,就像上面说到的,超长时间不用的号段咱们就要清除它,不能让他堆积形成内存浪费。这里我实现的是清理超过15min未使用的号段。实现也很简单,就是使用timer
作一个定时器,每隔15min就去遍历存储号段的`map,把超过15min未更新的号段清除掉(虽然会形成号段浪费,但也要这要作)。
// 清理超过15min没用过的内存 func (l *LeafSeq) clear() { for { now := time.Now() // 15分钟后 mm, _ := time.ParseDuration("15m") next := now.Add(mm) next = time.Date(next.Year(), next.Month(), next.Day(), next.Hour(), next.Minute(), 0, 0, next.Location()) t := time.NewTimer(next.Sub(now)) <-t.C fmt.Println("start clear goroutine") l.cache.Range(func(key, value interface{}) bool { alloc := value.(*LeafAlloc) if next.Sub(alloc.UpdateTime) > ExpiredTime { fmt.Printf("clear biz_tag: %s cache", key) l.cache.Delete(key) } return true }) } }
好啦,到这里就是接近尾声了,上面就是我实现的整个过程,目前本身测试没有什么问题,后期还会在缝缝补补,你们也能够帮我找找问题,欢迎提出大家宝贵的建议~~~。
代码已收录到个人我的仓库——[go-算法系列(go-algorithm)](https://github.com/asong2020/...。
欢迎Star
,感谢各位~~~。
好啦,这一篇文章到这就结束了,咱们下期见~~。但愿对大家有用,又不对的地方欢迎指出,可添加个人golang交流群,咱们一块儿学习交流。
结尾给你们发一个小福利吧,最近我在看[微服务架构设计模式]这一本书,讲的很好,本身也收集了一本PDF,有须要的小伙能够到自行下载。获取方式:关注公众号:[Golang梦工厂],后台回复:[微服务],便可获取。
我翻译了一份GIN中文文档,会按期进行维护,有须要的小伙伴后台回复[gin]便可下载。
翻译了一份Machinery中文文档,会按期进行维护,有须要的小伙伴们后台回复[machinery]便可获取。
我是asong,一名普普统统的程序猿,让gi我一块儿慢慢变强吧。我本身建了一个golang
交流群,有须要的小伙伴加我vx
,我拉你入群。欢迎各位的关注,咱们下期见~~~
推荐往期文章: