From cff6d757e3ba609c08ef2aaa00f07e53551e5bf6 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 3 Jun 2024 07:11:10 +0200 Subject: Adding upstream version 3.0.0. Signed-off-by: Daniel Baumann --- src/peers.c | 1003 ++++++++++++++++++++++++++++++----------------------------- 1 file changed, 517 insertions(+), 486 deletions(-) (limited to 'src/peers.c') diff --git a/src/peers.c b/src/peers.c index 9ba3d9b..4ec981c 100644 --- a/src/peers.c +++ b/src/peers.c @@ -49,57 +49,12 @@ #include #include - -/*******************************/ -/* Current peer learning state */ -/*******************************/ - -/******************************/ -/* Current peers section resync state */ -/******************************/ -#define PEERS_F_RESYNC_LOCAL 0x00000001 /* Learn from local finished or no more needed */ -#define PEERS_F_RESYNC_REMOTE 0x00000002 /* Learn from remote finished or no more needed */ -#define PEERS_F_RESYNC_ASSIGN 0x00000004 /* A peer was assigned to learn our lesson */ -#define PEERS_F_RESYNC_PROCESS 0x00000008 /* The assigned peer was requested for resync */ -#define PEERS_F_RESYNC_LOCALTIMEOUT 0x00000010 /* Timeout waiting for a full resync from a local node */ -#define PEERS_F_RESYNC_REMOTETIMEOUT 0x00000020 /* Timeout waiting for a full resync from a remote node */ -#define PEERS_F_RESYNC_LOCALABORT 0x00000040 /* Session aborted learning from a local node */ -#define PEERS_F_RESYNC_REMOTEABORT 0x00000080 /* Session aborted learning from a remote node */ -#define PEERS_F_RESYNC_LOCALFINISHED 0x00000100 /* A local node teach us and was fully up to date */ -#define PEERS_F_RESYNC_REMOTEFINISHED 0x00000200 /* A remote node teach us and was fully up to date */ -#define PEERS_F_RESYNC_LOCALPARTIAL 0x00000400 /* A local node teach us but was partially up to date */ -#define PEERS_F_RESYNC_REMOTEPARTIAL 0x00000800 /* A remote node teach us but was partially up to date */ -#define PEERS_F_RESYNC_LOCALASSIGN 0x00001000 /* A local node was assigned for a full 
resync */ -#define PEERS_F_RESYNC_REMOTEASSIGN 0x00002000 /* A remote node was assigned for a full resync */ -#define PEERS_F_RESYNC_REQUESTED 0x00004000 /* A resync was explicitly requested */ -#define PEERS_F_DONOTSTOP 0x00010000 /* Main table sync task block process during soft stop - to push data to new process */ - -#define PEERS_RESYNC_STATEMASK (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE) -#define PEERS_RESYNC_FROMLOCAL 0x00000000 -#define PEERS_RESYNC_FROMREMOTE PEERS_F_RESYNC_LOCAL -#define PEERS_RESYNC_FINISHED (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE) - /***********************************/ /* Current shared table sync state */ /***********************************/ #define SHTABLE_F_TEACH_STAGE1 0x00000001 /* Teach state 1 complete */ #define SHTABLE_F_TEACH_STAGE2 0x00000002 /* Teach state 2 complete */ -/******************************/ -/* Remote peer teaching state */ -/******************************/ -#define PEER_F_TEACH_PROCESS 0x00000001 /* Teach a lesson to current peer */ -#define PEER_F_TEACH_FINISHED 0x00000008 /* Teach conclude, (wait for confirm) */ -#define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know already taught to current peer, used only for a local peer */ -#define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */ -#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */ -#define PEER_F_ALIVE 0x20000000 /* Used to flag a peer a alive. */ -#define PEER_F_HEARTBEAT 0x40000000 /* Heartbeat message to send. */ -#define PEER_F_DWNGRD 0x80000000 /* When this flag is enabled, we must downgrade the supported version announced during peer sessions. 
*/ - -#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */ -#define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE) #define PEER_RESYNC_TIMEOUT 5000 /* 5 seconds */ #define PEER_RECONNECT_TIMEOUT 5000 /* 5 seconds */ @@ -334,6 +289,7 @@ static const struct trace_event peers_trace_events[] = { { .mask = PEERS_EV_SESSREL, .name = "sessrl", .desc = "peer session releasing" }, #define PEERS_EV_PROTOERR (1 << 6) { .mask = PEERS_EV_PROTOERR, .name = "protoerr", .desc = "protocol error" }, + { } }; static const struct name_desc peers_trace_lockon_args[4] = { @@ -489,6 +445,38 @@ static const char *statuscode_str(int statuscode) } } +static const char *peer_app_state_str(enum peer_app_state appstate) +{ + switch (appstate) { + case PEER_APP_ST_STOPPED: + return "STOPPED"; + case PEER_APP_ST_STARTING: + return "STARTING"; + case PEER_APP_ST_RUNNING: + return "RUNNING"; + case PEER_APP_ST_STOPPING: + return "STOPPING"; + default: + return "UNKNOWN"; + } +} + +static const char *peer_learn_state_str(enum peer_learn_state learnstate) +{ + switch (learnstate) { + case PEER_LR_ST_NOTASSIGNED: + return "NOTASSIGNED"; + case PEER_LR_ST_ASSIGNED: + return "ASSIGNED"; + case PEER_LR_ST_PROCESSING: + return "PROCESSING"; + case PEER_LR_ST_FINISHED: + return "FINISHED"; + default: + return "UNKNOWN"; + } +} + /* This function encode an uint64 to 'dynamic' length format. The encoded value is written at address *str, and the caller must assure that size after *str is large enough. 
@@ -1059,21 +1047,14 @@ void __peer_session_deinit(struct peer *peer) /* Re-init current table pointers to force announcement on re-connect */ peer->remote_table = peer->last_local_table = peer->stop_local_table = NULL; peer->appctx = NULL; - if (peer->flags & PEER_F_LEARN_ASSIGN) { - /* unassign current peer for learning */ - peer->flags &= ~(PEER_F_LEARN_ASSIGN); - peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); - if (peer->local) - peers->flags |= PEERS_F_RESYNC_LOCALABORT; - else - peers->flags |= PEERS_F_RESYNC_REMOTEABORT; - /* reschedule a resync */ - peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); - } - /* reset teaching and learning flags to 0 */ - peer->flags &= PEER_TEACH_RESET; - peer->flags &= PEER_LEARN_RESET; + /* reset teaching flags to 0 */ + peer->flags &= ~PEER_TEACH_FLAGS; + + /* Mark the peer as stopping and wait for the sync task */ + peer->flags |= PEER_F_WAIT_SYNCTASK_ACK; + peer->appstate = PEER_APP_ST_STOPPING; + task_wakeup(peers->sync_task, TASK_WOKEN_MSG); } @@ -1083,8 +1064,9 @@ static int peer_session_init(struct appctx *appctx) struct stream *s; struct sockaddr_storage *addr = NULL; - if (!sockaddr_alloc(&addr, &peer->addr, sizeof(peer->addr))) + if (!sockaddr_alloc(&addr, &peer->srv->addr, sizeof(peer->srv->addr))) goto out_error; + set_host_port(addr, peer->srv->svc_port); if (appctx_finalize_startup(appctx, peer->peers->peers_fe, &BUF_NULL) == -1) goto out_free_addr; @@ -1393,7 +1375,7 @@ static inline int peer_send_resync_finishedmsg(struct appctx *appctx, .control.head = { PEER_MSG_CLASS_CONTROL, }, }; - p.control.head[1] = (peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED ? + p.control.head[1] = (HA_ATOMIC_LOAD(&peers->flags) & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED ? 
PEER_MSG_CTRL_RESYNCFINISHED : PEER_MSG_CTRL_RESYNCPARTIAL; TRACE_PROTO("send control message", PEERS_EV_CTRLMSG, @@ -1472,11 +1454,12 @@ static inline int peer_send_error_protomsg(struct appctx *appctx) /* * Function used to lookup for recent stick-table updates associated with - * shared stick-table when a lesson must be taught a peer (PEER_F_LEARN_ASSIGN flag set). + * shared stick-table when a lesson must be taught a peer (learn state is not PEER_LR_ST_NOTASSIGNED). */ static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_table *st) { struct eb32_node *eb; + struct stksess *ret; eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); if (!eb) { @@ -1496,7 +1479,10 @@ static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_ta return NULL; } - return eb32_entry(eb, struct stksess, upd); + ret = eb32_entry(eb, struct stksess, upd); + if (!_HA_ATOMIC_LOAD(&ret->seen)) + _HA_ATOMIC_STORE(&ret->seen, 1); + return ret; } /* @@ -1506,6 +1492,7 @@ static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_ta static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_table *st) { struct eb32_node *eb; + struct stksess *ret; eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); if (!eb) { @@ -1516,7 +1503,10 @@ static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_tab return NULL; } - return eb32_entry(eb, struct stksess, upd); + ret = eb32_entry(eb, struct stksess, upd); + if (!_HA_ATOMIC_LOAD(&ret->seen)) + _HA_ATOMIC_STORE(&ret->seen, 1); + return ret; } /* @@ -1526,6 +1516,7 @@ static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_tab static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_table *st) { struct eb32_node *eb; + struct stksess *ret; eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); if (!eb || eb->key > st->teaching_origin) { @@ -1533,7 +1524,10 @@ static inline struct stksess 
*peer_teach_stage2_stksess_lookup(struct shared_tab return NULL; } - return eb32_entry(eb, struct stksess, upd); + ret = eb32_entry(eb, struct stksess, upd); + if (!_HA_ATOMIC_LOAD(&ret->seen)) + _HA_ATOMIC_STORE(&ret->seen, 1); + return ret; } /* @@ -1621,10 +1615,7 @@ static inline int peer_send_teachmsgs(struct appctx *appctx, struct peer *p, updates_sent++; if (updates_sent >= peers_max_updates_at_once) { - /* pretend we're full so that we get back ASAP */ - struct stconn *sc = appctx_sc(appctx); - - sc_need_room(sc, 0); + applet_have_more_data(appctx); ret = -1; break; } @@ -1637,7 +1628,7 @@ static inline int peer_send_teachmsgs(struct appctx *appctx, struct peer *p, /* * Function to emit update messages for stick-table when a lesson must - * be taught to the peer

(PEER_F_LEARN_ASSIGN flag set). + * be taught to the peer

(learn state is not PEER_LR_ST_NOTASSIGNED). * * Note that shared stick-table is locked when calling this function, and * the lock is dropped then re-acquired. @@ -1650,13 +1641,7 @@ static inline int peer_send_teachmsgs(struct appctx *appctx, struct peer *p, static inline int peer_send_teach_process_msgs(struct appctx *appctx, struct peer *p, struct shared_table *st) { - int ret; - - HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock); - ret = peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st); - HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &st->table->updt_lock); - - return ret; + return peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st); } /* @@ -2487,73 +2472,27 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee } /* reset teaching flags to 0 */ - peer->flags &= PEER_TEACH_RESET; + peer->flags &= ~PEER_TEACH_FLAGS; /* flag to start to teach lesson */ - peer->flags |= PEER_F_TEACH_PROCESS; - peers->flags |= PEERS_F_RESYNC_REQUESTED; + peer->flags |= (PEER_F_TEACH_PROCESS|PEER_F_DBG_RESYNC_REQUESTED); } else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) { TRACE_PROTO("received control message", PEERS_EV_CTRLMSG, NULL, &msg_head[1], peers->local->id, peer->id); - if (peer->flags & PEER_F_LEARN_ASSIGN) { - int commit_a_finish = 1; - - peer->flags &= ~PEER_F_LEARN_ASSIGN; - peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); - if (peer->srv->shard) { - struct peer *ps; - - peers->flags |= PEERS_F_RESYNC_REMOTEPARTIAL; - peer->flags |= PEER_F_LEARN_NOTUP2DATE; - for (ps = peers->remote; ps; ps = ps->next) { - if (ps->srv->shard == peer->srv->shard) { - /* flag all peers from same shard - * notup2date to disable request - * of a resync frm them - */ - ps->flags |= PEER_F_LEARN_NOTUP2DATE; - } - else if (ps->srv->shard && !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) { - /* it remains some other shards not requested - * we don't commit a resync finish to request - * the other shards - */ - 
commit_a_finish = 0; - } - } - - if (!commit_a_finish) { - /* it remains some shard to request, we schedule a new request - */ - peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); - task_wakeup(peers->sync_task, TASK_WOKEN_MSG); - } - } - - if (commit_a_finish) { - peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE); - if (peer->local) - peers->flags |= PEERS_F_RESYNC_LOCALFINISHED; - else - peers->flags |= PEERS_F_RESYNC_REMOTEFINISHED; - } + if (peer->learnstate == PEER_LR_ST_PROCESSING) { + peer->learnstate = PEER_LR_ST_FINISHED; + peer->flags |= PEER_F_WAIT_SYNCTASK_ACK; + task_wakeup(peers->sync_task, TASK_WOKEN_MSG); } peer->confirm++; } else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) { TRACE_PROTO("received control message", PEERS_EV_CTRLMSG, NULL, &msg_head[1], peers->local->id, peer->id); - if (peer->flags & PEER_F_LEARN_ASSIGN) { - peer->flags &= ~PEER_F_LEARN_ASSIGN; - peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); - - if (peer->local) - peers->flags |= PEERS_F_RESYNC_LOCALPARTIAL; - else - peers->flags |= PEERS_F_RESYNC_REMOTEPARTIAL; - peer->flags |= PEER_F_LEARN_NOTUP2DATE; - peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); + if (peer->learnstate == PEER_LR_ST_PROCESSING) { + peer->learnstate = PEER_LR_ST_FINISHED; + peer->flags |= (PEER_F_LEARN_NOTUP2DATE|PEER_F_WAIT_SYNCTASK_ACK); task_wakeup(peers->sync_task, TASK_WOKEN_MSG); } peer->confirm++; @@ -2566,7 +2505,7 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee /* If stopping state */ if (stopping) { /* Close session, push resync no more needed */ - peer->flags |= PEER_F_TEACH_COMPLETE; + peer->flags |= PEER_F_LOCAL_TEACH_COMPLETE; appctx->st0 = PEER_SESS_ST_END; return 0; } @@ -2576,7 +2515,7 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee } /* reset teaching flags to 0 */ - peer->flags &= PEER_TEACH_RESET; + peer->flags &= ~PEER_TEACH_FLAGS; } 
else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) { TRACE_PROTO("received control message", PEERS_EV_CTRLMSG, @@ -2650,16 +2589,13 @@ static inline int peer_send_msgs(struct appctx *appctx, { int repl; - /* Need to request a resync */ - if ((peer->flags & PEER_F_LEARN_ASSIGN) && - (peers->flags & PEERS_F_RESYNC_ASSIGN) && - !(peers->flags & PEERS_F_RESYNC_PROCESS)) { - + /* Need to request a resync (only possible for a remote peer at this stage) */ + if (peer->learnstate == PEER_LR_ST_ASSIGNED) { + BUG_ON(peer->local); repl = peer_send_resync_reqmsg(appctx, peer, peers); if (repl <= 0) return repl; - - peers->flags |= PEERS_F_RESYNC_PROCESS; + peer->learnstate = PEER_LR_ST_PROCESSING; } /* Nothing to read, now we start to write */ @@ -2688,18 +2624,19 @@ static inline int peer_send_msgs(struct appctx *appctx, } if (!(peer->flags & PEER_F_TEACH_PROCESS)) { - HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &st->table->updt_lock); - if (!(peer->flags & PEER_F_LEARN_ASSIGN) && - (st->last_pushed != st->table->localupdate)) { + int must_send; + HA_RWLOCK_RDLOCK(STK_TABLE_LOCK, &st->table->updt_lock); + must_send = (peer->learnstate == PEER_LR_ST_NOTASSIGNED) && (st->last_pushed != st->table->localupdate); + HA_RWLOCK_RDUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock); + + if (must_send) { repl = peer_send_teach_process_msgs(appctx, peer, st); if (repl <= 0) { - HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock); peer->stop_local_table = peer->last_local_table; return repl; } } - HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock); } else if (!(peer->flags & PEER_F_TEACH_FINISHED)) { if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) { @@ -2733,10 +2670,7 @@ static inline int peer_send_msgs(struct appctx *appctx, updates++; if (updates >= peers_max_updates_at_once) { - /* pretend we're full so that we get back ASAP */ - struct stconn *sc = appctx_sc(appctx); - - sc_need_room(sc, 0); + applet_have_more_data(appctx); return -1; } @@ -2872,88 +2806,16 @@ static inline int 
peer_getline_last(struct appctx *appctx, struct peer **curpeer } /* - * Init peer after having accepted it at peer protocol level. - */ -static inline void init_accepted_peer(struct peer *peer, struct peers *peers) -{ - struct shared_table *st; - - peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); - /* Register status code */ - peer->statuscode = PEER_SESS_SC_SUCCESSCODE; - peer->last_hdshk = now_ms; - - /* Awake main task */ - task_wakeup(peers->sync_task, TASK_WOKEN_MSG); - - /* Init confirm counter */ - peer->confirm = 0; - - /* Init cursors */ - for (st = peer->tables; st ; st = st->next) { - uint commitid, updateid; - - st->last_get = st->last_acked = 0; - HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &st->table->updt_lock); - /* if st->update appears to be in future it means - * that the last acked value is very old and we - * remain unconnected a too long time to use this - * acknowledgement as a reset. - * We should update the protocol to be able to - * signal the remote peer that it needs a full resync. 
- * Here a partial fix consist to set st->update at - * the max past value - */ - if ((int)(st->table->localupdate - st->update) < 0) - st->update = st->table->localupdate + (2147483648U); - st->teaching_origin = st->last_pushed = st->update; - st->flags = 0; - - updateid = st->last_pushed; - commitid = _HA_ATOMIC_LOAD(&st->table->commitupdate); - - while ((int)(updateid - commitid) > 0) { - if (_HA_ATOMIC_CAS(&st->table->commitupdate, &commitid, updateid)) - break; - __ha_cpu_relax(); - } - - HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock); - } - - /* reset teaching and learning flags to 0 */ - peer->flags &= PEER_TEACH_RESET; - peer->flags &= PEER_LEARN_RESET; - - /* if current peer is local */ - if (peer->local) { - /* if current host need resyncfrom local and no process assigned */ - if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL && - !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { - /* assign local peer for a lesson, consider lesson already requested */ - peer->flags |= PEER_F_LEARN_ASSIGN; - peers->flags |= (PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS); - peers->flags |= PEERS_F_RESYNC_LOCALASSIGN; - } - - } - else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE && - !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { - /* assign peer for a lesson */ - peer->flags |= PEER_F_LEARN_ASSIGN; - peers->flags |= PEERS_F_RESYNC_ASSIGN; - peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN; - } -} - -/* - * Init peer after having connected it at peer protocol level. + * Init peer after validating a connection at peer protocol level. It may be + * an incoming or outgoing connection. The peer init must be acknowledged by the + * sync task. Message processing is blocked in the meanwhile.
*/ static inline void init_connected_peer(struct peer *peer, struct peers *peers) { struct shared_table *st; peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); + /* Init cursors */ for (st = peer->tables; st ; st = st->next) { uint updateid, commitid; @@ -2986,28 +2848,25 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers) HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock); } + /* Awake main task to ack the new peer state */ + task_wakeup(peers->sync_task, TASK_WOKEN_MSG); + /* Init confirm counter */ peer->confirm = 0; - /* reset teaching and learning flags to 0 */ - peer->flags &= PEER_TEACH_RESET; - peer->flags &= PEER_LEARN_RESET; + /* reset teaching flags to 0 */ + peer->flags &= ~PEER_TEACH_FLAGS; - /* If current peer is local */ - if (peer->local) { - /* flag to start to teach lesson */ - peer->flags |= PEER_F_TEACH_PROCESS; - } - else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE && - !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { - /* If peer is remote and resync from remote is needed, - and no peer currently assigned */ - - /* assign peer for a lesson */ - peer->flags |= PEER_F_LEARN_ASSIGN; - peers->flags |= PEERS_F_RESYNC_ASSIGN; - peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN; + if (peer->local && !(appctx_is_back(peer->appctx))) { + /* If the local peer has established the connection (appctx is + * on the frontend side), flag it to start to teach lesson. 
+ */ + peer->flags |= PEER_F_TEACH_PROCESS; } + + /* Mark the peer as starting and wait the sync task */ + peer->flags |= PEER_F_WAIT_SYNCTASK_ACK; + peer->appstate = PEER_APP_ST_STARTING; } /* @@ -3024,7 +2883,7 @@ static void peer_io_handler(struct appctx *appctx) unsigned int maj_ver, min_ver; int prev_state; - if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR|SE_FL_SHR|SE_FL_SHW)))) { + if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) { co_skip(sc_oc(sc), co_data(sc_oc(sc))); goto out; } @@ -3091,6 +2950,7 @@ switchstate: */ curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); peer_session_forceshutdown(curpeer); + curpeer->heartbeat = TICK_ETERNITY; curpeer->coll++; } @@ -3127,7 +2987,11 @@ switchstate: goto switchstate; } - init_accepted_peer(curpeer, curpeers); + /* Register status code */ + curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE; + curpeer->last_hdshk = now_ms; + + init_connected_peer(curpeer, curpeers); /* switch to waiting message state */ _HA_ATOMIC_INC(&connected_peers); @@ -3216,6 +3080,13 @@ switchstate: } } + if (curpeer->flags & PEER_F_WAIT_SYNCTASK_ACK) + goto out; + + /* local peer is assigned of a lesson, start it */ + if (curpeer->learnstate == PEER_LR_ST_ASSIGNED && curpeer->local) + curpeer->learnstate = PEER_LR_ST_PROCESSING; + reql = peer_recv_msg(appctx, (char *)msg_head, sizeof msg_head, &msg_len, &totl); if (reql <= 0) { if (reql == -1) @@ -3348,7 +3219,7 @@ static void peer_session_forceshutdown(struct peer *peer) /* Pre-configures a peers frontend to accept incoming connections */ void peers_setup_frontend(struct proxy *fe) { - fe->last_change = ns_to_sec(now_ns); + fe->fe_counters.last_change = ns_to_sec(now_ns); fe->cap = PR_CAP_FE | PR_CAP_BE; fe->mode = PR_MODE_PEERS; fe->maxconn = 0; @@ -3394,274 +3265,432 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer return NULL; } -/* - * Task processing function to manage re-connect, peer session 
- * tasks wakeup on local update and heartbeat. Let's keep it exported so that it - * resolves in stack traces and "show tasks". +/* Clear LEARN flags to a given peer, dealing with aborts if it was assigned for + * learning. In this case, the resync timeout is re-armed. */ -struct task *process_peer_sync(struct task * task, void *context, unsigned int state) +static void clear_peer_learning_status(struct peer *peer) { - struct peers *peers = context; - struct peer *ps; - struct shared_table *st; + if (peer->learnstate != PEER_LR_ST_NOTASSIGNED) { + struct peers *peers = peer->peers; - task->expire = TICK_ETERNITY; + /* unassign current peer for learning */ + HA_ATOMIC_AND(&peers->flags, ~PEERS_F_RESYNC_ASSIGN); + HA_ATOMIC_OR(&peers->flags, (peer->local ? PEERS_F_DBG_RESYNC_LOCALABORT : PEERS_F_DBG_RESYNC_REMOTEABORT)); - /* Acquire lock for all peers of the section */ - for (ps = peers->remote; ps; ps = ps->next) - HA_SPIN_LOCK(PEER_LOCK, &ps->lock); + /* reschedule a resync */ + peer->peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); + peer->learnstate = PEER_LR_ST_NOTASSIGNED; + } + peer->flags &= ~PEER_F_LEARN_NOTUP2DATE; +} - if (!stopping) { - /* Normal case (not soft stop)*/ +static void sync_peer_learn_state(struct peers *peers, struct peer *peer) +{ + unsigned int flags = 0; - /* resync timeout set to TICK_ETERNITY means we just start - * a new process and timer was not initialized. - * We must arm this timer to switch to a request to a remote - * node if incoming connection from old local process never - * comes. - */ - if (peers->resync_timeout == TICK_ETERNITY) - peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); + if (peer->learnstate != PEER_LR_ST_FINISHED) + return; + + /* The learning process is now finished */ + if (peer->flags & PEER_F_LEARN_NOTUP2DATE) { + /* Partial resync */ + flags |= (peer->local ? 
PEERS_F_DBG_RESYNC_LOCALPARTIAL : PEERS_F_DBG_RESYNC_REMOTEPARTIAL); + peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); + } + else { + /* Full resync */ + struct peer *rem_peer; + int commit_a_finish = 1; + + if (peer->srv->shard) { + flags |= PEERS_F_DBG_RESYNC_REMOTEPARTIAL; + peer->flags |= PEER_F_LEARN_NOTUP2DATE; + for (rem_peer = peers->remote; rem_peer; rem_peer = rem_peer->next) { + if (rem_peer->srv->shard && rem_peer != peer) { + HA_SPIN_LOCK(PEER_LOCK, &rem_peer->lock); + if (rem_peer->srv->shard == peer->srv->shard) { + /* flag all peers from same shard + * notup2date to disable request + * of a resync from them + */ + rem_peer->flags |= PEER_F_LEARN_NOTUP2DATE; + } + else if (!(rem_peer->flags & PEER_F_LEARN_NOTUP2DATE)) { + /* it remains some other shards not requested + * we don't commit a resync finish to request + * the other shards + */ + commit_a_finish = 0; + } + HA_SPIN_UNLOCK(PEER_LOCK, &rem_peer->lock); + } + } - if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) && - (!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) && - !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { - /* Resync from local peer needed - no peer was assigned for the lesson - and no old local peer found - or resync timeout expire */ + if (!commit_a_finish) { + /* it remains some shard to request, we schedule a new request */ + peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); + } + } - /* flag no more resync from local, to try resync from remotes */ - peers->flags |= PEERS_F_RESYNC_LOCAL; - peers->flags |= PEERS_F_RESYNC_LOCALTIMEOUT; + if (commit_a_finish) { + flags |= (PEERS_F_RESYNC_LOCAL_FINISHED|PEERS_F_RESYNC_REMOTE_FINISHED); + flags |= (peer->local ? 
PEERS_F_DBG_RESYNC_LOCALFINISHED : PEERS_F_DBG_RESYNC_REMOTEFINISHED); + } + } + peer->learnstate = PEER_LR_ST_NOTASSIGNED; + HA_ATOMIC_AND(&peers->flags, ~PEERS_F_RESYNC_ASSIGN); + HA_ATOMIC_OR(&peers->flags, flags); - /* reschedule a resync */ - peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); + appctx_wakeup(peer->appctx); +} + +/* Synchronise the peer applet state with its associated peers section. This + * function handles STARTING->RUNNING and STOPPING->STOPPED transitions. + */ +static void sync_peer_app_state(struct peers *peers, struct peer *peer) +{ + if (peer->appstate == PEER_APP_ST_STOPPING) { + clear_peer_learning_status(peer); + peer->appstate = PEER_APP_ST_STOPPED; + } + else if (peer->appstate == PEER_APP_ST_STARTING) { + clear_peer_learning_status(peer); + if (peer->local & appctx_is_back(peer->appctx)) { + /* if local peer has accepted the connection (appctx is + * on the backend side), flag it to learn a lesson and + * be sure it will start immediately. This only happens + * if no resync is in progress and if the local resync + * was not already performed. + */ + if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL && + !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { + /* assign local peer for a lesson */ + peer->learnstate = PEER_LR_ST_ASSIGNED; + HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_LOCALASSIGN); + } + } + else if (!peer->local) { + /* If a connection was validated for a remote peer, flag + * it to learn a lesson but don't start it yet. The peer + * must request it explicitly. This only happens if no + * resync is in progress and if the remote resync was + * not already performed. 
+ */ + if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE && + !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { + /* assign remote peer for a lesson */ + peer->learnstate = PEER_LR_ST_ASSIGNED; + HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN); + } } + peer->appstate = PEER_APP_ST_RUNNING; + appctx_wakeup(peer->appctx); + } +} - /* For each session */ - for (ps = peers->remote; ps; ps = ps->next) { - /* For each remote peers */ - if (!ps->local) { - if (!ps->appctx) { - /* no active peer connection */ - if (ps->statuscode == 0 || - ((ps->statuscode == PEER_SESS_SC_CONNECTCODE || - ps->statuscode == PEER_SESS_SC_SUCCESSCODE || - ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) && - tick_is_expired(ps->reconnect, now_ms))) { - /* connection never tried - * or previous peer connection established with success - * or previous peer connection failed while connecting - * and reconnection timer is expired */ - - /* retry a connect */ - ps->appctx = peer_session_create(peers, ps); - } - else if (!tick_is_expired(ps->reconnect, now_ms)) { - /* If previous session failed during connection - * but reconnection timer is not expired */ +/* Process the sync task for a running process. 
It is called from process_peer_sync() only */ +static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state) +{ + struct peer *peer; + struct shared_table *st; - /* reschedule task for reconnect */ - task->expire = tick_first(task->expire, ps->reconnect); - } - /* else do nothing */ - } /* !ps->appctx */ - else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) { - /* current peer connection is active and established */ - if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) && - !(peers->flags & PEERS_F_RESYNC_ASSIGN) && - !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) { - /* Resync from a remote is needed - * and no peer was assigned for lesson - * and current peer may be up2date */ - - /* assign peer for the lesson */ - ps->flags |= PEER_F_LEARN_ASSIGN; - peers->flags |= PEERS_F_RESYNC_ASSIGN; - peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN; - - /* wake up peer handler to handle a request of resync */ - appctx_wakeup(ps->appctx); + /* resync timeout set to TICK_ETERNITY means we just start + * a new process and timer was not initialized. + * We must arm this timer to switch to a request to a remote + * node if incoming connection from old local process never + * comes. 
+ */ + if (peers->resync_timeout == TICK_ETERNITY) + peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); + + if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) && + (!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) && + !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { + /* Resync from local peer needed + no peer was assigned for the lesson + and no old local peer found + or resync timeout expire */ + + /* flag no more resync from local, to try resync from remotes */ + HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_LOCAL_FINISHED|PEERS_F_DBG_RESYNC_LOCALTIMEOUT); + + /* reschedule a resync */ + peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); + } + + /* For each session */ + for (peer = peers->remote; peer; peer = peer->next) { + HA_SPIN_LOCK(PEER_LOCK, &peer->lock); + + sync_peer_learn_state(peers, peer); + sync_peer_app_state(peers, peer); + + /* Peer changes, if any, were now ack by the sync task. Unblock + * the peer (any wakeup should already be performed, no need to + * do it here) + */ + peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK; + + /* For each remote peers */ + if (!peer->local) { + if (!peer->appctx) { + /* no active peer connection */ + if (peer->statuscode == 0 || + ((peer->statuscode == PEER_SESS_SC_CONNECTCODE || + peer->statuscode == PEER_SESS_SC_SUCCESSCODE || + peer->statuscode == PEER_SESS_SC_CONNECTEDCODE) && + tick_is_expired(peer->reconnect, now_ms))) { + /* connection never tried + * or previous peer connection established with success + * or previous peer connection failed while connecting + * and reconnection timer is expired */ + + /* retry a connect */ + peer->appctx = peer_session_create(peers, peer); + } + else if (!tick_is_expired(peer->reconnect, now_ms)) { + /* If previous session failed during connection + * but reconnection timer is not expired */ + + /* reschedule task for reconnect */ + task->expire = tick_first(task->expire, peer->reconnect); + } + /* else 
do nothing */ + } /* !peer->appctx */ + else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE) { + /* current peer connection is active and established */ + if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) && + !(peers->flags & PEERS_F_RESYNC_ASSIGN) && + !(peer->flags & PEER_F_LEARN_NOTUP2DATE)) { + /* Resync from a remote is needed + * and no peer was assigned for lesson + * and current peer may be up2date */ + + /* assign peer for the lesson */ + peer->learnstate = PEER_LR_ST_ASSIGNED; + HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN); + + /* wake up peer handler to handle a request of resync */ + appctx_wakeup(peer->appctx); + } + else { + int update_to_push = 0; + + /* Awake session if there is data to push */ + for (st = peer->tables; st ; st = st->next) { + if (st->last_pushed != st->table->localupdate) { + /* wake up the peer handler to push local updates */ + update_to_push = 1; + /* There is no need to send a heartbeat message + * when some updates must be pushed. The remote + * peer will consider peer as alive when it will + * receive these updates. + */ + peer->flags &= ~PEER_F_HEARTBEAT; + /* Re-schedule another one later. */ + peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); + /* Refresh reconnect if necessary */ + if (tick_is_expired(peer->reconnect, now_ms)) + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); + /* We are going to send updates, let's ensure we will + * come back to send heartbeat messages or to reconnect. 
+ */ + task->expire = tick_first(peer->reconnect, peer->heartbeat); + appctx_wakeup(peer->appctx); + break; + } } - else { - int update_to_push = 0; - - /* Awake session if there is data to push */ - for (st = ps->tables; st ; st = st->next) { - if (st->last_pushed != st->table->localupdate) { - /* wake up the peer handler to push local updates */ - update_to_push = 1; - /* There is no need to send a heartbeat message - * when some updates must be pushed. The remote - * peer will consider peer as alive when it will - * receive these updates. - */ - ps->flags &= ~PEER_F_HEARTBEAT; - /* Re-schedule another one later. */ - ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); - /* Refresh reconnect if necessary */ - if (tick_is_expired(ps->reconnect, now_ms)) - ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); - /* We are going to send updates, let's ensure we will - * come back to send heartbeat messages or to reconnect. + /* When there are updates to send we do not reconnect + * and do not send heartbeat message either. + */ + if (!update_to_push) { + if (tick_is_expired(peer->reconnect, now_ms)) { + if (peer->flags & PEER_F_ALIVE) { + /* This peer was alive during a 'reconnect' period. + * Flag it as not alive again for the next period. */ - task->expire = tick_first(ps->reconnect, ps->heartbeat); - appctx_wakeup(ps->appctx); - break; - } - } - /* When there are updates to send we do not reconnect - * and do not send heartbeat message either. - */ - if (!update_to_push) { - if (tick_is_expired(ps->reconnect, now_ms)) { - if (ps->flags & PEER_F_ALIVE) { - /* This peer was alive during a 'reconnect' period. - * Flag it as not alive again for the next period. 
- */ - ps->flags &= ~PEER_F_ALIVE; - ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); - } - else { - ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); - ps->heartbeat = TICK_ETERNITY; - peer_session_forceshutdown(ps); - ps->no_hbt++; - } + peer->flags &= ~PEER_F_ALIVE; + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); } - else if (tick_is_expired(ps->heartbeat, now_ms)) { - ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); - ps->flags |= PEER_F_HEARTBEAT; - appctx_wakeup(ps->appctx); + else { + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); + peer->heartbeat = TICK_ETERNITY; + peer_session_forceshutdown(peer); + sync_peer_app_state(peers, peer); + peer->no_hbt++; } - task->expire = tick_first(ps->reconnect, ps->heartbeat); } - } - /* else do nothing */ - } /* SUCCESSCODE */ - } /* !ps->peer->local */ - } /* for */ - - /* Resync from remotes expired: consider resync is finished */ - if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) && - !(peers->flags & PEERS_F_RESYNC_ASSIGN) && - tick_is_expired(peers->resync_timeout, now_ms)) { - /* Resync from remote peer needed - * no peer was assigned for the lesson - * and resync timeout expire */ - - /* flag no more resync from remote, consider resync is finished */ - peers->flags |= PEERS_F_RESYNC_REMOTE; - peers->flags |= PEERS_F_RESYNC_REMOTETIMEOUT; - } - - if ((peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) { - /* Resync not finished*/ - /* reschedule task to resync timeout if not expired, to ended resync if needed */ - if (!tick_is_expired(peers->resync_timeout, now_ms)) - task->expire = tick_first(task->expire, peers->resync_timeout); - } - } /* !stopping */ - else { - /* soft stop case */ - if (state & TASK_WOKEN_SIGNAL) { - /* We've just received the signal */ - if (!(peers->flags & PEERS_F_DONOTSTOP)) { - /* add DO NOT STOP flag if not present */ - 
_HA_ATOMIC_INC(&jobs); - peers->flags |= PEERS_F_DONOTSTOP; - - /* disconnect all connected peers to process a local sync - * this must be done only the first time we are switching - * in stopping state - */ - for (ps = peers->remote; ps; ps = ps->next) { - /* we're killing a connection, we must apply a random delay before - * retrying otherwise the other end will do the same and we can loop - * for a while. - */ - ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); - if (ps->appctx) { - peer_session_forceshutdown(ps); + else if (tick_is_expired(peer->heartbeat, now_ms)) { + peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); + peer->flags |= PEER_F_HEARTBEAT; + appctx_wakeup(peer->appctx); + } + task->expire = tick_first(peer->reconnect, peer->heartbeat); } } + /* else do nothing */ + } /* SUCCESSCODE */ + } /* !peer->peer->local */ - /* Set resync timeout for the local peer and request a immediate reconnect */ - peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); - peers->local->reconnect = now_ms; + HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock); + } /* for */ + + /* Resync from remotes expired or no remote peer: consider resync is finished */ + if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) && + !(peers->flags & PEERS_F_RESYNC_ASSIGN) && + (tick_is_expired(peers->resync_timeout, now_ms) || !peers->remote->next)) { + /* Resync from remote peer needed + * no peer was assigned for the lesson + * and resync timeout expire */ + + /* flag no more resync from remote, consider resync is finished */ + HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_REMOTE_FINISHED|PEERS_F_DBG_RESYNC_REMOTETIMEOUT); + } + + if ((peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) { + /* Resync not finished*/ + /* reschedule task to resync timeout if not expired, to ended resync if needed */ + if (!tick_is_expired(peers->resync_timeout, now_ms)) + task->expire = tick_first(task->expire, 
peers->resync_timeout); + } +} + +/* Process the sync task for a stopping process. It is called from process_peer_sync() only. NOTE(review): dont_stop is a function-level static, so its value is shared by every call whatever the peers argument is - confirm this is intended if several peers sections can exist */ +static void __process_stopping_peer_sync(struct task *task, struct peers *peers, unsigned int state) +{ + struct peer *peer; + struct shared_table *st; + static int dont_stop = 0; + + /* For each peer */ + for (peer = peers->remote; peer; peer = peer->next) { + HA_SPIN_LOCK(PEER_LOCK, &peer->lock); + + sync_peer_learn_state(peers, peer); + sync_peer_app_state(peers, peer); + + /* Peer changes, if any, are now acknowledged by the sync task. Unblock + * the peer (any wakeup should already be performed, no need to + * do it here) + */ + peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK; + + if ((state & TASK_WOKEN_SIGNAL) && !dont_stop) { + /* we're killing a connection, we must apply a random delay before + * retrying otherwise the other end will do the same and we can loop + * for a while. + */ + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); + if (peer->appctx) { + peer_session_forceshutdown(peer); + sync_peer_app_state(peers, peer); } } - ps = peers->local; - if (ps->flags & PEER_F_TEACH_COMPLETE) { - if (peers->flags & PEERS_F_DONOTSTOP) { - /* resync of new process was complete, current process can die now */ - _HA_ATOMIC_DEC(&jobs); - peers->flags &= ~PEERS_F_DONOTSTOP; - for (st = ps->tables; st ; st = st->next) - HA_ATOMIC_DEC(&st->table->refcnt); - } + HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock); + } + + /* We've just received the signal */ + if (state & TASK_WOKEN_SIGNAL) { + if (!dont_stop) { + /* first signal: count one more job and remember it in dont_stop */ + _HA_ATOMIC_INC(&jobs); + dont_stop = 1; + + /* Set resync timeout for the local peer and request an immediate reconnect */ + peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); + peers->local->reconnect = now_ms; } - else if (!ps->appctx) { - /* Re-arm resync timeout if necessary */ - if (!tick_isset(peers->resync_timeout)) - peers->resync_timeout = tick_add(now_ms,
MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); + } - /* If there's no active peer connection */ - if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED && - !tick_is_expired(peers->resync_timeout, now_ms) && - (ps->statuscode == 0 || - ps->statuscode == PEER_SESS_SC_SUCCESSCODE || - ps->statuscode == PEER_SESS_SC_CONNECTEDCODE || - ps->statuscode == PEER_SESS_SC_TRYAGAIN)) { - /* The resync is finished for the local peer and - * the resync timeout is not expired and - * connection never tried - * or previous peer connection was successfully established - * or previous tcp connect succeeded but init state incomplete - * or during previous connect, peer replies a try again statuscode */ - - if (!tick_is_expired(ps->reconnect, now_ms)) { - /* reconnection timer is not expired. reschedule task for reconnect */ - task->expire = tick_first(task->expire, ps->reconnect); - } - else { - /* connect to the local peer if we must push a local sync */ - if (peers->flags & PEERS_F_DONOTSTOP) { - peer_session_create(peers, ps); - } - } + peer = peers->local; + HA_SPIN_LOCK(PEER_LOCK, &peer->lock); + if (peer->flags & PEER_F_LOCAL_TEACH_COMPLETE) { + if (dont_stop) { + /* resync of new process was complete, current process can die now: give back the job counted on signal and decrement the refcnt of each table */ + _HA_ATOMIC_DEC(&jobs); + dont_stop = 0; + for (st = peer->tables; st ; st = st->next) + HA_ATOMIC_DEC(&st->table->refcnt); + } + } + else if (!peer->appctx) { + /* Re-arm resync timeout if necessary */ + if (!tick_isset(peers->resync_timeout)) + peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT)); + + /* If there's no active peer connection */ + if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED && + !tick_is_expired(peers->resync_timeout, now_ms) && + (peer->statuscode == 0 || + peer->statuscode == PEER_SESS_SC_SUCCESSCODE || + peer->statuscode == PEER_SESS_SC_CONNECTEDCODE || + peer->statuscode == PEER_SESS_SC_TRYAGAIN)) { + /* The resync is finished for the local peer and + * the resync timeout is
not expired and + * connection never tried + * or previous peer connection was successfully established + * or previous tcp connect succeeded but init state incomplete + * or during previous connect, peer replied with a try again statuscode */ + + if (!tick_is_expired(peer->reconnect, now_ms)) { + /* reconnection timer is not expired. reschedule task for reconnect */ + task->expire = tick_first(task->expire, peer->reconnect); } - else { - /* Other error cases */ - if (peers->flags & PEERS_F_DONOTSTOP) { - /* unable to resync new process, current process can die now */ - _HA_ATOMIC_DEC(&jobs); - peers->flags &= ~PEERS_F_DONOTSTOP; - for (st = ps->tables; st ; st = st->next) - HA_ATOMIC_DEC(&st->table->refcnt); + else { + /* connect to the local peer if we must push a local sync */ + if (dont_stop) { + peer_session_create(peers, peer); } } } - else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) { - /* Reset resync timeout during a resync */ - peers->resync_timeout = TICK_ETERNITY; - - /* current peer connection is active and established - * wake up all peer handlers to push remaining local updates */ - for (st = ps->tables; st ; st = st->next) { - if (st->last_pushed != st->table->localupdate) { - appctx_wakeup(ps->appctx); - break; - } + else { + /* Other cases (resync not finished, resync timeout expired or any other status code) */ + if (dont_stop) { + /* unable to resync new process, current process can die now */ + _HA_ATOMIC_DEC(&jobs); + dont_stop = 0; + for (st = peer->tables; st ; st = st->next) + HA_ATOMIC_DEC(&st->table->refcnt); } } - } /* stopping */ + } + else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE ) { + /* Reset resync timeout during a resync */ + peers->resync_timeout = TICK_ETERNITY; + + /* current peer connection is active and established: + * wake up the local peer handler once if any table still has local updates to push */ + for (st = peer->tables; st ; st = st->next) { + if (st->last_pushed != st->table->localupdate) { + appctx_wakeup(peer->appctx); + break; + } + } + } + HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock); +} - /*
Release lock for all peers of the section */ - for (ps = peers->remote; ps; ps = ps->next) - HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock); +/* + * Task processing function to manage re-connect, peer session + * tasks wakeup on local update and heartbeat. Let's keep it exported so that it + * resolves in stack traces and "show tasks". It resets task->expire to TICK_ETERNITY, calls __process_running_peer_sync() when the global stopping is not set and __process_stopping_peer_sync() otherwise, then returns the task it was given. + */ +struct task *process_peer_sync(struct task * task, void *context, unsigned int state) +{ + struct peers *peers = context; + + task->expire = TICK_ETERNITY; + + if (!stopping) { + /* Normal case (not soft stop) */ + __process_running_peer_sync(task, peers, state); + + } + else { + /* soft stop case */ + __process_stopping_peer_sync(task, peers, state); + } /* stopping */ /* Wakeup for re-connect */ return task; @@ -3940,7 +3969,7 @@ static int peers_dump_head(struct buffer *msg, struct appctx *appctx, struct pee peers, tm.tm_mday, monthname[tm.tm_mon], tm.tm_year+1900, tm.tm_hour, tm.tm_min, tm.tm_sec, - peers->id, peers->disabled, peers->flags, + peers->id, peers->disabled, HA_ATOMIC_LOAD(&peers->flags), peers->resync_timeout ? tick_is_expired(peers->resync_timeout, now_ms) ? "" : human_time(TICKS_TO_MS(peers->resync_timeout - now_ms), @@ -3966,12 +3995,14 @@ static int peers_dump_peer(struct buffer *msg, struct appctx *appctx, struct pee struct stream *peer_s; struct shared_table *st; - addr_to_str(&peer->addr, pn, sizeof pn); - chunk_appendf(msg, " %p: id=%s(%s,%s) addr=%s:%d last_status=%s", + addr_to_str(&peer->srv->addr, pn, sizeof pn); + chunk_appendf(msg, " %p: id=%s(%s,%s) addr=%s:%d app_state=%s learn_state=%s last_status=%s", peer, peer->id, peer->local ? "local" : "remote", peer->appctx ? "active" : "inactive", - pn, get_host_port(&peer->addr), + pn, peer->srv->svc_port, + peer_app_state_str(peer->appstate), + peer_learn_state_str(peer->learnstate), statuscode_str(peer->statuscode)); chunk_appendf(msg, " last_hdshk=%s\n", -- cgit v1.2.3