Spring XD简介:大数据应用的运行时环境

简介

Spring XD(eXtreme Data,极限数据)是Pivotal的大数据产品。它结合了Spring BootGrails,组成Spring IO平台的执行部分。尽管Spring XD利用了大量现存的Spring项目,但它是一种运行时环境,而不是一个类库或者框架,它包含带有服务器的bin目录,你能够经过命令行启动并与之交互。运行时能够运行在开发机上、客户端本身的服务器上、AWS EC2上或者Cloud Foundry上。html

Spring XD中的关键组件是管理和容器服务器(Admin and Container Servers)。使用一种DSL,你能够把所需处理任务的描述经过HTTP提交给管理服务器。而后管理服务器会把处理的任务映射处处理模块(每一个模块都是一个执行单元,做为Spring应用程序上下文实现)中。java

该产品具备两种操做模式:-single和multi-node。第一种是由单独的进程负责全部处理和管理的工做。这对于入门颇有用,一样适合于应用程序的快速开发和测试。本文中的全部实例都被设计为在单一节点模式下工做。第二种是一种分布式模式。分布式集成运行时(Distributed Integration Runtime,DIRT)会在多个节点之间分发处理的任务。除了能够拥有VM或者物理服务器做为这些节点以外,Spring XD还让你能够在Hadoop YARN集群上运行。node



XD管理服务器会把处理的任务切分红彼此独立的模块定义,并把每一个模块分配给使用Apache ZooKeeper的容器实例。每一个容器都会监听分配给它的模块定义,而后部署模块,建立Spring应用程序上下文来运行它。须要注意的是,在我撰写这篇文章的时候,Spring XD中还不会自带Zookeeper。兼容的版本是3.4.6,你能够从这里下载。react

模块经过使用配置好的消息中间件传递消息来共享数据。传输层是可插拔的,而且支持其余两种Pivotal项目——RedisRabbit MQ——以及现成可用的内存数据库。git

用例

下图让你能够对Spring XD有个整体上的了解。github


Spring XD团队认为,对于建立大数据解决方案来讲,建立的主要用例有四种:数据吸纳、实时分析、工做流调度以及导出。redis

数据吸纳提供了一种能力,能够从各类输入源接收数据,并把它传输给大数据存储库,像HDFS(Hadoop文件系统)、Splunk或者MPP数据库。和文件同样,数据源可能包括来自于移动设备、支持MQ遥感传输协议(MQTT)的传感器以及像Twitter之类的社交流的事件。算法

吸纳过程会贯穿事件驱动数据的处理,以及针对其余类型数据的批处理(MR、PIG、Hive、Cascading、SQL等等)。流和做业的两个世界大相径庭,可是Spring XD试图使用通道抽象(channel abstraction)来模糊两者之间的边界,从而让流能够触发批处理做业,而批处理做业也能够发送事件从而触发其余流。spring

对于流来讲,会经过叫作“Taps”的抽象来支持某些实时分析,像获取指标和计数值。从概念上,Taps让你能够介入到流中,执行实时分析,并有选择地为外部系统生成数据,像GemFire、Redis或者其余内存数据网格。shell

一旦你在大数据仓库中拥有数据,那么就须要某种工做流工具来对处理进行调度。调度很是必要,由于你编写的脚本或者map-reduce做业一般会长时间运行,并采用带有多个步骤的事件链的方式。理想情况下,你须要在事件失败的时候,可以从特定的步骤从新启动,而不是彻底从头来过。

最后还须要导出步骤,从而把数据放到更适合展示的系统中,可能还会作进一步的分析。例如从HDFS到RDBMS(关系型数据库管理系统),在那里你可使用更为传统的商业智能工具。

Spring XD想要提供一种统1、分布式和可扩展的服务来知足这些用例。它没有从头开始,而是利用了大量已经存在的Spring技术。例如,它使用了Spring Batch来支持工做流调度和导出用例,使用Spring Integration来支持流处理,此外还使用了各类各样的企业应用程序集成模式。其余关键的Spring产品包括:使用Spring Data处理NoSQL/Hadoop工做,使用Reactor为编写异步程序提供简化的API,特别是在使用LMAX Disruptor的时候。

安装Spring XD

在接下来的部分,咱们会详细看一下每一个用例。你可能想要本身来试验一下这些例子。起步很是简单。

为了开始,你要确保系统至少安装了Java JDK 6或者更新的版本。我推荐使用Java JDK 7。

对于OSX用户,若是尚未Homebrew的话,请安装,而后运行:

brew tap pivotal/tapbrew install springxd

