flink算子-Dataflows DataSource数据源

Dataflows DataSource数据源

Flink内嵌支持的数据源很是多,好比HDFS、Socket、Kafka、Collections Flink也提供了addSource方 式,能够自定义数据源html

File Source

经过读取本地、HDFS文件建立一个数据源
若是读取的是HDFS上的文件,那么须要导入Hadoop依赖node

<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-client</artifactId>              
    <version>2.6.5</version> 
</dependency>
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
//在算子转换的时候,会将数据转换成Flink内置的数据类型,因此须要将隐式转换导入进来,才能自动进行 类型转换 
import org.apache.flink.streaming.api.scala._ 
object FileSource { 
    def main(args: Array[String]): Unit = { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val textStream = env.readTextFile("hdfs://node01:9000/flink/data/wc") textStream.flatMap(_.split(" ")).map((_,1)).keyBy(0).sum(1).print() 
    //读完就中止 
    env.execute() 
    } 
}

每隔10s中读取HDFS指定目录下的新增文件内容,而且进行WordCount
业务场景:在企业中通常都会作实时的ETL,当Flume采集来新的数据,那么基于Flink实时作ETL入仓apache

//每隔10s中读取    hdfs上新增文件内容 
val textStream = 
env.readFile(textInputFormat,filePath,FileProcessingMode.PROCESS_CONTINUOUSLY,10 )

readTextFile底层调用的就是readFile方法,readFile是一个更加底层的方式,使用起来会更加的灵活bootstrap

Collection Source

基于本地集合的数据源,通常用于测试场景,没有太大意义api

Socket Source

接受Socket Server中的数据dom

val initStream:DataStream[String] = env.socketTextStream("node01",8888)

Kafka Source

Flink接受Kafka中的数据,首先先配置flink与kafka的链接器依赖
官网地址:https://ci.apache.org/project...
maven依赖socket

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-kafka_2.11</artifactId> 
    <version>1.9.2</version> 
</dependency>
val env = StreamExecutionEnvironment.getExecutionEnvironment 
val prop = new Properties() 
prop.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092") prop.setProperty("group.id","flink-kafka-id001") 
prop.setProperty("key.deserializer",classOf[StringDeserializer].getName) prop.setProperty("value.deserializer",classOf[StringDeserializer].getName) /** 
* earliest:从头开始消费,旧数据会频繁消费 
* latest:从最近的数据开始消费,再也不消费旧数据 
*/ 
prop.setProperty("auto.offset.reset","latest") 
val kafkaStream = env.addSource(new FlinkKafkaConsumer[(String, String)] ("flink-kafka", new KafkaDeserializationSchema[(String, String)] { 
override def isEndOfStream(t: (String, String)): Boolean = false 
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = { 
    val key = new String(consumerRecord.key(), "UTF-8") 
    val value = new String(consumerRecord.value(), "UTF-8") 
    (key, value) 
} 
//指定返回数据类型 
override def getProducedType: TypeInformation[(String, String)] = createTuple2TypeInformation(createTypeInformation[String], 
createTypeInformation[String]) 
}, prop)) 
kafkaStream.print() 
env.execute()

kafka命令消费key value值
kafka-console-consumer.sh --zookeeper node01:2181 --topic flink-kafka --property print.key=true 默认只是消费value值
KafkaDeserializationSchema:读取kafka中key、value
SimpleStringSchema:读取kafka中valuemaven

Custom Source

基于SourceFunction接口实现单并行度数据源
val env = StreamExecutionEnvironment.getExecutionEnvironment 
//source的并行度为1 单并行度source源 
val stream = env.addSource(new SourceFunction[String] { 
var flag = true 
override def run(ctx: SourceFunction.SourceContext[String]): Unit = { 
    val random = new Random() 
    while (flag) { 
        ctx.collect("hello" + random.nextInt(1000)) 
        Thread.sleep(200) 
    } 
} 
//中止产生数据 
override def cancel(): Unit = flag = false 
}) 
stream.print() 
env.execute()
基于ParallelSourceFunction接口实现多并行度数据源

实现ParallelSourceFunction接口=继承RichParallelSourceFunctionide

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val sourceStream = env.addSource(new ParallelSourceFunction[String] { 
    var flag = true 
    override def run(ctx:SourceFunction.SourceContext[String]): Unit = { 
        val random = new Random() 
        while (flag) { 
            ctx.collect("hello" + random.nextInt(1000)) 
            Thread.sleep(500) 
        } 
    } 
    override def cancel(): Unit = { 
    flag = false 
    } 
}).setParallelism(2)
相关文章
相关标签/搜索