- 先在pom文件中加入依赖
<!-- Apache Qpid client library (groupId org.apache.qpid, artifactId qpid-client), pinned to version 0.24. -->
<!-- It provides the org.apache.qpid.client.* classes referenced by the Spring bean definitions. -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-client</artifactId>
<version>0.24</version>
</dependency>
- 然后在Spring配置文件中配置相应的bean
<!-- Qpid connection factory; its single constructor argument is resolved from the -->
<!-- ${jms.config.qpid.url} property placeholder. Referenced by qpidJmsTemplate. -->
<bean id="qpidConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
<constructor-arg value="${jms.config.qpid.url}"/>
</bean>
<!-- Destination built from a Qpid address string: "ADDR:" + the name taken from -->
<!-- ${jms.config.qpid.queue}, followed by the option map {create: always}. -->
<!-- NOTE(review): per the Qpid address-string syntax, "create: always" asks the client to -->
<!-- create the node if it does not exist yet; confirm against the Qpid 0.24 documentation. -->
<bean id="qpidQueue" class="org.apache.qpid.client.AMQAnyDestination">
<constructor-arg value="ADDR:${jms.config.qpid.queue}; {create: always}"/>
</bean>
<!-- Spring JmsTemplate wired to the Qpid connection factory, with qpidQueue as the default -->
<!-- destination, so send(MessageCreator) calls without an explicit destination go to that queue. -->
<!-- NOTE(review): the JmsTemplate documentation recommends a pooling/caching ConnectionFactory; -->
<!-- verify whether AMQConnectionFactory should be wrapped (e.g. in CachingConnectionFactory). -->
<bean id="qpidJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="qpidConnectionFactory"/>
<property name="defaultDestination" ref="qpidQueue"/>
</bean>
- 发送消息的代码
static class sendQpid implements Runnable{
String strMessage = null;
boolean isSuccess = true;
@Override
public void run() {
while(true){
try {
if(isSuccess){
// 若是上一条消息发送成功,则取出下一条消息
strMessage = qpidMessageQueue.take();
}
// 将消息转换成byte数组,便于发送
final byte[] output = ZipUtils.gzipToByteArray(strMessage);
logger.info("-------------->sent message to QPID,length:"+output.length);
qpidJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage message = session.createMapMessage();
message.setBytes("body", output);
return message;
}
});
// 标记这条消息发送成功
isSuccess = true;
// 将消息记录到特定文件中,实现对发送消息的监控
// monitorLogger.info(strMessage);
}catch(JmsException e){
isSuccess = false;
logger.error(e.getLocalizedMessage());
} catch (InterruptedException e) {
isSuccess = false;
logger.error(e.getLocalizedMessage());
} catch (IOException e) {
isSuccess = false;
logger.error(e.getLocalizedMessage());
}
}
}
}