Big data study notes: Flume

Flume

A service for collecting, aggregating, and moving large amounts of log data.
A streaming-data architecture, used for online log analysis.

Event-based.
Acts as a coordinator between producers and consumers.
Provides transactional guarantees, ensuring messages are reliably delivered.
Multiple source types.
Multiple sink types.

multihop		//multi-hop: an event can traverse several agents before reaching storage.

Horizontal scaling:		//add nodes

Vertical scaling		//add hardware.

Source

Receives data; many types are available.

Channel

A temporary staging area; buffers data coming from the source until the sink consumes it.

Sink

Takes data from the channel and writes it to centralized storage (Hadoop / HBase).

Installing Flume

1. Download
2. Untar
3. Set environment variables (steps 1-3 are sketched below)
4. Verify the installation
	$>flume-ng version			//ng = next generation
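A minimal sketch of steps 1-3, assuming Flume 1.7.0 (the version the pom.xml below targets), the Apache archive mirror, and the /soft layout these notes use elsewhere; adjust version and paths to taste:

	#1. download
	$>wget https://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
	#2. untar and create the /soft/flume symlink used throughout these notes
	$>tar -xzvf apache-flume-1.7.0-bin.tar.gz -C /soft
	$>ln -s /soft/apache-flume-1.7.0-bin /soft/flume
	#3. environment variables, e.g. appended to /etc/profile
	export FLUME_HOME=/soft/flume
	export PATH=$PATH:$FLUME_HOME/bin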

Configuring Flume

1. Create a configuration file
[/soft/flume/conf/hello.conf]
#declare the three components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#define the source
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888

#define the sink
a1.sinks.k1.type=logger

#define the channel
a1.channels.c1.type=memory

#bind them together
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

2. Run
	a) Start the flume agent
		$>bin/flume-ng agent -f ../conf/hello.conf -n a1 -Dflume.root.logger=INFO,console
	b) Start an nc client
		$>nc localhost 8888
		$nc>hello world
	
	c) hello world is printed in the flume terminal.

Installing nc

$>sudo yum install nmap-ncat.x86_64

Clearing the repo cache

$>rename ali.repo --> ali.repo.bak		//disable the old repo file
$>sudo yum clean all
$>sudo yum makecache

#e.g. the Aliyun base repo
$>sudo wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo 

#the Aliyun EPEL repo
$>sudo wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo

Flume sources

1.netcat
	nc ..

2.exec
	Real-time log collection: runs a command and collects its output as it appears.
	a1.sources = r1
	a1.sinks = k1
	a1.channels = c1

	a1.sources.r1.type=exec
	a1.sources.r1.command=tail -F /home/centos/test.txt

	a1.sinks.k1.type=logger

	a1.channels.c1.type=memory

	a1.sources.r1.channels=c1
	a1.sinks.k1.channel=c1

3. Batch collection (spooldir)
	Watches a directory of static files.
	After a file is fully ingested, it is renamed with the .COMPLETED suffix.
	a) Configuration file
		[spooldir_r.conf]
		a1.sources = r1
		a1.channels = c1
		a1.sinks = k1

		a1.sources.r1.type=spooldir
		a1.sources.r1.spoolDir=/home/centos/spool
		a1.sources.r1.fileHeader=true

		a1.sinks.k1.type=logger

		a1.channels.c1.type=memory

		a1.sources.r1.channels=c1
		a1.sinks.k1.channel=c1

	b) Create the spool directory
		$>mkdir ~/spool

	c) Start flume
		$>bin/flume-ng agent -f ../conf/spooldir_r.conf -n a1 -Dflume.root.logger=INFO,console

4. Sequence source
	[seq.conf]
	a1.sources = r1
	a1.channels = c1
	a1.sinks = k1

	a1.sources.r1.type=seq
	a1.sources.r1.totalEvents=1000

	a1.sinks.k1.type=logger

	a1.channels.c1.type=memory

	a1.sources.r1.channels=c1
	a1.sinks.k1.channel=c1

	[run]
	$>bin/flume-ng agent -f ../conf/seq.conf -n a1 -Dflume.root.logger=INFO,console

5.StressSource
	Generates synthetic events in-process; useful for stress-testing a flow.
	a1.sources = stresssource-1
	a1.channels = memoryChannel-1
	a1.sinks = nullsink-1

	a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
	a1.sources.stresssource-1.size = 10240
	a1.sources.stresssource-1.maxTotalEvents = 1000000
	a1.sources.stresssource-1.channels = memoryChannel-1

	#a channel type and a sink are needed for the config to actually run
	a1.channels.memoryChannel-1.type = memory
	a1.sinks.nullsink-1.type = null
	a1.sinks.nullsink-1.channel = memoryChannel-1
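
	[run] (stress.conf is an assumed filename for the config above)
	$>bin/flume-ng agent -f ../conf/stress.conf -n a1 -Dflume.root.logger=INFO,console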

Flume sinks

1.hdfs
	a1.sources = r1
	a1.channels = c1
	a1.sinks = k1

	a1.sources.r1.type = netcat
	a1.sources.r1.bind = localhost
	a1.sources.r1.port = 8888

	a1.sinks.k1.type = hdfs
	a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/%M/%S
	a1.sinks.k1.hdfs.filePrefix = events-

	#whether to round down the timestamp used in the path escapes;
	#with the settings below, a new directory is created every 10 seconds,
	#i.e. this controls the directory layout: 2017-12-12 --> 2017-12-12/%H%M%S

	a1.sinks.k1.hdfs.round = true			
	a1.sinks.k1.hdfs.roundValue = 10
	a1.sinks.k1.hdfs.roundUnit = second
	
	a1.sinks.k1.hdfs.useLocalTimeStamp=true

	#when to roll a new file
	a1.sinks.k1.hdfs.rollInterval=10
	a1.sinks.k1.hdfs.rollSize=10
	a1.sinks.k1.hdfs.rollCount=3

	a1.channels.c1.type=memory

	a1.sources.r1.channels = c1
	a1.sinks.k1.channel = c1
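
	With these settings, an event arriving at 2017-12-12 11:54:34 lands under /flume/events/17-12-12/11/54/30 (seconds rounded down to a multiple of 10), and the sink rolls to a new file as soon as any one of rollInterval (10 s), rollSize (10 bytes), or rollCount (3 events) trips.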

2.hive
	(skipped; a sketch follows)
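	A minimal sketch along the lines of the Flume user guide, assuming a Hive metastore at thrift://localhost:9083 and an existing transactional, bucketed table; the database, table, and field names are placeholders:

	a1.sinks.k1.type = hive
	a1.sinks.k1.hive.metastore = thrift://localhost:9083
	a1.sinks.k1.hive.database = logsdb
	a1.sinks.k1.hive.table = weblogs
	a1.sinks.k1.serializer = DELIMITED
	a1.sinks.k1.serializer.delimiter = ","
	a1.sinks.k1.serializer.fieldnames = id,msg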

3.hbase
	a1.sources = r1
	a1.channels = c1
	a1.sinks = k1

	a1.sources.r1.type = netcat
	a1.sources.r1.bind = localhost
	a1.sources.r1.port = 8888

	a1.sinks.k1.type = hbase
	a1.sinks.k1.table = ns1:t12
	a1.sinks.k1.columnFamily = f1
	a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

	a1.channels.c1.type=memory

	a1.sources.r1.channels = c1
	a1.sinks.k1.channel = c1
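
	The HBase sink writes to an existing table; create it first, e.g. in the hbase shell (assuming the namespace ns1 already exists):
		$hbase>create 'ns1:t12', 'f1'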

4.kafka
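	A minimal sketch, assuming a Kafka broker at localhost:9092 and a topic flume-events (both placeholders), using the Kafka sink bundled with Flume 1.7:

	a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
	a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
	a1.sinks.k1.kafka.topic = flume-events
	a1.sinks.k1.channel = c1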

Multi-hop agent chaining with AvroSource and AvroSink

1. Create the configuration file
	[avro_hop.conf]
	#a1
	a1.sources = r1
	a1.sinks= k1
	a1.channels = c1

	a1.sources.r1.type=netcat
	a1.sources.r1.bind=localhost
	a1.sources.r1.port=8888

	a1.sinks.k1.type = avro
	a1.sinks.k1.hostname=localhost
	a1.sinks.k1.port=9999

	a1.channels.c1.type=memory

	a1.sources.r1.channels = c1
	a1.sinks.k1.channel = c1

	#a2
	a2.sources = r2
	a2.sinks= k2
	a2.channels = c2

	a2.sources.r2.type=avro
	a2.sources.r2.bind=localhost
	a2.sources.r2.port=9999

	a2.sinks.k2.type = logger

	a2.channels.c2.type=memory

	a2.sources.r2.channels = c2
	a2.sinks.k2.channel = c2

2. Start a2
	$>flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a2 -Dflume.root.logger=INFO,console

3. Verify a2
	$>netstat -anop | grep 9999
	

4. Start a1
	$>flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a1
	
5. Verify a1
	$>netstat -anop | grep 8888
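
6. Test the hop end to end: text sent to a1's netcat source should come out of a2's logger console (payload is arbitrary).
	$>nc localhost 8888
	$nc>hello hop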

Channels

1.MemoryChannel
	(used in the examples above; skipped here)
2.FileChannel

a1.sources = r1
a1.sinks= k1
a1.channels = c1

a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888

a1.sinks.k1.type=logger

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/centos/flume/fc_check
a1.channels.c1.dataDirs = /home/centos/flume/fc_data

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

Spillable memory channel

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
#0 disables the in-memory queue, equivalent to a file channel
a1.channels.c1.memoryCapacity = 0
#0 disables the overflow to disk, equivalent to a memory channel
a1.channels.c1.overflowCapacity = 2000

a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /user/centos/flume/fc_check
a1.channels.c1.dataDirs = /user/centos/flume/fc_data

Creating a Flume Maven module

1. Add pom.xml
	<?xml version="1.0" encoding="UTF-8"?>
	<project xmlns="http://maven.apache.org/POM/4.0.0"
			 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
			 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
		<modelVersion>4.0.0</modelVersion>
		<groupId>com.it18zhang</groupId>
		<artifactId>FlumeDemo</artifactId>
		<version>1.0-SNAPSHOT</version>
		<dependencies>
			<dependency>
				<groupId>org.apache.flume</groupId>
				<artifactId>flume-ng-core</artifactId>
				<version>1.7.0</version>
			</dependency>
			<dependency>
				<groupId>junit</groupId>
				<artifactId>junit</artifactId>
				<version>4.11</version>
			</dependency>
		</dependencies>
	</project>
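
2. Write a component against the flume-ng-core API
	The notes stop at the pom. As a sketch of what the module could contain, here is a minimal custom sink; the class name ConsoleSink and the optional "prefix" property are made up for illustration, but the AbstractSink/Transaction pattern is the standard Flume sink contract.

	package com.it18zhang.flume;

	import java.nio.charset.StandardCharsets;

	import org.apache.flume.Channel;
	import org.apache.flume.Context;
	import org.apache.flume.Event;
	import org.apache.flume.EventDeliveryException;
	import org.apache.flume.Transaction;
	import org.apache.flume.conf.Configurable;
	import org.apache.flume.sink.AbstractSink;

	/**
	 * Toy sink: prints each event body to stdout.
	 */
	public class ConsoleSink extends AbstractSink implements Configurable {

		private String prefix;

		@Override
		public void configure(Context context) {
			//optional property: a1.sinks.k1.prefix = ...
			prefix = context.getString("prefix", "event");
		}

		@Override
		public Status process() throws EventDeliveryException {
			Channel channel = getChannel();
			Transaction tx = channel.getTransaction();
			try {
				tx.begin();
				Event event = channel.take();
				if (event == null) {
					//channel empty: commit and ask the sink runner to back off
					tx.commit();
					return Status.BACKOFF;
				}
				System.out.println(prefix + ": "
						+ new String(event.getBody(), StandardCharsets.UTF_8));
				tx.commit();
				return Status.READY;
			} catch (Throwable t) {
				//roll back the transaction so the event stays in the channel
				tx.rollback();
				throw new EventDeliveryException(t);
			} finally {
				tx.close();
			}
		}
	}

3. Use it
	Package the jar (mvn package), copy it into /soft/flume/lib, then point a sink at the fully qualified class name:
	a1.sinks.k1.type = com.it18zhang.flume.ConsoleSink
	a1.sinks.k1.prefix = demo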