在分布式系统中,每每须要对大量的数据如订单、帐户进行标识,以一个有意义的有序的序列号来做为全局惟一的ID。java
而分布式系统中咱们对ID生成器要求又有哪些呢?node
全局惟一性:不能出现重复的ID号,既然是惟一标识,这是最基本的要求。redis
递增:比较低要求的条件为趋势递增,即保证下一个ID必定大于上一个ID,而比较苛刻的要求是连续递增,如1,2,3等等。数据库
高可用高性能:ID生成事关重大,一旦挂掉系统崩溃;高性能是指必需要在压测下表现良好,若是达不到要求则在高并发环境下依然会致使系统瘫痪。apache
优势:缓存
可以保证独立性,程序能够在不一样的数据库间迁移,效果不受影响。服务器
保证生成的ID不只是表独立的,并且是库独立的,这点在你想切分数据库的时候尤其重要。网络
缺点:并发
性能问题:UUID太长,一般以36长度的字符串表示,对MySQL索引不利:若是做为数据库主键,在InnoDB引擎下,UUID的无序性可能会引发数据位置频繁变更,严重影响性能。app
UUID无业务含义:不少须要ID能标识业务含义的地方不使用。
不知足递增要求。
snowflake是twitter开源的分布式ID生成系统。 Twitter每秒有数十万条消息的请求,每条消息都必须分配一条惟一的id,这些id还须要一些大体的顺序(方便客户端排序),而且在分布式系统中不一样机器产生的id必须不一样。
snowflake的结构以下(每部分用-分开):
0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 – 000000000000
第一位为未使用,接下来的41位为毫秒级时间(41位的长度可使用69年),而后是5位datacenterId和5位workerId(10位的长度最多支持部署1024个节点) ,最后12位是毫秒内的计数(12位的计数顺序号支持每一个节点每毫秒产生4096个ID序号)
一共加起来恰好64位,为一个Long型。
snowflake生成的ID总体上按照时间自增排序,而且整个分布式系统内不会产生ID碰撞(由datacenter和workerId做区分),而且效率较高。snowflake的缺点是:
强依赖时钟,若是主机时间回拨,则会形成重复ID
ID虽然有序,可是不连续
snowflake如今有较好的改良方案,好比美团点评开源的分布式ID框架:leaf,经过使用ZooKeeper解决了时钟依赖问题。
利用数据库生成ID是最多见的方案。可以确保ID全数据库惟一。其优缺点以下:
优势:
很是简单,利用现有数据库系统的功能实现,成本小,有DBA专业维护。
ID单调自增。
缺点:
不一样数据库语法和实现不一样,数据库迁移的时候或多数据库版本支持的时候须要处理。
在单个数据库或读写分离或一主多从的状况下,只有一个主库能够生成。有单点故障的风险。
在性能达不到要求的状况下,比较难于扩展。
若是涉及多个系统须要合并或者数据迁移会比较麻烦。
分表分库的时候会有麻烦。
经过Redis生成ID(主要经过redis的自增函数)、ZooKeeper生成ID、MongoDB的ObjectID等都可实现惟一性的要求。
实际业务中,除了分布式ID全局惟一以外,还有是否趋势/连续递增的要求。根据具体业务需求的不一样,有两种可选方案。
一是只保证全局惟一,不保证连续递增。二是既保证全局惟一,又保证连续递增。
2. 基于ZooKeeper和本地缓存的方案
基于zookeeper分布式ID实现方案有不少种,本方案只使用ZooKeeper做为分段节点协调工具。每台服务器首先从zookeeper缓存一段,如1-1000的id。
此时zk上保存最大值1000,每次获取的时候都会进行判断,若是id小于本地最大值,即id<=1000,则更新本地的当前值,若是id大于本地当前值,好比说是1001,则会将从zk再获取下一个id数据段并在本地缓存。获取数据段的时候须要更新zk节点数据,更新的时候使用curator的分布式锁来实现。
因为id是从本机获取,所以本方案的优势是性能很是好。缺点是若是多主机负载均衡,则会出现不连续的id,固然将递增区段设置为1也能保证连续的id,可是效率会受到很大影响。
实现关键源码以下:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 根据开源项目mycat实现基于zookeeper的递增序列号
* <p>
* 只要配置好ZK地址和表名的以下属性
* MINID 某线程当前区间内最小值
* MAXID 某线程当前区间内最大值
* CURID 某线程当前区间内当前值
*
* @author wangwanbin
* @version 1.0
* @time 2017/9/1
*/
public class ZKCachedSequenceHandler extends SequenceHandler {
protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class);
private static final String KEY_MIN_NAME = ".MINID";// 1
private static final String KEY_MAX_NAME = ".MAXID";// 10000
private static final String KEY_CUR_NAME = ".CURID";// 888
private final static long PERIOD = 1000;//每次缓存的ID段数量
private static ZKCachedSequenceHandler instance = new ZKCachedSequenceHandler();
/**
* 私有化构造方法,单例模式
*/
private ZKCachedSequenceHandler() {
}
/**
* 获取sequence工具对象的惟一方法
*
* @return
*/
public static ZKCachedSequenceHandler getInstance() {
return instance;
}
private Map<String, Map<String, String>> tableParaValMap = null;
private CuratorFramework client;
private InterProcessSemaphoreMutex interProcessSemaphore = null;
public void loadZK() {
try {
this.client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
this.client.start();
} catch (Exception e) {
LOGGER.error("Error caught while initializing ZK:" + e.getCause());
}
}
public Map<String, String> getParaValMap(String prefixName) {
if (tableParaValMap == null) {
try {
loadZK();
fetchNextPeriod(prefixName);
} catch (Exception e) {
LOGGER.error("Error caught while loding configuration within current thread:" + e.getCause());
}
}
Map<String, String> paraValMap = tableParaValMap.get(prefixName);
return paraValMap;
}
public Boolean fetchNextPeriod(String prefixName) {
try {
Stat stat = this.client.checkExists().forPath(PATH + "/" + prefixName + SEQ);
if (stat == null || (stat.getDataLength() == 0)) {
try {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.forPath(PATH + "/" + prefixName + SEQ, String.valueOf(0).getBytes());
} catch (Exception e) {
LOGGER.debug("Node exists! Maybe other instance is initializing!");
}
}
if (interProcessSemaphore == null) {
interProcessSemaphore = new InterProcessSemaphoreMutex(client, PATH + "/" + prefixName + SEQ);
}
interProcessSemaphore.acquire();
if (tableParaValMap == null) {
tableParaValMap = new ConcurrentHashMap<>();
}
Map<String, String> paraValMap = tableParaValMap.get(prefixName);
if (paraValMap == null) {
paraValMap = new ConcurrentHashMap<>();
tableParaValMap.put(prefixName, paraValMap);
}
long now = Long.parseLong(new String(client.getData().forPath(PATH + "/" + prefixName + SEQ)));
client.setData().forPath(PATH + "/" + prefixName + SEQ, ((now + PERIOD) + "").getBytes());
if (now == 1) {
paraValMap.put(prefixName + KEY_MAX_NAME, PERIOD + "");
paraValMap.put(prefixName + KEY_MIN_NAME, "1");
paraValMap.put(prefixName + KEY_CUR_NAME, "0");
} else {
paraValMap.put(prefixName + KEY_MAX_NAME, (now + PERIOD) + "");
paraValMap.put(prefixName + KEY_MIN_NAME, (now) + "");
paraValMap.put(prefixName + KEY_CUR_NAME, (now) + "");
}
} catch (Exception e) {
LOGGER.error("Error caught while updating period from ZK:" + e.getCause());
} finally {
try {
interProcessSemaphore.release();
} catch (Exception e) {
LOGGER.error("Error caught while realeasing distributed lock" + e.getCause());
}
}
return true;
}
public Boolean updateCURIDVal(String prefixName, Long val) {
Map<String, String> paraValMap = tableParaValMap.get(prefixName);
if (paraValMap == null) {
throw new IllegalStateException("ZKCachedSequenceHandler should be loaded first!");
}
paraValMap.put(prefixName + KEY_CUR_NAME, val + "");
return true;
}
/**
* 获取自增ID
*
* @param sequenceEnum
* @return
*/
@Override
public synchronized long nextId(SequenceEnum sequenceEnum) {
String prefixName = sequenceEnum.getCode();
Map<String, String> paraMap = this.getParaValMap(prefixName);
if (null == paraMap) {
throw new RuntimeException("fetch Param Values error.");
}
Long nextId = Long.parseLong(paraMap.get(prefixName + KEY_CUR_NAME)) + 1;
Long maxId = Long.parseLong(paraMap.get(prefixName + KEY_MAX_NAME));
if (nextId > maxId) {
fetchNextPeriod(prefixName);
return nextId(sequenceEnum);
}
updateCURIDVal(prefixName, nextId);
return nextId.longValue();
}
public static void main(String[] args) throws UnsupportedEncodingException {
long startTime = System.currentTimeMillis(); //获取开始时间
final ZKCachedSequenceHandler sequenceHandler = getInstance();
sequenceHandler.loadZK();
new Thread() {
public void run() {
long startTime2 = System.currentTimeMillis(); //获取开始时间
for (int i = 0; i < 5000; i++) {
System.out.println("线程1 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT));
}
long endTime2 = System.currentTimeMillis(); //获取结束时间
System.out.println("程序运行时间1:" + (endTime2 - startTime2) + "ms");
}
}.start();
for (int i = 0; i < 5000; i++) {
System.out.println("线程2 " + sequenceHandler.nextId(SequenceEnum.ACCOUNT));
}
long endTime = System.currentTimeMillis(); //获取结束时间
System.out.println("程序运行时间2:" + (endTime - startTime) + "ms");
}
}
能够看到,因为不须要进行过多的网络消耗,缓存式的zk协调方案性能至关了得,生成10000个id仅需553ms(两个线程耗时较长者) , 平均每一个id消耗0.05ms。
使用zk的永久sequence策略建立节点,并获取返回值,而后删除前一个节点,这样既防止zk服务器存在过多的节点,又提升了效率;节点删除采用线程池来统一处理,提升响应速度。
优势:能建立连续递增的ID。
关键实现代码以下:
package com.zb.p2p.utils;
import com.zb.p2p.enums.SequenceEnum;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 基于zk的永久型自增节点PERSISTENT_SEQUENTIAL实现
* 每次生成节点后会使用线程池执行删除节点任务
* Created by wangwanbin on 2017/9/5.
*/
public class ZKIncreaseSequenceHandler extends SequenceHandler implements PooledObjectFactory<CuratorFramework> {
protected static final Logger LOGGER = LoggerFactory.getLogger(ZKCachedSequenceHandler.class);
private static ZKIncreaseSequenceHandler instance = new ZKIncreaseSequenceHandler();
private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
private GenericObjectPool genericObjectPool;
private Queue<Long> preNodes = new ConcurrentLinkedQueue<>();
private static String ZK_ADDRESS = ""; //192.168.0.65
private static String PATH = "";// /sequence/p2p
private static String SEQ = "";//seq;
/**
* 私有化构造方法,单例模式
*/
private ZKIncreaseSequenceHandler() {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(4);
genericObjectPool = new GenericObjectPool(this, config);
}
/**
* 获取sequence工具对象的惟一方法
*
* @return
*/
public static ZKIncreaseSequenceHandler getInstance(String zkAddress, String path, String seq) {
ZK_ADDRESS = zkAddress;
PATH = path;
SEQ = seq;
return instance;
}
@Override
public long nextId(final SequenceEnum sequenceEnum) {
String result = createNode(sequenceEnum.getCode());
final String idstr = result.substring((PATH + "/" + sequenceEnum.getCode() + "/" + SEQ).length());
final long id = Long.parseLong(idstr);
preNodes.add(id);
//删除上一个节点
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
Iterator<Long> iterator = preNodes.iterator();
if (iterator.hasNext()) {
long preNode = iterator.next();
if (preNode < id) {
final String format = "%0" + idstr.length() + "d";
String preIdstr = String.format(format, preNode);
final String prePath = PATH + "/" + sequenceEnum.getCode() + "/" + SEQ + preIdstr;
CuratorFramework client = null;
try {
client = (CuratorFramework) genericObjectPool.borrowObject();
client.delete().forPath(prePath);
preNodes.remove(preNode);
} catch (Exception e) {
LOGGER.error("delete preNode error", e);
} finally {
if (client != null)
genericObjectPool.returnObject(client);
}
}
}
}
});
return id;
}
private String createNode(String prefixName) {
CuratorFramework client = null;
try {
client = (CuratorFramework) genericObjectPool.borrowObject();
String result = client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
.forPath(PATH + "/" + prefixName + "/" + SEQ, String.valueOf(0).getBytes());
return result;
} catch (Exception e) {
throw new RuntimeException("create zookeeper node error", e);
} finally {
if (client != null)
genericObjectPool.returnObject(client);
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis(); //获取开始时间
final ZKIncreaseSequenceHandler sequenceHandler = ZKIncreaseSequenceHandler.getInstance("192.168.0.65", "/sequence/p2p", "seq");
int count = 10;
final CountDownLatch cd = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
executorService.execute(new Runnable() {
public void run() {
System.out.printf("线程 %s %d \n", Thread.currentThread().getId(), sequenceHandler.nextId(SequenceEnum.ORDER));
cd.countDown();
}
});
}
try {
cd.await();
} catch (InterruptedException e) {
LOGGER.error("Interrupted thread",e);
Thread.currentThread().interrupt();
}
long endTime = System.currentTimeMillis(); //获取结束时间
System.out.println("程序运行时间:" + (endTime - startTime) + "ms");
}
@Override
public PooledObject<CuratorFramework> makeObject() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));
client.start();
return new DefaultPooledObject<>(client);
}
@Override
public void destroyObject(PooledObject<CuratorFramework> p) throws Exception {
}
@Override
public boolean validateObject(PooledObject<CuratorFramework> p) {
return false;
}
@Override
public void activateObject(PooledObject<CuratorFramework> p) throws Exception {
}
@Override
public void passivateObject(PooledObject<CuratorFramework> p) throws Exception {
}
}
测试结果以下,生成10000个id消耗=9443ms(两个线程耗时较长者), 平均每一个id消耗0.9ms。
这还只是单zk链接的状况下,若是使用链接池来维护多个zk的链接,效率将成倍的提高。
分布式ID生成器的实现有不少种。目前各方案也都各有特色。咱们能够根据业务的具体要求,选择实现合适的方案。
感谢你们看到这里,文章有不足,欢迎你们指出;若是你以为写得不错,那就给我一个赞吧。