summaryrefslogtreecommitdiffstats
path: root/modules/setting/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/setting/queue.go')
-rw-r--r--modules/setting/queue.go120
1 files changed, 120 insertions, 0 deletions
diff --git a/modules/setting/queue.go b/modules/setting/queue.go
new file mode 100644
index 00000000..251a6c1e
--- /dev/null
+++ b/modules/setting/queue.go
@@ -0,0 +1,120 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package setting
+
+import (
+ "path/filepath"
+ "runtime"
+
+ "code.gitea.io/gitea/modules/json"
+ "code.gitea.io/gitea/modules/log"
+)
+
+// QueueSettings represent the settings for a queue from the ini
+type QueueSettings struct {
+ Name string // not an INI option, it is the name for [queue.the-name] section
+
+ Type string
+ Datadir string
+ ConnStr string // for leveldb or redis
+ Length int // max queue length before blocking
+
+ QueueName, SetName string // the name suffix for storage (db key, redis key), "set" is for unique queue
+
+ BatchLength int
+ MaxWorkers int
+}
+
+func GetQueueSettings(rootCfg ConfigProvider, name string) (QueueSettings, error) {
+ queueSettingsDefault := QueueSettings{
+ Type: "level", // dummy, channel, level, redis
+ Datadir: "queues/common", // relative to AppDataPath
+ Length: 100000, // queue length before a channel queue will block
+
+ QueueName: "_queue",
+ SetName: "_unique",
+ BatchLength: 20,
+ MaxWorkers: runtime.NumCPU() / 2,
+ }
+ if queueSettingsDefault.MaxWorkers < 1 {
+ queueSettingsDefault.MaxWorkers = 1
+ }
+ if queueSettingsDefault.MaxWorkers > 10 {
+ queueSettingsDefault.MaxWorkers = 10
+ }
+
+ // deep copy default settings
+ cfg := QueueSettings{}
+ if cfgBs, err := json.Marshal(queueSettingsDefault); err != nil {
+ return cfg, err
+ } else if err = json.Unmarshal(cfgBs, &cfg); err != nil {
+ return cfg, err
+ }
+
+ cfg.Name = name
+ if sec, err := rootCfg.GetSection("queue"); err == nil {
+ if err = sec.MapTo(&cfg); err != nil {
+ log.Error("Failed to map queue common config for %q: %v", name, err)
+ return cfg, nil
+ }
+ }
+ if sec, err := rootCfg.GetSection("queue." + name); err == nil {
+ if err = sec.MapTo(&cfg); err != nil {
+ log.Error("Failed to map queue spec config for %q: %v", name, err)
+ return cfg, nil
+ }
+ if sec.HasKey("CONN_STR") {
+ cfg.ConnStr = sec.Key("CONN_STR").String()
+ }
+ }
+
+ if cfg.Datadir == "" {
+ cfg.Datadir = queueSettingsDefault.Datadir
+ }
+ if !filepath.IsAbs(cfg.Datadir) {
+ cfg.Datadir = filepath.Join(AppDataPath, cfg.Datadir)
+ }
+ cfg.Datadir = filepath.ToSlash(cfg.Datadir)
+
+ if cfg.Type == "redis" && cfg.ConnStr == "" {
+ cfg.ConnStr = "redis://127.0.0.1:6379/0"
+ }
+
+ if cfg.Length <= 0 {
+ cfg.Length = queueSettingsDefault.Length
+ }
+ if cfg.MaxWorkers <= 0 {
+ cfg.MaxWorkers = queueSettingsDefault.MaxWorkers
+ }
+ if cfg.BatchLength <= 0 {
+ cfg.BatchLength = queueSettingsDefault.BatchLength
+ }
+
+ return cfg, nil
+}
+
+func LoadQueueSettings() {
+ loadQueueFrom(CfgProvider)
+}
+
+func loadQueueFrom(rootCfg ConfigProvider) {
+ hasOld := false
+ handleOldLengthConfiguration := func(rootCfg ConfigProvider, newQueueName, oldSection, oldKey string) {
+ if rootCfg.Section(oldSection).HasKey(oldKey) {
+ hasOld = true
+ log.Error("Removed queue option: `[%s].%s`. Use new options in `[queue.%s]`", oldSection, oldKey, newQueueName)
+ }
+ }
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_TYPE")
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_BATCH_NUMBER")
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_DIR")
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_CONN_STR")
+ handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "UPDATE_BUFFER_LEN")
+ handleOldLengthConfiguration(rootCfg, "mailer", "mailer", "SEND_BUFFER_LEN")
+ handleOldLengthConfiguration(rootCfg, "pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH")
+ handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH")
+ if hasOld {
+ log.Fatal("Please update your app.ini to remove deprecated config options")
+ }
+}