这会安装到 /usr/local/Cellar/springxd/1.0.0.M7/libexec (依赖于Spring XD的库)。

注意:若是你随后想要安装更新的版本,那么使用brew upgrade springXD就能够。

红帽或者CentOS的用户可使用Yum来安装。

Windows用户能够下载最新的.zip文件,解压,安装到文件夹,而后把XD_HOME这个环境变量设置成安装文件夹。

你能够经过键入如下命令,从而在单一节点上启动Spring XD:

xd-singlenode

键入如下命令来打开另外一个终端窗口并启动shell程序:

xd-shell

你会看到下面这样的状况:

为了检查它是否正常工做,让咱们建立一个快速的流:

stream create --definition "time | log" --name ticktock --deploy

在你启动Spring XD的控制台中,你会看到下面这样的显示:

你能够从shell中使用stream destroy命令删除流。

stream destroy --name ticktock数据吸纳流

在Spring XD中,基本的流会定义事件驱动数据的吸纳,从源到目的地,通过任意多个处理器。

Spring XD外壳程序支持针对流定义的一种DSL,其中带有管道和过滤器语法 - source | processor | sink。

例如,像这样的命令 stream create --name filetest --definition "file | log" --deploy会记录文件内容的日志。
除了可以处理文件以外,Spring XD还支持不少其余源,包括:

HTTP

命令 HTTP POST /streams/myStream "http | file --deploy" -表示“从HTTP消费个人流,并转到文件”。这会默认到9000端口。你可使用--port选项覆盖默认的端口设置。这是针对HTTP的惟一参数。

例如(从XD的外壳程序):

xd:> stream create --name infoqhttptest9010 --definition "http --port=9010 | file" --deploy

你能够向这个新端口提交一些数据来测试:

xd:> http post --target http://localhost:9010 --data "hello world"

你会在控制台窗口看到如下文本:

> POST (text/plain;Charset=UTF-8) http://localhost:9010 hello world > 200 OK

打开另外一个终端窗口并键入:

$ cd /tmp/xd/output $ tail -f infoqhttptest9010.out

你会在输出中看到“hello world”。

想要发送二进制数据,你须要把Content-Type头部说明设置为application/octet-string:

$ curl --data-binary @foo.zip -H'Content-Type: application-octet-string' http://localhost:9000

键入 stream destroy infoqhttptest9010 来完成清理工做。

Mail

Mail是用来接收email的源模块。根据所使用的协议,它能够以池的形式工做,或者在可用的时候就接收email。

例如:

xd:> stream create --name infoqmailstream --definition "mail --host=imap.gmail.com --username=charles@c4media.com --password=secret --delete=false | file" --deploy

注意:这里的delete选项很重要,由于对于Spring XD来讲一旦被消费,默认状况就会删除电子邮件。Spring XD也拥有markAsRead选项,但默认值是false。Spring集成文档中对此作出了详细的说明,但主要问题是,POP3协议只知道在单独一个会话中读取了什么。做为POP3邮件适配器运行的结果,当邮件在每一个池中变成可用状态时,就会被成功发送,且没有任何一个邮件消息会被屡次发送。然而,当你重启适配器并开始新的会话时,全部位于上一个会话中已经获取过的邮件消息就可能会被再次获取。

若是你在控制台日志中看到这样的错误信息:

WARN task-scheduler-1 org.springframework.integration.mail.ImapIdleChannelAdapter:230 - error occurred in idle task javax.mail.AuthenticationFailedException: failed to connect, no password specified?

试着在你的URL把@符号替换为URL编码的样子: %40:

stream create --name infoqmailstream --definition "mail --host=imap.gmail.com --username=charles%40c4media.com --password=secret --delete=false | file" --deploy

打开另外一个终端窗口并键入:

$ cd /tmp/xd/output $ tail -f infoqmailstream.out

给你本身发送一封邮件,以看到它在日志文件中显示的内容。

Twitter搜索

Spring XD就可使用Twitter搜索API(twittersearch),也可使用来自于Twitter's Streaming API的数据。

例如:

xd:> stream create --name twittersearchinfoq --definition "twittersearch --outputType=application/json --fixedDelay=1000 --consumerKey=afes2uqo6JAuFljdJFhqA --consumerSecret=0top8crpmd1MXGEbbgzAwVJSAODMcbeAbhwHXLnsg --query='infoq' | file" --deploy

它使用twittersearch的JSON输出格式,每1000毫秒使用令牌“infoq”在Twitter中进行查询。为了运行上面的内容,你须要一个消费者密钥(由Twitter发放的应用程序消费者密钥)以及它相关的密钥。


