图解JanusGraph系列 - 关于JanusGraph图数据批量快速导入的方案和想法(bulk load data)

你们好,我是洋仔,JanusGraph图解系列文章,实时更新~java

图数据库文章总目录:

源码分析相关可查看github码文不易,求个star~): https://github.com/YYDreamer/janusgraphnode

版本:JanusGraph-0.5.2git

转载文章请保留如下声明:github

做者:洋仔聊编程算法

微信公众号:匠心Java数据库

原文地址:https://liyangyang.blog.csdn.net/编程

前言

JanusGraph的批量导入速度一直是用户使用的痛点, 下面会依托官网的介绍和我的理解,聊一下关于图数据批量快速导入的一些方案、方案使用场景和一些想;json

写这篇文章的目的主要是为了让你们了解一下janus的导入的一些经常使用方案,算是一个总结吧,若有疑问或者文章错误,欢迎留言联系我后端

首先,说一下JanusGraph的批量导入的可配置的优化配置选项 和 基于第三方存储和索引的优化配置选项:api

  • 批量导入的配置选项
  • 第三方存储后端的优化选项(Hbase为例)
  • 第三方索引后端的优化选项(ES为例)

以后分析一下数据导入的四个方案:

  • 基于JanusGraph Api的批量导入
  • 基于Gremlin Server的批量导入
  • 使用JanusGraph-utils的批量导入
  • 基于bulk loader 导入方式
  • 基于抽取序列化逻辑生成Hfile离线批量导入

最后聊一下关于批量导入的一些想法;

一:批量导入的优化配置选项

一、批量导入的配置选项

JanusGraph中有许多配置选项和工具能够将大量的图数据更有效地导入。这种导入称为批量加载,与默认的事务性加载相反,默认的事务性加载单个事务只会添加少许数据。

下述介绍了配置选项和工具,这些工具和工具使JanusGraph中的批量加载更加高效。

在继续操做以前,请仔细遵照每一个选项的限制和假设,以避免丢失数据或损坏数据。

配置选项

janusgraph支持批量导入,可经过相关配置项设置

下面具体看一下对应配置项的详细做用:

批量加载

1) 配置项:storage.batch-loading

启用该配置项,至关于打开了JanusGraph的批量导入开关;

影响:

启用批处理加载会在许多地方禁用JanusGraph内部一致性检查,重要的是会禁用lock锁定来保证分布式一致性;JanusGraph假定要加载到JanusGraph中的数据与图形一致,所以出于性能考虑禁用了本身的一致性检查。

换句话说,咱们要在导入数据以前,保证要导入的数据和图中已有的数据不会产生冲突也就是保持一致性!

为何要禁用一致性检查来提高性能?

在许多批量加载方案中,在加载数据以前确保去数据一致性,而后在将数据加载到数据库比在加载数据到图库时确保数据一致性,消耗的成本要便宜的多。

例如,将现有用户数据文件批量加载到JanusGraph中的用例:假设用户名属性键具备定义的惟一复合索引,即用户名在整个图中必须是惟一的。

那么按名称对数据文件进行排序并过滤出重复项或编写执行此类过滤的Hadoop做业消耗的时间成本,就会比开启一致性检查在导入图数据时janusgraph检查花费的成本要少的多。

基于上述,咱们能够启用 storage.batch-loading配置,从而大大减小了批量加载时间,由于JanusGraph没必要检查每一个添加的用户该名称是否已存在于数据库中。

重要提示

启用storage.batch-loading要求用户确保加载的数据在内部是一致的,而且与图中已存在的任何数据一致。

特别是,启用批处理加载时,并发类型建立会致使严重的数据完整性问题。所以,咱们强烈建议经过schema.default = none在图形配置中进行设置来禁用自动类型建立。

优化ID分配

一、ID块大小

1)配置项:ids.block-size

该配置项为配置在分布式id的生成过程当中每次获取 id block的大小;

分布式id相关具体可看文章《图解Janusgraph系列-分布式id生成策略分析》

原理:

