[Kafka]Kafka基础

Kafka

Kafka介绍

kafka最初是由linkedin开发的,是一个分布式,分区的,多副本的,基于Zookeeper协调的分布式日志系统,固然它也能够当作消息队列来使用。
常见的能够用于Web,nginx日志,访问日志,消息服务等等。
因此kafka的应用场景主要有:日志收集系统和消息系统。html

特色

1,解耦

消费者生产者之间不想相互耦合,只要都遵循一样的接口约束就行。nginx

2,冗余(副本)

这里主要是为了保证数据不会丢失,许多消息队列采用"插入-获取-删除"的模式,在把一个消息从队列中年删除以前,须要系统明确指出这个消息已经被处理完毕,从而确保数据被安全地保存直到使用完毕。web

3,扩展性

支持扩展spring

4,灵活性,峰值处理能力

在访问量剧增的状况下,使用消息队列可以使得关键组件顶住忽然的访问压力,使得应用仍然须要继续发挥做用。apache

5,可恢复性

系统的一部分组件失效时,不会影响整个系统,即便一个处理消息的线程挂掉,加入队列中的消息也能够在系统恢复后被处理。json

6,顺序保证

Kafka保证一个Partition中的消息的有序性。bootstrap

7,缓冲

经过一个缓冲层来帮助任务最高效率地执行,写入队列的处理尽量地传递。缓存

8,异步通讯

采用异步通讯机制,容许先把消息放入队列,但并不当即处理,而是在须要的时候再去用它们。安全

Kafka中的几个概念

avatar

1,Broker

Kafka集群包括一个或者多个服务器,服务器节点称为broker。broker存储topic的数据,若是某个topic有N个partition,集群有N个broker,那么每一个broker存储该topic的一个partition,若是某个topic有N个partition,集群有N+m个broker,那么N个broker存储该topic中的一个partition,剩下的m个broker不存储该topic的partition数据。若是某个topic的broker数量比partition的数量少,那么一个broker可能会存储多个该topic的partition。
在实际生产中应该尽可能避免这种状况发生,由于很容易形成kafka集群数据不均衡。服务器

2,Topic

每条发布到kafka的集群消息都有一个类别,这个类别称为topic。

3,Partition

Topic的数据分割为一个或者多个partition,每一个partition中的数据使用过个segment文件存储。partition的数据是有序的,不一样partition间的数据丢失了数据的顺序,若是topic有多个partition,消费数据就不能保证数据的顺序,在须要严格保证消息的消息顺序的场景下,须要将partition数目须要1。

4,Producer

生产者

5,Consumer

消费者

6,Consumer Group

每一个Consumer属于一个特定的ComsumerGroup,可为每一个Consumer指定GroupName,不指定则为默认。

7,Leader

每一个Partition有多个副本,其中有且仅有一个Leader,即负责读写数据的Partition。

8,Follower

Follower跟随Leader,全部的写请求都经过Leader路由,数据变动会广播到全部的Follower。若是Leader失效,那么Follower中会选举出一个新的Leader。

入门demo

本想继续写一写kafka的架构,高可用设计和其中的一些特性的,可是我这两天在看这些东西的时候发现这些仍是在一个demo的基础上再去学习比较好,因此这些留在下一篇写了。

前面的准备

安装kafka和Zookeeper,kafka运行须要Zookeeper来支持,来进行心跳等机制,因此在运行kafka以前安装好Zookeeper。网上帖子不少,就不细写了,可是我这里Zookeeper和kafka都是单实例的,并无配置集群。

建工程

IDEA用SpringInitializer创建一个大工程,而后创建KafkaConsumer和KafkaProducer两个module就好了。

引入依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
        </dependency>
    </dependencies>

配置

生产者

server.port=8099

# kafka地址
spring.kafka.bootstrap-servers=127.0.0.1:9092
#写入失败的时候的重试次数
spring.kafka.producer.retries=0

# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
# producer积累数据一次性发送,缓存大小到达这个值就发送数据
spring.kafka.producer.buffer-memory=33554432

#acks = 0 若是设置为零,则生产者将不会等待来自服务器的任何确认,该记录将当即添加到套接字缓冲区并视为已发送。在这种状况下,没法保证服务器已收到记录,而且重试配置将不会生效(由于客户端一般不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待全部副本服务器的彻底确认便可作出回应,在这种状况下,若是leader在确认记录后当即失败,但在将数据复制到全部的副本服务器以前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这至关于acks = -1的设置。
spring.kafka.producer.acks=1

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

消费者

server.port=8090

