Flink使用 DataSet 和 DataStream 表明数据集。DateSet 用于批处理,表明数据是有限的;而 DataStream 用于流数据,表明数据是无界的。数据集中的数据是不能够变的,也就是说不能对其中的元素增长或删除。咱们经过数据源建立 DataSet 或者 DataStream ,经过 map,filter 等转换(transform)操做对数据集进行操做产生新的数据集。ide
编写 Flink 程序通常通过一下几个步骤:函数
下面咱们将介绍编写 Flink 程序所涉及的基本 API。oop
首先,须要得到 execution 环境,Flink 提供了一下如下三种方式:大数据
getExecutionEnvironment() createLocalEnvironment() createRemoteEnvironment(String host, int port, String... jarFiles)
以第一个为例建立 execution 环境的代码以下this
批处理:spa
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> text = env.readTextFile("file:///D:\\words.txt"); text.print();
流处理:code
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.readTextFile("file:///D:\\words.txt"); text.print();
env.execute();
words.txt 文件内容:orm
a
b
c
d
e
a
b
上面代码建立了 execution 环境,同时利用 env 建立了输入源。在数据集上调用 print 方法能够将数据输出到控制台,固然也能够调用 writeAsText 等方法将数据输出到其余介质。上面流处理最后一行代码调用了 execute 方法,在流处理中须要显式调用该方法触发程序的执行。对象
上述代码有两种方式运行,一种是直接在 IDE 中执行,就像运行一个普通的 Java 程序,Flink 将启动一个本地的环境执行程序。另外一种方式是将程序打包,提交到 Flink 集群运行。上面例子基本包含了一个 Flink 程序的基本骨架,可是并无对数据集进行更多的 transform 操做,下面咱们简单介绍基本 transform 操做。blog
这里的 map 操做相似 MapReduce 中的 map,对数据进行解析,处理。示例以下
批处理:
DataSet<Tuple2<String, Integer>> words = text.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return new Tuple2<>(s, 1); } }); words.print();
流处理:
DataStream<Tuple2<String, Integer>> words = text.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return new Tuple2<>(s, 1); } });
words.print()
这里批处理和流处理除了数据集的类型不一样,其他写法都同样。就是将每一个单词映射成了一个 (单词, 1) 二元组。与 map 相似的 transform 还有 filter,过滤不须要的记录,读者能够自行尝试。
大数据处理常常须要按照某个维度进行处理,也就是须要指定 key。在 DataSet 中使用 groupBy 指定 key,而在 DataStream 中使用 keyBy 指定 key。这里咱们以 keyBy 为例进行介绍。
Flink 的数据模型并非基于 key-value 的,key 是虚拟的,能够看作是定义在数据上的函数。
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = words.keyBy(0); //0 表明 Tuple2 (二元组)中第一个元素
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = words.keyBy(0,1); //0,1 表明二元组中第一个和第二个元素做为 key\
对于嵌套的 tuple
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
ds.keyBy(0) 将会把 Tuple2<Integer, Float> 总体做为 key。
public class WC { public String word; public int count; } DataStream<WC> words = // [...] DataStream<WC> wordCounts = words.keyBy("word");
这里指定 WC 对象的 word 字段做为 key。字段表达式语法以下:
字段表达式的举例
public static class WC { public ComplexNestedClass complex; //nested POJO private int count; // getter / setter for private field (count) public int getCount() { return count; } public void setCount(int c) { this.count = c; } } public static class ComplexNestedClass { public Integer someNumber; public float someFloat; public Tuple3<Long, Long, String> word; public IntWritable hadoopCitizen; }
经过 key 选择器函数来制定 key,key 选择器的输入为每一个元素,输出为指定的 key,例子以下
words.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() { @Override public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { return stringIntegerTuple2.f0; } });
能够看到实现的效果与 keyBy(0) 是同样的。
以上即是 Flink 指定 key 的方法。
这篇文章主要介绍了 Flink 程序的基本骨架。得到环境、建立输入源、对数据集作 transform 以及输出。因为数据处理常常会按照不一样维度(不一样的 key)进行统计,所以,本篇内容重点介绍了 Flink 中如何指定 key。后续将会继续介绍 Flink API 的使用。
欢迎关注公众号「渡码」