diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/collectors/go.d.plugin/modules/proxysql/collect.go | 308 |
1 files changed, 308 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/proxysql/collect.go b/src/go/collectors/go.d.plugin/modules/proxysql/collect.go new file mode 100644 index 000000000..dfc559a97 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/proxysql/collect.go @@ -0,0 +1,308 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package proxysql + +import ( + "context" + "database/sql" + "fmt" + "strconv" + "strings" + "time" +) + +const ( + queryVersion = "select version();" + queryStatsMySQLGlobal = "SELECT * FROM stats_mysql_global;" + queryStatsMySQLMemoryMetrics = "SELECT * FROM stats_memory_metrics;" + queryStatsMySQLCommandsCounters = "SELECT * FROM stats_mysql_commands_counters;" + queryStatsMySQLUsers = "SELECT * FROM stats_mysql_users;" + queryStatsMySQLConnectionPool = "SELECT * FROM stats_mysql_connection_pool;" +) + +func (p *ProxySQL) collect() (map[string]int64, error) { + if p.db == nil { + if err := p.openConnection(); err != nil { + return nil, err + } + } + + p.once.Do(func() { + v, err := p.doQueryVersion() + if err != nil { + p.Warningf("error on querying version: %v", err) + } else { + p.Debugf("connected to ProxySQL version: %s", v) + } + }) + + p.cache.reset() + + mx := make(map[string]int64) + + if err := p.collectStatsMySQLGlobal(mx); err != nil { + return nil, fmt.Errorf("error on collecting mysql global status: %v", err) + } + if err := p.collectStatsMySQLMemoryMetrics(mx); err != nil { + return nil, fmt.Errorf("error on collecting memory metrics: %v", err) + } + if err := p.collectStatsMySQLCommandsCounters(mx); err != nil { + return nil, fmt.Errorf("error on collecting mysql command counters: %v", err) + } + if err := p.collectStatsMySQLUsers(mx); err != nil { + return nil, fmt.Errorf("error on collecting mysql users: %v", err) + } + if err := p.collectStatsMySQLConnectionPool(mx); err != nil { + return nil, fmt.Errorf("error on collecting mysql connection pool: %v", err) + } + + p.updateCharts() + + return mx, nil +} + +func (p *ProxySQL) doQueryVersion() (string, error) { + q := queryVersion + p.Debugf("executing query: '%s'", q) + + var v string + if err := p.doQueryRow(q, &v); err != nil { + return "", err + } + + return v, nil +} + +func (p *ProxySQL) collectStatsMySQLGlobal(mx map[string]int64) error { + // https://proxysql.com/documentation/stats-statistics/#stats_mysql_global + q := queryStatsMySQLGlobal + p.Debugf("executing query: '%s'", q) + + var name string + return p.doQuery(q, func(column, value string, rowEnd bool) { + switch column { + case "Variable_Name": + name = value + case "Variable_Value": + mx[name] = parseInt(value) + } + }) +} + +func (p *ProxySQL) collectStatsMySQLMemoryMetrics(mx map[string]int64) error { + // https://proxysql.com/documentation/stats-statistics/#stats_mysql_memory_metrics + q := queryStatsMySQLMemoryMetrics + p.Debugf("executing query: '%s'", q) + + var name string + return p.doQuery(q, func(column, value string, rowEnd bool) { + switch column { + case "Variable_Name": + name = value + case "Variable_Value": + mx[name] = parseInt(value) + } + }) +} + +func (p *ProxySQL) collectStatsMySQLCommandsCounters(mx map[string]int64) error { + // https://proxysql.com/documentation/stats-statistics/#stats_mysql_commands_counters + q := queryStatsMySQLCommandsCounters + p.Debugf("executing query: '%s'", q) + + var command string + return p.doQuery(q, func(column, value string, rowEnd bool) { + switch column { + case "Command": + command = value + p.cache.getCommand(command).updated = true + default: + mx["mysql_command_"+command+"_"+column] = parseInt(value) + } + }) +} + +func (p *ProxySQL) collectStatsMySQLUsers(mx map[string]int64) error { + // https://proxysql.com/documentation/stats-statistics/#stats_mysql_users + q := queryStatsMySQLUsers + p.Debugf("executing query: '%s'", q) + + var user string + var used int64 + return p.doQuery(q, func(column, value string, rowEnd bool) { + switch column { + case "username": + user = value + p.cache.getUser(user).updated = true + case "frontend_connections": + used = parseInt(value) + mx["mysql_user_"+user+"_"+column] = used + case "frontend_max_connections": + mx["mysql_user_"+user+"_frontend_connections_utilization"] = calcPercentage(used, parseInt(value)) + } + }) +} + +func (p *ProxySQL) collectStatsMySQLConnectionPool(mx map[string]int64) error { + // https://proxysql.com/documentation/stats-statistics/#stats_mysql_connection_pool + q := queryStatsMySQLConnectionPool + p.Debugf("executing query: '%s'", q) + + var hg, host, port string + var px string + return p.doQuery(q, func(column, value string, rowEnd bool) { + switch column { + case "hg", "hostgroup": + hg = value + case "srv_host": + host = value + case "srv_port": + port = value + p.cache.getBackend(hg, host, port).updated = true + px = "backend_" + backendID(hg, host, port) + "_" + case "status": + mx[px+"status_ONLINE"] = boolToInt(value == "1") + mx[px+"status_SHUNNED"] = boolToInt(value == "2") + mx[px+"status_OFFLINE_SOFT"] = boolToInt(value == "3") + mx[px+"status_OFFLINE_HARD"] = boolToInt(value == "4") + default: + mx[px+column] = parseInt(value) + } + }) +} + +func (p *ProxySQL) updateCharts() { + for k, m := range p.cache.commands { + if !m.updated { + delete(p.cache.commands, k) + p.removeMySQLCommandCountersCharts(m.command) + continue + } + if !m.hasCharts { + m.hasCharts = true + p.addMySQLCommandCountersCharts(m.command) + } + } + for k, m := range p.cache.users { + if !m.updated { + delete(p.cache.users, k) + p.removeMySQLUserCharts(m.user) + continue + } + if !m.hasCharts { + m.hasCharts = true + p.addMySQLUsersCharts(m.user) + } + } + for k, m := range p.cache.backends { + if !m.updated { + delete(p.cache.backends, k) + p.removeBackendCharts(m.hg, m.host, m.port) + continue + } + if !m.hasCharts { + m.hasCharts = true + p.addBackendCharts(m.hg, m.host, m.port) + } + } +} + +func (p *ProxySQL) openConnection() error { + db, err := sql.Open("mysql", p.DSN) + if err != nil { + return fmt.Errorf("error on opening a connection with the proxysql instance [%s]: %v", p.DSN, err) + } + + db.SetConnMaxLifetime(10 * time.Minute) + + if err := db.Ping(); err != nil { + _ = db.Close() + return fmt.Errorf("error on pinging the proxysql instance [%s]: %v", p.DSN, err) + } + + p.db = db + return nil +} + +func (p *ProxySQL) doQueryRow(query string, v any) error { + ctx, cancel := context.WithTimeout(context.Background(), p.Timeout.Duration()) + defer cancel() + + return p.db.QueryRowContext(ctx, query).Scan(v) +} + +func (p *ProxySQL) doQuery(query string, assign func(column, value string, rowEnd bool)) error { + ctx, cancel := context.WithTimeout(context.Background(), p.Timeout.Duration()) + defer cancel() + + rows, err := p.db.QueryContext(ctx, query) + if err != nil { + return err + } + defer func() { _ = rows.Close() }() + + return readRows(rows, assign) +} + +func readRows(rows *sql.Rows, assign func(column, value string, rowEnd bool)) error { + columns, err := rows.Columns() + if err != nil { + return err + } + + values := makeValues(len(columns)) + + for rows.Next() { + if err := rows.Scan(values...); err != nil { + return err + } + for i, l := 0, len(values); i < l; i++ { + assign(columns[i], valueToString(values[i]), i == l-1) + } + } + return rows.Err() +} + +func valueToString(value any) string { + v, ok := value.(*sql.NullString) + if !ok || !v.Valid { + return "" + } + return v.String +} + +func makeValues(size int) []any { + vs := make([]any, size) + for i := range vs { + vs[i] = &sql.NullString{} + } + return vs +} + +func parseInt(value string) int64 { + v, _ := strconv.ParseInt(value, 10, 64) + return v +} + +func calcPercentage(value, total int64) (v int64) { + if total == 0 { + return 0 + } + if v = value * 100 / total; v < 0 { + v = -v + } + return v +} + +func boolToInt(v bool) int64 { + if v { + return 1 + } + return 0 +} + +func backendID(hg, host, port string) string { + hg = strings.ReplaceAll(strings.ToLower(hg), " ", "_") + host = strings.ReplaceAll(host, ".", "_") + return hg + "_" + host + "_" + port +} |