1-Flink入门javascript
3-DataSet APIhtml
4-DataSteam APIjava
5-集群部署node
6-分布式缓存c++
7-重启策略面试
Flink时间戳和水印数据库
首先咱们来看一下编程结构:
public class SocketTextStreamWordCount { public static void main(String[] args) throws Exception { if (args.length != 2){ System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>"); return; } String hostName = args[0]; Integer port = Integer.parseInt(args[1]); final StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment(); DataStream<String> text = env.socketTextStream(hostName, port); DataStream<Tuple2<String, Integer>> counts text.flatMap(new LineSplitter()) .keyBy(0) .sum(1); counts.print(); env.execute("Java WordCount from SocketTextStream Example"); }
上面的SocketTextStreamWordCount
是一个典型的Flink程序,他由一下及格部分构成:
全网惟一一个从0开始帮助Java开发者转作大数据领域的公众号~
大数据技术与架构或者搜索import_bigdata关注~
海量【java和大数据的面试题+视频资料】整理在公众号,关注后能够下载~
分类:
readTextFile(path)/ TextInputFormat
- 按行读取文件并将其做为字符串返回。
readTextFileWithValue(path)/ TextValueInputFormat
- 按行读取文件并将它们做为StringValues返回。StringValues是可变字符串。
readCsvFile(path)/ CsvInputFormat
- 解析逗号(或其余字符)分隔字段的文件。返回元组或POJO的DataSet。支持基本java类型及其Value对应做为字段类型。
readFileOfPrimitives(path, Class)/ PrimitiveInputFormat
- 解析新行(或其余字符序列)分隔的原始数据类型(如String或)的文件Integer。
readFileOfPrimitives(path, delimiter, Class)/ PrimitiveInputFormat
- 解析新行(或其余字符序列)分隔的原始数据类型的文件,例如String或Integer使用给定的分隔符。
readSequenceFile(Key, Value, path)/ SequenceFileInputFormat
- 建立一个JobConf并从类型为SequenceFileInputFormat,Key class和Value类的指定路径中读取文件,并将它们做为Tuple2 <Key,Value>返回。
fromCollection(Collection)
- 从Java Java.util.Collection建立数据集。集合中的全部数据元必须属于同一类型。
fromCollection(Iterator, Class)
- 从迭代器建立数据集。该类指定迭代器返回的数据元的数据类型。
fromElements(T ...)
- 根据给定的对象序列建立数据集。全部对象必须属于同一类型。
fromParallelCollection(SplittableIterator, Class)
- 并行地从迭代器建立数据集。该类指定迭代器返回的数据元的数据类型。
generateSequence(from, to)
- 并行生成给定间隔中的数字序列。
readFile(inputFormat, path)/ FileInputFormat
- 接受文件输入格式。
createInput(inputFormat)/ InputFormat
- 接受通用输入格式。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 从本地文件系统读 DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile"); // 读取HDFS文件 DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile"); // 读取CSV文件 DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file").types(Integer.class, String.class, Double.class); // 读取CSV文件中的部分 DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file").includeFields("10010").types(String.class, Double.class); // 读取CSV映射为一个java类 DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file").pojoType(Person.class, "name", "age", "zipcode"); // 读取一个指定位置序列化好的文件 DataSet<Tuple2<IntWritable, Text>> tuples = env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file"); // 从输入字符建立 DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar"); // 建立一个数字序列 DataSet<Long> numbers = env.generateSequence(1, 10000000); // 从关系型数据库读取 DataSet<Tuple2<String, Integer> dbData = env.createInput(JDBCInputFormat.buildJDBCInputFormat() .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") .setDBUrl("jdbc:derby:memory:persons") .setQuery("select name, age from persons") .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) .finish());
详细能够参考官网:https://flink.sojb.cn/dev/batch/dataset_transformations.html#filter
采用一个数据元并生成一个数据元。
data.map(new MapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } });
采用一个数据元并生成零个,一个或多个数据元。
data.flatMap(new FlatMapFunction<String, String>() { public void flatMap(String value, Collector<String> out) { for (String s : value.split(" ")) { out.collect(s); } } });
在单个函数调用中转换并行分区。该函数将分区做为Iterable流来获取,而且能够生成任意数量的结果值。每一个分区中的数据元数量取决于并行度和先前的 算子操做。
data.mapPartition(new MapPartitionFunction<String, Long>() { public void mapPartition(Iterable<String> values, Collector<Long> out) { long c = 0; for (String s : values) { c++; } out.collect(c); } });
计算每一个数据元的布尔函数,并保存函数返回true的数据元。
重要信息:系统假定该函数不会修改应用谓词的数据元。违反此假设可能会致使错误的结果。
data.filter(new FilterFunction<Integer>() { public boolean filter(Integer value) { return value > 1000; } });
经过将两个数据元重复组合成一个数据元,将一组数据元组合成一个数据元。Reduce能够应用于完整数据集或分组数据集。
data.reduce(new ReduceFunction<Integer> { public Integer reduce(Integer a, Integer b) { return a + b; } });
若是将reduce应用于分组数据集,则能够经过提供CombineHintto 来指定运行时执行reduce的组合阶段的方式 setCombineHint。在大多数状况下,基于散列的策略应该更快,特别是若是不一样键的数量与输入数据元的数量相比较小(例如1/10)。
将一组数据元组合成一个或多个数据元。ReduceGroup能够应用于完整数据集或分组数据集。
data.reduceGroup(new GroupReduceFunction<Integer, Integer> { public void reduce(Iterable<Integer> values, Collector<Integer> out) { int prefixSum = 0; for (Integer i : values) { prefixSum += i; out.collect(prefixSum); } } });
将一组值聚合为单个值。聚合函数能够被认为是内置的reduce函数。聚合能够应用于完整数据集或分组数据集。
Dataset<Tuple3<Integer, String, Double>> input = // [...] DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);
您还可使用简写语法进行最小,最大和总和聚合。
Dataset<Tuple3<Integer, String, Double>> input = // [...] DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
返回数据集的不一样数据元。它相对于数据元的全部字段或字段子集从输入DataSet中删除重复条目。
data.distinct();
使用reduce函数实现Distinct。您能够经过提供CombineHintto 来指定运行时执行reduce的组合阶段的方式 setCombineHint。在大多数状况下,基于散列的策略应该更快,特别是若是不一样键的数量与输入数据元的数量相比较小(例如1/10)。
经过建立在其键上相等的全部数据元对来链接两个数据集。可选地使用JoinFunction将数据元对转换为单个数据元,或使用FlatJoinFunction将数据元对转换为任意多个(包括无)数据元。请参阅键部分以了解如何定义链接键。
result = input1.join(input2)
.where(0) // key of the first input (tuple field 0) .equalTo(1); // key of the second input (tuple field 1)
您能够经过Join Hints指定运行时执行链接的方式。提示描述了经过分区或广播进行链接,以及它是使用基于排序仍是基于散列的算法。
若是未指定提示,系统将尝试估算输入大小,并根据这些估计选择最佳策略。
// This executes a join by broadcasting the first data set // using a hash table for the broadcast data result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST) .where(0).equalTo(1);
请注意,链接转换仅适用于等链接。其余链接类型须要使用OuterJoin或CoGroup表示。
在两个数据集上执行左,右或全外链接。外链接相似于常规(内部)链接,并建立在其键上相等的全部数据元对。此外,若是在另外一侧没有找到匹配的Keys,则保存“外部”侧(左侧,右侧或二者都满)的记录。匹配数据元对(或一个数据元和null另外一个输入的值)被赋予JoinFunction以将数据元对转换为单个数据元,或者转换为FlatJoinFunction以将数据元对转换为任意多个(包括无)数据元。请参阅键部分以了解如何定义链接键。
input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins .where(0) // key of the first input (tuple field 0) .equalTo(1) // key of the second input (tuple field 1) .with(new JoinFunction<String, String, String>() { public String join(String v1, String v2) { // NOTE: // - v2 might be null for leftOuterJoin // - v1 might be null for rightOuterJoin // - v1 OR v2 might be null for fullOuterJoin } });
reduce 算子操做的二维变体。将一个或多个字段上的每一个输入分组,而后关联组。每对组调用转换函数。
data1.coGroup(data2)
.where(0) .equalTo(1) .with(new CoGroupFunction<String, String, String>() { public void coGroup(Iterable<String> in1, Iterable<String> in2, Collector<String> out) { out.collect(...); } });
构建两个输入的笛卡尔积(交叉乘积),建立全部数据元对。可选择使用CrossFunction将数据元对转换为单个数据元
DataSet<Integer> data1 = // [...] DataSet<String> data2 = // [...] DataSet<Tuple2<Integer, String>> result = data1.cross(data2);
注:交叉是一个潜在的很是计算密集型 算子操做它甚至能够挑战大的计算集群!建议使用crossWithTiny()和crossWithHuge()来提示系统的DataSet大小。
生成两个数据集的并集。
DataSet<String> data1 = // [...] DataSet<String> data2 = // [...] DataSet<String> result = data1.union(data2);
均匀地Rebalance 数据集的并行分区以消除数据误差。只有相似Map的转换可能会遵循Rebalance 转换。
DataSet<String> in = // [...] DataSet<String> result = in.rebalance() .map(new Mapper());
散列分区给定键上的数据集。键能够指定为位置键,表达键和键选择器函数。
DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.partitionByHash(0) .mapPartition(new PartitionMapper());
Range-Partition给定键上的数据集。键能够指定为位置键,表达键和键选择器函数。
DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.partitionByRange(0) .mapPartition(new PartitionMapper());
手动指定数据分区。
注意:此方法仅适用于单个字段键。
DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.partitionCustom(Partitioner<K> partitioner, key)
本地按指定顺序对指定字段上的数据集的全部分区进行排序。能够将字段指定为元组位置或字段表达式。经过连接sortPartition()调用来完成对多个字段的排序。
DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING) .mapPartition(new PartitionMapper());
返回数据集的前n个(任意)数据元。First-n能够应用于常规数据集,分组数据集或分组排序数据集。分组键能够指定为键选择器函数或字段位置键。
DataSet<Tuple2<String,Integer>> in = // [...] // regular data set DataSet<Tuple2<String,Integer>> result1 = in.first(3); // grouped data set DataSet<Tuple2<String,Integer>> result2 = in.groupBy(0) .first(3); // grouped-sorted data set DataSet<Tuple2<String,Integer>> result3 = in.groupBy(0) .sortGroup(1, Order.ASCENDING) .first(3);
数据接收器使用DataSet用于存储或返回。使用OutputFormat描述数据接收器算子操做 。Flink带有各类内置输出格式,这些格式封装在DataSet上的算子操做中:
能够将DataSet输入到多个 算子操做。程序能够编写或打印数据集,同时对它们执行其余转换。
示例:
// text data DataSet<String> textData = // [...] // write DataSet to a file on the local file system textData.writeAsText("file:///my/result/on/localFS"); // write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS"); // write DataSet to a file and overwrite the file if it exists textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE); // tuples as lines with pipe as the separator "a|b|c" DataSet<Tuple3<String, Integer, Double>> values = // [...] values.writeAsCsv("file:///path/to/the/result/file", "\n", "|"); // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines values.writeAsText("file:///path/to/the/result/file"); // this writes values as strings using a user-defined TextFormatter object values.writeAsFormattedText("file:///path/to/the/result/file", new TextFormatter<Tuple2<Integer, Integer>>() { public String format (Tuple2<Integer, Integer> value) { return value.f1 + " - " + value.f0; } });
使用自定义输出格式:
DataSet<Tuple3<String, Integer, Double>> myResult = [...] // write Tuple DataSet to a relational database myResult.output( // build and configure OutputFormat JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") .setDBUrl("jdbc:derby:memory:persons") .setQuery("insert into persons (name, age, height) values (?,?,?)") .finish() );
使用avro序列化:env.getConfig().enableForceAvro(); 使用kryo序列化:env.getConfig().enableForceKryo(); 使用自定义序列化:env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
Java Tuple 和 Scala case class
Java POJOs:java实体类
Primitive Types
默认支持java和scala基本数据类型
General Class Types
默认支持大多数java和scala class
Hadoop Writables
支持hadoop中实现了org.apache.hadoop.Writable的数据类型
Special Types
例如scala中的Either Option 和Try
全网惟一一个从0开始帮助Java开发者转作大数据领域的公众号~
大数据技术与架构或者搜索import_bigdata关注~
海量【java和大数据的面试题+视频资料】整理在公众号,关注后能够下载~