Skip to content

Instantly share code, notes, and snippets.

@shivam-tripathi
Created December 30, 2020 07:37
Show Gist options
  • Save shivam-tripathi/b884ee0354bab764739c2e8cbc419bb5 to your computer and use it in GitHub Desktop.
Save shivam-tripathi/b884ee0354bab764739c2e8cbc419bb5 to your computer and use it in GitHub Desktop.
Stress test mongo with specified concurrency limit
package main
import (
"context"
"errors"
"log"
"math/rand"
"reflect"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
)
// Settings for the stress test.
const (
	// connURI is the MongoDB connection string handed to the driver.
	connURI = "mongodb://localhost:27017"

	// dbName is the name of the database the test runs against.
	dbName = "dbName"

	// colName is the name of the collection inside dbName.
	colName = "colName"

	// dataLim is how many records get inserted in one run.
	dataLim int = 1e3

	// concurrencyLimit caps the number of inserts running at the same time.
	concurrencyLimit = 10
)
// Data is the document shape that gets written to MongoDB and decoded back
// when a stored record is read.
type Data struct {
	// ID is (de)serialized under the "_id" key.
	ID primitive.ObjectID `bson:"_id" json:"_id"`
	// CreatedAt is (de)serialized under the "createdAt" key.
	CreatedAt primitive.DateTime `bson:"createdAt" json:"createdAt"`
}
// NewData builds a Data value carrying a freshly generated ObjectID and the
// current time as its creation timestamp.
func NewData() Data {
	var d Data
	d.ID = primitive.NewObjectID()
	d.CreatedAt = primitive.NewDateTimeFromTime(time.Now())
	return d
}
// HandleError does nothing when err is nil; otherwise it logs err and
// terminates the program via log.Fatalln.
func HandleError(err error) {
	if err == nil {
		return
	}
	log.Fatalln(err)
}
// Connect creates a client for connURI and connects it, allowing five seconds
// for the connect call. Any failure is passed to HandleError, which ends the
// program, so the returned client is always non-nil.
func Connect() *mongo.Client {
	clientOpts := options.Client().ApplyURI(connURI)
	client, err := mongo.NewClient(clientOpts)
	HandleError(err)

	connectCtx, stop := context.WithTimeout(context.Background(), 5*time.Second)
	defer stop()
	HandleError(client.Connect(connectCtx))

	return client
}
// process reads Data values from dataChan and, for each one, inserts it into
// the colName collection of db and reads it back to verify the round trip.
// At most concurrencyLimit insert/verify workers run at the same time.
//
// process returns once a value is received on done (or done is closed), or
// dataChan is closed, and every worker that was already started has finished.
// A failed insert, a failed read-back, or a mismatch between the written and
// the stored document panics, which aborts the stress test.
func process(db *mongo.Database, dataChan chan Data, done <-chan bool) {
	// Write to the collection named by colName rather than a hard-coded name,
	// so this agrees with the collection main creates.
	col := db.Collection(colName)

	// taskManager is a pool of concurrencyLimit tokens. A worker may only run
	// while it holds a token, which bounds the number of concurrent workers.
	taskManager := make(chan int, concurrencyLimit)
	for i := 0; i < concurrencyLimit; i++ {
		taskManager <- i
	}

	// Before returning, take every token back. A token only becomes available
	// again when its worker has finished, so this waits for in-flight work.
	// The channel is deliberately never closed: a late worker returning its
	// token to a closed channel would panic.
	defer func() {
		for i := 0; i < concurrencyLimit; i++ {
			<-taskManager
		}
	}()

	for {
		// Acquire a free slot first so data is never accepted without the
		// capacity to process it.
		var taskID int
		select {
		case <-done:
			return
		case taskID = <-taskManager:
		}

		// Wait for the next record here (not inside the worker) so that no
		// goroutine is left blocked on dataChan when shutdown is requested.
		select {
		case <-done:
			taskManager <- taskID // slot was never used; hand it back
			return
		case data, ok := <-dataChan:
			if !ok {
				// dataChan was closed: no more input will arrive.
				taskManager <- taskID
				return
			}
			go func() {
				// Put the token back even if the worker panics.
				defer func() { taskManager <- taskID }()
				if err := insertAndVerify(col, data); err != nil {
					panic(err)
				}
			}()
		}
	}
}

// insertAndVerify inserts data into col, reads the document back by its _id
// and reports an error if either operation fails or the stored document
// differs from data. Each operation gets its own one second timeout.
func insertAndVerify(col *mongo.Collection, data Data) error {
	insertCtx, cancelInsert := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancelInsert()
	if _, err := col.InsertOne(insertCtx, data); err != nil {
		return err
	}

	findCtx, cancelFind := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancelFind()
	var savedData Data
	if err := col.FindOne(findCtx, bson.M{"_id": data.ID}).Decode(&savedData); err != nil {
		return err
	}

	if !reflect.DeepEqual(data, savedData) {
		return errors.New("Saved data is not same as inserted data")
	}
	return nil
}
// main connects to MongoDB, pings it, attempts to create the colName
// collection, then feeds dataLim generated records to process and waits for
// process to return before exiting.
func main() {
	client := Connect()

	// Verify connection. The context is cancelled right after the call so its
	// timer is released instead of being lost when a new context is created.
	pingCtx, cancelPing := context.WithTimeout(context.Background(), 5*time.Second)
	err := client.Ping(pingCtx, readpref.SecondaryPreferred())
	cancelPing()
	HandleError(err)

	db := client.Database(dbName)

	// Create collection if missing. This is best-effort: a failure is logged
	// but not fatal (presumably the common case is that the collection
	// already exists from a previous run).
	createCtx, cancelCreate := context.WithTimeout(context.Background(), 5*time.Second) // 5 second timeout
	err = db.CreateCollection(createCtx, colName)
	cancelCreate()
	if err != nil {
		log.Println("create collection:", err)
	}

	rand.Seed(time.Now().UnixNano())

	// Channel which provides data. It is intentionally not closed: closing it
	// would make any receiver still waiting on it observe a zero-value Data.
	dataChan := make(chan Data)

	// Channel for cancellation / prompt for op to end.
	done := make(chan bool)

	// finished is closed when process returns, so main can wait for it
	// instead of exiting while work may still be running.
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		process(db, dataChan, done)
	}()

	for i := 0; i < dataLim; i++ {
		dataChan <- NewData()
	}

	done <- true
	<-finished
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment