【Mysql源码分析】基于行的复制实现之“主从关系创建”

前言

  常常听到别人说Mysql的SBR、RBR、MBR,若是不清楚,那么能够跟着文章一块儿来学习。因为涉及到主从的内容比较多,须要拆分红多篇内容来概述,这章先从基础知识和主从关系创建开始讲起。还会出一篇文章详细讲解从主同步。mysql

1.了解什么是SBR、RBR、MBR?sql

2.了解下主从配置该如何配置?数据库

3.了解主从关系如何创建?缓存

1.配置Mysql主从

  在本文中,分为一主一从。主监听的端口为3306,从监听的端口为3309。安全

1.1主服务配置

master配置,配置文件my.cnf:bash

[mysqld]
port=3306
basedir=/usr/local/mysql8.0.20
datadir=/usr/local/mysql8.0.20/data
socket=/tmp/mysql.sock
#explicit_defaults_for_timestamp=true
lower_case_table_names=2 #表名存储为给定的大小写可是比较的时候是小写的
log_bin=mysql-bin
server_id =10

主服务启动Mysql命令:服务器

# sudo bin/mysqld --defaults-file=/usr/local/mysql8.0.20/etc/my.cnf --user=root

客户端链接master端多线程

# mysql --socket=/tmp/mysql.sock -u root

启动master后,须要建立一个用户,用于主从同步:并发

mysql> GRANT REPLICATION SLAVE ON *.* TO repl@'localhost' IDENTIFIED BY '123456';

查看主服务状态socket

mysql> show master status \G;

1.2 从服务配置

slave配置,配置文件salve.conf:

[mysqld]
port=3309
basedir=/usr/local/mysql8.0.20
datadir=/usr/local/mysql8.0.20/data1
socket=/tmp/mysqlslave.sock
#explicit_defaults_for_timestamp=true
lower_case_table_names=2 #表名存储为给定的大小写可是比较的时候是小写的
log_bin=mysql-bin
server_id=2
relay_log=/usr/local/mysql8.0.20/mysql-relay-bin
read_only=1     #执行模式

从服务启动Mysql命令:

# sudo bin/mysqld --defaults-file=/usr/local/mysql8.0.20/etc/salve.conf --user=root

链接Slave端:

# mysql --socket=/tmp/mysqlslave.sock -u root

Slave端主从同步配置:

mysql>
change master to master_host='localhost', 
master_user='repl',
master_password='123456',
master_log_file='mysql-bin.000001',
master_log_pos=0;

Slave/Master

#查看binlog事件
mysql> SHOW BINLOG EVENTS;

  若是发现主从没有同步,可使用以下命令查看相应的状态:

#查看slave状态
mysql> show slave status;

  若是遇到从库报这个错误:Got fatal error 1236 from master when reading data from binary log: 'Could not find first log file name in binary log index file'

Got fatal error 1236 from master when reading data from binary log: 'could not find next log'

  此时能够操做以下三个命令进行处理:

#关闭slave
mysql> stop slave;
#重置slave
mysql> reset slave;
#启动slave
mysql> start slave;

2.MYSQL中BINLOG_FORMAT的三种模式

  mysql三种复制方式:基于SQL语句的复制(statement-based replication, SBR),基于行的复制(row-based replication, RBR),混合模式复制(mixed-based replication, MBR)。对应的,binlog的格式也有三种:STATEMENT,ROW,MIXED。

2.1 STATEMENT模式(SBR)

  每一条会修改数据的sql语句会记录到binlog中。优势是并不须要记录每一条sql语句和每一行的数据变化,减小了binlog日志量,节约IO,提升性能。缺点是在某些状况下会致使master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及user-defined functions(udf)等会出现问题)

2.2 ROW模式(RBR)

  不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了,修改为什么样了。并且不会出现某些特定状况下的存储过程、或function、或trigger的调用和触发没法被正确复制的问题。缺点是会产生大量的日志,尤为是alter table的时候会让日志暴涨。

