摘要: 主要介绍如何经过官方 ETL 工具 Exchange 将业务线上数据从 Neo4j 直接导入到 Nebula Graph 以及在导入过程当中遇到的问题和优化方法。html
本文首发于 Nebula 论坛:https://discuss.nebula-graph.com.cn/t/topic/2044java
1 背景
随着业务数据量不断增加,业务对图数据库在线数据实时更新写入和查询的效率要求也不断增长。Neo4j 存在明显性能不足,Neo4j 社区开源版本只支持单机部署,扩展能力存在比较大的问题,没法知足读写性能的线性扩展以及读写分离的业务需求,而且开源版本 Neo4j 对点和边的总数据量也有限制;而 Neo4j 企业版因果集群也存在单机主节点 Cypher 实时写入的性能瓶颈。git
相比于 Neo4j,Nebula Graph 最大的特点即是采用 shared-nothing 分布式的架构,无单主写入瓶颈问题,读写支持线性扩展,擅长处理千亿节点、万亿条边的超大规模数据集。github
本文主要介绍如何经过官方 ETL 工具 Exchange 将业务线上数据从 Neo4j 直接导入到 Nebula Graph 以及在导入过程当中遇到的问题和优化方法。其中绝大部分问题都已经经过论坛发帖的方式获得社区的支持和解决,本文会结合问题进行逐一列举。数据库
2 部署环境
系统环境:缓存
- CPU name:Intel(R) Xeon(R) Silver 4114 CPU @ 2.20GHz
- CPU Cores:40
- Memory Size:376 GB
- Disk:HDD
- System:CentOS Linux release 7.4.1708 (Core)
软件环境:服务器
- Neo4j:3.4 版本,五节点因果集群
- Nebula Graph:
- 版本: nebula-graph v1.1.0 源码编译安装,
- 部署:单台服务器部署三节点 Nebula Graph 集群。
- Exchange:nebula-java v1.1.0 源码编译 jar 包
- 数仓环境:
- hadoop-2.7.4
- spark-2.3.1
注意:单台机器部署 Nebula 多节点的端口分配:每一个 storage 还会将用户配置的端口号 + 1的端口做为内部使用。请参考论坛帖子 nebula从neo4j导入数据出现Get UUID Failed错误架构
3 全量 & 增量数据导入
3.1 全量导入
根据 Neo4j 点和边的属性信息建立 Nebula Graph 的 Tag 和 Edge 结构,这里须要注意一点,业务可能会根据不一样需求只在部分点和边上增长 Neo4j 点和边的属性信息,其余点和边对应的属性为 NULL
,因此须要先跟业务明确一下点和边的所有属性信息,避免遗漏属性。Nebula Graph 的 Schema 信息相似 MySQL,支持 Create 和 Alter 添加属性,而且全部的 Tag 和 Edge 的元数据信息是一致的。并发
一、Nebula Graph 建立 Tag 和 Edgeapp
# 示例 # 建立图空间,10 个分区,3 个 storage 副本。 CREATE SPACE test(partition_num=10,replica_factor=3); # 选择图空间 test USE test; # 建立标签 tagA CREATE TAG tagA(vid string, field-a0 string, field-a1 bool, field-a2 double); # 建立标签 tagB CREATE TAG tagB(vid string, field-b0 string, field-b1 bool, field-b2 double); # 建立边类型 edgeAB CREATE EDGE edgeAB(vid string, field-e0 string, field-e1 bool, field-e2 double);
二、Exchange 导入配置文件
- Exchange 配置目前不支持
bolt+routing
的方式链接neo4j,若是是因果集群,能够选择一个从节点进行bolt
方式直连读取数据,减小集群压力。 - 咱们业务的 Neo4j 数据点和边的 vid 是 string 类型,Nebula v1.x 版本还不支持 string 直接当作 vid(v2.0支持),考虑到官方文档中的描述:“当点数量到达十亿级别时,用 hash 函数生成 vid 有必定的冲突几率。所以 Nebula Graph 提供 UUID 函数来避免大量点时的 vid 冲突。” 选择了uuid() 做为转化函数,可是导入效率要比 hash 低,并且 uuid() 在将来版本可能存在兼容问题。
- partition: 是指 Exchange 从 Neo4j 拉取数据的分页个数。
- batch: 是指批量插入 Nebula 的 batch 大小。
{ # Spark relation config spark: { app: { name: Spark Writer } driver: { cores: 1 maxResultSize: 1G } cores { max: 16 } } # Nebula Graph relation config nebula: { address:{ graph:["xxx.xxx.xxx.xx:3699"] meta:["xxx.xxx.xxx.xx:45500"] } user: user pswd: password space: test connection { timeout: 3000 retry: 3 } execution { retry: 3 } error: { max: 32 output: /tmp/errors } rate: { limit: 1024 timeout: 1000 } } # Processing tags tags: [ # Loading tag from neo4j { name: tagA type: { source: neo4j sink: client } server: "bolt://xxx.xxx.xxx.xxx:7687" user: neo4j password: neo4j exec: "match (n:tagA) where id(n) < 300000000 return n.vid as vid, n.field-a0 as field-a0, n.field-a1 as field-a1, n.field-a2 as field-a2 order by id(n)" fields: [vid, field-a0, field-a1, field-a2] nebula.fields: [vid, field-a0, field-a1, field-a2] vertex: { field: vid policy: "uuid" } partition: 10 batch: 1000 check_point_path: /tmp/test } # Loading tag from neo4j { name: tagB type: { source: neo4j sink: client } server: "bolt://xxx.xxx.xxx.xxx:7687" user: neo4j password: neo4j exec: "match (n:tagB) where id(n) < 300000000 return n.vid as vid, n.field-b0 as field-b0, n.field-b1 as field-b1, n.field-b2 as field-b2 order by id(n)" fields: [vid, field-b0, field-b1, field-b2] nebula.fields: [vid, field-b0, field-b1, field-b2] vertex: { field: vid policy: "uuid" } partition: 10 batch: 1000 check_point_path: /tmp/test } ] # Processing edges edges: [ # Loading edges from neo4j { name: edgeAB type: { source: neo4j sink: client } server: "bolt://xxx.xxx.xxx.xxx:7687" user: neo4j password: neo4j exec: "match (a:tagA)-[r:edgeAB]->(b:tagB) where id(r) < 300000000 return n.vid as vid, n.field-e0 as field-e0, n.field-e1 as field-e1, n.field-e2 as field-e2 order by id(r)" fields: [vid, field-e0, field-e1, field-e2] nebula.fields: [vid, field-e0, field-e1, field-e2] source: { field: a.vid policy: "uuid" } target: { field: b.vid policy: "uuid" } partition: 10 batch: 1000 check_point_path: /tmp/test } ] }
三、执行导入命令
nohup spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master "local" exchange-1.1.0.jar -c test.conf > test.log &
四、查看导入 Nebula Graph 的数据量
./bin/db_dump --space=test --db_path=./data/storage/nebula/ --meta_server=127.0.0.1:45500 -limit 0 --mode=stat --tags=tagA,tagB --edges=edgeAB
注意:Nebula 1.x 版本目前还只能用 db_dump 统计,2.0 会支持 nGQL 命令的方式统计数量。
3.2 增量导入
增量数据导入主要是经过 Neo4j 内部点和边的自增 id()
进行切割,在导入配置文件 exec 项执行 Neo4j Cypher 语句时增长 id()
范围限制,但前提是须要业务停掉删数据操做,由于增量导入时,若是以前的数据被删除后 Neo4j 会复用 id()
,这会致使复用 id()
的增量数据导入时查询不到形成数据丢失。固然业务若是有条件支持 Neo4j Nebula 双写的话,增量导入就不会出现这种问题。
exec: "match (n:user) where id(n) >= 300000000 and id(n) < 400000000 return xxx order by id(n)"
请参考论坛帖子 neo4j到nebula如何作增量导入
3.3 导入问题及解决
使用 Exchange 导入过程当中遇到两个问题,及时的获得官方 @nicole 的支持和解决,具体请参考下面两个帖子:
问题 1:Exchange 不支持「换行回车」等特殊字符的转义。以下 string 数据中带有回车,在拼接 insert
语句插入时会由于换行致使插入失败。
PR:https://github.com/vesoft-inc/nebula-java/pull/203 已经合入 exchange v1.0 分支
问题 2:Exchange 不支持属性为 NULL
的数据导入。前文 3.1 中提到,业务可能会根据不一样需求为某些点和边增长属性,这时其余点和边属性则是 NULL
,这样在使用 Exchange 导入时会报错。
参考帖子 2 给出的修改建议解决:修改 com.vesoft.nebula.tools.importer.processor.Processor#extraValue
,增长 NULL
类型的转化值。
case NullType => { fieldTypeMap(field) match { case StringType => "" case IntegerType => 0 case LongType => 0L case DoubleType => 0.0 case BooleanType => false } }
4 导入效率优化
关于导入效率的优化,请参考下面两个帖子:
优化 1:经过适当增长导入配置中的 partition 和 batch 值,提高导入效率。 优化 2:若是是 string 类型作 vid 的话,1.x 版本尽可能使用 hash() 函数转化,2.0 版本会支持 string id 类型;若是是int类型作vid的话,能够直接使用,不用转化效率更高。 优化 3:官方建议 spark-submit 提交命令 master 配置改成 yarn-cluster
, 若不使用 yarn,可配置成 spark://ip:port
;咱们是经过 spark-submit --master "local[16]"
的方式增长 spark 并发,导入效率比使用 "local"
提高 4 倍+,测试环境单机三节点 HDD 盘 IO 峰值能到 200-300 MB/s。但在指定 --master "local[16]"
并发导入时遇到 hadoop 缓存问题,采用增长 hdfs 配置 fs.hdfs.impl.disable.cache=true
后重启 hadoop 解决。具体请参考第二个帖子。
5 总结
使用 Exchange 从 Neo4j 导入 Nebula Graph 过程当中遇到一些问题,经过积极与社区进行沟通获得了官方 @nicole 及其余小伙伴的快速响应和大力支持,这一点在 Neo4j 导入 Nebula Graph 的实践过程当中起到了十分关键的做用,感谢社区的大力支持。期待支持 openCypher 的 Nebula Graph 2.0。
6 参考连接
- https://nebula-graph.com.cn/posts/how-to-import-data-from-neo4j-to-nebula-graph/
- https://github.com/vesoft-inc/nebula-java/tree/v1.0
- https://docs.nebula-graph.com.cn/manual-CN/2.query-language/2.functions-and-operators/uuid/
- http://arganzheng.life/hadoop-filesystem-closed-exception.html