[toc]
Install with pip or conda:

```shell
$ pip install pykafka
$ conda install -c conda-forge pykafka
```

Note (as of 2019-02): only Kafka 1.1, 1.0, 0.11, 0.10, 0.9 and 0.8 are supported. Kerberos support was added to kafka-python in this PR: https://github.com/dpkp/kafka-python/pull/1152
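To double-check which client versions actually got installed, a quick sanity check; this sketch assumes Python 3.8+ (for importlib.metadata) and that both packages are present:

```python
# Quick sanity check of the installed client versions.
# Assumes Python 3.8+ and that both packages are installed.
from importlib.metadata import version

print(version("kafka-python"))  # the Kerberos PR above landed in kafka-python
print(version("pykafka"))
```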
Java / properties file key | kafka-python parameter | Description |
---|---|---|
security.protocol | security_protocol | Security protocol |
kerberos.domain.name | sasl_kerberos_domain_name | Kerberos domain name |
sasl.kerberos.service.name | sasl_kerberos_service_name | Kerberos service name |
sasl.enabled.mechanisms & sasl.mechanism.inter.broker.protocol | sasl_mechanism | SASL mechanism |
principal | sasl_plain_username | User (tenant) name |
These settings normally live in consumer.properties.

Dissecting a principal: xianglei/dmp-master1.hadoop@HADOOP.COM

A complete principal consists of three parts: username/FQDN (Fully Qualified Domain Name) hostname@REALM (the protected realm, all uppercase). The username should be a user that actually exists on the Linux host.

The FQDN is the full hostname.domain form. If your host has no domain configured, the domain part can be left out; the point is to write the complete hostname plus domain whenever one exists. Strictly speaking, Kerberos does not call this part a hostname at all: it is the Instance, and it need not be any real server's hostname, but it is easiest to think of it as a hostname at first.

The REALM is the name of the domain protected by Kerberos, that is, a group of servers under the protection of one Kerberos service; you can picture it as a Windows domain. Because a single KDC can protect several realms at once (for example, a HADOOP server group and a MYSQL server group on the same KDC), realm names are used to tell them apart.

If your hostnames include a domain name, you must write it out in full in the second part of the principal; otherwise the KDC cannot verify the host's legitimacy, because the encrypted TGT carries the hostname.

Finally, note that the domain in the second part and the realm in the third part are written with the same character in Chinese but are completely different concepts in English. Since a Kerberos realm is usually also written in domain-name form, the two are easy to confuse; in practice you can treat the realm like WORKGROUP or HOME in Windows: an arbitrary label that distinguishes different sets of services, not necessarily your real DNS domain.
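Putting the mapping table above to work, here is a minimal sketch of a kafka-python consumer configured for Kerberos (GSSAPI). The broker address, topic and domain name are placeholders, and it assumes a valid ticket already exists in the credential cache (for example, obtained with kinit using a principal like the one dissected above):

```python
# Sketch: kafka-python consumer using the Kerberos settings from the
# mapping table. Broker, topic and domain name are placeholders.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "mytopic",                                  # placeholder topic
    bootstrap_servers=["broker1.hadoop:9092"],  # placeholder broker
    security_protocol="SASL_PLAINTEXT",         # security.protocol
    sasl_mechanism="GSSAPI",                    # sasl.enabled.mechanisms
    sasl_kerberos_service_name="kafka",         # sasl.kerberos.service.name
    sasl_kerberos_domain_name="hadoop.com",     # kerberos.domain.name (placeholder)
)
for message in consumer:
    print(message.topic, message.partition, message.offset, message.value)
```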
References
I used this to connect to Huawei's Kafka, and it passed Kerberos authentication in my tests. Rather than pasting my own code, here is another author's example, with thanks:

```python
# coding=utf-8
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["xxxx:9200"],
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="GSSAPI",
    sasl_kerberos_service_name="kafka",
)
print("connect success.")

future = producer.send("xxxx", b"test")  # value must be bytes without a serializer
result = future.get(timeout=60)
print("send success.")
```
Original post
Introduction to Kafka (excerpted from Baidu Baike)

1. Introduction: see https://blog.csdn.net/Beyond_F4/article/details/80310507

2. Installation: see https://blog.csdn.net/beyond_f4/article/details/80095689

3. Following the official examples, run a first application.

3.1 Producer:

```python
from kafka import KafkaProducer

# The server list may hold several brokers,
# e.g. ['0.0.0.1:9092', '0.0.0.2:9092', '0.0.0.3:9092']
producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092'])
for i in range(3):
    msg = "msg%d" % i
    producer.send('test', msg.encode('utf-8'))  # values must be bytes
producer.close()
```

3.2 Consumer (simple demo):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
```

With both started, production and consumption work as expected.

3.3 Consumer (consumer group):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', group_id='my-group',
                         bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
```

Start several of these consumers and each message is delivered to only one of them, as required; a consumer group can be scaled out horizontally to increase processing capacity.

3.4 Consumer (read from the earliest available message):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', auto_offset_reset='earliest',
                         bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
```

auto_offset_reset controls where to start when there is no committed offset: 'earliest' moves to the earliest available message, 'latest' to the newest; the default is 'latest'. The source defines the aliases {'smallest': 'earliest', 'largest': 'latest'}.

3.5 Consumer (setting the offset manually):

```python
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test', bootstrap_servers=['172.21.10.136:9092'])
print(consumer.partitions_for_topic("test"))  # partition info for topic 'test'
print(consumer.topics())                      # list of topics
print(consumer.subscription())                # topics this consumer subscribes to
print(consumer.assignment())                  # TopicPartitions assigned to this consumer
                                              # (may be empty until the group has polled)
print(consumer.beginning_offsets(consumer.assignment()))  # earliest consumable offsets
consumer.seek(TopicPartition(topic='test', partition=0), 5)  # consume from offset 5
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
```

3.6 Consumer (subscribing to several topics):

```python
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test', 'test0'))  # topics to consume
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0)))  # current offset position
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
```

3.7 Consumer (polling manually):

```python
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test', 'test0'))
while True:
    msg = consumer.poll(timeout_ms=5)  # fetch messages from kafka
    print(msg)
    time.sleep(1)
```

3.8 Consumer (pausing and resuming):

```python
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test',))  # note the comma: ('test') is just a string
consumer.topics()
consumer.pause(TopicPartition(topic='test', partition=0))
num = 0
while True:
    print(num)
    print(consumer.paused())  # partitions currently paused
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
        print("resume...")
        consumer.resume(TopicPartition(topic='test', partition=0))
        print("resume......")
```

After pause() the consumer reads nothing from that partition until resume() restores it.
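The producers above encode every value by hand before send(). kafka-python also lets you configure this once, via its key_serializer and value_serializer parameters; a minimal sketch reusing the broker address from the examples above:

```python
# Sketch: let kafka-python serialize keys/values instead of calling
# .encode() at every send(). Topic and broker match the examples above.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['172.21.10.136:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
producer.send('test', key='0', value={'msg': 'hello'})
producer.flush()  # block until buffered records are actually sent
producer.close()
```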
Original post
"""生产者""" def produce(self): producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers) for i in range(4): msg = "msg%d" %i producer.send(self.topic,key=str(i),value=msg) producer.close() """一个消费者消费一个topic""" def consume(self): #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers) consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers) print consumer.partitions_for_topic(self.topic) #获取test主题的分区信息 print consumer.topics() #获取主题列表 print consumer.subscription() #获取当前消费者订阅的主题 print consumer.assignment() #获取当前消费者topic、分区信息 print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量 consumer.seek(TopicPartition(topic=self.topic, partition=0), 1) #重置偏移量,从第1个偏移量消费 for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic,message.partition,message.offset, message.key,message.value)) """一个消费者订阅多个topic """ def consume2(self): consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092']) consumer.subscribe(topics=('TEST','TEST2')) #订阅要消费的主题 print consumer.topics() print consumer.position(TopicPartition(topic='TEST', partition=0)) #获取当前主题的最新偏移量 for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) """消费者(手动拉取消息)""" def consume3(self): consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092']) consumer.subscribe(topics=('TEST','TEST2')) while True: message = consumer.poll(timeout_ms=5) #从kafka获取消息 if message: print message time.sleep(1)
Original post
```python
# coding=utf-8
import codecs
import logging

from pykafka import KafkaClient

logging.basicConfig(level=logging.INFO)

client = KafkaClient(hosts="172.16.82.163:9091")


# Produce string messages to Kafka.
def produce_kafka_data(kafka_topic):
    with kafka_topic.get_sync_producer() as producer:
        for i in range(4):
            producer.produce(('test message' + str(i ** 2)).encode('utf-8'))


# Consume messages from Kafka.
def consume_simple_kafka(kafka_topic, timeout):
    consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms=timeout)
    for message in consumer:
        if message is not None:
            print(message.offset, message.value)


# When several consumers share one topic, get_balanced_consumer() is
# recommended, but I could not get it to work yet:
# kazoo.handlers.threading.KazooTimeoutError: Connection time-out
def consume_kafka(kafka_topic, zkhost):
    balanced_consumer = kafka_topic.get_balanced_consumer(
        consumer_group=b"testgroup",
        auto_commit_enable=False,
        zookeeper_connect=zkhost,
        # zookeeper=zkhost,
        zookeeper_connection_timeout_ms=6000,
        consumer_timeout_ms=10000,
    )
    for message in balanced_consumer:
        if message is not None:
            print(message.offset, message.value)


# Push the lines of a file into Kafka.
def produce_kafka_file(filename, kafka_topic):
    with kafka_topic.get_sync_producer() as producer:
        with codecs.open(filename, "r", "utf8") as rf:
            for line in rf:
                line = line.strip()
                if not line:
                    continue
                producer.produce(line.encode('utf-8'))


# ===========================================================
topic = client.topics["mytest"]
# Stop receiving if nothing arrives within consumer_timeout_ms.
consumer = topic.get_simple_consumer(consumer_timeout_ms=10000)
cnt = 0
for message in consumer:
    if message is not None:
        print(message.offset, message.value)
        cnt += 1
print(cnt)
```
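For the KazooTimeoutError noted in the comments above, one possible workaround, assuming brokers on Kafka 0.9+ and a pykafka version that supports it, is to let Kafka coordinate the group itself via the managed=True flag of get_balanced_consumer, so no ZooKeeper connection is involved. A sketch reusing the same host and topic:

```python
# Sketch: balanced consumer using Kafka's own group coordination
# (managed=True), avoiding ZooKeeper entirely. Assumes brokers >= 0.9.
from pykafka import KafkaClient

client = KafkaClient(hosts="172.16.82.163:9091")
topic = client.topics["mytest"]
consumer = topic.get_balanced_consumer(
    consumer_group=b"testgroup",
    managed=True,                 # use the Kafka group membership API
    auto_commit_enable=True,
    consumer_timeout_ms=10000,
)
for message in consumer:
    if message is not None:
        print(message.offset, message.value)
```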
Kafka hands-on tutorial