2.3 MIXED模式(MBR)

  以上两种模式的混合使用,通常的复制使用STATEMENT模式保存binlog,对于STATEMENT模式没法复制的操做使用ROW模式保存binlog,MySQL会根据执行的SQL语句选择日志保存方式。

  binlog复制配置在mysql的配置文件my.cnf中,能够经过一下选项配置binglog相关,配置以下:

#binlog日志格式,mysql默认采用statement,建议使用mixed
binlog_format           = MIXED    

#binlog日志文件
log-bin                 = /data/mysql/mysql-bin.log  

#binlog过时清理时间
expire_logs_days        = 7    

#binlog每一个日志文件大小
max_binlog_size         = 100m                  

#binlog缓存大小
binlog_cache_size       = 4m                    

#最大binlog缓存大小
max_binlog_cache_size   = 512m

2.4 优缺点对比

  对于执行的SQL语句中包含now()这样的时间函数,会在日志中产生对应的unix_timestamp()*1000的时间字符串,slave在完成同步时,取用的是sqlEvent发生的时间来保证数据的准确性。另外对于一些功能性函数slave能完成相应的数据同步,而对于上面指定的一些相似于UDF函数,致使Slave没法知晓的状况,则会采用ROW格式存储这些Binlog,以保证产生的Binlog能够供Slave完成数据同步。

如今来比较如下 SBR 和 RBR 这2种模式各自的优缺点:

SBR 的优势:

  • 技术比较成熟
  • binlog文件较小
  • binlog中包含了全部数据库更改信息,能够据此来审核数据库的安全等状况
  • binlog能够用于实时的还原,而不只仅用于复制
  • 主从版本能够不同,从服务器版本能够比主服务器版本高

SBR 的缺点:

  • 不是全部的UPDATE语句都能被复制,尤为是包含不肯定操做的时候。
  • 调用具备不肯定因素的 UDF 时复制也可能出问题
  • 使用如下函数的语句也没法被复制:

    • LOAD_FILE()
    • UUID()
    • USER()
    • FOUND_ROWS()
    • SYSDATE() (除非启动时启用了 --sysdate-is-now 选项)

INSERT ... SELECT 会产生比 RBR 更多的行级锁
复制须要进行全表扫描(WHERE 语句中没有使用到索引)的 UPDATE 时,须要比 RBR 请求更多的行级锁
对于有 AUTO_INCREMENT 字段的 InnoDB表而言,INSERT 语句会阻塞其余 INSERT 语句
对于一些复杂的语句,在从服务器上的耗资源状况会更严重,而 RBR 模式下,只会对那个发生变化的记录产生影响
存储函数(不是存储过程)在被调用的同时也会执行一次 NOW() 函数,这个能够说是坏事也多是好事
肯定了的 UDF 也须要在从服务器上执行
数据表必须几乎和主服务器保持一致才行,不然可能会致使复制出错
执行复杂语句若是出错的话,会消耗更多资源

RBR 的优势:

任何状况均可以被复制,这对复制来讲是最安全可靠的和其余大多数数据库系统的复制技术同样多数状况下,从服务器上的表若是有主键的话,复制就会快了不少。复制如下几种语句时的行锁更少:

  • INSERT ... SELECT
  • 包含 AUTO_INCREMENT 字段的 INSERT
  • 没有附带条件或者并无修改不少记录的 UPDATE 或 DELETE 语句执行 INSERT,UPDATE,DELETE 语句时锁更少,从服务器上采用多线程来执行复制成为可能。

RBR 的缺点:

  • binlog 大了不少
  • 复杂的回滚时 binlog 中会包含大量的数据

主服务器上执行 UPDATE 语句时,全部发生变化的记录都会写到 binlog 中,而 SBR 只会写一次,这会致使频繁发生 binlog 的并发写问题

  • UDF 产生的大 BLOB 值会致使复制变慢

没法从 binlog 中看到都复制了写什么语句
当在非事务表上执行一段堆积的SQL语句时,最好采用 SBR 模式,不然很容易致使主从服务器的数据不一致状况发生。

另外,针对系统库 mysql 里面的表发生变化时的处理规则以下:
若是是采用 INSERT,UPDATE,DELETE 直接操做表的状况,则日志格式根据 binlog_format 的设定而记录
若是是采用 GRANT,REVOKE,SET PASSWORD 等管理语句来作的话,那么不管如何都采用 SBR 模式记录
注:采用 RBR 模式后,能解决不少原先出现的主键重复问题。

2.5 如何查看复制格式

  经过以下命令便可查看复制格式,如图2-4-1:

mysql> show variables like 'binlog_format';

241.png
<center>图2-4-1 查看复制格式</center>

  默认binlog_format参数为行复制,在源码mysql-8.0.20/sql/sys_vars.cc中

static Sys_var_enum Sys_binlog_format(
    "binlog_format",
    "What form of binary logging the master will "
    "use: either ROW for row-based binary logging, STATEMENT "
    "for statement-based binary logging, or MIXED. MIXED is statement-"
    "based binary logging except for those statements where only row-"
    "based is correct: those which involve user-defined functions (i.e. "
    "UDFs) or the UUID() function; for those, row-based binary logging is "
    "automatically used. If NDBCLUSTER is enabled and binlog-format is "
    "MIXED, the format switches to row-based and back implicitly per each "
    "query accessing an NDBCLUSTER table",
    SESSION_VAR(binlog_format), CMD_LINE(REQUIRED_ARG, OPT_BINLOG_FORMAT),
    binlog_format_names, 
    DEFAULT(BINLOG_FORMAT_ROW), //默认binlog的同步为行复制
    NO_MUTEX_GUARD,
    NOT_IN_BINLOG, ON_CHECK(binlog_format_check),
    ON_UPDATE(fix_binlog_format_after_update));

  经过上面代码能够看出mysql-8.0.20默认为行复制。

  那么继续来看一下binlog_format都有那些模式,能够看mysql-8.0.20/sql/system_variables.h文件

// Values for binlog_format sysvar
enum enum_binlog_format {
  BINLOG_FORMAT_MIXED = 0,  ///<混合模式 statement if safe, otherwise row - autodetected
  BINLOG_FORMAT_STMT = 1,   ///<SQL复制 statement-based
  BINLOG_FORMAT_ROW = 2,    ///<行复制 row-based
  BINLOG_FORMAT_UNSPEC =
      3  ///< thd_binlog_format() returns it when binlog is closed
};

  基于以下代码能够得知,binlog_format包含BINLOG_FORMAT_MIXED、BINLOG_FORMAT_STMT、BINLOG_FORMAT_ROW三种模式,也就是对应:STATEMENT模式(SBR)、ROW模式(RBR)、MIXED模式(MBR)

3.创建主从关系

  经过张图来看一下主从关系创建的相关流程,如图3-1-1:
311.jpg
<center>图3-1-1 主从同步创建</center>

3.1 slave端start_salve方法

  在mysql主从创建中,是由slave端先发起,当执行“start slave;” 语句时,会调用rpl_slave.cc中的start_slave方法,其中实现以下:

