diff options
Diffstat (limited to '')
44 files changed, 8424 insertions, 0 deletions
diff --git a/src/blk/BlockDevice.cc b/src/blk/BlockDevice.cc new file mode 100644 index 000000000..6804ee50c --- /dev/null +++ b/src/blk/BlockDevice.cc @@ -0,0 +1,215 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 XSky <haomai@xsky.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <libgen.h> +#include <unistd.h> + +#include "BlockDevice.h" + +#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO) +#include "kernel/KernelDevice.h" +#endif + +#if defined(HAVE_SPDK) +#include "spdk/NVMEDevice.h" +#endif + +#if defined(HAVE_BLUESTORE_PMEM) +#include "pmem/PMEMDevice.h" +#endif + +#if defined(HAVE_LIBZBD) +#include "zoned/HMSMRDevice.h" +#endif + +#include "common/debug.h" +#include "common/EventTrace.h" +#include "common/errno.h" +#include "include/compat.h" + +#define dout_context cct +#define dout_subsys ceph_subsys_bdev +#undef dout_prefix +#define dout_prefix *_dout << "bdev " + +using std::string; + +void IOContext::aio_wait() +{ + std::unique_lock l(lock); + // see _aio_thread for waker logic + while (num_running.load() > 0) { + dout(10) << __func__ << " " << this + << " waiting for " << num_running.load() << " aios to complete" + << dendl; + cond.wait(l); + } + dout(20) << __func__ << " " << this << " done" << dendl; +} + +uint64_t IOContext::get_num_ios() const +{ + // this is about the simplest model for transaction cost you can + // imagine. there is some fixed overhead cost by saying there is a + // minimum of one "io". and then we have some cost per "io" that is + // a configurable (with different hdd and ssd defaults), and add + // that to the bytes value. 
+ uint64_t ios = 0; +#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO) + ios += pending_aios.size(); +#endif +#ifdef HAVE_SPDK + ios += total_nseg; +#endif + return ios; +} + +void IOContext::release_running_aios() +{ + ceph_assert(!num_running); +#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO) + // release aio contexts (including pinned buffers). + running_aios.clear(); +#endif +} + +BlockDevice::block_device_t +BlockDevice::detect_device_type(const std::string& path) +{ +#if defined(HAVE_SPDK) + if (NVMEDevice::support(path)) { + return block_device_t::spdk; + } +#endif +#if defined(HAVE_BLUESTORE_PMEM) + if (PMEMDevice::support(path)) { + return block_device_t::pmem; + } +#endif +#if (defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)) && defined(HAVE_LIBZBD) + if (HMSMRDevice::support(path)) { + return block_device_t::hm_smr; + } +#endif + + return block_device_t::aio; +} + +BlockDevice::block_device_t +BlockDevice::device_type_from_name(const std::string& blk_dev_name) +{ +#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO) + if (blk_dev_name == "aio") { + return block_device_t::aio; + } +#endif +#if defined(HAVE_SPDK) + if (blk_dev_name == "spdk") { + return block_device_t::spdk; + } +#endif +#if defined(HAVE_BLUESTORE_PMEM) + if (blk_dev_name == "pmem") { + return block_device_t::pmem; + } +#endif +#if (defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)) && defined(HAVE_LIBZBD) + if (blk_dev_name == "hm_smr") { + return block_device_t::hm_smr; + } +#endif + return block_device_t::unknown; +} + +BlockDevice* BlockDevice::create_with_type(block_device_t device_type, + CephContext* cct, const std::string& path, aio_callback_t cb, + void *cbpriv, aio_callback_t d_cb, void *d_cbpriv) +{ + + switch (device_type) { +#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO) + case block_device_t::aio: + return new KernelDevice(cct, cb, cbpriv, d_cb, d_cbpriv); +#endif +#if defined(HAVE_SPDK) + case block_device_t::spdk: + return new NVMEDevice(cct, cb, cbpriv); +#endif +#if 
defined(HAVE_BLUESTORE_PMEM) + case block_device_t::pmem: + return new PMEMDevice(cct, cb, cbpriv); +#endif +#if (defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)) && defined(HAVE_LIBZBD) + case block_device_t::hm_smr: + return new HMSMRDevice(cct, cb, cbpriv, d_cb, d_cbpriv); +#endif + default: + ceph_abort_msg("unsupported device"); + return nullptr; + } +} + +BlockDevice *BlockDevice::create( + CephContext* cct, const string& path, aio_callback_t cb, + void *cbpriv, aio_callback_t d_cb, void *d_cbpriv) +{ + const string blk_dev_name = cct->_conf.get_val<string>("bdev_type"); + block_device_t device_type = block_device_t::unknown; + if (blk_dev_name.empty()) { + device_type = detect_device_type(path); + } else { + device_type = device_type_from_name(blk_dev_name); + } + return create_with_type(device_type, cct, path, cb, cbpriv, d_cb, d_cbpriv); +} + +void BlockDevice::queue_reap_ioc(IOContext *ioc) +{ + std::lock_guard l(ioc_reap_lock); + if (ioc_reap_count.load() == 0) + ++ioc_reap_count; + ioc_reap_queue.push_back(ioc); +} + +void BlockDevice::reap_ioc() +{ + if (ioc_reap_count.load()) { + std::lock_guard l(ioc_reap_lock); + for (auto p : ioc_reap_queue) { + dout(20) << __func__ << " reap ioc " << p << dendl; + delete p; + } + ioc_reap_queue.clear(); + --ioc_reap_count; + } +} + +bool BlockDevice::is_valid_io(uint64_t off, uint64_t len) const { + bool ret = (off % block_size == 0 && + len % block_size == 0 && + len > 0 && + off < size && + off + len <= size); + + if (!ret) { + derr << __func__ << " " << std::hex + << off << "~" << len + << " block_size " << block_size + << " size " << size + << std::dec << dendl; + } + return ret; +} diff --git a/src/blk/BlockDevice.h b/src/blk/BlockDevice.h new file mode 100644 index 000000000..191eb8ec9 --- /dev/null +++ b/src/blk/BlockDevice.h @@ -0,0 +1,278 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright 
(C) 2015 XSky <haomai@xsky.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_BLK_BLOCKDEVICE_H +#define CEPH_BLK_BLOCKDEVICE_H + +#include <atomic> +#include <condition_variable> +#include <list> +#include <map> +#include <mutex> +#include <set> +#include <string> +#include <vector> + +#include "acconfig.h" +#include "common/ceph_mutex.h" +#include "include/common_fwd.h" + +#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO) +#include "aio/aio.h" +#endif +#include "include/ceph_assert.h" +#include "include/buffer.h" +#include "include/interval_set.h" +#define SPDK_PREFIX "spdk:" + +#if defined(__linux__) +#if !defined(F_SET_FILE_RW_HINT) +#define F_LINUX_SPECIFIC_BASE 1024 +#define F_SET_FILE_RW_HINT (F_LINUX_SPECIFIC_BASE + 14) +#endif +// These values match Linux definition +// https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/fcntl.h#n56 +#define WRITE_LIFE_NOT_SET 0 // No hint information set +#define WRITE_LIFE_NONE 1 // No hints about write life time +#define WRITE_LIFE_SHORT 2 // Data written has a short life time +#define WRITE_LIFE_MEDIUM 3 // Data written has a medium life time +#define WRITE_LIFE_LONG 4 // Data written has a long life time +#define WRITE_LIFE_EXTREME 5 // Data written has an extremely long life time +#define WRITE_LIFE_MAX 6 +#else +// On systems don't have WRITE_LIFE_* only use one FD +// And all files are created equal +#define WRITE_LIFE_NOT_SET 0 // No hint information set +#define WRITE_LIFE_NONE 0 // No hints about write life time +#define WRITE_LIFE_SHORT 0 // Data written has a short life time +#define WRITE_LIFE_MEDIUM 0 // Data written has a medium life time +#define WRITE_LIFE_LONG 0 // Data written has a long life time +#define 
WRITE_LIFE_EXTREME 0 // Data written has an extremely long life time +#define WRITE_LIFE_MAX 1 +#endif + + +/// track in-flight io +struct IOContext { +private: + ceph::mutex lock = ceph::make_mutex("IOContext::lock"); + ceph::condition_variable cond; + int r = 0; + +public: + CephContext* cct; + void *priv; +#ifdef HAVE_SPDK + void *nvme_task_first = nullptr; + void *nvme_task_last = nullptr; + std::atomic_int total_nseg = {0}; +#endif + +#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO) + std::list<aio_t> pending_aios; ///< not yet submitted + std::list<aio_t> running_aios; ///< submitting or submitted +#endif + std::atomic_int num_pending = {0}; + std::atomic_int num_running = {0}; + bool allow_eio; + + explicit IOContext(CephContext* cct, void *p, bool allow_eio = false) + : cct(cct), priv(p), allow_eio(allow_eio) + {} + + // no copying + IOContext(const IOContext& other) = delete; + IOContext &operator=(const IOContext& other) = delete; + + bool has_pending_aios() { + return num_pending.load(); + } + void release_running_aios(); + void aio_wait(); + uint64_t get_num_ios() const; + + void try_aio_wake() { + assert(num_running >= 1); + + std::lock_guard l(lock); + if (num_running.fetch_sub(1) == 1) { + + // we might have some pending IOs submitted after the check + // as there is no lock protection for aio_submit. + // Hence we might have false conditional trigger. + // aio_wait has to handle that hence do not care here. 
+ cond.notify_all(); + } + } + + void set_return_value(int _r) { + r = _r; + } + + int get_return_value() const { + return r; + } +}; + + +class BlockDevice { +public: + CephContext* cct; + typedef void (*aio_callback_t)(void *handle, void *aio); +private: + ceph::mutex ioc_reap_lock = ceph::make_mutex("BlockDevice::ioc_reap_lock"); + std::vector<IOContext*> ioc_reap_queue; + std::atomic_int ioc_reap_count = {0}; + enum class block_device_t { + unknown, +#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO) + aio, +#if defined(HAVE_LIBZBD) + hm_smr, +#endif +#endif +#if defined(HAVE_SPDK) + spdk, +#endif +#if defined(HAVE_BLUESTORE_PMEM) + pmem, +#endif + }; + static block_device_t detect_device_type(const std::string& path); + static block_device_t device_type_from_name(const std::string& blk_dev_name); + static BlockDevice *create_with_type(block_device_t device_type, + CephContext* cct, const std::string& path, aio_callback_t cb, + void *cbpriv, aio_callback_t d_cb, void *d_cbpriv); + +protected: + uint64_t size = 0; + uint64_t block_size = 0; + bool support_discard = false; + bool rotational = true; + bool lock_exclusive = true; + + // HM-SMR specific properties. In HM-SMR drives the LBA space is divided into + // fixed-size zones. Typically, the first few zones are randomly writable; + // they form a conventional region of the drive. The remaining zones must be + // written sequentially and they must be reset before rewritten. For example, + // a 14 TB HGST HSH721414AL drive has 52156 zones each of size is 256 MiB. + // The zones 0-523 are randomly writable and they form the conventional region + // of the drive. The zones 524-52155 are sequential zones. 
+ uint64_t conventional_region_size = 0; + uint64_t zone_size = 0; + +public: + aio_callback_t aio_callback; + void *aio_callback_priv; + BlockDevice(CephContext* cct, aio_callback_t cb, void *cbpriv) + : cct(cct), + aio_callback(cb), + aio_callback_priv(cbpriv) + {} + virtual ~BlockDevice() = default; + + static BlockDevice *create( + CephContext* cct, const std::string& path, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv); + virtual bool supported_bdev_label() { return true; } + virtual bool is_rotational() { return rotational; } + + // HM-SMR-specific calls + virtual bool is_smr() const { return false; } + virtual uint64_t get_zone_size() const { + ceph_assert(is_smr()); + return zone_size; + } + virtual uint64_t get_conventional_region_size() const { + ceph_assert(is_smr()); + return conventional_region_size; + } + + virtual void aio_submit(IOContext *ioc) = 0; + + void set_no_exclusive_lock() { + lock_exclusive = false; + } + + uint64_t get_size() const { return size; } + uint64_t get_block_size() const { return block_size; } + + /// hook to provide utilization of thinly-provisioned device + virtual bool get_thin_utilization(uint64_t *total, uint64_t *avail) const { + return false; + } + + virtual int collect_metadata(const std::string& prefix, std::map<std::string,std::string> *pm) const = 0; + + virtual int get_devname(std::string *out) const { + return -ENOENT; + } + virtual int get_devices(std::set<std::string> *ls) const { + std::string s; + if (get_devname(&s) == 0) { + ls->insert(s); + } + return 0; + } + virtual int get_numa_node(int *node) const { + return -EOPNOTSUPP; + } + + virtual int read( + uint64_t off, + uint64_t len, + ceph::buffer::list *pbl, + IOContext *ioc, + bool buffered) = 0; + virtual int read_random( + uint64_t off, + uint64_t len, + char *buf, + bool buffered) = 0; + virtual int write( + uint64_t off, + ceph::buffer::list& bl, + bool buffered, + int write_hint = WRITE_LIFE_NOT_SET) = 0; + + virtual int 
aio_read( + uint64_t off, + uint64_t len, + ceph::buffer::list *pbl, + IOContext *ioc) = 0; + virtual int aio_write( + uint64_t off, + ceph::buffer::list& bl, + IOContext *ioc, + bool buffered, + int write_hint = WRITE_LIFE_NOT_SET) = 0; + virtual int flush() = 0; + virtual int discard(uint64_t offset, uint64_t len) { return 0; } + virtual int queue_discard(interval_set<uint64_t> &to_release) { return -1; } + virtual void discard_drain() { return; } + + void queue_reap_ioc(IOContext *ioc); + void reap_ioc(); + + // for managing buffered readers/writers + virtual int invalidate_cache(uint64_t off, uint64_t len) = 0; + virtual int open(const std::string& path) = 0; + virtual void close() = 0; + +protected: + bool is_valid_io(uint64_t off, uint64_t len) const; +}; + +#endif //CEPH_BLK_BLOCKDEVICE_H diff --git a/src/blk/CMakeLists.txt b/src/blk/CMakeLists.txt new file mode 100644 index 000000000..849f3eef9 --- /dev/null +++ b/src/blk/CMakeLists.txt @@ -0,0 +1,60 @@ +if(WITH_BLUESTORE OR WITH_RBD_SSD_CACHE) +list(APPEND libblk_srcs + BlockDevice.cc) +endif() + +if(HAVE_LIBAIO OR HAVE_POSIXAIO) + list(APPEND libblk_srcs + kernel/KernelDevice.cc + kernel/io_uring.cc + aio/aio.cc) +endif() + +if(WITH_BLUESTORE_PMEM) + list(APPEND libblk_srcs + pmem/PMEMDevice.cc) +endif() + +if(WITH_SPDK) + list(APPEND libblk_srcs + spdk/NVMEDevice.cc) +endif() + +if(WITH_ZBD) + list(APPEND libblk_srcs + zoned/HMSMRDevice.cc) +endif() + +add_library(blk STATIC ${libblk_srcs}) +target_include_directories(blk PRIVATE "./") + +if(HAVE_LIBAIO) + target_link_libraries(blk PUBLIC ${AIO_LIBRARIES}) +endif(HAVE_LIBAIO) + +if(WITH_SPDK) + target_link_libraries(blk PRIVATE ${SPDK_LIBRARIES}) +endif() + +if(WITH_ZBD) + target_link_libraries(blk PRIVATE ${ZBD_LIBRARIES}) +endif() + +if(WITH_BLUESTORE_PMEM) + target_link_libraries(blk + PRIVATE pmem::pmem) +endif() + +if(WITH_EVENTTRACE) + add_dependencies(blk eventtrace_tp) +endif() + +if(WITH_LIBURING) + if(WITH_SYSTEM_LIBURING) + find_package(uring 
REQUIRED) + else() + include(Builduring) + build_uring() + endif() + target_link_libraries(blk PRIVATE uring::uring) +endif() diff --git a/src/blk/aio/aio.cc b/src/blk/aio/aio.cc new file mode 100644 index 000000000..00a12bfd1 --- /dev/null +++ b/src/blk/aio/aio.cc @@ -0,0 +1,124 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <algorithm> +#include "aio.h" + +std::ostream& operator<<(std::ostream& os, const aio_t& aio) +{ + unsigned i = 0; + os << "aio: "; + for (auto& iov : aio.iov) { + os << "\n [" << i++ << "] 0x" + << std::hex << iov.iov_base << "~" << iov.iov_len << std::dec; + } + return os; +} + +int aio_queue_t::submit_batch(aio_iter begin, aio_iter end, + uint16_t aios_size, void *priv, + int *retries) +{ + // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds + int attempts = 16; + int delay = 125; + int r; + + aio_iter cur = begin; + struct aio_t *piocb[aios_size]; + int left = 0; + while (cur != end) { + cur->priv = priv; + *(piocb+left) = &(*cur); + ++left; + ++cur; + } + ceph_assert(aios_size >= left); + int done = 0; + while (left > 0) { +#if defined(HAVE_LIBAIO) + r = io_submit(ctx, std::min(left, max_iodepth), (struct iocb**)(piocb + done)); +#elif defined(HAVE_POSIXAIO) + if (piocb[done]->n_aiocb == 1) { + // TODO: consider batching multiple reads together with lio_listio + piocb[done]->aio.aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT; + piocb[done]->aio.aiocb.aio_sigevent.sigev_notify_kqueue = ctx; + piocb[done]->aio.aiocb.aio_sigevent.sigev_value.sival_ptr = piocb[done]; + r = aio_read(&piocb[done]->aio.aiocb); + } else { + struct sigevent sev; + sev.sigev_notify = SIGEV_KEVENT; + sev.sigev_notify_kqueue = ctx; + sev.sigev_value.sival_ptr = piocb[done]; + r = lio_listio(LIO_NOWAIT, &piocb[done]->aio.aiocbp, piocb[done]->n_aiocb, &sev); + } +#endif + if (r < 0) { + if (r == -EAGAIN && attempts-- > 0) { + usleep(delay); + delay *= 2; + (*retries)++; + continue; + } + return 
r; + } + ceph_assert(r > 0); + done += r; + left -= r; + attempts = 16; + delay = 125; + } + return done; +} + +int aio_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max) +{ +#if defined(HAVE_LIBAIO) + io_event events[max]; +#elif defined(HAVE_POSIXAIO) + struct kevent events[max]; +#endif + struct timespec t = { + timeout_ms / 1000, + (timeout_ms % 1000) * 1000 * 1000 + }; + + int r = 0; + do { +#if defined(HAVE_LIBAIO) + r = io_getevents(ctx, 1, max, events, &t); +#elif defined(HAVE_POSIXAIO) + r = kevent(ctx, NULL, 0, events, max, &t); + if (r < 0) + r = -errno; +#endif + } while (r == -EINTR); + + for (int i=0; i<r; ++i) { +#if defined(HAVE_LIBAIO) + paio[i] = (aio_t *)events[i].obj; + paio[i]->rval = events[i].res; +#else + paio[i] = (aio_t*)events[i].udata; + if (paio[i]->n_aiocb == 1) { + paio[i]->rval = aio_return(&paio[i]->aio.aiocb); + } else { + // Emulate the return value of pwritev. I can't find any documentation + // for what the value of io_event.res is supposed to be. I'm going to + // assume that it's just like pwritev/preadv/pwrite/pread. 
+ paio[i]->rval = 0; + for (int j = 0; j < paio[i]->n_aiocb; j++) { + int res = aio_return(&paio[i]->aio.aiocbp[j]); + if (res < 0) { + paio[i]->rval = res; + break; + } else { + paio[i]->rval += res; + } + } + free(paio[i]->aio.aiocbp); + } +#endif + } + return r; +} diff --git a/src/blk/aio/aio.h b/src/blk/aio/aio.h new file mode 100644 index 000000000..14b89784b --- /dev/null +++ b/src/blk/aio/aio.h @@ -0,0 +1,159 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "acconfig.h" + +#if defined(HAVE_LIBAIO) +#include <libaio.h> +#elif defined(HAVE_POSIXAIO) +#include <aio.h> +#include <sys/event.h> +#endif + +#include <boost/intrusive/list.hpp> +#include <boost/container/small_vector.hpp> + +#include "include/buffer.h" +#include "include/types.h" + +struct aio_t { +#if defined(HAVE_LIBAIO) + struct iocb iocb{}; // must be first element; see shenanigans in aio_queue_t +#elif defined(HAVE_POSIXAIO) + // static long aio_listio_max = -1; + union { + struct aiocb aiocb; + struct aiocb *aiocbp; + } aio; + int n_aiocb; +#endif + void *priv; + int fd; + boost::container::small_vector<iovec,4> iov; + uint64_t offset, length; + long rval; + ceph::buffer::list bl; ///< write payload (so that it remains stable for duration) + + boost::intrusive::list_member_hook<> queue_item; + + aio_t(void *p, int f) : priv(p), fd(f), offset(0), length(0), rval(-1000) { + } + + void pwritev(uint64_t _offset, uint64_t len) { + offset = _offset; + length = len; +#if defined(HAVE_LIBAIO) + io_prep_pwritev(&iocb, fd, &iov[0], iov.size(), offset); +#elif defined(HAVE_POSIXAIO) + n_aiocb = iov.size(); + aio.aiocbp = (struct aiocb*)calloc(iov.size(), sizeof(struct aiocb)); + for (int i = 0; i < iov.size(); i++) { + aio.aiocbp[i].aio_fildes = fd; + aio.aiocbp[i].aio_offset = offset; + aio.aiocbp[i].aio_buf = iov[i].iov_base; + aio.aiocbp[i].aio_nbytes = iov[i].iov_len; + aio.aiocbp[i].aio_lio_opcode = LIO_WRITE; + offset 
+= iov[i].iov_len; + } +#endif + } + + void preadv(uint64_t _offset, uint64_t len) { + offset = _offset; + length = len; +#if defined(HAVE_LIBAIO) + io_prep_preadv(&iocb, fd, &iov[0], iov.size(), offset); +#elif defined(HAVE_POSIXAIO) + n_aiocb = iov.size(); + aio.aiocbp = (struct aiocb*)calloc(iov.size(), sizeof(struct aiocb)); + for (size_t i = 0; i < iov.size(); i++) { + aio.aiocbp[i].aio_fildes = fd; + aio.aiocbp[i].aio_buf = iov[i].iov_base; + aio.aiocbp[i].aio_nbytes = iov[i].iov_len; + aio.aiocbp[i].aio_offset = offset; + aio.aiocbp[i].aio_lio_opcode = LIO_READ; + offset += iov[i].iov_len; + } +#endif + } + + long get_return_value() { + return rval; + } +}; + +std::ostream& operator<<(std::ostream& os, const aio_t& aio); + +typedef boost::intrusive::list< + aio_t, + boost::intrusive::member_hook< + aio_t, + boost::intrusive::list_member_hook<>, + &aio_t::queue_item> > aio_list_t; + +struct io_queue_t { + typedef std::list<aio_t>::iterator aio_iter; + + virtual ~io_queue_t() {}; + + virtual int init(std::vector<int> &fds) = 0; + virtual void shutdown() = 0; + virtual int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size, + void *priv, int *retries) = 0; + virtual int get_next_completed(int timeout_ms, aio_t **paio, int max) = 0; +}; + +struct aio_queue_t final : public io_queue_t { + int max_iodepth; +#if defined(HAVE_LIBAIO) + io_context_t ctx; +#elif defined(HAVE_POSIXAIO) + int ctx; +#endif + + explicit aio_queue_t(unsigned max_iodepth) + : max_iodepth(max_iodepth), + ctx(0) { + } + ~aio_queue_t() final { + ceph_assert(ctx == 0); + } + + int init(std::vector<int> &fds) final { + (void)fds; + ceph_assert(ctx == 0); +#if defined(HAVE_LIBAIO) + int r = io_setup(max_iodepth, &ctx); + if (r < 0) { + if (ctx) { + io_destroy(ctx); + ctx = 0; + } + } + return r; +#elif defined(HAVE_POSIXAIO) + ctx = kqueue(); + if (ctx < 0) + return -errno; + else + return 0; +#endif + } + void shutdown() final { + if (ctx) { +#if defined(HAVE_LIBAIO) + int r = 
io_destroy(ctx); +#elif defined(HAVE_POSIXAIO) + int r = close(ctx); +#endif + ceph_assert(r == 0); + ctx = 0; + } + } + + int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size, + void *priv, int *retries) final; + int get_next_completed(int timeout_ms, aio_t **paio, int max) final; +}; diff --git a/src/blk/kernel/KernelDevice.cc b/src/blk/kernel/KernelDevice.cc new file mode 100644 index 000000000..d93279965 --- /dev/null +++ b/src/blk/kernel/KernelDevice.cc @@ -0,0 +1,1235 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <limits> +#include <unistd.h> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <sys/file.h> + +#include "KernelDevice.h" +#include "include/intarith.h" +#include "include/types.h" +#include "include/compat.h" +#include "include/stringify.h" +#include "common/blkdev.h" +#include "common/errno.h" +#if defined(__FreeBSD__) +#include "bsm/audit_errno.h" +#endif +#include "common/debug.h" +#include "common/numa.h" + +#include "global/global_context.h" +#include "io_uring.h" + +#define dout_context cct +#define dout_subsys ceph_subsys_bdev +#undef dout_prefix +#define dout_prefix *_dout << "bdev(" << this << " " << path << ") " + +using std::list; +using std::map; +using std::string; +using std::vector; + +using ceph::bufferlist; +using ceph::bufferptr; +using ceph::make_timespan; +using ceph::mono_clock; +using ceph::operator <<; + +KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv) + : BlockDevice(cct, cb, cbpriv), + aio(false), dio(false), + 
discard_callback(d_cb), + discard_callback_priv(d_cbpriv), + aio_stop(false), + discard_started(false), + discard_stop(false), + aio_thread(this), + discard_thread(this), + injecting_crash(0) +{ + fd_directs.resize(WRITE_LIFE_MAX, -1); + fd_buffereds.resize(WRITE_LIFE_MAX, -1); + + bool use_ioring = cct->_conf.get_val<bool>("bdev_ioring"); + unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth; + + if (use_ioring && ioring_queue_t::supported()) { + bool use_ioring_hipri = cct->_conf.get_val<bool>("bdev_ioring_hipri"); + bool use_ioring_sqthread_poll = cct->_conf.get_val<bool>("bdev_ioring_sqthread_poll"); + io_queue = std::make_unique<ioring_queue_t>(iodepth, use_ioring_hipri, use_ioring_sqthread_poll); + } else { + static bool once; + if (use_ioring && !once) { + derr << "WARNING: io_uring API is not supported! Fallback to libaio!" + << dendl; + once = true; + } + io_queue = std::make_unique<aio_queue_t>(iodepth); + } +} + +int KernelDevice::_lock() +{ + dout(10) << __func__ << " " << fd_directs[WRITE_LIFE_NOT_SET] << dendl; + // When the block changes, systemd-udevd will open the block, + // read some information and close it. Then a failure occurs here. + // So we need to try again here. 
+ int fd = fd_directs[WRITE_LIFE_NOT_SET]; + uint64_t nr_tries = 0; + for (;;) { + struct flock fl = { F_WRLCK, + SEEK_SET }; + int r = ::fcntl(fd, F_OFD_SETLK, &fl); + if (r < 0) { + if (errno == EINVAL) { + r = ::flock(fd, LOCK_EX | LOCK_NB); + } + } + if (r == 0) { + return 0; + } + if (errno != EAGAIN) { + return -errno; + } + dout(1) << __func__ << " flock busy on " << path << dendl; + if (const uint64_t max_retry = + cct->_conf.get_val<uint64_t>("bdev_flock_retry"); + max_retry > 0 && nr_tries++ == max_retry) { + return -EAGAIN; + } + double retry_interval = + cct->_conf.get_val<double>("bdev_flock_retry_interval"); + std::this_thread::sleep_for(ceph::make_timespan(retry_interval)); + } +} + +int KernelDevice::open(const string& p) +{ + path = p; + int r = 0, i = 0; + dout(1) << __func__ << " path " << path << dendl; + + for (i = 0; i < WRITE_LIFE_MAX; i++) { + int fd = ::open(path.c_str(), O_RDWR | O_DIRECT); + if (fd < 0) { + r = -errno; + break; + } + fd_directs[i] = fd; + + fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC); + if (fd < 0) { + r = -errno; + break; + } + fd_buffereds[i] = fd; + } + + if (i != WRITE_LIFE_MAX) { + derr << __func__ << " open got: " << cpp_strerror(r) << dendl; + goto out_fail; + } + +#if defined(F_SET_FILE_RW_HINT) + for (i = WRITE_LIFE_NONE; i < WRITE_LIFE_MAX; i++) { + if (fcntl(fd_directs[i], F_SET_FILE_RW_HINT, &i) < 0) { + r = -errno; + break; + } + if (fcntl(fd_buffereds[i], F_SET_FILE_RW_HINT, &i) < 0) { + r = -errno; + break; + } + } + if (i != WRITE_LIFE_MAX) { + enable_wrt = false; + dout(0) << "ioctl(F_SET_FILE_RW_HINT) on " << path << " failed: " << cpp_strerror(r) << dendl; + } +#endif + + dio = true; + aio = cct->_conf->bdev_aio; + if (!aio) { + ceph_abort_msg("non-aio not supported"); + } + + // disable readahead as it will wreak havoc on our mix of + // directio/aio and buffered io. 
+ r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], 0, 0, POSIX_FADV_RANDOM); + if (r) { + r = -r; + derr << __func__ << " posix_fadvise got: " << cpp_strerror(r) << dendl; + goto out_fail; + } + + if (lock_exclusive) { + r = _lock(); + if (r < 0) { + derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r) + << dendl; + goto out_fail; + } + } + + struct stat st; + r = ::fstat(fd_directs[WRITE_LIFE_NOT_SET], &st); + if (r < 0) { + r = -errno; + derr << __func__ << " fstat got " << cpp_strerror(r) << dendl; + goto out_fail; + } + + // Operate as though the block size is 4 KB. The backing file + // blksize doesn't strictly matter except that some file systems may + // require a read/modify/write if we write something smaller than + // it. + block_size = cct->_conf->bdev_block_size; + if (block_size != (unsigned)st.st_blksize) { + dout(1) << __func__ << " backing device/file reports st_blksize " + << st.st_blksize << ", using bdev_block_size " + << block_size << " anyway" << dendl; + } + + + { + BlkDev blkdev_direct(fd_directs[WRITE_LIFE_NOT_SET]); + BlkDev blkdev_buffered(fd_buffereds[WRITE_LIFE_NOT_SET]); + + if (S_ISBLK(st.st_mode)) { + int64_t s; + r = blkdev_direct.get_size(&s); + if (r < 0) { + goto out_fail; + } + size = s; + } else { + size = st.st_size; + } + + char partition[PATH_MAX], devname[PATH_MAX]; + if ((r = blkdev_buffered.partition(partition, PATH_MAX)) || + (r = blkdev_buffered.wholedisk(devname, PATH_MAX))) { + derr << "unable to get device name for " << path << ": " + << cpp_strerror(r) << dendl; + rotational = true; + } else { + dout(20) << __func__ << " devname " << devname << dendl; + rotational = blkdev_buffered.is_rotational(); + support_discard = blkdev_buffered.support_discard(); + this->devname = devname; + _detect_vdo(); + } + } + + r = _aio_start(); + if (r < 0) { + goto out_fail; + } + _discard_start(); + + // round size down to an even block + size &= ~(block_size - 1); + + dout(1) << __func__ + << " size " << size 
+ << " (0x" << std::hex << size << std::dec << ", " + << byte_u_t(size) << ")" + << " block_size " << block_size + << " (" << byte_u_t(block_size) << ")" + << " " << (rotational ? "rotational" : "non-rotational") + << " discard " << (support_discard ? "supported" : "not supported") + << dendl; + return 0; + +out_fail: + for (i = 0; i < WRITE_LIFE_MAX; i++) { + if (fd_directs[i] >= 0) { + VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i])); + fd_directs[i] = -1; + } else { + break; + } + if (fd_buffereds[i] >= 0) { + VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i])); + fd_buffereds[i] = -1; + } else { + break; + } + } + return r; +} + +int KernelDevice::get_devices(std::set<std::string> *ls) const +{ + if (devname.empty()) { + return 0; + } + get_raw_devices(devname, ls); + return 0; +} + +void KernelDevice::close() +{ + dout(1) << __func__ << dendl; + _aio_stop(); + _discard_stop(); + + if (vdo_fd >= 0) { + VOID_TEMP_FAILURE_RETRY(::close(vdo_fd)); + vdo_fd = -1; + } + + for (int i = 0; i < WRITE_LIFE_MAX; i++) { + assert(fd_directs[i] >= 0); + VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i])); + fd_directs[i] = -1; + + assert(fd_buffereds[i] >= 0); + VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i])); + fd_buffereds[i] = -1; + } + path.clear(); +} + +int KernelDevice::collect_metadata(const string& prefix, map<string,string> *pm) const +{ + (*pm)[prefix + "support_discard"] = stringify((int)(bool)support_discard); + (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational); + (*pm)[prefix + "size"] = stringify(get_size()); + (*pm)[prefix + "block_size"] = stringify(get_block_size()); + (*pm)[prefix + "driver"] = "KernelDevice"; + if (rotational) { + (*pm)[prefix + "type"] = "hdd"; + } else { + (*pm)[prefix + "type"] = "ssd"; + } + if (vdo_fd >= 0) { + (*pm)[prefix + "vdo"] = "true"; + uint64_t total, avail; + get_vdo_utilization(vdo_fd, &total, &avail); + (*pm)[prefix + "vdo_physical_size"] = stringify(total); + } + + { + string res_names; + std::set<std::string> 
devnames;
    // Publish the underlying raw device names as a comma-separated list.
    if (get_devices(&devnames) == 0) {
      for (auto& dev : devnames) {
	if (!res_names.empty()) {
	  res_names += ",";
	}
	res_names += dev;
      }
      if (res_names.size()) {
	(*pm)[prefix + "devices"] = res_names;
      }
    }
  }

  // Distinguish a real block device from a plain file backing store.
  struct stat st;
  int r = ::fstat(fd_buffereds[WRITE_LIFE_NOT_SET], &st);
  if (r < 0)
    return -errno;
  if (S_ISBLK(st.st_mode)) {
    (*pm)[prefix + "access_mode"] = "blk";

    char buffer[1024] = {0};
    BlkDev blkdev{fd_buffereds[WRITE_LIFE_NOT_SET]};
    if (r = blkdev.partition(buffer, sizeof(buffer)); r) {
      (*pm)[prefix + "partition_path"] = "unknown";
    } else {
      (*pm)[prefix + "partition_path"] = buffer;
    }
    buffer[0] = '\0';
    // NOTE(review): this repeats the partition() query used just above for
    // "partition_path"; a whole-disk lookup may have been intended for
    // "dev_node" -- confirm against the BlkDev API.
    if (r = blkdev.partition(buffer, sizeof(buffer)); r) {
      (*pm)[prefix + "dev_node"] = "unknown";
    } else {
      (*pm)[prefix + "dev_node"] = buffer;
    }
    // NOTE(review): returns early when the lookup above SUCCEEDED, so the
    // model/dev/serial/numa_node keys below are only filled in after a
    // failed lookup -- confirm whether the condition is inverted.
    if (!r) {
      return 0;
    }
    buffer[0] = '\0';
    blkdev.model(buffer, sizeof(buffer));
    (*pm)[prefix + "model"] = buffer;

    buffer[0] = '\0';
    blkdev.dev(buffer, sizeof(buffer));
    (*pm)[prefix + "dev"] = buffer;

    // nvme exposes a serial number
    buffer[0] = '\0';
    blkdev.serial(buffer, sizeof(buffer));
    (*pm)[prefix + "serial"] = buffer;

    // numa
    int node;
    r = blkdev.get_numa_node(&node);
    if (r >= 0) {
      (*pm)[prefix + "numa_node"] = stringify(node);
    }
  } else {
    (*pm)[prefix + "access_mode"] = "file";
    (*pm)[prefix + "path"] = path;
  }
  return 0;
}

