Message Queue 之 Kafka

Message Queue 之 Kafkahtml

Author: Lijbjava

Email: lijb1121@163.com算法

Message:通常用于系统间通讯,一般系统间通讯之间经过两种方式发送消息.即时消息:消息发送方和接收方必须同时在线,例如WebService、Dubbo等这些常见的RPC框架通常发送都是即时消息(相似打电话)。离线消息:消息发送方和消息接受方不须要同时在线,实现消息异步接收。例如:Message Queue就是专门用于发送异步消息(相似发短信)。apache

消息队列:消息遵循先进先出的原则FIFO ,kafka 属于发布与订阅的bootstrap

kafka 特色:架构

1.高吞吐量、低延迟:(把消息书写到本地磁盘上,持久化消息,利用磁盘的顺序读写,追加文件)

Message Queue使用场景并发

参考:http://www.cnblogs.com/linjiqin/p/5720865.htmlapp

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ负载均衡

  • 异步通讯

异步通讯

  • 系统间解耦

解耦和

  • 削峰填谷

Kafka 系统架构框架

生产者:发送消息的进程,将消息发送到topic中
消费者:消费消息的进程,从topic中读取消息。
ConsumerGroup:由多个消费者组成的组,每个组中的全部消费者共同消费一个完整的topic,彼此消费的消息不重复。
broker:对应一个kafka实例。kafka集群负责分区leader的选举和迁移。上右图中红线对应的是leader。
record:对应一个消息,key,value,ts(timestamp).
topic:kafka对消息进行分类,每一类对应一个topic,一个topic至少被分红一个分区partition,全部的分区的消息加起来组成一个topic。
replication:副本,每一个分区都有多个副本。
leader、follower:leader负责消息的读写,follower负责从leader复制消息。

offset: 偏移量,每一个分区中的offset是有序的,局部性的,对应每一个record的惟一标识。

topic默认的分区策略:根据输入的key的hash值%分区个数。
zookeeper:负责kafka元数据的管理及Consumer相关数据的管理。

消费者的负载均衡:rang 均分| round-robin 轮询
总结:当一个ConsumerGroup中,有消费者成员Consumer加入或者离开时,就会触发kafka分区(partition)的从新分配,也就是partition的均衡,均衡的目的是为了提高topic的并发(多个线程并发消费分区)消费能力。至于哪一个消费者消费哪个分区,这是有一个算法的,这个算法是可以保证一个分区必定只能被一个消费者消费,而不能被多个消费者消费,还可以保证一个消费者能够消费多个分区。也就是分区和消费者之间是多对一(包含一对一)的关系。这个算法的最终结果是,当一个消费者组(ConsumerGroup)中消费者成员(Consumer)的数量大于一个topic分区的数量时,多余的消费者就没有办法消费到数据了。

搭建Kaka集群

  • 搭建zookeeper集群而且启动(略)

  • 解压并安装kafka集群(务必配置主机名和IP映射关系)

    [root@CentOSX ~]# tar -zxf kafka_2.11-0.11.0.0.tgz -C /usr/ [root@CentOSX ~]# vi /usr/kafka_2.11-0.11.0.0/config/server.properties

    #####Server Basics ######## broker.id=0 # 每一个节点都要修改,且不一样 delete.topic.enable=true

    Socket Server Settings

    listeners=PLAINTEXT://CentOSC:9092 # 每一个节点都要修改,且不一样 ######### Log Basics ########## log.dirs=/usr/kafka-logs

    Log Retention Policy

    log.retention.hours=168 ########## Zookeeper ########## zookeeper.connect=CentOSA:2181,CentOSB:2181,CentOSC:2181

  • 启动Kafka集群

    [root@CentOSX ~]# cd /usr/kafka_2.11-0.11.0.0/ [root@CentOSX kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh -daemon config/server.properties

  • 关闭kafka

    [root@CentOSX ~]# vi /usr/kafka_2.11-0.11.0.0/bin/kafka-server-stop.sh PIDS=$(jps| grep Kafka | awk '{print $1}') # 修改 获取进程id 号

    if [ -z "$PIDS" ]; then echo "No kafka server to stop" exit 1 else kill -s TERM $PIDS fi [root@CentOSX ~]# cd /usr/kafka_2.11-0.11.0.0/ [root@CentOSX kafka_2.11-0.11.0.0]# ./bin/kafka-server-stop.sh

