一,架构介绍html
生产中因为历史缘由web后端,mysql集群,kafka集群(或者其它消息队列)会存在一下三种结构。mysql
1,数据先入mysql集群,再入kafkaweb
数据入mysql集群是不可更改的,如何再高效的将数据写入kafka呢?面试
A),在表中存在自增ID的字段,而后根据ID,按期扫描表,而后将数据入kafka。sql
B),有时间字段的,能够按照时间字段按期扫描入kafka集群。数据库
C),直接解析binlog日志,而后解析后的数据写入kafka。json
2,web后端同时将数据写入kafka和mysql集群后端
3,web后端将数据先入kafka,再入mysql集群架构
这个方式,有不少优势,好比能够用kafka解耦,而后将数据按照离线存储和计算,实时计算两个模块构建很好的大数据架构。抗高峰,便于扩展等等。socket
二,实现步骤
1,mysql安装准备
安装mysql估计看这篇文章的人都没什么问题,因此本文不具体讲解了。
A),假如你单机测试请配置好server_id
B),开启binlog,只需配置log-bin
[root@localhost ~]# cat /etc/my.cnf
[mysqld]
server_id=1
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
log-bin=/var/lib/mysql/mysql-binlog
[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
?
建立测试库和表
create database school character set utf8 collate utf8_general_ci;
?
create table student(
name varchar(20) not null comment '姓名',
sid int(10) not null primary key comment '学员',
majora varchar(50) not null default '' comment '专业',
tel varchar(11) not null unique key comment '手机号',
birthday date not null comment '出生日期'
);
2,binlog日志解析
两种方式:
一是扫面binlog文件(有须要的话请联系浪尖)
二是经过复制同步的方式
暂实现了第二种方式,样例代码以下:
MysqlBinlogParse mysqlBinlogParse=new MysqlBinlogParse(args[0],Integer.valueOf(args[1]),args[2],args[3]){
@Override
public void processDelete(String queryType, String database, String sql) {
try {
String jsonString=SqlParse.parseDeleteSql(sql);
JSONObject jsonObject=JSONObject.fromObject(jsonString);
jsonObject.accumulate("database", database);
jsonObject.accumulate("queryType", queryType);
System.out.println(sql);
System.out.println(" ");
System.out.println(" ");
System.out.println(jsonObject.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void processInsert(String queryType, String database, String sql) {
try {
String jsonString=SqlParse.parseInsertSql(sql);
JSONObject jsonObject=JSONObject.fromObject(jsonString);
jsonObject.accumulate("database", database);
jsonObject.accumulate("queryType", queryType);
System.out.println(sql);
System.out.println(" ");
System.out.println(" ");
System.out.println(jsonObject.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void processUpdate(String queryType, String database, String sql) {
String jsonString;
try {
jsonString=SqlParse.parseUpdateSql(sql);
JSONObject jsonObject=JSONObject.fromObject(jsonString);
jsonObject.accumulate("database", database);
jsonObject.accumulate("queryType", queryType);
System.out.println(sql);
System.out.println(" ");
System.out.println(" ");
System.out.println(jsonObject.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
mysqlBinlogParse.setServerId(3);
mysqlBinlogParse.start();
?
?
3,sql语法解析
从原始的mysql 的binlog event中,咱们能解析到的信息,主要的也就是mysql的database,query类型(INSERT,DELETE,UPDATE),具体执行的sql。我这里封装了三个重要的方法。只暴露了这三个接口,那么咱们要明白的事情是,美味的英文咱们入kafka,而后流式处理的时候但愿的到的是跟插入mysql后同样格式的数据。这个时候咱们就要本身作sql的解析,将query的sql解析成字段形式的数据,供流式处理。解析的格式以下:
A),INSERT
B),DELETE
C),UPDATE
最终浪尖是将解析后的数据封装成了json,而后咱们本身写kafka producer将消息发送到kafka,后端就能够处理了。
三,总结
最后,浪尖仍是建议web后端数据最好先入消息队列,如kafka,而后分离线和实时将数据进行解耦分流,用于实时处理和离线处理。
?
消息队列的订阅者能够根据须要随时扩展,能够很好的扩展数据的使用者。
?
消息队列的横向扩展,增长吞吐量,作起来仍是很简单的。这个用传统数据库,分库分表仍是很麻烦的。
?
因为消息队列的存在,也能够帮助咱们抗高峰,避免高峰时期后端处理压力过大致使整个业务处理宕机。
?
具体源码球友能够在知识星球获取。
欢迎你们进入知识星球,学习更多更深刻的大数据知识,面试经验,获取更多更详细的资料。