kafka java编程

自定义系列化方式Encoderjava

kafka自带的序列化方式正则表达式

DefaultEncoder默认的这个Encoder事实上不作任何处理,接收到什么byte[]就返回什么byte[]:api

class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] { override def toBytes(value: Array[Byte]): Array[Byte] = value }服务器

NullEncoder无论接收什么都返回null:session

class NullEncoder[T](props: VerifiableProperties = null) extends Encoder[T] { 架构

override def toBytes(value: T): Array[Byte] = null }app

StringEncoder则返回字符串,默认是utf-8的格式:负载均衡

class StringEncoder(props: VerifiableProperties = null) extends Encoder[String] {dom

 val encoding = ide

    if(props == null)

        "UTF8"

    else

        props.getString("serializer.encoding", "UTF8")

 override def toBytes(s: String): Array[Byte] =

    if(s == null)

        null

    else

        s.getBytes(encoding) }

本身编写Encoder来序列化消息,只须要实现下面接口:

interface Encoder<T> {  

  public Message toMessage(T data);  

}

例如,咱们的消息是一个对象

用四个字段分别表示消息的ID、用户、查询关键词和查询时间。固然你若是要设计的更复杂,能够加入IP这些信息。这些用java写就是一个简单的pojo类,这是getter/setter方法便可。因为在封转成kafkamessage时须要将数据转化成bytep[]类型,能够提供一个序列化的方法。我在这里直接重写toString了:

@Override

     public String toString() {

         String keyword = "[info kafka producer:]" ;

         keyword = keyword + this .getId() + "-" + this .getUser() + "-"

                 + this .getKeyword() + "-" + this .getCurrent();

         return keyword;

     }

这样尚未完成,这只是将数据格式用java对象表现出来,解析来要对其按照kafka的消息类型进行封装,在这里咱们只须要实现Encoder类便可:

public class KeywordMessage implements kafka.serializer.Encoder<Keyword>{

     public static final Logger LOG=LoggerFactory.getLogger(Keyword. class );

     @Override

     public Message toMessage(Keyword words) {

         LOG.info( "start in encoding..." );

         return new Message(words.toString().getBytes());

     }

 

}

自定义partition

kafka自带分区方式

DefaultPartitioner默认的分区函数,他根据key的hashcode与分区数取余,获得相应的分区。

class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {

    private val random = new java.util.Random

    def partition(key: Any, numPartitions: Int): Int = {

        Utils.abs(key.hashCode) % numPartitions

        }

}

若是key为null会在必定时间内往一个特定的分区发送,超过必定时间又会随机选择一个,请参考 key为null时Kafka会将消息发送给哪一个分区? .因此推荐你发送Kafka消息时老是指定一个key,以便消息能均匀的分到每一个分区上。

自定义分区方式须要实现下面的接口:

interface Partitioner<T> {  

     int partition(T key, int numPartitions);  

分区函数有两个参数:key和可用的分区数量,从分区列表中选择一个分区并返回id。默认的分区策略是hash(key)%numPartitions.若是key是null,就随机的选择一个能够经过参数partitioner.class定制分区函数例如:

public class ProducerPartitioner implements Partitioner<String> {

     public static final Logger LOG=LoggerFactory.getLogger(Keyword. class );

     @Override

     public int partition(String key, int numPartitions) {

         LOG.info( "ProducerPartitioner key:" +key+ " partitions:" +numPartitions);

         return key.length() % numPartitions;

     }

}

key咱们是在构造数据发送对象时设置的,这个key是区分存储的关键,好比我想将个人数据按照不一样的用户类别存储。

java编写producer

producer api:

class Producer {  

/* 将消息发送到指定分区 */  

public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);  

/* 批量发送一批消息 */  

public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);  

/* 关闭producer */  

public void close();  

}  

例子:

