大数据处理——硬盘级的缓冲队列和BitMap

同步博客地址 :http://blog.jerrylab.info/2016/05/dashujuchuliyingpanjidehuanchongduiliehebitmap/数组

  • 介绍

这篇文章中,Jerry 提到了因为抓取队列过大,Bitmap位数过多致使内存没法装下这两个数据结构的问题。所以我须要一种新的,可以将硬盘扩展为可用内存的实现方法,以便在 不花钱升级配置的状况下持久地运行TB级数据抓取任务。由此Jerry本身造了个硬盘级缓冲的轮子。这样,就不再用担忧因内存不够而没法判重或者是没法 调度的问题了。不过缺点也是显而易见的,运行速度会变慢。但这样至少给本来内存溢出的项目提供了一种可行的解决方案。缓存

  • 思路

仿 照高速缓存到内存的缓冲机制,将这种机制再现到硬盘与内存便可实现硬盘级的缓存。由此咱们设置了两种结构。对于BitMap来讲,因为它对硬盘是采起随机 访问的方式,即访问的内容和顺序是不固定的,所以咱们要仿照高速缓存中的快表对硬盘实现内存中哈希表缓存。对于队列来讲,因为它每次的存取都一定发生在队 首或者队尾,因此咱们只须要对队首的输出和队尾的输入作内存缓存便可。数据结构

具体的来讲,咱们把操做系统中的一个文件抽象地看做是一块连续的存储区域,抑或是一个超大的数组(虽然在物理结构上并非这样,但逻辑结构能够这样看)。所以能够对每个字节进行编号。因而乎,这些编号就能够看做是文件这个低速内存的地址了。ide

由 于硬盘访问延迟很是高,因此咱们须要合理的缓冲机制。对于BitMap,咱们把文件中的每个比特当作一个数存在与否的标志(相对应原先的版本,这些比特 是存在内存中的。在内存中,创建一个哈希表,将硬盘地址多对一地映射到哈希表的项。对最近读写的内容作缓存,即记录读写的真实位置和值。所以若是连续读写 相同的位置,就不须要屡次读取或写入硬盘。当哈希表的项发生冲突时,即对两个不一样的地址读写,但它们映射到的确是相同的哈希表项时,咱们将脏数据(仅在内 存中修改过的数据)写回到硬盘中。ui

对于,队列,考虑到硬盘读写连续地址的速度比较快,所以咱们每次都的在头尾读写指定的数据量到内存缓冲数组中,防止出现屡次读写小数据的事情。操作系统

  • 实现

硬盘级缓冲队列实现code

# from blog.Jerrylab.info
# 2016/5/6
# size -- the queue max size
# filename -- the file where the queue stored 
# ele -- the max size of one element of queue
# buffer -- the output input buffer size
# the class is safe in multiple thread environment
from Queue import Queue
import thread
import time
class HDQueue:
    def __init__(self,size,ele,filename,buffer=10000):
        self.buffer=buffer
        ele+=1
        self.ele=ele
        self.filename=filename
        self.size=size
        self.fo=open(filename,"wb")
        self.fs=(size+5)*ele+512;#512 head structure
        self.iq=Queue(maxsize=buffer)
        self.oq=Queue(maxsize=buffer)
        self.fo.truncate(self.fs)
        self.tail=1
        self.head=1
        self.dsize=0
        self.lpos=-111
        self.lock=thread.allocate_lock()
    def pos(self,index):
        return 512+index*self.ele
    def read(self,index):
        self.fi.seek(self.pos(index))
        i=self.fi.readline(self.ele)
        return i.split("\t")[0]
    def write(self,index,data):
        #print self.pos(index)," ",index
        pos=self.pos(index)
        if pos!=self.lpos+self.ele:
            self.fo.seek(pos)
        data=str(data)+"\t"
        self.fo.write(data)
        for i in xrange(self.ele-len(data)):
            self.fo.write(" ")
        self.lpos=pos
    def next(self,index):
        index+=1
        if (index>=self.size):
            index=0
        return index
    def maintain(self):
        self.lock.acquire()
        while not self.iq.empty():
            self.write(self.tail,self.iq.get())
            self.tail=self.next(self.tail)
            self.dsize+=1
        self.fo.flush()
        self.fi=open(self.filename,"rb")
        if self.oq.qsize()<self.buffer/10:
            while not self.oq.full() and self.dsize>0:
                self.dsize-=1
                self.oq.put(self.read(self.head))
                self.head=self.next(self.head)
        self.fi.close()
        self.lock.release()
    def put(self,data):
        data=str(data)
        if len(data)>self.ele:
            raise "Element Len Error"
        if self.iq.full():
            self.maintain()
        self.iq.put(data)
    def get(self):
        if (self.oq.empty()):
            self.maintain()
        if self.oq.empty():
            return None
        return self.oq.get()
    def empty(self):
        if self.iq.qsize()+self.oq.qsize()+self.dsize==0:
            return True
        else:
            return False
if __name__ == '__main__':
    s=HDQueue(1000,10,"hiqueue.txt",500)
    for i in xrange(2000):
        s.put(str(i))
        #k=s.get()
        #print k,len(k),s.head
    print "put finish"
    for i in xrange(2000):
        k=s.get()
        print k,len(k),s.head

硬盘级缓冲BitMap实现blog

#blog.jerrylab.info
#2016/5/6
#the class is unsafe in multiple thread environment 
#you need to ensure the method of set and get once at a time
#number -- the bitmap supported max number from 0
import os
import thread
class HDBitSet:
    def __init__(self,number,filename,buffer=100003):
        self.filename=filename
        self.fo=open(filename,"wb")
        self.fi=open(filename,"rb")
        self.change=False
        self.size=number
        self.buffer=buffer
        self.number=int(number/256+1)*32
        self.fo.truncate(self.number)
        self.mod=buffer
        self.hash=[-1]*buffer
        self.val=[0]*buffer
        self.number=number
    def read(self,block):
        if self.change:
            self.change=False
            self.fi.close()
            self.fo.flush()
            self.fi=open(self.filename,"rb")
        self.fi.seek(block)
        data=self.fi.readline(1)
        return  ord(data[0])
    def write(self,block,data):
        self.change=True
        self.fo.seek(block)
        self.fo.write(chr(data))
    def get(self,index):
        index=int(index)
        if index>self.size:
            return True
        block=int(index/8)
        m=index-block*8
        hs=block%self.buffer
        if self.hash[hs]==block:
            val=self.val[hs]
        else:
            val=self.read(block)
            self.hash[hs]=block
            self.val[hs]=val
        return bool(val&(1<<m))
    def set(self,index):
        index=int(index)
        f=True
        if index>self.size:
            return
        block=int(index/8)
        m=index-block*8
        hs=block%self.buffer
        if self.hash[hs]==block:
            val=self.val[hs]
        else:
            if self.hash[hs]>0:
                self.write(self.hash[hs],self.val[hs])
            val=self.read(block)
            self.hash[hs]=block
            f=False
        val=val|(1<<m)
        self.val[hs]=val
        return
if __name__ == '__main__':
    s=HDBitSet(1000000,"hibitset.txt",5)
    for i in xrange(15):
        print 1<<i
    for i in xrange(500):
        s.set(i)
    for i in xrange(51):
        s.set(i)
    print "set finish"    
    for i in xrange(100):
        if s.get(i+400):
            print "True",i
        else:
            print "False",i
相关文章
相关标签/搜索