<!-- spring-boot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.1.0.RELEASE</version> </dependency> <!-- spring-cloud-rocketmq --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> <version>0.2.2.BUILD-SNAPSHOT</version> </dependency> <!-- spring-cloud --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Greenwich.M3</version> <type>pom</type> <scope>import</scope> </dependency>
若是找不到,则添加一下配置web
<repositories> <repository> <id>spring-snapshot</id> <name>Spring Snapshot Repository</name> <url>https://repo.spring.io/snapshot</url> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> #若是使用私服nexus的话,再添加这个代理仓库时,注意仓库类型:snapshot要容许代理
spring: cloud: stream: default-binder: rocketmq rocketmq: binder: #rocketmq地址 name-server: 192.168.0.78:9876 bindings: #自定义的名称 对应spring.cloud.stream.bindings.output1 output1: producer: group: test-group-user-ouput1 sync: true #Binding: 包括 Input Binding 和 Output Binding。 #Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁, #实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据便可,屏蔽了开发者与底层消息中间件的接触。 bindings: #自定义的名称 output1: destination: test-topic-user # topic(一级分类) content-type: application/json
spring.cloud.stream.default-binder=rocketmq spring.cloud.stream.rocketmq.binder.name-server=192.168.0.78:9876 spring.cloud.stream.rocketmq.bindings.output1.producer.group=test-group-user-ouput1 spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true spring.cloud.stream.bindings.output1.destination=test-topic-user spring.cloud.stream.bindings.output1.content-type=application/json
定义name为output1的Output Bindingspring
public interface MySource { @Output("output1") MessageChannel output1(); }
在Application 中添加注解json
@SpringBootApplication @EnableBinding({ MySource.class }) //MySource为上面定义Binding的接口 public class RocketMQConsumerApplication { ... }
注入 定义Bingding的 接口app
@Autowired private MySource source;
String msg = ...; source.output1().send(MessageBuilder.withPayload(msg).build());
Object msg = ...; String tag = "test-tag";//tag为二级分类,topic为一级分类,能够根据两个类别进行订阅 Message message = MessageBuilder.createMessage(msg, new MessageHeaders(Stream.of(tag).collect(Collectors.toMap(str -> MessageConst.PROPERTY_TAGS, String::toString)))); source.output1().send(message);
Object msg = ...; String tag = "test-tag" Message message = MessageBuilder.withPayload(msg) .setHeader(MessageConst.PROPERTY_TAGS, tag) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) .build(); source.output1().send(message);
// TODO
spring: cloud: stream: default-binder: rocketmq rocketmq: binder: name-server: 192.168.1.179:9876 bindings: #自定义的名称 对应spring.cloud.stream.bindings.input1 input1: consumer: tags: test-tag1 # 订阅的tag,二级分类 orderly: false # 是否按顺序消费 bindings: #自定义的名称 input1: destination: test-topic-user # 订阅的topic ,一级分类 content-type: application/json group: test-input-group1 #group consumer: concurrency: 20 maxAttempts: 1
定义name为input1的Input Bindingide
public interface MySource { @Input("input1") SubscribableChannel input1(); }
在Application 中添加注解spring-boot
@SpringBootApplication @EnableBinding({ MySource.class }) //MySource为上面定义Binding的接口 public class RocketMQConsumerApplication { ... }
/** * 消费者 */ @Service public class ReceiveService { // 接受字符串 // @StreamListener("input1") // public void receiveInput1( String receiveMsg) { // System.out.println("input1 receive: " + receiveMsg); // } /** * 接受对象 */ @StreamListener("input1") public void receiveInput3(@Payload List<BranchInfoEntity> list ) { System.out.println("input3 receive: " + JSON.toJSONString(list)); } }