天不生仲尼,万古如长夜。在计算机科学中,也有一个划时代的发明,B树(多路平衡查找树)及其变体(B树,b*树,b+树);html
由德国科学家(鲁道夫·拜尔 Rudolf Bayer),美国科学家(爱德华·M·麦克特 Edward Meyers McCreight)于1970年共同发明;node
B树这种数据结构特别适合用于数据库与文件系统设计中,是人类精神财富的精华部分,B树不诞生,计算机在处理大数据量计算时会变得很是困难。mysql
基本上都是软件产品最底层的,最核心的功能。程序员
如:各类操做系统(windows,Linux,Mac)的文件系统索引,各类数据库(sqlserver、oracle、mysql、MongoDB、等等),算法
基本上大部分与大数据量读取有关的事务,多少都与B树家族有关,由于B树的优势太明显,特别是读取磁盘数据效率很是的高效,sql
查找效率O(log n),甚至在B+树中查询速度恒定,不管多少存储多少数据,查询任何一个速度都同样。简直就是天才的发明。数据库
在上世纪时期,计算机内存储器都很是的小,以KB为单位,比起如今动不动以G计算,简直小的可怜。windows
计算机运算数据时,数据是在内存中进行操做的,好比一些加减乘除、正删改查等。数组
举个简单的栗子:从一个数组 int a[1,2,3,4,5,6,7,8,9]中找出3,那很是简单;大概步骤以下:数据结构
一、在内存中初始化这个数组
二、获取数组指针遍历这个数组,查到3就完成
可是这个数组很大,好比包含1亿个数字怎么办?若是数组容量大大超过内存大小,那这种比较就不现实了。如今的作法都是把文件
数据存放在外存储器,好比磁盘,U盘,光盘;而后把文件分屡次的拷贝数据至内存进行操做。可是读取外存储器效率对比读取内存,
差距是很是大的,通常是百万级别的差距,差6个数量级,因此这个问题不解决一切都是空谈。
好在操做系统在设计之初,就对读取外存储器进行了必定的优化,引入了“逻辑块”概念,当作操做文件的最小单元,而B树合理地利用这个“逻辑块”
功能开发的高效存储数据结构;在介绍B树特性以前,先来了解一下磁盘的基本工做原理。
1)磁盘结构介绍
网上引用的两张图,将就看看,基本结构是:磁盘 > 盘面 > 磁道 > 扇区
左边是物理图,这个你们应该都是常常见到了,通常圆形的那部分有不少层,每一层叫盘片;右边的是示意图,表明左图的一个盘面。
每一个盘面有跟多环形的磁道,每一个磁道有若干段扇区组成,扇区是磁盘的最小组成单元,若干段扇区组成簇(也叫磁盘块、逻辑块等)
先看看我电脑的磁盘簇与扇区大小
能够看到个人E盘每一个扇区512个字节,每一个簇4096字节,这个先记下来,后边有用到
扇区是磁盘组成的最小单元,簇是虚拟出来的,主要是为了操做系统方便读写磁盘;因为扇区比较小,数量很是多,
在寻址比较麻烦,操做系统就将相邻的几个扇区组合在一块儿,造成簇,再以簇为每次操做文件的最小单元。好比加载一个磁盘文件内容,
操做系统是分批次读取,每次只拷贝一个簇的单位数据,个人电脑就是一次拷贝4096字节,知道文件所有拷贝完成。
2)读写速度
磁盘读取时间是毫秒级别的通常几毫秒到十几毫秒之间,这个跟磁盘转速有点关系,还有就是数据所在磁道远近有关系;
CPU处理时间是纳秒级别,毫秒:纳秒 = 1:1000000,因此在程序设计中,读取文件是时间成本很是高的,应该尽可能合理设计;
B树(英语:B-tree)是一种自平衡的树,可以保持数据有序。这种数据结构可以让查找数据、顺序访问、插入数据及删除的动做,
都在对数时间内完成。B树,归纳来讲是一个通常化的二叉查找树(binary search tree)一个节点能够拥有最少2个子节点。
与自平衡二叉查找树不一样,B树适用于读写相对大的数据块的存储系统,例如磁盘。B树减小定位记录时所经历的中间过程,从而加快存取速度。
B树这种数据结构能够用来描述外部存储。这种数据结构常被应用在数据库和文件系统的实现上。
一个 m 阶的B树是一个有如下特性:
好吧,上边这一段看了等于没看的定义能够不看,这里有个重要的B树特性须要了解,就是B树的阶,对于阶的定义国内外是有分歧的,有的定义为度。
阶指的是节点的最大孩子数,度指的是节点的最小孩子数,我查阅了不少资料,基本上能够理解为:
1度 = 2阶,好比说3度B树,能够理解为6阶B树。这点有些疑问,有更好的说法的能够留言讨论一下。
1)内部节点:
内部节点是除叶子节点和根节点以外的全部节点。每一个内部节点拥有最多 U 个,最少 L 个子节点。元素的数量老是比子节点指针的数量少1。
U 必须等于 2L 或者 2L-1。这个L通常是度数。
2)根节点:根节点拥有的子节点数量的上限和内部节点相同,可是没有下限。
3)叶子节点:叶子节点对元素的数量有相同的限制,可是没有子节点,也没有指向子节点的指针。
4)为了分析方便举例3阶3层B树
图1
从上图中能够得出如下几个信息:
数据检索分析:
依据上图分析,由于整棵树已经在内存中,至关于一个变量,数据检索首先是从根节点开始;
1)若是要查询9,首先从根节点比较,那比较一次就获得结果,
2)若是要查询第二层的三、4,首先判断根节点键值,没有匹配到,可是能够判断要检索的键值比根节点小,
因此接下来是从左孩子树继续检索,十二、15也是相似,总共须要2次比较就获得结果
3)若是查询叶子节点键值,相似2),只须要3次比较就能获得结果。
4)对比普通的数组遍历查询,B树检索的时间成本没有随数据量增长而线性增长,效率大大提升。
前面已经提到,若是树已经在内存中,那固然好办,直接遍历就行了。若是B树仅仅如此,那也和数组差异不大,一样受限于内存大小;
因此,在内存中建立整棵B树是不现实的,这不是B树的正确打开方式。
前面也已经提到,操做系统加载磁盘文件的时候,若是文件超过簇大小(即4096个字节),那会分屡次的读取磁盘,直到拷贝数据完成。
这里看似一个加载动做,其实这个动做包含了N次磁盘寻址,而咱们已经知道,每次磁盘寻址直至拷贝数据开销是很是大的;是CPU指令耗时百万倍以上;
这种操做应该尽可能少地执行,而B树这种数据结构就是为了解决磁盘读取瓶颈这个问题而产生的。
实际应用中,B树会持久化到磁盘,而后只在内存保留一个根节点的指针。已上图1为例:
每一个节点大小恰好等于簇大小,这样只需一次磁盘IO就能够获取到一整个节点的全部键值,及其全部子树的指针。
好比,查询键值8:
1)第一步,读取根节点获得键值9,以及2个子树指针,分别指向左右孩子节点,由于9 > 8,因此下一步加载左孩子节点
2)第二部,加载节点2,获得键值三、6,以及3个子树指针,由于三、6 < 8,因此下一步要加载节点2的右孩子节点
3)第三部,加载节点6,获得键值七、8,由于是叶子节点因此没有子树指针,遍历键值匹配到8,返回。
总结:
在这个3阶3层的B树中,不管查找哪个键值,最多只须要3次磁盘操做,就算平均每次耗时10毫秒,总共须要耗时30毫秒(CPU运算耗时能够忽略);
以此类推,3阶4层的B树,须要读取4次磁盘,耗时40毫秒,5层50毫秒,6层60毫秒,7层,8层,,,,
这样一看貌似也没什么,几十毫秒已经不能说快了,可是别忘了咱们这颗树只有3阶,即一个节点保存2个键值。一个簇最多能有4096/4=1024个键值;
若是建立一个1024阶的B树,分别控制在三、四、5层的话,根据B树高度公式:,H为层数,T为1024,n为数据总数
耗时以下:
3阶3层:能容纳2147483648(20亿)个键值,检索耗时也将30毫秒内
3阶4层:能容纳2147483648(20亿) ~ 2199023255552(2兆亿)个键值,检索耗时也将40毫秒内,固然这已经超出键值表达范围了
3阶5层:难以想象。。。
固然实际运用当中达不到1024阶,由于树持久化到磁盘时,索引结构体通常都是超过4个字节,好比12个字节,那一个簇最多能有4096/12=341个键值。
若是阶数按341来算:
3阶3层:能容纳79303642(7千万)个键值,检索耗时也将30毫秒内
3阶4层:能容纳79303642(7千万) ~ 27042541922(200亿)个键值,检索耗时也将40毫秒内
也是很是多了。。
1)首先,咱们把B树基本信息定义出来
1 public class Consts 2 { 3 public const int M = 3; // B树的最小度数 4 public const int KeyMax = 2 * M - 1; // 节点包含关键字的最大个数 5 public const int KeyMin = M - 1; // 非根节点包含关键字的最小个数 6 public const int ChildMax = KeyMax + 1; // 孩子节点的最大个数 7 public const int ChildMin = KeyMin + 1; // 孩子节点的最小个数 8 }
先写个简单的demo,由于最小度数为3,那就是6阶。先实现几个简单的方法,新增,拆分,其他的合并,删除比较复杂之后有机会再看看
2)定义BTreeNode,B树节点
1 public class BTreeNode 2 { 3 private bool leaf; 4 public int[] keys; 5 public int keyNumber; 6 public BTreeNode[] children; 7 public int blockIndex; 8 public int dataIndex; 9 10 public BTreeNode(bool leaf) 11 { 12 this.leaf = leaf; 13 keys = new int[Consts.KeyMax]; 14 children = new BTreeNode[Consts.ChildMax]; 15 } 16 17 /// <summary>在未满的节点中插入键值</summary> 18 /// <param name="key">键值</param> 19 public void InsertNonFull(int key) 20 { 21 var index = keyNumber - 1; 22 23 if (leaf == true) 24 { 25 // 找到合适位置,而且移动节点键值腾出位置 26 while (index >= 0 && keys[index] > key) 27 { 28 keys[index + 1] = keys[index]; 29 index--; 30 } 31 32 // 在index后边新增键值 33 keys[index + 1] = key; 34 keyNumber = keyNumber + 1; 35 } 36 else 37 { 38 // 找到合适的子孩子索引 39 while (index >= 0 && keys[index] > key) index--; 40 41 // 若是孩子节点已满 42 if (children[index + 1].keyNumber == Consts.KeyMax) 43 { 44 // 分裂该孩子节点 45 SplitChild(index + 1, children[index + 1]); 46 47 // 分裂后中间节点上跳父节点 48 // 孩子节点已经分裂成2个节点,找到合适的一个 49 if (keys[index + 1] < key) index++; 50 } 51 52 // 插入键值 53 children[index + 1].InsertNonFull(key); 54 } 55 } 56 57 /// <summary>分裂节点</summary> 58 /// <param name="childIndex">孩子节点索引</param> 59 /// <param name="waitSplitNode">待分裂节点</param> 60 public void SplitChild(int childIndex, BTreeNode waitSplitNode) 61 { 62 var newNode = new BTreeNode(waitSplitNode.leaf); 63 newNode.keyNumber = Consts.KeyMin; 64 65 // 把待分裂的节点中的通常节点搬到新节点 66 for (var j = 0; j < Consts.KeyMin; j++) 67 { 68 newNode.keys[j] = waitSplitNode.keys[j + Consts.ChildMin]; 69 70 // 清0 71 waitSplitNode.keys[j + Consts.ChildMin] = 0; 72 } 73 74 // 若是待分裂节点不是也只节点 75 if (waitSplitNode.leaf == false) 76 { 77 for (var j = 0; j < Consts.ChildMin; j++) 78 { 79 // 把孩子节点也搬过去 80 newNode.children[j] = waitSplitNode.children[j + Consts.ChildMin]; 81 82 // 清0 83 waitSplitNode.children[j + Consts.ChildMin] = null; 84 } 85 } 86 87 waitSplitNode.keyNumber = Consts.KeyMin; 88 89 // 拷贝通常键值到新节点 90 for (var j = keyNumber; j >= childIndex + 1; j--) 91 children[j + 1] = children[j]; 92 93 children[childIndex + 1] = newNode; 94 for (var j = keyNumber - 1; j >= childIndex; j--) 95 keys[j + 1] = keys[j]; 96 97 // 把中间键值上跳至父节点 98 keys[childIndex] = waitSplitNode.keys[Consts.KeyMin]; 99 100 // 清0 101 waitSplitNode.keys[Consts.KeyMin] = 0; 102 103 // 根节点键值数自加 104 keyNumber = keyNumber + 1; 105 } 106 107 /// <summary>根据节点索引顺序打印节点键值</summary> 108 public void PrintByIndex() 109 { 110 int index; 111 for (index = 0; index < keyNumber; index++) 112 { 113 // 若是不是叶子节点, 先打印叶子子节点. 114 if (leaf == false) children[index].PrintByIndex(); 115 116 Console.Write("{0} ", keys[index]); 117 } 118 119 // 打印孩子节点 120 if (leaf == false) children[index].PrintByIndex(); 121 } 122 123 /// <summary>查找某键值是否已经存在树中</summary> 124 /// <param name="key">键值</param> 125 /// <returns></returns> 126 public BTreeNode Find(int key) 127 { 128 int index = 0; 129 while (index < keyNumber && key > keys[index]) index++; 130 131 // 该key已经存在, 返回该索引位置节点 132 if (keys[index] == key) return this; 133 134 // key 不存在,而且节点是叶子节点 135 if (leaf == true) return null; 136 137 // 递归在孩子节点中查找 138 return children[index].Find(key); 139 } 140 }
3)B树模型
1 public class BTree 2 { 3 public BTreeNode Root { get; private set; } 4 5 public BTree() { } 6 7 /// <summary>根据节点索引顺序打印节点键值</summary> 8 public void PrintByIndex() 9 { 10 if (Root == null) 11 { 12 Console.WriteLine("空树"); 13 return; 14 } 15 16 Root.PrintByIndex(); 17 } 18 19 /// <summary>查找某键值是否已经存在树中</summary> 20 /// <param name="key">键值</param> 21 /// <returns></returns> 22 public BTreeNode Find(int key) 23 { 24 if (Root == null) return null; 25 26 return Root.Find(key); 27 } 28 29 /// <summary>新增B树节点键值</summary> 30 /// <param name="key">键值</param> 31 public void Insert(int key) 32 { 33 if (Root == null) 34 { 35 Root = new BTreeNode(true); 36 Root.keys[0] = key; 37 Root.keyNumber = 1; 38 return; 39 } 40 41 if (Root.keyNumber == Consts.KeyMax) 42 { 43 var newNode = new BTreeNode(false); 44 45 newNode.children[0] = Root; 46 newNode.SplitChild(0, Root); 47 48 var index = 0; 49 if (newNode.keys[0] < key) index++; 50 51 newNode.children[index].InsertNonFull(key); 52 Root = newNode; 53 } 54 else 55 { 56 Root.InsertNonFull(key); 57 } 58 } 59 }
4)新增20个无序键值,测试一下
1 var bTree = new BTree(); 2 3 bTree.Insert(4); 4 bTree.Insert(5); 5 bTree.Insert(6); 6 bTree.Insert(1); 7 bTree.Insert(2); 8 bTree.Insert(3); 9 bTree.Insert(10); 10 bTree.Insert(11); 11 bTree.Insert(12); 12 bTree.Insert(7); 13 bTree.Insert(8); 14 bTree.Insert(9); 15 bTree.Insert(13); 16 bTree.Insert(14); 17 bTree.Insert(18); 18 bTree.Insert(19); 19 bTree.Insert(20); 20 bTree.Insert(15); 21 bTree.Insert(16); 22 bTree.Insert(17); 23 24 Console.WriteLine("输出排序后键值"); 25 bTree.PrintByIndex();
5)运行
上文提到,B数不可能只存在内存而没法落地,那样没有意义。因此就须要将整棵树持久化到磁盘文件,而且还要支持快速地从磁盘文件中检索到键值;
要持久化就要考虑不少问题,像上边的简单示例是没有实际意义的,由于节点不可能只有键值与孩子树,还得有数据指针,存储位置等等,大概有如下一些问题:
问题比较多,很是麻烦。具体的过程就不列举了,如下展现如下修改后的B树模型。
一、先定义一个结构体
1 [StructLayout(LayoutKind.Sequential, CharSet = CharSet.Ansi, Pack = 1)] 2 public struct BlockItem 3 { 4 public int ChildBlockIndex; 5 public int Key; 6 public int DataIndex; 7 8 public BlockItem(int key, int dataIndex) 9 { 10 ChildBlockIndex = -1; 11 Key = key; 12 DataIndex = dataIndex; 13 } 14 }
结构体总共12字节,为了可以持久化整棵B树到磁盘,加入了ChildBlockIndex子孩子节点块索引,根据这个块索引在下一次重建子孩子树层级关系时就知道从
文件的那个位置开始读取;Key键值,DataIndex数据索引,数据索引也是一个文件位置记录,跟ChildBlockIndex差很少,这样检索到key后就知道从
文件哪一个位置获取真正的数据。为了更形象了解B树应用,我画了一个结构体的示意图:
0、总共3个节点,每一个节点由N个结构体组成,最末尾只有孩子指针,没有数据与键值
一、黄色为子树块索引,即ChildBlockIndex,指向这个子孩子树全部数据在文件中的位置
二、红色为键值,即Key,键值通常是惟一的,不容许重复
三、蓝色为数据块索引,即DataIndex,指向键值对应的数据在文件中的什么位置开始,而后读取一个结构体的长度便可
四、底下绿色的一块是数据指针指向的具体数据块
二、数据结构体
1 [StructLayout(LayoutKind.Sequential, CharSet = CharSet.Ansi, Pack = 1)] 2 public struct SDataTest 3 { 4 public int Idx; 5 public int Age; 6 public byte Sex; 7 8 [MarshalAs(UnmanagedType.ByValArray, SizeConst = 20)] 9 public byte[] Name; 10 11 public byte Valid; 12 };
三、B树节点类修改改一下,这个就不解释了,复习一下程序员基本功,啃代码。
1 public class BTreeNode 2 { 3 private BTree tree; 4 private bool leaf; 5 6 public int keyNumber; 7 public BlockItem[] keys; 8 public BTreeNode[] children; 9 10 public int blockIndex; 11 public int findIndex; 12 13 public BTreeNode(BTree tree, bool leaf) 14 { 15 this.tree = tree; 16 this.leaf = leaf; 17 keys = new BlockItem[Consts.KeyMax]; 18 children = new BTreeNode[Consts.ChildMax]; 19 blockIndex = Consts.BlockIndex++; 20 } 21 22 /// <summary>在未满的节点中插入键值</summary> 23 /// <param name="key">键值</param> 24 public void InsertNonFull(BlockItem item) 25 { 26 var index = keyNumber - 1; 27 28 if (leaf == true) 29 { 30 // 找到合适位置,而且移动节点键值腾出位置 31 while (index >= 0 && keys[index].Key > item.Key) 32 { 33 keys[index + 1] = keys[index]; 34 index--; 35 } 36 37 // 在index后边新增键值 38 keys[index + 1] = item; 39 keyNumber = keyNumber + 1; 40 } 41 else 42 { 43 // 找到合适的子孩子索引 44 while (index >= 0 && keys[index].Key > item.Key) index--; 45 46 // 若是孩子节点已满 47 if (children[index + 1].keyNumber == Consts.KeyMax) 48 { 49 // 分裂该孩子节点 50 SplitChild(index + 1, children[index + 1]); 51 52 // 分裂后中间节点上跳父节点 53 // 孩子节点已经分裂成2个节点,找到合适的一个 54 if (keys[index + 1].Key < item.Key) index++; 55 } 56 57 // 插入键值 58 children[index + 1].InsertNonFull(item); 59 } 60 } 61 62 /// <summary>分裂节点</summary> 63 /// <param name="childIndex">孩子节点索引</param> 64 /// <param name="waitSplitNode">待分裂节点</param> 65 public void SplitChild(int childIndex, BTreeNode waitSplitNode) 66 { 67 var newNode = new BTreeNode(tree, waitSplitNode.leaf); 68 newNode.keyNumber = Consts.KeyMin; 69 70 // 把待分裂的节点中的通常节点搬到新节点 71 for (var j = 0; j < Consts.KeyMin; j++) 72 { 73 newNode.keys[j] = waitSplitNode.keys[j + Consts.ChildMin]; 74 75 // 清0 76 waitSplitNode.keys[j + Consts.ChildMin] = default(BlockItem); 77 } 78 79 // 若是待分裂节点不是也只节点 80 if (waitSplitNode.leaf == false) 81 { 82 for (var j = 0; j < Consts.ChildMin; j++) 83 { 84 // 把孩子节点也搬过去 85 newNode.children[j] = waitSplitNode.children[j + Consts.ChildMin]; 86 87 // 清0 88 waitSplitNode.children[j + Consts.ChildMin] = null; 89 } 90 } 91 92 waitSplitNode.keyNumber = Consts.KeyMin; 93 94 for (var j = keyNumber; j >= childIndex + 1; j--) 95 children[j + 1] = children[j]; 96 97 children[childIndex + 1] = newNode; 98 99 for (var j = keyNumber - 1; j >= childIndex; j--) 100 keys[j + 1] = keys[j]; 101 102 // 把中间键值上跳至父节点 103 keys[childIndex] = waitSplitNode.keys[Consts.KeyMin]; 104 105 // 清0 106 waitSplitNode.keys[Consts.KeyMin] = default(BlockItem); 107 108 // 根节点键值数自加 109 keyNumber = keyNumber + 1; 110 } 111 112 /// <summary>根据节点索引顺序打印节点键值</summary> 113 public void PrintByIndex() 114 { 115 int index; 116 for (index = 0; index < keyNumber; index++) 117 { 118 // 若是不是叶子节点, 先打印叶子子节点. 119 if (leaf == false) children[index].PrintByIndex(); 120 121 Console.Write("{0} ", keys[index].Key); 122 } 123 124 // 打印孩子节点 125 if (leaf == false) children[index].PrintByIndex(); 126 } 127 128 /// <summary>查找某键值是否已经存在树中</summary> 129 /// <param name="item">键值</param> 130 /// <returns></returns> 131 public BTreeNode Find(BlockItem item) 132 { 133 findIndex = 0; 134 int index = 0; 135 while (index < keyNumber && item.Key > keys[index].Key) index++; 136 137 // 遍历所有都未找到,索引计数减1 138 if (index > 0 && index == keyNumber) index--; 139 140 // 该key已经存在, 返回该索引位置节点 141 if (keys[index].Key == item.Key) 142 { 143 findIndex = index; 144 return this; 145 } 146 147 // key 不存在,而且节点是叶子节点 148 if (leaf == true) return null; 149 150 // 重建children[index]数据结构 151 var childBlockIndex = keys[index].ChildBlockIndex; 152 tree.LoadNodeByBlock(ref children[index], childBlockIndex); 153 154 // 递归在孩子节点中查找 155 if (children[index] == null) return null; 156 return children[index].Find(item); 157 } 158 }
四、B树模型也要修改一下 ,不解释
1 public class BTree 2 { 3 private FileStream rwFS; 4 5 public BTreeNode Root; 6 7 public BTree(string fullName) 8 { 9 rwFS = new FileStream(fullName, FileMode.OpenOrCreate, FileAccess.ReadWrite); 10 11 // 建立10M的空间,用作索引存储 12 if (rwFS.Length == 0) 13 { 14 rwFS.SetLength(Consts.IndexTotalSize); 15 } 16 17 // 从数据文件重建根节点,内存只保存根节点 18 LoadNodeByBlock(ref Root, 0); 19 } 20 21 public void LoadNodeByBlock(ref BTreeNode node, int blockIndex) 22 { 23 var items = Helper.Read(rwFS,blockIndex); 24 if (items.Count > 0) 25 { 26 var isLeaf = items[0].ChildBlockIndex == Consts.NoChild; 27 28 node = new BTreeNode(this, isLeaf); 29 node.blockIndex = blockIndex; 30 node.keys = items.ToArray(); 31 node.keyNumber = items.Count; 32 } 33 } 34 35 /// <summary>根据节点索引顺序打印节点键值</summary> 36 public void PrintByIndex() 37 { 38 if (Root == null) 39 { 40 Console.WriteLine("空树"); 41 return; 42 } 43 44 Root.PrintByIndex(); 45 } 46 47 /// <summary>查找某键值是否已经存在树中</summary> 48 /// <param name="item">键值</param> 49 /// <returns></returns> 50 public BTreeNode Find(BlockItem item) 51 { 52 if (Root == null) return null; 53 54 return Root.Find(item); 55 } 56 public BTreeNode Find(int key) 57 { 58 return Find(new BlockItem() { Key = key }); 59 } 60 61 /// <summary>新增B树节点键值</summary> 62 /// <param name="item">键值</param> 63 private void Insert(BlockItem item) 64 { 65 if (Root == null) 66 { 67 Root = new BTreeNode(this, true); 68 Root.keys[0] = item; 69 Root.keyNumber = 1; 70 } 71 else 72 { 73 if (Root.keyNumber == Consts.KeyMax) 74 { 75 var newNode = new BTreeNode(this, false); 76 77 newNode.children[0] = Root; 78 newNode.SplitChild(0, Root); 79 80 var index = 0; 81 if (newNode.keys[0].Key < item.Key) index++; 82 83 newNode.children[index].InsertNonFull(item); 84 Root = newNode; 85 } 86 else 87 { 88 Root.InsertNonFull(item); 89 } 90 } 91 } 92 93 public void Insert(SDataTest data) 94 { 95 var item = new BlockItem() 96 { 97 Key = data.Idx 98 }; 99 100 var node = Find(item); 101 if (node != null) 102 { 103 Console.WriteLine("键值已经存在,info:{0}", item.Key); 104 return; 105 } 106 107 // 保存数据 108 item.DataIndex = Helper.InsertData(rwFS, data); 109 110 // 保存索引 111 if (item.DataIndex >= 0) 112 Insert(item); 113 } 114 115 /// <summary>持久化整棵树</summary> 116 public void SaveIndexAll() 117 { 118 SaveIndex(Root); 119 } 120 121 /// <summary>持久化某节点如下的树枝</summary> 122 /// <param name="node">某节点</param> 123 public void SaveIndex(BTreeNode node) 124 { 125 var bw = new BinaryWriter(rwFS); 126 var keyItem = default(BlockItem); 127 128 // 第一层 129 var nodeL1 = node; 130 if (nodeL1 == null) return; 131 132 for (var i = 0; i <= nodeL1.keyNumber; i++) 133 { 134 keyItem = default(BlockItem); 135 if (i < nodeL1.keyNumber) keyItem = nodeL1.keys[i]; 136 137 SaveIndex(bw, 0, i, nodeL1.children[i], keyItem); 138 139 // 第二层 140 var nodeL2 = nodeL1.children[i]; 141 if (nodeL2 == null) continue; 142 143 for (var j = 0; j <= nodeL2.keyNumber; j++) 144 { 145 keyItem = default(BlockItem); 146 if (j < nodeL2.keyNumber) keyItem = nodeL2.keys[j]; 147 148 SaveIndex(bw, nodeL2.blockIndex, j, nodeL2.children[j], keyItem); 149 150 // 第三层 151 var nodeL3 = nodeL2.children[j]; 152 if (nodeL3 == null) continue; 153 154 for (var k = 0; k <= nodeL3.keyNumber; k++) 155 { 156 keyItem = default(BlockItem); 157 if (k < nodeL3.keyNumber) keyItem = nodeL3.keys[k]; 158 159 SaveIndex(bw, nodeL3.blockIndex, k, nodeL3.children[k], keyItem); 160 161 // 第四层 162 var nodeL4 = nodeL3.children[k]; 163 if (nodeL4 == null) continue; 164 165 for (var l = 0; l <= nodeL4.keyNumber; l++) 166 { 167 keyItem = default(BlockItem); 168 if (l < nodeL4.keyNumber) keyItem = nodeL4.keys[l]; 169 170 SaveIndex(bw, nodeL4.blockIndex, l, nodeL4.children[l], keyItem); 171 172 // 第五层 173 var nodeL5 = nodeL4.children[l]; 174 if (nodeL5 == null) continue; 175 176 for (var z = 0; z <= nodeL5.keyNumber; z++) 177 { 178 keyItem = default(BlockItem); 179 if (z < nodeL5.keyNumber) keyItem = nodeL5.keys[z]; 180 181 SaveIndex(bw, nodeL5.blockIndex, z, nodeL5.children[z], keyItem); 182 } 183 } 184 } 185 } 186 } 187 } 188 private void SaveIndex(BinaryWriter bw, int blockIndex, int num, BTreeNode node, BlockItem item) 189 { 190 bw.Seek((blockIndex * Consts.BlockSize) + (num * Consts.IndexSize), SeekOrigin.Begin); 191 bw.Write(node == null ? Consts.NoChild : node.blockIndex); 192 bw.Write(item.Key); 193 bw.Write(item.DataIndex); 194 bw.Flush(); 195 } 196 197 public SDataTest LoadData(int dataIndex) 198 { 199 return Helper.Load(rwFS, dataIndex); 200 } 201 }
五、写测试
1 private static void InsertTest(ref BTree bTree) 2 { 3 // 新增测试数据 4 for (int i = 1; i <= Consts.TotalKeyNumber; i++) 5 { 6 bTree.Insert(new SDataTest() 7 { 8 Idx = i, 9 Age = i, 10 Sex = 1, 11 Name = Helper.Copy("Name(" + i.ToString() + ")", 20), 12 Valid = 1 13 }); 14 } 15 16 Console.WriteLine("测试数据添加完毕,共新增{0}条数据", Consts.TotalKeyNumber); 17 }
六、读测试
1 private static void FindTest(ref BTree bTree) 2 { 3 var count = 0; 4 5 // 校验数据查找 6 for (int i = 1; i <= Consts.TotalKeyNumber; i++) 7 { 8 var node = bTree.Find(i); 9 if (node == null) 10 { 11 //Console.WriteLine("未找到{0}", i); 12 continue; 13 } 14 15 //Console.WriteLine("findIndex:{0},key:{1},dataIndex:{2}", node.findIndex, node.keys[node.findIndex].Key, node.keys[node.findIndex].DataIndex); 16 17 count++; 18 if (count % 10000 == 0) 19 { 20 var data = bTree.LoadData(node.keys[node.findIndex].DataIndex); 21 var name = Encoding.Default.GetString(data.Name).TrimEnd('\0'); 22 Console.WriteLine("Idx:{0},Age:{1},Sex:{2},Name:{3},Valid:{4}", data.Idx, data.Age, data.Sex, name, data.Valid); 23 } 24 } 25 26 Console.WriteLine("有效数据个数:{0}", count); 27 }
七、最后测试一下
八、测试查询时间
1 private static void CheckLoadTime(ref BTree bTree, int key) 2 { 3 var start = DateTime.Now; 4 var node = bTree.Find(key); 5 if (node == null) return; 6 7 Console.WriteLine("查找{0},耗时:{1}", key.ToString(), (DateTime.Now - start).TotalMilliseconds.ToString()); 8 9 var data = bTree.LoadData(node.keys[node.findIndex].DataIndex); 10 var name = Encoding.Default.GetString(data.Name).TrimEnd('\0'); 11 Console.WriteLine("Idx:{0},Age:{1},Sex:{2},Name:{3},Valid:{4}", data.Idx, data.Age, data.Sex, name, data.Valid); 12 Console.WriteLine(); 13 }
1 CheckLoadTime(ref bTree, 1000); 2 CheckLoadTime(ref bTree, 10000); 3 CheckLoadTime(ref bTree, 50000); 4 CheckLoadTime(ref bTree, 100000);
九、从新生成10000000条数据,测试查询效率
1 CheckLoadTime(ref bTree, 100000); 2 CheckLoadTime(ref bTree, 1000000); 3 CheckLoadTime(ref bTree, 3000000); 4 CheckLoadTime(ref bTree, 5000000); 5 CheckLoadTime(ref bTree, 8000000); 6 CheckLoadTime(ref bTree, 10000000);
全是1毫秒内返回,数据检索效率很是高,
实际上最初在学校潦草学了一遍【数据结构】以后,工做那么多年都用不着这方面的知识点,早就忘得一干二净了。
从新引发我兴趣的是2017年下半年,当时一个项目须要用到共享内存做为快速读写数据的底层核心功能。在设计共享内存存储关系时,
就遇到了索引的快速检索要求,第一次是顺序检索,当数据量达到5万以上时系统就崩了,检索速度太慢;后来改成二分查找法,轻松达到20万数据;
达到20万后就差很少到了单机处理性能瓶颈了,由于CPU不够用,除了检索还须要作其余的业务计算;
那时候就一直在搜索快速查找的各类算法,什么快速排序算法、堆排序算法、归并排序、二分查找算法、DFS(深度优先搜索)、BFS(广度优先搜索),
基本上都了解了一遍,可是看得头疼,没去实践。最后看到树结构,引发我很大兴趣,就是园友nullzx的这篇:B+树在磁盘存储中的应用,
这让我了解到原来数据库是这样读写的,这颇有意思,得造个轮子本身试一次。
粗陋仓促写成,恐怕有不少地方有漏洞,因此若是文中有错误的地方,欢迎留言讨论,可是拒绝一波流的吐槽,我但是会删低级评论的。