Properties props = new Properties();		
//指定kafka节点:注意这里无需指定集群中全部Boker,只要指定其中部分便可,它会自动取meta信息并链接到对应的Boker节点		
props.put("metadata.broker.list", "172.17.1.163:9093");		
//指定采用哪一种序列化方式将消息传输给Boker,你也能够在发送消息的时候指定序列化类型,不指定则以此为默认序列化类型		
props.put("serializer.class", "kafka.serializer.StringEncoder");		
//指定消息发送对应分区方式,若不指定,则随机发送到一个分区,也能够在发送消息的时候指定分区类型。	
props.put("partitioner.class", "example.producer.DefaultPartitioner");	
//该属性表示你须要在消息被接收到的时候发送ack给发送者。以保证数据不丢失	
props.put("request.required.acks", "1");		
ProducerConfig config = new ProducerConfig(props);		
//申明生产者:泛型1为分区key类型,泛型2为消息类型		
Producer<String, String> producer = new Producer<String, String>(config);
//建立KeyedMessage发送消息,参数1为topic名,参数2为分区名(若为null则随机发到一个分区),参数3为消息	
producer.send(new ProducerData<String,String>("topic","partitionKey1","msg1"));	
//另外一种写法producer.send(new ProducerRecord<String,String>("topic","partitionKey1","msg1"));		
producer.close();

 java编写consumer

Consumer API有两个级别。低级别的和一个指定的broker保持链接,并在接收完消息后关闭链接,这个级别是无状态的,每次读取消息都带着offset。

class SimpleConsumer {
/*向一个broker发送读取请求并获得消息集 */
public ByteBufferMessageSet fetch(FetchRequest request);
/*向一个broker发送读取请求并获得一个相应集 */
public MultiFetchResponse multifetch(List<FetchRequest> fetches);
/**
* 获得指定时间以前的offsets
* 返回值是offsets列表,以倒序排序
* @param time: 时间,毫秒,
* 若是指定为OffsetRequest$.MODULE$.LATIEST_TIME(), 获得最新的offset.
* 若是指定为OffsetRequest$.MODULE$.EARLIEST_TIME(),获得最老的offset.
*/
publiclong[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}

注意:

1.你必须本身实现当中止消费时如何持久化offset

2.你必须本身找到哪一个broker是leader以便处理topic和分区

3.你必须本身处理leader变动

使用阶段:

1.找到那些broker是leader以便读取topic和partition

2.本身决定哪一个副本做为你的topic和分区

3.创建本身须要请求并自定义获取你感兴趣的数据

4.获取数据

5.当leader变动时本身识别和恢复。

例子:

 

String topic = "test2";

int partition = 1;

String brokers = "172.17.1.163:9093";

int maxReads = 100; // 读多少条数据

// 1.找leader

PartitionMetadata metadata = null;

for (String ipPort : brokers.split(",")) {

//咱们无须要把全部的brokers列表加进去,目的只是为了得到metedata信息,故只要有broker可链接便可

SimpleConsumer consumer = null;

try {

String[] ipPortArray = ipPort.split(":");

consumer = new SimpleConsumer(ipPortArray[0],

Integer.parseInt(ipPortArray[1]), 100000, 64 * 1024,

"leaderLookup");

List<String> topics = new ArrayList<String>();

topics.add(topic);

TopicMetadataRequest req = new TopicMetadataRequest(topics);

// 取meta信息

TopicMetadataResponse resp = consumer.send(req)

//获取topic的全部metedate信息(目测只有一个metedata信息,何来多个?)

List<TopicMetadata> metaData = resp.topicsMetadata();

for (TopicMetadata item : metaData) {

for (PartitionMetadata part : item.partitionsMetadata()) {

//获取每一个meta信息的分区信息,这里咱们只取咱们关心的partition的metedata

System.out.println("----"+part.partitionId());

if (part.partitionId() == partition) {

metadata = part;

break;

}

}

}

} catch (Exception e) {

System.out.println("Error communicating with Broker [" + ipPort

+ "] to find Leader for [" + topic + ", " + partition

+ "] Reason: " + e);

} finally {

if (consumer != null)

consumer.close();

}

}

if (metadata == null || metadata.leader() == null) {

System.out.println("meta data or leader not found, exit.");

return;

}

// 拿到leader

Broker leadBroker = metadata.leader();

// 获取全部副本

System.out.println(metadata.replicas());

// 2.获取lastOffset(这里提供了两种方式:从头取或从最后拿到的开始取,下面这个是从头取)

long whichTime = kafka.api.OffsetRequest.EarliestTime();

//这个是从最后拿到的开始取

// long whichTime = kafka.api.OffsetRequest.LatestTime();

System.out.println("lastTime:"+whichTime);

String clientName = "Client_" + topic + "_" + partition;

SimpleConsumer consumer = new SimpleConsumer(leadBroker.host(),

leadBroker.port(), 100000, 64 * 1024, clientName);

TopicAndPartition topicAndPartition = new TopicAndPartition(topic,

partition);

Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(

whichTime, 1));

