Message Queue 之 Kafkahtml
Author: Lijbjava
Email: lijb1121@163.com算法
Message:通常用于系统间通讯,一般系统间通讯之间经过两种方式发送消息.即时消息:消息发送方和接收方必须同时在线,例如WebService、Dubbo等这些常见的RPC框架通常发送都是即时消息(相似打电话)。离线消息:消息发送方和消息接受方不须要同时在线,实现消息异步接收。例如:Message Queue就是专门用于发送异步消息(相似发短信)。apache
消息队列:消息遵循先进先出的原则FIFO ,kafka 属于发布与订阅的bootstrap
kafka 特色:架构
1.高吞吐量、低延迟:(把消息书写到本地磁盘上,持久化消息,利用磁盘的顺序读写,追加文件)
Message Queue使用场景并发
参考:http://www.cnblogs.com/linjiqin/p/5720865.htmlapp
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ负载均衡
Kafka 系统架构框架
生产者:发送消息的进程,将消息发送到topic中 消费者:消费消息的进程,从topic中读取消息。 ConsumerGroup:由多个消费者组成的组,每个组中的全部消费者共同消费一个完整的topic,彼此消费的消息不重复。 broker:对应一个kafka实例。kafka集群负责分区leader的选举和迁移。上右图中红线对应的是leader。 record:对应一个消息,key,value,ts(timestamp). topic:kafka对消息进行分类,每一类对应一个topic,一个topic至少被分红一个分区partition,全部的分区的消息加起来组成一个topic。 replication:副本,每一个分区都有多个副本。 leader、follower:leader负责消息的读写,follower负责从leader复制消息。 offset: 偏移量,每一个分区中的offset是有序的,局部性的,对应每一个record的惟一标识。 topic默认的分区策略:根据输入的key的hash值%分区个数。 zookeeper:负责kafka元数据的管理及Consumer相关数据的管理。 消费者的负载均衡:rang 均分| round-robin 轮询 总结:当一个ConsumerGroup中,有消费者成员Consumer加入或者离开时,就会触发kafka分区(partition)的从新分配,也就是partition的均衡,均衡的目的是为了提高topic的并发(多个线程并发消费分区)消费能力。至于哪一个消费者消费哪个分区,这是有一个算法的,这个算法是可以保证一个分区必定只能被一个消费者消费,而不能被多个消费者消费,还可以保证一个消费者能够消费多个分区。也就是分区和消费者之间是多对一(包含一对一)的关系。这个算法的最终结果是,当一个消费者组(ConsumerGroup)中消费者成员(Consumer)的数量大于一个topic分区的数量时,多余的消费者就没有办法消费到数据了。
搭建Kaka集群
搭建zookeeper集群而且启动(略)
解压并安装kafka集群(务必配置主机名和IP映射关系)
[root@CentOSX ~]# tar -zxf kafka_2.11-0.11.0.0.tgz -C /usr/ [root@CentOSX ~]# vi /usr/kafka_2.11-0.11.0.0/config/server.properties
#####Server Basics ######## broker.id=0
# 每一个节点都要修改,且不一样 delete.topic.enable=true
listeners=PLAINTEXT://CentOSC
:9092 # 每一个节点都要修改,且不一样 ######### Log Basics ########## log.dirs=/usr/kafka-logs
log.retention.hours=168 ########## Zookeeper ########## zookeeper.connect=CentOSA:2181,CentOSB:2181,CentOSC:2181
启动Kafka集群
[root@CentOSX ~]# cd /usr/kafka_2.11-0.11.0.0/ [root@CentOSX kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
关闭kafka
[root@CentOSX ~]# vi /usr/kafka_2.11-0.11.0.0/bin/kafka-server-stop.sh PIDS=$(jps| grep Kafka | awk '{print $1}') # 修改 获取进程id 号
if [ -z "$PIDS" ]; then echo "No kafka server to stop" exit 1 else kill -s TERM $PIDS fi [root@CentOSX ~]# cd /usr/kafka_2.11-0.11.0.0/ [root@CentOSX kafka_2.11-0.11.0.0]# ./bin/kafka-server-stop.sh
Kafka测试
//建立分区 [root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --create --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 --partitions 3 --replication-factor 3 //启动消费者 [root@CentOSB kafka_2.11-0.11.0.0]# ./bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 --from-beginning //启动生产者 [root@CentOSC kafka_2.11-0.11.0.0]# ./bin/kafka-console-producer.sh --broker-list CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 > hello kafka
Topic基本操做
建立topic
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --create --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 --partitions 3 --replication-factor 3
partitions:分区的个数,replication-factor副本因子
查看topic详情
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --describe --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 Topic:topic01 PartitionCount:3 ReplicationFactor:3 Configs: Topic: topic01 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: topic01 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic01 Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
查看全部Topic
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --list --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 topic01 topic02 topic03
删除topic
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --delete --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic03 Topic topic03 is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
修改Topic
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --alter --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic02 --partitions 2 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
分区数目只容许增长,不容许减小。
Java 链接Kafka集群
<!--kafka依赖--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.2</version> </dependency>
生产者
//1.建立Properties对象 Properties props=new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); //2.序列化 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class); KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(props); //3.封装Record ProducerRecord<String,String> record= new ProducerRecord<String, String>("topic01","0100","zhangsan 男 18"); //4.发送消息 kafkaProducer.send(record); kafkaProducer.flush(); kafkaProducer.close();
消费者
//1.建立Properties对象 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); //2.反序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG,"g1"); KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(props); //3.订阅相应的topic kafkaConsumer.subscribe(Arrays.asList("topic01")); //4.开始获取消息 while(true){ ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); for (ConsumerRecord<String, String> record : records) { String key=record.key(); String value = record.value(); long ts = record.timestamp(); int partition = record.partition(); long offset = record.offset(); System.out.println(key+"=>"+value+"\t offset "+offset+" ,partition:"+partition+"\t"+ts); } }
自定义序列化发送对象
public class ObjectSerializer implements Serializer<Object> { // Serializer该接口是kafka的类 public void configure(Map<String, ?> map, boolean isKey) {} public byte[] serialize(String topic, Object o) { return SerializationUtils.serialize((Serializable) o); } public void close() {} } --- public class ObjectDeserializer implements Deserializer<Object> { public void configure(Map<String, ?> configs, boolean isKey) { } public Object deserialize(String topic, byte[] data) { return SerializationUtils.deserialize(data); } public void close() { } }
如何干预Kafka分区策略 提升 record 的并行度
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,XxxPartitioner.class) //自定义分区类实现Partitioner接口,而后指定分区策略 public class XxxPartitioner implements Partitioner { public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){ List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); return (key.hashCode()&Integer.MAX_VALUE)%numPartitions; } public void close(){ } }
订阅形式
subscribe(订阅)方式
props.put(ConsumerConfig.GROUP_ID_CONFIG,"g1"); kafkaConsumer.subscribe(Arrays.asList("topic02"));
优势:但是自动实现 组内负载均衡和故障转移。
assign (分配)方式
TopicPartition part02 = new TopicPartition("topic02", 2); // 分区个数 kafkaConsumer.assign(Arrays.asList(part02));
优势:手动指定分区信息,缺点:没法实现负载均衡和故障转移
Offset自动提交
默认客户端自动开启了自动提交功能,默认提交时间间隔是5秒钟,用户能够采起手动提交的方式实现。开启手动提交以下:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//关闭自动提交 //消费代码后追加 kafkaConsumer.commitAsync();
或者适当调小自动提交时间间隔:
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//关闭自动提交
Kafka Stream-High Level
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Arrays; import java.util.Properties; public class KafkaStreamDemo { public static void main(String[] args) { //1.建立Properties对象 Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); //2.添加kafka集群信息 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092"); //3.添加数据类型 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //4.建立KStreamBuilder并读取数据文件 KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> textLines = builder.stream("TextLinesTopic"); //5.对数据文件进行切分计算 KTable<String, Long> wordCounts = textLines .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))) .groupBy((key, word) -> word) .count("counts"); wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic"); //6.把KStreaamBuilder和Properties对象放入stream流 KafkaStreams streams = new KafkaStreams(builder, props); //7.启动stream streams.start(); } }