Kafka官网本身的介绍是:一个可支持分布式的流平台。
kafka官网介绍前端
kafka三个关键能力: 1.发布订阅记录流,相似于消息队列与企业信息系统 2.以容错的持久方式存储记录流 3.对流进行处理 kafka一般应用再两大类应用中: 1.构建实时流数据管道,在系统或应用程序之间可靠地获取数据 2.构建转换或响应数据流的实时流应用程序 kafka的一些基本概念: 1.Kafka做为一个集群运行在一个或多个服务器上,这些服务器能够跨越多个数据中心。 2.Kafka集群将记录流存储在称为topic的类别中。 3.每一个记录由一个键、一个值和一个时间戳组成。 kafka核心API: 1.Producer API:容许应用程序将记录流发布到一个或多个topic。 2.Consumer API:容许应用程序订阅一个或多个topic并处理生成给它们的记录流。 3.Streams API:容许应用程序充当流处理器,使用来自一个或多个topic的输入流, 并生成一个或多个输出topic的输出流,从而有效地将输入流转换为输出流。 4.Connector API:容许构建和运行可重用的生产者或消费者,将topic链接到现有的应用程序或数据系统。 例如,到关系数据库的链接器可能捕获对表的每一个更改。
传统消息传递有两类模型:消息队列、发布订阅。在消息队列中,一个消费者池能够从一个服务器读取数据,而每一个记录都将被发送到其中一个服务器;在发布-订阅中,记录被广播给全部消费者。这两种模型各有优缺点:vue
消息队列优缺点: 它容许您在多个使用者实例上划分数据处理,这使您能够扩展处理。 队列不是多订阅者的—一旦一个进程读取了它丢失的数据。 发布订阅优缺点: Publish-subscribe容许您将数据广播到多个进程, 可是因为每一个消息都传递到每一个订阅者,所以没法扩展处理。
做为消息传递系统,那么跟mq有什么区别呢?(RabbitMq\redis\RocketMq\ActiveMq)java
RabbitMQ: 遵循AMQP协议,由内在高并发的erlang语言开发,用在实时的对可靠性要求比较高的消息传递上. 万级数据量,社区活跃度极高,可视化操做界面丰富。 提供了全面的核心功能,是消息队列的优秀产品。 由于是erlang语言开发,难以维护而且开发者很难二次开发。 Redis: redis的主要场景是内存数据库,做为消息队列来讲可靠性太差,并且速度太依赖网络IO。 在服务器本机上的速度较快,且容易出现数据堆积的问题,在比较轻量的场合下可以适用。 RocketMq: rocketMq几十万级别数据量,基于Java开发。是阿里巴巴开源的一个消息产品。 应对了淘宝双十一考验,而且文档十分的完善,拥有一些其余消息队列不具有的高级特性, 如定时推送,其余消息队列是延迟推送,如rabbitMq经过设置expire字段设置延迟推送时间。 又好比rocketmq实现分布式事务,比较可靠的。RocketMq也是用过的惟一支持分布式事务的一款产品。 Kafka: kafka本来设计的初衷是日志统计分析,如今基于大数据的背景下也能够作运营数据的分析统计。 kafka真正的大规模分布式消息队列,提供的核心功能比较少。基于zookeeper实现的分布式消息订阅。 几十万级数据量级,比RokectMq更强。 客户端和服务器之间的通讯是经过一个简单的、高性能的、语言无关的TCP协议来完成的。 ActiveMq: Apache ActiveMQ™是最流行的开源、多协议、基于java的消息服务器。它支持行业标准协议, 所以用户能够在各类语言和平台上选择客户端。可使用来自C、c++、Python、. net等的链接性。 使用通用的AMQP协议集成您的多平台应用程序。使用STOMP在websockets上交换web应用程序之间的消息。 使用MQTT管理物联网设备。支持您现有的JMS基础结构及其余。ActiveMQ提供了支持任何messagi的强大功能 和灵活性。
备注:由于该文章主要介绍kafka,因此上述只是简单罗列了一些特色,若是有兴趣的同窗能够详细的分析一下,这些产品我后续都会专门写文章来概括总结分析,在这里先简单带过。c++
该部分是扩展内容,不少人包括我刚毕业那年使用消息队列,但别人问道我为啥用消息队列,我都没有一个很清晰的认识,因此在这里也说一下。但愿给有须要的同窗一些帮助。web
那么为何要使用消息队列呢?首先咱们来回顾一下消息传递。前端而言,传统方式是经过全局变量来传递,后面有了数据总线的概念,再后来有相应的解决方案产品好比说vuex、redux、store等。对于后端来讲,最早系统之间的通讯,消息传递都很是依赖于通讯对象彼此,高度耦合,后面有了一些产品来解决这些问题,好比说webservice.但这样的方式极其不友好,并且维护繁琐,职责难以分清,工做量增长,因此mq诞生后,基本解决了这些问题。redis
消息队列的引入是为了:vuex
1.解耦: 好比:A系统操做p,须要将消息传递给B、C两个系统,若是没有消息队列,那么A系统中须要给B发一条消息, 又得给C发一条消息,而后有一天D、E、F系统说:A系统你也要给我发p的消息,这个时候A又得修改代码, 发布上线,DEF才能正常接收消息。而后过了n天,C又说,不要给我发消息了,把给我发消息的部分去掉吧。 A系统的开发人员又得哐哧哐哧的去掉,发布上线。这样日复一日,随着系统增多,接入和退出的操做增多, 那么A系统须要频繁发布上线,下降了稳定性、可用时间、同时每次上线都须要测试跟踪测试,这里面的成本 与风险不言而喻。而消息队列一旦引入,A不须要关心谁消费,谁退出消费,A只负责将消息放入队列便可, 而其余系统只须要监听这个队列,就算其余系统退出,对A而言也是没有任何影响的,可以一直持续不断的 提供服务,这难道不香吗? 2.异步 好比说:传统方式发送消息给B、C、D,须要120ms,那么若是采用了消息队列,就能够大大下降耗时。但 这些对于那些非必要的同步业务逻辑适用。 3.削峰 传统模式下,请求直接进入到数据库,当峰值到达必定时,必然会挂掉。若是适用了中间件消息队列,那么就能够很好的保证系统正常提供服务,这也是秒杀系统中会经常谈到的限流、这样能够防止系统崩溃,提供系统可用性。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>1.0.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>1.0.0</version> </dependency>
/** * @author chandlerHuang * @description @TODO * @date 2020/1/15 */ public class KafkaProducerService implements Runnable { private final KafkaProducer<String,String> producer; private final String topic; public KafkaProducerService(String topic) { Properties props = new Properties(); props.put("bootstrap.servers", "绑定的外网IP:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); this.producer = new KafkaProducer<String, String>(props); this.topic = topic; } @Override public void run() { int messageNo = 1; try { for(;;) { String messageStr="["+messageNo+"]:hello,boys!"; producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr)); //生产了100条就打印 if(messageNo%100==0){ System.out.println("sendMessages:" + messageStr); } //生产1000条就退出 if(messageNo%1000==0){ System.out.println("successCount:"+messageNo); break; } messageNo++; } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } public static void main(String args[]) { KafkaProducerService test = new KafkaProducerService(TopicConstant.CHART_TOPIC); Thread thread = new Thread(test); thread.start(); } }
/** * @author chandlerHuang * @description @TODO * @date 2020/1/15 */ public class KafkaConsumerService implements Runnable{ private final KafkaConsumer<String, String> consumer; private ConsumerRecords<String, String> msgList; private final String topic; private static final String GROUPID = "groupA"; public KafkaConsumerService(String topicName) { Properties props = new Properties(); props.put("bootstrap.servers", "绑定的外网IP:9092"); props.put("group.id", GROUPID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<String, String>(props); this.topic = topicName; this.consumer.subscribe(Arrays.asList(topic)); } @Override public void run() { int messageNo = 1; System.out.println("---------开始消费---------"); try { for (;;) { msgList = consumer.poll(1000); if(null!=msgList&&msgList.count()>0){ for (ConsumerRecord<String, String> record : msgList) { //消费100条就打印 ,但打印的数据不必定是这个规律的 if(messageNo%100==0){ System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset()); } //当消费了1000条就退出 if(messageNo%1000==0){ break; } messageNo++; } }else{ Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } finally { consumer.close(); } } public static void main(String args[]) { KafkaConsumerService test1 = new KafkaConsumerService(TopicConstant.CHART_TOPIC); Thread thread1 = new Thread(test1); thread1.start(); } }
备注:上述demo编写过程当中,发现报了一个Exception:Kafka java client 链接异常(org.apache.kafka.common.errors.TimeoutException: Failed to update metadata )...数据库
kafka中须要配置server.文件:apache
advertised.listeners=PLAINTEXT://外网地址:9092 zookeeper.connect=内网地址:2181
若是你是云服务器的话须要,在安全组设置对应端口开放,不然没法访问响应接口!redux