1. Working through the official quickstart
Kafka — a distributed message queue
Messaging system
Message middleware: a buffer that sits between producers and consumers
When the buffer fills up, Kafka can be scaled out
Key properties:
Horizontally scalable, fault tolerant, real-time, fast
Kafka architecture:
Understand the roles of producer, consumer, broker (the buffering server), and topic (the label messages are published under)
One configuration file (server.properties) corresponds to one broker
Deploying Kafka on a single node (one machine):
Open several consoles up front so the server (broker), the producer, and the consumer can each be started and watched at the same time
1. Single-broker deployment:
Preparation:
Install ZooKeeper first. After unpacking, the only change needed is in zoo.cfg under the conf directory: point dataDir away from the tmp directory
Basic ZK usage: start the server with zkServer in the bin directory, then connect with zkCli
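For example (the data directory path is only illustrative, any non-tmp location works):
# conf/zoo.cfg
dataDir=/usr/local/mycode/zookeeper/data
# start the server, then connect with the CLI
zkServer.sh start
zkCli.sh -server localhost:2181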
Configure Kafka:
In the config directory:
server.properties:
broker.id
listeners
host.name
…
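A minimal sketch of these edits (the values are illustrative and match the ports used later in these notes):
broker.id=0
listeners=PLAINTEXT://:9092
host.name=localhost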
Start-up (run from $KAFKA_HOME):
Start the ZK server first
zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
Then start the Kafka server, passing its config file
kafka-server-start.sh $KAFKA_HOME/config/server.properties
Create a topic (point it at the ZooKeeper address):
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List topics:
kafka-topics.sh --list --zookeeper localhost:2181
Show topic details:
The describe command shows which brokers are alive, which one is the leader, and so on
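The corresponding command (not written out in the original notes) is:
kafka-topics.sh --describe --zookeeper localhost:2181 --topic test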
Send messages (produce), pointing at the broker:
kafka-console-producer.sh --broker-list localhost:9092 --topic test
Note: port 2181 is the ZooKeeper server, while 9092 is the broker's listener
Consume messages, pointing at ZK:
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Note: with the --from-beginning flag, all historical messages are read as well
2. Multi-broker deployment:
Make several copies of server.properties (a command sketch follows this list)
In each copy change broker.id, listeners, and log.dir
Start multiple Kafka servers:
-daemon runs the process in the background
a trailing & also backgrounds the command so the next broker can be started right after
After a successful start, jps shows three Kafka processes
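A sketch of the commands, following the quickstart's naming convention (server-1.properties / server-2.properties are illustrative file names):
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-1.properties
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-2.properties
# edit broker.id, listeners and log.dir in each copy, then start each broker
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties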
Create a topic with multiple replicas:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-repli
Producing works the same as with a single broker, except the broker list now contains several host:port entries
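For example (ports 9093 and 9094 assume that is what the extra brokers' listeners were set to):
kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic my-repli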
Fault tolerance with multiple brokers:
If the leader broker is killed, a new leader is elected, so killing any one broker does not affect overall availability
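One way to verify this (a sketch; get the broker's pid from jps first):
kill -9 <leader-broker-pid>
kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-repli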
2. Development with IDEA + Maven:
Set up the environment:
Create a project from the Scala template:
Fill in the project information:
Adjust the settings paths:
The Scala project is created
Edit the pom.xml file:
Add and remove dependencies
Kafka version:
<properties>
  <kafka.version>0.9.0.0</kafka.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafka.version}</version>
  </dependency>
</dependencies>
Create a java folder and mark it as a Sources root (it turns blue); this is changed from the project structure button in the top-right corner of IDEA
3. Writing a Kafka Producer and Consumer with the Java API:
Producer:
First define a class of commonly used Kafka constants: the broker list, the ZK address, and the topic name
/*
 * Kafka configuration constants, shared by the producer and the consumer
 * */
public class KafkaProperties {
    //ZooKeeper connection address
    public static final String ZK = "localhost:2181";
    public static final String TOPIC = "hello_topic";
    public static final String BROKER_LIST = "localhost:9092";
    //consumer group id, referenced by the consumer below (the value itself is an arbitrary choice)
    public static final String GROUP_ID = "test_group";
}
Then create the producer:
- define fields for the topic and the producer (use the kafka.javaapi.producer package)
- write a constructor that includes:
- taking the topic from the caller
- creating the producer, which requires a ProducerConfig object
- the ProducerConfig is built from parameters passed in via a Properties object (from java.util)
- the Properties object must set "metadata.broker.list", "serializer.class" and "request.required.acks" for the ProducerConfig
Finally the producer sends messages from the thread's run method
(in this test one message is sent every 2 seconds)
Implementation:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
/*
 * Kafka producer
 * */
public class KafkaProducer extends Thread {
    private String topic;
    //use the Producer from the kafka.javaapi.producer package
    private Producer<Integer, String> producer;
    //constructor: takes the topic and builds the producer
    public KafkaProducer(String topic) {
        this.topic = topic;
        //set the parameters ProducerConfig needs via a Properties object; this is required before the Producer can be created
        //they are the broker list, the serializer class, and the acknowledgement setting
        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
        properties.put("serializer.class", "kafka.serializer.StringEncoder"); //serialize message values as Strings
        properties.put("request.required.acks", "1"); //can be 0, 1 or -1; 1 is the usual production setting, -1 is the strictest, 0 should not be used
        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }
    //send messages from the thread's run method
    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String message = "message_" + messageNo;
            producer.send(new KeyedMessage<Integer, String>(topic, message));
            System.out.println("send: " + message);
            messageNo++;
            //send one message every 2 seconds
            try {
                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
Consumer:
Construction:
- the constructor takes the topic
- add a createConnector method whose return type is a ConsumerConnector; note it is not a Consumer directly
- as with the producer, pass the required properties to the connector: zookeeper.connect and group.id
Execution, done by overriding the thread's run method:
- to create the message streams, first build a Map holding the topic and the number of Kafka streams
- create the message streams and take the stream for this topic
- iterate over the stream to pull the messages
Implementation:
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/*
 * Kafka consumer
 * */
public class KafkaConsumer extends Thread {
    private String topic;
    public KafkaConsumer(String topic) {
        this.topic = topic;
    }
    //ConsumerConnector comes from the kafka.javaapi.consumer package
    //here we create a consumer connector rather than a consumer directly, unlike the producer
    private ConsumerConnector createConnector() {
        //set the ConsumerConfig properties in the same way as for the producer
        //the ZK address and the consumer group are required
        Properties properties = new Properties();
        properties.put("zookeeper.connect", KafkaProperties.ZK);
        properties.put("group.id", KafkaProperties.GROUP_ID);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }
    //start the consumer from the thread's run method
    @Override
    public void run() {
        ConsumerConnector consumer = createConnector();
        //createMessageStreams needs a Map, so build one
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        //the map holds each topic and the number of Kafka streams to create for it
        topicCountMap.put(topic, 1);
        //create the message streams; the return type can be read off the source
        //the String key is the topic, the List holds the byte streams
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);
        //take the single stream for this topic
        KafkaStream<byte[], byte[]> byteStream = messageStream.get(topic).get(0);
        //iterate over the stream
        ConsumerIterator<byte[], byte[]> iterator = byteStream.iterator();
        while (iterator.hasNext()) {
            //the iterator yields bytes, so convert them to a String
            String message = new String(iterator.next().message());
            System.out.println("receive:" + message);
        }
    }
}
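A minimal test driver (not part of the original notes; the class name is arbitrary) that wires the two threads together using the constants defined above:
public class KafkaClientApp {
    public static void main(String[] args) {
        //start the producer and the consumer as two threads on the same topic
        new KafkaProducer(KafkaProperties.TOPIC).start();
        new KafkaConsumer(KafkaProperties.TOPIC).start();
    }
}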
4. A simple Kafka exercise
Integrate Flume and Kafka for real-time data collection
The Kafka sink acts as the producer that links the two together
Technology choices:
Agent1: exec source -> memory channel -> avro sink
Agent2: avro source -> memory channel -> kafka sink(producer)
producer -> consumer
Configure exec-memory-avro:
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel
# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /usr/local/mycode/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444
# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
Configure avro-memory-kafka:
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel
# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = localhost
avro-memory-kafka.sources.avro-source.port = 44444
# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = localhost:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.kafka.flumeBatchSize = 5
avro-memory-kafka.sinks.kafka-sink.kafka.producer.acks = 1
# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
Start the two Flume agents (order matters: start avro-memory-kafka first, because exec-memory-avro's avro sink needs avro-memory-kafka's avro source to already be listening):
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf --name avro-memory-kafka -Dflume.root.logger=INFO,console
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf --name exec-memory-avro -Dflume.root.logger=INFO,console
Start the Kafka console consumer:
kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello_topic
The pipeline is fairly slow to get going; wait a moment before data appears in the consumer console
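To drive the pipeline, append lines to the file tailed by exec-source (the path is the one configured above):
echo "hello kafka" >> /usr/local/mycode/data/data.log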