IDEA Spark Streaming with a Flume Data Source -- Solving Events Not Converting to Actual Input Data, and Garbled Chinese Text (Scala)

Three steps are required:

1. shell: write data to port 1234

nc localhost 1234
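Each line typed into nc becomes one Flume event. Hypothetical sample input (any text works, including Chinese):

hello spark streaming
你好 flume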

2. shell: start the Flume agent

cd /usr/local2/flume/bin

./flume-ng agent --conf /usr/local2/flume/conf -f /usr/local2/flume/conf/flume-to-spark.conf --name a1
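While testing, it helps to see the agent's log output in the terminal; Flume's standard console-logging option can be appended to the same command:

./flume-ng agent --conf /usr/local2/flume/conf -f /usr/local2/flume/conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console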

3. IDEA:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStream_Flume_source {
  def main(args: Array[String]): Unit = {
    // Spark Streaming listens on localhost:4321, where the Flume avro sink delivers events
    val host = "localhost"
    val port = 4321
    val setIntervalTime = Seconds(2)
    val sparkConf = new SparkConf().setAppName("flume 数据源").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, setIntervalTime)
    // Push-based Flume receiver; events are stored serialized in memory, replicated twice
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    // Print how many Flume events arrived in each batch
    stream.count().map(x => "收到" + x + "个 flume events").print()
    // Decode each event body into a String and run a word count per batch
    val words = stream.flatMap(x => new String(x.event.getBody.array()).split(" ")).map(x => (x, 1))
    words.reduceByKey((x, y) => x + y).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
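To compile and run this in IDEA, the project needs the spark-streaming-flume artifact in addition to Spark core and streaming. A minimal build.sbt sketch (the 2.4.8 version is an assumption; match it to your installed Spark, and note that spark-streaming-flume was maintained through Spark 2.4.x and removed in 3.0):

// build.sbt -- versions are assumptions; align them with your Spark installation
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "2.4.8",
  "org.apache.spark" %% "spark-streaming"       % "2.4.8",
  "org.apache.spark" %% "spark-streaming-flume" % "2.4.8"
)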

In IDEA you can now see the input data, and Chinese text is displayed normally as well.
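If Chinese text does come out garbled on your machine, the usual culprit is that new String(bytes) decodes with the JVM's platform default charset, which is not always UTF-8. A minimal sketch of the fix, assuming the events carry UTF-8 bytes (which is what nc sends from a UTF-8 terminal), replaces the decoding line above:

import java.nio.charset.StandardCharsets

// Decode each Flume event body explicitly as UTF-8 instead of relying on
// the platform default charset, so Chinese input is not garbled.
val words = stream
  .flatMap(x => new String(x.event.getBody.array(), StandardCharsets.UTF_8).split(" "))
  .map(x => (x, 1))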

/usr/local2/flume/conf/flume-to-spark.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 1234

    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port = 4321

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.transactionCapacity = 1000000

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

Note the overall startup order: IDEA >>>> shell 2 >>>> shell 1; otherwise you get connection errors. The Spark Streaming job must already be listening on port 4321 before the Flume avro sink tries to connect, and the Flume netcat source must already be listening on port 1234 before nc connects.
