Ref: kafka中文教程html
做为消息中间件,其余组件先跟Kafka交流,而后再有Kafka统一跟Hadoop沟通。python
producer
:生产者,就是它来生产“鸡蛋”的。sql
consumer
:消费者,生出的“鸡蛋”它来消费。apache
topic
:你把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不一样的生产者生产出来的“鸡蛋”,消费者就能够选择性的“吃”了。bootstrap
broker
:就是篮子了。ubuntu
Ref: 大牛总结分享:大数据技术Storm 区别 Kafka 哪些场景更适合centos
可见更多的是“集成合做关系”。服务器
要与kafka文件夹中自带的zk的版本要同样:https://archive.apache.org/dist/zookeeper/zookeeper-3.4.8/app
Ref: ZooKeeper的安装与部署(集群模式)dom
Ref: How to Setup Apache ZooKeeper Cluster on Ubuntu 18.04 LTS(单机模式下实践没问题)
ipv6的坑,记得直接关掉就行了。
$ sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1 $ sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
Ref: Spark2.1.0+入门:Apache Kafka做为DStream数据源(Python版)
Ref: Kafka集群部署 (守护进程启动)
Ref: centos7下kafka集群安装部署
Ref: Zookeeper+Kafka集群部署(已测、可用)
消息具备类别 (Topic) 属性。一个 topic 的消息可能保存在一个活多个broker上。
分区 (Partition) 是物理上的概念,每一个 topic 包含一个或多个 partition。
Producer --> Kafka Broker --> Consumer (Spark Streaming)
每个消费者只属于某一个组 (Consumer Group),没指定就在默认的组。
Kafka的运行依赖于 ZooKeeper,其 "注册信息" 都在其中。
因此,先启动 ZooKeeper,再启动 Kafka。
参考资料:Spark2.1.0+入门:Apache Kafka做为DStream数据源(Python版)
不一样的版本兼容不一样的spark,例如:Kafka_2.11 - 0.8.2.2.tgz,2.11是scala版本号。
根据spark配置Kafka,过程在此略,详见 “课时64”。
记得下载对应的jar包以及/usr/local/kafka/libs下的内容,一并拷贝到/usr/local/spark/jars/kafka子目录。
在spark-env.sh设置:
# 打开第一个终端,先启动zookeeper $ cd /usr/local/kafka $ ./bin/zookeeper-server-start.sh config/zookeeper.properties # 打开第二个终端,再启动kafka $ cd /usr/local/kafka $ bin/kafka-server-start.sh config/server.properties # 打开第三个终端 # 建立一个topic:wordsendertest $ cd /usr/local/kafka $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 \ >--replication-factor 1 --partitions 1 --topic wordsendertest # 列出全部建立的Topic,验证是否建立成功 $ ./bin/kafka-topics.sh --list --zookeeper localhost:2181 # 建立生产者给topic扔数据,能够在当前终端输入一些测试文字 ./bin/kafka-console-producer.sh --broker-list localhost:9092 \ > --topic wordsendertest # 打开第四个终端 # 建立消费者接收topic的数据,接收到“以上输入的文字” $ cd /usr/local/kafka $ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 \ > --topic wordsendertest --from-beginning
将以上 ”第四个终端“ 换为以下自定义的 ”消费者程序“。
localhost:9092 ----> Kafka 做为数据源 ----> localhost:2181
from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr) exit(-1)
# 准备参数 sc = SparkContext(appName="PythonStreamingKafkaWordCount") ssc = StreamingContext(sc, 1) zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
# 至此,kafka做为数据源,开始“转换”
lines = kvs.map(lambda x: x[1]) counts = lines.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a+b) counts.pprint() ssc.start() ssc.awaitTermination()
运行程序:
/usr/local/spark/bin/spark-submit ./KafkaWordCount.py localhost:2181 wordsendertest
生产者每0.1秒生成2个单词并写入此topic。
消费者订阅 wordcount-topic 收到单词,并每隔8秒钟进行一次统计。
统计结果发送给另外一个主题:wordcount-result-topic。
# (1) 启动 Zookeeper 服务 bin/zookeeper-server-start.sh config/zookeeper.properties # (2) 启动 Kafka 服务 bin/kafka-server-start.sh config/server.properties # (3) 监督"输入"终端 bin/kafka-console-consumer.sh -bootstrap-server localhost:9092 --topic wordcount-topic # (4) 监督"输出"终端 bin/kafka-console-consumer.sh -bootstrap-server localhost:9092 --topic wordcount-result-topic
# spark_ss_kafka_producer.py import string import random import time from kafka import KafkaProducer if __name__ = "__main__": # broker服务器的位置9092 producer = KafkaProducer(bootstrap_servers=['localhost:9092']) while True: s2 = (random.choice(string.ascii_lowercase) for _ in range(2)) # 随机生成两个小写字母 word = ''.join(s2) # 拼接起来 value = bytearray(word, 'utf-8') # 字节序列 producer.send('wordcount-topic', value=value).get(timeout=10) # 生产者向该主题发送出去,循环发送 time.sleep(0.1)
运行代码
sudo apt-get install pip3 sudo pip3 install kafka-python python3 spark_ss_kafka_producer.py
从 topic: wordcount-topic 得到消息,而后再往 topic: wordcount-result-topic 中投入消息。
# spark_ss_kafka_consumer.py from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSessoin.builder.appName("StructuredKafkaWordCount").getOrCreate() spark.sparkContext.setLogLevel('WARN') lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", 'wordcount-topic').load().selectExpr("CAST(value ASSTRING)") # 转化为字符串类型 wordCounts = lines.groupBy("value").count()
运行代码
/usr/local/spark/bin/spark-submit \
> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 > spark_ss_kafka_consumer.py
End.