diff options
Diffstat (limited to 'modules/indexer')
44 files changed, 6042 insertions, 0 deletions
diff --git a/modules/indexer/code/bleve/bleve.go b/modules/indexer/code/bleve/bleve.go new file mode 100644 index 00000000..66724a34 --- /dev/null +++ b/modules/indexer/code/bleve/bleve.go @@ -0,0 +1,352 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package bleve + +import ( + "bufio" + "context" + "fmt" + "io" + "strconv" + "strings" + "time" + + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/analyze" + "code.gitea.io/gitea/modules/charset" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/indexer/code/internal" + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_bleve "code.gitea.io/gitea/modules/indexer/internal/bleve" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/typesniffer" + + "github.com/blevesearch/bleve/v2" + analyzer_custom "github.com/blevesearch/bleve/v2/analysis/analyzer/custom" + analyzer_keyword "github.com/blevesearch/bleve/v2/analysis/analyzer/keyword" + "github.com/blevesearch/bleve/v2/analysis/token/camelcase" + "github.com/blevesearch/bleve/v2/analysis/token/lowercase" + "github.com/blevesearch/bleve/v2/analysis/token/unicodenorm" + "github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode" + "github.com/blevesearch/bleve/v2/mapping" + "github.com/blevesearch/bleve/v2/search/query" + "github.com/go-enry/go-enry/v2" +) + +const ( + unicodeNormalizeName = "unicodeNormalize" + maxBatchSize = 16 + // fuzzyDenominator determines the levenshtein distance per each character of a keyword + fuzzyDenominator = 4 + // see https://github.com/blevesearch/bleve/issues/1563#issuecomment-786822311 + maxFuzziness = 2 +) + +func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { + return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]any{ + "type": unicodenorm.Name, + "form": unicodenorm.NFC, + }) +} + +// RepoIndexerData data stored in the repo indexer +type RepoIndexerData struct { + RepoID int64 + CommitID string + Content string + Language string + UpdatedAt time.Time +} + +// Type returns the document type, for bleve's mapping.Classifier interface. +func (d *RepoIndexerData) Type() string { + return repoIndexerDocType +} + +const ( + repoIndexerAnalyzer = "repoIndexerAnalyzer" + repoIndexerDocType = "repoIndexerDocType" + repoIndexerLatestVersion = 6 +) + +// generateBleveIndexMapping generates a bleve index mapping for the repo indexer +func generateBleveIndexMapping() (mapping.IndexMapping, error) { + docMapping := bleve.NewDocumentMapping() + numericFieldMapping := bleve.NewNumericFieldMapping() + numericFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) + + textFieldMapping := bleve.NewTextFieldMapping() + textFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("Content", textFieldMapping) + + termFieldMapping := bleve.NewTextFieldMapping() + termFieldMapping.IncludeInAll = false + termFieldMapping.Analyzer = analyzer_keyword.Name + docMapping.AddFieldMappingsAt("Language", termFieldMapping) + docMapping.AddFieldMappingsAt("CommitID", termFieldMapping) + + timeFieldMapping := bleve.NewDateTimeFieldMapping() + timeFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("UpdatedAt", timeFieldMapping) + + mapping := bleve.NewIndexMapping() + if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { + return nil, err + } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]any{ + "type": analyzer_custom.Name, + "char_filters": []string{}, + "tokenizer": unicode.Name, + "token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name}, + }); err != nil { + return nil, err + } + mapping.DefaultAnalyzer = repoIndexerAnalyzer + mapping.AddDocumentMapping(repoIndexerDocType, docMapping) + mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) + + return mapping, nil +} + +var _ internal.Indexer = &Indexer{} + +// Indexer represents a bleve indexer implementation +type Indexer struct { + inner *inner_bleve.Indexer + indexer_internal.Indexer // do not composite inner_bleve.Indexer directly to avoid exposing too much +} + +// NewIndexer creates a new bleve local indexer +func NewIndexer(indexDir string) *Indexer { + inner := inner_bleve.NewIndexer(indexDir, repoIndexerLatestVersion, generateBleveIndexMapping) + return &Indexer{ + Indexer: inner, + inner: inner, + } +} + +func (b *Indexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string, + update internal.FileUpdate, repo *repo_model.Repository, batch *inner_bleve.FlushingBatch, +) error { + // Ignore vendored files in code search + if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) { + return nil + } + + size := update.Size + + var err error + if !update.Sized { + var stdout string + stdout, _, err = git.NewCommand(ctx, "cat-file", "-s").AddDynamicArguments(update.BlobSha).RunStdString(&git.RunOpts{Dir: repo.RepoPath()}) + if err != nil { + return err + } + if size, err = strconv.ParseInt(strings.TrimSpace(stdout), 10, 64); err != nil { + return fmt.Errorf("misformatted git cat-file output: %w", err) + } + } + + if size > setting.Indexer.MaxIndexerFileSize { + return b.addDelete(update.Filename, repo, batch) + } + + if _, err := batchWriter.Write([]byte(update.BlobSha + "\n")); err != nil { + return err + } + + _, _, size, err = git.ReadBatchLine(batchReader) + if err != nil { + return err + } + + fileContents, err := io.ReadAll(io.LimitReader(batchReader, size)) + if err != nil { + return err + } else if !typesniffer.DetectContentType(fileContents).IsText() { + // FIXME: UTF-16 files will probably fail here + return nil + } + + if _, err = batchReader.Discard(1); err != nil { + return err + } + id := internal.FilenameIndexerID(repo.ID, update.Filename) + return batch.Index(id, &RepoIndexerData{ + RepoID: repo.ID, + CommitID: commitSha, + Content: string(charset.ToUTF8DropErrors(fileContents, charset.ConvertOpts{})), + Language: analyze.GetCodeLanguage(update.Filename, fileContents), + UpdatedAt: time.Now().UTC(), + }) +} + +func (b *Indexer) addDelete(filename string, repo *repo_model.Repository, batch *inner_bleve.FlushingBatch) error { + id := internal.FilenameIndexerID(repo.ID, filename) + return batch.Delete(id) +} + +// Index indexes the data +func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *internal.RepoChanges) error { + batch := inner_bleve.NewFlushingBatch(b.inner.Indexer, maxBatchSize) + if len(changes.Updates) > 0 { + // Now because of some insanity with git cat-file not immediately failing if not run in a valid git directory we need to run git rev-parse first! + if err := git.EnsureValidGitRepository(ctx, repo.RepoPath()); err != nil { + log.Error("Unable to open git repo: %s for %-v: %v", repo.RepoPath(), repo, err) + return err + } + + batchWriter, batchReader, cancel := git.CatFileBatch(ctx, repo.RepoPath()) + defer cancel() + + for _, update := range changes.Updates { + if err := b.addUpdate(ctx, batchWriter, batchReader, sha, update, repo, batch); err != nil { + return err + } + } + cancel() + } + for _, filename := range changes.RemovedFilenames { + if err := b.addDelete(filename, repo, batch); err != nil { + return err + } + } + return batch.Flush() +} + +// Delete deletes indexes by ids +func (b *Indexer) Delete(_ context.Context, repoID int64) error { + query := inner_bleve.NumericEqualityQuery(repoID, "RepoID") + searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) + result, err := b.inner.Indexer.Search(searchRequest) + if err != nil { + return err + } + batch := inner_bleve.NewFlushingBatch(b.inner.Indexer, maxBatchSize) + for _, hit := range result.Hits { + if err = batch.Delete(hit.ID); err != nil { + return err + } + } + return batch.Flush() +} + +// Search searches for files in the specified repo. +// Returns the matching file-paths +func (b *Indexer) Search(ctx context.Context, opts *internal.SearchOptions) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) { + var ( + indexerQuery query.Query + keywordQuery query.Query + ) + + phraseQuery := bleve.NewMatchPhraseQuery(opts.Keyword) + phraseQuery.FieldVal = "Content" + phraseQuery.Analyzer = repoIndexerAnalyzer + keywordQuery = phraseQuery + if opts.IsKeywordFuzzy { + phraseQuery.Fuzziness = min(maxFuzziness, len(opts.Keyword)/fuzzyDenominator) + } + + if len(opts.RepoIDs) > 0 { + repoQueries := make([]query.Query, 0, len(opts.RepoIDs)) + for _, repoID := range opts.RepoIDs { + repoQueries = append(repoQueries, inner_bleve.NumericEqualityQuery(repoID, "RepoID")) + } + + indexerQuery = bleve.NewConjunctionQuery( + bleve.NewDisjunctionQuery(repoQueries...), + keywordQuery, + ) + } else { + indexerQuery = keywordQuery + } + + // Save for reuse without language filter + facetQuery := indexerQuery + if len(opts.Language) > 0 { + languageQuery := bleve.NewMatchQuery(opts.Language) + languageQuery.FieldVal = "Language" + languageQuery.Analyzer = analyzer_keyword.Name + + indexerQuery = bleve.NewConjunctionQuery( + indexerQuery, + languageQuery, + ) + } + + from, pageSize := opts.GetSkipTake() + searchRequest := bleve.NewSearchRequestOptions(indexerQuery, pageSize, from, false) + searchRequest.Fields = []string{"Content", "RepoID", "Language", "CommitID", "UpdatedAt"} + searchRequest.IncludeLocations = true + + if len(opts.Language) == 0 { + searchRequest.AddFacet("languages", bleve.NewFacetRequest("Language", 10)) + } + + result, err := b.inner.Indexer.SearchInContext(ctx, searchRequest) + if err != nil { + return 0, nil, nil, err + } + + total := int64(result.Total) + + searchResults := make([]*internal.SearchResult, len(result.Hits)) + for i, hit := range result.Hits { + startIndex, endIndex := -1, -1 + for _, locations := range hit.Locations["Content"] { + location := locations[0] + locationStart := int(location.Start) + locationEnd := int(location.End) + if startIndex < 0 || locationStart < startIndex { + startIndex = locationStart + } + if endIndex < 0 || locationEnd > endIndex { + endIndex = locationEnd + } + } + language := hit.Fields["Language"].(string) + var updatedUnix timeutil.TimeStamp + if t, err := time.Parse(time.RFC3339, hit.Fields["UpdatedAt"].(string)); err == nil { + updatedUnix = timeutil.TimeStamp(t.Unix()) + } + searchResults[i] = &internal.SearchResult{ + RepoID: int64(hit.Fields["RepoID"].(float64)), + StartIndex: startIndex, + EndIndex: endIndex, + Filename: internal.FilenameOfIndexerID(hit.ID), + Content: hit.Fields["Content"].(string), + CommitID: hit.Fields["CommitID"].(string), + UpdatedUnix: updatedUnix, + Language: language, + Color: enry.GetColor(language), + } + } + + searchResultLanguages := make([]*internal.SearchResultLanguages, 0, 10) + if len(opts.Language) > 0 { + // Use separate query to go get all language counts + facetRequest := bleve.NewSearchRequestOptions(facetQuery, 1, 0, false) + facetRequest.Fields = []string{"Content", "RepoID", "Language", "CommitID", "UpdatedAt"} + facetRequest.IncludeLocations = true + facetRequest.AddFacet("languages", bleve.NewFacetRequest("Language", 10)) + + if result, err = b.inner.Indexer.Search(facetRequest); err != nil { + return 0, nil, nil, err + } + } + languagesFacet := result.Facets["languages"] + for _, term := range languagesFacet.Terms.Terms() { + if len(term.Term) == 0 { + continue + } + searchResultLanguages = append(searchResultLanguages, &internal.SearchResultLanguages{ + Language: term.Term, + Color: enry.GetColor(term.Term), + Count: term.Count, + }) + } + return total, searchResults, searchResultLanguages, nil +} diff --git a/modules/indexer/code/elasticsearch/elasticsearch.go b/modules/indexer/code/elasticsearch/elasticsearch.go new file mode 100644 index 00000000..e4622fd6 --- /dev/null +++ b/modules/indexer/code/elasticsearch/elasticsearch.go @@ -0,0 +1,360 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package elasticsearch + +import ( + "bufio" + "context" + "fmt" + "io" + "strconv" + "strings" + + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/analyze" + "code.gitea.io/gitea/modules/charset" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/indexer/code/internal" + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_elasticsearch "code.gitea.io/gitea/modules/indexer/internal/elasticsearch" + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/typesniffer" + + "github.com/go-enry/go-enry/v2" + "github.com/olivere/elastic/v7" +) + +const ( + esRepoIndexerLatestVersion = 1 + // multi-match-types, currently only 2 types are used + // Reference: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/query-dsl-multi-match-query.html#multi-match-types + esMultiMatchTypeBestFields = "best_fields" + esMultiMatchTypePhrasePrefix = "phrase_prefix" +) + +var _ internal.Indexer = &Indexer{} + +// Indexer implements Indexer interface +type Indexer struct { + inner *inner_elasticsearch.Indexer + indexer_internal.Indexer // do not composite inner_elasticsearch.Indexer directly to avoid exposing too much +} + +// NewIndexer creates a new elasticsearch indexer +func NewIndexer(url, indexerName string) *Indexer { + inner := inner_elasticsearch.NewIndexer(url, indexerName, esRepoIndexerLatestVersion, defaultMapping) + indexer := &Indexer{ + inner: inner, + Indexer: inner, + } + return indexer +} + +const ( + defaultMapping = `{ + "mappings": { + "properties": { + "repo_id": { + "type": "long", + "index": true + }, + "content": { + "type": "text", + "term_vector": "with_positions_offsets", + "index": true + }, + "commit_id": { + "type": "keyword", + "index": true + }, + "language": { + "type": "keyword", + "index": true + }, + "updated_at": { + "type": "long", + "index": true + } + } + } + }` +) + +func (b *Indexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, sha string, update internal.FileUpdate, repo *repo_model.Repository) ([]elastic.BulkableRequest, error) { + // Ignore vendored files in code search + if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) { + return nil, nil + } + + size := update.Size + var err error + if !update.Sized { + var stdout string + stdout, _, err = git.NewCommand(ctx, "cat-file", "-s").AddDynamicArguments(update.BlobSha).RunStdString(&git.RunOpts{Dir: repo.RepoPath()}) + if err != nil { + return nil, err + } + if size, err = strconv.ParseInt(strings.TrimSpace(stdout), 10, 64); err != nil { + return nil, fmt.Errorf("misformatted git cat-file output: %w", err) + } + } + + if size > setting.Indexer.MaxIndexerFileSize { + return []elastic.BulkableRequest{b.addDelete(update.Filename, repo)}, nil + } + + if _, err := batchWriter.Write([]byte(update.BlobSha + "\n")); err != nil { + return nil, err + } + + _, _, size, err = git.ReadBatchLine(batchReader) + if err != nil { + return nil, err + } + + fileContents, err := io.ReadAll(io.LimitReader(batchReader, size)) + if err != nil { + return nil, err + } else if !typesniffer.DetectContentType(fileContents).IsText() { + // FIXME: UTF-16 files will probably fail here + return nil, nil + } + + if _, err = batchReader.Discard(1); err != nil { + return nil, err + } + id := internal.FilenameIndexerID(repo.ID, update.Filename) + + return []elastic.BulkableRequest{ + elastic.NewBulkIndexRequest(). + Index(b.inner.VersionedIndexName()). + Id(id). + Doc(map[string]any{ + "repo_id": repo.ID, + "content": string(charset.ToUTF8DropErrors(fileContents, charset.ConvertOpts{})), + "commit_id": sha, + "language": analyze.GetCodeLanguage(update.Filename, fileContents), + "updated_at": timeutil.TimeStampNow(), + }), + }, nil +} + +func (b *Indexer) addDelete(filename string, repo *repo_model.Repository) elastic.BulkableRequest { + id := internal.FilenameIndexerID(repo.ID, filename) + return elastic.NewBulkDeleteRequest(). + Index(b.inner.VersionedIndexName()). + Id(id) +} + +// Index will save the index data +func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *internal.RepoChanges) error { + reqs := make([]elastic.BulkableRequest, 0) + if len(changes.Updates) > 0 { + // Now because of some insanity with git cat-file not immediately failing if not run in a valid git directory we need to run git rev-parse first! + if err := git.EnsureValidGitRepository(ctx, repo.RepoPath()); err != nil { + log.Error("Unable to open git repo: %s for %-v: %v", repo.RepoPath(), repo, err) + return err + } + + batchWriter, batchReader, cancel := git.CatFileBatch(ctx, repo.RepoPath()) + defer cancel() + + for _, update := range changes.Updates { + updateReqs, err := b.addUpdate(ctx, batchWriter, batchReader, sha, update, repo) + if err != nil { + return err + } + if len(updateReqs) > 0 { + reqs = append(reqs, updateReqs...) + } + } + cancel() + } + + for _, filename := range changes.RemovedFilenames { + reqs = append(reqs, b.addDelete(filename, repo)) + } + + if len(reqs) > 0 { + esBatchSize := 50 + + for i := 0; i < len(reqs); i += esBatchSize { + _, err := b.inner.Client.Bulk(). + Index(b.inner.VersionedIndexName()). + Add(reqs[i:min(i+esBatchSize, len(reqs))]...). + Do(ctx) + if err != nil { + return err + } + } + } + return nil +} + +// Delete deletes indexes by ids +func (b *Indexer) Delete(ctx context.Context, repoID int64) error { + _, err := b.inner.Client.DeleteByQuery(b.inner.VersionedIndexName()). + Query(elastic.NewTermsQuery("repo_id", repoID)). + Do(ctx) + return err +} + +// indexPos find words positions for start and the following end on content. It will +// return the beginning position of the first start and the ending position of the +// first end following the start string. +// If not found any of the positions, it will return -1, -1. +func indexPos(content, start, end string) (int, int) { + startIdx := strings.Index(content, start) + if startIdx < 0 { + return -1, -1 + } + endIdx := strings.Index(content[startIdx+len(start):], end) + if endIdx < 0 { + return -1, -1 + } + return startIdx, startIdx + len(start) + endIdx + len(end) +} + +func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) { + hits := make([]*internal.SearchResult, 0, pageSize) + for _, hit := range searchResult.Hits.Hits { + // FIXME: There is no way to get the position the keyword on the content currently on the same request. + // So we get it from content, this may made the query slower. See + // https://discuss.elastic.co/t/fetching-position-of-keyword-in-matched-document/94291 + var startIndex, endIndex int + c, ok := hit.Highlight["content"] + if ok && len(c) > 0 { + // FIXME: Since the highlighting content will include <em> and </em> for the keywords, + // now we should find the positions. But how to avoid html content which contains the + // <em> and </em> tags? If elastic search has handled that? + startIndex, endIndex = indexPos(c[0], "<em>", "</em>") + if startIndex == -1 { + panic(fmt.Sprintf("1===%s,,,%#v,,,%s", kw, hit.Highlight, c[0])) + } + } else { + panic(fmt.Sprintf("2===%#v", hit.Highlight)) + } + + repoID, fileName := internal.ParseIndexerID(hit.Id) + res := make(map[string]any) + if err := json.Unmarshal(hit.Source, &res); err != nil { + return 0, nil, nil, err + } + + language := res["language"].(string) + + hits = append(hits, &internal.SearchResult{ + RepoID: repoID, + Filename: fileName, + CommitID: res["commit_id"].(string), + Content: res["content"].(string), + UpdatedUnix: timeutil.TimeStamp(res["updated_at"].(float64)), + Language: language, + StartIndex: startIndex, + EndIndex: endIndex - 9, // remove the length <em></em> since we give Content the original data + Color: enry.GetColor(language), + }) + } + + return searchResult.TotalHits(), hits, extractAggs(searchResult), nil +} + +func extractAggs(searchResult *elastic.SearchResult) []*internal.SearchResultLanguages { + var searchResultLanguages []*internal.SearchResultLanguages + agg, found := searchResult.Aggregations.Terms("language") + if found { + searchResultLanguages = make([]*internal.SearchResultLanguages, 0, 10) + + for _, bucket := range agg.Buckets { + searchResultLanguages = append(searchResultLanguages, &internal.SearchResultLanguages{ + Language: bucket.Key.(string), + Color: enry.GetColor(bucket.Key.(string)), + Count: int(bucket.DocCount), + }) + } + } + return searchResultLanguages +} + +// Search searches for codes and language stats by given conditions. +func (b *Indexer) Search(ctx context.Context, opts *internal.SearchOptions) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) { + searchType := esMultiMatchTypePhrasePrefix + if opts.IsKeywordFuzzy { + searchType = esMultiMatchTypeBestFields + } + + kwQuery := elastic.NewMultiMatchQuery(opts.Keyword, "content").Type(searchType) + query := elastic.NewBoolQuery() + query = query.Must(kwQuery) + if len(opts.RepoIDs) > 0 { + repoStrs := make([]any, 0, len(opts.RepoIDs)) + for _, repoID := range opts.RepoIDs { + repoStrs = append(repoStrs, repoID) + } + repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...) + query = query.Must(repoQuery) + } + + var ( + start, pageSize = opts.GetSkipTake() + kw = "<em>" + opts.Keyword + "</em>" + aggregation = elastic.NewTermsAggregation().Field("language").Size(10).OrderByCountDesc() + ) + + if len(opts.Language) == 0 { + searchResult, err := b.inner.Client.Search(). + Index(b.inner.VersionedIndexName()). + Aggregation("language", aggregation). + Query(query). + Highlight( + elastic.NewHighlight(). + Field("content"). + NumOfFragments(0). // return all highting content on fragments + HighlighterType("fvh"), + ). + Sort("repo_id", true). + From(start).Size(pageSize). + Do(ctx) + if err != nil { + return 0, nil, nil, err + } + + return convertResult(searchResult, kw, pageSize) + } + + langQuery := elastic.NewMatchQuery("language", opts.Language) + countResult, err := b.inner.Client.Search(). + Index(b.inner.VersionedIndexName()). + Aggregation("language", aggregation). + Query(query). + Size(0). // We only need stats information + Do(ctx) + if err != nil { + return 0, nil, nil, err + } + + query = query.Must(langQuery) + searchResult, err := b.inner.Client.Search(). + Index(b.inner.VersionedIndexName()). + Query(query). + Highlight( + elastic.NewHighlight(). + Field("content"). + NumOfFragments(0). // return all highting content on fragments + HighlighterType("fvh"), + ). + Sort("repo_id", true). + From(start).Size(pageSize). + Do(ctx) + if err != nil { + return 0, nil, nil, err + } + + total, hits, _, err := convertResult(searchResult, kw, pageSize) + + return total, hits, extractAggs(countResult), err +} diff --git a/modules/indexer/code/elasticsearch/elasticsearch_test.go b/modules/indexer/code/elasticsearch/elasticsearch_test.go new file mode 100644 index 00000000..c6ba93e7 --- /dev/null +++ b/modules/indexer/code/elasticsearch/elasticsearch_test.go @@ -0,0 +1,16 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package elasticsearch + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIndexPos(t *testing.T) { + startIdx, endIdx := indexPos("test index start and end", "start", "end") + assert.EqualValues(t, 11, startIdx) + assert.EqualValues(t, 24, endIdx) +} diff --git a/modules/indexer/code/git.go b/modules/indexer/code/git.go new file mode 100644 index 00000000..c5dfe438 --- /dev/null +++ b/modules/indexer/code/git.go @@ -0,0 +1,175 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package code + +import ( + "context" + "strconv" + "strings" + + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/indexer/code/internal" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +func getDefaultBranchSha(ctx context.Context, repo *repo_model.Repository) (string, error) { + stdout, _, err := git.NewCommand(ctx, "show-ref", "-s").AddDynamicArguments(git.BranchPrefix + repo.DefaultBranch).RunStdString(&git.RunOpts{Dir: repo.RepoPath()}) + if err != nil { + return "", err + } + return strings.TrimSpace(stdout), nil +} + +// getRepoChanges returns changes to repo since last indexer update +func getRepoChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*internal.RepoChanges, error) { + status, err := repo_model.GetIndexerStatus(ctx, repo, repo_model.RepoIndexerTypeCode) + if err != nil { + return nil, err + } + + needGenesis := len(status.CommitSha) == 0 + if !needGenesis { + hasAncestorCmd := git.NewCommand(ctx, "merge-base").AddDynamicArguments(status.CommitSha, revision) + stdout, _, _ := hasAncestorCmd.RunStdString(&git.RunOpts{Dir: repo.RepoPath()}) + needGenesis = len(stdout) == 0 + } + + if needGenesis { + return genesisChanges(ctx, repo, revision) + } + return nonGenesisChanges(ctx, repo, revision) +} + +func isIndexable(entry *git.TreeEntry) bool { + if !entry.IsRegular() && !entry.IsExecutable() { + return false + } + name := strings.ToLower(entry.Name()) + for _, g := range setting.Indexer.ExcludePatterns { + if g.Match(name) { + return false + } + } + for _, g := range setting.Indexer.IncludePatterns { + if g.Match(name) { + return true + } + } + return len(setting.Indexer.IncludePatterns) == 0 +} + +// parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command +func parseGitLsTreeOutput(stdout []byte) ([]internal.FileUpdate, error) { + entries, err := git.ParseTreeEntries(stdout) + if err != nil { + return nil, err + } + idxCount := 0 + updates := make([]internal.FileUpdate, len(entries)) + for _, entry := range entries { + if isIndexable(entry) { + updates[idxCount] = internal.FileUpdate{ + Filename: entry.Name(), + BlobSha: entry.ID.String(), + Size: entry.Size(), + Sized: true, + } + idxCount++ + } + } + return updates[:idxCount], nil +} + +// genesisChanges get changes to add repo to the indexer for the first time +func genesisChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*internal.RepoChanges, error) { + var changes internal.RepoChanges + stdout, _, runErr := git.NewCommand(ctx, "ls-tree", "--full-tree", "-l", "-r").AddDynamicArguments(revision).RunStdBytes(&git.RunOpts{Dir: repo.RepoPath()}) + if runErr != nil { + return nil, runErr + } + + var err error + changes.Updates, err = parseGitLsTreeOutput(stdout) + return &changes, err +} + +// nonGenesisChanges get changes since the previous indexer update +func nonGenesisChanges(ctx context.Context, repo *repo_model.Repository, revision string) (*internal.RepoChanges, error) { + diffCmd := git.NewCommand(ctx, "diff", "--name-status").AddDynamicArguments(repo.CodeIndexerStatus.CommitSha, revision) + stdout, _, runErr := diffCmd.RunStdString(&git.RunOpts{Dir: repo.RepoPath()}) + if runErr != nil { + // previous commit sha may have been removed by a force push, so + // try rebuilding from scratch + log.Warn("git diff: %v", runErr) + if err := (*globalIndexer.Load()).Delete(ctx, repo.ID); err != nil { + return nil, err + } + return genesisChanges(ctx, repo, revision) + } + + var changes internal.RepoChanges + var err error + updatedFilenames := make([]string, 0, 10) + for _, line := range strings.Split(stdout, "\n") { + line = strings.TrimSpace(line) + if len(line) == 0 { + continue + } + fields := strings.Split(line, "\t") + if len(fields) < 2 { + log.Warn("Unparsable output for diff --name-status: `%s`)", line) + continue + } + filename := fields[1] + if len(filename) == 0 { + continue + } else if filename[0] == '"' { + filename, err = strconv.Unquote(filename) + if err != nil { + return nil, err + } + } + + switch status := fields[0][0]; status { + case 'M', 'A': + updatedFilenames = append(updatedFilenames, filename) + case 'D': + changes.RemovedFilenames = append(changes.RemovedFilenames, filename) + case 'R', 'C': + if len(fields) < 3 { + log.Warn("Unparsable output for diff --name-status: `%s`)", line) + continue + } + dest := fields[2] + if len(dest) == 0 { + log.Warn("Unparsable output for diff --name-status: `%s`)", line) + continue + } + if dest[0] == '"' { + dest, err = strconv.Unquote(dest) + if err != nil { + return nil, err + } + } + if status == 'R' { + changes.RemovedFilenames = append(changes.RemovedFilenames, filename) + } + updatedFilenames = append(updatedFilenames, dest) + default: + log.Warn("Unrecognized status: %c (line=%s)", status, line) + } + } + + cmd := git.NewCommand(ctx, "ls-tree", "--full-tree", "-l").AddDynamicArguments(revision). + AddDashesAndList(updatedFilenames...) + lsTreeStdout, _, err := cmd.RunStdBytes(&git.RunOpts{Dir: repo.RepoPath()}) + if err != nil { + return nil, err + } + + changes.Updates, err = parseGitLsTreeOutput(lsTreeStdout) + return &changes, err +} diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go new file mode 100644 index 00000000..0a8ce279 --- /dev/null +++ b/modules/indexer/code/indexer.go @@ -0,0 +1,310 @@ +// Copyright 2016 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package code + +import ( + "context" + "os" + "runtime/pprof" + "slices" + "sync/atomic" + "time" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/indexer/code/bleve" + "code.gitea.io/gitea/modules/indexer/code/elasticsearch" + "code.gitea.io/gitea/modules/indexer/code/internal" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/queue" + "code.gitea.io/gitea/modules/setting" +) + +var ( + indexerQueue *queue.WorkerPoolQueue[*internal.IndexerData] + // globalIndexer is the global indexer, it cannot be nil. + // When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready. + // So it's always safe use it as *globalIndexer.Load() and call its methods. + globalIndexer atomic.Pointer[internal.Indexer] + dummyIndexer *internal.Indexer +) + +func init() { + i := internal.NewDummyIndexer() + dummyIndexer = &i + globalIndexer.Store(dummyIndexer) +} + +func index(ctx context.Context, indexer internal.Indexer, repoID int64) error { + repo, err := repo_model.GetRepositoryByID(ctx, repoID) + if repo_model.IsErrRepoNotExist(err) { + return indexer.Delete(ctx, repoID) + } + if err != nil { + return err + } + + repoTypes := setting.Indexer.RepoIndexerRepoTypes + + if len(repoTypes) == 0 { + repoTypes = []string{"sources"} + } + + // skip forks from being indexed if unit is not present + if !slices.Contains(repoTypes, "forks") && repo.IsFork { + return nil + } + + // skip mirrors from being indexed if unit is not present + if !slices.Contains(repoTypes, "mirrors") && repo.IsMirror { + return nil + } + + // skip templates from being indexed if unit is not present + if !slices.Contains(repoTypes, "templates") && repo.IsTemplate { + return nil + } + + // skip regular repos from being indexed if unit is not present + if !slices.Contains(repoTypes, "sources") && !repo.IsFork && !repo.IsMirror && !repo.IsTemplate { + return nil + } + + sha, err := getDefaultBranchSha(ctx, repo) + if err != nil { + return err + } + changes, err := getRepoChanges(ctx, repo, sha) + if err != nil { + return err + } else if changes == nil { + return nil + } + + if err := indexer.Index(ctx, repo, sha, changes); err != nil { + return err + } + + return repo_model.UpdateIndexerStatus(ctx, repo, repo_model.RepoIndexerTypeCode, sha) +} + +// Init initialize the repo indexer +func Init() { + if !setting.Indexer.RepoIndexerEnabled { + (*globalIndexer.Load()).Close() + return + } + + ctx, cancel, finished := process.GetManager().AddTypedContext(context.Background(), "Service: CodeIndexer", process.SystemProcessType, false) + + graceful.GetManager().RunAtTerminate(func() { + select { + case <-ctx.Done(): + return + default: + } + cancel() + log.Debug("Closing repository indexer") + (*globalIndexer.Load()).Close() + log.Info("PID: %d Repository Indexer closed", os.Getpid()) + finished() + }) + + waitChannel := make(chan time.Duration, 1) + + // Create the Queue + switch setting.Indexer.RepoType { + case "bleve", "elasticsearch": + handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) { + indexer := *globalIndexer.Load() + // make it a process to allow for cancellation (especially during integration tests where no global shutdown happens) + batchCtx, _, finished := process.GetManager().AddContext(ctx, "CodeIndexer batch") + defer finished() + for _, indexerData := range items { + log.Trace("IndexerData Process Repo: %d", indexerData.RepoID) + if err := index(batchCtx, indexer, indexerData.RepoID); err != nil { + unhandled = append(unhandled, indexerData) + if !setting.IsInTesting { + log.Error("Codes indexer handler: index error for repo %v: %v", indexerData.RepoID, err) + } + } + } + return unhandled + } + + indexerQueue = queue.CreateUniqueQueue(ctx, "code_indexer", handler) + if indexerQueue == nil { + log.Fatal("Unable to create codes indexer queue") + } + default: + log.Fatal("Unknown codes indexer type; %s", setting.Indexer.RepoType) + } + + go func() { + pprof.SetGoroutineLabels(ctx) + start := time.Now() + var ( + rIndexer internal.Indexer + existed bool + err error + ) + switch setting.Indexer.RepoType { + case "bleve": + log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoPath) + defer func() { + if err := recover(); err != nil { + log.Error("PANIC whilst initializing repository indexer: %v\nStacktrace: %s", err, log.Stack(2)) + log.Error("The indexer files are likely corrupted and may need to be deleted") + log.Error("You can completely remove the \"%s\" directory to make Forgejo recreate the indexes", setting.Indexer.RepoPath) + } + }() + + rIndexer = bleve.NewIndexer(setting.Indexer.RepoPath) + existed, err = rIndexer.Init(ctx) + if err != nil { + cancel() + (*globalIndexer.Load()).Close() + close(waitChannel) + log.Fatal("PID: %d Unable to initialize the bleve Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err) + } + case "elasticsearch": + log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoConnStr) + defer func() { + if err := recover(); err != nil { + log.Error("PANIC whilst initializing repository indexer: %v\nStacktrace: %s", err, log.Stack(2)) + log.Error("The indexer files are likely corrupted and may need to be deleted") + log.Error("You can completely remove the \"%s\" index to make Forgejo recreate the indexes", setting.Indexer.RepoConnStr) + } + }() + + rIndexer = elasticsearch.NewIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName) + existed, err = rIndexer.Init(ctx) + if err != nil { + cancel() + (*globalIndexer.Load()).Close() + close(waitChannel) + log.Fatal("PID: %d Unable to initialize the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err) + } + + default: + log.Fatal("PID: %d Unknown Indexer type: %s", os.Getpid(), setting.Indexer.RepoType) + } + + globalIndexer.Store(&rIndexer) + + // Start processing the queue + go graceful.GetManager().RunWithCancel(indexerQueue) + + if !existed { // populate the index because it's created for the first time + go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer) + } + select { + case waitChannel <- time.Since(start): + case <-graceful.GetManager().IsShutdown(): + } + + close(waitChannel) + }() + + if setting.Indexer.StartupTimeout > 0 { + go func() { + pprof.SetGoroutineLabels(ctx) + timeout := setting.Indexer.StartupTimeout + if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { + timeout += setting.GracefulHammerTime + } + select { + case <-graceful.GetManager().IsShutdown(): + log.Warn("Shutdown before Repository Indexer completed initialization") + cancel() + (*globalIndexer.Load()).Close() + case duration, ok := <-waitChannel: + if !ok { + log.Warn("Repository Indexer Initialization failed") + cancel() + (*globalIndexer.Load()).Close() + return + } + log.Info("Repository Indexer Initialization took %v", duration) + case <-time.After(timeout): + cancel() + (*globalIndexer.Load()).Close() + log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) + } + }() + } +} + +// UpdateRepoIndexer update a repository's entries in the indexer +func UpdateRepoIndexer(repo *repo_model.Repository) { + indexData := &internal.IndexerData{RepoID: repo.ID} + if err := indexerQueue.Push(indexData); err != nil { + log.Error("Update repo index data %v failed: %v", indexData, err) + } +} + +// IsAvailable checks if issue indexer is available +func IsAvailable(ctx context.Context) bool { + return (*globalIndexer.Load()).Ping(ctx) == nil +} + +// populateRepoIndexer populate the repo indexer with pre-existing data. This +// should only be run when the indexer is created for the first time. +func populateRepoIndexer(ctx context.Context) { + log.Info("Populating the repo indexer with existing repositories") + + exist, err := db.IsTableNotEmpty("repository") + if err != nil { + log.Fatal("System error: %v", err) + } else if !exist { + return + } + + // if there is any existing repo indexer metadata in the DB, delete it + // since we are starting afresh. Also, xorm requires deletes to have a + // condition, and we want to delete everything, thus 1=1. + if err := db.DeleteAllRecords("repo_indexer_status"); err != nil { + log.Fatal("System error: %v", err) + } + + var maxRepoID int64 + if maxRepoID, err = db.GetMaxID("repository"); err != nil { + log.Fatal("System error: %v", err) + } + + // start with the maximum existing repo ID and work backwards, so that we + // don't include repos that are created after gitea starts; such repos will + // already be added to the indexer, and we don't need to add them again. + for maxRepoID > 0 { + select { + case <-ctx.Done(): + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + ids, err := repo_model.GetUnindexedRepos(ctx, repo_model.RepoIndexerTypeCode, maxRepoID, 0, 50) + if err != nil { + log.Error("populateRepoIndexer: %v", err) + return + } else if len(ids) == 0 { + break + } + for _, id := range ids { + select { + case <-ctx.Done(): + log.Info("Repository Indexer population shutdown before completion") + return + default: + } + if err := indexerQueue.Push(&internal.IndexerData{RepoID: id}); err != nil { + log.Error("indexerQueue.Push: %v", err) + return + } + maxRepoID = id - 1 + } + } + log.Info("Done (re)populating the repo indexer with existing repositories") +} diff --git a/modules/indexer/code/indexer_test.go b/modules/indexer/code/indexer_test.go new file mode 100644 index 00000000..f43e8e42 --- /dev/null +++ b/modules/indexer/code/indexer_test.go @@ -0,0 +1,145 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package code + +import ( + "context" + "os" + "testing" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/unittest" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/indexer/code/bleve" + "code.gitea.io/gitea/modules/indexer/code/elasticsearch" + "code.gitea.io/gitea/modules/indexer/code/internal" + + _ "code.gitea.io/gitea/models" + _ "code.gitea.io/gitea/models/actions" + _ "code.gitea.io/gitea/models/activities" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMain(m *testing.M) { + unittest.MainTest(m) +} + +func testIndexer(name string, t *testing.T, indexer internal.Indexer) { + t.Run(name, func(t *testing.T) { + var repoID int64 = 1 + err := index(git.DefaultContext, indexer, repoID) + require.NoError(t, err) + keywords := []struct { + RepoIDs []int64 + Keyword string + IDs []int64 + Langs int + }{ + { + RepoIDs: nil, + Keyword: "Description", + IDs: []int64{repoID}, + Langs: 1, + }, + { + RepoIDs: []int64{2}, + Keyword: "Description", + IDs: []int64{}, + Langs: 0, + }, + { + RepoIDs: nil, + Keyword: "Description for", + IDs: []int64{repoID}, + Langs: 1, + }, + { + RepoIDs: nil, + Keyword: "repo1", + IDs: []int64{repoID}, + Langs: 1, + }, + { + RepoIDs: []int64{2}, + Keyword: "repo1", + IDs: []int64{}, + Langs: 0, + }, + { + RepoIDs: nil, + Keyword: "non-exist", + IDs: []int64{}, + Langs: 0, + }, + } + + for _, kw := range keywords { + t.Run(kw.Keyword, func(t *testing.T) { + total, res, langs, err := indexer.Search(context.TODO(), &internal.SearchOptions{ + RepoIDs: kw.RepoIDs, + Keyword: kw.Keyword, + Paginator: &db.ListOptions{ + Page: 1, + PageSize: 10, + }, + IsKeywordFuzzy: true, + }) + require.NoError(t, err) + assert.Len(t, kw.IDs, int(total)) + assert.Len(t, langs, kw.Langs) + + ids := make([]int64, 0, len(res)) + for _, hit := range res { + ids = append(ids, hit.RepoID) + assert.EqualValues(t, "# repo1\n\nDescription for repo1", hit.Content) + } + assert.EqualValues(t, kw.IDs, ids) + }) + } + + require.NoError(t, indexer.Delete(context.Background(), repoID)) + }) +} + +func TestBleveIndexAndSearch(t *testing.T) { + unittest.PrepareTestEnv(t) + + dir := t.TempDir() + + idx := bleve.NewIndexer(dir) + _, err := idx.Init(context.Background()) + if err != nil { + if idx != nil { + idx.Close() + } + assert.FailNow(t, "Unable to create bleve indexer Error: %v", err) + } + defer idx.Close() + + testIndexer("beleve", t, idx) +} + +func TestESIndexAndSearch(t *testing.T) { + unittest.PrepareTestEnv(t) + + u := os.Getenv("TEST_INDEXER_CODE_ES_URL") + if u == "" { + t.SkipNow() + return + } + + indexer := elasticsearch.NewIndexer(u, "gitea_codes") + if _, err := indexer.Init(context.Background()); err != nil { + if indexer != nil { + indexer.Close() + } + assert.FailNow(t, "Unable to init ES indexer Error: %v", err) + } + + defer indexer.Close() + + testIndexer("elastic_search", t, indexer) +} diff --git a/modules/indexer/code/internal/indexer.go b/modules/indexer/code/internal/indexer.go new file mode 100644 index 00000000..c259fcd2 --- /dev/null +++ b/modules/indexer/code/internal/indexer.go @@ -0,0 +1,54 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import ( + "context" + "fmt" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/indexer/internal" +) + +// Indexer defines an interface to index and search code contents +type Indexer interface { + internal.Indexer + Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *RepoChanges) error + Delete(ctx context.Context, repoID int64) error + Search(ctx context.Context, opts *SearchOptions) (int64, []*SearchResult, []*SearchResultLanguages, error) +} + +type SearchOptions struct { + RepoIDs []int64 + Keyword string + Language string + + IsKeywordFuzzy bool + + db.Paginator +} + +// NewDummyIndexer returns a dummy indexer +func NewDummyIndexer() Indexer { + return &dummyIndexer{ + Indexer: internal.NewDummyIndexer(), + } +} + +type dummyIndexer struct { + internal.Indexer +} + +func (d *dummyIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *RepoChanges) error { + return fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Delete(ctx context.Context, repoID int64) error { + return fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Search(ctx context.Context, opts *SearchOptions) (int64, []*SearchResult, []*SearchResultLanguages, error) { + return 0, nil, nil, fmt.Errorf("indexer is not ready") +} diff --git a/modules/indexer/code/internal/model.go b/modules/indexer/code/internal/model.go new file mode 100644 index 00000000..f75263c8 --- /dev/null +++ b/modules/indexer/code/internal/model.go @@ -0,0 +1,44 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import "code.gitea.io/gitea/modules/timeutil" + +type FileUpdate struct { + Filename string + BlobSha string + Size int64 + Sized bool +} + +// RepoChanges changes (file additions/updates/removals) to a repo +type RepoChanges struct { + Updates []FileUpdate + RemovedFilenames []string +} + +// IndexerData represents data stored in the code indexer +type IndexerData struct { + RepoID int64 +} + +// SearchResult result of performing a search in a repo +type SearchResult struct { + RepoID int64 + StartIndex int + EndIndex int + Filename string + Content string + CommitID string + UpdatedUnix timeutil.TimeStamp + Language string + Color string +} + +// SearchResultLanguages result of top languages count in search results +type SearchResultLanguages struct { + Language string + Color string + Count int +} diff --git a/modules/indexer/code/internal/util.go b/modules/indexer/code/internal/util.go new file mode 100644 index 00000000..689c4f45 --- /dev/null +++ b/modules/indexer/code/internal/util.go @@ -0,0 +1,32 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import ( + "strings" + + "code.gitea.io/gitea/modules/indexer/internal" + "code.gitea.io/gitea/modules/log" +) + +func FilenameIndexerID(repoID int64, filename string) string { + return internal.Base36(repoID) + "_" + filename +} + +func ParseIndexerID(indexerID string) (int64, string) { + index := strings.IndexByte(indexerID, '_') + if index == -1 { + log.Error("Unexpected ID in repo indexer: %s", indexerID) + } + repoID, _ := internal.ParseBase36(indexerID[:index]) + return repoID, indexerID[index+1:] +} + +func FilenameOfIndexerID(indexerID string) string { + index := strings.IndexByte(indexerID, '_') + if index == -1 { + log.Error("Unexpected ID in repo indexer: %s", indexerID) + } + return indexerID[index+1:] +} diff --git a/modules/indexer/code/search.go b/modules/indexer/code/search.go new file mode 100644 index 00000000..5f35e807 --- /dev/null +++ b/modules/indexer/code/search.go @@ -0,0 +1,151 @@ +// Copyright 2017 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package code + +import ( + "bytes" + "context" + "html/template" + "strings" + + "code.gitea.io/gitea/modules/highlight" + "code.gitea.io/gitea/modules/indexer/code/internal" + "code.gitea.io/gitea/modules/timeutil" +) + +// Result a search result to display +type Result struct { + RepoID int64 + Filename string + CommitID string + UpdatedUnix timeutil.TimeStamp + Language string + Color string + Lines []ResultLine +} + +type ResultLine struct { + Num int + FormattedContent template.HTML +} + +type SearchResultLanguages = internal.SearchResultLanguages + +type SearchOptions = internal.SearchOptions + +func indices(content string, selectionStartIndex, selectionEndIndex int) (int, int) { + startIndex := selectionStartIndex + numLinesBefore := 0 + for ; startIndex > 0; startIndex-- { + if content[startIndex-1] == '\n' { + if numLinesBefore == 1 { + break + } + numLinesBefore++ + } + } + + endIndex := selectionEndIndex + numLinesAfter := 0 + for ; endIndex < len(content); endIndex++ { + if content[endIndex] == '\n' { + if numLinesAfter == 1 { + break + } + numLinesAfter++ + } + } + + return startIndex, endIndex +} + +func writeStrings(buf *bytes.Buffer, strs ...string) error { + for _, s := range strs { + _, err := buf.WriteString(s) + if err != nil { + return err + } + } + return nil +} + +func HighlightSearchResultCode(filename string, lineNums []int, code string) []ResultLine { + // we should highlight the whole code block first, otherwise it doesn't work well with multiple line highlighting + hl, _ := highlight.Code(filename, "", code) + highlightedLines := strings.Split(string(hl), "\n") + + // The lineNums outputted by highlight.Code might not match the original lineNums, because "highlight" removes the last `\n` + lines := make([]ResultLine, min(len(highlightedLines), len(lineNums))) + for i := 0; i < len(lines); i++ { + lines[i].Num = lineNums[i] + lines[i].FormattedContent = template.HTML(highlightedLines[i]) + } + return lines +} + +func searchResult(result *internal.SearchResult, startIndex, endIndex int) (*Result, error) { + startLineNum := 1 + strings.Count(result.Content[:startIndex], "\n") + + var formattedLinesBuffer bytes.Buffer + + contentLines := strings.SplitAfter(result.Content[startIndex:endIndex], "\n") + lineNums := make([]int, 0, len(contentLines)) + index := startIndex + for i, line := range contentLines { + var err error + if index < result.EndIndex && + result.StartIndex < index+len(line) && + result.StartIndex < result.EndIndex { + openActiveIndex := max(result.StartIndex-index, 0) + closeActiveIndex := min(result.EndIndex-index, len(line)) + err = writeStrings(&formattedLinesBuffer, + line[:openActiveIndex], + line[openActiveIndex:closeActiveIndex], + line[closeActiveIndex:], + ) + } else { + err = writeStrings(&formattedLinesBuffer, line) + } + if err != nil { + return nil, err + } + + lineNums = append(lineNums, startLineNum+i) + index += len(line) + } + + return &Result{ + RepoID: result.RepoID, + Filename: result.Filename, + CommitID: result.CommitID, + UpdatedUnix: result.UpdatedUnix, + Language: result.Language, + Color: result.Color, + Lines: HighlightSearchResultCode(result.Filename, lineNums, formattedLinesBuffer.String()), + }, nil +} + +// PerformSearch perform a search on a repository +// if isFuzzy is true set the Damerau-Levenshtein distance from 0 to 2 +func PerformSearch(ctx context.Context, opts *SearchOptions) (int, []*Result, []*SearchResultLanguages, error) { + if opts == nil || len(opts.Keyword) == 0 { + return 0, nil, nil, nil + } + + total, results, resultLanguages, err := (*globalIndexer.Load()).Search(ctx, opts) + if err != nil { + return 0, nil, nil, err + } + + displayResults := make([]*Result, len(results)) + + for i, result := range results { + startIndex, endIndex := indices(result.Content, result.StartIndex, result.EndIndex) + displayResults[i], err = searchResult(result, startIndex, endIndex) + if err != nil { + return 0, nil, nil, err + } + } + return int(total), displayResults, resultLanguages, nil +} diff --git a/modules/indexer/internal/base32.go b/modules/indexer/internal/base32.go new file mode 100644 index 00000000..aca756c6 --- /dev/null +++ b/modules/indexer/internal/base32.go @@ -0,0 +1,21 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import ( + "fmt" + "strconv" +) + +func Base36(i int64) string { + return strconv.FormatInt(i, 36) +} + +func ParseBase36(s string) (int64, error) { + i, err := strconv.ParseInt(s, 36, 64) + if err != nil { + return 0, fmt.Errorf("invalid base36 integer %q: %w", s, err) + } + return i, nil +} diff --git a/modules/indexer/internal/bleve/batch.go b/modules/indexer/internal/bleve/batch.go new file mode 100644 index 00000000..ed5ef077 --- /dev/null +++ b/modules/indexer/internal/bleve/batch.go @@ -0,0 +1,58 @@ +// Copyright 2021 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package bleve + +import ( + "github.com/blevesearch/bleve/v2" +) + +// FlushingBatch is a batch of operations that automatically flushes to the +// underlying index once it reaches a certain size. +type FlushingBatch struct { + maxBatchSize int + batch *bleve.Batch + index bleve.Index +} + +// NewFlushingBatch creates a new flushing batch for the specified index. Once +// the number of operations in the batch reaches the specified limit, the batch +// automatically flushes its operations to the index. +func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch { + return &FlushingBatch{ + maxBatchSize: maxBatchSize, + batch: index.NewBatch(), + index: index, + } +} + +// Index add a new index to batch +func (b *FlushingBatch) Index(id string, data any) error { + if err := b.batch.Index(id, data); err != nil { + return err + } + return b.flushIfFull() +} + +// Delete add a delete index to batch +func (b *FlushingBatch) Delete(id string) error { + b.batch.Delete(id) + return b.flushIfFull() +} + +func (b *FlushingBatch) flushIfFull() error { + if b.batch.Size() < b.maxBatchSize { + return nil + } + return b.Flush() +} + +// Flush submit the batch and create a new one +func (b *FlushingBatch) Flush() error { + err := b.index.Batch(b.batch) + if err != nil { + return err + } + b.batch = b.index.NewBatch() + return nil +} diff --git a/modules/indexer/internal/bleve/indexer.go b/modules/indexer/internal/bleve/indexer.go new file mode 100644 index 00000000..1435d2f5 --- /dev/null +++ b/modules/indexer/internal/bleve/indexer.go @@ -0,0 +1,102 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package bleve + +import ( + "context" + "fmt" + + "code.gitea.io/gitea/modules/indexer/internal" + "code.gitea.io/gitea/modules/log" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/mapping" +) + +var _ internal.Indexer = &Indexer{} + +// Indexer represents a basic bleve indexer implementation +type Indexer struct { + Indexer bleve.Index + + indexDir string + version int + mappingGetter MappingGetter +} + +type MappingGetter func() (mapping.IndexMapping, error) + +func NewIndexer(indexDir string, version int, mappingGetter func() (mapping.IndexMapping, error)) *Indexer { + return &Indexer{ + indexDir: indexDir, + version: version, + mappingGetter: mappingGetter, + } +} + +// Init initializes the indexer +func (i *Indexer) Init(_ context.Context) (bool, error) { + if i == nil { + return false, fmt.Errorf("cannot init nil indexer") + } + + if i.Indexer != nil { + return false, fmt.Errorf("indexer is already initialized") + } + + indexer, version, err := openIndexer(i.indexDir, i.version) + if err != nil { + return false, err + } + if indexer != nil { + i.Indexer = indexer + return true, nil + } + + if version != 0 { + log.Warn("Found older bleve index with version %d, Forgejo will remove it and rebuild", version) + } + + indexMapping, err := i.mappingGetter() + if err != nil { + return false, err + } + + indexer, err = bleve.New(i.indexDir, indexMapping) + if err != nil { + return false, err + } + + if err = writeIndexMetadata(i.indexDir, &IndexMetadata{ + Version: i.version, + }); err != nil { + return false, err + } + + i.Indexer = indexer + + return false, nil +} + +// Ping checks if the indexer is available +func (i *Indexer) Ping(_ context.Context) error { + if i == nil { + return fmt.Errorf("cannot ping nil indexer") + } + if i.Indexer == nil { + return fmt.Errorf("indexer is not initialized") + } + return nil +} + +func (i *Indexer) Close() { + if i == nil || i.Indexer == nil { + return + } + + if err := i.Indexer.Close(); err != nil { + log.Error("Failed to close bleve indexer in %q: %v", i.indexDir, err) + } + i.Indexer = nil +} diff --git a/modules/indexer/internal/bleve/metadata.go b/modules/indexer/internal/bleve/metadata.go new file mode 100644 index 00000000..3c570ab4 --- /dev/null +++ b/modules/indexer/internal/bleve/metadata.go @@ -0,0 +1,55 @@ +// Copyright 2024 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +// Copied and modified from https://github.com/ethantkoenig/rupture (MIT License) + +package bleve + +import ( + "os" + "path/filepath" + + "code.gitea.io/gitea/modules/json" +) + +const metaFilename = "rupture_meta.json" + +func indexMetadataPath(dir string) string { + return filepath.Join(dir, metaFilename) +} + +// IndexMetadata contains metadata about a bleve index. +type IndexMetadata struct { + // The version of the data in the index. This can be useful for tracking + // schema changes or data migrations. + Version int `json:"version"` +} + +// readIndexMetadata returns the metadata for the index at the specified path. +// If no such index metadata exists, an empty metadata and a nil error are +// returned. +func readIndexMetadata(path string) (*IndexMetadata, error) { + meta := &IndexMetadata{} + metaPath := indexMetadataPath(path) + if _, err := os.Stat(metaPath); os.IsNotExist(err) { + return meta, nil + } else if err != nil { + return nil, err + } + + metaBytes, err := os.ReadFile(metaPath) + if err != nil { + return nil, err + } + return meta, json.Unmarshal(metaBytes, &meta) +} + +// writeIndexMetadata writes metadata for the index at the specified path. +func writeIndexMetadata(path string, meta *IndexMetadata) error { + metaBytes, err := json.Marshal(meta) + if err != nil { + return err + } + + return os.WriteFile(indexMetadataPath(path), metaBytes, 0o644) +} diff --git a/modules/indexer/internal/bleve/metadata_test.go b/modules/indexer/internal/bleve/metadata_test.go new file mode 100644 index 00000000..31603a92 --- /dev/null +++ b/modules/indexer/internal/bleve/metadata_test.go @@ -0,0 +1,28 @@ +// Copyright 2024 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +// Copied and modified from https://github.com/ethantkoenig/rupture (MIT License) + +package bleve + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMetadata(t *testing.T) { + dir := t.TempDir() + + meta, err := readIndexMetadata(dir) + require.NoError(t, err) + assert.Equal(t, &IndexMetadata{}, meta) + + meta.Version = 24 + require.NoError(t, writeIndexMetadata(dir, meta)) + + meta, err = readIndexMetadata(dir) + require.NoError(t, err) + assert.EqualValues(t, 24, meta.Version) +} diff --git a/modules/indexer/internal/bleve/query.go b/modules/indexer/internal/bleve/query.go new file mode 100644 index 00000000..21422b28 --- /dev/null +++ b/modules/indexer/internal/bleve/query.go @@ -0,0 +1,56 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package bleve + +import ( + "code.gitea.io/gitea/modules/optional" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/search/query" +) + +// NumericEqualityQuery generates a numeric equality query for the given value and field +func NumericEqualityQuery(value int64, field string) *query.NumericRangeQuery { + f := float64(value) + tru := true + q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru) + q.SetField(field) + return q +} + +// MatchPhraseQuery generates a match phrase query for the given phrase, field and analyzer +func MatchPhraseQuery(matchPhrase, field, analyzer string, fuzziness int) *query.MatchPhraseQuery { + q := bleve.NewMatchPhraseQuery(matchPhrase) + q.FieldVal = field + q.Analyzer = analyzer + q.Fuzziness = fuzziness + return q +} + +// BoolFieldQuery generates a bool field query for the given value and field +func BoolFieldQuery(value bool, field string) *query.BoolFieldQuery { + q := bleve.NewBoolFieldQuery(value) + q.SetField(field) + return q +} + +func NumericRangeInclusiveQuery(min, max optional.Option[int64], field string) *query.NumericRangeQuery { + var minF, maxF *float64 + var minI, maxI *bool + if min.Has() { + minF = new(float64) + *minF = float64(min.Value()) + minI = new(bool) + *minI = true + } + if max.Has() { + maxF = new(float64) + *maxF = float64(max.Value()) + maxI = new(bool) + *maxI = true + } + q := bleve.NewNumericRangeInclusiveQuery(minF, maxF, minI, maxI) + q.SetField(field) + return q +} diff --git a/modules/indexer/internal/bleve/util.go b/modules/indexer/internal/bleve/util.go new file mode 100644 index 00000000..d05b6797 --- /dev/null +++ b/modules/indexer/internal/bleve/util.go @@ -0,0 +1,48 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package bleve + +import ( + "errors" + "os" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/util" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/index/upsidedown" +) + +// openIndexer open the index at the specified path, checking for metadata +// updates and bleve version updates. If index needs to be created (or +// re-created), returns (nil, nil) +func openIndexer(path string, latestVersion int) (bleve.Index, int, error) { + _, err := os.Stat(path) + if err != nil && os.IsNotExist(err) { + return nil, 0, nil + } else if err != nil { + return nil, 0, err + } + + metadata, err := readIndexMetadata(path) + if err != nil { + return nil, 0, err + } + if metadata.Version < latestVersion { + // the indexer is using a previous version, so we should delete it and + // re-populate + return nil, metadata.Version, util.RemoveAll(path) + } + + index, err := bleve.Open(path) + if err != nil { + if errors.Is(err, upsidedown.IncompatibleVersion) { + log.Warn("Indexer was built with a previous version of bleve, deleting and rebuilding") + return nil, 0, util.RemoveAll(path) + } + return nil, 0, err + } + + return index, 0, nil +} diff --git a/modules/indexer/internal/db/indexer.go b/modules/indexer/internal/db/indexer.go new file mode 100644 index 00000000..3deec836 --- /dev/null +++ b/modules/indexer/internal/db/indexer.go @@ -0,0 +1,34 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package db + +import ( + "context" + + "code.gitea.io/gitea/modules/indexer/internal" +) + +var _ internal.Indexer = &Indexer{} + +// Indexer represents a basic db indexer implementation +type Indexer struct{} + +// Init initializes the indexer +func (i *Indexer) Init(_ context.Context) (bool, error) { + // Return true to indicate that the index was opened/existed. + // So that the indexer will not try to populate the index, the data is already there. + return true, nil +} + +// Ping checks if the indexer is available +func (i *Indexer) Ping(_ context.Context) error { + // No need to ping database to check if it is available. + // If the database goes down, Gitea will go down, so nobody will care if the indexer is available. + return nil +} + +// Close closes the indexer +func (i *Indexer) Close() { + // nothing to do +} diff --git a/modules/indexer/internal/elasticsearch/indexer.go b/modules/indexer/internal/elasticsearch/indexer.go new file mode 100644 index 00000000..395eea3b --- /dev/null +++ b/modules/indexer/internal/elasticsearch/indexer.go @@ -0,0 +1,93 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package elasticsearch + +import ( + "context" + "fmt" + + "code.gitea.io/gitea/modules/indexer/internal" + + "github.com/olivere/elastic/v7" +) + +var _ internal.Indexer = &Indexer{} + +// Indexer represents a basic elasticsearch indexer implementation +type Indexer struct { + Client *elastic.Client + + url string + indexName string + version int + mapping string +} + +func NewIndexer(url, indexName string, version int, mapping string) *Indexer { + return &Indexer{ + url: url, + indexName: indexName, + version: version, + mapping: mapping, + } +} + +// Init initializes the indexer +func (i *Indexer) Init(ctx context.Context) (bool, error) { + if i == nil { + return false, fmt.Errorf("cannot init nil indexer") + } + if i.Client != nil { + return false, fmt.Errorf("indexer is already initialized") + } + + client, err := i.initClient() + if err != nil { + return false, err + } + i.Client = client + + exists, err := i.Client.IndexExists(i.VersionedIndexName()).Do(ctx) + if err != nil { + return false, err + } + if exists { + return true, nil + } + + if err := i.createIndex(ctx); err != nil { + return false, err + } + + return exists, nil +} + +// Ping checks if the indexer is available +func (i *Indexer) Ping(ctx context.Context) error { + if i == nil { + return fmt.Errorf("cannot ping nil indexer") + } + if i.Client == nil { + return fmt.Errorf("indexer is not initialized") + } + + resp, err := i.Client.ClusterHealth().Do(ctx) + if err != nil { + return err + } + if resp.Status != "green" && resp.Status != "yellow" { + // It's healthy if the status is green, and it's available if the status is yellow, + // see https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html + return fmt.Errorf("status of elasticsearch cluster is %s", resp.Status) + } + return nil +} + +// Close closes the indexer +func (i *Indexer) Close() { + if i == nil { + return + } + i.Client = nil +} diff --git a/modules/indexer/internal/elasticsearch/util.go b/modules/indexer/internal/elasticsearch/util.go new file mode 100644 index 00000000..18cb152d --- /dev/null +++ b/modules/indexer/internal/elasticsearch/util.go @@ -0,0 +1,68 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package elasticsearch + +import ( + "context" + "fmt" + "time" + + "code.gitea.io/gitea/modules/log" + + "github.com/olivere/elastic/v7" +) + +// VersionedIndexName returns the full index name with version +func (i *Indexer) VersionedIndexName() string { + return versionedIndexName(i.indexName, i.version) +} + +func versionedIndexName(indexName string, version int) string { + if version == 0 { + // Old index name without version + return indexName + } + return fmt.Sprintf("%s.v%d", indexName, version) +} + +func (i *Indexer) createIndex(ctx context.Context) error { + createIndex, err := i.Client.CreateIndex(i.VersionedIndexName()).BodyString(i.mapping).Do(ctx) + if err != nil { + return err + } + if !createIndex.Acknowledged { + return fmt.Errorf("create index %s with %s failed", i.VersionedIndexName(), i.mapping) + } + + i.checkOldIndexes(ctx) + + return nil +} + +func (i *Indexer) initClient() (*elastic.Client, error) { + opts := []elastic.ClientOptionFunc{ + elastic.SetURL(i.url), + elastic.SetSniff(false), + elastic.SetHealthcheckInterval(10 * time.Second), + elastic.SetGzip(false), + } + + logger := log.GetLogger(log.DEFAULT) + + opts = append(opts, elastic.SetTraceLog(&log.PrintfLogger{Logf: logger.Trace})) + opts = append(opts, elastic.SetInfoLog(&log.PrintfLogger{Logf: logger.Info})) + opts = append(opts, elastic.SetErrorLog(&log.PrintfLogger{Logf: logger.Error})) + + return elastic.NewClient(opts...) +} + +func (i *Indexer) checkOldIndexes(ctx context.Context) { + for v := 0; v < i.version; v++ { + indexName := versionedIndexName(i.indexName, v) + exists, err := i.Client.IndexExists(indexName).Do(ctx) + if err == nil && exists { + log.Warn("Found older elasticsearch index named %q, Forgejo will keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", indexName) + } + } +} diff --git a/modules/indexer/internal/indexer.go b/modules/indexer/internal/indexer.go new file mode 100644 index 00000000..c7f356da --- /dev/null +++ b/modules/indexer/internal/indexer.go @@ -0,0 +1,37 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import ( + "context" + "fmt" +) + +// Indexer defines an basic indexer interface +type Indexer interface { + // Init initializes the indexer + // returns true if the index was opened/existed (with data populated), false if it was created/not-existed (with no data) + Init(ctx context.Context) (bool, error) + // Ping checks if the indexer is available + Ping(ctx context.Context) error + // Close closes the indexer + Close() +} + +// NewDummyIndexer returns a dummy indexer +func NewDummyIndexer() Indexer { + return &dummyIndexer{} +} + +type dummyIndexer struct{} + +func (d *dummyIndexer) Init(ctx context.Context) (bool, error) { + return false, fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Ping(ctx context.Context) error { + return fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Close() {} diff --git a/modules/indexer/internal/meilisearch/filter.go b/modules/indexer/internal/meilisearch/filter.go new file mode 100644 index 00000000..593177f1 --- /dev/null +++ b/modules/indexer/internal/meilisearch/filter.go @@ -0,0 +1,119 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package meilisearch + +import ( + "fmt" + "strings" +) + +// Filter represents a filter for meilisearch queries. +// It's just a simple wrapper around a string. +// DO NOT assume that it is a complete implementation. +type Filter interface { + Statement() string +} + +type FilterAnd struct { + filters []Filter +} + +func (f *FilterAnd) Statement() string { + var statements []string + for _, filter := range f.filters { + if s := filter.Statement(); s != "" { + statements = append(statements, fmt.Sprintf("(%s)", s)) + } + } + return strings.Join(statements, " AND ") +} + +func (f *FilterAnd) And(filter Filter) *FilterAnd { + f.filters = append(f.filters, filter) + return f +} + +type FilterOr struct { + filters []Filter +} + +func (f *FilterOr) Statement() string { + var statements []string + for _, filter := range f.filters { + if s := filter.Statement(); s != "" { + statements = append(statements, fmt.Sprintf("(%s)", s)) + } + } + return strings.Join(statements, " OR ") +} + +func (f *FilterOr) Or(filter Filter) *FilterOr { + f.filters = append(f.filters, filter) + return f +} + +type FilterIn string + +// NewFilterIn creates a new FilterIn. +// It supports int64 only, to avoid extra works to handle strings with special characters. +func NewFilterIn[T int64](field string, values ...T) FilterIn { + if len(values) == 0 { + return "" + } + vs := make([]string, len(values)) + for i, v := range values { + vs[i] = fmt.Sprintf("%v", v) + } + return FilterIn(fmt.Sprintf("%s IN [%v]", field, strings.Join(vs, ", "))) +} + +func (f FilterIn) Statement() string { + return string(f) +} + +type FilterEq string + +// NewFilterEq creates a new FilterEq. +// It supports int64 and bool only, to avoid extra works to handle strings with special characters. +func NewFilterEq[T bool | int64](field string, value T) FilterEq { + return FilterEq(fmt.Sprintf("%s = %v", field, value)) +} + +func (f FilterEq) Statement() string { + return string(f) +} + +type FilterNot string + +func NewFilterNot(filter Filter) FilterNot { + return FilterNot(fmt.Sprintf("NOT (%s)", filter.Statement())) +} + +func (f FilterNot) Statement() string { + return string(f) +} + +type FilterGte string + +// NewFilterGte creates a new FilterGte. +// It supports int64 only, to avoid extra works to handle strings with special characters. +func NewFilterGte[T int64](field string, value T) FilterGte { + return FilterGte(fmt.Sprintf("%s >= %v", field, value)) +} + +func (f FilterGte) Statement() string { + return string(f) +} + +type FilterLte string + +// NewFilterLte creates a new FilterLte. +// It supports int64 only, to avoid extra works to handle strings with special characters. +func NewFilterLte[T int64](field string, value T) FilterLte { + return FilterLte(fmt.Sprintf("%s <= %v", field, value)) +} + +func (f FilterLte) Statement() string { + return string(f) +} diff --git a/modules/indexer/internal/meilisearch/indexer.go b/modules/indexer/internal/meilisearch/indexer.go new file mode 100644 index 00000000..f4004849 --- /dev/null +++ b/modules/indexer/internal/meilisearch/indexer.go @@ -0,0 +1,91 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package meilisearch + +import ( + "context" + "fmt" + + "github.com/meilisearch/meilisearch-go" +) + +// Indexer represents a basic meilisearch indexer implementation +type Indexer struct { + Client *meilisearch.Client + + url, apiKey string + indexName string + version int + settings *meilisearch.Settings +} + +func NewIndexer(url, apiKey, indexName string, version int, settings *meilisearch.Settings) *Indexer { + return &Indexer{ + url: url, + apiKey: apiKey, + indexName: indexName, + version: version, + settings: settings, + } +} + +// Init initializes the indexer +func (i *Indexer) Init(_ context.Context) (bool, error) { + if i == nil { + return false, fmt.Errorf("cannot init nil indexer") + } + + if i.Client != nil { + return false, fmt.Errorf("indexer is already initialized") + } + + i.Client = meilisearch.NewClient(meilisearch.ClientConfig{ + Host: i.url, + APIKey: i.apiKey, + }) + + _, err := i.Client.GetIndex(i.VersionedIndexName()) + if err == nil { + return true, nil + } + _, err = i.Client.CreateIndex(&meilisearch.IndexConfig{ + Uid: i.VersionedIndexName(), + PrimaryKey: "id", + }) + if err != nil { + return false, err + } + + i.checkOldIndexes() + + _, err = i.Client.Index(i.VersionedIndexName()).UpdateSettings(i.settings) + return false, err +} + +// Ping checks if the indexer is available +func (i *Indexer) Ping(ctx context.Context) error { + if i == nil { + return fmt.Errorf("cannot ping nil indexer") + } + if i.Client == nil { + return fmt.Errorf("indexer is not initialized") + } + resp, err := i.Client.Health() + if err != nil { + return err + } + if resp.Status != "available" { + // See https://docs.meilisearch.com/reference/api/health.html#status + return fmt.Errorf("status of meilisearch is not available: %s", resp.Status) + } + return nil +} + +// Close closes the indexer +func (i *Indexer) Close() { + if i == nil { + return + } + i.Client = nil +} diff --git a/modules/indexer/internal/meilisearch/util.go b/modules/indexer/internal/meilisearch/util.go new file mode 100644 index 00000000..845bdb6e --- /dev/null +++ b/modules/indexer/internal/meilisearch/util.go @@ -0,0 +1,38 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package meilisearch + +import ( + "fmt" + + "code.gitea.io/gitea/modules/log" +) + +// VersionedIndexName returns the full index name with version +func (i *Indexer) VersionedIndexName() string { + return versionedIndexName(i.indexName, i.version) +} + +func versionedIndexName(indexName string, version int) string { + if version == 0 { + // Old index name without version + return indexName + } + + // The format of the index name is <index_name>_v<version>, not <index_name>.v<version> like elasticsearch. + // Because meilisearch does not support "." in index name, it should contain only alphanumeric characters, hyphens (-) and underscores (_). + // See https://www.meilisearch.com/docs/learn/core_concepts/indexes#index-uid + + return fmt.Sprintf("%s_v%d", indexName, version) +} + +func (i *Indexer) checkOldIndexes() { + for v := 0; v < i.version; v++ { + indexName := versionedIndexName(i.indexName, v) + _, err := i.Client.GetIndex(indexName) + if err == nil { + log.Warn("Found older meilisearch index named %q, Forgejo will keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", indexName) + } + } +} diff --git a/modules/indexer/internal/paginator.go b/modules/indexer/internal/paginator.go new file mode 100644 index 00000000..ee204bf0 --- /dev/null +++ b/modules/indexer/internal/paginator.go @@ -0,0 +1,34 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import ( + "math" + + "code.gitea.io/gitea/models/db" +) + +// ParsePaginator parses a db.Paginator into a skip and limit +func ParsePaginator(paginator *db.ListOptions, max ...int) (int, int) { + // Use a very large number to indicate no limit + unlimited := math.MaxInt32 + if len(max) > 0 { + // Some indexer engines have a limit on the page size, respect that + unlimited = max[0] + } + + if paginator == nil || paginator.IsListAll() { + // It shouldn't happen. In actual usage scenarios, there should not be requests to search all. + // But if it does happen, respect it and return "unlimited". + // And it's also useful for testing. + return 0, unlimited + } + + if paginator.PageSize == 0 { + // Do not return any results when searching, it's used to get the total count only. + return 0, 0 + } + + return paginator.GetSkipTake() +} diff --git a/modules/indexer/issues/bleve/bleve.go b/modules/indexer/issues/bleve/bleve.go new file mode 100644 index 00000000..b20fcc6f --- /dev/null +++ b/modules/indexer/issues/bleve/bleve.go @@ -0,0 +1,300 @@ +// Copyright 2018 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package bleve + +import ( + "context" + + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_bleve "code.gitea.io/gitea/modules/indexer/internal/bleve" + "code.gitea.io/gitea/modules/indexer/issues/internal" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/analysis/analyzer/custom" + "github.com/blevesearch/bleve/v2/analysis/token/camelcase" + "github.com/blevesearch/bleve/v2/analysis/token/lowercase" + "github.com/blevesearch/bleve/v2/analysis/token/unicodenorm" + "github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode" + "github.com/blevesearch/bleve/v2/mapping" + "github.com/blevesearch/bleve/v2/search/query" +) + +const ( + issueIndexerAnalyzer = "issueIndexer" + issueIndexerDocType = "issueIndexerDocType" + issueIndexerLatestVersion = 4 +) + +const unicodeNormalizeName = "unicodeNormalize" + +func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { + return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]any{ + "type": unicodenorm.Name, + "form": unicodenorm.NFC, + }) +} + +const ( + maxBatchSize = 16 + // fuzzyDenominator determines the levenshtein distance per each character of a keyword + fuzzyDenominator = 4 + // see https://github.com/blevesearch/bleve/issues/1563#issuecomment-786822311 + maxFuzziness = 2 +) + +// IndexerData an update to the issue indexer +type IndexerData internal.IndexerData + +// Type returns the document type, for bleve's mapping.Classifier interface. +func (i *IndexerData) Type() string { + return issueIndexerDocType +} + +// generateIssueIndexMapping generates the bleve index mapping for issues +func generateIssueIndexMapping() (mapping.IndexMapping, error) { + mapping := bleve.NewIndexMapping() + docMapping := bleve.NewDocumentMapping() + + numericFieldMapping := bleve.NewNumericFieldMapping() + numericFieldMapping.Store = false + numericFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("repo_id", numericFieldMapping) + + textFieldMapping := bleve.NewTextFieldMapping() + textFieldMapping.Store = false + textFieldMapping.IncludeInAll = false + + boolFieldMapping := bleve.NewBooleanFieldMapping() + boolFieldMapping.Store = false + boolFieldMapping.IncludeInAll = false + + numberFieldMapping := bleve.NewNumericFieldMapping() + numberFieldMapping.Store = false + numberFieldMapping.IncludeInAll = false + + docMapping.AddFieldMappingsAt("is_public", boolFieldMapping) + + docMapping.AddFieldMappingsAt("title", textFieldMapping) + docMapping.AddFieldMappingsAt("content", textFieldMapping) + docMapping.AddFieldMappingsAt("comments", textFieldMapping) + + docMapping.AddFieldMappingsAt("is_pull", boolFieldMapping) + docMapping.AddFieldMappingsAt("is_closed", boolFieldMapping) + docMapping.AddFieldMappingsAt("label_ids", numberFieldMapping) + docMapping.AddFieldMappingsAt("no_label", boolFieldMapping) + docMapping.AddFieldMappingsAt("milestone_id", numberFieldMapping) + docMapping.AddFieldMappingsAt("project_id", numberFieldMapping) + docMapping.AddFieldMappingsAt("project_board_id", numberFieldMapping) + docMapping.AddFieldMappingsAt("poster_id", numberFieldMapping) + docMapping.AddFieldMappingsAt("assignee_id", numberFieldMapping) + docMapping.AddFieldMappingsAt("mention_ids", numberFieldMapping) + docMapping.AddFieldMappingsAt("reviewed_ids", numberFieldMapping) + docMapping.AddFieldMappingsAt("review_requested_ids", numberFieldMapping) + docMapping.AddFieldMappingsAt("subscriber_ids", numberFieldMapping) + docMapping.AddFieldMappingsAt("updated_unix", numberFieldMapping) + + docMapping.AddFieldMappingsAt("created_unix", numberFieldMapping) + docMapping.AddFieldMappingsAt("deadline_unix", numberFieldMapping) + docMapping.AddFieldMappingsAt("comment_count", numberFieldMapping) + + if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { + return nil, err + } else if err = mapping.AddCustomAnalyzer(issueIndexerAnalyzer, map[string]any{ + "type": custom.Name, + "char_filters": []string{}, + "tokenizer": unicode.Name, + "token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name}, + }); err != nil { + return nil, err + } + + mapping.DefaultAnalyzer = issueIndexerAnalyzer + mapping.AddDocumentMapping(issueIndexerDocType, docMapping) + mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) + mapping.DefaultMapping = bleve.NewDocumentDisabledMapping() // disable default mapping, avoid indexing unexpected structs + + return mapping, nil +} + +var _ internal.Indexer = &Indexer{} + +// Indexer implements Indexer interface +type Indexer struct { + inner *inner_bleve.Indexer + indexer_internal.Indexer // do not composite inner_bleve.Indexer directly to avoid exposing too much +} + +// NewIndexer creates a new bleve local indexer +func NewIndexer(indexDir string) *Indexer { + inner := inner_bleve.NewIndexer(indexDir, issueIndexerLatestVersion, generateIssueIndexMapping) + return &Indexer{ + Indexer: inner, + inner: inner, + } +} + +// Index will save the index data +func (b *Indexer) Index(_ context.Context, issues ...*internal.IndexerData) error { + batch := inner_bleve.NewFlushingBatch(b.inner.Indexer, maxBatchSize) + for _, issue := range issues { + if err := batch.Index(indexer_internal.Base36(issue.ID), (*IndexerData)(issue)); err != nil { + return err + } + } + return batch.Flush() +} + +// Delete deletes indexes by ids +func (b *Indexer) Delete(_ context.Context, ids ...int64) error { + batch := inner_bleve.NewFlushingBatch(b.inner.Indexer, maxBatchSize) + for _, id := range ids { + if err := batch.Delete(indexer_internal.Base36(id)); err != nil { + return err + } + } + return batch.Flush() +} + +// Search searches for issues by given conditions. +// Returns the matching issue IDs +func (b *Indexer) Search(ctx context.Context, options *internal.SearchOptions) (*internal.SearchResult, error) { + var queries []query.Query + + if options.Keyword != "" { + fuzziness := 0 + if options.IsFuzzyKeyword { + fuzziness = min(maxFuzziness, len(options.Keyword)/fuzzyDenominator) + } + + queries = append(queries, bleve.NewDisjunctionQuery([]query.Query{ + inner_bleve.MatchPhraseQuery(options.Keyword, "title", issueIndexerAnalyzer, fuzziness), + inner_bleve.MatchPhraseQuery(options.Keyword, "content", issueIndexerAnalyzer, fuzziness), + inner_bleve.MatchPhraseQuery(options.Keyword, "comments", issueIndexerAnalyzer, fuzziness), + }...)) + } + + if len(options.RepoIDs) > 0 || options.AllPublic { + var repoQueries []query.Query + for _, repoID := range options.RepoIDs { + repoQueries = append(repoQueries, inner_bleve.NumericEqualityQuery(repoID, "repo_id")) + } + if options.AllPublic { + repoQueries = append(repoQueries, inner_bleve.BoolFieldQuery(true, "is_public")) + } + queries = append(queries, bleve.NewDisjunctionQuery(repoQueries...)) + } + + if options.IsPull.Has() { + queries = append(queries, inner_bleve.BoolFieldQuery(options.IsPull.Value(), "is_pull")) + } + if options.IsClosed.Has() { + queries = append(queries, inner_bleve.BoolFieldQuery(options.IsClosed.Value(), "is_closed")) + } + + if options.NoLabelOnly { + queries = append(queries, inner_bleve.BoolFieldQuery(true, "no_label")) + } else { + if len(options.IncludedLabelIDs) > 0 { + var includeQueries []query.Query + for _, labelID := range options.IncludedLabelIDs { + includeQueries = append(includeQueries, inner_bleve.NumericEqualityQuery(labelID, "label_ids")) + } + queries = append(queries, bleve.NewConjunctionQuery(includeQueries...)) + } else if len(options.IncludedAnyLabelIDs) > 0 { + var includeQueries []query.Query + for _, labelID := range options.IncludedAnyLabelIDs { + includeQueries = append(includeQueries, inner_bleve.NumericEqualityQuery(labelID, "label_ids")) + } + queries = append(queries, bleve.NewDisjunctionQuery(includeQueries...)) + } + if len(options.ExcludedLabelIDs) > 0 { + var excludeQueries []query.Query + for _, labelID := range options.ExcludedLabelIDs { + q := bleve.NewBooleanQuery() + q.AddMustNot(inner_bleve.NumericEqualityQuery(labelID, "label_ids")) + excludeQueries = append(excludeQueries, q) + } + queries = append(queries, bleve.NewConjunctionQuery(excludeQueries...)) + } + } + + if len(options.MilestoneIDs) > 0 { + var milestoneQueries []query.Query + for _, milestoneID := range options.MilestoneIDs { + milestoneQueries = append(milestoneQueries, inner_bleve.NumericEqualityQuery(milestoneID, "milestone_id")) + } + queries = append(queries, bleve.NewDisjunctionQuery(milestoneQueries...)) + } + + if options.ProjectID.Has() { + queries = append(queries, inner_bleve.NumericEqualityQuery(options.ProjectID.Value(), "project_id")) + } + if options.ProjectColumnID.Has() { + queries = append(queries, inner_bleve.NumericEqualityQuery(options.ProjectColumnID.Value(), "project_board_id")) + } + + if options.PosterID.Has() { + queries = append(queries, inner_bleve.NumericEqualityQuery(options.PosterID.Value(), "poster_id")) + } + + if options.AssigneeID.Has() { + queries = append(queries, inner_bleve.NumericEqualityQuery(options.AssigneeID.Value(), "assignee_id")) + } + + if options.MentionID.Has() { + queries = append(queries, inner_bleve.NumericEqualityQuery(options.MentionID.Value(), "mention_ids")) + } + + if options.ReviewedID.Has() { + queries = append(queries, inner_bleve.NumericEqualityQuery(options.ReviewedID.Value(), "reviewed_ids")) + } + if options.ReviewRequestedID.Has() { + queries = append(queries, inner_bleve.NumericEqualityQuery(options.ReviewRequestedID.Value(), "review_requested_ids")) + } + + if options.SubscriberID.Has() { + queries = append(queries, inner_bleve.NumericEqualityQuery(options.SubscriberID.Value(), "subscriber_ids")) + } + + if options.UpdatedAfterUnix.Has() || options.UpdatedBeforeUnix.Has() { + queries = append(queries, inner_bleve.NumericRangeInclusiveQuery( + options.UpdatedAfterUnix, + options.UpdatedBeforeUnix, + "updated_unix")) + } + + var indexerQuery query.Query = bleve.NewConjunctionQuery(queries...) + if len(queries) == 0 { + indexerQuery = bleve.NewMatchAllQuery() + } + + skip, limit := indexer_internal.ParsePaginator(options.Paginator) + search := bleve.NewSearchRequestOptions(indexerQuery, limit, skip, false) + + if options.SortBy == "" { + options.SortBy = internal.SortByCreatedAsc + } + + search.SortBy([]string{string(options.SortBy), "-_id"}) + + result, err := b.inner.Indexer.SearchInContext(ctx, search) + if err != nil { + return nil, err + } + + ret := &internal.SearchResult{ + Total: int64(result.Total), + Hits: make([]internal.Match, 0, len(result.Hits)), + } + for _, hit := range result.Hits { + id, err := indexer_internal.ParseBase36(hit.ID) + if err != nil { + return nil, err + } + ret.Hits = append(ret.Hits, internal.Match{ + ID: id, + }) + } + return ret, nil +} diff --git a/modules/indexer/issues/bleve/bleve_test.go b/modules/indexer/issues/bleve/bleve_test.go new file mode 100644 index 00000000..908514a0 --- /dev/null +++ b/modules/indexer/issues/bleve/bleve_test.go @@ -0,0 +1,18 @@ +// Copyright 2018 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package bleve + +import ( + "testing" + + "code.gitea.io/gitea/modules/indexer/issues/internal/tests" +) + +func TestBleveIndexer(t *testing.T) { + dir := t.TempDir() + indexer := NewIndexer(dir) + defer indexer.Close() + + tests.TestIndexer(t, indexer) +} diff --git a/modules/indexer/issues/db/db.go b/modules/indexer/issues/db/db.go new file mode 100644 index 00000000..05ec5484 --- /dev/null +++ b/modules/indexer/issues/db/db.go @@ -0,0 +1,107 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package db + +import ( + "context" + + "code.gitea.io/gitea/models/db" + issue_model "code.gitea.io/gitea/models/issues" + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_db "code.gitea.io/gitea/modules/indexer/internal/db" + "code.gitea.io/gitea/modules/indexer/issues/internal" + + "xorm.io/builder" +) + +var _ internal.Indexer = &Indexer{} + +// Indexer implements Indexer interface to use database's like search +type Indexer struct { + indexer_internal.Indexer +} + +func NewIndexer() *Indexer { + return &Indexer{ + Indexer: &inner_db.Indexer{}, + } +} + +// Index dummy function +func (i *Indexer) Index(_ context.Context, _ ...*internal.IndexerData) error { + return nil +} + +// Delete dummy function +func (i *Indexer) Delete(_ context.Context, _ ...int64) error { + return nil +} + +// Search searches for issues +func (i *Indexer) Search(ctx context.Context, options *internal.SearchOptions) (*internal.SearchResult, error) { + // FIXME: I tried to avoid importing models here, but it seems to be impossible. + // We can provide a function to register the search function, so models/issues can register it. + // So models/issues will import modules/indexer/issues, it's OK because it's by design. + // But modules/indexer/issues has already imported models/issues to do UpdateRepoIndexer and UpdateIssueIndexer. + // And to avoid circular import, we have to move the functions to another package. + // I believe it should be services/indexer, sounds great! + // But the two functions are used in modules/notification/indexer, that means we will import services/indexer in modules/notification/indexer. + // So that's the root problem: + // The notification is defined in modules, but it's using lots of things should be in services. + + cond := builder.NewCond() + + if options.Keyword != "" { + repoCond := builder.In("repo_id", options.RepoIDs) + if len(options.RepoIDs) == 1 { + repoCond = builder.Eq{"repo_id": options.RepoIDs[0]} + } + subQuery := builder.Select("id").From("issue").Where(repoCond) + + cond = builder.Or( + db.BuildCaseInsensitiveLike("issue.name", options.Keyword), + db.BuildCaseInsensitiveLike("issue.content", options.Keyword), + builder.In("issue.id", builder.Select("issue_id"). + From("comment"). + Where(builder.And( + builder.Eq{"type": issue_model.CommentTypeComment}, + builder.In("issue_id", subQuery), + db.BuildCaseInsensitiveLike("content", options.Keyword), + )), + ), + ) + } + + opt, err := ToDBOptions(ctx, options) + if err != nil { + return nil, err + } + + // If pagesize == 0, return total count only. It's a special case for search count. + if options.Paginator != nil && options.Paginator.PageSize == 0 { + total, err := issue_model.CountIssues(ctx, opt, cond) + if err != nil { + return nil, err + } + return &internal.SearchResult{ + Total: total, + }, nil + } + + ids, total, err := issue_model.IssueIDs(ctx, opt, cond) + if err != nil { + return nil, err + } + + hits := make([]internal.Match, 0, len(ids)) + for _, id := range ids { + hits = append(hits, internal.Match{ + ID: id, + }) + } + return &internal.SearchResult{ + Total: total, + Hits: hits, + }, nil +} diff --git a/modules/indexer/issues/db/options.go b/modules/indexer/issues/db/options.go new file mode 100644 index 00000000..875a4ca2 --- /dev/null +++ b/modules/indexer/issues/db/options.go @@ -0,0 +1,112 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package db + +import ( + "context" + "fmt" + + "code.gitea.io/gitea/models/db" + issue_model "code.gitea.io/gitea/models/issues" + "code.gitea.io/gitea/modules/container" + "code.gitea.io/gitea/modules/indexer/issues/internal" + "code.gitea.io/gitea/modules/optional" +) + +func ToDBOptions(ctx context.Context, options *internal.SearchOptions) (*issue_model.IssuesOptions, error) { + var sortType string + switch options.SortBy { + case internal.SortByCreatedAsc: + sortType = "oldest" + case internal.SortByUpdatedAsc: + sortType = "leastupdate" + case internal.SortByCommentsAsc: + sortType = "leastcomment" + case internal.SortByDeadlineDesc: + sortType = "farduedate" + case internal.SortByCreatedDesc: + sortType = "newest" + case internal.SortByUpdatedDesc: + sortType = "recentupdate" + case internal.SortByCommentsDesc: + sortType = "mostcomment" + case internal.SortByDeadlineAsc: + sortType = "nearduedate" + default: + sortType = "newest" + } + + // See the comment of issues_model.SearchOptions for the reason why we need to convert + convertID := func(id optional.Option[int64]) int64 { + if !id.Has() { + return 0 + } + value := id.Value() + if value == 0 { + return db.NoConditionID + } + return value + } + + opts := &issue_model.IssuesOptions{ + Paginator: options.Paginator, + RepoIDs: options.RepoIDs, + AllPublic: options.AllPublic, + RepoCond: nil, + AssigneeID: convertID(options.AssigneeID), + PosterID: convertID(options.PosterID), + MentionedID: convertID(options.MentionID), + ReviewRequestedID: convertID(options.ReviewRequestedID), + ReviewedID: convertID(options.ReviewedID), + SubscriberID: convertID(options.SubscriberID), + ProjectID: convertID(options.ProjectID), + ProjectColumnID: convertID(options.ProjectColumnID), + IsClosed: options.IsClosed, + IsPull: options.IsPull, + IncludedLabelNames: nil, + ExcludedLabelNames: nil, + IncludeMilestones: nil, + SortType: sortType, + IssueIDs: nil, + UpdatedAfterUnix: options.UpdatedAfterUnix.Value(), + UpdatedBeforeUnix: options.UpdatedBeforeUnix.Value(), + PriorityRepoID: 0, + IsArchived: optional.None[bool](), + Org: nil, + Team: nil, + User: nil, + } + + if len(options.MilestoneIDs) == 1 && options.MilestoneIDs[0] == 0 { + opts.MilestoneIDs = []int64{db.NoConditionID} + } else { + opts.MilestoneIDs = options.MilestoneIDs + } + + if options.NoLabelOnly { + opts.LabelIDs = []int64{0} // Be careful, it's zero, not db.NoConditionID + } else { + opts.LabelIDs = make([]int64, 0, len(options.IncludedLabelIDs)+len(options.ExcludedLabelIDs)) + opts.LabelIDs = append(opts.LabelIDs, options.IncludedLabelIDs...) + for _, id := range options.ExcludedLabelIDs { + opts.LabelIDs = append(opts.LabelIDs, -id) + } + + if len(options.IncludedLabelIDs) == 0 && len(options.IncludedAnyLabelIDs) > 0 { + labels, err := issue_model.GetLabelsByIDs(ctx, options.IncludedAnyLabelIDs, "name") + if err != nil { + return nil, fmt.Errorf("GetLabelsByIDs: %v", err) + } + set := container.Set[string]{} + for _, label := range labels { + if !set.Contains(label.Name) { + set.Add(label.Name) + opts.IncludedLabelNames = append(opts.IncludedLabelNames, label.Name) + } + } + } + } + + return opts, nil +} diff --git a/modules/indexer/issues/dboptions.go b/modules/indexer/issues/dboptions.go new file mode 100644 index 00000000..50916024 --- /dev/null +++ b/modules/indexer/issues/dboptions.go @@ -0,0 +1,100 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package issues + +import ( + "code.gitea.io/gitea/models/db" + issues_model "code.gitea.io/gitea/models/issues" + "code.gitea.io/gitea/modules/optional" +) + +func ToSearchOptions(keyword string, opts *issues_model.IssuesOptions) *SearchOptions { + searchOpt := &SearchOptions{ + Keyword: keyword, + RepoIDs: opts.RepoIDs, + AllPublic: opts.AllPublic, + IsPull: opts.IsPull, + IsClosed: opts.IsClosed, + } + + if len(opts.LabelIDs) == 1 && opts.LabelIDs[0] == 0 { + searchOpt.NoLabelOnly = true + } else { + for _, labelID := range opts.LabelIDs { + if labelID > 0 { + searchOpt.IncludedLabelIDs = append(searchOpt.IncludedLabelIDs, labelID) + } else { + searchOpt.ExcludedLabelIDs = append(searchOpt.ExcludedLabelIDs, -labelID) + } + } + // opts.IncludedLabelNames and opts.ExcludedLabelNames are not supported here. + // It's not a TO DO, it's just unnecessary. + } + + if len(opts.MilestoneIDs) == 1 && opts.MilestoneIDs[0] == db.NoConditionID { + searchOpt.MilestoneIDs = []int64{0} + } else { + searchOpt.MilestoneIDs = opts.MilestoneIDs + } + + if opts.ProjectID > 0 { + searchOpt.ProjectID = optional.Some(opts.ProjectID) + } else if opts.ProjectID == -1 { // FIXME: this is inconsistent from other places + searchOpt.ProjectID = optional.Some[int64](0) // Those issues with no project(projectid==0) + } + + // See the comment of issues_model.SearchOptions for the reason why we need to convert + convertID := func(id int64) optional.Option[int64] { + if id > 0 { + return optional.Some(id) + } + if id == db.NoConditionID { + return optional.None[int64]() + } + return nil + } + + searchOpt.ProjectColumnID = convertID(opts.ProjectColumnID) + searchOpt.PosterID = convertID(opts.PosterID) + searchOpt.AssigneeID = convertID(opts.AssigneeID) + searchOpt.MentionID = convertID(opts.MentionedID) + searchOpt.ReviewedID = convertID(opts.ReviewedID) + searchOpt.ReviewRequestedID = convertID(opts.ReviewRequestedID) + searchOpt.SubscriberID = convertID(opts.SubscriberID) + + if opts.UpdatedAfterUnix > 0 { + searchOpt.UpdatedAfterUnix = optional.Some(opts.UpdatedAfterUnix) + } + if opts.UpdatedBeforeUnix > 0 { + searchOpt.UpdatedBeforeUnix = optional.Some(opts.UpdatedBeforeUnix) + } + + searchOpt.Paginator = opts.Paginator + + switch opts.SortType { + case "", "latest": + searchOpt.SortBy = SortByCreatedDesc + case "oldest": + searchOpt.SortBy = SortByCreatedAsc + case "recentupdate": + searchOpt.SortBy = SortByUpdatedDesc + case "leastupdate": + searchOpt.SortBy = SortByUpdatedAsc + case "mostcomment": + searchOpt.SortBy = SortByCommentsDesc + case "leastcomment": + searchOpt.SortBy = SortByCommentsAsc + case "nearduedate": + searchOpt.SortBy = SortByDeadlineAsc + case "farduedate": + searchOpt.SortBy = SortByDeadlineDesc + case "priority", "priorityrepo", "project-column-sorting": + // Unsupported sort type for search + fallthrough + default: + searchOpt.SortBy = SortByUpdatedDesc + } + + return searchOpt +} diff --git a/modules/indexer/issues/elasticsearch/elasticsearch.go b/modules/indexer/issues/elasticsearch/elasticsearch.go new file mode 100644 index 00000000..42e709a5 --- /dev/null +++ b/modules/indexer/issues/elasticsearch/elasticsearch.go @@ -0,0 +1,290 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package elasticsearch + +import ( + "context" + "fmt" + "strconv" + "strings" + + "code.gitea.io/gitea/modules/graceful" + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_elasticsearch "code.gitea.io/gitea/modules/indexer/internal/elasticsearch" + "code.gitea.io/gitea/modules/indexer/issues/internal" + + "github.com/olivere/elastic/v7" +) + +const ( + issueIndexerLatestVersion = 1 + // multi-match-types, currently only 2 types are used + // Reference: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/query-dsl-multi-match-query.html#multi-match-types + esMultiMatchTypeBestFields = "best_fields" + esMultiMatchTypePhrasePrefix = "phrase_prefix" +) + +var _ internal.Indexer = &Indexer{} + +// Indexer implements Indexer interface +type Indexer struct { + inner *inner_elasticsearch.Indexer + indexer_internal.Indexer // do not composite inner_elasticsearch.Indexer directly to avoid exposing too much +} + +// NewIndexer creates a new elasticsearch indexer +func NewIndexer(url, indexerName string) *Indexer { + inner := inner_elasticsearch.NewIndexer(url, indexerName, issueIndexerLatestVersion, defaultMapping) + indexer := &Indexer{ + inner: inner, + Indexer: inner, + } + return indexer +} + +const ( + defaultMapping = ` +{ + "mappings": { + "properties": { + "id": { "type": "long", "index": true }, + "repo_id": { "type": "long", "index": true }, + "is_public": { "type": "boolean", "index": true }, + + "title": { "type": "text", "index": true }, + "content": { "type": "text", "index": true }, + "comments": { "type" : "text", "index": true }, + + "is_pull": { "type": "boolean", "index": true }, + "is_closed": { "type": "boolean", "index": true }, + "label_ids": { "type": "long", "index": true }, + "no_label": { "type": "boolean", "index": true }, + "milestone_id": { "type": "long", "index": true }, + "project_id": { "type": "long", "index": true }, + "project_board_id": { "type": "long", "index": true }, + "poster_id": { "type": "long", "index": true }, + "assignee_id": { "type": "long", "index": true }, + "mention_ids": { "type": "long", "index": true }, + "reviewed_ids": { "type": "long", "index": true }, + "review_requested_ids": { "type": "long", "index": true }, + "subscriber_ids": { "type": "long", "index": true }, + "updated_unix": { "type": "long", "index": true }, + + "created_unix": { "type": "long", "index": true }, + "deadline_unix": { "type": "long", "index": true }, + "comment_count": { "type": "long", "index": true } + } + } +} +` +) + +// Index will save the index data +func (b *Indexer) Index(ctx context.Context, issues ...*internal.IndexerData) error { + if len(issues) == 0 { + return nil + } else if len(issues) == 1 { + issue := issues[0] + _, err := b.inner.Client.Index(). + Index(b.inner.VersionedIndexName()). + Id(fmt.Sprintf("%d", issue.ID)). + BodyJson(issue). + Do(ctx) + return err + } + + reqs := make([]elastic.BulkableRequest, 0) + for _, issue := range issues { + reqs = append(reqs, + elastic.NewBulkIndexRequest(). + Index(b.inner.VersionedIndexName()). + Id(fmt.Sprintf("%d", issue.ID)). + Doc(issue), + ) + } + + _, err := b.inner.Client.Bulk(). + Index(b.inner.VersionedIndexName()). + Add(reqs...). + Do(graceful.GetManager().HammerContext()) + return err +} + +// Delete deletes indexes by ids +func (b *Indexer) Delete(ctx context.Context, ids ...int64) error { + if len(ids) == 0 { + return nil + } else if len(ids) == 1 { + _, err := b.inner.Client.Delete(). + Index(b.inner.VersionedIndexName()). + Id(fmt.Sprintf("%d", ids[0])). + Do(ctx) + return err + } + + reqs := make([]elastic.BulkableRequest, 0) + for _, id := range ids { + reqs = append(reqs, + elastic.NewBulkDeleteRequest(). + Index(b.inner.VersionedIndexName()). + Id(fmt.Sprintf("%d", id)), + ) + } + + _, err := b.inner.Client.Bulk(). + Index(b.inner.VersionedIndexName()). + Add(reqs...). + Do(graceful.GetManager().HammerContext()) + return err +} + +// Search searches for issues by given conditions. +// Returns the matching issue IDs +func (b *Indexer) Search(ctx context.Context, options *internal.SearchOptions) (*internal.SearchResult, error) { + query := elastic.NewBoolQuery() + + if options.Keyword != "" { + searchType := esMultiMatchTypePhrasePrefix + if options.IsFuzzyKeyword { + searchType = esMultiMatchTypeBestFields + } + + query.Must(elastic.NewMultiMatchQuery(options.Keyword, "title", "content", "comments").Type(searchType)) + } + + if len(options.RepoIDs) > 0 { + q := elastic.NewBoolQuery() + q.Should(elastic.NewTermsQuery("repo_id", toAnySlice(options.RepoIDs)...)) + if options.AllPublic { + q.Should(elastic.NewTermQuery("is_public", true)) + } + query.Must(q) + } + + if options.IsPull.Has() { + query.Must(elastic.NewTermQuery("is_pull", options.IsPull.Value())) + } + if options.IsClosed.Has() { + query.Must(elastic.NewTermQuery("is_closed", options.IsClosed.Value())) + } + + if options.NoLabelOnly { + query.Must(elastic.NewTermQuery("no_label", true)) + } else { + if len(options.IncludedLabelIDs) > 0 { + q := elastic.NewBoolQuery() + for _, labelID := range options.IncludedLabelIDs { + q.Must(elastic.NewTermQuery("label_ids", labelID)) + } + query.Must(q) + } else if len(options.IncludedAnyLabelIDs) > 0 { + query.Must(elastic.NewTermsQuery("label_ids", toAnySlice(options.IncludedAnyLabelIDs)...)) + } + if len(options.ExcludedLabelIDs) > 0 { + q := elastic.NewBoolQuery() + for _, labelID := range options.ExcludedLabelIDs { + q.MustNot(elastic.NewTermQuery("label_ids", labelID)) + } + query.Must(q) + } + } + + if len(options.MilestoneIDs) > 0 { + query.Must(elastic.NewTermsQuery("milestone_id", toAnySlice(options.MilestoneIDs)...)) + } + + if options.ProjectID.Has() { + query.Must(elastic.NewTermQuery("project_id", options.ProjectID.Value())) + } + if options.ProjectColumnID.Has() { + query.Must(elastic.NewTermQuery("project_board_id", options.ProjectColumnID.Value())) + } + + if options.PosterID.Has() { + query.Must(elastic.NewTermQuery("poster_id", options.PosterID.Value())) + } + + if options.AssigneeID.Has() { + query.Must(elastic.NewTermQuery("assignee_id", options.AssigneeID.Value())) + } + + if options.MentionID.Has() { + query.Must(elastic.NewTermQuery("mention_ids", options.MentionID.Value())) + } + + if options.ReviewedID.Has() { + query.Must(elastic.NewTermQuery("reviewed_ids", options.ReviewedID.Value())) + } + if options.ReviewRequestedID.Has() { + query.Must(elastic.NewTermQuery("review_requested_ids", options.ReviewRequestedID.Value())) + } + + if options.SubscriberID.Has() { + query.Must(elastic.NewTermQuery("subscriber_ids", options.SubscriberID.Value())) + } + + if options.UpdatedAfterUnix.Has() || options.UpdatedBeforeUnix.Has() { + q := elastic.NewRangeQuery("updated_unix") + if options.UpdatedAfterUnix.Has() { + q.Gte(options.UpdatedAfterUnix.Value()) + } + if options.UpdatedBeforeUnix.Has() { + q.Lte(options.UpdatedBeforeUnix.Value()) + } + query.Must(q) + } + + if options.SortBy == "" { + options.SortBy = internal.SortByCreatedAsc + } + sortBy := []elastic.Sorter{ + parseSortBy(options.SortBy), + elastic.NewFieldSort("id").Desc(), + } + + // See https://stackoverflow.com/questions/35206409/elasticsearch-2-1-result-window-is-too-large-index-max-result-window/35221900 + // TODO: make it configurable since it's configurable in elasticsearch + const maxPageSize = 10000 + + skip, limit := indexer_internal.ParsePaginator(options.Paginator, maxPageSize) + searchResult, err := b.inner.Client.Search(). + Index(b.inner.VersionedIndexName()). + Query(query). + SortBy(sortBy...). + From(skip).Size(limit). + Do(ctx) + if err != nil { + return nil, err + } + + hits := make([]internal.Match, 0, limit) + for _, hit := range searchResult.Hits.Hits { + id, _ := strconv.ParseInt(hit.Id, 10, 64) + hits = append(hits, internal.Match{ + ID: id, + }) + } + + return &internal.SearchResult{ + Total: searchResult.TotalHits(), + Hits: hits, + }, nil +} + +func toAnySlice[T any](s []T) []any { + ret := make([]any, 0, len(s)) + for _, item := range s { + ret = append(ret, item) + } + return ret +} + +func parseSortBy(sortBy internal.SortBy) elastic.Sorter { + field := strings.TrimPrefix(string(sortBy), "-") + ret := elastic.NewFieldSort(field) + if strings.HasPrefix(string(sortBy), "-") { + ret.Desc() + } + return ret +} diff --git a/modules/indexer/issues/elasticsearch/elasticsearch_test.go b/modules/indexer/issues/elasticsearch/elasticsearch_test.go new file mode 100644 index 00000000..4ed0b844 --- /dev/null +++ b/modules/indexer/issues/elasticsearch/elasticsearch_test.go @@ -0,0 +1,48 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package elasticsearch + +import ( + "fmt" + "net/http" + "os" + "testing" + "time" + + "code.gitea.io/gitea/modules/indexer/issues/internal/tests" +) + +func TestElasticsearchIndexer(t *testing.T) { + // The elasticsearch instance started by testing.yml > test-unit > services > elasticsearch + url := "http://elastic:changeme@elasticsearch:9200" + + if os.Getenv("CI") == "" { + // Make it possible to run tests against a local elasticsearch instance + url = os.Getenv("TEST_ELASTICSEARCH_URL") + if url == "" { + t.Skip("TEST_ELASTICSEARCH_URL not set and not running in CI") + return + } + } + + ok := false + for i := 0; i < 60; i++ { + resp, err := http.Get(url) + if err == nil && resp.StatusCode == http.StatusOK { + ok = true + break + } + t.Logf("Waiting for elasticsearch to be up: %v", err) + time.Sleep(time.Second) + } + if !ok { + t.Fatalf("Failed to wait for elasticsearch to be up") + return + } + + indexer := NewIndexer(url, fmt.Sprintf("test_elasticsearch_indexer_%d", time.Now().Unix())) + defer indexer.Close() + + tests.TestIndexer(t, indexer) +} diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go new file mode 100644 index 00000000..d7310529 --- /dev/null +++ b/modules/indexer/issues/indexer.go @@ -0,0 +1,315 @@ +// Copyright 2018 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package issues + +import ( + "context" + "fmt" + "os" + "runtime/pprof" + "sync/atomic" + "time" + + db_model "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/indexer/issues/bleve" + "code.gitea.io/gitea/modules/indexer/issues/db" + "code.gitea.io/gitea/modules/indexer/issues/elasticsearch" + "code.gitea.io/gitea/modules/indexer/issues/internal" + "code.gitea.io/gitea/modules/indexer/issues/meilisearch" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/optional" + "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/queue" + "code.gitea.io/gitea/modules/setting" +) + +// IndexerMetadata is used to send data to the queue, so it contains only the ids. +// It may look weird, because it has to be compatible with the old queue data format. +// If the IsDelete flag is true, the IDs specify the issues to delete from the index without querying the database. +// If the IsDelete flag is false, the ID specify the issue to index, so Indexer will query the database to get the issue data. +// It should be noted that if the id is not existing in the database, it's index will be deleted too even if IsDelete is false. +// Valid values: +// - IsDelete = true, IDs = [1, 2, 3], and ID will be ignored +// - IsDelete = false, ID = 1, and IDs will be ignored +type IndexerMetadata struct { + ID int64 `json:"id"` + + IsDelete bool `json:"is_delete"` + IDs []int64 `json:"ids"` +} + +var ( + // issueIndexerQueue queue of issue ids to be updated + issueIndexerQueue *queue.WorkerPoolQueue[*IndexerMetadata] + // globalIndexer is the global indexer, it cannot be nil. + // When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready. + // So it's always safe use it as *globalIndexer.Load() and call its methods. + globalIndexer atomic.Pointer[internal.Indexer] + dummyIndexer *internal.Indexer +) + +func init() { + i := internal.NewDummyIndexer() + dummyIndexer = &i + globalIndexer.Store(dummyIndexer) +} + +// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until +// all issue index done. +func InitIssueIndexer(syncReindex bool) { + ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), "Service: IssueIndexer", process.SystemProcessType, false) + + indexerInitWaitChannel := make(chan time.Duration, 1) + + // Create the Queue + issueIndexerQueue = queue.CreateUniqueQueue(ctx, "issue_indexer", getIssueIndexerQueueHandler(ctx)) + + graceful.GetManager().RunAtTerminate(finished) + + // Create the Indexer + go func() { + pprof.SetGoroutineLabels(ctx) + start := time.Now() + log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType) + var ( + issueIndexer internal.Indexer + existed bool + err error + ) + switch setting.Indexer.IssueType { + case "bleve": + defer func() { + if err := recover(); err != nil { + log.Error("PANIC whilst initializing issue indexer: %v\nStacktrace: %s", err, log.Stack(2)) + log.Error("The indexer files are likely corrupted and may need to be deleted") + log.Error("You can completely remove the %q directory to make Forgejo recreate the indexes", setting.Indexer.IssuePath) + globalIndexer.Store(dummyIndexer) + log.Fatal("PID: %d Unable to initialize the Bleve Issue Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.IssuePath, err) + } + }() + issueIndexer = bleve.NewIndexer(setting.Indexer.IssuePath) + existed, err = issueIndexer.Init(ctx) + if err != nil { + log.Fatal("Unable to initialize Bleve Issue Indexer at path: %s Error: %v", setting.Indexer.IssuePath, err) + } + case "elasticsearch": + issueIndexer = elasticsearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName) + existed, err = issueIndexer.Init(ctx) + if err != nil { + log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) + } + case "db": + issueIndexer = db.NewIndexer() + case "meilisearch": + issueIndexer = meilisearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName) + existed, err = issueIndexer.Init(ctx) + if err != nil { + log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err) + } + default: + log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) + } + globalIndexer.Store(&issueIndexer) + + graceful.GetManager().RunAtTerminate(func() { + log.Debug("Closing issue indexer") + (*globalIndexer.Load()).Close() + log.Info("PID: %d Issue Indexer closed", os.Getpid()) + }) + + // Start processing the queue + go graceful.GetManager().RunWithCancel(issueIndexerQueue) + + // Populate the index + if !existed { + if syncReindex { + graceful.GetManager().RunWithShutdownContext(populateIssueIndexer) + } else { + go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer) + } + } + + indexerInitWaitChannel <- time.Since(start) + close(indexerInitWaitChannel) + }() + + if syncReindex { + select { + case <-indexerInitWaitChannel: + case <-graceful.GetManager().IsShutdown(): + } + } else if setting.Indexer.StartupTimeout > 0 { + go func() { + pprof.SetGoroutineLabels(ctx) + timeout := setting.Indexer.StartupTimeout + if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { + timeout += setting.GracefulHammerTime + } + select { + case duration := <-indexerInitWaitChannel: + log.Info("Issue Indexer Initialization took %v", duration) + case <-graceful.GetManager().IsShutdown(): + log.Warn("Shutdown occurred before issue index initialisation was complete") + case <-time.After(timeout): + issueIndexerQueue.ShutdownWait(5 * time.Second) + log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout) + } + }() + } +} + +func getIssueIndexerQueueHandler(ctx context.Context) func(items ...*IndexerMetadata) []*IndexerMetadata { + return func(items ...*IndexerMetadata) []*IndexerMetadata { + var unhandled []*IndexerMetadata + + indexer := *globalIndexer.Load() + for _, item := range items { + log.Trace("IndexerMetadata Process: %d %v %t", item.ID, item.IDs, item.IsDelete) + if item.IsDelete { + if err := indexer.Delete(ctx, item.IDs...); err != nil { + log.Error("Issue indexer handler: failed to from index: %v Error: %v", item.IDs, err) + unhandled = append(unhandled, item) + } + continue + } + data, existed, err := getIssueIndexerData(ctx, item.ID) + if err != nil { + log.Error("Issue indexer handler: failed to get issue data of %d: %v", item.ID, err) + unhandled = append(unhandled, item) + continue + } + if !existed { + if err := indexer.Delete(ctx, item.ID); err != nil { + log.Error("Issue indexer handler: failed to delete issue %d from index: %v", item.ID, err) + unhandled = append(unhandled, item) + } + continue + } + if err := indexer.Index(ctx, data); err != nil { + log.Error("Issue indexer handler: failed to index issue %d: %v", item.ID, err) + unhandled = append(unhandled, item) + continue + } + } + + return unhandled + } +} + +// populateIssueIndexer populate the issue indexer with issue data +func populateIssueIndexer(ctx context.Context) { + ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: PopulateIssueIndexer", process.SystemProcessType, true) + defer finished() + ctx = contextWithKeepRetry(ctx) // keep retrying since it's a background task + if err := PopulateIssueIndexer(ctx); err != nil { + log.Error("Issue indexer population failed: %v", err) + } +} + +func PopulateIssueIndexer(ctx context.Context) error { + for page := 1; ; page++ { + select { + case <-ctx.Done(): + return fmt.Errorf("shutdown before completion: %w", ctx.Err()) + default: + } + repos, _, err := repo_model.SearchRepositoryByName(ctx, &repo_model.SearchRepoOptions{ + ListOptions: db_model.ListOptions{Page: page, PageSize: repo_model.RepositoryListDefaultPageSize}, + OrderBy: db_model.SearchOrderByID, + Private: true, + Collaborate: optional.Some(false), + }) + if err != nil { + log.Error("SearchRepositoryByName: %v", err) + continue + } + if len(repos) == 0 { + log.Debug("Issue Indexer population complete") + return nil + } + + for _, repo := range repos { + if err := updateRepoIndexer(ctx, repo.ID); err != nil { + return fmt.Errorf("populate issue indexer for repo %d: %v", repo.ID, err) + } + } + } +} + +// UpdateRepoIndexer add/update all issues of the repositories +func UpdateRepoIndexer(ctx context.Context, repoID int64) { + if err := updateRepoIndexer(ctx, repoID); err != nil { + log.Error("Unable to push repo %d to issue indexer: %v", repoID, err) + } +} + +// UpdateIssueIndexer add/update an issue to the issue indexer +func UpdateIssueIndexer(ctx context.Context, issueID int64) { + if err := updateIssueIndexer(ctx, issueID); err != nil { + log.Error("Unable to push issue %d to issue indexer: %v", issueID, err) + } +} + +// DeleteRepoIssueIndexer deletes repo's all issues indexes +func DeleteRepoIssueIndexer(ctx context.Context, repoID int64) { + if err := deleteRepoIssueIndexer(ctx, repoID); err != nil { + log.Error("Unable to push deleted repo %d to issue indexer: %v", repoID, err) + } +} + +// IsAvailable checks if issue indexer is available +func IsAvailable(ctx context.Context) bool { + return (*globalIndexer.Load()).Ping(ctx) == nil +} + +// SearchOptions indicates the options for searching issues +type SearchOptions = internal.SearchOptions + +const ( + SortByCreatedDesc = internal.SortByCreatedDesc + SortByUpdatedDesc = internal.SortByUpdatedDesc + SortByCommentsDesc = internal.SortByCommentsDesc + SortByDeadlineDesc = internal.SortByDeadlineDesc + SortByCreatedAsc = internal.SortByCreatedAsc + SortByUpdatedAsc = internal.SortByUpdatedAsc + SortByCommentsAsc = internal.SortByCommentsAsc + SortByDeadlineAsc = internal.SortByDeadlineAsc +) + +// SearchIssues search issues by options. +func SearchIssues(ctx context.Context, opts *SearchOptions) ([]int64, int64, error) { + indexer := *globalIndexer.Load() + + if opts.Keyword == "" { + // This is a conservative shortcut. + // If the keyword is empty, db has better (at least not worse) performance to filter issues. + // When the keyword is empty, it tends to listing rather than searching issues. + // So if the user creates an issue and list issues immediately, the issue may not be listed because the indexer needs time to index the issue. + // Even worse, the external indexer like elastic search may not be available for a while, + // and the user may not be able to list issues completely until it is available again. + indexer = db.NewIndexer() + } + + result, err := indexer.Search(ctx, opts) + if err != nil { + return nil, 0, err + } + + ret := make([]int64, 0, len(result.Hits)) + for _, hit := range result.Hits { + ret = append(ret, hit.ID) + } + + return ret, result.Total, nil +} + +// CountIssues counts issues by options. It is a shortcut of SearchIssues(ctx, opts) but only returns the total count. +func CountIssues(ctx context.Context, opts *SearchOptions) (int64, error) { + opts = opts.Copy(func(options *SearchOptions) { options.Paginator = &db_model.ListOptions{PageSize: 0} }) + + _, total, err := SearchIssues(ctx, opts) + return total, err +} diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go new file mode 100644 index 00000000..f43ddca0 --- /dev/null +++ b/modules/indexer/issues/indexer_test.go @@ -0,0 +1,404 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package issues + +import ( + "context" + "testing" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/unittest" + "code.gitea.io/gitea/modules/indexer/issues/internal" + "code.gitea.io/gitea/modules/optional" + "code.gitea.io/gitea/modules/setting" + + _ "code.gitea.io/gitea/models" + _ "code.gitea.io/gitea/models/actions" + _ "code.gitea.io/gitea/models/activities" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMain(m *testing.M) { + unittest.MainTest(m) +} + +func TestDBSearchIssues(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + + setting.Indexer.IssueType = "db" + InitIssueIndexer(true) + + t.Run("search issues with keyword", searchIssueWithKeyword) + t.Run("search issues in repo", searchIssueInRepo) + t.Run("search issues by ID", searchIssueByID) + t.Run("search issues is pr", searchIssueIsPull) + t.Run("search issues is closed", searchIssueIsClosed) + t.Run("search issues by milestone", searchIssueByMilestoneID) + t.Run("search issues by label", searchIssueByLabelID) + t.Run("search issues by time", searchIssueByTime) + t.Run("search issues with order", searchIssueWithOrder) + t.Run("search issues in project", searchIssueInProject) + t.Run("search issues with paginator", searchIssueWithPaginator) +} + +func searchIssueWithKeyword(t *testing.T) { + tests := []struct { + opts SearchOptions + expectedIDs []int64 + }{ + { + SearchOptions{ + Keyword: "issue2", + RepoIDs: []int64{1}, + }, + []int64{2}, + }, + { + SearchOptions{ + Keyword: "first", + RepoIDs: []int64{1}, + }, + []int64{1}, + }, + { + SearchOptions{ + Keyword: "for", + RepoIDs: []int64{1}, + }, + []int64{11, 5, 3, 2, 1}, + }, + { + SearchOptions{ + Keyword: "good", + RepoIDs: []int64{1}, + }, + []int64{1}, + }, + } + + for _, test := range tests { + issueIDs, _, err := SearchIssues(context.TODO(), &test.opts) + require.NoError(t, err) + + assert.Equal(t, test.expectedIDs, issueIDs) + } +} + +func searchIssueInRepo(t *testing.T) { + tests := []struct { + opts SearchOptions + expectedIDs []int64 + }{ + { + SearchOptions{ + RepoIDs: []int64{1}, + }, + []int64{11, 5, 3, 2, 1}, + }, + { + SearchOptions{ + RepoIDs: []int64{2}, + }, + []int64{7, 4}, + }, + { + SearchOptions{ + RepoIDs: []int64{3}, + }, + []int64{12, 6}, + }, + { + SearchOptions{ + RepoIDs: []int64{4}, + }, + []int64{}, + }, + { + SearchOptions{ + RepoIDs: []int64{5}, + }, + []int64{15}, + }, + } + + for _, test := range tests { + issueIDs, _, err := SearchIssues(context.TODO(), &test.opts) + require.NoError(t, err) + + assert.Equal(t, test.expectedIDs, issueIDs) + } +} + +func searchIssueByID(t *testing.T) { + tests := []struct { + opts SearchOptions + expectedIDs []int64 + }{ + { + opts: SearchOptions{ + PosterID: optional.Some(int64(1)), + }, + expectedIDs: []int64{11, 6, 3, 2, 1}, + }, + { + opts: SearchOptions{ + AssigneeID: optional.Some(int64(1)), + }, + expectedIDs: []int64{6, 1}, + }, + { + opts: SearchOptions{ + MentionID: optional.Some(int64(4)), + }, + expectedIDs: []int64{1}, + }, + { + opts: SearchOptions{ + ReviewedID: optional.Some(int64(1)), + }, + expectedIDs: []int64{}, + }, + { + opts: SearchOptions{ + ReviewRequestedID: optional.Some(int64(1)), + }, + expectedIDs: []int64{12}, + }, + { + opts: SearchOptions{ + SubscriberID: optional.Some(int64(1)), + }, + expectedIDs: []int64{11, 6, 5, 3, 2, 1}, + }, + { + // issue 20 request user 15 and team 5 which user 15 belongs to + // the review request number of issue 20 should be 1 + opts: SearchOptions{ + ReviewRequestedID: optional.Some(int64(15)), + }, + expectedIDs: []int64{12, 20}, + }, + { + // user 20 approved the issue 20, so return nothing + opts: SearchOptions{ + ReviewRequestedID: optional.Some(int64(20)), + }, + expectedIDs: []int64{}, + }, + } + + for _, test := range tests { + issueIDs, _, err := SearchIssues(context.TODO(), &test.opts) + require.NoError(t, err) + assert.Equal(t, test.expectedIDs, issueIDs) + } +} + +func searchIssueIsPull(t *testing.T) { + tests := []struct { + opts SearchOptions + expectedIDs []int64 + }{ + { + SearchOptions{ + IsPull: optional.Some(false), + }, + []int64{17, 16, 15, 14, 13, 6, 5, 18, 10, 7, 4, 1}, + }, + { + SearchOptions{ + IsPull: optional.Some(true), + }, + []int64{22, 21, 12, 11, 20, 19, 9, 8, 3, 2}, + }, + } + for _, test := range tests { + issueIDs, _, err := SearchIssues(context.TODO(), &test.opts) + require.NoError(t, err) + + assert.Equal(t, test.expectedIDs, issueIDs) + } +} + +func searchIssueIsClosed(t *testing.T) { + tests := []struct { + opts SearchOptions + expectedIDs []int64 + }{ + { + SearchOptions{ + IsClosed: optional.Some(false), + }, + []int64{22, 21, 17, 16, 15, 14, 13, 12, 11, 20, 6, 19, 18, 10, 7, 9, 8, 3, 2, 1}, + }, + { + SearchOptions{ + IsClosed: optional.Some(true), + }, + []int64{5, 4}, + }, + } + for _, test := range tests { + issueIDs, _, err := SearchIssues(context.TODO(), &test.opts) + require.NoError(t, err) + assert.Equal(t, test.expectedIDs, issueIDs) + } +} + +func searchIssueByMilestoneID(t *testing.T) { + tests := []struct { + opts SearchOptions + expectedIDs []int64 + }{ + { + SearchOptions{ + MilestoneIDs: []int64{1}, + }, + []int64{2}, + }, + { + SearchOptions{ + MilestoneIDs: []int64{3}, + }, + []int64{3}, + }, + } + for _, test := range tests { + issueIDs, _, err := SearchIssues(context.TODO(), &test.opts) + require.NoError(t, err) + + assert.Equal(t, test.expectedIDs, issueIDs) + } +} + +func searchIssueByLabelID(t *testing.T) { + tests := []struct { + opts SearchOptions + expectedIDs []int64 + }{ + { + SearchOptions{ + IncludedLabelIDs: []int64{1}, + }, + []int64{2, 1}, + }, + { + SearchOptions{ + IncludedLabelIDs: []int64{4}, + }, + []int64{2}, + }, + { + SearchOptions{ + ExcludedLabelIDs: []int64{1}, + }, + []int64{22, 21, 17, 16, 15, 14, 13, 12, 11, 20, 6, 5, 19, 18, 10, 7, 4, 9, 8, 3}, + }, + } + for _, test := range tests { + issueIDs, _, err := SearchIssues(context.TODO(), &test.opts) + require.NoError(t, err) + + assert.Equal(t, test.expectedIDs, issueIDs) + } +} + +func searchIssueByTime(t *testing.T) { + tests := []struct { + opts SearchOptions + expectedIDs []int64 + }{ + { + SearchOptions{ + UpdatedAfterUnix: optional.Some(int64(0)), + }, + []int64{22, 21, 17, 16, 15, 14, 13, 12, 11, 20, 6, 5, 19, 18, 10, 7, 4, 9, 8, 3, 2, 1}, + }, + } + for _, test := range tests { + issueIDs, _, err := SearchIssues(context.TODO(), &test.opts) + require.NoError(t, err) + + assert.Equal(t, test.expectedIDs, issueIDs) + } +} + +func searchIssueWithOrder(t *testing.T) { + tests := []struct { + opts SearchOptions + expectedIDs []int64 + }{ + { + SearchOptions{ + SortBy: internal.SortByCreatedAsc, + }, + []int64{1, 2, 3, 8, 9, 4, 7, 10, 18, 19, 5, 6, 20, 11, 12, 13, 14, 15, 16, 17, 21, 22}, + }, + } + for _, test := range tests { + issueIDs, _, err := SearchIssues(context.TODO(), &test.opts) + require.NoError(t, err) + + assert.Equal(t, test.expectedIDs, issueIDs) + } +} + +func searchIssueInProject(t *testing.T) { + tests := []struct { + opts SearchOptions + expectedIDs []int64 + }{ + { + SearchOptions{ + ProjectID: optional.Some(int64(1)), + }, + []int64{5, 3, 2, 1}, + }, + { + SearchOptions{ + ProjectColumnID: optional.Some(int64(1)), + }, + []int64{1}, + }, + { + SearchOptions{ + ProjectColumnID: optional.Some(int64(0)), // issue with in default column + }, + []int64{2}, + }, + } + for _, test := range tests { + issueIDs, _, err := SearchIssues(context.TODO(), &test.opts) + require.NoError(t, err) + + assert.Equal(t, test.expectedIDs, issueIDs) + } +} + +func searchIssueWithPaginator(t *testing.T) { + tests := []struct { + opts SearchOptions + expectedIDs []int64 + expectedTotal int64 + }{ + { + SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + }, + []int64{22, 21, 17, 16, 15}, + 22, + }, + } + for _, test := range tests { + issueIDs, total, err := SearchIssues(context.TODO(), &test.opts) + require.NoError(t, err) + + assert.Equal(t, test.expectedIDs, issueIDs) + assert.Equal(t, test.expectedTotal, total) + } +} diff --git a/modules/indexer/issues/internal/indexer.go b/modules/indexer/issues/internal/indexer.go new file mode 100644 index 00000000..95740bc5 --- /dev/null +++ b/modules/indexer/issues/internal/indexer.go @@ -0,0 +1,42 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import ( + "context" + "fmt" + + "code.gitea.io/gitea/modules/indexer/internal" +) + +// Indexer defines an interface to indexer issues contents +type Indexer interface { + internal.Indexer + Index(ctx context.Context, issue ...*IndexerData) error + Delete(ctx context.Context, ids ...int64) error + Search(ctx context.Context, options *SearchOptions) (*SearchResult, error) +} + +// NewDummyIndexer returns a dummy indexer +func NewDummyIndexer() Indexer { + return &dummyIndexer{ + Indexer: internal.NewDummyIndexer(), + } +} + +type dummyIndexer struct { + internal.Indexer +} + +func (d *dummyIndexer) Index(_ context.Context, _ ...*IndexerData) error { + return fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Delete(_ context.Context, _ ...int64) error { + return fmt.Errorf("indexer is not ready") +} + +func (d *dummyIndexer) Search(_ context.Context, _ *SearchOptions) (*SearchResult, error) { + return nil, fmt.Errorf("indexer is not ready") +} diff --git a/modules/indexer/issues/internal/model.go b/modules/indexer/issues/internal/model.go new file mode 100644 index 00000000..2dfee8b7 --- /dev/null +++ b/modules/indexer/issues/internal/model.go @@ -0,0 +1,150 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package internal + +import ( + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/optional" + "code.gitea.io/gitea/modules/timeutil" +) + +// IndexerData data stored in the issue indexer +type IndexerData struct { + ID int64 `json:"id"` + RepoID int64 `json:"repo_id"` + IsPublic bool `json:"is_public"` // If the repo is public + + // Fields used for keyword searching + Title string `json:"title"` + Content string `json:"content"` + Comments []string `json:"comments"` + + // Fields used for filtering + IsPull bool `json:"is_pull"` + IsClosed bool `json:"is_closed"` + LabelIDs []int64 `json:"label_ids"` + NoLabel bool `json:"no_label"` // True if LabelIDs is empty + MilestoneID int64 `json:"milestone_id"` + ProjectID int64 `json:"project_id"` + ProjectColumnID int64 `json:"project_board_id"` // the key should be kept as project_board_id to keep compatible + PosterID int64 `json:"poster_id"` + AssigneeID int64 `json:"assignee_id"` + MentionIDs []int64 `json:"mention_ids"` + ReviewedIDs []int64 `json:"reviewed_ids"` + ReviewRequestedIDs []int64 `json:"review_requested_ids"` + SubscriberIDs []int64 `json:"subscriber_ids"` + UpdatedUnix timeutil.TimeStamp `json:"updated_unix"` + + // Fields used for sorting + // UpdatedUnix is both used for filtering and sorting. + // ID is used for sorting too, to make the sorting stable. + CreatedUnix timeutil.TimeStamp `json:"created_unix"` + DeadlineUnix timeutil.TimeStamp `json:"deadline_unix"` + CommentCount int64 `json:"comment_count"` +} + +// Match represents on search result +type Match struct { + ID int64 `json:"id"` + Score float64 `json:"score"` +} + +// SearchResult represents search results +type SearchResult struct { + Total int64 + Hits []Match +} + +// SearchOptions represents search options. +// +// It has a slightly different design from database query options. +// In database query options, a field is never a pointer, so it could be confusing when it's zero value: +// Do you want to find data with a field value of 0, or do you not specify the field in the options? +// To avoid this confusion, db introduced db.NoConditionID(-1). +// So zero value means the field is not specified in the search options, and db.NoConditionID means "== 0" or "id NOT IN (SELECT id FROM ...)" +// It's still not ideal, it trapped developers many times. +// And sometimes -1 could be a valid value, like issue ID, negative numbers indicate exclusion. +// Since db.NoConditionID is for "db" (the package name is db), it makes sense not to use it in the indexer: +// Why do bleve/elasticsearch/meilisearch indexers need to know about db.NoConditionID? +// So in SearchOptions, we use pointer for fields which could be not specified, +// and always use the value to filter if it's not nil, even if it's zero or negative. +// It can handle almost all cases, if there is an exception, we can add a new field, like NoLabelOnly. +// Unfortunately, we still use db for the indexer and have to convert between db.NoConditionID and nil for legacy reasons. +type SearchOptions struct { + Keyword string // keyword to search + + IsFuzzyKeyword bool // if false the levenshtein distance is 0 + + RepoIDs []int64 // repository IDs which the issues belong to + AllPublic bool // if include all public repositories + + IsPull optional.Option[bool] // if the issues is a pull request + IsClosed optional.Option[bool] // if the issues is closed + + IncludedLabelIDs []int64 // labels the issues have + ExcludedLabelIDs []int64 // labels the issues don't have + IncludedAnyLabelIDs []int64 // labels the issues have at least one. It will be ignored if IncludedLabelIDs is not empty. It's an uncommon filter, but it has been supported accidentally by issues.IssuesOptions.IncludedLabelNames. + NoLabelOnly bool // if the issues have no label, if true, IncludedLabelIDs and ExcludedLabelIDs, IncludedAnyLabelIDs will be ignored + + MilestoneIDs []int64 // milestones the issues have + + ProjectID optional.Option[int64] // project the issues belong to + ProjectColumnID optional.Option[int64] // project column the issues belong to + + PosterID optional.Option[int64] // poster of the issues + + AssigneeID optional.Option[int64] // assignee of the issues, zero means no assignee + + MentionID optional.Option[int64] // mentioned user of the issues + + ReviewedID optional.Option[int64] // reviewer of the issues + ReviewRequestedID optional.Option[int64] // requested reviewer of the issues + + SubscriberID optional.Option[int64] // subscriber of the issues + + UpdatedAfterUnix optional.Option[int64] + UpdatedBeforeUnix optional.Option[int64] + + Paginator *db.ListOptions + + SortBy SortBy // sort by field +} + +// Copy returns a copy of the options. +// Be careful, it's not a deep copy, so `SearchOptions.RepoIDs = {...}` is OK while `SearchOptions.RepoIDs[0] = ...` is not. +func (o *SearchOptions) Copy(edit ...func(options *SearchOptions)) *SearchOptions { + if o == nil { + return nil + } + v := *o + for _, e := range edit { + e(&v) + } + return &v +} + +type SortBy string + +const ( + SortByCreatedDesc SortBy = "-created_unix" + SortByUpdatedDesc SortBy = "-updated_unix" + SortByCommentsDesc SortBy = "-comment_count" + SortByDeadlineDesc SortBy = "-deadline_unix" + SortByCreatedAsc SortBy = "created_unix" + SortByUpdatedAsc SortBy = "updated_unix" + SortByCommentsAsc SortBy = "comment_count" + SortByDeadlineAsc SortBy = "deadline_unix" + // Unsupported sort types which are supported by issues.IssuesOptions.SortType: + // + // - "priorityrepo": + // It's impossible to support it in the indexer. + // It is based on the specified repository in the request, so we cannot add static field to the indexer. + // If we do something like that query the issues in the specified repository first then append other issues, + // it will break the pagination. + // + // - "project-column-sorting": + // Although it's possible to support it by adding project.ProjectIssue.Sorting to the indexer, + // but what if the issue belongs to multiple projects? + // Since it's unsupported to search issues with keyword in project page, we don't need to support it. +) diff --git a/modules/indexer/issues/internal/tests/tests.go b/modules/indexer/issues/internal/tests/tests.go new file mode 100644 index 00000000..a93b2913 --- /dev/null +++ b/modules/indexer/issues/internal/tests/tests.go @@ -0,0 +1,771 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +// This package contains tests for issues indexer modules. +// All the code in this package is only used for testing. +// Do not put any production code in this package to avoid it being included in the final binary. + +package tests + +import ( + "context" + "fmt" + "slices" + "testing" + "time" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/indexer/issues/internal" + "code.gitea.io/gitea/modules/optional" + "code.gitea.io/gitea/modules/timeutil" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIndexer(t *testing.T, indexer internal.Indexer) { + _, err := indexer.Init(context.Background()) + require.NoError(t, err) + + require.NoError(t, indexer.Ping(context.Background())) + + var ( + ids []int64 + data = map[int64]*internal.IndexerData{} + ) + { + d := generateDefaultIndexerData() + for _, v := range d { + ids = append(ids, v.ID) + data[v.ID] = v + } + require.NoError(t, indexer.Index(context.Background(), d...)) + require.NoError(t, waitData(indexer, int64(len(data)))) + } + + defer func() { + require.NoError(t, indexer.Delete(context.Background(), ids...)) + }() + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + if len(c.ExtraData) > 0 { + require.NoError(t, indexer.Index(context.Background(), c.ExtraData...)) + for _, v := range c.ExtraData { + data[v.ID] = v + } + require.NoError(t, waitData(indexer, int64(len(data)))) + defer func() { + for _, v := range c.ExtraData { + require.NoError(t, indexer.Delete(context.Background(), v.ID)) + delete(data, v.ID) + } + require.NoError(t, waitData(indexer, int64(len(data)))) + }() + } + + result, err := indexer.Search(context.Background(), c.SearchOptions) + require.NoError(t, err) + + if c.Expected != nil { + c.Expected(t, data, result) + } else { + ids := make([]int64, 0, len(result.Hits)) + for _, hit := range result.Hits { + ids = append(ids, hit.ID) + } + assert.Equal(t, c.ExpectedIDs, ids) + assert.Equal(t, c.ExpectedTotal, result.Total) + } + + // test counting + c.SearchOptions.Paginator = &db.ListOptions{PageSize: 0} + countResult, err := indexer.Search(context.Background(), c.SearchOptions) + require.NoError(t, err) + assert.Empty(t, countResult.Hits) + assert.Equal(t, result.Total, countResult.Total) + }) + } +} + +var cases = []*testIndexerCase{ + { + Name: "default", + SearchOptions: &internal.SearchOptions{}, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Equal(t, len(data), len(result.Hits)) + assert.Equal(t, len(data), int(result.Total)) + }, + }, + { + Name: "empty", + SearchOptions: &internal.SearchOptions{ + Keyword: "f1dfac73-fda6-4a6b-b8a4-2408fcb8ef69", + }, + ExpectedIDs: []int64{}, + ExpectedTotal: 0, + }, + { + Name: "with limit", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + assert.Equal(t, len(data), int(result.Total)) + }, + }, + { + Name: "Keyword", + ExtraData: []*internal.IndexerData{ + {ID: 1000, Title: "hi hello world"}, + {ID: 1001, Content: "hi hello world"}, + {ID: 1002, Comments: []string{"hi", "hello world"}}, + }, + SearchOptions: &internal.SearchOptions{ + Keyword: "hello", + }, + ExpectedIDs: []int64{1002, 1001, 1000}, + ExpectedTotal: 3, + }, + { + Name: "Keyword Fuzzy", + ExtraData: []*internal.IndexerData{ + {ID: 1000, Title: "hi hello world"}, + {ID: 1001, Content: "hi hello world"}, + {ID: 1002, Comments: []string{"hi", "hello world"}}, + }, + SearchOptions: &internal.SearchOptions{ + Keyword: "hello world", + IsFuzzyKeyword: true, + }, + ExpectedIDs: []int64{1002, 1001, 1000}, + ExpectedTotal: 3, + }, + { + Name: "RepoIDs", + ExtraData: []*internal.IndexerData{ + {ID: 1001, Title: "hello world", RepoID: 1, IsPublic: false}, + {ID: 1002, Title: "hello world", RepoID: 1, IsPublic: false}, + {ID: 1003, Title: "hello world", RepoID: 2, IsPublic: true}, + {ID: 1004, Title: "hello world", RepoID: 2, IsPublic: true}, + {ID: 1005, Title: "hello world", RepoID: 3, IsPublic: true}, + {ID: 1006, Title: "hello world", RepoID: 4, IsPublic: false}, + {ID: 1007, Title: "hello world", RepoID: 5, IsPublic: false}, + }, + SearchOptions: &internal.SearchOptions{ + Keyword: "hello", + RepoIDs: []int64{1, 4}, + }, + ExpectedIDs: []int64{1006, 1002, 1001}, + ExpectedTotal: 3, + }, + { + Name: "RepoIDs and AllPublic", + ExtraData: []*internal.IndexerData{ + {ID: 1001, Title: "hello world", RepoID: 1, IsPublic: false}, + {ID: 1002, Title: "hello world", RepoID: 1, IsPublic: false}, + {ID: 1003, Title: "hello world", RepoID: 2, IsPublic: true}, + {ID: 1004, Title: "hello world", RepoID: 2, IsPublic: true}, + {ID: 1005, Title: "hello world", RepoID: 3, IsPublic: true}, + {ID: 1006, Title: "hello world", RepoID: 4, IsPublic: false}, + {ID: 1007, Title: "hello world", RepoID: 5, IsPublic: false}, + }, + SearchOptions: &internal.SearchOptions{ + Keyword: "hello", + RepoIDs: []int64{1, 4}, + AllPublic: true, + }, + ExpectedIDs: []int64{1006, 1005, 1004, 1003, 1002, 1001}, + ExpectedTotal: 6, + }, + { + Name: "issue only", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + IsPull: optional.Some(false), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.False(t, data[v.ID].IsPull) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { return !v.IsPull }), result.Total) + }, + }, + { + Name: "pull only", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + IsPull: optional.Some(true), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.True(t, data[v.ID].IsPull) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { return v.IsPull }), result.Total) + }, + }, + { + Name: "opened only", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + IsClosed: optional.Some(false), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.False(t, data[v.ID].IsClosed) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { return !v.IsClosed }), result.Total) + }, + }, + { + Name: "closed only", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + IsClosed: optional.Some(true), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.True(t, data[v.ID].IsClosed) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { return v.IsClosed }), result.Total) + }, + }, + { + Name: "labels", + ExtraData: []*internal.IndexerData{ + {ID: 1000, Title: "hello a", LabelIDs: []int64{2000, 2001, 2002}}, + {ID: 1001, Title: "hello b", LabelIDs: []int64{2000, 2001}}, + {ID: 1002, Title: "hello c", LabelIDs: []int64{2000, 2001, 2003}}, + {ID: 1003, Title: "hello d", LabelIDs: []int64{2000}}, + {ID: 1004, Title: "hello e", LabelIDs: []int64{}}, + }, + SearchOptions: &internal.SearchOptions{ + Keyword: "hello", + IncludedLabelIDs: []int64{2000, 2001}, + ExcludedLabelIDs: []int64{2003}, + }, + ExpectedIDs: []int64{1001, 1000}, + ExpectedTotal: 2, + }, + { + Name: "include any labels", + ExtraData: []*internal.IndexerData{ + {ID: 1000, Title: "hello a", LabelIDs: []int64{2000, 2001, 2002}}, + {ID: 1001, Title: "hello b", LabelIDs: []int64{2001}}, + {ID: 1002, Title: "hello c", LabelIDs: []int64{2000, 2001, 2003}}, + {ID: 1003, Title: "hello d", LabelIDs: []int64{2002}}, + {ID: 1004, Title: "hello e", LabelIDs: []int64{}}, + }, + SearchOptions: &internal.SearchOptions{ + Keyword: "hello", + IncludedAnyLabelIDs: []int64{2001, 2002}, + ExcludedLabelIDs: []int64{2003}, + }, + ExpectedIDs: []int64{1003, 1001, 1000}, + ExpectedTotal: 3, + }, + { + Name: "MilestoneIDs", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + MilestoneIDs: []int64{1, 2, 6}, + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.Contains(t, []int64{1, 2, 6}, data[v.ID].MilestoneID) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return v.MilestoneID == 1 || v.MilestoneID == 2 || v.MilestoneID == 6 + }), result.Total) + }, + }, + { + Name: "no MilestoneIDs", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + MilestoneIDs: []int64{0}, + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.Equal(t, int64(0), data[v.ID].MilestoneID) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return v.MilestoneID == 0 + }), result.Total) + }, + }, + { + Name: "ProjectID", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + ProjectID: optional.Some(int64(1)), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.Equal(t, int64(1), data[v.ID].ProjectID) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return v.ProjectID == 1 + }), result.Total) + }, + }, + { + Name: "no ProjectID", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + ProjectID: optional.Some(int64(0)), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.Equal(t, int64(0), data[v.ID].ProjectID) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return v.ProjectID == 0 + }), result.Total) + }, + }, + { + Name: "ProjectColumnID", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + ProjectColumnID: optional.Some(int64(1)), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.Equal(t, int64(1), data[v.ID].ProjectColumnID) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return v.ProjectColumnID == 1 + }), result.Total) + }, + }, + { + Name: "no ProjectColumnID", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + ProjectColumnID: optional.Some(int64(0)), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.Equal(t, int64(0), data[v.ID].ProjectColumnID) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return v.ProjectColumnID == 0 + }), result.Total) + }, + }, + { + Name: "PosterID", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + PosterID: optional.Some(int64(1)), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.Equal(t, int64(1), data[v.ID].PosterID) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return v.PosterID == 1 + }), result.Total) + }, + }, + { + Name: "AssigneeID", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + AssigneeID: optional.Some(int64(1)), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.Equal(t, int64(1), data[v.ID].AssigneeID) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return v.AssigneeID == 1 + }), result.Total) + }, + }, + { + Name: "no AssigneeID", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + AssigneeID: optional.Some(int64(0)), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.Equal(t, int64(0), data[v.ID].AssigneeID) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return v.AssigneeID == 0 + }), result.Total) + }, + }, + { + Name: "MentionID", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + MentionID: optional.Some(int64(1)), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.Contains(t, data[v.ID].MentionIDs, int64(1)) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return slices.Contains(v.MentionIDs, 1) + }), result.Total) + }, + }, + { + Name: "ReviewedID", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + ReviewedID: optional.Some(int64(1)), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.Contains(t, data[v.ID].ReviewedIDs, int64(1)) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return slices.Contains(v.ReviewedIDs, 1) + }), result.Total) + }, + }, + { + Name: "ReviewRequestedID", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + ReviewRequestedID: optional.Some(int64(1)), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.Contains(t, data[v.ID].ReviewRequestedIDs, int64(1)) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return slices.Contains(v.ReviewRequestedIDs, 1) + }), result.Total) + }, + }, + { + Name: "SubscriberID", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + SubscriberID: optional.Some(int64(1)), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.Contains(t, data[v.ID].SubscriberIDs, int64(1)) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return slices.Contains(v.SubscriberIDs, 1) + }), result.Total) + }, + }, + { + Name: "updated", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 5, + }, + UpdatedAfterUnix: optional.Some(int64(20)), + UpdatedBeforeUnix: optional.Some(int64(30)), + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Len(t, result.Hits, 5) + for _, v := range result.Hits { + assert.GreaterOrEqual(t, data[v.ID].UpdatedUnix, int64(20)) + assert.LessOrEqual(t, data[v.ID].UpdatedUnix, int64(30)) + } + assert.Equal(t, countIndexerData(data, func(v *internal.IndexerData) bool { + return data[v.ID].UpdatedUnix >= 20 && data[v.ID].UpdatedUnix <= 30 + }), result.Total) + }, + }, + { + Name: "SortByCreatedDesc", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptionsAll, + SortBy: internal.SortByCreatedDesc, + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Equal(t, len(data), len(result.Hits)) + assert.Equal(t, len(data), int(result.Total)) + for i, v := range result.Hits { + if i < len(result.Hits)-1 { + assert.GreaterOrEqual(t, data[v.ID].CreatedUnix, data[result.Hits[i+1].ID].CreatedUnix) + } + } + }, + }, + { + Name: "SortByUpdatedDesc", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptionsAll, + SortBy: internal.SortByUpdatedDesc, + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Equal(t, len(data), len(result.Hits)) + assert.Equal(t, len(data), int(result.Total)) + for i, v := range result.Hits { + if i < len(result.Hits)-1 { + assert.GreaterOrEqual(t, data[v.ID].UpdatedUnix, data[result.Hits[i+1].ID].UpdatedUnix) + } + } + }, + }, + { + Name: "SortByCommentsDesc", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptionsAll, + SortBy: internal.SortByCommentsDesc, + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Equal(t, len(data), len(result.Hits)) + assert.Equal(t, len(data), int(result.Total)) + for i, v := range result.Hits { + if i < len(result.Hits)-1 { + assert.GreaterOrEqual(t, data[v.ID].CommentCount, data[result.Hits[i+1].ID].CommentCount) + } + } + }, + }, + { + Name: "SortByDeadlineDesc", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptionsAll, + SortBy: internal.SortByDeadlineDesc, + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Equal(t, len(data), len(result.Hits)) + assert.Equal(t, len(data), int(result.Total)) + for i, v := range result.Hits { + if i < len(result.Hits)-1 { + assert.GreaterOrEqual(t, data[v.ID].DeadlineUnix, data[result.Hits[i+1].ID].DeadlineUnix) + } + } + }, + }, + { + Name: "SortByCreatedAsc", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptionsAll, + SortBy: internal.SortByCreatedAsc, + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Equal(t, len(data), len(result.Hits)) + assert.Equal(t, len(data), int(result.Total)) + for i, v := range result.Hits { + if i < len(result.Hits)-1 { + assert.LessOrEqual(t, data[v.ID].CreatedUnix, data[result.Hits[i+1].ID].CreatedUnix) + } + } + }, + }, + { + Name: "SortByUpdatedAsc", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptionsAll, + SortBy: internal.SortByUpdatedAsc, + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Equal(t, len(data), len(result.Hits)) + assert.Equal(t, len(data), int(result.Total)) + for i, v := range result.Hits { + if i < len(result.Hits)-1 { + assert.LessOrEqual(t, data[v.ID].UpdatedUnix, data[result.Hits[i+1].ID].UpdatedUnix) + } + } + }, + }, + { + Name: "SortByCommentsAsc", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptionsAll, + SortBy: internal.SortByCommentsAsc, + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Equal(t, len(data), len(result.Hits)) + assert.Equal(t, len(data), int(result.Total)) + for i, v := range result.Hits { + if i < len(result.Hits)-1 { + assert.LessOrEqual(t, data[v.ID].CommentCount, data[result.Hits[i+1].ID].CommentCount) + } + } + }, + }, + { + Name: "SortByDeadlineAsc", + SearchOptions: &internal.SearchOptions{ + Paginator: &db.ListOptionsAll, + SortBy: internal.SortByDeadlineAsc, + }, + Expected: func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) { + assert.Equal(t, len(data), len(result.Hits)) + assert.Equal(t, len(data), int(result.Total)) + for i, v := range result.Hits { + if i < len(result.Hits)-1 { + assert.LessOrEqual(t, data[v.ID].DeadlineUnix, data[result.Hits[i+1].ID].DeadlineUnix) + } + } + }, + }, +} + +type testIndexerCase struct { + Name string + ExtraData []*internal.IndexerData + + SearchOptions *internal.SearchOptions + + Expected func(t *testing.T, data map[int64]*internal.IndexerData, result *internal.SearchResult) // if nil, use ExpectedIDs, ExpectedTotal + ExpectedIDs []int64 + ExpectedTotal int64 +} + +func generateDefaultIndexerData() []*internal.IndexerData { + var id int64 + var data []*internal.IndexerData + for repoID := int64(1); repoID <= 10; repoID++ { + for issueIndex := int64(1); issueIndex <= 20; issueIndex++ { + id++ + + comments := make([]string, id%4) + for i := range comments { + comments[i] = fmt.Sprintf("comment%d", i) + } + + labelIDs := make([]int64, id%5) + for i := range labelIDs { + labelIDs[i] = int64(i) + 1 // LabelID should not be 0 + } + mentionIDs := make([]int64, id%6) + for i := range mentionIDs { + mentionIDs[i] = int64(i) + 1 // MentionID should not be 0 + } + reviewedIDs := make([]int64, id%7) + for i := range reviewedIDs { + reviewedIDs[i] = int64(i) + 1 // ReviewID should not be 0 + } + reviewRequestedIDs := make([]int64, id%8) + for i := range reviewRequestedIDs { + reviewRequestedIDs[i] = int64(i) + 1 // ReviewRequestedID should not be 0 + } + subscriberIDs := make([]int64, id%9) + for i := range subscriberIDs { + subscriberIDs[i] = int64(i) + 1 // SubscriberID should not be 0 + } + + data = append(data, &internal.IndexerData{ + ID: id, + RepoID: repoID, + IsPublic: repoID%2 == 0, + Title: fmt.Sprintf("issue%d of repo%d", issueIndex, repoID), + Content: fmt.Sprintf("content%d", issueIndex), + Comments: comments, + IsPull: issueIndex%2 == 0, + IsClosed: issueIndex%3 == 0, + LabelIDs: labelIDs, + NoLabel: len(labelIDs) == 0, + MilestoneID: issueIndex % 4, + ProjectID: issueIndex % 5, + ProjectColumnID: issueIndex % 6, + PosterID: id%10 + 1, // PosterID should not be 0 + AssigneeID: issueIndex % 10, + MentionIDs: mentionIDs, + ReviewedIDs: reviewedIDs, + ReviewRequestedIDs: reviewRequestedIDs, + SubscriberIDs: subscriberIDs, + UpdatedUnix: timeutil.TimeStamp(id + issueIndex), + CreatedUnix: timeutil.TimeStamp(id), + DeadlineUnix: timeutil.TimeStamp(id + issueIndex + repoID), + CommentCount: int64(len(comments)), + }) + } + } + + return data +} + +func countIndexerData(data map[int64]*internal.IndexerData, f func(v *internal.IndexerData) bool) int64 { + var count int64 + for _, v := range data { + if f(v) { + count++ + } + } + return count +} + +// waitData waits for the indexer to index all data. +// Some engines like Elasticsearch index data asynchronously, so we need to wait for a while. +func waitData(indexer internal.Indexer, total int64) error { + var actual int64 + for i := 0; i < 100; i++ { + result, err := indexer.Search(context.Background(), &internal.SearchOptions{ + Paginator: &db.ListOptions{ + PageSize: 0, + }, + }) + if err != nil { + return err + } + actual = result.Total + if actual == total { + return nil + } + time.Sleep(100 * time.Millisecond) + } + return fmt.Errorf("waitData: expected %d, actual %d", total, actual) +} diff --git a/modules/indexer/issues/meilisearch/meilisearch.go b/modules/indexer/issues/meilisearch/meilisearch.go new file mode 100644 index 00000000..93323193 --- /dev/null +++ b/modules/indexer/issues/meilisearch/meilisearch.go @@ -0,0 +1,301 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package meilisearch + +import ( + "context" + "errors" + "fmt" + "strconv" + "strings" + + indexer_internal "code.gitea.io/gitea/modules/indexer/internal" + inner_meilisearch "code.gitea.io/gitea/modules/indexer/internal/meilisearch" + "code.gitea.io/gitea/modules/indexer/issues/internal" + + "github.com/meilisearch/meilisearch-go" +) + +const ( + issueIndexerLatestVersion = 3 + + // TODO: make this configurable if necessary + maxTotalHits = 10000 +) + +// ErrMalformedResponse is never expected as we initialize the indexer ourself and so define the types. +var ErrMalformedResponse = errors.New("meilisearch returned unexpected malformed content") + +var _ internal.Indexer = &Indexer{} + +// Indexer implements Indexer interface +type Indexer struct { + inner *inner_meilisearch.Indexer + indexer_internal.Indexer // do not composite inner_meilisearch.Indexer directly to avoid exposing too much +} + +// NewIndexer creates a new meilisearch indexer +func NewIndexer(url, apiKey, indexerName string) *Indexer { + settings := &meilisearch.Settings{ + // The default ranking rules of meilisearch are: ["words", "typo", "proximity", "attribute", "sort", "exactness"] + // So even if we specify the sort order, it could not be respected because the priority of "sort" is so low. + // So we need to specify the ranking rules to make sure the sort order is respected. + // See https://www.meilisearch.com/docs/learn/core_concepts/relevancy + RankingRules: []string{"sort", // make sure "sort" has the highest priority + "words", "typo", "proximity", "attribute", "exactness"}, + + SearchableAttributes: []string{ + "title", + "content", + "comments", + }, + DisplayedAttributes: []string{ + "id", + "title", + "content", + "comments", + }, + FilterableAttributes: []string{ + "repo_id", + "is_public", + "is_pull", + "is_closed", + "label_ids", + "no_label", + "milestone_id", + "project_id", + "project_board_id", + "poster_id", + "assignee_id", + "mention_ids", + "reviewed_ids", + "review_requested_ids", + "subscriber_ids", + "updated_unix", + }, + SortableAttributes: []string{ + "updated_unix", + "created_unix", + "deadline_unix", + "comment_count", + "id", + }, + Pagination: &meilisearch.Pagination{ + MaxTotalHits: maxTotalHits, + }, + } + + inner := inner_meilisearch.NewIndexer(url, apiKey, indexerName, issueIndexerLatestVersion, settings) + indexer := &Indexer{ + inner: inner, + Indexer: inner, + } + return indexer +} + +// Index will save the index data +func (b *Indexer) Index(_ context.Context, issues ...*internal.IndexerData) error { + if len(issues) == 0 { + return nil + } + for _, issue := range issues { + _, err := b.inner.Client.Index(b.inner.VersionedIndexName()).AddDocuments(issue) + if err != nil { + return err + } + } + // TODO: bulk send index data + return nil +} + +// Delete deletes indexes by ids +func (b *Indexer) Delete(_ context.Context, ids ...int64) error { + if len(ids) == 0 { + return nil + } + + for _, id := range ids { + _, err := b.inner.Client.Index(b.inner.VersionedIndexName()).DeleteDocument(strconv.FormatInt(id, 10)) + if err != nil { + return err + } + } + // TODO: bulk send deletes + return nil +} + +// Search searches for issues by given conditions. +// Returns the matching issue IDs +func (b *Indexer) Search(ctx context.Context, options *internal.SearchOptions) (*internal.SearchResult, error) { + query := inner_meilisearch.FilterAnd{} + + if len(options.RepoIDs) > 0 { + q := &inner_meilisearch.FilterOr{} + q.Or(inner_meilisearch.NewFilterIn("repo_id", options.RepoIDs...)) + if options.AllPublic { + q.Or(inner_meilisearch.NewFilterEq("is_public", true)) + } + query.And(q) + } + + if options.IsPull.Has() { + query.And(inner_meilisearch.NewFilterEq("is_pull", options.IsPull.Value())) + } + if options.IsClosed.Has() { + query.And(inner_meilisearch.NewFilterEq("is_closed", options.IsClosed.Value())) + } + + if options.NoLabelOnly { + query.And(inner_meilisearch.NewFilterEq("no_label", true)) + } else { + if len(options.IncludedLabelIDs) > 0 { + q := &inner_meilisearch.FilterAnd{} + for _, labelID := range options.IncludedLabelIDs { + q.And(inner_meilisearch.NewFilterEq("label_ids", labelID)) + } + query.And(q) + } else if len(options.IncludedAnyLabelIDs) > 0 { + query.And(inner_meilisearch.NewFilterIn("label_ids", options.IncludedAnyLabelIDs...)) + } + if len(options.ExcludedLabelIDs) > 0 { + q := &inner_meilisearch.FilterAnd{} + for _, labelID := range options.ExcludedLabelIDs { + q.And(inner_meilisearch.NewFilterNot(inner_meilisearch.NewFilterEq("label_ids", labelID))) + } + query.And(q) + } + } + + if len(options.MilestoneIDs) > 0 { + query.And(inner_meilisearch.NewFilterIn("milestone_id", options.MilestoneIDs...)) + } + + if options.ProjectID.Has() { + query.And(inner_meilisearch.NewFilterEq("project_id", options.ProjectID.Value())) + } + if options.ProjectColumnID.Has() { + query.And(inner_meilisearch.NewFilterEq("project_board_id", options.ProjectColumnID.Value())) + } + + if options.PosterID.Has() { + query.And(inner_meilisearch.NewFilterEq("poster_id", options.PosterID.Value())) + } + + if options.AssigneeID.Has() { + query.And(inner_meilisearch.NewFilterEq("assignee_id", options.AssigneeID.Value())) + } + + if options.MentionID.Has() { + query.And(inner_meilisearch.NewFilterEq("mention_ids", options.MentionID.Value())) + } + + if options.ReviewedID.Has() { + query.And(inner_meilisearch.NewFilterEq("reviewed_ids", options.ReviewedID.Value())) + } + if options.ReviewRequestedID.Has() { + query.And(inner_meilisearch.NewFilterEq("review_requested_ids", options.ReviewRequestedID.Value())) + } + + if options.SubscriberID.Has() { + query.And(inner_meilisearch.NewFilterEq("subscriber_ids", options.SubscriberID.Value())) + } + + if options.UpdatedAfterUnix.Has() { + query.And(inner_meilisearch.NewFilterGte("updated_unix", options.UpdatedAfterUnix.Value())) + } + if options.UpdatedBeforeUnix.Has() { + query.And(inner_meilisearch.NewFilterLte("updated_unix", options.UpdatedBeforeUnix.Value())) + } + + if options.SortBy == "" { + options.SortBy = internal.SortByCreatedAsc + } + sortBy := []string{ + parseSortBy(options.SortBy), + "id:desc", + } + + skip, limit := indexer_internal.ParsePaginator(options.Paginator, maxTotalHits) + + counting := limit == 0 + if counting { + // If set limit to 0, it will be 20 by default, and -1 is not allowed. + // See https://www.meilisearch.com/docs/reference/api/search#limit + // So set limit to 1 to make the cost as low as possible, then clear the result before returning. + limit = 1 + } + + keyword := options.Keyword + if !options.IsFuzzyKeyword { + // to make it non fuzzy ("typo tolerance" in meilisearch terms), we have to quote the keyword(s) + // https://www.meilisearch.com/docs/reference/api/search#phrase-search + keyword = doubleQuoteKeyword(keyword) + } + + searchRes, err := b.inner.Client.Index(b.inner.VersionedIndexName()).Search(keyword, &meilisearch.SearchRequest{ + Filter: query.Statement(), + Limit: int64(limit), + Offset: int64(skip), + Sort: sortBy, + MatchingStrategy: "all", + }) + if err != nil { + return nil, err + } + + if counting { + searchRes.Hits = nil + } + + hits, err := convertHits(searchRes) + if err != nil { + return nil, err + } + + return &internal.SearchResult{ + Total: searchRes.EstimatedTotalHits, + Hits: hits, + }, nil +} + +func parseSortBy(sortBy internal.SortBy) string { + field := strings.TrimPrefix(string(sortBy), "-") + if strings.HasPrefix(string(sortBy), "-") { + return field + ":desc" + } + return field + ":asc" +} + +func doubleQuoteKeyword(k string) string { + kp := strings.Split(k, " ") + parts := 0 + for i := range kp { + part := strings.Trim(kp[i], "\"") + if part != "" { + kp[parts] = fmt.Sprintf(`"%s"`, part) + parts++ + } + } + return strings.Join(kp[:parts], " ") +} + +func convertHits(searchRes *meilisearch.SearchResponse) ([]internal.Match, error) { + hits := make([]internal.Match, 0, len(searchRes.Hits)) + for _, hit := range searchRes.Hits { + hit, ok := hit.(map[string]any) + if !ok { + return nil, ErrMalformedResponse + } + + issueID, ok := hit["id"].(float64) + if !ok { + return nil, ErrMalformedResponse + } + + hits = append(hits, internal.Match{ + ID: int64(issueID), + }) + } + return hits, nil +} diff --git a/modules/indexer/issues/meilisearch/meilisearch_test.go b/modules/indexer/issues/meilisearch/meilisearch_test.go new file mode 100644 index 00000000..349102b7 --- /dev/null +++ b/modules/indexer/issues/meilisearch/meilisearch_test.go @@ -0,0 +1,97 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package meilisearch + +import ( + "fmt" + "net/http" + "os" + "testing" + "time" + + "code.gitea.io/gitea/modules/indexer/issues/internal" + "code.gitea.io/gitea/modules/indexer/issues/internal/tests" + + "github.com/meilisearch/meilisearch-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMeilisearchIndexer(t *testing.T) { + t.Skip("meilisearch not found in Forgejo test yet") + // The meilisearch instance started by pull-db-tests.yml > test-unit > services > meilisearch + url := "http://meilisearch:7700" + key := "" // auth has been disabled in test environment + + if os.Getenv("CI") == "" { + // Make it possible to run tests against a local meilisearch instance + url = os.Getenv("TEST_MEILISEARCH_URL") + if url == "" { + t.Skip("TEST_MEILISEARCH_URL not set and not running in CI") + return + } + key = os.Getenv("TEST_MEILISEARCH_KEY") + } + + ok := false + for i := 0; i < 60; i++ { + resp, err := http.Get(url) + if err == nil && resp.StatusCode == http.StatusOK { + ok = true + break + } + t.Logf("Waiting for meilisearch to be up: %v", err) + time.Sleep(time.Second) + } + if !ok { + t.Fatalf("Failed to wait for meilisearch to be up") + return + } + + indexer := NewIndexer(url, key, fmt.Sprintf("test_meilisearch_indexer_%d", time.Now().Unix())) + defer indexer.Close() + + tests.TestIndexer(t, indexer) +} + +func TestConvertHits(t *testing.T) { + _, err := convertHits(&meilisearch.SearchResponse{ + Hits: []any{"aa", "bb", "cc", "dd"}, + }) + require.ErrorIs(t, err, ErrMalformedResponse) + + validResponse := &meilisearch.SearchResponse{ + Hits: []any{ + map[string]any{ + "id": float64(11), + "title": "a title", + "content": "issue body with no match", + "comments": []any{"hey what's up?", "I'm currently bowling", "nice"}, + }, + map[string]any{ + "id": float64(22), + "title": "Bowling as title", + "content": "", + "comments": []any{}, + }, + map[string]any{ + "id": float64(33), + "title": "Bowl-ing as fuzzy match", + "content": "", + "comments": []any{}, + }, + }, + } + hits, err := convertHits(validResponse) + require.NoError(t, err) + assert.EqualValues(t, []internal.Match{{ID: 11}, {ID: 22}, {ID: 33}}, hits) +} + +func TestDoubleQuoteKeyword(t *testing.T) { + assert.EqualValues(t, "", doubleQuoteKeyword("")) + assert.EqualValues(t, `"a" "b" "c"`, doubleQuoteKeyword("a b c")) + assert.EqualValues(t, `"a" "d" "g"`, doubleQuoteKeyword("a d g")) + assert.EqualValues(t, `"a" "d" "g"`, doubleQuoteKeyword("a d g")) + assert.EqualValues(t, `"a" "d" "g"`, doubleQuoteKeyword(`a "" "d" """g`)) +} diff --git a/modules/indexer/issues/util.go b/modules/indexer/issues/util.go new file mode 100644 index 00000000..e752ae6f --- /dev/null +++ b/modules/indexer/issues/util.go @@ -0,0 +1,193 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package issues + +import ( + "context" + "errors" + "fmt" + + "code.gitea.io/gitea/models/db" + issue_model "code.gitea.io/gitea/models/issues" + "code.gitea.io/gitea/modules/container" + "code.gitea.io/gitea/modules/indexer/issues/internal" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" +) + +// getIssueIndexerData returns the indexer data of an issue and a bool value indicating whether the issue exists. +func getIssueIndexerData(ctx context.Context, issueID int64) (*internal.IndexerData, bool, error) { + issue, err := issue_model.GetIssueByID(ctx, issueID) + if err != nil { + if issue_model.IsErrIssueNotExist(err) { + return nil, false, nil + } + return nil, false, err + } + + // FIXME: what if users want to search for a review comment of a pull request? + // The comment type is CommentTypeCode or CommentTypeReview. + // But LoadDiscussComments only loads CommentTypeComment. + if err := issue.LoadDiscussComments(ctx); err != nil { + return nil, false, err + } + + comments := make([]string, 0, len(issue.Comments)) + for _, comment := range issue.Comments { + if comment.Content != "" { + // what ever the comment type is, index the content if it is not empty. + comments = append(comments, comment.Content) + } + } + + if err := issue.LoadAttributes(ctx); err != nil { + return nil, false, err + } + + labels := make([]int64, 0, len(issue.Labels)) + for _, label := range issue.Labels { + labels = append(labels, label.ID) + } + + mentionIDs, err := issue_model.GetIssueMentionIDs(ctx, issueID) + if err != nil { + return nil, false, err + } + + var ( + reviewedIDs []int64 + reviewRequestedIDs []int64 + ) + { + reviews, err := issue_model.FindReviews(ctx, issue_model.FindReviewOptions{ + ListOptions: db.ListOptionsAll, + IssueID: issueID, + OfficialOnly: false, + }) + if err != nil { + return nil, false, err + } + + reviewedIDsSet := make(container.Set[int64], len(reviews)) + reviewRequestedIDsSet := make(container.Set[int64], len(reviews)) + for _, review := range reviews { + if review.Type == issue_model.ReviewTypeRequest { + reviewRequestedIDsSet.Add(review.ReviewerID) + } else { + reviewedIDsSet.Add(review.ReviewerID) + } + } + reviewedIDs = reviewedIDsSet.Values() + reviewRequestedIDs = reviewRequestedIDsSet.Values() + } + + subscriberIDs, err := issue_model.GetIssueWatchersIDs(ctx, issue.ID, true) + if err != nil { + return nil, false, err + } + + var projectID int64 + if issue.Project != nil { + projectID = issue.Project.ID + } + + return &internal.IndexerData{ + ID: issue.ID, + RepoID: issue.RepoID, + IsPublic: !issue.Repo.IsPrivate, + Title: issue.Title, + Content: issue.Content, + Comments: comments, + IsPull: issue.IsPull, + IsClosed: issue.IsClosed, + LabelIDs: labels, + NoLabel: len(labels) == 0, + MilestoneID: issue.MilestoneID, + ProjectID: projectID, + ProjectColumnID: issue.ProjectColumnID(ctx), + PosterID: issue.PosterID, + AssigneeID: issue.AssigneeID, + MentionIDs: mentionIDs, + ReviewedIDs: reviewedIDs, + ReviewRequestedIDs: reviewRequestedIDs, + SubscriberIDs: subscriberIDs, + UpdatedUnix: issue.UpdatedUnix, + CreatedUnix: issue.CreatedUnix, + DeadlineUnix: issue.DeadlineUnix, + CommentCount: int64(len(issue.Comments)), + }, true, nil +} + +func updateRepoIndexer(ctx context.Context, repoID int64) error { + ids, err := issue_model.GetIssueIDsByRepoID(ctx, repoID) + if err != nil { + return fmt.Errorf("issue_model.GetIssueIDsByRepoID: %w", err) + } + for _, id := range ids { + if err := updateIssueIndexer(ctx, id); err != nil { + return err + } + } + return nil +} + +func updateIssueIndexer(ctx context.Context, issueID int64) error { + return pushIssueIndexerQueue(ctx, &IndexerMetadata{ID: issueID}) +} + +func deleteRepoIssueIndexer(ctx context.Context, repoID int64) error { + var ids []int64 + ids, err := issue_model.GetIssueIDsByRepoID(ctx, repoID) + if err != nil { + return fmt.Errorf("issue_model.GetIssueIDsByRepoID: %w", err) + } + + if len(ids) == 0 { + return nil + } + return pushIssueIndexerQueue(ctx, &IndexerMetadata{ + IDs: ids, + IsDelete: true, + }) +} + +type keepRetryKey struct{} + +// contextWithKeepRetry returns a context with a key indicating that the indexer should keep retrying. +// Please note that it's for background tasks only, and it should not be used for user requests, or it may cause blocking. +func contextWithKeepRetry(ctx context.Context) context.Context { + return context.WithValue(ctx, keepRetryKey{}, true) +} + +func pushIssueIndexerQueue(ctx context.Context, data *IndexerMetadata) error { + if issueIndexerQueue == nil { + // Some unit tests will trigger indexing, but the queue is not initialized. + // It's OK to ignore it, but log a warning message in case it's not a unit test. + log.Warn("Trying to push %+v to issue indexer queue, but the queue is not initialized, it's OK if it's a unit test", data) + return nil + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + err := issueIndexerQueue.Push(data) + if errors.Is(err, queue.ErrAlreadyInQueue) { + return nil + } + if errors.Is(err, context.DeadlineExceeded) { // the queue is full + log.Warn("It seems that issue indexer is slow and the queue is full. Please check the issue indexer or increase the queue size.") + if ctx.Value(keepRetryKey{}) == nil { + return err + } + // It will be better to increase the queue size instead of retrying, but users may ignore the previous warning message. + // However, even it retries, it may still cause index loss when there's a deadline in the context. + log.Debug("Retry to push %+v to issue indexer queue", data) + continue + } + return err + } +} diff --git a/modules/indexer/stats/db.go b/modules/indexer/stats/db.go new file mode 100644 index 00000000..98a977c7 --- /dev/null +++ b/modules/indexer/stats/db.go @@ -0,0 +1,84 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package stats + +import ( + "fmt" + + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/gitrepo" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/setting" +) + +// DBIndexer implements Indexer interface to use database's like search +type DBIndexer struct{} + +// Index repository status function +func (db *DBIndexer) Index(id int64) error { + ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().ShutdownContext(), fmt.Sprintf("Stats.DB Index Repo[%d]", id)) + defer finished() + + repo, err := repo_model.GetRepositoryByID(ctx, id) + if err != nil { + return err + } + if repo.IsEmpty { + return nil + } + + status, err := repo_model.GetIndexerStatus(ctx, repo, repo_model.RepoIndexerTypeStats) + if err != nil { + return err + } + + gitRepo, err := gitrepo.OpenRepository(ctx, repo) + if err != nil { + if err.Error() == "no such file or directory" { + return nil + } + return err + } + defer gitRepo.Close() + + // Get latest commit for default branch + commitID, err := gitRepo.GetBranchCommitID(repo.DefaultBranch) + if err != nil { + if git.IsErrBranchNotExist(err) || git.IsErrNotExist(err) || setting.IsInTesting { + log.Debug("Unable to get commit ID for default branch %s in %s ... skipping this repository", repo.DefaultBranch, repo.RepoPath()) + return nil + } + log.Error("Unable to get commit ID for default branch %s in %s. Error: %v", repo.DefaultBranch, repo.RepoPath(), err) + return err + } + + // Do not recalculate stats if already calculated for this commit + if status.CommitSha == commitID { + return nil + } + + // Calculate and save language statistics to database + stats, err := gitRepo.GetLanguageStats(commitID) + if err != nil { + if !setting.IsInTesting { + log.Error("Unable to get language stats for ID %s for default branch %s in %s. Error: %v", commitID, repo.DefaultBranch, repo.RepoPath(), err) + } + return err + } + err = repo_model.UpdateLanguageStats(ctx, repo, commitID, stats) + if err != nil { + log.Error("Unable to update language stats for ID %s for default branch %s in %s. Error: %v", commitID, repo.DefaultBranch, repo.RepoPath(), err) + return err + } + + log.Debug("DBIndexer completed language stats for ID %s for default branch %s in %s. stats count: %d", commitID, repo.DefaultBranch, repo.RepoPath(), len(stats)) + return nil +} + +// Close dummy function +func (db *DBIndexer) Close() { +} diff --git a/modules/indexer/stats/indexer.go b/modules/indexer/stats/indexer.go new file mode 100644 index 00000000..7ec89e2a --- /dev/null +++ b/modules/indexer/stats/indexer.go @@ -0,0 +1,88 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package stats + +import ( + "context" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" +) + +// Indexer defines an interface to index repository stats +// TODO: this indexer is quite different from the others, maybe this package should be moved out from module/indexer +type Indexer interface { + Index(id int64) error + Close() +} + +// indexer represents a indexer instance +var indexer Indexer + +// Init initialize the repo indexer +func Init() error { + indexer = &DBIndexer{} + + if err := initStatsQueue(); err != nil { + return err + } + + go populateRepoIndexer(db.DefaultContext) + + return nil +} + +// populateRepoIndexer populate the repo indexer with pre-existing data. This +// should only be run when the indexer is created for the first time. +func populateRepoIndexer(ctx context.Context) { + log.Info("Populating the repo stats indexer with existing repositories") + + isShutdown := graceful.GetManager().IsShutdown() + + exist, err := db.IsTableNotEmpty("repository") + if err != nil { + log.Fatal("System error: %v", err) + } else if !exist { + return + } + + var maxRepoID int64 + if maxRepoID, err = db.GetMaxID("repository"); err != nil { + log.Fatal("System error: %v", err) + } + + // start with the maximum existing repo ID and work backwards, so that we + // don't include repos that are created after gitea starts; such repos will + // already be added to the indexer, and we don't need to add them again. + for maxRepoID > 0 { + select { + case <-isShutdown: + log.Info("Repository Stats Indexer population shutdown before completion") + return + default: + } + ids, err := repo_model.GetUnindexedRepos(ctx, repo_model.RepoIndexerTypeStats, maxRepoID, 0, 50) + if err != nil { + log.Error("populateRepoIndexer: %v", err) + return + } else if len(ids) == 0 { + break + } + for _, id := range ids { + select { + case <-isShutdown: + log.Info("Repository Stats Indexer population shutdown before completion") + return + default: + } + if err := statsQueue.Push(id); err != nil { + log.Error("statsQueue.Push: %v", err) + } + maxRepoID = id - 1 + } + } + log.Info("Done (re)populating the repo stats indexer with existing repositories") +} diff --git a/modules/indexer/stats/indexer_test.go b/modules/indexer/stats/indexer_test.go new file mode 100644 index 00000000..3ab2e585 --- /dev/null +++ b/modules/indexer/stats/indexer_test.go @@ -0,0 +1,52 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package stats + +import ( + "context" + "testing" + "time" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/models/unittest" + "code.gitea.io/gitea/modules/queue" + "code.gitea.io/gitea/modules/setting" + + _ "code.gitea.io/gitea/models" + _ "code.gitea.io/gitea/models/actions" + _ "code.gitea.io/gitea/models/activities" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMain(m *testing.M) { + unittest.MainTest(m) +} + +func TestRepoStatsIndex(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + setting.CfgProvider, _ = setting.NewConfigProviderFromData("") + + setting.LoadQueueSettings() + + err := Init() + require.NoError(t, err) + + repo, err := repo_model.GetRepositoryByID(db.DefaultContext, 1) + require.NoError(t, err) + + err = UpdateRepoIndexer(repo) + require.NoError(t, err) + + require.NoError(t, queue.GetManager().FlushAll(context.Background(), 5*time.Second)) + + status, err := repo_model.GetIndexerStatus(db.DefaultContext, repo, repo_model.RepoIndexerTypeStats) + require.NoError(t, err) + assert.Equal(t, "65f1bf27bc3bf70f64657658635e66094edbcb4d", status.CommitSha) + langs, err := repo_model.GetTopLanguageStats(db.DefaultContext, repo, 5) + require.NoError(t, err) + assert.Empty(t, langs) +} diff --git a/modules/indexer/stats/queue.go b/modules/indexer/stats/queue.go new file mode 100644 index 00000000..d002bd57 --- /dev/null +++ b/modules/indexer/stats/queue.go @@ -0,0 +1,49 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package stats + +import ( + "fmt" + + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" + "code.gitea.io/gitea/modules/setting" +) + +// statsQueue represents a queue to handle repository stats updates +var statsQueue *queue.WorkerPoolQueue[int64] + +// handle passed PR IDs and test the PRs +func handler(items ...int64) []int64 { + for _, opts := range items { + if err := indexer.Index(opts); err != nil { + if !setting.IsInTesting { + log.Error("stats queue indexer.Index(%d) failed: %v", opts, err) + } + } + } + return nil +} + +func initStatsQueue() error { + statsQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "repo_stats_update", handler) + if statsQueue == nil { + return fmt.Errorf("unable to create repo_stats_update queue") + } + go graceful.GetManager().RunWithCancel(statsQueue) + return nil +} + +// UpdateRepoIndexer update a repository's entries in the indexer +func UpdateRepoIndexer(repo *repo_model.Repository) error { + if err := statsQueue.Push(repo.ID); err != nil { + if err != queue.ErrAlreadyInQueue { + return err + } + log.Debug("Repo ID: %d already queued", repo.ID) + } + return nil +} |