Source:来源(近义词:Producer、Publisher)异步
ide
Processor:对于上流而言是Sink,对于下游而言是Sourceui
消息大体分为两个部分:this
消息头(Headers)atom
消息体(Body/Payload)spa
定义发送通道线程
public interface Source {
/**
* 需求通道
*/
String OUT_PUT_DEMAND = "out_put_demand";
/**
* 任务通道
*/
String OUT_PUT_TASK = "out_put_task";
/**
* 工做日志通道
*/
String OUT_PUT_WORK_LOG = "out_put_workLog";
/**
* 组织结构信息通道
*/
String OUT_PUT_ORG = "out_put_org";
/**
* 代码质量通道
*/
String OUT_PUT_QUALITY = "out_put_quality";
生产类日志
public class Producer {
/**
* 默认发送消息
*
* @param message
* @param channel
* @return
*/
public static Boolean send(Object message, MessageChannel channel) {
return send(message, channel, 5000L);
}
/**
* 带超时时间
*
* @param message
* @param timeout
* @param channel
* @return
*/
public static Boolean send(Object message, MessageChannel channel, Long timeout) {
return channel.send(MessageBuilder.withPayload(message).build(), timeout);
}
}
Bindingcode
策略模式-消息类型接口
public enum SendType {
DEMAND_MESSAGE(new DemandMessage()),
TASK_MESSAGE(new TaskMessage()),
WORK_LOG_MESSAGE(new WorkLogMessage()),
CODE_QUALITY_MESSAGE(new CodeQualityMessage());
private MessageSend messageSend;
SendType(MessageSend messageSend){
this.messageSend = messageSend;
}
public MessageSend get(){
return this.messageSend;
}
}
消息发送接口
public interface MessageSend {
public Boolean send(Object message);
}
接口实现
public class DemandMessage implements MessageSend {
private static final Source SOURCE = SpringContextHelper.getBean(Source.class);
生产消息
public class ProduceHelper {
/**
* 需求消息生产
* @param sendType 发送类型
* @param message 消息内容
* @return boolean
*/
public static Boolean produce(SendType sendType, Demand message) {
return sendType.get().send(message);
}
/**
* 任务消息生产
* @param sendType 发送类型
* @param message 消息内容
* @return boolean
*/
public static Boolean produce(SendType sendType, Task message) {
return sendType.get().send(message);
}
/**
* 工做日志消息生产
* @param sendType 发送类型
* @param message 消息内容
* @return boolean
*/
public static Boolean produce(SendType sendType, WorkLog message) {
return sendType.get().send(message);
}
/**
* 代码质量消息生产
* @param sendType 发送类型
* @param message 消息内容
* @return boolean
*/
public static Boolean produce(SendType sendType, CodeQuality message) {
return sendType.get().send(message);
}
}
定义接收通道
public interface Sink {
/**
* 需求通道
*/
String IN_PUT_DEMAND = "in_put_demand";
/**
* 任务通道
*/
String IN_PUT_TASK = "in_put_task";
/**
* 工做日志通道
*/
String IN_PUT_WORK_LOG = "in_put_workLog";
/**
* 组织结构信息通道
*/
String IN_PUT_ORG = "in_put_org";
/**
* 代码质量通道
*/
String IN_PUT_QUALITY = "in_put_quality";
消费类
public interface Consumer<T> {
void onMessage(T message);
}
消息监听
@StreamListener方式
@ServiceActivator
@PostConstruct
消息处理