NATS--NATS Streaming持久化

前言

最近项目中须要使用到一个消息队列,主要用来将原来一些操做异步化。根据本身的使用场景和熟悉程度,选择了NATS Streaming。之因此,选择NATS Streaming。一,由于我选型一些中间件,我会优先选取一些本身熟悉的语言编写的,这样方便排查问题和进一步的深究。二,由于本身一直作k8s等云原生这块,偏向于cncf基金会管理的项目,毕竟这些项目从一开始就考虑了如何部署在k8s当中。三,是评估项目在不断发展过程当中,引入的组件是否可以依旧知足需求。html

消息队列的使用场景

若是问为何这么作,须要说一下消息队列的使用场景。以前看知乎的时候,看到一些回答比较认同,暂时拿过来,更能形象表达。感谢ScienJus同窗的精彩解答。node

消息队列的主要特色是异步处理,主要目的是减小请求响应时间和解耦。因此主要的使用场景就是将比较耗时并且不须要即时(同步)返回结果的操做做为消息放入消息队列。同时因为使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不须要彼此联系,也不须要受对方的影响,即解耦和。mysql

使用场景的话,举个例子:git

假设用户在你的软件中注册,服务端收到用户的注册请求后,它会作这些操做:github

  • 校验用户名等信息,若是没问题会在数据库中添加一个用户记录
  • 若是是用邮箱注册会给你发送一封注册成功的邮件,手机注册则会发送一条短信
  • 分析用户的我的信息,以便未来向他推荐一些志同道合的人,或向那些人推荐他
  • 发送给用户一个包含操做指南的系统通知等等……

可是对于用户来讲,注册功能实际只须要第一步,只要服务端将他的帐户信息存到数据库中他即可以登陆上去作他想作的事情了。至于其余的事情,非要在这一次请求中所有完成么?值得用户浪费时间等你处理这些对他来讲可有可无的事情么?因此实际当第一步作完后,服务端就能够把其余的操做放入对应的消息队列中而后立刻返回用户结果,由消息队列异步的进行这些操做。算法

或者还有一种状况,同时有大量用户注册你的软件,再高并发状况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,可是却卡在发邮件或分析信息时的状况,致使请求的响应时间大幅增加,甚至出现超时,这就有点不划算了。面对这种状况通常也是将这些操做放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时能够很快的完成注册请求,不会影响用户使用其余功能。sql

因此在软件的正常功能开发中,并不须要去刻意的寻找消息队列的使用场景,而是当出现性能瓶颈时,去查看业务逻辑是否存在能够异步处理的耗时操做,若是存在的话即可以引入消息队列来解决。不然盲目的使用消息队列可能会增长维护和开发的成本却没法获得可观的性能提高,那就得不偿失了。docker

其实,总结一下消息队列的做用数据库

  • 削峰,形象点的话,能够比喻为蓄水池。好比elk日志收集系统中的kafka,主要在日志高峰期的时候,在牺牲实时性的同时,保证了整个系统的安全。
  • 同步系统异构化。原先一个同步操做里的诸多步骤,能够考虑将一些不影响主线发展的步骤,经过消息队列异步处理。好比,电商行业,一个订单完成以后,通常除了直接返回给客户购买成功的消息,还要通知帐户组进行扣费,通知处理库存变化,通知物流进行派送等,通知一些用户组作一些增长会员积分等操做等。

NATS Streaming 简介

NATS Streaming是一个由NATS驱动的数据流系统,用Go编程语言编写。 NATS Streaming服务器的可执行文件名是nats-streaming-server。 NATS Streaming与核心NATS平台无缝嵌入,扩展和互操做。 NATS Streaming服务器做为Apache-2.0许可下的开源软件提供。 Synadia积极维护和支持NATS Streaming服务器。编程

图片描述

特色

除了核心NATS平台的功能外,NATS Streaming还提供如下功能:

  • 加强消息协议

