经过租户id实现的SaaS方案

概况

项目开发到一半,用户忽然提出须要多个分公司共同使用,这种须要将系统设计成SaaS架构,将各个分公司的数据进行隔离。mysql

SaaS实现的方案

  • 独立数据库git

    每一个企业 独立的物理数据库,隔离性好,成本高。sql

  • 共享数据库、独立schemashell

    就是一台物理机,多个逻辑数据库,oracle叫作schema,mysql叫作database,每一个企业独立的schema。数据库

  • 共享数据库、数据库表(本次采用):express

    在表中添加“企业”或者“租户”字段区分是哪一个企业的数据。操做的时候根据“租户”字段去查询相应的数据。缓存

    优势: 全部租户使用同一数据库,因此成本低廉。安全

    缺点:隔离级别低,安全性低,须要在开发时加大对安全的开发量,数据备份和恢复最困难。mybatis

改造思路

  1. 本次采用共享数据库、数据库表的SaaS方案。改造时须要作如下工做:
  • 建立租户信息表。
  • 先要将全部的表添加租户id字段tenant_id。用于关联租户信息表。
  • tenant_id和原始表id建立联合主键。注意主键的顺序,原表主键必须在左边。
  • 将表修改成分区表。
  1. 改造后,在添加租户信息的时候,同时在全部表中添加该租户的分区,分区用于保存该租户的数据。
  2. 在后续增长记录时,须要tenant_id字段的值,在删改查中,都须要在where条件中以tenant_id为条件来操做某个租户的数据。

测试环境介绍

测试库中有5张表,我下文使用sys_log表进行测试。架构

sys_log的建表语句为:

