Apache Flink是一个分布式框架处理引擎,用于对无界和有界数据流进行有状态计算。Flink运行在全部常见的集群环境中运行,高效率的执行计算。安全
无界数据流:有一个开始但没有定义的结束。它们不会在生成时终止并提供数据。必须持续处理无界流,即必须在摄取事件后当即处理事件。没法等待全部输入数据到达,由于输入是无界的,而且在任什么时候间点都不会完成。处理无界数据一般要求以特定顺序(例如事件发生的顺序)摄取事件,以便可以推断结果完整性。(即流数据)数据结构
有界数据流:具备定义的开始和结束。能够在执行任何计算以前经过摄取全部数据来处理有界流。处理有界流不须要有序摄取,由于能够始终对有界数据集进行排序。(即批处理)架构
以下图: 框架
Apache Flink是一个分布式系统,须要计算资源才能执行应用程序。Flink与全部常见的集群资源管理器(如Hadoop YARN,Apache Mesos和Kubernetes)集成,但也能够设置为做为独立集群运行。less
Flink旨在以任何规模运行有状态流应用程序。应用程序能够并行化为数千个在集群中分布和同时执行的任务。(有状态和无状态的区别-> 有状态对象(Stateful Bean),就是有实例变量的对象,能够保存数据,是非线程安全的。 无状态对象(Stateless Bean),就是没有实例变量的对象,不能保存数据,是不变类,是线程安全的。)异步
有状态Flink应用程序针对本地状态访问进行了优化。任务状态始终保留在内存中,或者,若是状态大小超过可用内存,则保存在访问高效的磁盘上数据结构中。所以,任务经过访问本地(一般是内存中)状态来执行全部计算,从而产生很是低的处理延迟。Flink经过按期和异步检查本地状态到持久存储来保证在出现故障时的一次状态一致性。 以下图: 分布式
Flink提供三种的API:ide
SQL和Table API Flink有两个关系API,Table API和SQL。这两个API都是用于批处理和流处理的统一API,即,在无界的实时流或有界的记录流上以相同的语义执行查询,并产生相同的结果。Table API和SQL利用Apache Calcite进行解析,验证和查询优化。它们能够与DataStream和DataSet API无缝集成,并支持用户定义的标量,聚合和表值函数。 如下SQL用于对点击流进行会话并计算每一个会话的点击次数的SQL查询:函数
SELECT userId, COUNT(*)
FROM clicks
GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId
复制代码
ProcessFunctions是Flink提供的最具表现力的功能接口。Flink提供ProcessFunctions来处理来自窗口中分组的一个或两个输入流或事件的单个事件。ProcessFunctions提供对时间和状态的细粒度控制。ProcessFunction能够任意修改其状态并注册将在将来触发回调函数的定时器。所以,ProcessFunctions能够实现许多有状态事件驱动应用程序所需的复杂的每事件业务逻辑。如下示例显示了KeyedProcessFunction对a KeyedStream和match START以及END事件进行操做的示例。当一个START被接收的事件,则该函数在记住其状态时间戳和计时在四个小时的计时器。若是END在计时器触发以前收到事件,则该函数计算事件END和START事件之间的持续时间,清除状态并返回值。不然,计时器只会触发并清除状态。oop
/**
* 匹配流入的START和END事件,并计算两个元素的时间的差;
* 第一个String字段是键属性,第二个String属性标记START和END事件。
*/
public static class StartEndDuration
extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {
private ValueState<Long> startTime;
@Override
public void open(Configuration conf) {
// 获取状态处理
startTime = getRuntimeContext()
.getState(new ValueStateDescriptor<Long>("startTime", Long.class));
}
@Override
public void processElement(
Tuple2<String, String> in,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
switch (in.f1) {
case "START":
// 若是接受到一个开始事件,则设置开始时间
startTime.update(ctx.timestamp());
// 注册一个计时器,从开始时间开始的四个小时内
ctx.timerService()
.registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
break;
case "END":
// 发出开始和结束事件之间的持续时间
Long sTime = startTime.value();
if (sTime != null) {
out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
// 清除状态
startTime.clear();
}
default:
// do nothing
}
}
/** 计时器触发时调用 */
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) {
// 超时时,清除状态
startTime.clear();
}
}
复制代码
DataStream API所述的数据流中的API经过查询外部数据存储提供了许多常见的流处理操做。数据流API可用于Java和Scala和基于功能,如map(),reduce()和aggregate()。能够经过扩展接口或Java或Scala lambda函数来定义函数。 如下示例显示如何对点击流进行会话并计算每一个会话的点击次数。
// 对点击流进行会话并计算每一个会话的点击次数
DataStream<Click> clicks = ...
DataStream<Tuple2<String, Long>> result = clicks
.map(
new MapFunction<Click, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Click click) {
return Tuple2.of(click.userId, 1L);
}
})
// 定义userId的键是0
.keyBy(0)
// 定义30分钟的会话间隙
.window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
// 计算每一个会话的点击数
.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));复制代码