Kafka实战解惑

目录

1、 kafka简介
2、 Kafka架构方案
3、 Kafka安装
4、 Kafka Client API
  4.1 Producers API
  4.2 Consumers API
  4.3 消息高可靠 At-Least-Once
  4.4 消息高可靠Consumer
  4.5 生产者、消费者总结
5、 Kafka运维
  5.1 Broker故障切换
  5.2 Broker动态扩容
  5.2.1 增长分区
  5.2.2 增长Broker Server
  5.3 Kafka配置优化
  5.4 数据清理
  5.4.1 数据删除
  5.4.2 数据压缩
  5.5 Kafka运行监控
6、Kafka其余组件](#c6)
  6.1 Kafka Connect
  6.2 Kafka Stream
  6.3 Kafka Camus
7、 Kafka典型应用场景
  7.1 ETL
8、 参考资料css


1、Kafka简介

Kafka是LinkedIn使用scala开发的一个分布式消息系统,它以水平扩展能力和高吞吐率著称,被普遍用于日志处理、ETL等应用场景。Kafka具备如下主要特色:html

  1. 同时为发布和订阅提供高吞吐量。据了解,Kafka每秒能够生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
  2. 可进行持久化操做。将消息持久化到磁盘,所以可用于批量消费,例如ETL以及实时应用程序。经过将数据持久化到硬盘以及replication防止数据丢失。
  3. 分布式系统,易于向外扩展。全部的producer、broker和consumer都会有多个,均为分布式的,无需停机便可扩展机器。
  4. 消息被处理的状态由消费者同步到zookeeper而非broker server中,当broker server失效时,经过副本切换机制选择一个新的broker server,消费者从zookeeper中读取以前消费消息的位置,不会引发消息丢失。
  5. 支持online和offline的场景。

LinkedIn有个三人小组出来创业了—正是当时开发出Apache Kafka实时信息列队技术的团队成员,基于这项技术Jay Kreps带头创立了新公司Confluent。Confluent的产品围绕着Kafka作的,与Kafka相比,Confluent包含了更多的组件:java

  1. Confluent Control Center(闭源)。管理和监控Kafka最全面的GUI驱动系统
  2. Confluent Kafka Connectors(开源)。链接SQL数据库/Hadoop/Hive
  3. Confluent Kafka Clients(开源)。对于其余编程语言,包括C/C++,Python
  4. Confluent Kafka REST Proxy(开源)。容许一些系统经过HTTP和Kafka之间发送和接收消息。
  5. Confluent Schema Registry(开源)。帮助肯定每个应用使用正确的schema当写数据或者读数据到Kafka中。

2、 Kafka架构方案

从物理结构上看,整个Kafka系统由消息生产者、消息消费者、消费存储服务器外加Zookeeper构成。其中消息生产者被称为Producer、消息消费者被称为Consumer、消息存储服务器被称为Broker。整个Kafka的架构方案很是简单,典型的无状态水平扩展架构,经过水平增长Broker实例实现系统的高吞吐率,而有状态的数据则存储到Zookeeper中。linux

Kafka采用Push-Pull模式,生产者发送消息时,可根据策略存储在Kafka集群的任意一台broker上,消费者经过定时轮询(非固定周期)的方式从Broker上取得消息。消息发送到哪一台服务器上,又从哪台服务器上获取消息,则是由逻辑结构解决的,或者说逻辑结构创建在物理结构基础上,对于生产者、消费者而言,只要了解逻辑结构就能够了。数据库

从逻辑上讲,一个Kafka集群中包含若干个消息队列,每一个消息队列都有本身的名称,在Kafka中消息队列的名称被称为Topic,为了实现系统的高吞吐率,每一个消息队列被拆分红不一样部分,即咱们所说的分区(Partition),分区存储在不一样的Broker中。生产者发送消息时可根据必定策略发送到不一样的分区中,这相似于数据库的分库分表操做,一样消费者拉取消息时,也能够根据必定策略从某个分区中读取消息。就物理结构而言,每一个分区就是broker上的一个文件,试想一下并发的对多个分布在不一样broker上的文件进行读写,性能固然显著优于对单台broker上的文件进行读写,咱们所说的Kfaka具备高吞吐率就是这个道理。apache

Kafak每一个Topic的消息都存储在日志文件中,Kafka消息日志文件由一个索引文件和若干个具体的消息文件构成。每一个消息文件都由起始消息编号构成,经过索引能够快速定位消息文件进行读写,因为消息是顺序写入文件中,因此读写效率很是高。在6块7200转的SATA RAID-5磁盘阵列的线性写速度差很少是600MB/s,可是随即写的速度倒是100k/s,差了差很少6000倍。现代的操做系统都对次作了大量的优化,使用了 read-ahead 和 write-behind的技巧,读取的时候成块的预读取数据,写的时候将各类微小琐碎的逻辑写入组织合并成一次较大的物理写入,不少时候线性读写磁盘比随机读取内存都快。编程

与其余常见的消息队列不一样,Kafka有一个叫作消费组的概念,多个消费者被逻辑上合并在一块儿叫作消费组。一个消息队列理论上可拥有无限个消费组,消费组是Kafka有别于其余消息队列的一个重要概念,同一个分区的消息只能被一个消费组内的某个消费者读取,但其余消费组内的消费者仍然可读取这个分区的消费。以下图所示整个Kafka消息队列由两个broker server构成,server1上包含两个分区p0、p3,server2上包含两个分区p一、p2。如今有两个消费组A、B,消费组A中包含两个消费者C一、C2,消费组B中包含4个消费者C三、C四、C五、C6。那么假定P0分区上有一条消息。Consumer Group A中的C一、C2其中之一会消费这条消息,Consumer Group B中的C三、C四、C五、C6其中之一也会消费这条消息,也就是说两个消费组A、B中的消费者都会同时消费这条消息,而组内只能有一个消费者消费这条消息。bootstrap

咱们所说的C一、C2只是一个逻辑上的划分就具体实现而言,C一、C2能够是一个进程内部的两个线程,也能够是两个独立的进程,对于C三、C四、C五、C6也是一样的道理。咱们知道Kafka每一个分区中的消息都是以顺序结构保存到文件中的,那么消费者每次从什么位置读取消息呢,奥秘就是每一个消费者都保存offset到zookeeper中。api

如前所述,Kafka是一个Push-Pull模式的消息队列,而且能够有多个生产者、多个消费者,那么这些生产者和消费者是如何协同工做的呢?首先咱们来看生产者怎么肯定把消费发送到哪一个分区上。默认状况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions。缓存

def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
}

这就保证了相同key的消息必定会被路由到相同的分区。若是你没有指定key,那么Kafka是如何肯定这条消息去往哪一个分区的呢?咱们来看下面的代码:

if(key == null) {  // 若是没有指定key
        val id = sendPartitionPerTopicCache.get(topic)  // 先看看Kafka有没有缓存的现成的分区Id
        id match {
          case Some(partitionId) =>  
            partitionId  // 若是有的话直接使用这个分区Id就行了
          case None => // 若是没有的话,
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)  //找出全部可用分区的leader所在的broker
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size  // 从中随机挑一个
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId) // 更新缓存以备下一次直接使用
            partitionId
        }
      }

 

能够看出,Kafka几乎就是随机找一个分区发送无key的消息,而后把这个分区号加入到缓存中以备后面直接使用——固然了,Kafka自己也会清空该缓存(默认每10分钟或每次请求topic元数据时)

接下来咱们来看消费者如何获取消息。对于消费者Kafka提供的两种分配策略: range和roundrobin,由参数 partition.assignment.strategy指定,默认是range策略。本文只讨论range策略。所谓的range其实就是按照阶段平均分配。举个例子就明白了,假设你有10个分区,P0 ~ P9,consumer线程数是3, C0 ~ C2,那么每一个线程都分配哪些分区呢?

C0 消费分区 0, 1, 2, 3
C1 消费分区 4, 5, 6
C2 消费分区 7, 8, 9

为了保证高可靠,Kafka每一个分区都有必定数量的副本,当故障发生时经过zookeeper选择其一做为领导者,Kafka采用同步复制机制,写leader完成后在写副本。若是某个副本写失败,则将这个副本从当前分区一致集合中摘除,后期根据必定策略在进行异步补偿,将不一致状态变为一致状态。极端状况下若是全部副本写入均失败,变为不一致状态,若是在变成一致状态前leader崩溃,那么消息才可能真正丢失,但极端状况很难出现,一旦出现这种极端状况,任何系统都无能为力了,因此咱们说Kafka仍是很是可靠的。

