Kafka生产者发送消息的三种方式

Kafka是一种分布式的基于发布/订阅的消息系统,它的高吞吐量、灵活的offset是其它消息系统所没有的。bootstrap

Kafka发送消息主要有三种方式:服务器

1.发送并忘记 2.同步发送 3.异步发送+回调函数网络

 

下面以单节点的方式分别用三种方法发送1w条消息测试:异步

方式一:发送并忘记(不关心消息是否正常到达,对返回结果不作任何判断处理)分布式

发送并忘记的方式本质上也是一种异步的方式,只是它不会获取消息发送的返回结果,这种方式的吞吐量是最高的,可是没法保证消息的可靠性:函数

 1 import pickle  2 import time  3 from kafka import KafkaProducer  4 
 5 producer = KafkaProducer(bootstrap_servers=['192.168.33.11:9092'],  6                          key_serializer=lambda k: pickle.dumps(k),  7                          value_serializer=lambda v: pickle.dumps(v))  8 
 9 start_time = time.time() 10 for i in range(0, 10000): 11     print('------{}---------'.format(i)) 12     future = producer.send('test_topic', key='num', value=i, partition=0) 13 
14 # 将缓冲区的所有消息push到broker当中
15 producer.flush() 16 producer.close() 17 
18 end_time = time.time() 19 time_counts = end_time - start_time 20 print(time_counts)

 测试结果:1.88s测试

 

方式二:同步发送(经过get方法等待Kafka的响应,判断消息是否发送成功)spa

以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断, 能够明确地知道每条消息的发送状况,可是因为同步的方式会阻塞,只有当消息经过get返回future对象时,才会继续下一条消息的发送:日志

 

 1 import pickle  2 import time  3 from kafka import KafkaProducer  4 from kafka.errors import kafka_errors  5 
 6 producer = KafkaProducer(  7     bootstrap_servers=['192.168.33.11:9092'],  8     key_serializer=lambda k: pickle.dumps(k),  9     value_serializer=lambda v: pickle.dumps(v) 10 ) 11 
12 start_time = time.time() 13 for i in range(0, 10000): 14     print('------{}---------'.format(i)) 15     future = producer.send(topic="test_topic", key="num", value=i) 16     # 同步阻塞,经过调用get()方法进而保证必定程序是有序的.
17     try: 18         record_metadata = future.get(timeout=10) 19         # print(record_metadata.topic)
20         # print(record_metadata.partition)
21         # print(record_metadata.offset)
22     except kafka_errors as e: 23         print(str(e)) 24 
25 end_time = time.time() 26 time_counts = end_time - start_time 27 print(time_counts)

 

测试结果:16scode

 

方式三:异步发送+回调函数(消息以异步的方式发送,经过回调函数返回消息发送成功/失败)

在调用send方法发送消息的同时,指定一个回调函数,服务器在返回响应时会调用该回调函数,经过回调函数可以对异常状况进行处理,当调用了回调函数时,只有回调函数执行完毕生产者才会结束,不然一直会阻塞:

 1 import pickle  2 import time  3 from kafka import KafkaProducer  4 
 5 producer = KafkaProducer(  6     bootstrap_servers=['192.168.33.11:9092'],  7     key_serializer=lambda k: pickle.dumps(k),  8     value_serializer=lambda v: pickle.dumps(v)  9 ) 10 
11 
12 def on_send_success(*args, **kwargs): 13     """
14  发送成功的回调函数 15  :param args: 16  :param kwargs: 17  :return: 18     """
19     return args 20 
21 
22 def on_send_error(*args, **kwargs): 23     """
24  发送失败的回调函数 25  :param args: 26  :param kwargs: 27  :return: 28     """
29 
30     return args 31 
32 
33 start_time = time.time() 34 for i in range(0, 10000): 35     print('------{}---------'.format(i)) 36     # 若是成功,传进record_metadata,若是失败,传进Exception.
37  producer.send( 38         topic="test_topic", key="num", value=i 39  ).add_callback(on_send_success).add_errback(on_send_error) 40 
41 producer.flush() 42 producer.close() 43 
44 end_time = time.time() 45 time_counts = end_time - start_time 46 print(time_counts)

测试结果:2.15s

 

三种方式虽然在时间上有所差异,但并非说时间越快的越好,具体要看业务的应用场景:

场景1:若是业务要求消息必须是按顺序发送的,那么可使用同步的方式,而且只能在一个partation上,结合参数设置retries的值让发送失败时重试,设置max_in_flight_requests_per_connection=1,能够控制生产者在收到服务器晌应以前只能发送1个消息,从而控制消息顺序发送;

场景2:若是业务只关心消息的吞吐量,允许少许消息发送失败,也不关注消息的发送顺序,那么可使用发送并忘记的方式,并配合参数acks=0,这样生产者不须要等待服务器的响应,以网络能支持的最大速度发送消息;

场景3:若是业务须要知道消息发送是否成功,而且对消息的顺序不关心,那么能够用异步+回调的方式来发送消息,配合参数retries=0,并将发送失败的消息记录到日志文件中;

相关文章
相关标签/搜索