diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-26 08:15:24 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-26 08:15:35 +0000 |
commit | f09848204fa5283d21ea43e262ee41aa578e1808 (patch) | |
tree | c62385d7adf209fa6a798635954d887f718fb3fb /src/go/plugin/go.d/pkg/socket | |
parent | Releasing debian version 1.46.3-2. (diff) | |
download | netdata-f09848204fa5283d21ea43e262ee41aa578e1808.tar.xz netdata-f09848204fa5283d21ea43e262ee41aa578e1808.zip |
Merging upstream version 1.47.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/go/plugin/go.d/pkg/socket')
-rw-r--r-- | src/go/plugin/go.d/pkg/socket/client.go | 106 | ||||
-rw-r--r-- | src/go/plugin/go.d/pkg/socket/client_test.go | 163 | ||||
-rw-r--r-- | src/go/plugin/go.d/pkg/socket/servers_test.go | 139 | ||||
-rw-r--r-- | src/go/plugin/go.d/pkg/socket/types.go | 41 | ||||
-rw-r--r-- | src/go/plugin/go.d/pkg/socket/utils.go | 25 |
5 files changed, 474 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/pkg/socket/client.go b/src/go/plugin/go.d/pkg/socket/client.go new file mode 100644 index 000000000..26ae1dfa6 --- /dev/null +++ b/src/go/plugin/go.d/pkg/socket/client.go @@ -0,0 +1,106 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package socket + +import ( + "bufio" + "crypto/tls" + "errors" + "net" + "time" +) + +// New returns a new pointer to a socket client given the socket +// type (IP, TCP, UDP, UNIX), a network address (IP/domain:port), +// a timeout and a TLS config. It supports both IPv4 and IPv6 address +// and reuses connection where possible. +func New(config Config) *Socket { + return &Socket{ + Config: config, + conn: nil, + } +} + +// Socket is the implementation of a socket client. +type Socket struct { + Config + conn net.Conn +} + +// Connect connects to the Socket address on the named network. +// If the address is a domain name it will also perform the DNS resolution. +// Address like :80 will attempt to connect to the localhost. +// The config timeout and TLS config will be used. +func (s *Socket) Connect() error { + network, address := networkType(s.Address) + var conn net.Conn + var err error + + if s.TLSConf == nil { + conn, err = net.DialTimeout(network, address, s.ConnectTimeout) + } else { + var d net.Dialer + d.Timeout = s.ConnectTimeout + conn, err = tls.DialWithDialer(&d, network, address, s.TLSConf) + } + if err != nil { + return err + } + + s.conn = conn + + return nil +} + +// Disconnect closes the connection. +// Any in-flight commands will be cancelled and return errors. +func (s *Socket) Disconnect() (err error) { + if s.conn != nil { + err = s.conn.Close() + s.conn = nil + } + return err +} + +// Command writes the command string to the connection and passed the +// response bytes line by line to the process function. It uses the +// timeout value from the Socket config and returns read, write and +// timeout errors if any. If a timeout occurs during the processing +// of the responses this function will stop processing and return a +// timeout error. +func (s *Socket) Command(command string, process Processor) error { + if s.conn == nil { + return errors.New("cannot send command on nil connection") + } + if err := write(command, s.conn, s.WriteTimeout); err != nil { + return err + } + return read(s.conn, process, s.ReadTimeout) +} + +func write(command string, writer net.Conn, timeout time.Duration) error { + if writer == nil { + return errors.New("attempt to write on nil connection") + } + if err := writer.SetWriteDeadline(time.Now().Add(timeout)); err != nil { + return err + } + _, err := writer.Write([]byte(command)) + return err +} + +func read(reader net.Conn, process Processor, timeout time.Duration) error { + if process == nil { + return errors.New("process func is nil") + } + if reader == nil { + return errors.New("attempt to read on nil connection") + } + if err := reader.SetReadDeadline(time.Now().Add(timeout)); err != nil { + return err + } + scanner := bufio.NewScanner(reader) + for scanner.Scan() && process(scanner.Bytes()) { + } + return scanner.Err() +} diff --git a/src/go/plugin/go.d/pkg/socket/client_test.go b/src/go/plugin/go.d/pkg/socket/client_test.go new file mode 100644 index 000000000..fa64f4558 --- /dev/null +++ b/src/go/plugin/go.d/pkg/socket/client_test.go @@ -0,0 +1,163 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package socket + +import ( + "crypto/tls" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testServerAddress = "127.0.0.1:9999" + testUdpServerAddress = "udp://127.0.0.1:9999" + testUnixServerAddress = "/tmp/testSocketFD" + defaultTimeout = 100 * time.Millisecond +) + +var tcpConfig = Config{ + Address: testServerAddress, + ConnectTimeout: defaultTimeout, + ReadTimeout: defaultTimeout, + WriteTimeout: defaultTimeout, + TLSConf: nil, +} + +var udpConfig = Config{ + Address: testUdpServerAddress, + ConnectTimeout: defaultTimeout, + ReadTimeout: defaultTimeout, + WriteTimeout: defaultTimeout, + TLSConf: nil, +} + +var unixConfig = Config{ + Address: testUnixServerAddress, + ConnectTimeout: defaultTimeout, + ReadTimeout: defaultTimeout, + WriteTimeout: defaultTimeout, + TLSConf: nil, +} + +var tcpTlsConfig = Config{ + Address: testServerAddress, + ConnectTimeout: defaultTimeout, + ReadTimeout: defaultTimeout, + WriteTimeout: defaultTimeout, + TLSConf: &tls.Config{}, +} + +func Test_clientCommand(t *testing.T) { + srv := &tcpServer{addr: testServerAddress, rowsNumResp: 1} + go func() { _ = srv.Run(); defer func() { _ = srv.Close() }() }() + + time.Sleep(time.Millisecond * 100) + sock := New(tcpConfig) + require.NoError(t, sock.Connect()) + err := sock.Command("ping\n", func(bytes []byte) bool { + assert.Equal(t, "pong", string(bytes)) + return true + }) + require.NoError(t, sock.Disconnect()) + require.NoError(t, err) +} + +func Test_clientTimeout(t *testing.T) { + srv := &tcpServer{addr: testServerAddress, rowsNumResp: 1} + go func() { _ = srv.Run() }() + + time.Sleep(time.Millisecond * 100) + sock := New(tcpConfig) + require.NoError(t, sock.Connect()) + sock.ReadTimeout = 0 + sock.ReadTimeout = 0 + err := sock.Command("ping\n", func(bytes []byte) bool { + assert.Equal(t, "pong", string(bytes)) + return true + }) + require.Error(t, err) +} + +func Test_clientIncompleteSSL(t *testing.T) { + srv := &tcpServer{addr: testServerAddress, rowsNumResp: 1} + go func() { _ = srv.Run() }() + + time.Sleep(time.Millisecond * 100) + sock := New(tcpTlsConfig) + err := sock.Connect() + require.Error(t, err) +} + +func Test_clientCommandStopProcessing(t *testing.T) { + srv := &tcpServer{addr: testServerAddress, rowsNumResp: 2} + go func() { _ = srv.Run() }() + + time.Sleep(time.Millisecond * 100) + sock := New(tcpConfig) + require.NoError(t, sock.Connect()) + err := sock.Command("ping\n", func(bytes []byte) bool { + assert.Equal(t, "pong", string(bytes)) + return false + }) + require.NoError(t, sock.Disconnect()) + require.NoError(t, err) +} + +func Test_clientUDPCommand(t *testing.T) { + srv := &udpServer{addr: testServerAddress, rowsNumResp: 1} + go func() { _ = srv.Run(); defer func() { _ = srv.Close() }() }() + + time.Sleep(time.Millisecond * 100) + sock := New(udpConfig) + require.NoError(t, sock.Connect()) + err := sock.Command("ping\n", func(bytes []byte) bool { + assert.Equal(t, "pong", string(bytes)) + return false + }) + require.NoError(t, sock.Disconnect()) + require.NoError(t, err) +} + +func Test_clientTCPAddress(t *testing.T) { + srv := &tcpServer{addr: testServerAddress, rowsNumResp: 1} + go func() { _ = srv.Run() }() + time.Sleep(time.Millisecond * 100) + + sock := New(tcpConfig) + require.NoError(t, sock.Connect()) + + tcpConfig.Address = "tcp://" + tcpConfig.Address + sock = New(tcpConfig) + require.NoError(t, sock.Connect()) +} + +func Test_clientUnixCommand(t *testing.T) { + srv := &unixServer{addr: testUnixServerAddress, rowsNumResp: 1} + // cleanup previous file descriptors + _ = srv.Close() + go func() { _ = srv.Run() }() + + time.Sleep(time.Millisecond * 200) + sock := New(unixConfig) + require.NoError(t, sock.Connect()) + err := sock.Command("ping\n", func(bytes []byte) bool { + assert.Equal(t, "pong", string(bytes)) + return false + }) + require.NoError(t, err) + require.NoError(t, sock.Disconnect()) +} + +func Test_clientEmptyProcessFunc(t *testing.T) { + srv := &tcpServer{addr: testServerAddress, rowsNumResp: 1} + go func() { _ = srv.Run() }() + + time.Sleep(time.Millisecond * 100) + sock := New(tcpConfig) + require.NoError(t, sock.Connect()) + err := sock.Command("ping\n", nil) + require.Error(t, err, "nil process func should return an error") +} diff --git a/src/go/plugin/go.d/pkg/socket/servers_test.go b/src/go/plugin/go.d/pkg/socket/servers_test.go new file mode 100644 index 000000000..d66178162 --- /dev/null +++ b/src/go/plugin/go.d/pkg/socket/servers_test.go @@ -0,0 +1,139 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package socket + +import ( + "bufio" + "errors" + "fmt" + "net" + "os" + "strings" + "time" +) + +type tcpServer struct { + addr string + server net.Listener + rowsNumResp int +} + +func (t *tcpServer) Run() (err error) { + t.server, err = net.Listen("tcp", t.addr) + if err != nil { + return + } + return t.handleConnections() +} + +func (t *tcpServer) Close() (err error) { + return t.server.Close() +} + +func (t *tcpServer) handleConnections() (err error) { + for { + conn, err := t.server.Accept() + if err != nil || conn == nil { + return errors.New("could not accept connection") + } + t.handleConnection(conn) + } +} + +func (t *tcpServer) handleConnection(conn net.Conn) { + defer func() { _ = conn.Close() }() + _ = conn.SetDeadline(time.Now().Add(time.Millisecond * 100)) + + rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + _, err := rw.ReadString('\n') + if err != nil { + _, _ = rw.WriteString("failed to read input") + _ = rw.Flush() + } else { + resp := strings.Repeat("pong\n", t.rowsNumResp) + _, _ = rw.WriteString(resp) + _ = rw.Flush() + } +} + +type udpServer struct { + addr string + conn *net.UDPConn + rowsNumResp int +} + +func (u *udpServer) Run() (err error) { + addr, err := net.ResolveUDPAddr("udp", u.addr) + if err != nil { + return err + } + u.conn, err = net.ListenUDP("udp", addr) + if err != nil { + return + } + u.handleConnections() + return nil +} + +func (u *udpServer) Close() (err error) { + return u.conn.Close() +} + +func (u *udpServer) handleConnections() { + for { + var buf [2048]byte + _, addr, _ := u.conn.ReadFromUDP(buf[0:]) + resp := strings.Repeat("pong\n", u.rowsNumResp) + _, _ = u.conn.WriteToUDP([]byte(resp), addr) + } +} + +type unixServer struct { + addr string + conn *net.UnixListener + rowsNumResp int +} + +func (u *unixServer) Run() (err error) { + _, _ = os.CreateTemp("/tmp", "testSocketFD") + addr, err := net.ResolveUnixAddr("unix", u.addr) + if err != nil { + return err + } + u.conn, err = net.ListenUnix("unix", addr) + if err != nil { + return + } + go u.handleConnections() + return nil +} + +func (u *unixServer) Close() (err error) { + _ = os.Remove(testUnixServerAddress) + return u.conn.Close() +} + +func (u *unixServer) handleConnections() { + var conn net.Conn + var err error + conn, err = u.conn.AcceptUnix() + if err != nil { + panic(fmt.Errorf("could not accept connection: %v", err)) + } + u.handleConnection(conn) +} + +func (u *unixServer) handleConnection(conn net.Conn) { + _ = conn.SetDeadline(time.Now().Add(time.Second)) + + rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + _, err := rw.ReadString('\n') + if err != nil { + _, _ = rw.WriteString("failed to read input") + _ = rw.Flush() + } else { + resp := strings.Repeat("pong\n", u.rowsNumResp) + _, _ = rw.WriteString(resp) + _ = rw.Flush() + } +} diff --git a/src/go/plugin/go.d/pkg/socket/types.go b/src/go/plugin/go.d/pkg/socket/types.go new file mode 100644 index 000000000..693faf5be --- /dev/null +++ b/src/go/plugin/go.d/pkg/socket/types.go @@ -0,0 +1,41 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package socket + +import ( + "crypto/tls" + "time" +) + +// Processor function passed to the Socket.Command function. +// It is passed by the caller to process a command's response +// line by line. +type Processor func([]byte) bool + +// Client is the interface that wraps the basic socket client operations +// and hides the implementation details from the users. +// +// Connect should prepare the connection. +// +// Disconnect should stop any in-flight connections. +// +// Command should send the actual data to the wire and pass +// any results to the processor function. +// +// Implementations should return TCP, UDP or Unix ready sockets. +type Client interface { + Connect() error + Disconnect() error + Command(command string, process Processor) error +} + +// Config holds the network ip v4 or v6 address, port, +// Socket type(ip, tcp, udp, unix), timeout and TLS configuration +// for a Socket +type Config struct { + Address string + ConnectTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + TLSConf *tls.Config +} diff --git a/src/go/plugin/go.d/pkg/socket/utils.go b/src/go/plugin/go.d/pkg/socket/utils.go new file mode 100644 index 000000000..dcc48b383 --- /dev/null +++ b/src/go/plugin/go.d/pkg/socket/utils.go @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package socket + +import "strings" + +func IsUnixSocket(address string) bool { + return strings.HasPrefix(address, "/") || strings.HasPrefix(address, "unix://") +} + +func IsUdpSocket(address string) bool { + return strings.HasPrefix(address, "udp://") +} + +func networkType(address string) (string, string) { + switch { + case IsUnixSocket(address): + address = strings.TrimPrefix(address, "unix://") + return "unix", address + case IsUdpSocket(address): + return "udp", strings.TrimPrefix(address, "udp://") + default: + return "tcp", strings.TrimPrefix(address, "tcp://") + } +} |