[转]Redis cluster failover

今天测试了redis cluster  failover 功能,在切换过程当中很快,但在failover时有force 与takeover 之分node

[RHZYTEST_10:REDIS:6237:M ~]$r 127.0.0.1:6237> cluster nodes 13b6094babb9c16066a315a828434b3c6525f08b 192.168.1.91:6236 master - 0 1498035354251 3 connected 10923-16383 1f23e15fc418a157af3a8bb02616221f7c89be9c 192.168.1.91:6234 master - 0 1498035351241 5 connected 0-5460 6defcadf4122643b49d2fbf963e6d1fe75bc077f 192.168.1.91:6240 handshake - 1498035354852 0 0 disconnected 5b0b0b1b77426a27024c334aa52c964a61d5aee8 192.168.1.91:6238 slave 5caa45a805a9fefe2c8d41b549b15fd8568133ee 0 1498035356258 2 connected 9c21cb36a91f937185f51b91642360ff843db33b 192.168.1.91:6239 slave 13b6094babb9c16066a315a828434b3c6525f08b 0 1498035353248 3 connected 5caa45a805a9fefe2c8d41b549b15fd8568133ee 192.168.1.91:6235 master - 0 1498035355253 2 connected 5461-10922 edc4ba4257425393dc8d21680baab991b5e91241 192.168.1.91:6237 myself,slave 1f23e15fc418a157af3a8bb02616221f7c89be9c 0 0 4 connected 127.0.0.1:6237> cluster failover force OK 127.0.0.1:6237> 127.0.0.1:6237> cluster nodes 3559d4c6e79ba7fffa130831d8abbad1ee7c4beb 192.168.1.91:6240 handshake - 1498035440166 0 0 disconnected 13b6094babb9c16066a315a828434b3c6525f08b 192.168.1.91:6236 master - 0 1498035448608 3 connected 10923-16383 1f23e15fc418a157af3a8bb02616221f7c89be9c 192.168.1.91:6234 slave edc4ba4257425393dc8d21680baab991b5e91241 0 1498035451616 6 connected 5b0b0b1b77426a27024c334aa52c964a61d5aee8 192.168.1.91:6238 slave 5caa45a805a9fefe2c8d41b549b15fd8568133ee 0 1498035449611 2 connected 9c21cb36a91f937185f51b91642360ff843db33b 192.168.1.91:6239 slave 13b6094babb9c16066a315a828434b3c6525f08b 0 1498035447606 3 connected 5caa45a805a9fefe2c8d41b549b15fd8568133ee 192.168.1.91:6235 master - 0 1498035450615 2 connected 5461-10922 edc4ba4257425393dc8d21680baab991b5e91241 192.168.1.91:6237 myself,master - 0 0 6 connected 0-5460 127.0.0.1:6237> 

failove 主要应用如下场景:redis

  1. The slave tells the master to stop processing queries from clients.
  2. The master replies to the slave with the current replication offset.
  3. The slave waits for the replication offset to match on its side, to make sure it processed all the data from the master before it continues.
  4. The slave starts a failover, obtains a new configuration epoch from the majority of the masters, and broadcasts the new configuration.
  5. The old master receives the configuration update: unblocks its clients and starts replying with redirection messages so that they'll continue the chat with the new master.

他们之间有什么区别呢?算法

FORCE option is given, the slave does not perform any handshake with the master, that may be not reachable, but instead just starts a failover ASAP starting from point 4. This is useful when we want to start a manual failover while the master is no longer reachable.bash

However using FORCE we still need the majority of masters to be available in order to authorize the failover and generate a new configuration epoch for the slave that is going to become master.app

There are situations where this is not enough, and we want a slave to failover without any agreement with the rest of the cluster. A real world use case for this is to mass promote slaves in a different data center to masters in order to perform a data center switch, while all the masters are down or partitioned away.less

