在大数据时代,数据规模变得愈来愈大。因为数据的增加速度和非结构化的特性,经常使用的软硬件工具已没法在用户可容忍的时间内对数据进行采集、管理和处理。本文主要介绍如何在阿里云上使用Kafka和Storm搭建大规模消息分发和实时数据流处理系统,以及这个过程当中主要遭遇的一些挑战。实践主要立足创建一套汽车状态实时监控系统,能够在阿里云上当即进行部署。node
大数据时代,随着可获取数据的渠道增多,好比常见的电子商务、网络、传感器的数据流、太空数据等,数据规模也变得愈来愈大;同时,不一样的渠道每每产生更多的数据类型,这些衍生的数据增加很是之快,规模很是之大。大数据时代各个机构可谓是坐拥金山,然而目前大数据技术的应用却仍然存在众多挑战,主要出如今数据收集、存储、处理和可视化几个过程。web
Gartner的Merv Adrian对大数据有这样一个定义:“大数据让经常使用硬件软件工具没法在用户可容忍时间内对数据进行采集、管理和处理。” [1] 麦肯锡全球研究院在2011年5月也有这样一个概念:“大数据是指超出典型数据库软件工具采集、存储、管理和分析能力的数据集。” [2] 从上面的定义能够看出,大数据最大的挑战在于如何在有限时间内对数据进行处理和分析,并获得有用信息。数据库
大数据处理中最著名的工具是Hadoop,不过它并非一套实时系统。为了解决这个问题,计算机工程师们又开发了Storm和Kafka。Apache Storm是一套开源的分布式实时计算系统。最先由Nathan Marz [3] 开发,在被Twitter收购后开源,并在2014年9月起成为Apache顶级开源项目。Storm被普遍用于各类商业网站,包括Twitter、Yelp、Groupon、百度、淘宝等。Storm的使用场景很是普遍,例如实时分析、在线机器学习、连续计算、分部署RPC、ETL等。Storm有着很是快的处理速度,单节点能够达到百万个元组每秒,此外它还具备高扩展、容错、保证数据处理等特性。缓存
图1是Storm的一个简单的架构。bash
图1 Storm架构服务器
Apache Kafka也是一个开源的系统,旨在提供一个统一的,高吞吐、低延迟的分布式消息处理平台来对实时数据进行处理。它最先由LinkedIn开发,开源于2011年并被贡献给了Apache。Kafka区别于传统RabbitMQ、Apache ActiveMQ等消息系统的地方主要在于:分布式系统特性,易于扩展;为发布和订阅提供高吞吐量;支持多订阅,能够自动平衡消费者;能够将消息持久化到磁盘,能够用于批量消费,例如ETL等。网络
图2 Kafka架构架构
咱们须要设计一个实时车辆监控系统,这个系统要将汽车驾驶过程当中实时的位置,速度,转速,油耗以及转速发送到系统中,从而能够实时计算出车流量和污染物排放量。该系统的目标是要能同事支持10万辆车同时发送消息,在最高峰能知足100万辆车。为了实现如此规模的消息分发和吞吐,咱们基于Kafka和Storm来设计实现。同时为了知足高扩展性,咱们将Storm和Kafka分别部署到不一样的服务器上,若是须要更多的计算能力,能够随时经过建立新的服务器的方式来完成。此外为了知足高可用性,每台相同功能的服务器也须要至少部署2台,这样一旦一台服务器出现问题,另一台服务器也能够持续提供服务。并发
在实体服务器上部署Storm和Kafka等系统涉及到大量服务器集群和软件的安装部署,这个过程须要花费大量时间,而云计算则很好的弥补了这一点——提供各类虚拟服务器和镜像功能,加快基础设施和软件的部署过程。app
图3 车联网监控系统架构
咱们须要2台服务器来构建Kafka代理服务器,在Storm中还须要2台服务器来运行Spout和2个Bolt,另外在Redis层则须要2台服务器来部署缓存,再加上2台服务器做为Web服务器。服务器架构图如图4所示。
图4 车联网监控系统架构
在部署车联网监控系统以前,咱们首先须要在每台服务器上部署相应的软件,包括Git、Libzmq、Java、G++等,用于代码编译和相关软件安装。可使用SSH链接到相应的机器。用户名密码则会由阿里云以邮件或者短消息的方式提供。
在车联网实时监控系统中,咱们须要部署4种不一样类型的服务器,分别是网站前台服务器、Kafka服务器、Storm服务器和缓存服务器,以知足上面提到的高扩展性的要求。在每一种类型的服务器部署完成以后,均可以经过阿里云镜像的功能,建立一个能随时使用的镜像,这样在扩展服务器的时候就不须要从新安装软件,直接经过镜像建立服务器就能够了。
如下命令须要在全部服务器上运行以安装相应的软件:
如下命令安装在缓存服务器和Kafka服务器上:
另外,咱们还须要在Storm的服务器安装maven和lein用于代码编译:
在Kafka服务器上安装Kafka:
对于Storm和Kafka的安装,到这一步已基本完成,接下去须要分别建立镜像。建立镜像的方法是先建立阿里云快照,而后经过将快照转换为镜像的方式完成。具体步骤以下:
图5 自定义的镜像
接下来,咱们经过镜像能够直接建立相同配置的ECS服务器。
图6 从自定义镜像中建立云服务器
固然,在自动扩展实现上,云服务并不须要用户去手动执行,这里咱们使用阿里云的ECS REST API自动经过镜像建立机器。能够参考如下Python代码,自动建立阿里云ECS虚拟机:
基于Storm和Kafka的车辆信息实时监控系统打造
接下来作的就是将车辆信息实时监控系统部署到系统中。这个系统演示了如何编写一个Storm的Topology,从Kafka消息系统中将信息读取出来。咱们使用Kafka的客户端模拟从世界各地发送车辆实时信息给Kafka集群,而后Storm Topology会把这些消息经过Bolts将坐标转换为Json对象,而且使用GeoJSON在Bing Map上显示车辆的实时位置、温度、转速以及速度等等信息。Topology还会将信息写到Redis缓存中,而后Node.js经过socket.io读取Redis中的信息,而且使用d3js显示在页面上。
首先,咱们须要编写Kafka 生产者的部分代码,主要是模拟读取汽车的实时数据并向Kafka集群进行发送,咱们实现了一个KafkaCarDataProducer类,经过配置ProducerConfig来建立一个Producer对象来发送数据。它能够用来链接到Zookeeper,或者直接是Kafka 代理。例如:kafkaclient.cloudapp.net:2181或者0:kafkaclient.cloudapp.net:9092。代码中咱们根据不一样的链接字符串设置不一样配置。伪代码以下:
而后就能够直接经过下面代码来发送消息:
接下来咱们须要编写3个Storm类,首先是建立Storm的Topology,这个类叫KafkaCarTopology,咱们建立了一个叫car的topic,而后定义本机一个hosts和Zookeeper hosts,最后建立一个Spout,叫作KafkaSpout,而后添加ParseCarDataBolt链接到KafkaSout,再建立一个RedisCarBolt,用于将结果写入Redis缓存。最后根据参数建立3个Worker,提交Storm Topology。
在这个拓扑结构中,咱们有2个Bolt用于数据的处理,第一个叫ParserCarDataBolt,这个Bolt主要将Kafka传出的消息转换为Json格式,它继承BaseBasicBolt,在execute函数中经过collector提交数据,同时重载了declareOutputFields函数,通知下一个Bolt的数据格式。代码以下:
数据会被写入RedisCarBolt,再写入到Redis缓存中。它继承自BaseRichBolt,须要重载prepare和excute方法来处理消息元组。此外还须要重载prepare和cleanup函数,几个关键的函数以下:
最后咱们还须要编写一些Node.js的代码,保证在页面上经过socket.io进行通信,实时将最终数据从Redis里面读取出来,并在BingMap上显示。
到此为止,一个简单的车辆信息实时监控系统就实现了,咱们经过bash脚本进行编译,并安装到相应的服务器上,好比下列代码须要被安装在Storm的服务器上:
有一点须要注意的是,因为在编译过程当中须要自动下载Storm库,在阿里云的国内机房的虚拟机颇有可能须要设置代理进行。设置代理的方法也很简单,经过对lein命令增长如下参数就能够了:http_proxy=http://URL:PORT
接着咱们在网页上访问http://webhostname或者运行node.js的服务器,就会看到下面的网页,同时发现网页将同步刷新汽车的实时位置、速度、转速等。
图7 车联网监控系统演示页面
接下来咱们对这个系统进行了一个简单的吞吐量测试。咱们只有1个Topic,使用5个partition、3个worker、1个Spout和2个Bolt,在一台2核2GB的ECS上运行。咱们使用了另外4台客户端,每一个客户端有4核8G内存,分别启动40个线程不断向这个系统实时发送汽车信息,模拟160台汽车发送的状况,其消息发送数量和CPU占用率状况如图8所示。
图8 车联网监控系统性能分析
从图8中能够看出,平均每辆汽车客户端会模拟每秒给系统发送了1000条消息,总的吞吐量达到16万条左右,此时平均的CPU占用率大约在30%左右。若是系统是彻底线性的,在系统CPU占用率达到90%的状况下,大约能处理48万条消息。不过实际状况中,在阿里云ECS上,却发现CPU达到50%之后,就再也不上升,而客户端发送消息的延时也逐步增长。
通过分析之后发现,因为ECS的磁盘性能没法和物理机的SSD磁盘相比,因此在Kafka消息大量写入磁盘的过程当中,吞吐量降低,磁盘读写负担变得很是大。这时咱们增长了Kafka的Broker和Storm的Spout的数量,将消息分布式地分发到多台ECS上,从而实现了消息吞吐量的线性增长。
在这个系统中,咱们不推荐使用大核和大内存的机器,而推荐使用多台2核2GB的服务器分布式地处理消息。这也是云计算处理大数据的原则所在,使用横向扩展而不用纵向扩展。
至此咱们介绍了利用Storm和Kafka实现大数据的实时处理,而且介绍了如何在云上经过镜像快速地建立这套系统。此外,咱们还介绍了如何对Storm、Kafka、Redis以及Node.js开发出一个实时的车辆信息监控系统。这个系统可以实现高性能、大吞吐量和高并发。固然,随着大数据的快速发展,咱们相信还会有愈来愈多好的工具和产品出如今市场上,到那时咱们从大数据中获取有效的信息将会变得更加容易和便捷。有了云计算的帮助,开发的周期也会变得愈来愈短。
云角:
[1] “It’s going mainstream, and it’s your next opportunity.“, Teradata Magazine, Q1, 2011http://www.teradatamagazine.com/v11n01/Features/Big-Data/
[2] ” Big data: The next frontier for innovation, competition, and productivity”http://www.mckinsey.com/insights/business_technology/big_data_the_next_frontier_for_innovation