summaryrefslogtreecommitdiffstats
path: root/cmd/icingadb-migrate/misc.go
blob: c228f3a8e4fb13ed090b208da04ea948c5de5a10 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
package main

import (
	"context"
	"crypto/sha1"
	"github.com/icinga/icingadb/pkg/contracts"
	"github.com/icinga/icingadb/pkg/driver"
	"github.com/icinga/icingadb/pkg/icingadb"
	"github.com/icinga/icingadb/pkg/icingadb/objectpacker"
	icingadbTypes "github.com/icinga/icingadb/pkg/types"
	"github.com/jmoiron/sqlx"
	"github.com/pkg/errors"
	"github.com/vbauerster/mpb/v6"
	"github.com/vbauerster/mpb/v6/decor"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"strings"
	"time"
)

// IdoMigrationProgressUpserter carries LastIdoId, the one field
// which is handed out by Upsert().
type IdoMigrationProgressUpserter struct {
	LastIdoId any `json:"last_ido_id"`
}

// Upsert implements the contracts.Upserter interface.
// It hands out the receiver itself.
func (u *IdoMigrationProgressUpserter) Upsert() any {
	return u
}

// IdoMigrationProgress extends IdoMigrationProgressUpserter (LastIdoId)
// with EnvironmentId, HistoryType, FromTs and ToTs.
// As the Upsert() method is promoted from the embedded type,
// it returns only that embedded part, not the whole struct.
type IdoMigrationProgress struct {
	IdoMigrationProgressUpserter `json:",inline"`
	EnvironmentId                string `json:"environment_id"`
	HistoryType                  string `json:"history_type"`
	FromTs                       int32  `json:"from_ts"`
	ToTs                         int32  `json:"to_ts"`
}

// Assert interface compliance.
// These are compile-time checks only: both pointer types must satisfy contracts.Upserter.
var (
	_ contracts.Upserter = (*IdoMigrationProgressUpserter)(nil)
	_ contracts.Upserter = (*IdoMigrationProgress)(nil)
)

// log is the root logger. It's built once from zap's development config
// while package-level variables are initialized; a failure to build it panics.
var log = func() *zap.SugaredLogger {
	cfg := zap.NewDevelopmentConfig()

	base, buildErr := cfg.Build()
	if buildErr != nil {
		panic(buildErr)
	}

	return base.Sugar()
}()

// objectTypes maps IDO values to Icinga DB ones.
// Only 1 ("host") and 2 ("service") are known, lookups of other keys yield "".
var objectTypes = map[uint8]string{1: "host", 2: "service"}

// hashAny packs in via objectpacker.PackAny straight into a SHA1 hash
// and returns the resulting checksum. If packing fails, it panics.
func hashAny(in interface{}) []byte {
	digest := sha1.New()

	packErr := objectpacker.PackAny(in, digest)
	if packErr != nil {
		panic(packErr)
	}

	return digest.Sum(nil)
}

// convertTime converts *nix timestamps from the IDO for Icinga DB.
// ts is in seconds, tsUs is the additional microseconds part.
// If both are zero, the zero icingadbTypes.UnixMilli is returned.
func convertTime(ts int64, tsUs uint32) icingadbTypes.UnixMilli {
	var converted icingadbTypes.UnixMilli

	if ts != 0 || tsUs != 0 {
		// time.Unix() expects nanoseconds as its second argument.
		nsec := int64(tsUs) * int64(time.Microsecond/time.Nanosecond)
		converted = icingadbTypes.UnixMilli(time.Unix(ts, nsec))
	}

	return converted
}

// calcObjectId calculates the ID of the config object named name1 for Icinga DB
// by hashing the pair (env, name1). An empty name1 yields nil.
func calcObjectId(env, name1 string) []byte {
	if len(name1) == 0 {
		return nil
	}

	key := [2]string{env, name1}

	return hashAny(key)
}

// calcServiceId calculates the ID of the service name2 of the host name1 for Icinga DB
// by hashing the pair (env, "name1!name2"). An empty name2 yields nil.
func calcServiceId(env, name1, name2 string) []byte {
	if len(name2) == 0 {
		return nil
	}

	fullName := name1 + "!" + name2
	key := [2]string{env, fullName}

	return hashAny(key)
}

