diff options
Diffstat (limited to '')
-rw-r--r-- | src/pmdk/src/libpmemobj/memops.c | 837 |
1 files changed, 837 insertions, 0 deletions
diff --git a/src/pmdk/src/libpmemobj/memops.c b/src/pmdk/src/libpmemobj/memops.c new file mode 100644 index 000000000..81464e663 --- /dev/null +++ b/src/pmdk/src/libpmemobj/memops.c @@ -0,0 +1,837 @@ +// SPDX-License-Identifier: BSD-3-Clause +/* Copyright 2016-2020, Intel Corporation */ + +/* + * memops.c -- aggregated memory operations helper implementation + * + * The operation collects all of the required memory modifications that + * need to happen in an atomic way (all of them or none), and abstracts + * away the storage type (transient/persistent) and the underlying + * implementation of how it's actually performed - in some cases using + * the redo log is unnecessary and the allocation process can be sped up + * a bit by completely omitting that whole machinery. + * + * The modifications are not visible until the context is processed. + */ + +#include "memops.h" +#include "obj.h" +#include "out.h" +#include "ravl.h" +#include "valgrind_internal.h" +#include "vecq.h" +#include "sys_util.h" + +#define ULOG_BASE_SIZE 1024 +#define OP_MERGE_SEARCH 64 + +enum operation_state { + OPERATION_IDLE, + OPERATION_IN_PROGRESS, + OPERATION_CLEANUP, +}; + +struct operation_log { + size_t capacity; /* capacity of the ulog log */ + size_t offset; /* data offset inside of the log */ + struct ulog *ulog; /* DRAM allocated log of modifications */ +}; + +/* + * operation_context -- context of an ongoing palloc operation + */ +struct operation_context { + enum log_type type; + + ulog_extend_fn extend; /* function to allocate next ulog */ + ulog_free_fn ulog_free; /* function to free next ulogs */ + + const struct pmem_ops *p_ops; + struct pmem_ops t_ops; /* used for transient data processing */ + struct pmem_ops s_ops; /* used for shadow copy data processing */ + + size_t ulog_curr_offset; /* offset in the log for buffer stores */ + size_t ulog_curr_capacity; /* capacity of the current log */ + size_t ulog_curr_gen_num; /* transaction counter in the current log */ + struct ulog *ulog_curr; /* current persistent log */ + size_t total_logged; /* total amount of buffer stores in the logs */ + + struct ulog *ulog; /* pointer to the persistent ulog log */ + size_t ulog_base_nbytes; /* available bytes in initial ulog log */ + size_t ulog_capacity; /* sum of capacity, incl all next ulog logs */ + int ulog_auto_reserve; /* allow or do not to auto ulog reservation */ + int ulog_any_user_buffer; /* set if any user buffer is added */ + + struct ulog_next next; /* vector of 'next' fields of persistent ulog */ + + enum operation_state state; /* operation sanity check */ + + struct operation_log pshadow_ops; /* shadow copy of persistent ulog */ + struct operation_log transient_ops; /* log of transient changes */ + + /* collection used to look for potential merge candidates */ + VECQ(, struct ulog_entry_val *) merge_entries; +}; + +/* + * operation_log_transient_init -- (internal) initialize operation log + * containing transient memory resident changes + */ +static int +operation_log_transient_init(struct operation_log *log) +{ + log->capacity = ULOG_BASE_SIZE; + log->offset = 0; + + struct ulog *src = Zalloc(sizeof(struct ulog) + + ULOG_BASE_SIZE); + if (src == NULL) { + ERR("!Zalloc"); + return -1; + } + + /* initialize underlying redo log structure */ + src->capacity = ULOG_BASE_SIZE; + + log->ulog = src; + + return 0; +} + +/* + * operation_log_persistent_init -- (internal) initialize operation log + * containing persistent memory resident changes + */ +static int +operation_log_persistent_init(struct operation_log *log, + size_t ulog_base_nbytes) +{ + log->capacity = ULOG_BASE_SIZE; + log->offset = 0; + + struct ulog *src = Zalloc(sizeof(struct ulog) + + ULOG_BASE_SIZE); + if (src == NULL) { + ERR("!Zalloc"); + return -1; + } + + /* initialize underlying redo log structure */ + src->capacity = ulog_base_nbytes; + memset(src->unused, 0, sizeof(src->unused)); + + log->ulog = src; + + return 0; +} + +/* + * operation_transient_clean -- cleans pmemcheck address state + */ +static int +operation_transient_clean(void *base, const void *addr, size_t len, + unsigned flags) +{ + VALGRIND_SET_CLEAN(addr, len); + + return 0; +} + +/* + * operation_transient_drain -- noop + */ +static void +operation_transient_drain(void *base) +{ +} + +/* + * operation_transient_memcpy -- transient memcpy wrapper + */ +static void * +operation_transient_memcpy(void *base, void *dest, const void *src, size_t len, + unsigned flags) +{ + return memcpy(dest, src, len); +} + +/* + * operation_new -- creates new operation context + */ +struct operation_context * +operation_new(struct ulog *ulog, size_t ulog_base_nbytes, + ulog_extend_fn extend, ulog_free_fn ulog_free, + const struct pmem_ops *p_ops, enum log_type type) +{ + struct operation_context *ctx = Zalloc(sizeof(*ctx)); + if (ctx == NULL) { + ERR("!Zalloc"); + goto error_ctx_alloc; + } + + ctx->ulog = ulog; + ctx->ulog_base_nbytes = ulog_base_nbytes; + ctx->ulog_capacity = ulog_capacity(ulog, + ulog_base_nbytes, p_ops); + ctx->extend = extend; + ctx->ulog_free = ulog_free; + ctx->state = OPERATION_IDLE; + VEC_INIT(&ctx->next); + ulog_rebuild_next_vec(ulog, &ctx->next, p_ops); + ctx->p_ops = p_ops; + ctx->type = type; + ctx->ulog_any_user_buffer = 0; + + ctx->ulog_curr_offset = 0; + ctx->ulog_curr_capacity = 0; + ctx->ulog_curr = NULL; + + ctx->t_ops.base = NULL; + ctx->t_ops.flush = operation_transient_clean; + ctx->t_ops.memcpy = operation_transient_memcpy; + ctx->t_ops.drain = operation_transient_drain; + + ctx->s_ops.base = p_ops->base; + ctx->s_ops.flush = operation_transient_clean; + ctx->s_ops.memcpy = operation_transient_memcpy; + ctx->s_ops.drain = operation_transient_drain; + + VECQ_INIT(&ctx->merge_entries); + + if (operation_log_transient_init(&ctx->transient_ops) != 0) + goto error_ulog_alloc; + + if (operation_log_persistent_init(&ctx->pshadow_ops, + ulog_base_nbytes) != 0) + goto error_ulog_alloc; + + return ctx; + +error_ulog_alloc: + operation_delete(ctx); +error_ctx_alloc: + return NULL; +} + +/* + * operation_delete -- deletes operation context + */ +void +operation_delete(struct operation_context *ctx) +{ + VECQ_DELETE(&ctx->merge_entries); + VEC_DELETE(&ctx->next); + Free(ctx->pshadow_ops.ulog); + Free(ctx->transient_ops.ulog); + Free(ctx); +} + +/* + * operation_user_buffer_remove -- removes range from the tree and returns 0 + */ +static int +operation_user_buffer_remove(void *base, void *addr) +{ + PMEMobjpool *pop = base; + if (!pop->ulog_user_buffers.verify) + return 0; + + util_mutex_lock(&pop->ulog_user_buffers.lock); + + struct ravl *ravl = pop->ulog_user_buffers.map; + enum ravl_predicate predict = RAVL_PREDICATE_EQUAL; + + struct user_buffer_def range; + range.addr = addr; + range.size = 0; + + struct ravl_node *n = ravl_find(ravl, &range, predict); + ASSERTne(n, NULL); + ravl_remove(ravl, n); + + util_mutex_unlock(&pop->ulog_user_buffers.lock); + + return 0; +} + +/* + * operation_free_logs -- free all logs except first + */ +void +operation_free_logs(struct operation_context *ctx, uint64_t flags) +{ + int freed = ulog_free_next(ctx->ulog, ctx->p_ops, ctx->ulog_free, + operation_user_buffer_remove, flags); + if (freed) { + ctx->ulog_capacity = ulog_capacity(ctx->ulog, + ctx->ulog_base_nbytes, ctx->p_ops); + VEC_CLEAR(&ctx->next); + ulog_rebuild_next_vec(ctx->ulog, &ctx->next, ctx->p_ops); + } + + ASSERTeq(VEC_SIZE(&ctx->next), 0); +} + +/* + * operation_merge -- (internal) performs operation on a field + */ +static inline void +operation_merge(struct ulog_entry_base *entry, uint64_t value, + ulog_operation_type type) +{ + struct ulog_entry_val *e = (struct ulog_entry_val *)entry; + + switch (type) { + case ULOG_OPERATION_AND: + e->value &= value; + break; + case ULOG_OPERATION_OR: + e->value |= value; + break; + case ULOG_OPERATION_SET: + e->value = value; + break; + default: + ASSERT(0); /* unreachable */ + } +} + +/* + * operation_try_merge_entry -- tries to merge the incoming log entry with + * existing entries + * + * Because this requires a reverse foreach, it cannot be implemented using + * the on-media ulog log structure since there's no way to find what's + * the previous entry in the log. Instead, the last N entries are stored + * in a collection and traversed backwards. + */ +static int +operation_try_merge_entry(struct operation_context *ctx, + void *ptr, uint64_t value, ulog_operation_type type) +{ + int ret = 0; + uint64_t offset = OBJ_PTR_TO_OFF(ctx->p_ops->base, ptr); + + struct ulog_entry_val *e; + VECQ_FOREACH_REVERSE(e, &ctx->merge_entries) { + if (ulog_entry_offset(&e->base) == offset) { + if (ulog_entry_type(&e->base) == type) { + operation_merge(&e->base, value, type); + return 1; + } else { + break; + } + } + } + + return ret; +} + +/* + * operation_merge_entry_add -- adds a new entry to the merge collection, + * keeps capacity at OP_MERGE_SEARCH. Removes old entries in FIFO fashion. + */ +static void +operation_merge_entry_add(struct operation_context *ctx, + struct ulog_entry_val *entry) +{ + if (VECQ_SIZE(&ctx->merge_entries) == OP_MERGE_SEARCH) + (void) VECQ_DEQUEUE(&ctx->merge_entries); + + if (VECQ_ENQUEUE(&ctx->merge_entries, entry) != 0) { + /* this is fine, only runtime perf will get slower */ + LOG(2, "out of memory - unable to track entries"); + } +} + +/* + * operation_add_typed_value -- adds new entry to the current operation, if the + * same ptr address already exists and the operation type is set, + * the new value is not added and the function has no effect. + */ +int +operation_add_typed_entry(struct operation_context *ctx, + void *ptr, uint64_t value, + ulog_operation_type type, enum operation_log_type log_type) +{ + struct operation_log *oplog = log_type == LOG_PERSISTENT ? + &ctx->pshadow_ops : &ctx->transient_ops; + + /* + * Always make sure to have one extra spare cacheline so that the + * ulog log entry creation has enough room for zeroing. + */ + if (oplog->offset + CACHELINE_SIZE == oplog->capacity) { + size_t ncapacity = oplog->capacity + ULOG_BASE_SIZE; + struct ulog *ulog = Realloc(oplog->ulog, + SIZEOF_ULOG(ncapacity)); + if (ulog == NULL) + return -1; + oplog->capacity += ULOG_BASE_SIZE; + oplog->ulog = ulog; + oplog->ulog->capacity = oplog->capacity; + + /* + * Realloc invalidated the ulog entries that are inside of this + * vector, need to clear it to avoid use after free. + */ + VECQ_CLEAR(&ctx->merge_entries); + } + + if (log_type == LOG_PERSISTENT && + operation_try_merge_entry(ctx, ptr, value, type) != 0) + return 0; + + struct ulog_entry_val *entry = ulog_entry_val_create( + oplog->ulog, oplog->offset, ptr, value, type, + log_type == LOG_TRANSIENT ? &ctx->t_ops : &ctx->s_ops); + + if (log_type == LOG_PERSISTENT) + operation_merge_entry_add(ctx, entry); + + oplog->offset += ulog_entry_size(&entry->base); + + return 0; +} + +/* + * operation_add_value -- adds new entry to the current operation with + * entry type autodetected based on the memory location + */ +int +operation_add_entry(struct operation_context *ctx, void *ptr, uint64_t value, + ulog_operation_type type) +{ + const struct pmem_ops *p_ops = ctx->p_ops; + PMEMobjpool *pop = (PMEMobjpool *)p_ops->base; + + int from_pool = OBJ_OFF_IS_VALID(pop, + (uintptr_t)ptr - (uintptr_t)p_ops->base); + + return operation_add_typed_entry(ctx, ptr, value, type, + from_pool ? LOG_PERSISTENT : LOG_TRANSIENT); +} + +/* + * operation_add_buffer -- adds a buffer operation to the log + */ +int +operation_add_buffer(struct operation_context *ctx, + void *dest, void *src, size_t size, ulog_operation_type type) +{ + size_t real_size = size + sizeof(struct ulog_entry_buf); + + /* if there's no space left in the log, reserve some more */ + if (ctx->ulog_curr_capacity == 0) { + ctx->ulog_curr_gen_num = ctx->ulog->gen_num; + if (operation_reserve(ctx, ctx->total_logged + real_size) != 0) + return -1; + + ctx->ulog_curr = ctx->ulog_curr == NULL ? ctx->ulog : + ulog_next(ctx->ulog_curr, ctx->p_ops); + ASSERTne(ctx->ulog_curr, NULL); + ctx->ulog_curr_offset = 0; + ctx->ulog_curr_capacity = ctx->ulog_curr->capacity; + } + + size_t curr_size = MIN(real_size, ctx->ulog_curr_capacity); + size_t data_size = curr_size - sizeof(struct ulog_entry_buf); + size_t entry_size = ALIGN_UP(curr_size, CACHELINE_SIZE); + + /* + * To make sure that the log is consistent and contiguous, we need + * make sure that the header of the entry that would be located + * immediately after this one is zeroed. + */ + struct ulog_entry_base *next_entry = NULL; + if (entry_size == ctx->ulog_curr_capacity) { + struct ulog *u = ulog_next(ctx->ulog_curr, ctx->p_ops); + if (u != NULL) + next_entry = (struct ulog_entry_base *)u->data; + } else { + size_t next_entry_offset = ctx->ulog_curr_offset + entry_size; + next_entry = (struct ulog_entry_base *)(ctx->ulog_curr->data + + next_entry_offset); + } + if (next_entry != NULL) + ulog_clobber_entry(next_entry, ctx->p_ops); + + /* create a persistent log entry */ + struct ulog_entry_buf *e = ulog_entry_buf_create(ctx->ulog_curr, + ctx->ulog_curr_offset, + ctx->ulog_curr_gen_num, + dest, src, data_size, + type, ctx->p_ops); + ASSERT(entry_size == ulog_entry_size(&e->base)); + ASSERT(entry_size <= ctx->ulog_curr_capacity); + + ctx->total_logged += entry_size; + ctx->ulog_curr_offset += entry_size; + ctx->ulog_curr_capacity -= entry_size; + + /* + * Recursively add the data to the log until the entire buffer is + * processed. + */ + return size - data_size == 0 ? 0 : operation_add_buffer(ctx, + (char *)dest + data_size, + (char *)src + data_size, + size - data_size, type); +} + +/* + * operation_user_buffer_range_cmp -- compares addresses of + * user buffers + */ +int +operation_user_buffer_range_cmp(const void *lhs, const void *rhs) +{ + const struct user_buffer_def *l = lhs; + const struct user_buffer_def *r = rhs; + + if (l->addr > r->addr) + return 1; + else if (l->addr < r->addr) + return -1; + + return 0; +} + +/* + * operation_user_buffer_try_insert -- adds a user buffer range to the tree, + * if the buffer already exists in the tree function returns -1, otherwise + * it returns 0 + */ +static int +operation_user_buffer_try_insert(PMEMobjpool *pop, + struct user_buffer_def *userbuf) +{ + int ret = 0; + + if (!pop->ulog_user_buffers.verify) + return ret; + + util_mutex_lock(&pop->ulog_user_buffers.lock); + + void *addr_end = (char *)userbuf->addr + userbuf->size; + struct user_buffer_def search; + search.addr = addr_end; + struct ravl_node *n = ravl_find(pop->ulog_user_buffers.map, + &search, RAVL_PREDICATE_LESS_EQUAL); + if (n != NULL) { + struct user_buffer_def *r = ravl_data(n); + void *r_end = (char *)r->addr + r->size; + + if (r_end > userbuf->addr && r->addr < addr_end) { + /* what was found overlaps with what is being added */ + ret = -1; + goto out; + } + } + + if (ravl_emplace_copy(pop->ulog_user_buffers.map, userbuf) == -1) { + ASSERTne(errno, EEXIST); + ret = -1; + } + +out: + util_mutex_unlock(&pop->ulog_user_buffers.lock); + return ret; +} + +/* + * operation_user_buffer_verify_align -- verify if the provided buffer can be + * used as a transaction log, and if so - perform necessary alignments + */ +int +operation_user_buffer_verify_align(struct operation_context *ctx, + struct user_buffer_def *userbuf) +{ + /* + * Address of the buffer has to be aligned up, and the size + * has to be aligned down, taking into account the number of bytes + * the address was incremented by. The remaining size has to be large + * enough to contain the header and at least one ulog entry. + */ + uint64_t buffer_offset = OBJ_PTR_TO_OFF(ctx->p_ops->base, + userbuf->addr); + ptrdiff_t size_diff = (intptr_t)ulog_by_offset(buffer_offset, + ctx->p_ops) - (intptr_t)userbuf->addr; + ssize_t capacity_unaligned = (ssize_t)userbuf->size - size_diff + - (ssize_t)sizeof(struct ulog); + if (capacity_unaligned < (ssize_t)CACHELINE_SIZE) { + ERR("Capacity insufficient"); + return -1; + } + + size_t capacity_aligned = ALIGN_DOWN((size_t)capacity_unaligned, + CACHELINE_SIZE); + + userbuf->addr = ulog_by_offset(buffer_offset, ctx->p_ops); + userbuf->size = capacity_aligned + sizeof(struct ulog); + + if (operation_user_buffer_try_insert(ctx->p_ops->base, userbuf)) { + ERR("Buffer currently used"); + return -1; + } + + return 0; +} + +/* + * operation_add_user_buffer -- add user buffer to the ulog + */ +void +operation_add_user_buffer(struct operation_context *ctx, + struct user_buffer_def *userbuf) +{ + uint64_t buffer_offset = OBJ_PTR_TO_OFF(ctx->p_ops->base, + userbuf->addr); + size_t capacity = userbuf->size - sizeof(struct ulog); + + ulog_construct(buffer_offset, capacity, ctx->ulog->gen_num, + 1, ULOG_USER_OWNED, ctx->p_ops); + + struct ulog *last_log; + /* if there is only one log */ + if (!VEC_SIZE(&ctx->next)) + last_log = ctx->ulog; + else /* get last element from vector */ + last_log = ulog_by_offset(VEC_BACK(&ctx->next), ctx->p_ops); + + ASSERTne(last_log, NULL); + size_t next_size = sizeof(last_log->next); + VALGRIND_ADD_TO_TX(&last_log->next, next_size); + last_log->next = buffer_offset; + pmemops_persist(ctx->p_ops, &last_log->next, next_size); + + VEC_PUSH_BACK(&ctx->next, buffer_offset); + ctx->ulog_capacity += capacity; + operation_set_any_user_buffer(ctx, 1); +} + +/* + * operation_set_auto_reserve -- set auto reserve value for context + */ +void +operation_set_auto_reserve(struct operation_context *ctx, int auto_reserve) +{ + ctx->ulog_auto_reserve = auto_reserve; +} + +/* + * operation_set_any_user_buffer -- set ulog_any_user_buffer value for context + */ +void +operation_set_any_user_buffer(struct operation_context *ctx, + int any_user_buffer) +{ + ctx->ulog_any_user_buffer = any_user_buffer; +} + +/* + * operation_get_any_user_buffer -- get ulog_any_user_buffer value from context + */ +int +operation_get_any_user_buffer(struct operation_context *ctx) +{ + return ctx->ulog_any_user_buffer; +} + +/* + * operation_process_persistent_redo -- (internal) process using ulog + */ +static void +operation_process_persistent_redo(struct operation_context *ctx) +{ + ASSERTeq(ctx->pshadow_ops.capacity % CACHELINE_SIZE, 0); + + ulog_store(ctx->ulog, ctx->pshadow_ops.ulog, + ctx->pshadow_ops.offset, ctx->ulog_base_nbytes, + ctx->ulog_capacity, + &ctx->next, ctx->p_ops); + + ulog_process(ctx->pshadow_ops.ulog, OBJ_OFF_IS_VALID_FROM_CTX, + ctx->p_ops); + + ulog_clobber(ctx->ulog, &ctx->next, ctx->p_ops); +} + +/* + * operation_process_persistent_undo -- (internal) process using ulog + */ +static void +operation_process_persistent_undo(struct operation_context *ctx) +{ + ASSERTeq(ctx->pshadow_ops.capacity % CACHELINE_SIZE, 0); + + ulog_process(ctx->ulog, OBJ_OFF_IS_VALID_FROM_CTX, ctx->p_ops); +} + +/* + * operation_reserve -- (internal) reserves new capacity in persistent ulog log + */ +int +operation_reserve(struct operation_context *ctx, size_t new_capacity) +{ + if (new_capacity > ctx->ulog_capacity) { + if (ctx->extend == NULL) { + ERR("no extend function present"); + return -1; + } + + if (ulog_reserve(ctx->ulog, + ctx->ulog_base_nbytes, + ctx->ulog_curr_gen_num, + ctx->ulog_auto_reserve, + &new_capacity, ctx->extend, + &ctx->next, ctx->p_ops) != 0) + return -1; + ctx->ulog_capacity = new_capacity; + } + + return 0; +} + +/* + * operation_init -- initializes runtime state of an operation + */ +void +operation_init(struct operation_context *ctx) +{ + struct operation_log *plog = &ctx->pshadow_ops; + struct operation_log *tlog = &ctx->transient_ops; + + VALGRIND_ANNOTATE_NEW_MEMORY(ctx, sizeof(*ctx)); + VALGRIND_ANNOTATE_NEW_MEMORY(tlog->ulog, sizeof(struct ulog) + + tlog->capacity); + VALGRIND_ANNOTATE_NEW_MEMORY(plog->ulog, sizeof(struct ulog) + + plog->capacity); + tlog->offset = 0; + plog->offset = 0; + VECQ_REINIT(&ctx->merge_entries); + + ctx->ulog_curr_offset = 0; + ctx->ulog_curr_capacity = 0; + ctx->ulog_curr_gen_num = 0; + ctx->ulog_curr = NULL; + ctx->total_logged = 0; + ctx->ulog_auto_reserve = 1; + ctx->ulog_any_user_buffer = 0; +} + +/* + * operation_start -- initializes and starts a new operation + */ +void +operation_start(struct operation_context *ctx) +{ + operation_init(ctx); + ASSERTeq(ctx->state, OPERATION_IDLE); + ctx->state = OPERATION_IN_PROGRESS; +} + +void +operation_resume(struct operation_context *ctx) +{ + operation_start(ctx); + ctx->total_logged = ulog_base_nbytes(ctx->ulog); +} + +/* + * operation_cancel -- cancels a running operation + */ +void +operation_cancel(struct operation_context *ctx) +{ + ASSERTeq(ctx->state, OPERATION_IN_PROGRESS); + ctx->state = OPERATION_IDLE; +} + +/* + * operation_process -- processes registered operations + * + * The order of processing is important: persistent, transient. + * This is because the transient entries that reside on persistent memory might + * require write to a location that is currently occupied by a valid persistent + * state but becomes a transient state after operation is processed. + */ +void +operation_process(struct operation_context *ctx) +{ + /* + * If there's exactly one persistent entry there's no need to involve + * the redo log. We can simply assign the value, the operation will be + * atomic. + */ + int redo_process = ctx->type == LOG_TYPE_REDO && + ctx->pshadow_ops.offset != 0; + if (redo_process && + ctx->pshadow_ops.offset == sizeof(struct ulog_entry_val)) { + struct ulog_entry_base *e = (struct ulog_entry_base *) + ctx->pshadow_ops.ulog->data; + ulog_operation_type t = ulog_entry_type(e); + if (t == ULOG_OPERATION_SET || t == ULOG_OPERATION_AND || + t == ULOG_OPERATION_OR) { + ulog_entry_apply(e, 1, ctx->p_ops); + redo_process = 0; + } + } + + if (redo_process) { + operation_process_persistent_redo(ctx); + ctx->state = OPERATION_CLEANUP; + } else if (ctx->type == LOG_TYPE_UNDO && ctx->total_logged != 0) { + operation_process_persistent_undo(ctx); + ctx->state = OPERATION_CLEANUP; + } + + /* process transient entries with transient memory ops */ + if (ctx->transient_ops.offset != 0) + ulog_process(ctx->transient_ops.ulog, NULL, &ctx->t_ops); +} + +/* + * operation_finish -- finalizes the operation + */ +void +operation_finish(struct operation_context *ctx, unsigned flags) +{ + ASSERTne(ctx->state, OPERATION_IDLE); + + if (ctx->type == LOG_TYPE_UNDO && ctx->total_logged != 0) + ctx->state = OPERATION_CLEANUP; + + if (ctx->ulog_any_user_buffer) { + flags |= ULOG_ANY_USER_BUFFER; + ctx->state = OPERATION_CLEANUP; + } + + if (ctx->state != OPERATION_CLEANUP) + goto out; + + if (ctx->type == LOG_TYPE_UNDO) { + int ret = ulog_clobber_data(ctx->ulog, + ctx->total_logged, ctx->ulog_base_nbytes, + &ctx->next, ctx->ulog_free, + operation_user_buffer_remove, + ctx->p_ops, flags); + if (ret == 0) + goto out; + } else if (ctx->type == LOG_TYPE_REDO) { + int ret = ulog_free_next(ctx->ulog, ctx->p_ops, + ctx->ulog_free, operation_user_buffer_remove, + flags); + if (ret == 0) + goto out; + } + + /* clobbering shrunk the ulog */ + ctx->ulog_capacity = ulog_capacity(ctx->ulog, + ctx->ulog_base_nbytes, ctx->p_ops); + VEC_CLEAR(&ctx->next); + ulog_rebuild_next_vec(ctx->ulog, &ctx->next, ctx->p_ops); + +out: + ctx->state = OPERATION_IDLE; +} |