kafka学习:入门

1.kafka安装

1.1下载安装包解压

tar -zxvf kafka_2.11-0.10.1.1.tgz 
cd kafka_2.11-0.10.1.1/

1.2启动服务

首先启动zookeeper:这里使用zookeeper默认配置。html

nohup bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &

启动kafka服务:这里使用单节点多代理配置。java

修改server.propertiesapache

broker.id=0   #节点ID
listeners=PLAINTEXT://192.168.1.112:9092    #节点配置
log.dirs=/opt/kafka_2.11-0.10.1.1/log    #数据日志保存目录
num.partitions=4    #默认分区数
zookeeper.connect=192.168.1.112:2181    #zookeeper服务

其余配置项保持默认。bootstrap

复制配置文件api

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

修改配置节点id 、端口、日志路径缓存

server-1.properties安全

broker.id=1   #节点ID
listeners=PLAINTEXT://192.168.1.112:9093    #节点配置
log.dirs=/opt/kafka_2.11-0.10.1.1/log-1    #数据日志保存目录
num.partitions=4    #默认分区数
zookeeper.connect=192.168.1.112:2181    #zookeeper服务

server-2.properties服务器

broker.id=2   #节点ID
listeners=PLAINTEXT://192.168.1.112:9094    #节点配置
log.dirs=/opt/kafka_2.11-0.10.1.1/log-2    #数据日志保存目录
num.partitions=4    #默认分区数
zookeeper.connect=192.168.1.112:2181    #zookeeper服务

启动kafka节点:broker0,broker1,broker2session

nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
nohup bin/kafka-server-start.sh config/server-1.properties >/dev/null 2>&1 &
nohup bin/kafka-server-start.sh config/server-2.properties >/dev/null 2>&1 &

1.3建立topic

建立topic:设置备份2,分区3oracle

./bin/kafka-topics.sh --create --zookeeper 192.168.1.112:2181 --replication-factor 2 --partitions 3 --topic my-topic

运行命令 “describe topics”查看topic信息

bin/kafka-topics.sh --describe --zookeeper 192.168.1.112:2181 --topic my-topic

输出:

Topic:my-topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: my-topic	Partition: 1	Leader: 2	Replicas: 2,0	Isr: 2,0
	Topic: my-topic	Partition: 2	Leader: 0	Replicas: 0,1	Isr: 0,1

第一行是全部分区的摘要,其次,每一行提供一个分区信息。这里三行显示每一个分区的信息。

“Partition”:分区信息

“Leader”:该节点负责该分区的全部的读和写,每一个节点的leader都是随机选择的。

“Replicas”:备份节点列表,不管该节点是不是leader或者目前是否还活着,只是显示。

“Isr”:“同步备份”的节点列表,也就是活着的节点而且正在同步leader。

在topic中发布消息:

bin/kafka-console-producer.sh --broker-list 192.168.1.112:9092 --topic my-topic

this is message 1
this is message 2

在topic中获取消息

bin/kafka-console-consumer.sh --zookeeper 192.168.1.112:2181 --topic my-topic --from-beginning

this is message 1
this is message 2

测试kill掉其中一个节点,分区是否正常工做,这里咱们停掉broker0

ps | grep server.properties

kill -9 3789

而后查看分区信息

bin/kafka-topics.sh --describe --zookeeper 192.168.1.112:2181 --topic my-topic
Topic:my-topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: my-topic	Partition: 1	Leader: 2	Replicas: 2,0	Isr: 2
	Topic: my-topic	Partition: 2	Leader: 1	Replicas: 0,1	Isr: 1

跟前面执行"describe topic"命令相比,分区2的leader变为broker1,“同步备份”的 列表中也没有broker0。执行从最初获取消息的命令也会发现,消息并无丢失。

2.kafka接口API

Apache Kafka引入一个新的java客户端(在org.apache.kafka.clients 包中),替代老的Scala客户端,可是为了兼容,将会共存一段时间。为了减小依赖,这些客户端都有一个独立的jar,而旧的Scala客户端继续与服务端保留在同个包下。

