摘要:本文所介绍 Nebula Graph 链接器 Nebula Flink Connector,采用相似 Flink 提供的 Flink Connector 形式,支持 Flink 读写分布式图数据库 Nebula Graph。html
文章首发 Nebula Graph 官网博客:https://nebula-graph.com.cn/posts/nebula-flink-connector/java
在关系网络分析、关系建模、实时推荐等场景中应用图数据库做为后台数据支撑已相对普及,且部分应用场景对图数据的实时性要求较高,如推荐系统、搜索引擎。为了提高数据的实时性,业界普遍应用流式计算对更新的数据进行增量实时处理。为了支持对图数据的流式计算,Nebula Graph 团队开发了 Nebula Flink Connector,支持利用 Flink 进行 Nebula Graph 图数据的流式处理和计算。git
Flink 是新一代流批统一的计算引擎,它从不一样的第三方存储引擎中读取数据,并进行处理,再写入另外的存储引擎中。Flink Connector 的做用就至关于一个链接器,链接 Flink 计算引擎跟外界存储系统。github
与外界进行数据交换时,Flink 支持如下 4 种方式:数据库
流计算中常常须要与外部存储系统交互,好比须要关联 MySQL 中的某个表。通常来讲,若是用同步 I/O 的方式,会形成系统中出现大的等待时间,影响吞吐和延迟。异步 I/O 则能够并发处理多个请求,提升吞吐,减小延迟。apache
本文所介绍 Nebula Graph 链接器 Nebula Flink Connector,采用相似 Flink 提供的 Flink Connector 形式,支持 Flink 读写分布式图数据库 Nebula Graph。缓存
Flink 做为一款流式计算框架,它可处理有界数据,也可处理无界数据。所谓无界,即源源不断的数据,不会有终止,实时流处理所处理的数据即是无界数据;批处理的数据,即有界数据。而 Source 即是 Flink 处理数据的数据来源。安全
Nebula Flink Connector 中的 Source 即是图数据库 Nebula Graph。Flink 提供了丰富的 Connector 组件容许用户自定义数据源来链接外部数据存储系统。微信
Flink 的 Source 主要负责外部数据源的接入,Flink 的 Source 能力主要是经过 read 相关的 API 和 addSource 方法这 2 种方式来实现数据源的读取,使用 addSource 方法对接外部数据源时,可使用 Flink Bundled Connector,也能够自定义 Source。网络
Flink Source 的几种使用方式以下:
本章主要介绍如何经过自定义 Source 方式实现 Nebula Graph Source。
在 Flink 中可使用 StreamExecutionEnvironment.addSource(sourceFunction)
和 ExecutionEnvironment.createInput(inputFormat)
两种方式来为你的程序添加数据来源。
Flink 已经提供多个内置的 source functions
,开发者能够经过继承 RichSourceFunction
来自定义非并行的 source
,经过继承 RichParallelSourceFunction
来自定义并行的 Source
。RichSourceFunction
和 RichParallelSourceFunction
是 SourceFunction
和 RichFunction
特性的结合。 其中SourceFunction
负责数据的生成, RichFunction
负责资源的管理。固然,也能够只实现 SourceFunction
接口来定义最简单的只具有获取数据功能的 dataSource
。
一般自定义一个完善的 Source 节点是经过实现 RichSourceFunction
类来完成的,该类兼具 RichFunction
和 SourceFunction
的能力,所以自定义 Flink 的 Nebula Graph Source 功能咱们须要实现 RichSourceFunction
中提供的方法。
Nebula Flink Connector 中实现的自定义 Nebula Graph Source 数据源提供了两种使用方式,分别是 addSource 和 createInput 方式。
Nebula Graph Source 实现类图以下:
该方式是经过 NebulaSourceFunction 类实现的,该类继承自 RichSourceFunction 并实现了如下方法:
该方式是经过 NebulaInputFormat 类实现的,该类继承自 RichInputFormat 并实现了如下方法:
经过 addSource 读取 Source 数据获得的是 Flink 的 DataStreamSource,表示 DataStream 的起点。
经过 createInput 读取数据获得的是 Flink 的 DataSource,DataSource 是一个建立新数据集的 Operator,这个 Operator 可做为进一步转换的数据集。DataSource 能够经过 withParameters 封装配置参数进行其余的操做。
使用 Flink 读取 Nebula Graph 图数据时,须要构造 NebulaSourceFunction 和 NebulaOutputFormat,并经过 Flink 的 addSource 或 createInput 方法注册数据源进行 Nebula Graph 数据读取。
构造 NebulaSourceFunction 和 NebulaOutputFormat 时须要进行客户端参数的配置和执行参数的配置,说明以下:
配置项说明:
// 构造 Nebula Graph 客户端链接须要的参数 NebulaClientOptions nebulaClientOptions = new NebulaClientOptions .NebulaClientOptionsBuilder() .setAddress("127.0.0.1:45500") .build(); // 建立 connectionProvider NebulaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions); // 构造 Nebula Graph 数据读取须要的参数 List<String> cols = Arrays.asList("name", "age"); VertexExecutionOptions sourceExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() .setGraphSpace("flinkSource") .setTag(tag) .setFields(cols) .setLimit(100) .builder(); // 构造 NebulaInputFormat NebulaInputFormat inputFormat = new NebulaInputFormat(metaConnectionProvider) .setExecutionOptions(sourceExecutionOptions); // 方式 1 使用 createInput 方式注册 Nebula Graph 数据源 DataSource<Row> dataSource1 = ExecutionEnvironment.getExecutionEnvironment() .createInput(inputFormat); // 方式 2 使用 addSource 方式注册 Nebula Graph 数据源 NebulaSourceFunction sourceFunction = new NebulaSourceFunction(metaConnectionProvider) .setExecutionOptions(sourceExecutionOptions); DataStreamSource<Row> dataSource2 = StreamExecutionEnvironment.getExecutionEnvironment() .addSource(sourceFunction);
Nebula Source Demo 编写完成后能够打包提交到 Flink 集群执行。
示例程序读取 Nebula Graph 的点数据并打印,该做业以 Nebula Graph 做为 Source,以 print 做为 Sink,执行结果以下:
Source sent 数据为 59,671,064 条,Sink received 数据为 59,671,064 条。
Nebula Flink Connector 中的 Sink 即 Nebula Graph 图数据库。Flink 提供了丰富的 Connector 组件容许用户自定义数据池来接收 Flink 所处理的数据流。
Sink 是 Flink 处理完 Source 后数据的输出,主要负责实时计算结果的输出和持久化。好比:将数据流写入标准输出、写入文件、写入 Sockets、写入外部系统等。
Flink 的 Sink 能力主要是经过调用数据流的 write 相关 API 和 DataStream.addSink 两种方式来实现数据流的外部存储。
相似于 Flink Connector 的 Source,Sink 也容许用户自定义来支持丰富的外部数据系统做为 Flink 的数据池。
Flink Sink 的使用方式以下:
本章主要介绍如何经过自定义 Sink 的方式实现 Nebula Graph Sink。
在 Flink 中可使用 DataStream.addSink
和 DataStream.writeUsingOutputFormat
的方式将 Flink 数据流写入外部自定义数据池。
Flink 已经提供了若干实现好了的 Sink Functions
,也能够经过实现 SinkFunction
以及继承 RichOutputFormat
来实现自定义的 Sink。
Nebula Flink Connector 中实现了自定义的 NebulaSinkFunction,开发者经过调用 DataSource.addSink 方法并将 NebulaSinkFunction 对象做为参数传入便可实现将 Flink 数据流写入 Nebula Graph。
Nebula Flink Connector 使用的是 Flink 的 1.11-SNAPSHOT 版本,该版本中已经废弃了使用 writeUsingOutputFormat 方法来定义输出端的接口。
源码以下,因此请注意在使用自定义 Nebula Graph Sink 时请采用 DataStream.addSink 的方式。
/** @deprecated */ @Deprecated @PublicEvolving public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) { return this.addSink(new OutputFormatSinkFunction(format)); }
Nebula Graph Sink 实现类图以下:
其中最重要的两个类是 NebulaSinkFunction 和 NebulaBatchOutputFormat。
NebulaSinkFunction 继承自 AbstractRichFunction 并实现了如下方法:
NebulaBatchOutputFormat 继承自 AbstractNebulaOutPutFormat,AbstractNebulaOutPutFormat 继承自 RichOutputFormat,主要实现的方法有:
在 AbstractNebulaOutputFormat 中调用了 NebulaBatchExecutor 进行数据的批量管理和批量提交,并经过定义回调函数接收批量提交的结果,代码以下:
/** * write one record to buffer */ @Override public final synchronized void writeRecord(T row) throws IOException { nebulaBatchExecutor.addToBatch(row); if (numPendingRow.incrementAndGet() >= executionOptions.getBatch()) { commit(); } } /** * put record into buffer * * @param record represent vertex or edge */ void addToBatch(T record) { boolean isVertex = executionOptions.getDataType().isVertex(); NebulaOutputFormatConverter converter; if (isVertex) { converter = new NebulaRowVertexOutputFormatConverter((VertexExecutionOptions) executionOptions); } else { converter = new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions); } String value = converter.createValue(record, executionOptions.getPolicy()); if (value == null) { return; } nebulaBufferedRow.putRow(value); } /** * commit batch insert statements */ private synchronized void commit() throws IOException { graphClient.switchSpace(executionOptions.getGraphSpace()); future = nebulaBatchExecutor.executeBatch(graphClient); // clear waiting rows numPendingRow.compareAndSet(executionOptions.getBatch(),0); } /** * execute the insert statement * * @param client Asynchronous graph client */ ListenableFuture executeBatch(AsyncGraphClientImpl client) { String propNames = String.join(NebulaConstant.COMMA, executionOptions.getFields()); String values = String.join(NebulaConstant.COMMA, nebulaBufferedRow.getRows()); // construct insert statement String exec = String.format(NebulaConstant.BATCH_INSERT_TEMPLATE, executionOptions.getDataType(), executionOptions.getLabel(), propNames, values); // execute insert statement ListenableFuture<Optional<Integer>> execResult = client.execute(exec); // define callback function Futures.addCallback(execResult, new FutureCallback<Optional<Integer>>() { @Override public void onSuccess(Optional<Integer> integerOptional) { if (integerOptional.isPresent()) { if (integerOptional.get() == ErrorCode.SUCCEEDED) { LOG.info("batch insert Succeed"); } else { LOG.error(String.format("batch insert Error: %d", integerOptional.get())); } } else { LOG.error("batch insert Error"); } } @Override public void onFailure(Throwable throwable) { LOG.error("batch insert Error"); } }); nebulaBufferedRow.clean(); return execResult; }
因为 Nebula Graph Sink 的写入是批量、异步的,因此在最后业务结束 close 资源以前须要将缓存中的批量数据提交且等待写入操做的完成,以防在写入提交以前提早把 Nebula Graph Client 关闭,代码以下:
/** * commit the batch write operator before release connection */ @Override public final synchronized void close() throws IOException { if(numPendingRow.get() > 0){ commit(); } while(!future.isDone()){ try { Thread.sleep(10); } catch (InterruptedException e) { LOG.error("sleep interrupted, ", e); } } super.close(); }
Flink 将处理完成的数据 Sink 到 Nebula Graph 时,须要将 Flink 数据流进行 map 转换成 Nebula Graph Sink 可接收的数据格式。自定义 Nebula Graph Sink 的使用方式是经过 addSink 形式,将 NebulaSinkFunction 做为参数传给 addSink 方法来实现 Flink 数据流的写入。
/// 构造 Nebula Graphd 客户端链接须要的参数 NebulaClientOptions nebulaClientOptions = new NebulaClientOptions .NebulaClientOptionsBuilder() .setAddress("127.0.0.1:3699") .build(); NebulaConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions); // 构造 Nebula Graph 写入操做参数 List<String> cols = Arrays.asList("name", "age") ExecutionOptions sinkExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() .setGraphSpace("flinkSink") .setTag(tag) .setFields(cols) .setIdIndex(0) .setBatch(20) .builder(); // 写入 Nebula Graph dataSource.addSink(nebulaSinkFunction);
Nebula Graph Sink 的 Demo 程序以 Nebula Graph 的 space:flinkSource 做为 Source 读取数据,进行 map 类型转换后 Sink 入 Nebula Graph 的 space:flinkSink,对应的应用场景为将 Nebula Graph 中一个 space 的数据流入另外一个 space 中。
Flink 1.11.0 以前,用户若是依赖 Flink 的 Source/Sink 读写外部数据源时,必需要手动读取对应数据系统的 Schema。好比,要读写 Nebula Graph,则必须先保证实确地知晓在 Nebula Graph 中的 Schema 信息。可是这样会有一个问题,当 Nebula Graph 中的 Schema 发生变化时,也须要手动更新对应的 Flink 任务以保持类型匹配,任何不匹配都会形成运行时报错使做业失败。这个操做冗余且繁琐,体验极差。
1.11.0 版本后,用户使用 Flink Connector 时能够自动获取表的 Schema。能够在不了解外部系统数据 Schema 的状况下进行数据匹配。
目前 Nebula Flink Connector 中已支持数据的读写,要实现 Schema 的匹配则须要为 Flink Connector 实现 Catalog 的管理。但为了确保 Nebula Graph 中数据的安全性,Nebula Flink Connector 只支持 Catalog 的读操做,不容许进行 Catalog 的修改和写入。
访问 Nebula Graph 指定类型的数据时,完整路径应该是如下格式:<graphSpace>.<VERTEX.tag>
或者 <graphSpace>.<EDGE.edge>
具体使用方式以下:
String catalogName = "testCatalog"; String defaultSpace = "flinkSink"; String username = "root"; String password = "nebula"; String address = "127.0.0.1:45500"; String table = "VERTEX.player" // define Nebula catalog Catalog catalog = NebulaCatalogUtils.createNebulaCatalog(catalogName,defaultSpace, address, username, password); // define Flink table environment StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); tEnv = StreamTableEnvironment.create(bsEnv); // register customed nebula catalog tEnv.registerCatalog(catalogName, catalog); // use customed nebula catalog tEnv.useCatalog(catalogName); // show graph spaces of Nebula Graph String[] spaces = tEnv.listDatabases(); // show tags and edges of Nebula Graph tEnv.useDatabase(defaultSpace); String[] tables = tEnv.listTables(); // check tag player exist in defaultSpace ObjectPath path = new ObjectPath(defaultSpace, table); assert catalog.tableExists(path) == true // get nebula tag schema CatalogBaseTable table = catalog.getTable(new ObjectPath(defaultSpace, table)); table.getSchema();
Nebula Flink Connector 支持的其余 Catalog 接口请查看 GitHub 代码 NebulaCatalog.java。
Flink Connector 的 Exactly-once 是指 Flink 借助于 checkpoint 机制保证每一个输入事件只对最终结果影响一次,在数据处理过程当中即便出现故障,也不会存在数据重复和丢失的状况。
为了提供端到端的 Exactly-once 语义,Flink 的外部数据系统也必须提供提交或回滚的方法,而后经过 Flink 的 checkpoint 机制协调。Flink 提供了实现端到端的 Exactly-once 的抽象,即实现二阶段提交的抽象类 TwoPhaseCommitSinkFunction。
想为数据输出端实现 Exactly-once,则须要实现四个函数:
根据上述函数可看出,Flink 的二阶段提交对外部数据源有要求,即 Source 数据源必须具有重发功能,Sink 数据池必须支持事务提交和幂等写。
Nebula Graph v1.1.0 虽然不支持事务,但其写入操做是幂等的,即同一条数据的屡次写入结果是一致的。所以能够经过 checkpoint 机制实现 Nebula Flink Connector 的 At-least-Once 机制,根据屡次写入的幂等性能够间接实现 Sink 的 Exactly-once。
要使用 Nebula Graph Sink 的容错性,请确保在 Flink 的执行环境中开启了 checkpoint 配置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(10000) // checkpoint every 10000 msecs .getCheckpointConfig() .setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
喜欢这篇文章?来来来,给咱们的 GitHub 点个 star 表鼓励啦~~ 🙇♂️🙇♀️ [手动跪谢]
交流图数据库技术?交个朋友,Nebula Graph 官方小助手微信:NebulaGraphbot 拉你进交流群~~