在 Abp 框架内部实现了工做单元,在这里讲解一下,什么是工做单元?html
Unit Of Work(工做单元)模式用来维护一个由已经被业务事物修改(增长、删除或更新)的业务对象组成的列表。Unit Of Work模式负责协调这些修改的持久化工做以及全部标记的并发问题。在数据访问层中采用Unit Of Work模式带来的好处是可以确保数据完整性。若是在持久化一系列业务对象(他们属于同一个事物)的过程当中出现问题,那么应该将全部的修改回滚,以确保数据始终处于有效状态。数据库
而在 Abp 的内部则是结合 Castle 的 Dynamic Proxy 拦截 UnitOfwork Attribute 来进行动态代理注入,实现了当执行标注了 [UnitOfwork]
方法时可以经过 UnitOfworkManager
来进行事务控制。并发
其大概流程以下:app
首先咱们来看一下 Abp 内部是何时注入 UOW 相关的代码的,翻阅源码,在 AbpBootstrapper
内部咱们就能够看到 Abp 做者为 UOW 写了一个拦截器,而且在 Abp 框架初始化的时候就经过 AddInterceptorRegistrars()
方法来监听 IocManager
的组件注册事件,当触发事件的时候就来判断是否知足条件,若是知足则将拦截器与该类型进行一个绑定。框架
public class AbpBootstrapper : IDisposable { private AbpBootstrapper([NotNull] Type startupModule, [CanBeNull] Action<AbpBootstrapperOptions> optionsAction = null) { // 其余代码 if (!options.DisableAllInterceptors) { // 添加拦截器 AddInterceptorRegistrars(); } } private void AddInterceptorRegistrars() { ValidationInterceptorRegistrar.Initialize(IocManager); AuditingInterceptorRegistrar.Initialize(IocManager); EntityHistoryInterceptorRegistrar.Initialize(IocManager); UnitOfWorkRegistrar.Initialize(IocManager); AuthorizationInterceptorRegistrar.Initialize(IocManager); } }
internal static class UnitOfWorkRegistrar { public static void Initialize(IIocManager iocManager) { // 监听组件注册事件 iocManager.IocContainer.Kernel.ComponentRegistered += (key, handler) => { var implementationType = handler.ComponentModel.Implementation.GetTypeInfo(); // 按 UOW 特性注册 HandleTypesWithUnitOfWorkAttribute(implementationType, handler); // 按规约注册 HandleConventionalUnitOfWorkTypes(iocManager, implementationType, handler); }; } private static void HandleTypesWithUnitOfWorkAttribute(TypeInfo implementationType, IHandler handler) { if (IsUnitOfWorkType(implementationType) || AnyMethodHasUnitOfWork(implementationType)) { handler.ComponentModel.Interceptors.Add(new InterceptorReference(typeof(UnitOfWorkInterceptor))); } } private static void HandleConventionalUnitOfWorkTypes(IIocManager iocManager, TypeInfo implementationType, IHandler handler) { if (!iocManager.IsRegistered<IUnitOfWorkDefaultOptions>()) { return; } var uowOptions = iocManager.Resolve<IUnitOfWorkDefaultOptions>(); if (uowOptions.IsConventionalUowClass(implementationType.AsType())) { handler.ComponentModel.Interceptors.Add(new InterceptorReference(typeof(UnitOfWorkInterceptor))); } } private static bool IsUnitOfWorkType(TypeInfo implementationType) { return UnitOfWorkHelper.HasUnitOfWorkAttribute(implementationType); } private static bool AnyMethodHasUnitOfWork(TypeInfo implementationType) { return implementationType .GetMethods(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic) .Any(UnitOfWorkHelper.HasUnitOfWorkAttribute); } }
能够看到在这个 Registrar 里面他拥有两种注册方式,第一种很简单,就是判断注册的组件类型是否拥有 UOW 标签,第二种则是经过规约来注入拦截器。异步
Abp 默认针对仓储与应用服务会自动将拦截器挂载到这两个类型以及他的全部子类的。这里的 UnitOfWorkDefaultOptionsExtensions.IsConventionalUowClass()
方法就是用来判断传入的 Type 是否属于规约的 Type。async
public static bool IsConventionalUowClass(this IUnitOfWorkDefaultOptions unitOfWorkDefaultOptions, Type type) { return unitOfWorkDefaultOptions.ConventionalUowSelectors.Any(selector => selector(type)); }
又牵扯到了一个 IUnitOfWorkDefaultOptions
,看一下他的默认实现 UnitOfWorkDefaultOptions
就会发现这样的代码:ide
public UnitOfWorkDefaultOptions() { _filters = new List<DataFilterConfiguration>(); IsTransactional = true; Scope = TransactionScopeOption.Required; IsTransactionScopeAvailable = true; // 默认类型 ConventionalUowSelectors = new List<Func<Type, bool>> { type => typeof(IRepository).IsAssignableFrom(type) || typeof(IApplicationService).IsAssignableFrom(type) }; }
在上一步咱们经过两种注入方式将拦截器注入到须要应用工做单元特性的类型里面,那么咱们程序在执行的时候就会使用 Dyncmic Proxy 来拦截包裹这些方法。post
下面咱们就来看一下刚刚注入的拦截器:ui
internal class UnitOfWorkInterceptor : IInterceptor { // ... // 其余代码 public void Intercept(IInvocation invocation) { MethodInfo method; try { method = invocation.MethodInvocationTarget; } catch { method = invocation.GetConcreteMethod(); } // 判断当前进入的方法是否带有 UnitOfWork 特性 var unitOfWorkAttr = _unitOfWorkOptions.GetUnitOfWorkAttributeOrNull(method); if (unitOfWorkAttr == null || unitOfWorkAttr.IsDisabled) { // 没有则直接执行该方法 invocation.Proceed(); return; } PerformUow(invocation, unitOfWorkAttr.CreateOptions()); } // ... // 其余代码 }
拦截器内部方法很简单,若是是 UOW 方法则执行 PerformUow()
便可,在该方法内部则对方法类型进行了不一样的判断,同步与异步的处理方法是不同的。
// ... // 其余代码 private void PerformUow(IInvocation invocation, UnitOfWorkOptions options) { // 判断方法是同步仍是异步方法,不一样则执行不一样的处理操做 if (invocation.Method.IsAsync()) { PerformAsyncUow(invocation, options); } else { PerformSyncUow(invocation, options); } } // ... // 其余代码
那么咱们就先来看一下同步方法:
// ... // 其余代码 private void PerformSyncUow(IInvocation invocation, UnitOfWorkOptions options) { using (var uow = _unitOfWorkManager.Begin(options)) { // 继续执行 invocation.Proceed(); uow.Complete(); } } // ... // 其余代码
同步方法针对 UOW 的操做十分简单,直接使用 UnitOfWorkManager.Begin()
方法开启一个事务,而后在内部执行原有方法的代码,执行完成以后调用 Complete()
完成这次调用。
假如我拥有两个应用服务类,他们都拥有 UnitOfWork
特性,而后我再一个 A 方法调用他们两个 B 类的 Run()
方法,而 B类的内部也调用了C 的 Run()
方法,大致以下:
public class A { private readonly B B; public A(B b) { B = b; } public void TestMethod() { B.Run(); } } internal class B { private readonly C C; public B(C c) { C = c; } [UnitOfWork] public void Run() { // 数据库操做 C.Run(); Console.WriteLine("B 的 Run 方法被调用."); } } internal class C { [UnitOfWork] public void Run() { Console.WriteLine("C 的 Run 方法被调用."); } }
而后在拦截器内部的执行过程就相似于下面这种:
internal class UnitOfWorkInterceptor { public void TestMethod() { using (var uow = _unitOfWorkManager.Begin(options)) { using(var uow2 = _unitOfWorkManager.Begin(options)) { // C 方法的代码 Console.WriteLine("C 的 Run 方法被调用."); uow2.Complete(); } // B 方法的代码 Console.WriteLine("B 的 Run 方法被调用."); uow.Complete(); } } }
两个工做单元之间的调用会被嵌套在一个 using 语句块之中,一旦任何代码抛出了异常,都会致使最外层的 uow.Complete()
不会被执行,而 Complete()
方法没有执行,则会致使 uow 对象被释放的时候,uow.Dispose()
内部检测到 Complete()
没有被调用,Abp 框架也会本身抛出异常。
因此 Abp 巧妙结合 Castle Dynamic 实现了 UOW 模式。
下面咱们继续看一下他是如何处理异步 UOW 方法的。
private void PerformAsyncUow(IInvocation invocation, UnitOfWorkOptions options) { var uow = _unitOfWorkManager.Begin(options); try { invocation.Proceed(); } catch { uow.Dispose(); throw; } // 若是是无返回值的异步方法 if (invocation.Method.ReturnType == typeof(Task)) { invocation.ReturnValue = InternalAsyncHelper.AwaitTaskWithPostActionAndFinally( (Task) invocation.ReturnValue, async () => await uow.CompleteAsync(), exception => uow.Dispose() ); } // 有返回值的异步方法处理 else { invocation.ReturnValue = InternalAsyncHelper.CallAwaitTaskWithPostActionAndFinallyAndGetResult( invocation.Method.ReturnType.GenericTypeArguments[0], invocation.ReturnValue, async () => await uow.CompleteAsync(), exception => uow.Dispose() ); } }
相比而言,针对拦截到的异步方法处理起来更加复杂一点,可是整体思路仍然是同样的,将这些工做单元的方法一层层地嵌套起来,依次执行就是核心。而在上面代码里面,同样的首先使用 UnitOfManager.Begin()
得到了一个新的工做单元以后,继续执行原有的操做,下面则主要是经过内部的 InternalAsyncHelper
封装的两个辅助方法来确保等待原有任务执行完成以后,再执行 CompleteAsync()
方法。
咱们能够来看一下这个内部类的实现:
// 异步无返回值处理 public static async Task AwaitTaskWithPostActionAndFinally(Task actualReturnValue, Func<Task> postAction, Action<Exception> finalAction) { Exception exception = null; try { // 等待原有任务执行完成 await actualReturnValue; // 执行 CompleteAsync() 表示本工做单元已经顺利执行 await postAction(); } // 捕获异常 catch (Exception ex) { exception = ex; throw; } finally { // 不管是否抛出异常,都调用以前传入的 uow.Dispose() 方法 finalAction(exception); } } // 原理基本同上,只是多了一个返回值 public static async Task<T> AwaitTaskWithPostActionAndFinallyAndGetResult<T>(Task<T> actualReturnValue, Func<Task> postAction, Action<Exception> finalAction) { Exception exception = null; try { var result = await actualReturnValue; await postAction(); return result; } catch (Exception ex) { exception = ex; throw; } finally { finalAction(exception); } } // 异步有返回值处理 public static object CallAwaitTaskWithPostActionAndFinallyAndGetResult(Type taskReturnType, object actualReturnValue, Func<Task> action, Action<Exception> finalAction) { // 这里经过反射获取到 AwaitTaskWithPostActionAndFinallyAndGetResult 方法,并调用。 return typeof (InternalAsyncHelper) .GetMethod("AwaitTaskWithPostActionAndFinallyAndGetResult", BindingFlags.Public | BindingFlags.Static) .MakeGenericMethod(taskReturnType) .Invoke(null, new object[] { actualReturnValue, action, finalAction }); }
并不复杂,以上便是拦截器所作的操做。
经过上文咱们能够看到一个工做单元是经过 IUnitOfWorkManager.Begin()
拿到的,那 IUnitOfWorkManager
又是个什么东西呢?
根据字面意思咱们大概知道应该相似于管理 UOW 的东西,它其实只有两个做用。第一,获取当前处于激活状态的工做单元,什么叫激活状态咱们后面再讲。第二个做用就是咱们以前看到的,能够经过 Begin()
方法来建立一个新的工做单元。
IUnitOfWorkManager
在 Abp 框架初始化的时候就被注入了,其默认实现为 UnitOfWorkManager
,其核心方法就是 Begin()
方法。
public IUnitOfWorkCompleteHandle Begin(UnitOfWorkOptions options) { // 若是没有传入 UOW 参数,则填充一个默认的参数 options.FillDefaultsForNonProvidedOptions(_defaultOptions); // 获取当前的外部工做单元 var outerUow = _currentUnitOfWorkProvider.Current; // 若是已经存在有外部工做单元,则直接构建一个内部工做单元 if (options.Scope == TransactionScopeOption.Required && outerUow != null) { return new InnerUnitOfWorkCompleteHandle(); } // 不存在外部工做单元,则从 IOC 容器当中获取一个新的出来 var uow = _iocResolver.Resolve<IUnitOfWork>(); // 绑定外部工做单元的事件 uow.Completed += (sender, args) => { _currentUnitOfWorkProvider.Current = null; }; uow.Failed += (sender, args) => { _currentUnitOfWorkProvider.Current = null; }; uow.Disposed += (sender, args) => { _iocResolver.Release(uow); }; // 设置过滤器 if (outerUow != null) { options.FillOuterUowFiltersForNonProvidedOptions(outerUow.Filters.ToList()); } uow.Begin(options); // 绑定租户 ID if (outerUow != null) { uow.SetTenantId(outerUow.GetTenantId(), false); } // 设置当前的外部工做单元为刚刚初始化的工做单元 _currentUnitOfWorkProvider.Current = uow; return uow; }
能够看到 Begin()
方法返回的是一个类型为 IUnitOfWorkCompleteHandle
的东西,转到其定义:
/// <summary> /// Used to complete a unit of work. /// This interface can not be injected or directly used. /// Use <see cref="IUnitOfWorkManager"/> instead. /// </summary> public interface IUnitOfWorkCompleteHandle : IDisposable { /// <summary> /// Completes this unit of work. /// It saves all changes and commit transaction if exists. /// </summary> void Complete(); /// <summary> /// Completes this unit of work. /// It saves all changes and commit transaction if exists. /// </summary> Task CompleteAsync(); }
他只有两个方法,都是标识 UOW 处于已经完成的状态。
在方法上面右键查看其实现能够看到有这样一种依赖关系:
能够看到 IUnitOfWorkCompleteHandle
有两个实现,一个是 InnerUnitOfWorkCompleteHandle
还有一个则是 IUnitOfWork
接口。
首先看一下 InnerUnitOfWorkCompleteHandle
:
internal class InnerUnitOfWorkCompleteHandle : IUnitOfWorkCompleteHandle { public const string DidNotCallCompleteMethodExceptionMessage = "Did not call Complete method of a unit of work."; private volatile bool _isCompleteCalled; private volatile bool _isDisposed; public void Complete() { _isCompleteCalled = true; } public Task CompleteAsync() { _isCompleteCalled = true; return Task.FromResult(0); } public void Dispose() { if (_isDisposed) { return; } _isDisposed = true; if (!_isCompleteCalled) { if (HasException()) { return; } throw new AbpException(DidNotCallCompleteMethodExceptionMessage); } } private static bool HasException() { try { return Marshal.GetExceptionCode() != 0; } catch (Exception) { return false; } } }
代码很简单,调用 Complete()/CompleteAsync()
会将 _isCompleteCalled 置为 true,而后在 Dispose()
方法内会进行检测,为 faslse 的话直接抛出异常。能够看到在 InnerUnitOfWorkCompleteHandle
内部并不会真正地调用 DbContext.SaveChanges()
进行数据保存。
那么谁才是真正进行数据库操做的工做单元呢?
答案就是以前在 IUnitOfWorkManager.Begin()
里面,能够看到在建立 UOW 对象的时候,他在内部进行了一个判断,若是不存在外部工做单元的状况下才会建立 InnerUnitOfWorkCompleteHandle
对象,不然是解析的一个 IUnitOfWork
对象。
也就是说你能够想象有如下代码:
public void TestUowMethod() { using(var outerUOW = Manager.Begin()) // 这里返回的是 IOC 解析出的 IUnitOfWork { OperationOuter(); using(var innerUOW1 = Manager.Begin()) // 内部 UOW { Operation1(); using(var innerUOW2 = Manager.Begin()) // 内部 UOW { Operation2(); Complete(); } Complete(); } Complete(); } }
当代码执行的时候,如同俄罗斯套娃,从内部依次到外部执行,内部工做单元仅会在调用 Complete 方法的时候将 completed 标记为 true,但一旦操做抛出异常,Complete()
没法获得执行,则会直接抛出异常,中断外层代码执行。
在 ABP 内部针对 EF Core 框架实现了一套 UOW,其继承自 UnitOfWorkBase
,而在 UnitOfWorkBase
内部有部分针对接口 IActiveUnitOfWork
的实现,同时因为 IUnifOfWork
也实现了 IUnitOfWorkCompleteHandle
接口,因此在 Begin()
方法处可以向上转型。
根据上图能够知道 Abp 默认实现了一个 UnitOfWorkBase
做为工做单元的抽象基类,他主要的属性就是 Id 与 Outer 属性。
public abstract class UnitOfWorkBase : IUnitOfWork { public string Id { get; } [DoNotWire] public IUnitOfWork Outer { get; set; } }
这里的 Id 是使用的 Guid 生成的,用于标识每一个工做单元。
而 Outer 则是当前 UOW 对象的引用对象。
这里重点说一下 Outer 是哪儿来的,Outer 他的值是在以前的 UnitOfWorkManager.Begin()
里面的 _currentUnitOfWorkProvider.Current = uow;
进行设置的,_currentUnitOfWorkProvider
的实如今 AsyncLocalCurrentUnitOfWorkProvider
内部,其做用是维护一个 UOW 链,确保当前的工做单元始终是最新的,这里的代码本来是使用 CallContext
实现的,如今已经换为 AsyncLocal<T>
了。
public class AsyncLocalCurrentUnitOfWorkProvider : ICurrentUnitOfWorkProvider, ITransientDependency { /// <inheritdoc /> [DoNotWire] public IUnitOfWork Current { get { return GetCurrentUow(); } set { SetCurrentUow(value); } } public ILogger Logger { get; set; } private static readonly AsyncLocal<LocalUowWrapper> AsyncLocalUow = new AsyncLocal<LocalUowWrapper>(); public AsyncLocalCurrentUnitOfWorkProvider() { Logger = NullLogger.Instance; } private static IUnitOfWork GetCurrentUow() { var uow = AsyncLocalUow.Value?.UnitOfWork; if (uow == null) { return null; } if (uow.IsDisposed) { AsyncLocalUow.Value = null; return null; } return uow; } private static void SetCurrentUow(IUnitOfWork value) { lock (AsyncLocalUow) { if (value == null) { if (AsyncLocalUow.Value == null) { return; } if (AsyncLocalUow.Value.UnitOfWork?.Outer == null) { AsyncLocalUow.Value.UnitOfWork = null; AsyncLocalUow.Value = null; return; } AsyncLocalUow.Value.UnitOfWork = AsyncLocalUow.Value.UnitOfWork.Outer; } else { if (AsyncLocalUow.Value?.UnitOfWork == null) { if (AsyncLocalUow.Value != null) { AsyncLocalUow.Value.UnitOfWork = value; } AsyncLocalUow.Value = new LocalUowWrapper(value); return; } value.Outer = AsyncLocalUow.Value.UnitOfWork; AsyncLocalUow.Value.UnitOfWork = value; } } } private class LocalUowWrapper { public IUnitOfWork UnitOfWork { get; set; } public LocalUowWrapper(IUnitOfWork unitOfWork) { UnitOfWork = unitOfWork; } } }
继续往下看,在 UnitOfWorkBase
的里面也是有个 Complete()
与 CompleteAsync()
方法的。
protected abstract void CompleteUow(); /// <inheritdoc/> public void Complete() { // 判断是否重复完成 PreventMultipleComplete(); try { CompleteUow(); _succeed = true; OnCompleted(); } catch (Exception ex) { _exception = ex; throw; } }
这里的 CompleteUow()
仍然只是一个抽象方法,具体的实如今具体的访问层里面。
Abp 针对 EF Core 的 UOW 实现是 EfCoreUnitOfWork
,代码以下:
protected override void BeginUow() { if (Options.IsTransactional == true) { _transactionStrategy.InitOptions(Options); } } public override void SaveChanges() { foreach (var dbContext in GetAllActiveDbContexts()) { SaveChangesInDbContext(dbContext); } } public override async Task SaveChangesAsync() { // 遍历全部激活的 DbContext foreach (var dbContext in GetAllActiveDbContexts()) { await SaveChangesInDbContextAsync(dbContext); } } protected override void CompleteUow() { SaveChanges(); CommitTransaction(); } protected override async Task CompleteUowAsync() { await SaveChangesAsync(); CommitTransaction(); } private void CommitTransaction() { if (Options.IsTransactional == true) { _transactionStrategy.Commit(); } } public IReadOnlyList<DbContext> GetAllActiveDbContexts() { return ActiveDbContexts.Values.ToImmutableList(); }
根本就是遍历 DbContext
调用其 SaveChanges()
来提交全部数据库更改。
余下更加详细的东西会放在 《7、仓储与 Entity Framework Core》 当中说明的。