在项目中引入kafka服务端jar包,包含scala客户端。

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.11</artifactId>
	<version>0.10.1.1</version>
</dependency>

咱们鼓励全部新开发的程序使用新的JAVA客户端,新的java客户端比之前的Scala的客户端更快、功能更全面。

<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.10.1.1</version>
		</dependency>

2.1生产者API

生产者是线程安全的,在多个线程之间共享单个生产者实例。

public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.put("bootstrap.servers", "192.168.1.112:9092");
		props.put("acks", "all");
		props.put("retries", 0);
		props.put("batch.size", 16384);
		props.put("linger.ms", 10);
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		KafkaProducer producer = new KafkaProducer(props);
		for (int i = 0; i < 10; i++) {
			Future<RecordMetadata> future = producer
					.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
		}
		producer.close();

	}

生产者缓冲空间池保留还没有发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。

默认缓冲可当即发送,即使缓冲空间尚未满,可是,若是你想减小请求的数量,能够设置linger.ms大于0。这将指示生产者发送请求以前等待一段时间,但愿更多的消息填补到未满的批中。

send():方法是异步的,添加消息到缓冲区等待发送,并当即返回。生产者将单个的消息批量在一块儿发送来提升效率。

producer提供两种send方法实现,第一种不带回调方法

public Future<RecordMetadata> send(ProducerRecord<K, V> record)

异步发送一条消息到topic,而且消息一旦被保存到“等待发送的消息缓存”中,此方法就当即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。发送消息返回的结果是RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳。

因为send调用是异步的,它将为发送此消息的响应RecordMetadata返回一个Future。调用future的get()方法则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。

阻塞方式调用:

Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
			RecordMetadata recordMetadata = future.get();
			System.out.println("partition="+recordMetadata.partition()+",offset="+recordMetadata.offset()+"value="+i);

另外一种send方法实现是带回调方法:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

异步发送消息,当发送成功或异常时会调用回调方法,返回发送结果RecordMetadata信息或异常。这样能够不阻塞的方式确保每条消息发送成功。

非阻塞方式调用:

Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)),new Callback() {
						@Override
						public void onCompletion(RecordMetadata metadata, Exception exception) {
							if(exception == null){
								//发送成功
								System.out.println("partition=" + metadata.partition() + ",offset="+ metadata.offset());
							}else{
								//发送异常
								exception.printStackTrace();
							}
							
						}
					});

发送到同一个分区的消息回调保证按必定的顺序执行。

注意:callback通常在生产者的I/O线程中执行,因此是至关的快的,不然将延迟其余的线程的消息发送。若是你须要执行阻塞或计算昂贵(消耗)的回调,建议在callback主体中使用本身的Executor来并行处理。

2.2消费者API

随着0.9.0版本,咱们已经增长了一个新的Java消费者替换咱们现有的基于zookeeper的高级和低级消费者。这个客户端仍是测试版的质量。为了确保用户平滑升级,咱们仍然维护旧的0.8版本的消费者客户端继续在0.9集群上工做,两个老的0.8 API的消费者( 高级消费者 和 低级消费者)。

这个新的消费API,清除了0.8版本的高版本和低版本消费者之间的区别。

kafka客户端消费者类

public class KafkaConsumer<K, V> implements Consumer<K, V>

偏移量和消费者位置

kafka为分区中的每条消息保存一个偏移量(offset),这个偏移量是该分区中一条消息的惟一标示符。也表示消费者在分区的位置。

消费者的位置指向下一条记录的偏移量。它比消费者在该分区中读取过的最大偏移量要大一个。 它在每次消费者在调用poll(long)中拉取消息时自动增加。

“已提交”的位置是已保存的最后偏移量,若是进程失败或从新启动时,消费者将恢复到这个偏移量。消费者能够选择按期自动提交偏移量,也能够选择经过调用commit API来手动的控制(如:commitSync 和 commitAsync)。

消费者组和订阅主题

消费者组:具备相同group.id的消费者进程,能够在一台机器上运行也能够分布在多台机器上,拉取kafka消息并处理消息。

