近期因为须要进行分布式链路跟踪系统的技术选型,因此一直在研究链路跟踪相关的框架。做为能在.Net Core中使用的APM,SkyWalking天然成为了首选。SkyAPM-dotnet是SkyWalking在.Net Core端的探针实现,其主要的收集日志的手段就是基于DiagnosticSource来进行诊断跟踪的。不得不说SkyAPM-dotnet的设计仍是很是优秀的,它自己定义了一套很是规范的标准,并且提供了很是良好的扩展性,虽然框架自己可支持的采集端有限,可是基于这套标准扩展起来仍是很是方便的。html
关于DiagnosticSource它自己是一个基于发布订阅模式的工做模式,因为它自己的实现方式是异步的,因此不只仅能够把它用到日志上,还能够用它实现异步操做,或者用它简化实现发布订阅的功能。DiagnosticSource自己是一个抽象类,咱们最经常使用到的是它的子类DiagnosticListener,经过DiagnosticSource的Write方法实现发布一条有具体名称的消息,而后经过IObserver去订阅消息。DiagnosticListener能够实现不一样的实例,每一个实例能够有本身的名称,每一个实例还能够发布不一样名称的消息,比如一个在写代码的时候咱们能够定义多个程序集,一个程序集下面能够包含多个命名空间。git
上面咱们大体的介绍了关于DiagnosticSource相关的概念,相信你们已经有了初步的了解,接下来咱们就来看一下在代码中如何使用DiagnosticSource,还说到了它一个重要的子类DiagnosticListener,基本上关于DiagnosticSource的工做方式都是围绕着DiagnosticListener实现的,首先咱们来看一下如何发布一条消息github
//声明DiagnosticListener并命名为MyTest DiagnosticSource diagnosticSource = new DiagnosticListener("MyTest"); string pubName = "MyTest.Log"; //判断是否存在MyTest.Log的订阅者 if (diagnosticSource.IsEnabled(pubName)) { //发送名为MyTest.Log的消息,包含Name,Address两个属性 diagnosticSource.Write(pubName, new { Name = "old王", Address="隔壁" }); }
经过这种方式,咱们就能够完成针对消息的发布,其中用到了IsEnabled方法,这个方法是在实际使用DiagnosticSource过程当中比较经常使用的方法,用于判断是够存在对应名称的消费者,这样能够有效的避免发送消息浪费。
发送相对仍是比较简单的,接下来咱们看一下如何订阅发布的消息。上面咱们提到了订阅消息是经过IObserver接口实现的,IObserver表明了订阅者。虽然咱们经过DiagnosticSource去发布消息,可是真正描述发布者身份的是IObservable接口,IObservable的惟一方法Subscribe是用来注册订阅者IObserver,可是默认系统并无为咱们提供一个具体的实现类,因此咱们须要定义一个IObserver订阅者的实现类网络
public class MyObserver<T>:IObserver<T> { private Action<T> _next; public MyObserver(Action<T> next) { _next = next; } public void OnCompleted() { } public void OnError(Exception error) { } public void OnNext(T value) => _next(value); }
有了具体的订阅者实现类,咱们就能够为发布者注册订阅者了,一样是使用DiagnosticListener,我的认为虽然操做都是经过DiagnosticSource来完成的,但它只是一个外观类,可是并不能直接描述发布者和订阅者自己。接下来咱们看一下具体实现app
//AllListeners获取全部发布者,Subscribe为发布者注册订阅者MyObserver DiagnosticListener.AllListeners.Subscribe(new MyObserver<DiagnosticListener>(listener => { //判断发布者的名字 if (listener.Name == "MyTest") { //获取订阅信息 listener.Subscribe(new MyObserver<KeyValuePair<string, object>>(listenerData => { System.Console.WriteLine($"监听名称:{listenerData.Key}"); dynamic data = listenerData.Value; //打印发布的消息 System.Console.WriteLine($"获取的信息为:{data.Name}的地址是{data.Address}"); })); listener.SubscribeWithAdapter(new MyDiagnosticListener()); } }));
具体实现可总结为两步,首先为发布者注册订阅者,而后获取订阅者获取发布的消息。这种写法仍是比较复杂的,首先须要实现订阅者类,而后经过一系列复杂的操做,才能完成消息订阅,而后还要本身获取发布的消息,解析具体的消息值,总之操做流程很是繁琐。微软彷佛也意识到了这个问题,因而乎给我提供了一个关于实现订阅者的便利方法,编辑项目文件引入DiagnosticAdapter包框架
<PackageReference Include="Microsoft.Extensions.DiagnosticAdapter" Version="3.1.7" />
或者经过包管理器直接搜索安装,道路千万条都是通罗马。经过这个包解决了咱们两个痛点,首先是关于订阅者的注册难问题,其次解决了关于发布消息解析难的痛点。咱们能够直接订阅一个适配类来充当订阅者的载体,其次咱们能够定义方法模拟订阅去订阅消息,而这个方法的参数就是咱们发布的消息内容。说了这么多,不如直接上代码异步
public class MyDiagnosticListener { //发布的消息主题名称 [DiagnosticName("MyTest.Log")] //发布的消息参数名称和发布的属性名称要一致 public void MyLog(string name,string address) { System.Console.WriteLine($"监听名称:MyTest.Log"); System.Console.WriteLine($"获取的信息为:{name}的地址是{address}"); } }
咱们能够随便定义一个类来充当订阅者载体,类里面能够自定义方法来实现获取解析消息的实现。想要让方法能够订阅消息,须要在方法上声明DiagnosticName,而后名称就是你要订阅消息的名称,而方法的参数就是你发布消息的字段属性名称,这里须要注意的是订阅的参数名称须要和发布声明属性名称一致。
而后咱们直接能够经过这个类去接收订阅消息async
DiagnosticListener.AllListeners.Subscribe(new MyObserver<DiagnosticListener>(listener => { if (listener.Name == "MyTest") { //适配订阅 listener.SubscribeWithAdapter(new MyDiagnosticListener()); } }));
可能你以为这样仍是不够好,由于仍是没有脱离须要自定义订阅者,这里还有更简洁的实现方式。细心的你可能已经发现了SubscribeWithAdapter是DiagnosticListener的扩展方法,而咱们声明DiagnosticSource就是使用的DiagnosticListener实例,因此上面的代码能够简化为一下方式分布式
DiagnosticListener diagnosticListener = new DiagnosticListener("MyTest"); DiagnosticSource diagnosticSource = diagnosticListener; //直接去适配订阅者 diagnosticListener.SubscribeWithAdapter(new MyDiagnosticListener()); string pubName = "MyTest.Log"; if (diagnosticSource.IsEnabled(pubName)) { diagnosticSource.Write(pubName, new { Name = "old王", Address="隔壁" }); }
这种方式也是咱们比较推荐的使用方式,极大的节省了工做的方式,并且代码很是的简洁。可是存在惟一的不足,这种写法只能针对特定的DiagnosticListener进行订阅处理,若是你须要监听全部发布者,就须要使用DiagnosticListener.AllListeners.Subscribe的方式。ide
在.Net Core的源码中,微软默认在涉及到网络请求或处理请求等许多重要的节点都使用了DiagnosticListener来发布拦截的消息,接下来就罗列一些我知道的比较常见的埋点,经过这些操做咱们就能够看出,诊断日志仍是很便利的,并且微软在.Net Core中也很是重视它的使用。
当咱们经过ConfigureWebHostDefaults配置Web主机的时候,程序就已经默认给咱们注入了诊断名称为Microsoft.AspNetCore的DiagnosticListener和DiagnosticSource,这样咱们就能够很方便的在程序中直接获取DiagnosticListener实例去发布消息或者监听发布的内部消息,具体注入逻辑位于能够去GenericWebHostBuilder类中查看[点击查看源码👈]
var listener = new DiagnosticListener("Microsoft.AspNetCore"); services.TryAddSingleton<DiagnosticListener>(listener); services.TryAddSingleton<DiagnosticSource>(listener);
而后在Server启动的时候传递了DiagnosticListener实例[点击查看源码👈]
var httpApplication = new HostingApplication(application, Logger, DiagnosticListener, HttpContextFactory); await Server.StartAsync(httpApplication, cancellationToken);
这样在Server运行期间咱们能够经过DiagnosticListener诊断跟踪请求相关的信息,咱们能够看下在处理请求的过程当中DiagnosticListener都发布了哪些消息,咱们找到发送诊断跟踪的位置位于HostingApplicationDiagnostics中[点击查看源码👈],这事集中处理请求相关的诊断跟踪,接下来咱们就大体查看一下它发布了哪些事件消息,首先找到定义发布名称的属性
private const string ActivityName = "Microsoft.AspNetCore.Hosting.HttpRequestIn"; private const string ActivityStartKey = ActivityName + ".Start"; private const string ActivityStopKey = ActivityName + ".Stop"; private const string DeprecatedDiagnosticsBeginRequestKey = "Microsoft.AspNetCore.Hosting.BeginRequest"; private const string DeprecatedDiagnosticsEndRequestKey = "Microsoft.AspNetCore.Hosting.EndRequest"; private const string DiagnosticsUnhandledExceptionKey = "Microsoft.AspNetCore.Hosting.UnhandledException";
经过这些发布消息的名称咱们就能够看出,在请求开始、请求进入、请求结束、请求中止、请求异常等都发布了诊断消息,咱们以BeginRequest为例查看一下具体发送的消息
if (_diagnosticListener.IsEnabled(DeprecatedDiagnosticsBeginRequestKey)) { startTimestamp = Stopwatch.GetTimestamp(); RecordBeginRequestDiagnostics(httpContext, startTimestamp); }
找到RecordBeginRequestDiagnostics方法的实现
[MethodImpl(MethodImplOptions.NoInlining)] private void RecordBeginRequestDiagnostics(HttpContext httpContext, long startTimestamp) { _diagnosticListener.Write( DeprecatedDiagnosticsBeginRequestKey, new { httpContext = httpContext, timestamp = startTimestamp }); }
从这里咱们能够看出在BeginRequest中诊断日志发出的消息中包含了HttpContext和开始时间戳信息,而后再来看一下请求结束发布的诊断消息
[MethodImpl(MethodImplOptions.NoInlining)] private void RecordEndRequestDiagnostics(HttpContext httpContext, long currentTimestamp) { _diagnosticListener.Write( DeprecatedDiagnosticsEndRequestKey, new { httpContext = httpContext, timestamp = currentTimestamp }); }
经过发布的这些跟踪日志咱们能够获取请求信息,请求时间而且能获得输出信息和结束时间,有了这些关键信息,咱们就能够监听请Asp.Net Core处理请求的状况,咱们上面提到过SkyAPM-dotnet正是经过这些发出诊断跟踪日志,来实现对程序无入侵的方式来处理应用系统监控的,具体咱们能够查看相关实现,咱们找到订阅这些消息的地方
[点击查看源码👈],拿出来类的结构,大体以下
public class HostingTracingDiagnosticProcessor : ITracingDiagnosticProcessor { public string ListenerName { get; } = "Microsoft.AspNetCore"; [DiagnosticName("Microsoft.AspNetCore.Hosting.BeginRequest")] public void BeginRequest([Property] HttpContext httpContext) { } [DiagnosticName("Microsoft.AspNetCore.Hosting.EndRequest")] public void EndRequest([Property] HttpContext httpContext) { } [DiagnosticName("Microsoft.AspNetCore.Diagnostics.UnhandledException")] public void DiagnosticUnhandledException([Property] HttpContext httpContext, [Property] Exception exception) { } [DiagnosticName("Microsoft.AspNetCore.Hosting.UnhandledException")] public void HostingUnhandledException([Property] HttpContext httpContext, [Property] Exception exception) { } //[DiagnosticName("Microsoft.AspNetCore.Mvc.BeforeAction")] public void BeforeAction([Property] ActionDescriptor actionDescriptor, [Property] HttpContext httpContext) { } //[DiagnosticName("Microsoft.AspNetCore.Mvc.AfterAction")] public void AfterAction([Property] ActionDescriptor actionDescriptor, [Property] HttpContext httpContext) { } }
不得不认可SkyAPM-dotnet很是巧妙的利用了系统内部发出的诊断跟踪日志,实现了对请求的处理跟踪,真的是很是优秀。
上面咱们看到的是AspNetCore处理请求的诊断日志埋点,在发出请求的HttpClient中,微软也作了埋点处理。咱们在以前的文章.NET Core HttpClient源码探究中提到过HttpClient经过HttpClientHandler发送请求的,在HttpClientHandler SendAsync方法中咱们能够看到以下实现
protected internal override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) { return DiagnosticsHandler.IsEnabled() && _diagnosticsHandler != null ? _diagnosticsHandler.SendAsync(request, cancellationToken) : _underlyingHandler.SendAsync(request, cancellationToken); }
也就是说若是知足DiagnosticsHandler.IsEnabled()而且_diagnosticsHandler不为空的状况下将会使用DiagnosticsHandler发送请求,关于DiagnosticsHandler.IsEnabled()的大体实现逻辑以下
if (AppContext.TryGetSwitch("System.Net.Http.EnableActivityPropagation", out bool enableActivityPropagation)) { return enableActivityPropagation; } string? envVar = Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_HTTP_ENABLEACTIVITYPROPAGATION"); if (envVar != null && (envVar.Equals("false", StringComparison.OrdinalIgnoreCase) || envVar.Equals("0"))) { return false; } return true;
经过这个逻辑能够看出,默认状况下咱们不作特殊处理返回的就是true,也就是说发送请求会经过DiagnosticsHandler,咱们找到DiagnosticsHandler的实现[点击查看源码👈],抽离出来SendAsyncCore方法中关于诊断跟踪的核心实现逻辑,大体以下
DiagnosticListener diagnosticListener = new DiagnosticListener("HttpHandlerDiagnosticListener"); if (diagnosticListener.IsEnabled("System.Net.Http.Request")) { long timestamp = Stopwatch.GetTimestamp(); loggingRequestId = Guid.NewGuid(); //请求开始以前发送诊断日志 diagnosticListener.Write("System.Net.Http.Request",new RequestData(request, loggingRequestId, timestamp)); } HttpResponseMessage? response = null; TaskStatus taskStatus = TaskStatus.RanToCompletion; try { response = async ? await base.SendAsync(request, cancellationToken).ConfigureAwait(false) : base.Send(request, cancellationToken); return response; } catch (OperationCanceledException) { taskStatus = TaskStatus.Canceled; throw; } catch (Exception ex) { taskStatus = TaskStatus.Faulted; if (diagnosticListener.IsEnabled("System.Net.Http.Exception")) { //若是请求出现异常发出异常消息诊断日志 diagnosticListener.Write("System.Net.Http.Exception", new ExceptionData(ex, request)); } throw; } finally { if (activity != null) { diagnosticListener.StopActivity(activity, new ActivityStopData(response,request,taskStatus)); } if (diagnosticListener.IsEnabled("System.Net.Http.Response")) { long timestamp = Stopwatch.GetTimestamp(); //获得输出结果后发送诊断日志 diagnosticListener.Write("System.Net.Http.Response",new ResponseData(response,loggingRequestId,timestamp,taskStatus)); } }
一样的思路HttpClient会在发送请求以前发出请求信息相关的诊断跟踪,会在获得相应以后发送响应相关诊断跟踪,经过这些信息咱们能够捕获到由程序发出的Http请求相关的信息,从而监控请求相关的数据,咱们来看一下SkyAPM-dotnet订阅Http请求相关的实现,在HttpClientTracingDiagnosticProcessor类中[点击查看源码👈],抽离实现的框架大体以下
public class HttpClientTracingDiagnosticProcessor : ITracingDiagnosticProcessor { public string ListenerName { get; } = "HttpHandlerDiagnosticListener"; [DiagnosticName("System.Net.Http.Request")] public void HttpRequest([Property(Name = "Request")] HttpRequestMessage request) { } [DiagnosticName("System.Net.Http.Response")] public void HttpResponse([Property(Name = "Response")] HttpResponseMessage response) { } [DiagnosticName("System.Net.Http.Exception")] public void HttpException([Property(Name = "Request")] HttpRequestMessage request, [Property(Name = "Exception")] Exception exception) { } }
这里正是监听的HttpClient发出的诊断日志。假如存在系统A和系统B,系统A经过HttpClient发送请求调用Asp.Net Core系统B,经过订阅他们发出的诊断跟踪日志,而这些数据正是实现系统监控和链路跟踪重要依据。
在.Net Core相关的源码中还有许多其余关于DiagnosticListener的埋点信息好比请求执行到Action的时候或者出现全局异常的时候都有相似的处理。一样在EFCore中也存在这些埋点信息,有兴趣的能够自行查阅相关源码和SkyAPM-dotnet源码,了解DiagnosticSource工做方式,以及如何经过这些信息实现APM系统。虽然SkyAPM-dotnet自己实现的框架个数有限,可是它给咱们实现了良好的扩展性,咱们能够经过DiagnosticSource和DiagnosticListener自行实现SkyAPM-dotnet的扩展,好比你能够扩展Redis MongoDb等其它中间件。好比SkyApm.Diagnostics.CAP是CAP归入SkyAPM中程序包,正是杨总参与了相关代码的实现。
DiagnosticSource诊断跟踪涉及到的概念虽然不是不少,可是在.Net Core相关的框架中使用的仍是很是普遍的,经过这些信息咱们能够拿到框架执行过程当中关键节点获得的信息,为咱们提供了很大的便利。加上SkyAPM-dotnet巧妙的使用了这一特色使得DiagnosticSource更变得强大并且通用。上面咱们讲述的只是冰山一角,还有更多更深的应用,好比Azure监控.Net Core应用程序也是利用了这些。有兴趣的能够查看相关源码,也能够学习一下SkyAPM-dotnet相关源码,体会一下DiagnosticSource精髓所在。