JMS的4个特性:html
1.P2P,一个消息只有一我的消费java
2. Pub/Sub ,一个消息能够有多我的消费apache
3.分组订阅,一个消息在一个同名组中只有一我的消费网络
4.延迟队列,能够设置消息延迟发送session
1、mq配置支持: schedulerSupport="true"tcp
2、java代码支持ide
1.主要就是在message上添加属性,有4种类型的属性 a.delay,ScheduledMessage.AMQ_SCHEDULED_DELAY延迟投递的时间,单位毫秒 b.repeat,ScheduledMessage.AMQ_SCHEDULED_REPEAT重复投递次数 c.peroid,ScheduledMessage.AMQ_SCHEDULED_PERIOD重复投递的时间间隔 d.corn,ScheduledMessage.AMQ_SCHEDULED_CRON Cron表达式 延迟队列消息post
2.须要借助MessagePostProcessor在convertAndSend方法中对message进行延迟属性设置ui
jmsTemplate.convertAndSend(destArr[i], message, delayMessagePostProcessor);url
public void convertAndSend(
String destinationName, final Object message, final MessagePostProcessor postProcessor)
throws JmsException {
send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
Message msg = getRequiredMessageConverter().toMessage(message, session);
return postProcessor.postProcessMessage(msg);
}
});
}
3、程序支持
最好有消息发送和消息处理记录,以便核对
4、应用场景
1)2个小时后给用户发送短信。
2)15分钟后关闭网络链接。
3)2分钟后再次尝试回调。
ActiveMQ对消息延时和定时投递作了很好的支持,其内部启动Scheduled来对该功能支持,也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,只须要把几个描述消息定时调度方式的参数做为属性添加到消息,broker端的调度器就会按照咱们想要的行为去处理消息。
Property name | type | description |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延迟投递的时间 |
AMQ_SCHEDULED_PERIOD | long | 重复投递的时间间隔 |
AMQ_SCHEDULED_REPEAT | int | 重复投递次数 |
AMQ_SCHEDULED_CRON | String | Cron表达式 |
下面咱们演示一下间隔性重复投递;
生产者:
package cn.slimsmart.study.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ScheduledMessage; public class Producer { public static final String broker_url = "failover:(tcp://10.1.199.169:61616)"; private static String queue_name = "test.queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url); // 经过工厂建立一个链接 Connection connection = factory.createConnection(); // 启动链接 connection.start(); // 建立一个session会话 事务 自动ack Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一个消息队列 Destination destination = session.createQueue(queue_name); // 建立生产者 MessageProducer producer = session.createProducer(destination); // 消息持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage message = session.createTextMessage("test delay message:" + System.currentTimeMillis()); long time = 60 * 1000;// 延时1min long period = 10 * 1000;// 每一个10s int repeat = 6;// 6次 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); // 发送消息 producer.send(message); session.commit(); producer.close(); session.close(); connection.close(); } }
消费者代码:
package cn.slimsmart.study.activemq; import java.util.concurrent.CountDownLatch; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { public static final String broker_url = "failover:(tcp://10.1.199.169:61616)"; private static String queue_name = "test.queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url); // 经过工厂建立一个链接 Connection connection = factory.createConnection(); // 启动链接 connection.start(); // 建立一个session会话 事务 自动ack Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE); // 建立一个消息队列 Destination destination = session.createQueue(queue_name); // 建立消费者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { System.out.println("receive message :" + ((TextMessage) message).getText()); message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }); new CountDownLatch(1).await(); } }
参考:
2.消息属性定义