ActiveMQ对消息延时和定时投递作了很好的支持,其内部启动Scheduled来对该功能支持,也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,只须要把几个描述消息定时调度方式的参数做为属性添加到消息,broker端的调度器就会按照咱们想要的行为去处理消息。html
Property name | type | description |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延迟投递的时间 |
AMQ_SCHEDULED_PERIOD | long | 重复投递的时间间隔 |
AMQ_SCHEDULED_REPEAT | int | 重复投递次数 |
AMQ_SCHEDULED_CRON | String | Cron表达式 |
生产者:java
package cn.slimsmart.study.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ScheduledMessage; public class Producer { public static final String broker_url = "failover:(tcp://10.1.199.169:61616)"; private static String queue_name = "test.queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url); // 经过工厂建立一个链接 Connection connection = factory.createConnection(); // 启动链接 connection.start(); // 建立一个session会话 事务 自动ack Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一个消息队列 Destination destination = session.createQueue(queue_name); // 建立生产者 MessageProducer producer = session.createProducer(destination); // 消息持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage message = session.createTextMessage("test delay message:" + System.currentTimeMillis()); long time = 60 * 1000;// 延时1min long period = 10 * 1000;// 每一个10s int repeat = 6;// 6次 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); // 发送消息 producer.send(message); session.commit(); producer.close(); session.close(); connection.close(); } }消费者代码:
package cn.slimsmart.study.activemq; import java.util.concurrent.CountDownLatch; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { public static final String broker_url = "failover:(tcp://10.1.199.169:61616)"; private static String queue_name = "test.queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url); // 经过工厂建立一个链接 Connection connection = factory.createConnection(); // 启动链接 connection.start(); // 建立一个session会话 事务 自动ack Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE); // 建立一个消息队列 Destination destination = session.createQueue(queue_name); // 建立消费者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { System.out.println("receive message :" + ((TextMessage) message).getText()); message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }); new CountDownLatch(1).await(); } }
参考:apache
1.延时和定时消息投递session
2.消息属性定义tcp