它的结果会经过管道以同步的方式传输给一个文件,默认是/tmp/xd/output/[streamName].out。

打开另外一个终端窗口并键入:

$ cd /tmp/xd/output $ tail -f twittersearchjava.out

稍等一下子,你会发现超出了Twitter APE搜索的限制,而且会在控制台窗口中(你在其中在单一节点上启动了XD)看到这样的消息:

11:27:01,468 WARN task-scheduler-1 client.RestTemplate:581 - GET request for "https://api.twitter.com/1.1/search/tweets.json?q=infoq&count=20&since_id=478845525597237248" resulted in 429 (Client Error (429)); invoking error handler11:27:01,471 ERROR task-scheduler-1 handler.LoggingHandler:145 - org.springframework.social.RateLimitExceededException: The rate limit has been exceeded.

键入 stream destroy twittersearchinfoq 来完成清理工做。

其余输入流

GemFire:在XD容器进程中配置一个缓存(cache)和副本区域,它和Spring Integration GemFire同时存在于通道适配器中,它们由CacheListener支持,然后者会输出区域中外部输入事件所触发的输出消息。它还支持连续的查询,那让客户端应用程序可使用对象查询语言(OQL)来建立GemFire查询,并注册一个CQ监听器,它会订阅查询,每次查询的结果集发生变化的时候都会获得通知。

Reactor IP:它会做为服务器,让远程的组织可以链接到XD,并经过原生的TCP或者UDP socket提交数据。reactor-ip源和标准的tcp源的区别在于,它基于Reactor项目,能够被配置为使用LMAX Disruptor RingBuffer库,它可以容许极高的吸纳率,大概每秒1M。

Syslog:有三种syslog源:reactor-syslog、syslog-udp和syslog-tcp。reactor-syslog适配器使用tcp,会构建Reactor项目中可用的功能,并提供超过syslog-tcp适配器中更好的吞吐量。

TCP:它会做为服务器,让远程的组织可以链接到XD,并经过原生的TCP socket提交数据。

MQTT:链接到MQTT服务器并接收遥测消息。

Taps

在流的任意位置,你均可以插入tap——这个词来自于Gregor Hohpe等人著的《应用程序集成模式(Application Integration Patterns)》一书中的“wire tap”模式。

从概念上说,你会在通道中插入一个简单的接收列表,它会把每一个进入的消息发布到主通道和次通道中。流并不知道它的管道中任何tap的存在。删除流并不会自动删除tap——它们须要单独删除。然而,若是加入了tap的流被从新建立,那么已经存在的tap会继续起做用。

tap能够在流的任意位置(或者多个位置)插入。

处理器

流中的数据能够以多种方式处理:

过滤器:它能够用于决定消息是否应该发送给输出通道。最简单的状况是,过滤器只是一个SpEL布尔表达式,它会返回真或假。例如:

xd:> stream create --name filtertest --definition "http | filter --expression=payload=='good' | log" --deploy

会记录带有“good”关键字的全部内容的日志。然而,过滤器也能够至关复杂。Spring XD支持JSONPath计算式以及自定义的Groovy脚本。

转换:用来转换消息的内容或结构。它支持简单的SpEL,对于更复杂的转换,可使用Groovy脚本。

分割器:和Spring集成中的分割器概念相似,这里的分割器会使用SpEL表达式,它会计算一个数组或者集合的值,从而把单独一条消息切分红多个独立的消息。你可使用JSON oath表达式,但没法使用自定义的Groovy脚本。

聚合器(Aggregator):和分割器相反,它会把多条消息组合成一条。

最后是脚本,能够用于调用特定的Groovy脚本做为处理步骤。

槽(Sinks)