3、 Kafka安装

咱们如今使用192.168.104.10一、192.168.104.102两台Centos 6服务器安装Kafka。安装Kafka以前首先须要安装Zookeeper,为了简便起见咱们采用单机伪分布式集群安装Zookeeper,将Zookeeper安装在192.168.104.101这台服务器上,并启动三个实例,组成高可高的Zookeeper集群。

接下来咱们安装Kafka_2.11-0.10.0.1,由于咱们有192.168.104.10一、192.168.104.102两台服务器,所以咱们能够构建一个完整的Kfaka集群。在192.168.104.101服务器上修改Kafka安装目录下的config/server. Properties文件,设置以下参数:
broker.id=0
listeners=PLAINTEXT://192.168.104.101:9092
advertised.listeners=PLAINTEXT://192.168.104.101:9092
zookeeper.connect=192.168.104.101:2181,192.168.104.101:2182,192.168.104.101:2183
对于192.168.104.102这台机器,咱们将listeners、advertised.listeners中的ip地址改成192.168.104.102。通过上述设置咱们能够在两个服务器上分别使用bin/Kafka-server-start.sh config/server.properties命令启动Kafka集群了。
View Code

4、 Kafka Client API

如前所述Kafka是一个消息队列,生产者发送消息到Kafka,消费者从Kafka中拉取消息,所以Kafka提供生产者、消费者两类API供程序开发使用。咱们先来看一个生产者、消费者的简单例子,了解一下Kafka Client API的基本用法,然后在深刻了解Kafka Client API的细节。

4.1 Producers API

package com.Kafka.sample.newapi;

import org.apache.Kafka.clients.producer.Callback;
import org.apache.Kafka.clients.producer.KafkaProducer;
import org.apache.Kafka.clients.producer.ProducerRecord;
import org.apache.Kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class Producer {
    public void run() throws InterruptedException {
        KafkaProducer<String, String> producer = getProducer();

        int i = 0;

        while (true) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(ClientConfig.TOPICS, String.valueOf(i), "This is message: " + i);
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
                    }
                }
            });

            i++;

            Thread.sleep(1000);
        }
    }

    private KafkaProducer<String, String> getProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", ClientConfig.BOOTSTRAP_SERVERS);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.Kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.Kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kp = new KafkaProducer<String, String>(props);

        return kp;
    }

    public static void main(String[] args) {
        Producer producer = new Producer();

        try {
            producer.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
View Code

 

Kafka 0.82版以后,提供新的API,对于生产者的API来说,使用逻辑比较简单,推荐使用新API向Kafka发送消息。向Kafka发送消息时首先须要构建一个KafkaProducer对象,并设置发送消息的一些参数。Producer端的经常使用配置有:

bootstrap.servers:Kafka集群链接串,能够由多个host:port组成
acks:broker消息确认的模式,有三种:
0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认
1:由Leader确认,Leader接收到消息后会当即返回确认信息
all:集群完整确认,Leader会等待全部in-sync的follower节点都确认收到消息后,再返回确认信息。
咱们能够根据消息的重要程度,设置不一样的确认模式。默认为1
retries:发送失败时Producer端的重试次数,默认为0
batch.size:当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。若是设置为0,则每条消息都独立发送。默认为16384字节。
linger.ms:发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的状况下,配置linger.ms可以让Producer在发送消息前等待必定时间,
以积累更多的消息打包发送,达到节省网络资源的目的。默认为0。 key.serializer/value.serializer:消息key/value的序列器Class,根据key和value的类型决定。 buffer.memory:消息缓冲池大小。还没有被发送的消息会保存在Producer的内存中,若是消息产生的速度大于消息发送的速度,
那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB)。

 

相比起Producers API的便宜使用,Consumer API的使用要复杂不少,核心问题就是如何高可靠的处理消息,保证消息不丢失。Kafka为了保证消息不丢失能被消费者成功的处理,在消费者处理消息成功后须要向Kafka发送确认确认消息被成功的消费。

package com.Kafka.sample.newapi;

import org.apache.Kafka.clients.consumer.ConsumerRecord;
import org.apache.Kafka.clients.consumer.ConsumerRecords;
import org.apache.Kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    public void run() {
        KafkaConsumer<String, String> consumer = getConsumer();
        consumer.subscribe(Arrays.asList(ClientConfig.TOPICS));

        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> record : records) {
                System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
            }
        }
    }

    private KafkaConsumer<String, String> getConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", ClientConfig.BOOTSTRAP_SERVERS);
        props.put("group.id", "1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kc = new KafkaConsumer<String, String>(props);

        return kc;
    }

    public static void main(String[] args) throws Exception{
        Consumer consumer = new Consumer();
        consumer.run();
    }
}
View Code

 

