Flink内置了一些基本数据源和接收器,而且始终可用。该预约义的数据源包括文件,目录和插socket,并从集合和迭代器摄取数据。该预约义的数据接收器支持写入文件和标准输入输出及socket。html
链接器提供用于与各类第三方系统链接的代码。目前支持这些系统:数据库
要在应用程序中使用其中一个链接器,一般须要其余第三方组件,例如数据存储或消息队列的服务器。apache
虽然本节中列出的流链接器是Flink项目的一部分,而且包含在源版本中,但它们不包含在二进制分发版中。
Flink的其余流处理链接器正在经过Apache Bahir发布,包括:bootstrap
使用链接器不是将数据输入和输出Flink的惟一方法。一种常见的模式是在一个Map或多个FlatMap 中查询外部数据库或Web服务以渲染主数据流。segmentfault
Flink提供了一个用于异步I / O的API, 以便更有效,更稳健地进行这种渲染。windows
当Flink应用程序将大量数据推送到外部数据存储时,这可能会成为I / O瓶颈。若是所涉及的数据具备比写入更少的读取,则更好的方法能够是外部应用程序从Flink获取所需的数据。在可查询的状态界面,容许经过Flink被管理的状态,按须要查询支持这个。后端
此链接器提供一个Sink,可将分区文件写入任一Hadoop文件系统支持的文件系统 。安全
请注意,流链接器当前不是二进制发布的一部分
能够配置分段行为以及写入,但咱们稍后会介绍。这是能够建立一个默认状况下汇总到按时间拆分的滚动文件的存储槽的方法服务器
惟一必需的参数是存储桶的基本路径。能够经过指定自定义bucketer,写入器和批量大小来进一步配置接收器。架构
默认状况下,当数据元到达时,分段接收器将按当前系统时间拆分,并使用日期时间模式"yyyy-MM-dd--HH"命名存储区。这种模式传递给 DateTimeFormatter使用当前系统时间和JVM的默认时区来造成存储桶路径。用户还能够为bucketer指定时区以格式化存储桶路径。每当遇到新日期时,都会建立一个新存储桶。
例如,若是有一个包含分钟做为最精细粒度的模式,将每分钟得到一个新桶。每一个存储桶自己都是一个包含多个部分文件的目录:接收器的每一个并行实例将建立本身的部件文件,当部件文件变得太大时,接收器也会在其余文件旁边建立新的部件文件。当存储桶变为非活动状态时,将刷新并关闭打开的部件文件。若是存储桶最近未写入,则视为非活动状态。默认状况下,接收器每分钟检查一次非活动存储桶,并关闭任何超过一分钟未写入的存储桶。setInactiveBucketCheckInterval()并 setInactiveBucketThreshold()在一个BucketingSink。
也能够经过指定自定义bucketer setBucketer()上BucketingSink。若是须要,bucketer可使用数据元或元组的属性来肯定bucket目录。
默认编写器是StringWriter。这将调用toString()传入的数据元并将它们写入部分文件,由换行符分隔。在a setWriter() 上指定自定义编写器使用BucketingSink。若是要编写Hadoop SequenceFiles,可使用提供的 SequenceFileWriter,也能够配置为使用压缩。
有两个配置选项指定什么时候应关闭零件文件并启动新零件文件:
Long.MAX_VALUE
)当知足这两个条件中的任何一个时,将启动新的部分文件。看以下例子:
然而这种方式建立了太多小文件,不适合HDFS!仅供娱乐!
此链接器提供对Apache Kafka服务的事件流的访问。
Flink提供特殊的Kafka链接器,用于从/向Kafka主题读取和写入数据。Flink Kafka Consumer集成了Flink的检查点机制,可提供一次性处理语义。为实现这一目标,Flink并不彻底依赖Kafka的消费者群体偏移跟踪,而是在内部跟踪和检查这些偏移。
为用例和环境选择一个包(maven artifact id)和类名。对于大多数用户来讲,FlinkKafkaConsumer08(部分flink-connector-kafka)是合适的。
而后,导入maven项目中的链接器:
http://archive.cloudera.com/cdh5/cdh/5/zookeeper-3.4.5-cdh5.15.1.tar.gz
因为Kafka控制台脚本对于基于Unix和Windows的平台不一样,所以在Windows平台上使用bin windows 而不是bin /,并将脚本扩展名更改成.bat。
Kafka使用ZooKeeper,所以若是尚未ZooKeeper服务器,则须要先启动它。
Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其做为消息发送到Kafka集群。 默认状况下,每行将做为单独的消息发送。
运行生产者,而后在控制台中键入一些消息以发送到服务器。
Kafka还有一个命令行使用者,它会将消息转储到标准输出。
全部命令行工具都有其余选项; 运行不带参数的命令将显示更详细地记录它们的使用信息。
从Flink 1.7开始,有一个新的通用Kafka链接器,它不跟踪特定的Kafka主要版本。 相反,它在Flink发布时跟踪最新版本的Kafka。
若是您的Kafka代理版本是1.0.0或更高版本,则应使用此Kafka链接器。 若是使用旧版本的Kafka(0.11,0.10,0.9或0.8),则应使用与代理版本对应的链接器。
经过Kafka客户端API和代理的兼容性保证,通用Kafka链接器与较旧和较新的Kafka代理兼容。 它与版本0.11.0或更高版本兼容,具体取决于所使用的功能。
要执行迁移,请参阅升级做业和Flink版本指南和
而后实例化新源(FlinkKafkaConsumer)
Flink Kafka Consumer是一个流数据源,能够从Apache Kafka中提取并行数据流。 使用者能够在多个并行实例中运行,每一个实例都将从一个或多个Kafka分区中提取数据。
Flink Kafka Consumer参与了检查点,并保证在故障期间没有数据丢失,而且计算处理元素“刚好一次”。(注意:这些保证天然会假设Kafka自己不会丢失任何数据。)
请注意,Flink在内部将偏移量做为其分布式检查点的一部分进行快照。 承诺给Kafka的抵消只是为了使外部的进展观与Flink对进展的见解同步。 这样,监控和其余工做能够了解Flink Kafka消费者在多大程度上消耗了一个主题。
和接收器(FlinkKafkaProducer)。
除了从模块和类名中删除特定的Kafka版本以外,API向后兼容Kafka 0.11链接器。
Flink的Kafka消费者被称为FlinkKafkaConsumer08(或09Kafka 0.9.0.x等)。它提供对一个或多个Kafka主题的访问。
构造函数接受如下参数:
Kafka消费者的属性。须要如下属性:
Example:
Flink Kafka Consumer须要知道如何将Kafka中的二进制数据转换为Java / Scala对象。
在 DeserializationSchema容许用户指定这样的一个架构。T deserialize(byte[] message) 为每一个Kafka消息调用该方法,从Kafka传递值。
从它开始一般颇有帮助AbstractDeserializationSchema,它负责将生成的Java / Scala类型描述为Flink的类型系统。实现vanilla的用户DeserializationSchema须要本身实现该getProducedType(...)方法。
为了访问Kafka消息的键和值,KeyedDeserializationSchema具备如下deserialize方法T deserialize(byte [] messageKey,byte [] message,String topic,int partition,long offset)
。
为方便起见,Flink提供如下模式:
AvroDeserializationSchema它使用静态提供的模式读取使用Avro格式序列化的数据。它能够从Avro生成的类(AvroDeserializationSchema.forSpecific(...))中推断出模式,也能够GenericRecords 使用手动提供的模式(with AvroDeserializationSchema.forGeneric(...))。此反序列化架构要求序列化记录不包含嵌入式架构。
要使用此反序列化模式,必须添加如下附加依赖项:
当遇到因任何缘由没法反序列化的损坏消息时,有两个选项 - 从deserialize(...)方法中抛出异常将致使做业失败并从新启动,或者返回null以容许Flink Kafka使用者以静默方式跳过损坏的消息。请注意,因为使用者的容错能力(请参阅下面的部分以获取更多详细信息),所以对损坏的消息执行失败将使消费者尝试再次反序列化消息。所以,若是反序列化仍然失败,则消费者将在该损坏的消息上进入不间断重启和失败循环。
Flink的Kafka Producer被称为FlinkKafkaProducer011(或010 对于Kafka 0.10.0.x版本。或者直接就是FlinkKafkaProducer,对于Kafka>=1.0.0的版原本说)。
它容许将记录流写入一个或多个Kafka主题。
Example
上面的示例演示了建立Flink Kafka Producer以将流写入单个Kafka目标主题的基本用法。对于更高级的用法,还有其余构造函数变体容许提供如下内容:
生产者容许为内部的KafkaProducer提供自定义属性配置。
将记录分配给特定分区,能够为FlinkKafkaPartitioner构造函数提供实现。将为流中的每一个记录调用此分区程序,以肯定应将记录发送到的目标主题的确切分区。
与消费者相似,生产者还容许使用调用的高级序列化模式KeyedSerializationSchema,该模式容许单独序列化键和值。它还容许覆盖目标主题,以便一个生产者实例能够将数据发送到多个主题。
Flink Kafka Consumer容许配置如何肯定Kafka分区的起始位置。
Flink Kafka Consumer的全部版本都具备上述明确的起始位置配置方法。
从group.idKafka代理(或Zookeeper for Kafka 0.8)中的消费者组(在消费者属性中设置)提交的偏移量开始读取分区。若是找不到分区的偏移量,auto.offset.reset将使用属性中的设置。
从最先/最新记录开始。在这些模式下,Kafka中的承诺偏移将被忽略,不会用做起始位置。
从指定的时间戳开始。对于每一个分区,时间戳大于或等于指定时间戳的记录将用做起始位置。若是分区的最新记录早于时间戳,则只会从最新记录中读取分区。在此模式下,Kafka中的已提交偏移将被忽略,不会用做起始位置。
还能够指定消费者应从每一个分区开始的确切偏移量:
上面的示例将使用者配置为从主题的分区0,1和2的指定偏移量开始myTopic。偏移值应该是消费者应为每一个分区读取的下一条记录。请注意,若是使用者须要读取在提供的偏移量映射中没有指定偏移量的分区,则它将回退到setStartFromGroupOffsets()该特定分区的默认组偏移行为(即)。
请注意,看成业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每一个Kafka分区的起始位置由存储在保存点或检查点中的偏移量肯定。
在0.9以前,Kafka没有提供任何机制来保证至少一次或刚好一次的语义。
启用Flink的检查点时,FlinkKafkaProducer09和FlinkKafkaProducer010 能提供至少一次传输保证。
除了开启Flink的检查点,还应该配置setter方法:
默认为false。启用此选项将使生产者仅记录失败日志而不是捕获和从新抛出它们。这大致上就是计数已成功的记录,即便它从未写入目标Kafka主题。这必须设为false对于确保 至少一次
默认为true。启用此函数后,Flink的检查点将在检查点成功以前等待检查点时的任何动态记录被Kafka确认。这可确保检查点以前的全部记录都已写入Kafka。必须开启,对于确保 至少一次
总之,默认状况下,Kafka生成器对版本0.9和0.10具备至少一次保证,即
setLogFailureOnly设置为false和setFlushOnCheckpoint设置为true。
默认状况下,重试次数设置为“0”。这意味着当setLogFailuresOnly设置为时false,生产者会当即失败,包括Leader更改。
默认状况下,该值设置为“0”,以免重试致使目标主题中出现重复消息。对于常常更改代理的大多数生产环境,建议将重试次数设置为更高的值。
Kafka目前没有生产者事务,所以Flink在Kafka主题里没法保证刚好一次交付
启用Flink的检查点后,FlinkKafkaProducer011
对于Kafka >= 1.0.0版本是FlinkKafkaProduce
能够提供准确的一次交付保证。
除了启用Flink的检查点,还能够经过将适当的语义参数传递给FlinkKafkaProducer011,选择三种不一样的算子操做模式
Flink啥都不保证。生成的记录可能会丢失,也可能会重复。
相似于setFlushOnCheckpoint(true)在 FlinkKafkaProducer010。这能够保证不会丢失任何记录(尽管它们能够重复)。
使用Kafka事务提供刚好一次的语义。每当您使用事务写入Kafka时,不要忘记为任何从Kafka消费记录的应用程序设置所需的isolation.level(read_committed 或read_uncommitted- 后者为默认值)。
Semantic.EXACTLY_ONCE 模式依赖于在从所述检查点恢复以后提交在获取检查点以前启动的事务的能力。若是Flink应用程序崩溃和完成重启之间的时间较长,那么Kafka的事务超时将致使数据丢失(Kafka将自动停止超过超时时间的事务)。考虑到这一点,请根据预期的停机时间适当配置事务超时。
Kafka broker默认 transaction.max.timeout.ms 设置为15分钟。此属性不容许为生产者设置大于其值的事务超时。
FlinkKafkaProducer011默认状况下,将_transaction.timeout.msproducer config_中的属性设置为1小时,所以_transaction.max.timeout.ms_在使用 Semantic.EXACTLY_ONCE 模式以前应该增长 该属性。
在_read_committed_模式中KafkaConsumer,任何未完成的事务(既不停止也不完成)将阻止来自给定Kafka主题的全部读取超过任何未完成的事务。换言之,遵循如下事件顺序:
即便事务2已经提交了记录,在事务1提交或停止以前,消费者也不会看到它们。这有两个含义:
Semantic.EXACTLY_ONCE 模式为每一个FlinkKafkaProducer011实例使用固定大小的KafkaProducers池。每一个检查点使用其中一个生产者。若是并发检查点的数量超过池大小,FlinkKafkaProducer011 将引起异常并将使整个应用程序失败。请相应地配置最大池大小和最大并发检查点数。
Semantic.EXACTLY_ONCE 采起全部可能的措施,不要留下任何阻碍消费者阅读Kafka主题的延迟事务,这是必要的。可是,若是Flink应用程序在第一个检查点以前失败,则在从新启动此类应用程序后,系统中没有关于先前池大小的信息。所以,在第一个检查点完成以前按比例缩小Flink应用程序是不安全的 _ FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR_。
启用Flink的检查点后,Flink Kafka Consumer将使用主题中的记录,并以一致的方式按期检查其全部Kafka偏移以及其余 算子操做的状态。若是做业失败,Flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始从新使用来自Kafka的记录。
所以,绘制检查点的间隔定义了程序在发生故障时最多能够返回多少。
启用流式传输做业的检查点。 将按期快照流式数据流的分布式状态。 若是发生故障,流数据流将从最新完成的检查点从新启动。
该做业在给定的时间间隔内按期绘制检查点。 状态将存储在配置的状态后端。
此刻未正确支持检查点迭代流数据流。 若是“force”参数设置为true,则系统仍将执行做业。
要使用容错的Kafka使用者,须要在运行环境中启用拓扑的检查点:
另请注意,若是有足够的处理插槽可用于从新启动拓扑,则Flink只能从新启动拓扑。所以,若是拓扑因为丢失了TaskManager而失败,那么以后仍然必须有足够的可用插槽。YARN上的Flink支持自动重启丢失的YARN容器。
若是未启用检查点,Kafka使用者将按期向Zookeeper提交偏移量。