# kafka地址
spring.kafka.bootstrap-servers=127.0.0.1:9092
# 自动提交的时间间隔
spring.kafka.consumer.auto-commit-interval=1S
# 指定消费者在读取一个没有偏移量的分区或者偏移量无效的分区的状况下如何处理。
# latest在偏移量无效的状况下,消费者将从最新的记录开始读取数据
# earliest在偏移量无效的状况下,消费者将从起始位置读取分区的记录
spring.kafka.consumer.auto-offset-reset=earliest
# 是否自动提交偏移量,为了不出现重复数据和数据丢失,能够把它设置为false,而后手动提交偏移量
spring.kafka.consumer.enable-auto-commit=false
# key的反序列化方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.listener.concurrency=5
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false

我这里采用的就是简单的StringSerializer和StringDeserializer,若是是传递对象,有两种方式,一种是自定义解码和编码器,须要实现Serializer接口,另外一种就是用已有的格式来解码和编码,好比json格式来传递信息,而后用fastjson等框架来解码和编码。
另一点就是消费者的监听器必需要设置ack-mode,由于上面设置的自动提交的选项设置为了false,因此须要手动设置提交offset的模式。

生产者实现

@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    public void send(Object o){

        String objStr = JSONObject.toJSONString((o));
        log.info("sending info:"+objStr);
        ListenableFuture<SendResult<String,Object>> future=
                kafkaTemplate.send("test-topic-1",o);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("test-topic-1发送失败,"+throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                log.info("test-topic-1发送成功,"+stringObjectSendResult.toString());
            }
        });
    }

}

而后简单写一个Controller来触发消息的发送。

@RestController
public class KafkaController {


    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/message/send")
    public boolean send(){
        kafkaProducer.send("this is a test message");
        return true;
    }


}

生产者实现

@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "test-topic-1",groupId = "test-group-1")
    public void topic_test(ConsumerRecord<?,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC)String topic){

        Optional message = Optional.ofNullable(record.value());

        if(message.isPresent()){
            Object msg = message.get();
            log.info("消费了: topic:"+topic+",message:"+msg);
            ack.acknowledge();

        }
    }

    @KafkaListener(topics = "test-topic-1",groupId = "test-group-2")
    public void topic_test_1(ConsumerRecord<?,?>record,Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic){
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("消费了: topic:"+topic+",message:"+msg);
            ack.acknowledge();
        }

    }
}

启动测试

在启动这两个模块以前,须要确认kafka和Zookeeper都已经启动。
启动生产者,控制台有以下信息:

2020-09-13 21:53:10.892  INFO 17928 --- [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.1
2020-09-13 21:53:10.894  INFO 17928 --- [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 0efa8fb0f4c73d92
2020-09-13 21:53:10.894  INFO 17928 --- [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1600005190890
2020-09-13 21:53:11.125  INFO 17928 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: OtDSNkOFT4eFbSso_V8qAQ
2020-09-13 21:53:11.167  INFO 17928 --- [ad | producer-1] c.e.k.producer.KafkaProducer             : test-topic-1发送成功,SendResult [producerRecord=ProducerRecord(topic=test-topic-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=this is a test message, timestamp=null), recordMetadata=test-topic-1-0@4]
2020-09-13 21:55:34.570  INFO 17928 --- [nio-8099-exec-3] c.e.k.producer.KafkaProducer             : sending info:"this is a test message"
2020-09-13 21:55:34.579  INFO 17928 --- [ad | producer-1] c.e.k.producer.KafkaProducer             : test-topic-1发送成功,SendResult [producerRecord=ProducerRecord(topic=test-topic-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=this is a test message, timestamp=null), recordMetadata=test-topic-1-0@5]

启动消费者,能够看到控制台打印了发过来的信息

2020-09-13 21:55:24.077  INFO 13296 --- [ntainer#1-4-C-1] o.s.k.l.KafkaMessageListenerContainer    : test-group-2: partitions assigned: [test-topic-1-0]
2020-09-13 21:55:24.077  INFO 13296 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test-group-1: partitions assigned: [test-topic-1-0]
2020-09-13 21:55:24.114  INFO 13296 --- [ntainer#0-0-C-1] c.e.k.consumer.KafkaConsumer             : 消费了: topic:test-topic-1,message:this is a test message
2020-09-13 21:55:24.114  INFO 13296 --- [ntainer#1-4-C-1] c.e.k.consumer.KafkaConsumer             : topic_test1 消费了: Topic:test-topic-1,Message:this is a test message
2020-09-13 21:55:34.579  INFO 13296 --- [ntainer#0-0-C-1] c.e.k.consumer.KafkaConsumer             : 消费了: topic:test-topic-1,message:this is a test message
2020-09-13 21:55:34.580  INFO 13296 --- [ntainer#1-4-C-1] c.e.k.consumer.KafkaConsumer             : topic_test1 消费了: Topic:test-topic-1,Message:this is a test message

参考资料

Kafka
个人小网站

相关文章
相关标签/搜索