来源: http://www.54tianzhisheng.cn/...
Apache Flink 是近年来愈来愈流行的一款开源大数据计算引擎,它同时支持了批处理和流处理,也能用来作一些基于事件的应用。使用官网的一句话来介绍 Flink 就是 “Stateful Computations Over Streams”。java
首先 Flink 是一个纯流式的计算引擎,它的基本数据模型是数据流。流能够是无边界的无限流,即通常意义上的流处理。也能够是有边界的有限流,这样就是批处理。所以 Flink 用一套架构同时支持了流处理和批处理。其次,Flink 的一个优点是支持有状态的计算。若是处理一个事件(或一条数据)的结果只跟事件自己的内容有关,称为无状态处理;反之结果还和以前处理过的事件有关,称为有状态处理。稍微复杂一点的数据处理,好比说基本的聚合,数据流之间的关联都是有状态处理。mysql
那么那些常见的无穷数据集有哪些呢?git
数据运算模型有哪些呢:github
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i3IYaQm9-1595768814163)(https://ws3.sinaimg.cn/large/...]web
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1KK2dXk1-1595768814167)(https://ws3.sinaimg.cn/large/...]sql
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8uDNlF7v-1595768814169)(https://ws2.sinaimg.cn/large/...]apache
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-W7z6hQI9-1595768814171)(https://ws4.sinaimg.cn/large/...]编程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4mHt3Iq8-1595768814173)(/Applications/Typora.app/Contents/Resources/TypeMark/Docs/img/006tNbRwly1fw6nu5yishj31kw0w04cm.jpg)]api
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qGWVgGhG-1595768814174)(https://ws2.sinaimg.cn/large/...]缓存
从下至上:
一、部署:Flink 支持本地运行、能在独立集群或者在被 YARN 或 Mesos 管理的集群上运行, 也能部署在云上。
二、运行:Flink 的核心是分布式流式数据引擎,意味着数据以一次一个事件的形式被处理。
三、API:DataStream、DataSet、Table、SQL API。
四、扩展库:Flink 还包括用于复琐事件处理,机器学习,图形处理和 Apache Storm 兼容性的专用代码库。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yZWEXSka-1595768814175)(https://ws2.sinaimg.cn/large/...]
你能够在表与 DataStream/DataSet 之间无缝切换,也容许程序将 Table API 与 DataStream 以及 DataSet 混合使用。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-j4U1eMhC-1595768814176)(https://ws1.sinaimg.cn/large/...]
Flink 应用程序结构就是如上图所示:
一、Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,固然你也能够定义本身的 source。
二、Transformation:数据转换的各类操做,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操做不少,能够将数据转换计算成你想要的数据。
三、Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能须要存储下来,Flink 常见的 Sink 大概有以下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也能够定义本身的 sink。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4Tz386WK-1595768814177)(https://ws3.sinaimg.cn/large/...]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6QPM1ZUj-1595768814178)(https://ws1.sinaimg.cn/large/...]
经过 homebrew 来安装。
brew install apache-flink
flink -v
进入到:/usr/local/Cellar/apache-flink/1.7.1/libexec/bin
./start-cluster.sh
访问http://localhost:8081/#/overview
GroupId=org.apache.flink ArtifactId=flink-quickstart-java Version=1.6.1
package study; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * @author wangjun * @date 2019/03/25 */ public class SocketTextStreamWordCount { public static void main(String[] args) throws Exception { //参数检查 if (args.length != 2) { System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>"); return; } String hostname = args[0]; Integer port = Integer.parseInt(args[1]); // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //获取数据 DataStreamSource<String> stream = env.socketTextStream(hostname, port); //计数 SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter()) .keyBy(0) .sum(1); sum.print(); env.execute("Java WordCount from SocketTextStream Example"); } public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) { String[] tokens = s.toLowerCase().split("\\W+"); for (String token: tokens) { if (token.length() > 0) { collector.collect(new Tuple2<String, Integer>(token, 1)); } } } } }
mvn clean package -Dmaven.test.skip=true
nc -l 9000
flink run -c study.SocketTextStreamWordCount /Users/wangjun/timwang/codeArea/study/target/original-study-1.0-SNAPSHOT.jar 127.0.0.1 9000
tail -f flink-wangjun-taskexecutor-0-wangjundeMBP.out
数据来源,Flink 作为一款流式计算框架,它可用来作批处理,即处理静态的数据集、历史的数据集;也能够用来作流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就可以一直计算下去,这个 Data Sources 就是数据的来源地。
Flink 中你可使用 StreamExecutionEnvironment.addSource(sourceFunction)
来为你的程序添加数据来源。
Flink 已经提供了若干实现好了的 source functions,固然你也能够经过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source,
StreamExecutionEnvironment 中可使用如下几个已实现的 stream sources,
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9Jk36Myp-1595768814183)(https://ws4.sinaimg.cn/large/...]
一、fromCollection(Collection) - 从 Java 的 Java.util.Collection 建立数据流。集合中的全部元素类型必须相同。
二、fromCollection(Iterator, Class) - 从一个迭代器中建立数据流。Class 指定了该迭代器返回元素的类型。
三、fromElements(T …) - 从给定的对象序列中建立数据流。全部对象类型必须相同。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Person> input = env.fromElements( new Person(1, "name", 12), new Person(2, "name2", 13), new Person(3, "name3", 14) );
四、fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中建立并行数据流。Class 指定了该迭代器返回元素的类型。
五、generateSequence(from, to) - 建立一个生成指定区间范围内的数字序列的并行数据流。
一、readTextFile(path) - 读取文本文件,即符合 TextInputFormat 规范的文件,并将其做为字符串返回。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.readTextFile("file:///path/to/file");
二、readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。
三、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 能够按期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你能够经过 pathFilter 进一步排除掉须要处理的文件。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<MyEvent> stream = env.readFile( myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, FilePathFilter.createDefaultFilter(), typeInfo);
socketTextStream(String hostname, int port) - 从 socket 读取。元素能够用分隔符切分。
addSource - 添加一个新的 source function。例如,你能够 addSource(new FlinkKafkaConsumer011<>(…)) 以从 Apache Kafka 读取数据
package study; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; /** * @author wangjun * @date 2019/3/25 */ public class SourceFromMySQL extends RichSourceFunction<Student> { private PreparedStatement ps; private Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql = "select * from student;"; ps = this.connection.prepareStatement(sql); } @Override public void close() throws Exception { super.close(); if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } @Override public void run(SourceContext<Student> ctx) throws Exception { ResultSet resultSet = ps.executeQuery(); while (resultSet.next()) { Student student = new Student( resultSet.getInt("id"), resultSet.getString("name").trim(), resultSet.getString("password").trim(), resultSet.getInt("age")); ctx.collect(student); } } @Override public void cancel() { } private static Connection getConnection() { Connection con = null; try { Class.forName("com.mysql.jdbc.Driver"); con = DriverManager.getConnection("jdbc:mysql://localhost:3306/cloud?useUnicode=true&characterEncoding=UTF-8", "root", "123456"); } catch (Exception e) { System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage()); } return con; } }
package study; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @author wangjun * @date 2019/3/25 */ public class MySqlSourceMain { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new SourceFromMySQL()).print(); env.execute("Flink add data source"); } }
一、基于集合:有界数据集,更偏向于本地测试用
二、基于文件:适合监听文件修改并读取其内容
三、基于 Socket:监听主机的 host port,从 Socket 中获取数据
四、自定义 addSource:大多数的场景数据都是无界的,会源源不断的过来。好比去消费 Kafka 某个 topic 上的数据,这时候就须要用到这个 addSource,可能由于用的比较多的缘由吧,Flink 直接提供了 FlinkKafkaConsumer011 等类可供你直接使用。你能够去看看 FlinkKafkaConsumerBase 这个基础类,它是 Flink Kafka 消费的最根本的类。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SS0BBXqM-1595768814185)(https://ws4.sinaimg.cn/large/...]
本案例将实现一个“实时热门商品”的需求,每隔 5 分钟输出最近一小时内点击量最多的前 N 个商品。将这个需求进行分解
咱们大概要作这么几件事情:
• 抽取出业务时间戳,告诉 Flink 框架基于业务时间作窗口
• 过滤出点击行为数据
• 按一小时的窗口大小,每 5 分钟统计一次,作滑动窗口聚合( Sliding Window)
• 按每一个窗口聚合,输出每一个窗口中点击量前 N 名的商品
这里咱们准备了一份淘宝用户行为数据集(来自阿里云天池公开数据集)。本数据集包含了淘宝上某一天随机一百万用户的全部行为(包括点击、购买、加购、收藏)。数据集的组织形式和 MovieLens-20M 相似,即数据集的每一行表示一条用户行为,由用户 ID、商品 ID、商品类目 ID、行为类型和时间戳组成,并以逗号分隔。关于数据集中每一列的详细描述以下:
curl https://raw.githubusercontent... UserBehavior.csv
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 为了打印到控制台的结果不乱序,咱们配置全局的并发为 1,这里改变并发对结果正确性没有影响 env.setParallelism(1); // UserBehavior.csv 的本地文件路径 URL fileUrl = HotItems.class.getClassLoader().getResource("UserBehavior.csv"); Path filePath = Path.fromLocalFile(new File(fileUrl.toURI())); // 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class); // 因为 Java 反射抽取出的字段顺序是不肯定的,须要显式指定下文件中字段的顺序 String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"}; // 建立 PojoCsvInputFormat PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder); // PojoCsvInputFormat 建立输入源。 DataStream<UserBehavior> dataSource = env.createInput(csvInput, pojoType);
CsvInputFormat 建立模拟数据源。咱们先建立一个 UserBehavior 的 POJO 类(全部成员变量声明成 public 即是 POJO 类),强类
型化后能方便后续的处理
package study.goods; import java.io.Serializable; /** * @author wangjun * @date 2019/3/26 */ public class UserBehavior implements Serializable { /** * 用户 ID */ public long userId; /** * 商品 ID */ public long itemId; /** * 商品类目 ID */ public int categoryId; /** * 用户行为, 包括("pv", "buy", "cart", "fav") */ public String behavior; /** * 行为发生的时间戳,单位秒 */ public long timestamp; public long getUserId() { return userId; } public void setUserId(long userId) { this.userId = userId; } public long getItemId() { return itemId; } public void setItemId(long itemId) { this.itemId = itemId; } public int getCategoryId() { return categoryId; } public void setCategoryId(int categoryId) { this.categoryId = categoryId; } public String getBehavior() { return behavior; } public void setBehavior(String behavior) { this.behavior = behavior; } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } }
当咱们说“统计过去一小时内点击量”,这里的“一小时”是指什么呢? 在 Flink 中它能够是指 ProcessingTime ,也能够是 EventTime,由用户决定。
• ProcessingTime:事件被处理的时间。也就是由机器的系统时间来决定。
• EventTime:事件发生的时间。通常就是数据自己携带的时间。
在本案例中,咱们须要统计业务时间上的每小时的点击量,因此要基于 EventTime 来处理。那么若是让 Flink 按照咱们想要的业务时间来处理呢?这里主要有两件事情要作。
第一件是告诉 Flink 咱们如今按照 EventTime 模式进行处理, Flink 默认使用 ProcessingTime处理,因此咱们要显式设置下。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
第二件事情是指定如何得到业务时间,以及生成 Watermark。 Watermark 是用来追踪业务事件的概念,能够理解成 EventTime 世界中的时钟,用来指示当前处理到什么时刻的数据了。因为咱们的数据源的数据已经通过整理,没有乱序,即事件的时间戳是单调递增的,因此能够将每条数据的业务时间就当作 Watermark。这里咱们用 AscendingTimestampExtractor 来实现时间戳的抽取和 Watermark 的生成。
// 这样咱们就获得了一个带有时间标记的数据流了,后面就能作一些窗口的操做 DataStream<UserBehavior> timedData = dataSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() { @Override public long extractAscendingTimestamp(UserBehavior userBehavior) { // 原始数据单位秒,将其转成毫秒 return userBehavior.timestamp * 1000; } });
在开始窗口操做以前,先回顾下需求“每隔 5 分钟输出过去一小时内点击量最多的前 N 个商品”。因为原始数据中存在点击、加购、购买、收藏各类行为的数据,可是咱们只须要统计点击量,因此先使用 FilterFunction 将点击行为数据过滤出来
DataStream<UserBehavior> pvData = timedData.filter(new FilterFunction<UserBehavior>() { @Override public boolean filter(UserBehavior userBehavior) throws Exception { // 过滤出只有点击的数据 return userBehavior.behavior.equals("pv"); } });
因为要每隔 5 分钟统计一次最近一小时每一个商品的点击量,因此窗口大小是一小时,每隔 5 分钟滑动一次。即分别要统计 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)... 等窗口的商品点击量。是一个常见的滑动窗口需求( Sliding Window)
DataStream<ItemViewCount> windowedData = pvData.keyBy("itemId") .timeWindow(Time.minutes(60), Time.minutes(5)) .aggregate(new CountAgg(), new WindowResultFunction());
咱们使用.keyBy("itemId")对商品进行分组,使用.timeWindow(Time size, Time slide)对每一个 商 品 作 滑 动 窗 口 (1 小 时 窗 口 , 5 分 钟 滑 动 一 次 )。 然 后 我 们 使用 .aggregate(AggregateFunction af, WindowFunction wf) 作增量的聚合操做,它能使用AggregateFunction 提早聚合掉数据,减小 state 的存储压力。较之.apply(WindowFunction wf)会将窗口中的数据都存储下来,最后一块儿计算要高效地多。aggregate()方法的第一个参数用于
这里的 CountAgg 实现了 AggregateFunction 接口,功能是统计窗口中的条数,即遇到一条数据就加一。
package study.goods; import org.apache.flink.api.common.functions.AggregateFunction; /** * COUNT 统计的聚合函数实现,每出现一条记录加一 * @author wangjun * @date 2019/3/26 */ public class CountAgg implements AggregateFunction<UserBehavior, Long, Long> { @Override public Long createAccumulator() { return 0L; } @Override public Long add(UserBehavior userBehavior, Long acc) { return acc + 1; } @Override public Long getResult(Long acc) { return acc; } @Override public Long merge(Long acc1, Long acc2) { return acc1 + acc2; } }
aggregate(AggregateFunction af, WindowFunction wf)的第二个参数 WindowFunction 将每一个 key 每一个窗口聚合后的结果带上其余信息进行输出。咱们这里实现的 WindowResultFunction将主键商品 ID,窗口,点击量封装成了 ItemViewCount 进行输出。
package study.goods; /** * @author wangjun * @date 2019/3/26 */ public class ItemViewCount { public long itemId; // 商品 ID public long windowEnd; // 窗口结束时间戳 public long viewCount; // 商品的点击量 public static ItemViewCount of(long itemId, long windowEnd, long viewCount) { ItemViewCount result = new ItemViewCount(); result.itemId = itemId; result.windowEnd = windowEnd; result.viewCount = viewCount; return result; } }
如今咱们获得了每一个商品在每一个窗口的点击量的数据流。
为了统计每一个窗口下最热门的商品,咱们须要再次按窗口进行分组,这里根据 ItemViewCount中的 windowEnd 进行 keyBy()操做。而后使用 ProcessFunction 实现一个自定义的 TopN 函数TopNHotItems来计算点击量排名前 3名的商品,并将排名结果格式化成字符串,便于后续输出。
DataStream<String> topItems = windowedData .keyBy("windowEnd") .process(new TopNHotItems(3)); // 求点击量前 3 名的商品
ProcessFunction 是 Flink 提供的一个 low-level API,用于实现更高级的功能。它主要提供了定时器 timer 的功能(支持 EventTime 或 ProcessingTime)。本案例中咱们将利用 timer 来判断什么时候收齐了某个 window 下全部商品的点击量数据。因为 Watermark 的进度是全局的,
在 processElement 方法中,每当收到一条数据( ItemViewCount),咱们就注册一个 windowEnd+1的定时器( Flink 框架会自动忽略同一时间的重复注册)。 windowEnd+1 的定时器被触发时,意味着收到了 windowEnd+1 的 Watermark,即收齐了该 windowEnd 下的全部商品窗口统计值。我
们在 onTimer()中处理将收集的全部商品及点击量进行排序,选出 TopN,并将排名信息格式化成字符串后进行输出。
这里咱们还使用了 ListState<ItemViewCount>来存储收到的每条 ItemViewCount 消息,保证在发生故障时,状态数据的不丢失和一致性。 ListState 是 Flink 提供的相似 Java List 接口的State API,它集成了框架的 checkpoint 机制,自动作到了 exactly-once 的语义保证
package study.goods; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Comparator; import java.util.List; /** * @author wangjun * @date 2019/3/26 */ public class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> { private final int topSize; // 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算 private ListState<ItemViewCount> itemState; public TopNHotItems(int topSize) { this.topSize = topSize; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 状态的注册 ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>( "itemState-state", ItemViewCount.class); itemState = getRuntimeContext().getListState(itemsStateDesc); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { // 获取收到的全部商品点击量 List<ItemViewCount> allItems = new ArrayList<>(); for (ItemViewCount item : itemState.get()) { allItems.add(item); } // 提早清除状态中的数据,释放空间 itemState.clear(); // 按照点击量从大到小排序 allItems.sort(new Comparator<ItemViewCount>() { @Override public int compare(ItemViewCount o1, ItemViewCount o2) { return (int) (o2.viewCount - o1.viewCount); } }); // 将排名信息格式化成 String, 便于打印 StringBuilder result = new StringBuilder(); result.append("====================================\n"); result.append("时间: ").append(new Timestamp(timestamp - 1)).append("\n"); for (int i = 0; i < topSize; i++) { ItemViewCount currentItem = allItems.get(i); // No1: 商品 ID=12224 浏览量=2413 result.append("No").append(i).append(":") .append(" 商品 ID=").append(currentItem.itemId) .append(" 浏览量=").append(currentItem.viewCount) .append("\n"); } result.append("====================================\n\n"); out.collect(result.toString()); } @Override public void processElement(ItemViewCount input, Context context, Collector<String> collector) throws Exception { // 每条数据都保存到状态中 itemState.add(input); // 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于 windowEnd 窗口的全部商品数据 context.timerService().registerEventTimeTimer(input.windowEnd + 1); } }