上篇文章学习kafka的基本安装和基础概念,本文主要是学习kafka的经常使用API。其中包括生产者和消费者,java
多线程生产者,多线程消费者,自定义分区等,固然还包括一些避坑指南。apache
首发于我的网站:连接地址bootstrap
kafka版本:2.11-1.1.1centos
操做系统:centos7api
java:jdk1.8缓存
有了以上这些条件就OK了,具体怎么安装和启动Kafka这里就不强调了,能够看上一篇文章。服务器
新建一个maven工程,须要的依赖以下:网络
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.1</version> </dependency>
kafka的核心就是主题,学会使用kafka的脚本建立主题,也须要学习使用Java API来建立主题。session
Kafka将zookeeper的操做封装成一个ZkUtils类,经过AdminUtils类来调用ZkUtils,来实现Kafka中元数据的操做。多线程
下面一个例子是使用AdminUtils来建立主题,并同时建立指定大小的分区数。
1 // 链接配置 2 private static final String ZK_CONNECT = "10.0.90.53:2181"; 3 4 // session过时时间 5 private static final int SEESSION_TIMEOUT = 30 * 1000; 6 7 // 链接超时时间 8 private static final int CONNECT_TIMEOUT = 30 * 1000; 9 10 /** 11 * 建立主题 12 * 13 * @param topic 主题名称 14 * @param partition 分区数 15 * @param repilca 副本数 16 * @param properties 配置信息 17 */ 18 public static void createTopic(String topic, int partition, int repilca, Properties properties) { 19 ZkUtils zkUtils = null; 20 try { 21 // 建立zkutil 22 zkUtils = ZkUtils.apply(ZK_CONNECT, SEESSION_TIMEOUT, CONNECT_TIMEOUT, JaasUtils.isZkSecurityEnabled()); 23 if (!AdminUtils.topicExists(zkUtils, topic)) { 24 //主题不存在,则建立主题 25 AdminUtils.createTopic(zkUtils, topic, partition, repilca, properties, AdminUtils.createTopic$default$6()); 26 } 27 } catch (Exception e) { 28 e.printStackTrace(); 29 } finally { 30 zkUtils.close(); 31 } 32 }
执行该方法,建立主题,
在centos7中查看以前建立的主题:
bin/kafka-topics.sh --list --zookeeper localhost:2181
删除主题:
/** * 删除主题 * * @param topic */ public static void deleteTopic(String topic){ ZkUtils zkUtils = null; try { zkUtils = ZkUtils.apply(ZK_CONNECT, SEESSION_TIMEOUT, CONNECT_TIMEOUT, JaasUtils.isZkSecurityEnabled()); AdminUtils.deleteTopic(zkUtils,topic); } catch (Exception e) { e.printStackTrace(); } finally { zkUtils.close(); } }
在掌握了建立和删除主题以后,接下来,学习Kafka的生产者API。
Kafka中的生产者,经过KafkaProducer这个类来实现的,在介绍这个类的使用以前,首先介绍kafka的配置项,这也是实际生产中比较关心的。
实例化生产者时,有三个配置是必须指定的:
向Kafka发送一个消息,基本上要通过如下的流程:
1.配置Properties对象,这个是必须的
2.实例化KafkaProducer对象
3.实例化ProducerRecord对象,每条消息对应一个ProducerRecord对象
4.调用KafkaProducer的send方法,发送消息。发送消息有两种,一种是带回调函数的(若是发送消息有异常,会在回调函数中返回),另外一种是不带回调函数的。
KafkaProducer默认是异步发送消息,首先它会将消息缓存到消息缓冲区中,当缓存区累积到必定数量时,将消息封装成一个
RecordBatch,统一发送消息。也就是说,发送消息实质上分为两个阶段,第一将消息发送到消息缓冲区,第二执行网络I/O操做
5.关闭KafkaProducer,释放链接的资源。
了解以上的流程,那么接下来就实现Java版本的API。
第一步:
新建一个消息实体类,模拟支付订单消息,包含消息的ID,商家名称,建立时间,备注。
public class OrderMessage { // 订单ID private String id; // 商家名称 private String sName; // 建立时间 private long createTime; // 备注 private String remake; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getsName() { return sName; } public void setsName(String sName) { this.sName = sName; } public long getCreateTime() { return createTime; } public void setCreateTime(long createTime) { this.createTime = createTime; } public String getRemake() { return remake; } public void setRemake(String remake) { this.remake = remake; } @Override public String toString() { return "OrderMessage{" + "id='" + id + '\'' + ", sName='" + sName + '\'' + ", createTime=" + createTime + ", remake='" + remake + '\'' + '}'; } }
第二步:
这里简单的发送一个消息demo,按照上面的流程,生产者例子以下:
package kafka.producer; import kafka.OrderMessage; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.UUID; /** * kafka生产者 */ public class ProducerSimpleDemo { static Properties properties = new Properties(); //主题名称 static String topic = "myTopic"; //生产者 static KafkaProducer<String, String> producer = null; //生产者配置 static { properties.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.90.53:9092"); properties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(properties); } public static void main(String args[]) throws Exception { sendMsg(); } /** * 发送消息 * * @throws Exception */ public static void sendMsg() throws Exception { ProducerRecord<String, String> record = null; try { // 循环发送一百条消息 for (int i = 0; i < 10; i++) { // 构造待发送的消息 OrderMessage orderMessage = new OrderMessage(); orderMessage.setId(UUID.randomUUID().toString()); long timestamp = System.nanoTime(); orderMessage.setCreateTime(timestamp); orderMessage.setRemake("remind"); orderMessage.setsName("test"); // 实例化ProducerRecord record = new ProducerRecord<String, String>(topic, timestamp + "", orderMessage.toString()); producer.send(record, (metadata, e) -> { // 使用回调函数 if (null != e) { e.printStackTrace(); } if (null != metadata) { System.out.println(String.format("offset: %s, partition:%s, topic:%s timestamp:%s", metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp())); } }); } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } }
运行,结果就出现了,异常。
异常记录:
2018-07-30 18:05:10.755 DEBUG 10272 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Connection with localhost/127.0.0.1 disconnected java.net.ConnectException: Connection refused: no further information at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_111] at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.8.0_111] at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51) ~[kafka-clients-0.10.1.1.jar:na] at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73) ~[kafka-clients-0.10.1.1.jar:na] at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323) [kafka-clients-0.10.1.1.jar:na] at org.apache.kafka.common.network.Selector.poll(Selector.java:291) [kafka-clients-0.10.1.1.jar:na] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) [kafka-clients-0.10.1.1.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) [kafka-clients-0.10.1.1.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:148) [kafka-clients-0.10.1.1.jar:na] at java.lang.Thread.run(Unknown Source) [na:1.8.0_111]
在配置kafka中,首先须要修改kafka的配置server.properties中的
advertised.listeners=PLAINTEXT://:your.host.name:9092
advertised.listeners=PLAINTEXT://10.0.90.53:9092
须要注意的是,若是Kafka有多个节点,那么须要每一个节点都按照这个节点的实际hostname和port状况进行设置。
修改完毕,重启Kafka服务,开启消费者,接受消息,在服务器中输入:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic myTopic --from-beginning
能够看到服务器中的消费者:
offset: 0, partition:0, topic:myTopic timestamp:1533199115840 offset: 1, partition:0, topic:myTopic timestamp:1533199115850 offset: 2, partition:0, topic:myTopic timestamp:1533199115850 offset: 3, partition:0, topic:myTopic timestamp:1533199115850 offset: 4, partition:0, topic:myTopic timestamp:1533199115850 offset: 5, partition:0, topic:myTopic timestamp:1533199115850 offset: 6, partition:0, topic:myTopic timestamp:1533199115850 offset: 7, partition:0, topic:myTopic timestamp:1533199115852 offset: 8, partition:0, topic:myTopic timestamp:1533199115852 offset: 9, partition:0, topic:myTopic timestamp:1533199115852
Kafka在底层摒弃了Java堆缓存机制,采用了操做系统级别的页缓存,同时将随机写操做改成顺序写,再结合Zero-Copy的特性极大地改善了IO性能。
这个在单机上的提升,对于集群,Kafka使用了分区,将topic的消息分散到多个分区上,并保存在不一样的机器上。
可是是否分区越多,效率越高呢?也不尽然!
1.每一个分区在底层文件系统都有属于本身的一个目录。该目录下一般会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每一个broker都保存这两个文件句柄(file handler)。很明显,若是分区数越多,所须要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。
2.消费者和生产者都会为分区缓存消息,分区越多,缓存的消息就越多,占用的内存就越大。
3.下降高可用,Kafka是经过高可用来实现高可用性的。咱们知道在集群中每每会有一个leader,假设集群中有10个Kafka进程,1个leader,9个follwer,若是一个leader挂了,那么就会从新选出一个leader,若是集群中有10000个分区,那么将要花费很长的时间,这对于高可用是有损耗的。
自己kafka有本身的分区策略的,若是未指定,就会使用默认的分区策略:
Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions。若是Key相同的话,那么就会分配到统一分区。
Kafka提供了自定义的分区器,只要实现Partitioner接口便可,下面是自定义分区的例子:
package kafka.partition;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 自定义分区器
*/
public class PartitionUtil implements Partitioner {
// 分区数
private static final Integer PARTITION_NUM = 6;
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (null == key){
return 0;
}
String keyValue = String.valueOf(key);
// key取模
int partitionId = (int) (Long.valueOf(key.toString())%PARTITION_NUM);
return partitionId;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionUtil.class.getName());
offset: 3, partition:5, topic:MyOrder timestamp:1533205894785 offset: 5, partition:3, topic:MyOrder timestamp:1533205893202 offset: 6, partition:3, topic:MyOrder timestamp:1533205894784 offset: 2, partition:2, topic:MyOrder timestamp:1533205894785 offset: 4, partition:1, topic:MyOrder timestamp:1533205894785 offset: 5, partition:1, topic:MyOrder timestamp:1533205894785 offset: 5, partition:0, topic:MyOrder timestamp:1533205894784 offset: 6, partition:0, topic:MyOrder timestamp:1533205894784 offset: 7, partition:0, topic:MyOrder timestamp:1533205894785 offset: 8, partition:0, topic:MyOrder timestamp:1533205894786
在实际生产过程当中,一般消息数量是比较多的,就能够考虑使用线程池。
使用线程池发送消息时,要考虑两点:1.须要结合实际状况,合理设计线程池的大小;2.使用线程池时,消息的发送是无序的,若是对消息的顺序有要求,不建议使用。
若是使用线程池,建议是只实例化一个KafkaProducer对象,这样性能最好。代码以下:
首先写一个线程类:
package kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; /** * 生产者线程 */ public class ProducerThread implements Runnable { private KafkaProducer<String, String> producer = null; private ProducerRecord<String, String> record = null; public ProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) { this.producer = producer; this.record = record; } @Override public void run() { producer.send(record, (metadata, e) -> { if (null != e) { e.printStackTrace(); } if (null != metadata) { System.out.println("消息发送成功 : "+String.format("offset: %s, partition:%s, topic:%s timestamp:%s", metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp())); } }); } }
接着完成启动类,启动类中自定义了一个线程池,这里仍是有一些遐思,就是没有自定义,线程建立工厂,没有指定建立的线程名称,在实际生产中,最好是自定义线程工厂。
代码以下:
package kafka.producer; import kafka.OrderMessage; import kafka.partition.PartitionUtil; import org.apache.kafka.clients.producer.*; import org.apache.kafka.clients.producer.ProducerConfig; import java.util.Properties; import java.util.UUID; import java.util.concurrent.*; /** * 线程池生产者 * * @author tangj * @date 2018/7/29 20:15 */ public class ProducerDemo { static Properties properties = new Properties(); static String topic = "MyOrder"; static KafkaProducer<String, String> producer = null; // 核心池大小 static int corePoolSize = 5; // 最大值 static int maximumPoolSize = 20; // 无任务时存活时间 static long keepAliveTime = 60; // 时间单位 static TimeUnit timeUnit = TimeUnit.SECONDS; // 阻塞队列 static BlockingQueue blockingQueue = new LinkedBlockingQueue(); // 线程池 static ExecutorService service = null; static { // 配置项 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.90.53:9092"); properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionUtil.class.getName()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(properties); // 初始化线程池 service = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, blockingQueue); } public static void main(String args[]) throws Exception { for (int i = 0; i < 6; i++) { service.submit(createMsgTask()); } } /** * 生产消息 * * @return */ public static ProducerThread createMsgTask() { OrderMessage orderMessage = new OrderMessage(); orderMessage.setId(UUID.randomUUID().toString()); long timestamp = System.nanoTime(); orderMessage.setCreateTime(timestamp); orderMessage.setRemake("rem"); orderMessage.setsName("test"); ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, timestamp + "", orderMessage.toString()); ProducerThread task = new ProducerThread(producer, record); return task; } }
对于Kafka的分区器和多线程生成者,切记一点,必定要根据实际业务进行设计。