Kafka测试

//建立分区
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --create --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 --partitions 3 --replication-factor 3 
//启动消费者
[root@CentOSB kafka_2.11-0.11.0.0]# ./bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 --from-beginning
//启动生产者
[root@CentOSC kafka_2.11-0.11.0.0]# ./bin/kafka-console-producer.sh --broker-list CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01  
> hello kafka

Topic基本操做

  • 建立topic

    [root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --create --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 --partitions 3 --replication-factor 3

partitions:分区的个数,replication-factor副本因子

  • 查看topic详情

    [root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --describe --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 Topic:topic01 PartitionCount:3 ReplicationFactor:3 Configs: Topic: topic01 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: topic01 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic01 Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1

  • 查看全部Topic

    [root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --list --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 topic01 topic02 topic03

  • 删除topic

    [root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --delete --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic03 Topic topic03 is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.

  • 修改Topic

    [root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --alter --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic02 --partitions 2 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected

分区数目只容许增长,不容许减小。

Java 链接Kafka集群

<!--kafka依赖-->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.2</version>
</dependency>

生产者

//1.建立Properties对象
Properties props=new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
//2.序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(props);
 //3.封装Record
ProducerRecord<String,String> record=
                new ProducerRecord<String, String>("topic01","0100","zhangsan 男 18");
 //4.发送消息
kafkaProducer.send(record);
kafkaProducer.flush();
kafkaProducer.close();

消费者

//1.建立Properties对象
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
//2.反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG,"g1");

KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(props);
//3.订阅相应的topic
kafkaConsumer.subscribe(Arrays.asList("topic01"));
//4.开始获取消息
while(true){
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        String key=record.key();
        String value = record.value();
        long ts = record.timestamp();
        int partition = record.partition();
        long offset = record.offset();
        System.out.println(key+"=>"+value+"\t offset "+offset+" ,partition:"+partition+"\t"+ts);
    }
        }

自定义序列化发送对象

public class ObjectSerializer implements Serializer<Object> { // Serializer该接口是kafka的类
    public void configure(Map<String, ?> map, boolean isKey) {}

    public byte[] serialize(String topic, Object o) {
        return SerializationUtils.serialize((Serializable) o);
    }

    public void close() {}
}

---
public class ObjectDeserializer implements Deserializer<Object> {
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    public Object deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }

    public void close() {

    }
}

如何干预Kafka分区策略 提升 record 的并行度

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,XxxPartitioner.class)
//自定义分区类实现Partitioner接口,而后指定分区策略
public class XxxPartitioner implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return (key.hashCode()&Integer.MAX_VALUE)%numPartitions;
    }

    public void close(){
      
    }
}

订阅形式

  • subscribe(订阅)方式

    props.put(ConsumerConfig.GROUP_ID_CONFIG,"g1"); kafkaConsumer.subscribe(Arrays.asList("topic02"));

优势:但是自动实现 组内负载均衡和故障转移。

  • assign (分配)方式

    TopicPartition part02 = new TopicPartition("topic02", 2); // 分区个数 kafkaConsumer.assign(Arrays.asList(part02));

优势:手动指定分区信息,缺点:没法实现负载均衡和故障转移

Offset自动提交

默认客户端自动开启了自动提交功能,默认提交时间间隔是5秒钟,用户能够采起手动提交的方式实现。开启手动提交以下:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//关闭自动提交
//消费代码后追加
kafkaConsumer.commitAsync();

或者适当调小自动提交时间间隔:

props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//关闭自动提交

Kafka Stream-High Level

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;
public class KafkaStreamDemo {
    public static void main(String[] args) {
        //1.建立Properties对象
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        //2.添加kafka集群信息
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        //3.添加数据类型
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		//4.建立KStreamBuilder并读取数据文件
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        //5.对数据文件进行切分计算
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word)
                .count("counts");
        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
		//6.把KStreaamBuilder和Properties对象放入stream流
        KafkaStreams streams = new KafkaStreams(builder, props);
        //7.启动stream
        streams.start();
    }
}
相关文章
相关标签/搜索