Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
MongoDB Cluster Monitoring using Go
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"sync"
	"sync/atomic"
	"time"

	gohandlers "github.com/gorilla/handlers"
	"github.com/gorilla/mux"
	"github.com/hashicorp/go-hclog"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/mongo/readpref"
)
// Package-level state shared by the monitor callbacks, the HTTP handler and
// the connection setup code.
var (
// Connection-pool event counters. They are incremented with
// atomic.AddUint64 in the PoolMonitor callback and read with
// atomic.LoadUint64 through the getConnPool* helpers.
connPoolCreated uint64
connSuccess uint64
connClosed uint64
connReturned uint64
connPoolCleared uint64
// Command-monitor history: every started / succeeded / failed command
// event is appended to the matching slice by the CommandMonitor callbacks.
cmdMonitorStart = []*event.CommandStartedEvent{}
cmdMonitorSucceed = []*event.CommandSucceededEvent{}
cmdMonitorFailed = []*event.CommandFailedEvent{}
// Server-monitor history: heartbeat and server open/close events appended
// by the ServerMonitor callbacks. ("Openning" is the spelling used
// throughout the file and in the JSON output.)
serverHeartbeatStart = []*event.ServerHeartbeatStartedEvent{}
serverHeartbeatOpenning = []*event.ServerOpeningEvent{}
serverHeartbeatSucceeded = []*event.ServerHeartbeatSucceededEvent{}
serverHeartbeatFailed = []*event.ServerHeartbeatFailedEvent{}
serverHeartbeatClosed = []*event.ServerClosedEvent{}
// ctx is the package-level context, initialised to context.Background().
ctx = context.Background()
// client is the MongoDB client created by mongoConnection.
client *mongo.Client
// err is a shared error slot assigned by mongoConnection and main.
err error
// log is the application logger; main sets it to hclog.Default() before
// anything else uses it.
log hclog.Logger
)
// MongoMonitor is the JSON payload returned by the monitoring endpoint. It
// bundles the connection-pool counters with the recorded server and command
// events. Field order and JSON tags determine the shape of the response.
type MongoMonitor struct {
// Connection-pool counters (number of events seen per pool event type).
ConnPoolCreated uint64 `json:"connPoolCreated"`
ConnSuccess uint64 `json:"connSuccess"`
ConnClosed uint64 `json:"connClosed"`
ConnReturned uint64 `json:"connReturned"`
ConnPoolCleared uint64 `json:"connPoolCleared"`
// Server heartbeat / lifecycle events as recorded by the ServerMonitor.
// NOTE(review): "Openning" is misspelled, but the JSON key is part of the
// response contract, so renaming it would break existing consumers.
ServerHeartbeatStart []*event.ServerHeartbeatStartedEvent `json:"serverHeartbeatStart"`
ServerHeartbeatOpenning []*event.ServerOpeningEvent `json:"serverHeartbeatOpenning"`
ServerHeartbeatSucceeded []*event.ServerHeartbeatSucceededEvent `json:"serverHeartbeatSucceeded"`
ServerHeartbeatFailed []*event.ServerHeartbeatFailedEvent `json:"serverHeartbeatFailed"`
ServerHeartbeatClosed []*event.ServerClosedEvent `json:"serverHeartbeatClosed"`
// Command events as recorded by the CommandMonitor.
CmdMonitorStart []*event.CommandStartedEvent `json:"cmdMonitorStart"`
CmdMonitorSucceed []*event.CommandSucceededEvent `json:"cmdMonitorSucceed"`
CmdMonitorFailed []*event.CommandFailedEvent `json:"cmdMonitorFailed"`
}
// eventsMu guards every read and write of the captured-event slices
// (cmdMonitor* and serverHeartbeat*). The monitor callbacks are invoked by the
// MongoDB driver on its own goroutines (heartbeats, command execution) while
// HTTP handlers read the same slices, so unsynchronised access is a data race.
var eventsMu sync.Mutex

