记一次 Angular 基于 STOMP over WebSocket 实现流文本传输

本文成文时间是 2019-08-18 ,文中提到的最新版本号是以 2019-08-18 为基准的。

前情摘要

在介绍正文以前须要先简单了解几个概念: STOMP 协议、STOMP over WebSocket 以及 RxJS。(关于这三点本文不会进行详细介绍)html

什么是 STOMP?

STOMPSimple or Streaming Text Orientated Messageing Protocal ,是一种简单(流)文本定向传输协议。前端

STOMP 是 WebSocket 更高级的子协议,它使用一个基于帧的格式来定义消息,与 HTTP 的 RequestResponse 相似。git

STOMP 提供可互操做的链接格式,容许 STOMP 客户端与任意代理进行交互。STOMP 是一个很是简单易用的协议, 服务器端实现起来会相对困难一些,编写客户端很是容易。github

STOMP over WebSocket

STOMP over Websocket 即经过 WebSocket 创建 STOMP 链接,也就是说是在 WebSocket 链接的基础上再创建 STOMP 链接。web

WebSocket 协议定义了两种类型的消息,文本和二进制,但它们的内容是未定义的。spring

若是说 Socket 是 C/S 架构 的 TCP 编程,那么同理 WebSocket 就是 B/S架构的 TCP 编程,因此须要在客户端与服务端之间定义一个机制去协商一个子协议 - 更高级别的消息协议,将它使用在 WebSocket 之上去定义每次发送消息的类别、格式和内容,等等。数据库

子协议的使用是可选的,但不管哪一种方式,客户端和服务器都须要就一些定义消息内容的协议达成一致。因而,一般选择在 WebSocket 协议上使用 STOMP 协议来定义内容格式。npm

RxJS

RxJS 是一个用于使用 Observables 进行反应式编程的库,可简化编写异步或基于回调的代码的过程。该项目是对 Reactive-Extensions / RxJS 的重写,具备更好的性能,更好的模块化,更好的可调试调用堆栈,同时主要保持向后兼容,而且进行了一些重大更改,从而减小了 API 操做。

RxJS 是 Reactive Extensions for JavaScript 的缩写,起源于 Reactive Extensions(Rx),Rx 是对 LINQ 的一种扩展,他的目标是对异步的集合进行操做,也就是说,集合中的元素是异步填充的,好比说从 Web 或者云端获取数据而后对集合进行填充。编程

LINQ(Language Integrated Query)语言集成查询是一组用于 C# 和 Visual Basic 语言的扩展。它容许编写 C# 或者 Visual Basic 代码以操做内存数据的方式,查询数据库。

RxJS 的主要功能是利用响应式编程的模式来实现 JavaScript 的异步式编程(现前端主流框架 Vue React Angular 都是响应式的开发框架)。bootstrap

RxJS 是基于观察者模式迭代器模式以函数式编程思惟来实现的。学习 RxJS 以前咱们须要先了解观察者模式和迭代器模式,还要对 Stream 流的概念有所认识。

接下来咱们就一块儿来看下如何在的 Angular 项目中是使用 STOMP over WebSocket 进行数据流传输的。

Angular 实战

本文案例是实际 Angular 项目中的一个小功能模块,Angular 是 8.0 版本,本文涉及的组件主要包括右键菜单项负责生产消息的 context-menu-component 动态组件,进度监控 app-progress-bar 组件和日志输出组件 app-console-area

项目中使用的 UI 框架为 ng-zorro-antd,下面是 tabs 组件中的相关伪代码(省略了组件间 Input Ouput 接口):

...
<nz-tab [nzType]="'card'"> 
  <ng-template #consoleArea>控制台</ng-template>    
  <app-progress-bar></app-progress-bar>    
  <app-console-area></app-console-area>  
</nz-tab>
...

代码与 UI 视图的对应关系以下:

项目中使用的 STOMP 客户端是 ng2-stompjs 库, ng2-stompjs 目前的版本是 7.xx ,其底层的 @stomp/stompjs 已被重写,自此与 STOMP 标准具备严格的兼容性。

ng2-stompjs 是第一个可靠地支持二进制有效负载的 STOMP JS 客户端库。

安装 ng-stompjs

$ npm install @stomp/ng-stompjs

添加和注入 @stomp/ng2-stompjs

使用前须要定义配置文件,在目录 src/app/config/ 建立 stomp.config.js 文件:

import { InjectableRxStompConfig } from '@stomp/ng2-stompjs';
import { STOMP_SERVER_BASE_URL } from 'server.config';
const _window: any = window;

export const myRxStompConfig: InjectableRxStompConfig = {
  // Which server?
  brokerURL: _window.STOMP_SERVER_BASE_URL 
              ? _window.STOMP_SERVER_BASE_URL
              : STOMP_SERVER_BASE_URL

  // Headers
  // Typical keys: login, passcode, host
  connectHeaders: {
    login: 'guest',
    passcode: 'guest'
  },

  // How often to heartbeat?
  // Interval in milliseconds, set to 0 to disable
  heartbeatIncoming: 0, // Typical value 0 - disabled
  heartbeatOutgoing: 20000, // Typical value 20000 - every 20 seconds

  // Wait in milliseconds before attempting auto reconnect
  // Set to 0 to disable
  // Typical value 500 (500 milli seconds)
  reconnectDelay: 200,

  // Will log diagnostics on console
  // It can be quite verbose, not recommended in production
  // Skip this key to stop logging to console
  debug: (msg: string): void => {
    console.log(new Date(), msg);
  }
};

在建立实例时,此配置将由 Angular Dependency Injection 机制注入 RxStompService 服务,在 src/app/app.module.ts 文件中,添加如下内容。

import { InjectableRxStompConfig, RxStompService, rxStompServiceFactory } from '@stomp/ng2-stompjs';
import { myRxStompConfig } from './config/stomp.config';
...
@NgModule({
  declarations: [/* 声明模块内部成员的地方 */],
  imports: [/* 导入的其余module */],  
  providers: [
    {
      provide: InjectableRxStompConfig,
      useValue: myRxStompConfig
    },
    {
      provide: RxStompService,
      useFactory: rxStompServiceFactory,
      deps: [InjectableRxStompConfig]
    }
  ],
  entryComponents: [/* 不会在模版中引用到的组件 */],
  bootstrap: [AppComponent]
})

export class AppModule {}

创建链接

咱们如今将 RxStompService 依赖注入 app-progress-bar 组件中,为此咱们将它添加到构造函数中,以下所示:

constructor(private rxStompService: RxStompService) { }

为了能实时接收服务器发送过来的消息,咱们须要在 app-progress-bar 组件的生命周期函数 OnInit 中,使用 watch() 方法进行订阅:

ngOnInit() {
  // 订阅 STOMP 消息
  this.topicSubscription = this.rxStompService.watch('/topic/message').subscribe((message: Message) => {
    console.log(message.body);
}
  
this.errorSubscription = this.rxStompService.watch('/topic/error').subscribe((message: Message) => {
    this.progressInfo = message.body;
  });
}

注:app-message-bar 组件默认是不显示的,当有消息传递进来时,此组件才会显示在页面中,进度达到 100% 时,会自动隐藏。

STOMP 协议是如何将消息准确发送的目的地的呢?

文章开头提到,STOMP 是一种基于帧的协议,其帧在 HTTP 上创建模型。一个框架由一个命令,一组可选的标题和一个可选的主体组成。

STOMP 服务器被建模为能够向其发送消息的一组目标,STOMP 协议将目标视为不透明字符串,其语法是特定于服务器实现的。另外,STOMP 没有定义目的地 destination 的传递语义应该是什么。目的地的传递或「消息交换」语义可能因服务器而异,甚至从目的地到目的地也不一样,这使得服务器可使用 STOMP 支持的语义进行创做。

STOMP 客户端是一个用户代理,能够在两种(多是同时的)模式下运行:

  • 做为生产者,经过 SEND 框架将消息发送到服务器上的目的地。
  • 做为消费者,发送 SUBSCRIBE 给定目的地的帧并从服务器接收消息做为MESSAGE 帧。

咱们的案例中两种模式同时存在,发送消息的是生产者(咱们上文提到的 context-menu-component 动态组件),接收消息的是消费者(app-progress-bar 组件)。消

费者能够经过订阅不一样的 destination,来得到不一样的推送消息,不须要开发人员去管理这些订阅与推送目的地以前的关系。

接下来就介绍下做为生产者的 context-menu-component 组件,看看它都作了哪些事情吧。

发送消息

context-menu-component 组件是触发右键时动态产生的组件,它负责经过向不一样的目的地 destination 下达不一样的指令,进而来实现不一样的功能需求。

使用 ng-zorro-antdDropdown 组件 ,动态生成:

public openProjectManagerContextMenu(context: ProjectManagerContext): void {
  this.contextMenuComponent = this.nzDropdownService.create(context.mouseEvent,          this.contextMenuTemplate);
}

当咱们点击「运行用例」按钮时,它做为生产者会向 STOMP 服务端目的地 SEND 消息指令。

// 运行用例
 public runProjectCases(): void {
   const streamTaskParam: StreamTaskParam = new StreamTaskParam();
   streamTaskParam.project = this.globalService.projectInfo.projectName;
   this.openTaskProgressModal('/app/run-project-cases', JSON.stringify(streamTaskParam));
 }

从代码得知,这会将消息发送到名为的 /app/run-project-cases 的目的地,STOMP 将此目标视为不透明字符串,而且目标名称不承担传递语义。

STOMP 定义了本身的消息传输体制。首先是经过一个后台绑定的链接点 endpoint 来创建 socket 链接,而后生产者经过 SEND 方法,绑定好发送的 destination。而 topic 和 app 则是一种消息处理手段的分支,走 app/url 的消息会被你设置到的 MassageMapping 拦截到,进行你本身定义的具体逻辑处理,而走 topic/url 的消息就不会被拦截,直接到 Simplebroker 节点中将消息推送出去。(其中 simplebroker 是 spring 的一种基于内存的消息队列,你也可使用 activeMQ,rabbitMQ 代替)。

所以目的地 /app/run-project-cases 生产出来的消息会被拦截,最终会转发到消费者 app-progress-bar 组件 的 /topic/message

接收消息

app-progress-bar 组件做为消费者使用 watch() 方法启动与代理的订阅,this.rxStompService.watch('/topic/message') 将代理到目的地为 /topic/message 的订阅上,并返回 RxJS Observable

ngOnInit() {
  // 订阅 STOMP 消息
  this.topicSubscription = this.rxStompService.watch('/topic/message').subscribe((message: Message) => {
    console.log(message.body);
    // do something
  }
}

app-progress-bar 组件都作了些什么事情呢?它负责创建 STOMP 链接,从服务器端接收文本流,并将这些流进行数据解析,解析出来的数据一部分用来控制进度条的数值变化,一部分用来控制 app-console-area 组件日志的输出节点。

也就是说 app-console-area 组件中打印的内容是由 app-progress-bar 组件解析和传递的。

取消订阅

咱们知道 RxJS Observable 实际上就是一个函数,它接收一个 Observer 对象做为参数,返回一个函数用来取消订阅。因此咱们能够在 app-progress-bar 组件销毁时,调用 unsubscribe() 方法取消订阅。

ngOnDestroy() {  this.topicSubscription.unsubscribe();}

本文主要目的是是结合案例展示 STOMP 协议的使用场景,因此不会着重介绍案例上的功能以及实现细节。

「记一次」系列文章:

相关文献:

相关文章
相关标签/搜索