summaryrefslogtreecommitdiffstats
path: root/tests/history_test.go
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:36:04 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:36:04 +0000
commitb09c6d56832eb1718c07d74abf3bc6ae3fe4e030 (patch)
treed2caec2610d4ea887803ec9e9c3cd77136c448ba /tests/history_test.go
parentInitial commit. (diff)
downloadicingadb-b09c6d56832eb1718c07d74abf3bc6ae3fe4e030.tar.xz
icingadb-b09c6d56832eb1718c07d74abf3bc6ae3fe4e030.zip
Adding upstream version 1.1.0.upstream/1.1.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/history_test.go')
-rw-r--r--tests/history_test.go835
1 files changed, 835 insertions, 0 deletions
diff --git a/tests/history_test.go b/tests/history_test.go
new file mode 100644
index 0000000..e720f02
--- /dev/null
+++ b/tests/history_test.go
@@ -0,0 +1,835 @@
+package icingadb_test
+
+import (
+ "bytes"
+ "context"
+ _ "embed"
+ "encoding/json"
+ "fmt"
+ "github.com/go-redis/redis/v8"
+ "github.com/icinga/icinga-testing/services"
+ "github.com/icinga/icinga-testing/utils"
+ "github.com/icinga/icinga-testing/utils/eventually"
+ "github.com/icinga/icinga-testing/utils/pki"
+ "github.com/jmoiron/sqlx"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "go.uber.org/zap"
+ "io/ioutil"
+ "math"
+ "net/http"
+ "sort"
+ "strconv"
+ "testing"
+ "text/template"
+ "time"
+)
+
+//go:embed history_test_zones.conf
+var historyZonesConfRaw string
+var historyZonesConfTemplate = template.Must(template.New("zones.conf").Parse(historyZonesConfRaw))
+
+func TestHistory(t *testing.T) {
+ t.Run("SingleNode", func(t *testing.T) {
+ testHistory(t, 1)
+ })
+
+ t.Run("HA", func(t *testing.T) {
+ testHistory(t, 2)
+ })
+}
+
+func testHistory(t *testing.T, numNodes int) {
+ rdb := getDatabase(t)
+
+ ca, err := pki.NewCA()
+ require.NoError(t, err, "generating a CA should succeed")
+
+ type Node struct {
+ Name string
+ Icinga2 services.Icinga2
+ IcingaClient *utils.Icinga2Client
+
+ // Redis server and client for the instance used by the Icinga DB process.
+ Redis services.RedisServer
+ RedisClient *redis.Client
+
+ // Second Redis server and client to verify the consistency of the history streams in a HA setup.
+ // There is no Icinga DB process reading from this Redis so history events are not removed there.
+ ConsistencyRedis services.RedisServer
+ ConsistencyRedisClient *redis.Client
+ }
+
+ nodes := make([]*Node, numNodes)
+
+ for i := range nodes {
+ name := fmt.Sprintf("master-%d", i)
+ redisServer := it.RedisServerT(t)
+ consistencyRedisServer := it.RedisServerT(t)
+ icinga := it.Icinga2NodeT(t, name)
+
+ nodes[i] = &Node{
+ Name: name,
+ Icinga2: icinga,
+ IcingaClient: icinga.ApiClient(),
+ Redis: redisServer,
+ RedisClient: redisServer.Open(),
+ ConsistencyRedis: consistencyRedisServer,
+ ConsistencyRedisClient: consistencyRedisServer.Open(),
+ }
+ }
+
+ zonesConf := bytes.NewBuffer(nil)
+ err = historyZonesConfTemplate.Execute(zonesConf, nodes)
+ require.NoError(t, err, "failed to render zones.conf")
+
+ for _, n := range nodes {
+ cert, err := ca.NewCertificate(n.Name)
+ require.NoError(t, err, "generating cert for %q should succeed", n.Name)
+ n.Icinga2.WriteConfig("etc/icinga2/zones.conf", zonesConf.Bytes())
+ n.Icinga2.WriteConfig("etc/icinga2/features-available/api.conf", []byte(`
+ object ApiListener "api" {
+ accept_config = true
+ accept_commands = true
+ }
+ `))
+ n.Icinga2.WriteConfig("var/lib/icinga2/certs/ca.crt", ca.CertificateToPem())
+ n.Icinga2.WriteConfig("var/lib/icinga2/certs/"+n.Name+".crt", cert.CertificateToPem())
+ n.Icinga2.WriteConfig("var/lib/icinga2/certs/"+n.Name+".key", cert.KeyToPem())
+ n.Icinga2.EnableIcingaDb(n.Redis)
+ n.Icinga2.EnableIcingaDb(n.ConsistencyRedis)
+ err = n.Icinga2.Reload()
+ require.NoError(t, err, "icinga2 should reload without error")
+ it.IcingaDbInstanceT(t, n.Redis, rdb)
+
+ {
+ n := n
+ t.Cleanup(func() {
+ _ = n.RedisClient.Close()
+ _ = n.ConsistencyRedisClient.Close()
+ })
+ }
+ }
+
+ testConsistency := func(t *testing.T, stream string) {
+ if numNodes > 1 {
+ t.Run("Consistency", func(t *testing.T) {
+ var clients []*redis.Client
+ for _, node := range nodes {
+ clients = append(clients, node.ConsistencyRedisClient)
+ }
+ assertStreamConsistency(t, clients, stream)
+ })
+ }
+ }
+
+ eventually.Require(t, func(t require.TestingT) {
+ for i, ni := range nodes {
+ for j, nj := range nodes {
+ if i != j {
+ response, err := ni.IcingaClient.GetJson("/v1/objects/endpoints/" + nj.Name)
+ require.NoErrorf(t, err, "fetching endpoint %q from %q should not fail", nj.Name, ni.Name)
+ require.Equalf(t, 200, response.StatusCode, "fetching endpoint %q from %q should not fail", nj.Name, ni.Name)
+
+ var endpoints ObjectsEndpointsResponse
+ err = json.NewDecoder(response.Body).Decode(&endpoints)
+ require.NoErrorf(t, err, "parsing response from %q for endpoint %q should not fail", ni.Name, nj.Name)
+ require.NotEmptyf(t, endpoints.Results, "response from %q for endpoint %q should contain a result", ni.Name, nj.Name)
+
+ assert.Truef(t, endpoints.Results[0].Attrs.Connected, "endpoint %q should be connected to %q", nj.Name, ni.Name)
+ }
+ }
+ }
+ }, 15*time.Second, 200*time.Millisecond)
+
+ db, err := sqlx.Connect(rdb.Driver(), rdb.DSN())
+ require.NoError(t, err, "connecting to mysql")
+ t.Cleanup(func() { _ = db.Close() })
+
+ client := nodes[0].IcingaClient
+
+ t.Run("Acknowledgement", func(t *testing.T) {
+ const stream = "icinga:history:stream:acknowledgement"
+
+ hostname := utils.UniqueName(t, "host")
+ client.CreateHost(t, hostname, map[string]interface{}{
+ "attrs": map[string]interface{}{
+ "enable_active_checks": false,
+ "enable_passive_checks": true,
+ "check_command": "dummy",
+ "max_check_attempts": 1,
+ },
+ })
+
+ processCheckResult(t, client, hostname, 1)
+
+ author := utils.RandomString(8)
+ comment := utils.RandomString(8)
+ req, err := json.Marshal(ActionsAcknowledgeProblemRequest{
+ Type: "Host",
+ Filter: fmt.Sprintf(`host.name==%q`, hostname),
+ Author: author,
+ Comment: comment,
+ })
+ ackTime := time.Now()
+ require.NoError(t, err, "marshal request")
+ response, err := client.PostJson("/v1/actions/acknowledge-problem", bytes.NewBuffer(req))
+ require.NoError(t, err, "acknowledge-problem")
+ require.Equal(t, 200, response.StatusCode, "acknowledge-problem")
+
+ var ackResponse ActionsAcknowledgeProblemResponse
+ err = json.NewDecoder(response.Body).Decode(&ackResponse)
+ require.NoError(t, err, "decode acknowledge-problem response")
+ require.Equal(t, 1, len(ackResponse.Results), "acknowledge-problem should return 1 result")
+ require.Equal(t, http.StatusOK, ackResponse.Results[0].Code, "acknowledge-problem result should have OK status")
+
+ for _, n := range nodes {
+ assertEventuallyDrained(t, n.RedisClient, stream)
+ }
+
+ eventually.Assert(t, func(t require.TestingT) {
+ type Row struct {
+ Author string `db:"author"`
+ Comment string `db:"comment"`
+ }
+
+ var rows []Row
+ err = db.Select(&rows, db.Rebind("SELECT a.author, a.comment FROM history h"+
+ " JOIN host ON host.id = h.host_id"+
+ " JOIN acknowledgement_history a ON a.id = h.acknowledgement_history_id"+
+ " WHERE host.name = ? AND ? < h.event_time AND h.event_time < ?"),
+ hostname, ackTime.Add(-time.Second).UnixMilli(), ackTime.Add(time.Second).UnixMilli())
+ require.NoError(t, err, "select acknowledgement_history")
+
+ require.Equal(t, 1, len(rows), "there should be exactly one acknowledgement history entry")
+ assert.Equal(t, author, rows[0].Author, "acknowledgement author should match")
+ assert.Equal(t, comment, rows[0].Comment, "acknowledgement comment should match")
+ }, 5*time.Second, 200*time.Millisecond)
+
+ testConsistency(t, stream)
+ })
+
+ t.Run("Comment", func(t *testing.T) {
+ const stream = "icinga:history:stream:comment"
+
+ type HistoryEvent struct {
+ Type string `db:"event_type"`
+ Author string `db:"author"`
+ Comment string `db:"comment"`
+ RemovedBy *string `db:"removed_by"`
+ }
+
+ hostname := utils.UniqueName(t, "host")
+ client.CreateHost(t, hostname, map[string]interface{}{
+ "attrs": map[string]interface{}{
+ "enable_active_checks": false,
+ "enable_passive_checks": true,
+ "check_command": "dummy",
+ },
+ })
+
+ author := utils.RandomString(8)
+ comment := utils.RandomString(8)
+ req, err := json.Marshal(ActionsAddCommentRequest{
+ Type: "Host",
+ Filter: fmt.Sprintf(`host.name==%q`, hostname),
+ Author: author,
+ Comment: comment,
+ })
+ require.NoError(t, err, "marshal request")
+ response, err := client.PostJson("/v1/actions/add-comment", bytes.NewBuffer(req))
+ require.NoError(t, err, "add-comment")
+ require.Equal(t, 200, response.StatusCode, "add-comment")
+
+ var addResponse ActionsAddCommentResponse
+ err = json.NewDecoder(response.Body).Decode(&addResponse)
+ require.NoError(t, err, "decode add-comment response")
+ require.Equal(t, 1, len(addResponse.Results), "add-comment should return 1 result")
+ require.Equal(t, http.StatusOK, addResponse.Results[0].Code, "add-comment result should have OK status")
+ commentName := addResponse.Results[0].Name
+
+ // Ensure that downtime events have distinct timestamps in millisecond resolution.
+ time.Sleep(10 * time.Millisecond)
+
+ removedBy := utils.RandomString(8)
+ req, err = json.Marshal(ActionsRemoveCommentRequest{
+ Comment: commentName,
+ Author: removedBy,
+ })
+ require.NoError(t, err, "marshal remove-comment request")
+ response, err = client.PostJson("/v1/actions/remove-comment", bytes.NewBuffer(req))
+ require.NoError(t, err, "remove-comment")
+ require.Equal(t, 200, response.StatusCode, "remove-comment")
+
+ expected := []HistoryEvent{
+ {Type: "comment_add", Author: author, Comment: comment, RemovedBy: &removedBy},
+ {Type: "comment_remove", Author: author, Comment: comment, RemovedBy: &removedBy},
+ }
+
+ if !testing.Short() {
+ // Ensure that downtime events have distinct timestamps in millisecond resolution.
+ time.Sleep(10 * time.Millisecond)
+
+ expireAuthor := utils.RandomString(8)
+ expireComment := utils.RandomString(8)
+ expireDelay := time.Second
+ req, err = json.Marshal(ActionsAddCommentRequest{
+ Type: "Host",
+ Filter: fmt.Sprintf(`host.name==%q`, hostname),
+ Author: expireAuthor,
+ Comment: expireComment,
+ Expiry: float64(time.Now().Add(expireDelay).UnixMilli()) / 1000,
+ })
+ require.NoError(t, err, "marshal request")
+ response, err = client.PostJson("/v1/actions/add-comment", bytes.NewBuffer(req))
+ require.NoError(t, err, "add-comment")
+ require.Equal(t, 200, response.StatusCode, "add-comment")
+
+ // Icinga only expires comments every 60 seconds, so wait this long after the expiry time.
+ time.Sleep(expireDelay + 60*time.Second)
+
+ expected = append(expected,
+ HistoryEvent{Type: "comment_add", Author: expireAuthor, Comment: expireComment},
+ HistoryEvent{Type: "comment_remove", Author: expireAuthor, Comment: expireComment},
+ )
+ }
+
+ for _, n := range nodes {
+ assertEventuallyDrained(t, n.RedisClient, stream)
+ }
+
+ eventually.Assert(t, func(t require.TestingT) {
+ var rows []HistoryEvent
+ err = db.Select(&rows, db.Rebind("SELECT h.event_type, c.author, c.comment, c.removed_by"+
+ " FROM history h"+
+ " JOIN comment_history c ON c.comment_id = h.comment_history_id"+
+ " JOIN host ON host.id = c.host_id WHERE host.name = ?"+
+ " ORDER BY h.event_time"), hostname)
+ require.NoError(t, err, "select comment_history")
+
+ assert.Equal(t, expected, rows, "comment history should match")
+ }, 5*time.Second, 200*time.Millisecond)
+
+ testConsistency(t, stream)
+
+ if testing.Short() {
+ t.Skip("skipped comment expiry test")
+ }
+ })
+
+ t.Run("Downtime", func(t *testing.T) {
+ const stream = "icinga:history:stream:downtime"
+
+ type HistoryEvent struct {
+ Event string `db:"event_type"`
+ Author string `db:"author"`
+ Comment string `db:"comment"`
+ Cancelled string `db:"has_been_cancelled"`
+ }
+
+ hostname := utils.UniqueName(t, "host")
+ client.CreateHost(t, hostname, map[string]interface{}{
+ "attrs": map[string]interface{}{
+ "enable_active_checks": false,
+ "enable_flapping": true,
+ "enable_passive_checks": true,
+ "check_command": "dummy",
+ },
+ })
+
+ downtimeStart := time.Now()
+ author := utils.RandomString(8)
+ comment := utils.RandomString(8)
+ req, err := json.Marshal(ActionsScheduleDowntimeRequest{
+ Type: "Host",
+ Filter: fmt.Sprintf(`host.name==%q`, hostname),
+ StartTime: downtimeStart.Unix(),
+ EndTime: downtimeStart.Add(time.Hour).Unix(),
+ Fixed: true,
+ Author: author,
+ Comment: comment,
+ })
+ require.NoError(t, err, "marshal request")
+ response, err := client.PostJson("/v1/actions/schedule-downtime", bytes.NewBuffer(req))
+ require.NoError(t, err, "schedule-downtime")
+ require.Equal(t, 200, response.StatusCode, "schedule-downtime")
+
+ var scheduleResponse ActionsScheduleDowntimeResponse
+ err = json.NewDecoder(response.Body).Decode(&scheduleResponse)
+ require.NoError(t, err, "decode schedule-downtime response")
+ require.Equal(t, 1, len(scheduleResponse.Results), "schedule-downtime should return 1 result")
+ require.Equal(t, http.StatusOK, scheduleResponse.Results[0].Code, "schedule-downtime result should have OK status")
+ downtimeName := scheduleResponse.Results[0].Name
+
+ // Ensure that downtime events have distinct timestamps in millisecond resolution.
+ time.Sleep(10 * time.Millisecond)
+
+ req, err = json.Marshal(ActionsRemoveDowntimeRequest{
+ Downtime: downtimeName,
+ Author: utils.RandomString(8),
+ })
+ require.NoError(t, err, "marshal remove-downtime request")
+ response, err = client.PostJson("/v1/actions/remove-downtime", bytes.NewBuffer(req))
+ require.NoError(t, err, "remove-downtime")
+ require.Equal(t, 200, response.StatusCode, "remove-downtime")
+ downtimeEnd := time.Now()
+
+ expected := []HistoryEvent{
+ {Event: "downtime_start", Author: author, Comment: comment, Cancelled: "y"},
+ {Event: "downtime_end", Author: author, Comment: comment, Cancelled: "y"},
+ }
+
+ if !testing.Short() {
+ // Ensure that downtime events have distinct timestamps in second resolution (for start time).
+ time.Sleep(time.Second)
+
+ expireStart := time.Now()
+ expireAuthor := utils.RandomString(8)
+ expireComment := utils.RandomString(8)
+ req, err := json.Marshal(ActionsScheduleDowntimeRequest{
+ Type: "Host",
+ Filter: fmt.Sprintf(`host.name==%q`, hostname),
+ StartTime: expireStart.Unix(),
+ EndTime: expireStart.Add(time.Second).Unix(),
+ Fixed: true,
+ Author: expireAuthor,
+ Comment: expireComment,
+ })
+ require.NoError(t, err, "marshal request")
+ response, err := client.PostJson("/v1/actions/schedule-downtime", bytes.NewBuffer(req))
+ require.NoError(t, err, "schedule-downtime")
+ require.Equal(t, 200, response.StatusCode, "schedule-downtime")
+
+ var scheduleResponse ActionsScheduleDowntimeResponse
+ err = json.NewDecoder(response.Body).Decode(&scheduleResponse)
+ require.NoError(t, err, "decode schedule-downtime response")
+ require.Equal(t, 1, len(scheduleResponse.Results), "schedule-downtime should return 1 result")
+ require.Equal(t, http.StatusOK, scheduleResponse.Results[0].Code, "schedule-downtime result should have OK status")
+
+ // Icinga only expires downtimes every 60 seconds, so wait this long in addition to the downtime duration.
+ time.Sleep(60*time.Second + 1*time.Second)
+
+ expected = append(expected,
+ HistoryEvent{Event: "downtime_start", Author: expireAuthor, Comment: expireComment, Cancelled: "n"},
+ HistoryEvent{Event: "downtime_end", Author: expireAuthor, Comment: expireComment, Cancelled: "n"},
+ )
+
+ downtimeEnd = time.Now()
+ }
+
+ for _, n := range nodes {
+ assertEventuallyDrained(t, n.RedisClient, stream)
+ }
+
+ if !eventually.Assert(t, func(t require.TestingT) {
+ var got []HistoryEvent
+ err = db.Select(&got, db.Rebind("SELECT h.event_type, d.author, d.comment, d.has_been_cancelled"+
+ " FROM history h"+
+ " JOIN host ON host.id = h.host_id"+
+ // Joining downtime_history checks that events are written to it.
+ " JOIN downtime_history d ON d.downtime_id = h.downtime_history_id"+
+ " WHERE host.name = ? AND ? < h.event_time AND h.event_time < ?"+
+ " ORDER BY h.event_time"),
+ hostname, downtimeStart.Add(-time.Second).UnixMilli(), downtimeEnd.Add(time.Second).UnixMilli())
+ require.NoError(t, err, "select downtime_history")
+
+ assert.Equal(t, expected, got, "downtime history should match expected result")
+ }, 5*time.Second, 200*time.Millisecond) {
+ t.Logf("\n%s", utils.MustT(t).String(utils.PrettySelect(db,
+ "SELECT h.event_time, h.event_type FROM history h"+
+ " JOIN host ON host.id = h.host_id"+
+ " LEFT JOIN downtime_history d ON d.downtime_id = h.downtime_history_id"+
+ " WHERE host.name = ?"+
+ " ORDER BY h.event_time", hostname)))
+ }
+
+ testConsistency(t, stream)
+
+ if testing.Short() {
+ t.Skip("skipped expiring downtime")
+ }
+ })
+
+ t.Run("Flapping", func(t *testing.T) {
+ const stream = "icinga:history:stream:flapping"
+
+ hostname := utils.UniqueName(t, "host")
+ client.CreateHost(t, hostname, map[string]interface{}{
+ "attrs": map[string]interface{}{
+ "enable_active_checks": false,
+ "enable_flapping": true,
+ "enable_passive_checks": true,
+ "check_command": "dummy",
+ },
+ })
+
+ timeBefore := time.Now()
+ for i := 0; i < 10; i++ {
+ processCheckResult(t, client, hostname, 0)
+ processCheckResult(t, client, hostname, 1)
+ }
+ for i := 0; i < 20; i++ {
+ processCheckResult(t, client, hostname, 0)
+ }
+ timeAfter := time.Now()
+
+ for _, n := range nodes {
+ assertEventuallyDrained(t, n.RedisClient, stream)
+ }
+
+ eventually.Assert(t, func(t require.TestingT) {
+ var rows []string
+ err = db.Select(&rows, db.Rebind("SELECT h.event_type FROM history h"+
+ " JOIN host ON host.id = h.host_id"+
+ // Joining flapping_history checks that events are written to it.
+ " JOIN flapping_history f ON f.id = h.flapping_history_id"+
+ " WHERE host.name = ? AND ? < h.event_time AND h.event_time < ?"+
+ " ORDER BY h.event_time"),
+ hostname, timeBefore.Add(-time.Second).UnixMilli(), timeAfter.Add(time.Second).UnixMilli())
+ require.NoError(t, err, "select flapping_history")
+
+ require.Equal(t, []string{"flapping_start", "flapping_end"}, rows,
+ "flapping history should match expected result")
+ }, 5*time.Second, 200*time.Millisecond)
+
+ testConsistency(t, stream)
+ })
+
+ t.Run("Notification", func(t *testing.T) {
+ const stream = "icinga:history:stream:notification"
+
+ hostname := utils.UniqueName(t, "host")
+ client.CreateHost(t, hostname, map[string]interface{}{
+ "attrs": map[string]interface{}{
+ "enable_active_checks": false,
+ "enable_flapping": true,
+ "enable_passive_checks": true,
+ "check_command": "dummy",
+ "max_check_attempts": 1,
+ },
+ })
+
+ users := make([]string, 5)
+ for i := range users {
+ users[i] = utils.UniqueName(t, "user")
+ client.CreateObject(t, "users", users[i], nil)
+ }
+
+ // Sort users so that the SQL query can use ORDER BY and the resulting slices can just be compared for equality.
+ sort.Slice(users, func(i, j int) bool { return users[i] < users[j] })
+
+ command := utils.UniqueName(t, "notificationcommand")
+ client.CreateObject(t, "notificationcommands", command, map[string]interface{}{
+ "attrs": map[string]interface{}{
+ "command": []string{"true"},
+ },
+ })
+
+ notification := utils.UniqueName(t, "notification")
+ client.CreateObject(t, "notifications", hostname+"!"+notification, map[string]interface{}{
+ "attrs": map[string]interface{}{
+ "users": users,
+ "command": command,
+ },
+ })
+
+ type Notification struct {
+ Type string `db:"type"`
+ User string `db:"username"`
+ }
+
+ var expected []Notification
+
+ timeBefore := time.Now()
+ processCheckResult(t, client, hostname, 1)
+ for _, u := range users {
+ expected = append(expected, Notification{Type: "problem", User: u})
+ }
+ processCheckResult(t, client, hostname, 0)
+ for _, u := range users {
+ expected = append(expected, Notification{Type: "recovery", User: u})
+ }
+ timeAfter := time.Now()
+
+ for _, n := range nodes {
+ assertEventuallyDrained(t, n.RedisClient, stream)
+ }
+
+ eventually.Assert(t, func(t require.TestingT) {
+ var rows []Notification
+ err = db.Select(&rows, db.Rebind("SELECT n.type, COALESCE(u.name, '') AS username FROM history h"+
+ " JOIN host ON host.id = h.host_id"+
+ " JOIN notification_history n ON n.id = h.notification_history_id"+
+ " LEFT JOIN user_notification_history un ON un.notification_history_id = n.id"+
+ ` LEFT JOIN "user" u ON u.id = un.user_id`+
+ " WHERE host.name = ? AND ? < h.event_time AND h.event_time < ?"+
+ " ORDER BY h.event_time, username"),
+ hostname, timeBefore.Add(-time.Second).UnixMilli(), timeAfter.Add(time.Second).UnixMilli())
+ require.NoError(t, err, "select notification_history")
+
+ require.Equal(t, expected, rows, "notification history should match expected result")
+ }, 5*time.Second, 200*time.Millisecond)
+
+ testConsistency(t, stream)
+ })
+
+ t.Run("State", func(t *testing.T) {
+ const stream = "icinga:history:stream:state"
+
+ hostname := utils.UniqueName(t, "host")
+ client.CreateHost(t, hostname, map[string]interface{}{
+ "attrs": map[string]interface{}{
+ "enable_active_checks": false,
+ "enable_passive_checks": true,
+ "check_command": "dummy",
+ "max_check_attempts": 2,
+ },
+ })
+
+ type State struct {
+ Type string `db:"state_type"`
+ Soft int `db:"soft_state"`
+ Hard int `db:"hard_state"`
+ }
+
+ var expected []State
+
+ timeBefore := time.Now()
+ processCheckResult(t, client, hostname, 0) // UNKNOWN -> UP (hard)
+ expected = append(expected, State{Type: "hard", Soft: 0, Hard: 0})
+ processCheckResult(t, client, hostname, 1) // -> DOWN (soft)
+ expected = append(expected, State{Type: "soft", Soft: 1, Hard: 0})
+ processCheckResult(t, client, hostname, 1) // -> DOWN (hard)
+ expected = append(expected, State{Type: "hard", Soft: 1, Hard: 1})
+ processCheckResult(t, client, hostname, 1) // -> DOWN
+ processCheckResult(t, client, hostname, 0) // -> UP (hard)
+ expected = append(expected, State{Type: "hard", Soft: 0, Hard: 0})
+ processCheckResult(t, client, hostname, 1) // -> DOWN (soft)
+ expected = append(expected, State{Type: "soft", Soft: 1, Hard: 0})
+ processCheckResult(t, client, hostname, 0) // -> UP (hard)
+ expected = append(expected, State{Type: "hard", Soft: 0, Hard: 0})
+ processCheckResult(t, client, hostname, 0) // -> UP
+ processCheckResult(t, client, hostname, 1) // -> down (soft)
+ expected = append(expected, State{Type: "soft", Soft: 1, Hard: 0})
+ processCheckResult(t, client, hostname, 1) // -> DOWN (hard)
+ expected = append(expected, State{Type: "hard", Soft: 1, Hard: 1})
+ processCheckResult(t, client, hostname, 0) // -> UP (hard)
+ expected = append(expected, State{Type: "hard", Soft: 0, Hard: 0})
+ timeAfter := time.Now()
+
+ for _, n := range nodes {
+ assertEventuallyDrained(t, n.RedisClient, stream)
+ }
+
+ eventually.Assert(t, func(t require.TestingT) {
+
+ var rows []State
+ err = db.Select(&rows, db.Rebind("SELECT s.state_type, s.soft_state, s.hard_state FROM history h"+
+ " JOIN host ON host.id = h.host_id JOIN state_history s ON s.id = h.state_history_id"+
+ " WHERE host.name = ? AND ? < h.event_time AND h.event_time < ?"+
+ " ORDER BY h.event_time"),
+ hostname, timeBefore.Add(-time.Second).UnixMilli(), timeAfter.Add(time.Second).UnixMilli())
+ require.NoError(t, err, "select state_history")
+
+ require.Equal(t, expected, rows, "state history does not match expected result")
+ }, 5*time.Second, 200*time.Millisecond)
+
+ testConsistency(t, stream)
+ })
+}
+
+func assertEventuallyDrained(t testing.TB, redis *redis.Client, stream string) {
+ eventually.Assert(t, func(t require.TestingT) {
+ result, err := redis.XRange(context.Background(), stream, "-", "+").Result()
+ require.NoError(t, err, "reading %s should not fail", stream)
+ assert.Empty(t, result, "%s should eventually be drained", stream)
+ }, 5*time.Second, 10*time.Millisecond)
+}
+
+func assertStreamConsistency(t testing.TB, clients []*redis.Client, stream string) {
+ messages := make([][]map[string]interface{}, len(clients))
+
+ for i, c := range clients {
+ xmessages, err := c.XRange(context.Background(), stream, "-", "+").Result()
+ require.NoError(t, err, "reading %s should not fail", stream)
+ assert.NotEmpty(t, xmessages, "%s should not be empty on the Redis server %d", stream, i)
+
+ // Convert []XMessage into a slice of just the values. The IDs are generated by the Redis server based
+ // on the time the entry was written to the stream, so these IDs are expected to differ.
+ ms := make([]map[string]interface{}, 0, len(xmessages))
+ for _, xmessage := range xmessages {
+ values := xmessage.Values
+
+ // Delete endpoint_id as this is supposed to differ between both history streams as each endpoint
+ // writes its own ID into the stream.
+ delete(values, "endpoint_id")
+
+ // users_notified_ids represents a set, so order does not matter, sort for the comparison later.
+ if idsJson, ok := values["users_notified_ids"]; ok {
+ var ids []string
+ err := json.Unmarshal([]byte(idsJson.(string)), &ids)
+ require.NoError(t, err, "if users_notified_ids is present, it must be a JSON list of strings")
+ sort.Strings(ids)
+ values["users_notified_ids"] = ids
+ }
+
+ ms = append(ms, values)
+ }
+ sort.Slice(ms, func(i, j int) bool {
+ eventTime := func(v map[string]interface{}) uint64 {
+ s, ok := v["event_time"].(string)
+ if !ok {
+ return 0
+ }
+ u, err := strconv.ParseUint(s, 10, 64)
+ if err != nil {
+ return 0
+ }
+ return u
+ }
+
+ sortKey := func(v map[string]interface{}) string {
+ return fmt.Sprintf("%020d-%s", eventTime(v), v["event_id"])
+ }
+
+ return sortKey(ms[i]) < sortKey(ms[j])
+ })
+ messages[i] = ms
+ }
+
+ for i := 0; i < len(messages)-1; i++ {
+ assert.Equal(t, messages[i], messages[i+1], "%s should be equal on both Redis servers", stream)
+ }
+}
+
+func processCheckResult(t *testing.T, client *utils.Icinga2Client, hostname string, status int) time.Time {
+ // Ensure that check results have distinct timestamps in millisecond resolution.
+ time.Sleep(10 * time.Millisecond)
+
+ output := utils.RandomString(8)
+ reqBody, err := json.Marshal(ActionsProcessCheckResultRequest{
+ Type: "Host",
+ Filter: fmt.Sprintf(`host.name==%q`, hostname),
+ ExitStatus: status,
+ PluginOutput: output,
+ })
+ require.NoError(t, err, "marshal request")
+ response, err := client.PostJson("/v1/actions/process-check-result", bytes.NewBuffer(reqBody))
+ require.NoError(t, err, "process-check-result")
+ if !assert.Equal(t, 200, response.StatusCode, "process-check-result") {
+ body, err := ioutil.ReadAll(response.Body)
+ require.NoError(t, err, "reading process-check-result response")
+ it.Logger(t).Error("process-check-result", zap.ByteString("api-response", body))
+ t.FailNow()
+ }
+
+ response, err = client.GetJson("/v1/objects/hosts/" + hostname)
+ require.NoError(t, err, "get host: request")
+ require.Equal(t, 200, response.StatusCode, "get host: request")
+
+ var hosts ObjectsHostsResponse
+ err = json.NewDecoder(response.Body).Decode(&hosts)
+ require.NoError(t, err, "get host: parse response")
+
+ require.Equal(t, 1, len(hosts.Results), "there must be one host in the response")
+ host := hosts.Results[0]
+ require.Equal(t, output, host.Attrs.LastCheckResult.Output,
+ "last check result should be visible in host object")
+ require.Equal(t, status, host.Attrs.State, "state should match check result")
+
+ sec, nsec := math.Modf(host.Attrs.LastCheckResult.ExecutionEnd)
+ return time.Unix(int64(sec), int64(nsec*1e9))
+}
+
+type ActionsAcknowledgeProblemRequest struct {
+ Type string `json:"type"`
+ Filter string `json:"filter"`
+ Author string `json:"author"`
+ Comment string `json:"comment"`
+}
+
+type ActionsAcknowledgeProblemResponse struct {
+ Results []struct {
+ Code int `json:"code"`
+ Status string `json:"status"`
+ } `json:"results"`
+}
+
+type ActionsAddCommentRequest struct {
+ Type string `json:"type"`
+ Filter string `json:"filter"`
+ Author string `json:"author"`
+ Comment string `json:"comment"`
+ Expiry float64 `json:"expiry"`
+}
+
+type ActionsAddCommentResponse struct {
+ Results []struct {
+ Code int `json:"code"`
+ LegacyId int `json:"legacy_id"`
+ Name string `json:"name"`
+ Status string `json:"status"`
+ } `json:"results"`
+}
+
+type ActionsRemoveCommentRequest struct {
+ Comment string `json:"comment"`
+ Author string `json:"author"`
+}
+
+type ActionsProcessCheckResultRequest struct {
+ Type string `json:"type"`
+ Filter string `json:"filter"`
+ ExitStatus int `json:"exit_status"`
+ PluginOutput string `json:"plugin_output"`
+}
+
+type ActionsRemoveDowntimeRequest struct {
+ Downtime string `json:"downtime"`
+ Author string `json:"author"`
+}
+
+type ActionsScheduleDowntimeRequest struct {
+ Type string `json:"type"`
+ Filter string `json:"filter"`
+ StartTime int64 `json:"start_time"`
+ EndTime int64 `json:"end_time"`
+ Fixed bool `json:"fixed"`
+ Duration float64 `json:"duration"`
+ Author string `json:"author"`
+ Comment string `json:"comment"`
+}
+
+type ActionsScheduleDowntimeResponse struct {
+ Results []struct {
+ Code int `json:"code"`
+ Name string `json:"name"`
+ Status string `json:"status"`
+ } `json:"results"`
+}
+
+type ObjectsHostsResponse struct {
+ Results []struct {
+ Attrs struct {
+ State int `json:"state"`
+ StateType int `json:"state_type"`
+ LastCheckResult struct {
+ ExecutionEnd float64 `json:"execution_end"`
+ ExitStatus int `json:"exit_status"`
+ Output string `json:"output"`
+ } `json:"last_check_result"`
+ LastHardState int `json:"last_hard_state"`
+ LastHardStateChange float64 `json:"last_hard_state_change"`
+ LastState int `json:"last_state"`
+ } `json:"attrs"`
+ } `json:"results"`
+}
+
+type ObjectsEndpointsResponse struct {
+ Results []struct {
+ Name string `json:"name"`
+ Attrs struct {
+ Connected bool `json:"connected"`
+ } `json:"attrs"`
+ } `json:"results"`
+}