diff options
Diffstat (limited to 'src/journal/managed-journal-file.c')
-rw-r--r-- | src/journal/managed-journal-file.c | 561 |
1 files changed, 561 insertions, 0 deletions
diff --git a/src/journal/managed-journal-file.c b/src/journal/managed-journal-file.c new file mode 100644 index 0000000..8101677 --- /dev/null +++ b/src/journal/managed-journal-file.c @@ -0,0 +1,561 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include <pthread.h> +#include <unistd.h> + +#include "chattr-util.h" +#include "copy.h" +#include "errno-util.h" +#include "fd-util.h" +#include "format-util.h" +#include "journal-authenticate.h" +#include "managed-journal-file.h" +#include "path-util.h" +#include "random-util.h" +#include "set.h" +#include "stat-util.h" +#include "sync-util.h" + +#define PAYLOAD_BUFFER_SIZE (16U * 1024U) +#define MINIMUM_HOLE_SIZE (1U * 1024U * 1024U / 2U) + +static int managed_journal_file_truncate(JournalFile *f) { + uint64_t p; + int r; + + /* truncate excess from the end of archives */ + r = journal_file_tail_end_by_pread(f, &p); + if (r < 0) + return log_debug_errno(r, "Failed to determine end of tail object: %m"); + + /* arena_size can't exceed the file size, ensure it's updated before truncating */ + f->header->arena_size = htole64(p - le64toh(f->header->header_size)); + + if (ftruncate(f->fd, p) < 0) + return log_debug_errno(errno, "Failed to truncate %s: %m", f->path); + + return journal_file_fstat(f); +} + +static int managed_journal_file_entry_array_punch_hole(JournalFile *f, uint64_t p, uint64_t n_entries) { + Object o; + uint64_t offset, sz, n_items = 0, n_unused; + int r; + + if (n_entries == 0) + return 0; + + for (uint64_t q = p; q != 0; q = le64toh(o.entry_array.next_entry_array_offset)) { + r = journal_file_read_object_header(f, OBJECT_ENTRY_ARRAY, q, &o); + if (r < 0) + return r; + + n_items += journal_file_entry_array_n_items(f, &o); + p = q; + } + + if (p == 0) + return 0; + + if (n_entries > n_items) + return -EBADMSG; + + /* Amount of unused items in the final entry array. */ + n_unused = n_items - n_entries; + + if (n_unused == 0) + return 0; + + offset = p + offsetof(Object, entry_array.items) + + (journal_file_entry_array_n_items(f, &o) - n_unused) * journal_file_entry_array_item_size(f); + sz = p + le64toh(o.object.size) - offset; + + if (sz < MINIMUM_HOLE_SIZE) + return 0; + + if (p == le64toh(f->header->tail_object_offset) && !JOURNAL_HEADER_SEALED(f->header)) { + ssize_t n; + + o.object.size = htole64(offset - p); + + n = pwrite(f->fd, &o, sizeof(EntryArrayObject), p); + if (n < 0) + return log_debug_errno(errno, "Failed to modify entry array object size: %m"); + if ((size_t) n != sizeof(EntryArrayObject)) + return log_debug_errno(SYNTHETIC_ERRNO(EIO), "Short pwrite() while modifying entry array object size."); + + f->header->arena_size = htole64(ALIGN64(offset) - le64toh(f->header->header_size)); + + if (ftruncate(f->fd, ALIGN64(offset)) < 0) + return log_debug_errno(errno, "Failed to truncate %s: %m", f->path); + + return 0; + } + + if (fallocate(f->fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, sz) < 0) { + if (ERRNO_IS_NOT_SUPPORTED(errno)) + return log_debug_errno(SYNTHETIC_ERRNO(EOPNOTSUPP), /* Make recognizable */ + "Hole punching not supported by backing file system, skipping."); + + return log_debug_errno(errno, "Failed to punch hole in entry array of %s: %m", f->path); + } + + return 0; +} + +static int managed_journal_file_punch_holes(JournalFile *f) { + HashItem items[PAYLOAD_BUFFER_SIZE / sizeof(HashItem)]; + uint64_t p, sz; + ssize_t n = SSIZE_MAX; + int r; + + r = managed_journal_file_entry_array_punch_hole( + f, le64toh(f->header->entry_array_offset), le64toh(f->header->n_entries)); + if (r < 0) + return r; + + p = le64toh(f->header->data_hash_table_offset); + sz = le64toh(f->header->data_hash_table_size); + + for (uint64_t i = p; i < p + sz && n > 0; i += n) { + size_t m = MIN(sizeof(items), p + sz - i); + n = pread(f->fd, items, m, i); + if (n < 0) + return log_debug_errno(errno, "Failed to read hash table items: %m"); + + /* Let's ignore any partial hash items by rounding down to the nearest multiple of HashItem. */ + n -= n % sizeof(HashItem); + + for (size_t j = 0; j < (size_t) n / sizeof(HashItem); j++) { + Object o; + + for (uint64_t q = le64toh(items[j].head_hash_offset); q != 0; + q = le64toh(o.data.next_hash_offset)) { + + r = journal_file_read_object_header(f, OBJECT_DATA, q, &o); + if (r < 0) { + log_debug_errno(r, "Invalid data object: %m, ignoring"); + break; + } + + if (le64toh(o.data.n_entries) == 0) + continue; + + r = managed_journal_file_entry_array_punch_hole( + f, le64toh(o.data.entry_array_offset), le64toh(o.data.n_entries) - 1); + if (r == -EOPNOTSUPP) + return -EOPNOTSUPP; + + /* Ignore other errors */ + } + } + } + + return 0; +} + +/* This may be called from a separate thread to prevent blocking the caller for the duration of fsync(). + * As a result we use atomic operations on f->offline_state for inter-thread communications with + * journal_file_set_offline() and journal_file_set_online(). */ +static void managed_journal_file_set_offline_internal(ManagedJournalFile *f) { + int r; + + assert(f); + assert(f->file->fd >= 0); + assert(f->file->header); + + for (;;) { + switch (f->file->offline_state) { + case OFFLINE_CANCEL: { + OfflineState tmp_state = OFFLINE_CANCEL; + if (!__atomic_compare_exchange_n(&f->file->offline_state, &tmp_state, OFFLINE_DONE, + false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) + continue; + } + return; + + case OFFLINE_AGAIN_FROM_SYNCING: { + OfflineState tmp_state = OFFLINE_AGAIN_FROM_SYNCING; + if (!__atomic_compare_exchange_n(&f->file->offline_state, &tmp_state, OFFLINE_SYNCING, + false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) + continue; + } + break; + + case OFFLINE_AGAIN_FROM_OFFLINING: { + OfflineState tmp_state = OFFLINE_AGAIN_FROM_OFFLINING; + if (!__atomic_compare_exchange_n(&f->file->offline_state, &tmp_state, OFFLINE_SYNCING, + false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) + continue; + } + break; + + case OFFLINE_SYNCING: + if (f->file->archive) { + (void) managed_journal_file_truncate(f->file); + (void) managed_journal_file_punch_holes(f->file); + } + + (void) fsync(f->file->fd); + + { + OfflineState tmp_state = OFFLINE_SYNCING; + if (!__atomic_compare_exchange_n(&f->file->offline_state, &tmp_state, OFFLINE_OFFLINING, + false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) + continue; + } + + f->file->header->state = f->file->archive ? STATE_ARCHIVED : STATE_OFFLINE; + (void) fsync(f->file->fd); + + /* If we've archived the journal file, first try to re-enable COW on the file. If the + * FS_NOCOW_FL flag was never set or we successfully removed it, continue. If we fail + * to remove the flag on the archived file, rewrite the file without the NOCOW flag. + * We need this fallback because on some filesystems (BTRFS), the NOCOW flag cannot + * be removed after data has been written to a file. The only way to remove it is to + * copy all data to a new file without the NOCOW flag set. */ + + if (f->file->archive) { + r = chattr_fd(f->file->fd, 0, FS_NOCOW_FL, NULL); + if (r >= 0) + continue; + + log_debug_errno(r, "Failed to re-enable copy-on-write for %s: %m, rewriting file", f->file->path); + + r = copy_file_atomic(FORMAT_PROC_FD_PATH(f->file->fd), f->file->path, f->file->mode, + 0, + FS_NOCOW_FL, + COPY_REPLACE | COPY_FSYNC | COPY_HOLES | COPY_ALL_XATTRS); + if (r < 0) { + log_debug_errno(r, "Failed to rewrite %s: %m", f->file->path); + continue; + } + } + + break; + + case OFFLINE_OFFLINING: { + OfflineState tmp_state = OFFLINE_OFFLINING; + if (!__atomic_compare_exchange_n(&f->file->offline_state, &tmp_state, OFFLINE_DONE, + false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) + continue; + } + _fallthrough_; + case OFFLINE_DONE: + return; + + case OFFLINE_JOINED: + log_debug("OFFLINE_JOINED unexpected offline state for journal_file_set_offline_internal()"); + return; + } + } +} + +static void * managed_journal_file_set_offline_thread(void *arg) { + ManagedJournalFile *f = arg; + + (void) pthread_setname_np(pthread_self(), "journal-offline"); + + managed_journal_file_set_offline_internal(f); + + return NULL; +} + +/* Trigger a restart if the offline thread is mid-flight in a restartable state. */ +static bool managed_journal_file_set_offline_try_restart(ManagedJournalFile *f) { + for (;;) { + switch (f->file->offline_state) { + case OFFLINE_AGAIN_FROM_SYNCING: + case OFFLINE_AGAIN_FROM_OFFLINING: + return true; + + case OFFLINE_CANCEL: { + OfflineState tmp_state = OFFLINE_CANCEL; + if (!__atomic_compare_exchange_n(&f->file->offline_state, &tmp_state, OFFLINE_AGAIN_FROM_SYNCING, + false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) + continue; + } + return true; + + case OFFLINE_SYNCING: { + OfflineState tmp_state = OFFLINE_SYNCING; + if (!__atomic_compare_exchange_n(&f->file->offline_state, &tmp_state, OFFLINE_AGAIN_FROM_SYNCING, + false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) + continue; + } + return true; + + case OFFLINE_OFFLINING: { + OfflineState tmp_state = OFFLINE_OFFLINING; + if (!__atomic_compare_exchange_n(&f->file->offline_state, &tmp_state, OFFLINE_AGAIN_FROM_OFFLINING, + false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) + continue; + } + return true; + + default: + return false; + } + } +} + +/* Sets a journal offline. + * + * If wait is false then an offline is dispatched in a separate thread for a + * subsequent journal_file_set_offline() or journal_file_set_online() of the + * same journal to synchronize with. + * + * If wait is true, then either an existing offline thread will be restarted + * and joined, or if none exists the offline is simply performed in this + * context without involving another thread. + */ +int managed_journal_file_set_offline(ManagedJournalFile *f, bool wait) { + int target_state; + bool restarted; + int r; + + assert(f); + + if (!journal_file_writable(f->file)) + return -EPERM; + + if (f->file->fd < 0 || !f->file->header) + return -EINVAL; + + target_state = f->file->archive ? STATE_ARCHIVED : STATE_OFFLINE; + + /* An offlining journal is implicitly online and may modify f->header->state, + * we must also join any potentially lingering offline thread when already in + * the desired offline state. + */ + if (!managed_journal_file_is_offlining(f) && f->file->header->state == target_state) + return journal_file_set_offline_thread_join(f->file); + + /* Restart an in-flight offline thread and wait if needed, or join a lingering done one. */ + restarted = managed_journal_file_set_offline_try_restart(f); + if ((restarted && wait) || !restarted) { + r = journal_file_set_offline_thread_join(f->file); + if (r < 0) + return r; + } + + if (restarted) + return 0; + + /* Initiate a new offline. */ + f->file->offline_state = OFFLINE_SYNCING; + + if (wait) /* Without using a thread if waiting. */ + managed_journal_file_set_offline_internal(f); + else { + sigset_t ss, saved_ss; + int k; + + assert_se(sigfillset(&ss) >= 0); + /* Don't block SIGBUS since the offlining thread accesses a memory mapped file. + * Asynchronous SIGBUS signals can safely be handled by either thread. */ + assert_se(sigdelset(&ss, SIGBUS) >= 0); + + r = pthread_sigmask(SIG_BLOCK, &ss, &saved_ss); + if (r > 0) + return -r; + + r = pthread_create(&f->file->offline_thread, NULL, managed_journal_file_set_offline_thread, f); + + k = pthread_sigmask(SIG_SETMASK, &saved_ss, NULL); + if (r > 0) { + f->file->offline_state = OFFLINE_JOINED; + return -r; + } + if (k > 0) + return -k; + } + + return 0; +} + +bool managed_journal_file_is_offlining(ManagedJournalFile *f) { + assert(f); + + __atomic_thread_fence(__ATOMIC_SEQ_CST); + + if (IN_SET(f->file->offline_state, OFFLINE_DONE, OFFLINE_JOINED)) + return false; + + return true; +} + +ManagedJournalFile* managed_journal_file_close(ManagedJournalFile *f) { + if (!f) + return NULL; + +#if HAVE_GCRYPT + /* Write the final tag */ + if (JOURNAL_HEADER_SEALED(f->file->header) && journal_file_writable(f->file)) { + int r; + + r = journal_file_append_tag(f->file); + if (r < 0) + log_error_errno(r, "Failed to append tag when closing journal: %m"); + } +#endif + + if (sd_event_source_get_enabled(f->file->post_change_timer, NULL) > 0) + journal_file_post_change(f->file); + sd_event_source_disable_unref(f->file->post_change_timer); + + managed_journal_file_set_offline(f, true); + + journal_file_close(f->file); + + return mfree(f); +} + +int managed_journal_file_open( + int fd, + const char *fname, + int open_flags, + JournalFileFlags file_flags, + mode_t mode, + uint64_t compress_threshold_bytes, + JournalMetrics *metrics, + MMapCache *mmap_cache, + Set *deferred_closes, + ManagedJournalFile *template, + ManagedJournalFile **ret) { + _cleanup_free_ ManagedJournalFile *f = NULL; + int r; + + set_clear_with_destructor(deferred_closes, managed_journal_file_close); + + f = new0(ManagedJournalFile, 1); + if (!f) + return -ENOMEM; + + r = journal_file_open(fd, fname, open_flags, file_flags, mode, compress_threshold_bytes, metrics, + mmap_cache, template ? template->file : NULL, &f->file); + if (r < 0) + return r; + + *ret = TAKE_PTR(f); + + return 0; +} + + +ManagedJournalFile* managed_journal_file_initiate_close(ManagedJournalFile *f, Set *deferred_closes) { + int r; + + assert(f); + + if (deferred_closes) { + r = set_put(deferred_closes, f); + if (r < 0) + log_debug_errno(r, "Failed to add file to deferred close set, closing immediately."); + else { + (void) managed_journal_file_set_offline(f, false); + return NULL; + } + } + + return managed_journal_file_close(f); +} + +int managed_journal_file_rotate( + ManagedJournalFile **f, + MMapCache *mmap_cache, + JournalFileFlags file_flags, + uint64_t compress_threshold_bytes, + Set *deferred_closes) { + + _cleanup_free_ char *path = NULL; + ManagedJournalFile *new_file = NULL; + int r; + + assert(f); + assert(*f); + + r = journal_file_archive((*f)->file, &path); + if (r < 0) + return r; + + r = managed_journal_file_open( + -1, + path, + (*f)->file->open_flags, + file_flags, + (*f)->file->mode, + compress_threshold_bytes, + NULL, /* metrics */ + mmap_cache, + deferred_closes, + *f, /* template */ + &new_file); + + managed_journal_file_initiate_close(*f, deferred_closes); + *f = new_file; + + return r; +} + +int managed_journal_file_open_reliably( + const char *fname, + int open_flags, + JournalFileFlags file_flags, + mode_t mode, + uint64_t compress_threshold_bytes, + JournalMetrics *metrics, + MMapCache *mmap_cache, + Set *deferred_closes, + ManagedJournalFile *template, + ManagedJournalFile **ret) { + + _cleanup_(managed_journal_file_closep) ManagedJournalFile *old_file = NULL; + int r; + + r = managed_journal_file_open(-1, fname, open_flags, file_flags, mode, compress_threshold_bytes, metrics, + mmap_cache, deferred_closes, template, ret); + if (!IN_SET(r, + -EBADMSG, /* Corrupted */ + -ENODATA, /* Truncated */ + -EHOSTDOWN, /* Other machine */ + -EPROTONOSUPPORT, /* Incompatible feature */ + -EBUSY, /* Unclean shutdown */ + -ESHUTDOWN, /* Already archived */ + -EIO, /* IO error, including SIGBUS on mmap */ + -EIDRM, /* File has been deleted */ + -ETXTBSY)) /* File is from the future */ + return r; + + if ((open_flags & O_ACCMODE) == O_RDONLY) + return r; + + if (!(open_flags & O_CREAT)) + return r; + + if (!endswith(fname, ".journal")) + return r; + + /* The file is corrupted. Rotate it away and try it again (but only once) */ + log_warning_errno(r, "File %s corrupted or uncleanly shut down, renaming and replacing.", fname); + + if (!template) { + /* The file is corrupted and no template is specified. Try opening it read-only as the + * template before rotating to inherit its sequence number and ID. */ + r = managed_journal_file_open(-1, fname, + (open_flags & ~(O_ACCMODE|O_CREAT|O_EXCL)) | O_RDONLY, + file_flags, 0, compress_threshold_bytes, NULL, + mmap_cache, deferred_closes, NULL, &old_file); + if (r < 0) + log_debug_errno(r, "Failed to continue sequence from file %s, ignoring: %m", fname); + else + template = old_file; + } + + r = journal_file_dispose(AT_FDCWD, fname); + if (r < 0) + return r; + + return managed_journal_file_open(-1, fname, open_flags, file_flags, mode, compress_threshold_bytes, metrics, + mmap_cache, deferred_closes, template, ret); +} |