Flume实战案例 -- 采集某个目录到HDFS

需求分析
  • 采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就须要把文件采集到HDFS中去html

  • 结构示意图:
    web

  • 根据需求,首先定义如下3大要素shell

    • 数据源组件,即source ——监控文件目录 : spooldirapache

      spooldir特性:vim

      一、监视一个目录,只要目录中出现新文件,就会采集文件中的内容服务器

      二、采集完成的文件,会被agent自动添加一个后缀:COMPLETEDide

      三、此source可靠,不会丢失数据;即便flume重启或被killoop

      注意:ui

      所监视的目录中不容许有同名的文件;且文件被放入spooldir后,就不能修改this

      ①若是文件放入spooldir后,又向文件写入数据,会打印错误及中止

      ②若是有同名的文件出如今spooldir,也会打印错误及中止

    • 下沉组件,即sink——HDFS文件系统 : hdfs sink

    • 通道组件,即channel——可用file channel 也能够用内存channel

flume配置文件开发
  • 配置文件编写:

    cd /bigdata/install/flume-1.9.0/conf/
    mkdir -p /bigdata/install/mydata/flume/dirfile
    vim spooldir.conf
  • 内容以下

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    # 注意:不能往监控目中重复丢同名文件
    a1.sources.r1.type = spooldir
    # 监控的路径
    a1.sources.r1.spoolDir = /bigdata/install/mydata/flume/dirfile
    # Whether to add a header storing the absolute path filename
    #文件绝对路径放到header
    a1.sources.r1.fileHeader = true
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    #采集到的数据写入到次路径
    a1.sinks.k1.hdfs.path = hdfs://hadoop01:8020/spooldir/files/%y-%m-%d/%H%M/    
    # 指定在hdfs上生成的文件名前缀
    a1.sinks.k1.hdfs.filePrefix = events-
    # timestamp向下舍round down
    a1.sinks.k1.hdfs.round = true
    # 按10分钟,为单位向下取整;如55分,舍成50;38 -> 30
    a1.sinks.k1.hdfs.roundValue = 10
    # round的单位
    a1.sinks.k1.hdfs.roundUnit = minute
    # 每3秒滚动生成一个文件;默认30;(0 = never roll based on time interval)
    a1.sinks.k1.hdfs.rollInterval = 3
    # 每x字节,滚动生成一个文件;默认1024;(0: never roll based on file size)
    a1.sinks.k1.hdfs.rollSize = 20
    # 每x个event,滚动生成一个文件;默认10; (0 = never roll based on number of events)
    a1.sinks.k1.hdfs.rollCount = 5
    # 每x个event,flush到hdfs
    a1.sinks.k1.hdfs.batchSize = 1
    # 使用本地时间
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #生成的文件类型,默认是Sequencefile;可选DataStream,则为普通文本;可选CompressedStream压缩数据
    a1.sinks.k1.hdfs.fileType = DataStream
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    # channel中存储的event的最大数目
    a1.channels.c1.capacity = 1000
    # 每次传输数据,从source最多得到event的数目或向sink发送的event的最大的数目
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    组件官网地址:

    spooling directory source

    hdfs sink

    memory channel

  • Channel参数解释:

    • capacity:默认该通道中最大的能够存储的event数量
    • trasactionCapacity:每次最大能够从source中拿到或者送到sink中的event数量
    • keep-alive:event添加到通道中或者移出的容许时间
启动flume
cd /bigdata/install/flume-1.9.0
bin/flume-ng agent -c ./conf -f ./conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console
上传文件到指定目录
  • 将不一样的文件上传到下面目录里面去,注意文件不能重名

    mkdir -p /home/hadoop/datas
    cd /home/hadoop/datas
    vim a.txt
    
    # 加入以下内容
    ab cd ef
    english math
    hadoop alibaba
  • 再执行;

    cp a.txt /bigdata/install/mydata/flume/dirfile
  • 而后观察flume的console动静、hdfs webui生成的文件

  • 观察spooldir的目标目录

  • 将同名文件再次放到/bigdata/install/mydata/flume/dirfile观察现象:

    cp a.txt /bigdata/install/mydata/flume/dirfile
  • flume控制台报错

相关文章
相关标签/搜索