NATS Streaming使用谷歌协议缓冲区实现本身的加强型消息格式。这些消息经过二进制数据流在NATS核心平台进行传播,所以不须要改变NATS的基本协议。NATS Streaming信息包含如下字段:

  - 序列 - 一个全局顺序序列号为主题的通道
  - 主题 - 是NATS Streaming 交付对象
  - 答复内容 - 对应"reply-to"对应的对象内容
  - 数据 - 真是数据内容
  - 时间戳 - 接收的时间戳,单位是纳秒
  - 重复发送 - 标志这条数据是否须要服务再次发送
  - CRC32 - 一个循环冗余数据校验选项,在数据存储和数据通信领域里,为了保证数据的正确性所采用的检错手段,这里使用的是 IEEE CRC32 算法

 - 消息/事件的持久性
  NATS Streaming提供了可配置的消息持久化,持久目的地能够为内存或者文件。另外,对应的存储子系统使用了一个公共接口容许咱们开发本身自定义实现来持久化对应的消息

 - 至少一次的发送
  NATS Streaming提供了发布者和服务器之间的消息确认(发布操做) 和订阅者和服务器之间的消息确认(确认消息发送)。其中消息被保存在服务器端内存或者辅助存储(或其余外部存储器)用来为须要从新接受消息的订阅者进行重发消息。

 - 发布者发送速率限定
  NATS Streaming提供了一个链接选项叫 MaxPubAcksInFlight,它能有效的限制一个发布者可能随意的在任什么时候候发送的未被确认的消息。当达到这个配置的最大数量时,异步发送调用接口将会被阻塞,直到未确认消息降到指定数量之下。

- 每一个订阅者的速率匹配/限制
  NATS Streaming运行指定的订阅中设置一个参数为 MaxInFlight,它用来指定已确认但未消费的最大数据量,当达到这个限制时,NATS Streaming 将暂停发送消息给订阅者,直到未确认的数据量小于设定的量为止

  • 以主题重发的历史数据

  新订阅的能够在已经存储起来的订阅的主题频道指定起始位置消息流。经过使用这个选项,消息就能够开始发送传递了:

  1. 订阅的主题存储的最先的信息
  2. 与当前订阅主题以前的最近存储的数据,这一般被认为是 "最后的值" 或 "初值" 对应的缓存
  3. 一个以纳秒为基准的 日期/时间
  4. 一个历史的起始位置相对当前服务的 日期/时间,例如:最后30秒
  5. 一个特定的消息序列号
  • 持久订阅

  订阅也能够指定一个“持久化的名称”能够在客户端重启时不受影响。持久订阅会使得对应服务跟踪客户端最后确认消息的序列号和持久名称。当这个客户端重启或者从新订阅的时候,使用相同的客户端ID 和 持久化的名称,对应的服务将会从最先的未被确认的消息处恢复。

docker 运行NATS Streaming

在运行以前,前面已经讲过NATS Streaming 相比nats,多了持久化的一个future。因此咱们在接下来的demo演示中,会重点说这点。

运行基于memory的持久化示例:

docker run -ti -p 4222:4222 -p 8222:8222  nats-streaming:0.12.0

你将会看到以下的输出:

[1] 2019/02/26 08:13:01.769734 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:13:01.769811 [INF] STREAM: ServerID: arfYGWPtu7Cn8Ojcb1yko3
[1] 2019/02/26 08:13:01.769826 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:13:01.770363 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:13:01.770398 [INF] Git commit [not set]
[4] 2019/02/26 08:13:01.770492 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2019/02/26 08:13:01.770555 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:13:01.770581 [INF] Server is ready
[1] 2019/02/26 08:13:01.799435 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:13:01.799461 [INF] STREAM: No recovered state
[1] 2019/02/26 08:13:02.052460 [INF] STREAM: Message store is MEMORY
[1] 2019/02/26 08:13:02.052552 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:13:02.052574 [INF] STREAM: Channels:                  100 *
[1] 2019/02/26 08:13:02.052586 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:13:02.052601 [INF] STREAM:   Subscriptions:          1000 *
[1] 2019/02/26 08:13:02.052613 [INF] STREAM:   Messages     :       1000000 *
[1] 2019/02/26 08:13:02.052624 [INF] STREAM:   Bytes        :     976.56 MB *
[1] 2019/02/26 08:13:02.052635 [INF] STREAM:   Age          :     unlimited *
[1] 2019/02/26 08:13:02.052649 [INF] STREAM:   Inactivity   :     unlimited *
[1] 2019/02/26 08:13:02.052697 [INF] STREAM: ----------------------------------

能够看出默认的是基于内存的持久化。

运行基于file的持久化示例:

docker run -ti -v /Users/gao/test/mq:/datastore  -p 4222:4222 -p 8222:8222  nats-streaming:0.12.0  -store file --dir /datastore -m 8222

你将会看到以下的输出:

