Apache Ignite 学习笔记(五): Primary和backup数据同步模式和处理分片丢失的策略

上一篇文章咱们介绍了Ignite数据网格中不一样的数据分片冗余策略:Replicated和Partition模式。不管是哪一种模式,其实就是经过对数据分片在不一样的节点上作多个拷贝来保证数据的可用性。在一个多个节点组成的分布式系统中,一旦须要作数据拷贝,天然就要考虑数据拷贝的过程是同步的仍是异步的。并且,在partition模式下,一个节点也许不会有数据的全部分片,那势必会出现某个数据分片的primary和backup拷贝因为节点故障,在集群中访问不到的状况。这篇文章咱们就接着看看,针对数据拷贝以及数据分片丢失,Ignite提供了哪些选项,咱们又该怎样处理。java

Primary和Backup之间的同步/异步拷贝


Ignite针对primary和backup之间的数据拷贝提供了三种同步模式:node

  • PRIMARY_SYNC: 默认状况下Ignite采用的同步模式。写cache的操做在数据分片的primary节点成功写入便可返回,不用等待backup节点数据成功写入。这也意味着,若是此时从backup节点读数据,有可能读到的任然是旧数据。
  • FULL_SYNC: 写cache的操做在primary节点和backup节点都成功写入后返回。和PRIMARY_SYNC模式相比,这个模式保证了写入成功后节点之间的数据都同样。
  • FULL_ASYNC: 写cache的操做不用等primary节点和backup节点成功写入便可返回。和PRIMARY_SYNC模式相比,此时即使是读primary节点的数据都有可能读到旧数据。

三种同步模式如何选择,彻底取决于应用对数据一致性,可用性和性能的要求。FULL_SYNC保证新的数据同步到了primary和backup节点上,天然对写操做的性能影响是最大的。PRIMARY_SYNC则只保证数据同步到了primary节点上,这个模式牺牲必定的可用性换取了比FULL_SYNC更好的写性能。而FULL_ASYNC由于是彻底异步的,因此有可能会出现数据丢失,这里牺牲了数据的可用性,换取更好的写性能。git

咱们能够经过XML配置文件或者是代码中之间配置同步模式。下面是XML配置文件:spring

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="multicastGroup" value="224.0.0.251"/>
                    </bean>
                </property>
            </bean>
        </property>
        <property name="cacheConfiguration">
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <!-- 设置缓存名字. -->
                <property name="name" value="TEST"/>
                <!-- 设置缓存模式. -->
                <property name="cacheMode" value="PARTITIONED"/>
                <property name="backups" value="1"/>
                <!-- 下面将缓存设置为replicated模式 -->
                <!--property name="cacheMode" value="REPLICATED"/-->
                <property name="writeSynchronizationMode" value="FULL_SYNC"/>
            </bean>
        </property>
    </bean>
</beans>

下面例子是如何在Java代码中设置同步模式apache

...
    CacheConfiguration<String, String> cacheCfg = new CacheConfiguration("TEST");
    cacheCfg.setCacheMode(CacheMode.PARTITIONED);
    cacheCfg.setBackups(1);
    cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
    ...

数据分片丢失的处理


在partition模式下, 数据分片后存放在primary和backup节点上,一旦出现某块数据分片的全部primary和backup拷贝因为节点故障没法访问时,就出现了“partition loss"的状况。用上一篇文章的partitoned cached图来举个例子:
缓存

图中cache用的模式partition模式,backup数量是1,因此数据分片有一个primar和backup拷贝,若是JVM1和JVM4出现故障,那么分片D的primary拷贝和backup拷贝全都没法访问。这时候,若是容许用户的读写操做继续读取分片D数据,那数据的一致性就没法保证了。咱们能够经过监听EVT_CACHE_REBALANCE_PART_DATA_LOST事件,及时知道集群中出现partition loss,而后采起相应措施。另外,Ignite提供了不一样的处理策略,让你能够针对不一样的场景选择不一样的策略:异步

  • IGNORE: 若是不进行配置,这是默认状况下的策略。即便出现了partition loss的状况,Ignite会自动忽略而且会清空和partion loss相关的状态不会触发EVT_CACHE_REBALANCE_PART_DATA_LOST事件。
  • READ_WRITE_ALL: Ignite容许全部的读写操做,就好像partition loss没发生过。和IGNORE策略最大的不一样,该策略虽然容许继续读写,但会触发EVT_CACHE_REBALANCE_PART_DATA_LOST事件。
  • READ_WRITE_SAFE: 容许对没有丢失的partition的读写操做,可是对已经丢失的partition的读写操做会失败并抛异常。
  • READ_ONLY_ALL: 容许对丢失的和正常的partition的读操做,可是写操做会失败并抛异常。
  • READ_ONLY_SAFE: 全部的写操做和对丢失partition的读操做都会失败并抛异常。容许对正常的partition的读操做。