最简单的槽是日志和文件。其余能够支持的槽包括Hadoop(HDFS)、JDBC、TCP、Mail、RabbitMQ、GemFire服务器、Splunk服务器和MQQT。还有一个动态路由选项,容许基于SpEL表达式或Groovy脚本的值,把Spring XD消息路由到命名通道中。让我有一点奇怪的是,在这里缺乏通常目的的JMS槽,尽管咱们能够像[url=http://www.infoq.com/cn/articles/(https://github.com/spring-projects/spring-xd/wiki/Extending-XD]这里[/url]描述的同样构建自定义的槽模块。

实时分析

Spring XD为各类机器学习评分算法的实时计算提供了支持,还为使用各类类型的计数器和计量器进行实时数据分析提供了支持。分析功能是经过能够添加到流中的模块实现的。在那种状况下,实时分析是经过和数据吸纳同样的模块完成的。

尽管流的主要角色能够是执行实时分析,但更为常见的是添加一个tap来初始化另外一个流,其中分析——例如:一个字段值的计数器——会应用给经过主要流吸纳的一样数据之上。

Spring XD中自带提供了一些简单的分析工具,它们都实现为抽象API,针对内存数据库和Redis而实现,以下:

  • 简单计数器
  • 字段值计数器:计算特定字段出现的次数。
  • 聚合计数器: 在Mongo和Redis之类的工具中比较常见,让你能够对数据根据时间——例如分钟、小时、月、年等——进行分片。
  • 计量器(Gauge):最新的值
  • 富计量器:最新的值,运行的平均值,最大、最小值

对于预测性的分析,Spring XD包含了一个可扩展的类库,基于它能够构建其余实现。例如在GitHub上提供的PMML模块,它和JPMML-Evaluator库集成,为更广范围内的模型类型提供了支持,而且能够与从RRattleKNIMERapidMiner导出的模块进行互操做。

产品还包含了一些抽象,能够在流处理应用程序中事件分析模型。在撰写这篇文章的时候,只支持预测性模块标记语言(Predictive Model Markup Language,PMML),但Pivotal告诉InfoQ:

咱们正在进行一个内部项目,以提供普遍的分析解决方案,它的目标是围绕“欺诈检测”和“网络安全”之类的状况。咱们还在与OSS库——像“stream-lib”和“graphlab”——的整合作了一些设计。

Pivotal还说明,他们指望,随着时间的推移可以在这个领域看到发展,而且对预测性建模提供额外的支持。

批处理做业、工做流调度和导出

除了流以外,Spring XD还包含了基于Spring Batch启动和监控批处理做业的功能,而Spring Batch也被用于支持工做流调度和导出用例。

工做流的概念会被转换成批处理做业,那能够被认为是各个步骤的有向图,每一个图都是一个处理步骤:

根据配置的状况,步骤能够顺序或者并行执行。它们能够复制或者处理来自于文件、数据库、MapReduce、Pig、Hive或Cascading做业的数据,而且和容许重启的检查点一块儿持久化。和流同样,做业支持单节点,或者能够和数据分区一块儿分布。

Spring XD自身带有少许预约义的做业,能够用来向Hadoop文件系统HDFS导出数据,或者从中导入数据。这些做业覆盖了FTP到HDFS、HDFS到JDBC、HDFS到MongoDB和JDBC到HDFS。还有一个做业用于向JDBC导出文件。你能够在/libexec/xd/modules/job文件夹中找到。

Spring XD提供了至关基础的、基于浏览器的图形化界面,当前让你能够执行和任务相关的批处理做业。对于启动Spring XD,管理员界面在这里提供:

(点击图像能够放大)

正如在上面的截屏中能够看到的,管理员界面当前包括四个标签页:

  • 模块:列举了可用的批处理做业和更多细节(像做业模块选项以及模块的XML配置文件)。
  • 定义:列举了XD批处理做业定义,并提供了部署或者卸载那些做业的动做。
  • 部署:列举了全部部署了的做业,并提供了一种选项来启动部署好的做业。一旦做业已经部署,它就能够经过管理员界面启动。
  • 执行:列举了批处理做业的执行情况,并提供了一种选项,若是批处理做业能够重启,而且处于中止或者失败状态,那么就重启。
结论

Spring XD当前还处于开发中。第一个里程碑版本已经在2013年六月发布,而GA版本指望在今年(2014年)七月发布。它基于Apache第二版许可。在GitHub上提供了源代码示例。你还能够找到在线的Sonar代码度量

产品可能还很新,但正如咱们看到的,它构建在成熟的基础之上——Spring Batch、Spring Integration和Sping Data,以及Reactor项目、LMAX Disruptor和Apache Hadoop——并提供了一种轻量级的运行时环境,能够经过DSL来配置和集成,只须要不多代码,甚至不须要。Spring XD为开发者提供了一种便利的方式,能够开始构建大数据应用程序,为构建和部署这样的应用程序提供了“一站式服务”。

对于想要探索这个产品的读者,有大量资源可用,包括主要的wiki,还有覆盖了实时分析的视频

关于做者

Charles Humble从2014年三月开始担任InfoQ.com编辑团队的主编,引领咱们的内容建立工做,包括新闻、文章、书籍、视频和采访。在全职加入InfoQ以前,Charles领导过咱们的Java部分工做,是PRPi顾问公司的CTO,该公司是一家简历研究公司,在2012年七月被PwC收购。他做为开发者、架构师和开发经理在软件企业中工做了近20年。在空闲时间,他会写一些音乐,而且是伦敦周边的技术小组Twofish的成员。

相关文章
相关标签/搜索