diff options
Diffstat (limited to 'modules/indexer/internal/bleve')
-rw-r--r-- | modules/indexer/internal/bleve/batch.go | 58 | ||||
-rw-r--r-- | modules/indexer/internal/bleve/indexer.go | 102 | ||||
-rw-r--r-- | modules/indexer/internal/bleve/metadata.go | 55 | ||||
-rw-r--r-- | modules/indexer/internal/bleve/metadata_test.go | 28 | ||||
-rw-r--r-- | modules/indexer/internal/bleve/query.go | 56 | ||||
-rw-r--r-- | modules/indexer/internal/bleve/util.go | 48 |
6 files changed, 347 insertions, 0 deletions
diff --git a/modules/indexer/internal/bleve/batch.go b/modules/indexer/internal/bleve/batch.go new file mode 100644 index 00000000..ed5ef077 --- /dev/null +++ b/modules/indexer/internal/bleve/batch.go @@ -0,0 +1,58 @@ +// Copyright 2021 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package bleve + +import ( + "github.com/blevesearch/bleve/v2" +) + +// FlushingBatch is a batch of operations that automatically flushes to the +// underlying index once it reaches a certain size. +type FlushingBatch struct { + maxBatchSize int + batch *bleve.Batch + index bleve.Index +} + +// NewFlushingBatch creates a new flushing batch for the specified index. Once +// the number of operations in the batch reaches the specified limit, the batch +// automatically flushes its operations to the index. +func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch { + return &FlushingBatch{ + maxBatchSize: maxBatchSize, + batch: index.NewBatch(), + index: index, + } +} + +// Index adds an index operation for id to the batch, then flushes if the batch has reached maxBatchSize +func (b *FlushingBatch) Index(id string, data any) error { + if err := b.batch.Index(id, data); err != nil { + return err + } + return b.flushIfFull() +} + +// Delete adds a delete operation for id to the batch, then flushes if the batch has reached maxBatchSize +func (b *FlushingBatch) Delete(id string) error { + b.batch.Delete(id) + return b.flushIfFull() +} + +func (b *FlushingBatch) flushIfFull() error { + if b.batch.Size() < b.maxBatchSize { + return nil + } + return b.Flush() +} + +// Flush submits the batch to the index and, on success, replaces it with a new batch +func (b *FlushingBatch) Flush() error { + err := b.index.Batch(b.batch) + if err != nil { + return err + } + b.batch = b.index.NewBatch() + return nil +} diff --git a/modules/indexer/internal/bleve/indexer.go b/modules/indexer/internal/bleve/indexer.go new file mode 100644 index 00000000..1435d2f5 --- /dev/null +++ b/modules/indexer/internal/bleve/indexer.go @@ -0,0 +1,102 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package bleve + +import ( + "context" + "fmt" + + "code.gitea.io/gitea/modules/indexer/internal" + "code.gitea.io/gitea/modules/log" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/mapping" +) + +var _ internal.Indexer = &Indexer{} + +// Indexer represents a basic bleve indexer implementation +type Indexer struct { + Indexer bleve.Index + + indexDir string + version int + mappingGetter MappingGetter +} + +type MappingGetter func() (mapping.IndexMapping, error) + +func NewIndexer(indexDir string, version int, mappingGetter func() (mapping.IndexMapping, error)) *Indexer { + return &Indexer{ + indexDir: indexDir, + version: version, + mappingGetter: mappingGetter, + } +} + +// Init opens the existing index in indexDir or, when there is none to reuse, creates a new one and writes its metadata; the bool result is true only when an existing index was opened +func (i *Indexer) Init(_ context.Context) (bool, error) { + if i == nil { + return false, fmt.Errorf("cannot init nil indexer") + } + + if i.Indexer != nil { + return false, fmt.Errorf("indexer is already initialized") + } + + indexer, version, err := openIndexer(i.indexDir, i.version) + if err != nil { + return false, err + } + if indexer != nil { + i.Indexer = indexer + return true, nil + } + + if version != 0 { + log.Warn("Found older bleve index with version %d, Forgejo will remove it and rebuild", version) + } + + indexMapping, err := i.mappingGetter() + if err != nil { + return false, err + } + + indexer, err = bleve.New(i.indexDir, indexMapping) + if err != nil { + return false, err + } + + if err = writeIndexMetadata(i.indexDir, &IndexMetadata{ + Version: i.version, + }); err != nil { + return false, err + } + + i.Indexer = indexer + + return false, nil +} + +// Ping returns an error if the indexer is nil or has not been initialized +func (i *Indexer) Ping(_ context.Context) error { + if i == nil { + return fmt.Errorf("cannot ping nil indexer") + } + if i.Indexer == nil { + return fmt.Errorf("indexer is not initialized") + } + return nil +} + +func (i *Indexer) Close() { + if i == nil || i.Indexer == nil { + return + } + + if err 
:= i.Indexer.Close(); err != nil { + log.Error("Failed to close bleve indexer in %q: %v", i.indexDir, err) + } + i.Indexer = nil +} diff --git a/modules/indexer/internal/bleve/metadata.go b/modules/indexer/internal/bleve/metadata.go new file mode 100644 index 00000000..3c570ab4 --- /dev/null +++ b/modules/indexer/internal/bleve/metadata.go @@ -0,0 +1,55 @@ +// Copyright 2024 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +// Copied and modified from https://github.com/ethantkoenig/rupture (MIT License) + +package bleve + +import ( + "os" + "path/filepath" + + "code.gitea.io/gitea/modules/json" +) + +const metaFilename = "rupture_meta.json" + +func indexMetadataPath(dir string) string { + return filepath.Join(dir, metaFilename) +} + +// IndexMetadata contains metadata about a bleve index. +type IndexMetadata struct { + // The version of the data in the index. This can be useful for tracking + // schema changes or data migrations. + Version int `json:"version"` +} + +// readIndexMetadata returns the metadata for the index at the specified path. +// If no such index metadata exists, an empty metadata and a nil error are +// returned. +func readIndexMetadata(path string) (*IndexMetadata, error) { + meta := &IndexMetadata{} + metaPath := indexMetadataPath(path) + if _, err := os.Stat(metaPath); os.IsNotExist(err) { + return meta, nil + } else if err != nil { + return nil, err + } + + metaBytes, err := os.ReadFile(metaPath) + if err != nil { + return nil, err + } + return meta, json.Unmarshal(metaBytes, &meta) +} + +// writeIndexMetadata marshals meta to JSON and writes it to the metadata file inside the index directory at path. 
+func writeIndexMetadata(path string, meta *IndexMetadata) error { + metaBytes, err := json.Marshal(meta) + if err != nil { + return err + } + + return os.WriteFile(indexMetadataPath(path), metaBytes, 0o644) +} diff --git a/modules/indexer/internal/bleve/metadata_test.go b/modules/indexer/internal/bleve/metadata_test.go new file mode 100644 index 00000000..31603a92 --- /dev/null +++ b/modules/indexer/internal/bleve/metadata_test.go @@ -0,0 +1,28 @@ +// Copyright 2024 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +// Copied and modified from https://github.com/ethantkoenig/rupture (MIT License) + +package bleve + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMetadata(t *testing.T) { + dir := t.TempDir() + + meta, err := readIndexMetadata(dir) + require.NoError(t, err) + assert.Equal(t, &IndexMetadata{}, meta) + + meta.Version = 24 + require.NoError(t, writeIndexMetadata(dir, meta)) + + meta, err = readIndexMetadata(dir) + require.NoError(t, err) + assert.EqualValues(t, 24, meta.Version) +} diff --git a/modules/indexer/internal/bleve/query.go b/modules/indexer/internal/bleve/query.go new file mode 100644 index 00000000..21422b28 --- /dev/null +++ b/modules/indexer/internal/bleve/query.go @@ -0,0 +1,56 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package bleve + +import ( + "code.gitea.io/gitea/modules/optional" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/search/query" +) + +// NumericEqualityQuery generates a numeric equality query for the given value and field, expressed as an inclusive range whose min and max are both value +func NumericEqualityQuery(value int64, field string) *query.NumericRangeQuery { + f := float64(value) + tru := true + q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru) + q.SetField(field) + return q +} + +// MatchPhraseQuery generates a match phrase query for the given phrase, field, analyzer and fuzziness +func MatchPhraseQuery(matchPhrase, field, analyzer string, fuzziness int) *query.MatchPhraseQuery { + q := bleve.NewMatchPhraseQuery(matchPhrase) + q.FieldVal = field + q.Analyzer = analyzer + q.Fuzziness = fuzziness + return q +} + +// BoolFieldQuery generates a bool field query for the given value and field +func BoolFieldQuery(value bool, field string) *query.BoolFieldQuery { + q := bleve.NewBoolFieldQuery(value) + q.SetField(field) + return q +} + +func NumericRangeInclusiveQuery(min, max optional.Option[int64], field string) *query.NumericRangeQuery { + var minF, maxF *float64 + var minI, maxI *bool + if min.Has() { + minF = new(float64) + *minF = float64(min.Value()) + minI = new(bool) + *minI = true + } + if max.Has() { + maxF = new(float64) + *maxF = float64(max.Value()) + maxI = new(bool) + *maxI = true + } + q := bleve.NewNumericRangeInclusiveQuery(minF, maxF, minI, maxI) + q.SetField(field) + return q +} diff --git a/modules/indexer/internal/bleve/util.go b/modules/indexer/internal/bleve/util.go new file mode 100644 index 00000000..d05b6797 --- /dev/null +++ b/modules/indexer/internal/bleve/util.go @@ -0,0 +1,48 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package bleve + +import ( + "errors" + "os" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/util" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/index/upsidedown" +) + +// openIndexer opens the index at the specified path, checking for metadata +// updates and bleve version updates. If the index needs to be created (or +// re-created), it returns a nil index, the outdated metadata version (0 when not applicable) and any error from removing the old index +func openIndexer(path string, latestVersion int) (bleve.Index, int, error) { + _, err := os.Stat(path) + if err != nil && os.IsNotExist(err) { + return nil, 0, nil + } else if err != nil { + return nil, 0, err + } + + metadata, err := readIndexMetadata(path) + if err != nil { + return nil, 0, err + } + if metadata.Version < latestVersion { + // the indexer is using a previous version, so we should delete it and + // re-populate + return nil, metadata.Version, util.RemoveAll(path) + } + + index, err := bleve.Open(path) + if err != nil { + if errors.Is(err, upsidedown.IncompatibleVersion) { + log.Warn("Indexer was built with a previous version of bleve, deleting and rebuilding") + return nil, 0, util.RemoveAll(path) + } + return nil, 0, err + } + + return index, 0, nil +} |