summaryrefslogtreecommitdiffstats
path: root/pkg/icingadb/cleanup.go
blob: e57eafaa16232e9f2eb58cf81bdf1324d67521b3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package icingadb

import (
	"context"
	"fmt"
	"github.com/icinga/icingadb/internal"
	"github.com/icinga/icingadb/pkg/com"
	"github.com/icinga/icingadb/pkg/driver"
	"github.com/icinga/icingadb/pkg/types"
	"time"
)

// CleanupStmt defines information needed to compose cleanup statements.
// The fields are interpolated verbatim into the SQL text produced by Build,
// so they must be valid identifiers for the target database.
type CleanupStmt struct {
	Table  string // Name of the table rows are deleted from.
	PK     string // Column used to match the selected rows in the PostgreSQL variant of the statement; unused for MySQL.
	Column string // Column compared against the :time parameter and used to order the rows to delete.
}

// Build renders the cleanup statement as SQL text for the database identified by driverName.
// At most limit rows are selected for deletion per execution. The resulting statement expects
// the named parameters :environment_id and :time to be bound by the caller.
// Panics if driverName denotes neither a MySQL nor a PostgreSQL driver.
func (stmt *CleanupStmt) Build(driverName string, limit uint64) string {
	// MySQL supports ORDER BY and LIMIT directly on DELETE.
	const mysqlTemplate = `DELETE FROM %[1]s WHERE environment_id = :environment_id AND %[2]s < :time
ORDER BY %[2]s LIMIT %[3]d`

	// The PostgreSQL variant first selects the primary keys of the rows to remove
	// in a CTE and then deletes exactly those rows.
	const postgresTemplate = `WITH rows AS (
SELECT %[1]s FROM %[2]s WHERE environment_id = :environment_id AND %[3]s < :time ORDER BY %[3]s LIMIT %[4]d
)
DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`

	if driverName == driver.MySQL || driverName == "mysql" {
		return fmt.Sprintf(mysqlTemplate, stmt.Table, stmt.Column, limit)
	}

	if driverName == driver.PostgreSQL || driverName == "postgres" {
		return fmt.Sprintf(postgresTemplate, stmt.PK, stmt.Table, stmt.Column, limit)
	}

	panic(fmt.Sprintf("invalid database type %s", driverName))
}

// CleanupOlderThan deletes all rows matched by the specified statement that are older than the given time.
// Rows are deleted in rounds of at most count rows each. After every round, each onSuccess callback is
// called with a slice whose length equals the number of rows deleted in that round.
// Rounds are repeated until one of them deletes fewer rows than count or no rows at all;
// the latter also guarantees termination if count is 0.
// Returns the total number of rows deleted. If the query, fetching the affected rows or a callback fails,
// the error is returned along with a total of 0, even if earlier rounds already deleted rows.
func (db *DB) CleanupOlderThan(
	ctx context.Context, stmt CleanupStmt, envId types.Binary,
	count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}],
) (uint64, error) {
	var counter com.Counter
	// db.log is invoked right away with the statement built for a limit of 0; only Stop is deferred.
	defer db.log(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop()

	// Neither the statement nor its arguments change between rounds, so prepare them once.
	q := db.Rebind(stmt.Build(db.DriverName(), count))
	where := cleanupWhere{
		EnvironmentId: envId,
		Time:          types.UnixMilli(olderThan),
	}

	for {
		rs, err := db.NamedExecContext(ctx, q, where)
		if err != nil {
			return 0, internal.CantPerformQuery(err, q)
		}

		n, err := rs.RowsAffected()
		if err != nil {
			return 0, fmt.Errorf("can't get number of deleted rows: %w", err)
		}

		counter.Add(uint64(n))

		for _, onSuccess := range onSuccess {
			if err := onSuccess(ctx, make([]struct{}, n)); err != nil {
				return 0, err
			}
		}

		// A round that deleted fewer rows than requested means there is nothing left to delete.
		// The explicit check for "nothing deleted" prevents an endless loop for a count of 0,
		// where LIMIT 0 never deletes anything, and comparing as uint64 avoids the overflow
		// int64(count) would cause for counts larger than math.MaxInt64.
		if n <= 0 || uint64(n) < count {
			break
		}
	}

	return counter.Total(), nil
}

// cleanupWhere holds the arguments CleanupOlderThan passes to NamedExecContext
// when executing a statement built by CleanupStmt.Build.
// NOTE(review): presumably the fields are mapped to the :environment_id and :time
// placeholders by the database layer's name mapper - confirm against the DB setup.
type cleanupWhere struct {
	EnvironmentId types.Binary    // Environment whose rows are to be cleaned up.
	Time          types.UnixMilli // Threshold; rows with an older timestamp are deleted.
}