本身之前都走了弯路,觉得学习战术设计就会DDD了,其实DDD的精华在战略设计,可是对于咱们菜鸟来讲,学习一些技术概念也是挺好的
常常看到这些术语,概念太多,也想简单学习一下,记忆力比较差记录一下实现的细节
数据库
/// <summary> /// 存储聚合根中的事件到EventStorage 发布事件 /// </summary> /// <typeparam name="TAggregationRoot"></typeparam> /// <param name="event"></param> /// <returns></returns> public async Task AppendEventStoragePublishEventAsync<TAggregationRoot>(TAggregationRoot @event) where TAggregationRoot : IAggregationRoot { var domainEventList = @event.UncommittedEvents.ToList(); if (domainEventList.Count == 0) { throw new Exception("请添加事件!"); } await TryAppendEventStorageAsync(domainEventList).ContinueWith(async e => { if (e.Result == (int)EventStorageStatus.Success) { await TryPublishDomainEventAsync(domainEventList).ConfigureAwait(false); @event.ClearEvents(); } }); } /// <summary> /// 发布领域事件 /// </summary> /// <returns></returns> public async Task PublishDomainEventAsync(List<IDomainEvent> domainEventList) { using (var connection = new SqlConnection(ConnectionStr)) { if (connection.State == ConnectionState.Closed) { await connection.OpenAsync().ConfigureAwait(false); } using (var transaction = await connection.BeginTransactionAsync().ConfigureAwait(false)) { try { if (domainEventList.Count > 0) { foreach (var domainEvent in domainEventList) { await _capPublisher.PublishAsync(domainEvent.GetRoutingKey(), domainEvent).ConfigureAwait(false); } } await transaction.CommitAsync().ConfigureAwait(false); } catch (Exception e) { await transaction.RollbackAsync().ConfigureAwait(false); throw; } } } } /// <summary> /// 发布领域事件重试 /// </summary> /// <param name="domainEventList"></param> /// <returns></returns> public async Task TryPublishDomainEventAsync(List<IDomainEvent> domainEventList) { var policy = Policy.Handle<SocketException>().Or<IOException>().Or<Exception>() .RetryForeverAsync(onRetry: exception => { Task.Factory.StartNew(() => { //记录重试的信息 _loggerHelper.LogInfo("发布领域事件异常", exception.Message); }); }); await policy.ExecuteAsync(async () => { await PublishDomainEventAsync(domainEventList).ConfigureAwait(false); }); } /// <summary> /// 存储聚合根中的事件到EventStorage中 /// </summary> /// <returns></returns> public async Task<int> AppendEventStorageAsync(List<IDomainEvent> domainEventList) { if (domainEventList.Count == 0) { throw new Exception("请添加事件!"); } var status = (int)EventStorageStatus.Failure; using (var connection = new SqlConnection(ConnectionStr)) { try { if (connection.State == ConnectionState.Closed) { await connection.OpenAsync().ConfigureAwait(false); } using (var transaction = await connection.BeginTransactionAsync().ConfigureAwait(false)) { try { if (domainEventList.Count > 0) { foreach (var domainEvent in domainEventList) { EventStorage eventStorage = new EventStorage { Id = Guid.NewGuid(), AggregateRootId = domainEvent.AggregateRootId, AggregateRootType = domainEvent.AggregateRootType, CreateDateTime = domainEvent.CreateDateTime, Version = domainEvent.Version, EventData = Events(domainEvent) }; var eventStorageSql = $"INSERT INTO EventStorageInfo(Id,AggregateRootId,AggregateRootType,CreateDateTime,Version,EventData) VALUES (@Id,@AggregateRootId,@AggregateRootType,@CreateDateTime,@Version,@EventData)"; await connection.ExecuteAsync(eventStorageSql, eventStorage, transaction).ConfigureAwait(false); } } await transaction.CommitAsync().ConfigureAwait(false); status = (int)EventStorageStatus.Success; } catch (Exception e) { await transaction.RollbackAsync().ConfigureAwait(false); throw; } } } catch (Exception e) { connection.Close(); throw; } } return status; } /// <summary> /// AppendEventStorageAsync异常重试 /// </summary> public async Task<int> TryAppendEventStorageAsync(List<IDomainEvent> domainEventList) { var policy = Policy.Handle<SocketException>().Or<IOException>().Or<Exception>() .RetryForeverAsync(onRetry: exception => { Task.Factory.StartNew(() => { //记录重试的信息 _loggerHelper.LogInfo("存储事件异常", exception.Message); }); }); var result = await policy.ExecuteAsync(async () => { var resulted = await AppendEventStorageAsync(domainEventList).ConfigureAwait(false); return resulted; }); return result; } /// <summary> /// 根据DomainEvent序列化事件Json /// </summary> /// <param name="domainEvent"></param> /// <returns></returns> public string Events(IDomainEvent domainEvent) { ConcurrentDictionary<string, string> dictionary = new ConcurrentDictionary<string, string>(); //获取领域事件的类型(方便解析Json) var domainEventTypeName = domainEvent.GetType().Name; var domainEventStr = JsonConvert.SerializeObject(domainEvent); dictionary.GetOrAdd(domainEventTypeName, domainEventStr); var eventData = JsonConvert.SerializeObject(dictionary); return eventData; }
解析EventStorage中存储的事件
服务器
public async Task<List<IDomainEvent>> GetAggregateRootEventStorageById(Guid AggregateRootId) { try { using (var connection = new SqlConnection(ConnectionStr)) { var eventStorageList = await connection.QueryAsync<EventStorage>($"SELECT * FROM dbo.EventStorageInfo WHERE AggregateRootId='{AggregateRootId}'"); List<IDomainEvent> domainEventList = new List<IDomainEvent>(); foreach (var item in eventStorageList) { var dictionaryDomainEvent = JsonConvert.DeserializeObject<Dictionary<string, string>>(item.EventData); foreach (var entry in dictionaryDomainEvent) { var domainEventType = TypeNameProvider.GetType(entry.Key); if (domainEventType != null) { var domainEvent = JsonConvert.DeserializeObject(entry.Value, domainEventType) as IDomainEvent; domainEventList.Add(domainEvent); } } } return domainEventList; } } catch (Exception ex) { throw; }
1.事件没持久化就表明事件还没发生成功,事件存储可能失败,必须先存储事件,在发布事件,保证存储事件与发布事件一致性
1.使用事件驱动,必需要作好冥等的处理
2.若是业务场景中有状态时:经过状态来控制
3.新建一张表,用来记录消费的信息,消费端的代码里面,根据惟一的标识,判断是否处理过该事件
4.Q端的任何更新都应该把聚合根ID和事件版本号做为条件,Q端的更新不用遵循聚合的原则,可使用最简单的方式处理
5.仓储是用来重建聚合的,它的行为和集合同样只有Get ,Add ,Delete
6.DDD不是技术,是思想,核心在战略模块,战术设计是实现的一种选择,战略设计,须要面向对象的分析能力,职责分配,深层次的分析业务
并发