summaryrefslogtreecommitdiffstats
path: root/src/go/plugin/go.d/pkg/socket
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-08-26 08:15:24 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-08-26 08:15:35 +0000
commitf09848204fa5283d21ea43e262ee41aa578e1808 (patch)
treec62385d7adf209fa6a798635954d887f718fb3fb /src/go/plugin/go.d/pkg/socket
parentReleasing debian version 1.46.3-2. (diff)
downloadnetdata-f09848204fa5283d21ea43e262ee41aa578e1808.tar.xz
netdata-f09848204fa5283d21ea43e262ee41aa578e1808.zip
Merging upstream version 1.47.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/go/plugin/go.d/pkg/socket')
-rw-r--r--src/go/plugin/go.d/pkg/socket/client.go106
-rw-r--r--src/go/plugin/go.d/pkg/socket/client_test.go163
-rw-r--r--src/go/plugin/go.d/pkg/socket/servers_test.go139
-rw-r--r--src/go/plugin/go.d/pkg/socket/types.go41
-rw-r--r--src/go/plugin/go.d/pkg/socket/utils.go25
5 files changed, 474 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/pkg/socket/client.go b/src/go/plugin/go.d/pkg/socket/client.go
new file mode 100644
index 000000000..26ae1dfa6
--- /dev/null
+++ b/src/go/plugin/go.d/pkg/socket/client.go
@@ -0,0 +1,106 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package socket
+
+import (
+ "bufio"
+ "crypto/tls"
+ "errors"
+ "net"
+ "time"
+)
+
+// New returns a new pointer to a socket client given the socket
+// type (IP, TCP, UDP, UNIX), a network address (IP/domain:port),
+// a timeout and a TLS config. It supports both IPv4 and IPv6 address
+// and reuses connection where possible.
+func New(config Config) *Socket {
+ return &Socket{
+ Config: config,
+ conn: nil,
+ }
+}
+
+// Socket is the implementation of a socket client.
+type Socket struct {
+ Config
+ conn net.Conn
+}
+
+// Connect connects to the Socket address on the named network.
+// If the address is a domain name it will also perform the DNS resolution.
+// Address like :80 will attempt to connect to the localhost.
+// The config timeout and TLS config will be used.
+func (s *Socket) Connect() error {
+ network, address := networkType(s.Address)
+ var conn net.Conn
+ var err error
+
+ if s.TLSConf == nil {
+ conn, err = net.DialTimeout(network, address, s.ConnectTimeout)
+ } else {
+ var d net.Dialer
+ d.Timeout = s.ConnectTimeout
+ conn, err = tls.DialWithDialer(&d, network, address, s.TLSConf)
+ }
+ if err != nil {
+ return err
+ }
+
+ s.conn = conn
+
+ return nil
+}
+
+// Disconnect closes the connection.
+// Any in-flight commands will be cancelled and return errors.
+func (s *Socket) Disconnect() (err error) {
+ if s.conn != nil {
+ err = s.conn.Close()
+ s.conn = nil
+ }
+ return err
+}
+
+// Command writes the command string to the connection and passed the
+// response bytes line by line to the process function. It uses the
+// timeout value from the Socket config and returns read, write and
+// timeout errors if any. If a timeout occurs during the processing
+// of the responses this function will stop processing and return a
+// timeout error.
+func (s *Socket) Command(command string, process Processor) error {
+ if s.conn == nil {
+ return errors.New("cannot send command on nil connection")
+ }
+ if err := write(command, s.conn, s.WriteTimeout); err != nil {
+ return err
+ }
+ return read(s.conn, process, s.ReadTimeout)
+}
+
+func write(command string, writer net.Conn, timeout time.Duration) error {
+ if writer == nil {
+ return errors.New("attempt to write on nil connection")
+ }
+ if err := writer.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
+ return err
+ }
+ _, err := writer.Write([]byte(command))
+ return err
+}
+
+func read(reader net.Conn, process Processor, timeout time.Duration) error {
+ if process == nil {
+ return errors.New("process func is nil")
+ }
+ if reader == nil {
+ return errors.New("attempt to read on nil connection")
+ }
+ if err := reader.SetReadDeadline(time.Now().Add(timeout)); err != nil {
+ return err
+ }
+ scanner := bufio.NewScanner(reader)
+ for scanner.Scan() && process(scanner.Bytes()) {
+ }
+ return scanner.Err()
+}
diff --git a/src/go/plugin/go.d/pkg/socket/client_test.go b/src/go/plugin/go.d/pkg/socket/client_test.go
new file mode 100644
index 000000000..fa64f4558
--- /dev/null
+++ b/src/go/plugin/go.d/pkg/socket/client_test.go
@@ -0,0 +1,163 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package socket
+
+import (
+ "crypto/tls"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+const (
+ testServerAddress = "127.0.0.1:9999"
+ testUdpServerAddress = "udp://127.0.0.1:9999"
+ testUnixServerAddress = "/tmp/testSocketFD"
+ defaultTimeout = 100 * time.Millisecond
+)
+
+var tcpConfig = Config{
+ Address: testServerAddress,
+ ConnectTimeout: defaultTimeout,
+ ReadTimeout: defaultTimeout,
+ WriteTimeout: defaultTimeout,
+ TLSConf: nil,
+}
+
+var udpConfig = Config{
+ Address: testUdpServerAddress,
+ ConnectTimeout: defaultTimeout,
+ ReadTimeout: defaultTimeout,
+ WriteTimeout: defaultTimeout,
+ TLSConf: nil,
+}
+
+var unixConfig = Config{
+ Address: testUnixServerAddress,
+ ConnectTimeout: defaultTimeout,
+ ReadTimeout: defaultTimeout,
+ WriteTimeout: defaultTimeout,
+ TLSConf: nil,
+}
+
+var tcpTlsConfig = Config{
+ Address: testServerAddress,
+ ConnectTimeout: defaultTimeout,
+ ReadTimeout: defaultTimeout,
+ WriteTimeout: defaultTimeout,
+ TLSConf: &tls.Config{},
+}
+
+func Test_clientCommand(t *testing.T) {
+ srv := &tcpServer{addr: testServerAddress, rowsNumResp: 1}
+ go func() { _ = srv.Run(); defer func() { _ = srv.Close() }() }()
+
+ time.Sleep(time.Millisecond * 100)
+ sock := New(tcpConfig)
+ require.NoError(t, sock.Connect())
+ err := sock.Command("ping\n", func(bytes []byte) bool {
+ assert.Equal(t, "pong", string(bytes))
+ return true
+ })
+ require.NoError(t, sock.Disconnect())
+ require.NoError(t, err)
+}
+
+func Test_clientTimeout(t *testing.T) {
+ srv := &tcpServer{addr: testServerAddress, rowsNumResp: 1}
+ go func() { _ = srv.Run() }()
+
+ time.Sleep(time.Millisecond * 100)
+ sock := New(tcpConfig)
+ require.NoError(t, sock.Connect())
+ sock.ReadTimeout = 0
+ sock.ReadTimeout = 0
+ err := sock.Command("ping\n", func(bytes []byte) bool {
+ assert.Equal(t, "pong", string(bytes))
+ return true
+ })
+ require.Error(t, err)
+}
+
+func Test_clientIncompleteSSL(t *testing.T) {
+ srv := &tcpServer{addr: testServerAddress, rowsNumResp: 1}
+ go func() { _ = srv.Run() }()
+
+ time.Sleep(time.Millisecond * 100)
+ sock := New(tcpTlsConfig)
+ err := sock.Connect()
+ require.Error(t, err)
+}
+
+func Test_clientCommandStopProcessing(t *testing.T) {
+ srv := &tcpServer{addr: testServerAddress, rowsNumResp: 2}
+ go func() { _ = srv.Run() }()
+
+ time.Sleep(time.Millisecond * 100)
+ sock := New(tcpConfig)
+ require.NoError(t, sock.Connect())
+ err := sock.Command("ping\n", func(bytes []byte) bool {
+ assert.Equal(t, "pong", string(bytes))
+ return false
+ })
+ require.NoError(t, sock.Disconnect())
+ require.NoError(t, err)
+}
+
+func Test_clientUDPCommand(t *testing.T) {
+ srv := &udpServer{addr: testServerAddress, rowsNumResp: 1}
+ go func() { _ = srv.Run(); defer func() { _ = srv.Close() }() }()
+
+ time.Sleep(time.Millisecond * 100)
+ sock := New(udpConfig)
+ require.NoError(t, sock.Connect())
+ err := sock.Command("ping\n", func(bytes []byte) bool {
+ assert.Equal(t, "pong", string(bytes))
+ return false
+ })
+ require.NoError(t, sock.Disconnect())
+ require.NoError(t, err)
+}
+
+func Test_clientTCPAddress(t *testing.T) {
+ srv := &tcpServer{addr: testServerAddress, rowsNumResp: 1}
+ go func() { _ = srv.Run() }()
+ time.Sleep(time.Millisecond * 100)
+
+ sock := New(tcpConfig)
+ require.NoError(t, sock.Connect())
+
+ tcpConfig.Address = "tcp://" + tcpConfig.Address
+ sock = New(tcpConfig)
+ require.NoError(t, sock.Connect())
+}
+
+func Test_clientUnixCommand(t *testing.T) {
+ srv := &unixServer{addr: testUnixServerAddress, rowsNumResp: 1}
+ // cleanup previous file descriptors
+ _ = srv.Close()
+ go func() { _ = srv.Run() }()
+
+ time.Sleep(time.Millisecond * 200)
+ sock := New(unixConfig)
+ require.NoError(t, sock.Connect())
+ err := sock.Command("ping\n", func(bytes []byte) bool {
+ assert.Equal(t, "pong", string(bytes))
+ return false
+ })
+ require.NoError(t, err)
+ require.NoError(t, sock.Disconnect())
+}
+
+func Test_clientEmptyProcessFunc(t *testing.T) {
+ srv := &tcpServer{addr: testServerAddress, rowsNumResp: 1}
+ go func() { _ = srv.Run() }()
+
+ time.Sleep(time.Millisecond * 100)
+ sock := New(tcpConfig)
+ require.NoError(t, sock.Connect())
+ err := sock.Command("ping\n", nil)
+ require.Error(t, err, "nil process func should return an error")
+}
diff --git a/src/go/plugin/go.d/pkg/socket/servers_test.go b/src/go/plugin/go.d/pkg/socket/servers_test.go
new file mode 100644
index 000000000..d66178162
--- /dev/null
+++ b/src/go/plugin/go.d/pkg/socket/servers_test.go
@@ -0,0 +1,139 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package socket
+
+import (
+ "bufio"
+ "errors"
+ "fmt"
+ "net"
+ "os"
+ "strings"
+ "time"
+)
+
+type tcpServer struct {
+ addr string
+ server net.Listener
+ rowsNumResp int
+}
+
+func (t *tcpServer) Run() (err error) {
+ t.server, err = net.Listen("tcp", t.addr)
+ if err != nil {
+ return
+ }
+ return t.handleConnections()
+}
+
+func (t *tcpServer) Close() (err error) {
+ return t.server.Close()
+}
+
+func (t *tcpServer) handleConnections() (err error) {
+ for {
+ conn, err := t.server.Accept()
+ if err != nil || conn == nil {
+ return errors.New("could not accept connection")
+ }
+ t.handleConnection(conn)
+ }
+}
+
+func (t *tcpServer) handleConnection(conn net.Conn) {
+ defer func() { _ = conn.Close() }()
+ _ = conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
+
+ rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
+ _, err := rw.ReadString('\n')
+ if err != nil {
+ _, _ = rw.WriteString("failed to read input")
+ _ = rw.Flush()
+ } else {
+ resp := strings.Repeat("pong\n", t.rowsNumResp)
+ _, _ = rw.WriteString(resp)
+ _ = rw.Flush()
+ }
+}
+
+type udpServer struct {
+ addr string
+ conn *net.UDPConn
+ rowsNumResp int
+}
+
+func (u *udpServer) Run() (err error) {
+ addr, err := net.ResolveUDPAddr("udp", u.addr)
+ if err != nil {
+ return err
+ }
+ u.conn, err = net.ListenUDP("udp", addr)
+ if err != nil {
+ return
+ }
+ u.handleConnections()
+ return nil
+}
+
+func (u *udpServer) Close() (err error) {
+ return u.conn.Close()
+}
+
+func (u *udpServer) handleConnections() {
+ for {
+ var buf [2048]byte
+ _, addr, _ := u.conn.ReadFromUDP(buf[0:])
+ resp := strings.Repeat("pong\n", u.rowsNumResp)
+ _, _ = u.conn.WriteToUDP([]byte(resp), addr)
+ }
+}
+
+type unixServer struct {
+ addr string
+ conn *net.UnixListener
+ rowsNumResp int
+}
+
+func (u *unixServer) Run() (err error) {
+ _, _ = os.CreateTemp("/tmp", "testSocketFD")
+ addr, err := net.ResolveUnixAddr("unix", u.addr)
+ if err != nil {
+ return err
+ }
+ u.conn, err = net.ListenUnix("unix", addr)
+ if err != nil {
+ return
+ }
+ go u.handleConnections()
+ return nil
+}
+
+func (u *unixServer) Close() (err error) {
+ _ = os.Remove(testUnixServerAddress)
+ return u.conn.Close()
+}
+
+func (u *unixServer) handleConnections() {
+ var conn net.Conn
+ var err error
+ conn, err = u.conn.AcceptUnix()
+ if err != nil {
+ panic(fmt.Errorf("could not accept connection: %v", err))
+ }
+ u.handleConnection(conn)
+}
+
+func (u *unixServer) handleConnection(conn net.Conn) {
+ _ = conn.SetDeadline(time.Now().Add(time.Second))
+
+ rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
+ _, err := rw.ReadString('\n')
+ if err != nil {
+ _, _ = rw.WriteString("failed to read input")
+ _ = rw.Flush()
+ } else {
+ resp := strings.Repeat("pong\n", u.rowsNumResp)
+ _, _ = rw.WriteString(resp)
+ _ = rw.Flush()
+ }
+}
diff --git a/src/go/plugin/go.d/pkg/socket/types.go b/src/go/plugin/go.d/pkg/socket/types.go
new file mode 100644
index 000000000..693faf5be
--- /dev/null
+++ b/src/go/plugin/go.d/pkg/socket/types.go
@@ -0,0 +1,41 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package socket
+
+import (
+ "crypto/tls"
+ "time"
+)
+
+// Processor function passed to the Socket.Command function.
+// It is passed by the caller to process a command's response
+// line by line.
+type Processor func([]byte) bool
+
+// Client is the interface that wraps the basic socket client operations
+// and hides the implementation details from the users.
+//
+// Connect should prepare the connection.
+//
+// Disconnect should stop any in-flight connections.
+//
+// Command should send the actual data to the wire and pass
+// any results to the processor function.
+//
+// Implementations should return TCP, UDP or Unix ready sockets.
+type Client interface {
+ Connect() error
+ Disconnect() error
+ Command(command string, process Processor) error
+}
+
+// Config holds the network ip v4 or v6 address, port,
+// Socket type(ip, tcp, udp, unix), timeout and TLS configuration
+// for a Socket
+type Config struct {
+ Address string
+ ConnectTimeout time.Duration
+ ReadTimeout time.Duration
+ WriteTimeout time.Duration
+ TLSConf *tls.Config
+}
diff --git a/src/go/plugin/go.d/pkg/socket/utils.go b/src/go/plugin/go.d/pkg/socket/utils.go
new file mode 100644
index 000000000..dcc48b383
--- /dev/null
+++ b/src/go/plugin/go.d/pkg/socket/utils.go
@@ -0,0 +1,25 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package socket
+
+import "strings"
+
+func IsUnixSocket(address string) bool {
+ return strings.HasPrefix(address, "/") || strings.HasPrefix(address, "unix://")
+}
+
+func IsUdpSocket(address string) bool {
+ return strings.HasPrefix(address, "udp://")
+}
+
+func networkType(address string) (string, string) {
+ switch {
+ case IsUnixSocket(address):
+ address = strings.TrimPrefix(address, "unix://")
+ return "unix", address
+ case IsUdpSocket(address):
+ return "udp", strings.TrimPrefix(address, "udp://")
+ default:
+ return "tcp", strings.TrimPrefix(address, "tcp://")
+ }
+}