订阅主题:分组中的每一个消费者都经过subscribe API动态的订阅一个topic列表。kafka将已订阅topic的消息发送到每一个消费者组中。并经过平衡分区在消费者分组中全部成员之间来达到平均。

消费者组的成员是动态维护的:若是一个消费者故障。分配给它的分区将从新分配给同一个分组中其余的消费者。一样的,若是一个新的消费者加入到分组,将从现有消费者中移一个分区给它。这被称为从新平衡分组。

此外,当分组从新分配自动发生时,能够经过ConsumerRebalanceListener通知消费者,这容许他们完成必要的应用程序级逻辑,例如状态清除,手动偏移提交等。容许消费者经过使用assign(Collection)手动分配指定分区,若是使用手动指定分配分区,那么动态分区分配和协调消费者组将失效。

消费者故障

订阅一组topic后,当调用poll(long)时,消费者将自动加入到组中。只要持续的调用poll,消费者将一直保持可用,并继续从分配的分区中接收消息。此外,消费者向服务器定时发送心跳。 若是消费者崩溃或没法在session.timeout.ms配置的时间内发送心跳,则消费者将被视为死亡,而且其分区将被从新分配。

还有一种可能,消费可能遇到“活锁”的状况,它持续的发送心跳,可是没有处理。为了预防消费者在这种状况下一直持有分区,咱们使用max.poll.interval.ms活跃检测机制。 在此基础上,若是你调用的poll的频率大于最大间隔,则客户端将主动地离开组,以便其余消费者接管该分区。 发生这种状况时,你会看到offset提交失败(调用commitSync()引起的CommitFailedException)。这是一种安全机制,保障只有活动成员可以提交offset。因此要留在组中,你必须持续调用poll。

两个配置设置来控制poll循环:

max.poll.interval.ms:增大poll的间隔,能够为消费者提供更多的时间去处理返回的消息(调用poll(long)返回的消息,一般返回的消息都是一批)。缺点是此值越大将会延迟组从新平衡。

max.poll.records:此设置限制每次调用poll返回的消息数,这样能够更容易的预测每次poll间隔要处理的最大值。经过调整此值,能够减小poll间隔,减小从新平衡分组的。

对于消息处理时间不可预测地的状况,这些选项是不够的。 处理这种状况的推荐方法是将消息处理移到另外一个线程中,让消费者继续调用poll。

另外,你必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量。 还要注意,你须要pause暂停分区,不会从poll接收到新消息,让线程处理完以前返回的消息(若是你的处理能力比拉取消息的慢,那建立新线程将致使你机器内存溢出)。

自动提交偏移量实例:

public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "192.168.1.112:9092");
		props.setProperty("group.id", "test");
		props.setProperty("enable.auto.commit", "true");
		props.setProperty("auto.commit.interval.ms", "1000");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList("my-topic"));
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);//拉取消息超时时间100ms
			for (ConsumerRecord<String, String> consumerRecord : records) {
				System.out.printf("offset=%d,key=%s,value=%s%n", consumerRecord.offset(), consumerRecord.key(),
						consumerRecord.value());
			}
			
		}

	}

enable.auto.commit:true 自动提交偏移量
auto.commit.interval.ms:1000 自动提交偏移量间隔时间

session.timeout.ms:30000 broker自动检测客户端进程心跳的最大超时时间。若是中止心跳的时间超过该值,则认为该客户端出现故障,它的分区将被从新分配。

集群是经过配置bootstrap.servers指定一个或多个broker。不用指定所有的broker,它将自动发现集群中的其他的borker(最好指定多个,万一有服务器故障)。

手动控制偏移量

不须要定时的提交offset,能够本身控制offset,当消息被认为已消费过了,这个时候再去提交它们的偏移量,这样能够保证消息被完整的处理。

