[若是代码显示有问题,请点击阅读原文]服务器
经过本文你会知道Python里面何时用yield最合适。本文不会给你讲生成器是什么,因此你须要先了解Python的yield,再来看本文。多线程
疑惑app
多年之前,当我刚刚开始学习Python协程的时候,我看到绝大多数的文章都举了一个生产者-消费者的例子,用来表示在生产者内部能够随时调用消费者,达到和多线程相同的效果。这里凭记忆简单还原一下当年我看到的代码:ide
import time def consumer(): product = None while True: if product is not None: print('consumer: {}'.format(product)) product = yield None def producer(): c = consumer() next(c) for i in range(10): c.send(i) start = time.time() producer() end = time.time() print(f'直到把全部数据塞入Kafka,一共耗时:{end - start}秒')
运行效果以下图所示。函数
这些文章的说法,就像统一好了口径同样,说这样写能够减小线程切换开销,从而大大提升程序的运行效率。可是当年我始终想不明白,这种写法与直接调用函数有什么区别,以下图所示。学习
直到后来我须要操做Kafka的时候,我明白了使用yield的好处。线程
探索设计
为了便于理解,我会把实际场景作一些简化,以方便说明事件的产生发展和解决过程。事件的原由是我须要把一些信息写入到Kafka中,个人代码一开始是这样的:3d
import time from pykafka import KafkaClient client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics[b'test'] def consumer(product): with topic.get_producer(delivery_reports=True) as producer: producer.produce(str(product).encode()) def feed(): for i in range(10): consumer(i) start = time.time() feed() end = time.time() print(f'直到把全部数据塞入Kafka,一共耗时:{end - start}秒')
这段代码的运行效果以下图所示。code
写入10条数据须要100秒,这样的龟速显然是有问题的。问题就出在这一句代码:
with topic.get_producer(delivery_reports=True) as producer
得到Kafka生产者对象是一个很是耗费时间的过程,每获取一次都须要10秒钟才能完成。因此写入10个数据就获取十次生产者对象。这消耗的100秒主要就是在获取生产者对象,而真正写入数据的时间短到能够忽略不计。
因为生产者对象是能够复用的,因而我对代码做了一些修改:
import timefrom pykafka import KafkaClient client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics[b'test'] products = [] def consumer(product_list): with topic.get_producer(delivery_reports=True) as producer: for product in product_list: producer.produce(str(product).encode()) def feed(): for i in range(10): products.append(i) consumer(products) start = time.time() feed() end = time.time() print(f'直到把全部数据塞入Kafka,一共耗时:{end - start}秒')
首先把全部数据存放在一个列表中,最后再一次性给consumer函数。在一个Kafka生产者对象中展开列表,再把数据一条一条塞入Kafka。这样因为只须要获取一次生产者对象,因此须要耗费的时间大大缩短,以下图所示。
这种写法在数据量小的时候是没有问题的,但数据量一旦大起来,若是所有先放在一个列表里面的话,服务器内存就爆了。
因而我又修改了代码。每100条数据保存一次,并清空暂存的列表:
import timefrom pykafka import KafkaClient client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics[b'test'] def consumer(product_list): with topic.get_producer(delivery_reports=True) as producer: for product in product_list: producer.produce(str(product).encode()) def feed(): products = [] for i in range(1003): products.append(i) if len(products) >= 100: consumer(products) products = [] if products: consumer(products) start = time.time() feed() end = time.time() print(f'直到把全部数据塞入Kafka,一共耗时:{end - start}秒')
因为最后一轮循环可能没法凑够100条数据,因此feed函数里面,循环结束之后还须要判断products列表是否为空,若是不为空,还要再消费一次。这样的写法,在上面这段代码中,一共1003条数据,每100条数据获取一次生产者对象,那么须要获取11次生产者对象,耗时至少为110秒。
显然,要解决这个问题,最直接的办法就是减小获取Kafka生产者对象的次数并最大限度复用生产者对象。若是读者触类旁通的能力比较强,那么根据开关文件的两种写法:
# 写法一 with open('test.txt', 'w', encoding='utf-8') as f: f.write('xxx') # 写法二 f = open('test.txt', 'w', encoding='utf-8') f.write('xxx') f.close()
能够推测出获取Kafka生产者对象的另外一种写法:
# 写法二 producer = topic.get_producer(delivery_reports=True) producer.produce(b'xxxx') producer.close()
这样一来,只要获取一次生产者对象并把它做为全局变量就能够一直使用了。
然而,pykafka的官方文档中使用的是第一种写法,经过上下文管理器with来得到生产者对象。暂且不论第二种方式是否会报错,只从写法上来讲,第二种方式必须要手动关闭对象。开发者常常会出现开了忘记关的状况,从而致使不少问题。并且若是中间出现了异常,使用上下文管理器的第一种方式会自动关闭生产者对象,但第二种方式仍然须要开发者手动关闭。
函数VS生成器
可是若是使用第一种方式,怎么能在一个上下文里面接收生产者传进来的数据呢?这个时候才是yield派上用场的时候。
首先须要明白,使用yield之后,函数就变成了一个生成器。生成器与普通函数的不一样之处能够经过下面两段代码来进行说明:
def funciton(i): print('进入') print(i) print('结束') for i in range(5): funciton(i)
运行效果以下图所示。
函数在被调用的时候,函数会从里面的第一行代码一直运行到某个return或者函数的最后一行才会退出。
而生成器能够从中间开始运行,从中间跳出。例以下面的代码:
def generator(): print('进入') i = None while True: if i is not None: print(i) print('跳出') i = yield None g = generator() next(g) for i in range(5): g.send(i)
运行效果以下图所示。
从图中能够看到,进入只打印了一次。代码运行到i = yield None后就跳到外面,外面的数据能够经过g.send(i)的形式传进生成器,生成器内部拿到外面传进来的数据之后继续执行下一轮while循环,打印出被传进来的内容,而后到i = yield None的时候又跳出。如此反复。
因此回到最开始的Kafka问题。若是把with topic.get_producer(delivery_reports=True) as producer写在上面这一段代码的print('进入')这个位置上,那岂不是只须要获取一次Kafka生产者对象,而后就能够一直使用了?
根据这个逻辑,设计以下代码:
import timefrom pykafka import KafkaClient client = KafkaClient(hosts="127.0.0.1:9092") topic = client.topics[b'test'] def consumer(): with topic.get_producer(delivery_reports=True) as producer: print('init finished..') next_data = '' while True: if next_data: producer.produce(str(next_data).encode()) next_data = yield True def feed(): c = consumer() next(c) for i in range(1000): c.send(i) start = time.time() feed() end = time.time() print(f'直到把全部数据塞入Kafka,一共耗时:{end - start}秒')
这一次直接插入1000条数据,总共只须要10秒钟,相比于每插入一次都获取一次Kafka生产者对象的方法,效率提升了1000倍。运行效果以下图所示。
后记``
读者若是仔细对比第一段代码和最后一段代码,就会发现他们本质上是一回事。可是第一段代码,也就是网上不少人讲yield的时候举的生产者-消费者的例子之因此会让人以为毫无用处,就在于他们的消费者几乎就是秒运行,这样看不出和函数调用的差异。而我最后这一段代码,它的消费者分红两个部分,第一部分是获取Kafka生产者对象,这个过程很是耗时;第二部分是把数据经过Kafka生产者对象插入Kafka,这一部分运行速度极快。在这种状况下,使用生成器把这个消费者代码分开,让耗时长的部分只运行一次,让耗时短的反复运行,这样就能体现出生成器的优点。