summaryrefslogtreecommitdiffstats
path: root/modules/nosql/manager_leveldb.go
diff options
context:
space:
mode:
Diffstat (limited to 'modules/nosql/manager_leveldb.go')
-rw-r--r--modules/nosql/manager_leveldb.go214
1 files changed, 214 insertions, 0 deletions
diff --git a/modules/nosql/manager_leveldb.go b/modules/nosql/manager_leveldb.go
new file mode 100644
index 00000000..4d2c90de
--- /dev/null
+++ b/modules/nosql/manager_leveldb.go
@@ -0,0 +1,214 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package nosql
+
+import (
+ "fmt"
+ "path"
+ "runtime/pprof"
+ "strconv"
+ "strings"
+
+ "code.gitea.io/gitea/modules/log"
+
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+)
+
+// CloseLevelDB closes a levelDB
+func (m *Manager) CloseLevelDB(connection string) error {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ db, ok := m.LevelDBConnections[connection]
+ if !ok {
+ // Try the full URI
+ uri := ToLevelDBURI(connection)
+ db, ok = m.LevelDBConnections[uri.String()]
+
+ if !ok {
+ // Try the datadir directly
+ dataDir := path.Join(uri.Host, uri.Path)
+
+ db, ok = m.LevelDBConnections[dataDir]
+ }
+ }
+ if !ok {
+ return nil
+ }
+
+ db.count--
+ if db.count > 0 {
+ return nil
+ }
+
+ for _, name := range db.name {
+ delete(m.LevelDBConnections, name)
+ }
+ return db.db.Close()
+}
+
+// GetLevelDB gets a levelDB for a particular connection
+func (m *Manager) GetLevelDB(connection string) (db *leveldb.DB, err error) {
+ // Because we want associate any goroutines created by this call to the main nosqldb context we need to
+ // wrap this in a goroutine labelled with the nosqldb context
+ done := make(chan struct{})
+ var recovered any
+ go func() {
+ defer func() {
+ recovered = recover()
+ if recovered != nil {
+ log.Critical("PANIC during GetLevelDB: %v\nStacktrace: %s", recovered, log.Stack(2))
+ }
+ close(done)
+ }()
+ pprof.SetGoroutineLabels(m.ctx)
+
+ db, err = m.getLevelDB(connection)
+ }()
+ <-done
+ if recovered != nil {
+ panic(recovered)
+ }
+ return db, err
+}
+
+func (m *Manager) getLevelDB(connection string) (*leveldb.DB, error) {
+ // Convert the provided connection description to the common format
+ uri := ToLevelDBURI(connection)
+
+ // Get the datadir
+ dataDir := path.Join(uri.Host, uri.Path)
+
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ db, ok := m.LevelDBConnections[connection]
+ if ok {
+ db.count++
+
+ return db.db, nil
+ }
+
+ db, ok = m.LevelDBConnections[uri.String()]
+ if ok {
+ db.count++
+
+ return db.db, nil
+ }
+
+ // if there is already a connection to this leveldb reuse that
+ // NOTE: if there differing options then only the first leveldb connection will be used
+ db, ok = m.LevelDBConnections[dataDir]
+ if ok {
+ db.count++
+ log.Warn("Duplicate connection to level db: %s with different connection strings. Initial connection: %s. This connection: %s", dataDir, db.name[0], connection)
+ db.name = append(db.name, connection)
+ m.LevelDBConnections[connection] = db
+ return db.db, nil
+ }
+ db = &levelDBHolder{
+ name: []string{connection, uri.String(), dataDir},
+ }
+
+ opts := &opt.Options{}
+ for k, v := range uri.Query() {
+ switch replacer.Replace(strings.ToLower(k)) {
+ case "blockcachecapacity":
+ opts.BlockCacheCapacity, _ = strconv.Atoi(v[0])
+ case "blockcacheevictremoved":
+ opts.BlockCacheEvictRemoved, _ = strconv.ParseBool(v[0])
+ case "blockrestartinterval":
+ opts.BlockRestartInterval, _ = strconv.Atoi(v[0])
+ case "blocksize":
+ opts.BlockSize, _ = strconv.Atoi(v[0])
+ case "compactionexpandlimitfactor":
+ opts.CompactionExpandLimitFactor, _ = strconv.Atoi(v[0])
+ case "compactiongpoverlapsfactor":
+ opts.CompactionGPOverlapsFactor, _ = strconv.Atoi(v[0])
+ case "compactionl0trigger":
+ opts.CompactionL0Trigger, _ = strconv.Atoi(v[0])
+ case "compactionsourcelimitfactor":
+ opts.CompactionSourceLimitFactor, _ = strconv.Atoi(v[0])
+ case "compactiontablesize":
+ opts.CompactionTableSize, _ = strconv.Atoi(v[0])
+ case "compactiontablesizemultiplier":
+ opts.CompactionTableSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
+ case "compactiontablesizemultiplierperlevel":
+ for _, val := range v {
+ f, _ := strconv.ParseFloat(val, 64)
+ opts.CompactionTableSizeMultiplierPerLevel = append(opts.CompactionTableSizeMultiplierPerLevel, f)
+ }
+ case "compactiontotalsize":
+ opts.CompactionTotalSize, _ = strconv.Atoi(v[0])
+ case "compactiontotalsizemultiplier":
+ opts.CompactionTotalSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
+ case "compactiontotalsizemultiplierperlevel":
+ for _, val := range v {
+ f, _ := strconv.ParseFloat(val, 64)
+ opts.CompactionTotalSizeMultiplierPerLevel = append(opts.CompactionTotalSizeMultiplierPerLevel, f)
+ }
+ case "compression":
+ val, _ := strconv.Atoi(v[0])
+ opts.Compression = opt.Compression(val)
+ case "disablebufferpool":
+ opts.DisableBufferPool, _ = strconv.ParseBool(v[0])
+ case "disableblockcache":
+ opts.DisableBlockCache, _ = strconv.ParseBool(v[0])
+ case "disablecompactionbackoff":
+ opts.DisableCompactionBackoff, _ = strconv.ParseBool(v[0])
+ case "disablelargebatchtransaction":
+ opts.DisableLargeBatchTransaction, _ = strconv.ParseBool(v[0])
+ case "errorifexist":
+ opts.ErrorIfExist, _ = strconv.ParseBool(v[0])
+ case "errorifmissing":
+ opts.ErrorIfMissing, _ = strconv.ParseBool(v[0])
+ case "iteratorsamplingrate":
+ opts.IteratorSamplingRate, _ = strconv.Atoi(v[0])
+ case "nosync":
+ opts.NoSync, _ = strconv.ParseBool(v[0])
+ case "nowritemerge":
+ opts.NoWriteMerge, _ = strconv.ParseBool(v[0])
+ case "openfilescachecapacity":
+ opts.OpenFilesCacheCapacity, _ = strconv.Atoi(v[0])
+ case "readonly":
+ opts.ReadOnly, _ = strconv.ParseBool(v[0])
+ case "strict":
+ val, _ := strconv.Atoi(v[0])
+ opts.Strict = opt.Strict(val)
+ case "writebuffer":
+ opts.WriteBuffer, _ = strconv.Atoi(v[0])
+ case "writel0pausetrigger":
+ opts.WriteL0PauseTrigger, _ = strconv.Atoi(v[0])
+ case "writel0slowdowntrigger":
+ opts.WriteL0SlowdownTrigger, _ = strconv.Atoi(v[0])
+ case "clientname":
+ db.name = append(db.name, v[0])
+ }
+ }
+
+ var err error
+ db.db, err = leveldb.OpenFile(dataDir, opts)
+ if err != nil {
+ if !errors.IsCorrupted(err) {
+ if strings.Contains(err.Error(), "resource temporarily unavailable") {
+ err = fmt.Errorf("unable to lock level db at %s: %w", dataDir, err)
+ return nil, err
+ }
+
+ err = fmt.Errorf("unable to open level db at %s: %w", dataDir, err)
+ return nil, err
+ }
+ db.db, err = leveldb.RecoverFile(dataDir, opts)
+ }
+
+ if err != nil {
+ return nil, err
+ }
+
+ for _, name := range db.name {
+ m.LevelDBConnections[name] = db
+ }
+ db.count++
+ return db.db, nil
+}