Spring Cloud Data Flow is a toolkit for big-data operations and the successor to Spring XD. It uses a hybrid computational model that combines stream and batch processing, and it orchestrates data microservices in the form of long-lived Spring Cloud Stream applications and short-lived Spring Cloud Task applications.
I rarely wrote blog posts before. Material on this topic is scarce, I hit a lot of pitfalls, and even Google was not much help, so I know how helpless that feels. I'm recording the process here to share with anyone who enjoys digging into this technology, in the hope that it helps a little. Here's to a brighter future for microservices.
1. Install DCOS via Vagrant. This gives us a Marathon endpoint URL, http://m1.dcos/service/marathon, which we will need for configuration later. If you installed DCOS another way (CLI, GUI), you can use the service's IP address instead. I won't go into service installation in detail here; services can be installed either through the DCOS Universe or from a JSON file.
2. Install MySQL. You can install it through DCOS; if you already have a MySQL database, you can skip this step. A minimal mysql.json sketch follows the install command below.
curl -X POST http://m1.dcos/service/marathon/v2/apps -d @mysql.json -H "Content-type: application/json"
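The mysql.json referenced above is not reproduced in this post; as a sketch, a minimal Marathon app definition for MySQL might look like the following (the image tag, root password, and CPU/memory sizing are my own assumptions, adjust them for your environment):
{
  "id": "/mysql",
  "instances": 1,
  "cpus": 1,
  "mem": 1024,
  "container": {
    "type": "DOCKER",
    "docker": {
      "image": "mysql:5.7",
      "network": "BRIDGE",
      "portMappings": [
        { "containerPort": 3306, "hostPort": 0, "protocol": "tcp" }
      ]
    }
  },
  "env": {
    "MYSQL_ROOT_PASSWORD": "1234321"
  }
}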
3. Install RabbitMQ. If you already have a RabbitMQ server, you can skip this step. Note that for a Spring Boot application you should configure the MQ username and password in the application's config file; see the example after the install command below.
curl -X POST http://m1.dcos/service/marathon/v2/apps -d @rabbitmq.json -H "Content-type: application/json"
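For example, a minimal application.properties for a Spring Boot app talking to this RabbitMQ instance might look like the following (host, port, and credentials are placeholders, replace them with your own):
# placeholder connection settings -- use your RabbitMQ host, port, and credentials
spring.rabbitmq.host=<rabbitmq-host>
spring.rabbitmq.port=5672
spring.rabbitmq.username=<username>
spring.rabbitmq.password=<password>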
4. Install Redis. If you already have a Redis server, you can skip this step.
curl -X POST http://m1.dcos/service/marathon/v2/apps -d @redis.json -H "Content-type: application/json"
5. Install Chronos. If you already have a Chronos server, you can skip this step. If the install is slow, that is because pulling the image is slow; here is a Baidu Pan link to the image: http://pan.baidu.com/s/1dFaQvSX password: 3j9c. Download it to the agent node, run docker load < chronos.tar, confirm with docker images, then re-run the install and it will be fast.
dcos package install chronos
6. Download the JSON configuration file for the Spring Cloud Data Flow server:
$ wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-server-mesos/v1.0.0.RELEASE/src/etc/marathon/scdf-server.json
7. Here is my configuration file:
{
  "id": "/spring-cloud-data-flow",
  "instances": 1,
  "cpus": 2,
  "mem": 4000,
  "disk": 3000,
  "gpus": 0,
  "backoffSeconds": 1,
  "backoffFactor": 1.15,
  "maxLaunchDelaySeconds": 3600,
  "container": {
    "type": "DOCKER",
    "docker": {
      "image": "springcloud/spring-cloud-dataflow-server-mesos:latest",
      "network": "BRIDGE",
      "portMappings": [
        {
          "containerPort": 9393,
          "hostPort": 0,
          "servicePort": 10000,
          "protocol": "tcp",
          "name": "default"
        }
      ],
      "privileged": false,
      "forcePullImage": false
    }
  },
  "healthChecks": [
    {
      "gracePeriodSeconds": 120,
      "intervalSeconds": 60,
      "timeoutSeconds": 20,
      "maxConsecutiveFailures": 0,
      "portIndex": 0,
      "path": "/management/health",
      "protocol": "HTTP",
      "ignoreHttp1xx": false
    }
  ],
  "upgradeStrategy": {
    "minimumHealthCapacity": 1,
    "maximumOverCapacity": 1
  },
  "unreachableStrategy": {
    "inactiveAfterSeconds": 300,
    "expungeAfterSeconds": 600
  },
  "killSelection": "YOUNGEST_FIRST",
  "requirePorts": true,
  "env": {
    "JDBC_DRIVER": "org.mariadb.jdbc.Driver",
    "MESOS_CHRONOS_URI": "http://172.16.1.77:10105",
    "REDIS_HOST": "172.16.1.61",
    "RABBITMQ_PORT": "6392",
    "MESOS_MARATHON_URI": "http://m1.dcos/service/marathon",
    "REDIS_PORT": "6379",
    "JDBC_PASSWORD": "1234321",
    "JDBC_URL": "jdbc:mysql://172.16.1.145:3306/test",
    "SPRING_APPLICATION_JSON": "{\"spring.cloud.deployer.mesos.marathon.apiEndpoint\":\"${MESOS_MARATHON_URI}\",\"spring.cloud.deployer.mesos.chronos.apiEndpoint\":\"${MESOS_CHRONOS_URI}\",\"spring.datasource.url\":\"${JDBC_URL}\",\"spring.datasource.driverClassName\":\"${JDBC_DRIVER}\",\"spring.datasource.username\":\"${JDBC_USERNAME}\",\"spring.datasource.password\":\"${JDBC_PASSWORD}\",\"spring.datasource.testOnBorrow\":true,\"spring.datasource.validationQuery\":\"SELECT 1\",\"spring.redis.host\":\"${REDIS_HOST}\",\"spring.redis.port\":\"${REDIS_PORT}\",\"spring.cloud.deployer.mesos.marathon.environmentVariables\":\"SPRING_RABBITMQ_HOST=${RABBITMQ_HOST},SPRING_RABBITMQ_PORT=${RABBITMQ_PORT}\",\"spring.cloud.deployer.mesos.dcos.authorizationToken\":\"${DCOS_TOKEN}\",\"spring.cloud.config.enabled\":false,\"spring.freemarker.checkTemplateLocation\":false,\"spring.cloud.deployer.mesos.marathon.memory\":\"3000\",\"spring.dataflow.embedded.database.enabled\":false}",
    "RABBITMQ_HOST": "172.16.1.77",
    "JDBC_USERNAME": "root"
  }
}
A few notes on this configuration:
(1) "image": "springcloud/spring-cloud-dataflow-server-mesos:latest" refers to the image in the agent node's Docker. If you run the app as-is, Marathon will first pull the image, which, for reasons you can guess, is painfully slow; you can start the docker pull and go sip tea while you wait, or, if you already have the image somewhere else, import it into the agent node's Docker. Here is a Baidu Pan link to the exported image: http://pan.baidu.com/s/1hstbj5E password: 56nk. After downloading, just run docker load < springdataflow.tar. If you run a local Docker registry, you can also tag the image, push it to the registry, and have the agent's Docker pull it from there; a sketch of that follows.
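The local-registry route is the usual tag-and-push workflow; a sketch, assuming a registry at the hypothetical address registry.local:5000:
docker load < springdataflow.tar
docker tag springcloud/spring-cloud-dataflow-server-mesos:latest registry.local:5000/spring-cloud-dataflow-server-mesos:latest
docker push registry.local:5000/spring-cloud-dataflow-server-mesos:latest
# then set the "image" field in the Marathon JSON to registry.local:5000/spring-cloud-dataflow-server-mesos:latest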
(2) Configure the Marathon and Chronos addresses. IP addresses also work, as long as they are reachable from a browser:
"MESOS_MARATHON_URI": "http://m1.dcos/service/marathon", "MESOS_CHRONOS_URI": "http://m1.dcos/service/chronos",
The IPs can be found in each service's detail page, and the Logs tab shows the service's runtime log.
(3) Configure MySQL:
"JDBC_URL": "jdbc:mysql://", "JDBC_DRIVER": "org.mariadb.jdbc.Driver", "JDBC_USERNAME": "", "JDBC_PASSWORD": "",
(4) Configure RabbitMQ:
"RABBITMQ_HOST": "", "RABBITMQ_PORT": "",
(5) Configure Redis:
"REDIS_HOST": "", "REDIS_PORT": "",
(6) If DCOS has authentication enabled, you also need to configure
DCOS_TOKEN
You can obtain a token as follows:
curl https://downloads.dcos.io/binaries/cli/linux/x86-64/dcos-1.9/dcos -o dcos && sudo mv dcos /usr/local/bin && sudo chmod +x /usr/local/bin/dcos && dcos config set core.dcos_url http://XXXXXXXXX && dcos
This installs the DCOS CLI and points it at your master address. Then run dcos auth login; it prints a URL like http://XXXXXX/login?redirect_uri=urn:ietf:wg:oauth:2.0:oob. Open it in a browser to obtain a token, paste the token back into the console, and you are logged in. After that, dcos config show core.dcos_acs_token prints the token, which you can copy into
DCOS_TOKEN
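Put together, the token flow boils down to this short sequence (a sketch; the login URL and token value depend on your cluster):
dcos auth login
# open the printed login URL in a browser, copy the token, and paste it back at the prompt
dcos config show core.dcos_acs_token
# copy the printed value into the DCOS_TOKEN environment variable of the Marathon app definition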
You can also configure a Maven repository address; see the documentation: http://docs.spring.io/spring-cloud-dataflow/docs/1.2.2.BUILD-SNAPSHOT/reference/htmlsingle/#arch-data-flow-server. However, when I registered apps with a direct jar URL or a Maven URI, deployment always threw exceptions. Looking at the source, on deploy the jar is wrapped in a temporary image and the server then calls the Marathon API through Feign, but the request kept failing with null or ":reason ..." errors. So instead I packaged each stream app's jar into a Docker image, pushed it to a remote registry, and configured the deployment to pull the image from my local Docker registry. Another thing to watch out for: app names and stream names must be all lowercase, otherwise the Feign request also throws an exception. If anyone knows the correct way to configure jar or Maven URIs here, please share it with me, thanks.
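Packaging a stream app's jar into a Docker image only takes a couple of lines; a sketch, assuming the jar builds to target/logging-source-0.0.1-SNAPSHOT.jar and using the hypothetical registry.local:5000 registry from above:
# Dockerfile (image name and registry are hypothetical)
FROM openjdk:8-jre-alpine
ADD target/logging-source-0.0.1-SNAPSHOT.jar /app.jar
ENTRYPOINT ["java", "-jar", "/app.jar"]
Then build and push it so the agents can pull it:
docker build -t registry.local:5000/logging-source:latest .
docker push registry.local:5000/logging-source:latest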
8. Now you can launch the server from the JSON file; this can also be done from the Marathon UI.
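Launching follows the same pattern as the earlier installs, posting the configuration file from step 7 to Marathon:
curl -X POST http://m1.dcos/service/marathon/v2/apps -d @scdf-server.json -H "Content-type: application/json"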
Check the logs to confirm it started successfully.
9. Open the Spring Cloud Data Flow web dashboard at http://XXXX:AAA/dashboard. You can also register apps and streams with the
spring-cloud-dataflow-shell
which I won't go into further here.
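If you'd rather use the shell, connecting it to the server looks roughly like this (the shell jar version and the server address are placeholders):
java -jar spring-cloud-dataflow-shell-<version>.jar
dataflow:>dataflow config server http://XXXX:AAA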
10. Register the apps
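Since my apps are Docker images, registration in the shell uses the docker: URI scheme; the image names and registry below are the hypothetical ones from the packaging sketch above, and everything stays lowercase:
dataflow:>app register --name logging-source --type source --uri docker:registry.local:5000/logging-source:latest
dataflow:>app register --name logging-sink --type sink --uri docker:registry.local:5000/logging-sink:latest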
11. Create the stream
Note: a stream needs at least one source and one sink. Since resources are limited, I'm not adding a processor for now (running these apps really does eat resources). In the shell it looks like the sketch below.
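With the two apps registered, a stream definition wiring the source to the sink looks like this (log-stream is an example name, kept lowercase):
dataflow:>stream create --name log-stream --definition "logging-source | logging-sink"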
12. Deploy the stream
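Deploying in the shell is a single command (same example stream name):
dataflow:>stream deploy --name log-stream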
Then you can go to the Services view and take a look; each jar should start one instance.
And that's it, done! Hahaha.
Part of the code is attached below.
source:
application
import java.util.Date;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class)
@SpringBootApplication
public class LoggingSourceApplication {

    // Polls every 10 seconds and emits the current timestamp to the output channel
    @Bean
    @InboundChannelAdapter(
        value = Source.OUTPUT,
        poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
    )
    public MessageSource<Long> timeMessageSource() {
        System.out.println(new Date() + " ======================logging-source========================== executed");
        return () -> {
            System.out.println(new Date() + " *****logging-source****** send");
            return MessageBuilder.withPayload(new Date().getTime()).build();
        };
    }

    public static void main(String[] args) {
        SpringApplication.run(LoggingSourceApplication.class, args);
    }
}
properties
spring.rabbitmq.host=172.16.3.183
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
# The following settings are needed when running and testing locally
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.output.destination=source-log
# By default Spring Cloud Stream creates a temporary queue in RabbitMQ; when the application shuts down
# and the connection closes, the queue disappears. To get a durable queue, and to allow the consumer side
# to scale, add a group to the binding, e.g. spring.cloud.stream.bindings.[channelName].group=logistic;
# the corresponding queue is then durable.
spring.cloud.stream.bindings.output.group=logTest
spring.cloud.stream.bindings.output.binder=rabbitMq1
spring.cloud.stream.binders.rabbitMq1.type=rabbit
spring.cloud.stream.default-binder=rabbitMq1
# RabbitMQ server address
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.host=172.16.3.183
# RabbitMQ server port
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.username=admin
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.password=admin
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.virtual-host=/
pom
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.4.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Camden.SR5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
sink:
application
import java.util.Date;
import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(Sink.class)
@SpringBootApplication
public class LoggingSinkApplication {

    // Spring Integration style consumer: logs the payload and all message headers
    @MessageEndpoint
    public static class LoggingMessageEndpoint {
        @ServiceActivator(inputChannel = Sink.INPUT)
        public void logIncomingMessages(@Payload String msg, @Headers Map<String, Object> headers) {
            System.out.println(new Date() + " ***********logging-sink************** " + msg);
            headers.entrySet().forEach(e -> System.out.println(e.getKey() + '=' + e.getValue()));
        }
    }

    // Spring Cloud Stream style consumer on the same input channel
    @StreamListener(Sink.INPUT)
    public void loggerSink(String date) {
        System.out.println("logging-sink Received: " + date);
    }

    public static void main(String[] args) {
        SpringApplication.run(LoggingSinkApplication.class, args);
    }
}
properties
spring.rabbitmq.host=172.16.3.183
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
# Needed when running and testing locally
server.port=8090
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.input.destination=source-log
# By default Spring Cloud Stream creates a temporary queue in RabbitMQ; when the application shuts down
# and the connection closes, the queue disappears. To get a durable queue, and to allow the consumer side
# to scale, add a group to the binding, e.g. spring.cloud.stream.bindings.[channelName].group=logistic;
# the corresponding queue is then durable.
spring.cloud.stream.bindings.input.group=logTest
spring.cloud.stream.bindings.input.binder=rabbitMq1
spring.cloud.stream.binders.rabbitMq1.type=rabbit
spring.cloud.stream.default-binder=rabbitMq1
# RabbitMQ server address
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.host=172.16.3.183
# RabbitMQ server port
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.username=admin
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.password=admin
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.virtual-host=/
pom
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.4.RELEASE</version>
    <relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Camden.SR5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>