先建立一个用来测试的数据库和表,为了让插入数据更快,表中主键采用的是GUID,表中没有建立任何索引。GUID必然是比自增加要快的,由于你生成一个GUID算法所花的时间确定比你从数据表中从新查询上一条记录的ID的值而后再进行加1运算要少。而若是存在索引的状况下,每次插入记录都会进行索引重建,这是很是耗性能的。若是表中无可避免的存在索引,咱们能够经过先删除索引,而后批量插入,最后再重建索引的方式来提升效率。算法
create database CarSYS;
go
use CarSYS;
go
CREATE TABLE Product( Id UNIQUEIDENTIFIER PRIMARY KEY, NAME VARCHAR(50) NOT NULL, Price DECIMAL(18,2) NOT NULL )
咱们经过SQL脚原本插入数据,常见以下四种方式。sql
方式一:一条一条插入,性能最差,不建议使用。数据库
INSERT INTO Product(Id,Name,Price) VALUES(newid(),'牛栏1段',160); INSERT INTO Product(Id,Name,Price) VALUES(newid(),'牛栏2段',260); ......
方式二:insert bulk缓存
语法以下:并发
BULK INSERT [ [ 'database_name'.][ 'owner' ].]{ 'table_name' FROM 'data_file' } WITH ( [ BATCHSIZE [ = batch_size ] ], [ CHECK_CONSTRAINTS ], [ CODEPAGE [ = 'ACP' | 'OEM' | 'RAW' | 'code_page' ] ], [ DATAFILETYPE [ = 'char' | 'native'| 'widechar' | 'widenative' ] ], [ FIELDTERMINATOR [ = 'field_terminator' ] ], [ FIRSTROW [ = first_row ] ], [ FIRE_TRIGGERS ], [ FORMATFILE = 'format_file_path' ], [ KEEPIDENTITY ], [ KEEPNULLS ], [ KILOBYTES_PER_BATCH [ = kilobytes_per_batch ] ], [ LASTROW [ = last_row ] ], [ MAXERRORS [ = max_errors ] ], [ ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ], [ ROWS_PER_BATCH [ = rows_per_batch ] ], [ ROWTERMINATOR [ = 'row_terminator' ] ], [ TABLOCK ], )
相关参数说明:ide
BULK INSERT [ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ] FROM 'data_file' [ WITH ( [ [ , ] BATCHSIZE = batch_size ] --BATCHSIZE指令来设置在单个事务中能够插入到表中的记录的数量 [ [ , ] CHECK_CONSTRAINTS ] --指定在大容量导入操做期间,必须检查全部对目标表或视图的约束。若没有 CHECK_CONSTRAINTS 选项,则全部 CHECK 和 FOREIGN KEY 约束都将被忽略,而且在此操做以后表的约束将标记为不可信。 [ [ , ] CODEPAGE = { 'ACP' | 'OEM' | 'RAW' | 'code_page' } ] --指定该数据文件中数据的代码页 [ [ , ] DATAFILETYPE = { 'char' | 'native'| 'widechar' | 'widenative' } ] --指定 BULK INSERT 使用指定的数据文件类型值执行导入操做。 [ [ , ] FIELDTERMINATOR = 'field_terminator' ] --标识分隔内容的符号 [ [ , ] FIRSTROW = first_row ] --指定要加载的第一行的行号。默认值是指定数据文件中的第一行 [ [ , ] FIRE_TRIGGERS ] --是否启动触发器 [ [ , ] FORMATFILE = 'format_file_path' ] [ [ , ] KEEPIDENTITY ] --指定导入数据文件中的标识值用于标识列 [ [ , ] KEEPNULLS ] --指定在大容量导入操做期间空列应保留一个空值,而不插入用于列的任何默认值 [ [ , ] KILOBYTES_PER_BATCH = kilobytes_per_batch ] [ [ , ] LASTROW = last_row ] --指定要加载的最后一行的行号 [ [ , ] MAXERRORS = max_errors ] --指定容许在数据中出现的最多语法错误数,超过该数量后将取消大容量导入操做。 [ [ , ] ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ] --指定数据文件中的数据如何排序 [ [ , ] ROWS_PER_BATCH = rows_per_batch ] [ [ , ] ROWTERMINATOR = 'row_terminator' ] --标识分隔行的符号 [ [ , ] TABLOCK ] --指定为大容量导入操做持续时间获取一个表级锁 [ [ , ] ERRORFILE = 'file_name' ] --指定用于收集格式有误且不能转换为 OLE DB 行集的行的文件。 )]
方式三:INSERT INTO xx select...高并发
INSERT INTO Product(Id,Name,Price) SELECT NEWID(),'牛栏1段',160 UNION ALL SELECT NEWID(),'牛栏2段',180 UNION ALL ......
方式四:拼接SQLsqlserver
INSERT INTO Product(Id,Name,Price) VALUES (newid(),'牛栏1段',160) ,(newid(),'牛栏2段',260) ......
在C#中经过ADO.NET来实现批量操做存在四种与之对应的方式。性能
#region 方式一 static void InsertOne() { Console.WriteLine("采用一条一条插入的方式实现"); Stopwatch sw = new Stopwatch(); using (SqlConnection conn = new SqlConnection(StrConnMsg)) //using中会自动Open和Close 链接。 { string sql = "INSERT INTO Product(Id,Name,Price) VALUES(newid(),@p,@d)"; conn.Open(); for (int i = 0; i < totalRow; i++) { using (SqlCommand cmd = new SqlCommand(sql, conn)) { cmd.Parameters.AddWithValue("@p", "商品" + i); cmd.Parameters.AddWithValue("@d", i); sw.Start(); cmd.ExecuteNonQuery(); Console.WriteLine(string.Format("插入一条记录,已耗时{0}毫秒", sw.ElapsedMilliseconds)); } if (i == getRow) { sw.Stop(); break; } } } Console.WriteLine(string.Format("插入{0}条记录,每{4}条的插入时间是{1}毫秒,预估总得插入时间是{2}毫秒,{3}分钟",
totalRow, sw.ElapsedMilliseconds, ((sw.ElapsedMilliseconds / getRow) * totalRow), GetMinute((sw.ElapsedMilliseconds / getRow * totalRow)), getRow)); } static int GetMinute(long l) { return (Int32)l / 60000; } #endregion
运行结果以下:测试
咱们会发现插入100w条记录,预计须要50分钟时间,每插入一条记录大概须要3毫秒左右。
#region 方式二 static void InsertTwo() { Console.WriteLine("使用Bulk插入的实现方式"); Stopwatch sw = new Stopwatch(); DataTable dt = GetTableSchema(); using (SqlConnection conn = new SqlConnection(StrConnMsg)) { SqlBulkCopy bulkCopy = new SqlBulkCopy(conn); bulkCopy.DestinationTableName = "Product"; bulkCopy.BatchSize = dt.Rows.Count; conn.Open(); sw.Start(); for (int i = 0; i < totalRow;i++ ) { DataRow dr = dt.NewRow(); dr[0] = Guid.NewGuid(); dr[1] = string.Format("商品", i); dr[2] = (decimal)i; dt.Rows.Add(dr); } if (dt != null && dt.Rows.Count != 0) { bulkCopy.WriteToServer(dt); sw.Stop(); } Console.WriteLine(string.Format("插入{0}条记录共花费{1}毫秒,{2}分钟", totalRow, sw.ElapsedMilliseconds, GetMinute(sw.ElapsedMilliseconds))); } } static DataTable GetTableSchema() { DataTable dt = new DataTable(); dt.Columns.AddRange(new DataColumn[] { new DataColumn("Id",typeof(Guid)), new DataColumn("Name",typeof(string)), new DataColumn("Price",typeof(decimal))}); return dt; } #endregion
运行结果以下:
插入100w条记录才8s多,是否是很溜。
打开Sqlserver Profiler跟踪,会发现执行的是以下语句:
insert bulk Product ([Id] UniqueIdentifier, [NAME] VarChar(50) COLLATE Chinese_PRC_CI_AS, [Price] Decimal(18,2))
从sqlserver 2008起开始支持TVPs。建立缓存表ProductTemp ,执行以下SQL。
CREATE TYPE ProductTemp AS TABLE( Id UNIQUEIDENTIFIER PRIMARY KEY, NAME VARCHAR(50) NOT NULL, Price DECIMAL(18,2) NOT NULL )
执行完成以后,会发如今数据库CarSYS下面多了一张缓存表ProductTemp
可见插入100w条记录共花费了11秒多。
此种方法在C#中有限制,一次性只能批量插入1000条,因此就得分段进行插入。
#region 方式四 static void InsertFour() { Console.WriteLine("采用拼接批量SQL插入的方式实现"); Stopwatch sw = new Stopwatch(); using (SqlConnection conn = new SqlConnection(StrConnMsg)) //using中会自动Open和Close 链接。 { conn.Open(); sw.Start(); for (int j = 0; j < totalRow / getRow;j++ ) { StringBuilder sb = new StringBuilder(); sb.Append("INSERT INTO Product(Id,Name,Price) VALUES"); using (SqlCommand cmd = new SqlCommand()) { for (int i = 0; i < getRow; i++) { sb.AppendFormat("(newid(),'商品{0}',{0}),", j*i+i); } cmd.Connection = conn; cmd.CommandText = sb.ToString().TrimEnd(','); cmd.ExecuteNonQuery(); } } sw.Stop(); Console.WriteLine(string.Format("插入{0}条记录,共耗时{1}毫秒",totalRow,sw.ElapsedMilliseconds)); } } #endregion
运行结果以下:
咱们能够看到大概花费了10分钟。虽然在方式一的基础上,性能有了较大的提高,可是显然仍是不够快。
总结:大数据批量插入方式一和方式四尽可能避免使用,而方式二和方式三都是很是高效的批量插入数据方式。其都是经过构建DataTable的方式插入的,而咱们知道DataTable是存在内存中的,因此当数据量特别特别大,大到内存中没法一次性存储的时候,能够分段插入。好比须要插入9千万条数据,能够分红9段进行插入,一次插入1千万条。而在for循环中直接进行数据库操做,咱们是应该尽可能避免的。每一次数据库的链接、打开和关闭都是比较耗时的,虽然在C#中存在数据库链接池,也就是当咱们使用using或者conn.Close(),进行释放链接时,其实并无真正关闭数据库链接,它只是让链接以相似于休眠的方式存在,当再次操做的时候,会从链接池中找一个休眠状态的链接,唤醒它,这样能够有效的提升并发能力,减小链接损耗。而链接池中的链接数,咱们都是能够配置的。