.NET Core微服务之基于Polly+AspectCore实现熔断与降级机制

Tip: 此篇已加入.NET Core微服务基础系列文章索引html

1、熔断、降级与AOP

1.1 啥是熔断?

  在广义的解释中,熔断主要是指为控制股票、期货或其余金融衍生产品的交易风险,为其单日价格波动幅度规定区间限制,一旦成交价触及区间上下限,交易则自动中断一段时间(“熔即断”),或就此“躺平”而不得超过上限或下限(“熔而不断”)。git

  而对于微服务来讲,熔断就是咱们常说的“保险丝”,意为当服务出现某些情况时,切断服务,从而防止应用程序不断地常识执行可能会失败的操做形成系统的“雪崩”,或者大量的超时等待致使系统卡死等状况,不少地方也将其成为“过载保护”。github

1.2 啥是降级?

  降级的目的就是当某个服务提供者发生故障的时候,向调用方返回一个替代响应或者错误响应数据库

  例如:假设有一个短信服务,其调用联通接口服务器发送短信服务(假设这里调用联通接口最方便,最省事也最经济)失败以后,会尝试改用移动短信服务器(假设这里调用移动服务器比较不方便也不经济)发送,若是移动服务器调用也失败,那么还会尝试改用电信短信服务器(假设这里调用电信服务器最不省事和最不经济),若是还失败,则返回“失败”响应;编程

  降级的另外一个概念也能够看做是服务的“选择性放弃”,好比在双11或618等大型的电商活动日中,在高峰值的情形下,通常的电商系统都会采用部分服务的优先级下降或者干脆延时或中止服务,以确保主要的服务可以使用最大化的资源为客户提供服务。等待峰值降低以后,再经过处理恢复那些降级的服务的原有优先级。json

1.3 啥是AOP?

  AOP(Aspect Oriented Programming)意为面向切面编程,它是指在运行时,动态地将代码切入到类的指定方法、指定位置上的编程思想就是面向切面的编程。好比说,咱们在两个类中,可能都须要在每一个方法中作日志。按面向对象的设计方法,咱们就必须在两个类的方法中都加入日志的内容。也许他们是彻底相同的,但就是由于面向对象的设计让类与类之间没法联系,而不能将这些重复的代码统一块儿来。而AOP就是为了解决这个问题而生的,通常而言,咱们把切入到指定类指定方法的代码片断称为切面,而切入到哪些类、哪些方法则叫切入点。有了AOP,咱们就能够把几个类共有的代码,抽取到一个切片中,等到须要时再切入对象中去,从而改变其原有的行为。api

  AOP是OOP(Object Oriented Programming)的补充,OOP从横向上区分出一个个的类来,而AOP则从纵向上向对象中加入特定的代码。有了AOP,OOP变得立体了。关于AOP的更多细节和讨论,能够浏览知乎的这篇帖子:《什么是AOP?缓存

2、Polly的基本使用

2.1 Polly极简介绍

  Polly是一个被.NET基金会承认的弹性和瞬态故障处理库,容许咱们以很是顺畅和线程安全的方式来执诸如行重试,断路,超时,故障恢复等策略,其主要功能以下:安全

  • 功能1:重试(Retry)
  • 功能2:断路器(Circuit-Breaker)
  • 功能3:超时检测(Timeout)
  • 功能4:缓存(Cache)
  • 功能5:降级(Fallback)

  Polly的策略主要由“故障”和“动做”两个部分组成,“故障”能够包括异常、超时等状况,“动做”则包括Fallback(降级)、重试(Retry)、熔断(Circuit-Breaker)等。策略则用来执行业务代码,当业务代码出现了“故障”中的状况时就开始执行“动做”。服务器

2.2 Polly基础使用

  *.这里只介绍几个咱们须要用到的功能,其余功能请浏览参考资料关于Polly的部分

  (1)经过NuGet安装,最新版本:6.0.1

