summaryrefslogtreecommitdiffstats
path: root/pkg/icingadb/history/retention.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/icingadb/history/retention.go')
-rw-r--r--pkg/icingadb/history/retention.go214
1 files changed, 214 insertions, 0 deletions
diff --git a/pkg/icingadb/history/retention.go b/pkg/icingadb/history/retention.go
new file mode 100644
index 0000000..ff217cd
--- /dev/null
+++ b/pkg/icingadb/history/retention.go
@@ -0,0 +1,214 @@
+package history
+
+import (
+ "context"
+ "fmt"
+ "github.com/icinga/icingadb/pkg/icingadb"
+ v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
+ "github.com/icinga/icingadb/pkg/icingaredis/telemetry"
+ "github.com/icinga/icingadb/pkg/logging"
+ "github.com/icinga/icingadb/pkg/periodic"
+ "github.com/pkg/errors"
+ "go.uber.org/zap"
+ "time"
+)
+
+type RetentionType int
+
+const (
+ RetentionHistory RetentionType = iota
+ RetentionSla
+)
+
+type retentionStatement struct {
+ icingadb.CleanupStmt
+ RetentionType
+ Category string
+}
+
+// RetentionStatements maps history categories with corresponding cleanup statements.
+var RetentionStatements = []retentionStatement{{
+ RetentionType: RetentionHistory,
+ Category: "acknowledgement",
+ CleanupStmt: icingadb.CleanupStmt{
+ Table: "acknowledgement_history",
+ PK: "id",
+ Column: "clear_time",
+ },
+}, {
+ RetentionType: RetentionHistory,
+ Category: "comment",
+ CleanupStmt: icingadb.CleanupStmt{
+ Table: "comment_history",
+ PK: "comment_id",
+ Column: "remove_time",
+ },
+}, {
+ RetentionType: RetentionHistory,
+ Category: "downtime",
+ CleanupStmt: icingadb.CleanupStmt{
+ Table: "downtime_history",
+ PK: "downtime_id",
+ Column: "end_time",
+ },
+}, {
+ RetentionType: RetentionHistory,
+ Category: "flapping",
+ CleanupStmt: icingadb.CleanupStmt{
+ Table: "flapping_history",
+ PK: "id",
+ Column: "end_time",
+ },
+}, {
+ RetentionType: RetentionHistory,
+ Category: "notification",
+ CleanupStmt: icingadb.CleanupStmt{
+ Table: "notification_history",
+ PK: "id",
+ Column: "send_time",
+ },
+}, {
+ RetentionType: RetentionHistory,
+ Category: "state",
+ CleanupStmt: icingadb.CleanupStmt{
+ Table: "state_history",
+ PK: "id",
+ Column: "event_time",
+ },
+}, {
+ RetentionType: RetentionSla,
+ Category: "sla_downtime",
+ CleanupStmt: icingadb.CleanupStmt{
+ Table: "sla_history_downtime",
+ PK: "downtime_id",
+ Column: "downtime_end",
+ },
+}, {
+ RetentionType: RetentionSla,
+ Category: "sla_state",
+ CleanupStmt: icingadb.CleanupStmt{
+ Table: "sla_history_state",
+ PK: "id",
+ Column: "event_time",
+ },
+}}
+
+// RetentionOptions defines the non-default mapping of history categories with their retention period in days.
+type RetentionOptions map[string]uint64
+
+// Validate checks constraints in the supplied retention options and
+// returns an error if they are violated.
+func (o RetentionOptions) Validate() error {
+ allowedCategories := make(map[string]struct{})
+ for _, stmt := range RetentionStatements {
+ if stmt.RetentionType == RetentionHistory {
+ allowedCategories[stmt.Category] = struct{}{}
+ }
+ }
+
+ for category := range o {
+ if _, ok := allowedCategories[category]; !ok {
+ return errors.Errorf("invalid key %s for history retention", category)
+ }
+ }
+
+ return nil
+}
+
+// Retention deletes rows from history tables that exceed their configured retention period.
+type Retention struct {
+ db *icingadb.DB
+ logger *logging.Logger
+ historyDays uint64
+ slaDays uint64
+ interval time.Duration
+ count uint64
+ options RetentionOptions
+}
+
+// NewRetention returns a new Retention.
+func NewRetention(
+ db *icingadb.DB, historyDays uint64, slaDays uint64, interval time.Duration,
+ count uint64, options RetentionOptions, logger *logging.Logger,
+) *Retention {
+ return &Retention{
+ db: db,
+ logger: logger,
+ historyDays: historyDays,
+ slaDays: slaDays,
+ interval: interval,
+ count: count,
+ options: options,
+ }
+}
+
+// Start starts the retention.
+func (r *Retention) Start(ctx context.Context) error {
+ ctx, cancelCtx := context.WithCancel(ctx)
+ defer cancelCtx()
+
+ e, ok := v1.EnvironmentFromContext(ctx)
+ if !ok {
+ return errors.New("can't get environment from context")
+ }
+
+ errs := make(chan error, 1)
+
+ for _, stmt := range RetentionStatements {
+ var days uint64
+ switch stmt.RetentionType {
+ case RetentionHistory:
+ if d, ok := r.options[stmt.Category]; ok {
+ days = d
+ } else {
+ days = r.historyDays
+ }
+ case RetentionSla:
+ days = r.slaDays
+ }
+
+ if days < 1 {
+ r.logger.Debugf("Skipping history retention for category %s", stmt.Category)
+ continue
+ }
+
+ r.logger.Debugw(
+ fmt.Sprintf("Starting history retention for category %s", stmt.Category),
+ zap.Uint64("count", r.count),
+ zap.Duration("interval", r.interval),
+ zap.Uint64("retention-days", days),
+ )
+
+ stmt := stmt
+ periodic.Start(ctx, r.interval, func(tick periodic.Tick) {
+ olderThan := tick.Time.AddDate(0, 0, -int(days))
+
+ r.logger.Debugf("Cleaning up historical data for category %s from table %s older than %s",
+ stmt.Category, stmt.Table, olderThan)
+
+ deleted, err := r.db.CleanupOlderThan(
+ ctx, stmt.CleanupStmt, e.Id, r.count, olderThan,
+ icingadb.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup),
+ )
+ if err != nil {
+ select {
+ case errs <- err:
+ case <-ctx.Done():
+ }
+
+ return
+ }
+
+ if deleted > 0 {
+ r.logger.Infof("Removed %d old %s history items", deleted, stmt.Category)
+ }
+ }, periodic.Immediate())
+ }
+
+ select {
+ case err := <-errs:
+ return err
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+}