CreateOrUpdate 是业务开发中很常见的场景,我们支持用户对某个业务实体进行创建/配置。希望实现的 repository 接口要达到以下两个要求:
根据笔者的团队合作经验看,很多 Golang 开发同学不是很确定对于这种场景到底怎么实现,写出来的代码五花八门,还可能有并发问题。今天我们就来看看基于 GORM 怎么来实现 CreateOrUpdate。
我们先来看下 GORM 提供了那些方法来支持我们往数据库插入数据,对 GORM 比较熟悉的同学可以忽略这部分:
插入一条记录到数据库,注意需要通过数据的指针来创建,回填主键;
1 2 3 4 5 6 7 8 9 |
// Create insert the value into database func (db *DB) Create(value interface{}) (tx *DB) { if db.CreateBatchSize > 0 { return db.CreateInBatches(value, db.CreateBatchSize) } tx = db.getInstance() tx.Statement.Dest = value return tx.callbacks.Create().Execute(tx) } |
赋值 Dest 后直接进入 Create 的 callback 流程。
保存所有的字段,即使字段是零值。如果我们传入的结构主键为零值,则会插入记录。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
// Save update value in database, if the value doesn't have primary key, will insert it func (db *DB) Save(value interface{}) (tx *DB) { tx = db.getInstance() tx.Statement.Dest = value reflectValue := reflect.Indirect(reflect.ValueOf(value)) for reflectValue.Kind() == reflect.Ptr || reflectValue.Kind() == reflect.Interface { reflectValue = reflect.Indirect(reflectValue) } switch reflectValue.Kind() { case reflect.Slice, reflect.Array: if _, ok := tx.Statement.Clauses["ON CONFLICT"]; !ok { tx = tx.Clauses(clause.OnConflict{UpdateAll: true}) } tx = tx.callbacks.Create().Execute(tx.Set("gorm:update_track_time", true)) case reflect.Struct: if err := tx.Statement.Parse(value); err == nil && tx.Statement.Schema != nil { for _, pf := range tx.Statement.Schema.PrimaryFields { if _, isZero := pf.ValueOf(tx.Statement.Context, reflectValue); isZero { return tx.callbacks.Create().Execute(tx) } } } fallthrough default: selectedUpdate := len(tx.Statement.Selects) != 0 // when updating, use all fields including those zero-value fields if !selectedUpdate { tx.Statement.Selects = append(tx.Statement.Selects, "*") } tx = tx.callbacks.Update().Execute(tx) if tx.Error == nil && tx.RowsAffected == 0 && !tx.DryRun && !selectedUpdate { result := reflect.New(tx.Statement.Schema.ModelType).Interface() if result := tx.Session(&Session{}).Limit(1).Find(result); result.RowsAffected == 0 { return tx.Create(value) } } } return } |
关注点:
事实上有一些业务场景下,我们可以用 Save 来实现 CreateOrUpdate 的语义:
但 Save 本身语义其实比较混乱,不太建议使用,把这部分留给业务自己实现,用Updates,Create用起来更明确些。
Update 前者更新单个列。
Updates 更新多列,且当使用 struct 更新时,默认情况下,GORM 只会更新非零值的字段(可以用 Select 指定来解这个问题)。使用 map 更新时则会全部更新。
1 2 3 4 5 6 7 8 9 10 11 12 |
// Update update attributes with callbacks, refer: https://gorm.io/docs/update.html#Update-Changed-Fields func (db *DB) Update(column string, value interface{}) (tx *DB) { tx = db.getInstance() tx.Statement.Dest = map[string]interface{}{column: value} return tx.callbacks.Update().Execute(tx) } // Updates update attributes with callbacks, refer: https://gorm.io/docs/update.html#Update-Changed-Fields func (db *DB) Updates(values interface{}) (tx *DB) { tx = db.getInstance() tx.Statement.Dest = values return tx.callbacks.Update().Execute(tx) } |
这里也能从实现中看出来一些端倪。Update 接口内部是封装了一个 map[string]interface{},而 Updates 则是可以接受 map 也可以走 struct,最终写入 Dest。
获取第一条匹配的记录,或者根据给定的条件初始化一个实例(仅支持 struct 和 map)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// FirstOrInit gets the first matched record or initialize a new instance with given conditions (only works with struct or map conditions) func (db *DB) FirstOrInit(dest interface{}, conds ...interface{}) (tx *DB) { queryTx := db.Limit(1).Order(clause.OrderByColumn{ Column: clause.Column{Table: clause.CurrentTable, Name: clause.PrimaryKey}, }) if tx = queryTx.Find(dest, conds...); tx.RowsAffected == 0 { if c, ok := tx.Statement.Clauses["WHERE"]; ok { if where, ok := c.Expression.(clause.Where); ok { tx.assignInterfacesToValue(where.Exprs) } } // initialize with attrs, conds if len(tx.Statement.attrs) > 0 { tx.assignInterfacesToValue(tx.Statement.attrs...) } } // initialize with attrs, conds if len(tx.Statement.assigns) > 0 { tx.assignInterfacesToValue(tx.Statement.assigns...) } return } |
注意,Init 和 Create 的区别,如果没有找到,这里会把实例给初始化,不会存入 DB,可以看到 RowsAffected == 0 分支的处理,这里并不会走 Create 的 callback 函数。这里的定位是一个纯粹的读接口。
获取第一条匹配的记录,或者根据给定的条件创建一条新纪录(仅支持 struct 和 map 条件)。FirstOrCreate可能会执行两条sql,他们是一个事务中的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
// FirstOrCreate gets the first matched record or create a new one with given conditions (only works with struct, map conditions) func (db *DB) FirstOrCreate(dest interface{}, conds ...interface{}) (tx *DB) { tx = db.getInstance() queryTx := db.Session(&Session{}).Limit(1).Order(clause.OrderByColumn{ Column: clause.Column{Table: clause.CurrentTable, Name: clause.PrimaryKey}, }) if result := queryTx.Find(dest, conds...); result.Error == nil { if result.RowsAffected == 0 { if c, ok := result.Statement.Clauses["WHERE"]; ok { if where, ok := c.Expression.(clause.Where); ok { result.assignInterfacesToValue(where.Exprs) } } // initialize with attrs, conds if len(db.Statement.attrs) > 0 { result.assignInterfacesToValue(db.Statement.attrs...) } // initialize with attrs, conds if len(db.Statement.assigns) > 0 { result.assignInterfacesToValue(db.Statement.assigns...) } return tx.Create(dest) } else if len(db.Statement.assigns) > 0 { exprs := tx.Statement.BuildCondition(db.Statement.assigns[0], db.Statement.assigns[1:]...) assigns := map[string]interface{}{} for _, expr := range exprs { if eq, ok := expr.(clause.Eq); ok { switch column := eq.Column.(type) { case string: assigns[column] = eq.Value case clause.Column: assigns[column.Name] = eq.Value default: } } } return tx.Model(dest).Updates(assigns) } } else { tx.Error = result.Error } return tx } |
注意区别,同样是构造 queryTx 去调用 Find 方法查询,后续的处理很关键:
第一个分支好理解,需要插入新数据。重点在于 else if len(db.Statement.assigns) > 0 分支。
我们调用 FirstOrCreate 时,需要传入一个对象,再传入一批条件,这批条件会作为 Where 语句的部分在一开始进行查询。而这个函数同时可以配合 Assign() 使用,这一点就赋予了生命力。
不管是否找到记录,Assign 都会将属性赋值给 struct,并将结果写回数据库。
1 2 3 4 5 6 7 8 9 10 |
func (db *DB) Attrs(attrs ...interface{}) (tx *DB) { tx = db.getInstance() tx.Statement.attrs = attrs return } func (db *DB) Assign(attrs ...interface{}) (tx *DB) { tx = db.getInstance() tx.Statement.assigns = attrs return } |
这种方式充分利用了 Assign 的能力。我们在上面 FirstOrCreate 的分析中可以看出,这里是会将 Assign 进来的属性应用到 struct 上,写入数据库的。区别只在于是插入(Insert)还是更新(Update)。
1 2 3 4 5 6 7 8 9 10 |
// 未找到 user,根据条件和 Assign 属性创建记录 db.Where(User{Name: "non_existing"}).Assign(User{Age: 20}).FirstOrCreate(&user) // SELECT * FROM users WHERE name = 'non_existing' ORDER BY id LIMIT 1; // INSERT INTO "users" (name, age) VALUES ("non_existing", 20); // user -> User{ID: 112, Name: "non_existing", Age: 20} // 找到了 `name` = `jinzhu` 的 user,依然会根据 Assign 更新记录 db.Where(User{Name: "jinzhu"}).Assign(User{Age: 20}).FirstOrCreate(&user) // SELECT * FROM users WHERE name = 'jinzhu' ORDER BY id LIMIT 1; // UPDATE users SET age=20 WHERE id = 111; // user -> User{ID: 111, Name: "jinzhu", Age: 20} |
所以,要实现 CreateOrUpdate,我们可以将需要 Update 的属性通过 Assign 函数放进来,随后如果通过 Where 找到了记录,也会将 Assign 属性应用上,随后 Update。
这样的思路一定是可以跑通的,但使用之前要看场景。
为什么?
因为参看上面源码我们就知道,FirstOrCreate 本质是 Select + Insert 或者 Select + Update。
无论怎样,都是两条 SQL,可能有并发安全问题。如果你的业务场景不存在并发,可以放心用 FirstOrCreate + Assign,功能更多,适配更多场景。
而如果可能有并发安全的坑,我们就要考虑方案二:Upsert。
鉴于 MySQL 提供了 ON DUPLICATE KEY UPDATE 的能力,我们可以充分利用唯一键的约束,来搞定并发场景下的 CreateOrUpdate。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
import "gorm.io/gorm/clause" // 不处理冲突 DB.Clauses(clause.OnConflict{DoNothing: true}).Create(&user) // `id` 冲突时,将字段值更新为默认值 DB.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "id"}}, DoUpdates: clause.Assignments(map[string]interface{}{"role": "user"}), }).Create(&users) // MERGE INTO "users" USING *** WHEN NOT MATCHED THEN INSERT *** WHEN MATCHED THEN UPDATE SET ***; SQL Server // INSERT INTO `users` *** ON DUPLICATE KEY UPDATE ***; MySQL // Update columns to new value on `id` conflict DB.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "id"}}, DoUpdates: clause.AssignmentColumns([]string{"name", "age"}), }).Create(&users) // MERGE INTO "users" USING *** WHEN NOT MATCHED THEN INSERT *** WHEN MATCHED THEN UPDATE SET "name"="excluded"."name"; SQL Server // INSERT INTO "users" *** ON CONFLICT ("id") DO UPDATE SET "name"="excluded"."name", "age"="excluded"."age"; PostgreSQL // INSERT INTO `users` *** ON DUPLICATE KEY UPDATE `name`=VALUES(name),`age=VALUES(age); MySQL |
这里依赖了 GORM 的 Clauses 方法,我们来看一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
type Interface interface { Name() string Build(Builder) MergeClause(*Clause) } // AddClause add clause func (stmt *Statement) AddClause(v clause.Interface) { if optimizer, ok := v.(StatementModifier); ok { optimizer.ModifyStatement(stmt) } else { name := v.Name() c := stmt.Clauses[name] c.Name = name v.MergeClause(&c) stmt.Clauses[name] = c } } |
这里添加进来一个 Clause 之后,会调用 MergeClause 将语句进行合并,而 OnConflict 的适配是这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
package clause type OnConflict struct { Columns []Column Where Where TargetWhere Where OnConstraint string DoNothing bool DoUpdates Set UpdateAll bool } func (OnConflict) Name() string { return "ON CONFLICT" } // Build build onConflict clause func (onConflict OnConflict) Build(builder Builder) { if len(onConflict.Columns) > 0 { builder.WriteByte('(') for idx, column := range onConflict.Columns { if idx > 0 { builder.WriteByte(',') } builder.WriteQuoted(column) } builder.WriteString(`) `) } if len(onConflict.TargetWhere.Exprs) > 0 { builder.WriteString(" WHERE ") onConflict.TargetWhere.Build(builder) builder.WriteByte(' ') } if onConflict.OnConstraint != "" { builder.WriteString("ON CONSTRAINT ") builder.WriteString(onConflict.OnConstraint) builder.WriteByte(' ') } if onConflict.DoNothing { builder.WriteString("DO NOTHING") } else { builder.WriteString("DO UPDATE SET ") onConflict.DoUpdates.Build(builder) } if len(onConflict.Where.Exprs) > 0 { builder.WriteString(" WHERE ") onConflict.Where.Build(builder) builder.WriteByte(' ') } } // MergeClause merge onConflict clauses func (onConflict OnConflict) MergeClause(clause *Clause) { clause.Expression = onConflict } |
初阶的用法中,我们只需要关注三个属性:
1 2 3 4 5 |
type Set []Assignment type Assignment struct { Column Column Value interface{} } |
需要注意的是,所谓 OnConflict,并不一定是主键冲突,唯一键也包含在内。所以,使用 OnConflict 这套 Upsert 的先决条件是【唯一索引】或【主键】都可以。生成一条SQL语句,并发安全。
如果没有唯一索引的限制,我们就无法复用这个能力,需要考虑别的解法。如果