PS:最近作了一个面试题解答的开源项目,你们能够看一看,若是对你们有帮助,但愿你们帮忙给一个star,谢谢你们了!html
《面试指北》项目地址:github.com/NotFound9/i…java
本文主要是对美团的分布式ID框架Leaf的原理进行介绍,针对Leaf原项目中的一些issue,对Leaf项目进行功能加强,问题修复及优化改进,改进后的项目地址在这里:node
Leaf项目改进计划 github.com/NotFound9/L…mysql
7849276-4d1955394baa3c6d.png git
1位的符号位+41位时间戳+10位workID+12位序列号github
加起来一共是64个二进制位,正好与Java中的long类型的位数同样。面试
美团的Leaf框架对于snowflake算法进行了一些位数调整,位数分配是这样:算法
最大41位时间差+10位的workID+12位序列化sql
虽然看美团对Leaf的介绍文章里面说数据库
Leaf-snowflake方案彻底沿用snowflake方案的bit位设计,便是“1+41+10+12”的方式组装ID号。
其实看代码里面是没有专门设置符号位的,若是timestamp过大,致使时间差占用42个二进制位,时间差的第一位为1时,可能生成的id转换为十进制后会是负数:
//timestampLeftShift是22,workerIdShift是12
long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
复制代码
由于时间戳是以1970年01月01日00时00分00秒做为起始点,其实咱们通常取的时间戳实际上是起始点到如今的时间差,若是咱们能肯定咱们取的时间都是某个时间点之后的时间,那么能够将时间戳的起始点改为这个时间点,Leaf项目中,若是不设置起始时间,默认是2010年11月4日09:42:54,这样可使得支持的最大时间增加,Leaf框架的支持最大时间是起始点以后的69年。
Leaf使用Zookeeper做为注册中心,每次机器启动时去Zookeeper特定路径/forever/下读取子节点列表,每一个子节点存储了IP:Port及对应的workId,遍历子节点列表,若是存在当前IP:Port对应的workId,就使用节点信息中存储的workId,不存在就建立一个永久有序节点,将序号做为workId,而且将workId信息写入本地缓存文件workerID.properties,供启动时链接Zookeeper失败,读取使用。由于workId只分配了10个二进制位,因此取值范围是0-1023。
序列号是12个二进制位,取值范围是0到4095,主要保证同一个leaf服务在同一毫秒内,生成的ID的惟一性。 序列号是生成流程以下: 1.当前时间戳与上一个ID的时间戳在同一毫秒内,那么对sequence+1,若是sequence+1超过了4095,那么进行等待,等到下一毫秒到了以后再生成ID。 2.当前时间戳与上一个ID的时间戳不在同一毫秒内,取一个100之内的随机数做为序列号。
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
//seq 为0的时候表示是下一毫秒时间开始对seq作随机
sequence = RANDOM.nextInt(100);
timestamp = tilNextMillis(lastTimestamp);
}
} else {
//若是是新的ms开始
sequence = RANDOM.nextInt(100);
}
lastTimestamp = timestamp;
复制代码
5e4ff128.png
大体流程就是每一个Leaf服务在内存中有两个Segment实例,每一个Segement保存一个分段的ID,
一个Segment是当前用于分配ID,有一个value属性保存这个分段已分配的最大ID,以及一个max属性这个分段最大的ID。
另一个Segement是备用的,当一个Segement用完时,会进行切换,使用另外一个Segement进行使用。
当一个Segement的分段ID使用率达到10%时,就会触发另外一个Segement去DB获取分段ID,初始化好分段ID供以后使用。
Segment {
private AtomicLong value = new AtomicLong(0);
private volatile long max;
private volatile int step;
}
SegmentBuffer {
private String key;
private Segment[] segments; //双buffer
private volatile int currentPos; //当前的使用的segment的index
private volatile boolean nextReady; //下一个segment是否处于可切换状态
private volatile boolean initOk; //是否初始化完成
private final AtomicBoolean threadRunning; //线程是否在运行中
private final ReadWriteLock lock;
private volatile int step;
private volatile int minStep;
private volatile long updateTimestamp;
}
复制代码
目前Leaf项目存在的问题是
Snowflake生成ID相关:
而对于一些小公司或者项目组,其余业务没有使用到Zookeeper的话,为了部署Leaf服务而维护一个Zookeeper集群的代价太大。因此原项目中有issue在问”怎么支持非Zookeeper的注册中心“,因为通常项目中使用MySQL的几率会大不少,因此增长了使用MySQL做为注册中心,本地配置做为注册中心的功能。
因为启动前,服务器时间调到了之前的时间或者进行了回拨,链接Zookeeper失败时会使用本地缓存文件workerID.properties中的workerId,而没有校验该ID生成的最大时间戳,可能会形成ID重复,对这个问题进行了修复。
由于缺乏对时间差的校验,当时间差过大,转换为二进制数后超过41位后,在生成ID时会形成溢出,使得符号位为1,生成id为负数。
Segement生成ID相关:
没有太多问题,主要是根据一些issue对代码进行了性能优化。
Leaf项目原来的注册中心的模式(咱们暂时命令为zk_normal模式) 使用Zookeeper做为注册中心,每次机器启动时去Zookeeper特定路径下读取子节点列表,若是存在当前IP:Port对应的workId,就使用节点信息中存储的workId,不存在就建立一个永久有序节点,将序号做为workId,而且将workId信息写入本地缓存文件workerID.properties,供启动时链接Zookeeper失败,读取使用。
issue#84:workid是否支持回收?
SnowflakeService模式中,workid是否支持回收?分布式环境下,每次从新部署可能就换了一个ip,若是没有回收的话1024个机器标识很快就会消耗完,为何zk不用临时节点去存储呢,这样能动态感知服务上下线,对workid进行管理回收?
开发了zk_recycle模式,针对使用snowflake生成分布式ID的技术方案,本来是使用Zookeeper做为注册中心为每一个服务根据IP:Port分配一个固定的workId,workId生成范围为0到1023,workId不支持回收,因此在Leaf的原项目中有人提出了一个issue#84 workid是否支持回收?,由于当部署Leaf的服务的IP和Port不固定时,若是workId不支持回收,当workId超过最大值时,会致使生成的分布式ID的重复。因此增长了workId循环使用的模式zk_recycle。
在Leaf/leaf-server/src/main/resources/leaf.properties中添加如下配置
//开启snowflake服务
leaf.snowflake.enable=true
//leaf服务的端口,用于生成workId
leaf.snowflake.port=
//将snowflake模式设置为zk_recycle,此时注册中心为Zookeeper,而且workerId可复用
leaf.snowflake.mode=zk_recycle
//zookeeper的地址
leaf.snowflake.zk.address=localhost:2181
复制代码
启动LeafServerApplication,调用/api/snowflake/get/test就能够得到此种模式下生成的分布式ID。
curl domain/api/snowflake/get/test
1256557484213448722
复制代码
按照上面的配置在leaf.properties里面进行配置后,
if(mode.equals(SnowflakeMode.ZK_RECYCLE)) {//注册中心为zk,对ip:port分配的workId是课循环利用的模式
String zkAddress = properties.getProperty(Constants.LEAF_SNOWFLAKE_ZK_ADDRESS);
RecyclableZookeeperHolder holder = new RecyclableZookeeperHolder(Utils.getIp(),port,zkAddress);
idGen = new SnowflakeIDGenImpl(holder);
if (idGen.init()) {
logger.info("Snowflake Service Init Successfully in mode " + mode);
} else {
throw new InitException("Snowflake Service Init Fail");
}
}
复制代码
此时SnowflakeIDGenImpl使用的holder是RecyclableZookeeperHolder的实例,workId是可循环利用的,RecyclableZookeeperHolder工做流程以下: 1.首先会在未使用的workId池(zookeeper路径为/snowflake/leaf.name/recycle/notuse/)中生成全部workId。 2.而后每次服务器启动时都是去未使用的workId池取一个新的workId,而后放到正在使用的workId池(zookeeper路径为/snowflake/leaf.name/recycle/inuse/)下,将此workId用于Id生成,而且定时上报时间戳,更新zookeeper中的节点信息。 3.而且定时检测正在使用的workId池,发现某个workId超过最大时间没有更新时间戳的workId,会把它从正在使用的workId池移出,而后放到未使用的workId池中,以供workId循环使用。 4.而且正在使用这个很长时间没有更新时间戳的workId的服务器,在发现本身超过最大时间,尚未上报时间戳成功后,会中止id生成服务,以防workId被其余服务器循环使用,致使id重复。
issue#100:如何使用非zk的注册中心?
开发了mysql模式,这种模式注册中心为MySQL,针对每一个ip:port的workid是固定的。
须要先在数据库执行项目中的leaf_workerid_alloc.sql,完成建表,而后在Leaf/leaf-server/src/main/resources/leaf.properties中添加如下配置
//开启snowflake服务
leaf.snowflake.enable=true
//leaf服务的端口,用于生成workId
leaf.snowflake.port=
//将snowflake模式设置为mysql,此时注册中心为Zookeeper,workerId为固定分配
leaf.snowflake.mode=mysql
//mysql数据库地址
leaf.jdbc.url=
leaf.jdbc.username=
leaf.jdbc.password=
复制代码
启动LeafServerApplication,调用/api/snowflake/get/test就能够得到此种模式下生成的分布式ID。
curl domain/api/snowflake/get/test
1256557484213448722
复制代码
使用上面的配置后,此时SnowflakeIDGenImpl使用的holder是SnowflakeMySQLHolder的实例。实现原理与Leaf原项目默认的模式,使用Zookeeper做为注册中心,每一个ip:port的workid是固定的实现原理相似,只是注册,获取workid,及更新时间戳是与MySQL进行交互,而不是Zookeeper。
if (mode.equals(SnowflakeMode.MYSQL)) {//注册中心为mysql
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl(properties.getProperty(Constants.LEAF_JDBC_URL));
dataSource.setUsername(properties.getProperty(Constants.LEAF_JDBC_USERNAME));
dataSource.setPassword(properties.getProperty(Constants.LEAF_JDBC_PASSWORD));
dataSource.init();
// Config Dao
WorkerIdAllocDao dao = new WorkerIdAllocDaoImpl(dataSource);
SnowflakeMySQLHolder holder = new SnowflakeMySQLHolder(Utils.getIp(), port, dao);
idGen = new SnowflakeIDGenImpl(holder);
if (idGen.init()) {
logger.info("Snowflake Service Init Successfully in mode " + mode);
} else {
throw new InitException("Snowflake Service Init Fail");
}
}
复制代码
issue#100:如何使用非zk的注册中心?
开发了local模式,这种模式就是适用于部署Leaf服务的IP和Port基本不会变化的状况,就是在Leaf项目中的配置文件leaf.properties中显式得配置某某IP:某某Port对应哪一个workId,每次部署新机器时,将IP:Port的时候在项目中添加这个配置,而后启动时项目会去读取leaf.properties中的配置,读取完写入本地缓存文件workId.json,下次启动时直接读取workId.json,最大时间戳也每次同步到机器上的缓存文件workId.json中。
在Leaf/leaf-server/src/main/resources/leaf.properties中添加如下配置
//开启snowflake服务
leaf.snowflake.enable=true
//leaf服务的端口,用于生成workId
leaf.snowflake.port=
#注册中心为local的的模式
#leaf.snowflake.mode=local
#leaf.snowflake.local.workIdMap=
#workIdMap的格式是这样的{"Leaf服务的ip:端口":"固定的workId"},例如:{"10.1.46.33:8080":1,"10.1.46.33:8081":2}
复制代码
启动LeafServerApplication,调用/api/snowflake/get/test就能够得到此种模式下生成的分布式ID。
curl domain/api/snowflake/get/test
1256557484213448722
复制代码
issue#84:由于当使用默认的模式(咱们暂时命令为zk_normal模式),注册中心为Zookeeper,workId不可复用,上面介绍了这种模式的工做流程,当Leaf服务启动时,链接Zookeeper失败,那么会去本机缓存中读取workerID.properties文件,读取workId进行使用,可是因为workerID.properties中只存了workId信息,没有存储上次上报的最大时间戳,因此没有进行时间戳判断,因此若是机器的当前时间被修改到以前,就可能会致使生成的ID重复。
因此增长了更新时间戳到本地缓存的机制,每次在上报时间戳时将时间戳同时写入本机缓存workerID.properties,而且当使用本地缓存workerID.properties中的workId时,对时间戳进行校验,当前系统时间戳<缓存中的时间戳时,才使用这个workerId。
//链接失败,使用本地workerID.properties中的workerID,而且对时间戳进行校验。
try {
Properties properties = new Properties();
properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
Long maxTimestamp = Long.valueOf(properties.getProperty("maxTimestamp"));
if (maxTimestamp!=null && System.currentTimeMillis() <maxTimestamp) {
throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
}
workerID = Integer.valueOf(properties.getProperty("workerID"));
LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
} catch (Exception e1) {
LOGGER.error("Read file error ", e1);
return false;
}
//定时任务每3s执行一次updateNewData()方法,调用更新updateLocalWorkerID()更新缓存文件workerID.properties
void updateNewData(CuratorFramework curator, String path) {
try {
if (System.currentTimeMillis() < lastUpdateTime) {
return;
}
curator.setData().forPath(path, buildData().getBytes());
updateLocalWorkerID(workerID);
lastUpdateTime = System.currentTimeMillis();
} catch (Exception e) {
LOGGER.info("update init data error path is {} error is {}", path, e);
}
}
复制代码
由于Leaf框架是沿用snowflake的位数分配 最大41位时间差+10位的workID+12位序列化,可是因为snowflake是强制要求第一位为符号位0,不然生成的id转换为十进制后会是复试,可是Leaf项目中没有对时间差进行校验,当时间戳过大或者自定义的twepoch设置不当太小,会致使计算获得的时间差过大,转化为2进制后超过41位,且第一位为1,会致使生成的long类型的id为负数,例如当timestamp = twepoch+2199023255552L时, 此时在生成id时,timestamp - twepoch会等于2199023255552,2199023255552转换为二进制后是1+41个0,此时生成的id因为符号位是1,id会是负数-9223372036854775793
long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
复制代码
//一开始将最大的maxTimeStamp计算好
this.maxTimeStamp = ~(-1L << timeStampBits) + twepoch;
//而后生成ID时进行校验
if (timestamp>maxTimeStamp) {
throw new OverMaxTimeStampException("current timestamp is over maxTimeStamp, the generate id will be negative");
}
复制代码
针对issue#68里面的优化方案,对Segement Buffer的缓存数据与DB数据同步的工做流程进行了进一步优化,主要是对 对SegmentIDGenImpl.updateCacheFromDb()方法进行了优化。
原方案工做流程: 1.遍历cacheTags,将dbTags的副本insertTagsSet中存在的元素移除,使得insertTagsSet只有db新增的tag 2.遍历insertTagsSet,将这些新增的元素添加到cache中 3.遍历dbTags,将cacheTags的副本removeTagsSet中存在的元素移除,使得removeTagsSet只有cache中过时的tag 4.遍历removeTagsSet,将过时的元素移除cache 这种方案须要经历四次循环,使用两个HashSet分别存储db中新增的tag,cache中过时的tag, 而且为了筛选出新增的tag,过时的tag,对每一个如今使用的tag有两次删除操做,
原有方案代码以下:
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> insertTagsSet = new HashSet<>(dbTags);
Set<String> removeTagsSet = new HashSet<>(cacheTags);
//db中新加的tags灌进cache
for(int i = 0; i < cacheTags.size(); i++){
String tmp = cacheTags.get(i);
if(insertTagsSet.contains(tmp)){
insertTagsSet.remove(tmp);
}
}
for (String tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(tag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
}
//cache中已失效的tags从cache删除
for(int i = 0; i < dbTags.size(); i++){
String tmp = dbTags.get(i);
if(removeTagsSet.contains(tmp)){
removeTagsSet.remove(tmp);
}
}
for (String tag : removeTagsSet) {
cache.remove(tag);
logger.info("Remove tag {} from IdCache", tag);
}
复制代码
实际上咱们并不须要这些中间过程,现方案工做流程: 只须要遍历dbTags,判断cache中是否存在这个key,不存在就是新增元素,进行新增。 遍历cacheTags,判断dbSet中是否存在这个key,不存在就是过时元素,进行删除。
现有方案代码:
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
//将dbTags中新加的tag添加cache,经过遍历dbTags,判断是否在cache中存在,不存在就添加到cache
for (String dbTag : dbTags) {
if (cache.containsKey(dbTag)==false) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(dbTag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(dbTag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", dbTag, buffer);
}
}
List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> dbTagSet = new HashSet<>(dbTags);
//将cache中已失效的tag从cache删除,经过遍历cacheTags,判断是否在dbTagSet中存在,不存在说明过时,直接删除
for (String cacheTag : cacheTags) {
if (dbTagSet.contains(cacheTag) == false) {
cache.remove(cacheTag);
logger.info("Remove tag {} from IdCache", cacheTag);
}
}
复制代码
两个方案对比:
这个更新是针对这个issue#88 提出的问题,使用位运算&来代替取模运算%,执行效率更高。 原代码:
public int nextPos() {
return (currentPos + 1) % 2;
}
复制代码
现代码:
public int nextPos() {
return (currentPos + 1) & 1;
}
复制代码