NuGet>Install-Package Polly  

  (2)FallBack => 当出现故障,则进入降级动做

    public static void Case1()
    {
        ISyncPolicy policy = Policy.Handle<ArgumentException>()
            .Fallback(() =>
            {
                Console.WriteLine("Error occured");
            });

        policy.Execute(() =>
        {
            Console.WriteLine("Job Start");

            throw new ArgumentException("Hello Polly!");

            Console.WriteLine("Job End");
        });
    }

  执行结果以下图所示:这里捕捉的是ArgumentException, 若是想捕捉全部的Exception,请设置Policy.Handle<Exception>,不过这样就扩大了范围。

  

  (3)Retry => 重试,比较容易理解

    public static void Case2()
    {
        ISyncPolicy policy = Policy.Handle<Exception>().Retry(3);

        try
        {
            policy.Execute(() =>
            {
                Console.WriteLine("Job Start");
                if (DateTime.Now.Second % 10 != 0)
                {
                    throw new Exception("Special error occured");
                }
                Console.WriteLine("Job End");
            });
        }
        catch (Exception ex)
        {
            Console.WriteLine("There's one unhandled exception : " + ex.Message);
        }
    }

  执行结果以下图所示:能够看到,这里重试了三次,仍然没有知足条件(DateTime.Now.Second % 10 == 0),所以进入了外部的未处理异常catch块中。

  

  (4)CircuitBreaker => 短路保护,当一块业务代码/服务 出现了N次错误,则把“熔断器”(保险丝)熔断,等待一段时间后才容许再次执行,在这段等待的时间内若是再执行则直接抛出BrokenCircuitException异常。这个也很好理解,好比咱们的手机屏幕密码,若是输错了N次以后,手机会拒绝咱们再次输入,而是让咱们等待20 ~ 30s 以后再输入,若是等待以后再输错N次,则再次进入等待。

  这里假设咱们设置一个短路保护策略:当发生了故障的时候,则重试了5次仍是有故障(代码中的6表明的是在执行短路保护策略以前容许6次故障),那么久中止服务10s钟,10s以后再容许重试。

    public static void Case3()
    {
        // Stop for 10s after retry 6 times
        ISyncPolicy policy = Policy.Handle<Exception>()
            .CircuitBreaker(6, TimeSpan.FromSeconds(10));

        while (true)
        {
            try
            {
                policy.Execute(() =>
                {
                    Console.WriteLine("Job Start");
                    throw new Exception("Special error occured");
                    Console.WriteLine("Job End");
                });
            }
            catch (Exception ex)
            {
                Console.WriteLine("There's one unhandled exception : " + ex.Message);
            }

            Thread.Sleep(500);
        }
    }

  执行结果以下图所示:出现了6次故障以后,直接给咱们跑出了短路保护的异常,“The circuit is now open and is not allowing calls”.

  

  (5)Timeout 与 Wrap => Wrap是指策略封装,能够把多个ISyncPolicy合并到一块儿执行。Timeout则是指超时处理,可是超时策略通常不能直接使用,而是其其余策略封装到一块儿使用。

  这里咱们封装两个策略,一个是基本的Fallback,另外一个则是超时策略,若是调用执行时间超过2s则触发Fallback。

  这里涉及到Polly中关于超时的两个策略:一个是悲观策略(Pessimistic),一个是乐观策略(Optimistic)。其中,悲观策略超时后会直接抛异常,而乐观策略则不会,而只是触发CancellationTokenSource.Cancel函数,须要等待委托自行终止操做。通常状况下,咱们都会用悲观策略。

    public static void Case4()
    {
        try
        {
            ISyncPolicy policyException = Policy.Handle<TimeoutRejectedException>()
                .Fallback(() =>
                {
                    Console.WriteLine("Fallback");
                });
            ISyncPolicy policyTimeout = Policy.Timeout(3, Polly.Timeout.TimeoutStrategy.Pessimistic);
            ISyncPolicy mainPolicy = Policy.Wrap(policyTimeout, policyException);
            mainPolicy.Execute(() =>
            {
                Console.WriteLine("Job Start...");
                Thread.Sleep(5000);
                //throw new Exception();
                Console.WriteLine("Job End...");
            });
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Unhandled exception : {ex.GetType()} : {ex.Message}");
        }
    }

  执行结果以下图所示:

  

  除此以外,Polly还提供了一些异步方法供调用以实现以上介绍的功能,好比在业务代码中有一些Http的调用或者IO操做时,不妨用用异步操做来提升一点效率,能够看下面这个例子:

    public static async void Case5()
    {
        Policy<byte[]> policy = Policy<byte[]>.Handle<Exception>()
            .FallbackAsync(async c =>
            {
                Console.WriteLine("Executed Error!");
                return new byte[0];
            }, async r =>
            {
                Console.WriteLine(r.Exception);
            });

        policy = policy.WrapAsync(Policy.TimeoutAsync(20, TimeoutStrategy.Pessimistic,
            async (context, timespan, task) =>
            {
                Console.WriteLine("Timeout!");
            }));

        var bytes = await policy.ExecuteAsync(async ()=>
        {
            Console.WriteLine("Start Job");
            HttpClient httpClient = new HttpClient();
            var result = await httpClient.GetByteArrayAsync("https://images2018.cnblogs.com/blog/381412/201806/381412-20180606230929894-145212290.png");
            Console.WriteLine("Finish Job");

            return result;
        });

        Console.WriteLine($"Length of bytes : {bytes.Length}");
    }

  执行结果以下图所示:

  

  至于Polly更多的功能和用法,能够参阅官方文档,这里再也不赘述。