The TAKEOVER option implies everything FORCE implies, but also does not uses any cluster authorization in order to failover. A slave receiving CLUSTER FAILOVER TAKEOVER will instead:分布式

  1. Generate a new configEpoch unilaterally, just taking the current greatest epoch available and incrementing it if its local configuration epoch is not already the greatest.
  2. Assign itself all the hash slots of its master, and propagate the new configuration to every node which is reachable ASAP, and eventually to every other node.

Note that TAKEOVER violates the last-failover-wins principle of Redis Cluster, since the configuration epoch generated by the slave violates the normal generation of configuration epochs in several ways:ide

  1. There is no guarantee that it is actually the higher configuration epoch, since, for example, we can use the TAKEOVER option within a minority, nor any message exchange is performed to generate the new configuration epoch.
  2. If we generate a configuration epoch which happens to collide with another instance, eventually our configuration epoch, or the one of another instance with our same epoch, will be moved away using the configuration epoch collision resolution algorithm.

Because of this the TAKEOVER option should be used with care.函数

下面是分析cluster failover 选项takeover 与forcer的原码测试

 

http://blog.csdn.net/gqtcgq/article/details/51830483

一:手动故障转移

         Redis集群支持手动故障转移。也就是向从节点发送”CLUSTER  FAILOVER”命令,使其在主节点未下线的状况下,发起故障转移流程,升级为新的主节点,而原来的主节点降级为从节点。

         为了避免丢失数据,向从节点发送”CLUSTER  FAILOVER”命令后,流程以下:

         a:从节点收到命令后,向主节点发送CLUSTERMSG_TYPE_MFSTART包;

         b:主节点收到该包后,会将其全部客户端置于阻塞状态,也就是在10s的时间内,再也不处理客户端发来的命令;而且在其发送的心跳包中,会带有CLUSTERMSG_FLAG0_PAUSED标记;

         c:从节点收到主节点发来的,带CLUSTERMSG_FLAG0_PAUSED标记的心跳包后,从中获取主节点当前的复制偏移量。从节点等到本身的复制偏移量达到该值后,才会开始执行故障转移流程:发起选举、统计选票、赢得选举、升级为主节点并更新配置;

 

         ”CLUSTER  FAILOVER”命令支持两个选项:FORCE和TAKEOVER。使用这两个选项,能够改变上述的流程。

         若是有FORCE选项,则从节点不会与主节点进行交互,主节点也不会阻塞其客户端,而是从节点当即开始故障转移流程:发起选举、统计选票、赢得选举、升级为主节点并更新配置。

         若是有TAKEOVER选项,则更加简单粗暴:从节点再也不发起选举,而是直接将本身升级为主节点,接手原主节点的槽位,增长本身的configEpoch后更新配置。

 

         所以,使用FORCE和TAKEOVER选项,主节点能够已经下线;而不使用任何选项,只发送”CLUSTER  FAILOVER”命令的话,主节点必须在线。

 

         在clusterCommand函数中,处理”CLUSTER  FAILOVER”命令的部分代码以下:

else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
               (c->argc == 2 || c->argc == 3))
    {
        /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
        int force = 0, takeover = 0;

        if (c->argc == 3) {
            if (!strcasecmp(c->argv[2]->ptr,"force")) {
                force = 1;
            } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
                takeover = 1;
                force = 1; /* Takeover also implies force. */
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }

        /* Check preconditions. */
        if (nodeIsMaster(myself)) {
            addReplyError(c,"You should send CLUSTER FAILOVER to a slave");
            return;
        } else if (myself->slaveof == NULL) {
            addReplyError(c,"I'm a slave but my master is unknown to me");
            return;
        } else if (!force &&
                   (nodeFailed(myself->slaveof) ||
                    myself->slaveof->link == NULL))
        {
            addReplyError(c,"Master is down or failed, "
                            "please use CLUSTER FAILOVER FORCE");
            return;
        }
        resetManualFailover();
        server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;

        if (takeover) {
            /* A takeover does not perform any initial check. It just
             * generates a new configuration epoch for this node without
             * consensus, claims the master's slots, and broadcast the new
             * configuration. */
            redisLog(REDIS_WARNING,"Taking over the master (user request).");
            clusterBumpConfigEpochWithoutConsensus();
            clusterFailoverReplaceYourMaster();
        } else if (force) {
            /* If this is a forced failover, we don't need to talk with our
             * master to agree about the offset. We just failover taking over
             * it without coordination. */
            redisLog(REDIS_WARNING,"Forced failover user request accepted.");
            server.cluster->mf_can_start = 1;
        } else {
            redisLog(REDIS_WARNING,"Manual failover user request accepted.");
            clusterSendMFStart(myself->slaveof);
        }
        addReply(c,shared.ok);
    }

 

         首先检查命令的最后一个参数是不是FORCE或TAKEOVER;

         若是当前节点是主节点;或者当前节点是从节点,但没有主节点;或者当前从节点的主节点已经下线或者断链,而且命令中没有FORCE或TAKEOVER参数,则直接回复客户端错误信息后返回;

         而后调用resetManualFailover,重置手动强制故障转移的状态;

         置mf_end为当前时间加5秒,该属性表示手动强制故障转移流程的超时时间,也用来表示当前是否正在进行手动强制故障转移;

         若是命令最后一个参数为TAKEOVER,这表示收到命令的从节点无需通过选举的过程,直接接手其主节点的槽位,并成为新的主节点。所以首先调用函数clusterBumpConfigEpochWithoutConsensus,产生新的configEpoch,以便后续更新配置;而后调用clusterFailoverReplaceYourMaster函数,转变成为新的主节点,并将这种转变广播给集群中全部节点;

         若是命令最后一个参数是FORCE,这表示收到命令的从节点能够直接开始选举过程,而无需达到主节点的复制偏移量以后才开始选举过程。所以置mf_can_start为1,这样在函数clusterHandleSlaveFailover中,即便在主节点未下线或者当前从节点的复制数据比较旧的状况下,也能够开始故障转移流程;

         若是最后一个参数不是FORCE或TAKEOVER,这表示收到命令的从节点,首先须要向主节点发送CLUSTERMSG_TYPE_MFSTART包,所以调用clusterSendMFStart函数,向其主节点发送该包;

 

         主节点收到CLUSTERMSG_TYPE_MFSTART包后,在clusterProcessPacket函数中,是这样处理的:

 

else if (type == CLUSTERMSG_TYPE_MFSTART) {
        /* This message is acceptable only if I'm a master and the sender
         * is one of my slaves. */
        if (!sender || sender->slaveof != myself) return 1;
        /* Manual failover requested from slaves. Initialize the state
         * accordingly. */
        resetManualFailover();
        server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;
        server.cluster->mf_slave = sender;
        pauseClients(mstime()+(REDIS_CLUSTER_MF_TIMEOUT*2));
        redisLog(REDIS_WARNING,"Manual failover requested by slave %.40s.",
            sender->name);
    }

 

         若是字典中找不到发送节点,或者发送节点的主节点不是当前节点,则直接返回;

         调用resetManualFailover,重置手动强制故障转移的状态;

         而后置mf_end为当前时间加5秒,该属性表示手动强制故障转移流程的超时时间,也用来表示当前是否正在进行手动强制故障转移;

         而后设置mf_slave为sender,该属性表示要进行手动强制故障转移的从节点;

         而后调用pauseClients,使全部客户端在以后的10s内阻塞;

 

         主节点在发送心跳包时,在构建包头时,若是发现当前正处于手动强制故障转移阶段,则会在包头中增长CLUSTERMSG_FLAG0_PAUSED标记:

 

void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    ...
    /* Set the message flags. */
    if (nodeIsMaster(myself) && server.cluster->mf_end)
        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
    ...
}

 

 

         从节点在clusterProcessPacket函数中处理收到的包,一旦发现主节点发来的,带有CLUSTERMSG_FLAG0_PAUSED标记的包,就会将该主节点的复制偏移量记录到server.cluster->mf_master_offset中:

int clusterProcessPacket(clusterLink *link) {
    ...
    /* Check if the sender is a known node. */
    sender = clusterLookupNode(hdr->sender);
    if (sender && !nodeInHandshake(sender)) {
        ...
        /* Update the replication offset info for this node. */
        sender->repl_offset = ntohu64(hdr->offset);
        sender->repl_offset_time = mstime();
        /* If we are a slave performing a manual failover and our master
         * sent its offset while already paused, populate the MF state. */
        if (server.cluster->mf_end &&
            nodeIsSlave(myself) &&
            myself->slaveof == sender &&
            hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
            server.cluster->mf_master_offset == 0)
        {
            server.cluster->mf_master_offset = sender->repl_offset;
            redisLog(REDIS_WARNING,
                "Received replication offset for paused "
                "master manual failover: %lld",
                server.cluster->mf_master_offset);
        }
    }
}

 

 

         从节点在集群定时器函数clusterCron中,会调用clusterHandleManualFailover函数,判断一旦当前从节点的复制偏移量达到了server.cluster->mf_master_offset,就会置server.cluster->mf_can_start为1。这样在接下来要调用的clusterHandleSlaveFailover函数中,就会当即开始故障转移流程了。

         clusterHandleManualFailover函数的代码以下:

 

void clusterHandleManualFailover(void) {
    /* Return ASAP if no manual failover is in progress. */
    if (server.cluster->mf_end == 0) return;

    /* If mf_can_start is non-zero, the failover was already triggered so the
     * next steps are performed by clusterHandleSlaveFailover(). */
    if (server.cluster->mf_can_start) return;

    if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */

    if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
        /* Our replication offset matches the master replication offset
         * announced after clients were paused. We can start the failover. */
        server.cluster->mf_can_start = 1;
        redisLog(REDIS_WARNING,
            "All master replication stream processed, "
            "manual failover can start.");
    }
}

 

         不论是从节点,仍是主节点,在集群定时器函数clusterCron中,都会调用manualFailoverCheckTimeout函数,一旦发现手动故障转移的超时时间已到,就会重置手动故障转移的状态,表示终止该过程。manualFailoverCheckTimeout函数代码以下:

 

/* If a manual failover timed out, abort it. */
void manualFailoverCheckTimeout(void) {
    if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
        redisLog(REDIS_WARNING,"Manual failover timed out.");
        resetManualFailover();
    }
}

 

 

二:从节点迁移

         在Redis集群中,为了加强集群的可用性,通常状况下须要为每一个主节点配置若干从节点。可是这种主从关系若是是固定不变的,则通过一段时间以后,就有可能出现孤立主节点的状况,也就是一个主节点再也没有可用于故障转移的从节点了,一旦这样的主节点下线,整个集群也就不可用了。

         所以,在Redis集群中,增长了从节点迁移的功能。简单描述以下:一旦发现集群中出现了孤立主节点,则某个从节点A就会自动变成该孤立主节点的从节点。该从节点A知足这样的条件:A的主节点具备最多的附属从节点;A在这些附属从节点中,节点ID是最小的(The acting slave is the slave among the masterswith the maximum number of attached slaves, that is not in FAIL state and hasthe smallest node ID)。

 

         该功能是在集群定时器函数clusterCron中实现的。这部分的代码以下:

 

