Using Flume to Load MySQL Table Data into HBase


The three serializers for Flume's HBase sinks

  • SimpleHbaseEventSerializer
  • RegexHbaseEventSerializer
  • SimpleAsyncHbaseEventSerializer

Using SimpleHbaseEventSerializer

1. Create table1 in HBase

hbase(main):021:0> create 'default:table1', 'info'
Created table default:table1
Took 1.3042 seconds
=> Hbase::Table - table1

2. Flume configuration file

agent.channels = ch1
agent.sinks = hbase-sink
agent.sources = sql-source
agent.channels.ch1.type = memory
agent.sources.sql-source.channels = ch1
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource


agent.sources.sql-source.hibernate.connection.url = jdbc:mysql://192.168.1.69:3306/t_hadoop
agent.sources.sql-source.hibernate.connection.user = root
agent.sources.sql-source.hibernate.connection.password = root
agent.sources.sql-source.table = t_name
agent.sources.sql-source.columns.to.select = *

agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0

agent.sources.sql-source.run.query.delay=5000

agent.sources.sql-source.status.file.path = /home/lwenhao/flume
agent.sources.sql-source.status.file.name = sql-source.status


# Configure the sink as HBaseSink with SimpleHbaseEventSerializer
agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
# HBase table name
agent.sinks.hbase-sink.table = table1
# Column family of the HBase table
agent.sinks.hbase-sink.columnFamily = info
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
# Name of a single column under the column family; the whole event body is written to it
agent.sinks.hbase-sink.serializer.payloadColumn = id,sip,dip,sport,dport,protocol,flowvalue,createtime
# Bind the sink to the channel
agent.sinks.hbase-sink.channel = ch1
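The incremental.* and run.query.delay settings above make the source poll t_name repeatedly and fetch only rows it has not seen yet. As a rough illustration of that idea (not SQLSource's actual code), the Python sketch below uses an in-memory SQLite table as a stand-in for the MySQL table; the helper poll_incremental and the sample rows are hypothetical. Each poll selects rows whose id is greater than the last value seen, starting from incremental.value = 0, and returns the new high-water mark. The real source keeps its progress in the file named by status.file.path and status.file.name and repeats the query every run.query.delay milliseconds; this sketch simply returns the value to the caller instead.

```python
import sqlite3


def poll_incremental(conn, table, column, last_value):
    """Fetch rows whose `column` exceeds last_value; return (rows, new_last_value).

    Hypothetical helper illustrating incremental polling; not SQLSource's code.
    """
    # Identifiers cannot be bound as parameters, so table/column are interpolated:
    # only pass trusted names here. The threshold itself is bound safely.
    cur = conn.execute(
        f"SELECT * FROM {table} WHERE {column} > ? ORDER BY {column}",
        (last_value,),
    )
    rows = cur.fetchall()
    if not rows:
        return rows, last_value  # nothing new: keep the old high-water mark
    idx = [d[0] for d in cur.description].index(column)
    return rows, rows[-1][idx]  # largest value seen, thanks to ORDER BY


# In-memory stand-in for the MySQL table t_name (made-up sample rows).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t_name (id INTEGER PRIMARY KEY, sip TEXT)")
conn.executemany("INSERT INTO t_name VALUES (?, ?)", [(1, "10.0.0.1"), (2, "10.0.0.2")])

rows, last = poll_incremental(conn, "t_name", "id", 0)  # first poll: incremental.value = 0
print(rows, last)  # [(1, '10.0.0.1'), (2, '10.0.0.2')] 2

conn.execute("INSERT INTO t_name VALUES (3, '10.0.0.3')")
rows, last = poll_incremental(conn, "t_name", "id", last)  # a later poll sees only the new row
print(rows, last)  # [(3, '10.0.0.3')] 3
```

Because only the high-water mark is remembered, this approach assumes the incremental column (id here) only ever grows; rows updated in place will not be picked up again.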

3. Start Flume

bin/flume-ng agent --conf conf/ --name agent --conf-file conf/flume-hbase.conf -Dflume.root.logger=DEBUG,console

4. Result

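The data has reached HBase, but the values do not line up with the fields. The reason is that SimpleHbaseEventSerializer only does a simple mapping: it writes the entire event body into the single column named by payloadColumn, so the comma-separated list above becomes one column literally named "id,sip,dip,sport,dport,protocol,flowvalue,createtime" rather than eight separate columns. To map several fields to their own columns, use RegexHbaseEventSerializer or write a custom serializer. (SimpleAsyncHbaseEventSerializer is the default serializer of AsyncHBaseSink and likewise writes the payload to a single column.)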
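To make the single-column behaviour concrete, here is a rough Python model of what SimpleHbaseEventSerializer does with one event. It is a hypothetical sketch, not Flume's code: row-key generation and counter increments are left out, and the sample row is made up, written in the quoted-CSV form that the regex in the next section assumes SQLSource produces.

```python
def simple_serialize(body: bytes, column_family: str, payload_column: str) -> dict:
    """Rough model of SimpleHbaseEventSerializer's put: the whole event body
    becomes ONE cell, and payloadColumn is used verbatim as its qualifier.
    Hypothetical sketch; row-key generation and increments are left out."""
    return {(column_family, payload_column): body}


# Made-up row in the quoted-CSV form assumed for SQLSource events.
body = b'"1","10.0.0.1","10.0.0.2","5000","80","tcp","1024","2019-01-01 00:00:00"'
cells = simple_serialize(body, "info", "id,sip,dip,sport,dport,protocol,flowvalue,createtime")

for (family, qualifier), value in cells.items():
    # Prints a single line: the qualifier is the whole comma-separated list,
    # and the value is the entire unsplit row.
    print(f"{family}:{qualifier} => {value.decode()}")
```

Nothing in this path splits the body on commas, which is why the eight names in payloadColumn never turn into eight columns.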

Using RegexHbaseEventSerializer

RegexHbaseEventSerializer splits each event with a regular expression and stores each capture group in its own column of the HBase table.

First, empty table1:

truncate 'table1'

1. Modify the Flume configuration file

agent.channels = ch1
agent.sinks = hbase-sink
agent.sources = sql-source
agent.channels.ch1.type = memory
agent.sources.sql-source.channels = ch1
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource

agent.sources.sql-source.hibernate.connection.url = jdbc:mysql://192.168.1.69:3306/t_hadoop
agent.sources.sql-source.hibernate.connection.user = root
agent.sources.sql-source.hibernate.connection.password = root
agent.sources.sql-source.table = t_name
agent.sources.sql-source.columns.to.select = *
agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0
agent.sources.sql-source.run.query.delay=5000
agent.sources.sql-source.status.file.path = /home/lwenhao/flume
agent.sources.sql-source.status.file.name = sql-source.status

agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink.table = table1
agent.sinks.hbase-sink.columnFamily  = info
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.hbase-sink.serializer.regex = ^\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\"$
agent.sinks.hbase-sink.serializer.colNames = id,sip,dip,sport,dport,protocol,flowvalue,createtime
agent.sinks.hbase-sink.channel = ch1
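The sketch below checks the regex and colNames above against a sample row, as a rough Python model of RegexHbaseEventSerializer (capture group i goes to column i of colNames). Assumptions, all hypothetical: SQLSource emits each row as CSV with every field double-quoted, which is what the regex expects; the sample values are made up; the helpers to_csv_line and regex_serialize are illustrative only. In the properties file each \" is read as a plain " (Java Properties drops a backslash before a character with no special meaning), so the Python pattern below is written without those backslashes.

```python
import csv
import io
import re

# serializer.regex after properties loading: every \" has become a plain ".
REGEX = r'^"(.*?)","(.*?)","(.*?)","(.*?)","(.*?)","(.*?)","(.*?)","(.*?)"$'
# serializer.colNames, in capture-group order.
COL_NAMES = "id,sip,dip,sport,dport,protocol,flowvalue,createtime".split(",")


def to_csv_line(values):
    """Quote every field, the (assumed) SQLSource output format the regex expects."""
    buf = io.StringIO()
    csv.writer(buf, quoting=csv.QUOTE_ALL, lineterminator="").writerow(values)
    return buf.getvalue()


def regex_serialize(line, column_family="info"):
    """Rough model of RegexHbaseEventSerializer: capture group i -> column COL_NAMES[i]."""
    m = re.fullmatch(REGEX, line)  # whole-line match, like Java's Matcher.matches()
    if m is None or len(m.groups()) != len(COL_NAMES):
        return {}  # this sketch writes nothing for events that do not fit the pattern
    return {f"{column_family}:{name}": value for name, value in zip(COL_NAMES, m.groups())}


line = to_csv_line([1, "10.0.0.1", "10.0.0.2", 5000, 80, "tcp", 1024, "2019-01-01 00:00:00"])
for column, value in regex_serialize(line).items():
    print(column, "=", value)  # one line per column, e.g. info:id = 1
```

Because the pattern splits on the literal sequence ",", a field value that itself contains "," (or a doubled quote from CSV escaping) would be split in the wrong place or keep its escaping, so it is worth checking the data before relying on this regex.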

2. Start Flume

bin/flume-ng agent --conf conf/ --name agent --conf-file conf/flume-hbase.conf

3. Result
