为了支持业务层中的事务,我试图在Go中查找相似Spring的声明式事务管理,可是没找到,因此我决定本身写一个。 事务很容易在Go中实现,但很难作到正确地实现。html
1.将业务逻辑与事务代码分开。
在编写业务用例时,开发者应该只需考虑业务逻辑,不须要同时考虑怎样给业务逻辑加事务管理。若是之后须要添加事务支持,你能够在现有业务逻辑的基础上进行简单封装,而无需更改任何其余代码。事务实现细节应该对业务逻辑透明。git
2.事务逻辑应该做用于用例层(业务逻辑)
不在持久层上。github
3.数据服务(数据持久性)层应对事务逻辑透明。
这意味着持久性代码应该是相同的,不管它是否支持事务golang
4.你能够选择延迟支持事物。
你能够先编写没有事务的用例,稍后能够在不修改现有代码的状况下给该用例加上事务。你只需添加新代码。sql
我最终的解决方案还不是声明式事务管理,但它很是接近。建立一个真正的声明式事务管理须要付出不少努力,所以我构建了一个能够实现声明式事务的大多数功能的事务管理,同时又没花不少精力。数据库
最终解决方案涉及本程序的全部层级,我将逐一解释它们。服务器
数据库连接封装架构
在Go的“sql”lib中,有两个数据库连接sql.DB和sql.Tx. 不须要事务时,使用sql.DB访问数据库; 当须要事务时,你使用sql.Tx. 为了共享代码,持久层须要同时支持二者。 所以须要对数据库连接进行封装,而后把它做为数据库访问方法的接收器。 我从这里¹获得了粗略的想法。app
// SqlGdbc (SQL Go database connection) is a wrapper for SQL database handler ( can be *sql.DB or *sql.Tx) // It should be able to work with all SQL data that follows SQL standard. type SqlGdbc interface { Exec(query string, args ...interface{}) (sql.Result, error) Prepare(query string) (*sql.Stmt, error) Query(query string, args ...interface{}) (*sql.Rows, error) QueryRow(query string, args ...interface{}) *sql.Row // If need transaction support, add this interface Transactioner } // SqlDBTx is the concrete implementation of sqlGdbc by using *sql.DB type SqlDBTx struct { DB *sql.DB } // SqlConnTx is the concrete implementation of sqlGdbc by using *sql.Tx type SqlConnTx struct { DB *sql.Tx }
数据库实现类型SqlDBTx和sqlConnTx都须要实现SqlGdbc接口(包括“Transactioner”)接口才行。 须要为每一个数据库(例如MySQL, CouchDB)实现“Transactioner”接口以支持事务。框架
// Transactioner is the transaction interface for database handler // It should only be applicable to SQL database type Transactioner interface { // Rollback a transaction Rollback() error // Commit a transaction Commit() error // TxEnd commits a transaction if no errors, otherwise rollback // txFunc is the operations wrapped in a transaction TxEnd(txFunc func() error) error // TxBegin gets *sql.DB from receiver and return a SqlGdbc, which has a *sql.Tx TxBegin() (SqlGdbc, error) }
数据库存储层(datastore layer)的事物管理代码
如下是“Transactioner”接口的实现代码,其中只有TxBegin()是在SqlDBTx(sql.DB)上实现,由于事务从sql.DB开始,而后全部事务的其余操做都在SqlConnTx(sql.Tx)上。 我从这里²获得了这个想法。
// TransactionBegin starts a transaction func (sdt *SqlDBTx) TxBegin() (gdbc.SqlGdbc, error) { tx, err := sdt.DB.Begin() sct := SqlConnTx{tx} return &sct, err } func (sct *SqlConnTx) TxEnd(txFunc func() error) error { var err error tx := sct.DB defer func() { if p := recover(); p != nil { tx.Rollback() panic(p) // re-throw panic after Rollback } else if err != nil { tx.Rollback() // err is non-nil; don't change it } else { err = tx.Commit() // if Commit returns error update err with commit err } }() err = txFunc() return err } func (sct *SqlConnTx) Rollback() error { return sct.DB.Rollback() }
用例层的事物接口
在用例层中,你能够拥有相同业务功能的一个函数的两个版本,一个支持事务,一个不支持,而且它们的名称能够共享相同的前缀,而事务能够添加“withTx”做为后缀。 例如,在如下代码中,“ModifyAndUnregister”是不支持事务的那个,“ModifyAndUnregisterWithTx”是支持事务的那个。 “EnableTxer”是用例层上惟一的事务支持接口,任何支持事务的“用例”都须要它。 这里的全部代码都在是用例层级(包括“EnableTxer”)代码,不涉及数据库内容。
type RegistrationUseCaseInterface interface { ... // ModifyAndUnregister change user information and then unregister the user based on the User.Id passed in. // It is created to illustrate transaction, no real use. ModifyAndUnregister(user *model.User) error // ModifyAndUnregisterWithTx change user information and then unregister the user based on the User.Id passed in. // It supports transaction // It is created to illustrate transaction, no real use. ModifyAndUnregisterWithTx(user *model.User) error // EnableTx enable transaction support on use case. Need to be included for each use case needs transaction // It replaces the underline database handler to sql.Tx for each data service that used by this use case EnableTxer } // EnableTxer is the transaction interface for use case layer type EnableTxer interface { EnableTx() }
如下是不包含事务的业务逻辑代码的示例。 “modifyAndUnregister(ruc,user)”是事务和非事务用例函数共享的业务功能。 你须要使用TxBegin()和TxEnd()(在TxDataInterface中)来包装业务功能以支持事务,这些是数据服务层接口,而且与数据库访问层无关。 该用例还实现了“EnableTx()”接口,该接口实际上将底层数据库连接从sql.DB切换到sql.Tx.
// The use case of ModifyAndUnregister without transaction func (ruc *RegistrationUseCase) ModifyAndUnregister(user *model.User) error { return modifyAndUnregister(ruc, user) } // The use case of ModifyAndUnregister with transaction func (ruc *RegistrationUseCase) ModifyAndUnregisterWithTx(user *model.User) error { tdi, err := ruc.TxDataInterface.TxBegin() if err != nil { return errors.Wrap(err, "") } ruc.EnableTx() return tdi.TxEnd(func() error { // wrap the business function inside the TxEnd function return modifyAndUnregister(ruc, user) }) } // The business function will be wrapped inside a transaction and inside a non-transaction function // It needs to be written in a way that every error will be returned so it can be catched by TxEnd() function, // which will handle commit and rollback func modifyAndUnregister(ruc *RegistrationUseCase, user *model.User) error { udi := ruc.UserDataInterface err := modifyUser(udi, user) if err != nil { return errors.Wrap(err, "") } err = unregisterUser(udi, user.Name) if err != nil { return errors.Wrap(err, "") } return nil } func (ruc *RegistrationUseCase) EnableTx() { // Only UserDataInterface need transaction support here. If there are other data services need it, // then they also need to enable transaction here ruc.UserDataInterface.EnableTx(ruc.TxDataInterface) }
为何我须要在“TxDataInterface”中调用函数“EnbaleTx”来替换底层数据库连接而不是直接在用例中执行? 由于sql.DB和sql.Tx层级要比用例层低几个级别,直接调用会搞砸依赖关系。 保持合理依赖关系的诀窍是在每一层上都有TxBegin()和TxEnd()并逐层调用它们以维持合理的依赖关系。
数据服务层的事物接口
咱们讨论了用例层和数据存储层上的事务功能,咱们还须要数据服务层中的事务功能将这二者链接在一块儿。 如下代码是数据服务层的事务接口(“TxDataInterface”)。 “TxDataInterface”是仅为事物管理而建立的数据服务层接口。 每一个数据库只须要实现一次。 还有一个“EnableTxer”接口(这是一个数据服务层接口,不要与用例层中的“EnableTxer”接口混淆),实现“EnableTxer”接口将开启数据服务类型对事务的支持,例如, 若是想要“UserDataInterface”支持事物,就须要它实现“EnableTxer”接口。
// TxDataInterface represents operations needed for transaction support. // It only needs to be implemented once for each database // For sqlGdbc, it is implemented for SqlDBTx in transaction.go type TxDataInterface interface { // TxBegin starts a transaction. It gets a DB handler from the receiver and return a TxDataInterface, which has a // *sql.Tx inside. Any data access wrapped inside a transaction will go through the *sql.Tx TxBegin() (TxDataInterface, error) // TxEnd is called at the end of a transaction and based on whether there is an error, it commits or rollback the // transaction. // txFunc is the business function wrapped in a transaction TxEnd(txFunc func() error) error // Return the underline transaction handler, sql.Tx GetTx() gdbc.SqlGdbc } // This interface needs to be included in every data service interface that needs transaction support type EnableTxer interface { // EnableTx enables transaction, basically it replaces the underling database handle sql.DB with sql.Tx EnableTx(dataInterface TxDataInterface) } // UserDataInterface represents interface for user data access through database type UserDataInterface interface { ... Update(user *model.User) (rowsAffected int64, err error) // Insert adds a user to a database. The returned resultUser has a Id, which is auto generated by database Insert(user *model.User) (resultUser *model.User, err error) // Need to add this for transaction support EnableTxer }
如下代码是“TxDataInterface”的实现。 “TxDataSql”是“TxDataInterface”的具体类型。 它调用底层数据库连接的开始和结束函数来执行真正的事务操做。
// TxDataSql is the generic implementation for transaction for SQL database // You only need to do it once for each SQL database type TxDataSql struct { DB gdbc.SqlGdbc } func (tds *TxDataSql) TxEnd(txFunc func() error) error { return tds.DB.TxEnd(txFunc) } func (tds *TxDataSql) TxBegin() (dataservice.TxDataInterface, error) { sqlTx, error := tds.DB.TxBegin() tdi := TxDataSql{sqlTx} tds.DB = tdi.DB return &tdi, error } func (tds *TxDataSql) GetTx() gdbc.SqlGdbc { return tds.DB }
事物策略:
你可能会问为何我在上面的代码中须要“TxDataSql”? 确实能够在没有它的状况下实现事务,实际上最开的程序里就没有它。 可是我仍是要在某些数据服务中实现“TxDataInterface”来开始和结束事务。 因为这是在用例层中完成的,用例层不知道哪一个数据服务类型实现了接口,所以必须在每一个数据服务接口上实现“TxDataInterface”(例如,“UserDataInterface”和“CourseDataInterface”)以保证 “用例层”不会选择没有接口的“数据服务(data service)”。 在建立“TxDataSql”以后,我只须要在“TxDataSql”中实现一次“TxDataInterface”,而后每一个数据服务类型只须要实现“EnableTx()”就好了。
// UserDataSql is the SQL implementation of UserDataInterface type UserDataSql struct { DB gdbc.SqlGdbc } func (uds *UserDataSql) EnableTx(tx dataservice.TxDataInterface) { uds.DB = tx.GetTx() } func (uds *UserDataSql) FindByName(name string) (*model.User, error) { //logger.Log.Debug("call FindByName() and name is:", name) rows, err := uds.DB.Query(QUERY_USER_BY_NAME, name) if err != nil { return nil, errors.Wrap(err, "") } defer rows.Close() return retrieveUser(rows) }
上面的代码是“UserDataService”接口的实现程序。 “EnableTx()”方法从“TxDataInterface”得到sql.Tx并将“UserDataSql”中的sql.DB替换为sql.Tx.
数据访问方法(例如,FindByName())在事务代码和非事务代码之间共享,而且不须要知道“UserDataSql.DB”是sql.DB仍是sql.Tx.
依赖关系漏洞:
上面的代码实现中存在一个缺陷,这会破坏个人设计并使其不完美。它是“TxDataInterface”中的函数“GetTx()”,它是一个数据服务层接口,所以它不该该依赖于gdbc.SqlGdbc(数据库接口)。你可能认为数据服务层的实现代码不管如何都须要访问数据库,当前这是正确的。可是,你能够在未来更改实现去调用gRPC微服务(而不是数据库)。若是接口不依赖于SQL接口的话,则能够自由更改实现,但若是不是,则即便你的接口实现已更改,该接口也会永久保留对SQL的依赖。
为何它是本程序中打破依赖关系的惟一地方?由于对于其余接口,容器负责建立具体类型,而程序的其他部分仅使用接口。可是对于事务,在建立具体类型以后,须要将底层数据库处理程序从sql.DB替换为sql.Tx,这破坏了设计。
它有解决方法吗?是的,容器能够为须要事务的函数建立sql.Tx而不是sql.DB,这样我就不须要在之后的用例级别中替换它。可是,配置文件中须要一个标志来指示函数是否须要事务, 并且这个标志须要配备给用例中的每一个函数。这是一个太大的改动,因此我决定如今先这样,之后再从新审视它。
经过这个实现,事务代码对业务逻辑几乎是透明的(除了我上面提到的缺陷)。业务逻辑中没有数据存储(datastore)级事务代码,如Tx.Begin,Tx.Commit和Tx.Rollback(但你确实须要业务级别事物函数Tx.Begin和Tx.End),不只如此,你的持久性代码中也几乎没有数据存储级事务代码。 如需在用例层上启用事务,你只须要在用例上实现EnableTx()并将业务函数封装在“TxBegin()”,EnableTx()和“TxEnd()”中,如上例所示。 在持久层上,大多数事务代码已经由“txDataService.go”实现,你只须要为特定的数据服务(例如UserDataService)实现“EnableTx”。 事务支持的真正操做是在“transaction.go”文件中实现的,它实现了“Transactioner”接口,它有四个函数,“Rollback”, “Commit”, “TxBegin” 和 “TxEnd”。
假设咱们须要在用例“listCourse”中为一个函数添加事务支持,如下是步骤
首先,它仍然不是声明式事物管理;第二,它没有彻底达到需求中的#4。要将用例函数从非事务更改成事务,你能够建立一个支持事务的新函数,它须要更改调用函数; 或者你修改现有函数并将其包装到事务中,这也须要代码更改。为了实现#4,须要添加许多代码,所以我将其推迟到之后。第三,它不支持嵌套事务(Nested Transaction),所以你须要手动确保代码中没有发生嵌套事务。若是代码库不是太复杂,这很容易作到。若是你有一个很是复杂的代码库,有不少事务和非事务函数混在一块儿,那么手工作起来会比较困难,这是须要在程序中实现嵌套事务或找到已经支持它的方案。我没有花时间研究添加嵌套事务所需的工做量,但这可能并不容易。若是你对它感兴趣,这里³是一些讨论。到目前为止,对于大多数状况而言,当前的解决方案多是在代价不大的状况下的最佳方案。
首先,它只支持SQL数据库的事务。 若是你有NoSql数据库,它将没法工做(大多数NoSql数据库不管如何都不支持事务)。 其次,若是事务跨越了数据库的边界(例如在不一样的微服务器之间),那么它将没法工做。 在这种状况下,你须要使用Saga⁴。它的原理是为事物中的每一个操做写一个补偿操做,而后在回滚阶段挨个执行每个补偿操做。 在当前框架中添加Sage解决方案应该不难。
关闭数据库连接(Close connection)
我历来没有为数据库连接调用Close()函数,由于没有必要这样作。 你能够传入sql.DB或sql.Tx做为持久性函数的接收器(receiver)。 对于sql.DB,数据库将自动建立连接池并为你管理连接。 连接完成后,它将返回到连接池,无需关闭。 对于sql.Tx,在事务结束时,你能够提交或回滚,以后连接将返回到链接池,而无需关闭。 请参阅此处⁵ 和 此处⁶ .
对象关系映射(O/R mapping)
我简要地查看了几个“O/R”映射库,但它们没有提供我所须要的功能。 我认为“O/R映射”只适合两种状况。 首先,你的应用程序主要是CRUD,没有太多的查询或搜索; 第二,开发人员不熟悉SQL。 若是不是这种状况,则O/R映射不会提供太多帮助。 我想从扩展数据库模块中得到两个功能,一个是将sql.row加载到个人域模型结构(包括处理NULL值)中(例如“User”),另外一个是自动关闭sql类型,如sql.statement或sql.rows。 有一些sql扩展库彷佛提供了至少部分这样的功能。 我尚未尝试,但彷佛值得一试。
延迟(Defer):
在进行数据库访问时,你将进行大量重复调用以关闭数据库类型(例如statements, rows)。例如如下代码中的“defer row.close()”。 你想要记住这一点,要在错误处理函数以后调用“defer row.close()”,由于若是不是这样,当出现错误时,“rows”将为nil,这将致使恐慌而且不会执行错误处理代码。
func (uds *UserDataSql) Find(id int) (*model.User, error) { rows, err := uds.DB.Query(QUERY_USER_BY_ID, id) if err != nil { return nil, errors.Wrap(err, "") } defer rows.Close() return retrieveUser(rows) }
恐慌(panic):
我看到不少Go数据库代码在出现数据库错误时抛出了恐慌(panic)而不是错误(error),这可能会致使微服务出现问题,由于在微服务环境中你一般但愿服务一直运行。 假设当更新语句中出现SQL错误时,用户将没法访问该功能,这很糟糕。 但若是由于这个,整个微服务或网站被关闭,那就更糟了。 所以,正确的方法是将错误传播到上一级并让它决定要作什么。 所以正确的作法是不在你的程序中抛出panic,但若是第三方库抛出恐慌呢? 这时你须要捕获恐慌并从中恢复以保持你的服务正常运行。 我在另外一篇文章“日志管理”⁸中有具体示例.
完整的源程序连接 github: https://github.com/jfeng45/se...
[2]database/sql Tx—detecting Commit or Rollback
[3]database/sql: nested transaction or save point support
[4]GOTO 2015 • Applying the Saga Pattern • Caitie McCaffrey — YouTube
[5]Common Pitfalls When Using database/sql in Go
[7]sqlx
[8]Go Microservice with Clean Architecture: Application Logging
不堆砌术语,不罗列架构,不迷信权威,不盲从流行,坚持独立思考