首先是关于flume的基础介绍git
组件名称 github |
功能介绍web |
Agent代理apache |
使用JVM 运行Flume。每台机器运行一个agent,可是能够在一个agent中包含多个sources和sinks。fetch |
Client客户端ui |
生产数据,运行在一个独立的线程。spa |
Source源插件 |
从Client收集数据,传递给Channel。线程 |
Sink接收器3d |
从Channel收集数据,进行相关操做,运行在一个独立线程。 |
Channel通道 |
链接 sources 和 sinks ,这个有点像一个队列。 |
Events事件 |
传输的基本数据负载。 |
目前来讲flume是支持多种source
其中是支持读取jms消息队列消息,网上并无找到关于读取rabbitmq的教程
虽然flume并不支持读取rabbitMq,因此须要对flume进行二次开发
这里主要就是flume怎么从rabbitMq读取数据
这里从git上找到了一个关于flume从rabbitMq读取数据的插件
下载地址是:https://github.com/gmr/rabbitmq-flume-plugin
上面有一些英文的描述,你们能够看下
环境介绍
centOS 7.3 jdk1.8 cdh5.14.0
1.用 mvn 打包该项目,会生成两个JAR包
2.由于我这边使用的以cdh方式安装集成flume的,因此把这两个jar 扔到 /usr/lib 下面
若是是普通的安装方式须要把这两个jar包复制到 flume安装目录的lib下面
3.进入cdh管理页面配置Agent
下面是详细的配置,我这边是直接把消息写入kafka集群里 的
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
tier1.sources.source1.bind = 127.0.0.1
tier1.sources.source1.port = 5672
tier1.sources.source1.virtual-host = /
tier1.sources.source1.username = guest
tier1.sources.source1.password = guest
tier1.sources.source1.queue = test
tier1.sources.source1.prefetchCount = 10
tier1.sources.source1.channels = channel1
tier1.sources.source1.threads = 2
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
tier1.sources.source1.interceptors.i1.preserveExisting = true
tier1.channels.channel1.type = memory
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = flume_out
tier1.sinks.sink1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,27.0.0.1:9094
tier1.sinks.sink1.requiredAcks = 1
tier1.sinks.sink11.batchSize = 20
配置完成更新配置从新启动Agent
这个是接收到rabbitMq消息
大功告成,若是配置中有疑问的能够留言,我看到后会回复