浅谈Kafka

Kafka是当今大数据生态圈中最流行的消息队列框架,本人最近学习了下kafka;总结了一下本人的心得。java

相关概念:node

broker :至关于server节点api

topic:话题模块,不一样类型的消息能够放在不一样的话题中以示区分;框架

partition:分区,一个话题中能够有多个partition,消息在同一个partition中是有序的;不一样partition中是无序的;对应topic的partition 是能够有多个副本的多个副本之间存在一个leader,其他的为slave分布式

productor:生产者,消息的来源;学习

comsumer:消费者,消息的使用者;comsumer能够分红group,每一个group中的comsumer同一时间只能消费一个partition中的信息;comsumer的信息存储在zookeeper下;测试

1、kafka相对于其余消息队列的不一样大数据

一、吞吐量大:kafka是一个分布式集群服务的消息队列框架,支持多个comsumer同时消费;ui

二、信息有放回的取出:kafka中的信息消费完是不消失的,在kafka中有offset的概念,是记录comsumer消费到哪里了,信息默认在kafka中保存一周;code

2、Kafka的安装集群(以node十一、node十二、node13为例)

  •    解压安装包          tar zxvf kafka_2.10-0.9.0.1.tgz
  •    修改配置文件server.properties   
    • zookeeper.connect=node11:2181,node12:2181,node13:2181  
  •    将解压后的kafka拷贝到node12 、node13上
  •    修改配置文件server.properties  
    • 规划有3个节点,broker的id应该不一样
    • node1为broker.id=0
    • node2为broker.id=1
    • node3为broker.id=2
  •   启动kafka
    • 一、在3个节点启动ZooKeeper
    • 二、在3个节点启动kafka
    • $ bin/kafka-server-start.sh config/server.properties
  • 测试 kafka
    • 建立话题,使用kafka-topics.sh
    • $ bin/kafka-topics.sh --zookeeper node11:2181,node12:2181,node13:2181 --topic test --replication-factor 2 --partitions 3 --create
    • $ bin/kafka-topics.sh --zookeeper node11:2181,node12:2181,node13:2181 --list
    • $ bin/kafka-topics.sh --zookeeper node11:2181,node12:2181,node13:2181 --describe --topic test
  • 建立生成者和消费者
    • 在任意节点上开启生成者
    • $ bin/kafka-console-producer.sh --broker-list node11:9092,node12:9092,node13:9092 --topic test
    • 能够在多个节点上开启多个消费者
    • $ bin/kafka-console-consumer.sh --zookeeper node11:2181,node12:2181,node13:2181 --from-beginning --topic test  
      • --from-beginning 表示从最先开始获取队列的数据
      • 消费几条数据后,执行下面的语句,看看是否从头开始,以及不一样partition返回数据无序性
      • $ bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test  

=====================================================

 Producer代码

package com.kafka.test;

import java.util.Properties; 
   
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
   
public class MyProducer {   
     
        public static void main(String[] args) {   
            Properties props = new Properties();   
            props.setProperty("metadata.broker.list","192.168.47.12:9092,192.168.47.13:9092,192.168.47.14:9092");
            props.setProperty("serializer.class","kafka.serializer.StringEncoder");   
            props.put("request.required.acks","1");   
            ProducerConfig config = new ProducerConfig(props);   
            //建立生产这对象
            Producer<String, String> producer = new Producer<String, String>(config);
            //生成消息
            KeyedMessage<String, String> data1 = new KeyedMessage<String, String>("test","test kafka");
            KeyedMessage<String, String> data2 = new KeyedMessage<String, String>("test2","hello world");
            try {   
                int i =1; 
                while(true){
                    //发送消息
                    producer.send(data1);   
                    producer.send(data2);
                    i++;
                 //   Thread.sleep(1000);
                } 
            } catch (Exception e) {   
                e.printStackTrace();   
            }   
            producer.close();   
        }   
}
相关文章
相关标签/搜索