在以前的工做中,微服务框架使用的是springcloud,消息中间件使用的rocketmq,这段时间看到阿里出了spring cloud alibaba集成了rocketmq,出于好奇,写了个demojava
第一步:下载:www.apache.org/dyn/closer.… 第二步:解压 第三步:修改三个配置文件:runbroker.sh,runserver.sh,tools.sh,将其中JAVA_HOME改为本身电脑的环境配置,修改完以下web
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=本身的地址
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
复制代码
第四步:依次执行命令spring
./mqnamesrv
./mqbroker -n localhost:9876
./mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic
复制代码
若是启动成功,没有报错,表明启动成功哈,下面就能够开发了apache
第一步:导入相关的pomjson
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>0.2.1.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
复制代码
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 为了Endpoint 信息查看 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.2.6</version>
</dependency>
复制代码
第二步:建一个springboot项目,启动类以下:api
@SpringBootApplication
@EnableBinding({ Source.class, Sink.class })
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
复制代码
第三步:建立provider浏览器
@Service
public class RocketmqProducer {
public void send(String message) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("test_producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("test-topic", "test-tag", message.getBytes());
producer.send(msg);
}
}
复制代码
第四步:建立consumerspringboot
@Service
public class ReceiveService {
/** * 默认是input,在Sink类中指定,若是想要多个input,须要写一个实现Sink的类 * @param receiveMsg */
@StreamListener("input")
public void receiveInput1(String receiveMsg) {
System.out.println("input receive: " + receiveMsg);
}
}
复制代码
第五步:加入配置文件:数据结构
server.port=8087
spring.application.name=spring-cloud-alibaba-rocketmq-demo
# 配置rocketmq的nameserver地址
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
# 定义name为output的binding
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json
#定义name为input的binding
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=test-group
management.endpoint.health.show-details=always
复制代码
第六步:写一个controller,启动项目,访问接口app
@RestController
@RequestMapping(value = "/api/demo/test")
public class TestController {
@Autowired
RocketmqProducer rocketmqProducer;
@RequestMapping(value = "/send", method = RequestMethod.GET)
public String send() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
rocketmqProducer.send("test rocketmq message");
return "success";
}
}
复制代码
会看到控制台输出:input receive: test rocketmq message
浏览器输入:http://127.0.0.1:8087/actuator/rocketmq-binder
这一篇文章只是将spring cloud stream 和 rocketmq跑通了,其实对于spring cloud stream和rocketmq仍是学习的阶段,只能感叹spring cloud博大精深
更多网站能够访问www.zplxjj.com或关注公众号: