Last active
December 17, 2022 12:07
-
-
Save weedge/fd731ce8549cd99ccfc9491f6025ae8e to your computer and use it in GitHub Desktop.
change
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"log" | |
"sync" | |
"time" | |
"database/sql" | |
"github.com/google/uuid" | |
"gorm.io/driver/mysql" | |
"gorm.io/gorm" | |
) | |
var sqlDB *sql.DB | |
var gormDB *gorm.DB | |
type assetChangeHandler func(oldAssetCn int) (newAssetCn int) | |
func initMySqlClient() { | |
var err error | |
// refer https://github.com/go-sql-driver/mysql#dsn-data-source-name for details | |
//dsn := "polardbx_root:fSkUPYKP@tcp(127.0.0.1:64502)/pay?charset=utf8mb4&parseTime=True&loc=Local" | |
//dsn := "root:@tcp(127.0.0.1:4000)/pay?charset=utf8mb4&parseTime=True&loc=Local" | |
dsn := "root:123@tcp(127.0.0.1:3306)/pay?charset=utf8mb4&parseTime=True&loc=Local" | |
gormDB, err = gorm.Open(mysql.Open(dsn), &gorm.Config{}) | |
if err != nil { | |
panic(err.Error()) | |
} | |
sqlDB, err = gormDB.DB() | |
if err != nil { | |
panic(err.Error()) | |
} | |
// SetMaxIdleConns sets the maximum number of connections in the idle connection pool. | |
sqlDB.SetMaxIdleConns(10) | |
// SetMaxOpenConns sets the maximum number of open connections to the database. | |
sqlDB.SetMaxOpenConns(100) | |
// SetConnMaxLifetime sets the maximum amount of time a connection may be reused. | |
sqlDB.SetConnMaxLifetime(time.Hour) | |
} | |
func closeClient() { | |
err := sqlDB.Close() | |
if err != nil { | |
log.Fatal(err) | |
} | |
} | |
func init100wData() { | |
n := 1000000 | |
for i := 1; i <= n; i++ { | |
res, err := sqlDB.Exec(`insert into pay.user_asset (userId, assetCn, assetType, version, createdAt) values (?,?,?,?,now())`, i, 100000, 1, 1) | |
if err != nil { | |
log.Fatal(err) | |
} | |
println(res.LastInsertId()) | |
} | |
} | |
type UserAssetDto struct { | |
assetCn int | |
assetType int | |
userId int64 | |
version int | |
} | |
type UserAssetRecordDto struct { | |
userId int64 | |
opUserType int | |
bizId int64 | |
bizType int | |
objId string | |
eventId string | |
eventType string | |
record string | |
recordOp string | |
} | |
func (m *UserAssetRecordDto) addRecordTx(tx *sql.Tx) { | |
_, err := tx.Exec(`insert into pay.user_asset_record( userId,opUserType, bizId, bizType, objId, eventId,eventType,record, recordOp, createdAt) VALUES (?,?,?,?,?,?,?,?,?,now())`, | |
m.userId, m.opUserType, m.bizId, m.bizType, m.objId, m.eventId, m.eventType, m.record, m.recordOp) | |
if err != nil { | |
tx.Rollback() | |
log.Fatal(err) | |
} | |
} | |
func (m *UserAssetDto) ChangeUserAssetTx(tx *sql.Tx, handle assetChangeHandler) (err error) { | |
assetCn := 0 | |
ver := 0 | |
err = tx.QueryRow(`SELECT assetCn,version FROM pay.user_asset WHERE userId=? and assetType=? FOR UPDATE`, m.userId, m.assetType).Scan(&assetCn, &ver) | |
if err != nil && err != sql.ErrNoRows { | |
tx.Rollback() | |
return | |
} | |
newAssetCn := handle(assetCn) | |
//println("newAssetCn", newAssetCn) | |
if newAssetCn < 0 { | |
tx.Rollback() | |
return errors.New("asset no enough") | |
} | |
//time.Sleep(3 * time.Second) | |
_, err = tx.Exec(`insert into pay.user_asset(userId,assetType,assetCn,version,createdAt) value(?,?,?,?,?) on duplicate key update assetCn=?, version=?`, m.userId, m.assetType, newAssetCn, 1, time.Now(), newAssetCn, ver+1) | |
if err != nil { | |
tx.Rollback() | |
return | |
} | |
return | |
} | |
// --- multi change is ok --- | |
func selectForUpdateAndUpdateMultiTx() { | |
ctx := context.Background() | |
tx, err := sqlDB.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead}) | |
if err != nil { | |
log.Fatal(err) | |
} | |
sendUser := &UserAssetDto{userId: 100, assetType: 1} | |
err = sendUser.ChangeUserAssetTx(tx, func(oldAssetCn int) (newAssetCn int) { | |
newAssetCn = oldAssetCn - 1 | |
return | |
}) | |
if err != nil { | |
return | |
} | |
recUser := &UserAssetDto{userId: 101, assetType: 1} | |
err = recUser.ChangeUserAssetTx(tx, func(oldAssetCn int) (newAssetCn int) { | |
newAssetCn = oldAssetCn + 1 | |
return | |
}) | |
if err != nil { | |
return | |
} | |
eventId := uuid.New().String() | |
userAssertRecord := &UserAssetRecordDto{ | |
userId: 100, | |
opUserType: 1, | |
bizId: 110, | |
bizType: 1, | |
eventId: eventId, | |
eventType: "interactGift", | |
objId: "10", | |
record: "send gift in the room", | |
recordOp: "sendGift", | |
} | |
userAssertRecord.addRecordTx(tx) | |
toUserAssertRecord := &UserAssetRecordDto{ | |
userId: 101, | |
opUserType: 2, | |
bizId: 110, | |
bizType: 1, | |
eventId: eventId, | |
eventType: "interactGift", | |
objId: "10", | |
record: "send gift in the room", | |
recordOp: "sendGift", | |
} | |
toUserAssertRecord.addRecordTx(tx) | |
if err = tx.Commit(); err != nil { | |
log.Fatal(err) | |
} | |
} | |
// --- single change is ok --- | |
func selectForUpdateAndUpdateSingleTx(handle assetChangeHandler) { | |
ctx := context.Background() | |
tx, err := sqlDB.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead}) | |
if err != nil { | |
log.Fatal(err) | |
} | |
assetCn := 0 | |
err = tx.QueryRow(`SELECT assetCn FROM pay.user_asset WHERE userId=? and assetType=? FOR UPDATE`, 100, 1).Scan(&assetCn) | |
if err != nil { | |
tx.Rollback() | |
log.Fatal(err) | |
} | |
newAssetCn := handle(assetCn) | |
//println("newAssetCn", newAssetCn) | |
if newAssetCn < 0 { | |
tx.Commit() | |
return | |
} | |
//time.Sleep(3 * time.Second) | |
_, err = tx.Exec(`update pay.user_asset set assetCn=? where userId=? and assetType=?`, newAssetCn, 100, 1) | |
if err != nil { | |
tx.Rollback() | |
log.Fatal(err) | |
} | |
/* | |
rowCn, err := res.RowsAffected() | |
if err != nil { | |
log.Fatal(err) | |
} | |
fmt.Printf("affected Row cn:%d\n", rowCn) | |
*/ | |
if err = tx.Commit(); err != nil { | |
log.Fatal(err) | |
} | |
} | |
func selectAndCasUpdateSingleFor(handle assetChangeHandler) { | |
for { | |
assetCn := 0 | |
err := sqlDB.QueryRow(`SELECT assetCn FROM pay.user_asset WHERE userId=? and assetType=?`, 100, 1).Scan(&assetCn) | |
if err != nil { | |
log.Fatal(err) | |
return | |
} | |
newAssetCn := handle(assetCn) | |
//println("newAssetCn", newAssetCn) | |
if newAssetCn < 0 { | |
return | |
} | |
//time.Sleep(3 * time.Second) | |
res, err := sqlDB.Exec(`update pay.user_asset set assetCn=? where userId=? and assetType=? and assetCn=?`, newAssetCn, 100, 1, assetCn) | |
if err != nil { | |
log.Fatal(err) | |
return | |
} | |
rowCn, err := res.RowsAffected() | |
if err != nil { | |
log.Fatal(err) | |
return | |
} | |
//fmt.Printf("affected Row cn:%d\n", rowCn) | |
if rowCn == 1 { | |
break | |
} | |
} | |
} | |
// ---- below not ok --------------- | |
func selectAndUpdate() { | |
assetCn := 0 | |
err := sqlDB.QueryRow(`SELECT assetCn FROM pay.user_asset WHERE userId=? and assetType=?`, 4, 1).Scan(&assetCn) | |
if err != nil { | |
log.Fatal(err) | |
} | |
newAssetCn := assetCn - 1 | |
println("newAssetCn", newAssetCn) | |
if newAssetCn < 0 { | |
return | |
} | |
//time.Sleep(3 * time.Second) | |
res, err := sqlDB.Exec(`update pay.user_asset set assetCn=? where userId=? and assetType=? and assetCn=?`, newAssetCn, 4, 1, assetCn) | |
//res, err = sqlDB.Exec(`update pay.user_asset set assetCn=assetCn+? where id=?; `, -1, 1) | |
if err != nil { | |
log.Fatal(err) | |
} | |
rowCn, err := res.RowsAffected() | |
if err != nil { | |
log.Fatal(err) | |
} | |
fmt.Printf("affected Row cn:%d\n", rowCn) | |
} | |
func selectForUpdateAndUpdate() { | |
assetCn := 0 | |
err := sqlDB.QueryRow(`SELECT assetCn FROM pay.user_asset WHERE userId=? and assetType=? FOR UPDATE`, 100, 1).Scan(&assetCn) | |
if err != nil { | |
log.Fatal(err) | |
} | |
newAssetCn := assetCn - 1 | |
println("newAssetCn", newAssetCn) | |
if newAssetCn < 0 { | |
return | |
} | |
//time.Sleep(3 * time.Second) | |
_, err = sqlDB.Exec(`update pay.user_asset set assetCn=? where userId=? and assetType=? and assetCn=?`, newAssetCn, 100, 1, assetCn) | |
if err != nil { | |
log.Fatal(err) | |
} | |
} | |
// default autocommit is open, is ok | |
func update() { | |
res, err := sqlDB.Exec(`update pay.user_asset set assetCn=assetCn+? where userId=? and assetCn+?>=0; `, -1, 2, -1) | |
//res, err = sqlDB.Exec(`update pay.user_asset set assetCn=assetCn+? where id=?; `, -1, 1) | |
if err != nil { | |
log.Fatal(err) | |
} | |
rowCn, err := res.RowsAffected() | |
if err != nil { | |
log.Fatal(err) | |
} | |
fmt.Printf("affected Row cn:%d\n", rowCn) | |
} | |
// is ok | |
func updateTx() { | |
ctx := context.Background() | |
tx, err := sqlDB.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead}) | |
if err != nil { | |
log.Fatal(err) | |
} | |
/* | |
var assetCn int | |
err = tx.QueryRow(`SELECT assetCn FROM user_asset WHERE userId=? and assetType=? FOR UPDATE`, 1, 1).Scan(&assetCn) | |
if err != nil { | |
_ = tx.Rollback() | |
log.Fatal(err) | |
} | |
*/ | |
res, err := tx.Exec(`update pay.user_asset set assetCn=assetCn+? where userId=? and assetType=?; `, 1, 1, 1) | |
if err != nil { | |
_ = tx.Rollback() | |
log.Fatal(err) | |
} | |
rowCn, err := res.RowsAffected() | |
if err != nil { | |
_ = tx.Rollback() | |
log.Fatal(err) | |
} | |
fmt.Printf("affected Row cn:%d\n", rowCn) | |
if err = tx.Commit(); err != nil { | |
log.Fatal(err) | |
} | |
} | |
func main() { | |
initMySqlClient() | |
//init100wData() | |
n := 100 | |
//concurrencyTest(n, "selectForUpdateAndUpdate", selectForUpdateAndUpdate) | |
concurrencyTest(n, "selectForUpdateAndUpdateMultiTx", selectForUpdateAndUpdateMultiTx) | |
return | |
handle := assetChangeHandler(func(oldAssetCn int) (newAssetCn int) { | |
newAssetCn = oldAssetCn - 1 | |
return | |
}) | |
//适合并发量大的场景,不太适合长事务,事务超时时间需要数据库中调整对应参数 | |
concurrencyTest(n, "selectForUpdateAndUpdateSingleTx", func() { | |
selectForUpdateAndUpdateSingleTx(handle) | |
}) | |
//适合并发量小的场景,不用担心长事务,业务应用程序来cas循环处理 | |
concurrencyTest(n, "selectAndCasUpdateSingleFor", func() { | |
selectAndCasUpdateSingleFor(handle) | |
}) | |
return | |
} | |
func concurrencyTest(n int, name string, handle func()) { | |
startTime := time.Now() | |
wg := &sync.WaitGroup{} | |
wg.Add(n) | |
for i := 1; i <= n; i++ { | |
go func() { | |
defer wg.Done() | |
handle() | |
}() | |
} | |
wg.Wait() | |
println(name, "concurrency", n, "cost", time.Now().Sub(startTime).Microseconds(), "microsecond") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment