1.概述
LevelDB(http://code.google.com/p/leveldb/)是google开源的Key/Value存储系统,它的committer阵容相当强大,基本上是bigtable的原班人马,包括像jeff dean这样的大牛,它的代码合设计非常具有借鉴意义,是一种典型的LSM Tree的KV引擎的实现,从它的数据结构来看,基本就是sstable的开源实现,而且针对各种平台作了port,目前被用在chrome等项目中。
2. LSM Tree
LevelDB是典型的Log-Structured-Merge Tree的实现,它通过延迟写入以及Write Log Ahead技术来加速数据的写入并保障数据的安全。LevelDB的每个数据文件(sstable)中的记录都是按照Key的顺序进行排序的,但是随机写入时,key的到来是无序的,因此难以将记录插入到其排序位置。于是需要它采取一种延迟写入的方式,批量攒集一定量的数据,将它们在内存中排好序,一次性写入到磁盘中。但是这期间一旦系统断电或其他异常,则可能导致数据丢失,因此需要将数据先写入到log的文件中,这样便将随机写转化为追加写入,对于磁盘性能会有很大提升,如果进程发生中断,重启后可以根据log恢复之前写入的数据。
2.1 Write Batch
std::string key1,key2,value; leveldb::Status s; s = db->Put(leveldb::WriteOptions(),key1,value); s = db->Delete(leveldb::WriteOptions(),key2);
std::string key1,value; leveldb::WriteBatch batch; batch.Delete(key1); batch.Put(key2,value); leveldb::status s = db->Write(leveldb::WriteOptions(),&batch);
2.2 Log Format
class PosixMmapFile : public WritableFile { private: std::string filename_; // 文件名称 int fd_; // 文件句柄 size_t page_size_; // size_t map_size_; // 内存映射的区域大小 char* base_; // 内存映射区域的起始地址 char* limit_; // 内存映射区域的结束地址 char* dst_; // 最后一次占用的内存的结束地址 char* last_sync_; // 最后一次同步到磁盘的结束地址 uint64_t file_offset_; // 当前文件的偏移值 bool pending_sync_; // 延迟同步的标志 public: PosixMmapFile(const std::string& fname,int fd,size_t page_size) : filename_(fname),fd_(fd),page_size_(page_size),map_size_(Roundup(65536,page_size)),base_(NULL),limit_(NULL),dst_(NULL),last_sync_(NULL),file_offset_(0),pending_sync_(false) { assert((page_size & (page_size - 1)) == 0); } ~PosixMmapFile() { if (fd_ >= 0) { PosixMmapFile::Close(); } } Status Append(const Slice& data) { const char* src = data.data(); size_t left = data.size(); while (left > 0) { // 计算上次最后一次申请的区域的剩余容量,如果已完全耗尽, // 则卸载当前区域,申请一个新的区域 size_t avail = limit_ - dst_; if (avail == 0) { if (!UnmapCurrentRegion() || !MapNewRegion()) { return IOError(filename_,errno); } } // 填充当前区域的剩余容量 size_t n = (left <= avail) ? left : avail; memcpy(dst_,src,n); dst_ += n; src += n; left -= n; } return Status::OK(); } Status PosixMmapFile::Close() { Status s; size_t unused = limit_ - dst_; if (!UnmapCurrentRegion()) { s = IOError(filename_,errno); } else if (unused > 0) { // 关闭时将文件没有使用用的空间truncate掉 if (ftruncate(fd_,file_offset_ - unused) < 0) { s = IOError(filename_,errno); } } if (close(fd_) < 0) { if (s.ok()) { s = IOError(filename_,errno); } } fd_ = -1; base_ = NULL; limit_ = NULL; return s; } virtual Status Sync() { Status s; if (pending_sync_) { // 上个区域也有数据未同步,则先同步数据 pending_sync_ = false; if (fdatasync(fd_) < 0) { s = IOError(filename_,errno); } } if (dst_ > last_sync_) { // 计算未同步数据的起始与结束地址,同步时,起始地址按page_size_向下取整, // 结束地址向上取整,保证每次同步都是同步一个或多个page size_t p1 = TruncateToPageBoundary(last_sync_ - base_); size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1); // 如果刚好为整数个page_size_,由于下面同步时必然会加一个page_size_,所以这里可以减去1 last_sync_ = dst_; if (msync(base_ + p1,p2 - p1 + page_size_,MS_SYNC) < 0) { s = IOError(filename_,errno); } } return s; } private: // 将x按y向上对齐 static size_t Roundup(size_t x,size_t y) { return ((x + y - 1) / y) * y; } // 将s按page_size_向下对齐 size_t TruncateToPageBoundary(size_t s) { s -= (s & (page_size_ - 1)); assert((s % page_size_) == 0); return s; } // 卸载当前映射的内存区域 bool UnmapCurrentRegion() { bool result = true; if (base_ != NULL) { if (last_sync_ < limit_) { // 如果当前页没有完全被同步,则标明本文件需要被同步,下次调用Sync()方法时会将本页中未同步的数据同步到磁盘 pending_sync_ = true; } if (munmap(base_,limit_ - base_) != 0) { result = false; } file_offset_ += limit_ - base_; base_ = NULL; limit_ = NULL; last_sync_ = NULL; dst_ = NULL; // 使用翻倍的策略增加下次申请区域的大小,最大到1MB if (map_size_ < (1<<20)) { map_size_ *= 2; } } return result; } bool MapNewRegion() { assert(base_ == NULL); // 申请一个新的区域时,上一个申请的区域必须已经卸载 // 先将文件扩大 if (ftruncate(fd_,file_offset_ + map_size_) < 0) { return false; } // 将新区域映射到文件 void* ptr = mmap(NULL,map_size_,PROT_READ | PROT_WRITE,MAP_SHARED,fd_,file_offset_); if (ptr == MAP_Failed) { return false; } base_ = reinterpret_cast<char*>(ptr); limit_ = base_ + map_size_; dst_ = base_; last_sync_ = base_; return true; } };
Status Writer::AddRecord(const Slice& slice) { const char* ptr = slice.data(); size_t left = slice.size(); Status s; bool begin = true; do { const int leftover = kBlockSize - block_offset_; assert(leftover >= 0); if (leftover < kHeaderSize) { // 如果当前page的剩余长度小于7字节且大于0,则都填充'\0',并新起一个page if (leftover > 0) { assert(kHeaderSize == 7); dest_->Append(Slice("\x00\x00\x00\x00\x00\x00",leftover)); } block_offset_ = 0; } // 计算page能否容纳整体日志,如果不能,则将日志切分为多条entry,插入不同的page中,type中注明该entry是日志的开头部分,中间部分还是结尾部分。 const size_t avail = kBlockSize - block_offset_ - kHeaderSize; const size_t fragment_length = (left < avail) ? left : avail; RecordType type; const bool end = (left == fragment_length); if (begin && end) { type = kFullType; // 本Entry保存完整的Batch } else if (begin) { type = kFirstType; // 本Entry只保存起始部分 } else if (end) { type = kLastType; // 本Entry只保存结束部分 } else { type = kMiddleType; // 本Entry保存Batch的中间部分,不含起始与结尾,有时可能需要保存多个middle } s = EmitPhysicalRecord(type,ptr,fragment_length); ptr += fragment_length; left -= fragment_length; begin = false; } while (s.ok() && left > 0); return s; } Status Writer::EmitPhysicalRecord(RecordType t,const char* ptr,size_t n) { assert(n <= 0xffff); assert(block_offset_ + kHeaderSize + n <= kBlockSize); // 填充记录头 char buf[kHeaderSize]; buf[4] = static_cast<char>(n & 0xff); buf[5] = static_cast<char>(n >> 8); buf[6] = static_cast<char>(t); // 计算crc uint32_t crc = crc32c::Extend(type_crc_[t],n); crc = crc32c::Mask(crc); EncodeFixed32(buf,crc); // 填充entry内容 Status s = dest_->Append(Slice(buf,kHeaderSize)); if (s.ok()) { s = dest_->Append(Slice(ptr,n)); if (s.ok()) { s = dest_->Flush(); } } block_offset_ += kHeaderSize + n; return s; }
2.3 Write Log Ahead
Status DBImpl::Write(const WriteOptions& options,WriteBatch* updates) { Status status; MutexLock l(&mutex_); // 锁定互斥体,同一时间只能有一个线程更新数据 LoggerId self; // 获取Logger的使用权,如果有其他线程拥有所有权,则等待至其释放所有权。 AcquireLoggingResponsibility(&self); status = MakeRoomForWrite(false); // May temporarily release lock and wait uint64_t last_sequence = versions_->LastSequence(); // 获取当前的版本号 if (status.ok()) { // 将当前版本号加1后作为本次更新的日志的版本, // 一次批量更新可能包含多个操作,这些操作都用一个版本有一个好处: // 本次更新的所有操作,要么都可见,要么都不可见,不存在一部分可见,另一部分不可见的情况。 WriteBatchInternal::SetSequence(updates,last_sequence + 1); // 但是本次更新可能有多个操作,跳过与操作数相等的版本号,保证不被使用 last_sequence += WriteBatchInternal::Count(updates); // 将batch写入log,然后应用到memtable中 { assert(logger_ == &self); mutex_.Unlock(); // 这里,可以解锁,因为在AcquireLoggingResponsibility()方法中已经获取了Logger的拥有权, // 其他线程即使获得了锁,但是由于&self != logger,其会阻塞在AcquireLoggingResponsibility()方法中。 // 将更新写入log文件,如果设置了每次写入进行sync,则将其同步到磁盘,这个操作可能比较长, // 防止了mutex_对象长期被占用,因为其还负责其他一些资源的同步 status = log_->AddRecord(WriteBatchInternal::Contents(updates)); if (status.ok() && options.sync) { status = logfile_->Sync(); } if (status.ok()) { // 成功写入了log后,才写入memtable status = WriteBatchInternal::InsertInto(updates,mem_); } // 重新锁定mutex_ mutex_.Lock(); assert(logger_ == &self); } // 更新版本号 versions_->SetLastSequence(last_sequence); } // 释放对logger的所有权,并通知等待的线程,然后解锁 ReleaseLoggingResponsibility(&self); return status; } // force参数表示强制新起一个memtable Status DBImpl::MakeRoomForWrite(bool force) { mutex_.AssertHeld(); assert(logger_ != NULL); bool allow_delay = !force; Status s; while (true) { if (!bg_error_.ok()) { // 后台线程存在问题,则返回错误,不接受更新 s = bg_error_; break; } else if ( allow_delay && versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) { // 如果不是强制写入,而且level 0的sstable超过8个,则本次更新阻塞1毫秒, // leveldb将sstable分为多个等级,其中level 0中的不同表的key是可能重叠的, // 如果l0的sstable过多,会导致查询性能下降,这时需要适当降低更新速度,让 // 后台线程进行compaction操作,但是设计者不希望让某次写操作等待数秒, // 而是让每次更新操作分担延迟,即每次写操作阻塞1毫秒,平衡读写速率; // 另外,理论上这也能让compaction线程获得更多的cpu时间(当然, // 这是假定compaction与更新操作共享一个cpu时才有意义) mutex_.Unlock(); env_->SleepForMicroseconds(1000); allow_delay = false; // 最多延迟一次,下次不延迟 mutex_.Lock(); } else if (!force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { // 如果当前memtable已使用的空间小于write_buffer_size,则跳出,更新到当前memtable即可。 // 当force为true时,第一次循环会走后面else逻辑,切换了memtable后force被置为false, // 第二次循环时就可以在此跳出了 break; } else if (imm_ != NULL) { // 如果当前memtable已经超过write_buffer_size,且备用的memtable也在被使用,则阻塞更新并等待 bg_cv_.Wait(); } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { // 如果当前memtable已使用的空间小于write_buffer_size,但是备用的memtable未被使用, // 则检查level 0的sstable个数,如超过12个,则阻塞更新并等待 Log(options_.info_log,"waiting...\n"); bg_cv_.Wait(); } else { // 否则,使用新的id新创建一个log文件,并将当前memtable切换为备用的memtable,新建一个 // memtable,然后将数据写入当前的新memtable,即切换log文件与memtable,并告诉后台线程 // 可以进行compaction操作了 assert(versions_->PrevLogNumber() == 0); uint64_t new_log_number = versions_->NewFileNumber(); WritableFile* lfile = NULL; s = env_->NewWritableFile(LogFileName(dbname_,new_log_number),&lfile); if (!s.ok()) { break; } delete log_; delete logfile_; logfile_ = lfile; logfile_number_ = new_log_number; log_ = new log::Writer(lfile); imm_ = mem_; has_imm_.Release_Store(imm_); mem_ = new MemTable(internal_comparator_); mem_->Ref(); force = false; // 下次判断可以不新建memtable了 MaybeScheduleCompaction(); } } return s; } void DBImpl::AcquireLoggingResponsibility(LoggerId* self) { while (logger_ != NULL) { logger_cv_.Wait(); } logger_ = self; } void DBImpl::ReleaseLoggingResponsibility(LoggerId* self) { assert(logger_ == self); logger_ = NULL; logger_cv_.SignalAll(); }
2.4 Skip List
1. 看上图,假定我们链不存在record3,level0中,record2的下一条记录是record4,level1中,record2的下一条记录是record5。
2. 现在,我们插入一条记录record3,通过key的比较,我们定位到它应该在record2与record4之间。
3. 然后,我们按照下面的代码确定一条记录需要在跳表中建立几重索引:
template<typename Key,class Comparator> int SkipList<Key,Comparator>::RandomHeight() { // Increase height with probability 1 in kBranching static const unsigned int kBranching = 4; int height = 1; while (height < kMaxHeight && ((rnd_.Next() % kBranching) == 0)) { height++; } return height; }
按照上面的代码,我们可以得出,建立x级索引的概率是0.25 ^(x - 1) * 0.75,所以,建立1级索引的概率为75%,建立2级索引的概率为25%*75%=18.75%,...(个人感觉,google把分支因子定为4有点高了,这样在绝大多数情况下,跳表的高度都不大于3)。
4. 在level0 ~ level (x-1)中链表的合适位置插入record3,假定根据上面的公式,我们得到需要为record3建立2级索引,即x=2,因此需要在level0与level1中的链中插入record3:在level 0的链中,record3插在record2与record4之间,在level 1的链中,record3插入在record2与record5之间,形成了现在的索引结构,在查询一个记录时,可以从最高一级索引向下查找,节约比较次数。2.5 Record Format
Internal Key在比较时,按照下面的算法:
int InternalKeyComparator::Compare(const Slice& akey,const Slice& bkey) const { int r = user_comparator_->Compare(ExtractUserKey(akey),ExtractUserKey(bkey)); if (r == 0) { // 比较后面8个字节构造的整数,第一个字节的type为Least Significant Byte const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8); const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8); if (anum > bnum) // 注意:整数大反而key比较小 { r = -1; } else if (anum < bnum) { r = +1; } } return r; }
根据上面的算法,我们可以得知Internal Key的比较顺序:
1. 如果User Key不相等,则User Key比较小的记录的Internal Key也比较小,User Key默认采用字典序(lexicographic)进行比较,可以在建表参数中自定义comparator。 2. 如果type也相同,则比较Sequence Num,Sequence Num大的Internal Key比较小。 3. 如果Sequence Num相等,则比较Type,type为更新(Key Type=1)的记录比的type为删除(Key Type=0)的记录的Internal Key小。 在插入到跳表时,一般不会出现Internal Key相等的情况(除非在一个Batch中操作了同一条记录两次,这里会出现一种bug:在一个Write Batch中,先插入一条记录,然后删除这条记录,最后把这个Batch写入DB,会发现DB中这条记录存在。因此,不推荐在Batch中多次操作相同key的记录),User Key相同的记录插入跳表时,Sequence Num大的记录会排在前面。 设计Internal Key有个以下一些作用: 1. Level DB支持快照查询,即查询时指定快照的版本号,查询出创建快照时某个User Key对应的Value,那么可以组成这样一个Internal Key:Sequence=快照版本号,Type=1,User Key为用户指定Key,然后查询数据文件与内存,找到大于等于此Internal Key且User Key匹配的第一条记录即可(即Sequence Num小于等于快照版本号的第一条记录)。 2.如果查询最新的记录时,将Sequence Num设置为0xFFFFFFFFFFFFFF即可。因为我们更多的是查询最新记录,所以让Sequence Num大的记录排前面,可以在遍历时遇见第一条匹配的记录立即返回,减少往后遍历的次数。