summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/pkg/logs/json.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/pkg/logs/json.go140
1 files changed, 140 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/pkg/logs/json.go b/src/go/collectors/go.d.plugin/pkg/logs/json.go
new file mode 100644
index 000000000..ceb32e272
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/pkg/logs/json.go
@@ -0,0 +1,140 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package logs
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "strconv"
+
+ "github.com/valyala/fastjson"
+)
+
+type JSONConfig struct {
+ Mapping map[string]string `yaml:"mapping" json:"mapping"`
+}
+
+type JSONParser struct {
+ reader *bufio.Reader
+ parser fastjson.Parser
+ buf []byte
+ mapping map[string]string
+}
+
+func NewJSONParser(config JSONConfig, in io.Reader) (*JSONParser, error) {
+ parser := &JSONParser{
+ reader: bufio.NewReader(in),
+ mapping: config.Mapping,
+ buf: make([]byte, 0, 100),
+ }
+ return parser, nil
+}
+
+func (p *JSONParser) ReadLine(line LogLine) error {
+ row, err := p.reader.ReadSlice('\n')
+ if err != nil && len(row) == 0 {
+ return err
+ }
+ if len(row) > 0 && row[len(row)-1] == '\n' {
+ row = row[:len(row)-1]
+ }
+ return p.Parse(row, line)
+}
+
+func (p *JSONParser) Parse(row []byte, line LogLine) error {
+ val, err := p.parser.ParseBytes(row)
+ if err != nil {
+ return err
+ }
+
+ if err := p.parseObject("", val, line); err != nil {
+ return &ParseError{msg: fmt.Sprintf("json parse: %v", err), err: err}
+ }
+
+ return nil
+}
+
+func (p *JSONParser) parseObject(prefix string, val *fastjson.Value, line LogLine) error {
+ obj, err := val.Object()
+ if err != nil {
+ return err
+ }
+
+ obj.Visit(func(key []byte, v *fastjson.Value) {
+ if err != nil {
+ return
+ }
+
+ k := jsonObjKey(prefix, string(key))
+
+ switch v.Type() {
+ case fastjson.TypeString, fastjson.TypeNumber:
+ err = p.parseStringNumber(k, v, line)
+ case fastjson.TypeArray:
+ err = p.parseArray(k, v, line)
+ case fastjson.TypeObject:
+ err = p.parseObject(k, v, line)
+ default:
+ return
+ }
+ })
+
+ return err
+}
+
+func jsonObjKey(prefix, key string) string {
+ if prefix == "" {
+ return key
+ }
+ return prefix + "." + key
+}
+
+func (p *JSONParser) parseArray(key string, val *fastjson.Value, line LogLine) error {
+ arr, err := val.Array()
+ if err != nil {
+ return err
+ }
+
+ for i, v := range arr {
+ k := jsonObjKey(key, strconv.Itoa(i))
+
+ switch v.Type() {
+ case fastjson.TypeString, fastjson.TypeNumber:
+ err = p.parseStringNumber(k, v, line)
+ case fastjson.TypeArray:
+ err = p.parseArray(k, v, line)
+ case fastjson.TypeObject:
+ err = p.parseObject(k, v, line)
+ default:
+ continue
+ }
+
+ if err != nil {
+ return err
+ }
+ }
+
+ return err
+}
+
+func (p *JSONParser) parseStringNumber(key string, val *fastjson.Value, line LogLine) error {
+ if mapped, ok := p.mapping[key]; ok {
+ key = mapped
+ }
+
+ p.buf = p.buf[:0]
+ if p.buf = val.MarshalTo(p.buf); len(p.buf) == 0 {
+ return nil
+ }
+
+ if val.Type() == fastjson.TypeString {
+ // trim "
+ return line.Assign(key, string(p.buf[1:len(p.buf)-1]))
+ }
+ return line.Assign(key, string(p.buf))
+}
+
+func (p *JSONParser) Info() string {
+ return fmt.Sprintf("json: %q", p.mapping)
+}