public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "192.168.1.112:9092");
		props.setProperty("group.id", "test");
		props.setProperty("enable.auto.commit", "false");
		props.setProperty("session.timeout.ms", "30000");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		
		KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList("my-topic"));
		final int minBatchSize = 20;
		List<ConsumerRecord<String,String>> buffer = new ArrayList<>();
		while(true){
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> consumerRecord : records) {
				buffer.add(consumerRecord);//拉取kafka数据到内存中
			}
			if(buffer.size() >= minBatchSize){
				//处理buffer中消息
				for (ConsumerRecord<String,String> cr : buffer) {
					System.out.println("处理消息:"+cr.value());
				}
				consumer.commitAsync();//处理完内存中缓存的消息以后手动提交
				System.out.println("提交offset");
				buffer.clear();
			}
			
		}
	}

在这个例子中,咱们将消费一批消息并将它们存储在内存中。当咱们积累足够多的消息后,咱们再将它们批量处理。等确保消息被成功彻底处理则手动提交offset。

这种状况咱们能够保证“至少一次”,上面例子当处理完全部消息以后,在提交offset时出现异常(虽然几率很小)。这种状况下进程重启以后就会重复消费这部分消息。“至少一次”保证,在故障状况下,能够重复。

使用自动提交也能够“至少一次”。可是要求你必须下次调用poll(long)以前或关闭消费者以前,处理完全部返回的数据。若是操做失败,这将会致使已提交的offset超过消费的位置,从而致使丢失消息。使用手动控制offset的优势是,你能够直接控制消息什么时候提交。

处理完每一个分区中的消息后,提交偏移量实例:

public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "192.168.1.112:9092");
		props.setProperty("group.id", "test");
		props.setProperty("enable.auto.commit", "false");
		props.setProperty("session.timeout.ms", "30000");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList("my-topic"));
		while (true) {
			ConsumerRecords<String,String> records = consumer.poll(100);
			for (TopicPartition partition : records.partitions()) {
				//获取分区中的消息
				List<ConsumerRecord<String,String>> partitionRecords = records.records(partition);
				for (ConsumerRecord<String, String> consumerRecord : partitionRecords) {
					System.out.println("partion:"+consumerRecord.partition()+",value:"+consumerRecord.value()+",offset:"+consumerRecord.offset());
				}
				long lastOffset = partitionRecords.get(partitionRecords.size()-1).offset();
				System.out.println(partition.partition()+"分区最后offset:"+lastOffset);
				//提交最后一个消息的下一个offset
				consumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(lastOffset+1)));
			}
		}
	}

注意:已提交的offset应始终是你的程序将读取的下一条消息的offset。所以,调用commitSync(offsets)时,你应该加1个到最后处理的消息的offset。

订阅指定分区

分区的设置只能经过调用assign修改,由于手动分配不会进行分组协调,所以消费者故障不会引起分区从新平衡。

public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "192.168.1.112:9092");
		props.setProperty("group.id", "test");
		props.setProperty("enable.auto.commit", "true");
		props.setProperty("auto.commit.interval.ms", "1000");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		TopicPartition p = new TopicPartition("my-topic",0);//订阅主题的指定分区
		consumer.assign(Arrays.asList(p));
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);//拉取消息超时时间100ms
			for (ConsumerRecord<String, String> consumerRecord : records) {
				System.out.printf("offset=%d,key=%s,value=%s%n", consumerRecord.offset(), consumerRecord.key(),
						consumerRecord.value());
			}
			
		}
	}

assign方法只指定消费者拉取指定分区的消息,消费者分组仍须要提交offset。

自定义存储offset

消费者能够不使用kafka内置的offset仓库,能够选择本身来存储offset。将消费的offset和消息结果存储在同一个的系统中,用原子的方式存储结果和offset。提供的“正好一次”的消费保证比kafka默认的“至少一次”的语义要更高。

本身管理偏移量要注意一下几点:

1.关闭自动提交offset,enable.auto.commit=false。

2.根据ConsumerRecord 保存分区offset的位置。

3.启动时恢复分区消费到的位置,经过方法seek(TopicPartition, long)。

手动控制分区,启动时指定分区偏移量消费实例:

public class ConsumerTest5 {
	//用于跟踪偏移量的map,手动提交偏移量状况下,在分区再均衡时提交每一个分区消费的位置,以便均衡以后新的消费者开始从该位置进行消费
	static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new ConcurrentHashMap();
	//用于保存偏移量位置的map,以便下次启动时从该位置开始消费。
	static Map<String, Long> map = new ConcurrentHashMap<>(); 
	private static KafkaConsumer<String, String> consumer;

