Kafka was originally developed at LinkedIn. It is a distributed, partitioned, multi-replica, multi-subscriber distributed log system coordinated by Zookeeper (it can also be used as an MQ system), and is commonly used for web/Nginx logs, access logs, messaging services, and so on. LinkedIn contributed it to the Apache Foundation (open-sourced in 2011), and it became a top-level open-source project.
Hopefully Java is already installed on your machine; verify it with the following command.
$ java -version
If Java is installed successfully on your machine, you will see the installed Java version.
JDK 1.8 or later is required here.
Step 1: Run java -version to check whether a JDK is already installed. If the version is correct, there is no need to reinstall.
Step 2: Download the Linux version of the JDK.
The JDK 1.8 installation package can be downloaded from the Oracle website at:
https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
You need to register an Oracle account before downloading.
Step 3: Upload the package to the Linux server and extract it.
Upload the package to the Linux server, create the JDK installation directory, and extract the JDK archive into it:
mkdir -p /usr/local/java
tar -zxvf /usr/local/jdk-8u121-linux-x64.tar.gz -C /usr/local/java
For convenience later, and to make future JDK upgrades easier, you can create a single symbolic link /usr/jdk for the JDK with the following command:
ln -s /usr/local/java/jdk1.8.0_121/ /usr/jdk
Step 4: Check and, if necessary, upgrade the glibc (GNU C library) package on Linux.
Running JDK 1.7 or later on Linux depends on the glibc package, whose version must be 2.4 or higher. Check the installed glibc version with the following command:
rpm -qi glibc
If the version is lower than 2.4, install (upgrade) it with the following command:
yum install glibc.i686
Step 5: Configure the JDK environment variables and reload them.
Edit the system-wide profile:
vi /etc/profile
Append the JDK environment variables and the global classpath configuration at the end of the file:
export JAVA_HOME=/usr/local/java/jdk1.8.0_121
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
After adding these lines, reload the modified profile by executing:
source /etc/profile
Step 6: Verify that the JDK is installed correctly.
Run java -version; if you see output like the following, the JDK installation succeeded:
[root@localhost local]# java -version
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)
Kafka relies on Zookeeper for coordination, so before installing Kafka make sure your Zookeeper nodes are running. Check their status with zkServer.sh:
/work/zookeeper/zookeeper-1/bin/zkServer.sh status
The output looks like this:
[root@localhost work]# /work/zookeeper/zookeeper-1/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /work/zookeeper/zookeeper-1/bin/../conf/zoo.cfg
Mode: follower
[root@localhost work]# /work/zookeeper/zookeeper-2/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /work/zookeeper/zookeeper-2/bin/../conf/zoo.cfg
Mode: leader
The Kafka download address is:
http://kafka.apache.org/downloads (a copy is also prepared on the 疯狂创客圈 network drive)
It is recommended to download a release earlier than 1.1, such as kafka_2.11-1.0.2, which causes fewer problems during installation. Then upload the Kafka package to the virtual machine.
Now that the Kafka archive is on the machine, extract the tar file with the following commands:
$ cd /work/
$ tar -zxvf kafka_2.11-1.0.2.tgz
$ cd kafka_2.11-1.0.2
[root@localhost kafka_2.11-1.0.2]# ll
total 52
drwxr-xr-x 3 root root  4096 Apr  7  2020 bin
drwxr-xr-x 2 root root  4096 Apr  7  2020 config
drwxr-xr-x 2 root root  4096 Nov 23 22:23 libs
-rw-r--r-- 1 root root 32216 Apr  7  2020 LICENSE
-rw-r--r-- 1 root root   337 Apr  7  2020 NOTICE
drwxr-xr-x 2 root root  4096 Apr  7  2020 site-docs
Create a log directory for the broker:
[root@localhost ~]# cd /work/kafka_2.11-1.0.2/
[root@localhost kafka_2.11-1.0.2]# mkdir -p logs/kafka1-logs
Add a KAFKA_HOME environment variable by editing /etc/profile (vi /etc/profile):
export KAFKA_HOME=/work/kafka_2.11-1.0.2
Then reload the profile with source /etc/profile so that $KAFKA_HOME is available in the commands below.
In Kafka's config directory there is a server.properties file; the main settings to modify are as follows.
The broker's globally unique ID (must not be repeated):
broker.id=1
The listener addresses:
listeners=PLAINTEXT://192.168.233.128:9092
advertised.listeners=PLAINTEXT://192.168.233.128:9092
The log directory:
log.dirs=/work/kafka_2.11-1.0.2/logs/kafka1-logs
The Zookeeper connection string (change localhost to an IP or hostname if Zookeeper is not on this machine):
zookeeper.connect=localhost:2181
Edit the file:
vi /work/kafka_2.11-1.0.2/config/server.properties
After the changes, the complete file looks like this:
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=1 ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://192.168.233.128:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092 # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 # The number of threads that the server uses for processing requests, which may include disk I/O num.io.threads=8 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 ############################# Log Basics ############################# # A comma separated list of directories under which to store log files log.dirs=/work/kafka_2.11-1.0.2/logs/kafka1-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. 
offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: # 1. Durability: Unflushed data may be lost if you are not using replication. # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log unless the remaining # segments drop below log.retention.bytes. Functions independently of log.retention.hours. #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=18000 ############################# Group Coordinator Settings ############################# # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. # The default value for this is 3 seconds. # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0
Start the Kafka broker:
$ nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties 2>&1 &
If the startup log contains no errors, you should see output like the following:
[root@localhost ~]# $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties [2020-11-25 21:59:42,557] INFO KafkaConfig values: advertised.host.name = null advertised.listeners = null advertised.port = null alter.config.policy.class.name = null authorizer.class.name = auto.create.topics.enable = true auto.leader.rebalance.enable = true background.threads = 10 broker.id = 1 broker.id.generation.enable = true broker.rack = null compression.type = producer connections.max.idle.ms = 600000 controlled.shutdown.enable = true controlled.shutdown.max.retries = 3 controlled.shutdown.retry.backoff.ms = 5000 controller.socket.timeout.ms = 30000 create.topic.policy.class.name = null default.replication.factor = 1 delete.records.purgatory.purge.interval.requests = 1 delete.topic.enable = true fetch.purgatory.purge.interval.requests = 1000 group.initial.rebalance.delay.ms = 0 group.max.session.timeout.ms = 300000 group.min.session.timeout.ms = 6000 host.name = inter.broker.listener.name = null inter.broker.protocol.version = 1.0-IV0 leader.imbalance.check.interval.seconds = 300 leader.imbalance.per.broker.percentage = 10 listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL listeners = PLAINTEXT://192.168.233.128:9092 log.cleaner.backoff.ms = 15000 log.cleaner.dedupe.buffer.size = 134217728 log.cleaner.delete.retention.ms = 86400000 log.cleaner.enable = true log.cleaner.io.buffer.load.factor = 0.9 log.cleaner.io.buffer.size = 524288 log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308 log.cleaner.min.cleanable.ratio = 0.5 log.cleaner.min.compaction.lag.ms = 0 log.cleaner.threads = 1 log.cleanup.policy = [delete] log.dir = /tmp/kafka-logs log.dirs = /work/kafka_2.11-1.0.2/logs/kafka1-logs log.flush.interval.messages = 9223372036854775807 log.flush.interval.ms = null log.flush.offset.checkpoint.interval.ms = 60000 log.flush.scheduler.interval.ms = 9223372036854775807 log.flush.start.offset.checkpoint.interval.ms = 60000 log.index.interval.bytes = 4096 log.index.size.max.bytes = 10485760 log.message.format.version = 1.0-IV0 log.message.timestamp.difference.max.ms = 9223372036854775807 log.message.timestamp.type = CreateTime log.preallocate = false log.retention.bytes = -1 log.retention.check.interval.ms = 300000 log.retention.hours = 168 log.retention.minutes = null log.retention.ms = null log.roll.hours = 168 log.roll.jitter.hours = 0 log.roll.jitter.ms = null log.roll.ms = null log.segment.bytes = 1073741824 log.segment.delete.delay.ms = 60000 max.connections.per.ip = 2147483647 max.connections.per.ip.overrides = message.max.bytes = 1000012 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 min.insync.replicas = 1 num.io.threads = 8 num.network.threads = 3 num.partitions = 1 num.recovery.threads.per.data.dir = 1 num.replica.fetchers = 1 offset.metadata.max.bytes = 4096 offsets.commit.required.acks = -1 offsets.commit.timeout.ms = 5000 offsets.load.buffer.size = 5242880 offsets.retention.check.interval.ms = 600000 offsets.retention.minutes = 1440 offsets.topic.compression.codec = 0 offsets.topic.num.partitions = 50 offsets.topic.replication.factor = 1 offsets.topic.segment.bytes = 104857600 port = 9092 principal.builder.class = null producer.purgatory.purge.interval.requests = 1000 queued.max.request.bytes = -1 queued.max.requests = 500 quota.consumer.default = 9223372036854775807 quota.producer.default = 9223372036854775807 quota.window.num = 11 
quota.window.size.seconds = 1 replica.fetch.backoff.ms = 1000 replica.fetch.max.bytes = 1048576 replica.fetch.min.bytes = 1 replica.fetch.response.max.bytes = 10485760 replica.fetch.wait.max.ms = 500 replica.high.watermark.checkpoint.interval.ms = 5000 replica.lag.time.max.ms = 10000 replica.socket.receive.buffer.bytes = 65536 replica.socket.timeout.ms = 30000 replication.quota.window.num = 11 replication.quota.window.size.seconds = 1 request.timeout.ms = 30000 reserved.broker.max.id = 1000 sasl.enabled.mechanisms = [GSSAPI] sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.principal.to.local.rules = [DEFAULT] sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism.inter.broker.protocol = GSSAPI security.inter.broker.protocol = PLAINTEXT socket.receive.buffer.bytes = 102400 socket.request.max.bytes = 104857600 socket.send.buffer.bytes = 102400 ssl.cipher.suites = null ssl.client.auth = none ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000 transaction.max.timeout.ms = 900000 transaction.remove.expired.transaction.cleanup.interval.ms = 3600000 transaction.state.log.load.buffer.size = 5242880 transaction.state.log.min.isr = 1 transaction.state.log.num.partitions = 50 transaction.state.log.replication.factor = 1 transaction.state.log.segment.bytes = 104857600 transactional.id.expiration.ms = 604800000 unclean.leader.election.enable = false zookeeper.connect = localhost:2181 zookeeper.connection.timeout.ms = 6000 zookeeper.session.timeout.ms = 6000 zookeeper.set.acl = false zookeeper.sync.time.ms = 2000 (kafka.server.KafkaConfig) [2020-11-25 21:59:42,694] INFO starting (kafka.server.KafkaServer) [2020-11-25 21:59:42,699] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer) [2020-11-25 21:59:42,878] INFO Starting ZkClient event thread. 
(org.I0Itec.zkclient.ZkEventThread) [2020-11-25 21:59:42,886] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,886] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,886] INFO Client environment:java.version=1.8.0_11 (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,886] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,886] INFO Client environment:java.home=/work/java/jdk1.8.0_11/jre (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,886] INFO Client environment:java.class.path=.:/work/java/jdk1.8.0_11/lib/dt.jar:/work/java/jdk1.8.0_11/lib/tools.jar:/work/kafka_2.11-1.0.2/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/argparse4j-0.7.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/commons-lang3-3.5.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-api-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-file-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-json-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-runtime-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-transforms-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/guava-20.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/hk2-api-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/hk2-locator-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/hk2-utils-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-annotations-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-core-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-databind-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-jaxrs-base-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-jaxrs-json-provider-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-module-jaxb-annotations-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/javassist-3.20.0-GA.jar:/work/kafka_2.11-1.0.2/bin/../libs/javassist-3.21.0-GA.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.annotation-api-1.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.inject-1.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.inject-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.servlet-api-3.1.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.ws.rs-api-2.0.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-client-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-common-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-container-servlet-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-container-servlet-core-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-guava-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-media-jaxb-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-server-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-continuation-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-http-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-io-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-security-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-server-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-servlet-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-servlets-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-util-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jopt-simple-5.0.4.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka_2.11-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka_2.11-1.0.2-sources.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka_2.11
-1.0.2-test-sources.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-clients-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-log4j-appender-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-streams-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-streams-examples-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-tools-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/log4j-1.2.17.jar:/work/kafka_2.11-1.0.2/bin/../libs/lz4-java-1.4.jar:/work/kafka_2.11-1.0.2/bin/../libs/maven-artifact-3.5.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/metrics-core-2.2.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/osgi-resource-locator-1.0.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/plexus-utils-3.0.24.jar:/work/kafka_2.11-1.0.2/bin/../libs/reflections-0.9.11.jar:/work/kafka_2.11-1.0.2/bin/../libs/rocksdbjni-5.7.3.jar:/work/kafka_2.11-1.0.2/bin/../libs/scala-library-2.11.12.jar:/work/kafka_2.11-1.0.2/bin/../libs/slf4j-api-1.7.25.jar:/work/kafka_2.11-1.0.2/bin/../libs/slf4j-log4j12-1.7.25.jar:/work/kafka_2.11-1.0.2/bin/../libs/snappy-java-1.1.4.jar:/work/kafka_2.11-1.0.2/bin/../libs/validation-api-1.1.0.Final.jar:/work/kafka_2.11-1.0.2/bin/../libs/zkclient-0.10.jar:/work/kafka_2.11-1.0.2/bin/../libs/zookeeper-3.4.10.jar (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,887] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,887] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,887] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,887] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,887] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,887] INFO Client environment:os.version=3.10.0-123.el7.x86_64 (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,887] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,887] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,887] INFO Client environment:user.dir=/root (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,888] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@481a996b (org.apache.zookeeper.ZooKeeper) [2020-11-25 21:59:42,991] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient) [2020-11-25 21:59:42,999] INFO Opening socket connection to server localhost/127.0.0.1:2181. 
Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2020-11-25 21:59:43,012] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2020-11-25 21:59:43,086] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1006049103f0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2020-11-25 21:59:43,094] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2020-11-25 21:59:44,369] INFO Cluster ID = 4MOhHbbzS42FdvekFfLwTQ (kafka.server.KafkaServer) [2020-11-25 21:59:44,381] WARN No meta.properties file under dir /work/kafka_2.11-1.0.2/logs/kafka1-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint) [2020-11-25 21:59:44,412] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2020-11-25 21:59:44,429] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2020-11-25 21:59:44,442] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2020-11-25 21:59:44,541] INFO Loading logs. (kafka.log.LogManager) [2020-11-25 21:59:44,547] INFO Logs loading complete in 6 ms. (kafka.log.LogManager) [2020-11-25 21:59:45,086] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [2020-11-25 21:59:45,095] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [2020-11-25 21:59:45,394] INFO Awaiting socket connections on 192.168.233.128:9092. (kafka.network.Acceptor) [2020-11-25 21:59:45,399] INFO [SocketServer brokerId=1] Started 1 acceptor threads (kafka.network.SocketServer) [2020-11-25 21:59:45,422] INFO [ExpirationReaper-1-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2020-11-25 21:59:45,423] INFO [ExpirationReaper-1-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2020-11-25 21:59:45,427] INFO [ExpirationReaper-1-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2020-11-25 21:59:45,438] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler) [2020-11-25 21:59:45,646] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2020-11-25 21:59:45,648] INFO [ExpirationReaper-1-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2020-11-25 21:59:45,651] INFO [ExpirationReaper-1-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2020-11-25 21:59:45,658] INFO [ExpirationReaper-1-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2020-11-25 21:59:45,698] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2020-11-25 21:59:45,701] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.group.GroupCoordinator) [2020-11-25 21:59:45,701] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator) [2020-11-25 21:59:45,705] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 1 milliseconds. 
(kafka.coordinator.group.GroupMetadataManager) [2020-11-25 21:59:45,718] INFO [ProducerId Manager 1]: Acquired new producerId block (brokerId:1,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager) [2020-11-25 21:59:45,741] INFO [TransactionCoordinator id=1] Starting up. (kafka.coordinator.transaction.TransactionCoordinator) [2020-11-25 21:59:45,771] INFO [TransactionCoordinator id=1] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator) [2020-11-25 21:59:45,774] INFO [Transaction Marker Channel Manager 1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager) [2020-11-25 21:59:45,807] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2020-11-25 21:59:45,811] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2020-11-25 21:59:45,812] INFO Registered broker 1 at path /brokers/ids/1 with addresses: EndPoint(192.168.233.128,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils) [2020-11-25 21:59:45,813] WARN No meta.properties file under dir /work/kafka_2.11-1.0.2/logs/kafka1-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint) [2020-11-25 21:59:45,893] INFO [SocketServer brokerId=1] Started processors for 1 acceptors (kafka.network.SocketServer) [2020-11-25 21:59:45,894] INFO Kafka version : 1.0.2 (org.apache.kafka.common.utils.AppInfoParser) [2020-11-25 21:59:45,894] INFO Kafka commitId : 2a121f7b1d402825 (org.apache.kafka.common.utils.AppInfoParser) [2020-11-25 21:59:45,895] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
However, this alone does not guarantee that Kafka started successfully. Run jps to check the processes; if a Kafka process appears, the startup succeeded:
[hadoop@Master ~]$ jps
9173 Kafka
9462 Jps
8589 QuorumPeerMain
[hadoop@Master ~]$ jps -m
9472 Jps -m
9173 Kafka /opt/kafka/config/server.properties
8589 QuorumPeerMain /opt/zookeeper/bin/../conf/zoo.cfg
Create a topic named test:
[hadoop@Master ~]$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.233.128:2181 --replication-factor 1 --partitions 1 --topic test
Parameter description:
--zookeeper: the Zookeeper connection URL used by Kafka, the same value as the zookeeper.connect setting in server.properties, here 192.168.233.128:2181
--replication-factor: the number of replicas
--partitions: the number of partitions
--topic: the topic name
[root@localhost ~]# $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.233.128:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
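Besides the command-line tool, a topic can also be created from Java code. Below is a minimal sketch using the AdminClient that ships with the Kafka 1.0.x client library; the broker address 192.168.233.128:9092 mirrors the environment above, and the class name CreateTopicDemo is made up for illustration.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The AdminClient connects to a broker, not to Zookeeper
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.128:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Topic "test" with 1 partition and replication factor 1, matching the CLI example above
            NewTopic topic = new NewTopic("test", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
            System.out.println("Topic created: test");
        }
    }
}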
List the topics to confirm the creation:
[hadoop@Master ~]$ $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper 192.168.233.128:2181
The result is as follows:
[root@localhost ~]# $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper 192.168.233.128:2181
test
Start a console producer:
[hadoop@Master ~]$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.233.128:9092 --topic test
Note that the port in this command is the Kafka broker port (9092).
After running the command, the console waits for input. Type a message value and press Enter; each line (separated by a newline) is one message, as shown below.
>Hello Kafka!
>你好 kafka!
Normally, each press of Enter triggers a send. Afterwards you can press Ctrl+C to exit the producer console, then use the kafka-console-consumer.sh script to verify what was produced.
[hadoop@Master ~]$ $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 192.168.233.128:2181 --topic test --from-beginning
Note:
1. The port in this command is the Zookeeper port (2181).
2. With the --from-beginning flag, consumption starts from the very beginning, so both old and new messages are consumed; without it, only newly produced messages are consumed.
Producer-side output:
[root@localhost ~]# $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.233.128:9092 --topic test
>aaa bbbb
>ccc fff
>Hello Kafka!
>你好 kafka!
>
Consumer-side output:
[root@localhost ~]# $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 192.168.233.128:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
aaa bbbb
ccc fff
Hello Kafka!
你好 kafka!
To run a pseudo-cluster of three brokers on one machine, copy config/server.properties three times and name the copies server1.properties, server2.properties, and server3.properties.
server1.properties:
- broker.id=1
- listeners=PLAINTEXT://:9092
- advertised.listeners=PLAINTEXT://192.168.233.128:9092 (192.168.233.128 is the local machine's IP)
- log.dirs=/work/kafka_2.11-1.0.2/logs/kafka1-logs
- zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
server2.properties:
- broker.id=2
- listeners=PLAINTEXT://:9093
- advertised.listeners=PLAINTEXT://192.168.233.128:9093 (192.168.233.128 is the local machine's IP)
- log.dirs=/work/kafka_2.11-1.0.2/logs/kafka2-logs
- zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
server3.properties:
- broker.id=3
- listeners=PLAINTEXT://:9094
- advertised.listeners=PLAINTEXT://192.168.233.128:9094 (192.168.233.128 is the local machine's IP)
- log.dirs=/work/kafka_2.11-1.0.2/logs/kafka3-logs
- zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
Start the three brokers:
nohup /work/kafka_2.11-1.0.2/bin/kafka-server-start.sh /work/kafka_2.11-1.0.2/config/server3.properties > /work/kafka_2.11-1.0.2/logs/kafka3-logs/startup.log 2>&1 &
nohup /work/kafka_2.11-1.0.2/bin/kafka-server-start.sh /work/kafka_2.11-1.0.2/config/server2.properties > /work/kafka_2.11-1.0.2/logs/kafka2-logs/startup.log 2>&1 &
nohup /work/kafka_2.11-1.0.2/bin/kafka-server-start.sh /work/kafka_2.11-1.0.2/config/server1.properties > /work/kafka_2.11-1.0.2/logs/kafka1-logs/startup.log 2>&1 &
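To double-check that all three brokers of the pseudo-cluster have registered, you can inspect /brokers/ids in Zookeeper, or use a small AdminClient sketch like the one below (the bootstrap addresses reuse the listener ports 9092, 9093 and 9094 configured above; the class name is made up for illustration).

import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public class DescribeClusterDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Any one of the three brokers is enough to bootstrap the client
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.233.128:9092,192.168.233.128:9093,192.168.233.128:9094");
        try (AdminClient admin = AdminClient.create(props)) {
            // A healthy pseudo-cluster should print three nodes with ids 1, 2 and 3
            for (Node node : admin.describeCluster().nodes().get()) {
                System.out.println("broker id=" + node.id() + " -> " + node.host() + ":" + node.port());
            }
        }
    }
}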
A messaging system moves data from one application to another, so that applications only need to care about the data itself, not about how it is transferred between two or more applications. Distributed messaging is built on reliable message queues that pass messages asynchronously between client applications and the messaging system. There are two main messaging patterns: point-to-point and publish-subscribe.
In a point-to-point messaging system, messages are persisted in a queue, and one or more consumers consume from that queue, but each message can be consumed only once. Once a consumer has consumed a message, it is removed from the queue. Even with multiple consumers consuming at the same time, this pattern guarantees that data is processed in order. The architecture is illustrated below:
The producer sends a message to the queue and only one consumer receives it.
In a publish-subscribe messaging system, messages are persisted in a topic. Unlike point-to-point, consumers can subscribe to one or more topics and consume all the data in those topics; the same message can be consumed by multiple consumers, and data is not deleted immediately after being consumed. In a publish-subscribe system, producers are called publishers and consumers are called subscribers. The pattern is illustrated below:
Only subscribers that have subscribed to a topic receive the messages a publisher sends to it.
As shown above, publish-subscribe is a push-based message delivery model that can have many different kinds of subscribers. After a producer puts a message into the queue, the queue pushes it to the consumers that have subscribed to that type of message (similar to a WeChat official account).
Most messaging systems use the publish-subscribe pattern, and Kafka is a publish-subscribe system.
Before diving deeper into Kafka, let's first introduce its terminology. The diagram below shows the main Kafka terms and how they relate to each other:
In the diagram, one topic is configured with 3 partitions, and every broker in the cluster stores one or more of those partitions.
Partition 1 has two offsets, 0 and 1; Partition 2 has 4 offsets; Partition 3 has 1 offset. The replica IDs happen to match the IDs of the machines that host them.
If a topic's replication factor is 3, Kafka creates 3 identical replicas of each partition across the cluster. Multiple producers and consumers can produce and consume data at the same time.
A Kafka cluster consists of one or more servers; each server node is called a broker.
Brokers store topic data. If a topic has N partitions and the cluster has N brokers, each broker stores one partition of that topic.
If a topic has N partitions and the cluster has N+M brokers, then N brokers each store one partition of the topic and the remaining M brokers store no partition data for it.
If a topic has N partitions but the cluster has fewer than N brokers, some brokers store more than one partition of the topic. Try to avoid this situation in production, because it easily leads to an unbalanced Kafka cluster.
Every message published to a Kafka cluster has a category called its topic. (Physically, messages of different topics are stored separately; logically, although a topic's messages may be stored on one or more brokers, users only need to specify the topic to produce or consume data, without caring where it is stored.)
A topic is similar to a table name in a database.
The data of a topic is split into one or more partitions, and every topic has at least one partition. The data in each partition is stored in multiple segment files. Data within a partition is ordered, but ordering is not preserved across partitions, so when a topic has multiple partitions, the overall consumption order cannot be guaranteed. In scenarios that require strictly ordered consumption, the number of partitions must be set to 1.
The producer is the publisher of data; it publishes messages to Kafka topics. When a broker receives a message from a producer, it appends it to the segment file currently being written. A message sent by a producer is stored in a single partition, and the producer can also specify which partition it is stored in.
Consumers read data from brokers, and a consumer can consume data from multiple topics.
Each consumer belongs to a specific consumer group (a group name can be specified per consumer; if none is given, the consumer belongs to the default group).
Each partition has multiple replicas, exactly one of which is the leader; the leader is the replica currently responsible for reads and writes of that partition.
Followers follow the leader: all write requests are routed through the leader, data changes are broadcast to all followers, and the followers keep their data in sync with the leader. If the leader fails, a new leader is elected from the followers. When a follower crashes, gets stuck, or synchronizes too slowly, the leader removes it from the in-sync replicas (ISR) list and a new follower is created.
RabbitMQ is an open-source message queue written in Erlang. It natively supports many protocols (AMQP, XMPP, SMTP, STOMP), which also makes it quite heavyweight and better suited to enterprise development. It implements a broker architecture, meaning messages are queued in a central queue before being delivered to clients, and it has good support for routing, load balancing, and data persistence.
Redis is a key-value NoSQL database with very active development and maintenance. Although it is a key-value store, it supports MQ functionality natively, so it can be used as a lightweight queue service. One benchmark of RabbitMQ and Redis ran one million enqueue and dequeue operations each, recording the execution time every 100,000 operations, with payloads of 128 bytes, 512 bytes, 1 KB, and 10 KB. The results showed that for enqueuing, Redis outperforms RabbitMQ when the payload is small, but becomes unbearably slow once the payload exceeds 10 KB; for dequeuing, Redis performs very well regardless of payload size, while RabbitMQ's dequeue performance is far lower than Redis's.
ZeroMQ claims to be the fastest message queue system, especially for high-throughput scenarios. It can implement advanced, complex queues that RabbitMQ is not good at, but developers must combine multiple technical frameworks themselves, and this complexity is a real challenge to applying it successfully. ZeroMQ has a unique broker-less model: you do not need to install and run a message server or middleware, because your application itself plays that role. You simply reference the ZeroMQ library (installable via NuGet, for example) and you can send messages between applications. However, ZeroMQ only provides non-persistent queues, which means data is lost if a node goes down. Notably, Twitter's Storm used ZeroMQ as its default data-stream transport in versions before 0.9.0 (from 0.9 on, Storm supports both ZeroMQ and Netty as transport modules).
ActiveMQ is an Apache subproject. Like ZeroMQ, it can implement queues with broker and point-to-point techniques; like RabbitMQ, it can implement advanced scenarios efficiently with little code.
Kafka is an Apache subproject: a high-performance, cross-language, distributed publish/subscribe message queue system (Jafka was incubated on top of Kafka as an upgraded version of it). It has the following characteristics: fast persistence, with message persistence at O(1) overhead; high throughput, reaching roughly 100,000 messages per second on an ordinary server; a fully distributed design in which brokers, producers, and consumers all natively and automatically support distribution and load balancing; and support for parallel data loading into Hadoop, which makes it a practical solution for systems that, like Hadoop log and offline analysis pipelines, also require real-time processing. Kafka unifies online and offline message processing through Hadoop's parallel loading mechanism. Compared with ActiveMQ, Apache Kafka is a very lightweight messaging system; besides its excellent performance, it is also a well-functioning distributed system.
package test;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducer {
    private static Producer<Integer, String> producer;
    private final Properties props = new Properties();

    public SimpleProducer() {
        // Define the broker list to connect to
        props.put("metadata.broker.list", "192.168.1.216:9092");
        // Define the serializer class; in Java, objects must be serialized before transmission
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        producer = new Producer<Integer, String>(new ProducerConfig(props));
    }

    public static void main(String[] args) {
        SimpleProducer sp = new SimpleProducer();
        // Define the topic
        String topic = "mytopic";
        // Define the message to send to the topic
        String messageStr = "This is a message";
        // Build the message object
        KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
        // Push the message to the broker
        producer.send(data);
        producer.close();
    }
}
In a single-machine Kafka environment the port is simply the broker port 9092. The topic here is named mytopic; you can choose any name without worrying about whether it already exists on the server. The code above sends a single message; when sending large volumes of data, calling send() once per message wastes time, so it is better to group the data into a List and call send() once with the whole ArrayList, which greatly improves throughput.
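As a rough illustration of that batching idea, the sketch below collects KeyedMessage objects into a list and pushes them with a single send() call. It reuses the legacy kafka.javaapi.producer API and broker address from the producer example above; the batch size of 200 is an arbitrary value chosen for demonstration.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BatchProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.1.216:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));

        // Collect messages into a list and push them with one send() call per batch
        List<KeyedMessage<Integer, String>> batch = new ArrayList<KeyedMessage<Integer, String>>();
        for (int i = 0; i < 1000; i++) {
            batch.add(new KeyedMessage<Integer, String>("mytopic", "message-" + i));
            if (batch.size() == 200) {        // flush every 200 messages
                producer.send(batch);
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            producer.send(batch);             // send any remaining messages
        }
        producer.close();
    }
}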
package test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SimpleHLConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        // Define the Zookeeper connection information
        props.put("zookeeper.connect", zookeeper);
        // Define the group ID this consumer belongs to
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        // Define the number of streams to subscribe to for the topic
        topicCount.put(topic, new Integer(1));
        // Returns a map of streams for all requested topics
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        // Take the message streams of the topic we need
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            while (consumerIte.hasNext())
                System.out.println("Message from Topic :" + new String(consumerIte.next().message()));
        }
        if (consumer != null)
            consumer.shutdown();
    }

    public static void main(String[] args) {
        String topic = "mytopic";
        SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.233.128:2181/kafka", "testgroup", topic);
        simpleHLConsumer.testConsumer();
    }
}
The consumer code's main logic is simply to process and print the data sent by the producer. Note that the address here is the Zookeeper address and includes the /kafka chroot node, and the topic name must match the producer's.
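As the deprecation warning in the console-consumer output above hints, newer client code usually replaces this Zookeeper-based high-level consumer with the new KafkaConsumer API, which talks to the brokers directly. A minimal sketch, where the broker address, topic, and group ID are assumptions carried over from the earlier examples:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewApiConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // bootstrap.servers points at a broker, as the deprecation warning recommends
        props.put("bootstrap.servers", "192.168.233.128:9092");
        props.put("group.id", "testgroup");
        props.put("auto.offset.reset", "earliest");   // similar effect to --from-beginning for a new group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("mytopic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Message from Topic :" + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}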
Develop a single, generic Kafka send/receive module; other modules then only need to call this module's unified send interface and implement their own receive logic.
The microservice provider, the subscribed topic, and the subscription group can all be configured in a database. When a service receives a message, it automatically invokes the callbacks of the configured receiver classes.
The database table configuration is as follows:
This Kafka module lives in the 疯狂创客圈 Crazy-SpringCloud scaffold, in a module named base-kafka; its Swagger UI after startup looks like this:
You can send a message for a given topic through this interface. If a subscription relationship is configured in the database, for example if provider-name (the microservice name) subscribes to the test topic and a message callback class and method are configured, then the message will be consumed.
The consumption screen looks like this:
Now let's look at the relationships between producers, consumers, topics, and groups:
If this diagram leaves you confused, don't worry! Let's first analyze the related concepts.
Producer: the producer, i.e. the creator of messages and their entry point into Kafka.
Kafka cluster:
Broker: a Kafka instance. Each server runs one or more Kafka instances; for simplicity, assume each broker corresponds to one server. Every broker in a cluster has a unique ID, such as broker-0 and broker-1 in the figure.
Topic: the subject of a message, which can be understood as a message category; Kafka data is stored in topics. Multiple topics can be created on each broker.
Partition: a partition of a topic. Each topic can have multiple partitions, which spread the load and increase Kafka's throughput. The data of the same topic in different partitions does not overlap, and on disk a partition is simply a directory.
Replication: each partition has multiple replicas, which serve as standbys. When the primary partition (the leader) fails, a standby (a follower) is promoted to become the new leader. In Kafka the default maximum number of replicas is 10, the number of replicas cannot exceed the number of brokers, and a follower and its leader are always on different machines; a single machine holds at most one replica of a given partition (including the leader itself).
Message: the body of each message sent.
Consumer: the consumer, i.e. the receiver of messages and their exit point from Kafka.
Consumer Group: multiple consumers can be grouped into a consumer group. In Kafka's design, the data of a partition can only be consumed by one consumer within a consumer group, while consumers in the same group can consume different partitions of the same topic; this, too, increases Kafka's throughput.
Zookeeper: the Kafka cluster relies on Zookeeper to store cluster metadata and ensure system availability.
Key point 1: the data of the same topic in different partitions does not overlap.
Key point 2: the data of a partition can only be consumed by one consumer within a consumer group.
The sections above covered Kafka's basic architecture and concepts. Whether or not you now have a rough picture of Kafka, and even if it still feels fuzzy, that's fine: next we will walk through Kafka's workflow against the structure diagram above, and then tie everything together at the end; I believe you will get more out of it that way.
In the architecture diagram above, the producer is the entry point for data. Note the red arrows: when writing data, the producer always goes to the leader; it never writes directly to a follower. So how is the leader found, and what does the write flow look like? See the diagram below:
The sending flow is already annotated in the figure, so it is not repeated here in text. One point to note: after a message is written to the leader, the followers actively pull from the leader to synchronize. The producer pushes data to the broker; each message is appended to a partition and written to disk sequentially, which guarantees that data within a single partition is ordered. The write process is illustrated below:
As mentioned, data is written to different partitions, so why does Kafka partition at all? As you can probably guess, the main purposes of partitioning are:
1. Easier scaling. Because a topic can have multiple partitions, we can add machines to cope with ever-growing data volumes.
2. Higher concurrency. With the partition as the unit of reads and writes, multiple consumers can consume data at the same time, which improves message-processing efficiency.
Anyone familiar with load balancing knows that when we send a request to a server, the server side may load-balance it and distribute the traffic across different servers. Similarly in Kafka, if a topic has multiple partitions, how does the producer know which partition to send the data to? Kafka follows a few rules (see the sketch after this list):
1. The partition can be specified explicitly when writing; if it is specified, the data is written to that partition.
2. If no partition is specified but the message has a key, a partition is chosen by hashing the key.
3. If neither a partition nor a key is specified, a partition is chosen in a round-robin fashion.
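With the Java producer API, these three cases roughly correspond to the different ProducerRecord constructors, as in the sketch below; the topic name test and the broker address are assumptions reused from the earlier examples.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionRoutingDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.233.128:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // 1. Partition specified explicitly: the record goes to partition 0
        producer.send(new ProducerRecord<String, String>("test", 0, "order-1001", "explicit partition"));

        // 2. No partition, but a key: the partition is chosen by hashing the key
        producer.send(new ProducerRecord<String, String>("test", "order-1001", "keyed message"));

        // 3. Neither partition nor key: partitions are picked in a round-robin fashion
        producer.send(new ProducerRecord<String, String>("test", "unkeyed message"));

        producer.close();
    }
}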
Not losing messages is a basic guarantee of any message-queue middleware, so how does the producer guarantee that messages are not lost when writing to Kafka? The answer is in the write-flow diagram above: the ACK acknowledgement mechanism. When writing data to the queue, the producer can set a parameter that determines whether Kafka must confirm that it received the data; this parameter can be set to 0, 1, or all (see the sketch after this list).
0 means the producer sends data to the cluster without waiting for any response before sending the next message, so delivery is not confirmed at all. This is the least safe but the most efficient.
1 means the producer can send the next message as soon as the leader acknowledges the write, so only the leader's write is confirmed.
all means the producer waits until all followers have finished synchronizing from the leader before sending the next message, so both the leader's write and all replica backups are confirmed. This is the safest but the least efficient.
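In the Java producer this corresponds to the acks configuration property. A minimal sketch, with the broker address and topic reused from the earlier examples and acks set to all for illustration:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AckDemoProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.128:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // "0": fire and forget; "1": wait for the leader only; "all": wait for all in-sync replicas
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // The callback reports whether the broker actually acknowledged the write
        producer.send(new ProducerRecord<String, String>("test", "hello with acks=all"),
                (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("written to partition " + metadata.partition()
                                + " at offset " + metadata.offset());
                    }
                });
        producer.close();
    }
}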
Finally, note what happens when you write to a topic that does not exist: Kafka creates the topic automatically, with the number of partitions and replicas both taken from the default configuration, which is 1.
After the producer writes data to Kafka, the cluster needs to store it. Kafka stores data on disk, and in our usual thinking, writing to disk is a slow operation that seems unsuitable for such a high-concurrency component. Kafka instead pre-allocates a dedicated area of disk and writes data to it sequentially, which is far more efficient than random writes.
Partition structure
As mentioned earlier, every topic can be divided into one or more partitions. If a topic feels abstract, a partition is concrete: on the server, a partition is simply a directory, and under each partition directory there are multiple groups of segment files. Each group contains a .index file, a .log file, and a .timeindex file (absent in early versions). The .log file is where messages are actually stored, while the .index and .timeindex files are index files used to locate messages.
As shown above, this partition has three groups of segment files. Every .log file has the same size, but the number of messages stored in each is not necessarily equal (individual message sizes differ). Each file is named after the smallest offset in its segment; for example, 000.index covers the messages with offsets 0 through 368795. Kafka uses this segmentation-plus-index approach to solve the lookup-efficiency problem.
Message structure
As mentioned, the .log file is where messages are actually stored. What the producer writes to Kafka are individual messages, so what does a message stored in the log look like? A message mainly contains the message body, the message size, the offset, the compression type, and so on. The three most important parts to know are:
1. Offset: an ordered 8-byte ID that uniquely identifies the position of a message within its partition.
2. Message size: 4 bytes describing how large the message is.
3. Message body: the actual (compressed) message data; the space it occupies varies from message to message.
Storage policy
Kafka keeps all messages, whether or not they have been consumed. What deletion policies exist for old data?
1. Time-based: the default is 168 hours (7 days).
2. Size-based: the default is 1073741824 bytes (1 GB).
Note that the time complexity of reading a specific message in Kafka is O(1), so deleting expired files does not improve Kafka's read performance.
Once messages are stored in the log files, consumers can consume them. As with producing messages, consumers go to the leader to pull messages.
Multiple consumers can form a consumer group, and each consumer group has a group ID. Consumers in the same group can consume data from different partitions of the same topic, but multiple consumers within one group never consume the same partition. Sounds a bit convoluted? Look at the diagram below:
The figure shows the case where a consumer group has fewer consumers than partitions, so one consumer ends up consuming several partitions, and its consumption speed cannot match that of a consumer handling only a single partition.
If a consumer group has more consumers than partitions, will several consumers end up consuming the same partition?
As noted above, that does not happen: the extra consumers simply do not consume any partition. So in practice it is recommended to keep the number of consumers in a group equal to the number of partitions; adding more than that only leaves consumers idle.
A partition is divided into multiple segments, each segment contains .log, .index, and .timeindex files, and every stored message contains an offset, a message size, a message body, and so on. We have mentioned segments and offsets several times; how are they used together to look up a message? Suppose we now need to find the message with offset 368801: what does the process look like? Let's look at the figure below:
1. First locate the segment file that contains offset 368801 (using binary search); here it is the second segment file.
2. Open that segment's .index file (the 368796.index file, whose starting offset is 368796+1). The message we want, offset 368801, corresponds to 368796+5=368801 within this index, so the relative offset to look up is 5. Because this file stores the mapping between relative offsets and physical message positions as a sparse index, an entry for relative offset 5 may not exist; binary search is used again to find the largest indexed relative offset that is less than or equal to the one we want, which here is the entry for relative offset 4.
3. The index entry for relative offset 4 says the message is stored at physical position 256. Open the data file and scan sequentially from position 256 until the message with offset 368801 is found.
This mechanism is built on ordered offsets, combining segments, ordered offsets, sparse indexes, binary search, and sequential scanning to look up data efficiently, as the sketch below also illustrates. At this point the consumer has the data it needs to process.
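A purely illustrative sketch of this segment plus sparse-index lookup: it models the segment base offsets and the sparse index as in-memory structures (the physical positions are made up, apart from the 256 taken from the example above) and is not Kafka's actual implementation.

import java.util.Arrays;
import java.util.List;

public class SparseIndexLookupDemo {

    // One sparse index entry: an offset relative to the segment's base offset,
    // plus the physical position of that message in the .log file
    static class IndexEntry {
        final long relativeOffset;
        final long physicalPosition;
        IndexEntry(long relativeOffset, long physicalPosition) {
            this.relativeOffset = relativeOffset;
            this.physicalPosition = physicalPosition;
        }
    }

    // Step 1: binary-search the segment base offsets for the last one <= target
    static int findSegment(long[] baseOffsets, long target) {
        int lo = 0, hi = baseOffsets.length - 1, result = 0;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (baseOffsets[mid] <= target) { result = mid; lo = mid + 1; } else { hi = mid - 1; }
        }
        return result;
    }

    // Step 2: binary-search the sparse index for the last entry <= the relative offset
    static IndexEntry findIndexEntry(List<IndexEntry> index, long relativeOffset) {
        int lo = 0, hi = index.size() - 1, result = 0;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (index.get(mid).relativeOffset <= relativeOffset) { result = mid; lo = mid + 1; } else { hi = mid - 1; }
        }
        return index.get(result);
    }

    public static void main(String[] args) {
        // Base offsets 0 and 368796 match the example above; the third is made up
        long[] segmentBaseOffsets = {0L, 368796L, 737646L};
        long target = 368801L;

        int segment = findSegment(segmentBaseOffsets, target);    // -> the second segment (base 368796)
        long relative = target - segmentBaseOffsets[segment];     // -> relative offset 5

        // A made-up sparse index for that segment: only some offsets have entries
        List<IndexEntry> sparseIndex = Arrays.asList(
                new IndexEntry(0, 0), new IndexEntry(2, 120), new IndexEntry(4, 256), new IndexEntry(7, 430));
        IndexEntry start = findIndexEntry(sparseIndex, relative); // -> the entry (4, 256)

        // Step 3: a real broker would now scan the .log file sequentially from position 256
        System.out.println("scan segment " + segmentBaseOffsets[segment]
                + " from physical position " + start.physicalPosition
                + " until offset " + target + " is found");
    }
}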
So how does each consumer keep track of its own consumption position?
In early versions, consumers stored their consumed offsets in Zookeeper, with each consumer reporting its offset at intervals; this easily led to repeated consumption and performed poorly. In newer versions, consumed offsets are maintained directly in the Kafka cluster's internal topic __consumer_offsets.