bool start_slave(THD *thd, LEX_SLAVE_CONNECTION *connection_param,
                 LEX_MASTER_INFO *master_param, int thread_mask_input,
                 Master_info *mi, bool set_mts_settings) {
  bool is_error = false;
  int thread_mask;

  DBUG_TRACE;
    
  lock_slave_threads(mi);  //中止运行线程
  // 获取已中止线程的掩码
  init_thread_mask(&thread_mask, mi, true /* inverse */);
  /*
    咱们将中止下面的全部线程。但若是用户想只启动一个线程,
    就好像另外一个线程正在运行同样(就像咱们不要想碰另外一根线),
    因此将位设置为0其余线程
  */
  if (thread_mask_input) {
    thread_mask &= thread_mask_input;
  }
  if (thread_mask)  // 一些线程中止,启动它们
  {
    if (load_mi_and_rli_from_repositories(mi, false, thread_mask)) {
      is_error = true;
      my_error(ER_MASTER_INFO, MYF(0));
    } else if (*mi->host || !(thread_mask & SLAVE_IO)) {
      /*
        若是咱们要启动IO线程,咱们须要考虑经过启动从机提供的选项。
      */
      if (thread_mask & SLAVE_IO) {
        if (connection_param->user) { //设置用户
          mi->set_start_user_configured(true);
          mi->set_user(connection_param->user);
        }
        if (connection_param->password) { //设置密码
          mi->set_start_user_configured(true);
          mi->set_password(connection_param->password);
        }
        if (connection_param->plugin_auth) //设置受权插件
          mi->set_plugin_auth(connection_param->plugin_auth);
        if (connection_param->plugin_dir) //插件目录
          mi->set_plugin_dir(connection_param->plugin_dir);
      }

        //...

        //初始化设置
        int slave_errno = mi->rli->init_until_option(thd, master_param);
        if (slave_errno) {
          my_error(slave_errno, MYF(0));
          is_error = true;
        }

        if (!is_error) is_error = check_slave_sql_config_conflict(mi->rli);
      } else if (master_param->pos || master_param->relay_log_pos ||
                 master_param->gtid)
        push_warning(thd, Sql_condition::SL_NOTE, ER_UNTIL_COND_IGNORED,
                     ER_THD(thd, ER_UNTIL_COND_IGNORED));

      if (!is_error)
        //启动slave线程
        is_error =
            start_slave_threads(false /*need_lock_slave=false*/,
                                true /*wait_for_start=true*/, mi, thread_mask);
    } else {
      is_error = true;
      my_error(ER_BAD_SLAVE, MYF(0));
    }
  } else {
    /* 若是全部线程都已启动,则没有错误,只有一个警告 */
    push_warning_printf(
        thd, Sql_condition::SL_NOTE, ER_SLAVE_CHANNEL_WAS_RUNNING,
        ER_THD(thd, ER_SLAVE_CHANNEL_WAS_RUNNING), mi->get_channel());
  }

  /*
    若是有人试图启动,请清除启动信息,IO线程以免任何安全问题。
  */
  if (is_error && (thread_mask & SLAVE_IO) == SLAVE_IO) mi->reset_start_info();

  unlock_slave_threads(mi);

  mi->channel_unlock();

  return is_error;
}

  从如上代码能够得知调用“start slave;”时,会对线程作一些中止操做。而后进行一些设置后,调用start_slave_threads方法启动slave线程。而后start_slave_threads是一个比较关键的方法。

  那么接下来看一下start_slave_threads方法,实现以下:

bool start_slave_threads(bool need_lock_slave, bool wait_for_start,
                         Master_info *mi, int thread_mask) {
  mysql_mutex_t *lock_io = nullptr, *lock_sql = nullptr,
                *lock_cond_io = nullptr, *lock_cond_sql = nullptr;
  mysql_cond_t *cond_io = nullptr, *cond_sql = nullptr;
  bool is_error = false;
  DBUG_TRACE;
  DBUG_EXECUTE_IF("uninitialized_master-info_structure", mi->inited = false;);

  //...
  
  if (thread_mask & SLAVE_IO)  //判断是否支持SLAVE_IO
    is_error = start_slave_thread(
#ifdef HAVE_PSI_THREAD_INTERFACE
        key_thread_slave_io,
#endif
        handle_slave_io, lock_io, lock_cond_io, cond_io, &mi->slave_running,
        &mi->slave_run_id, mi);  //调用handle_slave_io方法
        
  if (!is_error && (thread_mask & SLAVE_SQL)) { //判断是否支持SLAVE_SQL
   
    //...
    if (!is_error)
      is_error = start_slave_thread(
#ifdef HAVE_PSI_THREAD_INTERFACE
          key_thread_slave_sql,
#endif
          handle_slave_sql, lock_sql, lock_cond_sql, cond_sql,
          &mi->rli->slave_running, &mi->rli->slave_run_id, mi); //调用handle_slave_sql方法
    if (is_error)
      terminate_slave_threads(mi, thread_mask & SLAVE_IO,
                              rpl_stop_slave_timeout, need_lock_slave);
  }
  return is_error;
}

  经过如上方法能够得知thread_mask掩码是用于判断是否支持SLAVE_IO与支持SLAVE_SQL。若是支持则线程调用对应的方法。由于考虑到主题为主从关系创建,这里主要关注一下handle_slave_io方法。
312.png
<center>图3-1-2 主从同步协议</center>
  如图3-1-2中IO Thread与SQL Thread其实就是对应handle_slave_io方法与handle_slave_sql方法。