3、AspectCore的基本使用

3.1 为何要用AOP框架

  从上面的例子能够看出,若是直接使用Polly,那么就会形成咱们的业务代码中混杂大量的业务无关的代码。因此,咱们会使用AOP的方式来封装Polly,嗯,首先咱们先找一个支持的.NET Core的AOP框架吧,目前你们都在用AspectCore(国产,做者Lemon),它采用动态动态代理/织入,而且支持异步方法的拦截。

  快快经过NuGet安装一个吧:

NuGet>Install-Package AspectCore.Core  

3.2 AspectCore的极简使用

  这里假设咱们要针对一个类的某些类的某些方法进行拦截,咱们通常会通过一下几个步骤:

  (1)编写一个拦截器,通常继承自AbstractInterceptorAttribute

    /// <summary>
    /// 自定义拦截器
    /// </summary>
    public class CustomInterceptorAttribute : AbstractInterceptorAttribute
    {
        /// <summary>
        /// 每一个被拦截的方法中执行
        /// </summary>
        /// <param name="context"></param>
        /// <param name="next"></param>
        /// <returns></returns>
        public override async Task Invoke(AspectContext context, AspectDelegate next)
        {
            try
            {
                Console.WriteLine("Before service call");
                await next(context); // 执行被拦截的方法
            }
            catch (Exception)
            {
                Console.WriteLine("Service threw an exception");
                throw;
            }
            finally
            {
                Console.WriteLine("After service call");
            }
        }
    }

  这里咱们经过为被拦截方法增长一些处理前和处理后的logic来实现AOP。

  (2)编写须要被代理拦截的类

    /// <summary>
    /// 实现AoP的两个要求:
    /// 1.public 类
    /// 2.virtual 方法
    /// </summary>
    public class Person
    {
        [CustomInterceptor]
        public virtual void Say(string message)
        {
            Console.WriteLine($"Service calling ... => {message}");
        }
    }

  能够看到咱们在要拦截的方法Say()的声明之上加了一个Attribute:CustomInterceptor,正是咱们以前新增的。

  (3)经过AspectCore建立代理对象实现AOP

    public class Program
    {
        public static void Main(string[] args)
        {
            ProxyGeneratorBuilder proxyGeneratorBuilder = new ProxyGeneratorBuilder();
            using (IProxyGenerator proxyGenerator = proxyGeneratorBuilder.Build())
            {
                Person p = proxyGenerator.CreateClassProxy<Person>();
                p.Say("edisonchou.cnblogs.com");
            }
            Console.ReadKey();
        }
    }

  执行结果以下图所示:

  

  代码很清晰,再也不解释。直到这里,咱们看到了不论是Polly的使用,仍是AspectCore的使用,都存在一些业务无关的声明代码,并且咱们须要结合Polly和AspectCore才能完整地实现适合ASP.NET Core的熔断降级组件,下面咱们就来模仿Spring Cloud中的Hystrix(能够参考这一篇文章来了解Spring Cloud Hystrix是个啥玩意儿)