下面,让咱们用一个例子演示下如何配置partition loss的策略,以及如何经过监听EVT_CACHE_REBALANCE_PART_DATA_LOST处理paritition loss的事件:maven

  1. 启动2个server实例。
  2. server实例启动后,启动一个client节点连上集群,用CacheMode.PARTITIONED模式建立一个backup数量为0的cache(将backup数量设为0为了方便模拟partition丢失的场景), 而后往cache里写一些数据,并监听EVT_CACHE_REBALANCE_PART_DATA_LOST事件。
  3. 关掉一个server实例模拟节点故障触发partition loss。
  4. 观察client实例可否收到EVT_CACHE_REBALANCE_PART_DATA_LOST事件,在发生partition loss后启用不一样策略继续读写cache的行为,以及如何重置集群状态让读写恢复正常。

由于server节点的逻辑很简单(实际上2个server节点就是启动后组成一个Ignite集群),咱们看看client节点的代码:tcp

public class IgnitePartitionLossExampleClient {
    private static AtomicBoolean partitionLost = new AtomicBoolean(false);

    public static void main(String[] args) {
        Ignite ignite;

        if (args.length == 1 && !args[0].isEmpty()) {
            //若是启动时指定了配置文件,则用指定的配置文件
            System.out.println("Use " + args[0] + " to start.");
            ignite = Ignition.start(args[0]);
        } else {
            //若是启动时没指定配置文件,则生成一个配置文件
            System.out.println("Create an IgniteConfiguration to start.");
            TcpDiscoverySpi spi = new TcpDiscoverySpi();
            TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
            ipFinder.setMulticastGroup("224.0.0.251");
            spi.setIpFinder(ipFinder);
            IgniteConfiguration cfg = new IgniteConfiguration();
            cfg.setDiscoverySpi(spi);
            cfg.setClientMode(true);
            //默认因为性能缘由,Ignite会忽略全部事件,这里要主动配置须要监听的事件
            cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
            ignite = Ignition.start(cfg);
        }

        // 建立一个TEST缓存, cache mode设为PARTITIONED, backup数量为1, 并把partition loss policy设为READ_WRITE_SAFE
        CacheConfiguration<String, String> cacheCfg = new CacheConfiguration<>();
        cacheCfg.setName("TEST");
        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
        cacheCfg.setBackups(0);
        cacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE);
        IgniteCache<String, String> cityProvinceCache = ignite.getOrCreateCache(cacheCfg);

        // Local listener that listens to local events.
        IgnitePredicate<CacheRebalancingEvent> locLsnr = evt -> {
            try {
                System.out.println("=========Received event [evt=" + evt.name() + "]==========");
                Collection<Integer> lostPartitions = cityProvinceCache.lostPartitions();
                if (lostPartitions != null) {
                    partitionLost.set(true);
                }
                return true; // Continue listening.
            } catch (Exception e) {
                System.out.println(e);
            }
            System.out.println("=========Stop listening==========");
            return false;
        };

        // Subscribe to specified cache events occuring on local node.
        ignite.events().localListen(locLsnr,
                EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);


        List<String> cities = new ArrayList<String>(Arrays.asList("Edmonton",
                "Calgary", "Markham", "Toronto", "Richmond Hill", "Montreal"));

        // 写入一些数据, key是城市的名字,value是省的名字
        populateCityProvinceData(cityProvinceCache);

