ActiveMQ JMS延迟队列

JMS的4个特性:html

1.P2P,一个消息只有一我的消费java

2. Pub/Sub ,一个消息能够有多我的消费apache

3.分组订阅,一个消息在一个同名组中只有一我的消费网络

4.延迟队列,能够设置消息延迟发送session

 

1、mq配置支持: schedulerSupport="true"tcp

 

2、java代码支持ide

1.主要就是在message上添加属性,有4种类型的属性 a.delay,ScheduledMessage.AMQ_SCHEDULED_DELAY延迟投递的时间,单位毫秒 b.repeat,ScheduledMessage.AMQ_SCHEDULED_REPEAT重复投递次数 c.peroid,ScheduledMessage.AMQ_SCHEDULED_PERIOD重复投递的时间间隔 d.corn,ScheduledMessage.AMQ_SCHEDULED_CRON Cron表达式 延迟队列消息post

2.须要借助MessagePostProcessor在convertAndSend方法中对message进行延迟属性设置ui

jmsTemplate.convertAndSend(destArr[i], message, delayMessagePostProcessor);url

    public void convertAndSend(
            String destinationName, final Object message, final MessagePostProcessor postProcessor)
        throws JmsException {

        send(destinationName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                Message msg = getRequiredMessageConverter().toMessage(message, session);
                return postProcessor.postProcessMessage(msg);
            }
        });
    }

3、程序支持

最好有消息发送和消息处理记录,以便核对

4、应用场景

1)2个小时后给用户发送短信。
2)15分钟后关闭网络链接。
3)2分钟后再次尝试回调。

 

ActiveMQ对消息延时和定时投递作了很好的支持,其内部启动Scheduled来对该功能支持,也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,只须要把几个描述消息定时调度方式的参数做为属性添加到消息,broker端的调度器就会按照咱们想要的行为去处理消息。

 

Property name type description
AMQ_SCHEDULED_DELAY long 延迟投递的时间
AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT int 重复投递次数
AMQ_SCHEDULED_CRON String Cron表达式

下面咱们演示一下间隔性重复投递;

生产者:

 

package cn.slimsmart.study.activemq;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
 
public class Producer {
 
    public static final String broker_url = "failover:(tcp://10.1.199.169:61616)";
    private static String queue_name = "test.queue";
 
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url);
        // 经过工厂建立一个链接
        Connection connection = factory.createConnection();
        // 启动链接
        connection.start();
        // 建立一个session会话 事务 自动ack
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // 建立一个消息队列
        Destination destination = session.createQueue(queue_name);
        // 建立生产者
        MessageProducer producer = session.createProducer(destination);
        // 消息持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        TextMessage message = session.createTextMessage("test delay message:" + System.currentTimeMillis());
        long time = 60 * 1000;// 延时1min
        long period = 10 * 1000;// 每一个10s
        int repeat = 6;// 6次
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
        // 发送消息
        producer.send(message);
        session.commit();
        producer.close();
        session.close();
        connection.close();
    }
}

消费者代码:

package cn.slimsmart.study.activemq;
 
import java.util.concurrent.CountDownLatch;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class Consumer {
 
    public static final String broker_url = "failover:(tcp://10.1.199.169:61616)";
    private static String queue_name = "test.queue";
 
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url);
        // 经过工厂建立一个链接
        Connection connection = factory.createConnection();
        // 启动链接
        connection.start();
        // 建立一个session会话 事务 自动ack
        Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
        // 建立一个消息队列
        Destination destination = session.createQueue(queue_name);
        // 建立消费者
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    System.out.println("receive message :" + ((TextMessage) message).getText());
                    message.acknowledge();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        new CountDownLatch(1).await();
    }
}

参考:

1.延时和定时消息投递

2.消息属性定义

相关文章
相关标签/搜索