Kafka学习(一) 初识

一,简介java

二,Kafka的角色 apache

三,Kafka的安装编程

  3.1 文件下载和解压bootstrap

  3.2 文件配置网络

  3.3 服务启动session

四,Kafka的经常使用命令分布式

五,Kafka的JAVA编程oop

  5.1 Producer编程网站

  5.2 Consumer编程this

 

 

正文

一,简介

  Kafka是由Apache软件基金会开发的一个开源流处理平台,由ScalaJava编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它能够处理消费者规模的网站中的全部动做流数据。 这种动做(网页浏览,搜索和其余用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据一般是因为吞吐量的要求而经过处理日志和日志聚合来解决。 对于像Hadoop同样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是经过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了经过集群来提供实时的消息。

二,Kafka的角色  

  Broker : 安装Kafka服务的那台集群就是一个broker(broker的id要全局惟一)
  Producer :消息的生产者,负责将数据写入到broker中(push)
  Consumer:消息的消费者,负责从kafka中读取数据(pull),老版本的消费者须要依赖zk,新版本的不须要
  Topic: 主题,至关因而数据的一个分类,不一样topic存放不一样的数据
  Consumer Group: 消费者组,一个topic能够有多个消费者同时消费,多个消费者若是在一个消费者组中,那么他们不能重复消费数据

三,Kafka的安装

  3.1 文件下载和解压

  我这里的spark是2.3.3因此须要kafka0.10.2.0版本:点击下载

  解压到相应的文件夹:以下图所示

  

  3.2 文件配置

  三个必要配置的地方:

broker.id=1  ===> 全局惟一,三台都要配置我这里分别是1,2,3
listeners=PLAINTEXT://hd1:9092   ===> 还有两台hd2,hd3
# 这个目录本身建立,用来保存kafka的数据
log.dirs=/usr/local/hadoop/kafka/data  
zookeeper.connect=hd1:2181,hd2:2181,hd3:2181 ===> zookeeper的地址

  以下:

  

  3.3 服务启动

./bin/kafka-server-start.sh -daemon /usr/local/hadoop/kafka/kafka_2.10-0.10.2.0/config/server.properties

四,Kafka的经常使用命令

# 启动
./bin/kafka-server-start.sh -daemon /usr/local/hadoop/kafka/kafka_2.10-0.10.2.0/config/server.properties

# 查看有那些topic
./bin/kafka-topics.sh --list --zookeeper hd1:2181,hd2:2181,hd3:2181

# 建立topic
./bin/kafka-topics.sh --create --zookeeper hd1:2181,hd2:2181,hd3:2181 --replication-factor 3 --partitions 3 --topic test

# 生产者数据
./bin/kafka-console-producer.sh --broker-list hd1:9092,hd2:9092,hd3:9092 --topic test

# 消费者消费数据
./bin/kafka-console-consumer.sh --zookeeper hd1:2181,hd2:2181,hd3:2181 --topic test --from-beginning

五,Kafka的JAVA编程

  5.1 Producer编程

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProduceDemo {

    public static void main(String[] args){
        Properties props = new Properties();//配置项
        props.put("bootstrap.servers", "hd1:9092,hd2:9092,hd3:9092");//使用新的API指定kafka集群位置
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        String messageStr = null;
        for (int i = 1;i<1000;i++){
            messageStr = "hello, this is "+i+"th message";
            producer.send(new ProducerRecord<String, String>("test","Message",messageStr));
        }
        producer.close();
    }
}

  5.2 Consumer编程

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo implements Runnable{
    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private final String topic;
    private static final String GROUDID = "groupA";

    public ConsumerDemo(String topicName){
        Properties props = new Properties();
        props.put("bootstrap.servers", "hd1:9092,hd2:9092,hd3:9092");
        props.put("group.id", GROUDID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    public void run(){
        int messageNum = 1;
        try{
            for (;;){
                msgList = consumer.poll(500);
                if (msgList!=null && msgList.count()>0){
                    for (ConsumerRecord<String, String> record : msgList){
                        if (messageNum % 50 ==0){
                            System.out.println(messageNum+"=receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
                        }
                        if (messageNum % 1000 == 0)
                            break;
                        messageNum++;
                    }
                }
                else{
                    Thread.sleep(1000);
                }
            }
        }
        catch (InterruptedException e){
            e.printStackTrace();
        }
        finally{
            consumer.close();
        }
    }

    public static void main(String[] args){
        ConsumerDemo demo = new ConsumerDemo("test");
        Thread thread = new Thread(demo);
        thread.start();
    }
}
相关文章
相关标签/搜索