在Flink 流处理过程当中,常常须要和外部系统进行交互,用维度表补全事实表中的字段。java
例如:在电商场景中,须要一个商品的skuid去关联商品的一些属性,例如商品所属行业、商品的生产厂家、生产厂家的一些状况;在物流场景中,知道包裹id,须要去关联包裹的行业属性、发货信息、收货信息等等。mysql
默认状况下,在Flink的MapFunction中,单个并行只能用同步方式去交互: 将请求发送到外部存储,IO阻塞,等待请求返回,而后继续发送下一个请求。这种同步交互的方式每每在网络等待上就耗费了大量时间。为了提升处理效率,能够增长MapFunction的并行度,但增长并行度就意味着更多的资源,并非一种很是好的解决方式。面试
Flink 在1.2中引入了Async I/O,在异步模式下,将IO操做异步化,单个并行能够连续发送多个请求,哪一个请求先返回就先处理,从而在连续的请求间不须要阻塞式等待,大大提升了流处理效率。redis
Async I/O 是阿里巴巴贡献给社区的一个呼声很是高的特性,解决与外部系统交互时网络延迟成为了系统瓶颈的问题。sql
图中棕色的长条表示等待时间,能够发现网络等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式能够并发地处理多个请求和回复。也就是说,你能够连续地向数据库发送用户a、b、c等的请求,与此同时,哪一个请求的回复先返回了就处理哪一个回复,从而连续的请求之间不须要阻塞等待,如上图右边所示。这也正是 Async I/O 的实现原理。数据库
详细的原理能够参考文末给出的第一个连接,来自阿里巴巴云邪的分享。apache
一个简单的例子以下:bootstrap
public class AsyncIOFunctionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
Properties p = new Properties();
p.setProperty("bootstrap.servers", "localhost:9092");
DataStreamSource<String> ds = env.addSource(new FlinkKafkaConsumer010<String>("order", new SimpleStringSchema(), p));
ds.print();
SingleOutputStreamOperator<Order> order = ds
.map(new MapFunction<String, Order>() {
@Override
public Order map(String value) throws Exception {
return new Gson().fromJson(value, Order.class);
}
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {
@Override
public long extractAscendingTimestamp(Order element) {
try {
return element.getOrderTime();
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
})
.keyBy(new KeySelector<Order, String>() {
@Override
public String getKey(Order value) throws Exception {
return value.getUserId();
}
})
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.maxBy("orderTime");
SingleOutputStreamOperator<Tuple7<String, String, Integer, String, String, String, Long>> operator = AsyncDataStream
.unorderedWait(order, new RichAsyncFunction<Order, Tuple7<String, String, Integer, String, String, String, Long>>() {
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("com.mysql.jdbc.Driver");
connection = DriverManager.getConnection("url", "user", "pwd");
connection.setAutoCommit(false);
}
@Override
public void asyncInvoke(Order input, ResultFuture<Tuple7<String, String, Integer, String, String, String, Long>> resultFuture) throws Exception {
List<Tuple7<String, String, Integer, String, String, String, Long>> list = new ArrayList<>();
// 在 asyncInvoke 方法中异步查询数据库
String userId = input.getUserId();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select name,age,sex from user where userid=" + userId);
if (resultSet != null && resultSet.next()) {
String name = resultSet.getString("name");
int age = resultSet.getInt("age");
String sex = resultSet.getString("sex");
Tuple7<String, String, Integer, String, String, String, Long> res = Tuple7.of(userId, name, age, sex, input.getOrderId(), input.getPrice(), input.getOrderTime());
list.add(res);
}
// 将数据搜集
resultFuture.complete(list);
}
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
}
}, 5000, TimeUnit.MILLISECONDS,100);
operator.print();
env.execute("AsyncIOFunctionTest");
}
}
复制代码
上述代码中,原始订单流来自Kafka,去关联维度表将订单的用户信息取出来。从上面示例中可看到,咱们在open()中建立链接对象,在close()方法中关闭链接,在RichAsyncFunction的asyncInvoke()方法中,直接查询数据库操做,并将数据返回出去。这样一个简单异步请求就完成了。api
简单的来讲,使用 Async I/O 对应到 Flink 的 API 就是 RichAsyncFunction 这个抽象类,继层这个抽象类实现里面的open(初始化),asyncInvoke(数据异步调用),close(中止的一些操做)方法,最主要的是实现asyncInvoke 里面的方法。缓存
咱们先来看一个使用Async I/O的模板方法:
// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// issue the asynchronous request, receive a future for result
final Future<String> result = client.query(key);
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// Normally handled explicitly.
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// create the original stream
DataStream<String> stream = ...;
// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
复制代码
假设咱们一个场景是须要进行异步请求其余数据库,那么要实现一个经过异步I/O来操做数据库还须要三个步骤: 一、实现用来分发请求的AsyncFunction 二、获取操做结果的callback,并将它提交到AsyncCollector中 三、将异步I/O操做转换成DataStream
其中的两个重要的参数:复制代码
Timeouttimeout 定义了异步操做过了多长时间后会被丢弃,这个参数是防止了死的或者失败的请求Capacity 这个参数定义了能够同时处理多少个异步请求。虽然异步I/O方法会带来更好的吞吐量,可是算子仍然会成为流应用的瓶颈。超过限制的并发请求数量会产生背压。
几个须要注意的点:
乱序, 用AsyncDataStream.unorderedWait(...) API,每一个并行的输出顺序和输入顺序可能不一致。复制代码
顺序, 用AsyncDataStream.orderedWait(...) API,每一个并行的输出顺序和输入顺序一致。为保证顺序,须要在输出的Buffer中排序,该方式效率会低一些。
复制代码
因为新合入的 Blink 相关功能,使得 Flink 1.9 实现维表功能很简单。若是你要使用该功能,那就须要本身引入 Blink 的 Planner。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
复制代码
而后咱们只要自定义实现 LookupableTableSource 接口,同时实现里面的方法就能够进行,下面来分析一下 LookupableTableSource的代码:
public interface LookupableTableSource<T> extends TableSource<T> {
TableFunction<T> getLookupFunction(String[] lookupKeys);
AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);
boolean isAsyncEnabled();
}
复制代码
这三个方法分别是:
咱们抛开同步访问函数无论,对于getAsyncLookupFunction会返回异步访问外部数据源的函数,若是你想使用异步函数,前提是 LookupableTableSource 的 isAsyncEnabled 方法返回 true 才能使用。使用异步函数访问外部数据系统,通常是外部系统有异步访问客户端,若是没有的话,能够本身使用线程池异步访问外部系统。例如:
public class MyAsyncLookupFunction extends AsyncTableFunction<Row> {
private transient RedisAsyncCommands<String, String> async;
@Override
public void open(FunctionContext context) throws Exception {
RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
StatefulRedisConnection<String, String> connection = redisClient.connect();
async = connection.async();
}
public void eval(CompletableFuture<Collection<Row>> future, Object... params) {
redisFuture.thenAccept(new Consumer<String>() {
@Override
public void accept(String value) {
future.complete(Collections.singletonList(Row.of(key, value)));
}
});
}
}
复制代码
一个完整的例子以下:
Main方法:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Test;
import java.util.Properties;
public class LookUpAsyncTest {
@Test
public void test() throws Exception {
LookUpAsyncTest.main(new String[]{});
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
final ParameterTool params = ParameterTool.fromArgs(args);
String fileName = params.get("f");
DataStream<String> source = env.readTextFile("hdfs://172.16.44.28:8020" + fileName, "UTF-8");
TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG};
String[] fields = new String[]{"id", "user_click", "time"};
RowTypeInfo typeInformation = new RowTypeInfo(types, fields);
DataStream<Row> stream = source.map(new MapFunction<String, Row>() {
private static final long serialVersionUID = 2349572543469673349L;
@Override
public Row map(String s) {
String[] split = s.split(",");
Row row = new Row(split.length);
for (int i = 0; i < split.length; i++) {
Object value = split[i];
if (types[i].equals(Types.STRING)) {
value = split[i];
}
if (types[i].equals(Types.LONG)) {
value = Long.valueOf(split[i]);
}
row.setField(i, value);
}
return row;
}
}).returns(typeInformation);
tableEnv.registerDataStream("user_click_name", stream, String.join(",", typeInformation.getFieldNames()) + ",proctime.proctime");
RedisAsyncLookupTableSource tableSource = RedisAsyncLookupTableSource.Builder.newBuilder()
.withFieldNames(new String[]{"id", "name"})
.withFieldTypes(new TypeInformation[]{Types.STRING, Types.STRING})
.build();
tableEnv.registerTableSource("info", tableSource);
String sql = "select t1.id,t1.user_click,t2.name" +
" from user_click_name as t1" +
" join info FOR SYSTEM_TIME AS OF t1.proctime as t2" +
" on t1.id = t2.id";
Table table = tableEnv.sqlQuery(sql);
DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);
DataStream<String> printStream = result.map(new MapFunction<Row, String>() {
@Override
public String map(Row value) throws Exception {
return value.toString();
}
});
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9094");
FlinkKafkaProducer011<String> kafkaProducer = new FlinkKafkaProducer011<>(
"user_click_name",
new SimpleStringSchema(),
properties);
printStream.addSink(kafkaProducer);
tableEnv.execute(Thread.currentThread().getStackTrace()[1].getClassName());
}
}
复制代码
RedisAsyncLookupTableSource方法:
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
public class RedisAsyncLookupTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> {
private final String[] fieldNames;
private final TypeInformation[] fieldTypes;
public RedisAsyncLookupTableSource(String[] fieldNames, TypeInformation[] fieldTypes) {
this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes;
}
//同步方法
@Override
public TableFunction<Row> getLookupFunction(String[] strings) {
return null;
}
//异步方法
@Override
public AsyncTableFunction<Row> getAsyncLookupFunction(String[] strings) {
return MyAsyncLookupFunction.Builder.getBuilder()
.withFieldNames(fieldNames)
.withFieldTypes(fieldTypes)
.build();
}
//开启异步
@Override
public boolean isAsyncEnabled() {
return true;
}
@Override
public DataType getProducedDataType() {
return TypeConversions.fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, fieldNames));
}
@Override
public TableSchema getTableSchema() {
return TableSchema.builder()
.fields(fieldNames, TypeConversions.fromLegacyInfoToDataType(fieldTypes))
.build();
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment environment) {
throw new UnsupportedOperationException("do not support getDataStream");
}
public static final class Builder {
private String[] fieldNames;
private TypeInformation[] fieldTypes;
private Builder() {
}
public static Builder newBuilder() {
return new Builder();
}
public Builder withFieldNames(String[] fieldNames) {
this.fieldNames = fieldNames;
return this;
}
public Builder withFieldTypes(TypeInformation[] fieldTypes) {
this.fieldTypes = fieldTypes;
return this;
}
public RedisAsyncLookupTableSource build() {
return new RedisAsyncLookupTableSource(fieldNames, fieldTypes);
}
}
}
复制代码
MyAsyncLookupFunction
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.types.Row;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
public class MyAsyncLookupFunction extends AsyncTableFunction<Row> {
private final String[] fieldNames;
private final TypeInformation[] fieldTypes;
private transient RedisAsyncCommands<String, String> async;
public MyAsyncLookupFunction(String[] fieldNames, TypeInformation[] fieldTypes) {
this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes;
}
@Override
public void open(FunctionContext context) {
//配置redis异步链接
RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
StatefulRedisConnection<String, String> connection = redisClient.connect();
async = connection.async();
}
//每一条流数据都会调用此方法进行join
public void eval(CompletableFuture<Collection<Row>> future, Object... paramas) {
//表名、主键名、主键值、列名
String[] info = {"userInfo", "userId", paramas[0].toString(), "userName"};
String key = String.join(":", info);
RedisFuture<String> redisFuture = async.get(key);
redisFuture.thenAccept(new Consumer<String>() {
@Override
public void accept(String value) {
future.complete(Collections.singletonList(Row.of(key, value)));
//todo
// BinaryRow row = new BinaryRow(2);
}
});
}
@Override
public TypeInformation<Row> getResultType() {
return new RowTypeInfo(fieldTypes, fieldNames);
}
public static final class Builder {
private String[] fieldNames;
private TypeInformation[] fieldTypes;
private Builder() {
}
public static Builder getBuilder() {
return new Builder();
}
public Builder withFieldNames(String[] fieldNames) {
this.fieldNames = fieldNames;
return this;
}
public Builder withFieldTypes(TypeInformation[] fieldTypes) {
this.fieldTypes = fieldTypes;
return this;
}
public MyAsyncLookupFunction build() {
return new MyAsyncLookupFunction(fieldNames, fieldTypes);
}
}
}
复制代码
十分须要注意的几个点:
一、 外部数据源必须是异步客户端:若是是线程安全的(多个客户端一块儿使用),你能够不加 transient 关键字,初始化一次。不然,你须要加上 transient,不对其进行初始化,而在 open 方法中,为每一个 Task 实例初始化一个。
二、eval 方法中多了一个 CompletableFuture,当异步访问完成时,须要调用其方法进行处理。好比上面例子中的:
redisFuture.thenAccept(new Consumer<String>() {
@Override
public void accept(String value) {
future.complete(Collections.singletonList(Row.of(key, value)));
}
});
复制代码
三、社区虽然提供异步关联维度表的功能,但事实上大数据量下关联外部系统维表仍然会成为系统的瓶颈,因此通常咱们会在同步函数和异步函数中加入缓存。综合并发、易用、实时更新和多版本等因素考虑,Hbase是最理想的外部维表。
参考文章:http://wuchong.me/blog/2017/05/17/flink-internals-async-io/#https://www.jianshu.com/p/d8f99d94b761https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673https://www.jianshu.com/p/7ce84f978ae0
关注个人公众号,后台回复【JAVAPDF】获取200页面试题!5万人关注的大数据成神之路,不来了解一下吗?5万人关注的大数据成神之路,真的不来了解一下吗?5万人关注的大数据成神之路,肯定真的不来了解一下吗?