Flink Doris Connector设计方案
该方案首先感谢社区Spark Doris Connector的做者html
从Doris角度看,将其数据引入Flink,可使用Flink一系列丰富的生态产品,拓宽了产品的想象力,也使得Doris和其余数据源的联合查询成为可能java
从咱们业务架构出发和业务需求,咱们选择了Flink做为咱们架构的一部分,用于数据的ETL及实时计算框架,社区目前支持Spark doris connector,所以咱们参照Spark doris connector 设计开发了Flink doris Connector。node
技术选型
一开始咱们选型的时候,也是和Spark Doris Connector 同样,开始考虑的是JDBC的方式,可是这种方式就像Spark doris connector那篇文章中说的,有优势,可是缺点更明显。后来咱们阅读及测试了Spark的代码,决定站在巨人的肩上来实现(备注:直接拷贝代码修改)。git
如下内容来自Spark Doris Connector博客的,直接拷贝了github
因而咱们开发了针对Doris的新的Datasource,Spark-Doris-Connector。这种方案下,Doris能够暴露Doris数据分布给Spark。Spark的Driver访问Doris的FE获取Doris表的Schema和底层数据分布。以后,依据此数据分布,合理分配数据查询任务给Executors。最后,Spark的Executors分别访问不一样的BE进行查询。大大提高了查询的效率
使用方法
在Doris的代码库的 extension/flink-doris-connector/ 目录下编译生成doris-flink-1.0.0-SNAPSHOT.jar,将这个jar包加入flink的ClassPath中,便可使用Flink-on-Doris功能了sql
SQL方式
CREATE TABLE flink_doris_source ( name STRING, age INT, price DECIMAL(5,2), sale DOUBLE ) WITH ( 'connector' = 'doris', 'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT', 'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME', 'username' = '$YOUR_DORIS_USERNAME', 'password' = '$YOUR_DORIS_PASSWORD' ); CREATE TABLE flink_doris_sink ( name STRING, age INT, price DECIMAL(5,2), sale DOUBLE ) WITH ( 'connector' = 'doris', 'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT', 'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME', 'username' = '$YOUR_DORIS_USERNAME', 'password' = '$YOUR_DORIS_PASSWORD' ); INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
DataStream方式
DorisOptions.Builder options = DorisOptions.builder() .setFenodes("$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") .setUsername("$YOUR_DORIS_USERNAME") .setPassword("$YOUR_DORIS_PASSWORD") .setTableIdentifier("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"); env.addSource(new DorisSourceFunction<>(options.build(),new SimpleListDeserializationSchema())).print();
适用场景
1.使用Flink对Doris中的数据和其余数据源进行联合分析
不少业务部门会将本身的数据放在不一样的存储系统上,好比一些在线分析、报表的数据放在Doris中,一些结构化检索数据放在Elasticsearch中、一些须要事物的数据放在MySQL中,等等。业务每每须要跨多个存储源进行分析,经过Flink Doris Connector打通Flink和Doris后,业务能够直接使用Flink,将Doris中的数据与多个外部数据源作联合查询计算。数据库
2.实时数据接入
Flink Doris Connector以前:针对业务不规则数据,常常须要针对消息作规范处理,空值过滤等写入新的topic,而后再启动Routine load写入Doris。apache
Flink Doris Connector以后:flink读取kafka,直接写入doris。json
技术实现
架构图
Doris对外提供更多能力
Doris FE
对外开放了获取内部表的元数据信息、单表查询规划和部分统计信息的接口。api
全部的Rest API接口都须要进行HttpBasic认证,用户名和密码是登陆数据库的用户名和密码,须要注意权限的正确分配。
// 获取表schema元信息 GET api/{database}/{table}/_schema // 获取对单表的查询规划模版 POST api/{database}/{table}/_query_plan { "sql": "select k1, k2 from {database}.{table}" } // 获取表大小 GET api/{database}/{table}/_count
Doris BE
经过Thrift协议,直接对外提供数据的过滤、扫描和裁剪能力。
service TDorisExternalService { // 初始化查询执行器 TScanOpenResult open_scanner(1: TScanOpenParams params); // 流式batch获取数据,Apache Arrow数据格式 TScanBatchResult get_next(1: TScanNextBatchParams params); // 结束扫描 TScanCloseResult close_scanner(1: TScanCloseParams params); }
Thrift相关结构体定义可参考:
https://github.com/apache/incubator-doris/blob/master/gensrc/thrift/DorisExternalService.thrift
实现DataStream
继承 org.apache.flink.streaming.api.functions.source.RichSourceFunction ,自定义DorisSourceFunction,初始化时,获取相关表的执行计划,获取对应的分区。
重写run方法,循环从分区中读取数据。
public void run(SourceContext sourceContext){ //循环读取各分区 for(PartitionDefinition partitions : dorisPartitions){ scalaValueReader = new ScalaValueReader(partitions, settings); while (scalaValueReader.hasNext()){ Object next = scalaValueReader.next(); sourceContext.collect(next); } } }
实现Flink SQL on Doris
参考了Flink自定义Source&Sink 和 Flink-jdbc-connector,实现了下面的效果,能够实现用Flink SQL直接操做Doris的表,包括读和写。
实现细节
1.实现DynamicTableSourceFactory , DynamicTableSinkFactory 注册 doris connector
2.自定义DynamicTableSource和DynamicTableSink 生成逻辑计划
3.DorisRowDataInputFormat和DorisDynamicOutputFormat获取到逻辑计划后开始执行。
实现中最主要的是基于RichInputFormat和RichOutputFormat 定制的DorisRowDataInputFormat和DorisDynamicOutputFormat。
在DorisRowDataInputFormat中,将获取到的dorisPartitions 在createInputSplits中 切分红多个分片,用于并行计算。
public DorisTableInputSplit[] createInputSplits(int minNumSplits) { List<DorisTableInputSplit> dorisSplits = new ArrayList<>(); int splitNum = 0; for (PartitionDefinition partition : dorisPartitions) { dorisSplits.add(new DorisTableInputSplit(splitNum++,partition)); } return dorisSplits.toArray(new DorisTableInputSplit[0]); } public RowData nextRecord(RowData reuse) { if (!hasNext) { //已经读完数据,返回null return null; } List next = (List)scalaValueReader.next(); GenericRowData genericRowData = new GenericRowData(next.size()); for(int i =0;i<next.size();i++){ genericRowData.setField(i, next.get(i)); } //判断是否还有数据 hasNext = scalaValueReader.hasNext(); return genericRowData; }
在DorisRowDataOutputFormat中,经过streamload的方式向doris中写数据。streamload程序参考org.apache.doris.plugin.audit.DorisStreamLoader
public void writeRecord(RowData row) throws IOException { //streamload 默认分隔符 \t StringJoiner value = new StringJoiner("\t"); GenericRowData rowData = (GenericRowData) row; for(int i = 0; i < row.getArity(); ++i) { value.add(rowData.getField(i).toString()); } //streamload 写数据 DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value.toString()); System.out.println(loadResponse); }
后续Flink Doris Connector计划
1.doris sink 批量写入
2.doris sink 支持json 数据写入