3.2 slave端handle_slave_io方法

  handle_slave_io方法为创建主从的主要方法,其中包含了初始化slave线程、发起链接master、注册slave到master,发起COM_BINLOG_DUMP或COM_BINLOG_DUMP_GTID操做等,实现以下:

extern "C" void *handle_slave_io(void *arg) {
  //...
  my_thread_init();
  {
    //初始化slave线程
    if (init_slave_thread(thd, SLAVE_THD_IO)) {
      mysql_cond_broadcast(&mi->start_cond);
      mysql_mutex_unlock(&mi->run_lock);
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
                 ER_THD(thd, ER_SLAVE_FATAL_ERROR),
                 "Failed during slave I/O thread initialization ");
      goto err;
    }

    //...
    
    mysql_cond_broadcast(&mi->start_cond); //调用作唤醒操做

    
    //...
    
    //发起登录操做
    successfully_connected = !safe_connect(thd, mysql, mi);
    // we can get killed during safe_connect
#ifdef HAVE_SETNS
    if (mi->is_set_network_namespace()) {
      // Restore original network namespace used to be before connection has
      // been created
      successfully_connected =
          restore_original_network_namespace() | successfully_connected;
    }
#endif

    //...

    /*
      注册slave到master
    */
    THD_STAGE_INFO(thd, stage_registering_slave_on_master);
    if (register_slave_on_master(mysql, mi, &suppress_warnings)) {
      if (!check_io_slave_killed(thd, mi,
                                 "Slave I/O thread killed "
                                 "while registering slave on master")) {
        LogErr(ERROR_LEVEL, ER_RPL_SLAVE_IO_THREAD_CANT_REGISTER_ON_MASTER);
        if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
          goto err;
      } else
        goto err;
      goto connected;
    }

    //...
    
    while (!io_slave_killed(thd, mi)) {
      MYSQL_RPL rpl;

      THD_STAGE_INFO(thd, stage_requesting_binlog_dump);
      if (request_dump(thd, mysql, &rpl, mi, &suppress_warnings)) { //发起dump指令
        LogErr(ERROR_LEVEL, ER_RPL_SLAVE_ERROR_REQUESTING_BINLOG_DUMP,
               mi->get_for_channel_str());
        if (check_io_slave_killed(thd, mi,
                                  "Slave I/O thread killed while \
requesting master dump") ||
            try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
                             reconnect_messages[SLAVE_RECON_ACT_DUMP]))
          goto err;
        goto connected;
      }
      //...
    }
  
  //...
  
  my_thread_end(); //线程结束
#if OPENSSL_VERSION_NUMBER < 0x10100000L
  ERR_remove_thread_state(0);
#endif /* OPENSSL_VERSION_NUMBER < 0x10100000L */
  my_thread_exit(nullptr); //退出线程
  return (nullptr);  // Avoid compiler warnings
}

  从代码中能够得知调用safe_connect进行slave对master的登录,具体登录能够协议能够看一下以前写的文章:https://blog.csdn.net/byxiaoyuonly/article/details/108212013

  而后又调用register_slave_on_master会发送COM_REGISTER_SLAVE指令进行把slave注册到master,再调用request_dump发起binlog_dump指令。

321.png
<center>图3-2-1 发送COM_BINLOG_DUMP</center>
  在request_dump发送指令其实支持COM_BINLOG_DUMP与COM_BINLOG_DUMP_GTID两种,可是具体发什么取决因而否开启gtid设置,如图3-2-1所示。

3.4 master创建实现

  经过对上面内容的了解,咱们得知register_slave_on_master会发起发起COM_REGISTER_SLAVE对把slave注册到master,而后调用request_dump发起binlog_dump指令。master指令处理以下:

