confluent-kafka是Python模块,是对librdkafka的轻量级封装,支持Kafka 0.8以上版本。本文基于confluent-kafka 1.3.0编写。
GitHub地址:
https://github.com/confluentinc/confluent-kafka-pythonpython
(1)可靠。confluent-kafka是对普遍应用于各类生产环境的librdkafka的封装,使用Java客户端相同的测试集进行测试,由Confluent进行支持。
(2)性能。性能是一个关键的设计考虑因素,对于较大的消息,最大吞吐量与Java客户机至关(Python解释器的开销影响较小),延迟与Java客户端至关。
(3)将来支持。Coufluent由Kafka创始人建立,致力于构建以Apache Kafka为核心的流处理平台。确保核心Apache Kafka和Coufluent平台组件保持同步是当务之急。git
建立confluent源:
进入/etc/yum.repos.d目录建立confluent.repo文件:github
[Confluent.dist] name=Confluent repository (dist) baseurl=https://packages.confluent.io/rpm/5.4/7 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.4/archive.key enabled=1 [Confluent] name=Confluent repository baseurl=https://packages.confluent.io/rpm/5.4 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.4/archive.key enabled=1
安装:正则表达式
sudo yum clean all && sudo yum install confluent-community-2.12 sudo yum install librdkafka-devel python-devel pip install confluent-kafka
安装AvroProducer、AvroConsumer:pip install "confluent-kafka[avro]"
docker
Consumer(config)
使用指定的配置dict建立Consumer实例。Consumer.assign(partitions)
由指定TopicPartition列表设置Consumer的分区分配策略,启动消费。若是对关闭的Consumer调用本函数会抛出RuntimeError。Consumer.assignment()
返回当前分区分配策略,返回list(TopicPartition)Consumer.close()
关闭和终止Consumer实例,关闭Consumer实例会执行如下操做:中止消费;提交位移(若是enable.auto.commit设置为False会抛出异常)、离开Consumer Group。Consumer.commit([message=None][, offsets=None][, asynchronous=True])
提交一条消息或位移列表,message和offsets是互斥参数,若是没有指定参数,会使用当前分区分配策略的offsets。
message:提交消息的位移加1
offsets:要提交的TopicPartition列表
asynchronous:是否异步提交。异步提交会当即返回None。若是设置为False,会阻塞直到提交成功或失败,若是提交成功,会返回提交的offsets。注意:提交成功,须要对返回的TopicPartition列表的每一个TopicPartition的err字段进行检查,TopicPartition可能会提交失败。Consumer.committed(partitions[, timeout=None])
获取已提交的分区的offsets。
partitions:TopicPartition列表
timeout:请求超时,单位秒。
返回TopicPartition列表或错误集Consumer.consume([num_messages=1][, timeout=-1])
消费消息,调用回调函数,返回消息列表,若是超时,返回空。
应用程序必须检查返回Message的error方法,正常Message的error返回None。
num_messages:返回的最大消息数量,默认为1
timeout:阻塞等待消息、事件、回调函数的最大时间Connsumer.get_watermark_offsets(partition[, timeout=None][, cached=False])
获取分区的低水位和高水位
partition:TopicPartition对象
Timeout:请求超时,
Cached:是否替换正在查询的Broker使用的缓存信息。
成功返回低水位和高水位的元组,超时返回None。Consumer.list_topics([topic=None][, timeout=-1])
请求集群的元数据信息。
topic:字符串类,若是指定,只请求本Topic的信息,不然返回集群的全部Topic。
timeout:超时前的最大响应时间,-1表示永不超时。
返回ClusterMetadata类型Consumer.offsets_for_times(partitions[, timeout=None])
对指定的分区列表根据时间戳查询offsets。
返回每一个分区的offsets大于等于指定分区列表的时间戳的位移。
partitions:TopicPartition列表
timeout:请求超时时间。Consumer.pause(partitions)
暂停指定分区列表的分区的消费Consumer.poll([timeout=None])
消费消息,调用回调函数,返回事件。
应用程序必须检查返回的Message对象的error()方法,若是是正常消息,返回None。
返回Message对象回None。Consumer.position(partitions)
获取指定分区列表分区的位移
partitions:分区列表
返回带位移的TopicPartition列表,当前位移是最新消费消息的位移加1。Consumer.resume(partitions)
恢复指定分区列表的分区的消费
partitions:要恢复的TopicPartitio列表Consumer.seek(partition)
定位分区的消费位移到offset。offset能够是绝对值,也能够是逻辑位移OFFSET_BEGINNING。本函数只用于活跃消费分区更新消费位移,要设置分区的起始位移可使用assign函数。Consumer.store_offsets([message=None][, offsets=None])
存储一条消息的位移或位移列表。
message和offsets是互斥参数。
被存储的位移会根据auto.commit.interval.m参数值被提交,使用本函数时enable.auto.offset.store参数必须被设置为False。
message:存储message的位移加1。
offsets:要存储位移的TopicPartition列表Consumer.subscribe(topics[, on_assign=None][, on_revoke=None])
设置要订阅的Topic列表,会替代此前订阅的Topic。
订阅的Topic名称支持正则表达式,使用”^”做为Topic名称前缀。
topics:Topic名称列表
on_assign:完成分区再分配的回调函数
on_revoke:再平衡操做的bootstrap
on_assign(consumer, partitions) on_revoke(consumer, partitions)
Consumer.unassign()
删除当前分区分配策略和中止消费Consumer.unsubscribe()
删除当前订阅Topicapi
confluent_kafka.Producer是异步Kafka生产者。Producer.Producer(config)
使用config字典建立Producer实例。
config:配置字典对象,至少应该设置bootstrap.servers属性
Producer.len()
返回要传递到Broker的消息数量Producer.flush([timeout])
等待Producer队列中要传递的全部消息。
timeout:阻塞的最大时间,要求librdkafka版本大于0.9.4。
返回Producer消息队列中仍然存在的消息的数量。Producer.list_topics([topic=None][, timeout=-1])
请求集群的元数据。
topic:若是指定Topic,只返回Topic的相应元数据,不然返回全部Topic的元数据。
timeout:超时前的最大响应时间,-1表示永不超时。
返回ClusterMetadata类型。Producer.poll([timeout])
Poll生产者事件,调用相应回调函数。
timeout:阻塞等待事件的最大时间。
返回处理事件的数量。Producer.produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])
生产消息到指定Topic,异步操做。
topic:要生产消息到的指定Topic。
value:str或bytes类型,消息数据。
key:str或bytes类型,消息Key。
partition:要生产消息到指定分区,不然使用内置分区分配策略。
on_delivery:投递报告回调函数
timestamp:消息事件戳,要求librdkafka v0.9.4以上版本,api.version.request=true, Kafka 0.10.0.0以上版本。
headers:消息头,字典类型,消息头的key必须是字符串,value必须是二进制数据,unicode或None。要求librdkafka v0.11.4以上版本和Kafka 0.11.0.0以上版本。缓存
AdminClient提供对Kafka Broker、Topic、Group、Broker支持的其它资源进行管理操做。AdminClient.alter_configs(resources, **kwargs)
更新指定resource的配置值。AdminClient.create_partitions(new_partitions, **kwargs)
建立指定Topic的分区AdminClient.create_topics(new_topics, **kwargs)
建立TopicAdminClient.delete_topics(topics, **kwargs)
删除TopicAdminClient.describe_configs(resources, **kwargs)
获取指定resource的配置安全
BrokerMetadata定义了Kafka Broker的信息,是非实例化类,BrokerMetadata包含的属性以下:
id:整型,Broker ID
host:字符串类型,Broker 主机名
port:整型,Broker端口bash
ClusterMetadata定义了Kafka集群、Broker、Topic等信息,是非实例化类,ClusterMetadata包含以下属性:
cluster_id :字符串,Kafka集群ID字符串。
controller_id:id类型,当前Controller Broker ID
brokers:字典,key为整型Broker ID,值为BrokerMetadata对象。
topics :字典,key为字符串Topic名称, 值为TopicMetadata对象。
orig_broker_id:整型,数据源于Broker的ID。
orig_broker_name:字符串,数据源于Broker的名称或地址。
ConfigEntry由describe_configs()返回指定资源的配置实体,是非实例化类,ConfigEntry包含以下属性:
name:字符串,配置属性名称。
value:字符串,配置值。
source :ConfigSource类型,配置源。
is_read_only:bool类型,指明配置属性是否只读。
is_default:bool类型,指明配置属性是否使用默认值。
is_sensitive:bool类型,指明配置属性值是否包含敏感信息,如安全配置。
is_synonym:bool类型,指明配置属性是不是赋配置实体的别名。
synonyms :list类型,配置属性的备用源的配置实体列表。
ConfigResource(restype, name, set_config=None, described_configs=None, error=None)
restype:resource类型
name:resource名称
set_config:胚珠属性值设置方法
described_configs:
error:错误信息
Kafka资源ConfigResource.Type以下:
ANY= 1,任何资源
BROKER= 4,Broker资源,资源名称是Broker ID
GROUP= 3,Group资源,资源名称是group.id
TOPIC= 2,Topic资源,资源名称是Topic名称。
UNKNOWN= 0,未知类型,未设置类型。
set_config(name, value, overwrite=True)
设置、覆写配置实体
name:配置属性名称。
value:配置属性值。
overwrite:是否覆写。
ConfigSource是由 describe_configs()返回的配置实体的配置源。
DEFAULT_CONFIG= 5
DYNAMIC_BROKER_CONFIG= 2
DYNAMIC_DEFAULT_BROKER_CONFIG= 3
DYNAMIC_TOPIC_CONFIG= 1
STATIC_BROKER_CONFIG= 4
UNKNOWN_CONFIG= 0
PartitionsMetadata包含Kafka分区的元数据,是非实例化类,PartitionsMetadata包含的属性以下:
id :整型,分区编号。
leader :整型,分区的当前Leader,或是-1。
replicas: 整型列表,分区的副本的Broker ID的列表。
Isrs: 整型列表,分区的ISR Broker ID列表。
error:KafkaError类型,分区错误。
TopicMetadata包含Kafka Topic相关的元数据,是非实例化类,TopicMetadata包含的属性以下:
topic:字符串类型,Topic名称
partitions:字典,key是分区编号,值是PartitionMetadata对象。
error:KafkaError类型,Topic错误。
AvroConsumer(config, schema_registry=None, reader_key_schema=None, reader_value_schema=None)
Kafka Consumer客户端,对消息进行avro模式解码,处理消息反序列化。
config:字典,配置参数,包含schema.registry.url和bootstrap.servers参数。
reader_key_schema:schema类型,消息key的读取器。
reader_value_schema:(schema类型,消息值的读取器。
AvroConsumer.poll(timeout=None)
confluent_kafka.Consumer类的poll方法的覆写,使用avro scema处理消息的反序列化。
AvroProducer(config, default_key_schema=None, default_value_schema=None, schema_registry=None)
对消息进行avro schema编码的Kafka Producer客户端,处理结构注册、消息序列化。
config:字典,配置参数,包含schema.registry.url和bootstrap.servers参数。
default_key_schema:字符串,可选,key的默认avro schema。
default_value_schema:字符串,可选,value的默认avro schema。AvroProducer.produce(**kwargs)
异步发送消息到Kafka,使用指定编码,默认使用avro schema。
topic:字符串,Topic名称。
value: object类型,要序列化的对象。
value_schema:字符串,值的Avro schema。
key:object类型,要序列化的对象。
key_schema:字符串,键Avro schema。
Message对象表示一条消费或生产的消息,或是一个事件。
应用程序必须使用Message.error()方法检查Message对象是不是一个正常的Message仍是一个错误事件。
Message类不是用户可实例化的类,Message.len()
返回消息数据的长度Message.error()
Message对象用于传播错误或事件,应用程序必须检查error()方法以肯定Message对象是不是一个正常的消息(返回None),错误或是事件(返回KafkaError对象)。Message.headers()
获取消息的头。消息头是键值对集合,消息头的键是有序的,能够重重复。
返回消息头键值对的列表。Message.key()
获取消息键。Message.offset()
消息位移Message.partition()
分区编号Message.set_headers(value)
使用新值设置Message.key字段的值。Message.set_value()
使用新值设置Message.value字段的值Message.timestamp()
获取消息的时间戳类型和时间戳。时间戳类型以下:
TIMESTAMP_NOT_AVAILABLE:Broker不支持的时间戳
TIMESTAMP_CREATE_TIME:生产者时间戳
TIMESTAMP_LOG_APPEND_TIME:Broker接收时间
返回时间戳类型和时间戳的元组。
若是返回的时间戳类型是TIMESTAMP_NOT_AVAILABLE,返回的时间戳应该被忽略。时间戳要求Kafka 0.10.0.0以上版本,客户端配置的api.version.request属性值为True。Message.topic()
返回消息的TopicMessage.Value()
返回消息数据。
TopicPartition是一种泛类型,用户保存单个分区及其相关的各类信息。一般用于为不一样的操做提供TopicPartition列表。TopicPartition(topic[, partition][, offset])
实例化TopicPartition对象。
topic:Topic名称
partition:分区编号
offset:位移
TopicPartition.error:属性值,使用KafkaError表示一个错误。
TopicPartition.offset:属性值,位移
TopicPartition.partition:属性值,分区编号
TopicPartition.topic:属性值,Topic名称
KafkaError表示Kafka错误和事件对象,不是用户实例化类,用于事件传播、错误传播、异常。KafkaError.code()
返回错误或事件的代码KafkaError.name()
返回错误或事件的枚举名称KafkaError.str()
返回事件或错误的可读字符串描述
KafkaException是对KafkaError类的封装,使用exception.args[0]能够提取KafkaError对象。
OFFSET_BEGINNING:从分区开始
OFFSET_END:分区结束位置
OFFSET_STORED:使用存储提交位移
OFFSET_INVALID:非法/默认位移。
ThrottleEvent包含限制请求的相关数据,是非实例化类,包含以下属性:
broker_name:字符串,限制请求的Broker的主机名称。
broker_id:整型,Broker ID。
throttle_time:float类型,Broker限制请求的时间,单为秒。
生产者和消费者实例的配置。
conf = {'bootstrap.servers': 'mybroker.com', 'group.id': 'mygroup', 'session.timeout.ms': 6000, 'on_commit': my_commit_callback, 'default.topic.config': {'auto.offset.reset': 'smallest'}} consumer = confluent_kafka.Consumer(conf)
Python绑定提供了其它的配置属性:
default.topic.config:属性值是顶层配置属性字典。
from confluent_kafka import Producer class KafkaProducer: def __init__(self, brokers): self.producer = Producer({'bootstrap.servers': brokers}) def sendMessage(self, topic, payloads): for payload in payloads: # Trigger any available delivery report callbacks from previous produce() calls self.producer.poll(0) # Asynchronously produce a message, the delivery report callback # will be triggered from poll() above, or flush() below, when the message has # been successfully delivered or failed permanently. self.producer.produce(topic, payload.encode('utf-8'), callback=self.delivery_report) # Wait for any outstanding messages to be delivered and delivery report # callbacks to be triggered. self.producer.flush() @staticmethod def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) if __name__ == "__main__": producer = KafkaProducer('192.168.0.105:9092') source_data = list() for x in range(1000): source_data.append("Hello kafka{}".format(x)) producer.sendMessage('test2', source_data)
进入kafka容器:docker exec -it kafka-test /bin/bash
查看Topic的消息:kafka-console-consumer.sh --bootstrap-server kafka-test:9092 --topic test2 --from-beginning
from confluent_kafka import Consumer class KafkaConsumer: def __init__(self, brokers, group): config = dict() config['bootstrap.servers'] = brokers config['group.id'] = group config['auto.offset.reset'] = 'earliest' self.consumer = Consumer(config) def subscribe(self, topics): self.consumer.subscribe(topics=topics) def pull(self): while True: msg = self.consumer.poll(1.0) if msg is None: continue if msg.error(): print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf-8'))) def close(self): self.consumer.close() if __name__ == "__main__": consumer = KafkaConsumer("192.168.0.105:9092", "test_group") consumer.subscribe(["test1", "test2"]) consumer.pull() consumer.close()
from confluent_kafka import avro from confluent_kafka.avro import AvroProducer class Message: """ Message struct """ def __init__(self, key, value): self.key = {"name": "{}".format(key)} self.value = {"name": "{}".format(value)} class KafkaAvroProducer: """ Kafka Avro Producer Wrapper class """ value_schema = "" key_schema = "" def __init__(self, brokers, schema_registry_url): config = dict() config['bootstrap.servers'] = brokers config['on_delivery'] = KafkaAvroProducer.delivery_report config['schema.registry.url'] = schema_registry_url self.avro_producer = AvroProducer(config=config, default_key_schema=KafkaAvroProducer.key_schema, default_value_schema=KafkaAvroProducer.value_schema) @classmethod def register_value_schema(cls, schema): cls.key_schema = avro.loads(schema) @classmethod def register_key_schema(cls, schema): cls.value_schema = avro.loads(schema) def send_message(self, topic, messages): for message in messages: self.avro_producer.produce(topic='test1', value=message.value, key=message.key) self.avro_producer.flush() @staticmethod def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) if __name__ == "__main__": value_schema_str = """ { "namespace": "kafka.test", "name": "value", "type": "record", "fields" : [ { "name" : "name", "type" : "string" } ] } """ key_schema_str = """ { "namespace": "kafka.test", "name": "key", "type": "record", "fields" : [ { "name" : "name", "type" : "string" } ] } """ messages = list() for i in range(1000): messages.append(Message("key{}".format(i), "Hello Confluent Kafka{}".format(i))) KafkaAvroProducer.register_key_schema(key_schema_str) KafkaAvroProducer.register_value_schema(value_schema_str) schema_registry_url = 'http://127.0.0.1:8081' avroProducer = KafkaAvroProducer("192.168.0.105:9092", schema_registry_url) avroProducer.send_message("test1", messages=messages)
from confluent_kafka.avro import AvroConsumer from confluent_kafka.avro.serializer import SerializerError class KafkaAvroConsumer: def __init__(self, brokers, group, schema_registry_url): self.avro_consumer = AvroConsumer({ 'bootstrap.servers': brokers, 'group.id': group, 'auto.offset.reset': 'earliest', 'schema.registry.url': schema_registry_url}) def subscribe(self, topics): self.avro_consumer.subscribe(topics=topics) def pull_message(self): while True: try: msg = self.avro_consumer.poll(2) except SerializerError as e: print("Message deserialization failed for {}: {}".format(msg, e)) break if msg is None: continue if msg.error(): print("AvroConsumer error: {}".format(msg.error())) continue print(msg.key(), ": ", msg.value()) def close(self): self.avro_consumer.close() if __name__ == "__main__": schema_registry_url = "http://127.0.0.1:8081" avroConsumer = KafkaAvroConsumer("192.168.0.105:9092", "test_group", schema_registry_url) avroConsumer.subscribe(["test1", "test2"]) avroConsumer.pull_message() avroConsumer.close()
from confluent_kafka.admin import NewTopic, AdminClient class KafkaManager: def __init__(self, broker): self.admin_client = AdminClient({'bootstrap.servers': broker}) def create_topics(self, topics, num_partition): new_topics = [NewTopic(topic, num_partitions=num_partition, replication_factor=1) for topic in topics] fs = self.admin_client.create_topics(new_topics) # Wait for each operation to finish. for topic, f in fs.items(): try: f.result() # The result itself is None print("Topic {} created".format(topic)) except Exception as e: print("Failed to create topic {}: {}".format(topic, e)) def delete_topics(self, topics): fs = self.admin_client.delete_topics(topics=topics) for topic, f in fs.items(): try: f.result() # The result itself is None print("Topic {} deleted".format(topic)) except Exception as e: print("Failed to delete topic {}: {}".format(topic, e)) if __name__ == "__main__": import time manager = KafkaManager("192.168.0.105:9092") manager.create_topics(["test3", "test4"], 1) time.sleep(3) manager.delete_topics(["test3", "test4"])