// monitors builds the pool, command and server monitors that are attached to
// the MongoDB client options.
//
// The pool monitor bumps one atomic counter per pool event type and logs the
// new total. The command and server monitors append each event to the matching
// package-level slice under eventsMu. Calling monitors resets those slices to
// empty.
//
// NOTE(review): the event slices are only ever appended to, so they grow for
// the lifetime of the process; consider capping them if this runs long-term.
func monitors() (*event.PoolMonitor, *event.CommandMonitor, *event.ServerMonitor) {
	poolMonitor := &event.PoolMonitor{
		Event: func(e *event.PoolEvent) {
			// Log the value returned by the increment itself so the logged
			// total always corresponds to this event, even when several pool
			// events are processed concurrently.
			switch e.Type {
			case event.PoolCreated:
				log.Info("PoolMonitor", "PoolCreated", atomic.AddUint64(&connPoolCreated, 1))
			case event.GetSucceeded:
				log.Info("PoolMonitor", "ConnSucces", atomic.AddUint64(&connSuccess, 1))
			case event.ConnectionClosed:
				log.Info("PoolMonitor", "ConnClosed", atomic.AddUint64(&connClosed, 1))
			case event.ConnectionReturned:
				log.Info("PoolMonitor", "ConnReturned", atomic.AddUint64(&connReturned, 1))
			case event.PoolCleared:
				log.Info("PoolMonitor", "ConnPoolCleared", atomic.AddUint64(&connPoolCleared, 1))
			}
		},
	}

	// Start from an empty history. Non-nil empty slices are used on purpose so
	// the JSON output is "[]" rather than "null" before any event arrives.
	eventsMu.Lock()
	cmdMonitorStart = []*event.CommandStartedEvent{}
	cmdMonitorSucceed = []*event.CommandSucceededEvent{}
	cmdMonitorFailed = []*event.CommandFailedEvent{}
	serverHeartbeatStart = []*event.ServerHeartbeatStartedEvent{}
	serverHeartbeatOpenning = []*event.ServerOpeningEvent{}
	serverHeartbeatSucceeded = []*event.ServerHeartbeatSucceededEvent{}
	serverHeartbeatFailed = []*event.ServerHeartbeatFailedEvent{}
	serverHeartbeatClosed = []*event.ServerClosedEvent{}
	eventsMu.Unlock()

	cmdMonitor := &event.CommandMonitor{
		Started: func(_ context.Context, evt *event.CommandStartedEvent) {
			eventsMu.Lock()
			defer eventsMu.Unlock()
			cmdMonitorStart = append(cmdMonitorStart, evt)
		},
		Succeeded: func(_ context.Context, evt *event.CommandSucceededEvent) {
			eventsMu.Lock()
			defer eventsMu.Unlock()
			cmdMonitorSucceed = append(cmdMonitorSucceed, evt)
		},
		Failed: func(_ context.Context, evt *event.CommandFailedEvent) {
			eventsMu.Lock()
			defer eventsMu.Unlock()
			cmdMonitorFailed = append(cmdMonitorFailed, evt)
		},
	}

	serverMonitor := &event.ServerMonitor{
		ServerHeartbeatStarted: func(evt *event.ServerHeartbeatStartedEvent) {
			eventsMu.Lock()
			defer eventsMu.Unlock()
			serverHeartbeatStart = append(serverHeartbeatStart, evt)
		},
		ServerOpening: func(evt *event.ServerOpeningEvent) {
			eventsMu.Lock()
			defer eventsMu.Unlock()
			serverHeartbeatOpenning = append(serverHeartbeatOpenning, evt)
		},
		ServerHeartbeatSucceeded: func(evt *event.ServerHeartbeatSucceededEvent) {
			eventsMu.Lock()
			defer eventsMu.Unlock()
			serverHeartbeatSucceeded = append(serverHeartbeatSucceeded, evt)
		},
		ServerHeartbeatFailed: func(evt *event.ServerHeartbeatFailedEvent) {
			eventsMu.Lock()
			defer eventsMu.Unlock()
			serverHeartbeatFailed = append(serverHeartbeatFailed, evt)
		},
		ServerClosed: func(evt *event.ServerClosedEvent) {
			eventsMu.Lock()
			defer eventsMu.Unlock()
			serverHeartbeatClosed = append(serverHeartbeatClosed, evt)
		},
	}

	return poolMonitor, cmdMonitor, serverMonitor
}

