Flink ships with many built-in data sources, such as HDFS, Socket, Kafka, and Collections. Flink also provides the addSource method for defining custom data sources.
Creating a data source by reading local or HDFS files
If the files are read from HDFS, the Hadoop dependency needs to be imported:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.5</version>
</dependency>
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
// During operator transformations, data is converted to Flink's built-in data types,
// so the implicit conversions must be imported for the type conversion to happen automatically
import org.apache.flink.streaming.api.scala._

object FileSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val textStream = env.readTextFile("hdfs://node01:9000/flink/data/wc")
    textStream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()
    // The job stops once the file has been read completely
    env.execute()
  }
}
Every 10 s, read the contents of newly added files under a specified HDFS directory and perform a WordCount
Business scenario: enterprises usually run real-time ETL; whenever Flume collects new data, Flink performs real-time ETL and loads it into the warehouse.
// Read newly added file contents on HDFS every 10 s
// (the interval argument of readFile is in milliseconds)
val textStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)
readTextFile calls the readFile method under the hood; readFile is a lower-level API and therefore more flexible to use.
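A minimal runnable sketch of this continuous directory monitoring, assuming the same HDFS path as the readTextFile example above; the object name ContinuousFileSource and the WordCount pipeline are illustrative additions:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object ContinuousFileSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Directory to monitor (assumed to be the same path as in the example above)
    val filePath = "hdfs://node01:9000/flink/data/wc"
    val textInputFormat = new TextInputFormat(new Path(filePath))
    // PROCESS_CONTINUOUSLY re-scans the path; the last argument is the scan interval in milliseconds
    val textStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)
    textStream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()
    env.execute()
  }
}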
Data sources based on local collections are generally used for testing and are of little practical significance otherwise.
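For completeness, a minimal sketch of a collection-based source using fromCollection from the Scala StreamExecutionEnvironment; the object name and sample data are illustrative:

import org.apache.flink.streaming.api.scala._

object CollectionSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Build a bounded stream from a local collection; mainly useful for quick tests
    val stream = env.fromCollection(List("hello flink", "hello scala"))
    stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()
    env.execute()
  }
}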
Receiving data from a Socket server
val initStream:DataStream[String] = env.socketTextStream("node01",8888)
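A minimal sketch of feeding the socket stream into a WordCount, reusing the node01:8888 address from the line above; the object name is illustrative, and a test server can be started with a tool such as `nc -lk 8888`:

import org.apache.flink.streaming.api.scala._

object SocketSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Start a test server on node01 first, e.g. `nc -lk 8888`
    val initStream: DataStream[String] = env.socketTextStream("node01", 8888)
    initStream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()
    env.execute()
  }
}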
For Flink to consume data from Kafka, first configure the Flink-Kafka connector dependency.
Official documentation: https://ci.apache.org/project...
Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.2</version>
</dependency>
import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val env = StreamExecutionEnvironment.getExecutionEnvironment
val prop = new Properties()
prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
prop.setProperty("group.id", "flink-kafka-id001")
prop.setProperty("key.deserializer", classOf[StringDeserializer].getName)
prop.setProperty("value.deserializer", classOf[StringDeserializer].getName)
/**
 * earliest: consume from the beginning; old data will be consumed repeatedly
 * latest:   consume only the most recent data; old data is no longer consumed
 */
prop.setProperty("auto.offset.reset", "latest")

val kafkaStream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka",
  new KafkaDeserializationSchema[(String, String)] {
    override def isEndOfStream(t: (String, String)): Boolean = false

    override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
      val key = new String(consumerRecord.key(), "UTF-8")
      val value = new String(consumerRecord.value(), "UTF-8")
      (key, value)
    }

    // Specify the type of the produced records
    override def getProducedType: TypeInformation[(String, String)] =
      createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
  }, prop))

kafkaStream.print()
env.execute()
Kafka console command to consume both key and value:
kafka-console-consumer.sh --zookeeper node01:2181 --topic flink-kafka --property print.key=true

By default, only the value is consumed.
KafkaDeserializationSchema: reads both the key and the value from Kafka
SimpleStringSchema: reads only the value from Kafka
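A minimal sketch of a value-only consumer built on SimpleStringSchema, assuming the same brokers and topic as in the example above; the group.id and object name are illustrative:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaValueOnlySource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    prop.setProperty("group.id", "flink-kafka-id002") // illustrative consumer group
    // SimpleStringSchema deserializes only the record value as a String
    val kafkaStream = env.addSource(
      new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), prop))
    kafkaStream.print()
    env.execute()
  }
}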
val env = StreamExecutionEnvironment.getExecutionEnvironment
// A source with parallelism 1 (single-parallelism source)
val stream = env.addSource(new SourceFunction[String] {
  var flag = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    while (flag) {
      ctx.collect("hello" + random.nextInt(1000))
      Thread.sleep(200)
    }
  }

  // Stop producing data
  override def cancel(): Unit = flag = false
})
stream.print()
env.execute()
Implementing the ParallelSourceFunction interface = extending RichParallelSourceFunction
val env = StreamExecutionEnvironment.getExecutionEnvironment
val sourceStream = env.addSource(new ParallelSourceFunction[String] {
  var flag = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    while (flag) {
      ctx.collect("hello" + random.nextInt(1000))
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = {
    flag = false
  }
}).setParallelism(2)
sourceStream.print()
env.execute()
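To illustrate the equivalence stated above, a hedged sketch of the same source written against RichParallelSourceFunction, whose open/close lifecycle hooks are the main addition; the object name and the empty hook bodies are illustrative:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

import scala.util.Random

object RichParallelSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val sourceStream = env.addSource(new RichParallelSourceFunction[String] {
      var flag = true

      // open/close are lifecycle hooks available on the Rich variant,
      // e.g. for creating and releasing connections per parallel instance
      override def open(parameters: Configuration): Unit = {}
      override def close(): Unit = {}

      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val random = new Random()
        while (flag) {
          ctx.collect("hello" + random.nextInt(1000))
          Thread.sleep(500)
        }
      }

      override def cancel(): Unit = flag = false
    }).setParallelism(2)
    sourceStream.print()
    env.execute()
  }
}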