summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/pkg/logs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/csv.go195
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/csv_test.go175
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/json.go140
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/json_test.go224
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/lastline.go65
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/lastline_test.go54
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/ltsv.go95
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/ltsv_test.go125
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/parser.go65
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/parser_test.go3
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/reader.go193
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/reader_test.go245
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/regexp.go76
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/regexp_test.go131
14 files changed, 1786 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/csv.go b/src/go/collectors/go.d.plugin/pkg/logs/csv.go
new file mode 100644
index 000000000..0b7d90009
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/csv.go
@@ -0,0 +1,195 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "bytes"
+ "encoding/csv"
+ "errors"
+ "fmt"
+ "io"
+ "strconv"
+ "strings"
+)
+
+type (
+ CSVConfig struct {
+ FieldsPerRecord int `yaml:"fields_per_record" json:"fields_per_record"`
+ Delimiter string `yaml:"delimiter" json:"delimiter"`
+ TrimLeadingSpace bool `yaml:"trim_leading_space" json:"trim_leading_space"`
+ Format string `yaml:"format" json:"format"`
+ CheckField func(string) (string, int, bool) `yaml:"-" json:"-"`
+ }
+
+ CSVParser struct {
+ Config CSVConfig
+ reader *csv.Reader
+ format *csvFormat
+ }
+
+ csvFormat struct {
+ raw string
+ maxIndex int
+ fields []csvField
+ }
+
+ csvField struct {
+ name string
+ idx int
+ }
+)
+
+func NewCSVParser(config CSVConfig, in io.Reader) (*CSVParser, error) {
+ if config.Format == "" {
+ return nil, errors.New("empty csv format")
+ }
+
+ format, err := newCSVFormat(config)
+ if err != nil {
+ return nil, fmt.Errorf("bad csv format '%s': %v", config.Format, err)
+ }
+
+ p := &CSVParser{
+ Config: config,
+ reader: newCSVReader(in, config),
+ format: format,
+ }
+ return p, nil
+}
+
+func (p *CSVParser) ReadLine(line LogLine) error {
+ record, err := p.reader.Read()
+ if err != nil {
+ return handleCSVReaderError(err)
+ }
+ return p.format.parse(record, line)
+}
+
+func (p *CSVParser) Parse(row []byte, line LogLine) error {
+ r := newCSVReader(bytes.NewBuffer(row), p.Config)
+ record, err := r.Read()
+ if err != nil {
+ return handleCSVReaderError(err)
+ }
+ return p.format.parse(record, line)
+}
+
+func (p CSVParser) Info() string {
+ return fmt.Sprintf("csv: %s", p.format.raw)
+}
+
+func (f *csvFormat) parse(record []string, line LogLine) error {
+ if len(record) <= f.maxIndex {
+ return &ParseError{msg: "csv parse: unmatched line"}
+ }
+
+ for _, v := range f.fields {
+ if err := line.Assign(v.name, record[v.idx]); err != nil {
+ return &ParseError{msg: fmt.Sprintf("csv parse: %v", err), err: err}
+ }
+ }
+ return nil
+}
+
+func newCSVReader(in io.Reader, config CSVConfig) *csv.Reader {
+ r := csv.NewReader(in)
+ if config.Delimiter != "" {
+ if d, err := parseCSVDelimiter(config.Delimiter); err == nil {
+ r.Comma = d
+ }
+ }
+ r.TrimLeadingSpace = config.TrimLeadingSpace
+ r.FieldsPerRecord = config.FieldsPerRecord
+ r.ReuseRecord = true
+ return r
+}
+
+func newCSVFormat(config CSVConfig) (*csvFormat, error) {
+ r := csv.NewReader(strings.NewReader(config.Format))
+ if config.Delimiter != "" {
+ if d, err := parseCSVDelimiter(config.Delimiter); err == nil {
+ r.Comma = d
+ }
+ }
+ r.TrimLeadingSpace = config.TrimLeadingSpace
+
+ record, err := r.Read()
+ if err != nil {
+ return nil, err
+ }
+
+ fields, err := createCSVFields(record, config.CheckField)
+ if err != nil {
+ return nil, err
+ }
+
+ if len(fields) == 0 {
+ return nil, errors.New("zero fields")
+ }
+
+ format := &csvFormat{
+ raw: config.Format,
+ maxIndex: fields[len(fields)-1].idx,
+ fields: fields,
+ }
+ return format, nil
+}
+
+func createCSVFields(format []string, check func(string) (string, int, bool)) ([]csvField, error) {
+ if check == nil {
+ check = checkCSVFormatField
+ }
+ var fields []csvField
+ var offset int
+ seen := make(map[string]bool)
+
+ for i, name := range format {
+ name = strings.Trim(name, `"`)
+
+ name, addOffset, valid := check(name)
+ offset += addOffset
+ if !valid {
+ continue
+ }
+ if seen[name] {
+ return nil, fmt.Errorf("duplicate field: %s", name)
+ }
+ seen[name] = true
+
+ idx := i + offset
+ fields = append(fields, csvField{name, idx})
+ }
+ return fields, nil
+}
+
+func handleCSVReaderError(err error) error {
+ if isCSVParseError(err) {
+ return &ParseError{msg: fmt.Sprintf("csv parse: %v", err), err: err}
+ }
+ return err
+}
+
+func isCSVParseError(err error) bool {
+ return errors.Is(err, csv.ErrBareQuote) || errors.Is(err, csv.ErrFieldCount) || errors.Is(err, csv.ErrQuote)
+}
+
+func checkCSVFormatField(name string) (newName string, offset int, valid bool) {
+ if len(name) < 2 || !strings.HasPrefix(name, "$") {
+ return "", 0, false
+ }
+ return name, 0, true
+}
+
+func parseCSVDelimiter(s string) (rune, error) {
+ if isNumber(s) {
+ d, err := strconv.ParseInt(s, 10, 32)
+ if err != nil {
+ return 0, fmt.Errorf("invalid CSV delimiter: %v", err)
+ }
+ return rune(d), nil
+ }
+ if len(s) != 1 {
+ return 0, errors.New("invalid CSV delimiter: must be a single character")
+ }
+ return rune(s[0]), nil
+}
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/csv_test.go b/src/go/collectors/go.d.plugin/pkg/logs/csv_test.go
new file mode 100644
index 000000000..d7baaa1b5
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/csv_test.go
@@ -0,0 +1,175 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+var testCSVConfig = CSVConfig{
+ Delimiter: " ",
+ Format: "$A %B",
+}
+
+func TestNewCSVParser(t *testing.T) {
+ tests := []struct {
+ name string
+ format string
+ wantErr bool
+ }{
+ {name: "valid format", format: "$A $B"},
+ {name: "empty format", wantErr: true},
+ {name: "bad format: csv read error", format: "$A $B \"$C", wantErr: true},
+ {name: "bad format: duplicate fields", format: "$A $A", wantErr: true},
+ {name: "bad format: zero fields", format: "!A !B", wantErr: true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := testCSVConfig
+ c.Format = tt.format
+ p, err := NewCSVParser(c, nil)
+ if tt.wantErr {
+ assert.Error(t, err)
+ assert.Nil(t, p)
+ } else {
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ }
+ })
+ }
+}
+
+func TestNewCSVFormat(t *testing.T) {
+ tests := []struct {
+ format string
+ wantFormat csvFormat
+ wantErr bool
+ }{
+ {format: "$A $B", wantFormat: csvFormat{maxIndex: 1, fields: []csvField{{"$A", 0}, {"$B", 1}}}},
+ {format: "$A $B !C $E", wantFormat: csvFormat{maxIndex: 3, fields: []csvField{{"$A", 0}, {"$B", 1}, {"$E", 3}}}},
+ {format: "!A !B !C $E", wantFormat: csvFormat{maxIndex: 3, fields: []csvField{{"$E", 3}}}},
+ {format: "$A $OFFSET $B", wantFormat: csvFormat{maxIndex: 3, fields: []csvField{{"$A", 0}, {"$B", 3}}}},
+ {format: "$A $OFFSET $B $OFFSET !A", wantFormat: csvFormat{maxIndex: 3, fields: []csvField{{"$A", 0}, {"$B", 3}}}},
+ {format: "$A $OFFSET $OFFSET $B", wantFormat: csvFormat{maxIndex: 5, fields: []csvField{{"$A", 0}, {"$B", 5}}}},
+ {format: "$OFFSET $A $OFFSET $B", wantFormat: csvFormat{maxIndex: 5, fields: []csvField{{"$A", 2}, {"$B", 5}}}},
+ {format: "$A \"$A", wantErr: true},
+ {format: "$A $A", wantErr: true},
+ {format: "!A !A", wantErr: true},
+ {format: "", wantErr: true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.format, func(t *testing.T) {
+ c := testCSVConfig
+ c.Format = tt.format
+ c.CheckField = testCheckCSVFormatField
+ tt.wantFormat.raw = tt.format
+
+ f, err := newCSVFormat(c)
+
+ if tt.wantErr {
+ assert.Error(t, err)
+ assert.Nil(t, f)
+ } else {
+ assert.NoError(t, err)
+ assert.Equal(t, tt.wantFormat, *f)
+ }
+ })
+ }
+}
+
+func TestCSVParser_ReadLine(t *testing.T) {
+ tests := []struct {
+ name string
+ row string
+ format string
+ wantErr bool
+ wantParseErr bool
+ }{
+ {name: "match and no error", row: "1 2 3", format: `$A $B $C`},
+ {name: "match but error on assigning", row: "1 2 3", format: `$A $B $ERR`, wantErr: true, wantParseErr: true},
+ {name: "not match", row: "1 2 3", format: `$A $B $C $d`, wantErr: true, wantParseErr: true},
+ {name: "error on reading csv.Err", row: "1 2\"3", format: `$A $B $C`, wantErr: true, wantParseErr: true},
+ {name: "error on reading EOF", row: "", format: `$A $B $C`, wantErr: true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ var line logLine
+ r := strings.NewReader(tt.row)
+ c := testCSVConfig
+ c.Format = tt.format
+ p, err := NewCSVParser(c, r)
+ require.NoError(t, err)
+
+ err = p.ReadLine(&line)
+
+ if tt.wantErr {
+ require.Error(t, err)
+ if tt.wantParseErr {
+ assert.True(t, IsParseError(err))
+ } else {
+ assert.False(t, IsParseError(err))
+ }
+ } else {
+ assert.NoError(t, err)
+ }
+ })
+ }
+}
+
+func TestCSVParser_Parse(t *testing.T) {
+ tests := []struct {
+ name string
+ row string
+ format string
+ wantErr bool
+ }{
+ {name: "match and no error", row: "1 2 3", format: `$A $B $C`},
+ {name: "match but error on assigning", row: "1 2 3", format: `$A $B $ERR`, wantErr: true},
+ {name: "not match", row: "1 2 3", format: `$A $B $C $d`, wantErr: true},
+ {name: "error on reading csv.Err", row: "1 2\"3", format: `$A $B $C`, wantErr: true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ var line logLine
+ r := strings.NewReader(tt.row)
+ c := testCSVConfig
+ c.Format = tt.format
+ p, err := NewCSVParser(c, r)
+ require.NoError(t, err)
+
+ err = p.ReadLine(&line)
+
+ if tt.wantErr {
+ require.Error(t, err)
+ assert.True(t, IsParseError(err))
+ } else {
+ assert.NoError(t, err)
+ }
+ })
+ }
+
+}
+
+func TestCSVParser_Info(t *testing.T) {
+ p, err := NewCSVParser(testCSVConfig, nil)
+ require.NoError(t, err)
+ assert.NotZero(t, p.Info())
+}
+
+func testCheckCSVFormatField(name string) (newName string, offset int, valid bool) {
+ if len(name) < 2 || !strings.HasPrefix(name, "$") {
+ return "", 0, false
+ }
+ if name == "$OFFSET" {
+ return "", 1, false
+ }
+ return name, 0, true
+}
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/json.go b/src/go/collectors/go.d.plugin/pkg/logs/json.go
new file mode 100644
index 000000000..ceb32e272
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/json.go
@@ -0,0 +1,140 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "strconv"
+
+ "github.com/valyala/fastjson"
+)
+
+type JSONConfig struct {
+ Mapping map[string]string `yaml:"mapping" json:"mapping"`
+}
+
+type JSONParser struct {
+ reader *bufio.Reader
+ parser fastjson.Parser
+ buf []byte
+ mapping map[string]string
+}
+
+func NewJSONParser(config JSONConfig, in io.Reader) (*JSONParser, error) {
+ parser := &JSONParser{
+ reader: bufio.NewReader(in),
+ mapping: config.Mapping,
+ buf: make([]byte, 0, 100),
+ }
+ return parser, nil
+}
+
+func (p *JSONParser) ReadLine(line LogLine) error {
+ row, err := p.reader.ReadSlice('\n')
+ if err != nil && len(row) == 0 {
+ return err
+ }
+ if len(row) > 0 && row[len(row)-1] == '\n' {
+ row = row[:len(row)-1]
+ }
+ return p.Parse(row, line)
+}
+
+func (p *JSONParser) Parse(row []byte, line LogLine) error {
+ val, err := p.parser.ParseBytes(row)
+ if err != nil {
+ return err
+ }
+
+ if err := p.parseObject("", val, line); err != nil {
+ return &ParseError{msg: fmt.Sprintf("json parse: %v", err), err: err}
+ }
+
+ return nil
+}
+
+func (p *JSONParser) parseObject(prefix string, val *fastjson.Value, line LogLine) error {
+ obj, err := val.Object()
+ if err != nil {
+ return err
+ }
+
+ obj.Visit(func(key []byte, v *fastjson.Value) {
+ if err != nil {
+ return
+ }
+
+ k := jsonObjKey(prefix, string(key))
+
+ switch v.Type() {
+ case fastjson.TypeString, fastjson.TypeNumber:
+ err = p.parseStringNumber(k, v, line)
+ case fastjson.TypeArray:
+ err = p.parseArray(k, v, line)
+ case fastjson.TypeObject:
+ err = p.parseObject(k, v, line)
+ default:
+ return
+ }
+ })
+
+ return err
+}
+
+func jsonObjKey(prefix, key string) string {
+ if prefix == "" {
+ return key
+ }
+ return prefix + "." + key
+}
+
+func (p *JSONParser) parseArray(key string, val *fastjson.Value, line LogLine) error {
+ arr, err := val.Array()
+ if err != nil {
+ return err
+ }
+
+ for i, v := range arr {
+ k := jsonObjKey(key, strconv.Itoa(i))
+
+ switch v.Type() {
+ case fastjson.TypeString, fastjson.TypeNumber:
+ err = p.parseStringNumber(k, v, line)
+ case fastjson.TypeArray:
+ err = p.parseArray(k, v, line)
+ case fastjson.TypeObject:
+ err = p.parseObject(k, v, line)
+ default:
+ continue
+ }
+
+ if err != nil {
+ return err
+ }
+ }
+
+ return err
+}
+
+func (p *JSONParser) parseStringNumber(key string, val *fastjson.Value, line LogLine) error {
+ if mapped, ok := p.mapping[key]; ok {
+ key = mapped
+ }
+
+ p.buf = p.buf[:0]
+ if p.buf = val.MarshalTo(p.buf); len(p.buf) == 0 {
+ return nil
+ }
+
+ if val.Type() == fastjson.TypeString {
+ // trim "
+ return line.Assign(key, string(p.buf[1:len(p.buf)-1]))
+ }
+ return line.Assign(key, string(p.buf))
+}
+
+func (p *JSONParser) Info() string {
+ return fmt.Sprintf("json: %q", p.mapping)
+}
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/json_test.go b/src/go/collectors/go.d.plugin/pkg/logs/json_test.go
new file mode 100644
index 000000000..b82850031
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/json_test.go
@@ -0,0 +1,224 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestNewJSONParser(t *testing.T) {
+ tests := map[string]struct {
+ config JSONConfig
+ wantErr bool
+ }{
+ "empty config": {
+ config: JSONConfig{},
+ wantErr: false,
+ },
+ "with mappings": {
+ config: JSONConfig{Mapping: map[string]string{"from_field_1": "to_field_1"}},
+ wantErr: false,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ p, err := NewJSONParser(test.config, nil)
+
+ if test.wantErr {
+ assert.Error(t, err)
+ assert.Nil(t, p)
+ } else {
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ }
+ })
+ }
+}
+
+func TestJSONParser_ReadLine(t *testing.T) {
+ tests := map[string]struct {
+ config JSONConfig
+ input string
+ wantAssigned map[string]string
+ wantErr bool
+ }{
+ "string value": {
+ input: `{ "string": "example.com" }`,
+ wantErr: false,
+ wantAssigned: map[string]string{
+ "string": "example.com",
+ },
+ },
+ "int value": {
+ input: `{ "int": 1 }`,
+ wantErr: false,
+ wantAssigned: map[string]string{
+ "int": "1",
+ },
+ },
+ "float value": {
+ input: `{ "float": 1.1 }`,
+ wantErr: false,
+ wantAssigned: map[string]string{
+ "float": "1.1",
+ },
+ },
+ "string, int, float values": {
+ input: `{ "string": "example.com", "int": 1, "float": 1.1 }`,
+ wantErr: false,
+ wantAssigned: map[string]string{
+ "string": "example.com",
+ "int": "1",
+ "float": "1.1",
+ },
+ },
+ "string, int, float values with mappings": {
+ config: JSONConfig{Mapping: map[string]string{
+ "string": "STRING",
+ "int": "INT",
+ "float": "FLOAT",
+ }},
+ input: `{ "string": "example.com", "int": 1, "float": 1.1 }`,
+ wantErr: false,
+ wantAssigned: map[string]string{
+ "STRING": "example.com",
+ "INT": "1",
+ "FLOAT": "1.1",
+ },
+ },
+ "nested": {
+ input: `{"one":{"two":2,"three":{"four":4}},"five":5}`,
+ config: JSONConfig{Mapping: map[string]string{
+ "one.two": "mapped_value",
+ }},
+ wantErr: false,
+ wantAssigned: map[string]string{
+ "mapped_value": "2",
+ "one.three.four": "4",
+ "five": "5",
+ },
+ },
+ "nested with array": {
+ input: `{"one":{"two":[2,22]},"five":5}`,
+ config: JSONConfig{Mapping: map[string]string{
+ "one.two.1": "mapped_value",
+ }},
+ wantErr: false,
+ wantAssigned: map[string]string{
+ "one.two.0": "2",
+ "mapped_value": "22",
+ "five": "5",
+ },
+ },
+ "error on malformed JSON": {
+ input: `{ "host"": unquoted_string}`,
+ wantErr: true,
+ },
+ "error on empty input": {
+ wantErr: true,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ line := newLogLine()
+ in := strings.NewReader(test.input)
+ p, err := NewJSONParser(test.config, in)
+ require.NoError(t, err)
+ require.NotNil(t, p)
+
+ err = p.ReadLine(line)
+
+ if test.wantErr {
+ assert.Error(t, err)
+ } else {
+ require.NoError(t, err)
+ assert.Equal(t, test.wantAssigned, line.assigned)
+ }
+ })
+ }
+}
+
+func TestJSONParser_Parse(t *testing.T) {
+ tests := map[string]struct {
+ config JSONConfig
+ input string
+ wantAssigned map[string]string
+ wantErr bool
+ }{
+ "string value": {
+ input: `{ "string": "example.com" }`,
+ wantErr: false,
+ wantAssigned: map[string]string{
+ "string": "example.com",
+ },
+ },
+ "int value": {
+ input: `{ "int": 1 }`,
+ wantErr: false,
+ wantAssigned: map[string]string{
+ "int": "1",
+ },
+ },
+ "float value": {
+ input: `{ "float": 1.1 }`,
+ wantErr: false,
+ wantAssigned: map[string]string{
+ "float": "1.1",
+ },
+ },
+ "string, int, float values": {
+ input: `{ "string": "example.com", "int": 1, "float": 1.1 }`,
+ wantErr: false,
+ wantAssigned: map[string]string{
+ "string": "example.com",
+ "int": "1",
+ "float": "1.1",
+ },
+ },
+ "string, int, float values with mappings": {
+ config: JSONConfig{Mapping: map[string]string{
+ "string": "STRING",
+ "int": "INT",
+ "float": "FLOAT",
+ }},
+ input: `{ "string": "example.com", "int": 1, "float": 1.1 }`,
+ wantErr: false,
+ wantAssigned: map[string]string{
+ "STRING": "example.com",
+ "INT": "1",
+ "FLOAT": "1.1",
+ },
+ },
+ "error on malformed JSON": {
+ input: `{ "host"": unquoted_string}`,
+ wantErr: true,
+ },
+ "error on empty input": {
+ wantErr: true,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ line := newLogLine()
+ p, err := NewJSONParser(test.config, nil)
+ require.NoError(t, err)
+ require.NotNil(t, p)
+
+ err = p.Parse([]byte(test.input), line)
+
+ if test.wantErr {
+ assert.Error(t, err)
+ } else {
+ require.NoError(t, err)
+ assert.Equal(t, test.wantAssigned, line.assigned)
+ }
+ })
+ }
+}
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/lastline.go b/src/go/collectors/go.d.plugin/pkg/logs/lastline.go
new file mode 100644
index 000000000..911dbf497
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/lastline.go
@@ -0,0 +1,65 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "errors"
+ "os"
+
+ "github.com/clbanning/rfile/v2"
+)
+
+const DefaultMaxLineWidth = 4 * 1024 // assume disk block size is 4K
+
+var ErrTooLongLine = errors.New("too long line")
+
+// ReadLastLine returns the last line of the file and any read error encountered.
+// It expects last line width <= maxLineWidth.
+// If maxLineWidth <= 0, it defaults to DefaultMaxLineWidth.
+func ReadLastLine(filename string, maxLineWidth int64) ([]byte, error) {
+ if maxLineWidth <= 0 {
+ maxLineWidth = DefaultMaxLineWidth
+ }
+ f, err := os.Open(filename)
+ if err != nil {
+ return nil, err
+ }
+ defer func() { _ = f.Close() }()
+
+ stat, _ := f.Stat()
+ endPos := stat.Size()
+ if endPos == 0 {
+ return []byte{}, nil
+ }
+ startPos := endPos - maxLineWidth
+ if startPos < 0 {
+ startPos = 0
+ }
+ buf := make([]byte, endPos-startPos)
+ n, err := f.ReadAt(buf, startPos)
+ if err != nil {
+ return nil, err
+ }
+ lnPos := 0
+ foundLn := false
+ for i := n - 2; i >= 0; i-- {
+ ch := buf[i]
+ if ch == '\n' {
+ foundLn = true
+ lnPos = i
+ break
+ }
+ }
+ if foundLn {
+ return buf[lnPos+1 : n], nil
+ }
+ if startPos == 0 {
+ return buf[0:n], nil
+ }
+
+ return nil, ErrTooLongLine
+}
+
+func ReadLastLines(filename string, n uint) ([]string, error) {
+ return rfile.Tail(filename, int(n))
+}
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/lastline_test.go b/src/go/collectors/go.d.plugin/pkg/logs/lastline_test.go
new file mode 100644
index 000000000..ea0a75e9e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/lastline_test.go
@@ -0,0 +1,54 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "os"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestReadLastLine(t *testing.T) {
+ tests := []struct {
+ name string
+ content string
+ expected string
+ err error
+ }{
+ {"empty", "", "", nil},
+ {"empty-ln", "\n", "\n", nil},
+ {"one-line", "hello", "hello", nil},
+ {"one-line-ln", "hello\n", "hello\n", nil},
+ {"multi-line", "hello\nworld", "world", nil},
+ {"multi-line-ln", "hello\nworld\n", "world\n", nil},
+ {"long-line", "hello hello hello", "", ErrTooLongLine},
+ {"long-line-ln", "hello hello hello\n", "", ErrTooLongLine},
+ }
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ filename := prepareFile(t, test.content)
+ defer func() { _ = os.Remove(filename) }()
+
+ line, err := ReadLastLine(filename, 10)
+
+ if test.err != nil {
+ require.NotNil(t, err)
+ assert.Contains(t, err.Error(), test.err.Error())
+ } else {
+ assert.Equal(t, test.expected, string(line))
+ }
+ })
+ }
+}
+
+func prepareFile(t *testing.T, content string) string {
+ t.Helper()
+ file, err := os.CreateTemp("", "go-test")
+ require.NoError(t, err)
+ defer func() { _ = file.Close() }()
+
+ _, _ = file.WriteString(content)
+ return file.Name()
+}
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/ltsv.go b/src/go/collectors/go.d.plugin/pkg/logs/ltsv.go
new file mode 100644
index 000000000..b7fbceb14
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/ltsv.go
@@ -0,0 +1,95 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "bufio"
+ "errors"
+ "fmt"
+ "io"
+ "strconv"
+ "unsafe"
+
+ "github.com/Wing924/ltsv"
+)
+
+type (
+ LTSVConfig struct {
+ FieldDelimiter string `yaml:"field_delimiter" json:"field_delimiter"`
+ ValueDelimiter string `yaml:"value_delimiter" json:"value_delimiter"`
+ Mapping map[string]string `yaml:"mapping" json:"mapping"`
+ }
+
+ LTSVParser struct {
+ r *bufio.Reader
+ parser ltsv.Parser
+ mapping map[string]string
+ }
+)
+
+func NewLTSVParser(config LTSVConfig, in io.Reader) (*LTSVParser, error) {
+ p := ltsv.Parser{
+ FieldDelimiter: ltsv.DefaultParser.FieldDelimiter,
+ ValueDelimiter: ltsv.DefaultParser.ValueDelimiter,
+ StrictMode: false,
+ }
+ if config.FieldDelimiter != "" {
+ if d, err := parseLTSVDelimiter(config.FieldDelimiter); err == nil {
+ p.FieldDelimiter = d
+ }
+ }
+ if config.ValueDelimiter != "" {
+ if d, err := parseLTSVDelimiter(config.ValueDelimiter); err == nil {
+ p.ValueDelimiter = d
+ }
+ }
+ parser := &LTSVParser{
+ r: bufio.NewReader(in),
+ parser: p,
+ mapping: config.Mapping,
+ }
+ return parser, nil
+}
+
+func (p *LTSVParser) ReadLine(line LogLine) error {
+ row, err := p.r.ReadSlice('\n')
+ if err != nil && len(row) == 0 {
+ return err
+ }
+ if len(row) > 0 && row[len(row)-1] == '\n' {
+ row = row[:len(row)-1]
+ }
+ return p.Parse(row, line)
+}
+
+func (p *LTSVParser) Parse(row []byte, line LogLine) error {
+ err := p.parser.ParseLine(row, func(label []byte, value []byte) error {
+ s := *(*string)(unsafe.Pointer(&label)) // no alloc, same as in fmt.Builder.String()
+ if v, ok := p.mapping[s]; ok {
+ s = v
+ }
+ return line.Assign(s, string(value))
+ })
+ if err != nil {
+ return &ParseError{msg: fmt.Sprintf("ltsv parse: %v", err), err: err}
+ }
+ return nil
+}
+
+func (p LTSVParser) Info() string {
+ return fmt.Sprintf("ltsv: %q", p.mapping)
+}
+
+func parseLTSVDelimiter(s string) (byte, error) {
+ if isNumber(s) {
+ d, err := strconv.ParseUint(s, 10, 8)
+ if err != nil {
+ return 0, fmt.Errorf("invalid LTSV delimiter: %v", err)
+ }
+ return byte(d), nil
+ }
+ if len(s) != 1 {
+ return 0, errors.New("invalid LTSV delimiter: must be a single character")
+ }
+ return s[0], nil
+}
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/ltsv_test.go b/src/go/collectors/go.d.plugin/pkg/logs/ltsv_test.go
new file mode 100644
index 000000000..f6d5ec2bd
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/ltsv_test.go
@@ -0,0 +1,125 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "strings"
+ "testing"
+
+ "github.com/Wing924/ltsv"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+var testLTSVConfig = LTSVConfig{
+ FieldDelimiter: " ",
+ ValueDelimiter: "=",
+ Mapping: map[string]string{"KEY": "key"},
+}
+
+func TestNewLTSVParser(t *testing.T) {
+ tests := []struct {
+ name string
+ config LTSVConfig
+ wantErr bool
+ }{
+ {name: "config", config: testLTSVConfig},
+ {name: "empty config"},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ p, err := NewLTSVParser(tt.config, nil)
+
+ if tt.wantErr {
+ assert.Error(t, err)
+ assert.Nil(t, p)
+ } else {
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ if tt.config.FieldDelimiter == "" {
+ assert.Equal(t, ltsv.DefaultParser.FieldDelimiter, p.parser.FieldDelimiter)
+ } else {
+ assert.Equal(t, tt.config.FieldDelimiter, string(p.parser.FieldDelimiter))
+ }
+ if tt.config.ValueDelimiter == "" {
+ assert.Equal(t, ltsv.DefaultParser.ValueDelimiter, p.parser.ValueDelimiter)
+ } else {
+ assert.Equal(t, tt.config.ValueDelimiter, string(p.parser.ValueDelimiter))
+ }
+ assert.Equal(t, tt.config.Mapping, p.mapping)
+ }
+ })
+ }
+}
+
+func TestLTSVParser_ReadLine(t *testing.T) {
+ tests := []struct {
+ name string
+ row string
+ wantErr bool
+ wantParseErr bool
+ }{
+ {name: "no error", row: "A=1 B=2 KEY=3"},
+ {name: "error on parsing", row: "NO LABEL", wantErr: true, wantParseErr: true},
+ {name: "error on assigning", row: "A=1 ERR=2", wantErr: true, wantParseErr: true},
+ {name: "error on reading EOF", row: "", wantErr: true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ var line logLine
+ r := strings.NewReader(tt.row)
+ p, err := NewLTSVParser(testLTSVConfig, r)
+ require.NoError(t, err)
+
+ err = p.ReadLine(&line)
+
+ if tt.wantErr {
+ require.Error(t, err)
+ if tt.wantParseErr {
+ assert.True(t, IsParseError(err))
+ } else {
+ assert.False(t, IsParseError(err))
+ }
+ } else {
+ assert.NoError(t, err)
+ }
+ })
+ }
+}
+
+func TestLTSVParser_Parse(t *testing.T) {
+ tests := []struct {
+ name string
+ row string
+ wantErr bool
+ }{
+ {name: "no error", row: "A=1 B=2"},
+ {name: "error on parsing", row: "NO LABEL", wantErr: true},
+ {name: "error on assigning", row: "A=1 ERR=2", wantErr: true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ var line logLine
+ p, err := NewLTSVParser(testLTSVConfig, nil)
+ require.NoError(t, err)
+
+ err = p.Parse([]byte(tt.row), &line)
+
+ if tt.wantErr {
+ require.Error(t, err)
+ assert.True(t, IsParseError(err))
+ } else {
+ assert.NoError(t, err)
+ }
+ })
+ }
+}
+
+func TestLTSVParser_Info(t *testing.T) {
+ p, err := NewLTSVParser(testLTSVConfig, nil)
+ require.NoError(t, err)
+ assert.NotZero(t, p.Info())
+}
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/parser.go b/src/go/collectors/go.d.plugin/pkg/logs/parser.go
new file mode 100644
index 000000000..d83b4309d
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/parser.go
@@ -0,0 +1,65 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "strconv"
+)
+
+type ParseError struct {
+ msg string
+ err error
+}
+
+func (e ParseError) Error() string { return e.msg }
+
+func (e ParseError) Unwrap() error { return e.err }
+
+func IsParseError(err error) bool { var v *ParseError; return errors.As(err, &v) }
+
+type (
+ LogLine interface {
+ Assign(name string, value string) error
+ }
+
+ Parser interface {
+ ReadLine(LogLine) error
+ Parse(row []byte, line LogLine) error
+ Info() string
+ }
+)
+
+const (
+ TypeCSV = "csv"
+ TypeLTSV = "ltsv"
+ TypeRegExp = "regexp"
+ TypeJSON = "json"
+)
+
+type ParserConfig struct {
+ LogType string `yaml:"log_type" json:"log_type"`
+ CSV CSVConfig `yaml:"csv_config" json:"csv_config"`
+ LTSV LTSVConfig `yaml:"ltsv_config" json:"ltsv_config"`
+ RegExp RegExpConfig `yaml:"regexp_config" json:"regexp_config"`
+ JSON JSONConfig `yaml:"json_config" json:"json_config"`
+}
+
+func NewParser(config ParserConfig, in io.Reader) (Parser, error) {
+ switch config.LogType {
+ case TypeCSV:
+ return NewCSVParser(config.CSV, in)
+ case TypeLTSV:
+ return NewLTSVParser(config.LTSV, in)
+ case TypeRegExp:
+ return NewRegExpParser(config.RegExp, in)
+ case TypeJSON:
+ return NewJSONParser(config.JSON, in)
+ default:
+ return nil, fmt.Errorf("invalid type: %q", config.LogType)
+ }
+}
+
+func isNumber(s string) bool { _, err := strconv.Atoi(s); return err == nil }
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/parser_test.go b/src/go/collectors/go.d.plugin/pkg/logs/parser_test.go
new file mode 100644
index 000000000..88ef46c27
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/parser_test.go
@@ -0,0 +1,3 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/reader.go b/src/go/collectors/go.d.plugin/pkg/logs/reader.go
new file mode 100644
index 000000000..ee526a9e3
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/reader.go
@@ -0,0 +1,193 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "sort"
+
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+)
+
+const (
+ maxEOF = 60
+)
+
+var (
+ ErrNoMatchedFile = errors.New("no matched files")
+)
+
+// Reader is a log rotate aware Reader
+// TODO: better reopen algorithm
+// TODO: handle truncate
+type Reader struct {
+ file *os.File
+ path string
+ excludePath string
+ eofCounter int
+ continuousEOF int
+ log *logger.Logger
+}
+
+// Open a file and seek to end of the file.
+// path: the shell file name pattern
+// excludePath: the shell file name pattern
+func Open(path string, excludePath string, log *logger.Logger) (*Reader, error) {
+ var err error
+ if path, err = filepath.Abs(path); err != nil {
+ return nil, err
+ }
+ if _, err = filepath.Match(path, "/"); err != nil {
+ return nil, fmt.Errorf("bad path syntax: %q", path)
+ }
+ if _, err = filepath.Match(excludePath, "/"); err != nil {
+ return nil, fmt.Errorf("bad exclude_path syntax: %q", path)
+ }
+ r := &Reader{
+ path: path,
+ excludePath: excludePath,
+ log: log,
+ }
+
+ if err = r.open(); err != nil {
+ return nil, err
+ }
+ return r, nil
+}
+
+// CurrentFilename get current opened file name
+func (r *Reader) CurrentFilename() string {
+ return r.file.Name()
+}
+
+func (r *Reader) open() error {
+ path := r.findFile()
+ if path == "" {
+ r.log.Debugf("couldn't find log file, used path: '%s', exclude_path: '%s'", r.path, r.excludePath)
+ return ErrNoMatchedFile
+ }
+ r.log.Debug("open log file: ", path)
+ file, err := os.Open(path)
+ if err != nil {
+ return err
+ }
+ stat, err := file.Stat()
+ if err != nil {
+ return err
+ }
+ if _, err = file.Seek(stat.Size(), io.SeekStart); err != nil {
+ return err
+ }
+ r.file = file
+ return nil
+}
+
+func (r *Reader) Read(p []byte) (n int, err error) {
+ n, err = r.file.Read(p)
+ if err != nil {
+ switch err {
+ case io.EOF:
+ err = r.handleEOFErr()
+ case os.ErrInvalid: // r.file is nil after Close
+ err = r.handleInvalidArgErr()
+ }
+ return
+ }
+ r.continuousEOF = 0
+ return
+}
+
+func (r *Reader) handleEOFErr() (err error) {
+ err = io.EOF
+ r.eofCounter++
+ r.continuousEOF++
+ if r.eofCounter < maxEOF || r.continuousEOF < 2 {
+ return err
+ }
+ if err2 := r.reopen(); err2 != nil {
+ err = err2
+ }
+ return err
+}
+
+func (r *Reader) handleInvalidArgErr() (err error) {
+ err = io.EOF
+ if err2 := r.reopen(); err2 != nil {
+ err = err2
+ }
+ return err
+}
+
+func (r *Reader) Close() (err error) {
+ if r == nil || r.file == nil {
+ return
+ }
+ r.log.Debug("close log file: ", r.file.Name())
+ err = r.file.Close()
+ r.file = nil
+ r.eofCounter = 0
+ return
+}
+
+func (r *Reader) reopen() error {
+ r.log.Debugf("reopen, look for: %s", r.path)
+ _ = r.Close()
+ return r.open()
+}
+
+func (r *Reader) findFile() string {
+ return find(r.path, r.excludePath)
+}
+
+func find(path, exclude string) string {
+ return finder{}.find(path, exclude)
+}
+
+// TODO: tests
+type finder struct{}
+
+func (f finder) find(path, exclude string) string {
+ files, _ := filepath.Glob(path)
+ if len(files) == 0 {
+ return ""
+ }
+
+ files = f.filter(files, exclude)
+ if len(files) == 0 {
+ return ""
+ }
+
+ return f.findLastFile(files)
+}
+
+func (f finder) filter(files []string, exclude string) []string {
+ if exclude == "" {
+ return files
+ }
+
+ fs := make([]string, 0, len(files))
+ for _, file := range files {
+ if ok, _ := filepath.Match(exclude, file); ok {
+ continue
+ }
+ fs = append(fs, file)
+ }
+ return fs
+}
+
+// TODO: the logic is probably wrong
+func (f finder) findLastFile(files []string) string {
+ sort.Strings(files)
+ for i := len(files) - 1; i >= 0; i-- {
+ stat, err := os.Stat(files[i])
+ if err != nil || !stat.Mode().IsRegular() {
+ continue
+ }
+ return files[i]
+ }
+ return ""
+}
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/reader_test.go b/src/go/collectors/go.d.plugin/pkg/logs/reader_test.go
new file mode 100644
index 000000000..e6ef47fe7
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/reader_test.go
@@ -0,0 +1,245 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestReader_Read(t *testing.T) {
+ reader, teardown := prepareTestReader(t)
+ defer teardown()
+
+ r := testReader{bufio.NewReader(reader)}
+ filename := reader.CurrentFilename()
+ numLogs := 5
+ var sum int
+
+ for i := 0; i < 10; i++ {
+ appendLogs(t, filename, time.Millisecond*10, numLogs)
+ n, err := r.readUntilEOF()
+ sum += n
+
+ assert.Equal(t, io.EOF, err)
+ assert.Equal(t, numLogs*(i+1), sum)
+ }
+}
+
+func TestReader_Read_HandleFileRotation(t *testing.T) {
+ reader, teardown := prepareTestReader(t)
+ defer teardown()
+
+ r := testReader{bufio.NewReader(reader)}
+ filename := reader.CurrentFilename()
+ numLogs := 5
+ rotateFile(t, filename)
+ appendLogs(t, filename, time.Millisecond*10, numLogs)
+
+ n, err := r.readUntilEOFTimes(maxEOF)
+ assert.Equal(t, io.EOF, err)
+ assert.Equal(t, 0, n)
+
+ appendLogs(t, filename, time.Millisecond*10, numLogs)
+ n, err = r.readUntilEOF()
+ assert.Equal(t, io.EOF, err)
+ assert.Equal(t, numLogs, n)
+}
+
+func TestReader_Read_HandleFileRotationWithDelay(t *testing.T) {
+ reader, teardown := prepareTestReader(t)
+ defer teardown()
+
+ r := testReader{bufio.NewReader(reader)}
+ filename := reader.CurrentFilename()
+ _ = os.Remove(filename)
+
+ // trigger reopen first time
+ n, err := r.readUntilEOFTimes(maxEOF)
+ assert.Equal(t, ErrNoMatchedFile, err)
+ assert.Equal(t, 0, n)
+
+ f, err := os.Create(filename)
+ require.NoError(t, err)
+ _ = f.Close()
+
+ // trigger reopen 2nd time
+ n, err = r.readUntilEOF()
+ assert.Equal(t, io.EOF, err)
+ assert.Equal(t, 0, n)
+
+ numLogs := 5
+ appendLogs(t, filename, time.Millisecond*10, numLogs)
+ n, err = r.readUntilEOF()
+ assert.Equal(t, io.EOF, err)
+ assert.Equal(t, numLogs, n)
+}
+
+func TestReader_Close(t *testing.T) {
+ reader, teardown := prepareTestReader(t)
+ defer teardown()
+
+ assert.NoError(t, reader.Close())
+ assert.Nil(t, reader.file)
+}
+
+func TestReader_Close_NilFile(t *testing.T) {
+ var r Reader
+ assert.NoError(t, r.Close())
+}
+
+func TestOpen(t *testing.T) {
+ tempFileName1 := prepareTempFile(t, "*-web_log-open-test-1.log")
+ tempFileName2 := prepareTempFile(t, "*-web_log-open-test-2.log")
+ tempFileName3 := prepareTempFile(t, "*-web_log-open-test-3.log")
+ defer func() {
+ _ = os.Remove(tempFileName1)
+ _ = os.Remove(tempFileName2)
+ _ = os.Remove(tempFileName3)
+ }()
+
+ makePath := func(s string) string {
+ return filepath.Join(os.TempDir(), s)
+ }
+
+ tests := []struct {
+ name string
+ path string
+ exclude string
+ err bool
+ }{
+ {
+ name: "match without exclude",
+ path: makePath("*-web_log-open-test-[1-3].log"),
+ },
+ {
+ name: "match with exclude",
+ path: makePath("*-web_log-open-test-[1-3].log"),
+ exclude: makePath("*-web_log-open-test-[2-3].log"),
+ },
+ {
+ name: "exclude everything",
+ path: makePath("*-web_log-open-test-[1-3].log"),
+ exclude: makePath("*"),
+ err: true,
+ },
+ {
+ name: "no match",
+ path: makePath("*-web_log-no-match-test-[1-3].log"),
+ err: true,
+ },
+ {
+ name: "bad path pattern",
+ path: "[qw",
+ err: true,
+ },
+ {
+ name: "bad exclude path pattern",
+ path: "[qw",
+ err: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ r, err := Open(tt.path, tt.exclude, nil)
+
+ if tt.err {
+ assert.Error(t, err)
+ } else {
+ assert.NoError(t, err)
+ assert.NotNil(t, r.file)
+ _ = r.Close()
+ }
+ })
+ }
+}
+
+func TestReader_CurrentFilename(t *testing.T) {
+ reader, teardown := prepareTestReader(t)
+ defer teardown()
+
+ assert.Equal(t, reader.file.Name(), reader.CurrentFilename())
+}
+
+type testReader struct {
+ *bufio.Reader
+}
+
+func (r *testReader) readUntilEOF() (n int, err error) {
+ for {
+ _, err = r.ReadBytes('\n')
+ if err != nil {
+ break
+ }
+ n++
+ }
+ return n, err
+}
+
+func (r *testReader) readUntilEOFTimes(times int) (sum int, err error) {
+ var n int
+ for i := 0; i < times; i++ {
+ n, err = r.readUntilEOF()
+ if err != io.EOF {
+ break
+ }
+ sum += n
+ }
+ return sum, err
+}
+
+func prepareTempFile(t *testing.T, pattern string) string {
+ t.Helper()
+ f, err := os.CreateTemp("", pattern)
+ require.NoError(t, err)
+ return f.Name()
+}
+
+func prepareTestReader(t *testing.T) (reader *Reader, teardown func()) {
+ t.Helper()
+ filename := prepareTempFile(t, "*-web_log-test.log")
+ f, err := os.Open(filename)
+ require.NoError(t, err)
+
+ teardown = func() {
+ _ = os.Remove(filename)
+ _ = reader.file.Close()
+ }
+ reader = &Reader{
+ file: f,
+ path: filename,
+ }
+ return reader, teardown
+}
+
+func rotateFile(t *testing.T, filename string) {
+ t.Helper()
+ require.NoError(t, os.Remove(filename))
+ f, err := os.Create(filename)
+ require.NoError(t, err)
+ _ = f.Close()
+}
+
+func appendLogs(t *testing.T, filename string, interval time.Duration, numOfLogs int) {
+ t.Helper()
+ base := filepath.Base(filename)
+ file, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND, os.ModeAppend)
+ require.NoError(t, err)
+ require.NotNil(t, file)
+ defer func() { _ = file.Close() }()
+
+ for i := 0; i < numOfLogs; i++ {
+ _, err = fmt.Fprintln(file, "line", i, "filename", base)
+ require.NoError(t, err)
+ time.Sleep(interval)
+ }
+}
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/regexp.go b/src/go/collectors/go.d.plugin/pkg/logs/regexp.go
new file mode 100644
index 000000000..e0dee1d02
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/regexp.go
@@ -0,0 +1,76 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "bufio"
+ "errors"
+ "fmt"
+ "io"
+ "regexp"
+)
+
+type (
+ RegExpConfig struct {
+ Pattern string `yaml:"pattern" json:"pattern"`
+ }
+
+ RegExpParser struct {
+ r *bufio.Reader
+ pattern *regexp.Regexp
+ }
+)
+
+func NewRegExpParser(config RegExpConfig, in io.Reader) (*RegExpParser, error) {
+ if config.Pattern == "" {
+ return nil, errors.New("empty pattern")
+ }
+
+ pattern, err := regexp.Compile(config.Pattern)
+ if err != nil {
+ return nil, fmt.Errorf("compile: %w", err)
+ }
+
+ if pattern.NumSubexp() == 0 {
+ return nil, errors.New("pattern has no named subgroups")
+ }
+
+ p := &RegExpParser{
+ r: bufio.NewReader(in),
+ pattern: pattern,
+ }
+ return p, nil
+}
+
+func (p *RegExpParser) ReadLine(line LogLine) error {
+ row, err := p.r.ReadSlice('\n')
+ if err != nil && len(row) == 0 {
+ return err
+ }
+ if len(row) > 0 && row[len(row)-1] == '\n' {
+ row = row[:len(row)-1]
+ }
+ return p.Parse(row, line)
+}
+
+func (p *RegExpParser) Parse(row []byte, line LogLine) error {
+ match := p.pattern.FindSubmatch(row)
+ if len(match) == 0 {
+ return &ParseError{msg: "regexp parse: unmatched line"}
+ }
+
+ for i, name := range p.pattern.SubexpNames() {
+ if name == "" || match[i] == nil {
+ continue
+ }
+ err := line.Assign(name, string(match[i]))
+ if err != nil {
+ return &ParseError{msg: fmt.Sprintf("regexp parse: %v", err), err: err}
+ }
+ }
+ return nil
+}
+
+func (p RegExpParser) Info() string {
+ return fmt.Sprintf("regexp: %s", p.pattern)
+}
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/regexp_test.go b/src/go/collectors/go.d.plugin/pkg/logs/regexp_test.go
new file mode 100644
index 000000000..fc7bacaa5
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/regexp_test.go
@@ -0,0 +1,131 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "errors"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestNewRegExpParser(t *testing.T) {
+ tests := []struct {
+ name string
+ pattern string
+ wantErr bool
+ }{
+ {name: "valid pattern", pattern: `(?P<A>\d+) (?P<B>\d+)`},
+ {name: "no names subgroups in pattern", pattern: `(?:\d+) (?:\d+)`, wantErr: true},
+ {name: "invalid pattern", pattern: `(((?P<A>\d+) (?P<B>\d+)`, wantErr: true},
+ {name: "empty pattern", wantErr: true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ p, err := NewRegExpParser(RegExpConfig{Pattern: tt.pattern}, nil)
+ if tt.wantErr {
+ assert.Error(t, err)
+ assert.Nil(t, p)
+ } else {
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ }
+ })
+ }
+}
+
+func TestRegExpParser_ReadLine(t *testing.T) {
+ tests := []struct {
+ name string
+ row string
+ pattern string
+ wantErr bool
+ wantParseErr bool
+ }{
+ {name: "match and no error", row: "1 2", pattern: `(?P<A>\d+) (?P<B>\d+)`},
+ {name: "match but error on assigning", row: "1 2", pattern: `(?P<A>\d+) (?P<ERR>\d+)`, wantErr: true, wantParseErr: true},
+ {name: "not match", row: "A B", pattern: `(?P<A>\d+) (?P<B>\d+)`, wantErr: true, wantParseErr: true},
+ {name: "not match multiline", row: "a b\n3 4", pattern: `(?P<A>\d+) (?P<B>\d+)`, wantErr: true, wantParseErr: true},
+ {name: "error on reading EOF", row: "", pattern: `(?P<A>\d+) (?P<B>\d+)`, wantErr: true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ var line logLine
+ r := strings.NewReader(tt.row)
+ p, err := NewRegExpParser(RegExpConfig{Pattern: tt.pattern}, r)
+ require.NoError(t, err)
+
+ err = p.ReadLine(&line)
+ if tt.wantErr {
+ require.Error(t, err)
+ if tt.wantParseErr {
+ assert.True(t, IsParseError(err))
+ } else {
+ assert.False(t, IsParseError(err))
+ }
+ } else {
+ assert.NoError(t, err)
+ }
+ })
+ }
+}
+
+func TestRegExpParser_Parse(t *testing.T) {
+ tests := []struct {
+ name string
+ row string
+ pattern string
+ wantErr bool
+ }{
+ {name: "match and no error", row: "1 2", pattern: `(?P<A>\d+) (?P<B>\d+)`},
+ {name: "match but error on assigning", row: "1 2", pattern: `(?P<A>\d+) (?P<ERR>\d+)`, wantErr: true},
+ {name: "not match", row: "A B", pattern: `(?P<A>\d+) (?P<B>\d+)`, wantErr: true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ var line logLine
+ p, err := NewRegExpParser(RegExpConfig{Pattern: tt.pattern}, nil)
+ require.NoError(t, err)
+
+ err = p.Parse([]byte(tt.row), &line)
+ if tt.wantErr {
+ require.Error(t, err)
+ assert.True(t, IsParseError(err))
+ } else {
+ assert.NoError(t, err)
+ }
+ })
+ }
+}
+
+func TestRegExpParser_Info(t *testing.T) {
+ p, err := NewRegExpParser(RegExpConfig{Pattern: `(?P<A>\d+) (?P<B>\d+)`}, nil)
+ require.NoError(t, err)
+ assert.NotZero(t, p.Info())
+}
+
+type logLine struct {
+ assigned map[string]string
+}
+
+func newLogLine() *logLine {
+ return &logLine{
+ assigned: make(map[string]string),
+ }
+}
+
+func (l *logLine) Assign(name, val string) error {
+ switch name {
+ case "$ERR", "ERR":
+ return errors.New("assign error")
+ }
+ if l.assigned != nil {
+ l.assigned[name] = val
+ }
+ return nil
+}