每一个新添加的顶点或边都分配有惟一的ID。JanusGraph的ID池管理器以block的形式获取特定JanusGraph实例的ID。id块获取过程很昂贵,由于它须要保证块的全局惟一分配。

增长 ids.block-size会减小获取次数,但可能会使许多ID未被分配,从而形成浪费。对于事务性工做负载,默认块大小是合理的,可是在批量加载期间,顶点和边的添加要频繁得多,并且要快速连续。

所以,一般建议将块大小增长10倍或更多,具体取决于每台机器要添加的顶点数量。

经验法则

设置ids.block-size为您但愿每小时为每一个JanusGraph实例添加的顶点数。

重要提示:

必须为全部JanusGraph实例配置相同的值,ids.block-size以确保正确的ID分配。所以,在更改此值以前,请务必关闭全部JanusGraph实例

二、ID Acquisition流程

当许多JanusGraph实例频繁并行分配id块时,不可避免地会出现实例之间的分配冲突,从而减慢了分配过程。

此外,因为大容量加载而致使的增长的写负载可能会使该过程进一步减慢到JanusGraph认为失败并引起异常的程度。能够调整2个配置选项来避免这种状况;

1)配置项:ids.authority.wait-time

配置ID池管理器等待应用程序获取ID块被存储后端确认的时间(以毫秒为单位)。这段时间越短,应用程序在拥挤的存储群集上发生故障的可能性就越大。

经验法则

将其设置为负载下存储后端集群上测量的第95百分位读写时间的总和。

重要说明

全部JanusGraph实例的该值都应该相同。

2)配置项:ids.renew-timeout

配置JanusGraph的ID池管理器在尝试获取新的ID块总共等待的毫秒数。

经验法则

将此值设置为尽量大,没必要为不可恢复的故障等待过久。增长它的惟一缺点是JanusGraph将在不可用的存储后端群集上尝试更长的时间

优化读写

一、缓冲区大小

JanusGraph在数据导入时存在一个缓冲区,用来缓冲当前事务的部分请求,从而能够小批量的写入和执行,从而减小针对存储后端的请求数。在短期内执行大量写操做时,存储后端可能会由于大量的写请求打入而变得超负荷;

配置项:storage.buffer-size

这些批次的大小由storage.buffer-size来控制。 增长storage.buffer-size能够经过增长缓冲区大小,来使得批次保存更多的请求,从而减小写请求的次数来避免上述失败。

注意:

增长缓冲区大小会增长写请求的等待时间及其失败的可能性。所以,不建议为事务性负载增长此设置,而且应该在批量加载期间仔细尝试此设置的一个合适的值。

二、读写健壮性

在批量加载期间,群集上的负载一般会增长,从而使读和写操做失败的可能性更大(尤为是如上所述,若是缓冲区大小增长了)。

1)配置项:storage.read-attempts

该配置项配置JanusGraph在放弃以前尝试对存储后端执行读取或写入操做的次数。

2)配置项:storage.attempt-wait

该配置项指定JanusGraph在从新尝试失败的后端操做以前将等待的毫秒数。较高的值能够确保重试操做不会进一步增长后端的负载。

注意:

若是在批量加载期间后端上可能会有很高的负载,一般建议增长这些配置选项。

其余

1)配置项:storage.read-attempts

二、第三方存储后端的优化选项

针对于第三方存储的优化分为两部分:

  • 第三方存储集群自身的优化
  • JanusGraph结合第三方存储的优化选项

集群自身的优化

集群自身的优化,本文主要介绍janusgraph相关优化这里就很少说这部分了,主要是提高hbase集群的读写能力;

这里主要仍是关注的Hbase的写数据能力优化后的提高!这部分的优化相当重要! 下面举几个例子:

1)配置项: hbase.client.write.buffer

设置buffer的容量

HBase Client会在数据累积到设置的阈值后才提交Region Server。这样作的好处在于能够减小RPC链接次数。

计算一下服务端所以而消耗的内存:hbase.client.write.buffer * hbase.regionserver.handler.count从在减小PRC次数和增长服务器端内存之间找到平衡点。

2)配置项: hbase.regionserver.handler.count

定义每一个Region Server上的RPC Handler的数量

Region Server经过RPC Handler接收外部请求并加以处理。因此提高RPC Handler的数量能够必定程度上提升HBase接收请求的能力。

固然,handler数量也不是越大越好,这要取决于节点的硬件状况。

等等各类配置项

3)针对一些CF、RowKey设计之类的优化点,由于这些都是janus预设好的,因此在janusGraph中使用不到;

JanusGraph针对优化

针对于JanusGraph+第三方存储的优化,官网(配置项文档超连接) 给出了一些配置选项,可从其找出对应的配置项;

针对于hbase,我在配置项中找出了对应的一些可能有做用的配置以下:

1)配置项: storage.hbase.compression-algorithm

hbase存储数据压缩算法的配置,咱们在《图解图库JanusGraph系列-一文知晓“图数据“底层存储结构》文章中提到有好几个地方都是压缩存储的,此处就是配置的压缩算法;

类型: 枚举值,支持 lzogzsnappylz4bzip2zstd五种压缩算法 和 不压缩配置:none

默认值: gz压缩;

注意:此处配置的算法须要hbase也支持才能够! 若是存储空间足够,能够考虑配置为不压缩,也会提高导入速率!

2)配置项:storage.hbase.skip-schema-check

假设JanusGraph的HBase表和列族已经存在。 若是是这样,JanusGraph将不会检查其 table/ CF 的存在,也不会在任何状况下尝试建立它们。

类型: 布尔值

默认值: false,检查

注意: 能够在数据导入时,将该配置项设置为true,去除table/ CF的检查,这个其实做用不大;由于都是在初始化图实例的时候就去检查了。。

三、第三方索引后端的优化选项

针对于第三方存索引的优化分为两部分:

  • 第三方索引集群自身的优化
  • JanusGraph结合第三方索引的优化选项

集群自身的优化

集群自身的优化,本文主要介绍janusgraph相关优化这里就很少说这部分了,主要是提高索引集群的读写能力;

这里主要仍是关注的索引的写数据能力优化后的提高!这部分的优化相当重要!

例如es的线程池参数优化等

JanusGraph针对优化

针对于JanusGraph+第三方索引的优化,官网(配置项文档超连接) 给出了一些配置选项,可从其找出对应的配置项;

针对于es,我在配置项中找出了对应的一些可能有做用的配置以下:

1)配置项: index.[X].elasticsearch.retry_on_conflict

指定在发生冲突时应重试操做多少次。

类型: 整数

默认值: 0次

注意: 增大该值能够提高在批量导入中,发生冲突后解决冲突的概率

三、JVM的优化

JanusGraph基于Java语言编写,则毋庸置疑会用到JVM

对JVM的调优也主要集中到垃圾收集器和堆内存的调优

堆大小调整:

咱们在导入图数据时会产生大量的临时数据,这里须要咱们调整一个合适的堆空间;

推荐至少为8G

垃圾收集器调优:

若是在使用CMS发现GC过于频繁的话,咱们能够考虑将垃圾收集器设置为:G1

这个收集器适用于大堆空间的垃圾收集,有效的减小垃圾收集消耗的时间;

注意:

此处的JVM调优设计JanusGraph java api项目gremlin server部分的JVM调优;

二:基于数据层面的优化

2.1 拆分图 并发执行

在某些状况下,图数据能够分解为多个断开链接的子图。这些子图能够跨多台机器独立地并行加载;无论是采用下述的那种方式加载均可以;

这里有一个前提: 底层第三方存储集群的处理能力没有达到最大; 若是底层存储集群当前的平均cpu已是80 90%的了,就算拆分多个图也没用,底层存储的处理能力已经被限制住当前的速度了;

这个方式官网上提了一句,这个地方其实很难能够将图拆分为断开的子图,而且针对于拆分为多个子图来讲,主要仍是依托于底层存储集群的处理能力;

通常状况下,不用拆分图进行一个好的优化后,底层存储集群的处理能力均可以彻底调用起来;

2.2 分步骤 并发执行

若是没法分解图形,则分多个步骤加载一般是有益的,也就是将vertex 和 edge 分开导入;

