本文是《Flink的DataSource三部曲》的终篇,前面都是在学习Flink已有的数据源功能,但若是这些不能知足须要,就要自定义数据源(例如从数据库获取数据),也就是今天实战的内容,以下图红框所示:
### Flink的DataSource三部曲文章连接
java
环境和版本
本次实战的环境和版本以下:mysql
- JDK:1.8.0_211
- Flink:1.9.2
- Maven:3.6.0
- 操做系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
- IDEA:2018.3.5 (Ultimate Edition)
在服务器上搭建Flink服务
- 前面两章的程序都是在IDEA上运行的,本章须要经过Flink的web ui观察运行结果,所以要单独部署Flink服务,我这里是在CentOS环境经过docker-compose部署的,如下是docker-compose.yml的内容,用于参考:
version: "2.1" services: jobmanager: image: flink:1.9.2-scala_2.12 expose: - "6123" ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager1: image: flink:1.9.2-scala_2.12 expose: - "6121" - "6122" depends_on: - jobmanager command: taskmanager links: - "jobmanager:jobmanager" environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager2: image: flink:1.9.2-scala_2.12 expose: - "6121" - "6122" depends_on: - jobmanager command: taskmanager links: - "jobmanager:jobmanager" environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager
- 下图是个人Flink状况,有两个Task Maganer,共八个Slot所有可用:
源码下载
若是您不想写代码,整个系列的源码可在GitHub下载到,地址和连接信息以下表所示(https://github.com/zq2599/blog_demos):git
名称 | 连接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
这个git项目中有多个文件夹,本章的应用在flinkdatasourcedemo文件夹下,以下图红框所示:
准备完毕,开始开发;
程序员
实现SourceFunctionDemo接口的DataSource
- 从最简单的开始,开发一个不可并行的数据源并验证;
- 实现SourceFunction接口,在工程flinkdatasourcedemo中增长SourceFunctionDemo.java:
package com.bolingcavalry.customize; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.time.Time; public class SourceFunctionDemo { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //并行度为2 env.setParallelism(2); DataStream<Tuple2<Integer,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<Integer, Integer>>() { private volatile boolean isRunning = true; @Override public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { int i = 0; while (isRunning) { ctx.collect(new Tuple2<>(i++ % 5, 1)); Thread.sleep(1000); if(i>9){ break; } } } @Override public void cancel() { isRunning = false; } }); dataStream .keyBy(0) .timeWindow(Time.seconds(2)) .sum(1) .print(); env.execute("Customize DataSource demo : SourceFunction"); } }
- 从上述代码可见,给addSource方法传入一个匿名类实例,该匿名类实现了SourceFunction接口;
- 实现SourceFunction接口只需实现run和cancel方法;
- run方法产生数据,这里为了简答操做,每隔一秒产生一个Tuple2实例,因为接下来的算子中有keyBy操做,所以Tuple2的第一个字段始终保持着5的余数,这样能够多几个key,以便分散到不一样的slot中;
- 为了核对数据是否准确,这里并无无限发送数据,而是仅发送了10个Tuple2实例;
- cancel是job被取消时执行的方法;
- 总体并行度显式设置为2;
- 编码完成后,执行mvn clean package -U -DskipTests构建,在target目录获得文件flinkdatasourcedemo-1.0-SNAPSHOT.jar;
- 在Flink的web UI上传flinkdatasourcedemo-1.0-SNAPSHOT.jar,并指定执行类,以下图红框所示:
- 任务执行完成后,在Completed Jobs页面能够看到,DataSource的并行度是1(红框),对应的SubTask一共发送了10条记录(蓝框),这和咱们的代码是一致的;
- 再来看消费的子任务,以下图,红框显示并行度是2,这和前面代码中的设置是一致的,蓝框显示两个子任务一共收到10条数据记录,和上游发出的数量一致:
- 接下来尝试多并行度的DataSource;
实现ParallelSourceFunction接口的DataSource
- 若是自定义DataSource中有复杂的或者耗时的操做,那么增长DataSource的并行度,让多个SubTask同时进行这些操做,能够有效提高总体吞吐量(前提是硬件资源充裕);
- 接下来实战能够并行执行的DataSource,原理是DataSoure实现ParallelSourceFunction接口,代码以下,可见和SourceFunctionDemo几乎同样,只是addSource方发入参不一样,该入参依然是匿名类,不过实现的的接口变成了ParallelSourceFunction:
package com.bolingcavalry.customize; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.windowing.time.Time; public class ParrelSourceFunctionDemo { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //并行度为2 env.setParallelism(2); DataStream<Tuple2<Integer,Integer>> dataStream = env.addSource(new ParallelSourceFunction<Tuple2<Integer, Integer>>() { private volatile boolean isRunning = true; @Override public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { int i = 0; while (isRunning) { ctx.collect(new Tuple2<>(i++ % 5, 1)); Thread.sleep(1000); if(i>9){ break; } } } @Override public void cancel() { isRunning = false; } }); dataStream .keyBy(0) .timeWindow(Time.seconds(2)) .sum(1) .print(); env.execute("Customize DataSource demo : ParallelSourceFunction"); } }
- 编码完成后,执行mvn clean package -U -DskipTests构建,在target目录获得文件flinkdatasourcedemo-1.0-SNAPSHOT.jar;
- 在Flink的web UI上传flinkdatasourcedemo-1.0-SNAPSHOT.jar,并指定执行类,以下图红框所示:
- 任务执行完成后,在Completed Jobs页面能够看到,现在DataSource的并行度是2(红框),对应的SubTask一共发送了20条记录(蓝框),这和咱们的代码是一致的,绿框显示两个SubTask的Task Manager是同一个:
- 为何DataSource一共发送了20条记录?由于每一个SubTask中都有一份ParallelSourceFunction匿名类的实例,对应的run方法分别被执行,所以每一个SubTask都发送了10条;
- 再来看消费数据的子任务,以下图,红框显示并行度与代码中设置的数量是一致的,蓝框显示两个SubTask一共消费了20条记录,和数据源发出的记录数一致,另外绿框显示两个SubTask的Task Manager是同一个,并且和DataSource的TaskManager是同一个,所以整个job都是在同一个TaskManager进行的,没有跨机器带来的额外代价:
- 接下来要实践的内容,和另外一个重要的抽象类有关;
继承抽象类RichSourceFunction的DataSource
- 对RichSourceFunction的理解是从继承关系开始的,以下图,SourceFunction和RichFunction的特性最终都体如今RichSourceFunction上,SourceFunction的特性是数据的生成(run方法),RichFunction的特性是对资源的链接和释放(open和close方法):
- 接下来开始实战,目标是从MySQL获取数据做为DataSource,而后消费这些数据;
- 请提早准备好可用的MySql数据库,而后执行如下SQL,建立库、表、记录:
DROP DATABASE IF EXISTS flinkdemo; CREATE DATABASE IF NOT EXISTS flinkdemo; USE flinkdemo; SELECT 'CREATING DATABASE STRUCTURE' as 'INFO'; DROP TABLE IF EXISTS `student`; CREATE TABLE `student` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(25) COLLATE utf8_bin DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin; INSERT INTO `student` VALUES ('1', 'student01'), ('2', 'student02'), ('3', 'student03'), ('4', 'student04'), ('5', 'student05'), ('6', 'student06'); COMMIT;
- 在pom.xml中增长mysql依赖:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.34</version> </dependency>
- 新增MySQLDataSource.java,内容以下:
package com.bolingcavalry.customize; import com.bolingcavalry.Student; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; public class MySQLDataSource extends RichSourceFunction<Student> { private Connection connection = null; private PreparedStatement preparedStatement = null; private volatile boolean isRunning = true; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); if(null==connection) { Class.forName("com.mysql.jdbc.Driver"); connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8", "root", "123456"); } if(null==preparedStatement) { preparedStatement = connection.prepareStatement("select id, name from student"); } } /** * 释放资源 * @throws Exception */ @Override public void close() throws Exception { super.close(); if(null!=preparedStatement) { try { preparedStatement.close(); } catch (Exception exception) { exception.printStackTrace(); } } if(null==connection) { connection.close(); } } @Override public void run(SourceContext<Student> ctx) throws Exception { ResultSet resultSet = preparedStatement.executeQuery(); while (resultSet.next() && isRunning) { Student student = new Student(); student.setId(resultSet.getInt("id")); student.setName(resultSet.getString("name")); ctx.collect(student); } } @Override public void cancel() { isRunning = false; } }
- 上面的代码中,MySQLDataSource继承了RichSourceFunction,做为一个DataSource,能够做为addSource方法的入参;
- open和close方法都会被数据源的SubTask调用,open负责建立数据库链接对象,close负责释放资源;
- open方法中直接写死了数据库相关的配置(不可取);
- run方法在open以后被调用,做用和以前的DataSource例子同样,负责生产数据,这里是用前面准备好的preparedStatement对象直接去数据库取数据;
- 接下来写个Demo类使用MySQLDataSource:
package com.bolingcavalry.customize; import com.bolingcavalry.Student; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class RichSourceFunctionDemo { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //并行度为2 env.setParallelism(2); DataStream<Student> dataStream = env.addSource(new MySQLDataSource()); dataStream.print(); env.execute("Customize DataSource demo : RichSourceFunction"); } }
- 从上述代码可见,MySQLDataSource实例传入addSource方法便可建立数据集;
- 像以前那样,编译构建、提交到Flink、指定任务类,便可开始执行此任务;
- 执行结果以下图,DataSource的并行度是1,一共发送六条记录,即student表的全部记录:
- 处理数据的SubTask一共两个,各处理三条消息:
- 因为代码中对数据集执行了print(),所以在TaskManager控制台看到数据输出以下图红框所示:
关于RichParallelSourceFunction
- 实战到了这里,还剩RichParallelSourceFunction这个抽象类咱们尚未尝试过,但我以为这个类能够不用在文中多说了,我们把RichlSourceFunction和RichParallelSourceFunction的类图放在一块儿看看:
- 从上图可见,在RichFunction继承关系上,二者一致,在SourceFunction的继承关系上,RichlSourceFunction和RichParallelSourceFunction略有不一样,RichParallelSourceFunction走的是ParallelSourceFunction这条线,而SourceFunction和ParallelSourceFunction的区别,前面已经讲过了,所以,结果不言而喻:继承RichParallelSourceFunction的DataSource的并行度是能够大于1的;
- 读者您若是有兴趣,能够将前面的MySQLDataSource改为继承RichParallelSourceFunction再试试,DataSource的并行度会超过1,可是毫不是只有这一点变化,DAG图显示Flink还会作一些Operator Chain处理,但这不是本章要关注的内容,只能说结果是正确的(两个DataSource的SubTask,一共发送12条记录),建议您试试;
至此,《Flink的DataSource三部曲》系列就所有完成了,好的开始是成功的一半,在拿到数据后,后面还有不少知识点要学习和掌握,接下来的文章会继续深刻Flink的奇妙之旅;github