// ReadMongoMonitor returns a point-in-time snapshot of the pool counters and
// the recorded events.
//
// The event slices are copied while eventsMu is held, so the caller can encode
// the result without racing against callbacks that keep appending. Each copy
// starts from a non-nil empty slice so an empty history still encodes as "[]".
func ReadMongoMonitor() *MongoMonitor {
	eventsMu.Lock()
	defer eventsMu.Unlock()

	return &MongoMonitor{
		ConnPoolCreated: getConnPoolCreated(),
		ConnSuccess:     getConnPoolSuccess(),
		ConnClosed:      getConnPoolClosed(),
		ConnReturned:    getConnPoolReturned(),
		ConnPoolCleared: getConnPoolCleared(),

		ServerHeartbeatStart:     append([]*event.ServerHeartbeatStartedEvent{}, serverHeartbeatStart...),
		ServerHeartbeatOpenning:  append([]*event.ServerOpeningEvent{}, serverHeartbeatOpenning...),
		ServerHeartbeatSucceeded: append([]*event.ServerHeartbeatSucceededEvent{}, serverHeartbeatSucceeded...),
		ServerHeartbeatFailed:    append([]*event.ServerHeartbeatFailedEvent{}, serverHeartbeatFailed...),
		ServerHeartbeatClosed:    append([]*event.ServerClosedEvent{}, serverHeartbeatClosed...),

		CmdMonitorStart:   append([]*event.CommandStartedEvent{}, cmdMonitorStart...),
		CmdMonitorSucceed: append([]*event.CommandSucceededEvent{}, cmdMonitorSucceed...),
		CmdMonitorFailed:  append([]*event.CommandFailedEvent{}, cmdMonitorFailed...),
	}
}
// getConnPoolCreated atomically loads the number of PoolCreated events
// counted so far.
func getConnPoolCreated() uint64 {
	created := atomic.LoadUint64(&connPoolCreated)
	return created
}
// getConnPoolSuccess atomically loads the number of GetSucceeded events
// counted so far.
func getConnPoolSuccess() uint64 {
	succeeded := atomic.LoadUint64(&connSuccess)
	return succeeded
}
// getConnPoolReturned atomically loads the number of ConnectionReturned
// events counted so far.
func getConnPoolReturned() uint64 {
	returned := atomic.LoadUint64(&connReturned)
	return returned
}
// getConnPoolClosed atomically loads the number of ConnectionClosed events
// counted so far.
func getConnPoolClosed() uint64 {
	closed := atomic.LoadUint64(&connClosed)
	return closed
}
// getConnPoolCleared atomically loads the number of PoolCleared events
// counted so far.
func getConnPoolCleared() uint64 {
	cleared := atomic.LoadUint64(&connPoolCleared)
	return cleared
}
// main sets up the logger, registers the monitoring route, opens the MongoDB
// connection and then serves HTTP on port 8000 until the listener fails, at
// which point the process exits with status 1.
func main() {
	const listenAddr = ":8000"

	log = hclog.Default()

	router := mux.NewRouter()
	router.StrictSlash(true)

	// Only GET requests are routed to the monitoring endpoint.
	router.Methods(http.MethodGet).
		Subrouter().
		HandleFunc("/api/monitor-mongodb-cluster", MonitorMongoDBCluster)

	// Connect before serving so the monitors are installed and start
	// recording events.
	mongoConnection()

	// Allow cross-origin requests from any origin.
	allowAnyOrigin := gohandlers.AllowedOrigins([]string{"*"})
	withCORS := gohandlers.CORS(allowAnyOrigin)

	server := &http.Server{
		Addr:         listenAddr,
		Handler:      withCORS(router),
		ErrorLog:     log.StandardLogger(&hclog.StandardLoggerOptions{}),
		ReadTimeout:  5 * time.Second,
		WriteTimeout: 10 * time.Second,
		IdleTimeout:  120 * time.Second,
	}

	log.Info("Starting server on", "PORT", listenAddr)
	if err = server.ListenAndServe(); err != nil {
		log.Error("Unable to start server", "error", err)
		os.Exit(1)
	}
}
func mongoConnection() {
poolMonitor, cmdMonitor, serverMonitor := monitors()
client, err = mongo.NewClient(options.Client().
ApplyURI("mongodb://pd123:pd123@localhost:27017/product?authSource=product&replicaSet=rs0").
SetPoolMonitor(poolMonitor).
SetMonitor(cmdMonitor).
SetServerMonitor(serverMonitor))
if err != nil {
fmt.Println("Error initializing to MongoDB : " + err.Error())
return
}
ctx, _ = context.WithTimeout(context.Background(), 10*time.Second)
err = client.Connect(ctx)
if err != nil {
fmt.Println("Error connecting to MongoDB : " + err.Error())
return
}
log.Info("MongoDB Connected!")
err = client.Ping(ctx, readpref.Primary())
if err != nil {
log.Error("Mongo Ping Error : ", err.Error())
return
}
log.Info("MongoDB Connection Pinged!")
databases, err := client.ListDatabaseNames(ctx, bson.M{})
if err != nil {
log.Error("Mongo ListDatabase Error : ", err.Error())
return
}
log.Info("database", "list", databases)
}
func MonitorMongoDBCluster(w http.ResponseWriter, r *http.Request) {
respondJSON(w, http.StatusOK, ReadMongoMonitor())
}
// respondJSON writes payload to w as a JSON document with the given HTTP
// status code and a JSON content type.
//
// The payload is marshalled before anything is written, so an encoding failure
// produces a 500 response instead of the requested status with an empty or
// truncated body.
func respondJSON(w http.ResponseWriter, status int, payload interface{}) {
	body, encErr := json.Marshal(payload)
	if encErr != nil {
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		return
	}

	w.Header().Set("content-Type", "application/json")
	w.WriteHeader(status)

	// Keep the trailing newline that json.Encoder.Encode emits so the response
	// bytes are unchanged for successful encodes. A write error here means the
	// client went away after the header was sent; nothing useful can be done.
	_, _ = w.Write(append(body, '\n'))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment