diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-07 13:11:22 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-07 13:11:22 +0000 |
commit | b20732900e4636a467c0183a47f7396700f5f743 (patch) | |
tree | 42f079ff82e701ebcb76829974b4caca3e5b6798 /drivers/md/dm-vdo/indexer/index-session.c | |
parent | Adding upstream version 6.8.12. (diff) | |
download | linux-b20732900e4636a467c0183a47f7396700f5f743.tar.xz linux-b20732900e4636a467c0183a47f7396700f5f743.zip |
Adding upstream version 6.9.7.upstream/6.9.7
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'drivers/md/dm-vdo/indexer/index-session.c')
-rw-r--r-- | drivers/md/dm-vdo/indexer/index-session.c | 739 |
1 files changed, 739 insertions, 0 deletions
diff --git a/drivers/md/dm-vdo/indexer/index-session.c b/drivers/md/dm-vdo/indexer/index-session.c new file mode 100644 index 0000000000..aee0914d60 --- /dev/null +++ b/drivers/md/dm-vdo/indexer/index-session.c @@ -0,0 +1,739 @@ +// SPDX-License-Identifier: GPL-2.0-only +/* + * Copyright 2023 Red Hat + */ + +#include "index-session.h" + +#include <linux/atomic.h> + +#include "logger.h" +#include "memory-alloc.h" +#include "time-utils.h" + +#include "funnel-requestqueue.h" +#include "index.h" +#include "index-layout.h" + +/* + * The index session contains a lock (the request_mutex) which ensures that only one thread can + * change the state of its index at a time. The state field indicates the current state of the + * index through a set of descriptive flags. The request_mutex must be notified whenever a + * non-transient state flag is cleared. The request_mutex is also used to count the number of + * requests currently in progress so that they can be drained when suspending or closing the index. + * + * If the index session is suspended shortly after opening an index, it may have to suspend during + * a rebuild. Depending on the size of the index, a rebuild may take a significant amount of time, + * so UDS allows the rebuild to be paused in order to suspend the session in a timely manner. When + * the index session is resumed, the rebuild can continue from where it left off. If the index + * session is shut down with a suspended rebuild, the rebuild progress is abandoned and the rebuild + * will start from the beginning the next time the index is loaded. The mutex and status fields in + * the index_load_context are used to record the state of any interrupted rebuild. + */ + +enum index_session_flag_bit { + IS_FLAG_BIT_START = 8, + /* The session has started loading an index but not completed it. */ + IS_FLAG_BIT_LOADING = IS_FLAG_BIT_START, + /* The session has loaded an index, which can handle requests. */ + IS_FLAG_BIT_LOADED, + /* The session's index has been permanently disabled. */ + IS_FLAG_BIT_DISABLED, + /* The session's index is suspended. */ + IS_FLAG_BIT_SUSPENDED, + /* The session is handling some index state change. */ + IS_FLAG_BIT_WAITING, + /* The session's index is closing and draining requests. */ + IS_FLAG_BIT_CLOSING, + /* The session is being destroyed and is draining requests. */ + IS_FLAG_BIT_DESTROYING, +}; + +enum index_session_flag { + IS_FLAG_LOADED = (1 << IS_FLAG_BIT_LOADED), + IS_FLAG_LOADING = (1 << IS_FLAG_BIT_LOADING), + IS_FLAG_DISABLED = (1 << IS_FLAG_BIT_DISABLED), + IS_FLAG_SUSPENDED = (1 << IS_FLAG_BIT_SUSPENDED), + IS_FLAG_WAITING = (1 << IS_FLAG_BIT_WAITING), + IS_FLAG_CLOSING = (1 << IS_FLAG_BIT_CLOSING), + IS_FLAG_DESTROYING = (1 << IS_FLAG_BIT_DESTROYING), +}; + +/* Release a reference to an index session. */ +static void release_index_session(struct uds_index_session *index_session) +{ + mutex_lock(&index_session->request_mutex); + if (--index_session->request_count == 0) + uds_broadcast_cond(&index_session->request_cond); + mutex_unlock(&index_session->request_mutex); +} + +/* + * Acquire a reference to the index session for an asynchronous index request. The reference must + * eventually be released with a corresponding call to release_index_session(). + */ +static int get_index_session(struct uds_index_session *index_session) +{ + unsigned int state; + int result = UDS_SUCCESS; + + mutex_lock(&index_session->request_mutex); + index_session->request_count++; + state = index_session->state; + mutex_unlock(&index_session->request_mutex); + + if (state == IS_FLAG_LOADED) { + return UDS_SUCCESS; + } else if (state & IS_FLAG_DISABLED) { + result = UDS_DISABLED; + } else if ((state & IS_FLAG_LOADING) || + (state & IS_FLAG_SUSPENDED) || + (state & IS_FLAG_WAITING)) { + result = -EBUSY; + } else { + result = UDS_NO_INDEX; + } + + release_index_session(index_session); + return result; +} + +int uds_launch_request(struct uds_request *request) +{ + size_t internal_size; + int result; + + if (request->callback == NULL) { + vdo_log_error("missing required callback"); + return -EINVAL; + } + + switch (request->type) { + case UDS_DELETE: + case UDS_POST: + case UDS_QUERY: + case UDS_QUERY_NO_UPDATE: + case UDS_UPDATE: + break; + default: + vdo_log_error("received invalid callback type"); + return -EINVAL; + } + + /* Reset all internal fields before processing. */ + internal_size = + sizeof(struct uds_request) - offsetof(struct uds_request, zone_number); + // FIXME should be using struct_group for this instead + memset((char *) request + sizeof(*request) - internal_size, 0, internal_size); + + result = get_index_session(request->session); + if (result != UDS_SUCCESS) + return result; + + request->found = false; + request->unbatched = false; + request->index = request->session->index; + + uds_enqueue_request(request, STAGE_TRIAGE); + return UDS_SUCCESS; +} + +static void enter_callback_stage(struct uds_request *request) +{ + if (request->status != UDS_SUCCESS) { + /* All request errors are considered unrecoverable */ + mutex_lock(&request->session->request_mutex); + request->session->state |= IS_FLAG_DISABLED; + mutex_unlock(&request->session->request_mutex); + } + + uds_request_queue_enqueue(request->session->callback_queue, request); +} + +static inline void count_once(u64 *count_ptr) +{ + WRITE_ONCE(*count_ptr, READ_ONCE(*count_ptr) + 1); +} + +static void update_session_stats(struct uds_request *request) +{ + struct session_stats *session_stats = &request->session->stats; + + count_once(&session_stats->requests); + + switch (request->type) { + case UDS_POST: + if (request->found) + count_once(&session_stats->posts_found); + else + count_once(&session_stats->posts_not_found); + + if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER) + count_once(&session_stats->posts_found_open_chapter); + else if (request->location == UDS_LOCATION_IN_DENSE) + count_once(&session_stats->posts_found_dense); + else if (request->location == UDS_LOCATION_IN_SPARSE) + count_once(&session_stats->posts_found_sparse); + break; + + case UDS_UPDATE: + if (request->found) + count_once(&session_stats->updates_found); + else + count_once(&session_stats->updates_not_found); + break; + + case UDS_DELETE: + if (request->found) + count_once(&session_stats->deletions_found); + else + count_once(&session_stats->deletions_not_found); + break; + + case UDS_QUERY: + case UDS_QUERY_NO_UPDATE: + if (request->found) + count_once(&session_stats->queries_found); + else + count_once(&session_stats->queries_not_found); + break; + + default: + request->status = VDO_ASSERT(false, "unknown request type: %d", + request->type); + } +} + +static void handle_callbacks(struct uds_request *request) +{ + struct uds_index_session *index_session = request->session; + + if (request->status == UDS_SUCCESS) + update_session_stats(request); + + request->status = uds_status_to_errno(request->status); + request->callback(request); + release_index_session(index_session); +} + +static int __must_check make_empty_index_session(struct uds_index_session **index_session_ptr) +{ + int result; + struct uds_index_session *session; + + result = vdo_allocate(1, struct uds_index_session, __func__, &session); + if (result != VDO_SUCCESS) + return result; + + mutex_init(&session->request_mutex); + uds_init_cond(&session->request_cond); + mutex_init(&session->load_context.mutex); + uds_init_cond(&session->load_context.cond); + + result = uds_make_request_queue("callbackW", &handle_callbacks, + &session->callback_queue); + if (result != UDS_SUCCESS) { + vdo_free(session); + return result; + } + + *index_session_ptr = session; + return UDS_SUCCESS; +} + +int uds_create_index_session(struct uds_index_session **session) +{ + if (session == NULL) { + vdo_log_error("missing session pointer"); + return -EINVAL; + } + + return uds_status_to_errno(make_empty_index_session(session)); +} + +static int __must_check start_loading_index_session(struct uds_index_session *index_session) +{ + int result; + + mutex_lock(&index_session->request_mutex); + if (index_session->state & IS_FLAG_SUSPENDED) { + vdo_log_info("Index session is suspended"); + result = -EBUSY; + } else if (index_session->state != 0) { + vdo_log_info("Index is already loaded"); + result = -EBUSY; + } else { + index_session->state |= IS_FLAG_LOADING; + result = UDS_SUCCESS; + } + mutex_unlock(&index_session->request_mutex); + return result; +} + +static void finish_loading_index_session(struct uds_index_session *index_session, + int result) +{ + mutex_lock(&index_session->request_mutex); + index_session->state &= ~IS_FLAG_LOADING; + if (result == UDS_SUCCESS) + index_session->state |= IS_FLAG_LOADED; + + uds_broadcast_cond(&index_session->request_cond); + mutex_unlock(&index_session->request_mutex); +} + +static int initialize_index_session(struct uds_index_session *index_session, + enum uds_open_index_type open_type) +{ + int result; + struct uds_configuration *config; + + result = uds_make_configuration(&index_session->parameters, &config); + if (result != UDS_SUCCESS) { + vdo_log_error_strerror(result, "Failed to allocate config"); + return result; + } + + memset(&index_session->stats, 0, sizeof(index_session->stats)); + result = uds_make_index(config, open_type, &index_session->load_context, + enter_callback_stage, &index_session->index); + if (result != UDS_SUCCESS) + vdo_log_error_strerror(result, "Failed to make index"); + else + uds_log_configuration(config); + + uds_free_configuration(config); + return result; +} + +static const char *get_open_type_string(enum uds_open_index_type open_type) +{ + switch (open_type) { + case UDS_CREATE: + return "creating index"; + case UDS_LOAD: + return "loading or rebuilding index"; + case UDS_NO_REBUILD: + return "loading index"; + default: + return "unknown open method"; + } +} + +/* + * Open an index under the given session. This operation will fail if the + * index session is suspended, or if there is already an open index. + */ +int uds_open_index(enum uds_open_index_type open_type, + const struct uds_parameters *parameters, + struct uds_index_session *session) +{ + int result; + char name[BDEVNAME_SIZE]; + + if (parameters == NULL) { + vdo_log_error("missing required parameters"); + return -EINVAL; + } + if (parameters->bdev == NULL) { + vdo_log_error("missing required block device"); + return -EINVAL; + } + if (session == NULL) { + vdo_log_error("missing required session pointer"); + return -EINVAL; + } + + result = start_loading_index_session(session); + if (result != UDS_SUCCESS) + return uds_status_to_errno(result); + + session->parameters = *parameters; + format_dev_t(name, parameters->bdev->bd_dev); + vdo_log_info("%s: %s", get_open_type_string(open_type), name); + + result = initialize_index_session(session, open_type); + if (result != UDS_SUCCESS) + vdo_log_error_strerror(result, "Failed %s", + get_open_type_string(open_type)); + + finish_loading_index_session(session, result); + return uds_status_to_errno(result); +} + +static void wait_for_no_requests_in_progress(struct uds_index_session *index_session) +{ + mutex_lock(&index_session->request_mutex); + while (index_session->request_count > 0) { + uds_wait_cond(&index_session->request_cond, + &index_session->request_mutex); + } + mutex_unlock(&index_session->request_mutex); +} + +static int __must_check save_index(struct uds_index_session *index_session) +{ + wait_for_no_requests_in_progress(index_session); + return uds_save_index(index_session->index); +} + +static void suspend_rebuild(struct uds_index_session *session) +{ + mutex_lock(&session->load_context.mutex); + switch (session->load_context.status) { + case INDEX_OPENING: + session->load_context.status = INDEX_SUSPENDING; + + /* Wait until the index indicates that it is not replaying. */ + while ((session->load_context.status != INDEX_SUSPENDED) && + (session->load_context.status != INDEX_READY)) { + uds_wait_cond(&session->load_context.cond, + &session->load_context.mutex); + } + + break; + + case INDEX_READY: + /* Index load does not need to be suspended. */ + break; + + case INDEX_SUSPENDED: + case INDEX_SUSPENDING: + case INDEX_FREEING: + default: + /* These cases should not happen. */ + VDO_ASSERT_LOG_ONLY(false, "Bad load context state %u", + session->load_context.status); + break; + } + mutex_unlock(&session->load_context.mutex); +} + +/* + * Suspend index operation, draining all current index requests and preventing new index requests + * from starting. Optionally saves all index data before returning. + */ +int uds_suspend_index_session(struct uds_index_session *session, bool save) +{ + int result = UDS_SUCCESS; + bool no_work = false; + bool rebuilding = false; + + /* Wait for any current index state change to complete. */ + mutex_lock(&session->request_mutex); + while (session->state & IS_FLAG_CLOSING) + uds_wait_cond(&session->request_cond, &session->request_mutex); + + if ((session->state & IS_FLAG_WAITING) || (session->state & IS_FLAG_DESTROYING)) { + no_work = true; + vdo_log_info("Index session is already changing state"); + result = -EBUSY; + } else if (session->state & IS_FLAG_SUSPENDED) { + no_work = true; + } else if (session->state & IS_FLAG_LOADING) { + session->state |= IS_FLAG_WAITING; + rebuilding = true; + } else if (session->state & IS_FLAG_LOADED) { + session->state |= IS_FLAG_WAITING; + } else { + no_work = true; + session->state |= IS_FLAG_SUSPENDED; + uds_broadcast_cond(&session->request_cond); + } + mutex_unlock(&session->request_mutex); + + if (no_work) + return uds_status_to_errno(result); + + if (rebuilding) + suspend_rebuild(session); + else if (save) + result = save_index(session); + else + result = uds_flush_index_session(session); + + mutex_lock(&session->request_mutex); + session->state &= ~IS_FLAG_WAITING; + session->state |= IS_FLAG_SUSPENDED; + uds_broadcast_cond(&session->request_cond); + mutex_unlock(&session->request_mutex); + return uds_status_to_errno(result); +} + +static int replace_device(struct uds_index_session *session, struct block_device *bdev) +{ + int result; + + result = uds_replace_index_storage(session->index, bdev); + if (result != UDS_SUCCESS) + return result; + + session->parameters.bdev = bdev; + return UDS_SUCCESS; +} + +/* + * Resume index operation after being suspended. If the index is suspended and the supplied block + * device differs from the current backing store, the index will start using the new backing store. + */ +int uds_resume_index_session(struct uds_index_session *session, + struct block_device *bdev) +{ + int result = UDS_SUCCESS; + bool no_work = false; + bool resume_replay = false; + + mutex_lock(&session->request_mutex); + if (session->state & IS_FLAG_WAITING) { + vdo_log_info("Index session is already changing state"); + no_work = true; + result = -EBUSY; + } else if (!(session->state & IS_FLAG_SUSPENDED)) { + /* If not suspended, just succeed. */ + no_work = true; + result = UDS_SUCCESS; + } else { + session->state |= IS_FLAG_WAITING; + if (session->state & IS_FLAG_LOADING) + resume_replay = true; + } + mutex_unlock(&session->request_mutex); + + if (no_work) + return result; + + if ((session->index != NULL) && (bdev != session->parameters.bdev)) { + result = replace_device(session, bdev); + if (result != UDS_SUCCESS) { + mutex_lock(&session->request_mutex); + session->state &= ~IS_FLAG_WAITING; + uds_broadcast_cond(&session->request_cond); + mutex_unlock(&session->request_mutex); + return uds_status_to_errno(result); + } + } + + if (resume_replay) { + mutex_lock(&session->load_context.mutex); + switch (session->load_context.status) { + case INDEX_SUSPENDED: + session->load_context.status = INDEX_OPENING; + /* Notify the index to start replaying again. */ + uds_broadcast_cond(&session->load_context.cond); + break; + + case INDEX_READY: + /* There is no index rebuild to resume. */ + break; + + case INDEX_OPENING: + case INDEX_SUSPENDING: + case INDEX_FREEING: + default: + /* These cases should not happen; do nothing. */ + VDO_ASSERT_LOG_ONLY(false, "Bad load context state %u", + session->load_context.status); + break; + } + mutex_unlock(&session->load_context.mutex); + } + + mutex_lock(&session->request_mutex); + session->state &= ~IS_FLAG_WAITING; + session->state &= ~IS_FLAG_SUSPENDED; + uds_broadcast_cond(&session->request_cond); + mutex_unlock(&session->request_mutex); + return UDS_SUCCESS; +} + +static int save_and_free_index(struct uds_index_session *index_session) +{ + int result = UDS_SUCCESS; + bool suspended; + struct uds_index *index = index_session->index; + + if (index == NULL) + return UDS_SUCCESS; + + mutex_lock(&index_session->request_mutex); + suspended = (index_session->state & IS_FLAG_SUSPENDED); + mutex_unlock(&index_session->request_mutex); + + if (!suspended) { + result = uds_save_index(index); + if (result != UDS_SUCCESS) + vdo_log_warning_strerror(result, + "ignoring error from save_index"); + } + uds_free_index(index); + index_session->index = NULL; + + /* + * Reset all index state that happens to be in the index + * session, so it doesn't affect any future index. + */ + mutex_lock(&index_session->load_context.mutex); + index_session->load_context.status = INDEX_OPENING; + mutex_unlock(&index_session->load_context.mutex); + + mutex_lock(&index_session->request_mutex); + /* Only the suspend bit will remain relevant. */ + index_session->state &= IS_FLAG_SUSPENDED; + mutex_unlock(&index_session->request_mutex); + + return result; +} + +/* Save and close the current index. */ +int uds_close_index(struct uds_index_session *index_session) +{ + int result = UDS_SUCCESS; + + /* Wait for any current index state change to complete. */ + mutex_lock(&index_session->request_mutex); + while ((index_session->state & IS_FLAG_WAITING) || + (index_session->state & IS_FLAG_CLOSING)) { + uds_wait_cond(&index_session->request_cond, + &index_session->request_mutex); + } + + if (index_session->state & IS_FLAG_SUSPENDED) { + vdo_log_info("Index session is suspended"); + result = -EBUSY; + } else if ((index_session->state & IS_FLAG_DESTROYING) || + !(index_session->state & IS_FLAG_LOADED)) { + /* The index doesn't exist, hasn't finished loading, or is being destroyed. */ + result = UDS_NO_INDEX; + } else { + index_session->state |= IS_FLAG_CLOSING; + } + mutex_unlock(&index_session->request_mutex); + if (result != UDS_SUCCESS) + return uds_status_to_errno(result); + + vdo_log_debug("Closing index"); + wait_for_no_requests_in_progress(index_session); + result = save_and_free_index(index_session); + vdo_log_debug("Closed index"); + + mutex_lock(&index_session->request_mutex); + index_session->state &= ~IS_FLAG_CLOSING; + uds_broadcast_cond(&index_session->request_cond); + mutex_unlock(&index_session->request_mutex); + return uds_status_to_errno(result); +} + +/* This will save and close an open index before destroying the session. */ +int uds_destroy_index_session(struct uds_index_session *index_session) +{ + int result; + bool load_pending = false; + + vdo_log_debug("Destroying index session"); + + /* Wait for any current index state change to complete. */ + mutex_lock(&index_session->request_mutex); + while ((index_session->state & IS_FLAG_WAITING) || + (index_session->state & IS_FLAG_CLOSING)) { + uds_wait_cond(&index_session->request_cond, + &index_session->request_mutex); + } + + if (index_session->state & IS_FLAG_DESTROYING) { + mutex_unlock(&index_session->request_mutex); + vdo_log_info("Index session is already closing"); + return -EBUSY; + } + + index_session->state |= IS_FLAG_DESTROYING; + load_pending = ((index_session->state & IS_FLAG_LOADING) && + (index_session->state & IS_FLAG_SUSPENDED)); + mutex_unlock(&index_session->request_mutex); + + if (load_pending) { + /* Tell the index to terminate the rebuild. */ + mutex_lock(&index_session->load_context.mutex); + if (index_session->load_context.status == INDEX_SUSPENDED) { + index_session->load_context.status = INDEX_FREEING; + uds_broadcast_cond(&index_session->load_context.cond); + } + mutex_unlock(&index_session->load_context.mutex); + + /* Wait until the load exits before proceeding. */ + mutex_lock(&index_session->request_mutex); + while (index_session->state & IS_FLAG_LOADING) { + uds_wait_cond(&index_session->request_cond, + &index_session->request_mutex); + } + mutex_unlock(&index_session->request_mutex); + } + + wait_for_no_requests_in_progress(index_session); + result = save_and_free_index(index_session); + uds_request_queue_finish(index_session->callback_queue); + index_session->callback_queue = NULL; + vdo_log_debug("Destroyed index session"); + vdo_free(index_session); + return uds_status_to_errno(result); +} + +/* Wait until all callbacks for index operations are complete. */ +int uds_flush_index_session(struct uds_index_session *index_session) +{ + wait_for_no_requests_in_progress(index_session); + uds_wait_for_idle_index(index_session->index); + return UDS_SUCCESS; +} + +/* Statistics collection is intended to be thread-safe. */ +static void collect_stats(const struct uds_index_session *index_session, + struct uds_index_stats *stats) +{ + const struct session_stats *session_stats = &index_session->stats; + + stats->current_time = ktime_to_seconds(current_time_ns(CLOCK_REALTIME)); + stats->posts_found = READ_ONCE(session_stats->posts_found); + stats->in_memory_posts_found = READ_ONCE(session_stats->posts_found_open_chapter); + stats->dense_posts_found = READ_ONCE(session_stats->posts_found_dense); + stats->sparse_posts_found = READ_ONCE(session_stats->posts_found_sparse); + stats->posts_not_found = READ_ONCE(session_stats->posts_not_found); + stats->updates_found = READ_ONCE(session_stats->updates_found); + stats->updates_not_found = READ_ONCE(session_stats->updates_not_found); + stats->deletions_found = READ_ONCE(session_stats->deletions_found); + stats->deletions_not_found = READ_ONCE(session_stats->deletions_not_found); + stats->queries_found = READ_ONCE(session_stats->queries_found); + stats->queries_not_found = READ_ONCE(session_stats->queries_not_found); + stats->requests = READ_ONCE(session_stats->requests); +} + +int uds_get_index_session_stats(struct uds_index_session *index_session, + struct uds_index_stats *stats) +{ + if (stats == NULL) { + vdo_log_error("received a NULL index stats pointer"); + return -EINVAL; + } + + collect_stats(index_session, stats); + if (index_session->index != NULL) { + uds_get_index_stats(index_session->index, stats); + } else { + stats->entries_indexed = 0; + stats->memory_used = 0; + stats->collisions = 0; + stats->entries_discarded = 0; + } + + return UDS_SUCCESS; +} + +void uds_wait_cond(struct cond_var *cv, struct mutex *mutex) +{ + DEFINE_WAIT(__wait); + + prepare_to_wait(&cv->wait_queue, &__wait, TASK_IDLE); + mutex_unlock(mutex); + schedule(); + finish_wait(&cv->wait_queue, &__wait); + mutex_lock(mutex); +} |