From b5f8ee61a7f7e9bd291dd26b0585d03eb686c941 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 13:19:16 +0200 Subject: Adding upstream version 1.46.3. Signed-off-by: Daniel Baumann --- .../go.d.plugin/modules/mongodb/client.go | 299 +++++++++++++++++++++ 1 file changed, 299 insertions(+) create mode 100644 src/go/collectors/go.d.plugin/modules/mongodb/client.go (limited to 'src/go/collectors/go.d.plugin/modules/mongodb/client.go') diff --git a/src/go/collectors/go.d.plugin/modules/mongodb/client.go b/src/go/collectors/go.d.plugin/modules/mongodb/client.go new file mode 100644 index 000000000..eb36fa8ac --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/mongodb/client.go @@ -0,0 +1,299 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package mongo + +import ( + "context" + "fmt" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +const ( + mongos = "mongos" +) + +type mongoConn interface { + serverStatus() (*documentServerStatus, error) + listDatabaseNames() ([]string, error) + dbStats(name string) (*documentDBStats, error) + isReplicaSet() bool + isMongos() bool + replSetGetStatus() (*documentReplSetStatus, error) + shardNodes() (*documentShardNodesResult, error) + shardDatabasesPartitioning() (*documentPartitionedResult, error) + shardCollectionsPartitioning() (*documentPartitionedResult, error) + shardChunks() (map[string]int64, error) + initClient(uri string, timeout time.Duration) error + close() error +} + +type mongoClient struct { + client *mongo.Client + timeout time.Duration + replicaSetFlag *bool + mongosFlag *bool +} + +func (c *mongoClient) serverStatus() (*documentServerStatus, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + cmd := bson.D{ + {Key: "serverStatus", Value: 1}, + {Key: "repl", Value: 1}, + {Key: "metrics", + Value: bson.D{ + {Key: "document", Value: true}, + {Key: "cursor", Value: true}, + {Key: "queryExecutor", Value: true}, + {Key: "apiVersions", Value: false}, + {Key: "aggStageCounters", Value: false}, + {Key: "commands", Value: false}, + {Key: "dotsAndDollarsFields", Value: false}, + {Key: "getLastError", Value: false}, + {Key: "mongos", Value: false}, + {Key: "operation", Value: false}, + {Key: "operatorCounters", Value: false}, + {Key: "query", Value: false}, + {Key: "record", Value: false}, + {Key: "repl", Value: false}, + {Key: "storage", Value: false}, + {Key: "ttl", Value: false}, + }, + }, + } + var status *documentServerStatus + + err := c.client.Database("admin").RunCommand(ctx, cmd).Decode(&status) + if err != nil { + return nil, err + } + + isReplSet := status.Repl != nil + c.replicaSetFlag = &isReplSet + + isMongos := status.Process == mongos + c.mongosFlag = &isMongos + + return status, err +} + +func (c *mongoClient) listDatabaseNames() ([]string, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + return c.client.ListDatabaseNames(ctx, bson.M{}) +} + +func (c *mongoClient) dbStats(name string) (*documentDBStats, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + cmd := bson.M{"dbStats": 1} + var stats documentDBStats + + if err := c.client.Database(name).RunCommand(ctx, cmd).Decode(&stats); err != nil { + return nil, err + } + + return &stats, nil +} + +func (c *mongoClient) isReplicaSet() bool { + if c.replicaSetFlag != nil { + return *c.replicaSetFlag + } + + status, err := c.serverStatus() + if err != nil { + return false + } + + return status.Repl != nil +} + +func (c *mongoClient) isMongos() bool { + if c.mongosFlag != nil { + return *c.mongosFlag + } + + status, err := c.serverStatus() + if err != nil { + return false + } + + return status.Process == mongos +} + +func (c *mongoClient) replSetGetStatus() (*documentReplSetStatus, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + var status *documentReplSetStatus + cmd := bson.M{"replSetGetStatus": 1} + + err := c.client.Database("admin").RunCommand(ctx, cmd).Decode(&status) + if err != nil { + return nil, err + } + + return status, err +} + +func (c *mongoClient) shardNodes() (*documentShardNodesResult, error) { + collection := "shards" + groupStage := bson.D{{Key: "$sortByCount", Value: "$state"}} + + nodesByState, err := c.shardCollectAggregation(collection, []bson.D{groupStage}) + if err != nil { + return nil, err + } + + return &documentShardNodesResult{nodesByState.True, nodesByState.False}, nil +} + +func (c *mongoClient) shardDatabasesPartitioning() (*documentPartitionedResult, error) { + collection := "databases" + groupStage := bson.D{{Key: "$sortByCount", Value: "$partitioned"}} + + partitioning, err := c.shardCollectAggregation(collection, []bson.D{groupStage}) + if err != nil { + return nil, err + } + + return &documentPartitionedResult{partitioning.True, partitioning.False}, nil +} + +func (c *mongoClient) shardCollectionsPartitioning() (*documentPartitionedResult, error) { + collection := "collections" + matchStage := bson.D{{Key: "$match", Value: bson.D{{Key: "dropped", Value: false}}}} + countStage := bson.D{{Key: "$sortByCount", Value: bson.D{{Key: "$eq", Value: bson.A{"$distributionMode", "sharded"}}}}} + + partitioning, err := c.shardCollectAggregation(collection, []bson.D{matchStage, countStage}) + if err != nil { + return nil, err + } + + return &documentPartitionedResult{partitioning.True, partitioning.False}, nil +} + +func (c *mongoClient) shardCollectAggregation(collection string, aggr []bson.D) (*documentAggrResult, error) { + rows, err := c.dbAggregate(collection, aggr) + if err != nil { + return nil, err + } + + result := &documentAggrResult{} + + for _, row := range rows { + if row.Bool { + result.True = row.Count + } else { + result.False = row.Count + } + } + + return result, err +} + +func (c *mongoClient) shardChunks() (map[string]int64, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + col := c.client.Database("config").Collection("chunks") + + cursor, err := col.Aggregate(ctx, mongo.Pipeline{bson.D{{Key: "$sortByCount", Value: "$shard"}}}) + if err != nil { + return nil, err + } + + var shards []bson.M + if err = cursor.All(ctx, &shards); err != nil { + return nil, err + } + + defer func() { _ = cursor.Close(ctx) }() + + result := map[string]int64{} + + for _, row := range shards { + k, ok := row["_id"].(string) + if !ok { + return nil, fmt.Errorf("shard name is not a string: %v", row["_id"]) + } + v, ok := row["count"].(int32) + if !ok { + return nil, fmt.Errorf("shard chunk count is not a int32: %v", row["count"]) + } + result[k] = int64(v) + } + + return result, err +} + +func (c *mongoClient) initClient(uri string, timeout time.Duration) error { + if c.client != nil { + return nil + } + + c.timeout = timeout + + ctxConn, cancelConn := context.WithTimeout(context.Background(), c.timeout) + defer cancelConn() + + client, err := mongo.Connect(ctxConn, options.Client().ApplyURI(uri)) + if err != nil { + return err + } + + ctxPing, cancelPing := context.WithTimeout(context.Background(), c.timeout) + defer cancelPing() + + if err := client.Ping(ctxPing, nil); err != nil { + return err + } + + c.client = client + + return nil +} + +func (c *mongoClient) close() error { + if c.client == nil { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + if err := c.client.Disconnect(ctx); err != nil { + return err + } + + c.client = nil + + return nil +} + +func (c *mongoClient) dbAggregate(collection string, aggr []bson.D) ([]documentAggrResults, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + cursor, err := c.client.Database("config").Collection(collection).Aggregate(ctx, aggr) + if err != nil { + return nil, err + } + + defer func() { _ = cursor.Close(ctx) }() + + var rows []documentAggrResults + if err := cursor.All(ctx, &rows); err != nil { + return nil, err + } + + return rows, nil +} -- cgit v1.2.3