OffsetRequest request = new OffsetRequest(requestInfo,

kafka.api.OffsetRequest.CurrentVersion(), clientName);

// 获取指定时间前有效的offset列表

OffsetResponse response = consumer.getOffsetsBefore(request);

if (response.hasError()) {

System.out

.println("Error fetching data Offset Data the Broker. Reason: "

+ response.errorCode(topic, partition));

return;

}

// 千万不要认为offset必定是从0开始的

long[] offsets = response.offsets(topic, partition);

System.out.println("offset list:" + Arrays.toString(offsets));

long offset = offsets[0];

// 读数据

while (maxReads > 0) {

// 注意不要调用里面的replicaId()方法,这是内部使用的。

FetchRequest req = new FetchRequestBuilder().clientId(clientName)

.addFetch(topic, partition, offset, 100000).build();

FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {

// 出错处理。这里只直接返回了。实际上能够根据出错的类型进行判断,如code == ErrorMapping.OffsetOutOfRangeCode()表示拿到的offset错误

// 通常出错处理能够再次拿offset,或从新找leader,从新创建consumer。能够将上面的操做都封装成方法。再在该循环来进行消费

// 固然,在取全部leader的同时能够用metadata.replicas()更新最新的节点信息。另外zookeeper可能不会当即检测到有节点挂掉,故若是发现老的leader和新的leader同样,多是leader根本没挂,也多是zookeeper还没检测到,总之须要等等。

short code = fetchResponse.errorCode(topic, partition);

System.out.println("Error fetching data from the Broker:"

+ leadBroker + " Reason: " + code);

return;

}

//取一批消息

boolean empty = true;

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(

topic, partition)) {

empty = false;

long curOffset = messageAndOffset.offset();

//下面这个检测有必要,由于当消息是压缩的时候,经过fetch获取到的是一个整块数据。块中解压后不必定第一个消息就是offset所指定的。就是说存在再次取到已读过的消息。

if (curOffset < offset) {

System.out.println("Found an old offset: " + curOffset

+ " Expecting: " + offset);

continue;

}

// 能够经过当前消息知道下一条消息的offset是多少

offset = messageAndOffset.nextOffset();

ByteBuffer payload = messageAndOffset.message().payload();

byte[] bytes = new byte[payload.limit()];

payload.get(bytes);

System.out.println(String.valueOf(messageAndOffset.offset())

+ ": " + new String(bytes, "UTF-8"));

maxReads++;

}

//进入循环中,等待一会后获取下一批数据

if(empty){

Thread.sleep(1000);

}

}

// 退出(这里象征性的写一下)

if (consumer != null)

consumer.close();

高级别的API隐藏了和brokers链接的细节,在没必要关心服务端架构的状况下和服务端通讯。还能够本身维护消费状态,并能够经过一些条件指定订阅特定的topic,好比白名单黑名单或者正则表达式。

