消息持久化和非持久化,指的是传输模式DeliverModelmysql
持久化和非持久化的最大区别是:持久化传输,消息会被保存,即存储传输,而采用非持久化,消息不会被存储sql
场景问题:服务器断电重启,未被消费的消息是否会在重启以后被继续消费?apache
非持久性模式: 服务器断电(关闭)以后,使用非持久性模式时,没有被消费的消息不会继续消费所有丢失;程序会报一个链接关闭异常中止运行,继续启动服务器运行程序,不会接收任何消息。服务器
持久性模式: 服务器断电(关闭)后,使用持久性模式时,没有被消费的消息会继续消费;程序也会报链接关闭异常,但再次启动服务器和程序后,接收方还能继续原来的消息再次接收。session
非持久订阅tcp
场景:大学上课,有的人去上课了,有的人逃课了,去上课的人就知道老师讲了什么,没有去的就不知道ide
非持久订阅只有当客户端处于激活状态,也就是和JMS Provider 保持链接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到url
持久订阅spa
场景:相似于qq发送消息,当你不在线的时候,好友给你发送消息,再次登陆,仍然能够收到线程
持久订阅时,客户端向JMS 服务器注册一个本身身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存全部发送到主题的消息,当客户再次链接到JMS Provider时,会根据本身的ID获得全部当本身处于离线时发送到主题的消息
activemq日志文件地址 data/activemq.log
activemq配置文件地址 conf/activemq.xml
修改持久化方式为jdbc
<persistenceAdapter>
<!--activemq默认持久化方式-->
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<!--JDBC持久化方式 注意createTablesOnStartup,第一次是true,标识启动建立表,之后都改成false-->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />
</persistenceAdapter>
配置jdbc链接
若启动的时候报错,注意去日志文件中查看错误,多半是没有引入JDBC的驱动包
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost:3306/liuhuxiang?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="admin"/> <property name="poolPreparedStatements" value="true"/> </bean>
启动成功后,查看db,会多出三张表
activemq_acks 应答ack表
activemq_msgs用来存储消息,发送者发送消息,在表中会有记录
activemq_lock 集群环境保存发送机器
注意,一旦消息被消费了,记录就会被删除,即activemq_acks / activemq_msgs都不会有数据
代码部分
监听器
/** * 消息监听 * 通常在消费者中,咱们不直接用recive来消费消息,都是经过监听器的方式来消费 * 比较JmsptpConsumer 和JmsptpConsumer2的消费方式 */ public class Listener implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("Listener收到消息,ptp类型"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
消费者
/** * 点对点消息消费者 * 经过监听方式消费 * */ public class JmsptpConsumerByListener { private static final int SENDNUM = 10; public static void main(String[] args) { // 定义成员变量 ConnectionFactory connectionFactory; //链接工厂 Connection connect = null; // 链接 Session session; // 会话,接受或发送消息的线程 MessageConsumer messageConsumer; // 消息生产者 Destination destination; try { //1 建立链接工厂,并由链接工厂建立链接 connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connect = connectionFactory.createConnection();//经过链接工厂,建立链接 connect.start(); //2 接受者,不用事物 session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE); // 这里也能够用Destination来接收,Destination是Queue的父类 destination = session.createQueue("testQueun1"); messageConsumer = session.createConsumer(destination); //注册消息监听 经过监听器的方式监听消息 messageConsumer.setMessageListener(new Listener()); } catch (JMSException e) { e.printStackTrace(); } finally { if (connect == null) { try { connect.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
生产者
/** *JMS 点对点消息生产者 * 一对一 * */ public class JmsptpProduct { //默认的username password brokerurl private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 发送次数 private static final int SENDNUM = 10; public static void main(String[] args) { // 定义成员变量 ConnectionFactory connectionFactory; //链接工厂 Connection connect = null; // 链接 Session session; // 会话,接受或发送消息的线程 MessageProducer messageProducer; // 消息生产者 Destination destination; // 消息目的地, 点对点是queue 订阅是topic 都是Destination的子类,为了通用,最好使用Destination try { //1 建立链接工厂,并由链接工厂建立链接,有多种构造器 connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connect = connectionFactory.createConnection();//经过链接工厂,建立链接 connect.start(); //2 建立链接,true表示使用事物 false表示不适用事物 session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE); //3 建立队列,两边保持一致 destination = session.createQueue("testQueun1"); //4 建立生产者 messageProducer = session.createProducer(destination); sendMessage(session, messageProducer); } catch (JMSException e) { e.printStackTrace(); } finally { if (connect == null) { try { connect.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /*** * 发送消息 * * @param session * @param messageProducer */ public static void sendMessage(Session session, MessageProducer messageProducer) { for (int i = 0; i < SENDNUM; i++) { try { TextMessage textMessage = session.createTextMessage("ActivityMQ,ptp类型,发送消息 i=" + i); System.out.println("ActivityMQ,ptp类型,发送消息 i=" + i); messageProducer.send(textMessage); } catch (JMSException e) { e.printStackTrace(); } } } }