上面的代码很容易看懂,但props.put("auto.commit.interval.ms", "1000")须要特殊说明一下。

4.3 消息高可靠 At-Least-Once

网上各类文章常常谈到Kafka丢消息问题,那么Kakfa真的不可靠,只能用在容许有必定错误的系统中吗?这个问题还得从Kaka的设计初衷来看。

Kafka最初是被LinkedIn设计用来处理log的分布式消息系统,所以它的着眼点不在数据的安全性(log偶尔丢几条无所谓),换句话说Kafka并不能彻底保证数据不丢失。尽管Kafka官网声称可以保证at-least-once,但若是consumer进程数小于partition_num,这个结论不必定成立。考虑这样一个case,partiton_num=2,启动一个consumer进程订阅这个topic,对应的,stream_num设为2,也就是说启两个线程并行处理message。若是auto.commit.enable=true,当consumer fetch了一些数据但尚未彻底处理掉的时候,恰好到commit interval出发了提交offset操做,接着consumer crash掉了。这时已经fetch的数据尚未处理完成但已经被commit掉,所以没有机会再次被处理,数据丢失。若是auto.commit.enable=false,假设consumer的两个fetcher各自拿了一条数据,而且由两个线程同时处理,这时线程t1处理完partition1的数据,手动提交offset,这里须要着重说明的是,当手动执行commit的时候,其实是对这个consumer进程所占有的全部partition进行commit,Kafka暂时尚未提供更细粒度的commit方式,也就是说,即便t2没有处理完partition2的数据,offset也被t1提交掉了。若是这时consumer crash掉,t2正在处理的这条数据就丢失了。若是但愿可以严格的不丢数据,解决办法有两个:

  1. 手动commit offset,并针对partition_num启一样数目的consumer进程,这样就能保证一个consumer进程占有一个partition,commit offset的时候不会影响别的partition的offset。但这个方法比较局限,由于partition和consumer进程的数目必须严格对应。
  2. 另外一个方法一样须要手动commit offset,另外在consumer端再将全部fetch到的数据缓存到queue里,当把queue里全部的数据处理完以后,再批量提交offset,这样就能保证只有处理完的数据才被commit。固然这只是基本思路,实际上操做起来不是这么简单,具体作法之后我再另开一篇。

4.4 消息高可靠Consumer

public class ManualOffsetConsumer {
    private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class);

    public ManualOffsetConsumer() {
        // TODO Auto-generated constructor stub
    }

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        //props.put("bootstrap.servers", bootstrapServers);//"172.16.49.173:9092;172.16.49.173:9093");
        //设置brokerServer(Kafka)ip地址
        props.put("bootstrap.servers", "172.16.49.173:9092");
        //设置consumer group name
        props.put("group.id","manual_g1");

        props.put("enable.auto.commit", "false");

        //设置使用最开始的offset偏移量为该group.id的最先。若是不设置,则会是latest即该topic最新一个消息的offset
        //若是采用latest,消费者只能得道其启动后,生产者生产的消息
        props.put("auto.offset.reset", "earliest");
        //
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
        consumer.subscribe(Arrays.asList("producer_test"));

        final int minBatchSize = 5;  //批量提交数量
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                LOG.info("consumer message values is "+record.value()+" and the offset is "+ record.offset());
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                LOG.info("now commit offset");
                consumer.commitSync();
                buffer.clear();
            }
        }
    }
}
View Code

 