/* 建立链接 */
ConsumerConnector connector = Consumer.create(consumerConfig);
interface ConsumerConnector {
/**
* 这个方法能够获得一个流的列表,每一个流都是MessageAndMetadata的迭代,经过MessageAndMetadata能够拿到消息和其余的元数据(目前以后topic)
* Input: a map of <topic, #streams>
* Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
/**
* 你也能够获得一个流的列表,它包含了符合TopicFiler的消息的迭代,
* 一个TopicFilter是一个封装了白名单或黑名单的正则表达式。
*/
public List<KafkaStream> createMessageStreamsByFilter(
TopicFilter topicFilter, int numStreams);
/* 提交目前消费到的offset */
public commitOffsets()
/* 关闭链接 */
public shutdown()
}

这个API围绕着由KafkaStream实现的迭代器展开,每一个流表明一系列从一个或多个分区多和broker上汇聚来的消息,每一个流由一个线程处理,因此客户端能够在建立的时候经过参数指定想要几个流。一个流是多个分区多个broker的合并,可是每一个分区的消息只会流向一个流。

注意:

1.上层api将会内部实现持久化每一个分区最后读到的消息的offset,数据保存在zookeeper中的消费组名中(如/consumers/id1/offsets/test2/2。其中id1是消费组,test2是topic,最后一个2表示第3个分区),每间隔一个很短的时间更新一次offset,那么可能在重启消费者时拿到重复的消息。此外,当分区leader发生变动时也可能拿到重复的消息。所以在关闭消费者时最好等待必定时间(10s)而后再shutdown()

2.消费组名是一个全局的信息,要注意在新的消费者启动以前旧的消费者要关闭。若是新的进程启动而且消费组名相同,kafka会添加这个进程到可用消费线程组中用来消费topic和触发从新分配负载均衡,那么同一个分区的消息就有可能发送到不一样的进程中。

3.若是消费的线程多于分区数,一些线程可能永远没法看到一些消息。

4.若是分区数多于线程数,一些线程会收到多个分区的消息

5.若是一个线程对应了多个分区,那么接收到的消息是不能保证顺序的。

备注:可用zk的命令查询:get /consumers/id1/owners/test3/2其中id1为消费组,test3为topic,2为分区3.查看里面的内容如:id1_163-PC-1382409386474-1091aef2-1表示该分区被该标示的线程所执行。

例子:

 

Properties props = new Properties();

// 指定zookeeper服务器地址

props.put("zookeeper.connect", "172.17.1.163:2181");

// 指定消费组(没有它会自动添加)

props.put("group.id", "id1");

// 指定kafka等待多久zookeeper回复(ms)以便放弃并继续消费。

props.put("zookeeper.session.timeout.ms", "4000");

// 指定zookeeper同步最长延迟多久再产生异常

props.put("zookeeper.sync.time.ms", "2000");

// 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次得到的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息

props.put("auto.commit.interval.ms", "1000");

ConsumerConnector consumer = Consumer

.createJavaConsumerConnector(new ConsumerConfig(props));

// 咱们要告诉kafka该进程会有多少个线程来处理对应的topic

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

int a_numThreads = 3;

// 用3个线程来处理topic:test2

topicCountMap.put("test2", a_numThreads);

// 拿到每一个stream对应的topic

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer

.createMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("test2");

// 调用thread pool来处理topic

ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);

for (final KafkaStream stream : streams) {

executor.submit(new Runnable() {

public void run() {

ConsumerIterator<byte[], byte[]> it = stream.iterator();

while (it.hasNext()) {

System.out.println(Thread.currentThread() + ":"

+ new String(it.next().message()));

}

}

});

}

System.in.read();

// 关闭

if (consumer != null) consumer.shutdown();

if (executor != null) executor.shutdown();

相关文章
相关标签/搜索