void clusterCron(void) {
    ...
    orphaned_masters = 0;
    max_slaves = 0;
    this_slaves = 0;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */
        mstime_t delay;

        if (node->flags &
            (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
                continue;

        /* Orphaned master check, useful only if the current instance
         * is a slave that may migrate to another master. */
        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
            int okslaves = clusterCountNonFailingSlaves(node);

            /* A master is orphaned if it is serving a non-zero number of
             * slots, have no working slaves, but used to have at least one
             * slave. */
            if (okslaves == 0 && node->numslots > 0 && node->numslaves)
                orphaned_masters++;
            if (okslaves > max_slaves) max_slaves = okslaves;
            if (nodeIsSlave(myself) && myself->slaveof == node)
                this_slaves = okslaves;
        }
        ...
    }
    ...
    if (nodeIsSlave(myself)) {
        ...
        /* If there are orphaned slaves, and we are a slave among the masters
         * with the max number of non-failing slaves, consider migrating to
         * the orphaned masters. Note that it does not make sense to try
         * a migration if there is no master with at least *two* working
         * slaves. */
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
            clusterHandleSlaveMigration(max_slaves);
    }
    ...
}

 

         轮训字典server.cluster->nodes,只要其中的节点不是当前节点,没有处于REDIS_NODE_NOADDR或者握手状态,就对该node节点作相应的处理:

         若是当前节点是从节点,而且node节点是主节点,而且node未被标记为下线,则首先调用函数clusterCountNonFailingSlaves,计算node节点未下线的从节点个数okslaves,若是node主节点的okslaves为0,而且该主节点负责的插槽数不为0,说明该node主节点是孤立主节点,所以增长orphaned_masters的值;若是该node主节点的okslaves大于max_slaves,则将max_slaves改成okslaves,所以,max_slaves记录了全部主节点中,拥有最多未下线从节点的那个主节点的未下线从节点个数;若是当前节点正好是node主节点的从节点之一,则将okslaves记录到this_slaves中,以上都是为后续作从节点迁移作的准备;

 

         轮训完全部节点以后,若是存在孤立主节点,而且max_slaves大于等于2,而且当前节点恰好是那个拥有最多未下线从节点的主节点的众多从节点之一,则调用函数clusterHandleSlaveMigration,知足条件的状况下,进行从节点迁移,也就是将当前从节点置为某孤立主节点的从节点。

 

         clusterHandleSlaveMigration函数的代码以下:

 

void clusterHandleSlaveMigration(int max_slaves) {
    int j, okslaves = 0;
    clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
    dictIterator *di;
    dictEntry *de;

    /* Step 1: Don't migrate if the cluster state is not ok. */
    if (server.cluster->state != REDIS_CLUSTER_OK) return;

    /* Step 2: Don't migrate if my master will not be left with at least
     *         'migration-barrier' slaves after my migration. */
    if (mymaster == NULL) return;
    for (j = 0; j < mymaster->numslaves; j++)
        if (!nodeFailed(mymaster->slaves[j]) &&
            !nodeTimedOut(mymaster->slaves[j])) okslaves++;
    if (okslaves <= server.cluster_migration_barrier) return;

    /* Step 3: Idenitfy a candidate for migration, and check if among the
     * masters with the greatest number of ok slaves, I'm the one with the
     * smaller node ID.
     *
     * Note that this means that eventually a replica migration will occurr
     * since slaves that are reachable again always have their FAIL flag
     * cleared. At the same time this does not mean that there are no
     * race conditions possible (two slaves migrating at the same time), but
     * this is extremely unlikely to happen, and harmless. */
    candidate = myself;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        int okslaves;

        /* Only iterate over working masters. */
        if (nodeIsSlave(node) || nodeFailed(node)) continue;
        /* If this master never had slaves so far, don't migrate. We want
         * to migrate to a master that remained orphaned, not masters that
         * were never configured to have slaves. */
        if (node->numslaves == 0) continue;
        okslaves = clusterCountNonFailingSlaves(node);

        if (okslaves == 0 && target == NULL && node->numslots > 0)
            target = node;

        if (okslaves == max_slaves) {
            for (j = 0; j < node->numslaves; j++) {
                if (memcmp(node->slaves[j]->name,
                           candidate->name,
                           REDIS_CLUSTER_NAMELEN) < 0)
                {
                    candidate = node->slaves[j];
                }
            }
        }
    }
    dictReleaseIterator(di);

    /* Step 4: perform the migration if there is a target, and if I'm the
     * candidate. */
    if (target && candidate == myself) {
        redisLog(REDIS_WARNING,"Migrating to orphaned master %.40s",
            target->name);
        clusterSetMaster(target);
    }
}

 

         若是当前集群状态不是REDIS_CLUSTER_OK,则直接返回;若是当前从节点没有主节点,则直接返回;

         接下来计算,当前从节点的主节点,具备未下线从节点的个数okslaves;若是okslaves小于等于迁移阈值server.cluster_migration_barrier,则直接返回;

        

         接下来,开始轮训字典server.cluster->nodes,针对其中的每个节点node:

         若是node节点是从节点,或者处于下线状态,则直接处理下一个节点;若是node节点没有配置从节点,则直接处理下一个节点;

         调用clusterCountNonFailingSlaves函数,计算该node节点的未下线主节点数okslaves;若是okslaves为0,而且该node节点的numslots大于0,说明该主节点以前有从节点,可是都下线了,所以找到了一个孤立主节点target;

         若是okslaves等于参数max_slaves,说明该node节点就是具备最多未下线从节点的主节点,所以将当前节点的节点ID,与其全部从节点的节点ID进行比较,若是当前节点的名字更大,则将candidate置为具备更小名字的那个从节点;(其实从这里就能够直接退出返回了)

        

         轮训完全部节点后,若是找到了孤立节点,而且当前节点拥有最小的节点ID,则调用clusterSetMaster,将target置为当前节点的主节点,并开始主从复制流程。

 

