本文是使用 golang 实现 redis 系列的第七篇, 将介绍如何将单点的缓存服务器扩展为分布式缓存。godis 集群的源码在Github:Godis/clusternode
单台服务器的CPU和内存等资源老是有限的,随着数据量和访问量的增长单台服务器很容易遇到瓶颈。利用多台机器创建分布式系统,分工处理是提升系统容量和吞吐量的经常使用方法。git
使用更多机器来提升系统容量的方式称为系统横向扩容。与之相对的,提升单台机器性能被称为纵向扩容。因为没法在单台机器上无限提升硬件配置且硬件价格与性能的关系并不是线性的,因此创建分布式系统进行横向扩容是更为经济实用的选择。github
咱们采用一致性 hash 算法 key 分散到不一样的服务器,客户端能够链接到服务集群中任意一个节点。当节点须要访问的数据不在本身本地时,须要经过一致性 hash 算法计算出数据所在的节点并将指令转发给它。golang
与分布式系统理论中的分区容错性不一样,咱们仅将数据存在一个节点没有保存副本。这种设计提升了系统吞吐量和容量,可是并无提升系统可用性,当有一个节点崩溃时它保存的数据将没法访问。redis
生产环境实用的 redis 集群一般也采起相似的分片存储策略,并为每一个节点配置从节点做为热备节点,并使用 sentinel 机制监控 master 节点状态。在 master 节点崩溃后,sentinel 将备份节点提高为 master 节点以保证可用性。算法
在采用分片方式创建分布式缓存时,咱们面临的第一个问题是如何决定存储数据的节点。最天然的方式是参考 hash 表的作法,假设集群中存在 n 个节点,咱们用 node = hashCode(key) % n
来决定所属的节点。数据库
普通 hash 算法解决了如何选择节点的问题,但在分布式系统中常常出现增长节点或某个节点宕机的状况。若节点数 n 发生变化, 大多数 key 根据 node = hashCode(key) % n
计算出的节点都会改变。这意味着若要在 n 变化后维持系统正常运转,须要将大多数数据在节点间进行从新分布。这个操做会消耗大量的时间和带宽等资源,这在生产环境下是不可接受的。缓存
一致性 hash 算法的目的是在节点数量 n 变化时, 使尽量少的 key 须要进行节点间从新分布。一致性 hash 算法将数据 key 和服务器地址 addr 散列到 2^32 的空间中。服务器
咱们将 2^32 个整数首尾相连造成一个环,首先计算服务器地址 addr 的 hash 值放置在环上。而后计算 key 的 hash 值放置在环上,顺时针查找,将数据放在找到的的第一个节点上。app
key1, key2 和 key5 在 node2 上,key 3 在 node4 上,key4 在 node6 上
在增长或删除节点时只有该节点附近的数据须要从新分布,从而解决了上述问题。
新增 node8 后,key 5 从 node2 转移到 node8。其它 key 不变
若是服务器节点较少则比较容易出现数据分布不均匀的问题,通常来讲环上的节点越多数据分布越均匀。咱们不须要真的增长一台服务器,只须要将实际的服务器节点映射为几个虚拟节点放在环上便可。
咱们使用 Golang 实现一致性 hash 算法, 源码在 Github: HDT3213/Godis, 大约 80 行代码。
type HashFunc func(data []byte) uint32 type Map struct { hashFunc HashFunc replicas int keys []int // sorted hashMap map[int]string } func New(replicas int, fn HashFunc) *Map { m := &Map{ replicas: replicas, // 每一个物理节点会产生 replicas 个虚拟节点 hashFunc: fn, hashMap: make(map[int]string), // 虚拟节点 hash 值到物理节点地址的映射 } if m.hashFunc == nil { m.hashFunc = crc32.ChecksumIEEE } return m } func (m *Map) IsEmpty() bool { return len(m.keys) == 0 }
接下来实现添加物理节点的 Add 方法:
func (m *Map) Add(keys ...string) { for _, key := range keys { if key == "" { continue } for i := 0; i < m.replicas; i++ { // 使用 i + key 做为一个虚拟节点,计算虚拟节点的 hash 值 hash := int(m.hashFunc([]byte(strconv.Itoa(i) + key))) // 将虚拟节点添加到环上 m.keys = append(m.keys, hash) // 注册虚拟节点到物理节点的映射 m.hashMap[hash] = key } } sort.Ints(m.keys) }
接下来实现查找算法:
func (m *Map) Get(key string) string { if m.IsEmpty() { return "" } // 支持根据 key 的 hashtag 来肯定分布 partitionKey := getPartitionKey(key) hash := int(m.hashFunc([]byte(partitionKey))) // sort.Search 会使用二分查找法搜索 keys 中知足 m.keys[i] >= hash 的最小 i 值 idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash }) // 若 key 的 hash 值大于最后一个虚拟节点的 hash 值,则 sort.Search 找不到目标 // 这种状况下选择第一个虚拟节点 if idx == len(m.keys) { idx = 0 } // 将虚拟节点映射为实际地址 return m.hashMap[m.keys[idx]] }
实现了一致性 hash 算法后咱们能够着手实现集群模式了,Godis 集群的代码在 Github:Godis/cluster。
集群最核心的逻辑是找到 key 所在节点并将指令转发过去:
// 集群模式下,除了 MSet、DEL 等特殊指令外,其它指令会交由 defaultFunc 处理 func defaultFunc(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { key := string(args[1]) peer := cluster.peerPicker.Get(key) // 经过一致性 hash 找到节点 return cluster.Relay(peer, c, args) } func (cluster *Cluster) Relay(peer string, c redis.Connection, args [][]byte) redis.Reply { if peer == cluster.self { // 若数据在本地则直接调用数据库引擎 // to self db return cluster.db.Exec(c, args) } else { // 从链接池取一个与目标节点的链接 // 链接池使用 github.com/jolestar/go-commons-pool/v2 实现 peerClient, err := cluster.getPeerClient(peer) if err != nil { return reply.MakeErrReply(err.Error()) } defer func() { _ = cluster.returnPeerClient(peer, peerClient) // 处理完成后将链接放回链接池 }() // 将指令发送到目标节点 return peerClient.Send(args) } } func (cluster *Cluster) getPeerClient(peer string) (*client.Client, error) { connectionFactory, ok := cluster.peerConnection[peer] if !ok { return nil, errors.New("connection factory not found") } raw, err := connectionFactory.BorrowObject(context.Background()) if err != nil { return nil, err } conn, ok := raw.(*client.Client) if !ok { return nil, errors.New("connection factory make wrong type") } return conn, nil } func (cluster *Cluster) returnPeerClient(peer string, peerClient *client.Client) error { connectionFactory, ok := cluster.peerConnection[peer] if !ok { return errors.New("connection factory not found") } return connectionFactory.ReturnObject(context.Background(), peerClient) }