依赖数据源的类型,你能够将用于批处理和流的DataSet接口作为数据源写一个batch或者streaming程序。本节教程旨在介绍在这两个方向上通用接口的基本概念。java
注:当我以StreamingExecutionEnvironment和DataStream API来做为讲述这些概念的实例时。其余和DataSet API是同样的,仅仅使用了ExecutionEnvironment和DataSet来指代而已。express
DataSet和DataStreamapache
Flink使用特殊的类——DataSet和DataStream来表达程序中的数据。你能够把他们想象成包含能够复制的数据的不可变集合。在DataSet中的数据是有限的可是在一个DataStream中元素的数量是无上限的。api
这些集合在一些关键的点上仍是和经常使用的java集合有一些区别的。首先,他们是可变的,意味着一旦他们被建立就不能增长或者移除任何元素。你要作的不只仅只是检查里面的元素这么简单。ide
一个集合是经过在Flink程序中增长一个source来初始化的。新的集合经过调用像map、filter等函数来作转换而生成的。函数
Flink程序的结构oop
Flink程序就像是转换数据集合的程序同样。每一个程序都包含下面的部分:学习
1 获取执行环境this
2 加载/建立初始化数据命令行
3 说明在数据上要作的转换
4 说明将你计算的结果保存在哪
5 执行程序
我如今只是给出这些步骤的一个总览,若是须要深刻学习还须要了解更多。关于java DataSet API的核心类均可以在包org.apache.flink.api.java中找到,java DataStream API的核心类能够在包org.apache.flink.streaming.api中找到。
StreamExecutionEnvironment是全部的Flink程序的基础。你能够经过下面列举的静态方法来获取:
getExecutionEnvironment() createLocalEnvironment() createRemoteEnvironment(String host,int port,String... jarFiles)
通常状况下,你只须要使用getExecutionEnvironment(),他依赖以下的上下文:若是你是在IDE中做为通常java程序来执行的话,他会在你本地的机器上建立一个本地环境来运行你的代码。若是你将本身的程序打包成jar文件,经过命令行的方式调用,Flink集群管理器就会执行你的main方法,getExecutionEnvironment()将会为你在集群上的程序打造一个执行环境。
对于特定的数据源执行环境使用不少方法从文件中读取数据:你只须要逐行读取,就像CSV文件那样,或者使用完整的自定义数据输入格式。为了将文本文件做为一系列行的集合来读,你能够这么用:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.readTextFile("file:///path/to/file");
这样你就能够经过这个DataStream来建立转换为新的DataStream输入源。你能够经过调用一个转换函数来应用这个转换。举个例子,一个map转换像是这样:
DataStream<String> input = ...; DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return Integer.parseInt(value); } });
这样你就能够将原始集合中的每一个字符串转换成整数。
一旦你有了一个包含你最终结果的DataStream,你就能够经过建立一个sink来写入外部系统。下面是建立sink的一些示例方法:
writeAsText(String path) print()
一旦你完成了整个程序,你须要经过在StreamExecutionEnvironment上调用execute()来执行你的程序。依赖于ExecutionEnvironment的类型执行环境能够决定是在本地机器运行仍是将你的程序提交到集群上执行。
execute()方法返回一个JobExecutionResult,他包含执行时间和累加结果。
延迟评估
全部的Flink程序都是延迟执行的:当主函数执行的时候,数据每每不会马上就加载并转换。然而,每一个操做都被建立并加入了程序计划中。当执行程序经过execute()触发的时候操做才真正被执行。无论执行环境的类型是本地仍是集群的方式。
延迟评估使得Flink的执行程序做为历史镜像计划单元来安排程序的运行。
特定的键
一些转换(join,coGroup,keyBy,groupBy)须要一个在集合元素中定义的键。另一些(Reduce,GroupReduce,Aggregate,Windows)则容许数据能够在他们被使用以前在键上组织。
DataSet被下面这样的方式组织:
DataSet<...> input = //[...] DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/);
而在DataStream中是这样作的:
DataStream<...> input = //[...] DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*window specification*/);
Flink的数据格式不是基于键值对的。所以,你也不必非得讲数据集合转成键和值的类型。键是“虚拟”的:他们做为真实数据上的函数来指导分组算子。
注:下面我将使用DataStream API和keyBy函数。请自行脑补DataSet API的方式(你只须要将他们替换成DataSet和groupBy)。
为Tuples定义键
最简单的方式是在一个或多个Tuple的域上进行分组的Tuples:
DataStream<Tuple3<Integer,String,Long>> input = //[...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
元组在第一个域上进行分组:
DataStream<Tuple3<Integer,String,Long>> input = //[...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
如今,我使用包含第一个和第二个域的组合键来分组元组:
若是你有一个嵌套tuple的DataStream数据,好比:
DataStream<Tuple3<Tuple2<Integer,Float>,String,Long>> ds;
使用keyBy(0)可让系统来调用完整的Tuple2做为一个键。若是你想要看下嵌套的Tuple2是怎么用的,那你就不得不使用以下介绍的——field expression(域表达式)
使用域表达式定义键
你可使用基于字符串的域表达式来引出嵌套域和grouping,sorting,joining和coGrouping的定义。
域表达式使得你能够像Tuple和Java POJO同样简单地在嵌套组合类型中切换域。
在下面的例子中,你有一个WC的POJO,他有两个域——word和count。为了按照word分组,你只须要按照他的name来调用keyBy()函数。
//some ordinary POJO public class WC { public String word; public int count; } DataStream<WC> words = //[...] DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
域表达式示例:
public static class WC { public ComplexNestedClass complex; //nested POJO private int count; //getter / setter for private field(count) public int getCount() { return count; } public void setCount(int c) { this.count = c; } } public static class ComplexNestedClass { public Integer someNumber; public float someFloat; public Tuple3<Long, Long, String> word; public IntWritable hadoopCitizen; }
使用Selector函数来定义键:
另一种定义键的方式就是“key selector”函数。一个key selector函数将一个独立的元素来做为参数,返回这个元素的键。这个键能够是任意类型而且能够从计算结果中取出。
下面这个例子展现了key selector是怎样返回一个对象的域的:
// some ordinary POJO public class WC {public String word; public int count;} DataStream<WC> words = //[...] KeyedStream<WC> keyed = words .keyBy(new KeySelector<WC, String>() { pubic String getKey(WC wc) {return wc.word;} });
定义转换函数
大部分的转换都须要用户自定义的函数。这里列举了不一样的方式:
实现一个接口
最经常使用的方式就是实现一个提供的接口:
class MyMapFunction implements MapFunction<String,Integer> { public Integer map(String value) {return Integer.parseInt(value);} }); data.map(new MyMapFunction());
匿名类
你还能够经过一个叫作匿名类的东东:
data.map(new MapFunction<String,Integer>(){ public Integer map(String value) {return Integer.parseInt(value);} });
Java 8 Lambdas函数
Flink也支持Java 8中定义的Lambda接口。
data.filter(s->s.startsWith("http://")); data.reduce((i1,i2) -> i1 + i2);