diff options
Diffstat (limited to 'src/responder/kcm/kcmsrv_op_queue.c')
-rw-r--r-- | src/responder/kcm/kcmsrv_op_queue.c | 332 |
1 files changed, 332 insertions, 0 deletions
diff --git a/src/responder/kcm/kcmsrv_op_queue.c b/src/responder/kcm/kcmsrv_op_queue.c new file mode 100644 index 0000000..29af521 --- /dev/null +++ b/src/responder/kcm/kcmsrv_op_queue.c @@ -0,0 +1,332 @@ +/* + SSSD + + KCM Server - the KCM operations wait queue + + Copyright (C) Red Hat, 2017 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "util/util.h" +#include "util/util_creds.h" +#include "responder/kcm/kcmsrv_pvt.h" + +struct kcm_ops_queue_entry { + struct tevent_req *req; + + struct kcm_ops_queue *queue; + + struct kcm_ops_queue_entry *next; + struct kcm_ops_queue_entry *prev; +}; + +struct kcm_ops_queue { + uid_t uid; + struct tevent_context *ev; + struct kcm_ops_queue_ctx *qctx; + + struct kcm_ops_queue_entry *head; +}; + +struct kcm_ops_queue_ctx { + struct kcm_ctx *kctx; + + /* UID:kcm_ops_queue */ + hash_table_t *wait_queue_hash; +}; + +/* + * Per-UID wait queue + * + * They key in the hash table is the UID of the peer. The value of each + * hash table entry is kcm_ops_queue structure which in turn contains a + * linked list of kcm_ops_queue_entry structures * which primarily hold the + * tevent request being queued. + */ +struct kcm_ops_queue_ctx *kcm_ops_queue_create(TALLOC_CTX *mem_ctx, + struct kcm_ctx *kctx) +{ + errno_t ret; + struct kcm_ops_queue_ctx *queue_ctx; + + queue_ctx = talloc_zero(mem_ctx, struct kcm_ops_queue_ctx); + if (queue_ctx == NULL) { + return NULL; + } + + ret = sss_hash_create_ex(mem_ctx, 0, + &queue_ctx->wait_queue_hash, 0, 0, 0, 0, + NULL, NULL); + if (ret != EOK) { + DEBUG(SSSDBG_CRIT_FAILURE, + "sss_hash_create failed [%d]: %s\n", ret, sss_strerror(ret)); + talloc_free(queue_ctx); + return NULL; + } + + queue_ctx->kctx = kctx; + + return queue_ctx; +} + +void queue_removal_cb(struct tevent_context *ctx, + struct tevent_immediate *imm, + void *private_data) +{ + struct kcm_ops_queue *kq = talloc_get_type(private_data, + struct kcm_ops_queue); + int ret; + hash_key_t key; + + talloc_free(imm); + + if (kq->head != NULL) { + DEBUG(SSSDBG_TRACE_LIBS, "The queue is no longer empty\n"); + return; + } + + key.type = HASH_KEY_ULONG; + key.ul = kq->uid; + + /* If this was the last entry, remove the key (the UID) from the + * hash table to signal the queue is empty + */ + ret = hash_delete(kq->qctx->wait_queue_hash, &key); + if (ret != HASH_SUCCESS) { + DEBUG(SSSDBG_CRIT_FAILURE, + "Failed to remove wait queue for user %"SPRIuid"\n", + kq->uid); + return; + } + + DEBUG(SSSDBG_FUNC_DATA, + "Removed queue for %"SPRIuid" \n", kq->uid); + talloc_free(kq); +} + +static int kcm_op_queue_entry_destructor(struct kcm_ops_queue_entry *entry) +{ + struct kcm_ops_queue_entry *next_entry; + struct tevent_immediate *imm; + + if (entry == NULL) { + return 1; + /* Prevent use-after-free of req when shutting down with non-empty queue */ + } else if (entry->queue->qctx->kctx->rctx->shutting_down) { + return 0; + } + + /* Take the next entry from the queue */ + next_entry = entry->next; + + /* Remove the current entry from the queue */ + DLIST_REMOVE(entry->queue->head, entry); + + if (next_entry == NULL) { + /* If there was no other entry, schedule removal of the queue. Do it + * in another tevent tick to avoid issues with callbacks invoking + * the destructor while another request is touching the queue + */ + imm = tevent_create_immediate(entry->queue); + if (imm == NULL) { + return 1; + } + + tevent_schedule_immediate(imm, entry->queue->ev, queue_removal_cb, entry->queue); + return 0; + } + + /* Otherwise, mark the current head as done to run the next request */ + tevent_req_done(next_entry->req); + return 0; +} + +static struct kcm_ops_queue *kcm_op_queue_get(struct kcm_ops_queue_ctx *qctx, + struct tevent_context *ev, + uid_t uid) +{ + errno_t ret; + hash_key_t key; + hash_value_t value; + struct kcm_ops_queue *kq; + + key.type = HASH_KEY_ULONG; + key.ul = uid; + + ret = hash_lookup(qctx->wait_queue_hash, &key, &value); + switch (ret) { + case HASH_SUCCESS: + if (value.type != HASH_VALUE_PTR) { + DEBUG(SSSDBG_CRIT_FAILURE, "Unexpected hash value type.\n"); + return NULL; + } + + kq = talloc_get_type(value.ptr, struct kcm_ops_queue); + if (kq == NULL) { + DEBUG(SSSDBG_CRIT_FAILURE, "Invalid queue pointer\n"); + return NULL; + } + + DEBUG(SSSDBG_TRACE_LIBS, "Found existing queue for this ID\n"); + break; + + case HASH_ERROR_KEY_NOT_FOUND: + /* No request for this UID yet. Enqueue this request in case + * another one comes in and return EOK to run the current request + * immediately + */ + DEBUG(SSSDBG_TRACE_LIBS, "No existing queue for this ID\n"); + + kq = talloc_zero(qctx->wait_queue_hash, struct kcm_ops_queue); + if (kq == NULL) { + return NULL; + } + kq->uid = uid; + kq->qctx = qctx; + kq->ev = ev; + + value.type = HASH_VALUE_PTR; + value.ptr = kq; + + ret = hash_enter(qctx->wait_queue_hash, &key, &value); + if (ret != HASH_SUCCESS) { + DEBUG(SSSDBG_CRIT_FAILURE, "hash_enter failed.\n"); + return NULL; + } + break; + + default: + DEBUG(SSSDBG_CRIT_FAILURE, "hash_lookup failed.\n"); + return NULL; + } + + return kq; +} + +struct kcm_op_queue_state { + struct kcm_ops_queue_entry *entry; +}; + +static errno_t kcm_op_queue_add_req(struct kcm_ops_queue *kq, + struct tevent_req *req); + +/* + * Enqueue a request. + * + * If the request queue /for the given ID/ is empty, that is, if this + * request is the first one in the queue, run the request immediately. + * + * Otherwise just add it to the queue and wait until the previous request + * finishes and only at that point mark the current request as done, which + * will trigger calling the recv function and allow the request to continue. + */ +struct tevent_req *kcm_op_queue_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct kcm_ops_queue_ctx *qctx, + struct cli_creds *client) +{ + errno_t ret; + struct tevent_req *req; + struct kcm_ops_queue *kq; + struct kcm_op_queue_state *state; + uid_t uid; + + uid = cli_creds_get_uid(client); + + req = tevent_req_create(mem_ctx, &state, struct kcm_op_queue_state); + if (req == NULL) { + return NULL; + } + + DEBUG(SSSDBG_FUNC_DATA, + "Adding request by %"SPRIuid" to the wait queue\n", uid); + + kq = kcm_op_queue_get(qctx, ev, uid); + if (kq == NULL) { + ret = EIO; + DEBUG(SSSDBG_OP_FAILURE, + "Cannot get queue [%d]: %s\n", ret, sss_strerror(ret)); + goto immediate; + } + + ret = kcm_op_queue_add_req(kq, req); + if (ret == EOK) { + DEBUG(SSSDBG_TRACE_LIBS, + "Queue was empty, running the request immediately\n"); + goto immediate; + } else if (ret != EAGAIN) { + DEBUG(SSSDBG_OP_FAILURE, + "Cannot enqueue request [%d]: %s\n", ret, sss_strerror(ret)); + goto immediate; + } + + DEBUG(SSSDBG_TRACE_LIBS, "Waiting our turn in the queue\n"); + return req; + +immediate: + if (ret == EOK) { + tevent_req_done(req); + } else { + tevent_req_error(req, ret); + } + tevent_req_post(req, ev); + return req; +} + +static errno_t kcm_op_queue_add_req(struct kcm_ops_queue *kq, + struct tevent_req *req) +{ + errno_t ret; + struct kcm_op_queue_state *state = tevent_req_data(req, + struct kcm_op_queue_state); + + state->entry = talloc_zero(kq->qctx->wait_queue_hash, struct kcm_ops_queue_entry); + if (state->entry == NULL) { + return ENOMEM; + } + state->entry->req = req; + state->entry->queue = kq; + talloc_set_destructor(state->entry, kcm_op_queue_entry_destructor); + + if (kq->head == NULL) { + /* First entry, will run callback at once */ + ret = EOK; + } else { + /* Will wait for the previous callbacks to finish */ + ret = EAGAIN; + } + + DLIST_ADD_END(kq->head, state->entry, struct kcm_ops_queue_entry *); + return ret; +} + +/* + * The queue recv function is called when this request is 'activated'. The queue + * entry should be allocated on the same memory context as the enqueued request + * to trigger freeing the kcm_ops_queue_entry structure destructor when the + * parent request is done and its tevent_req freed. This would in turn unblock + * the next request in the queue + */ +errno_t kcm_op_queue_recv(struct tevent_req *req, + TALLOC_CTX *mem_ctx, + struct kcm_ops_queue_entry **_entry) +{ + struct kcm_op_queue_state *state = tevent_req_data(req, + struct kcm_op_queue_state); + + TEVENT_REQ_RETURN_ON_ERROR(req); + *_entry = talloc_steal(mem_ctx, state->entry); + return EOK; +} |