bool dispatch_command(THD *thd, const COM_DATA *com_data,
                      enum enum_server_command command) {
  
  //...
  
  switch (command) {
    
    case COM_REGISTER_SLAVE: { //注册slave到master
      // TODO: access of protocol_classic should be removed
      if (!register_slave(thd, thd->get_protocol_classic()->get_raw_packet(),
                          thd->get_protocol_classic()->get_packet_length()))
        my_ok(thd);
      break;
    }
    
    //...
    
    case COM_BINLOG_DUMP_GTID: //binlog_dump_gtid
      // TODO: access of protocol_classic should be removed
      error = com_binlog_dump_gtid(
          thd, (char *)thd->get_protocol_classic()->get_raw_packet(),
          thd->get_protocol_classic()->get_packet_length());
      break;
    case COM_BINLOG_DUMP: //binlog_dump
      // TODO: access of protocol_classic should be removed
      error = com_binlog_dump(
          thd, (char *)thd->get_protocol_classic()->get_raw_packet(),
          thd->get_protocol_classic()->get_packet_length());
      break;
  }

  return error;
}

  注册slave到master过程能够看后续的数据包,这边能够接着看一下对应的com_binlog_dump方法实现:

bool com_binlog_dump(THD *thd, char *packet, size_t packet_length) {
  DBUG_TRACE;
  ulong pos;
  ushort flags = 0;
  const uchar *packet_position = (uchar *)packet;
  size_t packet_bytes_todo = packet_length;

  DBUG_ASSERT(!thd->status_var_aggregated);
  thd->status_var.com_other++;
  thd->enable_slow_log = opt_log_slow_admin_statements;
  if (check_global_access(thd, REPL_SLAVE_ACL)) return false;

  /*
    4 bytes is too little, but changing the protocol would break
    compatibility.  This has been fixed in the new protocol. @see
    com_binlog_dump_gtid().
  */
  READ_INT(pos, 4);
  READ_INT(flags, 2);
  READ_INT(thd->server_id, 4);

  DBUG_PRINT("info",
             ("pos=%lu flags=%d server_id=%d", pos, flags, thd->server_id));

  kill_zombie_dump_threads(thd);

  query_logger.general_log_print(thd, thd->get_command(), "Log: '%s'  Pos: %ld",
                                 packet + 10, (long)pos);
  mysql_binlog_send(thd, thd->mem_strdup(packet + 10), (my_off_t)pos, nullptr,
                    flags); //发送binlog

  unregister_slave(thd, true, true /*need_lock_slave_list=true*/);
  /* 若是咱们到了这里,线程须要终止 */
  return true;

error_malformed_packet:
  my_error(ER_MALFORMED_PACKET, MYF(0));
  return true;
}

  在mysql_binlog_send中其实调用Binlog_sender的run方法,sender.run()方法中又调用Binlog_sender::init 初始化检测、Binlog_sender::check_start_file() 检查文件等。最终调用Binlog_sender::send_binlog对从服务发送binlog,如图3-4-1所示。
341.png
<center>图3-4-1 send binlog</center>

4.主从创建过程当中数据包

41.jpg
<center>图4-1 主从创建过程数据包</center>
  经过图4-1能够得知在主从关系创建会发起以下操做:

#查询当前时间戳
SELECT UNIX_TIMESTAMP()

#查询master的serverid
SELECT @@GLOBAL.SERVER_ID

#设置心跳周期,单位为纳秒,其实只有30s。初始化心跳如图4-2
SET @master_heartbeat_period= 30000001024

#设置master_binlog_checksum
SET @master_binlog_checksum= @@global.binlog_checksum

#查询master_binlog_checksum
SELECT @master_binlog_checksum

#得到是否支持gtid
SELECT @@GLOBAL.GTID_MODE

#查询server uuid
SELECT @@GLOBAL.SERVER_UUID

#设置slave uuid
SET @slave_uuid= '2dc27df4-e143-11ea-b396-cc679ee1902b'

42.png
<center>图4-2 初始化心跳周期</center>

总结

  1. mysql复制模式分为三种: STATEMENT模式(SBR)、ROW模式(RBR)、MIXED模式(MBR)。
  2. mysql8.0.20默认ROW模式(RBR)。
  3. 发送binlog支持两种形式: COM_BINLOG_DUMP与COM_BINLOG_DUMP_GTID。
  4. 默认状况master_heartbeat_period为30秒,单位为纳秒。
  5. IO Thread与SQL Thread其实就是对应handle_slave_io方法与handle_slave_sql方法。