[1] 2019/02/26 08:16:07.641972 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:16:07.642038 [INF] STREAM: ServerID: 9d4H6GAFPibpZv282KY9QM
[1] 2019/02/26 08:16:07.642099 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:16:07.643733 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:16:07.643762 [INF] Git commit [not set]
[5] 2019/02/26 08:16:07.643894 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:16:07.643932 [INF] Server is ready
[1] 2019/02/26 08:16:07.672145 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:16:07.679327 [INF] STREAM: No recovered state
[1] 2019/02/26 08:16:07.933519 [INF] STREAM: Message store is FILE
[1] 2019/02/26 08:16:07.933570 [INF] STREAM: Store location: /datastore
[1] 2019/02/26 08:16:07.933633 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:16:07.933679 [INF] STREAM: Channels:                  100 *
[1] 2019/02/26 08:16:07.933697 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:16:07.933711 [INF] STREAM:   Subscriptions:          1000 *
[1] 2019/02/26 08:16:07.933749 [INF] STREAM:   Messages     :       1000000 *
[1] 2019/02/26 08:16:07.933793 [INF] STREAM:   Bytes        :     976.56 MB *
[1] 2019/02/26 08:16:07.933837 [INF] STREAM:   Age          :     unlimited *
[1] 2019/02/26 08:16:07.933857 [INF] STREAM:   Inactivity   :     unlimited *
[1] 2019/02/26 08:16:07.933885 [INF] STREAM: ----------------------------------

PS

  • 若是部署在k8s当中,那么就能够采起基于file的持久化,经过挂载一个块存储来保证,数据可靠。好比,aws的ebs或是ceph的rbd。
  • 4222为客户端链接的端口。8222为监控端口。

启动之后访问:localhost:8222,能够看到以下的网页:

图片描述

启动参数解析

Streaming Server Options:
    -cid, --cluster_id  <string>         Cluster ID (default: test-cluster)
    -st,  --store <string>               Store type: MEMORY|FILE|SQL (default: MEMORY)
          --dir <string>                 For FILE store type, this is the root directory
    -mc,  --max_channels <int>           Max number of channels (0 for unlimited)
    -msu, --max_subs <int>               Max number of subscriptions per channel (0 for unlimited)
    -mm,  --max_msgs <int>               Max number of messages per channel (0 for unlimited)
    -mb,  --max_bytes <size>             Max messages total size per channel (0 for unlimited)
    -ma,  --max_age <duration>           Max duration a message can be stored ("0s" for unlimited)
    -mi,  --max_inactivity <duration>    Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited)
    -ns,  --nats_server <string>         Connect to this external NATS Server URL (embedded otherwise)
    -sc,  --stan_config <string>         Streaming server configuration file
    -hbi, --hb_interval <duration>       Interval at which server sends heartbeat to a client
    -hbt, --hb_timeout <duration>        How long server waits for a heartbeat response
    -hbf, --hb_fail_count <int>          Number of failed heartbeats before server closes the client connection
          --ft_group <string>            Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore
    -sl,  --signal <signal>[=<pid>]      Send signal to nats-streaming-server process (stop, quit, reopen)
          --encrypt <bool>               Specify if server should use encryption at rest
          --encryption_cipher <string>   Cipher to use for encryption. Currently support AES and CHAHA (ChaChaPoly). Defaults to AES
          --encryption_key <sting>       Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead

Streaming Server Clustering Options:
    --clustered <bool>                   Run the server in a clustered configuration (default: false)
    --cluster_node_id <string>           ID of the node within the cluster if there is no stored ID (default: random UUID)
    --cluster_bootstrap <bool>           Bootstrap the cluster if there is no existing state by electing self as leader (default: false)
    --cluster_peers <string>             List of cluster peer node IDs to bootstrap cluster state.
    --cluster_log_path <string>          Directory to store log replication data
    --cluster_log_cache_size <int>       Number of log entries to cache in memory to reduce disk IO (default: 512)
    --cluster_log_snapshots <int>        Number of log snapshots to retain (default: 2)
    --cluster_trailing_logs <int>        Number of log entries to leave after a snapshot and compaction
    --cluster_sync <bool>                Do a file sync after every write to the replication log and message store
    --cluster_raft_logging <bool>        Enable logging from the Raft library (disabled by default)

