1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
package icingadb_test
import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/icinga/icinga-testing/utils"
"github.com/jmoiron/sqlx"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
"strconv"
"testing"
"time"
)
func BenchmarkHistory(b *testing.B) {
for _, numComments := range []int64{100_000, 200_000} {
b.Run(fmt.Sprintf("%d-Comments", numComments), func(b *testing.B) {
b.StopTimer()
for i := 0; i < b.N; i++ {
benchmarkHistory(b, numComments)
}
})
}
}
func benchmarkHistory(b *testing.B, numComments int64) {
rdb := getDatabase(b)
r := it.RedisServer()
defer r.Cleanup()
n := it.Icinga2Node("master")
defer n.Cleanup()
n.EnableIcingaDb(r)
err := n.Reload()
require.NoError(b, err, "icinga2 should reload without error")
db, err := sqlx.Connect(rdb.Driver(), rdb.DSN())
require.NoError(b, err, "connecting to database")
defer func() { _ = db.Close() }()
redisClient := r.Open()
defer func() { _ = redisClient.Close() }()
client := n.ApiClient()
hostname := utils.UniqueName(b, "host")
client.CreateHost(b, hostname, map[string]interface{}{
"attrs": map[string]interface{}{
"enable_active_checks": false,
"enable_passive_checks": true,
"check_command": "dummy",
},
})
baseTime := time.Now().Add(time.Duration(-numComments) * time.Second)
for i := int64(0); i < numComments; i++ {
redisClient.XAdd(context.Background(), &redis.XAddArgs{
Stream: "icinga:history:stream:comment",
Values: map[string]string{
"comment_id": utils.RandomString(32),
"environment_id": "da39a3ee5e6b4b0d3255bfef95601890afd80709",
"host_id": "05d7e9c12104a1e8851a871d2f78e25b8c3d9eae",
"entry_time": strconv.FormatInt(baseTime.Add(time.Duration(i)*time.Second).UnixMilli(), 10),
"author": utils.RandomString(8),
"comment": utils.RandomString(8),
"entry_type": "1",
"is_persistent": "0",
"is_sticky": "0",
"event_id": uuid.New().String(),
"event_type": "comment_add",
"object_type": "service",
"service_id": "98fe4a1696c4804c75ff5c0e76f1e79ef855c634",
"endpoint_id": "05d7e9c12104a1e8851a871d2f78e25b8c3d9eae",
},
})
}
pendingCount := func() int64 {
result, err := redisClient.XInfoStream(context.Background(), "icinga:history:stream:comment").Result()
require.NoError(b, err, "XINFO should not fail")
return result.Length
}
writtenCount := func() int64 {
var count int64
err := db.Get(&count, "SELECT COUNT(*) FROM comment_history")
require.NoError(b, err, "SELECT COUNT(*) should not fail")
return count
}
lastPending := pendingCount()
b.Logf("current stream length: %d", lastPending)
b.StartTimer()
idb := it.IcingaDbInstance(r, rdb)
defer idb.Cleanup()
ticker := time.NewTicker(5 * time.Millisecond)
defer ticker.Stop()
logTicker := time.NewTicker(1 * time.Second)
defer logTicker.Stop()
timeout := time.NewTicker(5 * time.Minute)
defer timeout.Stop()
loop:
for {
select {
case <-ticker.C:
if pendingCount() == 0 && writtenCount() >= numComments {
break loop
}
case <-logTicker.C:
if p := pendingCount(); p > 0 {
b.Logf("last second: %d, pending: %d", lastPending-p, p)
lastPending = p
} else {
logTicker.Stop()
}
case <-timeout.C:
b.Fatal("did not drain stream in time")
}
}
b.StopTimer()
}
|