CREATE TABLE `sys_log` (
  `log_id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键',
  `type` TINYINT(1) DEFAULT NULL COMMENT '类型',
  `content` VARCHAR(255) DEFAULT NULL COMMENT '内容',
  `create_id` BIGINT(18) DEFAULT NULL COMMENT '建立人ID',
  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '建立时间',
  `tenant_id` INT NOT NULL,
  PRIMARY KEY (`log_id`,`tenant_id`) USING BTREE
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='系统日志'
复制代码

表添加租户id字段

找出未添加租户id(tenant_id)字段的表。

SELECT 
    table_name 
  FROM
    INFORMATION_SCHEMA.TABLES
  WHERE table_schema = 'my'   -- my 是个人测试数据库名称
    AND table_name NOT IN 
    (SELECT 
      t.table_name 
    FROM
      (SELECT 
        table_name,
        column_name 
      FROM
        information_schema.columns 
      WHERE table_name IN 
        (SELECT 
          table_name 
        FROM
          INFORMATION_SCHEMA.TABLES 
        WHERE table_schema = 'my')) t 
    WHERE t.column_name = 'tenant_id') ;
复制代码

执行,找到两个符合条件的表,在数据库进行确认,确实表中没tenant_id字段。

建立租户信息表

仅供参考,用于保存租户信息

CREATE TABLE `t_tenant` (
  `tenant_id` varchar(40) NOT NULL DEFAULT 'c12dee54f652452b88142a0267ec74b7' COMMENT '租户id',
  `tenant_code` varchar(100) DEFAULT NULL COMMENT '租户编码',
  `name` varchar(50) DEFAULT NULL COMMENT '租户名称',
  `desc` varchar(500) DEFAULT NULL COMMENT '租户描述',
  `logo` varchar(255) DEFAULT NULL COMMENT '公司logo地址',
  `status` smallint(6) DEFAULT NULL COMMENT '状态1有效0无效',
  `create_by` varchar(100) DEFAULT NULL COMMENT '建立者',
  `create_time` datetime DEFAULT NULL COMMENT '建立时间',
  `last_update_by` varchar(100) DEFAULT NULL COMMENT '最后修改人',
  `last_update_time` datetime DEFAULT NULL COMMENT '最后修改时间',
  `street_address` varchar(200) DEFAULT NULL COMMENT '街道楼号地址',
  `province` varchar(20) DEFAULT NULL COMMENT '一级行政单位,如广东省,上海市等',
  `city` varchar(20) DEFAULT NULL COMMENT '城市, 如广州市,佛山市等',
  `district` varchar(20) DEFAULT NULL COMMENT '行政区,如番禺区,天河区等',
  `link_man` varchar(50) DEFAULT NULL COMMENT '联系人',
  `link_phone` varchar(50) DEFAULT NULL COMMENT '联系电话',
  `longitude` decimal(10,6) DEFAULT NULL COMMENT '经度',
  `latitude` decimal(10,6) DEFAULT NULL COMMENT '纬度',
  `adcode` varchar(8) DEFAULT NULL COMMENT '区域编码,用于经过区域id快速匹配后展现, 如广州是440100',
  PRIMARY KEY (`tenant_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='租户的基本信息表';
复制代码

将全部表添加tenant_id字段

DROP PROCEDURE IF EXISTS addColumn ;

DELIMITER $$

CREATE PROCEDURE addColumn () 
BEGIN
  -- 定义表名变量
  DECLARE s_tablename VARCHAR (100) ;
  /*显示表的数据库中的全部表
 SELECT table_name FROM information_schema.tables WHERE table_schema='databasename' Order by table_name ;
 */
  #显示全部
  DECLARE cur_table_structure CURSOR FOR 
  SELECT 
    table_name 
  FROM
    INFORMATION_SCHEMA.TABLES
  WHERE table_schema = 'my'     -- my = 个人测试数据库名称
    AND table_name NOT IN 
    (SELECT 
      t.table_name 
    FROM
      (SELECT 
        table_name,
        column_name 
      FROM
        information_schema.columns 
      WHERE table_name IN 
        (SELECT 
          table_name 
        FROM
          INFORMATION_SCHEMA.TABLES 
        WHERE table_schema = 'my')) t 
    WHERE t.column_name = 'tenant_id') ;
  DECLARE CONTINUE HANDLER FOR SQLSTATE '02000' SET s_tablename = NULL ;
  OPEN cur_table_structure ;
  FETCH cur_table_structure INTO s_tablename ;
  WHILE
    (s_tablename IS NOT NULL) DO SET @MyQuery = CONCAT(
      "alter table `",
      s_tablename,
      "` add COLUMN `tenant_id` INT not null COMMENT '租户id'"
    ) ;
    PREPARE msql FROM @MyQuery ;
    EXECUTE msql ;
    #USING @c; 
    FETCH cur_table_structure INTO s_tablename ;
  END WHILE ;
  CLOSE cur_table_structure ;
END $$

DELIMITER ;

#执行存储过程
CALL addColumn () ;
复制代码

实现表分区

实现的目标:在添加租户的时候实现对全部表添加分区

须要的条件:

  • 表必须是分区表,若是不是分区表,那么须要改为分区表。
  • tenant_id必须和原表log_id主键组成联合主键。

将表修改为分区表

表中添加分区有三种方式:

  • 建立临时分区表sys_log_copy,copy数据过来后,删除旧的sys_log,再将sys_log_copy修改成sys_log(本次采用,详见下文)
  • 直接将表修改成分区表,须要原表中无数据,不然没法成功:
-- 若是表中没数据,能够直接将表进行分区
ALTER TABLE sys_log PARTITION BY LIST COLUMNS (tenant_id)
(
    PARTITION a1 VALUES IN (1) ENGINE = INNODB,
    PARTITION a2 VALUES IN (2) ENGINE = INNODB,
    PARTITION a3 VALUES IN (3) ENGINE = INNODB
);
复制代码
  • 在分区表中添加新分区,须要表已是分区表,不然没法成功:
-- 已是分区表中添加分区
ALTER TABLE sys_log_copy ADD PARTITION
(
    PARTITION a4 VALUES IN (4) ENGINE = INNODB,
    PARTITION a5 VALUES IN (5) ENGINE = INNODB,
    PARTITION a6 VALUES IN (6) ENGINE = INNODB
);
复制代码

经过建立临时分区表的方式将原表转换成分区表

  1. 查看表建表语句:
SHOW CREATE TABLE `sys_log`;
复制代码
  1. 参考建表语句,建立copy表:
CREATE TABLE `sys_log_copy` (
  `log_id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键',
  `type` TINYINT(1) DEFAULT NULL COMMENT '类型',
  `content` VARCHAR(255) DEFAULT NULL COMMENT '内容',
  `create_id` BIGINT(18) DEFAULT NULL COMMENT '建立人ID',
  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '建立时间',
  `tenant_id` INT NOT NULL,
  PRIMARY KEY (`log_id`,`tenant_id`) USING BTREE
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='系统日志'
PARTITION BY LIST COLUMNS (tenant_id)
(
    PARTITION a1 VALUES IN (1) ENGINE = INNODB,
    PARTITION a2 VALUES IN (2) ENGINE = INNODB,
    PARTITION a3 VALUES IN (3) ENGINE = INNODB
);
复制代码

注意上文中的DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC

  • CHARSET=utf8mb4是由于utf8在mysql中是不健全的编码。
  • ROW_FORMAT=DYNAMIC是为了不因此长度过大后致使以下报错:
ERROR 1709 (HY000): Index column size too large. The maximum column size is 767 bytes.
复制代码

也能够在my.ini配置文件中设置为true解决这个问题,可是要重启数据库,会比较麻烦。

[mysqld]
innodb_large_prefix=true
复制代码
  1. 验证分区状况:
SELECT 
  partition_name part,
  partition_expression expr,
  partition_description descr,
  table_rows 
FROM
  information_schema.partitions 
WHERE TABLE_SCHEMA = SCHEMA() 
  AND TABLE_NAME = 'sys_log_copy' ;
复制代码

能够查看到添加的3个分区

  1. 将数据复制到copy表中
INSERT INTO `sys_log_copy` SELECT * FROM `sys_log`
复制代码
  1. 删除表sys_log,再修改sys_log_copy表中的名字为sys_log

编写自动建立分区的仓储过程

经过存储过程实现,在分区表中添加分区

DELIMITER $$

USE `my`$$

DROP PROCEDURE IF EXISTS `add_table_partition`$$

CREATE DEFINER=`root`@`%` PROCEDURE `add_table_partition`(IN _tenantId INT)
BEGIN
  DECLARE IS_FOUND INT DEFAULT 1 ;
  -- 用于记录游标中存在分区的表名
  DECLARE v_tablename VARCHAR (200) ;
  -- 用于缓存添加分区时候的sql
  DECLARE v_sql VARCHAR (5000) ;
  -- 分区名称定义
  DECLARE V_P_VALUE VARCHAR (100) DEFAULT CONCAT('P', REPLACE(_tenantId, '-', '')) ;
  DECLARE V_COUNT INT ;
  DECLARE V_LOONUM INT DEFAULT 0 ;
  DECLARE V_NUM INT DEFAULT 0 ;
  -- 定义游标,值是全部分区表的表名
  DECLARE curr CURSOR FOR 
  (SELECT 
    t.TABLE_NAME 
  FROM
    INFORMATION_SCHEMA.partitions t 
  WHERE TABLE_SCHEMA = SCHEMA() 
    AND t.partition_name IS NOT NULL 
  GROUP BY t.TABLE_NAME) ;
  -- 若是没影响的记录,程序也继续执行
  DECLARE CONTINUE HANDLER FOR NOT FOUND SET IS_FOUND=0;
  -- 获取上一步中的游标中获取到的表名的个数
  SELECT 
    COUNT(1) INTO V_LOONUM 
  FROM
    (SELECT 
      t.TABLE_NAME 
    FROM
      INFORMATION_SCHEMA.partitions t 
    WHERE TABLE_SCHEMA = SCHEMA() 
      AND t.partition_name IS NOT NULL 
    GROUP BY t.TABLE_NAME) A ;
  -- 只有在存在分区表的时候才打开游标
  IF V_LOONUM > 0 
  THEN -- 打开游标
  OPEN curr ;
  -- 循环
  read_loop :
  LOOP
    -- 声明结束的时候
    IF V_NUM >= V_LOONUM 
    THEN LEAVE read_loop ;
    END IF ;
    -- 取游标的值给变量
    FETCH curr INTO v_tablename ;
    -- 依次判断分区表是否存在改分区,若是不存在则添加分区
    SET V_NUM = V_NUM + 1 ;
    SELECT 
      COUNT(1) INTO V_COUNT 
    FROM
      INFORMATION_SCHEMA.partitions t 
    WHERE LOWER(T.TABLE_NAME) = LOWER(v_tablename) 
      AND T.PARTITION_NAME = V_P_VALUE 
      AND T.TABLE_SCHEMA = SCHEMA() ;
    IF V_COUNT <= 0 
    THEN SET v_sql = CONCAT(
      '  ALTER TABLE ',
      v_tablename,
      ' ADD PARTITION (PARTITION ',
      V_P_VALUE,
      ' VALUES IN(',
      _tenantId,
      ') ENGINE = INNODB) '
    ) ;
    SET @v_sql = v_sql ;
    -- 预处理须要执行的动态SQL,其中stmt是一个变量
    PREPARE stmt FROM @v_sql ;
    -- 执行SQL语句
    EXECUTE stmt ;
    -- 释放掉预处理段
    DEALLOCATE PREPARE stmt ;
    END IF ;
    -- 结束循环
  END LOOP read_loop;
  -- 关闭游标
  CLOSE curr ;
  END IF ;
END$$

DELIMITER ;
复制代码

调用存储过程测试

CALL add_table_partition (8) ;
复制代码
  • 若是表还不是分区表,那么调用存储过程会有以下报错:
错误代码: 1505
Partition management on a not partitioned table is not possible
复制代码

翻译出来的意思是:“在未分区的表上进行分区管理是不可能的”。

  • 可能会报错以下:
错误代码: 1329
No data - zero rows fetched, selected, or processed
复制代码

但若是经过查询下面的information_schema.partitions无误,那就是添加分区成功。

能够经过在定义游标后,打开游标以前,添加以下方式解决:

DECLARE CONTINUE HANDLER FOR NOT FOUND SET IS_FOUND=0;
复制代码
SELECT 
  partition_name part,
  partition_expression expr,
  partition_description descr,
  table_rows 
FROM
  information_schema.partitions 
WHERE TABLE_SCHEMA = SCHEMA() 
  AND TABLE_NAME = 'sys_log' ;
复制代码

经过mybatis调用存储过程

<select id="testProcedure" statementType="CALLABLE" useCache="false" parameterType="string">
        <![CDATA[
                call add_table_partition (
                        #{_tenantId,mode=IN,jdbcType=VARCHAR});
        ]]>
</select>
复制代码

若是您感受有收获,请点赞支持下,谢谢!
相关文章
相关标签/搜索