summaryrefslogtreecommitdiffstats
path: root/src/go/plugin/go.d/modules/gearman/client.go
blob: dff9a1be47dddbfd28fbb1e21b133db60ecf03c0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// SPDX-License-Identifier: GPL-3.0-or-later

package gearman

import (
	"bytes"
	"fmt"
	"strings"

	"github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/socket"
)

// gearmanConn is the small connection contract the rest of the package uses
// to talk to a server. gearmanClient, defined in this file, is the
// socket-backed implementation.
type gearmanConn interface {
	// connect opens the connection and reports any failure.
	connect() error
	// disconnect closes the connection; it has no error result.
	disconnect()
	// queryStatus returns the raw reply to a status query
	// (gearmanClient sends the "status" command).
	queryStatus() ([]byte, error)
	// queryPriorityStatus returns the raw reply to a priority status query
	// (gearmanClient sends the "prioritystatus" command).
	queryPriorityStatus() ([]byte, error)
}

// newGearmanConn returns a gearmanConn backed by a socket client that targets
// conf.Address. The single conf.Timeout setting is applied uniformly as the
// connect, read and write timeout.
func newGearmanConn(conf Config) gearmanConn {
	timeout := conf.Timeout.Duration()

	sockConf := socket.Config{
		Address:        conf.Address,
		ConnectTimeout: timeout,
		ReadTimeout:    timeout,
		WriteTimeout:   timeout,
	}

	return &gearmanClient{conn: socket.New(sockConf)}
}

// gearmanClient implements gearmanConn on top of a socket.Client.
type gearmanClient struct {
	// conn is the underlying socket client used to connect, disconnect and
	// exchange command/reply lines with the server.
	conn socket.Client
}

// connect opens the underlying socket connection and returns the socket
// client's error unchanged.
func (c *gearmanClient) connect() error {
	return c.conn.Connect()
}

// disconnect closes the underlying socket connection.
func (c *gearmanClient) disconnect() {
	// The error is deliberately discarded: the method has no error result,
	// so teardown is best-effort.
	_ = c.conn.Disconnect()
}

// queryStatus runs the "status" command and returns the raw reply as
// produced by query.
func (c *gearmanClient) queryStatus() ([]byte, error) {
	const cmd = "status"

	return c.query(cmd)
}

// queryPriorityStatus runs the "prioritystatus" command and returns the raw
// reply as produced by query.
func (c *gearmanClient) queryPriorityStatus() ([]byte, error) {
	const cmd = "prioritystatus"

	return c.query(cmd)
}

// query sends cmd, followed by a newline, over the connection and collects
// the reply line by line.
//
// Each line handed over by the socket client is appended to the result
// followed by '\n'. Collection stops at the first line that starts with ".",
// which is treated as the end-of-response marker and is itself included in
// the returned bytes.
//
// An error (and a nil slice) is returned when:
//   - the socket client reports a failure; that error is wrapped with %w so
//     callers can inspect it with errors.Is / errors.As,
//   - a reply line starts with "ERR"; the line is included in the error text,
//   - limitReadLines lines were read without reaching the end-of-response marker.
func (c *gearmanClient) query(cmd string) ([]byte, error) {
	const limitReadLines = 10000

	var (
		num int
		err error
		b   bytes.Buffer
	)

	// The callback is invoked once per reply line; it returns false once no
	// more lines should be read (marker seen, server error, or limit hit).
	clientErr := c.conn.Command(cmd+"\n", func(bs []byte) bool {
		s := string(bs)

		if strings.HasPrefix(s, "ERR") {
			err = fmt.Errorf("command '%s': %s", cmd, s)
			return false
		}

		b.WriteString(s)
		b.WriteByte('\n')

		// Check for the end-of-response marker before enforcing the line
		// limit: a complete reply whose marker happens to be the
		// limitReadLines-th line must not be reported as exceeding the limit.
		if strings.HasPrefix(s, ".") {
			return false
		}

		if num++; num >= limitReadLines {
			err = fmt.Errorf("command '%s': read line limit exceeded (%d)", cmd, limitReadLines)
			return false
		}
		return true
	})
	if clientErr != nil {
		return nil, fmt.Errorf("command '%s' client error: %w", cmd, clientErr)
	}
	if err != nil {
		return nil, err
	}

	return b.Bytes(), nil
}