	public static void main(String[] args) {
		map.put("my-topic0", 1000l);
		map.put("my-topic1", 1000l);
		map.put("my-topic2", 1000l);

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "192.168.1.112:9092");
		props.setProperty("group.id", "test");
		props.setProperty("enable.auto.commit", "false");
		props.setProperty("auto.commit.interval.ms", "1000");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		consumer = new KafkaConsumer<>(props);
		// 手动指定位置消费,须要手动注册
		TopicPartition p = new TopicPartition("my-topic", 0);
		TopicPartition p1 = new TopicPartition("my-topic", 1);
		TopicPartition p2 = new TopicPartition("my-topic", 2);
		//手动注册分区
		consumer.subscribe(Arrays.asList("my-topic"),new ConsumerRebalanceListener() {
			
			@Override
			public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
				//再均衡开始以前和消费者中止读取消息以后被调用
				//在这里提交偏移量,下一个接管该分区的消费者就知道从什么位置开始消费。
				//提交全部分区的偏移量
				System.out.println("提交全部分区偏移量");
				consumer.commitSync(currentOffsets);
			}
			
			@Override
			public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
				//从新分配partition以后和消费者开始读取消息以前被调用
			}
		});
		//这里要先调用一次poll方法,由于本次拉取数据不是咱们须要的位置因此不作处理
		consumer.poll(100);
		//指定分区消费位置
		consumer.seek(p, map.get("my-topic0"));
		consumer.seek(p1, map.get("my-topic1"));
		consumer.seek(p2, map.get("my-topic2"));
		long offset = 0;
		String partitionKey;
		
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			Set<TopicPartition> partitions = records.partitions();
			for (TopicPartition topicPartition : partitions) {
				// 标识分区
				partitionKey = topicPartition.topic() + topicPartition.partition();
				System.out.println(partitionKey + "从" + map.get(partitionKey) + "开始消费");
				List<ConsumerRecord<String, String>> list = records.records(topicPartition);
				offset = 0;
				for (ConsumerRecord<String, String> consumerRecord : list) {
					System.out.println("partition=" + topicPartition.partition() + "offset=" + consumerRecord.offset()
							+ ",value=" + consumerRecord.value());
					offset = consumerRecord.offset();
				}
				// offset保存到map中
				map.put(partitionKey, offset + 1);
				//手动提交偏移量
				consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1)));
				//缓存跟踪分区的偏移量
				currentOffsets.put(topicPartition, new OffsetAndMetadata(offset+1));
			}

		}

	}
}

再均衡监听器(ConsumerRebalanceListener

在为消费者分配新的partition或者移除旧的partition时,能够经过消费者API执行一些应用程序代码,在使用subscribe()方法时传入一个ConsumerRebalanceListener实例。

ConsumerRebalanceListener须要实现的两个方法

1:public void onPartitionRevoked(Collection<TopicPartition> partitions)方法会在再均衡开始以前消费者中止读取消息以后被调用。若是在这里提交偏移量,下一个接管partition的消费者就知道该从哪里开始读取了。

2:public void onPartitionAssigned(Collection<TopicPartition> partitions)方法会在从新分配partition以后消费者开始读取消息以前被调用。

消费者流量控制

若是消费者分配了多个分区,并同时消费全部的分区,这些分区具备相同的优先级。在一些状况下,消费者须要首先消费一些指定的分区,当指定的分区有少许或者已经没有可消费的数据时,则开始消费其余分区。

kafka支持动态控制消费流量,分别在consumer的poll(long)中使用pause 和 resume 来暂停消费指定分配的分区,从新开始消费指定暂停的分区。

public void pause(Collection<TopicPartition> partitions) //暂停消费指定分区。

public void resume(Collection<TopicPartition> partitions)//从新开始消费,已经暂停消费的分区。

2.3Stream API

 

2.4Connect API

相关文章
相关标签/搜索