summaryrefslogtreecommitdiffstats
path: root/tests/history_bench_test.go
blob: 7f66acce978ab2d3a3839f9ba5f7e9095803144a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package icingadb_test

import (
	"context"
	"fmt"
	"github.com/google/uuid"
	"github.com/icinga/icinga-testing/utils"
	"github.com/jmoiron/sqlx"
	"github.com/redis/go-redis/v9"
	"github.com/stretchr/testify/require"
	"strconv"
	"testing"
	"time"
)

// BenchmarkHistory runs benchmarkHistory as one sub-benchmark per comment
// count. Each sub-benchmark stops its timer before iterating, because
// benchmarkHistory starts and stops the timer itself around the part it
// measures.
func BenchmarkHistory(b *testing.B) {
	commentCounts := []int64{100_000, 200_000}

	for _, count := range commentCounts {
		name := fmt.Sprintf("%d-Comments", count)

		b.Run(name, func(sub *testing.B) {
			sub.StopTimer()

			for remaining := sub.N; remaining > 0; remaining-- {
				benchmarkHistory(sub, count)
			}
		})
	}
}

// benchmarkHistory performs a single benchmark iteration for numComments
// comment history entries.
//
// Before the timer is started, it obtains a database, a Redis server and an
// Icinga 2 node (configured through EnableIcingaDb and reloaded), creates a
// host via the API client and appends numComments "comment_add" entries to the
// icinga:history:stream:comment Redis stream. It then calls b.StartTimer,
// launches an Icinga DB instance and polls until the stream is reported empty
// and the comment_history table holds at least numComments rows, at which
// point the timer is stopped again. The benchmark fails if that state is not
// reached within five minutes.
//
// Setup happens before b.StartTimer, so it is only excluded from the
// measurement if the caller has stopped the timer beforehand.
func benchmarkHistory(b *testing.B, numComments int64) {
	rdb := getDatabase(b)
	r := it.RedisServer()
	defer r.Cleanup()
	n := it.Icinga2Node("master")
	defer n.Cleanup()
	n.EnableIcingaDb(r)
	err := n.Reload()
	require.NoError(b, err, "icinga2 should reload without error")

	db, err := sqlx.Connect(rdb.Driver(), rdb.DSN())
	require.NoError(b, err, "connecting to database")
	defer func() { _ = db.Close() }()

	redisClient := r.Open()
	defer func() { _ = redisClient.Close() }()

	client := n.ApiClient()

	hostname := utils.UniqueName(b, "host")
	client.CreateHost(b, hostname, map[string]interface{}{
		"attrs": map[string]interface{}{
			"enable_active_checks":  false,
			"enable_passive_checks": true,
			"check_command":         "dummy",
		},
	})

	const commentStream = "icinga:history:stream:comment"
	ctx := context.Background()

	// Entry times are spaced one second apart, starting numComments seconds
	// before now.
	baseTime := time.Now().Add(time.Duration(-numComments) * time.Second)
	for i := int64(0); i < numComments; i++ {
		// Fail immediately on a write error: a short stream could never satisfy
		// the completion condition below and would only surface as a timeout.
		err = redisClient.XAdd(ctx, &redis.XAddArgs{
			Stream: commentStream,
			Values: map[string]string{
				"comment_id":     utils.RandomString(32),
				"environment_id": "da39a3ee5e6b4b0d3255bfef95601890afd80709",
				"host_id":        "05d7e9c12104a1e8851a871d2f78e25b8c3d9eae",
				"entry_time":     strconv.FormatInt(baseTime.Add(time.Duration(i)*time.Second).UnixMilli(), 10),
				"author":         utils.RandomString(8),
				"comment":        utils.RandomString(8),
				"entry_type":     "1",
				"is_persistent":  "0",
				"is_sticky":      "0",
				"event_id":       uuid.New().String(),
				"event_type":     "comment_add",
				"object_type":    "service",
				"service_id":     "98fe4a1696c4804c75ff5c0e76f1e79ef855c634",
				"endpoint_id":    "05d7e9c12104a1e8851a871d2f78e25b8c3d9eae",
			},
		}).Err()
		require.NoError(b, err, "XADD should not fail")
	}

	// pendingCount returns the current length of the comment history stream.
	pendingCount := func() int64 {
		result, err := redisClient.XInfoStream(ctx, commentStream).Result()
		require.NoError(b, err, "XINFO should not fail")
		return result.Length
	}

	// writtenCount returns the number of rows in the comment_history table.
	writtenCount := func() int64 {
		var count int64
		err := db.Get(&count, "SELECT COUNT(*) FROM comment_history")
		require.NoError(b, err, "SELECT COUNT(*) should not fail")
		return count
	}

	lastPending := pendingCount()
	b.Logf("current stream length: %d", lastPending)

	// The timer is started before the Icinga DB instance is created, so
	// creating the instance is part of the measured time.
	b.StartTimer()
	idb := it.IcingaDbInstance(r, rdb)
	defer idb.Cleanup()

	ticker := time.NewTicker(5 * time.Millisecond)
	defer ticker.Stop()
	logTicker := time.NewTicker(1 * time.Second)
	defer logTicker.Stop()
	// A one-shot timer is sufficient here: the deadline fires at most once.
	timeout := time.NewTimer(5 * time.Minute)
	defer timeout.Stop()

loop:
	for {
		select {
		case <-ticker.C:
			if pendingCount() == 0 && writtenCount() >= numComments {
				break loop
			}
		case <-logTicker.C:
			if p := pendingCount(); p > 0 {
				b.Logf("last second: %d, pending: %d", lastPending-p, p)
				lastPending = p
			} else {
				// Nothing left to report; stop the progress log.
				logTicker.Stop()
			}
		case <-timeout.C:
			b.Fatal("did not drain stream in time")
		}
	}

	b.StopTimer()
}