上面例子中咱们将自动提交改成手动提交,若是取得消息后,由于某种缘由没有进行提交,那么消息仍然保持在Kafka中,能够重复拉取以前没有确认的消息,保证消息不会丢失,但有可能重复处理相同的消息,消费者接收到重复消息后应该经过业务逻辑保证重复消息不会带来额外影响,这就是Kafka所说的At-Least-Once。上面的这种读取消息的方法是单线程的,除此以外还能够用多线程方法读取消息,每一个线程从指定的分区中读取消息。
public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        //props.put("bootstrap.servers", bootstrapServers);//"172.16.49.173:9092;172.16.49.173:9093");
        //设置brokerServer(Kafka)ip地址
        props.put("bootstrap.servers", "172.16.49.173:9092");
        //设置consumer group name
        props.put("group.id","manual_g2");

        props.put("enable.auto.commit", "false");

        //设置使用最开始的offset偏移量为该group.id的最先。若是不设置,则会是latest即该topic最新一个消息的offset
        //若是采用latest,消费者只能得道其启动后,生产者生产的消息
        props.put("auto.offset.reset", "earliest");
        //
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
        consumer.subscribe(Arrays.asList("producer_test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    LOG.info("now consumer the message it's offset is :"+record.offset() + " and the value is :" + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                LOG.info("now commit the partition[ "+partition.partition()+"] offset");
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
View Code

 

咱们还能够进一步让消费者消费某个分区的消息。

    public static void main(String[] args) {
        Properties props = new Properties();
        //设置brokerServer(Kafka)ip地址
        props.put("bootstrap.servers", "172.16.49.173:9092");
        //设置consumer group name
        props.put("group.id", "manual_g4");
        //设置自动提交偏移量(offset),由auto.commit.interval.ms控制提交频率
        props.put("enable.auto.commit", "true");
        //偏移量(offset)提交频率
        props.put("auto.commit.interval.ms", "1000");
        //设置使用最开始的offset偏移量为该group.id的最先。若是不设置,则会是latest即该topic最新一个消息的offset
        //若是采用latest,消费者只能得道其启动后,生产者生产的消息
        props.put("auto.offset.reset", "earliest");
        //
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");
        TopicPartition partition0 = new TopicPartition("producer_test", 0);
        TopicPartition partition1 = new TopicPartition("producer_test", 1);
        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
        consumer.assign(Arrays.asList(partition0, partition1));
        while (true) {
              ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
              for (ConsumerRecord<String, String> record : records)
                  System.out.printf("offset = %d, key = %s, value = %s  \r\n", record.offset(), record.key(), record.value());
              try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
}
4.5 生产者、消费者总结
  1. 若是consumer比partition多,是浪费,由于Kafka的设计是在一个partition上是不容许并发的,因此consumer数不要大于partition数。
  2. 若是consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,不然会致使partition里面的数据被取的不均匀。最好partiton数目是consumer数目的整数倍,因此partition数目很重要,好比取24,就很容易设定consumer数目。
  3. 若是consumer从多个partition读到数据,不保证数据间的顺序性,Kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不一样。
  4. 增减consumer,broker,partition会致使rebalance,因此rebalance后consumer对应的partition会发生变化
  5. High-level接口中获取不到数据的时候是会block的。

5、 Kafka运维

5.1 Broker故障切换

咱们在192.168.104.10一、192.168.104.102两台服务器上启动Kafka组成一个集群,如今咱们观察一下topic t1的状况。

bash-4.1# ./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1 Topic:t1 PartitionCount:2 ReplicationFactor:2 Configs: Topic: t1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: t1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1

咱们看到t1由两个分区组成,分布于Leader 0、1两个服务器上。下面咱们运行消费者程序,同时运行生产者程序,咱们向topic t1发送1十一、22二、33三、44四、55五、666的数据。

bash-4.1# ./Kafka-console-producer.sh --broker-list 192.168.104.101:9092, 192.168.104.102:9092 --topic t1 111 222 333 444 555 666

接下来咱们观察一下消费者接收消息的状况。

fetched from partition 0, offset: 3, message: fetched from partition 1, offset: 4, message: 111 fetched from partition 0, offset: 4, message: 222 fetched from partition 1, offset: 5, message: 333 fetched from partition 0, offset: 5, message: 444 fetched from partition 1, offset: 6, message: 555 fetched from partition 0, offset: 6, message: 666

能够看到消息被很是均匀的发送到两个分区,消费者从两个分区中拉取了消息。为了模拟故障,咱们手工kill 101上的Kafka进程。这时咱们在观察Kafka的分区状况,对照以前的结果,咱们发现两个分区的Leader都变为1了,说明Kafka启用了副本机制进行故障切换。

./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1 Topic:t1 PartitionCount:2 ReplicationFactor:2 Configs:  Topic: t1 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1  Topic: t1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1

咱们继续向分区发送888, 999,消费者仍然可以接收到发送的消息,而不受故障进度的影响。逻辑上看消费者只是读取分区上的消息,与具体的服务器不要紧。

fetched from partition 1, offset: 7, message: 999 fetched from partition 0, offset: 7, message: 888

5.2 Broker动态扩容

5.2.1 增长分区

咱们为已经建立的包含两个分区的Topic在添加一个分区。

Kafka-topics.sh --zookeeper 192.168.104.101:2181 --alter --topic t1 --partitions 3

咱们观察一下增长分区后的结果:

bash-4.1# ./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1 Topic:t1 PartitionCount:3 ReplicationFactor:2 Configs: Topic: t1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 1,0 Topic: t1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: t1 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1

接下来,咱们使用生产者程序发送数据,过了一段时间后发现生产者程序已经能够向新增分区写入数据了。说明分区的增减对正在运行的应用程序(生产者、消费者)没有影响, 生产者、消费者都不须要从新启动。

5.2.2 增长Broker Server

待补充

5.3 Kafka配置优化

待补充

5.4 数据清理

5.4.1 数据删除

消息被kafka存储后,针对过时消息,能够经过设置策略(log.cleanup.policy=delete)进行删除。除了在kafka中作默认设置外,也能够再 topic建立时指定参数,这样将会覆盖kafka的默认设置,触发删除动做有两种条件:

  1. 清理超过指定时间消息,经过log.retention.hours设置过时时间。
  2. 清理大小超过预约设置消息,经过log.retention.bytes进行设置。

5.4.2 数据压缩

kafka还能够进行数据压缩,设置log.cleanup.policy=compact只保留每一个key最后一个版本的数据。

5.5 Kafka运行监控

目前Kafka有三个经常使用的监控系统: Kafka Web Conslole、Kafka Manager、KafkaOffsetMonitor,这三个系统或多或少都有些问题,不是特别完善,推荐使用KafkaOffsetMonitor。

6、Kafka其余组件

6.1 Kafka Connect

Kafka 0.9+增长了一个新的特性 Kafka Connect ,能够更方便的建立和管理数据流管道。它为Kafka和其它系统建立规模可扩展的、可信赖的流数据提供了一个简单的模型,经过 connectors能够将大数据从其它系统导入到Kafka中,也能够从Kafka中导出到其它系统。Kafka Connect能够将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,而后像正常的Kafka流处理机制同样进行数据流处理。而导出工做则是将数据从Kafka Topic中导出到其它数据存储系统、查询系统或者离线分析系统等,好比数据库、 Elastic Search 、 Apache Ignite 等。

Kafka Connect特性包括:

  • Kafka connector通用框架,提供统一的集成API
  • 同时支持分布式模式和单机模式
  • REST 接口,用来查看和管理Kafka connectors
  • 自动化的offset管理,开发人员没必要担忧错误处理的影响
  • 分布式、可扩展
  • 流/批处理集成

当前Kafka Connect支持两种分发担保:at least once (至少一次) 和 at most once(至多一次),exactly once将在将来支持,当前已有的Connectors包括:

Connector Name Owner Status
HDFS confluent-platform@googlegroups.com Confluentsupported
JDBC confluent-platform@googlegroups.com Confluentsupported
Debezium - CDC Sources debezium@gmail.com Community project
MongoDB Source a.patelli@reply.de a.topchyan@reply.de In progress
MQTT Source tomasz.pietrzak@evok.ly Community project
MySQL Binlog Source wushujames@gmail.com In progress
Twitter Source rollulus@xs4all.nl In progress
Cassandra Sink Cassandra Sink Community project
Elastic Search Sink ksenji@gmail.com Community project
Elastic Search Sink hannes.stockner@gmail.com In progress
Elastic Search Sink a.patelli@reply.de a.topchyan@reply.de In progress

咱们来看一个使用Kafka Connect从一个文件读取数据在传输到另外一个文件的例子。

  • 首先在192.168.104.10一、192.168.104.102两台服务器上启动Kafka。
  • 在192.168.104.102服务器的Kafka安装目录上,修改connect-standalone.properties文件:
bootstrap.servers=192.168.104.101:9092, 192.168.104.102:9092 key.converter=org.apache.Kafka.connect.storage.StringConverter value.converter=org.apache.Kafka.connect.storage.StringConverter key.converter.schemas.enable=false value.converter.schemas.enable=false 修改connect-file-source.properties文件: file=/root/data.txt topic=t1 修改connect-file-sink.properties文件: file=/root/output.txt topics=t1
  • 在192.168.104.102服务器上启动Kafka-connect
    bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  • 向/root/data.txt中写入数据,echo “Kafka connect”>> data.txt,能够观察”Kafka connect”被写入到/root/output.txt文件中。

6.2 Kafka Stream

Kafka Streams是一套类库,嵌入到java应用程序中,它使得Apache Kafka能够拥有流处理的能力,经过使用Kafka Stream API进行业务逻辑处理最后写回Kakfa或者其余系统中。Kafka Stream中有几个重要的流处理概念:严格区分Event time和Process Time、支持窗口函数、应用状态管理。开发者使用Kafka Stream的门槛很是低,好比单机进行一些小数据量的功能验证而不须要在其余机器上启动一些服务(好比在Storm运行Topology须要启动Nimbus和Supervisor,固然也支持Local Mode),Kafka Stream的并发模型能够对单应用多实例进行负载均衡。有了Kafka Stream能够在不少场景下代替Storm、Spark Streaming减小技术复杂度。目前Kafka Stream仍然处于开发阶段,不建议生产环境使用,因此期待正式版发布吧。

6.3 Kafka Camus

Camus是Linkedin开源的一个从Kafka到HDFS的数据管道,本质上上Camus是一个运行在Hadoop中的MapReduce程序,调用一些Camus提供的API从Kafka中读取数据而后写入HDFS。Camus2015年已经中止维护了,gobblin是后续产品,camus功能是是Gobblin的一个子集,经过执行MapReduce任务实现从Kafka读取数据到HDFS,而gobblin是一个通用的数据提取框架,能够将各类来源的数据同步到HDFS上,包括数据库、FTP、Kafka等。

7、 Kafka典型应用场景

Kafka做为一个消息中间件,最长应用的场景是将数据进行加工后从源系统移动到目的系统,也就是所谓的ETL过程,ETL是一个数据从源头到目的地的移动过程,固然其中也伴随数据清洗。一般数据源头是应用程序所输出的消息、日志、生产数据库数据。应用程序输出消息一般由应用程序主动控制写入Kfaka的行为,而从日志、生产数据库到Kfaka一般由第三方独立应用处理。从日志到Kfaka典型的技术方案如ELK,从生产数据库到Kafka一般可采用以下三种方式:

  • 经过时间戳方式记录数据变动并写入kafka,如使用kettle等ETL工具。
  • 经过触发器方式记录数据变动并写入kafka,如使用kettle等ETL工具。
  • 经过数据库特有特性记录数变动并写入kafka,如Oracle GoldenGate,MySQL Binlog,Postgre SQL Wal,MongoDB Oplog,CouchDB Changes Feed,值得一提的是PostgreSQL 9.4后的Bottled Water是一个很是好用的方案,将PostgreSQL数据同步到Kfaka中。

数据经过Kafka移动到Hadoop一般有以下方案:

  • Kafka -> Flume -> Hadoop Hdfs
  • Kafka -> Gobblin -> Hadoop Hdfs
  • Kafka -> Kafka Hadoop Loader -> Hadoop Hdfs
  • Kafka -> KaBoom -> Hadoop Hdfs
  • Kafka -> Kafka Connect -> Hadoop Hdfs
  • Kafka -> Storm\Spark Streaming -> Hadoop Hdfs

从目前看这些方法都是经常使用的成熟方案,不少技术也在被一线互联网公司所使用,好比京东内部在使用Gobblin将数据从Kafka同步到Hdfs中,但从长远看Kafka Connect则是最佳方案,毕竟是官方标准出品并且Kafka Connect还在快速的发展。

8、 参考资料

相关文章
相关标签/搜索