[leveldb] 3.put/delete操做

0.导读数组

LevelPut的流程:bash

 

Put操做首先将操做记录写入log文件,而后写入memtable,返回写成功。总体来看是这样,可是会引起下面的问题:数据结构

1. 写log的时候是实时刷到磁盘的吗? 2. 写入的时候memtable过大了咋办? 3. 同时多个线程并发写咋办? ......

 

 

下面的分析中就会面对这些问题,有些答案很清晰,有些涉及到超级多细节。多线程

step by step

在开始以前,咱们知道Write操做是要记录到log文件中的。那么一个记录它的格式是怎样的呢?看图:并发

 

这里特别解释一下,Delete操做也是经过Put实现的,只是图中的类型字段是0,而正常Write的操做类型是1,由此区分写操做和删除操做!app


熟悉了LevelDB整个脉络以后, Put方法是至关简单的, 一章就能够解决. 数据删除和写入是一个概念, 删除就是写入特殊deletion marker; 批量(batch)和单条写入也是一个概念, 单条写入就是只有一条记录的batch. 整个流程很短, 基本上写个log就行了, 所以速度很快.

step 1

 
Put interface

很简单吧。这里讲解一下那三个参数:函数

  1. WriteOptions:提供一些写操做的配置项,例如要不要写log的时候立刻flush磁盘
  2. key和value就是对应的keyValue,slice只是做者本身封装的char数组存储数据而已。<b>大牛喜欢把全部东西都封装一下,赋予数据结构意义!</b>这在大的工程里面是颇有意义的,既方便操做,也方便思考(这样就不用思考底层的真实的char数组仍是啥)。

Put对于多线程的处理很是精妙, 主体在DBImpl::Write函数中.ui

插入:this

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;        //leveldb中无论单个插入仍是多个插入都是以WriteBatch的方式进行的
  batch.Put(key, value);
  return Write(opt, &batch);
}

一条记录包含以下内容: 
Type、Key、Value 
当要插入记录时,Type为kTypeValue,当要删除记录时,Type为kTypeDeletion,同时中每个batch都有一个对当前批处理记录信息的统计(sequence(8字节)和count(4字节),共12字节) 
因而可知,当咱们要删除一个数据时,并非直接从内存中删除,而是插入一条带有删除标志的记录编码

在本例中要插入数据:key=”lili”; value=”hihi”; 
由以前对WriterBatch的分析可知,获得的batch为: 
01 00 00 00 00 00 00 00 01 00 00 00 (前8字节表示是第一个batch,后4字节表示此batch中只有一条记录) 
01(kTypeValue) 04(Key.size) 6C 69 6C 69(lili) 04(value.size) 68 69 68 69(hihi) 
共12+1+1+4+1+4=23字节=0x17

Delete也相似,只是调用了WriteBatch 的 Delete(key), 这样再内部会以不一样的形式编码传递至下一步进行处理。具体的WriteBatch的实现和编码方式在稍后的文章中进行介绍。Delete和Put都调用了Write,,这里的Write是在DBImpl::Write中经过虚函数的形式实现对其调用的,咱们接着看Write的流程

 1 Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
 2   // A begin
 3   Writer w(&mutex_);
 4   w.batch = my_batch;
 5   w.sync = options.sync;
 6   w.done = false;
 7   // A end
 8  
 9   // B begin