// Look up a VDO stats handle for devname; vdo_fd stays negative when no VDO
// volume maps to this device.
void KernelDevice::_detect_vdo()
{
  vdo_fd = get_vdo_stats_handle(devname.c_str(), &vdo_name);
  if (vdo_fd >= 0) {
    dout(1) << __func__ << " VDO volume " << vdo_name
	    << " maps to " << devname << dendl;
  } else {
    dout(20) << __func__ << " no VDO volume maps to " << devname << dendl;
  }
  return;
}

// Report thin-provisioning utilization through the VDO handle.
// Returns false when no VDO handle is open.
bool KernelDevice::get_thin_utilization(uint64_t *total, uint64_t *avail) const
{
  if (vdo_fd < 0) {
    return false;
  }
  return get_vdo_utilization(vdo_fd, total, avail);
}

// Pick the file descriptor for an I/O: buffered vs. direct, indexed by the
// write hint. When enable_wrt is false every hint collapses to
// WRITE_LIFE_NOT_SET.
int KernelDevice::choose_fd(bool buffered, int write_hint) const
{
  assert(write_hint >= WRITE_LIFE_NOT_SET && write_hint < WRITE_LIFE_MAX);
  if (!enable_wrt)
    write_hint = WRITE_LIFE_NOT_SET;
  return buffered ? fd_buffereds[write_hint] : fd_directs[write_hint];
}

// Make completed writes durable with fdatasync(). A no-op (returns 0) when
// io_since_flush was not set; aborts the process if fdatasync fails.
int KernelDevice::flush()
{
  // protect flush with a mutex.  note that we are not really protecting
  // data here.  instead, we're ensuring that if any flush() caller
  // sees that io_since_flush is true, they block any racing callers
  // until the flush is observed.  that allows racing threads to be
  // calling flush while still ensuring that *any* of them that got an
  // aio completion notification will not return before that aio is
  // stable on disk: whichever thread sees the flag first will block
  // followers until the aio is stable.
  std::lock_guard l(flush_mutex);

  // Atomically consume the "dirty" flag; only the caller that flips it
  // true -> false performs the sync.
  bool expect = true;
  if (!io_since_flush.compare_exchange_strong(expect, false)) {
    dout(10) << __func__ << " no-op (no ios since last flush), flag is "
	     << (int)io_since_flush.load() << dendl;
    return 0;
  }

  dout(10) << __func__ << " start" << dendl;
  if (cct->_conf->bdev_inject_crash) {
    ++injecting_crash;
    // sleep for a moment to give other threads a chance to submit or
    // wait on io that races with a flush.
    derr << __func__ << " injecting crash. first we sleep..." << dendl;
    sleep(cct->_conf->bdev_inject_crash_flush_delay);
    derr << __func__ << " and now we die" << dendl;
    cct->_log->flush();
    _exit(1);
  }
  utime_t start = ceph_clock_now();
  int r = ::fdatasync(fd_directs[WRITE_LIFE_NOT_SET]);
  utime_t end = ceph_clock_now();
  utime_t dur = end - start;
  if (r < 0) {
    r = -errno;
    derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl;
    ceph_abort();
  }
  dout(5) << __func__ << " in " << dur << dendl;;
  return r;
}

// Initialise the io queue on the direct fds and start the completion
// thread. Does nothing when aio is disabled. Returns 0 or the negative
// error from io_queue->init().
int KernelDevice::_aio_start()
{
  if (aio) {
    dout(10) << __func__ << dendl;
    int r = io_queue->init(fd_directs);
    if (r < 0) {
      if (r == -EAGAIN) {
	derr << __func__ << " io_setup(2) failed with EAGAIN; "
	     << "try increasing /proc/sys/fs/aio-max-nr" << dendl;
      } else {
	derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl;
      }
      return r;
    }
    aio_thread.create("bstore_aio");
  }
  return 0;
}

// Ask the completion thread to exit, join it, then shut the io queue down.
void KernelDevice::_aio_stop()
{
  if (aio) {
    dout(10) << __func__ << dendl;
    aio_stop = true;
    aio_thread.join();
    aio_stop = false;
    io_queue->shutdown();
  }
}

// Start the background discard thread.
int KernelDevice::_discard_start()
{
    discard_thread.create("bstore_discard");
    return 0;
}

// Stop the discard thread: wait until it has announced itself
// (discard_started), raise discard_stop, join, then reset the flag.
void KernelDevice::_discard_stop()
{
  dout(10) << __func__ << dendl;
  {
    std::unique_lock l(discard_lock);
    while (!discard_started) {
      discard_cond.wait(l);
    }
    discard_stop = true;
    discard_cond.notify_all();
  }
  discard_thread.join();
  {
    std::lock_guard l(discard_lock);
    discard_stop = false;
  }
  dout(10) << __func__ << " stopped" << dendl;
}

// Block until no discards are queued and none is being processed.
void KernelDevice::discard_drain()
{
  dout(10) << __func__ << dendl;
  std::unique_lock l(discard_lock);
  while (!discard_queued.empty() || discard_running) {
    discard_cond.wait(l);
  }
}

// True when r (a negative errno) is one of the listed block-layer error
// codes; the list is taken from the kernel source linked below.
static bool is_expected_ioerr(const int r)
{
  // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
  return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
	  r == -ENOLINK || r == -EREMOTEIO || r ==
-EAGAIN || r == -EIO || + r == -ENODATA || r == -EILSEQ || r == -ENOMEM || +#if defined(__linux__) + r == -EREMCHG || r == -EBADE +#elif defined(__FreeBSD__) + r == - BSM_ERRNO_EREMCHG || r == -BSM_ERRNO_EBADE +#endif + ); +} + +void KernelDevice::_aio_thread() +{ + dout(10) << __func__ << " start" << dendl; + int inject_crash_count = 0; + while (!aio_stop) { + dout(40) << __func__ << " polling" << dendl; + int max = cct->_conf->bdev_aio_reap_max; + aio_t *aio[max]; + int r = io_queue->get_next_completed(cct->_conf->bdev_aio_poll_ms, + aio, max); + if (r < 0) { + derr << __func__ << " got " << cpp_strerror(r) << dendl; + ceph_abort_msg("got unexpected error from io_getevents"); + } + if (r > 0) { + dout(30) << __func__ << " got " << r << " completed aios" << dendl; + for (int i = 0; i < r; ++i) { + IOContext *ioc = static_cast<IOContext*>(aio[i]->priv); + _aio_log_finish(ioc, aio[i]->offset, aio[i]->length); + if (aio[i]->queue_item.is_linked()) { + std::lock_guard l(debug_queue_lock); + debug_aio_unlink(*aio[i]); + } + + // set flag indicating new ios have completed. we do this *before* + // any completion or notifications so that any user flush() that + // follows the observed io completion will include this io. Note + // that an earlier, racing flush() could observe and clear this + // flag, but that also ensures that the IO will be stable before the + // later flush() occurs. 
+ io_since_flush.store(true); + + long r = aio[i]->get_return_value(); + if (r < 0) { + derr << __func__ << " got r=" << r << " (" << cpp_strerror(r) << ")" + << dendl; + if (ioc->allow_eio && is_expected_ioerr(r)) { + derr << __func__ << " translating the error to EIO for upper layer" + << dendl; + ioc->set_return_value(-EIO); + } else { + if (is_expected_ioerr(r)) { + note_io_error_event( + devname.c_str(), + path.c_str(), + r, +#if defined(HAVE_POSIXAIO) + aio[i]->aio.aiocb.aio_lio_opcode, +#else + aio[i]->iocb.aio_lio_opcode, +#endif + aio[i]->offset, + aio[i]->length); + ceph_abort_msg( + "Unexpected IO error. " + "This may suggest a hardware issue. " + "Please check your kernel log!"); + } + ceph_abort_msg( + "Unexpected IO error. " + "This may suggest HW issue. Please check your dmesg!"); + } + } else if (aio[i]->length != (uint64_t)r) { + derr << "aio to 0x" << std::hex << aio[i]->offset + << "~" << aio[i]->length << std::dec + << " but returned: " << r << dendl; + ceph_abort_msg("unexpected aio return value: does not match length"); + } + + dout(10) << __func__ << " finished aio " << aio[i] << " r " << r + << " ioc " << ioc + << " with " << (ioc->num_running.load() - 1) + << " aios left" << dendl; + + // NOTE: once num_running and we either call the callback or + // call aio_wake we cannot touch ioc or aio[] as the caller + // may free it. 
+ if (ioc->priv) { + if (--ioc->num_running == 0) { + aio_callback(aio_callback_priv, ioc->priv); + } + } else { + ioc->try_aio_wake(); + } + } + } + if (cct->_conf->bdev_debug_aio) { + utime_t now = ceph_clock_now(); + std::lock_guard l(debug_queue_lock); + if (debug_oldest) { + if (debug_stall_since == utime_t()) { + debug_stall_since = now; + } else { + if (cct->_conf->bdev_debug_aio_suicide_timeout) { + utime_t cutoff = now; + cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout; + if (debug_stall_since < cutoff) { + derr << __func__ << " stalled aio " << debug_oldest + << " since " << debug_stall_since << ", timeout is " + << cct->_conf->bdev_debug_aio_suicide_timeout + << "s, suicide" << dendl; + ceph_abort_msg("stalled aio... buggy kernel or bad device?"); + } + } + } + } + } + reap_ioc(); + if (cct->_conf->bdev_inject_crash) { + ++inject_crash_count; + if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 > + cct->_conf->bdev_inject_crash + cct->_conf->bdev_inject_crash_flush_delay) { + derr << __func__ << " bdev_inject_crash trigger from aio thread" + << dendl; + cct->_log->flush(); + _exit(1); + } + } + } + reap_ioc(); + dout(10) << __func__ << " end" << dendl; +} + +void KernelDevice::_discard_thread() +{ + std::unique_lock l(discard_lock); + ceph_assert(!discard_started); + discard_started = true; + discard_cond.notify_all(); + while (true) { + ceph_assert(discard_finishing.empty()); + if (discard_queued.empty()) { + if (discard_stop) + break; + dout(20) << __func__ << " sleep" << dendl; + discard_cond.notify_all(); // for the thread trying to drain... 
discard_cond.wait(l);
      dout(20) << __func__ << " wake" << dendl;
    } else {
      // Take ownership of the queued extents and process them unlocked so
      // queue_discard() is not blocked while the discards are issued.
      discard_finishing.swap(discard_queued);
      discard_running = true;
      l.unlock();
      dout(20) << __func__ << " finishing" << dendl;
      for (auto p = discard_finishing.begin();p != discard_finishing.end(); ++p) {
	discard(p.get_start(), p.get_len());
      }

      discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing));
      discard_finishing.clear();
      l.lock();
      discard_running = false;
    }
  }
  dout(10) << __func__ << " finish" << dendl;
  discard_started = false;
}

// Queue extents for the discard thread.
// Returns -1 when the device does not support discard, 0 otherwise
// (including for an empty set, which is ignored).
int KernelDevice::queue_discard(interval_set<uint64_t> &to_release)
{
  if (!support_discard)
    return -1;

  if (to_release.empty())
    return 0;

  std::lock_guard l(discard_lock);
  discard_queued.insert(to_release);
  discard_cond.notify_all();
  return 0;
}

// Debug bookkeeping at I/O start. With bdev_debug_inflight_ios enabled,
// aborts if the new extent overlaps one already in flight, otherwise
// records it in debug_inflight.
void KernelDevice::_aio_log_start(
  IOContext *ioc,
  uint64_t offset,
  uint64_t length)
{
  dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
	   << std::dec << dendl;
  if (cct->_conf->bdev_debug_inflight_ios) {
    std::lock_guard l(debug_lock);
    if (debug_inflight.intersects(offset, length)) {
      derr << __func__ << " inflight overlap of 0x"
	   << std::hex
	   << offset << "~" << length << std::dec
	   << " with " << debug_inflight << dendl;
      ceph_abort();
    }
    debug_inflight.insert(offset, length);
  }
}

// Append an aio to the stall-tracking queue; the first entry becomes
// debug_oldest. Both call sites in this file hold debug_queue_lock.
void KernelDevice::debug_aio_link(aio_t& aio)
{
  if (debug_queue.empty()) {
    debug_oldest = &aio;
  }
  debug_queue.push_back(aio);
}

// Remove an aio from the stall-tracking queue. If it was the oldest one,
// log it when it stalled longer than bdev_debug_aio_log_age, then advance
// debug_oldest and reset the stall timer.
void KernelDevice::debug_aio_unlink(aio_t& aio)
{
  if (aio.queue_item.is_linked()) {
    debug_queue.erase(debug_queue.iterator_to(aio));
    if (debug_oldest == &aio) {
      auto age = cct->_conf->bdev_debug_aio_log_age;
      if (age && debug_stall_since != utime_t()) {
        utime_t cutoff = ceph_clock_now();
	cutoff -= age;
	if (debug_stall_since < cutoff) {
	  derr << __func__ << " stalled aio " << debug_oldest
	       << " since " << debug_stall_since << ", timeout is "
	       << age
	       << "s" << dendl;
	}
      }

      if (debug_queue.empty()) {
	debug_oldest = nullptr;
      } else {
	debug_oldest = &debug_queue.front();
      }
      debug_stall_since = utime_t();
    }
  }
}

// Debug bookkeeping at I/O completion: drops the extent from
// debug_inflight when bdev_debug_inflight_ios is enabled.
void KernelDevice::_aio_log_finish(
  IOContext *ioc,
  uint64_t offset,
  uint64_t length)
{
  // NOTE(review): no local named `aio` exists here, so this streams the
  // bool member `aio`; `ioc` may have been intended -- confirm.
  dout(20) << __func__ << " " << aio << " 0x"
	   << std::hex << offset << "~" << length << std::dec << dendl;
  if (cct->_conf->bdev_debug_inflight_ios) {
    std::lock_guard l(debug_lock);
    debug_inflight.erase(offset, length);
  }
}

// Submit every pending aio of ioc to the io queue. Moves the pending list
// in front of the running list, transfers the pending count to
// num_running, optionally links each aio into the stall-debug queue, then
// hands the batch to io_queue->submit_batch(). A submit failure trips an
// assertion.
void KernelDevice::aio_submit(IOContext *ioc)
{
  dout(20) << __func__ << " ioc " << ioc
	   << " pending " << ioc->num_pending.load()
	   << " running " << ioc->num_running.load()
	   << dendl;

  if (ioc->num_pending.load() == 0) {
    return;
  }

  // move these aside, and get our end iterator position now, as the
  // aios might complete as soon as they are submitted and queue more
  // wal aio's.
  list<aio_t>::iterator e = ioc->running_aios.begin();
  ioc->running_aios.splice(e, ioc->pending_aios);

  int pending = ioc->num_pending.load();
  ioc->num_running += pending;
  ioc->num_pending -= pending;
  ceph_assert(ioc->num_pending.load() == 0);  // we should be only thread doing this
  ceph_assert(ioc->pending_aios.size() == 0);

  if (cct->_conf->bdev_debug_aio) {
    list<aio_t>::iterator p = ioc->running_aios.begin();
    while (p != e) {
      dout(30) << __func__ << " " << *p << dendl;
      std::lock_guard l(debug_queue_lock);
      debug_aio_link(*p++);
    }
  }

  void *priv = static_cast<void*>(ioc);
  int r, retries = 0;
  // num of pending aios should not overflow when passed to submit_batch()
  assert(pending <= std::numeric_limits<uint16_t>::max());
  r = io_queue->submit_batch(ioc->running_aios.begin(), e,
			     pending, priv, &retries);

  if (retries)
    derr << __func__ << " retries " << retries << dendl;
  if (r < 0) {
    derr << " aio submit got " << cpp_strerror(r) << dendl;
    // r is negative here, so this assertion always fires: a failed submit
    // is treated as fatal.
    ceph_assert(r == 0);
  }
}

// Synchronous write of bl at off (documented where the body follows).
int KernelDevice::_sync_write(uint64_t off, bufferlist &bl,
bool buffered, int write_hint)
{
  // Writes bl at off with pwritev(), looping over short writes. For
  // buffered writes (where sync_file_range is available) the range is then
  // pushed out and waited for. Sets io_since_flush on success. Returns 0 or
  // a negative errno.
  uint64_t len = bl.length();
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
	  << std::dec << (buffered ? " (buffered)" : " (direct)") << dendl;
  // Crash injection: randomly pretend the write succeeded without doing it.
  if (cct->_conf->bdev_inject_crash &&
      rand() % cct->_conf->bdev_inject_crash == 0) {
    derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
	 << off << "~" << len << std::dec << dendl;
    ++injecting_crash;
    return 0;
  }
  vector<iovec> iov;
  bl.prepare_iov(&iov);

  auto left = len;
  auto o = off;
  size_t idx = 0;
  do {
    auto r = ::pwritev(choose_fd(buffered, write_hint),
      &iov[idx], iov.size() - idx, o);

    if (r < 0) {
      r = -errno;
      derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl;
      return r;
    }
    o += r;
    left -= r;
    if (left) {
      // skip fully processed IOVs
      while (idx < iov.size() && (size_t)r >= iov[idx].iov_len) {
        r -= iov[idx++].iov_len;
      }
      // update partially processed one if any
      if (r) {
        ceph_assert(idx < iov.size());
        ceph_assert((size_t)r < iov[idx].iov_len);
        iov[idx].iov_base = static_cast<char*>(iov[idx].iov_base) + r;
        iov[idx].iov_len -= r;
        r = 0;
      }
      ceph_assert(r == 0);
    }
  } while (left);

#ifdef HAVE_SYNC_FILE_RANGE
  if (buffered) {
    // initiate IO and wait till it completes
    auto r = ::sync_file_range(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER|SYNC_FILE_RANGE_WAIT_BEFORE);
    if (r < 0) {
      r = -errno;
      derr << __func__ << " sync_file_range error: " << cpp_strerror(r) << dendl;
      return r;
    }
  }
#endif

  // Tell flush() there is something new to make durable.
  io_since_flush.store(true);

  return 0;
}

// Synchronous write entry point. Validates the extent, honours
// objectstore_blackhole (drops the I/O and returns 0), rebuilds the buffer
// to block alignment when doing direct I/O or when it has IOV_MAX or more
// segments, then delegates to _sync_write().
int KernelDevice::write(
  uint64_t off,
  bufferlist &bl,
  bool buffered,
  int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	   << (buffered ? " (buffered)" : " (direct)")
	   << dendl;
  ceph_assert(is_valid_io(off, len));
  if (cct->_conf->objectstore_blackhole) {
    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
	       << dendl;
    return 0;
  }

  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
      bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
  }
  dout(40) << "data: ";
  bl.hexdump(*_dout);
  *_dout << dendl;

  return _sync_write(off, bl, buffered, write_hint);
}

// Queue an asynchronous write on ioc. With libaio, aio and dio enabled and
// a non-buffered request, the write is appended to ioc->pending_aios (split
// into RW_IO_MAX-sized pieces when larger than that) and only issued by a
// later aio_submit(). Otherwise it falls back to a synchronous
// _sync_write(). Returns 0 or the negative error of the synchronous path.
int KernelDevice::aio_write(
  uint64_t off,
  bufferlist &bl,
  IOContext *ioc,
  bool buffered,
  int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	   << (buffered ? " (buffered)" : " (direct)")
	   << dendl;
  ceph_assert(is_valid_io(off, len));
  if (cct->_conf->objectstore_blackhole) {
    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
	       << dendl;
    return 0;
  }

  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
      bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
  }
  dout(40) << "data: ";
  bl.hexdump(*_dout);
  *_dout << dendl;

  _aio_log_start(ioc, off, len);

#ifdef HAVE_LIBAIO
  if (aio && dio && !buffered) {
    if (cct->_conf->bdev_inject_crash &&
	rand() % cct->_conf->bdev_inject_crash == 0) {
      derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
	   << off << "~" << len << std::dec
	   << dendl;
      // generate a real io so that aio_wait behaves properly, but make it
      // a read instead of write, and toss the result.
      ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
      ++ioc->num_pending;
      auto& aio = ioc->pending_aios.back();
      bufferptr p = ceph::buffer::create_small_page_aligned(len);
      aio.bl.append(std::move(p));
      aio.bl.prepare_iov(&aio.iov);
      aio.preadv(off, len);
      ++injecting_crash;
    } else {
      if (bl.length() <= RW_IO_MAX) {
	// fast path (non-huge write)
	ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
	++ioc->num_pending;
	auto& aio = ioc->pending_aios.back();
	// The iovec is prepared before the aio claims the buffers from bl.
	bl.prepare_iov(&aio.iov);
	aio.bl.claim_append(bl);
	aio.pwritev(off, len);
	dout(30) << aio << dendl;
	dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
		<< std::dec << " aio " << &aio << dendl;
      } else {
	// write in RW_IO_MAX-sized chunks
	uint64_t prev_len = 0;
	while (prev_len < bl.length()) {
	  bufferlist tmp;
	  if (prev_len + RW_IO_MAX < bl.length()) {
	    tmp.substr_of(bl, prev_len, RW_IO_MAX);
	  } else {
	    tmp.substr_of(bl, prev_len, bl.length() - prev_len);
	  }
	  // Shadows the outer `len`: this is the size of the current piece.
	  auto len = tmp.length();
	  ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
	  ++ioc->num_pending;
	  auto& aio = ioc->pending_aios.back();
	  tmp.prepare_iov(&aio.iov);
	  aio.bl.claim_append(tmp);
	  aio.pwritev(off + prev_len, len);
	  dout(30) << aio << dendl;
	  dout(5) << __func__ << " 0x" << std::hex << off + prev_len
		  << "~" << len
		  << std::dec << " aio " << &aio << " (piece)" << dendl;
	  prev_len += len;
	}
      }
    }
  } else
#endif
  {
    // Synchronous fallback: the I/O is complete on return, so its debug
    // bookkeeping is closed here rather than by the completion thread.
    int r = _sync_write(off, bl, buffered, write_hint);
    _aio_log_finish(ioc, off, len);
    if (r < 0)
      return r;
  }
  return 0;
}