Streaming Server File Store Options:
    --file_compact_enabled <bool>        Enable file compaction
    --file_compact_frag <int>            File fragmentation threshold for compaction
    --file_compact_interval <int>        Minimum interval (in seconds) between file compactions
    --file_compact_min_size <size>       Minimum file size for compaction
    --file_buffer_size <size>            File buffer size (in bytes)
    --file_crc <bool>                    Enable file CRC-32 checksum
    --file_crc_poly <int>                Polynomial used to make the table used for CRC-32 checksum
    --file_sync <bool>                   Enable File.Sync on Flush
    --file_slice_max_msgs <int>          Maximum number of messages per file slice (subject to channel limits)
    --file_slice_max_bytes <size>        Maximum file slice size - including index file (subject to channel limits)
    --file_slice_max_age <duration>      Maximum file slice duration starting when the first message is stored (subject to channel limits)
    --file_slice_archive_script <string> Path to script to use if you want to archive a file slice being removed
    --file_fds_limit <int>               Store will try to use no more file descriptors than this given limit
    --file_parallel_recovery <int>       On startup, number of channels that can be recovered in parallel
    --file_truncate_bad_eof <bool>       Truncate files for which there is an unexpected EOF on recovery, dataloss may occur

Streaming Server SQL Store Options:
    --sql_driver <string>            Name of the SQL Driver ("mysql" or "postgres")
    --sql_source <string>            Datasource used when opening an SQL connection to the database
    --sql_no_caching <bool>          Enable/Disable caching for improved performance
    --sql_max_open_conns <int>       Maximum number of opened connections to the database

Streaming Server TLS Options:
    -secure <bool>                   Use a TLS connection to the NATS server without
                                     verification; weaker than specifying certificates.
    -tls_client_key <string>         Client key for the streaming server
    -tls_client_cert <string>        Client certificate for the streaming server
    -tls_client_cacert <string>      Client certificate CA for the streaming server

Streaming Server Logging Options:
    -SD, --stan_debug=<bool>         Enable STAN debugging output
    -SV, --stan_trace=<bool>         Trace the raw STAN protocol
    -SDV                             Debug and trace STAN
         --syslog_name               On Windows, when running several servers as a service, use this name for the event source
    (See additional NATS logging options below)

Embedded NATS Server Options:
    -a, --addr <string>              Bind to host address (default: 0.0.0.0)
    -p, --port <int>                 Use port for clients (default: 4222)
    -P, --pid <string>               File to store PID
    -m, --http_port <int>            Use port for http monitoring
    -ms,--https_port <int>           Use port for https monitoring
    -c, --config <string>            Configuration file