这种方式,须要数据同窗作好充分的数据探查,否则可能会产生数据不一致的状况! 下面是步骤(其中最后两个步骤能够在多台计算机上并行执行):

  1. 前提: 确保vertex和edge数据集 删除了重复数据 而且是一致的
  2. 环境配置: 设置batch-loading=true 而且优化上述介绍的其余选项
  3. vertex全量导入: 将全部的vertex及节点对应的property添加到图中。维护一份从顶点ID(由加载的数据用户自定义)到JanusGraph的内部顶点分布式一致性ID(即vertex.getId())的映射,该ID 为64位长
  4. edge全量导入: 使用映射添加全部的边 来查找JanusGraph的顶点id 并使用该id检索顶点。

讲述过程:

假设存在3个用户,“-”号后为对应的自定义的顶点id值(注意,非导入图库中的顶点id,只是标识当前节点的业务id):

user1-1
user2-2
user3-3

上述第三步,咱们将这些节点导入到图库中! 产生一个业务id 与 图库中节点的分布式惟一id的对应关系以下:

咱们在导入一个点后,janus会返回一个vertex实例对象,经过这个对象就能够拿到对应的图库vertexId

业务id-图库中节点id
1-4261
2-4274
3-4351

注意:这一步骤,咱们能够多线程并行导入而无需担忧一致性问题,由于节点所有惟一

节点导入完成!

假设存在对应的有3条边以下,

edge1:user1 --> user2 
edge2:user1 --> user3
edge3:user2 --> user3

咱们经过user1对应业务id:1,而业务id:1对应节点id:4261,咱们就能够转化为下述对应关系:

4261 --> 4274
4261 --> 4351
4274 --> 4351

在JanusGraph中经过节点id查询节点,是获取节点的最快方式!!

咱们就能够经过id获取图库中对应的vertex对象实例,而后使用addVertex将edge导入!

注意:这一步骤,咱们能够多线程并行导入而无需担忧一致性问题,由于edge所有惟一

第三个步骤和第四个步骤也能够并行执行,咱们在导入点的过程当中,能够也同时将源节点和目标节点已经导入到图库中的edge同步入图;

三:批量导入方案

下述介绍一下5种导入方案,其中包含3种批量导入方案;

3.1 方案一:基于JanusGraph Api的数据导入

该方案能够整合上述第二部分二:基于数据层面的优化

涉及方法:

public JanusGraphVertex addVertex(Object... keyValues);
public JanusGraphEdge addEdge(String label, Vertex vertex, Object... keyValues);

在janusGraph的业务项目中,能够开发一个数据导入模块,使用提供的相似于java api等,进行数据的导入;

流程:

这种是最简单的方案,具体的细节,这里就不给出了,节点导入大致流程为下述:

  1. 获取图实例
  2. 获取图实例事务对象
  3. 插入节点
  4. 提交事务

边导入大致流程以下:

  1. 获取图实例
  2. 获取图实例事务对象
  3. 查询源节点 + 目标节点(这个地方多是性能瓶颈)
  4. 在两个节点中插入边
  5. 提交事务

主要做用:

此方案能够用于数据量较小的状况下使用,例如天天的增量导入等;

优化点:

一、批量事务提交

此处的事务提交,咱们能够经过一个经常使用的优化手段: 处理多个vertex 或者 edge后再提交事务!

能够减小janus与底层存储的交互,减小网络消耗和链接数,提高导入的性能!

处理的个数多少主要仍是和底层存储集群相关,几百仍是几千这就须要本身调试获取当前环境下的最优配置了

注意:

若是开启了上述提到的storage.batch-loading,则须要大家如今的环境下注意一致性的问题;

例如图库中本来存在一个a节点,你又插入一个a节点,便会有一致性问题;

咱们能够经过插入数据前,先经过惟一索引查询节点,节点存在则更新节点,不存在则插入节点;

3.2 方案二:基于Gremlin Server的批量导入

该方案能够整合上述第二部分二:基于数据层面的优化

