Flink常见Checkpoint超时问题排查思路
flink 之 Checkpoint 出现的错误html
Checkpoint完全解密java
转至元数据结尾apache
转至元数据起始编程
现状:已发布安全
讨论主题:http: //apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-for-Asynchronous-IO-in-FLINK-tt13497.htmlapp
JIRA: FLINK-4391-为已解决的流提供异步操做支持 异步
发布: Flink 1.2async
Google文档:https: //docs.google.com/document/d/1Lr9UYXEz6s6R_3PWg3bZQLF3upGaNEkc0rQCFSzaYDI/editide
请将讨论保留在邮件列表上,而不是评论维基(维基讨论快速笨拙)。函数
在大多数状况下,I / O访问是一个耗时的过程,使得单个操做员的TPS远低于内存计算,特别是对于流式做业,低延迟是用户最关心的问题。启动多个线程多是处理此问题的一个选项,但缺点是显而易见的:最终用户的编程模型可能会变得更加复杂,由于他们必须在运算符中实现线程模型。此外,他们必须注意与检查点协调。
AsyncFunction:异步I / O将在AsyncFunction中触发。
AsyncWaitOperator:一个将调用AsyncFunction的StreamOperator。
AsyncCollector:对于每一个输入流记录,将建立AsyncCollector并将其传递到用户的回调以获取异步i / o结果。
AsyncCollectorBuffer:保留全部AsyncCollector的缓冲区。
发送器线程:AsyncCollectorBuffer中的一个工做线程,当一些AsyncCollectors完成异步i / o并将结果发送到如下操做符时发出信号。
添加了一个名为AsyncDataStream的辅助类,以提供将AsyncFunction(将执行异步i / o操做)添加到FLINK流做业的方法。
1
2
3
4
五
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public class AsyncDataStream {
/**
* Add an AsyncWaitOperator. The order of output stream records may be reordered.
*
* @param in Input data stream
* @param func AsyncFunction
* @bufSize The max number of async i/o operation that can be triggered
* @return A new DataStream.
*/
public static DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, int bufSize);
public static DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func);
/**
* Add an AsyncWaitOperator. The order of output stream records is guaranteed to be the same as input ones.
*
* @param func AsyncWaitFunction
* @param func AsyncFunction
* @bufSize The max number of async i/o operation that can be triggered
* @return A new DataStream.
*/
public static DataStream<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, int bufSize);
public static DataStream<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func);
}
|
下图说明了如何处理流式传输记录
AsyncFunction 在AsyncWaitOperator中用做函数,它看起来像StreamFlatMap运算符,具备open()/ processElement(StreamRecord <IN> record)/ processWatermark(Watermark mark)。
对于用户的混凝土AsyncFunction,所述asyncInvoke(IN输入,AsyncCollector <OUT>集电极)必须重写以供应代码开始异步操做。
1
2
3
4
五
6
7
8
9
10
11
12
13
14
15
16
|
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
/**
* Trigger async operation for each stream input.
* The AsyncCollector should be registered into async client.
*
* @param input Stream Input
* @param collector AsyncCollector
*/
void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}
public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction
implements AsyncFunction<IN, OUT> {
@Override
public abstract void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}
|
对于AsyncWaitOperator的每一个输入流的记录,它们将被处理经过AsyncFunction.asyncInvoke(IN输入,AsyncCollector <OUT> CB)。而后AsyncCollector将附加到AsyncCollectorBuffer中。稍后咱们将介绍AsyncCollector和AsyncCollectorBuffer。
AsyncCollector由AsyncWaitOperator建立,并传递到AsyncFunction,它应该被添加到用户的回调中。它充当从用户代码获取结果或错误的角色,并通知AsyncCollectorBuffer发出结果。
特定于用户的函数是collect,而且应该在异步操做完成或抛出错误时调用它们。
1
2
3
4
五
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
|
public class AsyncCollector<OUT> {
private List<OUT> result;
private Throwable error;
private AsyncCollectorBuffer<OUT> buffer;
/**
* Set result
* @param result A list of results.
*/
public void collect(List<OUT> result) {
this .result = result;
buffer.mark( this );
}
/**
* Set error
* @param error A Throwable object.
*/
public void collect(Throwable error) {
this .error = error;
buffer.mark( this );
}
/**
* Get result. Throw RuntimeException while encountering an error.
* @return A List of result.
* @throws RuntimeException RuntimeException wrapping errors from user codes.
*/
public List<OUT> getResult() throws RuntimeException { ... }
}
|
在调用AsyncFunction.asyncInvoke(IN输入,AsyncCollector <OUT>收集器)以前,AsyncWaitOperator将尝试从AsyncCollectorBuffer 获取AsyncCollector 的实例。而后它将被带入用户的回调函数。若是缓冲区已满,它将等待一些正在进行的回调完成。
异步操做完成后,AsyncCollector.collect()将获取结果或错误,并将通知AsyncCollectorBuffer。
AsyncCollector由FLINK实现。
AsyncCollectorBuffer保留全部AsyncCollectors,并将结果发送到下一个节点。
调用AsyncCollector.collect()时,标记将放在AsyncCollectorBuffer中,表示已完成的AsyncCollectors。一个名为Emitter的工做线程也将在AsyncCollector获取结果后发出信号,而后根据有序或无序设置尝试发出结果。
为简单起见,咱们将在如下文本中将任务引用到AsyncCollectorBuffer中的AsycnCollector。
根据用户配置,将保证或不保证输出元素的顺序。若是不能保证,稍后发布的完成的AsyncCollectors将会更早发出。
线程将等待完成的AsyncCollectors。在发出信号时,它将处理缓冲区中的任务,以下所示:
有序模式
若是缓冲区中的第一个任务完成,则Emitter将收集其结果,而后继续执行第二个任务。若是第一项任务还没有完成,请再次等待。
无序模式
检查缓冲区中的全部已完成任务,并从缓冲区中最先的水印以前的那些任务中收集结果。
该线程和任务线程将访问彻底 经过获取/释放锁。
信号 任务线程在全部任务完成后通知它已经处理完全部数据,而且能够关闭操做员。
从缓冲区中删除一些任务后的Signal Task Thread。
传播任务线程的异常。
仅 针对发射qi线程访问AsyncCollectorBuffer 。
获取并向缓冲区添加新的AsyncCollector,等待缓冲区已满。
全部水印也将保存在AsyncCollectorBuffer中。当且仅当在发出当前水印以前的全部AsyncCollector以后才会发出水印。
全部输入StreamRecords都将保持状态。而不是在处理时逐个将每一个输入流记录存储到状态,AsyncWaitOperator将在快照操做符状态时将AsyncCollectorBuffer中的全部输入流记录置于状态。在持久保存这些记录以前,将清除状态中的旧数据。
当全部障碍,在操做员已经抵达,安检点能够进行当即。
在恢复操做员状态时,操做员将扫描状态中的全部元素,获取AsyncCollectors,调用AsyncFunction.asyncInvoke()并将它们插回AsyncCollectorBuffer。
对于在同一个TaskManager(也就是相同的JVM)中的不一样插槽(任务工做者)之间共享异步资源(如链接到hbase,netty链接)的状况,咱们可使链接静态,以便同一进程中的全部线程均可以共享相同的实例。
固然,请在使用这些资源时注意线程安全。
1
2
3
4
五
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
/*** ***/
public class HBaseAsyncFunction implements AsyncFunction<String, String> {
// initialize it while reading object
transient Connection connection;
@Override
public void asyncInvoke(String val, AsyncCollector<String> c) {
Get get = new Get(Bytes.toBytes(val));
Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes( "test" )));
// UserCallback is from user’s async client.
((AsyncableHTableInterface) ht).asyncGet(get, new UserCallback(c));
}
}
// create data stream
public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
DataStream<String> source = getDataStream(env);
DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
}
|
1
2
3
4
五
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
31
32
33
34
35
36
37
38
|
public class HBaseAsyncFunction implements AsyncFunction<String, String> {
// initialize it while reading object
transient Connection connection;
@Override
public void asyncInvoke(String val, AsyncCollector<String> c) {
Get get = new Get(Bytes.toBytes(val));
Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes( "test" )));
ListenableFuture<Result> future = ht.asyncGet(get);
new FutureCallback<Result>() {
@Override public void onSuccess(Result result) {
List ret = new ArrayList<String>();
}
@Override public void onFailure(Throwable t) {
}
},
);
}
}
// create data stream
public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
DataStream<String> source = getDataStream(env);
DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
}
|
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673