summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/modules/pgbouncer/collect.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/modules/pgbouncer/collect.go354
1 files changed, 354 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/pgbouncer/collect.go b/src/go/collectors/go.d.plugin/modules/pgbouncer/collect.go
new file mode 100644
index 000000000..c0e4bf2da
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/modules/pgbouncer/collect.go
@@ -0,0 +1,354 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pgbouncer
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "regexp"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/blang/semver/v4"
+ "github.com/jackc/pgx/v4"
+ "github.com/jackc/pgx/v4/stdlib"
+)
+
+// 'SHOW STATS;' response was changed significantly in v1.8.0
+// v1.8.0 was released in 2015 - no need to complicate the code to support the old version.
+var minSupportedVersion = semver.Version{Major: 1, Minor: 8, Patch: 0}
+
+const (
+ queryShowVersion = "SHOW VERSION;"
+ queryShowConfig = "SHOW CONFIG;"
+ queryShowDatabases = "SHOW DATABASES;"
+ queryShowStats = "SHOW STATS;"
+ queryShowPools = "SHOW POOLS;"
+)
+
+func (p *PgBouncer) collect() (map[string]int64, error) {
+ if p.db == nil {
+ if err := p.openConnection(); err != nil {
+ return nil, err
+ }
+ }
+ if p.version == nil {
+ ver, err := p.queryVersion()
+ if err != nil {
+ return nil, err
+ }
+ p.Debugf("connected to PgBouncer v%s", ver)
+ if ver.LE(minSupportedVersion) {
+ return nil, fmt.Errorf("unsupported version: v%s, required v%s+", ver, minSupportedVersion)
+ }
+ p.version = ver
+ }
+
+ now := time.Now()
+ if now.Sub(p.recheckSettingsTime) > p.recheckSettingsEvery {
+ v, err := p.queryMaxClientConn()
+ if err != nil {
+ return nil, err
+ }
+ p.maxClientConn = v
+ }
+
+ // http://www.pgbouncer.org/usage.html
+
+ p.resetMetrics()
+
+ if err := p.collectDatabases(); err != nil {
+ return nil, err
+ }
+ if err := p.collectStats(); err != nil {
+ return nil, err
+ }
+ if err := p.collectPools(); err != nil {
+ return nil, err
+ }
+
+ mx := make(map[string]int64)
+ p.collectMetrics(mx)
+
+ return mx, nil
+}
+
+func (p *PgBouncer) collectMetrics(mx map[string]int64) {
+ var clientConns int64
+ for name, db := range p.metrics.dbs {
+ if !db.updated {
+ delete(p.metrics.dbs, name)
+ p.removeDatabaseCharts(name)
+ continue
+ }
+ if !db.hasCharts {
+ db.hasCharts = true
+ p.addNewDatabaseCharts(name, db.pgDBName)
+ }
+
+ mx["db_"+name+"_total_xact_count"] = db.totalXactCount
+ mx["db_"+name+"_total_xact_time"] = db.totalXactTime
+ mx["db_"+name+"_avg_xact_time"] = db.avgXactTime
+
+ mx["db_"+name+"_total_query_count"] = db.totalQueryCount
+ mx["db_"+name+"_total_query_time"] = db.totalQueryTime
+ mx["db_"+name+"_avg_query_time"] = db.avgQueryTime
+
+ mx["db_"+name+"_total_wait_time"] = db.totalWaitTime
+ mx["db_"+name+"_maxwait"] = db.maxWait*1e6 + db.maxWaitUS
+
+ mx["db_"+name+"_cl_active"] = db.clActive
+ mx["db_"+name+"_cl_waiting"] = db.clWaiting
+ mx["db_"+name+"_cl_cancel_req"] = db.clCancelReq
+ clientConns += db.clActive + db.clWaiting + db.clCancelReq
+
+ mx["db_"+name+"_sv_active"] = db.svActive
+ mx["db_"+name+"_sv_idle"] = db.svIdle
+ mx["db_"+name+"_sv_used"] = db.svUsed
+ mx["db_"+name+"_sv_tested"] = db.svTested
+ mx["db_"+name+"_sv_login"] = db.svLogin
+
+ mx["db_"+name+"_total_received"] = db.totalReceived
+ mx["db_"+name+"_total_sent"] = db.totalSent
+
+ mx["db_"+name+"_sv_conns_utilization"] = calcPercentage(db.currentConnections, db.maxConnections)
+ }
+
+ mx["cl_conns_utilization"] = calcPercentage(clientConns, p.maxClientConn)
+}
+
+func (p *PgBouncer) collectDatabases() error {
+ q := queryShowDatabases
+ p.Debugf("executing query: %v", q)
+
+ var db string
+ return p.collectQuery(q, func(column, value string) {
+ switch column {
+ case "name":
+ db = value
+ p.getDBMetrics(db).updated = true
+ case "database":
+ p.getDBMetrics(db).pgDBName = value
+ case "max_connections":
+ p.getDBMetrics(db).maxConnections = parseInt(value)
+ case "current_connections":
+ p.getDBMetrics(db).currentConnections = parseInt(value)
+ case "paused":
+ p.getDBMetrics(db).paused = parseInt(value)
+ case "disabled":
+ p.getDBMetrics(db).disabled = parseInt(value)
+ }
+ })
+}
+
+func (p *PgBouncer) collectStats() error {
+ q := queryShowStats
+ p.Debugf("executing query: %v", q)
+
+ var db string
+ return p.collectQuery(q, func(column, value string) {
+ switch column {
+ case "database":
+ db = value
+ p.getDBMetrics(db).updated = true
+ case "total_xact_count":
+ p.getDBMetrics(db).totalXactCount = parseInt(value)
+ case "total_query_count":
+ p.getDBMetrics(db).totalQueryCount = parseInt(value)
+ case "total_received":
+ p.getDBMetrics(db).totalReceived = parseInt(value)
+ case "total_sent":
+ p.getDBMetrics(db).totalSent = parseInt(value)
+ case "total_xact_time":
+ p.getDBMetrics(db).totalXactTime = parseInt(value)
+ case "total_query_time":
+ p.getDBMetrics(db).totalQueryTime = parseInt(value)
+ case "total_wait_time":
+ p.getDBMetrics(db).totalWaitTime = parseInt(value)
+ case "avg_xact_time":
+ p.getDBMetrics(db).avgXactTime = parseInt(value)
+ case "avg_query_time":
+ p.getDBMetrics(db).avgQueryTime = parseInt(value)
+ }
+ })
+}
+
+func (p *PgBouncer) collectPools() error {
+ q := queryShowPools
+ p.Debugf("executing query: %v", q)
+
+ // an entry is made for each couple of (database, user).
+ var db string
+ return p.collectQuery(q, func(column, value string) {
+ switch column {
+ case "database":
+ db = value
+ p.getDBMetrics(db).updated = true
+ case "cl_active":
+ p.getDBMetrics(db).clActive += parseInt(value)
+ case "cl_waiting":
+ p.getDBMetrics(db).clWaiting += parseInt(value)
+ case "cl_cancel_req":
+ p.getDBMetrics(db).clCancelReq += parseInt(value)
+ case "sv_active":
+ p.getDBMetrics(db).svActive += parseInt(value)
+ case "sv_idle":
+ p.getDBMetrics(db).svIdle += parseInt(value)
+ case "sv_used":
+ p.getDBMetrics(db).svUsed += parseInt(value)
+ case "sv_tested":
+ p.getDBMetrics(db).svTested += parseInt(value)
+ case "sv_login":
+ p.getDBMetrics(db).svLogin += parseInt(value)
+ case "maxwait":
+ p.getDBMetrics(db).maxWait += parseInt(value)
+ case "maxwait_us":
+ p.getDBMetrics(db).maxWaitUS += parseInt(value)
+ }
+ })
+}
+
+func (p *PgBouncer) queryMaxClientConn() (int64, error) {
+ q := queryShowConfig
+ p.Debugf("executing query: %v", q)
+
+ var v int64
+ var key string
+ err := p.collectQuery(q, func(column, value string) {
+ switch column {
+ case "key":
+ key = value
+ case "value":
+ if key == "max_client_conn" {
+ v = parseInt(value)
+ }
+ }
+ })
+ return v, err
+}
+
+var reVersion = regexp.MustCompile(`\d+\.\d+\.\d+`)
+
+func (p *PgBouncer) queryVersion() (*semver.Version, error) {
+ q := queryShowVersion
+ p.Debugf("executing query: %v", q)
+
+ var resp string
+ ctx, cancel := context.WithTimeout(context.Background(), p.Timeout.Duration())
+ defer cancel()
+ if err := p.db.QueryRowContext(ctx, q).Scan(&resp); err != nil {
+ return nil, err
+ }
+
+ if !strings.Contains(resp, "PgBouncer") {
+ return nil, fmt.Errorf("not PgBouncer instance: version response: %s", resp)
+ }
+
+ ver := reVersion.FindString(resp)
+ if ver == "" {
+ return nil, fmt.Errorf("couldn't parse version string '%s' (expected pattern '%s')", resp, reVersion)
+ }
+
+ v, err := semver.New(ver)
+ if err != nil {
+ return nil, fmt.Errorf("couldn't parse version string '%s': %v", ver, err)
+ }
+
+ return v, nil
+}
+
+func (p *PgBouncer) openConnection() error {
+ cfg, err := pgx.ParseConfig(p.DSN)
+ if err != nil {
+ return err
+ }
+ cfg.PreferSimpleProtocol = true
+
+ db, err := sql.Open("pgx", stdlib.RegisterConnConfig(cfg))
+ if err != nil {
+ return fmt.Errorf("error on opening a connection with the PgBouncer database [%s]: %v", p.DSN, err)
+ }
+
+ db.SetMaxOpenConns(1)
+ db.SetMaxIdleConns(1)
+ db.SetConnMaxLifetime(10 * time.Minute)
+
+ p.db = db
+
+ return nil
+}
+
+func (p *PgBouncer) collectQuery(query string, assign func(column, value string)) error {
+ ctx, cancel := context.WithTimeout(context.Background(), p.Timeout.Duration())
+ defer cancel()
+ rows, err := p.db.QueryContext(ctx, query)
+ if err != nil {
+ return err
+ }
+ defer func() { _ = rows.Close() }()
+
+ columns, err := rows.Columns()
+ if err != nil {
+ return err
+ }
+
+ values := makeNullStrings(len(columns))
+ for rows.Next() {
+ if err := rows.Scan(values...); err != nil {
+ return err
+ }
+ for i, v := range values {
+ assign(columns[i], valueToString(v))
+ }
+ }
+ return rows.Err()
+}
+
+func (p *PgBouncer) getDBMetrics(dbname string) *dbMetrics {
+ db, ok := p.metrics.dbs[dbname]
+ if !ok {
+ db = &dbMetrics{name: dbname}
+ p.metrics.dbs[dbname] = db
+ }
+ return db
+}
+
+func (p *PgBouncer) resetMetrics() {
+ for name, db := range p.metrics.dbs {
+ p.metrics.dbs[name] = &dbMetrics{
+ name: db.name,
+ pgDBName: db.pgDBName,
+ hasCharts: db.hasCharts,
+ }
+ }
+}
+
+func valueToString(value any) string {
+ v, ok := value.(*sql.NullString)
+ if !ok || !v.Valid {
+ return ""
+ }
+ return v.String
+}
+
+func makeNullStrings(size int) []any {
+ vs := make([]any, size)
+ for i := range vs {
+ vs[i] = &sql.NullString{}
+ }
+ return vs
+}
+
+func parseInt(s string) int64 {
+ v, _ := strconv.ParseInt(s, 10, 64)
+ return v
+}
+
+func calcPercentage(value, total int64) int64 {
+ if total == 0 {
+ return 0
+ }
+ return value * 100 / total
+}