In this post we implement a Flink word count in Java.
Add the Flink 1.9 pom dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.7</version>
    </dependency>
</dependencies>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Generate one line of text per second:
DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
    // volatile, because cancel() is called from a different thread than run()
    private volatile boolean isCanceled = false;
    private String[] words = {
        "important oracle jdk license update",
        "the oracle jdk license has changed for releases starting april 16 2019",
        "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
        "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
        "downloading and using this product an faq is available here ",
        "commercial license and support is available with a low cost java se subscription",
        "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
    };

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Emit one randomly chosen line of text per second
        while (!isCanceled) {
            int randomIndex = RandomUtils.nextInt(0, words.length);
            ctx.collect(words[randomIndex]);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isCanceled = true;
    }
});
// 3. Word count
// 3.1 Split each line of text into individual words
SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
    // Split on spaces and emit every word
    Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
    });
}).returns(Types.STRING);

// 3.2 Turn each word into a (word, 1) tuple
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
    .map(word -> Tuple2.of(word, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT));

// 3.3 Group by word
KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

// 3.4 Sum the counts of each group within a 3-second window
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
    .timeWindow(Time.seconds(3))
    .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

resultDS.print();
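To make it easier to see what steps 3.1 to 3.4 compute for a single window, here is a plain-Java sketch that does not use Flink at all. It assumes the lines that arrived during one 3-second window are already collected in a list; the class name WindowWordCountSketch and the method countWords are hypothetical names chosen for illustration. It only mirrors the split, group and sum logic and is not a replacement for the streaming job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class WindowWordCountSketch {

    /**
     * Counts words in the lines of one window (assumption: the window
     * contents are already available as a list, which Flink handles for us).
     */
    public static Map<String, Integer> countWords(List<String> linesInWindow) {
        return linesInWindow.stream()
            // 3.1 split each line into words
            .flatMap(line -> Arrays.stream(line.split(" ")))
            // 3.2 to 3.4: treat each word as a count of 1, group by word, sum the counts
            .collect(Collectors.groupingBy(
                word -> word,
                TreeMap::new,                        // sorted keys, for readable output
                Collectors.summingInt(word -> 1)));
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList(
            "important oracle jdk license update",
            "oracle jdk license");
        // prints {important=1, jdk=2, license=2, oracle=2, update=1}
        System.out.println(countWords(window));
    }
}
```

In the Flink job the same result is produced incrementally: reduce adds the counts pairwise as tuples arrive, and the window emits one tuple per word when it closes.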
Complete code:
import java.util.Arrays;

import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. Build the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Custom source: emit one line of text per second
        DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            // volatile, because cancel() is called from a different thread than run()
            private volatile boolean isCanceled = false;
            private String[] words = {
                "important oracle jdk license update",
                "the oracle jdk license has changed for releases starting april 16 2019",
                "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                "downloading and using this product an faq is available here ",
                "commercial license and support is available with a low cost java se subscription",
                "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // Emit one randomly chosen line of text per second
                while (!isCanceled) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanceled = true;
            }
        });

        // 3. Word count
        // 3.1 Split each line of text into individual words
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // Split on spaces and emit every word
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        // 3.2 Turn each word into a (word, 1) tuple
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
            .map(word -> Tuple2.of(word, 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 Group by word
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 Sum the counts of each group within a 3-second window
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
            .timeWindow(Time.seconds(3))
            .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

        env.execute("app");
    }
}
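Flink supports lambda expressions for all operators of the Java API. However, whenever a lambda expression uses Java generics, the type information must be declared explicitly.
Let's look at this piece of the code above: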
SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
    // Split on spaces and emit every word
    Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
    });
}).returns(Types.STRING);
All type information is spelled out here because Flink cannot automatically infer the generic type carried by Collector. Let's look at the source code of FlatMapFunction:
@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {
    /**
     * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
     * it into zero, one, or more elements.
     *
     * @param value The input value.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    void flatMap(T value, Collector<O> out) throws Exception;
}
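Notice that the second parameter of flatMap is Collector<O>, a parameterized generic type. The Java compiler performs type erasure when it compiles this code, so the signature becomes:
void flatMap(T value, Collector out)
In this situation Flink cannot infer the type information automatically. If we do not provide the type information explicitly, the following error occurs: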
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
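In this case the type information must be specified explicitly; otherwise the output is treated as type Object, which prevents Flink from serializing it efficiently.
So we need to declare the parameter types of the lambda expression explicitly, and specify the output type explicitly through the returns method.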
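The loss of generic information can be observed with plain JDK reflection, without Flink. The sketch below uses java.util.function.Function as a stand-in for a Flink function interface; that substitution, the class name ErasureDemo and its helper methods are assumptions made for illustration. Flink's own type extraction is more involved than this check, but the underlying limitation is the same: an anonymous class records its type arguments in class metadata, while a lambda does not.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {

    /** An anonymous class implementing Function<String, Integer>. */
    public static Function<String, Integer> anonymousFunction() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    /** The same logic written as a lambda expression. */
    public static Function<String, Integer> lambdaFunction() {
        return s -> s.length();
    }

    /** Returns true if the runtime class of fn still records generic type arguments. */
    public static boolean keepsTypeArguments(Function<?, ?> fn) {
        for (Type implemented : fn.getClass().getGenericInterfaces()) {
            if (implemented instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(keepsTypeArguments(anonymousFunction())); // prints true
        System.out.println(keepsTypeArguments(lambdaFunction()));    // prints false
    }
}
```

This is also why the error message suggests an (anonymous) class as a workaround: with a class, the type arguments are still available at runtime, so no returns call is needed.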
Let's look at another piece of code:
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
    .map(word -> Tuple2.of(word, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT));
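Why does the call to map also need an explicit type?
Because map here returns a Tuple2, and Tuple2 has generic type parameters. These parameters are likewise erased at compile time, so Flink cannot infer the type correctly.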
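As a rough illustration of how an explicit declaration can bring nested generic parameters back, the sketch below captures a full generic type through an anonymous subclass, again using only the JDK. TypeCapture and TypeCaptureDemo are hypothetical names, not Flink classes, and Map.Entry<String, Integer> stands in for Tuple2<String, Integer>; this is only meant to show the general mechanism, not how returns is implemented.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

public class TypeCaptureDemo {

    /** Hypothetical helper: an anonymous subclass records T in its class metadata. */
    public abstract static class TypeCapture<T> {
        public Type captured() {
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    /** Returns the type arguments of Map.Entry<String, Integer> recovered at runtime. */
    public static Type[] entryTypeArguments() {
        // The trailing "{ }" creates an anonymous subclass, which keeps the type arguments
        Type captured = new TypeCapture<Map.Entry<String, Integer>>() { }.captured();
        return ((ParameterizedType) captured).getActualTypeArguments();
    }

    public static void main(String[] args) {
        for (Type argument : entryTypeArguments()) {
            System.out.println(argument.getTypeName());
        }
    }
}
```

Running main lists java.lang.String and java.lang.Integer, the two parameters that a lambda returning the same type would not expose.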
For more on Flink's support for Java lambda expressions, see the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html