使用ActiveMQ和HornetQ通过WebSocket通过STOMP轻松进行消息传递

消息传递是用于构建不同级别的分布式软件系统的极其强大的工具。 通常,至少在Java生态系统中,客户端(前端)从不直接与消息代理(或交换)进行交互,而是通过调用服务器端(后端)服务来进行交互。 否则,客户甚至可能不知道已安装了消息传递解决方案。

随着Websockets越来越被采用,对诸如STOMP (用于与消息代理或交换进行通信)之类的面向文本的协议的广泛支持将产生影响。 今天的帖子将尝试解释公开两个非常流行的JMS实现(Apache ActiveMQ和JBoss HornetQ)是多么简单,这两个Web前端(JavaScript)可以通过Websockets上的 STOMP使用到Web前端(JavaScript)。

在深入研究代码之前,可能有人认为这样做不是一个好主意。 那目的是什么? 答案确实取决于:

  • 您正在开发原型/概念证明,并且需要简单的方法来集成发布/订阅或对等消息传递
  • 您不需要/不需要构建复杂的体系结构,而最简单的解决方案就足够了

可扩展性,故障转移和许多其他非常重要的决定在这里没有考虑,但是如果您正在开发健壮和有弹性的体系结构,则绝对应该考虑。

因此,让我们开始吧。 与往常一样,最好从我们要解决的问题开始:我们想开发一个简单的发布/订阅解决方案,使用JavaScript编写的Web客户端能够发送消息并侦听特定主题。 只要收到任何消息,客户端就会显示简单的警报窗口。 请注意,我们需要使用支持Websocket的现代浏览器,例如Google ChromeMozilla Firefox

对于我们的两个示例,客户端的代码均保持不变,因此我们从此开始。 最佳起点是STOMP Over WebSocket文章,其中介绍了stomp.js模块,这是我们的index.html

<script src="stomp.js"></script>

<script type="text/javascript">
 var client = Stomp.client( "ws://localhost:61614/stomp", "v11.stomp" );

 client.connect( "", "",
  function() {
      client.subscribe("jms.topic.test",
       function( message ) {
           alert( message );
          }, 
    { priority: 9 } 
      );

   client.send("jms.topic.test", { priority: 9 }, "Pub/Sub over STOMP!");
  }
 );

</script>

非常简单的代码,但很少有细节值得解释。 首先,我们正在ws:// localhost:61614 / stomp寻找Websockets端点。 这足以进行本地部署,但最好用真实IP地址或主机名替换localhost 其次,一旦连接,客户端就订阅该主题(仅对优先级为9的消息感兴趣),并在此之后立即将消息发布到该主题。 从客户的角度来看,我们已经完成了。

让我们继续进行消息代理,列表中的第一个是Apache ActiveMQ 为了简化示例,我们将不使用配置XML文件将Apache ActiveMQ代理嵌入到简单的Spring应用程序中。 由于源代码在GitHub可用 ,因此我将跳过POM文件片段,仅显示代码:

package com.example.messaging;

import java.util.Collections;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.hooks.SpringContextHook;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();    
        broker.addConnector( "ws://localhost:61614" ); 
        broker.setPersistent( false );
        broker.setShutdownHooks( Collections.< Runnable >singletonList( new SpringContextHook() ) );

        final ActiveMQTopic topic = new ActiveMQTopic( "jms.topic.test" );
        broker.setDestinations( new ActiveMQDestination[] { topic }  );

        final ManagementContext managementContext = new ManagementContext();
        managementContext.setCreateConnector( true );
        broker.setManagementContext( managementContext );

        return broker;
    }
}

如我们所见, ActiveMQ代理配置有ws:// localhost:61614连接器,该连接器假定使用STOMP协议。 另外,我们正在创建名称为jms.topic.test的 JMS主题,并启用JMX管理工具。 并运行它,简单的Starter类:

package com.example.messaging;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Starter  {
    public static void main( String[] args ) {
        ApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class );
    }
}

现在,启动并运行它,让我们在浏览器中打开index.html文件,我们应该看到类似以下内容:

Message.STOMP.1

简单! 对于好奇的读者, ActiveMQ使用Jetty 7.6.7.v20120910来支持Websockets,并且不能与最新的Jetty发行版一起使用。

接下来,就HornetQ而言 ,实现看起来有些不同,尽管也不是很复杂。 由于Starter类保持不变,因此唯一的变化是配置:

package com.example.hornetq;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public EmbeddedJMS broker() throws Exception {
        final ConfigurationImpl configuration = new ConfigurationImpl();
        configuration.setPersistenceEnabled( false );
        configuration.setJournalType( JournalType.NIO );
        configuration.setJMXManagementEnabled( true );
        configuration.setSecurityEnabled( false );

        final Map< String, Object > params = new HashMap<>();
        params.put( TransportConstants.HOST_PROP_NAME, "localhost" );
        params.put( TransportConstants.PROTOCOL_PROP_NAME, "stomp_ws" );
        params.put( TransportConstants.PORT_PROP_NAME, "61614" );

        final TransportConfiguration stomp = new TransportConfiguration( NettyAcceptorFactory.class.getName(), params );
        configuration.getAcceptorConfigurations().add( stomp );
        configuration.getConnectorConfigurations().put( "stomp_ws", stomp );

        final ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl( "cf", true, "/cf" );
        cfConfig.setConnectorNames( Collections.singletonList( "stomp_ws" ) );

        final JMSConfiguration jmsConfig = new JMSConfigurationImpl();
        jmsConfig.getConnectionFactoryConfigurations().add( cfConfig );

        final TopicConfiguration topicConfig = new TopicConfigurationImpl( "test", "/topic/test" );
        jmsConfig.getTopicConfigurations().add( topicConfig );

        final EmbeddedJMS jmsServer = new EmbeddedJMS();
        jmsServer.setConfiguration( configuration );
        jmsServer.setJmsConfiguration( jmsConfig );

        return jmsServer;
    }
}

完整的源代码在GitHub上 在运行Starter类并在浏览器中打开index.html之后,我们应该看到非常相似的结果:

Message.STOMP.2

HornetQ配置看起来更加冗长,但是除了出色的Netty框架之外,没有涉及其他依赖项。

出于好奇,我将ActiveMQ代理替换为Apollo实现。 尽管我成功实现了预期的功能,但我发现该API非常麻烦,至少在当前版本1.6中如此,因此本文中没有涉及它。

翻译自: https://www.javacodegeeks.com/2013/09/easy-messaging-with-stomp-over-websockets-using-activemq-and-hornetq.html