归属公司
Apache Kafka
软件语言:scalahtml
# 查看防火墙状态 systemctl status firewalld # 关闭防火墙 service firewalld stop # 启动防火墙 service firewalld start
# 检查java的命令 java -version
# 获取kafka最新安装包,这边使用的是镜像地址,能够去官方网站得到最新地址 wget http://mirrors.hust.edu.cn/apache/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz # 解压程序 tar -xzf kafka_2.11-0.11.0.1.tgz # 进入目录 cd kafka_2.11-0.11.0.1
# 配置服务器zk地址 zookeeper.connect=localhost:2181 # 配置内网绑定关系 listeners=PLAINTEXT://<your.ip>:9092 # 配置外网绑定关系 advertised.listeners=PLAINTEXT://your.host.name:9092 # 配置kafka使用内存kafka-server-start.sh # 在start中加入jvm的启动参数,默认是1G export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
kafka须要使用zookeeper,因此你首先须要启动一个zookeeper的服务,若是你没有的话,就使用kafka内置的脚原本启动一个单节点的zookeeper的实例
加入& 使进程常驻在内存中
默认端口:9092
默认为localhost,若是不配置对应的服务器ip的话java
#执行快速启动zookeeper,经过内置的zookeeper进行启动,若是要zookeeper服务器的话吗,须要再server.properties的配置文件里面加入zookeeper.connect = 你的服务器内网ip:2181 bin/zookeeper-server-start.sh config/zookeeper.properties & [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ...
而后启动kafka的服务器:经过配置文件启动kafkagit
# 启动kafka # server.properties的配置文件中有一个项目: host.name须要配置成为你的内网服务器ip地址,访问的时候经过外网环境经过外网ip地址访问,内网环境经过内网地址访问 bin/kafka-server-start.sh config/server.properties & [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ...
# 经过脚本命令建立一个主题为test的,而且使用的zookeeper的地址为localhost的 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
确认一下这个topic是否含有完毕github
# 经过zookeeper的地址来访问对应的topics中的主题列表 bin/kafka-topics.sh --list --zookeeper localhost:2181 test
# 启动客户端推送对应的消息到服务器的kafka提供的端口 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
# 启动客户端获取对应服务端信息的地址来消费消息,使用pull的方式,每间隔0.1s进行一次服务器获取 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
这个是在服务器本地测试场景:
切换成服务器场景的状况下,须要首先在将server.properties的配置文件中的
配置方式修正为服务器的内网ip地址,对外提供的外网ip地址会进行映射,映射到最终的内网地址中去
新版的只须要修改以下两个配置:参考文章
http://blog.csdn.net/chenxun_2010/article/details/72626618
zookeeper.connect = localhost:9092
listeners = PLAINTEXT://ip:9092apache
服务器kafka版本:2.11- 0.11.1
客户端kafka版本:0.11.0.1bootstrap
因此去maven中寻找对应的版本的jar包进行使用
api
org.apache.kafka
kafka-clients
0.11.0.1
包结构:
clients
admin
consumer:KafkaProducer(实现类)
producer:KafkaConsumer(实现类)
otherclass
common
server.policy(服务公用)
在具体实现类里面源码中有启动的示例代码再类的头部注释中服务器
遇到了一个问题:
Failed to load class "org.slf4j.impl.StaticLoggerBinder".框架
kafka默认引用再java类中在程序中引入了org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar的jar包
引入对应的实现日志的框架使用logback框架jvm
再pom的配置文件中加入对应的依赖
而且debug的日志等级很烦人,因此就加入了配置文件
参考文章:http://www.cnblogs.com/h--d/p/5668152.html
加入logback的库依赖引用
至少须要引用三个模块:
logback-classic
logback-core
logback-access
这三个模块的内容
其中参考了这篇文章以为很详细,因此就提供出来:
http://www.cnblogs.com/warking/p/5710303.html
使用一个新的工程进行测试,一个启动消费者,一个启动生产者,编写对应的代码