先看看数据库事务的定义:单个逻辑工做单元执行的一系列操做,要么彻底地执行,要么彻底地不执行redis
这个比较容易理解,操做过数据库的通常都懂,既是业务需求涉及到多个数据表操做的时候,须要用到事务数据库
要么一块儿更新,要么一块儿不更新,不会出现只更新了部分数据表的状况,下边看看数据库事务的使用服务器
1 begin tran 2 begin try 3 update Table1 set Field = 1 where ID = 1 4 update Table2 set Field = 2 where ID = 1 5 end try 6 begin catch 7 rollback tran 8 end catch 9 commit tran
上实例在小型项目中通常是问题不大的,由于小型项目通常是单机系统,数据库、Web服务大都在一台服务器上,甚至可能只有一个数据库文件,架构
这种状况下使用本地事务没有一点问题;并发
可是本地事务有很大的缺陷,由于开启事务通常是锁表的,事务执行期间会一直锁着,其余的操做通常都要排队等待,对性能要求比较高的系统是不能忍受的。app
特别是涉及改动不一样数据库的操做,这会形成跨库事务,性能更加低异步
若是还涉及到不在同一台服务器、甚至不一样网段部署的数据库,那本地事务简直是系统运行的灾难,是首先须要丢弃的解决方案。分布式
那若是遇到上述状况,该怎么作呢,这就涉及到分布式事务了高并发
若是有海量数据须要处理、或者要求高并发请求的话,同步的事务机制已是不现实的了,这种状况下必须采用异步事务机制,既分段式的事务性能
分段式事务通常作法就是把需求任务分段式地完成,经过事务补偿机制来保证业务最终执行成功,补偿机制通常能够归类为2种:
1 )定时任务补偿:
经过定时任务去跟进后续任务,根据不一样的状态表肯定下一步的操做,从而保证业务最终执行成功,
这种办法可能会涉及到不少的后台服务,维护起来也会比较麻烦,这是应该是早期比较流行的作法
2) 消息补偿:
经过消息中间件触发下一段任务,既经过实时消息通知下一段任务开始执行,执行完毕后的消息回发通知来保证业务最终完成;
固然这也是异步进行的,可是能保证数据最终的完整性、一致性,也是近几年比较热门的作法
定时任务补偿就不说了,这篇文章咱们来讨论一下经过消息补偿来完成分布式事务的通常作法
0)咱们以简单的产品下单场景来讲明,(不要较真哈)
1)先来看看分布式异步事务处理流程示意图,APP1与APP2须要互相订阅对方消息
2)首先看数据库,2个,一个库存库,一个已下单成功的库
1 -- 下单通知,主要做用保留已下单操做,消息发送失败能够根据此表从新发送 2 CREATE TABLE [dbo].[ProductMessage]( 3 [ID] [int] IDENTITY(1,1) NOT NULL, 4 [Product] [varchar](50) NULL, 5 [Amount] [int] NULL, 6 [UpdateTime] [datetime] NULL 7 ) 8 -- 库存 9 CREATE TABLE [dbo].[ProductStock]( 10 [ID] [int] IDENTITY(1,1) NOT NULL, 11 [Product] [varchar](50) NULL, 12 [Amount] [int] NULL 13 ) 14 -- 下单成功 15 CREATE TABLE [dbo].[ProductSell]( 16 [ID] [int] IDENTITY(1,1) NOT NULL, 17 [Product] [varchar](50) NULL, 18 [Customer] [int] NULL, 19 [Amount] [int] NULL 20 ) 21 -- 下单成功消息,主要做用防止重复消费 22 CREATE TABLE [dbo].[ProductMessageApply]( 23 [ID] [int] IDENTITY(1,1) NOT NULL, 24 [MesageID] [int] NULL, 25 [CreateTime] [datetime] NULL 26 )
3)项目架构Demo
数据底层访问使用的是Dapper、使用redis做为消息中间件
4)实体层代码
1 public class ProductMessage 2 { 3 [Key] 4 [IgnoreProperty(true)] 5 public int ID { get; set; } 6 public string Product { get; set; } 7 public int Amount { get; set; } 8 public DateTime UpdateTime { get; set; } 9 } 10 public class ProductMessageApply 11 { 12 [Key] 13 [IgnoreProperty(true)] 14 public int ID { get; set; } 15 public int MesageID { get; set; } 16 public DateTime CreateTime { get; set; } 17 } 18 public class ProductSell 19 { 20 [Key] 21 [IgnoreProperty(true)] 22 public int ID { get; set; } 23 public string Product { get; set; } 24 public int Customer { get; set; } 25 public int Amount { get; set; } 26 } 27 public class ProductStock 28 { 29 [Key] 30 [IgnoreProperty(true)] 31 public int ID { get; set; } 32 public string Product { get; set; } 33 public int Amount { get; set; } 34 }
5)服务接口层代码
1 public interface IProductMessageApplyService 2 { 3 void Add(ProductMessageApply entity); 4 ProductMessageApply Get(int id); 5 } 6 public interface IProductMessageService 7 { 8 void Add(ProductMessage entity); 9 IEnumerable<ProductMessage> Gets(object paramPairs = null); 10 void Delete(int id); 11 } 12 public interface IProductSellService 13 { 14 void Add(ProductSell entity); 15 } 16 public interface IProductStockService 17 { 18 void ReduceReserve(int id, int amount); 19 }
6)库存、消息通知
1 public class ProductMessageService : IProductMessageService 2 { 3 private IRepository<ProductMessage> repository; 4 5 public ProductMessageService(IRepository<ProductMessage> repository) 6 { 7 this.repository = repository; 8 } 9 10 public void Add(ProductMessage entity) 11 { 12 this.repository.Add(entity); 13 } 14 15 public IEnumerable<ProductMessage> Gets(object paramPairs = null) 16 { 17 return this.repository.Gets(paramPairs); 18 } 19 20 public void Delete(int id) 21 { 22 this.repository.Delete(id); 23 } 24 } 25 26 public class ProductStockService : IProductStockService 27 { 28 private IRepository<ProductStock> repository; 29 30 public ProductStockService(IRepository<ProductStock> repository) 31 { 32 this.repository = repository; 33 } 34 35 public void ReduceReserve(int id, int amount) 36 { 37 var entity = this.repository.Get(id); 38 if (entity == null) return; 39 40 entity.Amount = entity.Amount - amount; 41 this.repository.Update(entity); 42 } 43 }
7)下单、下单成功消息
1 public class ProductMessageApplyService : IProductMessageApplyService 2 { 3 private IRepository<ProductMessageApply> repository; 4 5 public ProductMessageApplyService(IRepository<ProductMessageApply> repository) 6 { 7 this.repository = repository; 8 } 9 10 public void Add(ProductMessageApply entity) 11 { 12 this.repository.Add(entity); 13 } 14 15 public ProductMessageApply Get(int id) 16 { 17 return this.repository.Get(id); 18 } 19 } 20 21 public class ProductSellService : IProductSellService 22 { 23 private IRepository<ProductSell> repository; 24 25 public ProductSellService(IRepository<ProductSell> repository) 26 { 27 this.repository = repository; 28 } 29 30 public void Add(ProductSell entity) 31 { 32 this.repository.Add(entity); 33 } 34 }
8)下单减库存测试
1 namespace Demo.Reserve.App 2 { 3 class Program 4 { 5 static void Main(string[] args) 6 { 7 Console.WriteLine(string.Format("{0} 程序已启动", DateTime.Now.ToString())); 8 9 Send(); 10 Subscribe(); 11 12 Console.ReadKey(); 13 } 14 15 private static void Send() 16 { 17 var unitOfWork = new UnitOfWork(Enums.Reserve); 18 19 try 20 { 21 var productStockRepository = new BaseRepository<ProductStock>(unitOfWork); 22 var productStockServic = new ProductStockService(productStockRepository); 23 var productMessageRepository = new BaseRepository<ProductMessage>(unitOfWork); 24 var productMessageService = new ProductMessageService(productMessageRepository); 25 26 var id = 1; 27 var amount = 2; 28 var productMessage = new ProductMessage() 29 { 30 Product = "ProductCode", 31 Amount = amount, 32 UpdateTime = DateTime.Now 33 }; 34 35 productStockServic.ReduceReserve(id, amount); 36 productMessageService.Add(productMessage); 37 unitOfWork.Commit(); 38 Console.WriteLine(string.Format("{0} 减库存完成", DateTime.Now.ToString())); 39 Thread.Sleep(1000); 40 41 var message = JsonConvert.SerializeObject(productMessage); 42 RedisConfig.Instrace.Publish("channel.Send", message); 43 Console.WriteLine(string.Format("{0} 发送减库存消息: {1}", DateTime.Now.ToString(), message)); 44 } 45 catch (Exception ex) 46 { 47 //Logger.Error(ex); 48 unitOfWork.Rollback(); 49 } 50 } 51 52 private static void Subscribe() 53 { 54 var client = RedisConfig.Instrace.NewClient(); 55 var subscriber = client.GetSubscriber(); 56 57 subscriber.Subscribe("channel.Success", (chl, message) => 58 { 59 try 60 { 61 var unitOfWork = new UnitOfWork(Enums.Reserve); 62 var productMessageRepository = new BaseRepository<ProductMessage>(unitOfWork); 63 var productMessageService = new ProductMessageService(productMessageRepository); 64 65 var messageID = message.ToString().ToInt(); 66 if (messageID > 0) 67 { 68 productMessageService.Delete(messageID); 69 Console.WriteLine(string.Format("{0} 收到消费成功消息:{1}", DateTime.Now.ToString(), message)); 70 } 71 } 72 catch (Exception ex) 73 { 74 //Logger.Error(ex); 75 } 76 }); 77 } 78 } 79 }
9)下单成功及消息回发测试
1 namespace Demo.Sell.App 2 { 3 class Program 4 { 5 static void Main(string[] args) 6 { 7 Subscribe(); 8 9 Console.WriteLine(string.Format("{0} 程序已启动", DateTime.Now.ToString())); 10 Console.ReadKey(); 11 } 12 13 private static void Subscribe() 14 { 15 var client = RedisConfig.Instrace.NewClient(); 16 var subscriber = client.GetSubscriber(); 17 18 subscriber.Subscribe("channel.Send", (chl, message) => 19 { 20 Consume(message); 21 }); 22 } 23 24 private static void Consume(string message) 25 { 26 var unitOfWork = new UnitOfWork(Enums.Sell); 27 28 try 29 { 30 Console.WriteLine(string.Format("{0} 收到减库存消息: {1}", DateTime.Now.ToString(), message)); 31 32 var productMessage = JsonConvert.DeserializeObject<ProductMessage>(message); 33 34 var productSellRepository = new BaseRepository<ProductSell>(unitOfWork); 35 var productSellService = new ProductSellService(productSellRepository); 36 37 var productMessageApplyRepository = new BaseRepository<ProductMessageApply>(unitOfWork); 38 var productMessageApplyService = new ProductMessageApplyService(productMessageApplyRepository); 39 40 var noExists = productMessageApplyService.Get(productMessage.ID) == null; 41 if (noExists) 42 { 43 productSellService.Add(new ProductSell() 44 { 45 Product = productMessage.Product, 46 Amount = productMessage.Amount, 47 Customer = 123 48 }); 49 50 productMessageApplyService.Add(new ProductMessageApply() 51 { 52 MesageID = productMessage.ID, 53 CreateTime = DateTime.Now 54 }); 55 56 unitOfWork.Commit(); 57 Console.WriteLine(string.Format("{0} 消息消费完成", DateTime.Now.ToString())); 58 Thread.Sleep(1000); 59 } 60 61 RedisConfig.Instrace.Publish("channel.Success", productMessage.ID.ToString()); 62 Console.WriteLine(string.Format("{0} 发送消费完成通知:{1}", DateTime.Now.ToString(), productMessage.ID.ToString())); 63 } 64 catch (Exception ex) 65 { 66 //Logger.Error(ex); 67 unitOfWork.Rollback(); 68 } 69 } 70 } 71 }
10)好了,到了最后检验成果的时候了
先打开Demo.Sell.App.exe、而后打开Demo.Reserve.App.exe
大功告成!