这里须要咱们搭建一个Gremlin server服务器,经过在服务器执行gremlin-server.sh便可,暴露出一个tcp接口;

则能够将对应的gremlin 语句提交到对应的gremlin服务器执行;

具体的流程和第一个方案一致

优化点:

同上一个方案优化点1;

三、gremlin server池参数调整

除了上述给定的一些配置的优化项,还有两个gremlin server的优化项须要调整

  • threadPoolWorke:最大2*core个数,用于处理非阻塞读写的Gremlin服务器可用的线程数;

  • gremlinPool:用于在ScriptEngine中执行实际脚本的“Gremlin”线程的数量。此池表示Gremlin服务器中可用于处理阻塞操做的工做者;

和线程池调优同样,要找出最合适的一个值,过小很差,太大也很差;

注意:

该方案本质上和第一个方案相似,只不过是一个是经过给定的java api提交插入请求,一个直接经过gremlin语句提交插入请求到gremlin server;

3.3 方案三:IBM的janusgraph-utils

这个方案没用过,简单看了一下,这个主要也是经过多线程对数据进行导入;

本身手动组装对应的schema文件,将schema导入到数据库;

而后将组装为特定格式的csv文件中的数据,导入到图库中;

github地址: https://github.com/IBM/janusgraph-utils

优势:

一、使用难度不高,让咱们不用再去手写多线程的导入了;减小工做量

二、直连hbase和es,相对于前两种减小了对应的gremlin server和janus server的网络交互

三、支持经过配置文件自动建立Janusgraph schema和index

四、可配置化的线程池大小和每次批量提交的数量

问题:

一、schema和csv文件也是要用户组装出对应格式

二、相对于前两种方式性能提高有限,主要是少了一层网络交互。多线程和批量提交,前两种均可以手动去实现;还须要引入一个新的组件

三、支持janus版本较低,能够手动升级,不难

四、相对于下面两种方案,性能仍是较低

3.4 方案四:bulk loader

官方提供的批量导入方式;须要hadoop集群和spark集群的支持;

hadoop和spark集群配置,能够看官网:https://docs.janusgraph.org/advanced-topics/hadoop/

该方案对导入的数据有着严格的要求,支持多种数据格式:jsoncsvxmlkryo

数据要求: 节点、节点对应的属性、节点对应的边须要在一行中(一个json中、一个xml项中)

数据案例: 下面给一下官网的案例,在data目录下:

-- json格式
{"id":2,"label":"song","inE":{"followedBy":[{"id":0,"outV":1,"properties":{"weight":1}},{"id":323,"outV":34,"properties":{"weight":1}}]},"outE":{"followedBy":[{"id":6190,"inV":123,"properties":{"weight":1}},{"id":6191,"inV":50,"properties":{"weight":1}}],"sungBy":[{"id":7666,"inV":525}],"writtenBy":[{"id":7665,"inV":525}]},"properties":{"name":[{"id":3,"value":"IM A MAN"}],"songType":[{"id":5,"value":"cover"}],"performances":[{"id":4,"value":1}]}}

-- xml格式
<node id="4"><data key="labelV">song</data><data key="name">BERTHA</data><data key="songType">original</data><data key="performances">394</data></node><node id="5"><data key="labelV">song</data><data key="name">GOING DOWN THE ROAD FEELING BAD</data><data key="songType">cover</data><data key="performances">293</data></node><node id="6"><data key="labelV">song</data><data key="name">MONA</data><data key="songType">cover</data><data key="performances">1</data></node><node id="7"><data key="labelV">song</data><data key="name">WHERE HAVE THE HEROES GONE</data><data key="songType"></data><data key="performances">0</data></node>

-- csv格式
2,song,IM A MAN,cover,1 followedBy,50,1|followedBy,123,1|sungBy,525|writtenBy,525       followedBy,1,1|followedBy,34,1

咱们能够观察到,这实际上是不容易构造的,节点属性边所有须要整合到一块;

数据整理方案: spark的cogroup, cogroup的做用就是将多个 RDD将相同的key jion成一行,从而使用csv格式进行导入,操做实示例以下:

val rdd1 = sc.parallelize(Array(("aa",1),("bb",2),("cc",6)))
val rdd2 = sc.parallelize(Array(("aa",3),("dd",4),("aa",5)))
rdd1.cogroup(rdd2).collect()

output:
(aa,(CompactBuffer(1),CompactBuffer(3, 5)))
(dd,(CompactBuffer(),CompactBuffer(4)))
(bb,(CompactBuffer(2),CompactBuffer()))
(cc,(CompactBuffer(6),CompactBuffer()))

这里你们能够参考360对这方面的处理,转化代码github地址:https://github.com/360jinrong/janusgraph-data-importer

注意:

此处的原始数据的准备须要细致,一致性保证彻底依赖于原始数据的一致性保证;

3.5 方案五:基于抽取序列化逻辑的生成Hbase File离线批量导入

博主在图库初始化时采用了这种方式,前先后后花费了接近一个月的时间,通过细致的验证,现已应用到生产环境使用,下面介绍一下对应的注意点和主要流程:

方案: 依据源码抽取出对应的序列化逻辑,分布式生成Hfile,将Hfile导入到Hbase;

问题: 人力成本太高,须要看源码抽逻辑,而且须要一个细致的验证;

方案难点:

JanusGraph对于Hbase的数据底层格式,能够看我写的博客:

这两篇博客,一个分析了底层存储的格式,一个进行了相应的源码分析;

流程+验证+建议: 请看我写的另一个博客:《图解JanusGraph系列-生成Hbase file离线批量导入方案》

这种方式,其实消耗的人力会比较大;另外,对于抽取的逻辑是否开源,这个后续咱们会考虑这个问题,开源后地址会同步更新到本文章;

四:几种场景

4.1 图库中已经存在数据

若是图库中已经存在数据,对于3.4 方案四:bulk loader3.5 方案五:基于抽取序列化逻辑的生成Hbase File离线批量导入 这两种方案可能就没法使用了;

咱们能够采起两种方式:

  1. 使用第一种方案和第二种方案进行导入(注意数据一致性)
  2. 总体迁移图库,将图库中现有数据和将要导入的数据总体迁移到另一个新图库,就可使用四、5方案进行导入

4.2 图数据初始化或者迁移

数据量小,建议使用3.1 方案一:基于JanusGraph Api的数据导入3.2 方案二:基于Gremlin Server的批量导入3.3 方案三:IBM的janusgraph-utils

数据量大,建议使用3.4 方案四:bulk loader3.5 方案五:基于抽取序列化逻辑的生成Hbase File离线批量导入

4.3 单纯只看业务数据量

选择什么方式导入,单纯基于业务数据量给一些我的建议:

  • 小数据量(亿级如下): 直接janusgraph api 或者 gremlin server导入便可,几小时就ok了; 若是想要更快可使用另外的方式,只是会增长人力成本;
  • 中等数据量(十亿级如下):数据充分探查,开启storage.batch-loading彻底能够支持,使用api,2天左右能够完成全量的数据导入
  • 大数据量(百亿级数据):推荐采用bulk load方式,配置hadoop集群,使用spark cluster导入
  • 另外一个方案:若是上述仍是没法知足大家的需求,能够采用依据源码抽取序列化逻辑生成Hfile,而后离线导入到Hbase的方案,不过这种是花费人力成本最大的一种方式,不过效果也几乎是最好的,尤为是数据量越大效果越明显

总结

数据的批量导入一直是JanusGraph让人难受的地方,通过本文的介绍你们应该有一个大致的认识,针对于百亿级的数据导入,上述的几种方案是能够支持的;

其余:批量导入后,天天的增量采用消息中间件接入JanusGraph api导入便可;

数据导入过程当中,针对于不一样的底层存储、不一样的版本仍是会有一些问题,具体的导入的坑你们能够加我v,邀你加群

注意!!!以上仅做为参考,有任何问题可评论或加博主v讨论

参考:
JanusGaph官网
https://www.jianshu.com/p/f372f0ef6c42
https://www.jianshu.com/p/4b59c00a15de/

相关文章
相关标签/搜索