/*
mutex l上锁以后, 到了"w.cv.Wait()"的时候, 会先释放锁等待, 而后收到signal时再次上锁. 这段代码的做用就是多线程在提交任务的时候,
一个接一个push_back进队列. 但只有位于队首的线程有资格继续运行下去. 目的是把多个写请求合并成一个大batch提高效率.
*/
10 MutexLock l(&mutex_); 11 writers_.push_back(&w); 12 while (!w.done && &w != writers_.front()) { 13 w.cv.Wait(); 14 } 15 if (w.done) { 16 return w.status; 17 } 18 // B end 19 20 // May temporarily unlock and wait. 21 Status status = MakeRoomForWrite(my_batch == NULL); 22 uint64_t last_sequence = versions_->LastSequence(); 23 Writer* last_writer = &w; 24 if (status.ok() && my_batch != NULL) { // NULL batch is for compactions 25 WriteBatch* updates = BuildBatchGroup(&last_writer); 26 WriteBatchInternal::SetSequence(updates, last_sequence + 1); 27 last_sequence += WriteBatchInternal::Count(updates); 28 29 // Add to log and apply to memtable. We can release the lock 30 // during this phase since &w is currently responsible for logging 31 // and protects against concurrent loggers and concurrent writes 32 // into mem_. 33 { 34 mutex_.Unlock(); 35 status = log_->AddRecord(WriteBatchInternal::Contents(updates)); 36 bool sync_error = false; 37 if (status.ok() && options.sync) { 38 status = logfile_->Sync(); 39 if (!status.ok()) { 40 sync_error = true; 41 } 42 } 43 if (status.ok()) { 44 status = WriteBatchInternal::InsertInto(updates, mem_); 45 } 46 mutex_.Lock(); 47 if (sync_error) { 48 // The state of the log file is indeterminate: the log record we 49 // just added may or may not show up when the DB is re-opened. 50 // So we force the DB into a mode where all future writes fail. 51 RecordBackgroundError(status); 52 } 53 } 54 if (updates == tmp_batch_) tmp_batch_->Clear(); 55 56 versions_->SetLastSequence(last_sequence); 57 } 58 59 while (true) { 60 Writer* ready = writers_.front(); 61 writers_.pop_front(); 62 if (ready != &w) { 63 ready->status = status; 64 ready->done = true; 65 ready->cv.Signal(); 66 } 67 if (ready == last_writer) break; 68 } 69 70 // Notify new head of write queue 71 if (!writers_.empty()) { 72 writers_.front()->cv.Signal(); 73 } 74 75 return status; 76 }

因此从流程能够清晰的看到插入删除的流程主要为:

1. 将这条KV记录以顺序写的方式追加到log文件末尾;

2. 将这条KV记录插入内存中的Memtable中,在插入过程当中若是恰好后台进程在compaction会短暂停顿觉得后台进程compaction腾出时间及cpu

这里涉及到一次磁盘读写操做和内存SkipList的插入操做,可是这里的磁盘写时文件的顺序追加写入效率是很高的,因此并不会致使写入速度的下降;

并且从流程分析咱们知道,在插入(删除)过程当中若是多线程同时进行,那么这些操做将会将操做的同步设置相同的相邻的操做合并为一个批插入,这样可使整个系统的总吞吐量更大。因此一次插入记录操做只会等待一次磁盘文件追加写和内存SkipList插入操做,这是为什么leveldb写入速度如此高效的根本缘由。

  假设同时有w1, w2, w3, w4, w5, w6 并发请求写入。

  B部分代码让竞争到mutex资源的w1获取了锁。w1将它要写的数据添加到了writers_队列里去,此时队列只有一个w1, 从而其顺利的进行buildbatchgroup。当运行到34行时mutex_互斥锁释放,之因此这儿能够释放mutex_,是由于其它的写操做都不知足队首条件,进而不会进入log和memtable写入阶段。这时(w2, w3, w4, w5, w6)会竞争锁,因为B段代码中不知足队首条件,均等待并释放锁了。从而队列可能会如(w3, w5, w2, w4).

  继而w1进行log写入和memtable写入。 当w1完成log和memtable写入后,进入46行代码,则mutex_又锁住,这时B段代码中队列由于获取不到锁则队列不会修改。

  随后59行开始,w1被pop出来,因为ready==w, 而且ready==last_writer,因此直接到71行代码,唤醒了此时处于队首的w3.

      w3唤醒时,发现本身是队首,能够顺利的进行进入buildbatchgroup,在该函数中,遍历了目前全部的队列元素,造成一个update的batch,即将w3, w5, w2, w4合并为一个batch. 并将last_writer置为此时处于队尾的最后一个元素w4,34行代码运行后,由于释放了锁资源,队列可能随着dbimpl::write的调用而更改,如队列情况可能为(w3, w5, w2, w4, w6, w9, w8).

   35-45行的代码将w3, w5, w2, w4整个的batch写入log和memtable. 到65行,分别对w5, w2, w4进行了一次cond signal.当判断到完w4 == lastwriter时,则退出循环。72行则对队首的w6唤醒,从而按上述步骤依次进行下去。

  这样就造成了多个并发write 合并为一个batch写入log和memtable的机制。

相关文章
相关标签/搜索