三:configEpoch冲突问题

         在集群中,负责不一样槽位的主节点,具备相同的configEpoch实际上是没有问题的,可是有可能由于人为介入的缘由或者BUG的问题,致使具备相同configEpoch的主节点都宣称负责相同的槽位,这在分布式系统中是致命的问题;所以,Redis规定集群中的全部节点,必须具备不一样的configEpoch。

         当某个从节点升级为新主节点时,它会获得一个大于当前全部节点的configEpoch的新configEpoch,因此不会致使具备重复configEpoch的从节点(由于一次选举中,不会有两个从节点同时胜出)。可是在管理员发起的从新分片过程的最后,迁入槽位的节点会本身更新本身的configEpoch,而无需其余节点的赞成;或者手动强制故障转移过程,也会致使从节点在无需其余节点赞成的状况下更新configEpoch,以上的状况均可能致使出现多个主节点具备相同configEpoch的状况。

         所以,就须要一种算法,保证集群中全部节点的configEpoch都不相同。这种算法是这样实现的:当某个主节点收到其余主节点发来的心跳包后,发现包中的configEpoch与本身的configEpoch相同,就会调用clusterHandleConfigEpochCollision函数,解决这种configEpoch冲突的问题。

         clusterHandleConfigEpochCollision函数的代码以下:

 

void clusterHandleConfigEpochCollision(clusterNode *sender) {
    /* Prerequisites: nodes have the same configEpoch and are both masters. */
    if (sender->configEpoch != myself->configEpoch ||
        !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
    /* Don't act if the colliding node has a smaller Node ID. */
    if (memcmp(sender->name,myself->name,REDIS_CLUSTER_NAMELEN) <= 0) return;
    /* Get the next ID available at the best of this node knowledge. */
    server.cluster->currentEpoch++;
    myself->configEpoch = server.cluster->currentEpoch;
    clusterSaveConfigOrDie(1);
    redisLog(REDIS_VERBOSE,
        "WARNING: configEpoch collision with node %.40s."
        " configEpoch set to %llu",
        sender->name,
        (unsigned long long) myself->configEpoch);
}

 

         若是发送节点的configEpoch不等于当前节点的configEpoch,或者发送节点不是主节点,或者当前节点不是主节点,则直接返回;

         若是相比于当前节点的节点ID,发送节点的节点ID更小,则直接返回;

         所以,较小名字的节点能得到更大的configEpoch,接下来首先增长本身的currentEpoch,而后将configEpoch赋值为currentEpoch。

         这样,即便有多个节点具备相同的configEpoch,最终,只有具备最大节点ID的节点的configEpoch保持不变,其余节点都会增长本身的configEpoch,并且增长的值会不一样,具备最小NODE ID的节点,最终具备最大的configEpoch。

 

         参考:http://redis.io/topics/cluster-spec

相关文章
相关标签/搜索