        //用下面的while循环不停模拟对cache的读写操做
        while(true) {
            try {
                for (String city : cities) {
                    try {
                            if (!partitionLost.get()) {
                                //若是cache一切正常,则正常读
                                getAndPrintCityProvince(city, cityProvinceCache);
                            } else {
                                //若是cache出现partition lost,模拟错误处理, 咱们这里简单把cache
                                //lost partiton重置,并从新写入数据
                                Collection<Integer> lostPartitions = cityProvinceCache.lostPartitions();
                                System.out.println("Cache lost partitions: " + lostPartitions.toString());
                                ignite.resetLostPartitions(Arrays.asList("TEST"));
                                populateCityProvinceData(cityProvinceCache);
                                partitionLost.set(false);
                            }
                    } catch(CacheException e) {
                        e.printStackTrace();
                    }
                }
                Thread.sleep(1000);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    private static void populateCityProvinceData(IgniteCache<String, String> cityProvinceCache) {
        System.out.println("Populate city province data!");
        cityProvinceCache.put("Edmonton", "Alberta");
        cityProvinceCache.put("Calgary", "Alberta");
        cityProvinceCache.put("Markham", "Ontario");
        cityProvinceCache.put("Toronto", "Ontario");
        cityProvinceCache.put("Richmond Hill", "Ontario");
        cityProvinceCache.put("Montreal", "Quebec");
    }

    private static void getAndPrintCityProvince(String city, IgniteCache<String, String> cityProvinceCache) {
        System.out.println(city + " is in " + cityProvinceCache.get(city));
    }
}
  • 在第30~31行,咱们将backup数量设为0并调用setPartitionLossPolicy函数将cache在partition丢失后的模式改成READ_WRITE_SAFE,这种模式下容许对没有丢失的partition的读写操做,可是对已经丢失的partition的读写操做会失败并抛异常。
  • 在35~52行,咱们建立了一个对CacheRebalancingEvent的监听器,而且经过localListen函数将监听器注册给Ignite,这样在Ignite集群中一旦出现EVT_CACHE_REBALANCE_PART_DATA_LOST事件,该监听器就会被调用。在监听器中,咱们不但打印了触发该listener的事件,还经过cache的lostPartitions函数返回丢失掉的partition的信息,若是确实有partition丢失了,listener还会把partitionLost置为true用来触发第70~71行的修复代码。
  • 在注册监听器时有一点须要注意,Ignite为了性能,默认会忽略对全部事件的通知,为了能获得EVT_CACHE_REBALANCE_PART_DATA_LOST的事件通知,咱们须要在启动Ignite节点时,显式的打开咱们关心的事件通知(代码第22行,也能够经过xml文件配置,具体见实例代码里的ignite-cache.xml配置文件)。
  • 第59行,咱们往cache中写入一些初始数据。接着在62~88行的一个while循环中,咱们不断的从cache里读数据。在每次读cache前,咱们都检查一些partitionLost是否有被置为true。若是没有,咱们就读cache并打印出来(68行);不然,说明cache的某些partition丢失了,咱们经过lostPartitions函数获得丢失的partition的信息,并打印出来。生产环境中,partition丢失表明着数据丢失,这时须要从外部帮忙恢复数据,或者检查下丢失的数据是否重要,而后才能将cache恢复正常读写。在例子中,因为咱们知道所有数据集,因此能够直接调用resetLostPartitions将cache恢复,而且重写一遍数据,这样后面的读操做都能成功。

在关掉一个server节点后,client节点会在console打印以下结果:分布式

=========Received event [evt=CACHE_REBALANCE_PART_DATA_LOST]==========
Cache lost partitions: [0, 3, 6, 7, 8, 9, 10, 11, 12, 19, 22, 23, 24, 27, 32, 34, 36, 37, 38, 39, 40, 42, 44, 50, 51, 52, 53, 56, 57, 59, 61, 64, 66, 69, 70, 71, 76, 78, 79, 80, 82, 83, 86, 87, 88, 91, 92, 95, 97, 98, 100, 101, 103, 106, 108, 112, 115, 117, 120, 121, 123, 124, 125, 128, 129, 135, 137, 138, 140, 141, 144, 145, 148, 149, 150, 151, 155, 157, 161, 162, 165, 166, 167, 169, 170, 171, 172, 179, 181, 183, 185, 188, 190, 191, 200, 201, 202, 204, 205, 212, 213, 214, 216, 217, 222, 223, 225, 226, 230, 231, 233, 235, 238, 239, 240, 242, 243, 244, 245, 246, 247, 255, 256, 257, 259, 261, 262, 263, 264, 266, 268, 271, 273, 274, 278, 281, 282, 284, 285, 288, 290, 291, 293, 296, 297, 299, 302, 306, 307, 312, 314, 316, 317, 318, 319, 322, 325, 326, 330, 333, 335, 336, 337, 339, 340, 343, 344, 345, 346, 348, 349, 351, 352, 354, 356, 359, 360, 362, 365, 367, 369, 370, 371, 373, 374, 379, 381, 383, 385, 386, 387, 391, 396, 398, 401, 403, 404, 405, 407, 410, 412, 413, 423, 425, 426, 427, 431, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445, 446, 447, 448, 451, 453, 456, 457, 462, 465, 467, 469, 471, 472, 473, 476, 477, 479, 481, 482, 483, 484, 485, 486, 487, 490, 492, 499, 500, 501, 502, 503, 504, 505, 507, 509, 511, 519, 520, 522, 523, 524, 526, 527, 528, 529, 531, 536, 538, 540, 541, 542, 544, 546, 547, 548, 549, 552, 553, 554, 555, 558, 559, 563, 565, 567, 570, 572, 573, 577, 578, 580, 581, 585, 586, 589, 590, 591, 593, 600, 601, 602, 603, 604, 605, 606, 607, 608, 610, 611, 616, 617, 619, 621, 624, 627, 628, 629, 630, 631, 635, 637, 639, 643, 646, 648, 652, 653, 658, 661, 665, 667, 670, 673, 675, 686, 687, 691, 693, 695, 696, 700, 703, 705, 707, 711, 712, 716, 718, 719, 720, 721, 722, 724, 730, 731, 732, 733, 734, 735, 736, 737, 738, 739, 742, 743, 744, 745, 750, 751, 752, 756, 758, 760, 763, 764, 766, 769, 775, 776, 777, 778, 779, 780, 782, 786, 789, 790, 792, 793, 794, 797, 799, 801, 802, 808, 809, 810, 812, 813, 814, 815, 819, 820, 821, 822, 824, 825, 827, 830, 831, 834, 835, 836, 837, 838, 843, 844, 846, 847, 848, 852, 854, 859, 863, 864, 866, 867, 874, 876, 878, 879, 881, 882, 883, 884, 885, 888, 891, 895, 896, 897, 899, 900, 903, 904, 905, 907, 908, 910, 911, 915, 916, 917, 920, 922, 924, 928, 933, 935, 936, 940, 942, 943, 945, 948, 949, 950, 951, 952, 955, 956, 960, 962, 965, 969, 971, 972, 973, 979, 983, 985, 990, 994, 995, 1001, 1003, 1010, 1012, 1013, 1014, 1015, 1017, 1018, 1019, 1020, 1021, 1022, 1023]
Populate city province data!

这表示client节点的确收到EVT_CACHE_REBALANCE_PART_DATA_LOST的事件通知,lostPartitions函数的返回结果也证实了哪些partition丢失了。当咱们把第31行代码注释掉后(至关于咱们把cache的lost partition policy设为了默认的IGNORE),从新编译再跑一样的试验,咱们会发现这时client节点不会收到partition lost的相关事件。

总结


这篇文章咱们介绍了Ignite提供的primary和backup数据之间三种同步模式,以及这三种同步模式的适用场景。咱们还介绍了在出现partition丢失状况下,Ignite提供的事件通知机制。经过一个简单的例子和代码,咱们展现了如何添加监听Ignite的事件,并处理partition丢失事件。这篇文章里用到的例子的完整代码和maven工程能够在这里找到。 Client对应的xml配置文件在src/main/resources目录下。

下一篇,咱们将继续深刻了解一下Ignite cache除了简单的key/value查询,还提供了哪些功能强大的查询方式。

相关文章
相关标签/搜索