Skip to content

Instantly share code, notes, and snippets.

@weedge
Last active December 17, 2022 12:07
Show Gist options
  • Save weedge/fd731ce8549cd99ccfc9491f6025ae8e to your computer and use it in GitHub Desktop.
Save weedge/fd731ce8549cd99ccfc9491f6025ae8e to your computer and use it in GitHub Desktop.
change
package main
import (
"context"
"errors"
"fmt"
"log"
"sync"
"time"
"database/sql"
"github.com/google/uuid"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)
var sqlDB *sql.DB
var gormDB *gorm.DB
type assetChangeHandler func(oldAssetCn int) (newAssetCn int)
func initMySqlClient() {
var err error
// refer https://github.com/go-sql-driver/mysql#dsn-data-source-name for details
//dsn := "polardbx_root:fSkUPYKP@tcp(127.0.0.1:64502)/pay?charset=utf8mb4&parseTime=True&loc=Local"
//dsn := "root:@tcp(127.0.0.1:4000)/pay?charset=utf8mb4&parseTime=True&loc=Local"
dsn := "root:123@tcp(127.0.0.1:3306)/pay?charset=utf8mb4&parseTime=True&loc=Local"
gormDB, err = gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
panic(err.Error())
}
sqlDB, err = gormDB.DB()
if err != nil {
panic(err.Error())
}
// SetMaxIdleConns sets the maximum number of connections in the idle connection pool.
sqlDB.SetMaxIdleConns(10)
// SetMaxOpenConns sets the maximum number of open connections to the database.
sqlDB.SetMaxOpenConns(100)
// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
sqlDB.SetConnMaxLifetime(time.Hour)
}
func closeClient() {
err := sqlDB.Close()
if err != nil {
log.Fatal(err)
}
}
func init100wData() {
n := 1000000
for i := 1; i <= n; i++ {
res, err := sqlDB.Exec(`insert into pay.user_asset (userId, assetCn, assetType, version, createdAt) values (?,?,?,?,now())`, i, 100000, 1, 1)
if err != nil {
log.Fatal(err)
}
println(res.LastInsertId())
}
}
type UserAssetDto struct {
assetCn int
assetType int
userId int64
version int
}
type UserAssetRecordDto struct {
userId int64
opUserType int
bizId int64
bizType int
objId string
eventId string
eventType string
record string
recordOp string
}
func (m *UserAssetRecordDto) addRecordTx(tx *sql.Tx) {
_, err := tx.Exec(`insert into pay.user_asset_record( userId,opUserType, bizId, bizType, objId, eventId,eventType,record, recordOp, createdAt) VALUES (?,?,?,?,?,?,?,?,?,now())`,
m.userId, m.opUserType, m.bizId, m.bizType, m.objId, m.eventId, m.eventType, m.record, m.recordOp)
if err != nil {
tx.Rollback()
log.Fatal(err)
}
}
func (m *UserAssetDto) ChangeUserAssetTx(tx *sql.Tx, handle assetChangeHandler) (err error) {
assetCn := 0
ver := 0
err = tx.QueryRow(`SELECT assetCn,version FROM pay.user_asset WHERE userId=? and assetType=? FOR UPDATE`, m.userId, m.assetType).Scan(&assetCn, &ver)
if err != nil && err != sql.ErrNoRows {
tx.Rollback()
return
}
newAssetCn := handle(assetCn)
//println("newAssetCn", newAssetCn)
if newAssetCn < 0 {
tx.Rollback()
return errors.New("asset no enough")
}
//time.Sleep(3 * time.Second)
_, err = tx.Exec(`insert into pay.user_asset(userId,assetType,assetCn,version,createdAt) value(?,?,?,?,?) on duplicate key update assetCn=?, version=?`, m.userId, m.assetType, newAssetCn, 1, time.Now(), newAssetCn, ver+1)
if err != nil {
tx.Rollback()
return
}
return
}
// --- multi change is ok ---
func selectForUpdateAndUpdateMultiTx() {
ctx := context.Background()
tx, err := sqlDB.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
if err != nil {
log.Fatal(err)
}
sendUser := &UserAssetDto{userId: 100, assetType: 1}
err = sendUser.ChangeUserAssetTx(tx, func(oldAssetCn int) (newAssetCn int) {
newAssetCn = oldAssetCn - 1
return
})
if err != nil {
return
}
recUser := &UserAssetDto{userId: 101, assetType: 1}
err = recUser.ChangeUserAssetTx(tx, func(oldAssetCn int) (newAssetCn int) {
newAssetCn = oldAssetCn + 1
return
})
if err != nil {
return
}
eventId := uuid.New().String()
userAssertRecord := &UserAssetRecordDto{
userId: 100,
opUserType: 1,
bizId: 110,
bizType: 1,
eventId: eventId,
eventType: "interactGift",
objId: "10",
record: "send gift in the room",
recordOp: "sendGift",
}
userAssertRecord.addRecordTx(tx)
toUserAssertRecord := &UserAssetRecordDto{
userId: 101,
opUserType: 2,
bizId: 110,
bizType: 1,
eventId: eventId,
eventType: "interactGift",
objId: "10",
record: "send gift in the room",
recordOp: "sendGift",
}
toUserAssertRecord.addRecordTx(tx)
if err = tx.Commit(); err != nil {
log.Fatal(err)
}
}
// --- single change is ok ---
func selectForUpdateAndUpdateSingleTx(handle assetChangeHandler) {
ctx := context.Background()
tx, err := sqlDB.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
if err != nil {
log.Fatal(err)
}
assetCn := 0
err = tx.QueryRow(`SELECT assetCn FROM pay.user_asset WHERE userId=? and assetType=? FOR UPDATE`, 100, 1).Scan(&assetCn)
if err != nil {
tx.Rollback()
log.Fatal(err)
}
newAssetCn := handle(assetCn)
//println("newAssetCn", newAssetCn)
if newAssetCn < 0 {
tx.Commit()
return
}
//time.Sleep(3 * time.Second)
_, err = tx.Exec(`update pay.user_asset set assetCn=? where userId=? and assetType=?`, newAssetCn, 100, 1)
if err != nil {
tx.Rollback()
log.Fatal(err)
}
/*
rowCn, err := res.RowsAffected()
if err != nil {
log.Fatal(err)
}
fmt.Printf("affected Row cn:%d\n", rowCn)
*/
if err = tx.Commit(); err != nil {
log.Fatal(err)
}
}
func selectAndCasUpdateSingleFor(handle assetChangeHandler) {
for {
assetCn := 0
err := sqlDB.QueryRow(`SELECT assetCn FROM pay.user_asset WHERE userId=? and assetType=?`, 100, 1).Scan(&assetCn)
if err != nil {
log.Fatal(err)
return
}
newAssetCn := handle(assetCn)
//println("newAssetCn", newAssetCn)
if newAssetCn < 0 {
return
}
//time.Sleep(3 * time.Second)
res, err := sqlDB.Exec(`update pay.user_asset set assetCn=? where userId=? and assetType=? and assetCn=?`, newAssetCn, 100, 1, assetCn)
if err != nil {
log.Fatal(err)
return
}
rowCn, err := res.RowsAffected()
if err != nil {
log.Fatal(err)
return
}
//fmt.Printf("affected Row cn:%d\n", rowCn)
if rowCn == 1 {
break
}
}
}
// ---- below not ok ---------------
func selectAndUpdate() {
assetCn := 0
err := sqlDB.QueryRow(`SELECT assetCn FROM pay.user_asset WHERE userId=? and assetType=?`, 4, 1).Scan(&assetCn)
if err != nil {
log.Fatal(err)
}
newAssetCn := assetCn - 1
println("newAssetCn", newAssetCn)
if newAssetCn < 0 {
return
}
//time.Sleep(3 * time.Second)
res, err := sqlDB.Exec(`update pay.user_asset set assetCn=? where userId=? and assetType=? and assetCn=?`, newAssetCn, 4, 1, assetCn)
//res, err = sqlDB.Exec(`update pay.user_asset set assetCn=assetCn+? where id=?; `, -1, 1)
if err != nil {
log.Fatal(err)
}
rowCn, err := res.RowsAffected()
if err != nil {
log.Fatal(err)
}
fmt.Printf("affected Row cn:%d\n", rowCn)
}
func selectForUpdateAndUpdate() {
assetCn := 0
err := sqlDB.QueryRow(`SELECT assetCn FROM pay.user_asset WHERE userId=? and assetType=? FOR UPDATE`, 100, 1).Scan(&assetCn)
if err != nil {
log.Fatal(err)
}
newAssetCn := assetCn - 1
println("newAssetCn", newAssetCn)
if newAssetCn < 0 {
return
}
//time.Sleep(3 * time.Second)
_, err = sqlDB.Exec(`update pay.user_asset set assetCn=? where userId=? and assetType=? and assetCn=?`, newAssetCn, 100, 1, assetCn)
if err != nil {
log.Fatal(err)
}
}
// default autocommit is open, is ok
func update() {
res, err := sqlDB.Exec(`update pay.user_asset set assetCn=assetCn+? where userId=? and assetCn+?>=0; `, -1, 2, -1)
//res, err = sqlDB.Exec(`update pay.user_asset set assetCn=assetCn+? where id=?; `, -1, 1)
if err != nil {
log.Fatal(err)
}
rowCn, err := res.RowsAffected()
if err != nil {
log.Fatal(err)
}
fmt.Printf("affected Row cn:%d\n", rowCn)
}
// is ok
func updateTx() {
ctx := context.Background()
tx, err := sqlDB.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
if err != nil {
log.Fatal(err)
}
/*
var assetCn int
err = tx.QueryRow(`SELECT assetCn FROM user_asset WHERE userId=? and assetType=? FOR UPDATE`, 1, 1).Scan(&assetCn)
if err != nil {
_ = tx.Rollback()
log.Fatal(err)
}
*/
res, err := tx.Exec(`update pay.user_asset set assetCn=assetCn+? where userId=? and assetType=?; `, 1, 1, 1)
if err != nil {
_ = tx.Rollback()
log.Fatal(err)
}
rowCn, err := res.RowsAffected()
if err != nil {
_ = tx.Rollback()
log.Fatal(err)
}
fmt.Printf("affected Row cn:%d\n", rowCn)
if err = tx.Commit(); err != nil {
log.Fatal(err)
}
}
func main() {
initMySqlClient()
//init100wData()
n := 100
//concurrencyTest(n, "selectForUpdateAndUpdate", selectForUpdateAndUpdate)
concurrencyTest(n, "selectForUpdateAndUpdateMultiTx", selectForUpdateAndUpdateMultiTx)
return
handle := assetChangeHandler(func(oldAssetCn int) (newAssetCn int) {
newAssetCn = oldAssetCn - 1
return
})
//适合并发量大的场景,不太适合长事务,事务超时时间需要数据库中调整对应参数
concurrencyTest(n, "selectForUpdateAndUpdateSingleTx", func() {
selectForUpdateAndUpdateSingleTx(handle)
})
//适合并发量小的场景,不用担心长事务,业务应用程序来cas循环处理
concurrencyTest(n, "selectAndCasUpdateSingleFor", func() {
selectAndCasUpdateSingleFor(handle)
})
return
}
func concurrencyTest(n int, name string, handle func()) {
startTime := time.Now()
wg := &sync.WaitGroup{}
wg.Add(n)
for i := 1; i <= n; i++ {
go func() {
defer wg.Done()
handle()
}()
}
wg.Wait()
println(name, "concurrency", n, "cost", time.Now().Sub(startTime).Microseconds(), "microsecond")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment