最近工做比较忙,此外花了时间看杂书,文章写的比较少。html
本篇文章分享一个工做中遇到的小问题。python
要完成一个开发任务,读取一个具备80w条数据的CSV文件,将其入库,由于CSV文件中缺乏2个关键数据,须要经过调用内部Web API接口的形式去得到,2个参数分别请求两个URL,每次请求参数不一样。mysql
先不考虑Web API方面的内容,80w条数据,如何快速的入库MySQL?sql
一个直观的想法就是将多条INSERT语句合并成一条INSERT执行,合并成一条SQL后,会减小MySQL的日志,从而下降日志的数据量与使用磁盘的评率,从而提升效率,此外合并SQL语句后,能够减小SQL语句的解析书以及减小网络传输IO(MYSQL C/S模式)缓存
查阅资料,主要阅读了「MySQL 批量 SQL 插入性能优化」这篇文章,文中给出了相应的测试效果,这里分享一下。性能优化
多条插入数据服务器
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('0', 'userid_0', 'content_0', 0);
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('1', 'userid_1', 'content_1', 1);
复制代码
合并成一条后网络
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('0', 'userid_0', 'content_0', 0), ('1', 'userid_1', 'content_1', 1);
复制代码
文中提供一些测试对比数据,分别是进行单条数据的导入与转化成一条 SQL 语句进行导入,分别测试 1 百、1 千、1 万条数据记录。session
此外文中提出,使用事务能够提升数据的插入效率,其缘由是由于MySQL在进行INSERT操做时内部会创建一个事务,在事务内才进行真正的插入处理,在使用INSERT语句时,直接使用事务能够减小屡次建立事务的消耗。并发
使用事务的修改以下。
START TRANSACTION;
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('0', 'userid_0', 'content_0', 0);
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('1', 'userid_1', 'content_1', 1);
...
COMMIT;
复制代码
文中一样给出了测试对比,分别是不使用事务与使用事务在记录数为 1 百、1 千、1 万的状况。
此外,还须要注意,插入数据时,其索引是有序的会比无序索引快那么一些。
数据有序的插入是指插入记录在主键上是有序排列。
例如 datetime 是记录的主键。
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('1', 'userid_1', 'content_1', 1);
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('0', 'userid_0', 'content_0', 0);
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('2', 'userid_2', 'content_2',2);
复制代码
从上面sql能够看出,datetime是记录主键,但倒是无序的。
将其修改为。
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('0', 'userid_0', 'content_0', 0);
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('1', 'userid_1', 'content_1', 1);
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('2', 'userid_2', 'content_2',2);
复制代码
MySQL在进行入库操做时,须要维护索引数据,插入时,索引数据无序会增大维护索引的成本,由于MySQL索引使用的结构是B+树,这个树本文就不讨论先,留点素材给将来的本身。
下面是随机数据与顺序数据的性能对比,分别是记录为 1 百、1 千、1 万、10 万、100 万。
将上面提到的三种方式合起来使用,可让INSERT语句执行效果大幅提升。
引用「MySQL 批量 SQL 插入性能优化」结论
从测试结果能够看到,合并数据 + 事务的方法在较小数据量时,性能提升是很明显的,数据量较大时(1 千万以上),性能会急剧降低,这是因为此时数据量超过了 innodb_buffer 的容量,每次定位索引涉及较多的磁盘读写操做,性能降低较快。而使用合并数据 + 事务 + 有序数据的方式在数据量达到千万级以上表现依旧是良好,在数据量较大时,有序数据索引定位较为方便,不须要频繁对磁盘进行读写操做,因此能够维持较高的性能。
由于我插入的数据其主键索引自己就是无序的,因此使用了「合并数据 + 事务」的方法,但在具体实践时,仍是遇到了「(2006, "MySQL server has gone away (BrokenPipeError(32, 'Broken pipe'))")」这个问题。
阅读MySQL相关文档与其余资料,出现这个问题有3个可能缘由。
1.max_allowed_packet值过小,在MySQL中max_allowed_packet默认为4M,即插入数据其大小不能超过4M,我要作的就是将其设置成更大的值。
// 查看 max_allowed_packet
>show VARIABLES like '%max_allowed_packet%';
// 将修改为 100M
>set global max_allowed_packet = 1024*1024*100;
复制代码
这种修改只会临时生效,MySQL重启后,依旧会变为4M,想要长期生效,须要修改「my.ini」。
2.wait_timeout过小,MySQL连接长时间没有新请求,就被Server端关闭了,对于一些ORM库而言,这个过程是透明的,此时还在使用被Server端关闭的连接来进行SQL操做,就会出现上述错误,而我要作的就是将其设置为更大的值。
>show global variables like '%timeout';
// MySQL无操做28800秒后会被自动化关闭
> set global wait_time = 28800;
复制代码
但这不是长久之策,由于长时间不操做,MySQL Server端依旧会将其关闭,这个问题依旧会出现,为了不这个问题,你须要本身关闭连接,对于一些MySQL操做量不大的情景,建议使用短链接的形式,若是依旧须要用MySQL链接池,以长链接的方式来操做MySQL,就须要实现判断当前连接是否存活的逻辑并在不存活的状况下自动重连。
若是使用的是pymysql,那么能够经过ping()方法来进行重连,其源码以下。
def ping(self, reconnect=True):
"""Check if the server is alive"""
if self._sock is None:
if reconnect:
self.connect()
reconnect = False
else:
raise err.Error("Already closed")
try:
self._execute_command(COMMAND.COM_PING, "")
return self._read_ok_packet()
except Exception:
if reconnect:
self.connect()
return self.ping(False)
else:
raise
复制代码
其实就是先判断当前连接是否存活,不存在就经过connect()方法再连接一次。
此外能够利用「try...except...」,当使用当前MySQL连接执行SQL时,若是报错,直接执行except中的逻辑,在except中只需将当前连接关闭,而后再获取新连接,而后再执行SQL则可。
3.此外执行大量数据的INSERT或REPLACE也可能会致使此类错误(这才是我遇到这个错误的缘由),要作的就是下降单词INSERT的数据行数则可,我本来一次性插入10000条,将其改为5000条后,这个问题就没有出现了。
这个缘由能够从其开发官方查阅到,地址为:dev.mysql.com/doc/refman/…
支持,80w条数据高效插入的问题就解决了
MySQL入库方面的问题解决后,接着就来考虑一下requests问题,要实现23条请求,并发是必须的,键盘啪啪啪两三下,利用线程池(ThreadPoolExecutor)的并发请求逻辑就实现好了,一个经验是,将参数处理后,其余的事情就不用多想了,经过ThreadPoolExecutor.map()方法,轻松实现并发。
我觉得事情就这样结束了,但程序正常运行一段时间后,出现了大量的「Cannot assign requested address」报错。
这是由于客户端短期内频繁的请求服务器,每次请求连接都在很短的时间内结束,从而致使不少TIME_WAIT,操做系统的端口号等资源被迅速用光,新的请求连接没办法得到新的端口号等资源,就抛出Cannot assign requested address。
能够经过netstat -an | grep TIME_WAIT
验证一下。
理解了问题所在,解决起来就很简单了,使用requests链接池就行了,实现端口号等资源的复用。
须要注意,这里请求的Web API是相同的2个API,因此能够经过创建长链接的方式复用资源,若是每次请求的host、IP等都不一样,这种方式是没有什么效果的。
requests其实已经考虑到了链接池的状况,简单使用一下则可。
@staticmethod
def get_http_session(pool_connections=1, pool_maxsize=10, max_retries=3):
''' http链接池 pool_connections 要缓存的 urllib3 链接池的数量。 pool_maxsize 要保存在池中的最大链接数。 max_retries 每一个链接的最大重试次数 '''
session =requests.session()
# 建立适配器
adapter = requests.adapters.HTTPAdapter(pool_connections=pool_connections,
pool_maxsize=pool_maxsize, max_retries=max_retries)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
复制代码
至此,这个东西就正常跑起来了。
你们在工做中有没有遇到什么有意思的问题?
若是本文对你有所启发,记得点击「在看」支持二两。