4、Polly+AspectCore的结合使用

4.1 封装一个Hystrix

NuGet>Install-Package Polly

NuGet>Install-Package AspectCore.Core

NuGet>Install-Package Microsoft.Extensions.Caching.Memory  

    [AttributeUsage(AttributeTargets.Method)]
    public class HystrixCommandAttribute : AbstractInterceptorAttribute
    {
        /// <summary>
        /// 最多重试几回,若是为0则不重试
        /// </summary>
        public int MaxRetryTimes { get; set; } = 0;

        /// <summary>
        /// 重试间隔的毫秒数
        /// </summary>
        public int RetryIntervalMilliseconds { get; set; } = 100;

        /// <summary>
        /// 是否启用熔断
        /// </summary>
        public bool IsEnableCircuitBreaker { get; set; } = false;

        /// <summary>
        /// 熔断前出现容许错误几回
        /// </summary>
        public int ExceptionsAllowedBeforeBreaking { get; set; } = 3;

        /// <summary>
        /// 熔断多长时间(毫秒)
        /// </summary>
        public int MillisecondsOfBreak { get; set; } = 1000;

        /// <summary>
        /// 执行超过多少毫秒则认为超时(0表示不检测超时)
        /// </summary>
        public int TimeOutMilliseconds { get; set; } = 0;

        /// <summary>
        /// 缓存多少毫秒(0表示不缓存),用“类名+方法名+全部参数ToString拼接”作缓存Key
        /// </summary>

        public int CacheTTLMilliseconds { get; set; } = 0;

        private static ConcurrentDictionary<MethodInfo, Policy> policies 
            = new ConcurrentDictionary<MethodInfo, Policy>();

        private static readonly IMemoryCache memoryCache 
            = new MemoryCache(new Microsoft.Extensions.Caching.Memory.MemoryCacheOptions());

        /// <summary>
        /// HystrixCommandAttribute
        /// </summary>
        /// <param name="fallBackMethod">降级的方法名</param>
        public HystrixCommandAttribute(string fallBackMethod)
        {
            this.FallBackMethod = fallBackMethod;
        }

        public string FallBackMethod { get; set; }

        public override async Task Invoke(AspectContext context, AspectDelegate next)
        {
            //一个HystrixCommand中保持一个policy对象便可
            //其实主要是CircuitBreaker要求对于同一段代码要共享一个policy对象
            //根据反射原理,同一个方法的MethodInfo是同一个对象,可是对象上取出来的HystrixCommandAttribute
            //每次获取的都是不一样的对象,所以以MethodInfo为Key保存到policies中,确保一个方法对应一个policy实例
            policies.TryGetValue(context.ServiceMethod, out Policy policy);
            lock (policies)//由于Invoke多是并发调用,所以要确保policies赋值的线程安全
            {
                if (policy == null)
                {
                    policy = Policy.NoOpAsync();//建立一个空的Policy
                    if (IsEnableCircuitBreaker)
                    {
                        policy = policy.WrapAsync(Policy.Handle<Exception>().CircuitBreakerAsync(ExceptionsAllowedBeforeBreaking, TimeSpan.FromMilliseconds(MillisecondsOfBreak)));
                    }
                    if (TimeOutMilliseconds > 0)
                    {
                        policy = policy.WrapAsync(Policy.TimeoutAsync(() => TimeSpan.FromMilliseconds(TimeOutMilliseconds), Polly.Timeout.TimeoutStrategy.Pessimistic));
                    }
                    if (MaxRetryTimes > 0)
                    {
                        policy = policy.WrapAsync(Policy.Handle<Exception>().WaitAndRetryAsync(MaxRetryTimes, i => TimeSpan.FromMilliseconds(RetryIntervalMilliseconds)));
                    }
                    Policy policyFallBack = Policy
                    .Handle<Exception>()
                    .FallbackAsync(async (ctx, t) =>
                    {
                        AspectContext aspectContext = (AspectContext)ctx["aspectContext"];
                        var fallBackMethod = context.ServiceMethod.DeclaringType.GetMethod(this.FallBackMethod);
                        Object fallBackResult = fallBackMethod.Invoke(context.Implementation, context.Parameters);
                        //不能以下这样,由于这是闭包相关,若是这样写第二次调用Invoke的时候context指向的
                        //仍是第一次的对象,因此要经过Polly的上下文来传递AspectContext
                        //context.ReturnValue = fallBackResult;
                        aspectContext.ReturnValue = fallBackResult;
                    }, async (ex, t) => { });

                    policy = policyFallBack.WrapAsync(policy);
                    //放入
                    policies.TryAdd(context.ServiceMethod, policy);
                }
            }

            //把本地调用的AspectContext传递给Polly,主要给FallbackAsync中使用,避免闭包的坑
            Context pollyCtx = new Context();
            pollyCtx["aspectContext"] = context;

            //Install-Package Microsoft.Extensions.Caching.Memory
            if (CacheTTLMilliseconds > 0)
            {
                //用类名+方法名+参数的下划线链接起来做为缓存key
                string cacheKey = "HystrixMethodCacheManager_Key_" + context.ServiceMethod.DeclaringType
                                                                   + "." + context.ServiceMethod + string.Join("_", context.Parameters);
                //尝试去缓存中获取。若是找到了,则直接用缓存中的值作返回值
                if (memoryCache.TryGetValue(cacheKey, out var cacheValue))
                {
                    context.ReturnValue = cacheValue;
                }
                else
                {
                    //若是缓存中没有,则执行实际被拦截的方法
                    await policy.ExecuteAsync(ctx => next(context), pollyCtx);
                    //存入缓存中
                    using (var cacheEntry = memoryCache.CreateEntry(cacheKey))
                    {
                        cacheEntry.Value = context.ReturnValue;
                        cacheEntry.AbsoluteExpiration = DateTime.Now + TimeSpan.FromMilliseconds(CacheTTLMilliseconds);
                    }
                }
            }
            else//若是没有启用缓存,就直接执行业务方法
            {
                await policy.ExecuteAsync(ctx => next(context), pollyCtx);
            }
        }
    }

  这个HystrixCommand并不是我原创,而是引用的杨中科老师在.NET微服务中的代码,你们也能够直接经过NuGet安装这个封装好的Package:

