ActiveMQ消息的延时和定时投递

ActiveMQ对消息延时和定时投递作了很好的支持,其内部启动Scheduled来对该功能支持,也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,只须要把几个描述消息定时调度方式的参数做为属性添加到消息,broker端的调度器就会按照咱们想要的行为去处理消息。html

Property name type description
AMQ_SCHEDULED_DELAY long 延迟投递的时间
AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT int 重复投递次数
AMQ_SCHEDULED_CRON String Cron表达式
下面咱们演示一下间隔性重复投递;

生产者:java

package cn.slimsmart.study.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;

public class Producer {

	public static final String broker_url = "failover:(tcp://10.1.199.169:61616)";
	private static String queue_name = "test.queue";

	public static void main(String[] args) throws Exception {
		ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url);
		// 经过工厂建立一个链接
		Connection connection = factory.createConnection();
		// 启动链接
		connection.start();
		// 建立一个session会话 事务 自动ack
		Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
		// 建立一个消息队列
		Destination destination = session.createQueue(queue_name);
		// 建立生产者
		MessageProducer producer = session.createProducer(destination);
		// 消息持久化
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		TextMessage message = session.createTextMessage("test delay message:" + System.currentTimeMillis());
		long time = 60 * 1000;// 延时1min
		long period = 10 * 1000;// 每一个10s
		int repeat = 6;// 6次
		message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
		message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
		message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
		// 发送消息
		producer.send(message);
		session.commit();
		producer.close();
		session.close();
		connection.close();
	}
}
消费者代码:
package cn.slimsmart.study.activemq;

import java.util.concurrent.CountDownLatch;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

	public static final String broker_url = "failover:(tcp://10.1.199.169:61616)";
	private static String queue_name = "test.queue";

	public static void main(String[] args) throws Exception {
		ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url);
		// 经过工厂建立一个链接
		Connection connection = factory.createConnection();
		// 启动链接
		connection.start();
		// 建立一个session会话 事务 自动ack
		Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
		// 建立一个消息队列
		Destination destination = session.createQueue(queue_name);
		// 建立消费者
		MessageConsumer consumer = session.createConsumer(destination);
		consumer.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message message) {
				try {
					System.out.println("receive message :" + ((TextMessage) message).getText());
					message.acknowledge();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		new CountDownLatch(1).await();
	}
}

参考:apache

1.延时和定时消息投递session

2.消息属性定义tcp