欲直接下载代码文件,关注咱们的公众号哦!查看历史消息便可!python
在机器学习里,咱们对时间序列数据作预处理的时候,常常会碰到一个问题:有多个时间序列存在多个表里,每一个表的的时间轴不彻底相同,要如何把这些表在时间轴上进行对齐,从而合并成一个表呢?尤为是当这些表都存在数据库里,并且超级超级大的时候,怎样才能更高效地处理呢?mysql
在上一篇文章中,已经介绍过了如何在Python中建立数据库链接以及对数据库进行增删改查、分组聚合以及批量读取和处理等操做。程序员
今天就以上面的问题为导向,手把手教你如何用Python一步步实现相应的功能。讲解的内容主要有:sql
如何实现两个有序序列的合并;数据库
延伸到两个时间序列数据的对齐;express
从数据库中自动循环分批读取数据。编程
须要掌握的主要编程技巧包括:服务器
用函数实现特定功能数据结构
用类对功能进行封装app
实现基本的迭代器
使用的工具及版本:Python3.7,MySQL8.0, Jupyter Notebook
本节主要介绍如何实现将2个有序(默认从小到大排序)序列合并成一个序列,同时介绍Python中基本的循环结构。
其实在Python中当然有相应的方法能够很容易地作到(例如集合的set.union()方法),这里之因此要本身实现,主要是要理解这种思想,为后文的功能实现作铺垫。
if 语句的语法结构为:
if boolean_expression1: #若是知足条件1,则执行suite1代码块 suite1 elif boolean_expression2: #若是知足条件2,则执行suite2代码块 suite2 else: #不然执行else_suite代码块 else_suite
其中elif
和else
为可选。
用于编写通用迭代结构
顶端测试为真时执行循环体,并会重复屡次测试直到为假后结束循环
while boolean_expression: #若是测试为真,则执行while_suite代码块(循环执行) while_suite else: #直到测试为假,则执行一遍else_suite代码块以后结束循环 else_suit
其中else
为可选。
通用的序列迭代器,用于遍历任何有序的序列对象内的元素
可用于字符串、元组、列表和其它的内置可迭代对象,以及经过类所建立的新对象
for expression in iterable: for_suite else: else_suite
其中else
为可选。
🍎tips1: for循环比while循环执行速度快的多,能用for的尽可能使用for
函数是python为了代码最大程度地重复利用和最小化冗余而提供的基本程序结构。
它可以将整块代码巧妙地隔离成易管理的一小块,把重复代码放在函数中,而不是进行大块的复制,这是一个程序员应该具有的基本技能。
使用def
语句定义函数,而且函数都会有一个返回值,默认为None,也能够用return
语句明确指定返回值。
语法格式:
def funtionName(parameters): #定义函数名,设置函数的参数 suite #函数体 return something
在Python中,函数是一个可调用对象,它有一个内置的方法,叫call
。
咱们在写程序的时候,会碰到一类错误:"xxx" object is not callable
,这就表示这个对象是不可调用的。
调用函数的方法也很简单,在函数名后面加小括号(),有参数的时候在括号中传入参数便可:funtionName(par1,..)
🍎tips2:python中定义函数名的时候,一般第一个单词均小写,第二个单词开始一般首字母大写,例如,printName,calculateSum
🍎tips3:写函数的时候,尽可能写得简单,功能尽量单一,不要写得又长又复杂
注:在 Python 中,list(列表)是最经常使用、最核心的数据结构之一,它是一种序列类型,能够接收各类类型的元素,也能够同时接收不一样类型的元素。此外,list 仍是一个可迭代对象。本文的演示多采用 list 结构组织数据。
假设有两个序列:a = [1,3,7,9,11], b = [3,4,7,8],怎么合并成一个序列?
思路:用第3个序列 c 记录结果,同时对 a、b 进行遍历,按必定的顺序依次将 a、b 中的元素添加到 c 中;遍历的方法是用指针进行索引。
a[0]=1, b[0]=3, a[0]<b[0]
→ 将 a[0] 添加到 c ,idx_a=idx_a+1
→ 此时 c=[1], idx_a=1, idx_b=0
a[1]=3, b[0]=3, a[0]=b[0]
→ 将 a[1]或b[0] 添加到 c ,idx_a=idx_a+1, idx_b=idx_b+1
→ 此时 c=[1,3], idx_a=2, idx_b=1;
a[2]=7, b[1]=4, a[0]>b[1]
→ 将b[1] 添加到 c , idx_b=idx_b+1
→ 此时 c=[1,3,4], idx_a=2, idx_b=2;
a[2]=7, b[2]=7, a[2]=b[2]
→ 将a[2]或b[2] 添加到 c , idx_a=idx_a+1, idx_b=idx_b+1
→ 此时 c=[1,3,4,7], idx_a=3, idx_b=3;
a[3]=9, b[3]=8, a[3]>b[3]
→ 将b[3] 添加到 c , idx_b=idx_b+1
→ 此时 c=[1,3,4,7,8], idx_a=3, idx_b=4;
idx_b=4超出了b的索引范围,及idx_b=len(b),但此时idx_a<len(a),因此将 a[idx_a:] 直接添加到 c
→ 此时c=[1,3,4,7,8,9,11],结束,输出结果c 。
def orderedListUnion(a, b): ''' 合并两个按从小到大排好序的序列a,b ''' # 设置循环初始值 idx_a = 0 idx_b = 0 c = [] # 声明变量len_a,len_b,指向序列a,b的长度,用来控制循环条件 len_a = len(a) len_b = len(b) while (idx_a < len_a) and (idx_b < len_b): #若两个元素相等,则将该元素添加到c,且两个idx同时右移: if a[idx_a] == b[idx_b]: c.append(a[idx_a]) idx_a += 1 idx_b += 1 #若不相等,取较小的元素,且较小元素的idx右移 elif a[idx_a] < b[idx_b]: c.append(a[idx_a]) idx_a += 1 else: c.append(b[idx_b]) idx_b += 1 # 当一个序列遍历结束后,跳出循环,将未遍历完的序列的剩余元素添加到c if idx_a == len_a: c = c + b[idx_b:] if idx_b == len_b: c = c + a[idx_a:] return c # 测试 a = [1,3,7,9,11] b = [3,4,7,8] print(orderedListUnion(a,b))
输出结果:
前面的练习仅仅做为热身,如今回到文章开头的问题,假设一个更具体场景:
在医院的ICU里,须要持续观察病人的各项生命指标。这些指标的采集频率每每是不一样的(例若有些指标隔几秒采集一个,有些几个小时采集一个,有些一天采集一个),并且有些是按期的,有些是不按期的,或者因为某些缘由某些指标在某段时间上是缺失的,因此不一样生命指标的时间序列数据在时间轴上的表现每每是不对齐的。
因此如今的问题是:
如何将存储在不一样数据表里,且时间轴不一样的两个时间序列进行合并,对齐到同一个时间轴上?
举例说明:
假设如今有2个数据表,分别记录了某个病人某一天当中某些时刻的一些生命体征指标:
表1:
表2:
能够看到,两张表的时间点有些是相同的,更多时候是不一样的,如今咱们想把这两条时间线并到一条时间轴上。
这里咱们将一张表的信息用一个 list 的形式来表示:
每一行记录为这个列表的一个元素,每行记录用一个元组tuple (python中另外一个经常使用的数据结构,与list的区别在于list是可变的,而tuple是不可变的)来表示。例如表1的第一行,即列表的第0个元素表示为('01:30', 128, 19)
。
每一个元组的第0个元素是这条记录发生的时间点,也就是咱们用来索引的指针。
全部的时间点连起来就造成了一条时间轴,也就是表的第一列Time。每一个时间点上的多项生命指标能够理解为这个指针所带的属性。
ts1 = [('01:30',128,19), ('05:00',124,20), ('13:00',131,18), ('20:00',138,24), ('21:30',122,22)] ts2 = [('01:30',7,129,60), ('09:30',6.5,112,63), ('12:00',8,135,74), ('13:00',8.5,110,60), ('20:32',7,78,49), ('22:00',7.5,96,55), ('23:30',6,124,59)]
这里与1中 有序序列合并 的区别在于:
前面的 list 的元素是个具体的数值,而这里的 list 的元素是个 tuple;
前面只须要比较当前指针所指的元素(数值)自己的大小,这里须要比较的是当前指针所指的元素(tuple)的第0个元素的大小;
不过这只是形式上的区别,更重要、更核心的区别在于:
例如,在01:30时刻,两个表均有记录,则合并的记录为('01:30',128,19,7,129,60);可是在05:00时刻,只有表1有记录,表2没有,那么合并后的记录应该是多少呢?
因此咱们须要采起一些策略,例如:
直接用 None 表示,表示没有:('05:00',124,20,None,None,None)
用前一个时刻的记录表示:('05:00',124,20,7,129,60)
在前一条记录和后一条记录之间进行插值
具体应该根据实际的应用场景来选择不一样的处理方式。
本文先以第二个策略为例:用前一个时刻的记录表示。
因此这里的代码要比前面的,多设置一个变量pre
,用来存储上一条记录。当指针移动到某个表没有记录的时刻时,就用pre
来补上,而且pre
也是跟着指针往前推移的。
因为第一条记录没有上一条,因此把初始的pre
设置为None *(**这也是个小技巧,若没有此项设置,则须要增长条件判断)*
def tsAlign(x, y): ''' x,y: lsit of tuple, 每一个tuple表明一条记录,tule的第0个元素为这条记录的id(eg,时间), list里的tuple按照tuple的id从小到大进行排序 把x和y在id上进行对齐: 若x,y同时存在某个id,则将这两个tuple进行合并; 若x存在某个id而y不存在,则合并 x当前id对应的tuple 和 y小于当前id的最大id对应的tuple。 return z: 对齐了x和y以后的list of tuple ''' # 设置循环初始值 i = 0 #index of x j = 0 #index of y z=[] #store result of merging x and y pre_x = (None,)*len(x) pre_y = (None,)*len(y) while (i<len(x)) and (j<len(y)): #若x当前的id小于y当前的id,则合并x当前的tuple和y的前一个tuple if x[i][0] < y[j][0]: z.append(x[i] + pre_y[1:]) pre_x = x[i] i += 1 #若x当前的id大于y当前的id,则合并x的前一个tuple和y当前的tuple elif x[i][0] > y[j][0]: z.append((y[j][0],) + pre_x[1:] + y[j][1:]) #注意:在定义tuple的时候,若只有一个元素,须要在元素后加个逗号 pre_y = y[j] j += 1 #若x当前的id等于y当前的id,则合并x当前的tuple和y当前的tuple else: z.append(x[i] + y[j][1:]) pre_x = x[i] pre_y = y[j] i += 1 j +=1 while i < len(x): z.append(x[i] + pre_y[1:]) i += 1 while j < len(y): z.append((y[j][0],) + pre_x[1:] + y[j][1:]) j += 1 return z print(tsAlign(ts1, ts2))
输出结果:
另外:用字典dict结构存储的时候也很方便,dict 的 key 为时间,value 为各项生命指标组成的 list。须要注意的是,dict 是无序的,因此在进行遍历的时候,须要将全部的key提取出来,先进行排列。(读者可自行尝试)
不过python里其实还有个叫OrderDict
的数据结构,就是个有序的字典,这里也不作介绍。
在前面的示例中,数据表的行和列都不多。若是当数据表很大的时候,直接把整张表读进来,将会消耗巨大的内存,程序可能根本跑不起来。
一个很天然的想法是分批读取并进行处理(前一篇文章中有相关的示例)。
也就是,能够先把“读取”+“处理”操做的功能封装起来,再在外面套个循环,不断地重复对应模块的操做。
到这里就须要跟你们讲讲Python中另外一个很是重要的概念——类。
咱们常说有两种程序设计的方式:面向过程和面向对象。在Python中,这两种编程方式均可实现。而面向对象封装、继承、多态性的三大特性,可使系统更加灵活、更加易于维护 。
首先须要理解 对象 的概念。
对象通常都由属性+方法组成。你能够这么理解:属性表示是什么(变量或数据),方法表示能干什么(函数或功能)。
面向对象的一个特色就在于,把操做同一组数据的各类功能集成在一块儿;对象的属性就表示我要操做的这组数据,对象的方法就是我要怎么操做这些数据。
而在有对象以前,必需要有类。
类,就是具备同类属性的对象,是个抽象的概念。而对象是由类实例化而来的。
同一个类实例化出来的不一样对象,具备相同的方法和相同的属性,但属性的值不同。
好比,猫是一个类,是一个抽象的概念;而中华田园猫是实例化出来的具体对象,英国短毛猫是实例化出来的另外一个对象,这两个对象都有本身的属性(体型,毛长,毛色等),和相同的方法(会吃,会跑,会喵喵喵)
在咱们的问题中,也定义了这样一个类,提供给它一个数据库链接(属性),它就能够对这个数据库的表进行增删查改等各类操做(功能)。提供给它另外一个数据库链接,又能够对另外一个数据库进行操做。每提供一个数据库链接,就至关于实例化出一个对象。当数据存在多个数据库中时,咱们就能够实例化出多个对象,同时进行操做。
Python使用class
关键字建立类,通常形式为:
class ClassName(bases): # bases表示这个类是从哪一个类继承而来的,即父类,为可选 'class documentation string' # 文档字符串,为可选 data = value # 定义类变量 def method(self, ...): # 定义类方法 self.member = value
主要包括两个部分:定义类变量 和 定义类方法
注意,在这里定义类方法的方式比较独特。
虽然说其实类方法就是函数,可是这个函数的首个参数必须为self
。
由于类自己不能对方法进行调用,必需要实例化成对象了,对象才能调用方法。
因此这里就意味着,这个方法的目的是对实例化对象进行操做,也就是说,self
的属性只能被实例化对象本身使用,是私有的,咱们称之为“实例变量”。
相比之下,在方法外面定义的属性,则是能够被全部实例化对象共同使用,是公共的,咱们称之为“类变量”。
在class语句内,任何赋值语句都会建立类属性。
定义实例变量则须要采用一种特殊的方式,称为类的构造器或初始化方法:__init__()
,后面会举例说明
🍎tips4:python中定义类名的时候,一般从第一个单词开始,每一个单词开始首字母大写,例如,Animal,TableReader
建立实例化对象在其余编程语言中通常使用关键字new
,可是python里面没有这个关键字,而是用相似函数调用的方式:ClassName(args...)
这里传进去的参数会被__init__
方法接收,成为实例变量,也就是这个实例化对象私有的属性。
'''代码示例:类的建立和使用''' # 定义类 class MyClass1(): say = 'Hello' #定义类变量 def __init__(self, name): # 类的构造函数 self.name = name #定义实例变量 def show(self): #定义类方法 print(self.say, self.name) #把类实例化成对象 obj1 = MyClass1('Tom') #传入实例变量参数 print('obj1的变量say为:', obj1.say) #self.say='Hello' print('obj1的变量name为:', obj1.name) #此时self.name='Tom' print('obj1执行show()方法:') obj1.show() #调用show()方法 print('--------------') obj2 = MyClass1('Lisa') #传入实例变量参数 print('obj2的变量say为:', obj2.say) #self.say='Hello' print('obj2的变量name为:', obj2.name) #此时self.name='Lisa' print('obj2执行show()方法:') obj2.show() #调用show()方法
输出结果:
在这里,咱们定义了一个类,叫 MyClass1 ,而且由这个类实例化出来两个对象,叫 obj1 和 obj2 。
从输出结果咱们能够看到,obj1 和 obj2 都有两个变量:
变量say:是相同的。它是定义在类方法外面的变量,是全部对象公共的,属于类变量;
变量name:是不一样的。它是定义在__init__
方法内的,是每一个对象私有的,属于实例变量。
从输出结果还能够看到,obj1 和 obj2 都有相同的方法 show()``。
再次如今回到咱们前面的需求:在数据库中读取并合并两个超级大的数据表并进行必定的处理。
分解一下任务流程:
从数据库中读取一批数据
对该批数据进行处理
2.1 对当前行进行处理
2.2 判断是否存在下一行:
存在:跳到下一行,回到2.1
不存在:回到1
发现了吗,这里存在两个循环的过程:1是经过循环遍历整个数据库,2是经过循环遍历每一个批次中的每一行。
这种遍历咱们称为迭代(Iteration)——能够说这是Python最强大的功能之一了。
迭代器是一个能够记住当前遍历位置的对象。(python里面一切皆对象)
迭代器对象从集合的第一个元素开始访问,直到全部的元素被访问完结束。
有个专门的 iter()
函数,传入一个可迭代对象,便可建立一个迭代器。举个例子:
'''示例:迭代器测建立及调用''' l = [1,2,3,4] #列表是可迭代对象 it = iter(l) #建立迭代器对象 print('经过for循环调用:') for i in it: print(i) it = iter(l) print('经过next()方法调用:') print(next(it)) #pythonz2中是it.next() print(next(it)) print(next(it)) print(next(it)) print(next(it)) #遍历结束,触发StopIteration异常
输出结果:
在这里,咱们建立了一个迭代器,叫it
,迭代器的调用方式通常有两种:
经过for…in结构进行调用,对迭代器里的元素逐个进行读取
全部的迭代器都有个next()
方法,就是用来逐个访问迭代器内的元素的,调用一次就读一个出来,直到结束。
从上面的结果咱们能够看到,当迭代器内的元素所有遍历完以后,继续调用next()
方法会触发 StopIteration 异常。
因此这里须要特别注意:迭代器只能往前不会后退。若是遍历完想再遍历一遍,就须要从新再建立一个迭代器。
若是要把一个类做为一个迭代器使用的话,须要在类中实现两个方法 __iter__()
与 __next__()
:
__iter__()
:返回一个特殊的迭代器对象,这个迭代器对象实现了 __next__()
方法并经过 StopIteration 异常标识迭代的完成。
__next__()
:会返回下一个迭代器对象,每一次for循环都调用该方法(必须存在)
'''示例:在类中实现迭代器''' class MyClass2(): def __init__(self, start, end): self.s = start self.e = end def __iter__(self): ''' @summary: 生成迭代对象时调用,返回值必须是对象本身,而后for能够循环调用next方法 ''' return self def __next__(self): ''' @summary: 每一次for循环都调用该方法(必须存在) ''' if self.s < self.e: x = self.s self.s += 1 return x else: raise StopIteration # 实例化出来的对象是个可迭代对象 it = MyClass2(1,5) for i in it: print(i)
输出结果:
知识点讲的差很少了,如今就一步步来实现解决问题所须要的功能。
class BufferTableReader(): pass
2. 在类中应该包含哪些参数呢?
简单起见,先假设数据表不是在数据库中,而是已经存在于当前工做空间中了:
# 初始化 def __init__(self, data, batch_size ): self.data = data # 数据源 self.bs = batch_size # 每次读取的批次大小 self.buf = [] # 用来存储当前批次的数据,初始化为空 self.idx = 0 # 当前批次数据(self.buf)指针,初始化为0 self.offset = 0 # 数据源(data)指针,初始化为0
3. 接着,实现从 data 中读取一个batch_size 的数据的方法:
判断何时须要从data中读取数据:
当前批次数据已经处理完的时候,即self.idx==len(self.buf)
时;
读取数据,需判断从哪里读到哪里:
若是self.offset+batch_size
没有超出data的范围,则读取data[self.offset:self.offset+batch_size];
若是self.offset+batch_size
已经超出data的范围,即data剩下的数据量已经小于一个batch_size,则直接读取剩下的所有数据,即data[self.offset:];
把读取的batch数据存在self.buf中。
def readBatch(self): if self.idx==len(self.buf): slice_size=min(len(self.data)-self.offset,self.bs)#读多少 self.buff=self.data[self.offset: self.offset+slice_size] #更新指针 self.offset+=slice_size self.idx=0
4.定义一个判断data中是否还有下一行的方法:
该方法首先会调用 readBatch() 方法
若是当前的self.buf中还没读完,则显然self.idx<len(self.buf)
为真,此时readBatch()
中什么也不作,且该方法返回 True;
若是当前的 self.buf 恰好读完,则self.idx==len(self.buf)
,此时readBatch()
会读取下一批次,更新self.idx=0
:
若是data中还有数据,则len(self.buf)>0,self.idx<len(self.buf)为真,返回True;
若是data中没有数据了,则len(self.buf)=0,self.idx<len(self.buf)为假,返回False。
def hasNext(self): self.readBatch() return self.idx < len(self.buf)
5.定义一个在当前批次self.buf中读取行的方法:
def readNext(self): self.readBatch() if self.idx < len(self.buf): line = self.buf[self.idx] self.idx += 1 return line
到这里,已经能够实现一些功能了,你能够尝试一下:
'''类BufferTableReader(未实现迭代功能)''' class BufferTableReader(): # 初始化 def __init__(self, data, batch_size ): self.data = data # 数据源 self.bs = batch_size # 每次读取的批次大小 self.buf = [] # 用来存储当前批次的数据,初始化为空 self.idx = 0 # 当前批次数据(self.buf)指针,初始化为0 self.offset = 0 # 数据源(data)指针,初始化为0 def readBatch(self): if self.idx == len(self.buf): slice_size=min(len(self.data)-self.offset,self.bs) #读多少 self.buf=self.data[self.offset:self.offset+slice_size] #更新指针 self.offset+=slice_size self.idx=0 # 为了便于观察,每读完一个批次作一个标注 print('-------') def hasNext(self): self.readBatch() return self.idx < len(self.buf) def readNext(self): self.readBatch() if self.idx < len(self.buf): line = self.buf[self.idx] self.idx += 1 return line #实例化 ts1 = [('01:30',128,19), ('05:00',124,20), ('13:00',131,18), ('20:00',138,24), ('21:30',122,22)] btr = BufferTableReader(ts1, 2) while btr.hasNext(): print(btr.readNext())
输出结果:
从结果来看,类BufferTableReader的实例化对象btr已经可以实现从list中分批读取数据了。(在这里,每次读取2条,到最后不足2条的时候,则把剩下的一次读出)
但到目前为止,这还不能算是一个Iter,只能说是个Reader,你会发现用for...in
循环对没法对其进行遍历,由于它不是一个可迭代对象。
btr = BufferTableReader(ts1, 2) for l in btr: print(l)
输出结果:
前面说了,可迭代对象须要在类中实现两个方法 __iter__()
与__next__()。
因此接下来咱们在BufferTableReader的基础上,定义一个新的类BufferTableIter,增长上述两个方法(因为后面的代码都比较长,为了方便读者阅读,我会在相较于上一个版本新增或修改部分作特别的标注):
'''类BufferTableIter 1.0 - 实现通常迭代功能''' class BufferTableIter(): # 初始化 def __init__(self, data, batch_size ): self.data = data # 数据源 self.bs = batch_size # 每次读取的批次大小 self.buf = [] # 用来存储当前批次的数据,初始化为空 self.idx = 0 # 当前批次数据(self.buf)指针,初始化为0 self.offset = 0 # 数据源(data)指针,初始化为0 def readBatch(self): if self.idx == len(self.buf): slice_size=min(len(self.data)-self.offset,self.bs) #读多少 self.buf=self.data[self.offset:self.offset+slice_size] #更新指针 self.offset += slice_size self.idx = 0 # 为了便于观察,每读完一个批次作一个标注 print('-------') def hasNext(self): self.readBatch() return self.idx < len(self.buf) def readNext(self): self.readBatch() if self.idx < len(self.buf): line = self.buf[self.idx] self.idx += 1 return line ############新增部分############# def __iter__(self): print('iter called') return self def __next__(self): if self.hasNext(): return self.readNext() else: raise StopIteration ################################## #实例化 ts1 = [('01:30',128,19), ('05:00',124,20), ('13:00',131,18), ('20:00',138,24), ('21:30',122,22)] print('经过next()方法调用:') btr = BufferTableIter(ts1, 2) while btr.hasNext(): print(btr.readNext()) ##或者: #print(next(btr)) print('经过for循环调用:') btr = BufferTableIter(ts1, 2) for l in btr: print(l)
输出结果:
此时既能够经过next()方法调用,也能够经过 for 循环进行调用。
固然仍是须要注意,迭代不能重复,即遍历结束后不能从头再遍历一次(再执行以下代码,结果为空;虽然此时也调用了__iter__
函数,但迭代器自己如今已经为空了),须要从新建立一个实例化对象才行。
for l in btr: print(l)
输出结果:
可是若是我就是想可以重复遍历,而又不想从新建立实例化对象怎么办呢?
也是能够的,修改一下__iter__()
函数的返回值,让它从新生成一个实例化对象。也就是说,每for一次,就会调用__iter__
从新建立一个迭代器。
以下,把__iter__()
的返回值return self
改成return BufferedTableIter(self.data, self.bs)
:
'''类BufferTableIter 2.0 - 实现重复迭代功能''' class BufferTableIter(): # 初始化 def __init__(self, data, batch_size ): self.data = data # 数据源 self.bs = batch_size # 每次读取的批次大小 self.buf = [] # 用来存储当前批次的数据,初始化为空 self.idx = 0 # 当前批次数据(self.buf)指针,初始化为0 self.offset = 0 # 数据源(data)指针,初始化为0 def readBatch(self): if self.idx == len(self.buf): slice_size=min(len(self.data)-self.offset,self.bs) #读多少 self.buf=self.data[self.offset:self.offset+slice_size] #更新指针 self.offset += slice_size self.idx = 0 def hasNext(self): self.readBatch() return self.idx < len(self.buf) def readNext(self): self.readBatch() if self.idx < len(self.buf): line = self.buf[self.idx] self.idx += 1 return line ############修改部分############# def __iter__(self): print('iter called') return BufferTableIter(self.data, self.bs) ################################## def __next__(self): if self.hasNext(): return self.readNext() else: raise StopIteration #实例化 ts1 = [('01:30',128,19), ('05:00',124,20), ('13:00',131,18), ('20:00',138,24), ('21:30',122,22)] btr = BufferTableIter(ts1, 2) for l in btr: print(l) for l in btr: print(l) for l in btr: print(l) #任你for多少次
输出结果:
或者不修改__iter__()
函数的返回值,而是将每for一次就实例化一次的这部分功能抽离出来,定义成另外一个类,就命名为DBTable。说白了这个类就是专门用来初始化迭代器的:
class DBTable: def __init__(self, data, batch_size): self.data = data self.bs = batch_size def __iter__(self): print("__iter__ called") return BufferTableIter(self)
原来的类BufferTableIter则修改成:
'''类BufferTableIter 2.1 - 实现重复迭代功能''' class BufferTableIter(): # 初始化 def __init__(self, dbTable): ############修改部分############# self.data = dbTable.data self.bs = dbTable.bs ################################## self.buf = [] self.idx = 0 self.offset = 0 def readBatch(self): if self.idx == len(self.buf): slice_size=min(len(self.data)-self.offset,self.bs) self.buf=self.data[self.offset:self.offset+slice_size] #更新指针 self.offset += slice_size self.idx = 0 def hasNext(self): self.readBatch() return self.idx < len(self.buf) def readNext(self): self.readBatch() if self.idx < len(self.buf): line = self.buf[self.idx] self.idx += 1 return line def __iter__(self): return self def __next__(self): if self.hasNext(): return self.readNext() else: raise StopIteration
#实例化ts1 = [('01:30',128,19), ('05:00',124,20), ('13:00',131,18), ('20:00',138,24), ('21:30',122,22)]dbTable = DBTable(ts1, 2)btr = BufferTableIter(dbTable)
实例化以后进行调用,见下面代码的运行结果能够发现:DBTable实例化出来的对象能够任意屡次重复遍历;而BufferTableIter则不行。
#实例化 ts1 = [('01:30',128,19), ('05:00',124,20), ('13:00',131,18), ('20:00',138,24), ('21:30',122,22)] dbTable = DBTable(ts1, 2) btr = BufferTableIter(dbTable)``` 输出结果: 
for l in btr: print(l)
print('--------------')
for l in btr: print(l)```
输出结果:
最后,因为实际中咱们的数据是存在数据库中的,因此初始化函数__init__
和readBatch()
函数须要作些修改(python链接数据库请参考上一篇),具体再也不赘述,最终的代码以下:
'''类BufferTableIter 3.0 - 实现数据库链接''' class DBTable: def __init__(self, con, sql, batch_size): ############修改部分############# self.con = con #建立链接 self.sql = sql #须要执行的sql语句 ################################ self.bs = batch_size def __iter__(self): return BufferTableIter(self) class BufferTableIter(): # 初始化 def __init__(self, dbTable): ############修改部分############# self.cursor = dbTable.con.cursor(buffered=True) #建立游标 self.cursor.execute(dbTable.sql) #执行sql语句 self.readCount = 0 ################################ self.bs = dbTable.bs self.buf = [] self.idx = 0 def readBatch(self): if self.idx == len(self.buf): ############修改部分############# self.buf = self.cursor.fetchmany(size=self.bs) #从数据库中读取批次数据 #更新指针 self.readCount += len(self.buf) self.idx = 0 ################################ def hasNext(self): self.readBatch() return self.idx < len(self.buf) def readNext(self): self.readBatch() if self.idx < len(self.buf): line = self.buf[self.idx] self.idx += 1 return line return None def __iter__(self): return self def __next__(self): if self.hasNext(): return self.readNext() raise StopIteration
如今结合上一篇的内容,往数据库中添加一些数据:
'''在mysql数据库中添加测试数据''' import mysql.connector con = mysql.connector.connect( host="127.0.0.1", user="WXT", passwd="12345", database="wxt_db", use_pure="False" ) mycursor=con.cursor(buffered=True) # 建立一个表 mycursor.execute("CREATE TABLE patient (time VARCHAR(255), hr INT, hxpl INT)") # 往表里插入一些记录 mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('01:30',128,19)") mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('05:00',124,20)") mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('13:00',131,18)") mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('20:00',138,24)") mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('21:30',122,22)") con.commit() con.close()
进行测试:
'''链接mysql数据库进行功能测试:分批读取数据''' import mysql.connector con = mysql.connector.connect( host="127.0.0.1", user="WXT", passwd="12345", database="wxt_db", use_pure="False" ) # 检查一个表是否存在 def tableExists(mycursor, name): stmt="SHOW TABLES LIKE '"+name+"'" mycursor.execute(stmt) return mycursor.fetchone() try: mycursor=con.cursor(buffered=True) if tableExists(mycursor, 'patient'): print('process table:', 'patient') print("--------") #查询表里的记录 sql = "SELECT * FROM patient" dbTbl = DBTable(con,sql,2) btr = BufferTableIter(dbTbl) for rec in dbTbl: print("read record:", rec) finally: con.close() #关闭数据
输出结果:
再补充一个小知识点,这里使用了try..finally
结构,这是一种检验和处理异常的机制。一般状况下,若是程序在某个位置出现异常,整个程序会被直接中断,后面的语句不会再执行。
而try..finally
中,try 语句块里的代码会被监测,不论这部分有没有发生异常,finally 里的语句都会执行,这样就能够对异常作一些收尾工做,好比这里的关闭数据库链接操做。
由于若是前面一旦发生异常,数据库没可以被关闭,会存在必定的危险性。
🍎tips5:try-finally
结果能够对异常进行检测和处理,若是try语句块中出现了异常,finally后面能够作一些必要的清理工做(如关闭文件或断开服务器链接等)
总结一下,本文实现了有序序列的合并、时间序列数据表的对齐、以及对数据库中的数据表进行分批查询,主要使用的Pyhton编程技巧有循环、函数、类和迭代器。
但其实尚未彻底解决问题,目前只是把数据从数据库给读出来了,尚未对其进行处理,因此以后还会再写后半部分的内容,计划有:
对齐后的时间序列作分组(例如每小时,天天)聚合(例如每组作计数,求平均等)
用生成器机制(yield)对迭代器的功能进行优化。
(注:本文是由团队内部培训的笔记整理而来,若有问题,欢迎交流指正!)