开发Kafka通用数据平台中间件前端
(含本次项目所有代码及资源)web
目录:数据库
一. Kafka概述数组
二. Kafka启动命令tomcat
三.咱们为何使用Kafka安全
四. Kafka数据平台中间件设计及代码解析服务器
五.将来Kafka开发任务并发
一. Kafka概述负载均衡
Kafka是Linkedin于2010年12月份建立的开源消息系统,它主要用于处理活跃的流式数据。活跃的流式数据在web网站应用中很是常见,这些活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索状况等内容。 这些数据一般以日志的形式记录下来,而后每隔一段时间进行一次统计分析。框架
传统的日志分析系统是一种离线处理日志信息的方式,但若要进行实时处理,一般会有较大延迟。而现有的消息队列系统可以很好的处理实时或者近似实时的应用,但未处理的数据一般不会写到磁盘上,这对于Hadoop之类,间隔时间较长的离线应用而言,在数据安全上会出现问题。Kafka正是为了解决以上问题而设计的,它可以很好地进行离线和在线应用。
1.1 Kfka部署结构:
(图1)
1.2 Kafka关键字:
•Broker : Kafka消息服务器,消息中心。一个Broker能够容纳多个Topic。
•Producer :消息生产者,就是向Kafka broker发消息的客户端。
•Consumer :消息消费者,向Kafka broker取消息的客户端。
•Zookeeper :管理Producer,Broker,Consumer的动态加入与离开。
•Topic :能够为各类消息划分为多个不一样的主题,Topic就是主题名称。Producer能够针对某个主题进行生产,Consumer能够针对某个主题进行订阅。
•Consumer Group: Kafka采用广播的方式进行消息分发,而Consumer集群在消费某Topic时, Zookeeper会为该集群创建Offset消费偏移量,最新Consumer加入并消费该主题时,能够从最新的Offset点开始消费。
•Partition:Kafka采用对数据文件切片(Partition)的方式能够将一个Topic能够分布存储到多个Broker上,一个Topic能够分为多个Partition。在多个Consumer并发访问一个partition会有同步锁控制。
(图2)
1.3 消息收发流程:
•启动Zookeeper及Broker.
•Producer链接Broker后,将消息发布到Broker中指定Topic上(能够指定Patition)。
•Broker集群接收到Producer发过来的消息后,将其持久化到硬盘,并将消息该保留指定时长(可配置),而不关注消息是否被消费。
•Consumer链接到Broker后,启动消息泵对Broker进行侦听,当有消息到来时,会触发消息泵循环获取消息,获取消息后Zookeeper将记录该Consumer的消息Offset。
1.4 Kafka特性:
•高吞吐量
•负载均衡:经过zookeeper对Producer,Broker,Consumer的动态加入与离开进行管理。
•拉取系统:因为kafka broker会持久化数据,broker没有内存压力,所以,consumer很是适合采起pull的方式消费数据
•动态扩展:当须要增长broker结点时,新增的broker会向zookeeper注册,而producer及consumer会经过zookeeper感知这些变化,并及时做出调整。
•消息删除策略:数据文件将会根据broker中的配置要求,保留必定的时间以后删除。kafka经过这种简单的手段,来释放磁盘空间。
二. Kafka启动命令:
启动Zookeeper服务:
zookeeper-server-start.bat ../../config/zookeeper.properties
启动Broker服务:
kafka-server-start.bat ../../config/server.properties
经过Zookeeper的协调在Broker中建立一个Topic(主题)
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
查询当前Broker中某个指定主题的配置信息
kafka-run-class.bat kafka.admin.TopicCommand --describe --zookeeper localhost:2181 --topic testTopic
启动一个数据生产者Producer
kafka-console-producer.bat --broker-list localhost:9092 --topic testTopic
启动一个数据消费者Consumer
kafka-console-consumer.bat --zookeeper localhost:2181 --topic testTopic --from-beginning
Zookeeper配置文件,zookeeper.properties配置片断
Broker配置文件,server.properties配置片断
关于kafka收发消息相关的配置项
1.在Broker Server中属性(这些属性须要在Server启动时加载):
//每次Broker Server可以接收的最大包大小,该参数要与consumer的fetch.message.max.bytes属性进行匹配使用
* message.max.bytes 1000000(默认)
//Broker Server中针对Producer发送方的数据缓冲区。Broker Server会利用该缓冲区循环接收来至Producer的数据 包,缓冲区太小会致使对该数据包的分段数量增长,但不会影响数据包尺寸限制问题。
socket.send.buffer.bytes 100 * 1024(默认)
//Broker Server中针对Consumer接收方的数据缓冲区。意思同上。
socket.receive.buffer.bytes 100 * 1024(默认)
//Broker Server中针对每次请求最大的缓冲区尺寸,包括Prodcuer和Consumer双方。该值必须大于 message.max.bytes属性
* socket.request.max.bytes 100 * 1024 * 1024(默认)
2.在Consumer中的属性(这些属性须要在程序中配置Consumer时设置)
//Consumer用于接收来自Broker的数据缓冲区,意思同socket.send.buffer.bytes。
socket.receive.buffer.bytes 64 * 1024(默认)
//Consumer用于每次接收消息包的最大尺寸,该属性须要与Broker中的message.max.bytes属性配对使用
* fetch.message.max.bytes 1024 * 1024(默认)
3.在Producer中的属性(这些属性须要在程序中配置Consumer时设置)
//Producer用于发送数据缓冲区,意思同socket.send.buffer.bytes。
send.buffer.bytes 100 * 1024(默认)
三. 咱们为何使用Kafka
当前项目中,咱们更但愿从企业得到尽量多的有价值数据。最直接获取大数据的方式是采用写应用直连目标企业数据库来得到数据。但这种方式在实际应用中,会因为企业担忧开放本地数据库而致使的安全隐患很难实施。另外,这种方式会与企业本地数据库结构耦合度太高,会出现多家企业多个应用的状况,缺乏统一的数据交互平台,致使后期维护困难。
3.1 Kafka在当前项目中问题:
当前案例,咱们想把某企业的本地数据实时同步到数据中心中,以后对这些数据进行二次分析处理。咱们的目标是创建统一的数据同步平台,便于在往后的多企业多系统中能有统一的实施标准,因此选用了Kafka消息系统做为支撑。
Producer(数据发送方)以独立线程方式常驻某企业内部应用中,依靠必定的时间周期,从本地数据库得到数据并推送至Broker中。而Consumer(快销组数据接收方)也是独立与WEB框架常驻内存,得到数据消息后保存至数据中心中。
但目前Kafka在实施中面临如下问题:
1.Producer/Consumer均独立于Web框架,Producer依靠消息片轮询检索/发送最新数据,执行效率低。
2.Producer会直接针对某企业内部数据库表结构操做,致使代码与企业业务耦合度太高,而没法平滑移植到其余企业系统中。
3.因为Producer/Consumer是独立于Web框架的,在外围负责数据的采集及推送,与Web项目主程序无切合度。
4.目前针对Kafka的数据传输异常处理比较简陋,当Broker或 Zookeeper等出现异常时,有可能会致使数据安全性问题。
3.2实现目标:
针对以上问题,咱们要实现以下目标:
1.把Producer/Consumer的数据推送/获取的过程封装成Class或者Jar包的形式,供Java Web框架调用,从而造成与企业内部Web应用或计算中心数据分析Web应用融合一体。
2.数据的推送/获取只针对Java Object对象,不要针对数据库表结构,不能与企业特有数据耦合度太高,造成通用的数据接口。Producer须要对Object进行序列化,Consumer须要对序列化后的二进制信息进行反序列化重建Object返回给调用者。
3.消息的推送/获取的整个生命周期中,要把重要事件通知给外部调用者,好比:Broker,Zookeeper是否有异常,数据推送/获取是否成功,若是失败须要保留失败记录便于进行后期数据恢复等。(须要在中间件中创建回调机制通知调用者)
4.可对多企业多应用进行平滑移植,移植过程当中尽量保持总体Kafka数据平台结构的零修改。
四. Kafka数据平台中间件设计
4.1解决方案:
基于以上待完成目标,咱们有了如下解决方案。
3.2 实现要点:
KfkProducer(数据生产者)
•KfkProducer对象须要在Web框架中的Application_OnStart()中启动,常驻进程,只与Broker链接一次,数据发送过程不能与Broker创建链接。(实践中发现Kafka的 Broker若是有异常,重启Broker后Producer不用再次链接便可发送)
•Web框架能够随时调用推送接口将对象(Object)推送至Broker.
•Object序列化后造成二进制信息,而且要保证在Consumer所处框架中能顺利还原.
•可发送多种对象(Object,File ,Byte[]等),简化外围框架针对待发送数据所作操做,简化调用接口。
•数据发送使用Kafka中最新的异步式数据发送API,不能因为发送时间过长或Broker异常等问题阻塞调用者。
•须要对整个发送生命期进行跟踪反馈异常信息,若发送失败,须要将待发送数据使用回调机制通知到框架调用者。
•详细测试Broker,或Zookeeper产生异常时,Producer可能会出现的状况。
•在针对多企业多应用中,可依靠Topic进行区分数据主题,这样可实现多应用部署时框架零修改问题。
KfkConsumer(数据消费者)
•KfkConsumer须要在计算中心内部Web框架中的Application_OnStart()中启动,常驻进程,只与Broker链接一次,并启动消息泵等待消息到来。(实践中发现Kafka的 Broker若是有异常,重启Broker后Consumer不用再次链接便可正常获取消息)
•须要定义回调接口,该回调接口由外围框架程序注册处理程序,当数据消息到来时,Consumer须要把数据发送至该接口,以后由调用者处理。
•调用者须要注册所接受的对象类型,由于Broker中同一Topic下会有各类数据对象(UserInfo,CompanyInfo,ProductInfo...)存在,因此必须提供接收对象的注册接口,以方便调用者有针对性的获取。
•数据到来时,要针对发送方序列化的二进制信息进行反序列化操做,并能准确还原成原始对象。
•须要对整个接收生命期进行跟踪反馈异常信息,若消息泵中止或异常,须要通知到框架调用者。
实现以上要点后,须要将KfkProducer及KfkConsumer对象打包成Jar包的形式,更灵活的部署到企业本地Web框架及计算中心内部Web框架中。
3.3 代码实现及分析:
3.3.1 KfkProducer 对象:数据生产者对象,封装了关于数据发送的相关功能。
接口函数/子对象 |
说明 |
KfkProducer () |
构造函数中须要调用者提供Broker集群的Ip,Port等信息。 Kafka支持Broker集群列表。(127.0.0.1:9092,127.0.0.1:9093)
|
Connect() |
该函数须要完成对Broker集群的链接。
|
Send() |
该函数入口为Object对象,须要对该对象进行Serialize操做,根据待发送数据构造KfkMsg对象,并取得由KfkMsg序列化后的Byte[]数组,以后调用Kafka的异步发送方式及挂接回调处理函数。
要实现多个Send()接口,须要提供对Object,File ,Byte[]等多种数据类型的支持,方便调用者操做。
|
Close() |
该函数完成对Broker链接进行关闭。
|
SendCallback发送回调对象 onCompletion()发送回调接口 |
在kafka异步发送函数send()中注册,在收到Broker返回的发送是否成功信息后,会触发该函数,并调用ProducerEvent对象的onSendMsg()函数,向调用者发送成功与否结果。
成功则返回调用者RecordMetadata信息(BrokerServer中的数据offset,Partition位置ID,Topic主题)
失败者返回调用者原始数据信息,便于往后恢复。
|
ProducerEvent接口对象 onSendMsg() |
为调用者提供的回调接口,调用者在注册后,便可重写onSendMsg()函数,以便接到通知后,处理当前事件(发送数据成功与否)状态。
|
3.3.2 KfkConsumer对象:数据消费者对象,封装了关于数据接收的相关功能。
接口函数/子对象 |
说明 |
KfkConsumer() |
构造函数中须要调用者提供Zookeeper集群的Ip,Port等信息。(即将推出的Kafka0.9.X版本将支持直连Broker集群的机制)
该对象继承至Thread对象,为线程对象。 |
connect() |
配置Zookeeper链接相关属性,并链接Zookeeper服务器。
|
run() |
线程主函数,该函数将启动Kafka消息泵等待Broker的消息到来。
消息到来后,将调用KfkMsg对象对二进制序列化信息进行还原对象操做(KfkMsg将对序列化数据进行反序列化操做,并从新还原原始对象操做)。
对象还原后,将调用调用者注册的回调接口,将对象传出。
|
close() |
关闭Consumer与Broker,Zookeeper的Socket链接。
|
ConsumerEvent接收回调对象 onRecvMsg()接收回调函数 |
为调用者提供的回调接口,调用者在注册后,便可重写onRecvMsg()函数,以便接到通知后,收取对象或处理当前事件。
|
3.3.3 KfkMsg对象:数据消息对象,封装了数据对象的序列化/反序列化操做,构造多种类型的发送对象,封装发送协议等操做。
接口函数/子对象 |
说明 |
MsgBase对象 |
消息包基类,能够在Consumer接到数据消息后,造成多种对象的反序列化多态性。
|
MsgObject对象 serializeMsg()序列化函数 deserializeMsg()反序列化函数 |
针对Object数据的序列化和反序列化操做,及消息体封装,通信协议构造等操做。
|
MsgByteArr对象 serializeMsg()序列化函数 deserializeMsg()反序列化函数 |
针对Byte[]数据的序列化和反序列化操做,及消息体封装,通信协议构造等操做。
|
MsgFile对象 serializeMsg()序列化函数 deserializeMsg()反序列化函数 |
针对二进制文件的序列化和反序列化操做,及消息体封装,通信协议构造等操做。
|
getMsgType()函数
|
负责对Consumer接收的序列化信息进行首次协议解析,判断对象类型(Object,File,byte[])以后构造对应的MsgXXX对象,以便使调用者进行反序列化多态功能。
|
3.3.4 SerializeUtils对象:序列化操做工具类,完成在Jar包内部对外部对象的序列化/反序列化基础从操做。
接口函数/子对象 |
说明 |
deserialize()函数 |
将序列化后的二进制数组byte[]还原成原始Object.
因为若是使用默认的ObjectInputStream对象进行反序列化操做,在Jar内将没法找到外部调用者定义的对象名,也即没法反序列化成功,报没法找到外部对象的异常。
因此必须重写resolveClass()函数,加载当前线程范围内的Class上下文。
|
Serialize()函数 |
将Object序列化成二进制数组,byte[]。
|
3.3.5 调用者Web框架部署:
KfkProducer部署:
部署要点 |
说明 |
1.注册发送消息回调函数 |
在WEB框架中的Application_OnStart()事件中向Jar注册发送消息回调函数。并重写onSendMsg()回调接口,用于接受发送成功/失败消息,发送失败后,能够在Web框架中针对返回的原始数据信息作备份/恢复处理。
|
2.创建与Broker之间的链接 |
在WEB框架中的 Application_OnStart()事件中调用KfkProducer connect()函数,链接远程Broker。
|
3.将KfkProducer传入框架 |
通过前两步操做后,咱们已经顺利创建KfkProducer对象,如今咱们须要把该对象传入Web框架中后续页面处理类中,以方便调用其send()函数进行数据发送。
在Play中咱们使用了cache对象机制,能够在Play Web App全生命期内得到KfkProducer对象实例。
|
4.关闭与Broker之间的链接 |
在WEB框架中的Application_OnStop()事件中调用KfkProducer的close()函数,关闭远程Broker链接。
|
KfkConsumer部署:
部署要点 |
说明 |
1.注册发送消息回调函数 |
在WEB框架中的Application_OnStart()事件中向Jar注册消息接收回调函数。并重写onRecvMsg()回调接口,用于接受来自Broker的数据信息。
在onRecvMsg()函数中,还需针对传入的Object对象进行instanceof比对操做,区分特定对象。
|
2.注册须要接收的Object类型 |
向Jar包中注册须要接收的对象类型,好比本应用须要接收(UserInfo,CompanyInfo,ProdcutInfo等对象)。 注册后,来自Broker的广播消息将被Jar包过滤,只返回调用者所需的对象数据。
|
3.创建与Zookeeper(Broker)之间的链接 |
在WEB框架中的 Application_OnStart()事件中调用KfkConsumer connect()函数,链接远程Zookeeper/Broker。
|
4.启动消息泵线程 |
通过前两步操做后,咱们已经顺利创建与Zookeeper/Broker创建链接。
咱们须要启动消息泵来收听消息的到来,这里须要调用KfkConsumer对象的start()函数启动消息泵线程常驻内存。
|
4.关闭与Zookeeper之间的链接 |
在WEB框架中的Application_OnStop()事件中调用KfkConsumer的close()函数,关闭远程Zookeeper/Broker链接。
|
五. 将来Kafka中间件
目前该中间件只完成了初级阶段功能,不少功能都不完善不深刻,随着应用业务的拓展及Kafka将来版本功能支持,。以Kafka消息中间件为中心的大数据处理平台还有不少任务去实现。
通常在互联网中所流动的数据由如下几种类型:
•须要实时响应的交易数据,用户提交一个表单,输入一段内容,这种数据最后是存放在关系数据库(Oracle, MySQL)中的,有些须要事务支持。
•活动流数据,准实时的,例如页面访问量、用户行为、搜索状况等。咱们能够针对这些数据广播、排序、个性化推荐、运营监控等。这种数据通常是前端服务器先写文件,而后经过批量的方式把文件倒到Hadoop(离线数据分析平台)这种大数据分析器里面,进行慢慢的分析。
•各个层面程序产生的日志,例如http的日志、tomcat的日志、其余各类程序产生的日志。这种数据一个是用来监控报警,还有就是用来作分析。
谢谢观赏!
注:基于全球开源共享理念,本人会分享更多原创及译文,让更多的IT人从中受益,与你们一块儿进步!
基因Cloud 原创,转发请注明出处
1738387@qq.com (工做繁忙,有事发邮件,QQ不加,非要事勿扰,多谢!)
2015 / 06 / 14