Created
December 30, 2020 07:37
-
-
Save shivam-tripathi/b884ee0354bab764739c2e8cbc419bb5 to your computer and use it in GitHub Desktop.
Stress test mongo with specified concurrency limit
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"errors" | |
"log" | |
"math/rand" | |
"reflect" | |
"time" | |
"go.mongodb.org/mongo-driver/bson" | |
"go.mongodb.org/mongo-driver/bson/primitive" | |
"go.mongodb.org/mongo-driver/mongo" | |
"go.mongodb.org/mongo-driver/mongo/options" | |
"go.mongodb.org/mongo-driver/mongo/readpref" | |
) | |
// connURI is connection uri for connect | |
const connURI = "mongodb://localhost:27017" | |
// database name | |
const dbName = "dbName" | |
// collection name | |
const colName = "colName" | |
// dataLim is number of records to be inserted | |
const dataLim int = 1e3 | |
// concurrencyLimit is limit to number of concurrent inserts | |
const concurrencyLimit = 10 | |
// Data struct is data to be inserted into mongodb | |
type Data struct { | |
ID primitive.ObjectID `bson:"_id" json:"_id"` | |
CreatedAt primitive.DateTime `bson:"createdAt" json:"createdAt"` | |
} | |
// NewData is the method which returns a new Data instance | |
func NewData() Data { | |
return Data{ | |
ID: primitive.NewObjectID(), | |
CreatedAt: primitive.NewDateTimeFromTime(time.Now()), | |
} | |
} | |
func HandleError(err error) { | |
if err != nil { | |
log.Fatalln(err) | |
} | |
} | |
func Connect() *mongo.Client { | |
client, err := mongo.NewClient(options.Client().ApplyURI(connURI)) | |
HandleError(err) | |
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
defer cancel() | |
err = client.Connect(ctx) | |
HandleError(err) | |
return client | |
} | |
// process is method which process data | |
func process(db *mongo.Database, dataChan chan Data, done <-chan bool) { | |
col := db.Collection("responses") | |
// Allow upto 4 tasks | |
taskManager := make(chan int, concurrencyLimit) | |
defer close(taskManager) | |
for i := 0; i < concurrencyLimit; i++ { | |
taskManager <- i | |
} | |
for { | |
select { | |
case <-done: | |
return | |
case taskID := <-taskManager: | |
go func() { | |
data := <-dataChan | |
ctx, cancelInsert := context.WithTimeout(context.Background(), 1*time.Second) | |
defer cancelInsert() | |
col.InsertOne(ctx, data) | |
ctx, cancelFind := context.WithTimeout(context.Background(), 1*time.Second) | |
defer cancelFind() | |
var savedData Data | |
err := col.FindOne(ctx, bson.M{"_id": data.ID}).Decode(&savedData) | |
if err == nil { | |
if !reflect.DeepEqual(data, savedData) { | |
panic(errors.New("Saved data is not same as inserted data")) | |
} | |
} else { | |
panic(err) | |
} | |
taskManager <- taskID // put back task into channel | |
}() | |
} | |
} | |
} | |
func main() { | |
client := Connect() | |
// Verify connection | |
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
HandleError(client.Ping(ctx, readpref.SecondaryPreferred())) | |
db := client.Database(dbName) | |
// Create collection if missing | |
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) // 5 second timeout | |
defer cancel() | |
db.CreateCollection(ctx, colName) | |
rand.Seed(time.Now().UnixNano()) | |
// Channel which provides data | |
dataChan := make(chan Data) | |
defer close(dataChan) | |
// Channel for cancellation / prompt for op to end | |
done := make(chan bool) | |
defer close(done) | |
// Initiate process method | |
go process(db, dataChan, done) | |
idx := 0 | |
for idx < dataLim { | |
dataChan <- NewData() | |
idx++ | |
} | |
done <- true | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment