// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include "acconfig.h" #include "common/debug.h" #include "common/errno.h" #include "common/safe_io.h" #include "FileJournal.h" #include "include/color.h" #include "common/perf_counters.h" #include "FileStore.h" #include "include/compat.h" #include #include #include #include #include #include #include #include #include "common/blkdev.h" #if defined(__linux__) #include "common/linux_version.h" #endif #if defined(__FreeBSD__) #define O_DSYNC O_SYNC #endif #define dout_context cct #define dout_subsys ceph_subsys_journal #undef dout_prefix #define dout_prefix *_dout << "journal " const static int64_t ONE_MEG(1 << 20); const static int CEPH_DIRECTIO_ALIGNMENT(4096); int FileJournal::_open(bool forwrite, bool create) { int flags, ret; if (forwrite) { flags = O_RDWR; if (directio) flags |= O_DIRECT | O_DSYNC; } else { flags = O_RDONLY; } if (create) flags |= O_CREAT; if (fd >= 0) { if (TEMP_FAILURE_RETRY(::close(fd))) { int err = errno; derr << "FileJournal::_open: error closing old fd: " << cpp_strerror(err) << dendl; } } fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags|O_CLOEXEC, 0644)); if (fd < 0) { int err = errno; dout(2) << "FileJournal::_open unable to open journal " << fn << ": " << cpp_strerror(err) << dendl; return -err; } struct stat st; ret = ::fstat(fd, &st); if (ret) { ret = errno; derr << "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret) << dendl; ret = -ret; goto out_fd; } if (S_ISBLK(st.st_mode)) { ret = _open_block_device(); } else if (S_ISREG(st.st_mode)) { if (aio && !force_aio) { derr << "FileJournal::_open: disabling aio for non-block journal. Use " << "journal_force_aio to force use of aio anyway" << dendl; aio = false; } ret = _open_file(st.st_size, st.st_blksize, create); } else { derr << "FileJournal::_open: wrong journal file type: " << st.st_mode << dendl; ret = -EINVAL; } if (ret) goto out_fd; #ifdef HAVE_LIBAIO if (aio) { aio_ctx = 0; ret = io_setup(128, &aio_ctx); if (ret < 0) { switch (ret) { // Contrary to naive expectations -EAGIAN means ... case -EAGAIN: derr << "FileJournal::_open: user's limit of aio events exceeded. " << "Try increasing /proc/sys/fs/aio-max-nr" << dendl; break; default: derr << "FileJournal::_open: unable to setup io_context " << cpp_strerror(-ret) << dendl; break; } goto out_fd; } } #endif /* We really want max_size to be a multiple of block_size. */ max_size -= max_size % block_size; dout(1) << "_open " << fn << " fd " << fd << ": " << max_size << " bytes, block size " << block_size << " bytes, directio = " << directio << ", aio = " << aio << dendl; return 0; out_fd: VOID_TEMP_FAILURE_RETRY(::close(fd)); fd = -1; return ret; } int FileJournal::_open_block_device() { int64_t bdev_sz = 0; BlkDev blkdev(fd); int ret = blkdev.get_size(&bdev_sz); if (ret) { dout(0) << __func__ << ": failed to read block device size." << dendl; return -EIO; } /* Check for bdev_sz too small */ if (bdev_sz < ONE_MEG) { dout(0) << __func__ << ": your block device must be at least " << ONE_MEG << " bytes to be used for a Ceph journal." << dendl; return -EINVAL; } dout(10) << __func__ << ": ignoring osd journal size. " << "We'll use the entire block device (size: " << bdev_sz << ")" << dendl; max_size = bdev_sz; block_size = cct->_conf->journal_block_size; if (cct->_conf->journal_discard) { discard = blkdev.support_discard(); dout(10) << fn << " support discard: " << (int)discard << dendl; } return 0; } int FileJournal::_open_file(int64_t oldsize, blksize_t blksize, bool create) { int ret; int64_t conf_journal_sz(cct->_conf->osd_journal_size); conf_journal_sz <<= 20; if ((cct->_conf->osd_journal_size == 0) && (oldsize < ONE_MEG)) { derr << "I'm sorry, I don't know how large of a journal to create." << "Please specify a block device to use as the journal OR " << "set osd_journal_size in your ceph.conf" << dendl; return -EINVAL; } if (create && (oldsize < conf_journal_sz)) { uint64_t newsize(conf_journal_sz); dout(10) << __func__ << " _open extending to " << newsize << " bytes" << dendl; ret = ::ftruncate(fd, newsize); if (ret < 0) { int err = errno; derr << "FileJournal::_open_file : unable to extend journal to " << newsize << " bytes: " << cpp_strerror(err) << dendl; return -err; } ret = ceph_posix_fallocate(fd, 0, newsize); if (ret) { derr << "FileJournal::_open_file : unable to preallocation journal to " << newsize << " bytes: " << cpp_strerror(ret) << dendl; return -ret; } max_size = newsize; } else { max_size = oldsize; } block_size = cct->_conf->journal_block_size; if (create && cct->_conf->journal_zero_on_create) { derr << "FileJournal::_open_file : zeroing journal" << dendl; uint64_t write_size = 1 << 20; char *buf; ret = ::posix_memalign((void **)&buf, block_size, write_size); if (ret != 0) { return -ret; } memset(static_cast(buf), 0, write_size); uint64_t i = 0; for (; (i + write_size) <= (uint64_t)max_size; i += write_size) { ret = ::pwrite(fd, static_cast(buf), write_size, i); if (ret < 0) { free(buf); return -errno; } } if (i < (uint64_t)max_size) { ret = ::pwrite(fd, static_cast(buf), max_size - i, i); if (ret < 0) { free(buf); return -errno; } } free(buf); } dout(10) << "_open journal is not a block device, NOT checking disk " << "write cache on '" << fn << "'" << dendl; return 0; } // This can not be used on an active journal int FileJournal::check() { int ret; ceph_assert(fd == -1); ret = _open(false, false); if (ret) return ret; ret = read_header(&header); if (ret < 0) goto done; if (header.fsid != fsid) { derr << "check: ondisk fsid " << header.fsid << " doesn't match expected " << fsid << ", invalid (someone else's?) journal" << dendl; ret = -EINVAL; goto done; } dout(1) << "check: header looks ok" << dendl; ret = 0; done: close(); return ret; } int FileJournal::create() { void *buf = 0; int64_t needed_space; int ret; buffer::ptr bp; dout(2) << "create " << fn << " fsid " << fsid << dendl; ret = _open(true, true); if (ret) goto done; // write empty header header = header_t(); header.flags = header_t::FLAG_CRC; // enable crcs on any new journal. header.fsid = fsid; header.max_size = max_size; header.block_size = block_size; if (cct->_conf->journal_block_align || directio) header.alignment = block_size; else header.alignment = 16; // at least stay word aligned on 64bit machines... header.start = get_top(); header.start_seq = 0; print_header(header); // static zeroed buffer for alignment padding delete [] zero_buf; zero_buf = new char[header.alignment]; memset(zero_buf, 0, header.alignment); bp = prepare_header(); if (TEMP_FAILURE_RETRY(::pwrite(fd, bp.c_str(), bp.length(), 0)) < 0) { ret = -errno; derr << "FileJournal::create : create write header error " << cpp_strerror(ret) << dendl; goto close_fd; } // zero first little bit, too. ret = posix_memalign(&buf, block_size, block_size); if (ret) { ret = -ret; derr << "FileJournal::create: failed to allocate " << block_size << " bytes of memory: " << cpp_strerror(ret) << dendl; goto close_fd; } memset(buf, 0, block_size); if (TEMP_FAILURE_RETRY(::pwrite(fd, buf, block_size, get_top())) < 0) { ret = -errno; derr << "FileJournal::create: error zeroing first " << block_size << " bytes " << cpp_strerror(ret) << dendl; goto free_buf; } needed_space = cct->_conf->osd_max_write_size << 20; needed_space += (2 * sizeof(entry_header_t)) + get_top(); if (header.max_size - header.start < needed_space) { derr << "FileJournal::create: OSD journal is not large enough to hold " << "osd_max_write_size bytes!" << dendl; ret = -ENOSPC; goto free_buf; } dout(2) << "create done" << dendl; ret = 0; free_buf: free(buf); buf = 0; close_fd: if (TEMP_FAILURE_RETRY(::close(fd)) < 0) { ret = -errno; derr << "FileJournal::create: error closing fd: " << cpp_strerror(ret) << dendl; } done: fd = -1; return ret; } // This can not be used on an active journal int FileJournal::peek_fsid(uuid_d& fsid) { ceph_assert(fd == -1); int r = _open(false, false); if (r) return r; r = read_header(&header); if (r < 0) goto out; fsid = header.fsid; out: close(); return r; } int FileJournal::open(uint64_t fs_op_seq) { dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl; uint64_t next_seq = fs_op_seq + 1; uint64_t seq = -1; int err = _open(false); if (err) return err; // assume writeable, unless... read_pos = 0; write_pos = get_top(); // read header? err = read_header(&header); if (err < 0) goto out; // static zeroed buffer for alignment padding delete [] zero_buf; zero_buf = new char[header.alignment]; memset(zero_buf, 0, header.alignment); dout(10) << "open header.fsid = " << header.fsid //<< " vs expected fsid = " << fsid << dendl; if (header.fsid != fsid) { derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid << ", invalid (someone else's?) journal" << dendl; err = -EINVAL; goto out; } if (header.max_size > max_size) { dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl; err = -EINVAL; goto out; } if (header.block_size != block_size) { dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl; err = -EINVAL; goto out; } if (header.max_size % header.block_size) { dout(2) << "open journal max size " << header.max_size << " not a multiple of block size " << header.block_size << dendl; err = -EINVAL; goto out; } if (header.alignment != block_size && directio) { dout(0) << "open journal alignment " << header.alignment << " does not match block size " << block_size << " (required for direct_io journal mode)" << dendl; err = -EINVAL; goto out; } if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) { dout(0) << "open journal alignment " << header.alignment << " is not multiple of minimum directio alignment " << CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)" << dendl; err = -EINVAL; goto out; } // looks like a valid header. write_pos = 0; // not writeable yet journaled_seq = header.committed_up_to; // find next entry read_pos = header.start; seq = header.start_seq; while (1) { bufferlist bl; off64_t old_pos = read_pos; if (!read_entry(bl, seq)) { dout(10) << "open reached end of journal." << dendl; break; } if (seq > next_seq) { dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq << ", ignoring journal contents" << dendl; read_pos = -1; last_committed_seq = 0; return 0; } if (seq == next_seq) { dout(10) << "open reached seq " << seq << dendl; read_pos = old_pos; break; } seq++; // next event should follow. } return 0; out: close(); return err; } void FileJournal::_close(int fd) const { VOID_TEMP_FAILURE_RETRY(::close(fd)); } void FileJournal::close() { dout(1) << "close " << fn << dendl; // stop writer thread stop_writer(); // close ceph_assert(writeq_empty()); ceph_assert(!must_write_header); ceph_assert(fd >= 0); _close(fd); fd = -1; } int FileJournal::dump(ostream& out) { return _dump(out, false); } int FileJournal::simple_dump(ostream& out) { return _dump(out, true); } int FileJournal::_dump(ostream& out, bool simple) { JSONFormatter f(true); int ret = _fdump(f, simple); f.flush(out); return ret; } int FileJournal::_fdump(Formatter &f, bool simple) { dout(10) << "_fdump" << dendl; ceph_assert(fd == -1); int err = _open(false, false); if (err) return err; err = read_header(&header); if (err < 0) { close(); return err; } off64_t next_pos = header.start; f.open_object_section("journal"); f.open_object_section("header"); f.dump_unsigned("flags", header.flags); ostringstream os; os << header.fsid; f.dump_string("fsid", os.str()); f.dump_unsigned("block_size", header.block_size); f.dump_unsigned("alignment", header.alignment); f.dump_int("max_size", header.max_size); f.dump_int("start", header.start); f.dump_unsigned("committed_up_to", header.committed_up_to); f.dump_unsigned("start_seq", header.start_seq); f.close_section(); f.open_array_section("entries"); uint64_t seq = header.start_seq; while (1) { bufferlist bl; off64_t pos = next_pos; if (!pos) { dout(2) << "_dump -- not readable" << dendl; err = -EINVAL; break; } stringstream ss; read_entry_result result = do_read_entry( pos, &next_pos, &bl, &seq, &ss); if (result != SUCCESS) { if (seq < header.committed_up_to) { dout(2) << "Unable to read past sequence " << seq << " but header indicates the journal has committed up through " << header.committed_up_to << ", journal is corrupt" << dendl; err = -EINVAL; } dout(25) << ss.str() << dendl; dout(25) << "No further valid entries found, journal is most likely valid" << dendl; break; } f.open_object_section("entry"); f.dump_unsigned("offset", pos); f.dump_unsigned("seq", seq); if (simple) { f.dump_unsigned("bl.length", bl.length()); } else { f.open_array_section("transactions"); auto p = bl.cbegin(); int trans_num = 0; while (!p.end()) { ObjectStore::Transaction t(p); f.open_object_section("transaction"); f.dump_unsigned("trans_num", trans_num); t.dump(&f); f.close_section(); trans_num++; } f.close_section(); } f.close_section(); } f.close_section(); f.close_section(); dout(10) << "dump finish" << dendl; close(); return err; } void FileJournal::start_writer() { write_stop = false; aio_stop = false; write_thread.create("journal_write"); #ifdef HAVE_LIBAIO if (aio) write_finish_thread.create("journal_wrt_fin"); #endif } void FileJournal::stop_writer() { // Do nothing if writer already stopped or never started if (!write_stop) { { Mutex::Locker l(write_lock); Mutex::Locker p(writeq_lock); write_stop = true; writeq_cond.Signal(); // Doesn't hurt to signal commit_cond in case thread is waiting there // and caller didn't use committed_thru() first. commit_cond.Signal(); } write_thread.join(); // write journal header now so that we have less to replay on remount write_header_sync(); } #ifdef HAVE_LIBAIO // stop aio completeion thread *after* writer thread has stopped // and has submitted all of its io if (aio && !aio_stop) { aio_lock.Lock(); aio_stop = true; aio_cond.Signal(); write_finish_cond.Signal(); aio_lock.Unlock(); write_finish_thread.join(); } #endif } void FileJournal::print_header(const header_t &header) const { dout(10) << "header: block_size " << header.block_size << " alignment " << header.alignment << " max_size " << header.max_size << dendl; dout(10) << "header: start " << header.start << dendl; dout(10) << " write_pos " << write_pos << dendl; } int FileJournal::read_header(header_t *hdr) const { dout(10) << "read_header" << dendl; bufferlist bl; buffer::ptr bp = buffer::create_small_page_aligned(block_size); char* bpdata = bp.c_str(); int r = ::pread(fd, bpdata, bp.length(), 0); if (r < 0) { int err = errno; dout(0) << "read_header got " << cpp_strerror(err) << dendl; return -err; } // don't use bp.zero() here, because it also invalidates // crc cache (which is not yet populated anyway) if (bp.length() != (size_t)r) { // r will be always less or equal than bp.length bpdata += r; memset(bpdata, 0, bp.length() - r); } bl.push_back(std::move(bp)); try { auto p = bl.cbegin(); decode(*hdr, p); } catch (buffer::error& e) { derr << "read_header error decoding journal header" << dendl; return -EINVAL; } /* * Unfortunately we weren't initializing the flags field for new * journals! Aie. This is safe(ish) now that we have only one * flag. Probably around when we add the next flag we need to * remove this or else this (eventually old) code will clobber newer * code's flags. */ if (hdr->flags > 3) { derr << "read_header appears to have gibberish flags; assuming 0" << dendl; hdr->flags = 0; } print_header(*hdr); return 0; } bufferptr FileJournal::prepare_header() { bufferlist bl; { Mutex::Locker l(finisher_lock); header.committed_up_to = journaled_seq; } encode(header, bl); bufferptr bp = buffer::create_small_page_aligned(get_top()); // don't use bp.zero() here, because it also invalidates // crc cache (which is not yet populated anyway) char* data = bp.c_str(); memcpy(data, bl.c_str(), bl.length()); data += bl.length(); memset(data, 0, bp.length()-bl.length()); return bp; } void FileJournal::write_header_sync() { Mutex::Locker locker(write_lock); must_write_header = true; bufferlist bl; do_write(bl); dout(20) << __func__ << " finish" << dendl; } int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size) { // already full? if (full_state != FULL_NOTFULL) return -ENOSPC; // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL. off64_t room; if (pos >= header.start) room = (header.max_size - pos) + (header.start - get_top()) - 1; else room = header.start - pos - 1; dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start << " top " << get_top() << dendl; if (do_sync_cond) { if (room >= (header.max_size >> 1) && room - size < (header.max_size >> 1)) { dout(10) << " passing half full mark, triggering commit" << dendl; do_sync_cond->SloppySignal(); // initiate a real commit so we can trim } } if (room >= size) { dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl; if (pos + size > header.max_size) must_write_header = true; return 0; } // full dout(1) << "check_for_full at " << pos << " : JOURNAL FULL " << pos << " >= " << room << " (max_size " << header.max_size << " start " << header.start << ")" << dendl; off64_t max = header.max_size - get_top(); if (size > max) dout(0) << "JOURNAL TOO SMALL: continuing, but slow: item " << size << " > journal " << max << " (usable)" << dendl; return -ENOSPC; } int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes) { // gather queued writes off64_t queue_pos = write_pos; int eleft = cct->_conf->journal_max_write_entries; unsigned bmax = cct->_conf->journal_max_write_bytes; if (full_state != FULL_NOTFULL) return -ENOSPC; while (!writeq_empty()) { list items; batch_pop_write(items); list::iterator it = items.begin(); while (it != items.end()) { uint64_t bytes = it->bl.length(); int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes); if (r == 0) { // prepare ok, delete it items.erase(it++); #ifdef HAVE_LIBAIO { Mutex::Locker locker(aio_lock); ceph_assert(aio_write_queue_ops > 0); aio_write_queue_ops--; ceph_assert(aio_write_queue_bytes >= bytes); aio_write_queue_bytes -= bytes; } #else (void)bytes; #endif } if (r == -ENOSPC) { // the journal maybe full, insert the left item to writeq batch_unpop_write(items); if (orig_ops) goto out; // commit what we have if (logger) logger->inc(l_filestore_journal_full); if (wait_on_full) { dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl; } else { dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl; // throw out what we have so far full_state = FULL_FULL; while (!writeq_empty()) { complete_write(1, peek_write().orig_len); pop_write(); } print_header(header); } return -ENOSPC; // hrm, full on first op } if (eleft) { if (--eleft == 0) { dout(20) << "prepare_multi_write hit max events per write " << cct->_conf->journal_max_write_entries << dendl; batch_unpop_write(items); goto out; } } if (bmax) { if (bl.length() >= bmax) { dout(20) << "prepare_multi_write hit max write size " << cct->_conf->journal_max_write_bytes << dendl; batch_unpop_write(items); goto out; } } } } out: dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl; ceph_assert((write_pos + bl.length() == queue_pos) || (write_pos + bl.length() - header.max_size + get_top() == queue_pos)); return 0; } /* void FileJournal::queue_write_fin(uint64_t seq, Context *fin) { writing_seq.push_back(seq); if (!waiting_for_notfull.empty()) { // make sure previously unjournaled stuff waiting for UNFULL triggers // _before_ newly journaled stuff does dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin << " until after UNFULL" << dendl; C_Gather *g = new C_Gather(writeq.front().fin); writing_fin.push_back(g->new_sub()); waiting_for_notfull.push_back(g->new_sub()); } else { writing_fin.push_back(writeq.front().fin); dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl; } } */ void FileJournal::queue_completions_thru(uint64_t seq) { ceph_assert(finisher_lock.is_locked()); utime_t now = ceph_clock_now(); list items; batch_pop_completions(items); list::iterator it = items.begin(); while (it != items.end()) { completion_item& next = *it; if (next.seq > seq) break; utime_t lat = now; lat -= next.start; dout(10) << "queue_completions_thru seq " << seq << " queueing seq " << next.seq << " " << next.finish << " lat " << lat << dendl; if (logger) { logger->tinc(l_filestore_journal_latency, lat); } if (next.finish) finisher->queue(next.finish); if (next.tracked_op) { next.tracked_op->mark_event("journaled_completion_queued"); next.tracked_op->journal_trace.event("queued completion"); next.tracked_op->journal_trace.keyval("completed through", seq); } items.erase(it++); } batch_unpop_completions(items); finisher_cond.Signal(); } int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes) { uint64_t seq = next_write.seq; bufferlist &ebl = next_write.bl; off64_t size = ebl.length(); int r = check_for_full(seq, queue_pos, size); if (r < 0) return r; // ENOSPC or EAGAIN uint32_t orig_len = next_write.orig_len; orig_bytes += orig_len; orig_ops++; // add to write buffer dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq << " len " << orig_len << " -> " << size << dendl; unsigned seq_offset = offsetof(entry_header_t, seq); unsigned magic1_offset = offsetof(entry_header_t, magic1); unsigned magic2_offset = offsetof(entry_header_t, magic2); bufferptr headerptr = ebl.buffers().front(); uint64_t _seq = seq; uint64_t _queue_pos = queue_pos; uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64()); headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq); headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2); bufferptr footerptr = ebl.buffers().back(); unsigned post_offset = footerptr.length() - sizeof(entry_header_t); footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq); footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2); bl.claim_append(ebl); if (next_write.tracked_op) { next_write.tracked_op->mark_event("write_thread_in_journal_buffer"); next_write.tracked_op->journal_trace.event("prepare_single_write"); } journalq.push_back(pair(seq, queue_pos)); writing_seq = seq; queue_pos += size; if (queue_pos >= header.max_size) queue_pos = queue_pos + get_top() - header.max_size; return 0; } void FileJournal::check_align(off64_t pos, bufferlist& bl) { // make sure list segments are page aligned if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) { ceph_assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0); ceph_assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0); ceph_abort_msg("bl was not aligned"); } } int FileJournal::write_bl(off64_t& pos, bufferlist& bl) { int ret; off64_t spos = ::lseek64(fd, pos, SEEK_SET); if (spos < 0) { ret = -errno; derr << "FileJournal::write_bl : lseek64 failed " << cpp_strerror(ret) << dendl; return ret; } ret = bl.write_fd(fd); if (ret) { derr << "FileJournal::write_bl : write_fd failed: " << cpp_strerror(ret) << dendl; return ret; } pos += bl.length(); if (pos == header.max_size) pos = get_top(); return 0; } void FileJournal::do_write(bufferlist& bl) { // nothing to do? if (bl.length() == 0 && !must_write_header) return; buffer::ptr hbp; if (cct->_conf->journal_write_header_frequency && (((++journaled_since_start) % cct->_conf->journal_write_header_frequency) == 0)) { must_write_header = true; } if (must_write_header) { must_write_header = false; hbp = prepare_header(); } dout(15) << "do_write writing " << write_pos << "~" << bl.length() << (hbp.length() ? " + header":"") << dendl; utime_t from = ceph_clock_now(); // entry off64_t pos = write_pos; // Adjust write_pos write_pos += bl.length(); if (write_pos >= header.max_size) write_pos = write_pos - header.max_size + get_top(); write_lock.Unlock(); // split? off64_t split = 0; if (pos + bl.length() > header.max_size) { bufferlist first, second; split = header.max_size - pos; first.substr_of(bl, 0, split); second.substr_of(bl, split, bl.length() - split); ceph_assert(first.length() + second.length() == bl.length()); dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length() << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl; //Save pos to write first piece second off64_t first_pos = pos; off64_t orig_pos; pos = get_top(); // header too? if (hbp.length()) { // be sneaky: include the header in the second fragment bufferlist tmp; tmp.push_back(hbp); tmp.claim_append(second); second.swap(tmp); pos = 0; // we included the header } // Write the second portion first possible with the header, so // do_read_entry() won't even get a valid entry_header_t if there // is a crash between the two writes. orig_pos = pos; if (write_bl(pos, second)) { derr << "FileJournal::do_write: write_bl(pos=" << orig_pos << ") failed" << dendl; check_align(pos, second); ceph_abort(); } orig_pos = first_pos; if (write_bl(first_pos, first)) { derr << "FileJournal::do_write: write_bl(pos=" << orig_pos << ") failed" << dendl; check_align(first_pos, first); ceph_abort(); } ceph_assert(first_pos == get_top()); } else { // header too? if (hbp.length()) { if (TEMP_FAILURE_RETRY(::pwrite(fd, hbp.c_str(), hbp.length(), 0)) < 0) { int err = errno; derr << "FileJournal::do_write: pwrite(fd=" << fd << ", hbp.length=" << hbp.length() << ") failed :" << cpp_strerror(err) << dendl; ceph_abort(); } } if (write_bl(pos, bl)) { derr << "FileJournal::do_write: write_bl(pos=" << pos << ") failed" << dendl; check_align(pos, bl); ceph_abort(); } } if (!directio) { dout(20) << "do_write fsync" << dendl; /* * We'd really love to have a fsync_range or fdatasync_range and do a: * * if (split) { * ::fsync_range(fd, header.max_size - split, split)l * ::fsync_range(fd, get_top(), bl.length() - split); * else * ::fsync_range(fd, write_pos, bl.length()) * * NetBSD and AIX apparently have it, and adding it to Linux wouldn't be * too hard given all the underlying infrastructure already exist. * * NOTE: using sync_file_range here would not be safe as it does not * flush disk caches or commits any sort of metadata. */ int ret = 0; #if defined(__APPLE__) || defined(__FreeBSD__) ret = ::fsync(fd); #else ret = ::fdatasync(fd); #endif if (ret < 0) { derr << __func__ << " fsync/fdatasync failed: " << cpp_strerror(errno) << dendl; ceph_abort(); } #ifdef HAVE_POSIX_FADVISE if (cct->_conf->filestore_fadvise) posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED); #endif } utime_t lat = ceph_clock_now() - from; dout(20) << "do_write latency " << lat << dendl; write_lock.Lock(); ceph_assert(write_pos == pos); ceph_assert(write_pos % header.alignment == 0); { Mutex::Locker locker(finisher_lock); journaled_seq = writing_seq; // kick finisher? // only if we haven't filled up recently! if (full_state != FULL_NOTFULL) { dout(10) << "do_write NOT queueing finisher seq " << journaled_seq << ", full_commit_seq|full_restart_seq" << dendl; } else { if (plug_journal_completions) { dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq << " due to completion plug" << dendl; } else { dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl; queue_completions_thru(journaled_seq); } } } } void FileJournal::flush() { dout(10) << "waiting for completions to empty" << dendl; { Mutex::Locker l(finisher_lock); while (!completions_empty()) finisher_cond.Wait(finisher_lock); } dout(10) << "flush waiting for finisher" << dendl; finisher->wait_for_empty(); dout(10) << "flush done" << dendl; } void FileJournal::write_thread_entry() { dout(10) << "write_thread_entry start" << dendl; while (1) { { Mutex::Locker locker(writeq_lock); if (writeq.empty() && !must_write_header) { if (write_stop) break; dout(20) << "write_thread_entry going to sleep" << dendl; writeq_cond.Wait(writeq_lock); dout(20) << "write_thread_entry woke up" << dendl; continue; } } #ifdef HAVE_LIBAIO if (aio) { Mutex::Locker locker(aio_lock); // should we back off to limit aios in flight? try to do this // adaptively so that we submit larger aios once we have lots of // them in flight. // // NOTE: our condition here is based on aio_num (protected by // aio_lock) and throttle_bytes (part of the write queue). when // we sleep, we *only* wait for aio_num to change, and do not // wake when more data is queued. this is not strictly correct, // but should be fine given that we will have plenty of aios in // flight if we hit this limit to ensure we keep the device // saturated. while (aio_num > 0) { int exp = std::min(aio_num * 2, 24); long unsigned min_new = 1ull << exp; uint64_t cur = aio_write_queue_bytes; dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes << " ... exp " << exp << " min_new " << min_new << " ... pending " << cur << dendl; if (cur >= min_new) break; dout(20) << "write_thread_entry deferring until more aios complete: " << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new << " bytes to start a new aio (currently " << cur << " pending)" << dendl; aio_cond.Wait(aio_lock); dout(20) << "write_thread_entry woke up" << dendl; } } #endif Mutex::Locker locker(write_lock); uint64_t orig_ops = 0; uint64_t orig_bytes = 0; bufferlist bl; int r = prepare_multi_write(bl, orig_ops, orig_bytes); // Don't care about journal full if stoppping, so drop queue and // possibly let header get written and loop above to notice stop if (r == -ENOSPC) { if (write_stop) { dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl; while (!writeq_empty()) { complete_write(1, peek_write().orig_len); pop_write(); } print_header(header); r = 0; } else { dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl; commit_cond.Wait(write_lock); dout(20) << "write_thread_entry woke up" << dendl; continue; } } ceph_assert(r == 0); if (logger) { logger->inc(l_filestore_journal_wr); logger->inc(l_filestore_journal_wr_bytes, bl.length()); } #ifdef HAVE_LIBAIO if (aio) do_aio_write(bl); else do_write(bl); #else do_write(bl); #endif complete_write(orig_ops, orig_bytes); } dout(10) << "write_thread_entry finish" << dendl; } #ifdef HAVE_LIBAIO void FileJournal::do_aio_write(bufferlist& bl) { if (cct->_conf->journal_write_header_frequency && (((++journaled_since_start) % cct->_conf->journal_write_header_frequency) == 0)) { must_write_header = true; } // nothing to do? if (bl.length() == 0 && !must_write_header) return; buffer::ptr hbp; if (must_write_header) { must_write_header = false; hbp = prepare_header(); } // entry off64_t pos = write_pos; dout(15) << "do_aio_write writing " << pos << "~" << bl.length() << (hbp.length() ? " + header":"") << dendl; // split? off64_t split = 0; if (pos + bl.length() > header.max_size) { bufferlist first, second; split = header.max_size - pos; first.substr_of(bl, 0, split); second.substr_of(bl, split, bl.length() - split); ceph_assert(first.length() + second.length() == bl.length()); dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl; if (write_aio_bl(pos, first, 0)) { derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos << ") failed" << dendl; ceph_abort(); } ceph_assert(pos == header.max_size); if (hbp.length()) { // be sneaky: include the header in the second fragment bufferlist tmp; tmp.push_back(hbp); tmp.claim_append(second); second.swap(tmp); pos = 0; // we included the header } else pos = get_top(); // no header, start after that if (write_aio_bl(pos, second, writing_seq)) { derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos << ") failed" << dendl; ceph_abort(); } } else { // header too? if (hbp.length()) { bufferlist hbl; hbl.push_back(hbp); loff_t pos = 0; if (write_aio_bl(pos, hbl, 0)) { derr << "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl; ceph_abort(); } } if (write_aio_bl(pos, bl, writing_seq)) { derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos << ") failed" << dendl; ceph_abort(); } } write_pos = pos; if (write_pos == header.max_size) write_pos = get_top(); ceph_assert(write_pos % header.alignment == 0); } /** * write a buffer using aio * * @param seq seq to trigger when this aio completes. if 0, do not update any state * on completion. */ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq) { dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl; while (bl.length() > 0) { int max = std::min(bl.get_num_buffers(), IOV_MAX-1); iovec *iov = new iovec[max]; int n = 0; unsigned len = 0; for (auto p = std::cbegin(bl.buffers()); n < max; ++p, ++n) { ceph_assert(p != std::cend(bl.buffers())); iov[n].iov_base = const_cast(static_cast(p->c_str())); iov[n].iov_len = p->length(); len += p->length(); } bufferlist tbl; bl.splice(0, len, &tbl); // move bytes from bl -> tbl // lock only aio_queue, current aio, aio_num, aio_bytes, which may be // modified in check_aio_completion aio_lock.Lock(); aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq)); aio_info& aio = aio_queue.back(); aio.iov = iov; io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos); dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len << " in " << n << dendl; aio_num++; aio_bytes += aio.len; // need to save current aio len to update write_pos later because current // aio could be ereased from aio_queue once it is done uint64_t cur_len = aio.len; // unlock aio_lock because following io_submit might take time to return aio_lock.Unlock(); iocb *piocb = &aio.iocb; // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds int attempts = 16; int delay = 125; do { int r = io_submit(aio_ctx, 1, &piocb); dout(20) << "write_aio_bl io_submit return value: " << r << dendl; if (r < 0) { derr << "io_submit to " << aio.off << "~" << cur_len << " got " << cpp_strerror(r) << dendl; if (r == -EAGAIN && attempts-- > 0) { usleep(delay); delay *= 2; continue; } check_align(pos, tbl); ceph_abort_msg("io_submit got unexpected error"); } else { break; } } while (true); pos += cur_len; } aio_lock.Lock(); write_finish_cond.Signal(); aio_lock.Unlock(); return 0; } #endif void FileJournal::write_finish_thread_entry() { #ifdef HAVE_LIBAIO dout(10) << __func__ << " enter" << dendl; while (true) { { Mutex::Locker locker(aio_lock); if (aio_queue.empty()) { if (aio_stop) break; dout(20) << __func__ << " sleeping" << dendl; write_finish_cond.Wait(aio_lock); continue; } } dout(20) << __func__ << " waiting for aio(s)" << dendl; io_event event[16]; int r = io_getevents(aio_ctx, 1, 16, event, NULL); if (r < 0) { if (r == -EINTR) { dout(0) << "io_getevents got " << cpp_strerror(r) << dendl; continue; } derr << "io_getevents got " << cpp_strerror(r) << dendl; if (r == -EIO) { note_io_error_event(devname.c_str(), fn.c_str(), -EIO, 0, 0, 0); } ceph_abort_msg("got unexpected error from io_getevents"); } { Mutex::Locker locker(aio_lock); for (int i=0; ilen) { derr << "aio to " << ai->off << "~" << ai->len << " returned: " << (int)event[i].res << dendl; ceph_abort_msg("unexpected aio error"); } dout(10) << __func__ << " aio " << ai->off << "~" << ai->len << " done" << dendl; ai->done = true; } check_aio_completion(); } } dout(10) << __func__ << " exit" << dendl; #endif } #ifdef HAVE_LIBAIO /** * check aio_wait for completed aio, and update state appropriately. */ void FileJournal::check_aio_completion() { ceph_assert(aio_lock.is_locked()); dout(20) << "check_aio_completion" << dendl; bool completed_something = false, signal = false; uint64_t new_journaled_seq = 0; list::iterator p = aio_queue.begin(); while (p != aio_queue.end() && p->done) { dout(20) << "check_aio_completion completed seq " << p->seq << " " << p->off << "~" << p->len << dendl; if (p->seq) { new_journaled_seq = p->seq; completed_something = true; } aio_num--; aio_bytes -= p->len; aio_queue.erase(p++); signal = true; } if (completed_something) { // kick finisher? // only if we haven't filled up recently! Mutex::Locker locker(finisher_lock); journaled_seq = new_journaled_seq; if (full_state != FULL_NOTFULL) { dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq << ", full_commit_seq|full_restart_seq" << dendl; } else { if (plug_journal_completions) { dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq << " due to completion plug" << dendl; } else { dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq << dendl; queue_completions_thru(journaled_seq); } } } if (signal) { // maybe write queue was waiting for aio count to drop? aio_cond.Signal(); } } #endif int FileJournal::prepare_entry(vector& tls, bufferlist* tbl) { dout(10) << "prepare_entry " << tls << dendl; int data_len = cct->_conf->journal_align_min_size - 1; int data_align = -1; // -1 indicates that we don't care about the alignment bufferlist bl; for (vector::iterator p = tls.begin(); p != tls.end(); ++p) { if ((int)(*p).get_data_length() > data_len) { data_len = (*p).get_data_length(); data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK; } encode(*p, bl); } if (tbl->length()) { bl.claim_append(*tbl); } // add it this entry entry_header_t h; unsigned head_size = sizeof(entry_header_t); off64_t base_size = 2*head_size + bl.length(); memset(&h, 0, sizeof(h)); if (data_align >= 0) h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK; off64_t size = round_up_to(base_size + h.pre_pad, header.alignment); unsigned post_pad = size - base_size - h.pre_pad; h.len = bl.length(); h.post_pad = post_pad; h.crc32c = bl.crc32c(0); dout(10) << " len " << bl.length() << " -> " << size << " (head " << head_size << " pre_pad " << h.pre_pad << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")" << " (bl alignment " << data_align << ")" << dendl; bufferlist ebl; // header ebl.append((const char*)&h, sizeof(h)); if (h.pre_pad) { ebl.push_back(buffer::create_static(h.pre_pad, zero_buf)); } // payload ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy if (h.post_pad) { ebl.push_back(buffer::create_static(h.post_pad, zero_buf)); } // footer ebl.append((const char*)&h, sizeof(h)); if (directio) ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT); tbl->claim(ebl); return h.len; } void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len, Context *oncommit, TrackedOpRef osd_op) { // dump on queue dout(5) << "submit_entry seq " << seq << " len " << e.length() << " (" << oncommit << ")" << dendl; ceph_assert(e.length() > 0); ceph_assert(e.length() < header.max_size); if (logger) { logger->inc(l_filestore_journal_queue_bytes, orig_len); logger->inc(l_filestore_journal_queue_ops, 1); } throttle.register_throttle_seq(seq, e.length()); if (logger) { logger->inc(l_filestore_journal_ops, 1); logger->inc(l_filestore_journal_bytes, e.length()); } if (osd_op) { osd_op->mark_event("commit_queued_for_journal_write"); if (osd_op->store_trace) { osd_op->journal_trace.init("journal", &trace_endpoint, &osd_op->store_trace); osd_op->journal_trace.event("submit_entry"); osd_op->journal_trace.keyval("seq", seq); } } { Mutex::Locker l1(writeq_lock); #ifdef HAVE_LIBAIO Mutex::Locker l2(aio_lock); #endif Mutex::Locker l3(completions_lock); #ifdef HAVE_LIBAIO aio_write_queue_ops++; aio_write_queue_bytes += e.length(); aio_cond.Signal(); #endif completions.push_back( completion_item( seq, oncommit, ceph_clock_now(), osd_op)); if (writeq.empty()) writeq_cond.Signal(); writeq.push_back(write_item(seq, e, orig_len, osd_op)); if (osd_op) osd_op->journal_trace.keyval("queue depth", writeq.size()); } } bool FileJournal::writeq_empty() { Mutex::Locker locker(writeq_lock); return writeq.empty(); } FileJournal::write_item &FileJournal::peek_write() { ceph_assert(write_lock.is_locked()); Mutex::Locker locker(writeq_lock); return writeq.front(); } void FileJournal::pop_write() { ceph_assert(write_lock.is_locked()); Mutex::Locker locker(writeq_lock); if (logger) { logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len); logger->dec(l_filestore_journal_queue_ops, 1); } writeq.pop_front(); } void FileJournal::batch_pop_write(list &items) { ceph_assert(write_lock.is_locked()); { Mutex::Locker locker(writeq_lock); writeq.swap(items); } for (auto &&i : items) { if (logger) { logger->dec(l_filestore_journal_queue_bytes, i.orig_len); logger->dec(l_filestore_journal_queue_ops, 1); } } } void FileJournal::batch_unpop_write(list &items) { ceph_assert(write_lock.is_locked()); for (auto &&i : items) { if (logger) { logger->inc(l_filestore_journal_queue_bytes, i.orig_len); logger->inc(l_filestore_journal_queue_ops, 1); } } Mutex::Locker locker(writeq_lock); writeq.splice(writeq.begin(), items); } void FileJournal::commit_start(uint64_t seq) { dout(10) << "commit_start" << dendl; // was full? switch (full_state) { case FULL_NOTFULL: break; // all good case FULL_FULL: if (seq >= journaled_seq) { dout(1) << " FULL_FULL -> FULL_WAIT. commit_start on seq " << seq << " > journaled_seq " << journaled_seq << ", moving to FULL_WAIT." << dendl; full_state = FULL_WAIT; } else { dout(1) << "FULL_FULL commit_start on seq " << seq << " < journaled_seq " << journaled_seq << ", remaining in FULL_FULL" << dendl; } break; case FULL_WAIT: dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl; full_state = FULL_NOTFULL; plug_journal_completions = true; break; } } /* *send discard command to joural block deivce */ void FileJournal::do_discard(int64_t offset, int64_t end) { dout(10) << __func__ << " trim(" << offset << ", " << end << dendl; offset = round_up_to(offset, block_size); if (offset >= end) return; end = round_up_to(end - block_size, block_size); ceph_assert(end >= offset); if (offset < end) { BlkDev blkdev(fd); if (blkdev.discard(offset, end - offset) < 0) { dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl; } } } void FileJournal::committed_thru(uint64_t seq) { Mutex::Locker locker(write_lock); auto released = throttle.flush(seq); if (logger) { logger->dec(l_filestore_journal_ops, released.first); logger->dec(l_filestore_journal_bytes, released.second); } if (seq < last_committed_seq) { dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl; ceph_assert(seq >= last_committed_seq); return; } if (seq == last_committed_seq) { dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl; return; } dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl; last_committed_seq = seq; // completions! { Mutex::Locker locker(finisher_lock); queue_completions_thru(seq); if (plug_journal_completions && seq >= header.start_seq) { dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl; plug_journal_completions = false; queue_completions_thru(journaled_seq); } } // adjust start pointer while (!journalq.empty() && journalq.front().first <= seq) { journalq.pop_front(); } int64_t old_start = header.start; if (!journalq.empty()) { header.start = journalq.front().second; header.start_seq = journalq.front().first; } else { header.start = write_pos; header.start_seq = seq + 1; } if (discard) { dout(10) << __func__ << " will trim (" << old_start << ", " << header.start << ")" << dendl; if (old_start < header.start) do_discard(old_start, header.start - 1); else { do_discard(old_start, header.max_size - 1); do_discard(get_top(), header.start - 1); } } must_write_header = true; print_header(header); // committed but unjournaled items while (!writeq_empty() && peek_write().seq <= seq) { dout(15) << " dropping committed but unwritten seq " << peek_write().seq << " len " << peek_write().bl.length() << dendl; complete_write(1, peek_write().orig_len); pop_write(); } commit_cond.Signal(); dout(10) << "committed_thru done" << dendl; } void FileJournal::complete_write(uint64_t ops, uint64_t bytes) { dout(5) << __func__ << " finished " << ops << " ops and " << bytes << " bytes" << dendl; } int FileJournal::make_writeable() { dout(10) << __func__ << dendl; int r = set_throttle_params(); if (r < 0) return r; r = _open(true); if (r < 0) return r; if (read_pos > 0) write_pos = read_pos; else write_pos = get_top(); read_pos = 0; must_write_header = true; start_writer(); return 0; } int FileJournal::set_throttle_params() { stringstream ss; bool valid = throttle.set_params( cct->_conf->journal_throttle_low_threshhold, cct->_conf->journal_throttle_high_threshhold, cct->_conf->filestore_expected_throughput_bytes, cct->_conf->journal_throttle_high_multiple, cct->_conf->journal_throttle_max_multiple, header.max_size - get_top(), &ss); if (!valid) { derr << "tried to set invalid params: " << ss.str() << dendl; } return valid ? 0 : -EINVAL; } const char** FileJournal::get_tracked_conf_keys() const { static const char *KEYS[] = { "journal_throttle_low_threshhold", "journal_throttle_high_threshhold", "journal_throttle_high_multiple", "journal_throttle_max_multiple", "filestore_expected_throughput_bytes", NULL}; return KEYS; } void FileJournal::wrap_read_bl( off64_t pos, int64_t olen, bufferlist* bl, off64_t *out_pos ) const { while (olen > 0) { while (pos >= header.max_size) pos = pos + get_top() - header.max_size; int64_t len; if (pos + olen > header.max_size) len = header.max_size - pos; // partial else len = olen; // rest int64_t actual = ::lseek64(fd, pos, SEEK_SET); ceph_assert(actual == pos); bufferptr bp = buffer::create(len); int r = safe_read_exact(fd, bp.c_str(), len); if (r) { derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned " << cpp_strerror(r) << dendl; ceph_abort(); } bl->push_back(std::move(bp)); pos += len; olen -= len; } if (pos >= header.max_size) pos = pos + get_top() - header.max_size; if (out_pos) *out_pos = pos; } bool FileJournal::read_entry( bufferlist &bl, uint64_t &next_seq, bool *corrupt) { if (corrupt) *corrupt = false; uint64_t seq = next_seq; if (!read_pos) { dout(2) << "read_entry -- not readable" << dendl; return false; } off64_t pos = read_pos; off64_t next_pos = pos; stringstream ss; read_entry_result result = do_read_entry( pos, &next_pos, &bl, &seq, &ss); if (result == SUCCESS) { journalq.push_back( pair(seq, pos)); uint64_t amount_to_take = next_pos > pos ? next_pos - pos : (header.max_size - pos) + (next_pos - get_top()); throttle.take(amount_to_take); throttle.register_throttle_seq(next_seq, amount_to_take); if (logger) { logger->inc(l_filestore_journal_ops, 1); logger->inc(l_filestore_journal_bytes, amount_to_take); } if (next_seq > seq) { return false; } else { read_pos = next_pos; next_seq = seq; if (seq > journaled_seq) journaled_seq = seq; return true; } } else { derr << "do_read_entry(" << pos << "): " << ss.str() << dendl; } if (seq && seq < header.committed_up_to) { derr << "Unable to read past sequence " << seq << " but header indicates the journal has committed up through " << header.committed_up_to << ", journal is corrupt" << dendl; if (cct->_conf->journal_ignore_corruption) { if (corrupt) *corrupt = true; return false; } else { ceph_abort(); } } dout(2) << "No further valid entries found, journal is most likely valid" << dendl; return false; } FileJournal::read_entry_result FileJournal::do_read_entry( off64_t init_pos, off64_t *next_pos, bufferlist *bl, uint64_t *seq, ostream *ss, entry_header_t *_h) const { off64_t cur_pos = init_pos; bufferlist _bl; if (!bl) bl = &_bl; // header entry_header_t *h; bufferlist hbl; off64_t _next_pos; wrap_read_bl(cur_pos, sizeof(*h), &hbl, &_next_pos); h = reinterpret_cast(hbl.c_str()); if (!h->check_magic(cur_pos, header.get_fsid64())) { dout(25) << "read_entry " << init_pos << " : bad header magic, end of journal" << dendl; if (ss) *ss << "bad header magic"; if (next_pos) *next_pos = init_pos + (4<<10); // check 4k ahead return MAYBE_CORRUPT; } cur_pos = _next_pos; // pad + body + pad if (h->pre_pad) cur_pos += h->pre_pad; bl->clear(); wrap_read_bl(cur_pos, h->len, bl, &cur_pos); if (h->post_pad) cur_pos += h->post_pad; // footer entry_header_t *f; bufferlist fbl; wrap_read_bl(cur_pos, sizeof(*f), &fbl, &cur_pos); f = reinterpret_cast(fbl.c_str()); if (memcmp(f, h, sizeof(*f))) { if (ss) *ss << "bad footer magic, partial entry"; if (next_pos) *next_pos = cur_pos; return MAYBE_CORRUPT; } if ((header.flags & header_t::FLAG_CRC) || // if explicitly enabled (new journal) h->crc32c != 0) { // newer entry in old journal uint32_t actual_crc = bl->crc32c(0); if (actual_crc != h->crc32c) { if (ss) *ss << "header crc (" << h->crc32c << ") doesn't match body crc (" << actual_crc << ")"; if (next_pos) *next_pos = cur_pos; return MAYBE_CORRUPT; } } // yay! dout(2) << "read_entry " << init_pos << " : seq " << h->seq << " " << h->len << " bytes" << dendl; // ok! if (seq) *seq = h->seq; if (next_pos) *next_pos = cur_pos; if (_h) *_h = *h; ceph_assert(cur_pos % header.alignment == 0); return SUCCESS; } void FileJournal::reserve_throttle_and_backoff(uint64_t count) { throttle.get(count); } void FileJournal::get_header( uint64_t wanted_seq, off64_t *_pos, entry_header_t *h) { off64_t pos = header.start; off64_t next_pos = pos; bufferlist bl; uint64_t seq = 0; dout(2) << __func__ << dendl; while (1) { bl.clear(); pos = next_pos; read_entry_result result = do_read_entry( pos, &next_pos, &bl, &seq, 0, h); if (result == FAILURE || result == MAYBE_CORRUPT) ceph_abort(); if (seq == wanted_seq) { if (_pos) *_pos = pos; return; } } ceph_abort(); // not reachable } void FileJournal::corrupt( int wfd, off64_t corrupt_at) { dout(2) << __func__ << dendl; if (corrupt_at >= header.max_size) corrupt_at = corrupt_at + get_top() - header.max_size; int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET); ceph_assert(actual == corrupt_at); char buf[10]; int r = safe_read_exact(fd, buf, 1); ceph_assert(r == 0); actual = ::lseek64(wfd, corrupt_at, SEEK_SET); ceph_assert(actual == corrupt_at); buf[0]++; r = safe_write(wfd, buf, 1); ceph_assert(r == 0); } void FileJournal::corrupt_payload( int wfd, uint64_t seq) { dout(2) << __func__ << dendl; off64_t pos = 0; entry_header_t h; get_header(seq, &pos, &h); off64_t corrupt_at = pos + sizeof(entry_header_t) + h.pre_pad; corrupt(wfd, corrupt_at); } void FileJournal::corrupt_footer_magic( int wfd, uint64_t seq) { dout(2) << __func__ << dendl; off64_t pos = 0; entry_header_t h; get_header(seq, &pos, &h); off64_t corrupt_at = pos + sizeof(entry_header_t) + h.pre_pad + h.len + h.post_pad + (reinterpret_cast(&h.magic2) - reinterpret_cast(&h)); corrupt(wfd, corrupt_at); } void FileJournal::corrupt_header_magic( int wfd, uint64_t seq) { dout(2) << __func__ << dendl; off64_t pos = 0; entry_header_t h; get_header(seq, &pos, &h); off64_t corrupt_at = pos + (reinterpret_cast(&h.magic2) - reinterpret_cast(&h)); corrupt(wfd, corrupt_at); } off64_t FileJournal::get_journal_size_estimate() { off64_t size, start = header.start; if (write_pos < start) { size = (max_size - start) + write_pos; } else { size = write_pos - start; } dout(20) << __func__ << " journal size=" << size << dendl; return size; } void FileJournal::get_devices(set *ls) { string dev_node; BlkDev blkdev(fd); if (int rc = blkdev.wholedisk(&dev_node); rc) { return; } get_raw_devices(dev_node, ls); } void FileJournal::collect_metadata(map *pm) { BlkDev blkdev(fd); char partition_path[PATH_MAX]; char dev_node[PATH_MAX]; if (blkdev.partition(partition_path, PATH_MAX)) { (*pm)["backend_filestore_journal_partition_path"] = "unknown"; } else { (*pm)["backend_filestore_journal_partition_path"] = string(partition_path); } if (blkdev.wholedisk(dev_node, PATH_MAX)) { (*pm)["backend_filestore_journal_dev_node"] = "unknown"; } else { (*pm)["backend_filestore_journal_dev_node"] = string(dev_node); devname = dev_node; } }