diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 11:19:16 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:53:24 +0000 |
commit | b5f8ee61a7f7e9bd291dd26b0585d03eb686c941 (patch) | |
tree | d4d31289c39fc00da064a825df13a0b98ce95b10 /src/go/collectors/go.d.plugin/pkg/logs | |
parent | Adding upstream version 1.44.3. (diff) | |
download | netdata-b5f8ee61a7f7e9bd291dd26b0585d03eb686c941.tar.xz netdata-b5f8ee61a7f7e9bd291dd26b0585d03eb686c941.zip |
Adding upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/csv.go | 195 | ||||
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/csv_test.go | 175 | ||||
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/json.go | 140 | ||||
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/json_test.go | 224 | ||||
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/lastline.go | 65 | ||||
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/lastline_test.go | 54 | ||||
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/ltsv.go | 95 | ||||
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/ltsv_test.go | 125 | ||||
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/parser.go | 65 | ||||
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/parser_test.go | 3 | ||||
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/reader.go | 193 | ||||
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/reader_test.go | 245 | ||||
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/regexp.go | 76 | ||||
-rw-r--r-- | src/go/collectors/go.d.plugin/pkg/logs/regexp_test.go | 131 |
14 files changed, 1786 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/csv.go b/src/go/collectors/go.d.plugin/pkg/logs/csv.go new file mode 100644 index 000000000..4057b8c2f --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/csv.go @@ -0,0 +1,195 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs + +import ( + "bytes" + "encoding/csv" + "errors" + "fmt" + "io" + "strconv" + "strings" +) + +type ( + CSVConfig struct { + FieldsPerRecord int `yaml:"fields_per_record,omitempty" json:"fields_per_record"` + Delimiter string `yaml:"delimiter,omitempty" json:"delimiter"` + TrimLeadingSpace bool `yaml:"trim_leading_space" json:"trim_leading_space"` + Format string `yaml:"format,omitempty" json:"format"` + CheckField func(string) (string, int, bool) `yaml:"-" json:"-"` + } + + CSVParser struct { + Config CSVConfig + reader *csv.Reader + format *csvFormat + } + + csvFormat struct { + raw string + maxIndex int + fields []csvField + } + + csvField struct { + name string + idx int + } +) + +func NewCSVParser(config CSVConfig, in io.Reader) (*CSVParser, error) { + if config.Format == "" { + return nil, errors.New("empty csv format") + } + + format, err := newCSVFormat(config) + if err != nil { + return nil, fmt.Errorf("bad csv format '%s': %v", config.Format, err) + } + + p := &CSVParser{ + Config: config, + reader: newCSVReader(in, config), + format: format, + } + return p, nil +} + +func (p *CSVParser) ReadLine(line LogLine) error { + record, err := p.reader.Read() + if err != nil { + return handleCSVReaderError(err) + } + return p.format.parse(record, line) +} + +func (p *CSVParser) Parse(row []byte, line LogLine) error { + r := newCSVReader(bytes.NewBuffer(row), p.Config) + record, err := r.Read() + if err != nil { + return handleCSVReaderError(err) + } + return p.format.parse(record, line) +} + +func (p CSVParser) Info() string { + return fmt.Sprintf("csv: %s", p.format.raw) +} + +func (f *csvFormat) parse(record []string, line LogLine) error { + if len(record) <= 
f.maxIndex { + return &ParseError{msg: "csv parse: unmatched line"} + } + + for _, v := range f.fields { + if err := line.Assign(v.name, record[v.idx]); err != nil { + return &ParseError{msg: fmt.Sprintf("csv parse: %v", err), err: err} + } + } + return nil +} + +func newCSVReader(in io.Reader, config CSVConfig) *csv.Reader { + r := csv.NewReader(in) + if config.Delimiter != "" { + if d, err := parseCSVDelimiter(config.Delimiter); err == nil { + r.Comma = d + } + } + r.TrimLeadingSpace = config.TrimLeadingSpace + r.FieldsPerRecord = config.FieldsPerRecord + r.ReuseRecord = true + return r +} + +func newCSVFormat(config CSVConfig) (*csvFormat, error) { + r := csv.NewReader(strings.NewReader(config.Format)) + if config.Delimiter != "" { + if d, err := parseCSVDelimiter(config.Delimiter); err == nil { + r.Comma = d + } + } + r.TrimLeadingSpace = config.TrimLeadingSpace + + record, err := r.Read() + if err != nil { + return nil, err + } + + fields, err := createCSVFields(record, config.CheckField) + if err != nil { + return nil, err + } + + if len(fields) == 0 { + return nil, errors.New("zero fields") + } + + format := &csvFormat{ + raw: config.Format, + maxIndex: fields[len(fields)-1].idx, + fields: fields, + } + return format, nil +} + +func createCSVFields(format []string, check func(string) (string, int, bool)) ([]csvField, error) { + if check == nil { + check = checkCSVFormatField + } + var fields []csvField + var offset int + seen := make(map[string]bool) + + for i, name := range format { + name = strings.Trim(name, `"`) + + name, addOffset, valid := check(name) + offset += addOffset + if !valid { + continue + } + if seen[name] { + return nil, fmt.Errorf("duplicate field: %s", name) + } + seen[name] = true + + idx := i + offset + fields = append(fields, csvField{name, idx}) + } + return fields, nil +} + +func handleCSVReaderError(err error) error { + if isCSVParseError(err) { + return &ParseError{msg: fmt.Sprintf("csv parse: %v", err), err: err} + } + return err 
+} + +func isCSVParseError(err error) bool { + return errors.Is(err, csv.ErrBareQuote) || errors.Is(err, csv.ErrFieldCount) || errors.Is(err, csv.ErrQuote) +} + +func checkCSVFormatField(name string) (newName string, offset int, valid bool) { + if len(name) < 2 || !strings.HasPrefix(name, "$") { + return "", 0, false + } + return name, 0, true +} + +func parseCSVDelimiter(s string) (rune, error) { + if isNumber(s) { + d, err := strconv.ParseInt(s, 10, 32) + if err != nil { + return 0, fmt.Errorf("invalid CSV delimiter: %v", err) + } + return rune(d), nil + } + if len(s) != 1 { + return 0, errors.New("invalid CSV delimiter: must be a single character") + } + return rune(s[0]), nil +} diff --git a/src/go/collectors/go.d.plugin/pkg/logs/csv_test.go b/src/go/collectors/go.d.plugin/pkg/logs/csv_test.go new file mode 100644 index 000000000..d7baaa1b5 --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/csv_test.go @@ -0,0 +1,175 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var testCSVConfig = CSVConfig{ + Delimiter: " ", + Format: "$A %B", +} + +func TestNewCSVParser(t *testing.T) { + tests := []struct { + name string + format string + wantErr bool + }{ + {name: "valid format", format: "$A $B"}, + {name: "empty format", wantErr: true}, + {name: "bad format: csv read error", format: "$A $B \"$C", wantErr: true}, + {name: "bad format: duplicate fields", format: "$A $A", wantErr: true}, + {name: "bad format: zero fields", format: "!A !B", wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := testCSVConfig + c.Format = tt.format + p, err := NewCSVParser(c, nil) + if tt.wantErr { + assert.Error(t, err) + assert.Nil(t, p) + } else { + assert.NoError(t, err) + assert.NotNil(t, p) + } + }) + } +} + +func TestNewCSVFormat(t *testing.T) { + tests := []struct { + format string + wantFormat 
csvFormat + wantErr bool + }{ + {format: "$A $B", wantFormat: csvFormat{maxIndex: 1, fields: []csvField{{"$A", 0}, {"$B", 1}}}}, + {format: "$A $B !C $E", wantFormat: csvFormat{maxIndex: 3, fields: []csvField{{"$A", 0}, {"$B", 1}, {"$E", 3}}}}, + {format: "!A !B !C $E", wantFormat: csvFormat{maxIndex: 3, fields: []csvField{{"$E", 3}}}}, + {format: "$A $OFFSET $B", wantFormat: csvFormat{maxIndex: 3, fields: []csvField{{"$A", 0}, {"$B", 3}}}}, + {format: "$A $OFFSET $B $OFFSET !A", wantFormat: csvFormat{maxIndex: 3, fields: []csvField{{"$A", 0}, {"$B", 3}}}}, + {format: "$A $OFFSET $OFFSET $B", wantFormat: csvFormat{maxIndex: 5, fields: []csvField{{"$A", 0}, {"$B", 5}}}}, + {format: "$OFFSET $A $OFFSET $B", wantFormat: csvFormat{maxIndex: 5, fields: []csvField{{"$A", 2}, {"$B", 5}}}}, + {format: "$A \"$A", wantErr: true}, + {format: "$A $A", wantErr: true}, + {format: "!A !A", wantErr: true}, + {format: "", wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.format, func(t *testing.T) { + c := testCSVConfig + c.Format = tt.format + c.CheckField = testCheckCSVFormatField + tt.wantFormat.raw = tt.format + + f, err := newCSVFormat(c) + + if tt.wantErr { + assert.Error(t, err) + assert.Nil(t, f) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.wantFormat, *f) + } + }) + } +} + +func TestCSVParser_ReadLine(t *testing.T) { + tests := []struct { + name string + row string + format string + wantErr bool + wantParseErr bool + }{ + {name: "match and no error", row: "1 2 3", format: `$A $B $C`}, + {name: "match but error on assigning", row: "1 2 3", format: `$A $B $ERR`, wantErr: true, wantParseErr: true}, + {name: "not match", row: "1 2 3", format: `$A $B $C $d`, wantErr: true, wantParseErr: true}, + {name: "error on reading csv.Err", row: "1 2\"3", format: `$A $B $C`, wantErr: true, wantParseErr: true}, + {name: "error on reading EOF", row: "", format: `$A $B $C`, wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var 
line logLine + r := strings.NewReader(tt.row) + c := testCSVConfig + c.Format = tt.format + p, err := NewCSVParser(c, r) + require.NoError(t, err) + + err = p.ReadLine(&line) + + if tt.wantErr { + require.Error(t, err) + if tt.wantParseErr { + assert.True(t, IsParseError(err)) + } else { + assert.False(t, IsParseError(err)) + } + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestCSVParser_Parse(t *testing.T) { + tests := []struct { + name string + row string + format string + wantErr bool + }{ + {name: "match and no error", row: "1 2 3", format: `$A $B $C`}, + {name: "match but error on assigning", row: "1 2 3", format: `$A $B $ERR`, wantErr: true}, + {name: "not match", row: "1 2 3", format: `$A $B $C $d`, wantErr: true}, + {name: "error on reading csv.Err", row: "1 2\"3", format: `$A $B $C`, wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var line logLine + r := strings.NewReader(tt.row) + c := testCSVConfig + c.Format = tt.format + p, err := NewCSVParser(c, r) + require.NoError(t, err) + + err = p.ReadLine(&line) + + if tt.wantErr { + require.Error(t, err) + assert.True(t, IsParseError(err)) + } else { + assert.NoError(t, err) + } + }) + } + +} + +func TestCSVParser_Info(t *testing.T) { + p, err := NewCSVParser(testCSVConfig, nil) + require.NoError(t, err) + assert.NotZero(t, p.Info()) +} + +func testCheckCSVFormatField(name string) (newName string, offset int, valid bool) { + if len(name) < 2 || !strings.HasPrefix(name, "$") { + return "", 0, false + } + if name == "$OFFSET" { + return "", 1, false + } + return name, 0, true +} diff --git a/src/go/collectors/go.d.plugin/pkg/logs/json.go b/src/go/collectors/go.d.plugin/pkg/logs/json.go new file mode 100644 index 000000000..ceb32e272 --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/json.go @@ -0,0 +1,140 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs + +import ( + "bufio" + "fmt" + "io" + "strconv" + + "github.com/valyala/fastjson" 
+) + +type JSONConfig struct { + Mapping map[string]string `yaml:"mapping" json:"mapping"` +} + +type JSONParser struct { + reader *bufio.Reader + parser fastjson.Parser + buf []byte + mapping map[string]string +} + +func NewJSONParser(config JSONConfig, in io.Reader) (*JSONParser, error) { + parser := &JSONParser{ + reader: bufio.NewReader(in), + mapping: config.Mapping, + buf: make([]byte, 0, 100), + } + return parser, nil +} + +func (p *JSONParser) ReadLine(line LogLine) error { + row, err := p.reader.ReadSlice('\n') + if err != nil && len(row) == 0 { + return err + } + if len(row) > 0 && row[len(row)-1] == '\n' { + row = row[:len(row)-1] + } + return p.Parse(row, line) +} + +func (p *JSONParser) Parse(row []byte, line LogLine) error { + val, err := p.parser.ParseBytes(row) + if err != nil { + return err + } + + if err := p.parseObject("", val, line); err != nil { + return &ParseError{msg: fmt.Sprintf("json parse: %v", err), err: err} + } + + return nil +} + +func (p *JSONParser) parseObject(prefix string, val *fastjson.Value, line LogLine) error { + obj, err := val.Object() + if err != nil { + return err + } + + obj.Visit(func(key []byte, v *fastjson.Value) { + if err != nil { + return + } + + k := jsonObjKey(prefix, string(key)) + + switch v.Type() { + case fastjson.TypeString, fastjson.TypeNumber: + err = p.parseStringNumber(k, v, line) + case fastjson.TypeArray: + err = p.parseArray(k, v, line) + case fastjson.TypeObject: + err = p.parseObject(k, v, line) + default: + return + } + }) + + return err +} + +func jsonObjKey(prefix, key string) string { + if prefix == "" { + return key + } + return prefix + "." 
+ key +} + +func (p *JSONParser) parseArray(key string, val *fastjson.Value, line LogLine) error { + arr, err := val.Array() + if err != nil { + return err + } + + for i, v := range arr { + k := jsonObjKey(key, strconv.Itoa(i)) + + switch v.Type() { + case fastjson.TypeString, fastjson.TypeNumber: + err = p.parseStringNumber(k, v, line) + case fastjson.TypeArray: + err = p.parseArray(k, v, line) + case fastjson.TypeObject: + err = p.parseObject(k, v, line) + default: + continue + } + + if err != nil { + return err + } + } + + return err +} + +func (p *JSONParser) parseStringNumber(key string, val *fastjson.Value, line LogLine) error { + if mapped, ok := p.mapping[key]; ok { + key = mapped + } + + p.buf = p.buf[:0] + if p.buf = val.MarshalTo(p.buf); len(p.buf) == 0 { + return nil + } + + if val.Type() == fastjson.TypeString { + // trim " + return line.Assign(key, string(p.buf[1:len(p.buf)-1])) + } + return line.Assign(key, string(p.buf)) +} + +func (p *JSONParser) Info() string { + return fmt.Sprintf("json: %q", p.mapping) +} diff --git a/src/go/collectors/go.d.plugin/pkg/logs/json_test.go b/src/go/collectors/go.d.plugin/pkg/logs/json_test.go new file mode 100644 index 000000000..b82850031 --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/json_test.go @@ -0,0 +1,224 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewJSONParser(t *testing.T) { + tests := map[string]struct { + config JSONConfig + wantErr bool + }{ + "empty config": { + config: JSONConfig{}, + wantErr: false, + }, + "with mappings": { + config: JSONConfig{Mapping: map[string]string{"from_field_1": "to_field_1"}}, + wantErr: false, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + p, err := NewJSONParser(test.config, nil) + + if test.wantErr { + assert.Error(t, err) + assert.Nil(t, p) + } else { + assert.NoError(t, 
err) + assert.NotNil(t, p) + } + }) + } +} + +func TestJSONParser_ReadLine(t *testing.T) { + tests := map[string]struct { + config JSONConfig + input string + wantAssigned map[string]string + wantErr bool + }{ + "string value": { + input: `{ "string": "example.com" }`, + wantErr: false, + wantAssigned: map[string]string{ + "string": "example.com", + }, + }, + "int value": { + input: `{ "int": 1 }`, + wantErr: false, + wantAssigned: map[string]string{ + "int": "1", + }, + }, + "float value": { + input: `{ "float": 1.1 }`, + wantErr: false, + wantAssigned: map[string]string{ + "float": "1.1", + }, + }, + "string, int, float values": { + input: `{ "string": "example.com", "int": 1, "float": 1.1 }`, + wantErr: false, + wantAssigned: map[string]string{ + "string": "example.com", + "int": "1", + "float": "1.1", + }, + }, + "string, int, float values with mappings": { + config: JSONConfig{Mapping: map[string]string{ + "string": "STRING", + "int": "INT", + "float": "FLOAT", + }}, + input: `{ "string": "example.com", "int": 1, "float": 1.1 }`, + wantErr: false, + wantAssigned: map[string]string{ + "STRING": "example.com", + "INT": "1", + "FLOAT": "1.1", + }, + }, + "nested": { + input: `{"one":{"two":2,"three":{"four":4}},"five":5}`, + config: JSONConfig{Mapping: map[string]string{ + "one.two": "mapped_value", + }}, + wantErr: false, + wantAssigned: map[string]string{ + "mapped_value": "2", + "one.three.four": "4", + "five": "5", + }, + }, + "nested with array": { + input: `{"one":{"two":[2,22]},"five":5}`, + config: JSONConfig{Mapping: map[string]string{ + "one.two.1": "mapped_value", + }}, + wantErr: false, + wantAssigned: map[string]string{ + "one.two.0": "2", + "mapped_value": "22", + "five": "5", + }, + }, + "error on malformed JSON": { + input: `{ "host"": unquoted_string}`, + wantErr: true, + }, + "error on empty input": { + wantErr: true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + line := newLogLine() + in := 
strings.NewReader(test.input) + p, err := NewJSONParser(test.config, in) + require.NoError(t, err) + require.NotNil(t, p) + + err = p.ReadLine(line) + + if test.wantErr { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.wantAssigned, line.assigned) + } + }) + } +} + +func TestJSONParser_Parse(t *testing.T) { + tests := map[string]struct { + config JSONConfig + input string + wantAssigned map[string]string + wantErr bool + }{ + "string value": { + input: `{ "string": "example.com" }`, + wantErr: false, + wantAssigned: map[string]string{ + "string": "example.com", + }, + }, + "int value": { + input: `{ "int": 1 }`, + wantErr: false, + wantAssigned: map[string]string{ + "int": "1", + }, + }, + "float value": { + input: `{ "float": 1.1 }`, + wantErr: false, + wantAssigned: map[string]string{ + "float": "1.1", + }, + }, + "string, int, float values": { + input: `{ "string": "example.com", "int": 1, "float": 1.1 }`, + wantErr: false, + wantAssigned: map[string]string{ + "string": "example.com", + "int": "1", + "float": "1.1", + }, + }, + "string, int, float values with mappings": { + config: JSONConfig{Mapping: map[string]string{ + "string": "STRING", + "int": "INT", + "float": "FLOAT", + }}, + input: `{ "string": "example.com", "int": 1, "float": 1.1 }`, + wantErr: false, + wantAssigned: map[string]string{ + "STRING": "example.com", + "INT": "1", + "FLOAT": "1.1", + }, + }, + "error on malformed JSON": { + input: `{ "host"": unquoted_string}`, + wantErr: true, + }, + "error on empty input": { + wantErr: true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + line := newLogLine() + p, err := NewJSONParser(test.config, nil) + require.NoError(t, err) + require.NotNil(t, p) + + err = p.Parse([]byte(test.input), line) + + if test.wantErr { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.wantAssigned, line.assigned) + } + }) + } +} diff --git 
a/src/go/collectors/go.d.plugin/pkg/logs/lastline.go b/src/go/collectors/go.d.plugin/pkg/logs/lastline.go new file mode 100644 index 000000000..911dbf497 --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/lastline.go @@ -0,0 +1,65 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs + +import ( + "errors" + "os" + + "github.com/clbanning/rfile/v2" +) + +const DefaultMaxLineWidth = 4 * 1024 // assume disk block size is 4K + +var ErrTooLongLine = errors.New("too long line") + +// ReadLastLine returns the last line of the file and any read error encountered. +// It expects last line width <= maxLineWidth. +// If maxLineWidth <= 0, it defaults to DefaultMaxLineWidth. +func ReadLastLine(filename string, maxLineWidth int64) ([]byte, error) { + if maxLineWidth <= 0 { + maxLineWidth = DefaultMaxLineWidth + } + f, err := os.Open(filename) + if err != nil { + return nil, err + } + defer func() { _ = f.Close() }() + + stat, _ := f.Stat() + endPos := stat.Size() + if endPos == 0 { + return []byte{}, nil + } + startPos := endPos - maxLineWidth + if startPos < 0 { + startPos = 0 + } + buf := make([]byte, endPos-startPos) + n, err := f.ReadAt(buf, startPos) + if err != nil { + return nil, err + } + lnPos := 0 + foundLn := false + for i := n - 2; i >= 0; i-- { + ch := buf[i] + if ch == '\n' { + foundLn = true + lnPos = i + break + } + } + if foundLn { + return buf[lnPos+1 : n], nil + } + if startPos == 0 { + return buf[0:n], nil + } + + return nil, ErrTooLongLine +} + +func ReadLastLines(filename string, n uint) ([]string, error) { + return rfile.Tail(filename, int(n)) +} diff --git a/src/go/collectors/go.d.plugin/pkg/logs/lastline_test.go b/src/go/collectors/go.d.plugin/pkg/logs/lastline_test.go new file mode 100644 index 000000000..ea0a75e9e --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/lastline_test.go @@ -0,0 +1,54 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs + +import ( + "os" + "testing" + + 
"github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestReadLastLine(t *testing.T) { + tests := []struct { + name string + content string + expected string + err error + }{ + {"empty", "", "", nil}, + {"empty-ln", "\n", "\n", nil}, + {"one-line", "hello", "hello", nil}, + {"one-line-ln", "hello\n", "hello\n", nil}, + {"multi-line", "hello\nworld", "world", nil}, + {"multi-line-ln", "hello\nworld\n", "world\n", nil}, + {"long-line", "hello hello hello", "", ErrTooLongLine}, + {"long-line-ln", "hello hello hello\n", "", ErrTooLongLine}, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + filename := prepareFile(t, test.content) + defer func() { _ = os.Remove(filename) }() + + line, err := ReadLastLine(filename, 10) + + if test.err != nil { + require.NotNil(t, err) + assert.Contains(t, err.Error(), test.err.Error()) + } else { + assert.Equal(t, test.expected, string(line)) + } + }) + } +} + +func prepareFile(t *testing.T, content string) string { + t.Helper() + file, err := os.CreateTemp("", "go-test") + require.NoError(t, err) + defer func() { _ = file.Close() }() + + _, _ = file.WriteString(content) + return file.Name() +} diff --git a/src/go/collectors/go.d.plugin/pkg/logs/ltsv.go b/src/go/collectors/go.d.plugin/pkg/logs/ltsv.go new file mode 100644 index 000000000..b7fbceb14 --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/ltsv.go @@ -0,0 +1,95 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs + +import ( + "bufio" + "errors" + "fmt" + "io" + "strconv" + "unsafe" + + "github.com/Wing924/ltsv" +) + +type ( + LTSVConfig struct { + FieldDelimiter string `yaml:"field_delimiter" json:"field_delimiter"` + ValueDelimiter string `yaml:"value_delimiter" json:"value_delimiter"` + Mapping map[string]string `yaml:"mapping" json:"mapping"` + } + + LTSVParser struct { + r *bufio.Reader + parser ltsv.Parser + mapping map[string]string + } +) + +func NewLTSVParser(config LTSVConfig, in 
io.Reader) (*LTSVParser, error) { + p := ltsv.Parser{ + FieldDelimiter: ltsv.DefaultParser.FieldDelimiter, + ValueDelimiter: ltsv.DefaultParser.ValueDelimiter, + StrictMode: false, + } + if config.FieldDelimiter != "" { + if d, err := parseLTSVDelimiter(config.FieldDelimiter); err == nil { + p.FieldDelimiter = d + } + } + if config.ValueDelimiter != "" { + if d, err := parseLTSVDelimiter(config.ValueDelimiter); err == nil { + p.ValueDelimiter = d + } + } + parser := &LTSVParser{ + r: bufio.NewReader(in), + parser: p, + mapping: config.Mapping, + } + return parser, nil +} + +func (p *LTSVParser) ReadLine(line LogLine) error { + row, err := p.r.ReadSlice('\n') + if err != nil && len(row) == 0 { + return err + } + if len(row) > 0 && row[len(row)-1] == '\n' { + row = row[:len(row)-1] + } + return p.Parse(row, line) +} + +func (p *LTSVParser) Parse(row []byte, line LogLine) error { + err := p.parser.ParseLine(row, func(label []byte, value []byte) error { + s := *(*string)(unsafe.Pointer(&label)) // no alloc, same as in fmt.Builder.String() + if v, ok := p.mapping[s]; ok { + s = v + } + return line.Assign(s, string(value)) + }) + if err != nil { + return &ParseError{msg: fmt.Sprintf("ltsv parse: %v", err), err: err} + } + return nil +} + +func (p LTSVParser) Info() string { + return fmt.Sprintf("ltsv: %q", p.mapping) +} + +func parseLTSVDelimiter(s string) (byte, error) { + if isNumber(s) { + d, err := strconv.ParseUint(s, 10, 8) + if err != nil { + return 0, fmt.Errorf("invalid LTSV delimiter: %v", err) + } + return byte(d), nil + } + if len(s) != 1 { + return 0, errors.New("invalid LTSV delimiter: must be a single character") + } + return s[0], nil +} diff --git a/src/go/collectors/go.d.plugin/pkg/logs/ltsv_test.go b/src/go/collectors/go.d.plugin/pkg/logs/ltsv_test.go new file mode 100644 index 000000000..f6d5ec2bd --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/ltsv_test.go @@ -0,0 +1,125 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs +
+import ( + "strings" + "testing" + + "github.com/Wing924/ltsv" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var testLTSVConfig = LTSVConfig{ + FieldDelimiter: " ", + ValueDelimiter: "=", + Mapping: map[string]string{"KEY": "key"}, +} + +func TestNewLTSVParser(t *testing.T) { + tests := []struct { + name string + config LTSVConfig + wantErr bool + }{ + {name: "config", config: testLTSVConfig}, + {name: "empty config"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, err := NewLTSVParser(tt.config, nil) + + if tt.wantErr { + assert.Error(t, err) + assert.Nil(t, p) + } else { + assert.NoError(t, err) + assert.NotNil(t, p) + if tt.config.FieldDelimiter == "" { + assert.Equal(t, ltsv.DefaultParser.FieldDelimiter, p.parser.FieldDelimiter) + } else { + assert.Equal(t, tt.config.FieldDelimiter, string(p.parser.FieldDelimiter)) + } + if tt.config.ValueDelimiter == "" { + assert.Equal(t, ltsv.DefaultParser.ValueDelimiter, p.parser.ValueDelimiter) + } else { + assert.Equal(t, tt.config.ValueDelimiter, string(p.parser.ValueDelimiter)) + } + assert.Equal(t, tt.config.Mapping, p.mapping) + } + }) + } +} + +func TestLTSVParser_ReadLine(t *testing.T) { + tests := []struct { + name string + row string + wantErr bool + wantParseErr bool + }{ + {name: "no error", row: "A=1 B=2 KEY=3"}, + {name: "error on parsing", row: "NO LABEL", wantErr: true, wantParseErr: true}, + {name: "error on assigning", row: "A=1 ERR=2", wantErr: true, wantParseErr: true}, + {name: "error on reading EOF", row: "", wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var line logLine + r := strings.NewReader(tt.row) + p, err := NewLTSVParser(testLTSVConfig, r) + require.NoError(t, err) + + err = p.ReadLine(&line) + + if tt.wantErr { + require.Error(t, err) + if tt.wantParseErr { + assert.True(t, IsParseError(err)) + } else { + assert.False(t, IsParseError(err)) + } + } else { + assert.NoError(t, err) 
+ } + }) + } +} + +func TestLTSVParser_Parse(t *testing.T) { + tests := []struct { + name string + row string + wantErr bool + }{ + {name: "no error", row: "A=1 B=2"}, + {name: "error on parsing", row: "NO LABEL", wantErr: true}, + {name: "error on assigning", row: "A=1 ERR=2", wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var line logLine + p, err := NewLTSVParser(testLTSVConfig, nil) + require.NoError(t, err) + + err = p.Parse([]byte(tt.row), &line) + + if tt.wantErr { + require.Error(t, err) + assert.True(t, IsParseError(err)) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestLTSVParser_Info(t *testing.T) { + p, err := NewLTSVParser(testLTSVConfig, nil) + require.NoError(t, err) + assert.NotZero(t, p.Info()) +} diff --git a/src/go/collectors/go.d.plugin/pkg/logs/parser.go b/src/go/collectors/go.d.plugin/pkg/logs/parser.go new file mode 100644 index 000000000..f22047b0c --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/parser.go @@ -0,0 +1,65 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs + +import ( + "errors" + "fmt" + "io" + "strconv" +) + +type ParseError struct { + msg string + err error +} + +func (e ParseError) Error() string { return e.msg } + +func (e ParseError) Unwrap() error { return e.err } + +func IsParseError(err error) bool { var v *ParseError; return errors.As(err, &v) } + +type ( + LogLine interface { + Assign(name string, value string) error + } + + Parser interface { + ReadLine(LogLine) error + Parse(row []byte, line LogLine) error + Info() string + } +) + +const ( + TypeCSV = "csv" + TypeLTSV = "ltsv" + TypeRegExp = "regexp" + TypeJSON = "json" +) + +type ParserConfig struct { + LogType string `yaml:"log_type,omitempty" json:"log_type"` + CSV CSVConfig `yaml:"csv_config,omitempty" json:"csv_config"` + LTSV LTSVConfig `yaml:"ltsv_config,omitempty" json:"ltsv_config"` + RegExp RegExpConfig `yaml:"regexp_config,omitempty" json:"regexp_config"` + JSON JSONConfig 
`yaml:"json_config,omitempty" json:"json_config"` +} + +func NewParser(config ParserConfig, in io.Reader) (Parser, error) { + switch config.LogType { + case TypeCSV: + return NewCSVParser(config.CSV, in) + case TypeLTSV: + return NewLTSVParser(config.LTSV, in) + case TypeRegExp: + return NewRegExpParser(config.RegExp, in) + case TypeJSON: + return NewJSONParser(config.JSON, in) + default: + return nil, fmt.Errorf("invalid type: %q", config.LogType) + } +} + +func isNumber(s string) bool { _, err := strconv.Atoi(s); return err == nil } diff --git a/src/go/collectors/go.d.plugin/pkg/logs/parser_test.go b/src/go/collectors/go.d.plugin/pkg/logs/parser_test.go new file mode 100644 index 000000000..88ef46c27 --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/parser_test.go @@ -0,0 +1,3 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs diff --git a/src/go/collectors/go.d.plugin/pkg/logs/reader.go b/src/go/collectors/go.d.plugin/pkg/logs/reader.go new file mode 100644 index 000000000..34544eac6 --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/reader.go @@ -0,0 +1,193 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs + +import ( + "errors" + "fmt" + "io" + "os" + "path/filepath" + "sort" + + "github.com/netdata/netdata/go/go.d.plugin/logger" +) + +const ( + maxEOF = 60 +) + +var ( + ErrNoMatchedFile = errors.New("no matched files") +) + +// Reader is a log rotate aware Reader +// TODO: better reopen algorithm +// TODO: handle truncate +type Reader struct { + file *os.File + path string + excludePath string + eofCounter int + continuousEOF int + log *logger.Logger +} + +// Open a file and seek to end of the file. 
+// path: the shell file name pattern +// excludePath: the shell file name pattern +func Open(path string, excludePath string, log *logger.Logger) (*Reader, error) { + var err error + if path, err = filepath.Abs(path); err != nil { + return nil, err + } + if _, err = filepath.Match(path, "/"); err != nil { + return nil, fmt.Errorf("bad path syntax: %q", path) + } + if _, err = filepath.Match(excludePath, "/"); err != nil { + return nil, fmt.Errorf("bad exclude_path syntax: %q", path) + } + r := &Reader{ + path: path, + excludePath: excludePath, + log: log, + } + + if err = r.open(); err != nil { + return nil, err + } + return r, nil +} + +// CurrentFilename get current opened file name +func (r *Reader) CurrentFilename() string { + return r.file.Name() +} + +func (r *Reader) open() error { + path := r.findFile() + if path == "" { + r.log.Debugf("couldn't find log file, used path: '%s', exclude_path: '%s'", r.path, r.excludePath) + return ErrNoMatchedFile + } + r.log.Debug("open log file: ", path) + file, err := os.Open(path) + if err != nil { + return err + } + stat, err := file.Stat() + if err != nil { + return err + } + if _, err = file.Seek(stat.Size(), io.SeekStart); err != nil { + return err + } + r.file = file + return nil +} + +func (r *Reader) Read(p []byte) (n int, err error) { + n, err = r.file.Read(p) + if err != nil { + switch { + case err == io.EOF: + err = r.handleEOFErr() + case errors.Is(err, os.ErrInvalid): // r.file is nil after Close + err = r.handleInvalidArgErr() + } + return + } + r.continuousEOF = 0 + return +} + +func (r *Reader) handleEOFErr() (err error) { + err = io.EOF + r.eofCounter++ + r.continuousEOF++ + if r.eofCounter < maxEOF || r.continuousEOF < 2 { + return err + } + if err2 := r.reopen(); err2 != nil { + err = err2 + } + return err +} + +func (r *Reader) handleInvalidArgErr() (err error) { + err = io.EOF + if err2 := r.reopen(); err2 != nil { + err = err2 + } + return err +} + +func (r *Reader) Close() (err error) { + if r == 
nil || r.file == nil { + return + } + r.log.Debug("close log file: ", r.file.Name()) + err = r.file.Close() + r.file = nil + r.eofCounter = 0 + return +} + +func (r *Reader) reopen() error { + r.log.Debugf("reopen, look for: %s", r.path) + _ = r.Close() + return r.open() +} + +func (r *Reader) findFile() string { + return find(r.path, r.excludePath) +} + +func find(path, exclude string) string { + return finder{}.find(path, exclude) +} + +// TODO: tests +type finder struct{} + +func (f finder) find(path, exclude string) string { + files, _ := filepath.Glob(path) + if len(files) == 0 { + return "" + } + + files = f.filter(files, exclude) + if len(files) == 0 { + return "" + } + + return f.findLastFile(files) +} + +func (f finder) filter(files []string, exclude string) []string { + if exclude == "" { + return files + } + + fs := make([]string, 0, len(files)) + for _, file := range files { + if ok, _ := filepath.Match(exclude, file); ok { + continue + } + fs = append(fs, file) + } + return fs +} + +// TODO: the logic is probably wrong +func (f finder) findLastFile(files []string) string { + sort.Strings(files) + for i := len(files) - 1; i >= 0; i-- { + stat, err := os.Stat(files[i]) + if err != nil || !stat.Mode().IsRegular() { + continue + } + return files[i] + } + return "" +} diff --git a/src/go/collectors/go.d.plugin/pkg/logs/reader_test.go b/src/go/collectors/go.d.plugin/pkg/logs/reader_test.go new file mode 100644 index 000000000..e6ef47fe7 --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/reader_test.go @@ -0,0 +1,245 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs + +import ( + "bufio" + "fmt" + "io" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestReader_Read(t *testing.T) { + reader, teardown := prepareTestReader(t) + defer teardown() + + r := testReader{bufio.NewReader(reader)} + filename := reader.CurrentFilename() + numLogs := 5 + var sum 
int + + for i := 0; i < 10; i++ { + appendLogs(t, filename, time.Millisecond*10, numLogs) + n, err := r.readUntilEOF() + sum += n + + assert.Equal(t, io.EOF, err) + assert.Equal(t, numLogs*(i+1), sum) + } +} + +func TestReader_Read_HandleFileRotation(t *testing.T) { + reader, teardown := prepareTestReader(t) + defer teardown() + + r := testReader{bufio.NewReader(reader)} + filename := reader.CurrentFilename() + numLogs := 5 + rotateFile(t, filename) + appendLogs(t, filename, time.Millisecond*10, numLogs) + + n, err := r.readUntilEOFTimes(maxEOF) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 0, n) + + appendLogs(t, filename, time.Millisecond*10, numLogs) + n, err = r.readUntilEOF() + assert.Equal(t, io.EOF, err) + assert.Equal(t, numLogs, n) +} + +func TestReader_Read_HandleFileRotationWithDelay(t *testing.T) { + reader, teardown := prepareTestReader(t) + defer teardown() + + r := testReader{bufio.NewReader(reader)} + filename := reader.CurrentFilename() + _ = os.Remove(filename) + + // trigger reopen first time + n, err := r.readUntilEOFTimes(maxEOF) + assert.Equal(t, ErrNoMatchedFile, err) + assert.Equal(t, 0, n) + + f, err := os.Create(filename) + require.NoError(t, err) + _ = f.Close() + + // trigger reopen 2nd time + n, err = r.readUntilEOF() + assert.Equal(t, io.EOF, err) + assert.Equal(t, 0, n) + + numLogs := 5 + appendLogs(t, filename, time.Millisecond*10, numLogs) + n, err = r.readUntilEOF() + assert.Equal(t, io.EOF, err) + assert.Equal(t, numLogs, n) +} + +func TestReader_Close(t *testing.T) { + reader, teardown := prepareTestReader(t) + defer teardown() + + assert.NoError(t, reader.Close()) + assert.Nil(t, reader.file) +} + +func TestReader_Close_NilFile(t *testing.T) { + var r Reader + assert.NoError(t, r.Close()) +} + +func TestOpen(t *testing.T) { + tempFileName1 := prepareTempFile(t, "*-web_log-open-test-1.log") + tempFileName2 := prepareTempFile(t, "*-web_log-open-test-2.log") + tempFileName3 := prepareTempFile(t, "*-web_log-open-test-3.log") + 
defer func() { + _ = os.Remove(tempFileName1) + _ = os.Remove(tempFileName2) + _ = os.Remove(tempFileName3) + }() + + makePath := func(s string) string { + return filepath.Join(os.TempDir(), s) + } + + tests := []struct { + name string + path string + exclude string + err bool + }{ + { + name: "match without exclude", + path: makePath("*-web_log-open-test-[1-3].log"), + }, + { + name: "match with exclude", + path: makePath("*-web_log-open-test-[1-3].log"), + exclude: makePath("*-web_log-open-test-[2-3].log"), + }, + { + name: "exclude everything", + path: makePath("*-web_log-open-test-[1-3].log"), + exclude: makePath("*"), + err: true, + }, + { + name: "no match", + path: makePath("*-web_log-no-match-test-[1-3].log"), + err: true, + }, + { + name: "bad path pattern", + path: "[qw", + err: true, + }, + { + name: "bad exclude path pattern", + path: "[qw", + err: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r, err := Open(tt.path, tt.exclude, nil) + + if tt.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.NotNil(t, r.file) + _ = r.Close() + } + }) + } +} + +func TestReader_CurrentFilename(t *testing.T) { + reader, teardown := prepareTestReader(t) + defer teardown() + + assert.Equal(t, reader.file.Name(), reader.CurrentFilename()) +} + +type testReader struct { + *bufio.Reader +} + +func (r *testReader) readUntilEOF() (n int, err error) { + for { + _, err = r.ReadBytes('\n') + if err != nil { + break + } + n++ + } + return n, err +} + +func (r *testReader) readUntilEOFTimes(times int) (sum int, err error) { + var n int + for i := 0; i < times; i++ { + n, err = r.readUntilEOF() + if err != io.EOF { + break + } + sum += n + } + return sum, err +} + +func prepareTempFile(t *testing.T, pattern string) string { + t.Helper() + f, err := os.CreateTemp("", pattern) + require.NoError(t, err) + return f.Name() +} + +func prepareTestReader(t *testing.T) (reader *Reader, teardown func()) { + t.Helper() + filename := 
prepareTempFile(t, "*-web_log-test.log") + f, err := os.Open(filename) + require.NoError(t, err) + + teardown = func() { + _ = os.Remove(filename) + _ = reader.file.Close() + } + reader = &Reader{ + file: f, + path: filename, + } + return reader, teardown +} + +func rotateFile(t *testing.T, filename string) { + t.Helper() + require.NoError(t, os.Remove(filename)) + f, err := os.Create(filename) + require.NoError(t, err) + _ = f.Close() +} + +func appendLogs(t *testing.T, filename string, interval time.Duration, numOfLogs int) { + t.Helper() + base := filepath.Base(filename) + file, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND, os.ModeAppend) + require.NoError(t, err) + require.NotNil(t, file) + defer func() { _ = file.Close() }() + + for i := 0; i < numOfLogs; i++ { + _, err = fmt.Fprintln(file, "line", i, "filename", base) + require.NoError(t, err) + time.Sleep(interval) + } +} diff --git a/src/go/collectors/go.d.plugin/pkg/logs/regexp.go b/src/go/collectors/go.d.plugin/pkg/logs/regexp.go new file mode 100644 index 000000000..e0dee1d02 --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/regexp.go @@ -0,0 +1,76 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs + +import ( + "bufio" + "errors" + "fmt" + "io" + "regexp" +) + +type ( + RegExpConfig struct { + Pattern string `yaml:"pattern" json:"pattern"` + } + + RegExpParser struct { + r *bufio.Reader + pattern *regexp.Regexp + } +) + +func NewRegExpParser(config RegExpConfig, in io.Reader) (*RegExpParser, error) { + if config.Pattern == "" { + return nil, errors.New("empty pattern") + } + + pattern, err := regexp.Compile(config.Pattern) + if err != nil { + return nil, fmt.Errorf("compile: %w", err) + } + + if pattern.NumSubexp() == 0 { + return nil, errors.New("pattern has no named subgroups") + } + + p := &RegExpParser{ + r: bufio.NewReader(in), + pattern: pattern, + } + return p, nil +} + +func (p *RegExpParser) ReadLine(line LogLine) error { + row, err := p.r.ReadSlice('\n') + if err 
!= nil && len(row) == 0 { + return err + } + if len(row) > 0 && row[len(row)-1] == '\n' { + row = row[:len(row)-1] + } + return p.Parse(row, line) +} + +func (p *RegExpParser) Parse(row []byte, line LogLine) error { + match := p.pattern.FindSubmatch(row) + if len(match) == 0 { + return &ParseError{msg: "regexp parse: unmatched line"} + } + + for i, name := range p.pattern.SubexpNames() { + if name == "" || match[i] == nil { + continue + } + err := line.Assign(name, string(match[i])) + if err != nil { + return &ParseError{msg: fmt.Sprintf("regexp parse: %v", err), err: err} + } + } + return nil +} + +func (p RegExpParser) Info() string { + return fmt.Sprintf("regexp: %s", p.pattern) +} diff --git a/src/go/collectors/go.d.plugin/pkg/logs/regexp_test.go b/src/go/collectors/go.d.plugin/pkg/logs/regexp_test.go new file mode 100644 index 000000000..fc7bacaa5 --- /dev/null +++ b/src/go/collectors/go.d.plugin/pkg/logs/regexp_test.go @@ -0,0 +1,131 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logs + +import ( + "errors" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewRegExpParser(t *testing.T) { + tests := []struct { + name string + pattern string + wantErr bool + }{ + {name: "valid pattern", pattern: `(?P<A>\d+) (?P<B>\d+)`}, + {name: "no names subgroups in pattern", pattern: `(?:\d+) (?:\d+)`, wantErr: true}, + {name: "invalid pattern", pattern: `(((?P<A>\d+) (?P<B>\d+)`, wantErr: true}, + {name: "empty pattern", wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, err := NewRegExpParser(RegExpConfig{Pattern: tt.pattern}, nil) + if tt.wantErr { + assert.Error(t, err) + assert.Nil(t, p) + } else { + assert.NoError(t, err) + assert.NotNil(t, p) + } + }) + } +} + +func TestRegExpParser_ReadLine(t *testing.T) { + tests := []struct { + name string + row string + pattern string + wantErr bool + wantParseErr bool + }{ + {name: "match and no error", 
row: "1 2", pattern: `(?P<A>\d+) (?P<B>\d+)`}, + {name: "match but error on assigning", row: "1 2", pattern: `(?P<A>\d+) (?P<ERR>\d+)`, wantErr: true, wantParseErr: true}, + {name: "not match", row: "A B", pattern: `(?P<A>\d+) (?P<B>\d+)`, wantErr: true, wantParseErr: true}, + {name: "not match multiline", row: "a b\n3 4", pattern: `(?P<A>\d+) (?P<B>\d+)`, wantErr: true, wantParseErr: true}, + {name: "error on reading EOF", row: "", pattern: `(?P<A>\d+) (?P<B>\d+)`, wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var line logLine + r := strings.NewReader(tt.row) + p, err := NewRegExpParser(RegExpConfig{Pattern: tt.pattern}, r) + require.NoError(t, err) + + err = p.ReadLine(&line) + if tt.wantErr { + require.Error(t, err) + if tt.wantParseErr { + assert.True(t, IsParseError(err)) + } else { + assert.False(t, IsParseError(err)) + } + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestRegExpParser_Parse(t *testing.T) { + tests := []struct { + name string + row string + pattern string + wantErr bool + }{ + {name: "match and no error", row: "1 2", pattern: `(?P<A>\d+) (?P<B>\d+)`}, + {name: "match but error on assigning", row: "1 2", pattern: `(?P<A>\d+) (?P<ERR>\d+)`, wantErr: true}, + {name: "not match", row: "A B", pattern: `(?P<A>\d+) (?P<B>\d+)`, wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var line logLine + p, err := NewRegExpParser(RegExpConfig{Pattern: tt.pattern}, nil) + require.NoError(t, err) + + err = p.Parse([]byte(tt.row), &line) + if tt.wantErr { + require.Error(t, err) + assert.True(t, IsParseError(err)) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestRegExpParser_Info(t *testing.T) { + p, err := NewRegExpParser(RegExpConfig{Pattern: `(?P<A>\d+) (?P<B>\d+)`}, nil) + require.NoError(t, err) + assert.NotZero(t, p.Info()) +} + +type logLine struct { + assigned map[string]string +} + +func newLogLine() *logLine { + return &logLine{ + assigned: 
make(map[string]string), + } +} + +func (l *logLine) Assign(name, val string) error { + switch name { + case "$ERR", "ERR": + return errors.New("assign error") + } + if l.assigned != nil { + l.assigned[name] = val + } + return nil +} |