1、分布式系统带来ID生成挑战java
在复杂的系统中,每每须要对大量的数据如订单,帐户进行标识,以一个有意义的有序的序列号来做为全局惟一的ID;node
而分布式系统中咱们对ID生成器要求又有哪些呢?redis
全局惟一性:不能出现重复的ID号,既然是惟一标识,这是最基本的要求。算法
递增:比较低要求的条件为趋势递增,即保证下一个ID必定大于上一个ID,而比较苛刻的要求是连续递增,如1,2,3等等。数据库
高可用高性能:ID生成事关重大,一旦挂掉系统崩溃;高性能是指必需要在压测下表现良好,若是达不到要求则在高并发环境下依然会致使系统瘫痪。apache
优势:缓存
可以保证独立性,程序能够在不一样的数据库间迁移,效果不受影响。安全
保证生成的ID不只是表独立的,并且是库独立的,这点在你想切分数据库的时候尤其重要。服务器
缺点:网络
1. 性能为题:UUID太长,一般以36长度的字符串表示,对MySQL索引不利:若是做为数据库主键,在InnoDB引擎下,UUID的无序性可能会引发数据位置频繁变更,严重影响性能
2. UUID无业务含义:不少须要ID能标识业务含义的地方不使用
3.不知足递增要求
snowflake是twitter开源的分布式ID生成系统。 Twitter每秒有数十万条消息的请求,每条消息都必须分配一条惟一的id,这些id还须要一些大体的顺序(方便客户端排序),而且在分布式系统中不一样机器产生的id必须不一样。
snowflake的结构以下(每部分用-分开):
0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 – 000000000000
第一位为未使用,接下来的41位为毫秒级时间(41位的长度可使用69年),而后是5位datacenterId和5位workerId(10位的长度最多支持部署1024个节点) ,最后12位是毫秒内的计数(12位的计数顺序号支持每一个节点每毫秒产生4096个ID序号)
一共加起来恰好64位,为一个Long型。(转换成字符串长度为18)
snowflake生成的ID总体上按照时间自增排序,而且整个分布式系统内不会产生ID碰撞(由datacenter和workerId做区分),而且效率较高。snowflake的缺点是:
snowflake如今有较好的改良方案,好比美团点评开源的分布式ID框架:leaf,经过使用ZooKeeper解决了时钟依赖问题。
snowflake的关键源码以下:
/** * Twitter_Snowflake<br> * SnowFlake的结构以下(每部分用-分开):<br> * 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 <br> * 1位标识,因为long基本类型在Java中是带符号的,最高位是符号位,正数是0,负数是1,因此id通常是正数,最高位是0<br> * 41位时间截(毫秒级),注意,41位时间截不是存储当前时间的时间截,而是存储时间截的差值(当前时间截 - 开始时间截) * 获得的值),这里的的开始时间截,通常是咱们的id生成器开始使用的时间,由咱们程序来指定的(以下下面程序IdWorker类的startTime属性)。41位的时间截,可使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69<br> * 10位的数据机器位,能够部署在1024个节点,包括5位datacenterId和5位workerId<br> * 12位序列,毫秒内的计数,12位的计数顺序号支持每一个节点每毫秒(同一机器,同一时间截)产生4096个ID序号<br> * 加起来恰好64位,为一个Long型。<br> * SnowFlake的优势是,总体上按照时间自增排序,而且整个分布式系统内不会产生ID碰撞(由数据中心ID和机器ID做区分),而且效率较高,经测试,SnowFlake每秒可以产生26万ID左右。 */ public class SnowflakeIdWorker { // ==============================Fields=========================================== /** 开始时间截 (2015-01-01) */ private final long twepoch = 1420041600000L; /** 机器id所占的位数 */ private final long workerIdBits = 5L; /** 数据标识id所占的位数 */ private final long datacenterIdBits = 5L; /** 支持的最大机器id,结果是31 (这个移位算法能够很快的计算出几位二进制数所能表示的最大十进制数) */ private final long maxWorkerId = -1L ^ (-1L << workerIdBits); /** 支持的最大数据标识id,结果是31 */ private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits); /** 序列在id中占的位数 */ private final long sequenceBits = 12L; /** 机器ID向左移12位 */ private final long workerIdShift = sequenceBits; /** 数据标识id向左移17位(12+5) */ private final long datacenterIdShift = sequenceBits + workerIdBits; /** 时间截向左移22位(5+5+12) */ private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; /** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) */ private final long sequenceMask = -1L ^ (-1L << sequenceBits); /** 工做机器ID(0~31) */ private long workerId; /** 数据中心ID(0~31) */ private long datacenterId; /** 毫秒内序列(0~4095) */ private long sequence = 0L; /** 上次生成ID的时间截 */ private long lastTimestamp = -1L; //==============================Constructors===================================== /** * 构造函数 * @param workerId 工做ID (0~31) * @param datacenterId 数据中心ID (0~31) */ public SnowflakeIdWorker(long workerId, long datacenterId) { if (workerId > maxWorkerId || workerId < 0) { throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); } if (datacenterId > maxDatacenterId || datacenterId < 0) { throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId)); } this.workerId = workerId; this.datacenterId = datacenterId; } // ==============================Methods========================================== /** * 得到下一个ID (该方法是线程安全的) * @return SnowflakeId */ public synchronized long nextId() { long timestamp = timeGen(); //若是当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 if (timestamp < lastTimestamp) { throw new RuntimeException( String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); } //若是是同一时间生成的,则进行毫秒内序列 if (lastTimestamp == timestamp) { sequence = (sequence + 1) & sequenceMask; //毫秒内序列溢出 if (sequence == 0) { //阻塞到下一个毫秒,得到新的时间戳 timestamp = tilNextMillis(lastTimestamp); } } //时间戳改变,毫秒内序列重置 else { sequence = 0L; } //上次生成ID的时间截 lastTimestamp = timestamp; //移位并经过或运算拼到一块儿组成64位的ID return ((timestamp - twepoch) << timestampLeftShift) // | (datacenterId << datacenterIdShift) // | (workerId << workerIdShift) // | sequence; } /** * 阻塞到下一个毫秒,直到得到新的时间戳 * @param lastTimestamp 上次生成ID的时间截 * @return 当前时间戳 */ protected long tilNextMillis(long lastTimestamp) { long timestamp = timeGen(); while (timestamp <= lastTimestamp) { timestamp = timeGen(); } return timestamp; } /** * 返回以毫秒为单位的当前时间 * @return 当前时间(毫秒) */ protected long timeGen() { return System.currentTimeMillis(); } //==============================Test============================================= /** 测试 */ public static void main(String[] args) throws InterruptedException { SnowflakeIdWorker idWorker = new SnowflakeIdWorker(0, 0); for (int i = 0; i < 100; i++) { long id = idWorker.nextId(); //System.out.println(Long.toBinaryString(id)); Thread.sleep(1); System.out.println(id); } } }
利用数据库生成ID是最多见的方案。可以确保ID全数据库惟一。其优缺点以下:
优势:
很是简单,利用现有数据库系统的功能实现,成本小,有DBA专业维护。
ID号单调自增,能够实现一些对ID有特殊要求的业务。
缺点:
不一样数据库语法和实现不一样,数据库迁移的时候或多数据库版本支持的时候须要处理。
分表分库的时候会有麻烦。
经过Redis生成ID(主要经过redis的自增函数)、ZooKeeper生成ID、MongoDB的ObjectID等都可实现惟一性的要求
实际业务中,除了分布式ID全局惟一以外,还有是否趋势/连续递增的要求。根据具体业务需求的不一样,有两种可选方案。
一是只保证全局惟一,不保证连续递增。二是既保证全局惟一,又保证连续递增。
2. 基于ZooKeeper和本地缓存的方案
基于zookeeper分布式ID实现方案有不少种,本方案只使用ZooKeeper做为分段节点协调工具。每台服务器首先从zookeeper缓存一段,如1-1000的id,
此时zk上保存最大值1000,每次获取的时候都会进行判断,若是id<=1000,则更新本地的当前值,若是为1001,则会将zk上的最大值更新至2000,本地缓存
段更新为1001-2000,更新的时候使用curator的分布式锁来实现。
因为ID是从本机获取,所以本方案的优势是性能很是好。缺点是若是多主机负载均衡,则会出现不连续的id,固然将递增区段设置为1也能保证连续的id,
可是效率会受到很大影响。实现关键源码以下:
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 根据开源项目mycat实现基于zookeeper的递增序列号 * <p> * 只要配置好ZK地址和表名的以下属性 * MINID 某线程当前区间内最小值 * MAXID 某线程当前区间内最大值 * CURID 某线程当前区间内当前值 * * @author wangwanbin * @version 1.0 * @time 2017/9/1 */ public class ZKCachedSequenceHandler extends SequenceHandler { protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class); private static final String KEY_MIN_NAME = ".MINID";// 1 private static final String KEY_MAX_NAME = ".MAXID";// 10000 private static final String KEY_CUR_NAME = ".CURID";// 888 private final static long PERIOD = 1000;//每次缓存的ID段数量 private static ZKCachedSequenceHandler instance = new ZKCachedSequenceHandler(); /** * 私有化构造方法,单例模式 */ private ZKCachedSequenceHandler() { } /** * 获取sequence工具对象的惟一方法 * * @return */ public static ZKCachedSequenceHandler getInstance() { return instance; } private Map<String, Map<String, String>> tableParaValMap = null; private CuratorFramework client; private InterProcessSemaphoreMutex interProcessSemaphore = null; public void loadZK() { try { this.client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3)); this.client.start(); } catch (Exception e) { LOGGER.error("Error caught while initializing ZK:" + e.getCause()); } } public Map<String, String> getParaValMap(String prefixName) { if (tableParaValMap == null) { try { loadZK(); fetchNextPeriod(prefixName); } catch (Exception e) { LOGGER.error("Error caught while loding configuration within current thread:" + e.getCause()); } } Map<String, String> paraValMap = tableParaValMap.get(prefixName); return paraValMap; } public Boolean fetchNextPeriod(String prefixName) { try { Stat stat = this.client.checkExists().forPath(PATH + "/" + prefixName + SEQ); if (stat == null || (stat.getDataLength() == 0)) { try { client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .forPath(PATH + "/" + prefixName + SEQ, String.valueOf(0).getBytes()); } catch (Exception e) { LOGGER.debug("Node exists! Maybe other instance is initializing!"); } } if (interProcessSemaphore == null) { interProcessSemaphore = new InterProcessSemaphoreMutex(client, PATH + "/" + prefixName + SEQ); } interProcessSemaphore.acquire(); if (tableParaValMap == null) { tableParaValMap = new ConcurrentHashMap<>(); } Map<String, String> paraValMap = tableParaValMap.get(prefixName); if (paraValMap == null) { paraValMap = new ConcurrentHashMap<>(); tableParaValMap.put(prefixName, paraValMap); } long now = Long.parseLong(new String(client.getData().forPath(PATH + "/" + prefixName + SEQ))); client.setData().forPath(PATH + "/" + prefixName + SEQ, ((now + PERIOD) + "").getBytes()); if (now == 1) { paraValMap.put(prefixName + KEY_MAX_NAME, PERIOD + ""); paraValMap.put(prefixName + KEY_MIN_NAME, "1"); paraValMap.put(prefixName + KEY_CUR_NAME, "0"); } else { paraValMap.put(prefixName + KEY_MAX_NAME, (now + PERIOD) + ""); paraValMap.put(prefixName + KEY_MIN_NAME, (now) + ""); paraValMap.put(prefixName + KEY_CUR_NAME, (now) + ""); } } catch (Exception e) { LOGGER.error("Error caught while updating period from ZK:" + e.getCause()); } finally { try { interProcessSemaphore.release(); } catch (Exception e) { LOGGER.error("Error caught while realeasing distributed lock" + e.getCause()); } } return true; } public Boolean updateCURIDVal(String prefixName, Long val) { Map<String, String> paraValMap = tableParaValMap.get(prefixName); if (paraValMap == null) { throw new IllegalStateException("ZKCachedSequenceHandler should be loaded first!"); } paraValMap.put(prefixName + KEY_CUR_NAME, val + ""); return true; } /** * 获取自增ID * * @param sequenceEnum * @return */ @Override public synchronized long nextId(SequenceEnum sequenceEnum) { String prefixName = sequenceEnum.getCode(); Map<String, String> paraMap = this.getParaValMap(prefixName); if (null == paraMap) { throw new RuntimeException("fetch Param Values error."); } Long nextId = Long.parseLong(paraMap.get(prefixName + KEY_CUR_NAME)) + 1; Long maxId = Long.parseLong(paraMap.get(prefixName + KEY_MAX_NAME)); if (nextId > maxId) { fetchNextPeriod(prefixName); return nextId(sequenceEnum); } updateCURIDVal(prefixName, nextId); return nextId.longValue(); } public static void main(String[] args) throws UnsupportedEncodingException { long startTime = System.currentTimeMillis(); //获取开始时间 final ZKCachedSequenceHandler sequenceHandler = getInstance(); sequenceHandler.loadZK(); new Thread() { public void run() { long startTime2 = System.currentTimeMillis(); //获取开始时间 for (int i = 0; i < 5000; i++) { System.out.println("线程1 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT)); } long endTime2 = System.currentTimeMillis(); //获取结束时间 System.out.println("程序运行时间1: " + (endTime2 - startTime2) + "ms"); } }.start(); for (int i = 0; i < 5000; i++) { System.out.println("线程2 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT)); } long endTime = System.currentTimeMillis(); //获取结束时间 System.out.println("程序运行时间2: " + (endTime - startTime) + "ms"); } }
能够看到,因为不须要进行过多的网络消耗,缓存式的zk协调方案性能至关了得,生成10000个ID仅需553ms(两个线程耗时较长者) , 平均每一个ID=0.05ms
使用zk的永久sequence策略建立节点,并获取返回值,而后删除前一个节点,这样既防止zk服务器存在过多的节点,又提升了效率;节点删除采用线程池来统一处理,提升响应速度
优势:能建立连续递增的ID,又能下降ZK消耗。关键实现代码以下:
package com.zb.p2p.utils; import com.zb.p2p.enums.SequenceEnum; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.PooledObjectFactory; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 基于zk的永久型自增节点PERSISTENT_SEQUENTIAL实现 * 每次生成节点后会使用线程池执行删除节点任务,以减少zk的负担 * Created by wangwanbin on 2017/9/5. */ public class ZKIncreaseSequenceHandler extends SequenceHandler implements PooledObjectFactory<CuratorFramework> { protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class); private static ZKIncreaseSequenceHandler instance = new ZKIncreaseSequenceHandler(); private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1); private GenericObjectPool genericObjectPool; private Queue<Long> preNodes = new ConcurrentLinkedQueue<>(); private static String ZK_ADDRESS = ""; //192.168.0.65 private static String PATH = "";// /sequence/p2p private static String SEQ = "";//seq; /** * 私有化构造方法,单例模式 */ private ZKIncreaseSequenceHandler() { GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setMaxTotal(4); genericObjectPool = new GenericObjectPool(this, config); } /** * 获取sequence工具对象的惟一方法 * * @return */ public static ZKIncreaseSequenceHandler getInstance(String zkAddress, String path, String seq) { ZK_ADDRESS = zkAddress; PATH = path; SEQ = seq; return instance; } @Override public long nextId(final SequenceEnum sequenceEnum) { String result = createNode(sequenceEnum.getCode()); final String idstr = result.substring((PATH + "/" + sequenceEnum.getCode() + "/" + SEQ).length()); final long id = Long.parseLong(idstr); preNodes.add(id); //删除上一个节点 fixedThreadPool.execute(new Runnable() { @Override public void run() { Iterator<Long> iterator = preNodes.iterator(); if (iterator.hasNext()) { long preNode = iterator.next(); if (preNode < id) { final String format = "%0" + idstr.length() + "d"; String preIdstr = String.format(format, preNode); final String prePath = PATH + "/" + sequenceEnum.getCode() + "/" + SEQ + preIdstr; CuratorFramework client = null; try { client = (CuratorFramework) genericObjectPool.borrowObject(); client.delete().forPath(prePath); preNodes.remove(preNode); } catch (Exception e) { LOGGER.error("delete preNode error", e); } finally { if (client != null) genericObjectPool.returnObject(client); } } } } }); return id; } private String createNode(String prefixName) { CuratorFramework client = null; try { client = (CuratorFramework) genericObjectPool.borrowObject(); String result = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL) .forPath(PATH + "/" + prefixName + "/" + SEQ, String.valueOf(0).getBytes()); return result; } catch (Exception e) { throw new RuntimeException("create zookeeper node error", e); } finally { if (client != null) genericObjectPool.returnObject(client); } } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(1); long startTime = System.currentTimeMillis(); //获取开始时间 final ZKIncreaseSequenceHandler sequenceHandler = ZKIncreaseSequenceHandler.getInstance("192.168.0.65", "/sequence/p2p", "seq"); int count = 10; final CountDownLatch cd = new CountDownLatch(count); for (int i = 0; i < count; i++) { executorService.execute(new Runnable() { public void run() { System.out.printf("线程 %s %d \n", Thread.currentThread().getId(), sequenceHandler.nextId(SequenceEnum.ORDER)); cd.countDown(); } }); } try { cd.await(); } catch (InterruptedException e) { LOGGER.error("Interrupted thread",e); Thread.currentThread().interrupt(); } long endTime = System.currentTimeMillis(); //获取结束时间 System.out.println("程序运行时间: " + (endTime - startTime) + "ms"); } @Override public PooledObject<CuratorFramework> makeObject() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3)); client.start(); return new DefaultPooledObject<>(client); } @Override public void destroyObject(PooledObject<CuratorFramework> p) throws Exception { } @Override public boolean validateObject(PooledObject<CuratorFramework> p) { return false; } @Override public void activateObject(PooledObject<CuratorFramework> p) throws Exception { } @Override public void passivateObject(PooledObject<CuratorFramework> p) throws Exception { } }
测试结果以下,生成10000个ID消耗=9443ms(两个线程耗时较长者), 平均每一个ID=0.9ms
这还只是单zk链接的状况下,若是使用链接池来维护多个zk的连接,效率将成倍的提高