NuGet>Install-Package RuPeng.HystrixCore

  这里再也不多讲解代码,由于都有注释,你们经过一个案例调试如下就了解流程了。

4.2 在ASP.NET Core的使用

  (1)为了简化代理类对象的注入,不用在ASP.NET Core中再经过ProxyGeneratorBuilder进行注入,咱们引入一个AspectCore的DI扩展包:

NuGet>Install-Package AspectCore.Extensions.DependencyInjection

  (2)改写Startup类的ConfigureService方法,把返回值从void改成IServiceProvider

    // This method gets called by the runtime. Use this method to add services to the container.
    public IServiceProvider ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();
        .......
        // AoP - AspectCore
        RegisterServices(this.GetType().Assembly, services);
        return services.BuildAspectCoreServiceProvider();
    }

  这里BuildAspectCoreServiceProvider就是让AspectCore接管注入。RegisterService方法以下所示:

    private static void RegisterServices(Assembly asm, IServiceCollection services)
    {
        foreach (var type in asm.GetExportedTypes())
        {
            bool hasHystrixCommand = type.GetMethods().Any(m =>
                m.GetCustomAttribute(typeof(HystrixCommandAttribute)) != null);
            if (hasHystrixCommand)
            {
                services.AddSingleton(type);
            }
        }
    }

  这里使用反射,筛选出那些带有HystrixCommandAttribute的类进行注入,从而减小一行一行注入的代码工做量。

  (3)这里假设咱们须要进行熔断保护的方法所在类是一个ProductService类,它主要的功能就是经过HttpClient去调用ProductService的某个API,它的定义以下:

    public class ProductService
    {
        [HystrixCommand(nameof(GetAllProductsFallBackAsync),
            IsEnableCircuitBreaker = true,
            ExceptionsAllowedBeforeBreaking = 3,
            MillisecondsOfBreak = 1000 * 5)]
        public virtual async Task<string> GetAllProductsAsync(string productType)
        {
            Console.WriteLine($"-->>Starting get product type : {productType}");
            string str = null;
            str.ToString();
            
            // to do : using HttpClient to call outer service to get product list

            return $"OK {productType}";
        }

        public virtual async Task<string> GetAllProductsFallBackAsync(string productType)
        {
            Console.WriteLine($"-->>FallBack : Starting get product type : {productType}");

            return $"OK for FallBack  {productType}";
        }
    }

  这里假设咱们主要针对GetAllProductsAsync这个方法进行熔断保护,假设它会调用另外一个Service的获取产品的接口,这个接口会访问核心数据库,其天天的访问量很大,咱们对此接口进行熔断保护,设置在启用熔断保护前容许两次故障(这里主要指异常),熔断保护时间为5s。

  在Controller中,经过构造函数注入:

    [Produces("application/json")]
    [Route("api/Client")]
    public class ClientController : Controller
    {
        private readonly IClientService clientService;
        private readonly ProductService productService;

        public ClientController(IClientService _clientService, ProductService _productService)
        {
            clientService = _clientService;
            productService = _productService;
        }

        [HttpGet("{id}")]
        public async Task<string> Get(int id)
        {
            var product = await productService.GetAllProductsAsync("B");

            return product;
        }
    }

  为了可以在控制台中看到熔断的信息,咱们增长一句Console.WriteLine到HystrixCommandAttribute中:

    // 启用熔断保护(CircuitBreaker)
    if (IsEnableCircuitBreaker)
    {
        policy = policy.WrapAsync(Policy.Handle<Exception>()
            .CircuitBreakerAsync(ExceptionsAllowedBeforeBreaking,
            TimeSpan.FromMilliseconds(MillisecondsOfBreak), (ex, ts) =>
            {
                    // assuem to do logging
                    Console.WriteLine($"Service API OnBreak -- ts = {ts.Seconds}s, ex.message = {ex.Message}");
            }, () => {}));
    }

  这样当Polly启用熔断时便会在控制台中输出一段消息,实际使用中能够往日志中写一段日志信息。

  (4)开起内置服务器进行测试

  Step1.借助命令行启动一个WebAPI程序

  Step2.借助Postman/SoapUI等API测试工具,输入咱们的URL,测试结果以下图所示:

  

  能够看到咱们经过在Postman中访问这个URL从而触发Service中的异常,两次异常以后,便进入了熔断保护时间,此后5s内的访问都没有再进行实际代码的执行,而直接进入了Fallback方法执行降级后的逻辑。5s保护时间以后,则再次进入实际代码的执行。目前,这个Hystrix还存在一些问题,需继续完善,还没法正式投入使用,后续会结合Polly和Ocelot,在API网关处作统一熔断保护。

5、小结

  本篇首先介绍了一下熔断、降级以及AOP的基本概念,而后从两个流行的库Polly和AspectCore的基本使用开始了解如何在.NET Core代码中实现熔断机制和AOP,最后经过结合Polly+AspectCore封装了一个Hystrix来介绍了一下如何在ASP.NET Core程序中如何作到标签式地快速实现熔断降级机制。后续,会将Polly与Ocelot结合实践API网关,在Ocelot网关处作统一熔断保护。

参考资料

杨中科,《.NET微服务直播课课件(第二版)》

guwei,《谈谈我对服务熔断、服务降级的理解

Jeffcky,《已被.NET基金会承认的弹性和瞬态故障处理库Polly介绍

Lemon,《Asp.Net Core轻量级Aop解决方案:AspectCore

Sunday_Xiao,《服务熔断保护Spring Cloud Hystrix

Catcher Wong, 《再谈Circuit Breaker之使用Polly

Polly官方文档,https://github.com/App-vNext/Polly

AspectCore官方文档,https://github.com/dotnetcore/AspectCore-Framework

 

相关文章
相关标签/搜索