最近在作离线数据导入HBase项目,涉及将存储在Mysql中的历史数据经过bulkload的方式导入HBase。因为源数据已经不在DB中,而是以文件形式存储在机器磁盘,此文件是mysqldump导出的格式。如何将mysqldump格式的文件转换成实际的数据文件提供给bulkload做转换,是须要考虑的一个问题。python
咱们知道mysqldump导出的文件主要是Insert,数据库表结构定义语句。而要解析的对象也主要是包含INSERT关键字记录,这样咱们就把问题转换成如何从dmp文件解析Insert语句。接触过dmp文件的同窗应该了解,其INSERT语句的结构,主要包含表名、字段名、字段值, 这里面主要包含几个关键字:INSERT INTO, VALUES。咱们要作的就是把Values括号后的字段值给解析出来,这个过程须要考虑VALUES后面包含的是多少行的记录,有可能导出的记录Values后面包含多行对应mysql中存储的记录。mysql
在解析文件过程当中,我天然想到用Python来写,由于Python在处理文件方面有不少优点,也比较简单。在处理DMP文件这块,考虑到字段值间是用逗号分割的,在python中正好一个模块能够很好的来处理此类格式 ,即你们很熟悉的CSV模块,在处理CSV类型的文件有不少优点。在这里咱们把CSV模块有在解析dmp文件,同时加一些解析逻辑,能够很好解决此类问题。sql
同时,咱们要处理的dmp文件是通过压缩的,而且单个文件都比较大,都是Gigbytes的,在读取时须要注意机器内存大小,不能一次读出全部的数据,python也考虑到此类问题,采用的方法是惰性取值,即在真正使用时才从磁盘中加载相应的文件数据。若是想加块解析,还能够采集多进程或多线程的方法。数据库
处理流程图以下所示:多线程
代码以下图所示:app
1 #!/usr/bin/env python 2 import fileinput 3 import csv 4 import sys 5 import gzip 6 7 8 # 设定CSV读取的最大容量 9 csv.field_size_limit(sys.maxsize) 10 11 def check_insert(line): 12 """ 13 返回语句是否以insert into开头,若是是返回true,不然返回false 14 """ 15 return line.startswith('INSERT INTO') or False 16 17 18 def get_line_values(line): 19 """ 20 返回Insert语句中包含Values的部分 21 """ 22 return line.partition('VALUES ')[2] 23 24 25 def check_values_style(values): 26 """ 27 保证INSERT语句知足基本的条件,即包含(右括号 28 """ 29 30 if values and values[0] == '(': 31 return True 32 return False 33 34 def parse_line(values): 35 """ 36 建立csv对象,读取INSERT VALUES 字段值 37 """ 38 latest_row = [] 39 40 reader = csv.reader([values], delimiter=',', 41 doublequote=False, 42 escapechar='\\', 43 quotechar="'", 44 strict=True 45 ) 46 47 48 for reader_row in reader: 49 for column in reader_row: 50 # 判断字段值是否为空或为NULL 51 if len(column) == 0 or column == 'NULL': 52 latest_row.append("") 53 continue 54 55 # 判断字段开头是否以(开头,若是是则说明此VALUES后面不仅包含一行数据,可能有多行,要分别解析 56 if column[0] == "(": 57 new_row = False 58 if len(latest_row) > 0: 59 #判断行是否包含),若是包含则说明一行数据完毕 60 if latest_row[-1][-1] == ")": 61 # 移除) 62 latest_row[-1] = latest_row[-1][:-1] 63 if latest_row[-1] == "NULL": 64 latest_row[-1] = "" 65 new_row = True 66 # 若是是新行,则打印该行 67 if new_row: 68 line="}}}{{{".join(latest_row) 69 print "%s<{||}>" % line 70 latest_row = [] 71 72 if len(latest_row) == 0: 73 column = column[1:] 74 75 latest_row.append(column) 76 # 判断行结束符 77 if latest_row[-1][-2:] == ");": 78 latest_row[-1] = latest_row[-1][:-2] 79 if latest_row[-1] == "NULL": 80 latest_row[-1] = "" 81 82 line="}}}{{{".join(latest_row) 83 print "%s<{||}>" % line 84 85 def main(): 86 87 filename=sys.argv[1] 88 try: 89 #惰性取行 90 with gzip.open(filename,"rb") as f: 91 for line in f: 92 if check_insert(line): 93 values = get_line_values(line) 94 if check_values_style(values): 95 parse_line(values) 96 except KeyboardInterrupt: 97 sys.exit(0) 98 99 if __name__ == "__main__": 100 main()
总的说来,主要是利用Python的CSV模块来解析DMP文件的INSERT语句,若是DMP文件不规整,可能仍是有些问题。对于dmp文件很大状况,也是须要考虑解析时间效率问题,能够考虑增长多进程或多线程机制。ide