summaryrefslogtreecommitdiffstats
path: root/src/peers.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/peers.c1003
1 files changed, 517 insertions, 486 deletions
diff --git a/src/peers.c b/src/peers.c
index 9ba3d9b..4ec981c 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -49,57 +49,12 @@
#include <haproxy/tools.h>
#include <haproxy/trace.h>
-
-/*******************************/
-/* Current peer learning state */
-/*******************************/
-
-/******************************/
-/* Current peers section resync state */
-/******************************/
-#define PEERS_F_RESYNC_LOCAL 0x00000001 /* Learn from local finished or no more needed */
-#define PEERS_F_RESYNC_REMOTE 0x00000002 /* Learn from remote finished or no more needed */
-#define PEERS_F_RESYNC_ASSIGN 0x00000004 /* A peer was assigned to learn our lesson */
-#define PEERS_F_RESYNC_PROCESS 0x00000008 /* The assigned peer was requested for resync */
-#define PEERS_F_RESYNC_LOCALTIMEOUT 0x00000010 /* Timeout waiting for a full resync from a local node */
-#define PEERS_F_RESYNC_REMOTETIMEOUT 0x00000020 /* Timeout waiting for a full resync from a remote node */
-#define PEERS_F_RESYNC_LOCALABORT 0x00000040 /* Session aborted learning from a local node */
-#define PEERS_F_RESYNC_REMOTEABORT 0x00000080 /* Session aborted learning from a remote node */
-#define PEERS_F_RESYNC_LOCALFINISHED 0x00000100 /* A local node teach us and was fully up to date */
-#define PEERS_F_RESYNC_REMOTEFINISHED 0x00000200 /* A remote node teach us and was fully up to date */
-#define PEERS_F_RESYNC_LOCALPARTIAL 0x00000400 /* A local node teach us but was partially up to date */
-#define PEERS_F_RESYNC_REMOTEPARTIAL 0x00000800 /* A remote node teach us but was partially up to date */
-#define PEERS_F_RESYNC_LOCALASSIGN 0x00001000 /* A local node was assigned for a full resync */
-#define PEERS_F_RESYNC_REMOTEASSIGN 0x00002000 /* A remote node was assigned for a full resync */
-#define PEERS_F_RESYNC_REQUESTED 0x00004000 /* A resync was explicitly requested */
-#define PEERS_F_DONOTSTOP 0x00010000 /* Main table sync task block process during soft stop
- to push data to new process */
-
-#define PEERS_RESYNC_STATEMASK (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE)
-#define PEERS_RESYNC_FROMLOCAL 0x00000000
-#define PEERS_RESYNC_FROMREMOTE PEERS_F_RESYNC_LOCAL
-#define PEERS_RESYNC_FINISHED (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE)
-
/***********************************/
/* Current shared table sync state */
/***********************************/
#define SHTABLE_F_TEACH_STAGE1 0x00000001 /* Teach state 1 complete */
#define SHTABLE_F_TEACH_STAGE2 0x00000002 /* Teach state 2 complete */
-/******************************/
-/* Remote peer teaching state */
-/******************************/
-#define PEER_F_TEACH_PROCESS 0x00000001 /* Teach a lesson to current peer */
-#define PEER_F_TEACH_FINISHED 0x00000008 /* Teach conclude, (wait for confirm) */
-#define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know already taught to current peer, used only for a local peer */
-#define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */
-#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */
-#define PEER_F_ALIVE 0x20000000 /* Used to flag a peer a alive. */
-#define PEER_F_HEARTBEAT 0x40000000 /* Heartbeat message to send. */
-#define PEER_F_DWNGRD 0x80000000 /* When this flag is enabled, we must downgrade the supported version announced during peer sessions. */
-
-#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
-#define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE)
#define PEER_RESYNC_TIMEOUT 5000 /* 5 seconds */
#define PEER_RECONNECT_TIMEOUT 5000 /* 5 seconds */
@@ -334,6 +289,7 @@ static const struct trace_event peers_trace_events[] = {
{ .mask = PEERS_EV_SESSREL, .name = "sessrl", .desc = "peer session releasing" },
#define PEERS_EV_PROTOERR (1 << 6)
{ .mask = PEERS_EV_PROTOERR, .name = "protoerr", .desc = "protocol error" },
+ { }
};
static const struct name_desc peers_trace_lockon_args[4] = {
@@ -489,6 +445,38 @@ static const char *statuscode_str(int statuscode)
}
}
+static const char *peer_app_state_str(enum peer_app_state appstate)
+{
+ switch (appstate) {
+ case PEER_APP_ST_STOPPED:
+ return "STOPPED";
+ case PEER_APP_ST_STARTING:
+ return "STARTING";
+ case PEER_APP_ST_RUNNING:
+ return "RUNNING";
+ case PEER_APP_ST_STOPPING:
+ return "STOPPING";
+ default:
+ return "UNKNOWN";
+ }
+}
+
+static const char *peer_learn_state_str(enum peer_learn_state learnstate)
+{
+ switch (learnstate) {
+ case PEER_LR_ST_NOTASSIGNED:
+ return "NOTASSIGNED";
+ case PEER_LR_ST_ASSIGNED:
+ return "ASSIGNED";
+ case PEER_LR_ST_PROCESSING:
+ return "PROCESSING";
+ case PEER_LR_ST_FINISHED:
+ return "FINISHED";
+ default:
+ return "UNKNOWN";
+ }
+}
+
/* This function encode an uint64 to 'dynamic' length format.
The encoded value is written at address *str, and the
caller must assure that size after *str is large enough.
@@ -1059,21 +1047,14 @@ void __peer_session_deinit(struct peer *peer)
/* Re-init current table pointers to force announcement on re-connect */
peer->remote_table = peer->last_local_table = peer->stop_local_table = NULL;
peer->appctx = NULL;
- if (peer->flags & PEER_F_LEARN_ASSIGN) {
- /* unassign current peer for learning */
- peer->flags &= ~(PEER_F_LEARN_ASSIGN);
- peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
- if (peer->local)
- peers->flags |= PEERS_F_RESYNC_LOCALABORT;
- else
- peers->flags |= PEERS_F_RESYNC_REMOTEABORT;
- /* reschedule a resync */
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
- }
- /* reset teaching and learning flags to 0 */
- peer->flags &= PEER_TEACH_RESET;
- peer->flags &= PEER_LEARN_RESET;
+ /* reset teaching flags to 0 */
+ peer->flags &= ~PEER_TEACH_FLAGS;
+
+ /* Mark the peer as stopping and wait for the sync task */
+ peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
+ peer->appstate = PEER_APP_ST_STOPPING;
+
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
}
@@ -1083,8 +1064,9 @@ static int peer_session_init(struct appctx *appctx)
struct stream *s;
struct sockaddr_storage *addr = NULL;
- if (!sockaddr_alloc(&addr, &peer->addr, sizeof(peer->addr)))
+ if (!sockaddr_alloc(&addr, &peer->srv->addr, sizeof(peer->srv->addr)))
goto out_error;
+ set_host_port(addr, peer->srv->svc_port);
if (appctx_finalize_startup(appctx, peer->peers->peers_fe, &BUF_NULL) == -1)
goto out_free_addr;
@@ -1393,7 +1375,7 @@ static inline int peer_send_resync_finishedmsg(struct appctx *appctx,
.control.head = { PEER_MSG_CLASS_CONTROL, },
};
- p.control.head[1] = (peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED ?
+ p.control.head[1] = (HA_ATOMIC_LOAD(&peers->flags) & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED ?
PEER_MSG_CTRL_RESYNCFINISHED : PEER_MSG_CTRL_RESYNCPARTIAL;
TRACE_PROTO("send control message", PEERS_EV_CTRLMSG,
@@ -1472,11 +1454,12 @@ static inline int peer_send_error_protomsg(struct appctx *appctx)
/*
* Function used to lookup for recent stick-table updates associated with
- * <st> shared stick-table when a lesson must be taught a peer (PEER_F_LEARN_ASSIGN flag set).
+ * <st> shared stick-table when a lesson must be taught a peer (learn state is not PEER_LR_ST_NOTASSIGNED).
*/
static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_table *st)
{
struct eb32_node *eb;
+ struct stksess *ret;
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
if (!eb) {
@@ -1496,7 +1479,10 @@ static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_ta
return NULL;
}
- return eb32_entry(eb, struct stksess, upd);
+ ret = eb32_entry(eb, struct stksess, upd);
+ if (!_HA_ATOMIC_LOAD(&ret->seen))
+ _HA_ATOMIC_STORE(&ret->seen, 1);
+ return ret;
}
/*
@@ -1506,6 +1492,7 @@ static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_ta
static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_table *st)
{
struct eb32_node *eb;
+ struct stksess *ret;
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
if (!eb) {
@@ -1516,7 +1503,10 @@ static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_tab
return NULL;
}
- return eb32_entry(eb, struct stksess, upd);
+ ret = eb32_entry(eb, struct stksess, upd);
+ if (!_HA_ATOMIC_LOAD(&ret->seen))
+ _HA_ATOMIC_STORE(&ret->seen, 1);
+ return ret;
}
/*
@@ -1526,6 +1516,7 @@ static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_tab
static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_table *st)
{
struct eb32_node *eb;
+ struct stksess *ret;
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
if (!eb || eb->key > st->teaching_origin) {
@@ -1533,7 +1524,10 @@ static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_tab
return NULL;
}
- return eb32_entry(eb, struct stksess, upd);
+ ret = eb32_entry(eb, struct stksess, upd);
+ if (!_HA_ATOMIC_LOAD(&ret->seen))
+ _HA_ATOMIC_STORE(&ret->seen, 1);
+ return ret;
}
/*
@@ -1621,10 +1615,7 @@ static inline int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
updates_sent++;
if (updates_sent >= peers_max_updates_at_once) {
- /* pretend we're full so that we get back ASAP */
- struct stconn *sc = appctx_sc(appctx);
-
- sc_need_room(sc, 0);
+ applet_have_more_data(appctx);
ret = -1;
break;
}
@@ -1637,7 +1628,7 @@ static inline int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
/*
* Function to emit update messages for <st> stick-table when a lesson must
- * be taught to the peer <p> (PEER_F_LEARN_ASSIGN flag set).
+ * be taught to the peer <p> (learn state is not PEER_LR_ST_NOTASSIGNED).
*
* Note that <st> shared stick-table is locked when calling this function, and
* the lock is dropped then re-acquired.
@@ -1650,13 +1641,7 @@ static inline int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
static inline int peer_send_teach_process_msgs(struct appctx *appctx, struct peer *p,
struct shared_table *st)
{
- int ret;
-
- HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
- ret = peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st);
- HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
-
- return ret;
+ return peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st);
}
/*
@@ -2487,73 +2472,27 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
}
/* reset teaching flags to 0 */
- peer->flags &= PEER_TEACH_RESET;
+ peer->flags &= ~PEER_TEACH_FLAGS;
/* flag to start to teach lesson */
- peer->flags |= PEER_F_TEACH_PROCESS;
- peers->flags |= PEERS_F_RESYNC_REQUESTED;
+ peer->flags |= (PEER_F_TEACH_PROCESS|PEER_F_DBG_RESYNC_REQUESTED);
}
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
NULL, &msg_head[1], peers->local->id, peer->id);
- if (peer->flags & PEER_F_LEARN_ASSIGN) {
- int commit_a_finish = 1;
-
- peer->flags &= ~PEER_F_LEARN_ASSIGN;
- peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
- if (peer->srv->shard) {
- struct peer *ps;
-
- peers->flags |= PEERS_F_RESYNC_REMOTEPARTIAL;
- peer->flags |= PEER_F_LEARN_NOTUP2DATE;
- for (ps = peers->remote; ps; ps = ps->next) {
- if (ps->srv->shard == peer->srv->shard) {
- /* flag all peers from same shard
- * notup2date to disable request
- * of a resync frm them
- */
- ps->flags |= PEER_F_LEARN_NOTUP2DATE;
- }
- else if (ps->srv->shard && !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
- /* it remains some other shards not requested
- * we don't commit a resync finish to request
- * the other shards
- */
- commit_a_finish = 0;
- }
- }
-
- if (!commit_a_finish) {
- /* it remains some shard to request, we schedule a new request
- */
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
- task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
- }
- }
-
- if (commit_a_finish) {
- peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE);
- if (peer->local)
- peers->flags |= PEERS_F_RESYNC_LOCALFINISHED;
- else
- peers->flags |= PEERS_F_RESYNC_REMOTEFINISHED;
- }
+ if (peer->learnstate == PEER_LR_ST_PROCESSING) {
+ peer->learnstate = PEER_LR_ST_FINISHED;
+ peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
+ task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
}
peer->confirm++;
}
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
NULL, &msg_head[1], peers->local->id, peer->id);
- if (peer->flags & PEER_F_LEARN_ASSIGN) {
- peer->flags &= ~PEER_F_LEARN_ASSIGN;
- peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
-
- if (peer->local)
- peers->flags |= PEERS_F_RESYNC_LOCALPARTIAL;
- else
- peers->flags |= PEERS_F_RESYNC_REMOTEPARTIAL;
- peer->flags |= PEER_F_LEARN_NOTUP2DATE;
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ if (peer->learnstate == PEER_LR_ST_PROCESSING) {
+ peer->learnstate = PEER_LR_ST_FINISHED;
+ peer->flags |= (PEER_F_LEARN_NOTUP2DATE|PEER_F_WAIT_SYNCTASK_ACK);
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
}
peer->confirm++;
@@ -2566,7 +2505,7 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
/* If stopping state */
if (stopping) {
/* Close session, push resync no more needed */
- peer->flags |= PEER_F_TEACH_COMPLETE;
+ peer->flags |= PEER_F_LOCAL_TEACH_COMPLETE;
appctx->st0 = PEER_SESS_ST_END;
return 0;
}
@@ -2576,7 +2515,7 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
}
/* reset teaching flags to 0 */
- peer->flags &= PEER_TEACH_RESET;
+ peer->flags &= ~PEER_TEACH_FLAGS;
}
else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) {
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
@@ -2650,16 +2589,13 @@ static inline int peer_send_msgs(struct appctx *appctx,
{
int repl;
- /* Need to request a resync */
- if ((peer->flags & PEER_F_LEARN_ASSIGN) &&
- (peers->flags & PEERS_F_RESYNC_ASSIGN) &&
- !(peers->flags & PEERS_F_RESYNC_PROCESS)) {
-
+ /* Need to request a resync (only possible for a remote peer at this stage) */
+ if (peer->learnstate == PEER_LR_ST_ASSIGNED) {
+ BUG_ON(peer->local);
repl = peer_send_resync_reqmsg(appctx, peer, peers);
if (repl <= 0)
return repl;
-
- peers->flags |= PEERS_F_RESYNC_PROCESS;
+ peer->learnstate = PEER_LR_ST_PROCESSING;
}
/* Nothing to read, now we start to write */
@@ -2688,18 +2624,19 @@ static inline int peer_send_msgs(struct appctx *appctx,
}
if (!(peer->flags & PEER_F_TEACH_PROCESS)) {
- HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
- if (!(peer->flags & PEER_F_LEARN_ASSIGN) &&
- (st->last_pushed != st->table->localupdate)) {
+ int must_send;
+ HA_RWLOCK_RDLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
+ must_send = (peer->learnstate == PEER_LR_ST_NOTASSIGNED) && (st->last_pushed != st->table->localupdate);
+ HA_RWLOCK_RDUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
+
+ if (must_send) {
repl = peer_send_teach_process_msgs(appctx, peer, st);
if (repl <= 0) {
- HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
peer->stop_local_table = peer->last_local_table;
return repl;
}
}
- HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
}
else if (!(peer->flags & PEER_F_TEACH_FINISHED)) {
if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
@@ -2733,10 +2670,7 @@ static inline int peer_send_msgs(struct appctx *appctx,
updates++;
if (updates >= peers_max_updates_at_once) {
- /* pretend we're full so that we get back ASAP */
- struct stconn *sc = appctx_sc(appctx);
-
- sc_need_room(sc, 0);
+ applet_have_more_data(appctx);
return -1;
}
@@ -2872,88 +2806,16 @@ static inline int peer_getline_last(struct appctx *appctx, struct peer **curpeer
}
/*
- * Init <peer> peer after having accepted it at peer protocol level.
- */
-static inline void init_accepted_peer(struct peer *peer, struct peers *peers)
-{
- struct shared_table *st;
-
- peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
- /* Register status code */
- peer->statuscode = PEER_SESS_SC_SUCCESSCODE;
- peer->last_hdshk = now_ms;
-
- /* Awake main task */
- task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
-
- /* Init confirm counter */
- peer->confirm = 0;
-
- /* Init cursors */
- for (st = peer->tables; st ; st = st->next) {
- uint commitid, updateid;
-
- st->last_get = st->last_acked = 0;
- HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
- /* if st->update appears to be in future it means
- * that the last acked value is very old and we
- * remain unconnected a too long time to use this
- * acknowledgement as a reset.
- * We should update the protocol to be able to
- * signal the remote peer that it needs a full resync.
- * Here a partial fix consist to set st->update at
- * the max past value
- */
- if ((int)(st->table->localupdate - st->update) < 0)
- st->update = st->table->localupdate + (2147483648U);
- st->teaching_origin = st->last_pushed = st->update;
- st->flags = 0;
-
- updateid = st->last_pushed;
- commitid = _HA_ATOMIC_LOAD(&st->table->commitupdate);
-
- while ((int)(updateid - commitid) > 0) {
- if (_HA_ATOMIC_CAS(&st->table->commitupdate, &commitid, updateid))
- break;
- __ha_cpu_relax();
- }
-
- HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
- }
-
- /* reset teaching and learning flags to 0 */
- peer->flags &= PEER_TEACH_RESET;
- peer->flags &= PEER_LEARN_RESET;
-
- /* if current peer is local */
- if (peer->local) {
- /* if current host need resyncfrom local and no process assigned */
- if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL &&
- !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
- /* assign local peer for a lesson, consider lesson already requested */
- peer->flags |= PEER_F_LEARN_ASSIGN;
- peers->flags |= (PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
- peers->flags |= PEERS_F_RESYNC_LOCALASSIGN;
- }
-
- }
- else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
- !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
- /* assign peer for a lesson */
- peer->flags |= PEER_F_LEARN_ASSIGN;
- peers->flags |= PEERS_F_RESYNC_ASSIGN;
- peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
- }
-}
-
-/*
- * Init <peer> peer after having connected it at peer protocol level.
+ * Init <peer> peer after validating a connection at peer protocol level. It may
+ * a incoming or outgoing connection. The peer init must be acknowledge by the
+ * sync task. Message processing is blocked in the meanwhile.
*/
static inline void init_connected_peer(struct peer *peer, struct peers *peers)
{
struct shared_table *st;
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
+
/* Init cursors */
for (st = peer->tables; st ; st = st->next) {
uint updateid, commitid;
@@ -2986,28 +2848,25 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers)
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
}
+ /* Awake main task to ack the new peer state */
+ task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
+
/* Init confirm counter */
peer->confirm = 0;
- /* reset teaching and learning flags to 0 */
- peer->flags &= PEER_TEACH_RESET;
- peer->flags &= PEER_LEARN_RESET;
+ /* reset teaching flags to 0 */
+ peer->flags &= ~PEER_TEACH_FLAGS;
- /* If current peer is local */
- if (peer->local) {
- /* flag to start to teach lesson */
- peer->flags |= PEER_F_TEACH_PROCESS;
- }
- else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
- !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
- /* If peer is remote and resync from remote is needed,
- and no peer currently assigned */
-
- /* assign peer for a lesson */
- peer->flags |= PEER_F_LEARN_ASSIGN;
- peers->flags |= PEERS_F_RESYNC_ASSIGN;
- peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
+ if (peer->local && !(appctx_is_back(peer->appctx))) {
+ /* If the local peer has established the connection (appctx is
+ * on the frontend side), flag it to start to teach lesson.
+ */
+ peer->flags |= PEER_F_TEACH_PROCESS;
}
+
+ /* Mark the peer as starting and wait the sync task */
+ peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
+ peer->appstate = PEER_APP_ST_STARTING;
}
/*
@@ -3024,7 +2883,7 @@ static void peer_io_handler(struct appctx *appctx)
unsigned int maj_ver, min_ver;
int prev_state;
- if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR|SE_FL_SHR|SE_FL_SHW)))) {
+ if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) {
co_skip(sc_oc(sc), co_data(sc_oc(sc)));
goto out;
}
@@ -3091,6 +2950,7 @@ switchstate:
*/
curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
peer_session_forceshutdown(curpeer);
+
curpeer->heartbeat = TICK_ETERNITY;
curpeer->coll++;
}
@@ -3127,7 +2987,11 @@ switchstate:
goto switchstate;
}
- init_accepted_peer(curpeer, curpeers);
+ /* Register status code */
+ curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE;
+ curpeer->last_hdshk = now_ms;
+
+ init_connected_peer(curpeer, curpeers);
/* switch to waiting message state */
_HA_ATOMIC_INC(&connected_peers);
@@ -3216,6 +3080,13 @@ switchstate:
}
}
+ if (curpeer->flags & PEER_F_WAIT_SYNCTASK_ACK)
+ goto out;
+
+ /* local peer is assigned of a lesson, start it */
+ if (curpeer->learnstate == PEER_LR_ST_ASSIGNED && curpeer->local)
+ curpeer->learnstate = PEER_LR_ST_PROCESSING;
+
reql = peer_recv_msg(appctx, (char *)msg_head, sizeof msg_head, &msg_len, &totl);
if (reql <= 0) {
if (reql == -1)
@@ -3348,7 +3219,7 @@ static void peer_session_forceshutdown(struct peer *peer)
/* Pre-configures a peers frontend to accept incoming connections */
void peers_setup_frontend(struct proxy *fe)
{
- fe->last_change = ns_to_sec(now_ns);
+ fe->fe_counters.last_change = ns_to_sec(now_ns);
fe->cap = PR_CAP_FE | PR_CAP_BE;
fe->mode = PR_MODE_PEERS;
fe->maxconn = 0;
@@ -3394,274 +3265,432 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
return NULL;
}
-/*
- * Task processing function to manage re-connect, peer session
- * tasks wakeup on local update and heartbeat. Let's keep it exported so that it
- * resolves in stack traces and "show tasks".
+/* Clear LEARN flags to a given peer, dealing with aborts if it was assigned for
+ * learning. In this case, the resync timeout is re-armed.
*/
-struct task *process_peer_sync(struct task * task, void *context, unsigned int state)
+static void clear_peer_learning_status(struct peer *peer)
{
- struct peers *peers = context;
- struct peer *ps;
- struct shared_table *st;
+ if (peer->learnstate != PEER_LR_ST_NOTASSIGNED) {
+ struct peers *peers = peer->peers;
- task->expire = TICK_ETERNITY;
+ /* unassign current peer for learning */
+ HA_ATOMIC_AND(&peers->flags, ~PEERS_F_RESYNC_ASSIGN);
+ HA_ATOMIC_OR(&peers->flags, (peer->local ? PEERS_F_DBG_RESYNC_LOCALABORT : PEERS_F_DBG_RESYNC_REMOTEABORT));
- /* Acquire lock for all peers of the section */
- for (ps = peers->remote; ps; ps = ps->next)
- HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
+ /* reschedule a resync */
+ peer->peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
+ peer->learnstate = PEER_LR_ST_NOTASSIGNED;
+ }
+ peer->flags &= ~PEER_F_LEARN_NOTUP2DATE;
+}
- if (!stopping) {
- /* Normal case (not soft stop)*/
+static void sync_peer_learn_state(struct peers *peers, struct peer *peer)
+{
+ unsigned int flags = 0;
- /* resync timeout set to TICK_ETERNITY means we just start
- * a new process and timer was not initialized.
- * We must arm this timer to switch to a request to a remote
- * node if incoming connection from old local process never
- * comes.
- */
- if (peers->resync_timeout == TICK_ETERNITY)
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ if (peer->learnstate != PEER_LR_ST_FINISHED)
+ return;
+
+ /* The learning process is now finished */
+ if (peer->flags & PEER_F_LEARN_NOTUP2DATE) {
+ /* Partial resync */
+ flags |= (peer->local ? PEERS_F_DBG_RESYNC_LOCALPARTIAL : PEERS_F_DBG_RESYNC_REMOTEPARTIAL);
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ }
+ else {
+ /* Full resync */
+ struct peer *rem_peer;
+ int commit_a_finish = 1;
+
+ if (peer->srv->shard) {
+ flags |= PEERS_F_DBG_RESYNC_REMOTEPARTIAL;
+ peer->flags |= PEER_F_LEARN_NOTUP2DATE;
+ for (rem_peer = peers->remote; rem_peer; rem_peer = rem_peer->next) {
+ if (rem_peer->srv->shard && rem_peer != peer) {
+ HA_SPIN_LOCK(PEER_LOCK, &rem_peer->lock);
+ if (rem_peer->srv->shard == peer->srv->shard) {
+ /* flag all peers from same shard
+ * notup2date to disable request
+ * of a resync frm them
+ */
+ rem_peer->flags |= PEER_F_LEARN_NOTUP2DATE;
+ }
+ else if (!(rem_peer->flags & PEER_F_LEARN_NOTUP2DATE)) {
+ /* it remains some other shards not requested
+ * we don't commit a resync finish to request
+ * the other shards
+ */
+ commit_a_finish = 0;
+ }
+ HA_SPIN_UNLOCK(PEER_LOCK, &rem_peer->lock);
+ }
+ }
- if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) &&
- (!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) &&
- !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
- /* Resync from local peer needed
- no peer was assigned for the lesson
- and no old local peer found
- or resync timeout expire */
+ if (!commit_a_finish) {
+ /* it remains some shard to request, we schedule a new request */
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ }
+ }
- /* flag no more resync from local, to try resync from remotes */
- peers->flags |= PEERS_F_RESYNC_LOCAL;
- peers->flags |= PEERS_F_RESYNC_LOCALTIMEOUT;
+ if (commit_a_finish) {
+ flags |= (PEERS_F_RESYNC_LOCAL_FINISHED|PEERS_F_RESYNC_REMOTE_FINISHED);
+ flags |= (peer->local ? PEERS_F_DBG_RESYNC_LOCALFINISHED : PEERS_F_DBG_RESYNC_REMOTEFINISHED);
+ }
+ }
+ peer->learnstate = PEER_LR_ST_NOTASSIGNED;
+ HA_ATOMIC_AND(&peers->flags, ~PEERS_F_RESYNC_ASSIGN);
+ HA_ATOMIC_OR(&peers->flags, flags);
- /* reschedule a resync */
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ appctx_wakeup(peer->appctx);
+}
+
+/* Synchronise the peer applet state with its associated peers section. This
+ * function handles STARTING->RUNNING and STOPPING->STOPPED transitions.
+ */
+static void sync_peer_app_state(struct peers *peers, struct peer *peer)
+{
+ if (peer->appstate == PEER_APP_ST_STOPPING) {
+ clear_peer_learning_status(peer);
+ peer->appstate = PEER_APP_ST_STOPPED;
+ }
+ else if (peer->appstate == PEER_APP_ST_STARTING) {
+ clear_peer_learning_status(peer);
+ if (peer->local & appctx_is_back(peer->appctx)) {
+ /* if local peer has accepted the connection (appctx is
+ * on the backend side), flag it to learn a lesson and
+ * be sure it will start immediately. This only happens
+ * if no resync is in progress and if the lacal resync
+ * was not already performed.
+ */
+ if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL &&
+ !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
+ /* assign local peer for a lesson */
+ peer->learnstate = PEER_LR_ST_ASSIGNED;
+ HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_LOCALASSIGN);
+ }
+ }
+ else if (!peer->local) {
+ /* If a connection was validated for a remote peer, flag
+ * it to learn a lesson but don't start it yet. The peer
+ * must request it explicitly. This only happens if no
+ * resync is in progress and if the remote resync was
+ * not already performed.
+ */
+ if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
+ !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
+ /* assign remote peer for a lesson */
+ peer->learnstate = PEER_LR_ST_ASSIGNED;
+ HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN);
+ }
}
+ peer->appstate = PEER_APP_ST_RUNNING;
+ appctx_wakeup(peer->appctx);
+ }
+}
- /* For each session */
- for (ps = peers->remote; ps; ps = ps->next) {
- /* For each remote peers */
- if (!ps->local) {
- if (!ps->appctx) {
- /* no active peer connection */
- if (ps->statuscode == 0 ||
- ((ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
- ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
- ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
- tick_is_expired(ps->reconnect, now_ms))) {
- /* connection never tried
- * or previous peer connection established with success
- * or previous peer connection failed while connecting
- * and reconnection timer is expired */
-
- /* retry a connect */
- ps->appctx = peer_session_create(peers, ps);
- }
- else if (!tick_is_expired(ps->reconnect, now_ms)) {
- /* If previous session failed during connection
- * but reconnection timer is not expired */
+/* Process the sync task for a running process. It is called from process_peer_sync() only */
+static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state)
+{
+ struct peer *peer;
+ struct shared_table *st;
- /* reschedule task for reconnect */
- task->expire = tick_first(task->expire, ps->reconnect);
- }
- /* else do nothing */
- } /* !ps->appctx */
- else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
- /* current peer connection is active and established */
- if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
- !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
- !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
- /* Resync from a remote is needed
- * and no peer was assigned for lesson
- * and current peer may be up2date */
-
- /* assign peer for the lesson */
- ps->flags |= PEER_F_LEARN_ASSIGN;
- peers->flags |= PEERS_F_RESYNC_ASSIGN;
- peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
-
- /* wake up peer handler to handle a request of resync */
- appctx_wakeup(ps->appctx);
+ /* resync timeout set to TICK_ETERNITY means we just start
+ * a new process and timer was not initialized.
+ * We must arm this timer to switch to a request to a remote
+ * node if incoming connection from old local process never
+ * comes.
+ */
+ if (peers->resync_timeout == TICK_ETERNITY)
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+
+ if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) &&
+ (!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) &&
+ !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
+ /* Resync from local peer needed
+ no peer was assigned for the lesson
+ and no old local peer found
+ or resync timeout expire */
+
+ /* flag no more resync from local, to try resync from remotes */
+ HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_LOCAL_FINISHED|PEERS_F_DBG_RESYNC_LOCALTIMEOUT);
+
+ /* reschedule a resync */
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ }
+
+ /* For each session */
+ for (peer = peers->remote; peer; peer = peer->next) {
+ HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
+
+ sync_peer_learn_state(peers, peer);
+ sync_peer_app_state(peers, peer);
+
+ /* Peer changes, if any, were now ack by the sync task. Unblock
+ * the peer (any wakeup should already be performed, no need to
+ * do it here)
+ */
+ peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;
+
+ /* For each remote peers */
+ if (!peer->local) {
+ if (!peer->appctx) {
+ /* no active peer connection */
+ if (peer->statuscode == 0 ||
+ ((peer->statuscode == PEER_SESS_SC_CONNECTCODE ||
+ peer->statuscode == PEER_SESS_SC_SUCCESSCODE ||
+ peer->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
+ tick_is_expired(peer->reconnect, now_ms))) {
+ /* connection never tried
+ * or previous peer connection established with success
+ * or previous peer connection failed while connecting
+ * and reconnection timer is expired */
+
+ /* retry a connect */
+ peer->appctx = peer_session_create(peers, peer);
+ }
+ else if (!tick_is_expired(peer->reconnect, now_ms)) {
+ /* If previous session failed during connection
+ * but reconnection timer is not expired */
+
+ /* reschedule task for reconnect */
+ task->expire = tick_first(task->expire, peer->reconnect);
+ }
+ /* else do nothing */
+ } /* !peer->appctx */
+ else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
+ /* current peer connection is active and established */
+ if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
+ !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
+ !(peer->flags & PEER_F_LEARN_NOTUP2DATE)) {
+ /* Resync from a remote is needed
+ * and no peer was assigned for lesson
+ * and current peer may be up2date */
+
+ /* assign peer for the lesson */
+ peer->learnstate = PEER_LR_ST_ASSIGNED;
+ HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN);
+
+ /* wake up peer handler to handle a request of resync */
+ appctx_wakeup(peer->appctx);
+ }
+ else {
+ int update_to_push = 0;
+
+ /* Awake session if there is data to push */
+ for (st = peer->tables; st ; st = st->next) {
+ if (st->last_pushed != st->table->localupdate) {
+ /* wake up the peer handler to push local updates */
+ update_to_push = 1;
+ /* There is no need to send a heartbeat message
+ * when some updates must be pushed. The remote
+ * peer will consider <peer> peer as alive when it will
+ * receive these updates.
+ */
+ peer->flags &= ~PEER_F_HEARTBEAT;
+ /* Re-schedule another one later. */
+ peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
+ /* Refresh reconnect if necessary */
+ if (tick_is_expired(peer->reconnect, now_ms))
+ peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
+ /* We are going to send updates, let's ensure we will
+ * come back to send heartbeat messages or to reconnect.
+ */
+ task->expire = tick_first(peer->reconnect, peer->heartbeat);
+ appctx_wakeup(peer->appctx);
+ break;
+ }
}
- else {
- int update_to_push = 0;
-
- /* Awake session if there is data to push */
- for (st = ps->tables; st ; st = st->next) {
- if (st->last_pushed != st->table->localupdate) {
- /* wake up the peer handler to push local updates */
- update_to_push = 1;
- /* There is no need to send a heartbeat message
- * when some updates must be pushed. The remote
- * peer will consider <ps> peer as alive when it will
- * receive these updates.
- */
- ps->flags &= ~PEER_F_HEARTBEAT;
- /* Re-schedule another one later. */
- ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
- /* Refresh reconnect if necessary */
- if (tick_is_expired(ps->reconnect, now_ms))
- ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
- /* We are going to send updates, let's ensure we will
- * come back to send heartbeat messages or to reconnect.
+ /* When there are updates to send we do not reconnect
+ * and do not send heartbeat message either.
+ */
+ if (!update_to_push) {
+ if (tick_is_expired(peer->reconnect, now_ms)) {
+ if (peer->flags & PEER_F_ALIVE) {
+ /* This peer was alive during a 'reconnect' period.
+ * Flag it as not alive again for the next period.
*/
- task->expire = tick_first(ps->reconnect, ps->heartbeat);
- appctx_wakeup(ps->appctx);
- break;
- }
- }
- /* When there are updates to send we do not reconnect
- * and do not send heartbeat message either.
- */
- if (!update_to_push) {
- if (tick_is_expired(ps->reconnect, now_ms)) {
- if (ps->flags & PEER_F_ALIVE) {
- /* This peer was alive during a 'reconnect' period.
- * Flag it as not alive again for the next period.
- */
- ps->flags &= ~PEER_F_ALIVE;
- ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
- }
- else {
- ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
- ps->heartbeat = TICK_ETERNITY;
- peer_session_forceshutdown(ps);
- ps->no_hbt++;
- }
+ peer->flags &= ~PEER_F_ALIVE;
+ peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
}
- else if (tick_is_expired(ps->heartbeat, now_ms)) {
- ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
- ps->flags |= PEER_F_HEARTBEAT;
- appctx_wakeup(ps->appctx);
+ else {
+ peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
+ peer->heartbeat = TICK_ETERNITY;
+ peer_session_forceshutdown(peer);
+ sync_peer_app_state(peers, peer);
+ peer->no_hbt++;
}
- task->expire = tick_first(ps->reconnect, ps->heartbeat);
}
- }
- /* else do nothing */
- } /* SUCCESSCODE */
- } /* !ps->peer->local */
- } /* for */
-
- /* Resync from remotes expired: consider resync is finished */
- if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
- !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
- tick_is_expired(peers->resync_timeout, now_ms)) {
- /* Resync from remote peer needed
- * no peer was assigned for the lesson
- * and resync timeout expire */
-
- /* flag no more resync from remote, consider resync is finished */
- peers->flags |= PEERS_F_RESYNC_REMOTE;
- peers->flags |= PEERS_F_RESYNC_REMOTETIMEOUT;
- }
-
- if ((peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) {
- /* Resync not finished*/
- /* reschedule task to resync timeout if not expired, to ended resync if needed */
- if (!tick_is_expired(peers->resync_timeout, now_ms))
- task->expire = tick_first(task->expire, peers->resync_timeout);
- }
- } /* !stopping */
- else {
- /* soft stop case */
- if (state & TASK_WOKEN_SIGNAL) {
- /* We've just received the signal */
- if (!(peers->flags & PEERS_F_DONOTSTOP)) {
- /* add DO NOT STOP flag if not present */
- _HA_ATOMIC_INC(&jobs);
- peers->flags |= PEERS_F_DONOTSTOP;
-
- /* disconnect all connected peers to process a local sync
- * this must be done only the first time we are switching
- * in stopping state
- */
- for (ps = peers->remote; ps; ps = ps->next) {
- /* we're killing a connection, we must apply a random delay before
- * retrying otherwise the other end will do the same and we can loop
- * for a while.
- */
- ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
- if (ps->appctx) {
- peer_session_forceshutdown(ps);
+ else if (tick_is_expired(peer->heartbeat, now_ms)) {
+ peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
+ peer->flags |= PEER_F_HEARTBEAT;
+ appctx_wakeup(peer->appctx);
+ }
+ task->expire = tick_first(peer->reconnect, peer->heartbeat);
}
}
+ /* else do nothing */
+ } /* SUCCESSCODE */
+ } /* !peer->peer->local */
- /* Set resync timeout for the local peer and request a immediate reconnect */
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
- peers->local->reconnect = now_ms;
+ HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
+ } /* for */
+
+ /* Resync from remotes expired or no remote peer: consider resync is finished */
+ if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
+ !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
+ (tick_is_expired(peers->resync_timeout, now_ms) || !peers->remote->next)) {
+ /* Resync from remote peer needed
+ * no peer was assigned for the lesson
+ * and resync timeout expire */
+
+ /* flag no more resync from remote, consider resync is finished */
+ HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_REMOTE_FINISHED|PEERS_F_DBG_RESYNC_REMOTETIMEOUT);
+ }
+
+ if ((peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) {
+ /* Resync not finished*/
+ /* reschedule task to resync timeout if not expired, to ended resync if needed */
+ if (!tick_is_expired(peers->resync_timeout, now_ms))
+ task->expire = tick_first(task->expire, peers->resync_timeout);
+ }
+}
+
+/* Process the sync task for a stopping process. It is called from process_peer_sync() only */
+static void __process_stopping_peer_sync(struct task *task, struct peers *peers, unsigned int state)
+{
+ struct peer *peer;
+ struct shared_table *st;
+ static int dont_stop = 0;
+
+ /* For each peer */
+ for (peer = peers->remote; peer; peer = peer->next) {
+ HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
+
+ sync_peer_learn_state(peers, peer);
+ sync_peer_app_state(peers, peer);
+
+ /* Peer changes, if any, were now ack by the sync task. Unblock
+ * the peer (any wakeup should already be performed, no need to
+ * do it here)
+ */
+ peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;
+
+ if ((state & TASK_WOKEN_SIGNAL) && !dont_stop) {
+ /* we're killing a connection, we must apply a random delay before
+ * retrying otherwise the other end will do the same and we can loop
+ * for a while.
+ */
+ peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
+ if (peer->appctx) {
+ peer_session_forceshutdown(peer);
+ sync_peer_app_state(peers, peer);
}
}
- ps = peers->local;
- if (ps->flags & PEER_F_TEACH_COMPLETE) {
- if (peers->flags & PEERS_F_DONOTSTOP) {
- /* resync of new process was complete, current process can die now */
- _HA_ATOMIC_DEC(&jobs);
- peers->flags &= ~PEERS_F_DONOTSTOP;
- for (st = ps->tables; st ; st = st->next)
- HA_ATOMIC_DEC(&st->table->refcnt);
- }
+ HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
+ }
+
+ /* We've just received the signal */
+ if (state & TASK_WOKEN_SIGNAL) {
+ if (!dont_stop) {
+ /* add DO NOT STOP flag if not present */
+ _HA_ATOMIC_INC(&jobs);
+ dont_stop = 1;
+
+ /* Set resync timeout for the local peer and request a immediate reconnect */
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ peers->local->reconnect = now_ms;
}
- else if (!ps->appctx) {
- /* Re-arm resync timeout if necessary */
- if (!tick_isset(peers->resync_timeout))
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ }
- /* If there's no active peer connection */
- if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED &&
- !tick_is_expired(peers->resync_timeout, now_ms) &&
- (ps->statuscode == 0 ||
- ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
- ps->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
- ps->statuscode == PEER_SESS_SC_TRYAGAIN)) {
- /* The resync is finished for the local peer and
- * the resync timeout is not expired and
- * connection never tried
- * or previous peer connection was successfully established
- * or previous tcp connect succeeded but init state incomplete
- * or during previous connect, peer replies a try again statuscode */
-
- if (!tick_is_expired(ps->reconnect, now_ms)) {
- /* reconnection timer is not expired. reschedule task for reconnect */
- task->expire = tick_first(task->expire, ps->reconnect);
- }
- else {
- /* connect to the local peer if we must push a local sync */
- if (peers->flags & PEERS_F_DONOTSTOP) {
- peer_session_create(peers, ps);
- }
- }
+ peer = peers->local;
+ HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
+ if (peer->flags & PEER_F_LOCAL_TEACH_COMPLETE) {
+ if (dont_stop) {
+ /* resync of new process was complete, current process can die now */
+ _HA_ATOMIC_DEC(&jobs);
+ dont_stop = 0;
+ for (st = peer->tables; st ; st = st->next)
+ HA_ATOMIC_DEC(&st->table->refcnt);
+ }
+ }
+ else if (!peer->appctx) {
+ /* Re-arm resync timeout if necessary */
+ if (!tick_isset(peers->resync_timeout))
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+
+ /* If there's no active peer connection */
+ if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED &&
+ !tick_is_expired(peers->resync_timeout, now_ms) &&
+ (peer->statuscode == 0 ||
+ peer->statuscode == PEER_SESS_SC_SUCCESSCODE ||
+ peer->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
+ peer->statuscode == PEER_SESS_SC_TRYAGAIN)) {
+ /* The resync is finished for the local peer and
+ * the resync timeout is not expired and
+ * connection never tried
+ * or previous peer connection was successfully established
+ * or previous tcp connect succeeded but init state incomplete
+ * or during previous connect, peer replies a try again statuscode */
+
+ if (!tick_is_expired(peer->reconnect, now_ms)) {
+ /* reconnection timer is not expired. reschedule task for reconnect */
+ task->expire = tick_first(task->expire, peer->reconnect);
}
- else {
- /* Other error cases */
- if (peers->flags & PEERS_F_DONOTSTOP) {
- /* unable to resync new process, current process can die now */
- _HA_ATOMIC_DEC(&jobs);
- peers->flags &= ~PEERS_F_DONOTSTOP;
- for (st = ps->tables; st ; st = st->next)
- HA_ATOMIC_DEC(&st->table->refcnt);
+ else {
+ /* connect to the local peer if we must push a local sync */
+ if (dont_stop) {
+ peer_session_create(peers, peer);
}
}
}
- else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
- /* Reset resync timeout during a resync */
- peers->resync_timeout = TICK_ETERNITY;
-
- /* current peer connection is active and established
- * wake up all peer handlers to push remaining local updates */
- for (st = ps->tables; st ; st = st->next) {
- if (st->last_pushed != st->table->localupdate) {
- appctx_wakeup(ps->appctx);
- break;
- }
+ else {
+ /* Other error cases */
+ if (dont_stop) {
+ /* unable to resync new process, current process can die now */
+ _HA_ATOMIC_DEC(&jobs);
+ dont_stop = 0;
+ for (st = peer->tables; st ; st = st->next)
+ HA_ATOMIC_DEC(&st->table->refcnt);
}
}
- } /* stopping */
+ }
+ else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
+ /* Reset resync timeout during a resync */
+ peers->resync_timeout = TICK_ETERNITY;
+
+ /* current peer connection is active and established
+ * wake up all peer handlers to push remaining local updates */
+ for (st = peer->tables; st ; st = st->next) {
+ if (st->last_pushed != st->table->localupdate) {
+ appctx_wakeup(peer->appctx);
+ break;
+ }
+ }
+ }
+ HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
+}
- /* Release lock for all peers of the section */
- for (ps = peers->remote; ps; ps = ps->next)
- HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
+/*
+ * Task processing function to manage re-connect, peer session
+ * tasks wakeup on local update and heartbeat. Let's keep it exported so that it
+ * resolves in stack traces and "show tasks".
+ */
+struct task *process_peer_sync(struct task * task, void *context, unsigned int state)
+{
+ struct peers *peers = context;
+
+ task->expire = TICK_ETERNITY;
+
+ if (!stopping) {
+ /* Normal case (not soft stop)*/
+ __process_running_peer_sync(task, peers, state);
+
+ }
+ else {
+ /* soft stop case */
+ __process_stopping_peer_sync(task, peers, state);
+ } /* stopping */
/* Wakeup for re-connect */
return task;
@@ -3940,7 +3969,7 @@ static int peers_dump_head(struct buffer *msg, struct appctx *appctx, struct pee
peers,
tm.tm_mday, monthname[tm.tm_mon], tm.tm_year+1900,
tm.tm_hour, tm.tm_min, tm.tm_sec,
- peers->id, peers->disabled, peers->flags,
+ peers->id, peers->disabled, HA_ATOMIC_LOAD(&peers->flags),
peers->resync_timeout ?
tick_is_expired(peers->resync_timeout, now_ms) ? "<PAST>" :
human_time(TICKS_TO_MS(peers->resync_timeout - now_ms),
@@ -3966,12 +3995,14 @@ static int peers_dump_peer(struct buffer *msg, struct appctx *appctx, struct pee
struct stream *peer_s;
struct shared_table *st;
- addr_to_str(&peer->addr, pn, sizeof pn);
- chunk_appendf(msg, " %p: id=%s(%s,%s) addr=%s:%d last_status=%s",
+ addr_to_str(&peer->srv->addr, pn, sizeof pn);
+ chunk_appendf(msg, " %p: id=%s(%s,%s) addr=%s:%d app_state=%s learn_state=%s last_status=%s",
peer, peer->id,
peer->local ? "local" : "remote",
peer->appctx ? "active" : "inactive",
- pn, get_host_port(&peer->addr),
+ pn, peer->srv->svc_port,
+ peer_app_state_str(peer->appstate),
+ peer_learn_state_str(peer->learnstate),
statuscode_str(peer->statuscode));
chunk_appendf(msg, " last_hdshk=%s\n",