path: root/tests/unit/cluster/links.tcl
# Return all entries in CLUSTER LINKS output by instance identified by `this_instance_id` that
# involve the peer identified by `peer_nodename` (both the "to" and the "from" link)
proc get_links_with_peer {this_instance_id peer_nodename} {
    set links [R $this_instance_id cluster links]
    set links_with_peer {}
    foreach l $links {
        if {[dict get $l node] eq $peer_nodename} {
            lappend links_with_peer $l
        }
    }
    return $links_with_peer
}

# Return the entry in CLUSTER LINKS output by instance identified by `this_instance_id` that
# corresponds to the link established toward a peer identified by `peer_nodename`
proc get_link_to_peer {this_instance_id peer_nodename} {
    set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
    foreach l $links_with_peer {
        if {[dict get $l direction] eq "to"} {
            return $l
        }
    }
    return {}
}

# Return the entry in CLUSTER LINKS output by instance identified by `this_instance_id` that
# corresponds to the link accepted from a peer identified by `peer_nodename`
proc get_link_from_peer {this_instance_id peer_nodename} {
    set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
    foreach l $links_with_peer {
        if {[dict get $l direction] eq "from"} {
            return $l
        }
    }
    return {}
}

# Reset cluster links to their original state
proc reset_links {id} {
    set limit [lindex [R $id CONFIG get cluster-link-sendbuf-limit] 1]

    # Set a 1 byte limit and wait for the cluster cron (runs every 100ms)
    # to terminate the links
    R $id CONFIG SET cluster-link-sendbuf-limit 1
    after 150

    # Reset limit
    R $id CONFIG SET cluster-link-sendbuf-limit $limit

    # Wait until the cluster links come back up for each node
    wait_for_condition 50 100 {
        [number_of_links $id] == [expr [number_of_peers $id] * 2]
    } else {
        fail "Cluster links did not come back up"
    }
}

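# Number of cluster peers of instance `id` (all other nodes in the test cluster)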
proc number_of_peers {id} {
    expr [llength $::servers] - 1
}

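# Total number of cluster links currently reported by instance `id`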
proc number_of_links {id} {
    llength [R $id cluster links]
}

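# Publish `num_msgs` messages of `msg_size` bytes each through `server`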
proc publish_messages {server num_msgs msg_size} {
    for {set i 0} {$i < $num_msgs} {incr i} {
        $server PUBLISH channel [string repeat "x" $msg_size]
    }
}

start_cluster 1 2 {tags {external:skip cluster}} {
    set primary_id 0
    set replica1_id 1

    set primary [Rn $primary_id]
    set replica1 [Rn $replica1_id]

    test "Broadcast message across a cluster shard while a cluster link is down" {
        set replica1_node_id [$replica1 CLUSTER MYID]

        set channelname ch3

        # subscribe on replica1
        set subscribeclient1 [redis_deferring_client -1]
        $subscribeclient1 deferred 1
        $subscribeclient1 SSUBSCRIBE $channelname
        $subscribeclient1 read

        # subscribe on replica2
        set subscribeclient2 [redis_deferring_client -2]
        $subscribeclient2 deferred 1
        $subscribeclient2 SSUBSCRIBE $channelname
        $subscribeclient2 read

        # Verify the number of links while the cluster is in a stable state
        assert_equal [expr [number_of_peers $primary_id]*2] [number_of_links $primary_id]

        # Disconnect the cluster between primary and replica1 and publish a message.
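        # Using MULTI ensures the publish executes before the killed link can be re-established.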
        $primary MULTI
        $primary DEBUG CLUSTERLINK KILL TO $replica1_node_id
        $primary SPUBLISH $channelname hello
        set res [$primary EXEC]

        # Verify no client exists on the primary to receive the published message.
        assert_equal $res {OK 0}

        # Wait until all the cluster links are healthy again
        wait_for_condition 50 100 {
            [number_of_peers $primary_id]*2 == [number_of_links $primary_id]
        } else {
            fail "All peer links couldn't be established"
        }

        # Publish another message now that the links are back up.
        $primary SPUBLISH $channelname world

        # Verify replica1 received only `world`; `hello` was lost while the link was down.
        assert_equal "smessage ch3 world" [$subscribeclient1 read]

        # Verify replica2 received both messages (hello and world)
        assert_equal "smessage ch3 hello" [$subscribeclient2 read]
        assert_equal "smessage ch3 world" [$subscribeclient2 read]
    } {} {needs:debug}
}

start_cluster 3 0 {tags {external:skip cluster}} {
    test "Each node has two links with each peer" {
        for {set id 0} {$id < [llength $::servers]} {incr id} {
            # Assert that from point of view of each node, there are two links for
            # each peer. It might take a while for cluster to stabilize so wait up
            # to 5 seconds.
            wait_for_condition 50 100 {
                [number_of_peers $id]*2 == [number_of_links $id]
            } else {
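                # Use assert_equal so the failure message shows the actual vs. expected link counts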
                assert_equal [expr [number_of_peers $id]*2] [number_of_links $id]
            }

            set nodes [get_cluster_nodes $id]
            set links [R $id cluster links]

            # For each peer there should be exactly one
            # link "to" it and one link "from" it.
            foreach n $nodes {
                if {[cluster_has_flag $n myself]} continue
                set peer [dict get $n id]
                set to 0
                set from 0
                foreach l $links {
                    if {[dict get $l node] eq $peer} {
                        if {[dict get $l direction] eq "to"} {
                            incr to
                        } elseif {[dict get $l direction] eq "from"} {
                            incr from
                        }
                    }
                }
                assert {$to eq 1}
                assert {$from eq 1}
            }
        }
    }

    test {Validate cluster links format} {
        set lines [R 0 cluster links]
        foreach l $lines {
            if {$l eq {}} continue
            assert_equal [llength $l] 12
            assert_equal 1 [dict exists $l "direction"]
            assert_equal 1 [dict exists $l "node"]
            assert_equal 1 [dict exists $l "create-time"]
            assert_equal 1 [dict exists $l "events"]
            assert_equal 1 [dict exists $l "send-buffer-allocated"]
            assert_equal 1 [dict exists $l "send-buffer-used"]
        }
    }

    set primary1_id 0
    set primary2_id 1

    set primary1 [Rn $primary1_id]
    set primary2 [Rn $primary2_id]

    test "Disconnect link when send buffer limit reached" {
        # On primary1, set timeout to 1 hour so links won't get disconnected due to timeouts
        set oldtimeout [lindex [$primary1 CONFIG get cluster-node-timeout] 1]
        $primary1 CONFIG set cluster-node-timeout [expr 60*60*1000]

        # Get primary1's links with primary2
        set primary2_name [dict get [cluster_get_myself $primary2_id] id]
        set orig_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
        set orig_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]

        # On primary1, set the cluster link send buffer limit to 256KB, which is large enough
        # not to be overflowed by regular gossip messages but small enough that it doesn't take
        # too much memory to overflow. If the limit is set too high, Redis may get OOM-killed by
        # the kernel in some RAM-limited test environments before the limit is ever exceeded.
        set oldlimit [lindex [$primary1 CONFIG get cluster-link-sendbuf-limit] 1]
        $primary1 CONFIG set cluster-link-sendbuf-limit [expr 256*1024]
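        # CI reads a field from the node's CLUSTER INFO output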
        assert {[CI $primary1_id total_cluster_links_buffer_limit_exceeded] eq 0}

        # To manufacture an ever-growing send buffer from primary1 to primary2,
        # make primary2 unresponsive.
        set primary2_pid [srv [expr -1*$primary2_id] pid]
        pause_process $primary2_pid

        # On primary1, send 128KB Pubsub messages in a loop until the send buffer of the link
        # from primary1 to primary2 exceeds the buffer limit and the link is therefore dropped.
        # For the send buffer to grow, we first need to exhaust the TCP send buffer of primary1
        # and the TCP receive buffer of primary2. The sizes of these two buffers vary by OS, but
        # 100 128KB messages should be sufficient.
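        # The condition below is evaluated as a single expression, so the side-effecting commands
        # (incr, publish, delay) are wrapped in catch, which returns 0 on success, making them
        # run on every retry of the loop.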
        set i 0
        wait_for_condition 100 0 {
            [catch {incr i} e] == 0 &&
            [catch {$primary1 publish channel [prepare_value [expr 128*1024]]} e] == 0 &&
            [catch {after 500} e] == 0 &&
            [CI $primary1_id total_cluster_links_buffer_limit_exceeded] >= 1
        } else {
            fail "Cluster link not freed as expected"
        }

        # A new link to primary2 should have been recreated
        set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
        assert {[dict get $new_link_p1_to_p2 create-time] > [dict get $orig_link_p1_to_p2 create-time]}

        # Link from primary2 should not be affected
        set same_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
        assert {[dict get $same_link_p1_from_p2 create-time] eq [dict get $orig_link_p1_from_p2 create-time]}

        # Revive primary2
        resume_process $primary2_pid

        # Reset configs on primary1 so config changes don't leak out to other tests
        $primary1 CONFIG set cluster-node-timeout $oldtimeout
        $primary1 CONFIG set cluster-link-sendbuf-limit $oldlimit

        reset_links $primary1_id
    }

    test "Link memory increases with publishes" {
        set server_id 0
        set server [Rn $server_id]
        set msg_size 10000
        set num_msgs 10

        # Remove any sendbuf limit
        $primary1 CONFIG set cluster-link-sendbuf-limit 0

        # Publish ~100KB to one of the servers
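        # Wrap the INFO snapshots and the publishes in MULTI/EXEC so they execute back-to-back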
        $server MULTI
        $server INFO memory
        publish_messages $server $num_msgs $msg_size
        $server INFO memory
        set res [$server EXEC]

        set link_mem_before_pubs [getInfoProperty $res mem_cluster_links]

        # Drop the first half of the response string, which contains the first
        # "INFO memory" reply, then search for the property again in the remainder
        set res [string range $res [expr [string length $res] / 2] end]
        set link_mem_after_pubs [getInfoProperty $res mem_cluster_links]
        
        # We expect the memory to have increased by more than
        # the cumulative size of the published messages
        set mem_diff_floor [expr $msg_size * $num_msgs]
        set mem_diff [expr $link_mem_after_pubs - $link_mem_before_pubs]
        assert {$mem_diff > $mem_diff_floor}

        # Reset links to ensure no leftover data for the next test
        reset_links $server_id
    }

    test "Link memory resets after publish messages flush" {
        set server [Rn 0]
        set msg_size 100000
        set num_msgs 10

        set link_mem_before [status $server mem_cluster_links]

        # Publish ~1MB to one of the servers
        $server MULTI
        publish_messages $server $num_msgs $msg_size
        $server EXEC

        # Wait until the cluster link memory has returned to below the pre-publish value.
        # We can't guarantee it returns to the exact same value since gossip messages
        # can cause the values to fluctuate.
        wait_for_condition 1000 500 {
            [status $server mem_cluster_links] <= $link_mem_before
        } else {
            fail "Cluster link memory did not settle back to expected range"
        }
    }
}