// sliceIdoHistory performs query with args+fromid,toid,checkpoint,bulk on ht.snapshot
// and passes the results to onRows until either an empty result set or onRows() returns nil.
// Rationale: split the likely large result set of a query by adding a WHERE condition and a LIMIT,
// both with :named placeholders (:checkpoint, :bulk).
// checkpoint is the initial value for the WHERE condition, onRows() returns follow-up ones.
// args may be nil. It is only read: the additional parameters go into a private copy,
// so the caller's map is never modified.
// If ht.snapshot's driver isn't MySQL, " USE INDEX (PRIMARY)" is stripped from query.
// (On non-recoverable errors the whole program exits.)
func sliceIdoHistory[Row any](
	ht *historyType, query string, args map[string]any,
	checkpoint interface{}, onRows func([]Row) (checkpoint interface{}),
) {
	// Copy instead of writing into args, so that a map shared by the caller stays untouched.
	// Ranging over a nil map is fine, so nil needs no special case.
	params := make(map[string]any, len(args)+4)
	for name, value := range args {
		params[name] = value
	}

	params["fromid"] = ht.fromId
	params["toid"] = ht.toId
	params["checkpoint"] = checkpoint
	params["bulk"] = 20000

	if ht.snapshot.DriverName() != driver.MySQL {
		query = strings.ReplaceAll(query, " USE INDEX (PRIMARY)", "")
	}

	for {
		// TODO: use Tx#SelectNamed() one nice day (https://github.com/jmoiron/sqlx/issues/779)
		stmt, err := ht.snapshot.PrepareNamed(query)
		if err != nil {
			log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't prepare query"))
		}

		var rows []Row
		if err := stmt.Select(&rows, params); err != nil {
			log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query"))
		}

		// All rows are already fetched, a failure to close the statement is not actionable here.
		_ = stmt.Close()

		if len(rows) < 1 {
			break
		}

		// onRows() decides whether and from where to continue.
		if checkpoint = onRows(rows); checkpoint == nil {
			break
		}

		params["checkpoint"] = checkpoint
	}
}

// progressBar wraps *mpb.Bar and remembers when it was incremented last,
// so that IncrBy can feed the EWMA decorators itself.
type progressBar struct {
	*mpb.Bar

	// lastUpdate is the time of the latest IncrBy call, zero before the first one.
	lastUpdate time.Time
}

// IncrBy increments the wrapped bar by n and does pb.Bar.DecoratorEwmaUpdate() automatically,
// passing the time elapsed since the previous call. The very first call only records the time.
func (pb *progressBar) IncrBy(n int) {
	pb.Bar.IncrBy(n)

	current := time.Now()

	if previous := pb.lastUpdate; !previous.IsZero() {
		elapsed := current.Sub(previous)
		pb.Bar.DecoratorEwmaUpdate(elapsed)
	}

	pb.lastUpdate = current
}

// historyType specifies a history data type.
// The fields up to and including migrate are the ones set in the types table below,
// the remaining ones are not set there (presumably filled in at runtime - verify against callers).
type historyType struct {
	// name is a human-readable common name.
	name string
	// idoTable specifies the source table.
	idoTable string
	// idoIdColumn specifies idoTable's primary key.
	idoIdColumn string
	// idoStartColumns specifies idoTable's event start time locations. (First non-NULL is used.)
	idoStartColumns []string
	// idoEndColumns specifies idoTable's event end time locations. (First non-NULL is used.)
	idoEndColumns []string
	// cacheSchema specifies <name>.sqlite3's structure.
	cacheSchema string
	// cacheFiller fills cache from snapshot.
	cacheFiller func(*historyType)
	// cacheLimitQuery rationale: see migrate().
	cacheLimitQuery string
	// migrationQuery SELECTs source data for actual migration.
	migrationQuery string
	// migrate does the actual migration.
	migrate func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType)

	// cacheFile locates <name>.sqlite3.
	cacheFile string
	// cache represents <cacheFile>.
	cache *sqlx.DB
	// snapshot represents the data source.
	snapshot *sqlx.Tx
	// fromId is the first IDO row ID to migrate.
	fromId uint64
	// toId is the last IDO row ID to migrate.
	toId uint64
	// total summarizes the source data.
	total int64
	// cacheTotal summarizes the cache source data.
	cacheTotal int64
	// done summarizes the migrated data.
	done int64
	// bar represents the current progress bar. It's (re-)initialized by setupBar.
	bar *progressBar
	// lastId is the last already migrated ID.
	lastId uint64
}

