Mysql 流增量写入 Hdfs(一) --从 mysql 到 kafka

一. 概述

在大数据的静态数据处理中,目前广泛采用的是用 Spark + Hdfs (Hive / Hbase) 的技术架构来对数据进行处理。mysql

但有时候有其余的需求,须要从其余不一样数据源不间断得采集数据,而后存储到 Hdfs 中进行处理。而追加(append)这种操做在 Hdfs 里面明显是比较麻烦的一件事。所幸有了 Storm 这么个流数据处理这样的东西问世,能够帮咱们解决这些问题。git

不过光有 Storm 还不够,咱们还须要其余中间件来协助咱们,让全部其余数据源都归于一个通道。这样就能实现不一样数据源以及 Hhdfs 之间的解耦。而这个中间件 Kafka 无疑是一个很好的选择。github

这样咱们就可让 Mysql 的增量数据不停得抛出到 Kafka ,然后再让 storm 不停得从 Kafka 对应的 Topic 读取数据并写入到 Hdfs 中。redis

二. 基本知识

2.1 Mysql binlog 介绍

binlog 即 Mysql 的二进制日志。它能够说是 Mysql 最重要的日志了,它记录了全部的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。sql

上面所说的提到了 DDL 和 DML ,可能有些同窗不了解,这里顺便说一下:数据库

  • DDL:数据定义语言DDL用来建立数据库中的各类对象-----表、视图、索引、同义词、聚簇等如:CREATE TABLE/VIEW/INDEX/SYN/CLUSTER...
  • DML:数据操纵语言DML主要有三种形式:插入(INSERT), 更新(UPDATE),以及删除(DELETE)。

在 Mysql 中,binlog 默认是不开启的,由于有大约 1% (官方说法)的性能损耗,若是要手动开启,流程以下:编程

  1. vi编辑打开mysql配置文件:
vi /usr/local/mysql/etc/my.cnf

在[mysqld] 区块设置/添加以下,json

log-bin=mysql-bin

注意必定要在 [mysqld] 下。 2. 重启 Mysqlbootstrap

pkill mysqld
/usr/local/mysql/bin/mysqld_safe --user=mysql &

2.2 kafka

这里只对 Kafka 作一个基本的介绍,更多的内容能够度娘一波。安全

<img src="https://img2018.cnblogs.com/blog/1011838/201812/1011838-20181206075114483-251042842.png" width="70%">

上面的图片是 kafka 官方的一个图片,咱们目前只须要关注 Producers 和 Consumers 就好了。

Kafka 是一个分布式发布-订阅消息系统。分布式方面由 Zookeeper 进行协同处理。消息订阅其实说白了吧,就是一个队列,分为消费者和生产者,就像上图中的内容,有数据源充当 Producer 生产数据到 kafka 中,而有数据充当 Consumers ,消费 kafka 中的数据。

<img src="https://img2018.cnblogs.com/blog/1011838/201812/1011838-20181206075127662-202539931.png" width="70%">

上图中的 offset 指的是数据的写入以及消费的位置的信息,这是由 Zookeeper 管理的。也就是说,当 Consumers 重启或是怎样,须要从新从 kafka 读取消息时,总不能让它从头开始消费数据吧,这时候就须要有个记录能告诉你从哪里开始从新读取。这就是 offset 。

kafka 中还有一个相当重要的概念,那就是 topic 。不过这个其实仍是很好理解的,好比你要订阅一些消息,你确定是不会订阅全部消息的吧,你只须要订阅你感兴趣的主题,好比摄影,编程,搞笑这些主题。而这里主题的概念其实和 topic 是同样的。总之,能够将 topic 归结为通道,kafka 中有不少个通道,不一样的 Producer 向其中一个通道生产数据,也就是抛数据进去这个通道,Comsumers 不停得消费通道中的数据。

而咱们要作的就是将 Mysql binlog 产生的数据抛到 kafka 中充看成生产者,而后由 storm 充当消费者,不停得消费数据并写入到 Hdfs 中。

至于怎么将 binlog 的数据抛到 kafka ,别急,下面咱们就来介绍。

2.3 maxwell

maxwell 这个工具能够很方便得监听 Mysql 的 binlog ,**而后每当 binlog 发生变化时,就会以 json 格式抛出对应的变化数据到 Kafka 中。**好比当向 mysql 一张表中插入一条语句的时候,maxwell 就会马上监听到 binlog 中有对应的记录增长,而后将一些信息包括插入的数据都转化成 json 格式,而后抛到 kafka 指定的 topic 中。

下载地址在这里能够找到。

除了 Kafka 外,其实 maxwell 还支持写入到其余各类中间件,好比 redis。 同时 maxwell 是比较轻量级的工具,只须要在 mysql 中新建一个数据库供它记录一些信息,而后就能够直接运行。

三. 使用 maxwell 监听 binlog

接下来咱们将的是若是使用 maxwell ,让它监听 mysql 的 binlog 并抛到 kafka 中。maxwell 主要有两种运行方式。一种是使用配置文件,另外一种则是在命令行中添加参数的方式运行。这里追求方便,只使用命令行的方式进行演示。

这里介绍一下简单的将数据抛到 kafka 的命令行脚本吧。

bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
   --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --port=3306

各项参数说明以下

  • user:mysql 用户名
  • password:mysql 密码
  • host:Mysql 地址
  • producer:指定写入的中间件类型,好比还有 redies
  • kafka.bootstrap.servers:kafka 的地址
  • kafka_topic:指明写入到 kafka 哪一个 topic
  • port:mysql 端口

启动以后,maxwell 便开始工做了,固然若是你想要让这条命令能够在后台运行的话,可使用 Linux 的 nohup 命令,这里就很少赘述,有须要百度便可。

这样配置的话一般会将整个数据库的增删改都给抛到 kafka ,但这样的需求显然不常见,更常见的应该是具体监听对某个库的操做,或是某个表的操做。

在升级到 1.9.2(最新版本)后,maxwell 为咱们提供这样一个参数,让咱们能够轻松实现上述需求:--filter

这个参数一般包含两个配置项,exclude 和 include。意思就是让你指定排除哪些和包含哪些。好比我只想监听 Adatabase 库下的 Atable 表的变化。我能够这样。

--filter='exclude: *.*, include: Adatabase.Atable'

这样咱们就能够轻松实现监听 mysql binlog 的变化,并能够定制本身的需求。

OK,这一章咱们介绍了 mysql binlog ,kafka 以及 maxwell 的一些内容,下一篇咱们将会看到 storm 如何写入 hdfs 以及定制一些策略。see you~~


欢迎关注公众号哈尔的数据城堡,里面有数据,代码,以及深度的思考。

相关文章
相关标签/搜索