看了几天flink,刚入门。
简单说下对flink的感觉,flink有4层(有些说3层,将Table API和SQL当作一层)API,越底层,对数据的操做就越精细,越高层完成功能所须要的代码就越少,并且代码越易读。java
image.png
api使用起来很像java中的stream,这个其实很显然,都是为了对流数据进行处理。感受就像flink是java中并行流的分布式版本,因此对stream熟悉的话,flink上手不难,或者说使用flink编写代码并不难。git
Flink的编程模式:输入(source) -> 处理(转换transform) -> 输出(sink),3部分,至关清爽。apache
统一术语
数据比对通常针对两个数据集A/B,在选定一个基准方A后,定义以下:
F000:A/B两方数据相同
F113:A中存在,但B中没有,A比B多
F114:B中存在,但A中没有,B比A多
F115:A与B的关键字段相同,但毕竟字段不一样,如A与B都有同一笔订单,但订单金额不一样编程
新建工程
这里咱们使用官方提供的quickstart作模板,若是是比较新版的idea(如2020.1)里面直接有flink的quickstart模板,旧版的idea的话,须要本身添加一下。api
image.png
image.png
下次使用的时候能够直接从这里看到:app
image.png
若是你使用的是scala,ArtifactId则填flink-quickstart-scala。具体的版本信息能够根据最新版的填写。分布式
添加Table API依赖
在pom.xml中添加Table API依赖。ide
<!-- Table API --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- Table API须要scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
编写代码
利用模板里的BatchJob来编写:函数
package com.flink; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.BatchTableEnvironment; import java.util.List; /** * Skeleton for a Flink Batch Job. * * <p>For a tutorial how to write a Flink batch application, check the * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>. * * <p>To package your application into a JAR file for execution, * change the main class in the POM.xml file to this class (simply search for 'mainClass') * and run 'mvn clean package' on the command line. */ public class BatchJob { public static void main(String[] args) throws Exception { // set up the batch execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // Table Environment BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env); /** * 构造两个数据集,实际生产从本身须要的source中获取便可 */ DataSource<String> dataSourceA_unique = env.fromElements("orderId_1_f113", "orderId_2_f000", "orderId_3_f115"); DataSource<String> dataSourceB_unique = env.fromElements("orderId_2_f000", "orderId_3_f115", "orderId_4_f114"); // 转换成table Table tableA_unique = tableEnvironment.fromDataSet(dataSourceA_unique); Table tableB_unique = tableEnvironment.fromDataSet(dataSourceB_unique); /** * 核心比对(对帐)逻辑 */ Table f113_table = tableA_unique.minusAll(tableB_unique);// 差集 Table f114_table = tableB_unique.minusAll(tableA_unique);// 差集 Table f000_table = tableA_unique.intersect(tableB_unique);// 交集 // 转回DataSet用于输出 DataSet<String> f000 = tableEnvironment.toDataSet(f000_table, String.class); DataSet<String> f113 = tableEnvironment.toDataSet(f113_table, String.class); DataSet<String> f114 = tableEnvironment.toDataSet(f114_table, String.class); /** * 输出,实际输出到本身须要的sink便可 */ List<String> f000_list = f000.collect(); List<String> f113_list = f113.collect(); List<String> f114_list = f114.collect(); System.out.println("=============================="); System.out.println("f000 ->" + f000_list); System.out.println("=============================="); System.out.println("f113 ->" + f113_list); System.out.println("=============================="); System.out.println("f114 ->" + f114_list); // 批处理不须要显示调用execute,不然会报错 // env.execute("Flink Batch Java API Skeleton"); } }
简单说下几个关键点:ui
- 使用Table API须要建立对应的执行环境:
BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);
- 模板代码中最后显式调用
env.execute()
,其实在批处理中不须要,显式调用反而会报错。
源码
总结
本质上就是利用Table API中对数据集的处理函数(交集、差集)来完成数据比对。
若是你有更好的想法,欢迎留言,多多指教。
转载请注明出处