// Discard (trim) the extent offset~len on the device. Returns 0 without
// doing anything when objectstore_blackhole is set or discard is not
// supported; otherwise returns the BlkDev::discard() result.
int KernelDevice::discard(uint64_t offset, uint64_t len)
{
  int r = 0;
  if (cct->_conf->objectstore_blackhole) {
    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
	       << dendl;
    return 0;
  }
  if (support_discard) {
    dout(10) << __func__
	     << " 0x" << std::hex << offset << "~" << len << std::dec
	     << dendl;

    r =
BlkDev{fd_directs[WRITE_LIFE_NOT_SET]}.discard((int64_t)offset, (int64_t)len); + } + return r; +} + +int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, + IOContext *ioc, + bool buffered) +{ + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << (buffered ? " (buffered)" : " (direct)") + << dendl; + ceph_assert(is_valid_io(off, len)); + + _aio_log_start(ioc, off, len); + + auto start1 = mono_clock::now(); + + auto p = ceph::buffer::ptr_node::create(ceph::buffer::create_small_page_aligned(len)); + int r = ::pread(buffered ? fd_buffereds[WRITE_LIFE_NOT_SET] : fd_directs[WRITE_LIFE_NOT_SET], + p->c_str(), len, off); + auto age = cct->_conf->bdev_debug_aio_log_age; + if (mono_clock::now() - start1 >= make_timespan(age)) { + derr << __func__ << " stalled read " + << " 0x" << std::hex << off << "~" << len << std::dec + << (buffered ? " (buffered)" : " (direct)") + << " since " << start1 << ", timeout is " + << age + << "s" << dendl; + } + + if (r < 0) { + if (ioc->allow_eio && is_expected_ioerr(r)) { + r = -EIO; + } else { + r = -errno; + } + goto out; + } + ceph_assert((uint64_t)r == len); + pbl->push_back(std::move(p)); + + dout(40) << "data: "; + pbl->hexdump(*_dout); + *_dout << dendl; + + out: + _aio_log_finish(ioc, off, len); + return r < 0 ? 
r : 0; +} + +int KernelDevice::aio_read( + uint64_t off, + uint64_t len, + bufferlist *pbl, + IOContext *ioc) +{ + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << dendl; + + int r = 0; +#ifdef HAVE_LIBAIO + if (aio && dio) { + ceph_assert(is_valid_io(off, len)); + _aio_log_start(ioc, off, len); + ioc->pending_aios.push_back(aio_t(ioc, fd_directs[WRITE_LIFE_NOT_SET])); + ++ioc->num_pending; + aio_t& aio = ioc->pending_aios.back(); + bufferptr p = ceph::buffer::create_small_page_aligned(len); + aio.bl.append(std::move(p)); + aio.bl.prepare_iov(&aio.iov); + aio.preadv(off, len); + dout(30) << aio << dendl; + pbl->append(aio.bl); + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len + << std::dec << " aio " << &aio << dendl; + } else +#endif + { + r = read(off, len, pbl, ioc, false); + } + + return r; +} + +int KernelDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf) +{ + uint64_t aligned_off = p2align(off, block_size); + uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off; + bufferptr p = ceph::buffer::create_small_page_aligned(aligned_len); + int r = 0; + + auto start1 = mono_clock::now(); + r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], p.c_str(), aligned_len, aligned_off); + auto age = cct->_conf->bdev_debug_aio_log_age; + if (mono_clock::now() - start1 >= make_timespan(age)) { + derr << __func__ << " stalled read " + << " 0x" << std::hex << off << "~" << len << std::dec + << " since " << start1 << ", timeout is " + << age + << "s" << dendl; + } + + if (r < 0) { + r = -errno; + derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << " error: " << cpp_strerror(r) << dendl; + goto out; + } + ceph_assert((uint64_t)r == aligned_len); + memcpy(buf, p.c_str() + (off - aligned_off), len); + + dout(40) << __func__ << " data: "; + bufferlist bl; + bl.append(buf, len); + bl.hexdump(*_dout); + *_dout << dendl; + + out: + return r < 0 ? 
r : 0; +} + +int KernelDevice::read_random(uint64_t off, uint64_t len, char *buf, + bool buffered) +{ + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << "buffered " << buffered + << dendl; + ceph_assert(len > 0); + ceph_assert(off < size); + ceph_assert(off + len <= size); + int r = 0; + auto age = cct->_conf->bdev_debug_aio_log_age; + + //if it's direct io and unaligned, we have to use a internal buffer + if (!buffered && ((off % block_size != 0) + || (len % block_size != 0) + || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0))) + return direct_read_unaligned(off, len, buf); + + auto start1 = mono_clock::now(); + if (buffered) { + //buffered read + auto off0 = off; + char *t = buf; + uint64_t left = len; + while (left > 0) { + r = ::pread(fd_buffereds[WRITE_LIFE_NOT_SET], t, left, off); + if (r < 0) { + r = -errno; + derr << __func__ << " 0x" << std::hex << off << "~" << left + << std::dec << " error: " << cpp_strerror(r) << dendl; + goto out; + } + off += r; + t += r; + left -= r; + } + if (mono_clock::now() - start1 >= make_timespan(age)) { + derr << __func__ << " stalled read " + << " 0x" << std::hex << off0 << "~" << len << std::dec + << " (buffered) since " << start1 << ", timeout is " + << age + << "s" << dendl; + } + } else { + //direct and aligned read + r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], buf, len, off); + if (mono_clock::now() - start1 >= make_timespan(age)) { + derr << __func__ << " stalled read " + << " 0x" << std::hex << off << "~" << len << std::dec + << " (direct) since " << start1 << ", timeout is " + << age + << "s" << dendl; + } + if (r < 0) { + r = -errno; + derr << __func__ << " direct_aligned_read" << " 0x" << std::hex + << off << "~" << std::left << std::dec << " error: " << cpp_strerror(r) + << dendl; + goto out; + } + ceph_assert((uint64_t)r == len); + } + + dout(40) << __func__ << " data: "; + bufferlist bl; + bl.append(buf, len); + bl.hexdump(*_dout); + *_dout << dendl; + + out: + return r < 0 ? 
r : 0; +} + +int KernelDevice::invalidate_cache(uint64_t off, uint64_t len) +{ + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << dendl; + ceph_assert(off % block_size == 0); + ceph_assert(len % block_size == 0); + int r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, POSIX_FADV_DONTNEED); + if (r) { + r = -r; + derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << " error: " << cpp_strerror(r) << dendl; + } + return r; +} diff --git a/src/blk/kernel/KernelDevice.h b/src/blk/kernel/KernelDevice.h new file mode 100644 index 000000000..7ac9b1e7e --- /dev/null +++ b/src/blk/kernel/KernelDevice.h @@ -0,0 +1,150 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef CEPH_BLK_KERNELDEVICE_H +#define CEPH_BLK_KERNELDEVICE_H + +#include <atomic> + +#include "include/types.h" +#include "include/interval_set.h" +#include "common/Thread.h" +#include "include/utime.h" + +#include "aio/aio.h" +#include "BlockDevice.h" + +#define RW_IO_MAX (INT_MAX & CEPH_PAGE_MASK) + + +class KernelDevice : public BlockDevice { + std::vector<int> fd_directs, fd_buffereds; + bool enable_wrt = true; + std::string path; + bool aio, dio; + + int vdo_fd = -1; ///< fd for vdo sysfs directory + std::string vdo_name; + + std::string devname; ///< kernel dev name (/sys/block/$devname), if any + + ceph::mutex debug_lock = ceph::make_mutex("KernelDevice::debug_lock"); + interval_set<uint64_t> debug_inflight; + + std::atomic<bool> io_since_flush = {false}; + ceph::mutex flush_mutex = ceph::make_mutex("KernelDevice::flush_mutex"); + + std::unique_ptr<io_queue_t> io_queue; + aio_callback_t discard_callback; + void *discard_callback_priv; + bool aio_stop; + bool discard_started; + bool discard_stop; + + ceph::mutex discard_lock = ceph::make_mutex("KernelDevice::discard_lock"); + ceph::condition_variable discard_cond; + bool discard_running = false; + interval_set<uint64_t> discard_queued; + interval_set<uint64_t> discard_finishing; + + struct AioCompletionThread : public Thread { + KernelDevice *bdev; + explicit AioCompletionThread(KernelDevice *b) : bdev(b) {} + void *entry() override { + bdev->_aio_thread(); + return NULL; + } + } aio_thread; + + struct DiscardThread : public Thread { + KernelDevice *bdev; + explicit DiscardThread(KernelDevice *b) : bdev(b) {} + void *entry() override { + bdev->_discard_thread(); + return NULL; + } + } discard_thread; + + std::atomic_int injecting_crash; + + void _aio_thread(); + void _discard_thread(); + int queue_discard(interval_set<uint64_t> &to_release) override; + + int _aio_start(); + void _aio_stop(); + + int _discard_start(); + void _discard_stop(); + + void _aio_log_start(IOContext *ioc, uint64_t 
offset, uint64_t length); + void _aio_log_finish(IOContext *ioc, uint64_t offset, uint64_t length); + + int _sync_write(uint64_t off, ceph::buffer::list& bl, bool buffered, int write_hint); + + int _lock(); + + int direct_read_unaligned(uint64_t off, uint64_t len, char *buf); + + // stalled aio debugging + aio_list_t debug_queue; + ceph::mutex debug_queue_lock = ceph::make_mutex("KernelDevice::debug_queue_lock"); + aio_t *debug_oldest = nullptr; + utime_t debug_stall_since; + void debug_aio_link(aio_t& aio); + void debug_aio_unlink(aio_t& aio); + + void _detect_vdo(); + int choose_fd(bool buffered, int write_hint) const; + +public: + KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv); + + void aio_submit(IOContext *ioc) override; + void discard_drain() override; + + int collect_metadata(const std::string& prefix, std::map<std::string,std::string> *pm) const override; + int get_devname(std::string *s) const override { + if (devname.empty()) { + return -ENOENT; + } + *s = devname; + return 0; + } + int get_devices(std::set<std::string> *ls) const override; + + bool get_thin_utilization(uint64_t *total, uint64_t *avail) const override; + + int read(uint64_t off, uint64_t len, ceph::buffer::list *pbl, + IOContext *ioc, + bool buffered) override; + int aio_read(uint64_t off, uint64_t len, ceph::buffer::list *pbl, + IOContext *ioc) override; + int read_random(uint64_t off, uint64_t len, char *buf, bool buffered) override; + + int write(uint64_t off, ceph::buffer::list& bl, bool buffered, int write_hint = WRITE_LIFE_NOT_SET) override; + int aio_write(uint64_t off, ceph::buffer::list& bl, + IOContext *ioc, + bool buffered, + int write_hint = WRITE_LIFE_NOT_SET) override; + int flush() override; + int discard(uint64_t offset, uint64_t len) override; + + // for managing buffered readers/writers + int invalidate_cache(uint64_t off, uint64_t len) override; + int open(const std::string& path) override; + void close() override; 
+}; + +#endif diff --git a/src/blk/kernel/io_uring.cc b/src/blk/kernel/io_uring.cc new file mode 100644 index 000000000..17bd35bb4 --- /dev/null +++ b/src/blk/kernel/io_uring.cc @@ -0,0 +1,261 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "io_uring.h" + +#if defined(HAVE_LIBURING) + +#include "liburing.h" +#include <sys/epoll.h> + +struct ioring_data { + struct io_uring io_uring; + pthread_mutex_t cq_mutex; + pthread_mutex_t sq_mutex; + int epoll_fd = -1; + std::map<int, int> fixed_fds_map; +}; + +static int ioring_get_cqe(struct ioring_data *d, unsigned int max, + struct aio_t **paio) +{ + struct io_uring *ring = &d->io_uring; + struct io_uring_cqe *cqe; + + unsigned nr = 0; + unsigned head; + io_uring_for_each_cqe(ring, head, cqe) { + struct aio_t *io = (struct aio_t *)(uintptr_t) io_uring_cqe_get_data(cqe); + io->rval = cqe->res; + + paio[nr++] = io; + + if (nr == max) + break; + } + io_uring_cq_advance(ring, nr); + + return nr; +} + +static int find_fixed_fd(struct ioring_data *d, int real_fd) +{ + auto it = d->fixed_fds_map.find(real_fd); + if (it == d->fixed_fds_map.end()) + return -1; + + return it->second; +} + +static void init_sqe(struct ioring_data *d, struct io_uring_sqe *sqe, + struct aio_t *io) +{ + int fixed_fd = find_fixed_fd(d, io->fd); + + ceph_assert(fixed_fd != -1); + + if (io->iocb.aio_lio_opcode == IO_CMD_PWRITEV) + io_uring_prep_writev(sqe, fixed_fd, &io->iov[0], + io->iov.size(), io->offset); + else if (io->iocb.aio_lio_opcode == IO_CMD_PREADV) + io_uring_prep_readv(sqe, fixed_fd, &io->iov[0], + io->iov.size(), io->offset); + else + ceph_assert(0); + + io_uring_sqe_set_data(sqe, io); + io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); +} + +static int ioring_queue(struct ioring_data *d, void *priv, + list<aio_t>::iterator beg, list<aio_t>::iterator end) +{ + struct io_uring *ring = &d->io_uring; + struct aio_t *io = nullptr; + + ceph_assert(beg != end); + + do { + struct 
io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) + break; + + io = &*beg; + io->priv = priv; + + init_sqe(d, sqe, io); + + } while (++beg != end); + + if (!io) + /* Queue is full, go and reap something first */ + return 0; + + return io_uring_submit(ring); +} + +static void build_fixed_fds_map(struct ioring_data *d, + std::vector<int> &fds) +{ + int fixed_fd = 0; + for (int real_fd : fds) { + d->fixed_fds_map[real_fd] = fixed_fd++; + } +} + +ioring_queue_t::ioring_queue_t(unsigned iodepth_, bool hipri_, bool sq_thread_) : + d(make_unique<ioring_data>()), + iodepth(iodepth_), + hipri(hipri_), + sq_thread(sq_thread_) +{ +} + +ioring_queue_t::~ioring_queue_t() +{ +} + +int ioring_queue_t::init(std::vector<int> &fds) +{ + unsigned flags = 0; + + pthread_mutex_init(&d->cq_mutex, NULL); + pthread_mutex_init(&d->sq_mutex, NULL); + + if (hipri) + flags |= IORING_SETUP_IOPOLL; + if (sq_thread) + flags |= IORING_SETUP_SQPOLL; + + int ret = io_uring_queue_init(iodepth, &d->io_uring, flags); + if (ret < 0) + return ret; + + ret = io_uring_register_files(&d->io_uring, + &fds[0], fds.size()); + if (ret < 0) { + ret = -errno; + goto close_ring_fd; + } + + build_fixed_fds_map(d.get(), fds); + + d->epoll_fd = epoll_create1(0); + if (d->epoll_fd < 0) { + ret = -errno; + goto close_ring_fd; + } + + struct epoll_event ev; + ev.events = EPOLLIN; + ret = epoll_ctl(d->epoll_fd, EPOLL_CTL_ADD, d->io_uring.ring_fd, &ev); + if (ret < 0) { + ret = -errno; + goto close_epoll_fd; + } + + return 0; + +close_epoll_fd: + close(d->epoll_fd); +close_ring_fd: + io_uring_queue_exit(&d->io_uring); + + return ret; +} + +void ioring_queue_t::shutdown() +{ + d->fixed_fds_map.clear(); + close(d->epoll_fd); + d->epoll_fd = -1; + io_uring_queue_exit(&d->io_uring); +} + +int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end, + uint16_t aios_size, void *priv, + int *retries) +{ + (void)aios_size; + (void)retries; + + pthread_mutex_lock(&d->sq_mutex); + int rc = ioring_queue(d.get(), priv, beg, 
end); + pthread_mutex_unlock(&d->sq_mutex); + + return rc; +} + +int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max) +{ +get_cqe: + pthread_mutex_lock(&d->cq_mutex); + int events = ioring_get_cqe(d.get(), max, paio); + pthread_mutex_unlock(&d->cq_mutex); + + if (events == 0) { + struct epoll_event ev; + int ret = TEMP_FAILURE_RETRY(epoll_wait(d->epoll_fd, &ev, 1, timeout_ms)); + if (ret < 0) + events = -errno; + else if (ret > 0) + /* Time to reap */ + goto get_cqe; + } + + return events; +} + +bool ioring_queue_t::supported() +{ + struct io_uring ring; + int ret = io_uring_queue_init(16, &ring, 0); + if (ret) { + return false; + } + io_uring_queue_exit(&ring); + return true; +} + +#else // #if defined(HAVE_LIBURING) + +struct ioring_data {}; + +ioring_queue_t::ioring_queue_t(unsigned iodepth_, bool hipri_, bool sq_thread_) +{ + ceph_assert(0); +} + +ioring_queue_t::~ioring_queue_t() +{ + ceph_assert(0); +} + +int ioring_queue_t::init(std::vector<int> &fds) +{ + ceph_assert(0); +} + +void ioring_queue_t::shutdown() +{ + ceph_assert(0); +} + +int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end, + uint16_t aios_size, void *priv, + int *retries) +{ + ceph_assert(0); +} + +int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max) +{ + ceph_assert(0); +} + +bool ioring_queue_t::supported() +{ + return false; +} + +#endif // #if defined(HAVE_LIBURING) diff --git a/src/blk/kernel/io_uring.h b/src/blk/kernel/io_uring.h new file mode 100644 index 000000000..e7d0acde0 --- /dev/null +++ b/src/blk/kernel/io_uring.h @@ -0,0 +1,33 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "acconfig.h" + +#include "include/types.h" +#include "aio/aio.h" + +struct ioring_data; + +struct ioring_queue_t final : public io_queue_t { + std::unique_ptr<ioring_data> d; + unsigned iodepth = 0; + bool hipri = false; + bool sq_thread = false; + + typedef 
std::list<aio_t>::iterator aio_iter; + + // Returns true if arch is x86-64 and kernel supports io_uring + static bool supported(); + + ioring_queue_t(unsigned iodepth_, bool hipri_, bool sq_thread_); + ~ioring_queue_t() final; + + int init(std::vector<int> &fds) final; + void shutdown() final; + + int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size, + void *priv, int *retries) final; + int get_next_completed(int timeout_ms, aio_t **paio, int max) final; +}; diff --git a/src/blk/pmem/PMEMDevice.cc b/src/blk/pmem/PMEMDevice.cc new file mode 100644 index 000000000..247ed0692 --- /dev/null +++ b/src/blk/pmem/PMEMDevice.cc @@ -0,0 +1,282 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Intel <jianpeng.ma@intel.com> + * + * Author: Jianpeng Ma <jianpeng.ma@intel.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include <unistd.h> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/stat.h> + +#include "PMEMDevice.h" +#include "libpmem.h" +#include "include/types.h" +#include "include/compat.h" +#include "include/stringify.h" +#include "common/errno.h" +#include "common/debug.h" +#include "common/blkdev.h" + +#define dout_context cct +#define dout_subsys ceph_subsys_bdev +#undef dout_prefix +#define dout_prefix *_dout << "bdev-PMEM(" << path << ") " + +PMEMDevice::PMEMDevice(CephContext *cct, aio_callback_t cb, void *cbpriv) + : BlockDevice(cct, cb, cbpriv), + fd(-1), addr(0), + injecting_crash(0) +{ +} + +int PMEMDevice::_lock() +{ + struct flock l; + memset(&l, 0, sizeof(l)); + l.l_type = F_WRLCK; + l.l_whence = SEEK_SET; + l.l_start = 0; + l.l_len = 0; + int r = ::fcntl(fd, F_SETLK, &l); + if (r < 0) + return -errno; + return 0; +} + +int PMEMDevice::open(const string& p) +{ + path = p; + int r = 0; + dout(1) << __func__ << " path " << path << dendl; + + fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC); + if (fd < 0) { + r = -errno; + derr << __func__ << " open got: " << cpp_strerror(r) << dendl; + return r; + } + + r = _lock(); + if (r < 0) { + derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r) + << dendl; + goto out_fail; + } + + struct stat st; + r = ::fstat(fd, &st); + if (r < 0) { + r = -errno; + derr << __func__ << " fstat got " << cpp_strerror(r) << dendl; + goto out_fail; + } + + size_t map_len; + addr = (char *)pmem_map_file(path.c_str(), 0, PMEM_FILE_EXCL, O_RDWR, &map_len, NULL); + if (addr == NULL) { + /* r is still 0 from the successful fstat(); record the failure before logging so out_fail does not return success */ + r = errno ? -errno : -EIO; + derr << __func__ << " pmem_map_file failed: " << pmem_errormsg() << dendl; + goto out_fail; + } + size = map_len; + + // Operate as though the block size is 4 KB. The backing file + // blksize doesn't strictly matter except that some file systems may + // require a read/modify/write if we write something smaller than + // it. 
+ block_size = g_conf()->bdev_block_size; + if (block_size != (unsigned)st.st_blksize) { + dout(1) << __func__ << " backing device/file reports st_blksize " + << st.st_blksize << ", using bdev_block_size " + << block_size << " anyway" << dendl; + } + + dout(1) << __func__ + << " size " << size + << " (" << byte_u_t(size) << ")" + << " block_size " << block_size + << " (" << byte_u_t(block_size) << ")" + << dendl; + return 0; + + out_fail: + VOID_TEMP_FAILURE_RETRY(::close(fd)); + fd = -1; + return r; +} + +void PMEMDevice::close() +{ + dout(1) << __func__ << dendl; + + ceph_assert(addr != NULL); + pmem_unmap(addr, size); + ceph_assert(fd >= 0); + VOID_TEMP_FAILURE_RETRY(::close(fd)); + fd = -1; + + path.clear(); +} + +int PMEMDevice::collect_metadata(const string& prefix, map<string,string> *pm) const +{ + (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational); + (*pm)[prefix + "size"] = stringify(get_size()); + (*pm)[prefix + "block_size"] = stringify(get_block_size()); + (*pm)[prefix + "driver"] = "PMEMDevice"; + (*pm)[prefix + "type"] = "ssd"; + + struct stat st; + int r = ::fstat(fd, &st); + if (r < 0) + return -errno; + if (S_ISBLK(st.st_mode)) { + (*pm)[prefix + "access_mode"] = "blk"; + char buffer[1024] = {0}; + BlkDev blkdev(fd); + + blkdev.model(buffer, sizeof(buffer)); + (*pm)[prefix + "model"] = buffer; + + buffer[0] = '\0'; + blkdev.dev(buffer, sizeof(buffer)); + (*pm)[prefix + "dev"] = buffer; + + // nvme exposes a serial number + buffer[0] = '\0'; + blkdev.serial(buffer, sizeof(buffer)); + (*pm)[prefix + "serial"] = buffer; + + } else { + (*pm)[prefix + "access_mode"] = "file"; + (*pm)[prefix + "path"] = path; + } + return 0; +} + +bool PMEMDevice::support(const std::string &path) +{ + int is_pmem = 0; + size_t map_len = 0; + void *addr = pmem_map_file(path.c_str(), 0, PMEM_FILE_EXCL, O_RDONLY, &map_len, &is_pmem); + if (addr != NULL) { + /* the mapping is only a probe for is_pmem; release it on every path so a positive answer does not leak it */ + pmem_unmap(addr, map_len); + if (is_pmem) { + return true; + } + } + return false; +} + +int 
PMEMDevice::flush() +{ + //Because all write is persist. So no need + return 0; +} + + +void PMEMDevice::aio_submit(IOContext *ioc) +{ + if (ioc->priv) { + ceph_assert(ioc->num_running == 0); + aio_callback(aio_callback_priv, ioc->priv); + } else { + ioc->try_aio_wake(); + } + return; +} + +int PMEMDevice::write(uint64_t off, bufferlist& bl, bool buffered, int write_hint) +{ + uint64_t len = bl.length(); + dout(20) << __func__ << " " << off << "~" << len << dendl; + ceph_assert(is_valid_io(off, len)); + + dout(40) << "data: "; + bl.hexdump(*_dout); + *_dout << dendl; + + if (g_conf()->bdev_inject_crash && + rand() % g_conf()->bdev_inject_crash == 0) { + derr << __func__ << " bdev_inject_crash: dropping io " << off << "~" << len + << dendl; + ++injecting_crash; + return 0; + } + + bufferlist::iterator p = bl.begin(); + uint64_t off1 = off; + while (len) { + const char *data; + uint32_t l = p.get_ptr_and_advance(len, &data); + pmem_memcpy_persist(addr + off1, data, l); + len -= l; + off1 += l; + } + return 0; +} + +int PMEMDevice::aio_write( + uint64_t off, + bufferlist &bl, + IOContext *ioc, + bool buffered, + int write_hint) +{ + return write(off, bl, buffered); +} + + +int PMEMDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, + IOContext *ioc, + bool buffered) +{ + dout(5) << __func__ << " " << off << "~" << len << dendl; + ceph_assert(is_valid_io(off, len)); + + bufferptr p = buffer::create_small_page_aligned(len); + memcpy(p.c_str(), addr + off, len); + + pbl->clear(); + pbl->push_back(std::move(p)); + + dout(40) << "data: "; + pbl->hexdump(*_dout); + *_dout << dendl; + + return 0; +} + +int PMEMDevice::aio_read(uint64_t off, uint64_t len, bufferlist *pbl, + IOContext *ioc) +{ + return read(off, len, pbl, ioc, false); +} + +int PMEMDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered) +{ + dout(5) << __func__ << " " << off << "~" << len << dendl; + ceph_assert(is_valid_io(off, len)); + + memcpy(buf, addr + off, len); + return 0; +} + 
+ +int PMEMDevice::invalidate_cache(uint64_t off, uint64_t len) +{ + dout(5) << __func__ << " " << off << "~" << len << dendl; + return 0; +} + + diff --git a/src/blk/pmem/PMEMDevice.h b/src/blk/pmem/PMEMDevice.h new file mode 100644 index 000000000..a240d2a7b --- /dev/null +++ b/src/blk/pmem/PMEMDevice.h @@ -0,0 +1,75 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Intel <jianpeng.ma@intel.com> + * + * Author: Jianpeng Ma <jianpeng.ma@intel.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_BLK_PMEMDEVICE_H +#define CEPH_BLK_PMEMDEVICE_H + +#include <atomic> + +#include "os/fs/FS.h" +#include "include/interval_set.h" +#include "aio/aio.h" +#include "BlockDevice.h" + +class PMEMDevice : public BlockDevice { + int fd; + char *addr; //the address of mmap + std::string path; + + ceph::mutex debug_lock = ceph::make_mutex("PMEMDevice::debug_lock"); + interval_set<uint64_t> debug_inflight; + + std::atomic_int injecting_crash; + int _lock(); + +public: + PMEMDevice(CephContext *cct, aio_callback_t cb, void *cbpriv); + + + void aio_submit(IOContext *ioc) override; + + int collect_metadata(const std::string& prefix, map<std::string,std::string> *pm) const override; + + static bool support(const std::string& path); + + int read(uint64_t off, uint64_t len, bufferlist *pbl, + IOContext *ioc, + bool buffered) override; + int aio_read(uint64_t off, uint64_t len, bufferlist *pbl, + IOContext *ioc) override; + + int read_random(uint64_t off, uint64_t len, char *buf, bool buffered) override; + int write(uint64_t off, bufferlist& bl, bool buffered, int write_hint = WRITE_LIFE_NOT_SET) override; + int aio_write(uint64_t off, bufferlist& bl, + IOContext *ioc, + 
bool buffered, + int write_hint = WRITE_LIFE_NOT_SET) override; + int flush() override; + + // for managing buffered readers/writers + int invalidate_cache(uint64_t off, uint64_t len) override; + int open(const std::string &path) override; + void close() override; + +private: + bool is_valid_io(uint64_t off, uint64_t len) const { + return (len > 0 && + off < size && + off + len <= size); + } +}; + +#endif diff --git a/src/blk/spdk/NVMEDevice.cc b/src/blk/spdk/NVMEDevice.cc new file mode 100644 index 000000000..935d3bbd1 --- /dev/null +++ b/src/blk/spdk/NVMEDevice.cc @@ -0,0 +1,971 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 XSky <haomai@xsky.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include <unistd.h> +#include <stdlib.h> +#include <strings.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <unistd.h> + +#include <chrono> +#include <fstream> +#include <functional> +#include <map> +#include <thread> +#include <boost/intrusive/slist.hpp> + +#include <spdk/nvme.h> + +#include "include/intarith.h" +#include "include/stringify.h" +#include "include/types.h" +#include "include/compat.h" +#include "common/errno.h" +#include "common/debug.h" +#include "common/perf_counters.h" + +#include "NVMEDevice.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_bdev +#undef dout_prefix +#define dout_prefix *_dout << "bdev(" << sn << ") " + +static constexpr uint16_t data_buffer_default_num = 1024; + +static constexpr uint32_t data_buffer_size = 8192; + +static constexpr uint16_t inline_segment_num = 32; + +static void io_complete(void *t, const struct spdk_nvme_cpl *completion); + +struct IORequest { + uint16_t cur_seg_idx = 0; + uint16_t nseg; + uint32_t cur_seg_left = 0; + void *inline_segs[inline_segment_num]; + void **extra_segs = nullptr; +}; + +namespace bi = boost::intrusive; +struct data_cache_buf : public bi::slist_base_hook<bi::link_mode<bi::normal_link>> +{}; + +struct Task; + +class SharedDriverData { + unsigned id; + spdk_nvme_transport_id trid; + spdk_nvme_ctrlr *ctrlr; + spdk_nvme_ns *ns; + uint32_t block_size = 0; + uint64_t size = 0; + + public: + std::vector<NVMEDevice*> registered_devices; + friend class SharedDriverQueueData; + SharedDriverData(unsigned id_, const spdk_nvme_transport_id& trid_, + spdk_nvme_ctrlr *c, spdk_nvme_ns *ns_) + : id(id_), + trid(trid_), + ctrlr(c), + ns(ns_) { + block_size = spdk_nvme_ns_get_extended_sector_size(ns); + size = spdk_nvme_ns_get_size(ns); + } + + bool is_equal(const spdk_nvme_transport_id& trid2) const { + return spdk_nvme_transport_id_compare(&trid, &trid2) == 0; + } + ~SharedDriverData() { + } + + void register_device(NVMEDevice *device) 
{ + registered_devices.push_back(device); + } + + void remove_device(NVMEDevice *device) { + std::vector<NVMEDevice*> new_devices; + for (auto &&it : registered_devices) { + if (it != device) + new_devices.push_back(it); + } + registered_devices.swap(new_devices); + } + + uint32_t get_block_size() { + return block_size; + } + uint64_t get_size() { + return size; + } +}; + +class SharedDriverQueueData { + NVMEDevice *bdev; + SharedDriverData *driver; + spdk_nvme_ctrlr *ctrlr; + spdk_nvme_ns *ns; + std::string sn; + uint32_t block_size; + uint32_t max_queue_depth; + struct spdk_nvme_qpair *qpair; + bool reap_io = false; + int alloc_buf_from_pool(Task *t, bool write); + + public: + uint32_t current_queue_depth = 0; + /* explicitly zeroed: a default-constructed std::atomic is uninitialized before C++20 */ + std::atomic_ulong completed_op_seq{0}, queue_op_seq{0}; + bi::slist<data_cache_buf, bi::constant_time_size<true>> data_buf_list; + void _aio_handle(Task *t, IOContext *ioc); + + SharedDriverQueueData(NVMEDevice *bdev, SharedDriverData *driver) + : bdev(bdev), + driver(driver) { + ctrlr = driver->ctrlr; + ns = driver->ns; + block_size = driver->block_size; + + struct spdk_nvme_io_qpair_opts opts = {}; + spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts)); + opts.qprio = SPDK_NVME_QPRIO_URGENT; + // usable queue depth is one less than the queue size, to avoid overflow. 
+ max_queue_depth = opts.io_queue_size - 1; + qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts)); + ceph_assert(qpair != NULL); + + // allocate spdk dma memory + for (uint16_t i = 0; i < data_buffer_default_num; i++) { + void *b = spdk_dma_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL); + if (!b) { + derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl; + ceph_assert(b); + } + data_buf_list.push_front(*reinterpret_cast<data_cache_buf *>(b)); + } + + bdev->queue_number++; + if (bdev->queue_number.load() == 1) + reap_io = true; + } + + ~SharedDriverQueueData() { + if (qpair) { + spdk_nvme_ctrlr_free_io_qpair(qpair); + bdev->queue_number--; + } + + data_buf_list.clear_and_dispose(spdk_dma_free); + } +}; + +struct Task { + NVMEDevice *device; + IOContext *ctx = nullptr; + IOCommand command; + uint64_t offset; + uint64_t len; + bufferlist bl; + std::function<void()> fill_cb; + Task *next = nullptr; + int64_t return_code; + Task *primary = nullptr; + ceph::coarse_real_clock::time_point start; + IORequest io_request = {}; + ceph::mutex lock = ceph::make_mutex("Task::lock"); + ceph::condition_variable cond; + SharedDriverQueueData *queue = nullptr; + // reference count by subtasks. 
+ int ref = 0; + Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0, + Task *p = nullptr) + : device(dev), command(c), offset(off), len(l), + return_code(rc), primary(p), + start(ceph::coarse_real_clock::now()) { + if (primary) { + primary->ref++; + return_code = primary->return_code; + } + } + ~Task() { + if (primary) + primary->ref--; + ceph_assert(!io_request.nseg); + } + /* Return every data buffer held by this task to queue_data's pool, free the spill array if one was allocated, and reset the segment count. */ + void release_segs(SharedDriverQueueData *queue_data) { + if (io_request.extra_segs) { + for (uint16_t i = 0; i < io_request.nseg; i++) { + auto buf = reinterpret_cast<data_cache_buf *>(io_request.extra_segs[i]); + queue_data->data_buf_list.push_front(*buf); + } + /* allocated with new void*[count], so it must be released with delete[]; clear the pointer so a repeated call cannot free it twice */ + delete[] io_request.extra_segs; + io_request.extra_segs = nullptr; + } else if (io_request.nseg) { + for (uint16_t i = 0; i < io_request.nseg; i++) { + auto buf = reinterpret_cast<data_cache_buf *>(io_request.inline_segs[i]); + queue_data->data_buf_list.push_front(*buf); + } + } + ctx->total_nseg -= io_request.nseg; + io_request.nseg = 0; + } + + void copy_to_buf(char *buf, uint64_t off, uint64_t len) { + uint64_t copied = 0; + uint64_t left = len; + void **segs = io_request.extra_segs ? 
io_request.extra_segs : io_request.inline_segs; + uint16_t i = 0; + while (left > 0) { + char *src = static_cast<char*>(segs[i++]); + uint64_t need_copy = std::min(left, data_buffer_size-off); + memcpy(buf+copied, src+off, need_copy); + off = 0; + left -= need_copy; + copied += need_copy; + } + } +}; + +static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset) +{ + Task *t = static_cast<Task*>(cb_arg); + uint32_t i = sgl_offset / data_buffer_size; + uint32_t offset = i * data_buffer_size; + ceph_assert(i <= t->io_request.nseg); + + for (; i < t->io_request.nseg; i++) { + offset += data_buffer_size; + if (offset > sgl_offset) { + if (offset > t->len) + offset = t->len; + break; + } + } + + t->io_request.cur_seg_idx = i; + t->io_request.cur_seg_left = offset - sgl_offset; + return ; +} + +static int data_buf_next_sge(void *cb_arg, void **address, uint32_t *length) +{ + uint32_t size; + void *addr; + Task *t = static_cast<Task*>(cb_arg); + if (t->io_request.cur_seg_idx >= t->io_request.nseg) { + *length = 0; + *address = 0; + return 0; + } + + addr = t->io_request.extra_segs ? 
t->io_request.extra_segs[t->io_request.cur_seg_idx] : t->io_request.inline_segs[t->io_request.cur_seg_idx]; + + size = data_buffer_size; + if (t->io_request.cur_seg_idx == t->io_request.nseg - 1) { + uint64_t tail = t->len % data_buffer_size; + if (tail) { + size = (uint32_t) tail; + } + } + + if (t->io_request.cur_seg_left) { + *address = (void *)((uint64_t)addr + size - t->io_request.cur_seg_left); + *length = t->io_request.cur_seg_left; + t->io_request.cur_seg_left = 0; + } else { + *address = addr; + *length = size; + } + + t->io_request.cur_seg_idx++; + return 0; +} + +int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write) +{ + uint64_t count = t->len / data_buffer_size; + if (t->len % data_buffer_size) + ++count; + void **segs; + if (count > data_buf_list.size()) + return -ENOMEM; + if (count <= inline_segment_num) { + segs = t->io_request.inline_segs; + } else { + t->io_request.extra_segs = new void*[count]; + segs = t->io_request.extra_segs; + } + for (uint16_t i = 0; i < count; i++) { + ceph_assert(!data_buf_list.empty()); + segs[i] = &data_buf_list.front(); + ceph_assert(segs[i] != nullptr); + data_buf_list.pop_front(); + } + t->io_request.nseg = count; + t->ctx->total_nseg += count; + if (write) { + auto blp = t->bl.begin(); + uint32_t len = 0; + uint16_t i = 0; + for (; i < count - 1; ++i) { + blp.copy(data_buffer_size, static_cast<char*>(segs[i])); + len += data_buffer_size; + } + blp.copy(t->bl.length() - len, static_cast<char*>(segs[i])); + } + + return 0; +} + +void SharedDriverQueueData::_aio_handle(Task *t, IOContext *ioc) +{ + dout(20) << __func__ << " start" << dendl; + + int r = 0; + uint64_t lba_off, lba_count; + uint32_t max_io_completion = (uint32_t)g_conf().get_val<uint64_t>("bluestore_spdk_max_io_completion"); + uint64_t io_sleep_in_us = g_conf().get_val<uint64_t>("bluestore_spdk_io_sleep"); + + while (ioc->num_running) { + again: + dout(40) << __func__ << " polling" << dendl; + if (current_queue_depth) { + r = 
spdk_nvme_qpair_process_completions(qpair, max_io_completion); + if (r < 0) { + ceph_abort(); + } else if (r == 0) { + usleep(io_sleep_in_us); + } + } + + for (; t; t = t->next) { + if (current_queue_depth == max_queue_depth) { + // no slots + goto again; + } + + t->queue = this; + lba_off = t->offset / block_size; + lba_count = t->len / block_size; + switch (t->command) { + case IOCommand::WRITE_COMMAND: + { + dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl; + r = alloc_buf_from_pool(t, true); + if (r < 0) { + goto again; + } + + r = spdk_nvme_ns_cmd_writev( + ns, qpair, lba_off, lba_count, io_complete, t, 0, + data_buf_reset_sgl, data_buf_next_sge); + if (r < 0) { + derr << __func__ << " failed to do write command: " << cpp_strerror(r) << dendl; + t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr; + t->release_segs(this); + delete t; + ceph_abort(); + } + break; + } + case IOCommand::READ_COMMAND: + { + dout(20) << __func__ << " read command issued " << lba_off << "~" << lba_count << dendl; + r = alloc_buf_from_pool(t, false); + if (r < 0) { + goto again; + } + + r = spdk_nvme_ns_cmd_readv( + ns, qpair, lba_off, lba_count, io_complete, t, 0, + data_buf_reset_sgl, data_buf_next_sge); + if (r < 0) { + derr << __func__ << " failed to read: " << cpp_strerror(r) << dendl; + t->release_segs(this); + delete t; + ceph_abort(); + } + break; + } + case IOCommand::FLUSH_COMMAND: + { + dout(20) << __func__ << " flush command issueed " << dendl; + r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t); + if (r < 0) { + derr << __func__ << " failed to flush: " << cpp_strerror(r) << dendl; + t->release_segs(this); + delete t; + ceph_abort(); + } + break; + } + } + current_queue_depth++; + } + } + + if (reap_io) + bdev->reap_ioc(); + dout(20) << __func__ << " end" << dendl; +} + +#define dout_subsys ceph_subsys_bdev +#undef dout_prefix +#define dout_prefix *_dout << "bdev " + +class NVMEManager { + public: + struct ProbeContext { 
+ spdk_nvme_transport_id trid; + NVMEManager *manager; + SharedDriverData *driver; + bool done; + }; + + private: + ceph::mutex lock = ceph::make_mutex("NVMEManager::lock"); + bool stopping = false; + std::vector<SharedDriverData*> shared_driver_datas; + std::thread dpdk_thread; + ceph::mutex probe_queue_lock = ceph::make_mutex("NVMEManager::probe_queue_lock"); + ceph::condition_variable probe_queue_cond; + std::list<ProbeContext*> probe_queue; + + public: + NVMEManager() {} + ~NVMEManager() { + if (!dpdk_thread.joinable()) + return; + { + std::lock_guard guard(probe_queue_lock); + stopping = true; + probe_queue_cond.notify_all(); + } + dpdk_thread.join(); + } + + int try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver); + void register_ctrlr(const spdk_nvme_transport_id& trid, spdk_nvme_ctrlr *c, SharedDriverData **driver) { + ceph_assert(ceph_mutex_is_locked(lock)); + spdk_nvme_ns *ns; + int num_ns = spdk_nvme_ctrlr_get_num_ns(c); + ceph_assert(num_ns >= 1); + if (num_ns > 1) { + dout(0) << __func__ << " namespace count larger than 1, currently only use the first namespace" << dendl; + } + ns = spdk_nvme_ctrlr_get_ns(c, 1); + if (!ns) { + derr << __func__ << " failed to get namespace at 1" << dendl; + ceph_abort(); + } + dout(1) << __func__ << " successfully attach nvme device at" << trid.traddr << dendl; + + // only support one device per osd now! 
+ ceph_assert(shared_driver_datas.empty()); + // index 0 is occurred by master thread + shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, trid, c, ns)); + *driver = shared_driver_datas.back(); + } +}; + +static NVMEManager manager; + +static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts) +{ + NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx); + + if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) { + dout(0) << __func__ << " only probe local nvme device" << dendl; + return false; + } + + dout(0) << __func__ << " found device at: " + << "trtype=" << spdk_nvme_transport_id_trtype_str(trid->trtype) << ", " + << "traddr=" << trid->traddr << dendl; + if (spdk_nvme_transport_id_compare(&ctx->trid, trid)) { + dout(0) << __func__ << " device traddr (" << ctx->trid.traddr << ") not match " << trid->traddr << dendl; + return false; + } + + opts->io_queue_size = UINT16_MAX; + + return true; +} + +static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, + struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts) +{ + auto ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx); + ctx->manager->register_ctrlr(ctx->trid, ctrlr, &ctx->driver); +} + +static int hex2dec(unsigned char c) +{ + if (isdigit(c)) + return c - '0'; + else if (isupper(c)) + return c - 'A' + 10; + else + return c - 'a' + 10; +} + +static int find_first_bitset(const string& s) +{ + auto e = s.rend(); + if (s.compare(0, 2, "0x") == 0 || + s.compare(0, 2, "0X") == 0) { + advance(e, -2); + } + auto p = s.rbegin(); + for (int pos = 0; p != e; ++p, pos += 4) { + if (!isxdigit(*p)) { + return -EINVAL; + } + if (int val = hex2dec(*p); val != 0) { + return pos + ffs(val); + } + } + return 0; +} + +int NVMEManager::try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver) +{ + std::lock_guard l(lock); + for (auto &&it : shared_driver_datas) { + if 
(it->is_equal(trid)) { + *driver = it; + return 0; + } + } + + struct spdk_pci_addr pci_addr; + int rc = spdk_pci_addr_parse(&pci_addr, trid.traddr); + if (rc < 0) { + derr << __func__ << " invalid transport address: " << trid.traddr << dendl; + return -ENOENT; + } + auto coremask_arg = g_conf().get_val<std::string>("bluestore_spdk_coremask"); + int m_core_arg = find_first_bitset(coremask_arg); + // at least one core is needed for using spdk + if (m_core_arg <= 0) { + derr << __func__ << " invalid bluestore_spdk_coremask, " + << "at least one core is needed" << dendl; + return -ENOENT; + } + m_core_arg -= 1; + + uint32_t mem_size_arg = (uint32_t)g_conf().get_val<Option::size_t>("bluestore_spdk_mem"); + + if (!dpdk_thread.joinable()) { + dpdk_thread = std::thread( + [this, coremask_arg, m_core_arg, mem_size_arg, pci_addr]() { + struct spdk_env_opts opts; + struct spdk_pci_addr addr = pci_addr; + int r; + + spdk_env_opts_init(&opts); + opts.name = "nvme-device-manager"; + opts.core_mask = coremask_arg.c_str(); + opts.master_core = m_core_arg; + opts.mem_size = mem_size_arg; + opts.pci_whitelist = &addr; + opts.num_pci_addr = 1; + spdk_env_init(&opts); + spdk_unaffinitize_thread(); + + std::unique_lock l(probe_queue_lock); + while (!stopping) { + if (!probe_queue.empty()) { + ProbeContext* ctxt = probe_queue.front(); + probe_queue.pop_front(); + r = spdk_nvme_probe(NULL, ctxt, probe_cb, attach_cb, NULL); + if (r < 0) { + ceph_assert(!ctxt->driver); + derr << __func__ << " device probe nvme failed" << dendl; + } + ctxt->done = true; + probe_queue_cond.notify_all(); + } else { + probe_queue_cond.wait(l); + } + } + for (auto p : probe_queue) + p->done = true; + probe_queue_cond.notify_all(); + } + ); + } + + ProbeContext ctx{trid, this, nullptr, false}; + { + std::unique_lock l(probe_queue_lock); + probe_queue.push_back(&ctx); + while (!ctx.done) + probe_queue_cond.wait(l); + } + if (!ctx.driver) + return -1; + *driver = ctx.driver; + + return 0; +} + +void 
io_complete(void *t, const struct spdk_nvme_cpl *completion) +{ + Task *task = static_cast<Task*>(t); + IOContext *ctx = task->ctx; + SharedDriverQueueData *queue = task->queue; + + ceph_assert(queue != NULL); + ceph_assert(ctx != NULL); + --queue->current_queue_depth; + if (task->command == IOCommand::WRITE_COMMAND) { + ceph_assert(!spdk_nvme_cpl_is_error(completion)); + dout(20) << __func__ << " write/zero op successfully, left " + << queue->queue_op_seq - queue->completed_op_seq << dendl; + // check waiting count before doing callback (which may + // destroy this ioc). + if (ctx->priv) { + if (!--ctx->num_running) { + task->device->aio_callback(task->device->aio_callback_priv, ctx->priv); + } + } else { + ctx->try_aio_wake(); + } + task->release_segs(queue); + delete task; + } else if (task->command == IOCommand::READ_COMMAND) { + ceph_assert(!spdk_nvme_cpl_is_error(completion)); + dout(20) << __func__ << " read op successfully" << dendl; + task->fill_cb(); + task->release_segs(queue); + // read submitted by AIO + if (!task->return_code) { + if (ctx->priv) { + if (!--ctx->num_running) { + task->device->aio_callback(task->device->aio_callback_priv, ctx->priv); + } + } else { + ctx->try_aio_wake(); + } + delete task; + } else { + if (Task* primary = task->primary; primary != nullptr) { + delete task; + if (!primary->ref) + primary->return_code = 0; + } else { + task->return_code = 0; + } + --ctx->num_running; + } + } else { + ceph_assert(task->command == IOCommand::FLUSH_COMMAND); + ceph_assert(!spdk_nvme_cpl_is_error(completion)); + dout(20) << __func__ << " flush op successfully" << dendl; + task->return_code = 0; + } +} + +// ---------------- +#undef dout_prefix +#define dout_prefix *_dout << "bdev(" << name << ") " + +NVMEDevice::NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv) + : BlockDevice(cct, cb, cbpriv), + driver(nullptr) +{ +} + +bool NVMEDevice::support(const std::string& path) +{ + char buf[PATH_MAX + 1]; + int r = 
::readlink(path.c_str(), buf, sizeof(buf) - 1); + if (r >= 0) { + buf[r] = '\0'; + char *bname = ::basename(buf); + if (strncmp(bname, SPDK_PREFIX, sizeof(SPDK_PREFIX)-1) == 0) { + return true; + } + } + return false; +} + +int NVMEDevice::open(const string& p) +{ + dout(1) << __func__ << " path " << p << dendl; + + std::ifstream ifs(p); + if (!ifs) { + derr << __func__ << " unable to open " << p << dendl; + return -1; + } + string val; + std::getline(ifs, val); + spdk_nvme_transport_id trid; + if (int r = spdk_nvme_transport_id_parse(&trid, val.c_str()); r) { + derr << __func__ << " unable to read " << p << ": " << cpp_strerror(r) + << dendl; + return r; + } + if (int r = manager.try_get(trid, &driver); r < 0) { + derr << __func__ << " failed to get nvme device with transport address " << trid.traddr << dendl; + return r; + } + + driver->register_device(this); + block_size = driver->get_block_size(); + size = driver->get_size(); + name = trid.traddr; + + //nvme is non-rotational device. 
+ rotational = false; + + // round size down to an even block + size &= ~(block_size - 1); + + dout(1) << __func__ << " size " << size << " (" << byte_u_t(size) << ")" + << " block_size " << block_size << " (" << byte_u_t(block_size) + << ")" << dendl; + + + return 0; +} + +void NVMEDevice::close() +{ + dout(1) << __func__ << dendl; + + name.clear(); + driver->remove_device(this); + + dout(1) << __func__ << " end" << dendl; +} + +int NVMEDevice::collect_metadata(const string& prefix, map<string,string> *pm) const +{ + (*pm)[prefix + "rotational"] = "0"; + (*pm)[prefix + "size"] = stringify(get_size()); + (*pm)[prefix + "block_size"] = stringify(get_block_size()); + (*pm)[prefix + "driver"] = "NVMEDevice"; + (*pm)[prefix + "type"] = "nvme"; + (*pm)[prefix + "access_mode"] = "spdk"; + (*pm)[prefix + "nvme_serial_number"] = name; + + return 0; +} + +int NVMEDevice::flush() +{ + return 0; +} + +void NVMEDevice::aio_submit(IOContext *ioc) +{ + dout(20) << __func__ << " ioc " << ioc << " pending " + << ioc->num_pending.load() << " running " + << ioc->num_running.load() << dendl; + int pending = ioc->num_pending.load(); + Task *t = static_cast<Task*>(ioc->nvme_task_first); + if (pending && t) { + ioc->num_running += pending; + ioc->num_pending -= pending; + ceph_assert(ioc->num_pending.load() == 0); // we should be only thread doing this + // Only need to push the first entry + ioc->nvme_task_first = ioc->nvme_task_last = nullptr; + + thread_local SharedDriverQueueData queue_t = SharedDriverQueueData(this, driver); + queue_t._aio_handle(t, ioc); + } +} + +static void ioc_append_task(IOContext *ioc, Task *t) +{ + Task *first, *last; + + first = static_cast<Task*>(ioc->nvme_task_first); + last = static_cast<Task*>(ioc->nvme_task_last); + if (last) + last->next = t; + if (!first) + ioc->nvme_task_first = t; + ioc->nvme_task_last = t; + ++ioc->num_pending; +} + +static void write_split( + NVMEDevice *dev, + uint64_t off, + bufferlist &bl, + IOContext *ioc) +{ + uint64_t 
remain_len = bl.length(), begin = 0, write_size; + Task *t; + // This value may need to be got from configuration later. + uint64_t split_size = 131072; // 128KB. + + while (remain_len > 0) { + write_size = std::min(remain_len, split_size); + t = new Task(dev, IOCommand::WRITE_COMMAND, off + begin, write_size); + // TODO: if upper layer alloc memory with known physical address, + // we can reduce this copy + bl.splice(0, write_size, &t->bl); + remain_len -= write_size; + t->ctx = ioc; + ioc_append_task(ioc, t); + begin += write_size; + } +} + +static void make_read_tasks( + NVMEDevice *dev, + uint64_t aligned_off, + IOContext *ioc, char *buf, uint64_t aligned_len, Task *primary, + uint64_t orig_off, uint64_t orig_len) +{ + // This value may need to be got from configuration later. + uint64_t split_size = 131072; // 128KB. + uint64_t tmp_off = orig_off - aligned_off, remain_orig_len = orig_len; + auto begin = aligned_off; + const auto aligned_end = begin + aligned_len; + + for (; begin < aligned_end; begin += split_size) { + auto read_size = std::min(aligned_end - begin, split_size); + auto tmp_len = std::min(remain_orig_len, read_size - tmp_off); + Task *t = nullptr; + + if (primary && (aligned_len <= split_size)) { + t = primary; + } else { + t = new Task(dev, IOCommand::READ_COMMAND, begin, read_size, 0, primary); + } + + t->ctx = ioc; + + // TODO: if upper layer alloc memory with known physical address, + // we can reduce this copy + t->fill_cb = [buf, t, tmp_off, tmp_len] { + t->copy_to_buf(buf, tmp_off, tmp_len); + }; + + ioc_append_task(ioc, t); + remain_orig_len -= tmp_len; + buf += tmp_len; + tmp_off = 0; + } +} + +int NVMEDevice::aio_write( + uint64_t off, + bufferlist &bl, + IOContext *ioc, + bool buffered, + int write_hint) +{ + uint64_t len = bl.length(); + dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc + << " buffered " << buffered << dendl; + ceph_assert(is_valid_io(off, len)); + + write_split(this, off, bl, ioc); + dout(5) << 
__func__ << " " << off << "~" << len << dendl; + + return 0; +} + +int NVMEDevice::write(uint64_t off, bufferlist &bl, bool buffered, int write_hint) +{ + uint64_t len = bl.length(); + dout(20) << __func__ << " " << off << "~" << len << " buffered " + << buffered << dendl; + ceph_assert(off % block_size == 0); + ceph_assert(len % block_size == 0); + ceph_assert(len > 0); + ceph_assert(off < size); + ceph_assert(off + len <= size); + + IOContext ioc(cct, NULL); + write_split(this, off, bl, &ioc); + dout(5) << __func__ << " " << off << "~" << len << dendl; + aio_submit(&ioc); + ioc.aio_wait(); + return 0; +} + +int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, + IOContext *ioc, + bool buffered) +{ + dout(5) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl; + ceph_assert(is_valid_io(off, len)); + + Task t(this, IOCommand::READ_COMMAND, off, len, 1); + bufferptr p = buffer::create_small_page_aligned(len); + char *buf = p.c_str(); + + // for sync read, need to control IOContext in itself + IOContext read_ioc(cct, nullptr); + make_read_tasks(this, off, &read_ioc, buf, len, &t, off, len); + dout(5) << __func__ << " " << off << "~" << len << dendl; + aio_submit(&read_ioc); + + pbl->push_back(std::move(p)); + return t.return_code; +} + +int NVMEDevice::aio_read( + uint64_t off, + uint64_t len, + bufferlist *pbl, + IOContext *ioc) +{ + dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl; + ceph_assert(is_valid_io(off, len)); + bufferptr p = buffer::create_small_page_aligned(len); + pbl->append(p); + char* buf = p.c_str(); + + make_read_tasks(this, off, ioc, buf, len, NULL, off, len); + dout(5) << __func__ << " " << off << "~" << len << dendl; + return 0; +} + +int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered) +{ + ceph_assert(len > 0); + ceph_assert(off < size); + ceph_assert(off + len <= size); + + uint64_t aligned_off = p2align(off, block_size); + uint64_t aligned_len = 
p2roundup(off+len, block_size) - aligned_off; + dout(5) << __func__ << " " << off << "~" << len + << " aligned " << aligned_off << "~" << aligned_len << dendl; + IOContext ioc(g_ceph_context, nullptr); + Task t(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1); + + make_read_tasks(this, aligned_off, &ioc, buf, aligned_len, &t, off, len); + aio_submit(&ioc); + + return t.return_code; +} + +int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len) +{ + dout(5) << __func__ << " " << off << "~" << len << dendl; + return 0; +} diff --git a/src/blk/spdk/NVMEDevice.h b/src/blk/spdk/NVMEDevice.h new file mode 100644 index 000000000..352856093 --- /dev/null +++ b/src/blk/spdk/NVMEDevice.h @@ -0,0 +1,85 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 XSky <haomai@xsky.com> + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef CEPH_BLK_NVMEDEVICE +#define CEPH_BLK_NVMEDEVICE + +#include <queue> +#include <map> +#include <limits> + +// since _Static_assert introduced in c11 +#define _Static_assert static_assert + + +#include "include/interval_set.h" +#include "common/ceph_time.h" +#include "BlockDevice.h" + +enum class IOCommand { + READ_COMMAND, + WRITE_COMMAND, + FLUSH_COMMAND +}; + +class SharedDriverData; +class SharedDriverQueueData; + +class NVMEDevice : public BlockDevice { + /** + * points to pinned, physically contiguous memory region; + * contains 4KB IDENTIFY structure for controller which is + * target for CONTROLLER IDENTIFY command during initialization + */ + SharedDriverData *driver; + string name; + + public: + std::atomic_int queue_number = {0}; + SharedDriverData *get_driver() { return driver; } + + NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv); + + bool supported_bdev_label() override { return false; } + + static bool support(const std::string& path); + + void aio_submit(IOContext *ioc) override; + + int read(uint64_t off, uint64_t len, bufferlist *pbl, + IOContext *ioc, + bool buffered) override; + int aio_read( + uint64_t off, + uint64_t len, + bufferlist *pbl, + IOContext *ioc) override; + int aio_write(uint64_t off, bufferlist& bl, + IOContext *ioc, + bool buffered, + int write_hint = WRITE_LIFE_NOT_SET) override; + int write(uint64_t off, bufferlist& bl, bool buffered, int write_hint = WRITE_LIFE_NOT_SET) override; + int flush() override; + int read_random(uint64_t off, uint64_t len, char *buf, bool buffered) override; + + // for managing buffered readers/writers + int invalidate_cache(uint64_t off, uint64_t len) override; + int open(const string& path) override; + void close() override; + int collect_metadata(const string& prefix, map<string,string> *pm) const override; +}; + +#endif diff --git a/src/blk/zoned/HMSMRDevice.cc b/src/blk/zoned/HMSMRDevice.cc new file mode 100644 index 000000000..8a30be9b0 --- /dev/null +++ 
b/src/blk/zoned/HMSMRDevice.cc @@ -0,0 +1,1217 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Red Hat + * Copyright (C) 2020 Abutalib Aghayev + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <unistd.h> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <sys/file.h> + +#include "HMSMRDevice.h" +#include "include/intarith.h" +#include "include/types.h" +#include "include/compat.h" +#include "include/scope_guard.h" +#include "include/stringify.h" +#include "common/blkdev.h" +#include "common/errno.h" +#if defined(__FreeBSD__) +#include "bsm/audit_errno.h" +#endif +#include "common/debug.h" +#include "common/numa.h" + +#include "kernel/io_uring.h" + +extern "C" { +#include <libzbd/zbd.h> +} + +#define dout_context cct +#define dout_subsys ceph_subsys_bdev +#undef dout_prefix +#define dout_prefix *_dout << "smrbdev(" << this << " " << path << ") " + +HMSMRDevice::HMSMRDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv) + : BlockDevice(cct, cb, cbpriv), + aio(false), dio(false), + discard_callback(d_cb), + discard_callback_priv(d_cbpriv), + aio_stop(false), + discard_started(false), + discard_stop(false), + aio_thread(this), + discard_thread(this), + injecting_crash(0) +{ + fd_directs.resize(WRITE_LIFE_MAX, -1); + fd_buffereds.resize(WRITE_LIFE_MAX, -1); + + bool use_ioring = cct->_conf.get_val<bool>("bdev_ioring"); + unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth; + + if (use_ioring && ioring_queue_t::supported()) { + bool use_ioring_hipri = cct->_conf.get_val<bool>("bdev_ioring_hipri"); + bool use_ioring_sqthread_poll = 
cct->_conf.get_val<bool>("bdev_ioring_sqthread_poll"); + io_queue = std::make_unique<ioring_queue_t>(iodepth, use_ioring_hipri, use_ioring_sqthread_poll); + } else { + static bool once; + if (use_ioring && !once) { + derr << "WARNING: io_uring API is not supported! Fallback to libaio!" + << dendl; + once = true; + } + io_queue = std::make_unique<aio_queue_t>(iodepth); + } +} + +bool HMSMRDevice::support(const std::string& path) +{ + return zbd_device_is_zoned(path.c_str()) == 1; +} + +int HMSMRDevice::_lock() +{ + dout(10) << __func__ << " " << fd_directs[WRITE_LIFE_NOT_SET] << dendl; + int r = ::flock(fd_directs[WRITE_LIFE_NOT_SET], LOCK_EX | LOCK_NB); + if (r < 0) { + derr << __func__ << " flock failed on " << path << dendl; + return -errno; + } + return 0; +} + +bool HMSMRDevice::set_smr_params(const std::string& path) { + dout(10) << __func__ << " opening " << path << dendl; + + int dev = zbd_open(path.c_str(), O_RDWR | O_DIRECT | O_LARGEFILE, nullptr); + if (dev < 0) { + return false; + } + auto close_dev = make_scope_guard([dev] { zbd_close(dev); }); + + unsigned int nr_zones = 0; + if (zbd_report_nr_zones(dev, 0, 0, ZBD_RO_NOT_WP, &nr_zones) != 0) { + return false; + } + + std::vector<zbd_zone> zones(nr_zones); + if (zbd_report_zones(dev, 0, 0, ZBD_RO_NOT_WP, zones.data(), &nr_zones) != 0) { + return false; + } + + zone_size = zbd_zone_len(&zones[0]); + conventional_region_size = nr_zones * zone_size; + + dout(10) << __func__ << " setting zone size to " << zone_size + << " and conventional region size to " << conventional_region_size + << dendl; + + return true; +} + +int HMSMRDevice::open(const string& p) +{ + path = p; + int r = 0, i = 0; + dout(1) << __func__ << " path " << path << dendl; + + for (i = 0; i < WRITE_LIFE_MAX; i++) { + int fd = ::open(path.c_str(), O_RDWR | O_DIRECT); + if (fd < 0) { + r = -errno; + break; + } + fd_directs[i] = fd; + + fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC); + if (fd < 0) { + r = -errno; + break; + } + fd_buffereds[i] 
= fd; + } + + if (i != WRITE_LIFE_MAX) { + derr << __func__ << " open got: " << cpp_strerror(r) << dendl; + goto out_fail; + } + + if (!set_smr_params(p)) { + derr << __func__ << " failed to set HM-SMR parameters" << dendl; + goto out_fail; + } + +#if defined(F_SET_FILE_RW_HINT) + for (i = WRITE_LIFE_NONE; i < WRITE_LIFE_MAX; i++) { + if (fcntl(fd_directs[i], F_SET_FILE_RW_HINT, &i) < 0) { + r = -errno; + break; + } + if (fcntl(fd_buffereds[i], F_SET_FILE_RW_HINT, &i) < 0) { + r = -errno; + break; + } + } + if (i != WRITE_LIFE_MAX) { + enable_wrt = false; + dout(0) << "ioctl(F_SET_FILE_RW_HINT) on " << path << " failed: " << cpp_strerror(r) << dendl; + } +#endif + + dio = true; + aio = cct->_conf->bdev_aio; + if (!aio) { + ceph_abort_msg("non-aio not supported"); + } + + // disable readahead as it will wreak havoc on our mix of + // directio/aio and buffered io. + r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], 0, 0, POSIX_FADV_RANDOM); + if (r) { + r = -r; + derr << __func__ << " posix_fadvise got: " << cpp_strerror(r) << dendl; + goto out_fail; + } + + if (lock_exclusive) { + r = _lock(); + if (r < 0) { + derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r) + << dendl; + goto out_fail; + } + } + + struct stat st; + r = ::fstat(fd_directs[WRITE_LIFE_NOT_SET], &st); + if (r < 0) { + r = -errno; + derr << __func__ << " fstat got " << cpp_strerror(r) << dendl; + goto out_fail; + } + + // Operate as though the block size is 4 KB. The backing file + // blksize doesn't strictly matter except that some file systems may + // require a read/modify/write if we write something smaller than + // it. 
+ block_size = cct->_conf->bdev_block_size; + if (block_size != (unsigned)st.st_blksize) { + dout(1) << __func__ << " backing device/file reports st_blksize " + << st.st_blksize << ", using bdev_block_size " + << block_size << " anyway" << dendl; + } + + + { + BlkDev blkdev_direct(fd_directs[WRITE_LIFE_NOT_SET]); + BlkDev blkdev_buffered(fd_buffereds[WRITE_LIFE_NOT_SET]); + + if (S_ISBLK(st.st_mode)) { + int64_t s; + r = blkdev_direct.get_size(&s); + if (r < 0) { + goto out_fail; + } + size = s; + } else { + size = st.st_size; + } + + char partition[PATH_MAX], devname[PATH_MAX]; + if ((r = blkdev_buffered.partition(partition, PATH_MAX)) || + (r = blkdev_buffered.wholedisk(devname, PATH_MAX))) { + derr << "unable to get device name for " << path << ": " + << cpp_strerror(r) << dendl; + rotational = true; + } else { + dout(20) << __func__ << " devname " << devname << dendl; + rotational = blkdev_buffered.is_rotational(); + support_discard = blkdev_buffered.support_discard(); + this->devname = devname; + _detect_vdo(); + } + } + + r = _aio_start(); + if (r < 0) { + goto out_fail; + } + _discard_start(); + + // round size down to an even block + size &= ~(block_size - 1); + + dout(1) << __func__ + << " size " << size + << " (0x" << std::hex << size << std::dec << ", " + << byte_u_t(size) << ")" + << " block_size " << block_size + << " (" << byte_u_t(block_size) << ")" + << " " << (rotational ? "rotational" : "non-rotational") + << " discard " << (support_discard ? 
"supported" : "not supported") + << dendl; + return 0; + +out_fail: + for (i = 0; i < WRITE_LIFE_MAX; i++) { + if (fd_directs[i] >= 0) { + VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i])); + fd_directs[i] = -1; + } else { + break; + } + if (fd_buffereds[i] >= 0) { + VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i])); + fd_buffereds[i] = -1; + } else { + break; + } + } + return r; +} + +int HMSMRDevice::get_devices(std::set<std::string> *ls) const +{ + if (devname.empty()) { + return 0; + } + get_raw_devices(devname, ls); + return 0; +} + +void HMSMRDevice::close() +{ + dout(1) << __func__ << dendl; + _aio_stop(); + _discard_stop(); + + if (vdo_fd >= 0) { + VOID_TEMP_FAILURE_RETRY(::close(vdo_fd)); + vdo_fd = -1; + } + + for (int i = 0; i < WRITE_LIFE_MAX; i++) { + assert(fd_directs[i] >= 0); + VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i])); + fd_directs[i] = -1; + + assert(fd_buffereds[i] >= 0); + VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i])); + fd_buffereds[i] = -1; + } + path.clear(); +} + +int HMSMRDevice::collect_metadata(const string& prefix, map<string,string> *pm) const +{ + (*pm)[prefix + "support_discard"] = stringify((int)(bool)support_discard); + (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational); + (*pm)[prefix + "size"] = stringify(get_size()); + (*pm)[prefix + "block_size"] = stringify(get_block_size()); + (*pm)[prefix + "driver"] = "HMSMRDevice"; + if (rotational) { + (*pm)[prefix + "type"] = "hdd"; + } else { + (*pm)[prefix + "type"] = "ssd"; + } + if (vdo_fd >= 0) { + (*pm)[prefix + "vdo"] = "true"; + uint64_t total, avail; + get_vdo_utilization(vdo_fd, &total, &avail); + (*pm)[prefix + "vdo_physical_size"] = stringify(total); + } + + { + string res_names; + std::set<std::string> devnames; + if (get_devices(&devnames) == 0) { + for (auto& dev : devnames) { + if (!res_names.empty()) { + res_names += ","; + } + res_names += dev; + } + if (res_names.size()) { + (*pm)[prefix + "devices"] = res_names; + } + } + } + + struct stat st; + 
int r = ::fstat(fd_buffereds[WRITE_LIFE_NOT_SET], &st); + if (r < 0) + return -errno; + if (S_ISBLK(st.st_mode)) { + (*pm)[prefix + "access_mode"] = "blk"; + + char buffer[1024] = {0}; + BlkDev blkdev{fd_buffereds[WRITE_LIFE_NOT_SET]}; + if (r = blkdev.partition(buffer, sizeof(buffer)); r) { + (*pm)[prefix + "partition_path"] = "unknown"; + } else { + (*pm)[prefix + "partition_path"] = buffer; + } + buffer[0] = '\0'; + if (r = blkdev.partition(buffer, sizeof(buffer)); r) { + (*pm)[prefix + "dev_node"] = "unknown"; + } else { + (*pm)[prefix + "dev_node"] = buffer; + } + if (!r) { + return 0; + } + buffer[0] = '\0'; + blkdev.model(buffer, sizeof(buffer)); + (*pm)[prefix + "model"] = buffer; + + buffer[0] = '\0'; + blkdev.dev(buffer, sizeof(buffer)); + (*pm)[prefix + "dev"] = buffer; + + // nvme exposes a serial number + buffer[0] = '\0'; + blkdev.serial(buffer, sizeof(buffer)); + (*pm)[prefix + "serial"] = buffer; + + // numa + int node; + r = blkdev.get_numa_node(&node); + if (r >= 0) { + (*pm)[prefix + "numa_node"] = stringify(node); + } + } else { + (*pm)[prefix + "access_mode"] = "file"; + (*pm)[prefix + "path"] = path; + } + return 0; +} + +void HMSMRDevice::_detect_vdo() +{ + vdo_fd = get_vdo_stats_handle(devname.c_str(), &vdo_name); + if (vdo_fd >= 0) { + dout(1) << __func__ << " VDO volume " << vdo_name + << " maps to " << devname << dendl; + } else { + dout(20) << __func__ << " no VDO volume maps to " << devname << dendl; + } + return; +} + +bool HMSMRDevice::get_thin_utilization(uint64_t *total, uint64_t *avail) const +{ + if (vdo_fd < 0) { + return false; + } + return get_vdo_utilization(vdo_fd, total, avail); +} + +int HMSMRDevice::choose_fd(bool buffered, int write_hint) const +{ + assert(write_hint >= WRITE_LIFE_NOT_SET && write_hint < WRITE_LIFE_MAX); + if (!enable_wrt) + write_hint = WRITE_LIFE_NOT_SET; + return buffered ? fd_buffereds[write_hint] : fd_directs[write_hint]; +} + +int HMSMRDevice::flush() +{ + // protect flush with a mutex. 
note that we are not really protecting + // data here. instead, we're ensuring that if any flush() caller + // sees that io_since_flush is true, they block any racing callers + // until the flush is observed. that allows racing threads to be + // calling flush while still ensuring that *any* of them that got an + // aio completion notification will not return before that aio is + // stable on disk: whichever thread sees the flag first will block + // followers until the aio is stable. + std::lock_guard l(flush_mutex); + + bool expect = true; + if (!io_since_flush.compare_exchange_strong(expect, false)) { + dout(10) << __func__ << " no-op (no ios since last flush), flag is " + << (int)io_since_flush.load() << dendl; + return 0; + } + + dout(10) << __func__ << " start" << dendl; + if (cct->_conf->bdev_inject_crash) { + ++injecting_crash; + // sleep for a moment to give other threads a chance to submit or + // wait on io that races with a flush. + derr << __func__ << " injecting crash. first we sleep..." 
<< dendl; + sleep(cct->_conf->bdev_inject_crash_flush_delay); + derr << __func__ << " and now we die" << dendl; + cct->_log->flush(); + _exit(1); + } + utime_t start = ceph_clock_now(); + int r = ::fdatasync(fd_directs[WRITE_LIFE_NOT_SET]); + utime_t end = ceph_clock_now(); + utime_t dur = end - start; + if (r < 0) { + r = -errno; + derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl; + ceph_abort(); + } + dout(5) << __func__ << " in " << dur << dendl;; + return r; +} + +int HMSMRDevice::_aio_start() +{ + if (aio) { + dout(10) << __func__ << dendl; + int r = io_queue->init(fd_directs); + if (r < 0) { + if (r == -EAGAIN) { + derr << __func__ << " io_setup(2) failed with EAGAIN; " + << "try increasing /proc/sys/fs/aio-max-nr" << dendl; + } else { + derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl; + } + return r; + } + aio_thread.create("bstore_aio"); + } + return 0; +} + +void HMSMRDevice::_aio_stop() +{ + if (aio) { + dout(10) << __func__ << dendl; + aio_stop = true; + aio_thread.join(); + aio_stop = false; + io_queue->shutdown(); + } +} + +int HMSMRDevice::_discard_start() +{ + discard_thread.create("bstore_discard"); + return 0; +} + +void HMSMRDevice::_discard_stop() +{ + dout(10) << __func__ << dendl; + { + std::unique_lock l(discard_lock); + while (!discard_started) { + discard_cond.wait(l); + } + discard_stop = true; + discard_cond.notify_all(); + } + discard_thread.join(); + { + std::lock_guard l(discard_lock); + discard_stop = false; + } + dout(10) << __func__ << " stopped" << dendl; +} + +void HMSMRDevice::discard_drain() +{ + dout(10) << __func__ << dendl; + std::unique_lock l(discard_lock); + while (!discard_queued.empty() || discard_running) { + discard_cond.wait(l); + } +} + +static bool is_expected_ioerr(const int r) +{ + // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135 + return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC || + r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || 
r == -EIO || + r == -ENODATA || r == -EILSEQ || r == -ENOMEM || +#if defined(__linux__) + r == -EREMCHG || r == -EBADE +#elif defined(__FreeBSD__) + r == - BSM_ERRNO_EREMCHG || r == -BSM_ERRNO_EBADE +#endif + ); +} + +void HMSMRDevice::_aio_thread() +{ + dout(10) << __func__ << " start" << dendl; + int inject_crash_count = 0; + while (!aio_stop) { + dout(40) << __func__ << " polling" << dendl; + int max = cct->_conf->bdev_aio_reap_max; + aio_t *aio[max]; + int r = io_queue->get_next_completed(cct->_conf->bdev_aio_poll_ms, + aio, max); + if (r < 0) { + derr << __func__ << " got " << cpp_strerror(r) << dendl; + ceph_abort_msg("got unexpected error from io_getevents"); + } + if (r > 0) { + dout(30) << __func__ << " got " << r << " completed aios" << dendl; + for (int i = 0; i < r; ++i) { + IOContext *ioc = static_cast<IOContext*>(aio[i]->priv); + _aio_log_finish(ioc, aio[i]->offset, aio[i]->length); + if (aio[i]->queue_item.is_linked()) { + std::lock_guard l(debug_queue_lock); + debug_aio_unlink(*aio[i]); + } + + // set flag indicating new ios have completed. we do this *before* + // any completion or notifications so that any user flush() that + // follows the observed io completion will include this io. Note + // that an earlier, racing flush() could observe and clear this + // flag, but that also ensures that the IO will be stable before the + // later flush() occurs. 
+ io_since_flush.store(true); + + long r = aio[i]->get_return_value(); + if (r < 0) { + derr << __func__ << " got r=" << r << " (" << cpp_strerror(r) << ")" + << dendl; + if (ioc->allow_eio && is_expected_ioerr(r)) { + derr << __func__ << " translating the error to EIO for upper layer" + << dendl; + ioc->set_return_value(-EIO); + } else { + if (is_expected_ioerr(r)) { + note_io_error_event( + devname.c_str(), + path.c_str(), + r, +#if defined(HAVE_POSIXAIO) + aio[i]->aio.aiocb.aio_lio_opcode, +#else + aio[i]->iocb.aio_lio_opcode, +#endif + aio[i]->offset, + aio[i]->length); + ceph_abort_msg( + "Unexpected IO error. " + "This may suggest a hardware issue. " + "Please check your kernel log!"); + } + ceph_abort_msg( + "Unexpected IO error. " + "This may suggest HW issue. Please check your dmesg!"); + } + } else if (aio[i]->length != (uint64_t)r) { + derr << "aio to 0x" << std::hex << aio[i]->offset + << "~" << aio[i]->length << std::dec + << " but returned: " << r << dendl; + ceph_abort_msg("unexpected aio return value: does not match length"); + } + + dout(10) << __func__ << " finished aio " << aio[i] << " r " << r + << " ioc " << ioc + << " with " << (ioc->num_running.load() - 1) + << " aios left" << dendl; + + // NOTE: once num_running and we either call the callback or + // call aio_wake we cannot touch ioc or aio[] as the caller + // may free it. 
+ if (ioc->priv) { + if (--ioc->num_running == 0) { + aio_callback(aio_callback_priv, ioc->priv); + } + } else { + ioc->try_aio_wake(); + } + } + } + if (cct->_conf->bdev_debug_aio) { + utime_t now = ceph_clock_now(); + std::lock_guard l(debug_queue_lock); + if (debug_oldest) { + if (debug_stall_since == utime_t()) { + debug_stall_since = now; + } else { + if (cct->_conf->bdev_debug_aio_suicide_timeout) { + utime_t cutoff = now; + cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout; + if (debug_stall_since < cutoff) { + derr << __func__ << " stalled aio " << debug_oldest + << " since " << debug_stall_since << ", timeout is " + << cct->_conf->bdev_debug_aio_suicide_timeout + << "s, suicide" << dendl; + ceph_abort_msg("stalled aio... buggy kernel or bad device?"); + } + } + } + } + } + reap_ioc(); + if (cct->_conf->bdev_inject_crash) { + ++inject_crash_count; + if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 > + cct->_conf->bdev_inject_crash + cct->_conf->bdev_inject_crash_flush_delay) { + derr << __func__ << " bdev_inject_crash trigger from aio thread" + << dendl; + cct->_log->flush(); + _exit(1); + } + } + } + reap_ioc(); + dout(10) << __func__ << " end" << dendl; +} + +void HMSMRDevice::_discard_thread() +{ + std::unique_lock l(discard_lock); + ceph_assert(!discard_started); + discard_started = true; + discard_cond.notify_all(); + while (true) { + ceph_assert(discard_finishing.empty()); + if (discard_queued.empty()) { + if (discard_stop) + break; + dout(20) << __func__ << " sleep" << dendl; + discard_cond.notify_all(); // for the thread trying to drain... 
+ discard_cond.wait(l); + dout(20) << __func__ << " wake" << dendl; + } else { + discard_finishing.swap(discard_queued); + discard_running = true; + l.unlock(); + dout(20) << __func__ << " finishing" << dendl; + for (auto p = discard_finishing.begin();p != discard_finishing.end(); ++p) { + discard(p.get_start(), p.get_len()); + } + + discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing)); + discard_finishing.clear(); + l.lock(); + discard_running = false; + } + } + dout(10) << __func__ << " finish" << dendl; + discard_started = false; +} + +int HMSMRDevice::queue_discard(interval_set<uint64_t> &to_release) +{ + if (!support_discard) + return -1; + + if (to_release.empty()) + return 0; + + std::lock_guard l(discard_lock); + discard_queued.insert(to_release); + discard_cond.notify_all(); + return 0; +} + +void HMSMRDevice::_aio_log_start( + IOContext *ioc, + uint64_t offset, + uint64_t length) +{ + dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length + << std::dec << dendl; + if (cct->_conf->bdev_debug_inflight_ios) { + std::lock_guard l(debug_lock); + if (debug_inflight.intersects(offset, length)) { + derr << __func__ << " inflight overlap of 0x" + << std::hex + << offset << "~" << length << std::dec + << " with " << debug_inflight << dendl; + ceph_abort(); + } + debug_inflight.insert(offset, length); + } +} + +void HMSMRDevice::debug_aio_link(aio_t& aio) +{ + if (debug_queue.empty()) { + debug_oldest = &aio; + } + debug_queue.push_back(aio); +} + +void HMSMRDevice::debug_aio_unlink(aio_t& aio) +{ + if (aio.queue_item.is_linked()) { + debug_queue.erase(debug_queue.iterator_to(aio)); + if (debug_oldest == &aio) { + auto age = cct->_conf->bdev_debug_aio_log_age; + if (age && debug_stall_since != utime_t()) { + utime_t cutoff = ceph_clock_now(); + cutoff -= age; + if (debug_stall_since < cutoff) { + derr << __func__ << " stalled aio " << debug_oldest + << " since " << debug_stall_since << ", timeout is " + << age + << "s" << 
dendl; + } + } + + if (debug_queue.empty()) { + debug_oldest = nullptr; + } else { + debug_oldest = &debug_queue.front(); + } + debug_stall_since = utime_t(); + } + } +} + +void HMSMRDevice::_aio_log_finish( + IOContext *ioc, + uint64_t offset, + uint64_t length) +{ + dout(20) << __func__ << " " << aio << " 0x" + << std::hex << offset << "~" << length << std::dec << dendl; + if (cct->_conf->bdev_debug_inflight_ios) { + std::lock_guard l(debug_lock); + debug_inflight.erase(offset, length); + } +} + +void HMSMRDevice::aio_submit(IOContext *ioc) +{ + dout(20) << __func__ << " ioc " << ioc + << " pending " << ioc->num_pending.load() + << " running " << ioc->num_running.load() + << dendl; + + if (ioc->num_pending.load() == 0) { + return; + } + + // move these aside, and get our end iterator position now, as the + // aios might complete as soon as they are submitted and queue more + // wal aio's. + list<aio_t>::iterator e = ioc->running_aios.begin(); + ioc->running_aios.splice(e, ioc->pending_aios); + + int pending = ioc->num_pending.load(); + ioc->num_running += pending; + ioc->num_pending -= pending; + ceph_assert(ioc->num_pending.load() == 0); // we should be only thread doing this + ceph_assert(ioc->pending_aios.size() == 0); + + if (cct->_conf->bdev_debug_aio) { + list<aio_t>::iterator p = ioc->running_aios.begin(); + while (p != e) { + dout(30) << __func__ << " " << *p << dendl; + std::lock_guard l(debug_queue_lock); + debug_aio_link(*p++); + } + } + + void *priv = static_cast<void*>(ioc); + int r, retries = 0; + r = io_queue->submit_batch(ioc->running_aios.begin(), e, + pending, priv, &retries); + + if (retries) + derr << __func__ << " retries " << retries << dendl; + if (r < 0) { + derr << " aio submit got " << cpp_strerror(r) << dendl; + ceph_assert(r == 0); + } +} + +int HMSMRDevice::_sync_write(uint64_t off, bufferlist &bl, bool buffered, int write_hint) +{ + uint64_t len = bl.length(); + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len + << 
std::dec << (buffered ? " (buffered)" : " (direct)") << dendl; + if (cct->_conf->bdev_inject_crash && + rand() % cct->_conf->bdev_inject_crash == 0) { + derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex + << off << "~" << len << std::dec << dendl; + ++injecting_crash; + return 0; + } + vector<iovec> iov; + bl.prepare_iov(&iov); + int r = ::pwritev(choose_fd(buffered, write_hint), + &iov[0], iov.size(), off); + + if (r < 0) { + r = -errno; + derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl; + return r; + } +#ifdef HAVE_SYNC_FILE_RANGE + if (buffered) { + // initiate IO and wait till it completes + r = ::sync_file_range(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER|SYNC_FILE_RANGE_WAIT_BEFORE); + if (r < 0) { + r = -errno; + derr << __func__ << " sync_file_range error: " << cpp_strerror(r) << dendl; + return r; + } + } +#endif + + io_since_flush.store(true); + + return 0; +} + +int HMSMRDevice::write( + uint64_t off, + bufferlist &bl, + bool buffered, + int write_hint) +{ + uint64_t len = bl.length(); + dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << (buffered ? " (buffered)" : " (direct)") + << dendl; + ceph_assert(is_valid_io(off, len)); + if (cct->_conf->objectstore_blackhole) { + lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO" + << dendl; + return 0; + } + + if ((!buffered || bl.get_num_buffers() >= IOV_MAX) && + bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) { + dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl; + } + dout(40) << "data: "; + bl.hexdump(*_dout); + *_dout << dendl; + + return _sync_write(off, bl, buffered, write_hint); +} + +int HMSMRDevice::aio_write( + uint64_t off, + bufferlist &bl, + IOContext *ioc, + bool buffered, + int write_hint) +{ + uint64_t len = bl.length(); + dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << (buffered ? 
" (buffered)" : " (direct)") + << dendl; + ceph_assert(is_valid_io(off, len)); + if (cct->_conf->objectstore_blackhole) { + lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO" + << dendl; + return 0; + } + + if ((!buffered || bl.get_num_buffers() >= IOV_MAX) && + bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) { + dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl; + } + dout(40) << "data: "; + bl.hexdump(*_dout); + *_dout << dendl; + + _aio_log_start(ioc, off, len); + +#ifdef HAVE_LIBAIO + if (aio && dio && !buffered) { + if (cct->_conf->bdev_inject_crash && + rand() % cct->_conf->bdev_inject_crash == 0) { + derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex + << off << "~" << len << std::dec + << dendl; + // generate a real io so that aio_wait behaves properly, but make it + // a read instead of write, and toss the result. + ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint))); + ++ioc->num_pending; + auto& aio = ioc->pending_aios.back(); + bufferptr p = buffer::create_small_page_aligned(len); + aio.bl.append(std::move(p)); + aio.bl.prepare_iov(&aio.iov); + aio.preadv(off, len); + ++injecting_crash; + } else { + if (bl.length() <= RW_IO_MAX) { + // fast path (non-huge write) + ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint))); + ++ioc->num_pending; + auto& aio = ioc->pending_aios.back(); + bl.prepare_iov(&aio.iov); + aio.bl.claim_append(bl); + aio.pwritev(off, len); + dout(30) << aio << dendl; + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len + << std::dec << " aio " << &aio << dendl; + } else { + // write in RW_IO_MAX-sized chunks + uint64_t prev_len = 0; + while (prev_len < bl.length()) { + bufferlist tmp; + if (prev_len + RW_IO_MAX < bl.length()) { + tmp.substr_of(bl, prev_len, RW_IO_MAX); + } else { + tmp.substr_of(bl, prev_len, bl.length() - prev_len); + } + auto len = tmp.length(); + ioc->pending_aios.push_back(aio_t(ioc, 
choose_fd(false, write_hint))); + ++ioc->num_pending; + auto& aio = ioc->pending_aios.back(); + tmp.prepare_iov(&aio.iov); + aio.bl.claim_append(tmp); + aio.pwritev(off + prev_len, len); + dout(30) << aio << dendl; + dout(5) << __func__ << " 0x" << std::hex << off + prev_len + << "~" << len + << std::dec << " aio " << &aio << " (piece)" << dendl; + prev_len += len; + } + } + } + } else +#endif + { + int r = _sync_write(off, bl, buffered, write_hint); + _aio_log_finish(ioc, off, len); + if (r < 0) + return r; + } + return 0; +} + +int HMSMRDevice::discard(uint64_t offset, uint64_t len) +{ + int r = 0; + if (cct->_conf->objectstore_blackhole) { + lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO" + << dendl; + return 0; + } + if (support_discard) { + dout(10) << __func__ + << " 0x" << std::hex << offset << "~" << len << std::dec + << dendl; + + r = BlkDev{fd_directs[WRITE_LIFE_NOT_SET]}.discard((int64_t)offset, (int64_t)len); + } + return r; +} + +int HMSMRDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, + IOContext *ioc, + bool buffered) +{ + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << (buffered ? " (buffered)" : " (direct)") + << dendl; + ceph_assert(is_valid_io(off, len)); + + _aio_log_start(ioc, off, len); + + auto start1 = mono_clock::now(); + + auto p = buffer::ptr_node::create(buffer::create_small_page_aligned(len)); + int r = ::pread(buffered ? fd_buffereds[WRITE_LIFE_NOT_SET] : fd_directs[WRITE_LIFE_NOT_SET], + p->c_str(), len, off); + auto age = cct->_conf->bdev_debug_aio_log_age; + if (mono_clock::now() - start1 >= make_timespan(age)) { + derr << __func__ << " stalled read " + << " 0x" << std::hex << off << "~" << len << std::dec + << (buffered ? 
" (buffered)" : " (direct)") + << " since " << start1 << ", timeout is " + << age + << "s" << dendl; + } + + if (r < 0) { + if (ioc->allow_eio && is_expected_ioerr(r)) { + r = -EIO; + } else { + r = -errno; + } + goto out; + } + ceph_assert((uint64_t)r == len); + pbl->push_back(std::move(p)); + + dout(40) << "data: "; + pbl->hexdump(*_dout); + *_dout << dendl; + + out: + _aio_log_finish(ioc, off, len); + return r < 0 ? r : 0; +} + +int HMSMRDevice::aio_read( + uint64_t off, + uint64_t len, + bufferlist *pbl, + IOContext *ioc) +{ + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << dendl; + + int r = 0; +#ifdef HAVE_LIBAIO + if (aio && dio) { + ceph_assert(is_valid_io(off, len)); + _aio_log_start(ioc, off, len); + ioc->pending_aios.push_back(aio_t(ioc, fd_directs[WRITE_LIFE_NOT_SET])); + ++ioc->num_pending; + aio_t& aio = ioc->pending_aios.back(); + bufferptr p = buffer::create_small_page_aligned(len); + aio.bl.append(std::move(p)); + aio.bl.prepare_iov(&aio.iov); + aio.preadv(off, len); + dout(30) << aio << dendl; + pbl->append(aio.bl); + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len + << std::dec << " aio " << &aio << dendl; + } else +#endif + { + r = read(off, len, pbl, ioc, false); + } + + return r; +} + +int HMSMRDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf) +{ + uint64_t aligned_off = p2align(off, block_size); + uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off; + bufferptr p = buffer::create_small_page_aligned(aligned_len); + int r = 0; + + auto start1 = mono_clock::now(); + r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], p.c_str(), aligned_len, aligned_off); + auto age = cct->_conf->bdev_debug_aio_log_age; + if (mono_clock::now() - start1 >= make_timespan(age)) { + derr << __func__ << " stalled read " + << " 0x" << std::hex << off << "~" << len << std::dec + << " since " << start1 << ", timeout is " + << age + << "s" << dendl; + } + + if (r < 0) { + r = -errno; + derr << 
__func__ << " 0x" << std::hex << off << "~" << len << std::dec + << " error: " << cpp_strerror(r) << dendl; + goto out; + } + ceph_assert((uint64_t)r == aligned_len); + memcpy(buf, p.c_str() + (off - aligned_off), len); + + dout(40) << __func__ << " data: "; + bufferlist bl; + bl.append(buf, len); + bl.hexdump(*_dout); + *_dout << dendl; + + out: + return r < 0 ? r : 0; +} + +int HMSMRDevice::read_random(uint64_t off, uint64_t len, char *buf, + bool buffered) +{ + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << "buffered " << buffered + << dendl; + ceph_assert(len > 0); + ceph_assert(off < size); + ceph_assert(off + len <= size); + int r = 0; + auto age = cct->_conf->bdev_debug_aio_log_age; + + //if it's direct io and unaligned, we have to use a internal buffer + if (!buffered && ((off % block_size != 0) + || (len % block_size != 0) + || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0))) + return direct_read_unaligned(off, len, buf); + + auto start1 = mono_clock::now(); + if (buffered) { + //buffered read + auto off0 = off; + char *t = buf; + uint64_t left = len; + while (left > 0) { + r = ::pread(fd_buffereds[WRITE_LIFE_NOT_SET], t, left, off); + if (r < 0) { + r = -errno; + derr << __func__ << " 0x" << std::hex << off << "~" << left + << std::dec << " error: " << cpp_strerror(r) << dendl; + goto out; + } + off += r; + t += r; + left -= r; + } + if (mono_clock::now() - start1 >= make_timespan(age)) { + derr << __func__ << " stalled read " + << " 0x" << std::hex << off0 << "~" << len << std::dec + << " (buffered) since " << start1 << ", timeout is " + << age + << "s" << dendl; + } + } else { + //direct and aligned read + r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], buf, len, off); + if (mono_clock::now() - start1 >= make_timespan(age)) { + derr << __func__ << " stalled read " + << " 0x" << std::hex << off << "~" << len << std::dec + << " (direct) since " << start1 << ", timeout is " + << age + << "s" << dendl; + } + if (r < 0) { + r = -errno; + 
derr << __func__ << " direct_aligned_read" << " 0x" << std::hex + << off << "~" << left << std::dec << " error: " << cpp_strerror(r) + << dendl; + goto out; + } + ceph_assert((uint64_t)r == len); + } + + dout(40) << __func__ << " data: "; + bufferlist bl; + bl.append(buf, len); + bl.hexdump(*_dout); + *_dout << dendl; + + out: + return r < 0 ? r : 0; +} + +int HMSMRDevice::invalidate_cache(uint64_t off, uint64_t len) +{ + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << dendl; + ceph_assert(off % block_size == 0); + ceph_assert(len % block_size == 0); + int r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, POSIX_FADV_DONTNEED); + if (r) { + r = -r; + derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << " error: " << cpp_strerror(r) << dendl; + } + return r; +} diff --git a/src/blk/zoned/HMSMRDevice.h b/src/blk/zoned/HMSMRDevice.h new file mode 100644 index 000000000..30941f2f9 --- /dev/null +++ b/src/blk/zoned/HMSMRDevice.h @@ -0,0 +1,163 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Red Hat + * Copyright (C) 2020 Abutalib Aghayev + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +// Copied from KernelDevice with HM-SMR specific functionality added. Will be +// further specialized for HM-SMR. 
+ +#ifndef CEPH_BLK_HMSMRDEVICE_H +#define CEPH_BLK_HMSMRDEVICE_H + +#include <atomic> + +#include "include/types.h" +#include "include/interval_set.h" +#include "common/Thread.h" +#include "include/utime.h" + +#include "aio/aio.h" +#include "BlockDevice.h" + +#define RW_IO_MAX (INT_MAX & CEPH_PAGE_MASK) + +class HMSMRDevice final : public BlockDevice { + std::vector<int> fd_directs, fd_buffereds; + bool enable_wrt = true; + std::string path; + bool aio, dio; + + int vdo_fd = -1; ///< fd for vdo sysfs directory + string vdo_name; + + std::string devname; ///< kernel dev name (/sys/block/$devname), if any + + ceph::mutex debug_lock = ceph::make_mutex("HMSMRDevice::debug_lock"); + interval_set<uint64_t> debug_inflight; + + std::atomic<bool> io_since_flush = {false}; + ceph::mutex flush_mutex = ceph::make_mutex("HMSMRDevice::flush_mutex"); + + std::unique_ptr<io_queue_t> io_queue; + aio_callback_t discard_callback; + void *discard_callback_priv; + bool aio_stop; + bool discard_started; + bool discard_stop; + + ceph::mutex discard_lock = ceph::make_mutex("HMSMRDevice::discard_lock"); + ceph::condition_variable discard_cond; + bool discard_running = false; + interval_set<uint64_t> discard_queued; + interval_set<uint64_t> discard_finishing; + + struct AioCompletionThread : public Thread { + HMSMRDevice *bdev; + explicit AioCompletionThread(HMSMRDevice *b) : bdev(b) {} + void *entry() override { + bdev->_aio_thread(); + return NULL; + } + } aio_thread; + + struct DiscardThread : public Thread { + HMSMRDevice *bdev; + explicit DiscardThread(HMSMRDevice *b) : bdev(b) {} + void *entry() override { + bdev->_discard_thread(); + return NULL; + } + } discard_thread; + + std::atomic_int injecting_crash; + + void _aio_thread(); + void _discard_thread(); + int queue_discard(interval_set<uint64_t> &to_release) final; + + int _aio_start(); + void _aio_stop(); + + int _discard_start(); + void _discard_stop(); + + void _aio_log_start(IOContext *ioc, uint64_t offset, uint64_t length); + 
void _aio_log_finish(IOContext *ioc, uint64_t offset, uint64_t length); + + int _sync_write(uint64_t off, bufferlist& bl, bool buffered, int write_hint); + + int _lock(); + + int direct_read_unaligned(uint64_t off, uint64_t len, char *buf); + + // stalled aio debugging + aio_list_t debug_queue; + ceph::mutex debug_queue_lock = + ceph::make_mutex("HMSMRDevice::debug_queue_lock"); + aio_t *debug_oldest = nullptr; + utime_t debug_stall_since; + void debug_aio_link(aio_t& aio); + void debug_aio_unlink(aio_t& aio); + + void _detect_vdo(); + int choose_fd(bool buffered, int write_hint) const; + + bool set_smr_params(const std::string& path); + +public: + HMSMRDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, + aio_callback_t d_cb, void *d_cbpriv); + static bool support(const std::string& path); + + void aio_submit(IOContext *ioc) final; + void discard_drain() final; + + int collect_metadata(const std::string& prefix, + map<std::string,std::string> *pm) const final; + int get_devname(std::string *s) const final { + if (devname.empty()) { + return -ENOENT; + } + *s = devname; + return 0; + } + int get_devices(std::set<std::string> *ls) const final; + + bool is_smr() const final { return true; } + + bool get_thin_utilization(uint64_t *total, uint64_t *avail) const final; + + int read(uint64_t off, uint64_t len, bufferlist *pbl, + IOContext *ioc, + bool buffered) final; + int aio_read(uint64_t off, uint64_t len, bufferlist *pbl, + IOContext *ioc) final; + int read_random(uint64_t off, uint64_t len, char *buf, + bool buffered) final; + + int write(uint64_t off, bufferlist& bl, bool buffered, + int write_hint = WRITE_LIFE_NOT_SET) final; + int aio_write(uint64_t off, bufferlist& bl, + IOContext *ioc, + bool buffered, + int write_hint = WRITE_LIFE_NOT_SET) final; + int flush() final; + int discard(uint64_t offset, uint64_t len) final; + + // for managing buffered readers/writers + int invalidate_cache(uint64_t off, uint64_t len) final; + int open(const std::string& 
path) final; + void close() final; +}; + +#endif //CEPH_BLK_HMSMRDEVICE_H diff --git a/src/blkin/CMakeLists.txt b/src/blkin/CMakeLists.txt new file mode 100644 index 000000000..d9a25b1cb --- /dev/null +++ b/src/blkin/CMakeLists.txt @@ -0,0 +1,9 @@ +cmake_minimum_required(VERSION 2.8.11) + +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/modules/") +find_package(LTTng REQUIRED) + +# make && make test +enable_testing() + +add_subdirectory(blkin-lib) diff --git a/src/blkin/COPYRIGHT b/src/blkin/COPYRIGHT new file mode 100644 index 000000000..55828c9af --- /dev/null +++ b/src/blkin/COPYRIGHT @@ -0,0 +1,27 @@ +Copyright 2014 Marios Kogias <marioskogias@gmail.com> +All rights reserved. + +Redistribution and use in source and binary forms, with or +without modification, are permitted provided that the following +conditions are met: + + 1. Redistributions of source code must retain the above + copyright notice, this list of conditions and the following + disclaimer. + 2. Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials + provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS +OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. 
diff --git a/src/blkin/README.md b/src/blkin/README.md new file mode 100644 index 000000000..2c8a6c99e --- /dev/null +++ b/src/blkin/README.md @@ -0,0 +1,59 @@ +# blkin + +BlkKin is a project that enables you to trace low-overhead applications using +LTTng following the tracing semantics that are described in Google's [Dapper +Paper](http://static.googleusercontent.com/media/research.google.com/el//pubs/archive/36356.pdf) + +According to this paper the logged information is called `Annotation` and +belongs to a specific span and trace. Each trace is comprised of multiple spans +which are related with each other with causal relationships. So, the BlkKin +library gives the user the API to easily instrument C/C++ applications. In +order to instrument applications you should take a look at ``blkin-lib/tests`` +for some testcases and at the ``blkin-lib/zipkin_c.h`` file. + +As a tracing backend BlkKin uses LTTng. So you must have LTTng installed. + +In order to build and install the lib, go to blkin-lib folder and: + +``` +make +make install +``` + +You should take a look at the examples to find out how to link the blkin lib +with your instrumented application. + +In order to visualize the aggregated information you can use Twitter's +[Zipkin](http://twitter.github.io/zipkin/) and send the data that you created, +by running the equivalent babeltrace plugin. In order to do so you can run + +``` +./zipkin/src/babeltrace_zipkin.py </path/to/lttng/traces> -s <server_ip> +-p <port_number> + +``` + +within the babeltrace-plugins directory. + +In case you have not used the blkin-lib to instrument your application, you can +still send your data to a Scribe server. To do that you can use another +Babeltrace plugin. 
This plugin transforms LTTng trace data to a JSON format and +sends them to a Scribe server. To do so we can equivalently run + +``` +./json/src/babeltrace_json.py </path/to/lttng/traces> -s <server_ip> +-p <port_number> +``` +within the babeltrace-plugins directory. + +Both of these plugins require that you have installed Babeltrace with its +Python bindings enabled. +The path to the lttng traces should not be the root directory but the directory +where the channel directories are included. + +## Dependencies + +* libboost-all-dev +* lttng-tools + +Note that BlkKin is tested only with LTTng2.4 diff --git a/src/blkin/babeltrace-plugins/.gitignore b/src/blkin/babeltrace-plugins/.gitignore new file mode 100644 index 000000000..c9b568f7e --- /dev/null +++ b/src/blkin/babeltrace-plugins/.gitignore @@ -0,0 +1,2 @@ +*.pyc +*.swp diff --git a/src/blkin/babeltrace-plugins/json/README.md b/src/blkin/babeltrace-plugins/json/README.md new file mode 100644 index 000000000..be0bbe2c4 --- /dev/null +++ b/src/blkin/babeltrace-plugins/json/README.md @@ -0,0 +1,5 @@ +babeltrace-json plugin +======================== + +This plugin enables us to send LTTng trace data to a Scribe server in a valid +json format diff --git a/src/blkin/babeltrace-plugins/json/src/babeltrace_json.py b/src/blkin/babeltrace-plugins/json/src/babeltrace_json.py new file mode 100755 index 000000000..0d21e0181 --- /dev/null +++ b/src/blkin/babeltrace-plugins/json/src/babeltrace_json.py @@ -0,0 +1,88 @@ +#!/usr/bin/python +# babeltrace_zipkin.py + +import sys +sys.path.append("../../babeltrace-plugins") +import json +import getopt +from babeltrace import * +from scribe_client import ScribeClient + +HELP = "Usage: python babeltrace_zipkin.py path/to/file -s <server> -p <port>" +CATEGORY = "LTTng" + + +def main(argv): + try: + path = argv[0] + except: + raise TypeError(HELP) + + try: + opts, args = getopt.getopt(argv[1:], "hs:p:") + except getopt.GetoptError: + raise TypeError(HELP) + + server = None + port = None + 
for opt, arg in opts: + if opt == '-h': + raise TypeError(HELP) + elif opt == '-s': + server = arg + elif opt == '-p': + port = arg + + if not server: + server = "localhost" + if not port: + port = 1463 + + # Open connection with scribe + scribe_client = ScribeClient(port, server) + + # Create TraceCollection and add trace: + traces = TraceCollection() + trace_handle = traces.add_trace(path, "ctf") + if trace_handle is None: + raise IOError("Error adding trace") + + #iterate over events + for event in traces.events: + data = dict() + + data["parent_span_id"]= event["parent_span_id"] + data['name'] = event["trace_name"] + data ["trace_id"] = event["trace_id"] + data["span_id"] = event["span_id"] + data['port'] = event['port_no'] + data['service_name'] = event['service_name'] + data['ip'] = event['ip'] + data['evemt'] = event['event'] + data['timestamp'] = event.timestamp + ''' + for k, v in event.items(): + field_type = event._field(k).type + data[k] = format_value(field_type, v) + ''' + json_data = json.dumps(data) + + #send data to scribe + scribe_client.log(CATEGORY, json_data) + + scribe_client.close() + + +def format_value(field_type, value): + + if field_type == 1: + return int(value) + elif field_type == 2: + return float(value) + elif field_type == 8: + return [x for x in value] + else: + return str(value) + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/src/blkin/babeltrace-plugins/scribe_client/__init__.py b/src/blkin/babeltrace-plugins/scribe_client/__init__.py new file mode 100644 index 000000000..503803846 --- /dev/null +++ b/src/blkin/babeltrace-plugins/scribe_client/__init__.py @@ -0,0 +1 @@ +from scribe_client import * diff --git a/src/blkin/babeltrace-plugins/scribe_client/scribe_client.py b/src/blkin/babeltrace-plugins/scribe_client/scribe_client.py new file mode 100644 index 000000000..b382ed995 --- /dev/null +++ b/src/blkin/babeltrace-plugins/scribe_client/scribe_client.py @@ -0,0 +1,31 @@ +#!/usr/bin/python +# scribe_client.py + 
+from scribe import scribe +from thrift.transport import TTransport, TSocket +from thrift.protocol import TBinaryProtocol + +class ScribeClient(object): + + def __init__(self, port, host): + print host + self.port = port + self.host = host + self.openConnection() + + def openConnection(self): + socket = TSocket.TSocket(host=self.host, port=self.port) + self.transport = TTransport.TFramedTransport(socket) + protocol = TBinaryProtocol.TBinaryProtocol(trans=self.transport, + strictRead=False, + strictWrite=False) + self.client = scribe.Client(protocol) + self.transport.open() + + def log(self, category, message): + log_entry = scribe.LogEntry(category, message) + result = self.client.Log(messages=[log_entry]) + return result # 0 for success + + def close(self): + self.transport.close() diff --git a/src/blkin/babeltrace-plugins/zipkin/README.md b/src/blkin/babeltrace-plugins/zipkin/README.md new file mode 100644 index 000000000..95cebe87b --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/README.md @@ -0,0 +1,6 @@ +babeltrace-zipkin plugin +======================== + +In order to use this plugin, the traces created by LTTng should follow a +specific format. This format is provided in zipkin_trace.h file. If this +format is not followed the traces will be dropped. 
diff --git a/src/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py b/src/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py new file mode 100755 index 000000000..5677338c9 --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py @@ -0,0 +1,69 @@ +#!/usr/bin/python +# babeltrace_zipkin.py + +import sys +sys.path.append("../../babeltrace-plugins") +import sys +import getopt +from babeltrace import * +from zipkin_logic.zipkin_client import ZipkinClient +HELP = "Usage: python babeltrace_zipkin.py path/to/file -s <server> -p <port>" + + +def main(argv): + try: + path = argv[0] + except: + raise TypeError(HELP) + + try: + opts, args = getopt.getopt(argv[1:], "hs:p:") + except getopt.GetoptError: + raise TypeError(HELP) + + server = None + port = None + for opt, arg in opts: + if opt == '-h': + raise TypeError(HELP) + elif opt == '-s': + server = arg + elif opt == '-p': + port = arg + + if not server: + server = "83.212.113.88" + if not port: + port = 1463 + + # Open connection with scribe + zipkin = ZipkinClient(port, server) + + # Create TraceCollection and add trace: + traces = TraceCollection() + trace_handle = traces.add_trace(path, "ctf") + if trace_handle is None: + raise IOError("Error adding trace") + + for event in traces.events: + name = event.name + try: + provider, kind = name.split(":") + if provider != "zipkin": + raise + except: + continue + + #create a zipkin trace from event info + trace = zipkin.create_trace(event) + + #create a zipkin annotation from event info + annotation = zipkin.create_annotation(event, kind) + + #record the trace + zipkin.record(trace, annotation) + + zipkin.close() + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py diff 
--git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py new file mode 100644 index 000000000..70874c0b6 --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py @@ -0,0 +1,131 @@ +# Copyright 2012 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import struct +import socket + +from thrift.protocol import TBinaryProtocol +from thrift.transport import TTransport + +import ttypes + + +def hex_str(n): + return '%0.16x' % (n,) + + +def json_formatter(traces, *json_args, **json_kwargs): + json_traces = [] + + for (trace, annotations) in traces: + json_trace = { + 'trace_id': hex_str(trace.trace_id), + 'span_id': hex_str(trace.span_id), + 'name': trace.name, + 'annotations': [] + } + + if trace.parent_span_id: + json_trace['parent_span_id'] = hex_str(trace.parent_span_id) + + for annotation in annotations: + json_annotation = { + 'key': annotation.name, + 'value': annotation.value, + 'type': annotation.annotation_type + } + + if annotation.endpoint: + json_annotation['host'] = { + 'ipv4': annotation.endpoint.ipv4, + 'port': annotation.endpoint.port, + 'service_name': annotation.endpoint.service_name + } + + json_trace['annotations'].append(json_annotation) + + json_traces.append(json_trace) + + return json.dumps(json_traces, *json_args, **json_kwargs) + + +def ipv4_to_int(ipv4): + return struct.unpack('!i', 
socket.inet_aton(ipv4))[0] + + +def base64_thrift(thrift_obj): + trans = TTransport.TMemoryBuffer() + tbp = TBinaryProtocol.TBinaryProtocol(trans) + + thrift_obj.write(tbp) + res = trans.getvalue().encode('base64').strip() + res = res.replace("\n","") + #print res + #print len(res) + return res + #return trans.getvalue().encode('base64').strip() + + +def binary_annotation_formatter(annotation, host=None): + annotation_types = { + 'string': ttypes.AnnotationType.STRING, + 'bytes': ttypes.AnnotationType.BYTES, + } + + annotation_type = annotation_types[annotation.annotation_type] + + value = annotation.value + + if isinstance(value, unicode): + value = value.encode('utf-8') + + return ttypes.BinaryAnnotation( + annotation.name, + value, + annotation_type, + host) + + +def base64_thrift_formatter(trace, annotations): + thrift_annotations = [] + binary_annotations = [] + + for annotation in annotations: + host = None + if annotation.endpoint: + host = ttypes.Endpoint( + ipv4=ipv4_to_int(annotation.endpoint.ipv4), + port=annotation.endpoint.port, + service_name=annotation.endpoint.service_name) + + if annotation.annotation_type == 'timestamp': + thrift_annotations.append(ttypes.Annotation( + timestamp=annotation.value, + value=annotation.name, + host=host)) + else: + binary_annotations.append( + binary_annotation_formatter(annotation, host)) + + thrift_trace = ttypes.Span( + trace_id=trace.trace_id, + name=trace.name, + id=trace.span_id, + parent_id=trace.parent_span_id, + annotations=thrift_annotations, + binary_annotations=binary_annotations + ) + + return base64_thrift(thrift_trace) diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py new file mode 100644 index 000000000..e0a3ed2c3 --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py @@ -0,0 +1,137 @@ +import math +import time +import random + + +class Trace(object): + """ + An L{ITrace} provider which 
delegates to zero or more L{ITracers} and + allows setting a default L{IEndpoint} to associate with L{IAnnotation}s + + @ivar _tracers: C{list} of one or more L{ITracer} providers. + @ivar _endpoint: An L{IEndpoint} provider. + """ + def __init__(self, name, trace_id=None, span_id=None, + parent_span_id=None, tracers=None): + """ + @param name: C{str} describing the current span. + @param trace_id: C{int} or C{None} + @param span_id: C{int} or C{None} + @param parent_span_id: C{int} or C{None} + + @param tracers: C{list} of L{ITracer} providers, primarily useful + for unit testing. + """ + self.name = name + # If no trace_id and span_id are given we want to generate new + # 64-bit integer ids. + self.trace_id = trace_id + self.span_id = span_id + + # If no parent_span_id is given then we assume there is no parent span + # and leave it as None. + self.parent_span_id = parent_span_id + + # If no tracers are given we get the global list of tracers. + self._tracers = tracers + + # By default no endpoint will be associated with annotations recorded + # to this trace. + self._endpoint = None + + def __ne__(self, other): + return not self == other + + def __repr__(self): + return ( + '{0.__class__.__name__}({0.name!r}, trace_id={0.trace_id!r}, ' + 'span_id={0.span_id!r}, parent_span_id={0.parent_span_id!r})' + ).format(self) + + def set_endpoint(self, endpoint): + """ + Set a default L{IEndpoint} provider for the current L{Trace}. + All annotations recorded after this endpoint is set will use it, + unless they provide their own endpoint. + """ + self._endpoint = endpoint + + +class Endpoint(object): + + def __init__(self, ipv4, port, service_name): + """ + @param ipv4: C{str} ipv4 address. + @param port: C{int} port number. + @param service_name: C{str} service name. 
+ """ + self.ipv4 = ipv4 + self.port = port + self.service_name = service_name + + def __ne__(self, other): + return not self == other + + def __repr__(self): + return ('{0.__class__.__name__}({0.ipv4!r}, {0.port!r}, ' + '{0.service_name!r})').format(self) + + +class Annotation(object): + + def __init__(self, name, value, annotation_type, endpoint=None): + """ + @param name: C{str} name of this annotation. + + @param value: A value of the appropriate type based on + C{annotation_type}. + + @param annotation_type: C{str} the expected type of our C{value}. + + @param endpoint: An optional L{IEndpoint} provider to associate with + this annotation or C{None} + """ + self.name = name + self.value = value + self.annotation_type = annotation_type + self.endpoint = endpoint + + def __ne__(self, other): + return not self == other + + def __repr__(self): + return ( + '{0.__class__.__name__}({0.name!r}, {0.value!r}, ' + '{0.annotation_type!r}, {0.endpoint})' + ).format(self) + + @classmethod + def timestamp(cls, name, timestamp=None): + if timestamp is None: + timestamp = math.trunc(time.time() * 1000 * 1000) + + return cls(name, timestamp, 'timestamp') + + @classmethod + def client_send(cls, timestamp=None): + return cls.timestamp(constants.CLIENT_SEND, timestamp) + + @classmethod + def client_recv(cls, timestamp=None): + return cls.timestamp(constants.CLIENT_RECV, timestamp) + + @classmethod + def server_send(cls, timestamp=None): + return cls.timestamp(constants.SERVER_SEND, timestamp) + + @classmethod + def server_recv(cls, timestamp=None): + return cls.timestamp(constants.SERVER_RECV, timestamp) + + @classmethod + def string(cls, name, value): + return cls(name, value, 'string') + + @classmethod + def bytes(cls, name, value): + return cls(name, value, 'bytes') diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py new file mode 100644 index 000000000..8605559b7 --- /dev/null +++ 
b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py @@ -0,0 +1,453 @@ +# +# Autogenerated by Thrift Compiler (0.8.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py:twisted +# + +from thrift.Thrift import TType, TMessageType, TException + +from thrift.transport import TTransport +from thrift.protocol import TBinaryProtocol, TProtocol +try: + from thrift.protocol import fastbinary +except: + fastbinary = None + + +class AnnotationType: + BOOL = 0 + BYTES = 1 + I16 = 2 + I32 = 3 + I64 = 4 + DOUBLE = 5 + STRING = 6 + + _VALUES_TO_NAMES = { + 0: "BOOL", + 1: "BYTES", + 2: "I16", + 3: "I32", + 4: "I64", + 5: "DOUBLE", + 6: "STRING", + } + + _NAMES_TO_VALUES = { + "BOOL": 0, + "BYTES": 1, + "I16": 2, + "I32": 3, + "I64": 4, + "DOUBLE": 5, + "STRING": 6, + } + + +class Endpoint: + """ + Attributes: + - ipv4 + - port + - service_name + """ + + thrift_spec = ( + None, # 0 + (1, TType.I32, 'ipv4', None, None, ), # 1 + (2, TType.I16, 'port', None, None, ), # 2 + (3, TType.STRING, 'service_name', None, None, ), # 3 + ) + + def __init__(self, ipv4=None, port=None, service_name=None,): + self.ipv4 = ipv4 + self.port = port + self.service_name = service_name + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I32: + self.ipv4 = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I16: + self.port = iprot.readI16(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.service_name = iprot.readString(); + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + 
iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Endpoint') + if self.ipv4 is not None: + oprot.writeFieldBegin('ipv4', TType.I32, 1) + oprot.writeI32(self.ipv4) + oprot.writeFieldEnd() + if self.port is not None: + oprot.writeFieldBegin('port', TType.I16, 2) + oprot.writeI16(self.port) + oprot.writeFieldEnd() + if self.service_name is not None: + oprot.writeFieldBegin('service_name', TType.STRING, 3) + oprot.writeString(self.service_name) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class Annotation: + """ + Attributes: + - timestamp + - value + - host + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'timestamp', None, None, ), # 1 + (2, TType.STRING, 'value', None, None, ), # 2 + (3, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 3 + ) + + def __init__(self, timestamp=None, value=None, host=None,): + self.timestamp = timestamp + self.value = value + self.host = host + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + 
break + if fid == 1: + if ftype == TType.I64: + self.timestamp = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.value = iprot.readString(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRUCT: + self.host = Endpoint() + self.host.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Annotation') + if self.timestamp is not None: + oprot.writeFieldBegin('timestamp', TType.I64, 1) + oprot.writeI64(self.timestamp) + oprot.writeFieldEnd() + if self.value is not None: + oprot.writeFieldBegin('value', TType.STRING, 2) + oprot.writeString(self.value) + oprot.writeFieldEnd() + if self.host is not None: + oprot.writeFieldBegin('host', TType.STRUCT, 3) + self.host.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class BinaryAnnotation: + """ + Attributes: + - key + - value + - annotation_type + - host + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'key', None, None, ), # 1 + (2, TType.STRING, 'value', None, None, ), # 2 + (3, TType.I32, 'annotation_type', None, None, ), # 3 + (4, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 4 + ) + + def __init__(self, key=None, value=None, annotation_type=None, host=None,): + self.key = key + self.value = value 
+ self.annotation_type = annotation_type + self.host = host + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.key = iprot.readString(); + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.value = iprot.readString(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I32: + self.annotation_type = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.STRUCT: + self.host = Endpoint() + self.host.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('BinaryAnnotation') + if self.key is not None: + oprot.writeFieldBegin('key', TType.STRING, 1) + oprot.writeString(self.key) + oprot.writeFieldEnd() + if self.value is not None: + oprot.writeFieldBegin('value', TType.STRING, 2) + oprot.writeString(self.value) + oprot.writeFieldEnd() + if self.annotation_type is not None: + oprot.writeFieldBegin('annotation_type', TType.I32, 3) + oprot.writeI32(self.annotation_type) + oprot.writeFieldEnd() + if self.host is not None: + oprot.writeFieldBegin('host', TType.STRUCT, 4) + self.host.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for 
key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class Span: + """ + Attributes: + - trace_id + - name + - id + - parent_id + - annotations + - binary_annotations + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'trace_id', None, None, ), # 1 + None, # 2 + (3, TType.STRING, 'name', None, None, ), # 3 + (4, TType.I64, 'id', None, None, ), # 4 + (5, TType.I64, 'parent_id', None, None, ), # 5 + (6, TType.LIST, 'annotations', (TType.STRUCT,(Annotation, Annotation.thrift_spec)), None, ), # 6 + None, # 7 + (8, TType.LIST, 'binary_annotations', (TType.STRUCT,(BinaryAnnotation, BinaryAnnotation.thrift_spec)), None, ), # 8 + ) + + def __init__(self, trace_id=None, name=None, id=None, parent_id=None, annotations=None, binary_annotations=None,): + self.trace_id = trace_id + self.name = name + self.id = id + self.parent_id = parent_id + self.annotations = annotations + self.binary_annotations = binary_annotations + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.trace_id = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.name = iprot.readString(); + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I64: + self.id = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I64: + self.parent_id = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 6: 
+ if ftype == TType.LIST: + self.annotations = [] + (_etype3, _size0) = iprot.readListBegin() + for _i4 in xrange(_size0): + _elem5 = Annotation() + _elem5.read(iprot) + self.annotations.append(_elem5) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 8: + if ftype == TType.LIST: + self.binary_annotations = [] + (_etype9, _size6) = iprot.readListBegin() + for _i10 in xrange(_size6): + _elem11 = BinaryAnnotation() + _elem11.read(iprot) + self.binary_annotations.append(_elem11) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Span') + if self.trace_id is not None: + oprot.writeFieldBegin('trace_id', TType.I64, 1) + oprot.writeI64(self.trace_id) + oprot.writeFieldEnd() + if self.name is not None: + oprot.writeFieldBegin('name', TType.STRING, 3) + oprot.writeString(self.name) + oprot.writeFieldEnd() + if self.id is not None: + oprot.writeFieldBegin('id', TType.I64, 4) + oprot.writeI64(self.id) + oprot.writeFieldEnd() + if self.parent_id is not None: + oprot.writeFieldBegin('parent_id', TType.I64, 5) + oprot.writeI64(self.parent_id) + oprot.writeFieldEnd() + if self.annotations is not None: + oprot.writeFieldBegin('annotations', TType.LIST, 6) + oprot.writeListBegin(TType.STRUCT, len(self.annotations)) + for iter12 in self.annotations: + iter12.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.binary_annotations is not None: + oprot.writeFieldBegin('binary_annotations', TType.LIST, 8) + oprot.writeListBegin(TType.STRUCT, len(self.binary_annotations)) + for iter13 in self.binary_annotations: + iter13.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + 
oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py new file mode 100644 index 000000000..28118facb --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py @@ -0,0 +1,70 @@ +#!/usr/bin/python + +from scribe_client import ScribeClient +from trace import Annotation, Trace, Endpoint +from collections import defaultdict +from formatters import base64_thrift_formatter + + +class ZipkinClient(ScribeClient): + + DEFAULT_END_ANNOTATIONS = ("ss", "cr", "end") + + def __init__(self, port, host): + super(ZipkinClient, self).__init__(port, host) + self._annotations_for_trace = defaultdict(list) + + def create_trace(self, event): + service = event["trace_name"] + trace_id = event["trace_id"] + span_id = event["span_id"] + parent_span = event["parent_span_id"] + if parent_span == 0: + parent_span = None + trace = Trace(service, trace_id, span_id, parent_span) + return trace + + def create_annotation(self, event, kind): + if kind == "keyval_string": + key = event["key"] + val = event["val"] + annotation = Annotation.string(key, val) + elif kind == "keyval_integer": + key = event["key"] + val = str(event["val"]) + annotation = Annotation.string(key, val) + elif kind == "timestamp": + timestamp = event.timestamp + #timestamp has different digit length + timestamp = str(timestamp) + timestamp = timestamp[:-3] + event_name = event["event"] + annotation = Annotation.timestamp(event_name, int(timestamp)) + + # create and set endpoint + port = event["port_no"] + service = 
event["service_name"] + ip = event["ip"] + endpoint = Endpoint(ip, int(port), service) + annotation.endpoint = endpoint + + print annotation + return annotation + + def record(self, trace, annotation): + self.scribe_log(trace, [annotation]) + ''' + trace_key = (trace.trace_id, trace.span_id) + self._annotations_for_trace[trace_key].append(annotation) + if (annotation.name in self.DEFAULT_END_ANNOTATIONS): + saved_annotations = self._annotations_for_trace[trace_key] + del self._annotations_for_trace[trace_key] + self.scribe_log(trace, saved_annotations) + print "Record event" + ''' + + def scribe_log(self, trace, annotations): + trace._endpoint = None + message = base64_thrift_formatter(trace, annotations) + category = 'zipkin' + return self.log(category, message) diff --git a/src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h b/src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h new file mode 100644 index 000000000..4abc87b87 --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h @@ -0,0 +1,67 @@ +/* + * Zipkin lttng-ust tracepoint provider. 
+ */ + +#undef TRACEPOINT_PROVIDER +#define TRACEPOINT_PROVIDER zipkin + +#undef TRACEPOINT_INCLUDE +#define TRACEPOINT_INCLUDE "./zipkin_trace.h" + +#if !defined(_ZIPKIN_H) || defined(TRACEPOINT_HEADER_MULTI_READ) +#define _ZIPKIN_H + +#include <lttng/tracepoint.h> + +TRACEPOINT_EVENT( + zipkin, + keyval, + TP_ARGS(char *, service, char *, trace_name, + int, port, char *, ip, long, trace, + long, span, long, parent_span, + char *, key, char *, val ), + + TP_FIELDS( + ctf_string(trace_name, trace_name) + ctf_string(service_name, service) + ctf_integer(int, port_no, port) + ctf_string(ip, ip) + ctf_integer(long, trace_id, trace) + ctf_integer(long, span_id, span) + ctf_integer(long, parent_span_id, parent_span) + ctf_string(key, key) + ctf_string(val, val) + ) +) +TRACEPOINT_LOGLEVEL( + zipkin, + keyval, + TRACE_WARNING) + + +TRACEPOINT_EVENT( + zipkin, + timestamp, + TP_ARGS(char *, service, char *, trace_name, + int, port, char *, ip, long, trace, + long, span, long, parent_span, + char *, event), + + TP_FIELDS( + ctf_string(trace_name, trace_name) + ctf_string(service_name, service) + ctf_integer(int, port_no, port) + ctf_string(ip, ip) + ctf_integer(long, trace_id, trace) + ctf_integer(long, span_id, span) + ctf_integer(long, parent_span_id, parent_span) + ctf_string(event, event) + ) +) +TRACEPOINT_LOGLEVEL( + zipkin, + timestamp, + TRACE_WARNING) +#endif /* _ZIPKIN_H */ + +#include <lttng/tracepoint-event.h> diff --git a/src/blkin/blkin-lib/CMakeLists.txt b/src/blkin/blkin-lib/CMakeLists.txt new file mode 100644 index 000000000..d2e1d0f73 --- /dev/null +++ b/src/blkin/blkin-lib/CMakeLists.txt @@ -0,0 +1,18 @@ +include_directories(.) 
+ +#blkin +set(blkin_srcs + zipkin_c.c + tp.c +) +add_library(blkin ${blkin_srcs}) +set_target_properties(blkin PROPERTIES POSITION_INDEPENDENT_CODE ON) +target_link_libraries(blkin dl ${LTTNG_LIBRARIES}) + +set(blkin_headers + zipkin_c.h + zipkin_trace.h + ztracer.hpp +) + +add_subdirectory(tests) diff --git a/src/blkin/blkin-lib/Makefile b/src/blkin/blkin-lib/Makefile new file mode 100644 index 000000000..52852f449 --- /dev/null +++ b/src/blkin/blkin-lib/Makefile @@ -0,0 +1,92 @@ +# Copyright 2014 Marios Kogias <marioskogias@gmail.com> +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. 
+ +.PHONY: default clean distclean run run_c run_pp + +MAJOR=0 +MINOR=1 +LIBS= -ldl -llttng-ust +DLIB=libblkin + +LIB_DIR=$(shell pwd) +TEST_DIR=$(shell pwd)/tests +prefix= /usr/local +libdir= $(prefix)/lib +incdir= $(prefix)/include + +H_FILES= zipkin_c.h zipkin_trace.h ztracer.hpp + +default: $(DLIB).so test testpp testppp + +$(DLIB).so: $(DLIB).$(MAJOR).so + ln -sf $< $@ + +$(DLIB).$(MAJOR).so: $(DLIB).$(MAJOR).$(MINOR).so + ln -sf $< $@ + +$(DLIB).$(MAJOR).$(MINOR).so: zipkin_c.o tp.o + g++ -shared -o $@ $^ $(LIBS) + +zipkin_c.o: zipkin_c.c zipkin_c.h zipkin_trace.h + gcc -I. -Wall -fpic -g -c -o $@ $< + +tp.o: tp.c zipkin_trace.h + gcc -I. -fpic -g -c -o $@ $< + +test: $(TEST_DIR)/test.c $(DLIB).so + make -C tests test + +testpp: $(TEST_DIR)/test.cc $(DLIB).so + make -C tests testpp + +testppp: $(TEST_DIR)/test_p.cc $(DLIB).so + make -C tests testppp + +run_c: + make -C tests run_c + +run_pp: + make -C tests run_pp + +run_ppp: + make -C tests run_ppp + +run: run_c run_pp + +install: + install -m 644 $(DLIB).$(MAJOR).$(MINOR).so $(DESTDIR)/$(libdir) + cp -P $(DLIB).$(MAJOR).so $(DESTDIR)/$(libdir) + cp -P $(DLIB).so $(DESTDIR)/$(libdir) + install -m 644 $(H_FILES) $(DESTDIR)/$(incdir) + +clean: + rm -f *.o *.so + make -C tests clean + +distclean: clean + rm -f socket diff --git a/src/blkin/blkin-lib/tests/CMakeLists.txt b/src/blkin/blkin-lib/tests/CMakeLists.txt new file mode 100644 index 000000000..d69cec3da --- /dev/null +++ b/src/blkin/blkin-lib/tests/CMakeLists.txt @@ -0,0 +1,15 @@ +#test +add_executable(testc test.c) +target_link_libraries(testc blkin lttng-ust) +add_test(NAME testc COMMAND $<TARGET_FILE:testc>) + +#testpp +add_executable(testpp test.cc) +set_target_properties(testpp PROPERTIES COMPILE_FLAGS "-std=c++11") +target_link_libraries(testpp blkin lttng-ust pthread) +add_test(NAME testpp COMMAND $<TARGET_FILE:testpp>) + +#testppp +add_executable(testppp test_p.cc) +target_link_libraries(testppp blkin lttng-ust) +add_test(NAME testppp COMMAND 
$<TARGET_FILE:testppp>) diff --git a/src/blkin/blkin-lib/tests/Makefile b/src/blkin/blkin-lib/tests/Makefile new file mode 100644 index 000000000..bed3fc869 --- /dev/null +++ b/src/blkin/blkin-lib/tests/Makefile @@ -0,0 +1,55 @@ +# Copyright 2014 Marios Kogias <marioskogias@gmail.com> +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials +# provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS +# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +.PHONY: default clean distclean run run_c run_pp + +LIB_DIR=$(shell pwd)/.. 
+DLIB=$(LIB_DIR)/libblkin + +test: test.c $(DLIB).so + gcc test.c -o test -g -I$(LIB_DIR) -L$(LIB_DIR) -lblkin + +testpp: test.cc $(DLIB).so + LD_LIBRARY_PATH=$(LIB_DIR) g++ $< -o testpp -std=c++11 -g -I$(LIB_DIR) -L$(LIB_DIR) -lblkin -lpthread + +testppp: test_p.cc $(DLIB).so + LD_LIBRARY_PATH=$(LIB_DIR) g++ $< -o testppp -g -I$(LIB_DIR) -L$(LIB_DIR) -lblkin + +run_c: + LD_LIBRARY_PATH=$(LIB_DIR) ./test + +run_pp: + LD_LIBRARY_PATH=$(LIB_DIR) ./testpp + +run_ppp: + LD_LIBRARY_PATH=$(LIB_DIR) ./testppp + +run: run_c run_pp + +clean: + rm -f *.o *.so test testpp testppp socket diff --git a/src/blkin/blkin-lib/tests/test.c b/src/blkin/blkin-lib/tests/test.c new file mode 100644 index 000000000..2cabd99bf --- /dev/null +++ b/src/blkin/blkin-lib/tests/test.c @@ -0,0 +1,200 @@ +/* + * Copyright 2014 Marios Kogias <marioskogias@gmail.com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +/* + * In this example we have 2 processes communicating over a unix socket. + * We are going to trace the communication with our library + */ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> + +#include <zipkin_c.h> + +#define SOCK_PATH "/tmp/socket" + +struct message { + char actual_message[20]; + struct blkin_trace_info trace_info; +}; + +void process_a() +{ + int i, r; + printf("I am process A: %d\n", getpid()); + + r = blkin_init(); + if (r < 0) { + fprintf(stderr, "Could not initialize blkin\n"); + exit(1); + } + + /*initialize endpoint*/ + struct blkin_endpoint endp; + blkin_init_endpoint(&endp, "10.0.0.1", 5000, "service a"); + + struct blkin_trace trace; + struct blkin_annotation ant; + struct message msg = {.actual_message = "message"}; + char ack; + + /*create and bind socket*/ + int s, s2, len; + socklen_t t; + struct sockaddr_un local, remote; + + if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { + perror("socket"); + exit(1); + } + + local.sun_family = AF_UNIX; + strcpy(local.sun_path, SOCK_PATH); + unlink(local.sun_path); + len = strlen(local.sun_path) + sizeof(local.sun_family); + if (bind(s, (struct sockaddr *)&local, len) == -1) { + perror("bind"); + exit(1); + } + + if (listen(s, 5) == -1) { + perror("listen"); + exit(1); + } + + printf("Waiting for a connection...\n"); + t = sizeof(remote); + if ((s2 = accept(s, 
(struct sockaddr *)&remote, &t)) == -1) { + perror("accept"); + exit(1); + } + + printf("Connected.\n"); + + for (i=0;i<10;i++) { + + /*create trace*/ + blkin_init_new_trace(&trace, "process a", &endp); + + blkin_init_timestamp_annotation(&ant, "start", &endp); + blkin_record(&trace, &ant); + + /*set trace fields to message*/ + blkin_get_trace_info(&trace, &msg.trace_info); + + /*send*/ + send(s2, &msg, sizeof(struct message), 0); + + /*wait for ack*/ + recv(s2, &ack, 1, 0); + + /*create annotation and log*/ + blkin_init_timestamp_annotation(&ant, "end", &endp); + blkin_record(&trace, &ant); + } + close(s2); +} + +void process_b() +{ + int i, r; + printf("I am process B: %d\n", getpid()); + + r = blkin_init(); + if (r < 0) { + fprintf(stderr, "Could not initialize blkin\n"); + exit(1); + } + /*initialize endpoint*/ + struct blkin_endpoint endp; + blkin_init_endpoint(&endp, "10.0.0.2", 5001, "service b"); + + struct blkin_trace trace; + struct blkin_annotation ant; + struct message msg; + int s, len; + struct sockaddr_un remote; + + /*Connect*/ + if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { + perror("socket"); + exit(1); + } + + printf("Trying to connect...\n"); + + remote.sun_family = AF_UNIX; + strcpy(remote.sun_path, SOCK_PATH); + len = strlen(remote.sun_path) + sizeof(remote.sun_family); + if (connect(s, (struct sockaddr *)&remote, len) == -1) { + perror("connect"); + exit(1); + } + + printf("Connected.\n"); + + for (i=0;i<10;i++) { + recv(s, &msg, sizeof(struct message), 0); + + /*create child trace*/ + blkin_init_child_info(&trace, &msg.trace_info, &endp, "process b"); + + /*create annotation and log*/ + blkin_init_timestamp_annotation(&ant, "start", &endp); + blkin_record(&trace, &ant); + + /*Process...*/ + usleep(10); + printf("Message received %s\n", msg.actual_message); + + /*create annotation and log*/ + blkin_init_timestamp_annotation(&ant, "end", &endp); + blkin_record(&trace, &ant); + + /*send ack*/ + send(s, "*", 1, 0); + } +} + + +int main() +{ 
+ if (fork()){ + process_a(); + exit(0); + } + else{ + process_b(); + exit(0); + } +} diff --git a/src/blkin/blkin-lib/tests/test.cc b/src/blkin/blkin-lib/tests/test.cc new file mode 100644 index 000000000..f93597dd3 --- /dev/null +++ b/src/blkin/blkin-lib/tests/test.cc @@ -0,0 +1,204 @@ +/* + * Copyright 2014 Marios Kogias <marioskogias@gmail.com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. 
+ */ +#include <thread> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <ztracer.hpp> +#include <iostream> + +#define SOCK_PATH "socket" + +struct message { + int actual_message; + struct blkin_trace_info trace_info; + message(int s) : actual_message(s) {}; + message() {} +}; + +class Parent { + private: + int s, s2; + ZTracer::Endpoint e; + public: + Parent() : e("0.0.0.0", 1, "parent") + { + connect(); + } + void operator()() + { + struct sockaddr_un remote; + int t; + std::cout << "I am parent process : " << getpid() << std::endl; + + /* Wait for connection */ + t = sizeof(remote); + if ((s2 = accept(s, (struct sockaddr *)&remote, (socklen_t *)&t)) == -1) { + std::cerr << "accept" << std::endl; + exit(1); + } + + std::cerr << "Connected" << std::endl; + + for (int i=0;i<10;i++) { + /*Init trace*/ + ZTracer::Trace tr("parent process", &e); + + process(tr); + + wait_response(); + + /*Log received*/ + tr.event("parent end"); + } + } + + void process(ZTracer::Trace &tr) + { + struct message msg(rand()); + /*Annotate*/ + tr.event("parent start"); + /*Set trace info to the message*/ + msg.trace_info = *tr.get_info(); + + /*send*/ + send(s2, &msg, sizeof(struct message), 0); + } + + void wait_response() + { + char ack; + recv(s2, &ack, 1, 0); + } + + void connect() + { + /*create and bind socket*/ + int len; + struct sockaddr_un local; + + if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { + std::cerr << "socket" << std::endl; + exit(1); + } + + local.sun_family = AF_UNIX; + strcpy(local.sun_path, SOCK_PATH); + unlink(local.sun_path); + len = strlen(local.sun_path) + sizeof(local.sun_family); + if (bind(s, (struct sockaddr *)&local, len) == -1) { + std::cerr << "bind" << std::endl; + exit(1); + } + + if (listen(s, 5) == -1) { + std::cerr << "listen" << std::endl; + exit(1); + } + + std::cout << "Waiting for a connection..." 
<< std::endl; + } + +}; + +class Child { + private: + int s; + ZTracer::Endpoint e; + public: + Child() : e("0.0.0.1", 2, "child") + { + } + void operator()() + { + /*Connect to the socket*/ + soc_connect(); + + for (int i=0;i<10;i++) + process(); + } + + void process() + { + struct message msg; + recv(s, &msg, sizeof(struct message), 0); + + ZTracer::Trace tr("Child process", &e, &msg.trace_info, true); + tr.event("child start"); + + usleep(10); + std::cout << "Message received : " << msg.actual_message << ::std::endl; + tr.event("child end"); + + send(s, "*", 1, 0); + } + + + void soc_connect() + { + int len; + struct sockaddr_un remote; + + if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { + std::cerr << "socket" << std::endl; + exit(1); + } + + std::cout << "Trying to connect...\n" << std::endl; + + remote.sun_family = AF_UNIX; + strcpy(remote.sun_path, SOCK_PATH); + len = strlen(remote.sun_path) + sizeof(remote.sun_family); + if (connect(s, (struct sockaddr *)&remote, len) == -1) { + std::cerr << "connect" << std::endl;; + exit(1); + } + + std::cout << "Connected" << std::endl; + } + +}; +int main(int argc, const char *argv[]) +{ + int r = ZTracer::ztrace_init(); + if (r < 0) { + std::cout << "Error initializing blkin" << std::endl; + return -1; + } + Parent p; + Child c; + std::thread workerThread1(p); + std::thread workerThread2(c); + workerThread1.join(); + workerThread2.join(); + + return 0; +} diff --git a/src/blkin/blkin-lib/tests/test_p.cc b/src/blkin/blkin-lib/tests/test_p.cc new file mode 100644 index 000000000..ec59da3b6 --- /dev/null +++ b/src/blkin/blkin-lib/tests/test_p.cc @@ -0,0 +1,210 @@ +/* + * Copyright 2014 Marios Kogias <marioskogias@gmail.com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. 
Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. 
+ */ +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <ztracer.hpp> +#include <iostream> +#include <cstdlib> + +#define SOCK_PATH "socket" + +struct message { + int actual_message; + struct blkin_trace_info trace_info; + message(int s) : actual_message(s) {}; + message() {} +}; + +class Parent { + private: + int s, s2; + ZTracer::Endpoint e; + public: + Parent() : e("0.0.0.0", 1, "parent") + { + connect(); + } + void operator()() + { + struct sockaddr_un remote; + int t; + std::cout << "I am parent process : " << getpid() << std::endl; + + /* Wait for connection */ + t = sizeof(remote); + if ((s2 = accept(s, (struct sockaddr *)&remote, (socklen_t *)&t)) == -1) { + std::cerr << "accept" << std::endl; + exit(1); + } + + std::cerr << "Connected" << std::endl; + + for (int i=0;i<10;i++) { + /*Init trace*/ + ZTracer::Trace tr("parent process", &e); + process(tr); + + wait_response(); + + /*Log received*/ + tr.event("parent end"); + } + } + + void process(ZTracer::Trace &tr) + { + struct message msg(rand()); + /*Annotate*/ + tr.event("parent start"); + /*Set trace info to the message*/ + msg.trace_info = *tr.get_info(); + + /*send*/ + send(s2, &msg, sizeof(struct message), 0); + } + + void wait_response() + { + char ack; + recv(s2, &ack, 1, 0); + } + + void connect() + { + /*create and bind socket*/ + int len; + struct sockaddr_un local; + + if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { + std::cerr << "socket" << std::endl; + exit(1); + } + + local.sun_family = AF_UNIX; + strcpy(local.sun_path, SOCK_PATH); + unlink(local.sun_path); + len = strlen(local.sun_path) + sizeof(local.sun_family); + if (bind(s, (struct sockaddr *)&local, len) == -1) { + std::cerr << "bind" << std::endl; + exit(1); + } + + if (listen(s, 5) == -1) { + std::cerr << "listen" << std::endl; + exit(1); + } + + std::cout << "Waiting for a connection..." 
<< std::endl; + } + +}; + +class Child { + private: + int s; + ZTracer::Endpoint e; + public: + Child() : e("0.0.0.1", 2, "child") + { + } + void operator()() + { + /*Connect to the socket*/ + soc_connect(); + + for (int i=0;i<10;i++) + process(); + } + + void process() + { + struct message msg; + recv(s, &msg, sizeof(struct message), 0); + + ZTracer::Trace tr("Child process", &e, &msg.trace_info, true); + tr.event("child start"); + + usleep(10); + std::cout << "Message received : " << msg.actual_message << ::std::endl; + tr.event("child end"); + + send(s, "*", 1, 0); + } + + + void soc_connect() + { + int len; + struct sockaddr_un remote; + + if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { + std::cerr << "socket" << std::endl; + exit(1); + } + + std::cout << "Trying to connect...\n" << std::endl; + + remote.sun_family = AF_UNIX; + strcpy(remote.sun_path, SOCK_PATH); + len = strlen(remote.sun_path) + sizeof(remote.sun_family); + if (connect(s, (struct sockaddr *)&remote, len) == -1) { + std::cerr << "connect" << std::endl;; + exit(1); + } + + std::cout << "Connected" << std::endl; + } + +}; +int main(int argc, const char *argv[]) +{ + if (fork()) { + int r = ZTracer::ztrace_init(); + if (r < 0) { + std::cout << "Error initializing blkin" << std::endl; + exit(1); + } + Parent p; + p(); + exit(0); + } else { + int r = ZTracer::ztrace_init(); + if (r < 0) { + std::cout << "Error initializing blkin" << std::endl; + exit(1); + } + Child c; + c(); + exit(0); + } + return 0; +} diff --git a/src/blkin/blkin-lib/tp.c b/src/blkin/blkin-lib/tp.c new file mode 100644 index 000000000..e72177e57 --- /dev/null +++ b/src/blkin/blkin-lib/tp.c @@ -0,0 +1,35 @@ +/* + * tp.c + * + * Copyright (c) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com> + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without 
limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* + * Defining macro creates the code objects of the traceprobes, only do + * it once per file + */ +#define TRACEPOINT_CREATE_PROBES +/* + * The header containing our TRACEPOINT_EVENTs. + */ +#include <zipkin_trace.h> diff --git a/src/blkin/blkin-lib/zipkin_c.c b/src/blkin/blkin-lib/zipkin_c.c new file mode 100644 index 000000000..a68182b55 --- /dev/null +++ b/src/blkin/blkin-lib/zipkin_c.c @@ -0,0 +1,356 @@ +/* + * Copyright 2014 Marios Kogias <marioskogias@gmail.com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. 
+ * + * THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +#include "zipkin_c.h" +#include <pthread.h> +#include <errno.h> +#include <stdlib.h> +#include <unistd.h> +#include <fcntl.h> + +#define TRACEPOINT_DEFINE +#include <zipkin_trace.h> + +const char *default_ip = "NaN"; +const char *default_name = "NoName"; + +const char* const CLIENT_SEND = "cs"; +const char* const CLIENT_RECV = "cr"; +const char* const SERVER_SEND = "ss"; +const char* const SERVER_RECV = "sr"; +const char* const WIRE_SEND = "ws"; +const char* const WIRE_RECV = "wr"; +const char* const CLIENT_SEND_FRAGMENT = "csf"; +const char* const CLIENT_RECV_FRAGMENT = "crf"; +const char* const SERVER_SEND_FRAGMENT = "ssf"; +const char* const SERVER_RECV_FRAGMENT = "srf"; + +/* + * Build a non-negative 64-bit id out of two rand() results (high and + * low half). + */ +static int64_t random_big() +{ + int64_t a; + a = rand(); + a = a << 32; + int b = rand(); + a = a + b; + /* '!a' would turn every negative value into 0; ~a is >= 0 whenever a < 0 and cannot overflow */ + if (a<0) + a = ~a; + return a; +} + +/* + * Seed rand() exactly once per process; the mutex makes concurrent callers + * safe. Always returns 0. + */ +int blkin_init() +{ + static pthread_mutex_t blkin_init_mutex = PTHREAD_MUTEX_INITIALIZER; + static int initialized = 0; + + /* + * Initialize srand with something appropriate + * time is not good for archipelago: several daemons -> same timestamp + */ + pthread_mutex_lock(&blkin_init_mutex); + if (!initialized) { + int inf, seed; + inf = open("/dev/urandom", O_RDONLY); //file descriptor 1 + 
read(inf, &seed, sizeof(int)); + close(inf); + srand(seed); + initialized = 1; + } + pthread_mutex_unlock(&blkin_init_mutex); + return 0; +} + +int blkin_init_new_trace(struct blkin_trace *new_trace, const char *service, + const struct blkin_endpoint *endpoint) +{ + int res; + if (!new_trace) { + res = -EINVAL; + goto OUT; + } + new_trace->name = service; + blkin_init_trace_info(&(new_trace->info)); + new_trace->endpoint = endpoint; + res = 0; + +OUT: + return res; +} + +void blkin_init_trace_info(struct blkin_trace_info *trace_info) +{ + trace_info->span_id = trace_info->trace_id = random_big(); + trace_info->parent_span_id = 0; +} + +int blkin_init_child_info(struct blkin_trace *child, + const struct blkin_trace_info *parent_info, + const struct blkin_endpoint *endpoint, + const char *child_name) +{ + int res; + if ((!child) || (!parent_info) || (!endpoint)){ + res = -EINVAL; + goto OUT; + } + child->info.trace_id = parent_info->trace_id; + child->info.span_id = random_big(); + child->info.parent_span_id = parent_info->span_id; + child->name = child_name; + child->endpoint = endpoint; + res = 0; + +OUT: + return res; +} + +int blkin_init_child(struct blkin_trace *child, + const struct blkin_trace *parent, + const struct blkin_endpoint *endpoint, + const char *child_name) +{ + int res; + if (!parent) { + res = -EINVAL; + goto OUT; + } + if (!endpoint) + endpoint = parent->endpoint; + if (blkin_init_child_info(child, &parent->info, endpoint, child_name) != 0){ + res = -EINVAL; + goto OUT; + } + res = 0; + +OUT: + return res; +} + +int blkin_init_endpoint(struct blkin_endpoint *endp, const char *ip, + int16_t port, const char *name) +{ + int res; + if (!endp){ + res = -EINVAL; + goto OUT; + } + if (!ip) + ip = default_ip; + + endp->ip = ip; + endp->port = port; + endp->name = name; + res = 0; + +OUT: + return res; +} + +int blkin_init_string_annotation(struct blkin_annotation *annotation, + const char *key, const char *val, const struct blkin_endpoint *endpoint) +{ 
+ int res; + if ((!annotation) || (!key) || (!val)){ + res = -EINVAL; + goto OUT; + } + annotation->type = ANNOT_STRING; + annotation->key = key; + annotation->val_str = val; + annotation->endpoint = endpoint; + res = 0; + +OUT: + return res; +} + +int blkin_init_integer_annotation(struct blkin_annotation *annotation, + const char *key, int64_t val, const struct blkin_endpoint *endpoint) +{ + int res; + if ((!annotation) || (!key)) { + res = -EINVAL; + goto OUT; + } + annotation->type = ANNOT_INTEGER; + annotation->key = key; + annotation->val_int = val; + annotation->endpoint = endpoint; + res = 0; + +OUT: + return res; +} + +int blkin_init_timestamp_annotation(struct blkin_annotation *annotation, + const char *event, const struct blkin_endpoint *endpoint) +{ + int res; + if ((!annotation) || (!event)){ + res = -EINVAL; + goto OUT; + } + annotation->type = ANNOT_TIMESTAMP; + annotation->val_str = event; + annotation->endpoint = endpoint; + res = 0; + +OUT: + return res; +} + +int blkin_record(const struct blkin_trace *trace, + const struct blkin_annotation *annotation) +{ + int res; + if (!annotation || !trace || !trace->name) { + res = -EINVAL; + goto OUT; + } + + const struct blkin_endpoint *endpoint = + annotation->endpoint ? 
: trace->endpoint; + if (!endpoint || !endpoint->ip || !endpoint->name) { + res = -EINVAL; + goto OUT; + } + + if (annotation->type == ANNOT_STRING) { + if ((!annotation->key) || (!annotation->val_str)) { + res = -EINVAL; + goto OUT; + } + tracepoint(zipkin, keyval_string, trace->name, + endpoint->name, endpoint->port, endpoint->ip, + trace->info.trace_id, trace->info.span_id, + trace->info.parent_span_id, + annotation->key, annotation->val_str); + } + else if (annotation->type == ANNOT_INTEGER) { + if (!annotation->key) { + res = -EINVAL; + goto OUT; + } + tracepoint(zipkin, keyval_integer, trace->name, + endpoint->name, endpoint->port, endpoint->ip, + trace->info.trace_id, trace->info.span_id, + trace->info.parent_span_id, + annotation->key, annotation->val_int); + } + else { + if (!annotation->val_str) { + res = -EINVAL; + goto OUT; + } + tracepoint(zipkin, timestamp , trace->name, + endpoint->name, endpoint->port, endpoint->ip, + trace->info.trace_id, trace->info.span_id, + trace->info.parent_span_id, + annotation->val_str); + } + res = 0; +OUT: + return res; +} + +int blkin_get_trace_info(const struct blkin_trace *trace, + struct blkin_trace_info *info) +{ + int res; + if ((!trace) || (!info)){ + res = -EINVAL; + goto OUT; + } + + res = 0; + *info = trace->info; +OUT: + return res; +} + +int blkin_set_trace_info(struct blkin_trace *trace, + const struct blkin_trace_info *info) +{ + int res; + if ((!trace) || (!info)){ + res = -EINVAL; + goto OUT; + } + + res = 0; + trace->info = *info; +OUT: + return res; +} + +int blkin_set_trace_properties(struct blkin_trace *trace, + const struct blkin_trace_info *info, + const char *name, + const struct blkin_endpoint *endpoint) +{ + int res; + if ((!trace) || (!info) || (!endpoint) || (!name)){ + res = -EINVAL; + goto OUT; + } + + res = 0; + trace->info = *info; + trace->name = name; + trace->endpoint = endpoint; + +OUT: + return res; +} + +int blkin_pack_trace_info(const struct blkin_trace_info *info, + struct 
blkin_trace_info_packed *pinfo) +{ + if (!info || !pinfo) { + return -EINVAL; + } + + pinfo->trace_id = __cpu_to_be64(info->trace_id); + pinfo->span_id = __cpu_to_be64(info->span_id); + pinfo->parent_span_id = __cpu_to_be64(info->parent_span_id); + + return 0; +} + +int blkin_unpack_trace_info(const struct blkin_trace_info_packed *pinfo, + struct blkin_trace_info *info) +{ + if (!info || !pinfo) { + return -EINVAL; + } + + info->trace_id = __be64_to_cpu(pinfo->trace_id); + info->span_id = __be64_to_cpu(pinfo->span_id); + info->parent_span_id = __be64_to_cpu(pinfo->parent_span_id); + + return 0; +} diff --git a/src/blkin/blkin-lib/zipkin_c.h b/src/blkin/blkin-lib/zipkin_c.h new file mode 100644 index 000000000..77b5bc6d7 --- /dev/null +++ b/src/blkin/blkin-lib/zipkin_c.h @@ -0,0 +1,334 @@ +/* + * Copyright 2014 Marios Kogias <marioskogias@gmail.com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef ZIPKIN_C_H_ +#define ZIPKIN_C_H_ + +#include <stdint.h> +#include <asm/byteorder.h> + +/* + * Convenience wrappers: build an annotation on the stack and record it + * against trace. Nothing is recorded when the annotation could not be + * initialized, so blkin_record() never reads an uninitialized struct. + * The macros do not end in a semicolon; the caller supplies it, which + * keeps them usable as the body of an if/else. + */ +#define BLKIN_TIMESTAMP(trace, endp, event) \ + do { \ + struct blkin_annotation __annot; \ + if (blkin_init_timestamp_annotation(&__annot, event, endp) == 0) \ + blkin_record(trace, &__annot); \ + } while (0) + +#define BLKIN_KEYVAL_STRING(trace, endp, key, val) \ + do { \ + struct blkin_annotation __annot; \ + if (blkin_init_string_annotation(&__annot, key, val, endp) == 0) \ + blkin_record(trace, &__annot); \ + } while (0) + +#define BLKIN_KEYVAL_INTEGER(trace, endp, key, val) \ + do { \ + struct blkin_annotation __annot; \ + if (blkin_init_integer_annotation(&__annot, key, val, endp) == 0) \ + blkin_record(trace, &__annot); \ + } while (0) + +/** + * Core annotations used by Zipkin to denote the beginning and end of + * client and server spans. 
+ * For more information refer to + * https://github.com/openzipkin/zipkin/blob/master/zipkin-thrift/src/main/thrift/com/twitter/zipkin/zipkinCore.thrift + */ +extern const char* const CLIENT_SEND; +extern const char* const CLIENT_RECV; +extern const char* const SERVER_SEND; +extern const char* const SERVER_RECV; +extern const char* const WIRE_SEND; +extern const char* const WIRE_RECV; +extern const char* const CLIENT_SEND_FRAGMENT; +extern const char* const CLIENT_RECV_FRAGMENT; +extern const char* const SERVER_SEND_FRAGMENT; +extern const char* const SERVER_RECV_FRAGMENT; + +/** + * @struct blkin_endpoint + * Information about an endpoint of our instrumented application where + * annotations take place. blkin_init_endpoint() stores the ip and name + * pointers as given (no copy), so the strings must outlive the endpoint. + */ +struct blkin_endpoint { + const char *ip; + int16_t port; + const char *name; +}; + +/** + * @struct blkin_trace_info + * The information exchanged between different layers offering the needed + * trace semantics: the trace id, this span's id and the parent span's id + * (zero for a root span). + */ +struct blkin_trace_info { + int64_t trace_id; + int64_t span_id; + int64_t parent_span_id; +}; + +/** + * @struct blkin_trace_info_packed + * + * Packed version of the struct blkin_trace_info. Useful when sending over + * network: the fields are big-endian, see blkin_pack_trace_info() and + * blkin_unpack_trace_info(). + * + */ +struct blkin_trace_info_packed { + __be64 trace_id; + __be64 span_id; + __be64 parent_span_id; +} __attribute__((packed)); + + +/** + * @struct blkin_trace + * Struct used to define the context in which an annotation happens. + * name and endpoint are borrowed pointers; they are not copied. + */ +struct blkin_trace { + const char *name; + struct blkin_trace_info info; + const struct blkin_endpoint *endpoint; +}; + +/** + * @typedef blkin_annotation_type + * There are 3 kinds of annotation: string key-val, integer key-val and + * timestamp + */ +typedef enum { + ANNOT_STRING = 0, + ANNOT_INTEGER, + ANNOT_TIMESTAMP +} blkin_annotation_type; + +/** + * @struct blkin_annotation + * Struct carrying information about an annotation. 
This information can either + * be key-val or that a specific event happened + */ +struct blkin_annotation { + blkin_annotation_type type; + const char *key; + union { + const char *val_str; + int64_t val_int; + }; + const struct blkin_endpoint *endpoint; +}; + +/** + * Initialize the zipkin library. + * + * @return 0 on success + */ +int blkin_init(); + +/** + * Initialize a new blkin_trace with the information given. The new trace will + * have no parent so the parent id will be zero. + * + * @param new_trace the blkin_trace to be initialized + * @param name the trace's name + * @param endpoint a pointer to a blkin_endpoint struct that contains + * info about where the specific trace takes place + * + * @returns 0 on success or negative error code + */ +int blkin_init_new_trace(struct blkin_trace *new_trace, const char *name, + const struct blkin_endpoint *endpoint); + +/** + * Initialize blkin_trace_info for a root span. The new trace will + * have no parent so the parent id will be zero, and the id and trace id will + * be the same. + * + * @param trace_info the blkin_trace_info to be initialized + */ +void blkin_init_trace_info(struct blkin_trace_info *trace_info); + +/** + * Initialize a blkin_trace as a child of the given parent + * bkin_trace. The child trace will have the same trace_id, new + * span_id and parent_span_id its parent's span_id. + * + * @param child the blkin_trace to be initialized + * @param parent the parent blkin_trace + * @param child_name the blkin_trace name of the child + * + * @returns 0 on success or negative error code + */ +int blkin_init_child(struct blkin_trace *child, + const struct blkin_trace *parent, + const struct blkin_endpoint *endpoint, + const char *child_name); + +/** + * Initialize a blkin_trace struct and set the blkin_trace_info field to be + * child of the given blkin_trace_info. 
This means + * Same trace_id + * Different span_id + * Child's parent_span_id == parent's span_id + * + * @param child the new child blkin_trace_info + * @param info the parent's blkin_trace_info struct + * @param child_name the blkin_trace struct name field + * + * @returns 0 on success or negative error code + */ +int blkin_init_child_info(struct blkin_trace *child, + const struct blkin_trace_info *info, + const struct blkin_endpoint *endpoint, + const char *child_name); + +/** + * Initialize a blkin_endpoint struct with the information given + * + * @param endp the endpoint to be initialized + * @param ip the ip address of the specific endpoint + * @param port the TCP/UDP port of the specific endpoint + * @param name the name of the service running on the specific endpoint + * + * @returns 0 on success or negative error code + */ +int blkin_init_endpoint(struct blkin_endpoint *endpoint, + const char *ip, int16_t port, const char *name); + +/** + * Initialize a key-value blkin_annotation + * + * @param annotation the annotation to be initialized + * @param key the annotation's key + * @param val the annotation's string value + * @param endpoint where did this annotation occured + * + * @returns 0 on success or negative error code + */ +int blkin_init_string_annotation(struct blkin_annotation *annotation, + const char *key, const char *val, + const struct blkin_endpoint *endpoint); +/** + * Initialize a key-value blkin_annotation + * + * @param annotation the annotation to be initialized + * @param key the annotation's key + * @param val the annotation's int value + * @param endpoint where did this annotation occured + * + * @returns 0 on success or negative error code + */ + +int blkin_init_integer_annotation(struct blkin_annotation *annotation, + const char *key, int64_t val, + const struct blkin_endpoint *endpoint); + +/** + * Initialize a timestamp blkin_annotation + * + * @param annotation the annotation to be initialized + * @param event the event happened 
to be annotated + * @param endpoint where did this annotation occured + * + * @returns 0 on success or negative error code + */ + +int blkin_init_timestamp_annotation(struct blkin_annotation *annot, + const char *event, + const struct blkin_endpoint *endpoint); + +/** + * Log an annotation in terms of a specific trace. The endpoint of the + * annotation is used when set, otherwise the endpoint of the trace. + * + * @param trace the trace to which the annotation belongs + * @param annotation the annotation to be logged + * + * @returns 0 on success or negative error code (-EINVAL when the trace, + * its name, the annotation, or the endpoint's ip or name is missing) + */ +int blkin_record(const struct blkin_trace *trace, + const struct blkin_annotation *annotation); + +/** + * Copy the blkin_trace_info out of a blkin_trace into another + * blkin_trace_info + * + * @param trace the trace with the essential info + * @param info the destination + * + * @returns 0 on success or negative error code + */ +int blkin_get_trace_info(const struct blkin_trace *trace, + struct blkin_trace_info *info); + +/** + * Copy a blkin_trace_info struct into the info field of a blkin_trace + * struct + * + * @param trace the destination + * @param info where to copy from + * + * @returns 0 on success or negative error code + */ +int blkin_set_trace_info(struct blkin_trace *trace, + const struct blkin_trace_info *info); + +/** + * Set the trace information, name and endpoint of a trace. All four + * arguments are required. + * + * @param trace the trace to which the properties will be assigned + * @param info blkin_trace_information with the trace identifiers + * @param name span name + * @param endpoint associated host + * + * @returns 0 on success or negative error code + */ +int blkin_set_trace_properties(struct blkin_trace *trace, + const struct blkin_trace_info *info, + const char *name, + const struct blkin_endpoint *endpoint); + +/** + * Convert a blkin_trace_info to the packed version. + * + * @param info The unpacked trace info. + * @param pinfo The provided packed version to be initialized. 
+ * + * @returns 0 on success or negative error code + */ +int blkin_pack_trace_info(const struct blkin_trace_info *info, + struct blkin_trace_info_packed *pinfo); + +/** + * Convert a packed blkin_trace_info to the unpacked version. + * + * @param pinfo The provided packed version to be unpacked. + * @param info The unpacked trace info. + * + * @returns 0 on success or negative error code + */ +int blkin_unpack_trace_info(const struct blkin_trace_info_packed *pinfo, + struct blkin_trace_info *info); + +#endif /* ZIPKIN_C_H_ */ diff --git a/src/blkin/blkin-lib/zipkin_trace.h b/src/blkin/blkin-lib/zipkin_trace.h new file mode 100644 index 000000000..2a72d47bd --- /dev/null +++ b/src/blkin/blkin-lib/zipkin_trace.h @@ -0,0 +1,130 @@ +/* + * Zipkin lttng-ust tracepoint provider. + */ + +#undef TRACEPOINT_PROVIDER +#define TRACEPOINT_PROVIDER zipkin + +#undef TRACEPOINT_INCLUDE +#define TRACEPOINT_INCLUDE "./zipkin_trace.h" + +#if !defined(_ZIPKIN_H) || defined(TRACEPOINT_HEADER_MULTI_READ) +#define _ZIPKIN_H + +#include <lttng/tracepoint.h> + +TRACEPOINT_EVENT( + zipkin, + keyval_string, + TP_ARGS(const char *, trace_name, const char *, service, + int, port, const char *, ip, long, trace, + long, span, long, parent_span, + const char *, key, const char *, val ), + + TP_FIELDS( + /* + * Each span has a name mentioned on it in the UI + * This is the trace name + */ + ctf_string(trace_name, trace_name) + /* + * Each trace takes place in a specific machine-endpoint + * This is identified by a name, a port number and an ip + */ + ctf_string(service_name, service) + ctf_integer(int, port_no, port) + ctf_string(ip, ip) + /* + * According to the tracing semantics each trace should have + * a trace id, a span id and a parent span id + */ + ctf_integer(long, trace_id, trace) + ctf_integer(long, span_id, span) + ctf_integer(long, parent_span_id, parent_span) + /* + * The following is the real annotated information + */ + ctf_string(key, key) + ctf_string(val, val) + ) + ) 
+TRACEPOINT_LOGLEVEL( + zipkin, + keyval_string, + TRACE_WARNING) + +/* + * This tracepoint allows for integers to come out keyval + */ + +TRACEPOINT_EVENT( + zipkin, + keyval_integer, + TP_ARGS(const char *, trace_name, const char *, service, + int, port, const char *, ip, long, trace, + long, span, long, parent_span, + const char *, key, int64_t, val ), + + TP_FIELDS( + /* + * Each span has a name mentioned on it in the UI + * This is the trace name + */ + ctf_string(trace_name, trace_name) + /* + * Each trace takes place in a specific machine-endpoint + * This is identified by a name, a port number and an ip + */ + ctf_string(service_name, service) + ctf_integer(int, port_no, port) + ctf_string(ip, ip) + /* + * According to the tracing semantics each trace should have + * a trace id, a span id and a parent span id + */ + ctf_integer(long, trace_id, trace) + ctf_integer(long, span_id, span) + ctf_integer(long, parent_span_id, parent_span) + /* + * The following is the real annotated information + */ + ctf_string(key, key) + ctf_integer(int64_t, val, val) + ) + ) +TRACEPOINT_LOGLEVEL( + zipkin, + keyval_integer, + TRACE_WARNING) +/* + * In this event we follow the same semantics but we trace timestamp + * annotations + */ + +TRACEPOINT_EVENT( + zipkin, + timestamp, + TP_ARGS(const char *, trace_name, const char *, service, + int, port, const char *, ip, long, trace, + long, span, long, parent_span, + const char *, event), + + TP_FIELDS( + ctf_string(trace_name, trace_name) + ctf_string(service_name, service) + ctf_integer(int, port_no, port) + ctf_string(ip, ip) + ctf_integer(long, trace_id, trace) + ctf_integer(long, span_id, span) + ctf_integer(long, parent_span_id, parent_span) + ctf_string(event, event) + ) + ) +TRACEPOINT_LOGLEVEL( + zipkin, + timestamp, + TRACE_WARNING) +#endif /* _ZIPKIN_H */ + +#include <lttng/tracepoint-event.h> + diff --git a/src/blkin/blkin-lib/ztracer.hpp b/src/blkin/blkin-lib/ztracer.hpp new file mode 100644 index 000000000..3c4707ea4 
--- /dev/null +++ b/src/blkin/blkin-lib/ztracer.hpp @@ -0,0 +1,248 @@ +/* + * Copyright 2014 Marios Kogias <marioskogias@gmail.com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. 
+ */ +#ifndef ZTRACER_H + +#define ZTRACER_H + +#include <string> +extern "C" { +#include <zipkin_c.h> +} + +namespace ZTracer { + using std::string; + + const char* const CLIENT_SEND = "cs"; + const char* const CLIENT_RECV = "cr"; + const char* const SERVER_SEND = "ss"; + const char* const SERVER_RECV = "sr"; + const char* const WIRE_SEND = "ws"; + const char* const WIRE_RECV = "wr"; + const char* const CLIENT_SEND_FRAGMENT = "csf"; + const char* const CLIENT_RECV_FRAGMENT = "crf"; + const char* const SERVER_SEND_FRAGMENT = "ssf"; + const char* const SERVER_RECV_FRAGMENT = "srf"; + + static inline int ztrace_init() { return blkin_init(); } + + class Endpoint : private blkin_endpoint { + private: + string _ip; // storage for blkin_endpoint.ip, see copy_ip() + string _name; // storage for blkin_endpoint.name, see copy_name() + + friend class Trace; + public: + Endpoint(const char *name) + { + blkin_init_endpoint(this, "0.0.0.0", 0, name); + } + Endpoint(const char *ip, int port, const char *name) + { + blkin_init_endpoint(this, ip, port, name); + } + + // copy constructor and operator need to account for ip/name storage + Endpoint(const Endpoint &rhs) : _ip(rhs._ip), _name(rhs._name) + { + blkin_init_endpoint(this, _ip.size() ? _ip.c_str() : rhs.ip, + rhs.port, + _name.size() ? _name.c_str(): rhs.name); + } + const Endpoint& operator=(const Endpoint &rhs) + { + _ip.assign(rhs._ip); + _name.assign(rhs._name); + blkin_init_endpoint(this, _ip.size() ? _ip.c_str() : rhs.ip, + rhs.port, + _name.size() ? _name.c_str() : rhs.name); + return *this; + } + + // Endpoint assumes that ip and name will be string literals, and + // avoids making a copy and freeing on destruction. 
if you need to + // initialize Endpoint with a temporary string, copy_ip/copy_name() + // will store it in a std::string and assign from that + void copy_ip(const string &newip) + { + _ip.assign(newip); + ip = _ip.c_str(); + } + void copy_name(const string &newname) + { + _name.assign(newname); + name = _name.c_str(); + } + + void copy_address_from(const Endpoint *endpoint) + { + _ip.assign(endpoint->ip); + ip = _ip.c_str(); + port = endpoint->port; + } + void share_address_from(const Endpoint *endpoint) + { + ip = endpoint->ip; + port = endpoint->port; + } + void set_port(int p) { port = p; } + }; + + class Trace : private blkin_trace { + private: + string _name; // storage for blkin_trace.name, see copy_name() + + public: + // default constructor zero-initializes blkin_trace valid() + // will return false on a default-constructed Trace until + // init() + Trace() + { + // zero-initialize so valid() returns false + name = NULL; + info.trace_id = 0; + info.span_id = 0; + info.parent_span_id = 0; + endpoint = NULL; + } + + // construct a Trace with an optional parent + Trace(const char *name, const Endpoint *ep, const Trace *parent = NULL) + { + if (parent && parent->valid()) { + blkin_init_child(this, parent, ep ? : parent->endpoint, + name); + } else { + blkin_init_new_trace(this, name, ep); + } + } + + // construct a Trace from blkin_trace_info + Trace(const char *name, const Endpoint *ep, + const blkin_trace_info *i, bool child=false) + { + if (child) + blkin_init_child_info(this, i, ep, name); + else { + blkin_init_new_trace(this, name, ep); + set_info(i); + } + } + + // copy constructor and operator need to account for name storage + Trace(const Trace &rhs) : _name(rhs._name) + { + name = _name.size() ? _name.c_str() : rhs.name; + info = rhs.info; + endpoint = rhs.endpoint; + } + const Trace& operator=(const Trace &rhs) + { + _name.assign(rhs._name); + name = _name.size() ? 
_name.c_str() : rhs.name; + info = rhs.info; + endpoint = rhs.endpoint; + return *this; + } + + // return true if the Trace has been initialized + bool valid() const { return info.trace_id != 0; } + operator bool() const { return valid(); } + + // (re)initialize a Trace with an optional parent. a valid parent + // makes this a child span (ep defaults to the parent's endpoint), + // otherwise a new root trace is started + int init(const char *name, const Endpoint *ep, + const Trace *parent = NULL) + { + if (parent && parent->valid()) + return blkin_init_child(this, parent, + ep ? : parent->endpoint, name); + + return blkin_init_new_trace(this, name, ep); + } + + // (re)initialize a Trace from blkin_trace_info. with child=true the + // info describes the parent span, otherwise it is adopted as is + int init(const char *name, const Endpoint *ep, + const blkin_trace_info *i, bool child=false) + { + // honour the name argument like the other init() overload; the + // stored copy_name() string is only the fallback for a NULL name + const char *span_name = name ? name : _name.c_str(); + if (child) + return blkin_init_child_info(this, i, ep, span_name); + + return blkin_set_trace_properties(this, i, span_name, ep); + } + + // Trace assumes that name will be a string literal, and avoids + // making a copy and freeing on destruction. if you need to + // initialize Trace with a temporary string, copy_name() will store + // it in a std::string and assign from that + void copy_name(const string &newname) + { + _name.assign(newname); + name = _name.c_str(); + } + + const blkin_trace_info* get_info() const { return &info; } + void set_info(const blkin_trace_info *i) { info = *i; } + + // record key-value annotations; nothing is recorded while the + // Trace is not valid() + void keyval(const char *key, const char *val) const + { + if (valid()) + BLKIN_KEYVAL_STRING(this, endpoint, key, val); + } + void keyval(const char *key, int64_t val) const + { + if (valid()) + BLKIN_KEYVAL_INTEGER(this, endpoint, key, val); + } + void keyval(const char *key, const char *val, const Endpoint *ep) const + { + if (valid()) + BLKIN_KEYVAL_STRING(this, ep, key, val); + } + void keyval(const char *key, int64_t val, const Endpoint *ep) const + { + if (valid()) + BLKIN_KEYVAL_INTEGER(this, ep, key, val); + } + + // record timestamp annotations + void event(const char *event) const + { + if (valid()) + BLKIN_TIMESTAMP(this, endpoint, event); + } 
+ void event(const char *event, const Endpoint *ep) const + { + if (valid()) + BLKIN_TIMESTAMP(this, ep, event); + } + }; + +} +#endif /* end of include guard: ZTRACER_H */ diff --git a/src/blkin/cmake/modules/FindLTTng.cmake b/src/blkin/cmake/modules/FindLTTng.cmake new file mode 100644 index 000000000..d0462a187 --- /dev/null +++ b/src/blkin/cmake/modules/FindLTTng.cmake @@ -0,0 +1,64 @@ +# - Find LTTng +# Find the Linux Trace Toolkit - next generation with associated includes path. +# See http://lttng.org/ +# +# This module accepts the following optional variables: +# LTTNG_PATH_HINT = A hint on LTTNG install path. +# +# This module defines the following variables: +# LTTNG_FOUND = Was LTTng found or not? +# LTTNG_EXECUTABLE = The path to lttng command +# LTTNG_LIBRARIES = The list of libraries to link to when using LTTng +# LTTNG_INCLUDE_DIR = The path to LTTng include directory +# +# One can set LTTNG_PATH_HINT before using find_package(LTTng) and the +# module will use the PATH as a hint to find LTTng. 
+# +# The hint can be given on the command line too: +# cmake -DLTTNG_PATH_HINT=/DATA/ERIC/LTTng /path/to/source + +# use pkg-config, when available, to seed the search hints below +find_package(PkgConfig) +pkg_check_modules(PC_LTTNG QUIET lttng-ust) + +find_path(LTTNG_INCLUDE_DIR + NAMES lttng/tracepoint.h + HINTS ${PC_LTTNG_INCLUDEDIR} ${PC_LTTNG_INCLUDE_DIRS} + PATH_SUFFIXES include + DOC "The LTTng include headers") + +find_path(LTTNG_LIBRARY_DIR + NAMES liblttng-ust.so + HINTS ${PC_LTTNG_LIBDIR} ${PC_LTTNG_LIBRARY_DIRS} + DOC "The LTTng libraries") + +find_library(LTTNG_UST_LIBRARY lttng-ust PATHS ${LTTNG_LIBRARY_DIR}) +find_library(URCU_LIBRARY urcu-bp PATHS ${LTTNG_LIBRARY_DIR}) +find_library(UUID_LIBRARY uuid) + +set(LTTNG_LIBRARIES ${LTTNG_UST_LIBRARY} ${URCU_LIBRARY} ${UUID_LIBRARY}) + +message(STATUS "Looking for lttng executable...") +set(LTTNG_NAMES "lttng;lttng-ctl") +# FIND_PROGRAM twice into the same variable: the first shot uses +# NO_DEFAULT_PATH so that only LTTNG_PATH_HINT is searched; the second +# falls back to the default search paths and is skipped when the first +# call already found (and cached) LTTNG_EXECUTABLE +find_program(LTTNG_EXECUTABLE + NAMES ${LTTNG_NAMES} + PATHS ${LTTNG_PATH_HINT}/bin + NO_DEFAULT_PATH + DOC "The LTTNG command line tool") +find_program(LTTNG_EXECUTABLE + NAMES ${LTTNG_NAMES} + PATHS ${LTTNG_PATH_HINT}/bin + DOC "The LTTNG command line tool") + +# handle the QUIETLY and REQUIRED arguments and set LTTNG_FOUND to TRUE if +# all listed variables are TRUE +include(FindPackageHandleStandardArgs) +FIND_PACKAGE_HANDLE_STANDARD_ARGS(LTTNG DEFAULT_MSG LTTNG_INCLUDE_DIR LTTNG_LIBRARY_DIR) +# the call above is keyed on "LTTNG", so enforce REQUIRED for +# find_package(LTTng REQUIRED) explicitly +if (NOT LTTNG_FOUND) + if (LTTng_FIND_REQUIRED) + message(FATAL_ERROR "LTTng not found") + endif () +endif () + +mark_as_advanced(LTTNG_INCLUDE_DIR) +mark_as_advanced(LTTNG_LIBRARY_DIR) |