1、Kafka概述 java
Kafka是一种高吞吐量的分布式发布订阅消息系统,它能够处理消费者规模的网站中的全部动做流数据。 这种动做(网页浏览,搜索和其余用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据一般是因为吞吐量的要求而经过处理日志和日志聚合来解决。 对于像Hadoop的同样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是经过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了经过集群机来提供实时的消费。apache
2、Kafka相关术语vim
2、Kafka下载及安装服务器
一、下载网络
二、安装less
tar zxvf kafka_2.11-0.9.0.1.tgz cd kafka_2.11-0.9.0.1
三、集群配置分布式
设定有两台服务器192.168.1.23七、192.168.1.238,两台服务器各安装有两zookeeper,端口都为2181(zookeeper再也不说明),每一个服务器都为Kafka配置3个broker。oop
3.一、server.properties配置测试
broker.id = 10 port = 9090 host.name=192.168.1.237 advertised.host.name=192.168.1.237 log.dirs=/tmp/kafka-logs/server0 zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181
说明:host.name\advertised.host.name两个参数仍是要配置为IP,不然会有各类各样的问题。网站
3.二、server1.properties配置
cp config/servier.properties config/server1.properties
vim config/server1.properties
broker.id = 11 port = 9091 host.name=192.168.1.237 advertised.host.name=192.168.1.237 log.dirs=/tmp/kafka-logs/server1 zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181
3.三、server2.properties配置
cp config/servier.properties config/server2.properties vim config/server2.properties
broker.id = 12 port = 9092 host.name=192.168.1.237 advertised.host.name=192.168.1.237 log.dirs=/tmp/kafka-logs/server2 zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181
说明:同一台服务器port、log.dirs不能相同,不一样的服务器broker.id只要在一个集群中都不能相同。
3.四、同理 另外一台服务器的server.properties,server1.properties,server2.properties的broker.id分别为:20、2一、22,port分别为:9090、909一、9092 其它:host.name=192.168.1.23八、advertised.host.name=192.168.1.238
3.五、启动
bin/kafka-server-start.sh config/server.properties & bin/kafka-server-start.sh config/server1.properties & bin/kafka-server-start.sh config/server2.properties &
3.六、监控端口
netstat -tunpl |grep 2181 netstat -tunpl |grep 9090 netstat -tunpl |grep 9091 netstat -tunpl |grep 9092
看一下这4个端口起来没有,并看一下iptables有没有加入这4个IP的启动,或要把iptables相关,不然JAVA链接不进来。
4、测试
4.一、建立Topic
bin/kafka-topics.sh --create --zookeeper 192.168.1.237:2181 --replication-factor 3 --partitions 1 --topic testTopic
4.二、查看建立状况
bin/kafka-topics.sh --describe --zookeeper 192.168.1.237:2181 --topic testTopic
4.三、生产者发送消息
bin/kafka-console-producer.sh --broker-list 192.168.1.237:9090 --topic testTopic
4.四、消费都接收消息
bin/kafka-console-consumer.sh --zookeeper 192.168.1.237:2181 --from-beginning --topic testTopic
4.五、检查consumer offset位置
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect 192.168.1.237:2181 --group testTopic
5、遇到的问题
一、运行一段时间报错
# # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory. # An error report file with more information is saved as: # //hs_err_pid6500.log OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000bad30000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)
解决:
you can adjust the JVM heap size by editing kafka-server-start.sh
, zookeeper-server-start.sh
and so on:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
The -Xms
parameter specifies the minimum heap size. To get your server to at least start up, try changing it to use less memory. Given that you only have 512M, you should change the maximum heap size (-Xmx
) too:
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
I'm not sure what the minimal memory requirements of kafka in default config are - maybe you need to adjust the message size in kafka to get it to run.