// setupBar (re-)initializes ht.bar with a new bar of the given total, added to progress.
// The bar is prefixed with ht.name and the percentage, followed by ETA and speed
// (both EWMA based) and its filler is cleared on completion.
func (ht *historyType) setupBar(progress *mpb.Progress, total int64) {
	// Left hand side: name (padded by one column) and percentage.
	label := decor.Name(ht.name, decor.WC{W: len(ht.name) + 1, C: decor.DidentRight})
	percent := decor.Percentage(decor.WC{W: 5})

	// Right hand side: ETA, a separating space and the speed.
	eta := decor.EwmaETA(decor.ET_STYLE_GO, 0, decor.WC{W: 4})
	gap := decor.Name(" ")
	speed := decor.EwmaSpeed(0, "%.0f/s", 0, decor.WC{W: 4})

	bar := progress.AddBar(
		total,
		mpb.BarFillerClearOnComplete(),
		mpb.PrependDecorators(label, percent),
		mpb.AppendDecorators(eta, gap, speed),
	)

	ht.bar = &progressBar{Bar: bar}
}

// historyTypes is a list of history data types.
type historyTypes []*historyType

// forEach calls f once per element of hts, every call in a goroutine of its own,
// and returns not before all of them have finished.
func (hts historyTypes) forEach(f func(*historyType)) {
	group, _ := errgroup.WithContext(context.Background())

	for i := range hts {
		// Each goroutine gets its own variable, not one shared by all iterations.
		current := hts[i]

		group.Go(func() error {
			f(current)
			return nil
		})
	}

	// No goroutine ever returns an error, so there's nothing to check.
	_ = group.Wait()
}

// types lists all history data types known to this program.
// Each entry names its IDO source table, primary key and start/end time columns,
// its migration query and its migrate callback, which delegates to migrateOneType
// with a type-specific row converter.
// Flapping, notification and state additionally specify a cache (schema and filler),
// the latter two a cacheLimitQuery as well.
var types = historyTypes{
	{
		name:            "ack & comment",
		idoTable:        "icinga_commenthistory",
		idoIdColumn:     "commenthistory_id",
		idoStartColumns: []string{"entry_time"},
		// Manual deletion time wins vs. time of expiration which never happens due to manual deletion.
		idoEndColumns:  []string{"deletion_time", "expiration_time"},
		migrationQuery: commentMigrationQuery,
		migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) {
			migrateOneType(c, idb, envId, ht, convertCommentRows)
		},
	},
	{
		name:        "downtime",
		idoTable:    "icinga_downtimehistory",
		idoIdColumn: "downtimehistory_id",
		// Fall back to scheduled time if actual time is missing.
		idoStartColumns: []string{"actual_start_time", "scheduled_start_time"},
		idoEndColumns:   []string{"actual_end_time", "scheduled_end_time"},
		migrationQuery:  downtimeMigrationQuery,
		migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) {
			migrateOneType(c, idb, envId, ht, convertDowntimeRows)
		},
	},
	{
		name:            "flapping",
		idoTable:        "icinga_flappinghistory",
		idoIdColumn:     "flappinghistory_id",
		// Start and end are taken from the same column.
		idoStartColumns: []string{"event_time"},
		idoEndColumns:   []string{"event_time"},
		cacheSchema:     eventTimeCacheSchema,
		cacheFiller: func(ht *historyType) {
			buildEventTimeCache(ht, []string{
				"xh.flappinghistory_id id", "UNIX_TIMESTAMP(xh.event_time) event_time",
				"xh.event_time_usec", "1001-xh.event_type event_is_start", "xh.object_id",
			})
		},
		migrationQuery: flappingMigrationQuery,
		migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) {
			migrateOneType(c, idb, envId, ht, convertFlappingRows)
		},
	},
	{
		name:            "notification",
		idoTable:        "icinga_notifications",
		idoIdColumn:     "notification_id",
		idoStartColumns: []string{"start_time"},
		idoEndColumns:   []string{"end_time"},
		cacheSchema:     previousHardStateCacheSchema,
		cacheFiller: func(ht *historyType) {
			buildPreviousHardStateCache(ht, []string{
				"xh.notification_id id", "xh.object_id", "xh.state last_hard_state",
			})
		},
		cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state",
		migrationQuery:  notificationMigrationQuery,
		migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) {
			migrateOneType(c, idb, envId, ht, convertNotificationRows)
		},
	},
	{
		name:            "state",
		idoTable:        "icinga_statehistory",
		idoIdColumn:     "statehistory_id",
		// Start and end are taken from the same column.
		idoStartColumns: []string{"state_time"},
		idoEndColumns:   []string{"state_time"},
		cacheSchema:     previousHardStateCacheSchema,
		cacheFiller: func(ht *historyType) {
			buildPreviousHardStateCache(ht, []string{"xh.statehistory_id id", "xh.object_id", "xh.last_hard_state"})
		},
		cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state",
		migrationQuery:  stateMigrationQuery,
		migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) {
			migrateOneType(c, idb, envId, ht, convertStateRows)
		},
	},
}