Logging Options:
    -l, --log <string>               File to redirect log output
    -T, --logtime=<bool>             Timestamp log entries (default: true)
    -s, --syslog <string>            Enable syslog as log method
    -r, --remote_syslog <string>     Syslog server addr (udp://localhost:514)
    -D, --debug=<bool>               Enable debugging output
    -V, --trace=<bool>               Trace the raw protocol
    -DV                              Debug and trace

Authorization Options:
        --user <string>              User required for connections
        --pass <string>              Password required for connections
        --auth <string>              Authorization token required for connections

TLS Options:
        --tls=<bool>                 Enable TLS, do not verify clients (default: false)
        --tlscert <string>           Server certificate file
        --tlskey <string>            Private key for server certificate
        --tlsverify=<bool>           Enable TLS, verify client certificates
        --tlscacert <string>         Client certificate CA for verification

NATS Clustering Options:
        --routes <string, ...>       Routes to solicit and connect
        --cluster <string>           Cluster URL for solicited routes

Common Options:
    -h, --help                       Show this message
    -v, --version                    Show version
        --help_tls                   TLS help.

源码简单分析NATS Streaming 持久化

目前NATS Streaming支持如下4种持久化方式:

  • MEMORY
  • FILE
  • SQL
  • RAFT

其实看源码能够知道:NATS Streaming的store基于接口实现,很容易扩展到更多的持久化方式。具体的接口以下:

// Store is the storage interface for NATS Streaming servers.
//
// If an implementation has a Store constructor with StoreLimits, it should be
// noted that the limits don't apply to any state being recovered, for Store
// implementations supporting recovery.
//
type Store interface {
    // GetExclusiveLock is an advisory lock to prevent concurrent
    // access to the store from multiple instances.
    // This is not to protect individual API calls, instead, it
    // is meant to protect the store for the entire duration the
    // store is being used. This is why there is no `Unlock` API.
    // The lock should be released when the store is closed.
    //
    // If an exclusive lock can be immediately acquired (that is,
    // it should not block waiting for the lock to be acquired),
    // this call will return `true` with no error. Once a store
    // instance has acquired an exclusive lock, calling this
    // function has no effect and `true` with no error will again
    // be returned.
    //
    // If the lock cannot be acquired, this call will return
    // `false` with no error: the caller can try again later.
    //
    // If, however, the lock cannot be acquired due to a fatal
    // error, this call should return `false` and the error.
    //
    // It is important to note that the implementation should
    // make an effort to distinguish error conditions deemed
    // fatal (and therefore trying again would invariably result
    // in the same error) and those deemed transient, in which
    // case no error should be returned to indicate that the
    // caller could try later.
    //
    // Implementations that do not support exclusive locks should
    // return `false` and `ErrNotSupported`.
    GetExclusiveLock() (bool, error)

    // Init can be used to initialize the store with server's information.
    Init(info *spb.ServerInfo) error

    // Name returns the name type of this store (e.g: MEMORY, FILESTORE, etc...).
    Name() string

    // Recover returns the recovered state.
    // Implementations that do not persist state and therefore cannot
    // recover from a previous run MUST return nil, not an error.
    // However, an error must be returned for implementations that are
    // attempting to recover the state but fail to do so.
    Recover() (*RecoveredState, error)

    // SetLimits sets limits for this store. The action is not expected
    // to be retroactive.
    // The store implementation should make a deep copy as to not change
    // the content of the structure passed by the caller.
    // This call may return an error due to limits validation errors.
    SetLimits(limits *StoreLimits) error

    // GetChannelLimits returns the limit for this channel. If the channel
    // does not exist, returns nil.
    GetChannelLimits(name string) *ChannelLimits

    // CreateChannel creates a Channel.
    // Implementations should return ErrAlreadyExists if the channel was
    // already created.
    // Limits defined for this channel in StoreLimits.PeChannel map, if present,
    // will apply. Otherwise, the global limits in StoreLimits will apply.
    CreateChannel(channel string) (*Channel, error)

    // DeleteChannel deletes a Channel.
    // Implementations should make sure that if no error is returned, the
    // channel would not be recovered after a restart, unless CreateChannel()
    // with the same channel is invoked.
    // If processing is expecting to be time consuming, work should be done
    // in the background as long as the above condition is guaranteed.
    // It is also acceptable for an implementation to have CreateChannel()
    // return an error if background deletion is still happening for a
    // channel of the same name.
    DeleteChannel(channel string) error

    // AddClient stores information about the client identified by `clientID`.
    AddClient(info *spb.ClientInfo) (*Client, error)

    // DeleteClient removes the client identified by `clientID` from the store.
    DeleteClient(clientID string) error

    // Close closes this store (including all MsgStore and SubStore).
    // If an exclusive lock was acquired, the lock shall be released.
    Close() error
}

官方也提供了mysql和pgsql两种数据的支持:

postgres.db.sql

CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INTEGER DEFAULT 1, id VARCHAR(1024), proto BYTEA, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id));
CREATE INDEX Idx_ChannelsName ON Channels (name(256));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT, timestamp BIGINT, size INTEGER, data BYTEA, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq));
CREATE INDEX Idx_MsgsTimestamp ON Messages (timestamp);
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT, lastsent BIGINT DEFAULT 0, proto BYTEA, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT, row BIGINT, seq BIGINT DEFAULT 0, lastsent BIGINT DEFAULT 0, pending BYTEA, acks BYTEA, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, row));
CREATE INDEX Idx_SubsPendingSeq ON SubsPending (seq);
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT DEFAULT 0);

-- Updates for 0.10.0
ALTER TABLE Clients ADD proto BYTEA;

mysql.db.sql

CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INT DEFAULT 1, id VARCHAR(1024), proto BLOB, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id(256)));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT UNSIGNED DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id), INDEX Idx_ChannelsName (name(256)));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT UNSIGNED, timestamp BIGINT, size INTEGER, data BLOB, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq), INDEX Idx_MsgsTimestamp (timestamp));
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT UNSIGNED, lastsent BIGINT UNSIGNED DEFAULT 0, proto BLOB, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT UNSIGNED, `row` BIGINT UNSIGNED, seq BIGINT UNSIGNED DEFAULT 0, lastsent BIGINT UNSIGNED DEFAULT 0, pending BLOB, acks BLOB, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, `row`), INDEX Idx_SubsPendingSeq(seq));
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT UNSIGNED DEFAULT 0);

# Updates for 0.10.0
ALTER TABLE Clients ADD proto BLOB;

总结

后续会详细解读一下代码实现和一些集群部署。固然确定少不了如何部署高可用的集群在k8s当中。

参阅文章:

NATS Streaming详解

相关文章
相关标签/搜索