summaryrefslogtreecommitdiffstats
path: root/src/blk
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/blk/BlockDevice.cc215
-rw-r--r--src/blk/BlockDevice.h278
-rw-r--r--src/blk/CMakeLists.txt60
-rw-r--r--src/blk/aio/aio.cc124
-rw-r--r--src/blk/aio/aio.h159
-rw-r--r--src/blk/kernel/KernelDevice.cc1235
-rw-r--r--src/blk/kernel/KernelDevice.h150
-rw-r--r--src/blk/kernel/io_uring.cc261
-rw-r--r--src/blk/kernel/io_uring.h33
-rw-r--r--src/blk/pmem/PMEMDevice.cc282
-rw-r--r--src/blk/pmem/PMEMDevice.h75
-rw-r--r--src/blk/spdk/NVMEDevice.cc971
-rw-r--r--src/blk/spdk/NVMEDevice.h85
-rw-r--r--src/blk/zoned/HMSMRDevice.cc1217
-rw-r--r--src/blk/zoned/HMSMRDevice.h163
-rw-r--r--src/blkin/CMakeLists.txt9
-rw-r--r--src/blkin/COPYRIGHT27
-rw-r--r--src/blkin/README.md59
-rw-r--r--src/blkin/babeltrace-plugins/.gitignore2
-rw-r--r--src/blkin/babeltrace-plugins/json/README.md5
-rwxr-xr-xsrc/blkin/babeltrace-plugins/json/src/babeltrace_json.py88
-rw-r--r--src/blkin/babeltrace-plugins/scribe_client/__init__.py1
-rw-r--r--src/blkin/babeltrace-plugins/scribe_client/scribe_client.py31
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/README.md6
-rwxr-xr-xsrc/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py69
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py0
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py131
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py137
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py453
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py70
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h67
-rw-r--r--src/blkin/blkin-lib/CMakeLists.txt18
-rw-r--r--src/blkin/blkin-lib/Makefile92
-rw-r--r--src/blkin/blkin-lib/tests/CMakeLists.txt15
-rw-r--r--src/blkin/blkin-lib/tests/Makefile55
-rw-r--r--src/blkin/blkin-lib/tests/test.c200
-rw-r--r--src/blkin/blkin-lib/tests/test.cc204
-rw-r--r--src/blkin/blkin-lib/tests/test_p.cc210
-rw-r--r--src/blkin/blkin-lib/tp.c35
-rw-r--r--src/blkin/blkin-lib/zipkin_c.c356
-rw-r--r--src/blkin/blkin-lib/zipkin_c.h334
-rw-r--r--src/blkin/blkin-lib/zipkin_trace.h130
-rw-r--r--src/blkin/blkin-lib/ztracer.hpp248
-rw-r--r--src/blkin/cmake/modules/FindLTTng.cmake64
44 files changed, 8424 insertions, 0 deletions
diff --git a/src/blk/BlockDevice.cc b/src/blk/BlockDevice.cc
new file mode 100644
index 000000000..6804ee50c
--- /dev/null
+++ b/src/blk/BlockDevice.cc
@@ -0,0 +1,215 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <libgen.h>
+#include <unistd.h>
+
+#include "BlockDevice.h"
+
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+#include "kernel/KernelDevice.h"
+#endif
+
+#if defined(HAVE_SPDK)
+#include "spdk/NVMEDevice.h"
+#endif
+
+#if defined(HAVE_BLUESTORE_PMEM)
+#include "pmem/PMEMDevice.h"
+#endif
+
+#if defined(HAVE_LIBZBD)
+#include "zoned/HMSMRDevice.h"
+#endif
+
+#include "common/debug.h"
+#include "common/EventTrace.h"
+#include "common/errno.h"
+#include "include/compat.h"
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev "
+
+using std::string;
+
+void IOContext::aio_wait()
+{
+ // Block until every in-flight aio tracked by this context has been
+ // retired.  try_aio_wake() decrements num_running and notifies `cond`
+ // while holding the same `lock`, so a wakeup cannot be lost between
+ // the predicate check and cond.wait().
+ std::unique_lock l(lock);
+ // see _aio_thread for waker logic
+ while (num_running.load() > 0) {
+ dout(10) << __func__ << " " << this
+ << " waiting for " << num_running.load() << " aios to complete"
+ << dendl;
+ cond.wait(l);
+ }
+ dout(20) << __func__ << " " << this << " done" << dendl;
+}
+
+uint64_t IOContext::get_num_ios() const
+{
+ // this is about the simplest model for transaction cost you can
+ // imagine. there is some fixed overhead cost by saying there is a
+ // minimum of one "io". and then we have some cost per "io" that is
+ // a configurable (with different hdd and ssd defaults), and add
+ // that to the bytes value.
+ //
+ // NOTE: under libaio/posix-aio this counts queued-but-unsubmitted
+ // aios; under SPDK it counts total segments instead.  With neither
+ // backend compiled in, this returns 0.
+ uint64_t ios = 0;
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+ ios += pending_aios.size();
+#endif
+#ifdef HAVE_SPDK
+ ios += total_nseg;
+#endif
+ return ios;
+}
+
+void IOContext::release_running_aios()
+{
+ // Drop all completed aio records (and the bufferlists pinned inside
+ // them).  Only legal once nothing is in flight, hence the assert.
+ ceph_assert(!num_running);
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+ // release aio contexts (including pinned buffers).
+ running_aios.clear();
+#endif
+}
+
+BlockDevice::block_device_t
+BlockDevice::detect_device_type(const std::string& path)
+{
+ // Probe the path against each compiled-in backend, most specific
+ // first (spdk, then pmem, then host-managed SMR); plain kernel aio
+ // is the catch-all fallback.
+#if defined(HAVE_SPDK)
+ if (NVMEDevice::support(path)) {
+ return block_device_t::spdk;
+ }
+#endif
+#if defined(HAVE_BLUESTORE_PMEM)
+ if (PMEMDevice::support(path)) {
+ return block_device_t::pmem;
+ }
+#endif
+#if (defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)) && defined(HAVE_LIBZBD)
+ if (HMSMRDevice::support(path)) {
+ return block_device_t::hm_smr;
+ }
+#endif
+
+ return block_device_t::aio;
+}
+
+BlockDevice::block_device_t
+BlockDevice::device_type_from_name(const std::string& blk_dev_name)
+{
+ // Map an explicit "bdev_type" config value to a backend.  A name
+ // whose backend was not compiled in (or an unrecognized name) yields
+ // block_device_t::unknown, which create_with_type() aborts on.
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+ if (blk_dev_name == "aio") {
+ return block_device_t::aio;
+ }
+#endif
+#if defined(HAVE_SPDK)
+ if (blk_dev_name == "spdk") {
+ return block_device_t::spdk;
+ }
+#endif
+#if defined(HAVE_BLUESTORE_PMEM)
+ if (blk_dev_name == "pmem") {
+ return block_device_t::pmem;
+ }
+#endif
+#if (defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)) && defined(HAVE_LIBZBD)
+ if (blk_dev_name == "hm_smr") {
+ return block_device_t::hm_smr;
+ }
+#endif
+ return block_device_t::unknown;
+}
+
+BlockDevice* BlockDevice::create_with_type(block_device_t device_type,
+ CephContext* cct, const std::string& path, aio_callback_t cb,
+ void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
+{
+ // Instantiate the concrete device for the chosen backend.  Only the
+ // kernel and HM-SMR devices take the discard callback pair; the
+ // others ignore d_cb/d_cbpriv.  Caller owns the returned pointer.
+
+ switch (device_type) {
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+ case block_device_t::aio:
+ return new KernelDevice(cct, cb, cbpriv, d_cb, d_cbpriv);
+#endif
+#if defined(HAVE_SPDK)
+ case block_device_t::spdk:
+ return new NVMEDevice(cct, cb, cbpriv);
+#endif
+#if defined(HAVE_BLUESTORE_PMEM)
+ case block_device_t::pmem:
+ return new PMEMDevice(cct, cb, cbpriv);
+#endif
+#if (defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)) && defined(HAVE_LIBZBD)
+ case block_device_t::hm_smr:
+ return new HMSMRDevice(cct, cb, cbpriv, d_cb, d_cbpriv);
+#endif
+ default:
+ // unknown, or a backend that was not compiled into this build
+ ceph_abort_msg("unsupported device");
+ return nullptr;
+ }
+}
+
+BlockDevice *BlockDevice::create(
+  CephContext* cct, const string& path, aio_callback_t cb,
+  void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
+{
+  // Honor an explicit "bdev_type" config override when present;
+  // otherwise probe the path to pick a backend.
+  const string blk_dev_name = cct->_conf.get_val<string>("bdev_type");
+  const block_device_t device_type = blk_dev_name.empty()
+    ? detect_device_type(path)
+    : device_type_from_name(blk_dev_name);
+  return create_with_type(device_type, cct, path, cb, cbpriv, d_cb, d_cbpriv);
+}
+
+void BlockDevice::queue_reap_ioc(IOContext *ioc)
+{
+ // Defer destruction of an IOContext to reap_ioc().  ioc_reap_count
+ // is used as a 0/1 flag ("queue non-empty"), not a queue length:
+ // it is only bumped on the empty->non-empty transition.
+ std::lock_guard l(ioc_reap_lock);
+ if (ioc_reap_count.load() == 0)
+ ++ioc_reap_count;
+ ioc_reap_queue.push_back(ioc);
+}
+
+void BlockDevice::reap_ioc()
+{
+ // Destroy any IOContexts queued by queue_reap_ioc().  The atomic
+ // pre-check lets the common empty case skip taking the lock;
+ // NOTE(review): a context queued between the load and the lock is
+ // presumably picked up by a later reap_ioc() call — confirm callers
+ // invoke this periodically.
+ if (ioc_reap_count.load()) {
+ std::lock_guard l(ioc_reap_lock);
+ for (auto p : ioc_reap_queue) {
+ dout(20) << __func__ << " reap ioc " << p << dendl;
+ delete p;
+ }
+ ioc_reap_queue.clear();
+ --ioc_reap_count;
+ }
+}
+
+bool BlockDevice::is_valid_io(uint64_t off, uint64_t len) const {
+  // An io is valid iff it is block-aligned, non-empty, and lies
+  // entirely within the device.  The extent check is written as
+  // `len <= size - off` (with `off < size` already established) so a
+  // huge `len` cannot wrap uint64_t the way `off + len <= size` could
+  // and make an out-of-range io appear valid.
+  bool ret = (off % block_size == 0 &&
+              len % block_size == 0 &&
+              len > 0 &&
+              off < size &&
+              len <= size - off);
+
+  if (!ret) {
+    derr << __func__ << " " << std::hex
+         << off << "~" << len
+         << " block_size " << block_size
+         << " size " << size
+         << std::dec << dendl;
+  }
+  return ret;
+}
diff --git a/src/blk/BlockDevice.h b/src/blk/BlockDevice.h
new file mode 100644
index 000000000..191eb8ec9
--- /dev/null
+++ b/src/blk/BlockDevice.h
@@ -0,0 +1,278 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_BLK_BLOCKDEVICE_H
+#define CEPH_BLK_BLOCKDEVICE_H
+
+#include <atomic>
+#include <condition_variable>
+#include <list>
+#include <map>
+#include <mutex>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "acconfig.h"
+#include "common/ceph_mutex.h"
+#include "include/common_fwd.h"
+
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+#include "aio/aio.h"
+#endif
+#include "include/ceph_assert.h"
+#include "include/buffer.h"
+#include "include/interval_set.h"
+#define SPDK_PREFIX "spdk:"
+
+#if defined(__linux__)
+#if !defined(F_SET_FILE_RW_HINT)
+#define F_LINUX_SPECIFIC_BASE 1024
+#define F_SET_FILE_RW_HINT (F_LINUX_SPECIFIC_BASE + 14)
+#endif
+// These values match Linux definition
+// https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/fcntl.h#n56
+#define WRITE_LIFE_NOT_SET 0 // No hint information set
+#define WRITE_LIFE_NONE 1 // No hints about write life time
+#define WRITE_LIFE_SHORT 2 // Data written has a short life time
+#define WRITE_LIFE_MEDIUM 3 // Data written has a medium life time
+#define WRITE_LIFE_LONG 4 // Data written has a long life time
+#define WRITE_LIFE_EXTREME 5 // Data written has an extremely long life time
+#define WRITE_LIFE_MAX 6
+#else
+// On systems don't have WRITE_LIFE_* only use one FD
+// And all files are created equal
+#define WRITE_LIFE_NOT_SET 0 // No hint information set
+#define WRITE_LIFE_NONE 0 // No hints about write life time
+#define WRITE_LIFE_SHORT 0 // Data written has a short life time
+#define WRITE_LIFE_MEDIUM 0 // Data written has a medium life time
+#define WRITE_LIFE_LONG 0 // Data written has a long life time
+#define WRITE_LIFE_EXTREME 0 // Data written has an extremely long life time
+#define WRITE_LIFE_MAX 1
+#endif
+
+
+/// track in-flight io
+struct IOContext {
+private:
+ ceph::mutex lock = ceph::make_mutex("IOContext::lock");
+ ceph::condition_variable cond;
+ int r = 0; // first/last error reported via set_return_value()
+
+public:
+ CephContext* cct;
+ void *priv; // opaque caller cookie handed back via aio_callback
+#ifdef HAVE_SPDK
+ void *nvme_task_first = nullptr;
+ void *nvme_task_last = nullptr;
+ std::atomic_int total_nseg = {0};
+#endif
+
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+ std::list<aio_t> pending_aios; ///< not yet submitted
+ std::list<aio_t> running_aios; ///< submitting or submitted
+#endif
+ std::atomic_int num_pending = {0};
+ std::atomic_int num_running = {0};
+ bool allow_eio; // if true, EIO is reported instead of aborting
+
+ explicit IOContext(CephContext* cct, void *p, bool allow_eio = false)
+ : cct(cct), priv(p), allow_eio(allow_eio)
+ {}
+
+ // no copying
+ IOContext(const IOContext& other) = delete;
+ IOContext &operator=(const IOContext& other) = delete;
+
+ bool has_pending_aios() {
+ return num_pending.load();
+ }
+ void release_running_aios();
+ void aio_wait();
+ uint64_t get_num_ios() const;
+
+ // Called by the completion thread once per finished aio; wakes
+ // aio_wait() when the last in-flight aio retires.  Decrement and
+ // notify happen under `lock` so the waiter cannot miss the wakeup.
+ void try_aio_wake() {
+ assert(num_running >= 1);
+
+ std::lock_guard l(lock);
+ if (num_running.fetch_sub(1) == 1) {
+
+ // we might have some pending IOs submitted after the check
+ // as there is no lock protection for aio_submit.
+ // Hence we might have false conditional trigger.
+ // aio_wait has to handle that hence do not care here.
+ cond.notify_all();
+ }
+ }
+
+ void set_return_value(int _r) {
+ r = _r;
+ }
+
+ int get_return_value() const {
+ return r;
+ }
+};
+
+
+/// Abstract base for all block device backends (kernel aio, spdk,
+/// pmem, HM-SMR).  Concrete devices are obtained via create(), which
+/// selects a backend from config or by probing the path.
+class BlockDevice {
+public:
+ CephContext* cct;
+ typedef void (*aio_callback_t)(void *handle, void *aio);
+private:
+ ceph::mutex ioc_reap_lock = ceph::make_mutex("BlockDevice::ioc_reap_lock");
+ std::vector<IOContext*> ioc_reap_queue;
+ std::atomic_int ioc_reap_count = {0}; // 0/1 flag: reap queue non-empty
+ // Backends available in this build; see device_type_from_name().
+ enum class block_device_t {
+ unknown,
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+ aio,
+#if defined(HAVE_LIBZBD)
+ hm_smr,
+#endif
+#endif
+#if defined(HAVE_SPDK)
+ spdk,
+#endif
+#if defined(HAVE_BLUESTORE_PMEM)
+ pmem,
+#endif
+ };
+ static block_device_t detect_device_type(const std::string& path);
+ static block_device_t device_type_from_name(const std::string& blk_dev_name);
+ static BlockDevice *create_with_type(block_device_t device_type,
+ CephContext* cct, const std::string& path, aio_callback_t cb,
+ void *cbpriv, aio_callback_t d_cb, void *d_cbpriv);
+
+protected:
+ uint64_t size = 0; // device size in bytes (set by open())
+ uint64_t block_size = 0; // io alignment granularity
+ bool support_discard = false;
+ bool rotational = true;
+ bool lock_exclusive = true;
+
+ // HM-SMR specific properties. In HM-SMR drives the LBA space is divided into
+ // fixed-size zones. Typically, the first few zones are randomly writable;
+ // they form a conventional region of the drive. The remaining zones must be
+ // written sequentially and they must be reset before rewritten. For example,
+ // a 14 TB HGST HSH721414AL drive has 52156 zones each of size is 256 MiB.
+ // The zones 0-523 are randomly writable and they form the conventional region
+ // of the drive. The zones 524-52155 are sequential zones.
+ uint64_t conventional_region_size = 0;
+ uint64_t zone_size = 0;
+
+public:
+ aio_callback_t aio_callback;
+ void *aio_callback_priv;
+ BlockDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
+ : cct(cct),
+ aio_callback(cb),
+ aio_callback_priv(cbpriv)
+ {}
+ virtual ~BlockDevice() = default;
+
+ // Factory: pick a backend and construct it.  Caller owns the result.
+ static BlockDevice *create(
+ CephContext* cct, const std::string& path, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv);
+ virtual bool supported_bdev_label() { return true; }
+ virtual bool is_rotational() { return rotational; }
+
+ // HM-SMR-specific calls
+ virtual bool is_smr() const { return false; }
+ virtual uint64_t get_zone_size() const {
+ ceph_assert(is_smr());
+ return zone_size;
+ }
+ virtual uint64_t get_conventional_region_size() const {
+ ceph_assert(is_smr());
+ return conventional_region_size;
+ }
+
+ virtual void aio_submit(IOContext *ioc) = 0;
+
+ void set_no_exclusive_lock() {
+ lock_exclusive = false;
+ }
+
+ uint64_t get_size() const { return size; }
+ uint64_t get_block_size() const { return block_size; }
+
+ /// hook to provide utilization of thinly-provisioned device
+ virtual bool get_thin_utilization(uint64_t *total, uint64_t *avail) const {
+ return false;
+ }
+
+ virtual int collect_metadata(const std::string& prefix, std::map<std::string,std::string> *pm) const = 0;
+
+ virtual int get_devname(std::string *out) const {
+ return -ENOENT;
+ }
+ virtual int get_devices(std::set<std::string> *ls) const {
+ std::string s;
+ if (get_devname(&s) == 0) {
+ ls->insert(s);
+ }
+ return 0;
+ }
+ virtual int get_numa_node(int *node) const {
+ return -EOPNOTSUPP;
+ }
+
+ // Synchronous and asynchronous io entry points.  Offsets and lengths
+ // must satisfy is_valid_io() (block-aligned, within the device).
+ virtual int read(
+ uint64_t off,
+ uint64_t len,
+ ceph::buffer::list *pbl,
+ IOContext *ioc,
+ bool buffered) = 0;
+ virtual int read_random(
+ uint64_t off,
+ uint64_t len,
+ char *buf,
+ bool buffered) = 0;
+ virtual int write(
+ uint64_t off,
+ ceph::buffer::list& bl,
+ bool buffered,
+ int write_hint = WRITE_LIFE_NOT_SET) = 0;
+
+ virtual int aio_read(
+ uint64_t off,
+ uint64_t len,
+ ceph::buffer::list *pbl,
+ IOContext *ioc) = 0;
+ virtual int aio_write(
+ uint64_t off,
+ ceph::buffer::list& bl,
+ IOContext *ioc,
+ bool buffered,
+ int write_hint = WRITE_LIFE_NOT_SET) = 0;
+ virtual int flush() = 0;
+ virtual int discard(uint64_t offset, uint64_t len) { return 0; }
+ virtual int queue_discard(interval_set<uint64_t> &to_release) { return -1; }
+ virtual void discard_drain() { return; }
+
+ void queue_reap_ioc(IOContext *ioc);
+ void reap_ioc();
+
+ // for managing buffered readers/writers
+ virtual int invalidate_cache(uint64_t off, uint64_t len) = 0;
+ virtual int open(const std::string& path) = 0;
+ virtual void close() = 0;
+
+protected:
+ bool is_valid_io(uint64_t off, uint64_t len) const;
+};
+
+#endif //CEPH_BLK_BLOCKDEVICE_H
diff --git a/src/blk/CMakeLists.txt b/src/blk/CMakeLists.txt
new file mode 100644
index 000000000..849f3eef9
--- /dev/null
+++ b/src/blk/CMakeLists.txt
@@ -0,0 +1,60 @@
+# Assemble the source list for the static `blk` library; each backend
+# is compiled in only when the matching feature was enabled.
+if(WITH_BLUESTORE OR WITH_RBD_SSD_CACHE)
+  list(APPEND libblk_srcs
+    BlockDevice.cc)
+endif()
+
+if(HAVE_LIBAIO OR HAVE_POSIXAIO)
+  list(APPEND libblk_srcs
+    kernel/KernelDevice.cc
+    kernel/io_uring.cc
+    aio/aio.cc)
+endif()
+
+if(WITH_BLUESTORE_PMEM)
+  list(APPEND libblk_srcs
+    pmem/PMEMDevice.cc)
+endif()
+
+if(WITH_SPDK)
+  list(APPEND libblk_srcs
+    spdk/NVMEDevice.cc)
+endif()
+
+if(WITH_ZBD)
+  list(APPEND libblk_srcs
+    zoned/HMSMRDevice.cc)
+endif()
+
+add_library(blk STATIC ${libblk_srcs})
+target_include_directories(blk PRIVATE "./")
+
+# Per-backend link dependencies.
+if(HAVE_LIBAIO)
+  target_link_libraries(blk PUBLIC ${AIO_LIBRARIES})
+endif()
+
+if(WITH_SPDK)
+  target_link_libraries(blk PRIVATE ${SPDK_LIBRARIES})
+endif()
+
+if(WITH_ZBD)
+  target_link_libraries(blk PRIVATE ${ZBD_LIBRARIES})
+endif()
+
+if(WITH_BLUESTORE_PMEM)
+  target_link_libraries(blk PRIVATE pmem::pmem)
+endif()
+
+if(WITH_EVENTTRACE)
+  add_dependencies(blk eventtrace_tp)
+endif()
+
+# io_uring support: use the system liburing when requested, otherwise
+# build the bundled copy.
+if(WITH_LIBURING)
+  if(WITH_SYSTEM_LIBURING)
+    find_package(uring REQUIRED)
+  else()
+    include(Builduring)
+    build_uring()
+  endif()
+  target_link_libraries(blk PRIVATE uring::uring)
+endif()
diff --git a/src/blk/aio/aio.cc b/src/blk/aio/aio.cc
new file mode 100644
index 000000000..00a12bfd1
--- /dev/null
+++ b/src/blk/aio/aio.cc
@@ -0,0 +1,124 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <algorithm>
+#include "aio.h"
+
+// Debug formatter: print each iovec of the aio as "[idx] 0xbase~len"
+// (hex), one per line.
+std::ostream& operator<<(std::ostream& os, const aio_t& aio)
+{
+  os << "aio: ";
+  unsigned idx = 0;
+  for (const auto& iov : aio.iov) {
+    os << "\n [" << idx << "] 0x"
+       << std::hex << iov.iov_base << "~" << iov.iov_len << std::dec;
+    ++idx;
+  }
+  return os;
+}
+
+// Submit the aios in [begin, end) to the kernel queue, retrying
+// transient EAGAIN with exponential backoff.  Returns the number of
+// aios submitted, or a negative errno on hard failure.  `aios_size`
+// must be >= the number of elements in [begin, end).
+int aio_queue_t::submit_batch(aio_iter begin, aio_iter end,
+                              uint16_t aios_size, void *priv,
+                              int *retries)
+{
+  // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
+  int attempts = 16;
+  int delay = 125;
+  int r;
+
+  aio_iter cur = begin;
+  struct aio_t *piocb[aios_size];
+  int left = 0;
+  while (cur != end) {
+    // guard before writing: asserting only after the fill loop (as a
+    // post-hoc `aios_size >= left` check would) is too late to catch
+    // the stack-buffer overflow it is meant to prevent.
+    ceph_assert(left < aios_size);
+    cur->priv = priv;
+    piocb[left] = &(*cur);
+    ++left;
+    ++cur;
+  }
+  int done = 0;
+  while (left > 0) {
+#if defined(HAVE_LIBAIO)
+    // io_submit returns the number accepted, or -errno.
+    r = io_submit(ctx, std::min(left, max_iodepth), (struct iocb**)(piocb + done));
+#elif defined(HAVE_POSIXAIO)
+    if (piocb[done]->n_aiocb == 1) {
+      // TODO: consider batching multiple reads together with lio_listio
+      piocb[done]->aio.aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
+      piocb[done]->aio.aiocb.aio_sigevent.sigev_notify_kqueue = ctx;
+      piocb[done]->aio.aiocb.aio_sigevent.sigev_value.sival_ptr = piocb[done];
+      r = aio_read(&piocb[done]->aio.aiocb);
+    } else {
+      struct sigevent sev;
+      sev.sigev_notify = SIGEV_KEVENT;
+      sev.sigev_notify_kqueue = ctx;
+      sev.sigev_value.sival_ptr = piocb[done];
+      r = lio_listio(LIO_NOWAIT, &piocb[done]->aio.aiocbp, piocb[done]->n_aiocb, &sev);
+    }
+    // aio_read()/lio_listio() return -1 with errno set (never -EAGAIN
+    // directly) and 0 on success; normalize to io_submit()-style
+    // returns so the shared retry/progress logic below works for both
+    // back ends.  On success exactly one piocb[] entry was submitted.
+    if (r < 0) {
+      r = -errno;
+    } else {
+      r = 1;
+    }
+#endif
+    if (r < 0) {
+      if (r == -EAGAIN && attempts-- > 0) {
+        usleep(delay);
+        delay *= 2;
+        (*retries)++;
+        continue;
+      }
+      return r;
+    }
+    ceph_assert(r > 0);
+    done += r;
+    left -= r;
+    // reset the backoff budget after any forward progress
+    attempts = 16;
+    delay = 125;
+  }
+  return done;
+}
+
+// Reap up to `max` completed aios into paio[], blocking at most
+// timeout_ms for the first completion.  Returns the count reaped
+// (0 on timeout) or a negative errno.  Each reaped aio has its rval
+// set to the io result (bytes transferred, or a negative error).
+int aio_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
+{
+#if defined(HAVE_LIBAIO)
+ io_event events[max];
+#elif defined(HAVE_POSIXAIO)
+ struct kevent events[max];
+#endif
+ struct timespec t = {
+ timeout_ms / 1000,
+ (timeout_ms % 1000) * 1000 * 1000
+ };
+
+ int r = 0;
+ do {
+#if defined(HAVE_LIBAIO)
+ r = io_getevents(ctx, 1, max, events, &t);
+#elif defined(HAVE_POSIXAIO)
+ r = kevent(ctx, NULL, 0, events, max, &t);
+ if (r < 0)
+ r = -errno;
+#endif
+ } while (r == -EINTR); // retry if interrupted by a signal
+
+ for (int i=0; i<r; ++i) {
+#if defined(HAVE_LIBAIO)
+ paio[i] = (aio_t *)events[i].obj;
+ paio[i]->rval = events[i].res;
+#else
+ paio[i] = (aio_t*)events[i].udata;
+ if (paio[i]->n_aiocb == 1) {
+ paio[i]->rval = aio_return(&paio[i]->aio.aiocb);
+ } else {
+ // Emulate the return value of pwritev. I can't find any documentation
+ // for what the value of io_event.res is supposed to be. I'm going to
+ // assume that it's just like pwritev/preadv/pwrite/pread.
+ paio[i]->rval = 0;
+ for (int j = 0; j < paio[i]->n_aiocb; j++) {
+ int res = aio_return(&paio[i]->aio.aiocbp[j]);
+ if (res < 0) {
+ paio[i]->rval = res;
+ break;
+ } else {
+ paio[i]->rval += res;
+ }
+ }
+ // aiocbp array was calloc'ed by pwritev()/preadv(); release it now
+ // that every member has been reaped via aio_return().
+ free(paio[i]->aio.aiocbp);
+ }
+#endif
+ }
+ return r;
+}
diff --git a/src/blk/aio/aio.h b/src/blk/aio/aio.h
new file mode 100644
index 000000000..14b89784b
--- /dev/null
+++ b/src/blk/aio/aio.h
@@ -0,0 +1,159 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "acconfig.h"
+
+#if defined(HAVE_LIBAIO)
+#include <libaio.h>
+#elif defined(HAVE_POSIXAIO)
+#include <aio.h>
+#include <sys/event.h>
+#endif
+
+#include <boost/intrusive/list.hpp>
+#include <boost/container/small_vector.hpp>
+
+#include "include/buffer.h"
+#include "include/types.h"
+
+/// One asynchronous io: an iovec list plus the backend-specific
+/// control block(s).  The bufferlist `bl` pins the payload for the
+/// duration of the io.
+struct aio_t {
+#if defined(HAVE_LIBAIO)
+  struct iocb iocb{};  // must be first element; see shenanigans in aio_queue_t
+#elif defined(HAVE_POSIXAIO)
+  //  static long aio_listio_max = -1;
+  union {
+    struct aiocb aiocb;
+    struct aiocb *aiocbp;
+  } aio;
+  // number of aiocbs in use; initialized so an aio_t that is queued
+  // but never prepared via pwritev()/preadv() is not read as garbage
+  int n_aiocb = 0;
+#endif
+  void *priv;
+  int fd;
+  boost::container::small_vector<iovec,4> iov;
+  uint64_t offset, length;
+  long rval;  // io result; -1000 = not yet completed
+  ceph::buffer::list bl;  ///< write payload (so that it remains stable for duration)
+
+  boost::intrusive::list_member_hook<> queue_item;
+
+  aio_t(void *p, int f) : priv(p), fd(f), offset(0), length(0), rval(-1000) {
+  }
+
+  // Prepare a vectored write of `len` bytes at `_offset` from iov[].
+  void pwritev(uint64_t _offset, uint64_t len) {
+    offset = _offset;
+    length = len;
+#if defined(HAVE_LIBAIO)
+    io_prep_pwritev(&iocb, fd, &iov[0], iov.size(), offset);
+#elif defined(HAVE_POSIXAIO)
+    n_aiocb = iov.size();
+    aio.aiocbp = (struct aiocb*)calloc(iov.size(), sizeof(struct aiocb));
+    // size_t index (not int) to match iov.size() — same as preadv below
+    for (size_t i = 0; i < iov.size(); i++) {
+      aio.aiocbp[i].aio_fildes = fd;
+      aio.aiocbp[i].aio_offset = offset;
+      aio.aiocbp[i].aio_buf = iov[i].iov_base;
+      aio.aiocbp[i].aio_nbytes = iov[i].iov_len;
+      aio.aiocbp[i].aio_lio_opcode = LIO_WRITE;
+      offset += iov[i].iov_len;
+    }
+#endif
+  }
+
+  // Prepare a vectored read of `len` bytes at `_offset` into iov[].
+  void preadv(uint64_t _offset, uint64_t len) {
+    offset = _offset;
+    length = len;
+#if defined(HAVE_LIBAIO)
+    io_prep_preadv(&iocb, fd, &iov[0], iov.size(), offset);
+#elif defined(HAVE_POSIXAIO)
+    n_aiocb = iov.size();
+    aio.aiocbp = (struct aiocb*)calloc(iov.size(), sizeof(struct aiocb));
+    for (size_t i = 0; i < iov.size(); i++) {
+      aio.aiocbp[i].aio_fildes = fd;
+      aio.aiocbp[i].aio_buf = iov[i].iov_base;
+      aio.aiocbp[i].aio_nbytes = iov[i].iov_len;
+      aio.aiocbp[i].aio_offset = offset;
+      aio.aiocbp[i].aio_lio_opcode = LIO_READ;
+      offset += iov[i].iov_len;
+    }
+#endif
+  }
+
+  long get_return_value() {
+    return rval;
+  }
+};
+
+std::ostream& operator<<(std::ostream& os, const aio_t& aio);
+
+typedef boost::intrusive::list<
+ aio_t,
+ boost::intrusive::member_hook<
+ aio_t,
+ boost::intrusive::list_member_hook<>,
+ &aio_t::queue_item> > aio_list_t;
+
+/// Abstract submission/completion queue shared by the libaio/posix-aio
+/// and io_uring back ends.
+struct io_queue_t {
+ typedef std::list<aio_t>::iterator aio_iter;
+
+ virtual ~io_queue_t() {};
+
+ // fds: device file descriptors (used by io_uring registration;
+ // ignored by the aio back end)
+ virtual int init(std::vector<int> &fds) = 0;
+ virtual void shutdown() = 0;
+ // submit [begin, end); returns count submitted or negative errno
+ virtual int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
+ void *priv, int *retries) = 0;
+ // reap up to `max` completions; returns count or negative errno
+ virtual int get_next_completed(int timeout_ms, aio_t **paio, int max) = 0;
+};
+
+/// io_queue_t implementation backed by linux libaio (io_setup) or, on
+/// FreeBSD, posix aio with a kqueue for completion notification.
+struct aio_queue_t final : public io_queue_t {
+ int max_iodepth;
+#if defined(HAVE_LIBAIO)
+ io_context_t ctx; // kernel aio context; 0 = not initialized
+#elif defined(HAVE_POSIXAIO)
+ int ctx; // kqueue fd; 0 = not initialized
+#endif
+
+ explicit aio_queue_t(unsigned max_iodepth)
+ : max_iodepth(max_iodepth),
+ ctx(0) {
+ }
+ ~aio_queue_t() final {
+ // shutdown() must have been called (or init() never succeeded)
+ ceph_assert(ctx == 0);
+ }
+
+ int init(std::vector<int> &fds) final {
+ (void)fds;
+ ceph_assert(ctx == 0);
+#if defined(HAVE_LIBAIO)
+ int r = io_setup(max_iodepth, &ctx);
+ if (r < 0) {
+ // some failure modes may leave a partially set-up context behind
+ if (ctx) {
+ io_destroy(ctx);
+ ctx = 0;
+ }
+ }
+ return r;
+#elif defined(HAVE_POSIXAIO)
+ ctx = kqueue();
+ if (ctx < 0)
+ return -errno;
+ else
+ return 0;
+#endif
+ }
+ void shutdown() final {
+ if (ctx) {
+#if defined(HAVE_LIBAIO)
+ int r = io_destroy(ctx);
+#elif defined(HAVE_POSIXAIO)
+ int r = close(ctx);
+#endif
+ ceph_assert(r == 0);
+ ctx = 0;
+ }
+ }
+
+ int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
+ void *priv, int *retries) final;
+ int get_next_completed(int timeout_ms, aio_t **paio, int max) final;
+};
diff --git a/src/blk/kernel/KernelDevice.cc b/src/blk/kernel/KernelDevice.cc
new file mode 100644
index 000000000..d93279965
--- /dev/null
+++ b/src/blk/kernel/KernelDevice.cc
@@ -0,0 +1,1235 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <limits>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/file.h>
+
+#include "KernelDevice.h"
+#include "include/intarith.h"
+#include "include/types.h"
+#include "include/compat.h"
+#include "include/stringify.h"
+#include "common/blkdev.h"
+#include "common/errno.h"
+#if defined(__FreeBSD__)
+#include "bsm/audit_errno.h"
+#endif
+#include "common/debug.h"
+#include "common/numa.h"
+
+#include "global/global_context.h"
+#include "io_uring.h"
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
+
+using std::list;
+using std::map;
+using std::string;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::bufferptr;
+using ceph::make_timespan;
+using ceph::mono_clock;
+using ceph::operator <<;
+
+// Construct a kernel block device.  Picks the io_uring queue when the
+// "bdev_ioring" option is set and the running kernel supports it,
+// otherwise falls back to libaio/posix-aio.  Per-write-hint fd slots
+// are pre-sized (all -1) and filled in by open().
+KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
+ : BlockDevice(cct, cb, cbpriv),
+ aio(false), dio(false),
+ discard_callback(d_cb),
+ discard_callback_priv(d_cbpriv),
+ aio_stop(false),
+ discard_started(false),
+ discard_stop(false),
+ aio_thread(this),
+ discard_thread(this),
+ injecting_crash(0)
+{
+ fd_directs.resize(WRITE_LIFE_MAX, -1);
+ fd_buffereds.resize(WRITE_LIFE_MAX, -1);
+
+ bool use_ioring = cct->_conf.get_val<bool>("bdev_ioring");
+ unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth;
+
+ if (use_ioring && ioring_queue_t::supported()) {
+ bool use_ioring_hipri = cct->_conf.get_val<bool>("bdev_ioring_hipri");
+ bool use_ioring_sqthread_poll = cct->_conf.get_val<bool>("bdev_ioring_sqthread_poll");
+ io_queue = std::make_unique<ioring_queue_t>(iodepth, use_ioring_hipri, use_ioring_sqthread_poll);
+ } else {
+ // warn only once per process about the io_uring fallback
+ static bool once;
+ if (use_ioring && !once) {
+ derr << "WARNING: io_uring API is not supported! Fallback to libaio!"
+ << dendl;
+ once = true;
+ }
+ io_queue = std::make_unique<aio_queue_t>(iodepth);
+ }
+}
+
+// Take an exclusive advisory lock on the device, preferring an OFD
+// lock and falling back to flock() where F_OFD_SETLK is unsupported
+// (EINVAL).  Retries while the lock is busy, up to the configured
+// bdev_flock_retry count (0 = retry forever).
+int KernelDevice::_lock()
+{
+ dout(10) << __func__ << " " << fd_directs[WRITE_LIFE_NOT_SET] << dendl;
+ // When the block changes, systemd-udevd will open the block,
+ // read some information and close it. Then a failure occurs here.
+ // So we need to try again here.
+ int fd = fd_directs[WRITE_LIFE_NOT_SET];
+ uint64_t nr_tries = 0;
+ for (;;) {
+ // designated struct-order init: l_type = F_WRLCK, l_whence = SEEK_SET,
+ // remaining fields zero => lock the whole file
+ struct flock fl = { F_WRLCK,
+ SEEK_SET };
+ int r = ::fcntl(fd, F_OFD_SETLK, &fl);
+ if (r < 0) {
+ if (errno == EINVAL) {
+ r = ::flock(fd, LOCK_EX | LOCK_NB);
+ }
+ }
+ if (r == 0) {
+ return 0;
+ }
+ if (errno != EAGAIN) {
+ return -errno;
+ }
+ dout(1) << __func__ << " flock busy on " << path << dendl;
+ if (const uint64_t max_retry =
+ cct->_conf.get_val<uint64_t>("bdev_flock_retry");
+ max_retry > 0 && nr_tries++ == max_retry) {
+ return -EAGAIN;
+ }
+ double retry_interval =
+ cct->_conf.get_val<double>("bdev_flock_retry_interval");
+ std::this_thread::sleep_for(ceph::make_timespan(retry_interval));
+ }
+}
+
+// Open the device/file at `p`: one O_DIRECT fd and one buffered fd per
+// write-life hint, apply rw hints where supported, optionally take the
+// exclusive lock, size/probe the device, and start the aio and discard
+// threads.  Returns 0 or a negative errno; on failure all fds opened
+// so far are closed.
+int KernelDevice::open(const string& p)
+{
+ path = p;
+ int r = 0, i = 0;
+ dout(1) << __func__ << " path " << path << dendl;
+
+ for (i = 0; i < WRITE_LIFE_MAX; i++) {
+ int fd = ::open(path.c_str(), O_RDWR | O_DIRECT);
+ if (fd < 0) {
+ r = -errno;
+ break;
+ }
+ fd_directs[i] = fd;
+
+ fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC);
+ if (fd < 0) {
+ r = -errno;
+ break;
+ }
+ fd_buffereds[i] = fd;
+ }
+
+ if (i != WRITE_LIFE_MAX) {
+ derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
+ goto out_fail;
+ }
+
+#if defined(F_SET_FILE_RW_HINT)
+ // tag each fd with its write-life hint; index i doubles as the hint
+ for (i = WRITE_LIFE_NONE; i < WRITE_LIFE_MAX; i++) {
+ if (fcntl(fd_directs[i], F_SET_FILE_RW_HINT, &i) < 0) {
+ r = -errno;
+ break;
+ }
+ if (fcntl(fd_buffereds[i], F_SET_FILE_RW_HINT, &i) < 0) {
+ r = -errno;
+ break;
+ }
+ }
+ if (i != WRITE_LIFE_MAX) {
+ // hints are best-effort; disable them but keep going
+ enable_wrt = false;
+ dout(0) << "ioctl(F_SET_FILE_RW_HINT) on " << path << " failed: " << cpp_strerror(r) << dendl;
+ }
+#endif
+
+ dio = true;
+ aio = cct->_conf->bdev_aio;
+ if (!aio) {
+ ceph_abort_msg("non-aio not supported");
+ }
+
+ // disable readahead as it will wreak havoc on our mix of
+ // directio/aio and buffered io.
+ r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], 0, 0, POSIX_FADV_RANDOM);
+ if (r) {
+ r = -r;
+ derr << __func__ << " posix_fadvise got: " << cpp_strerror(r) << dendl;
+ goto out_fail;
+ }
+
+ if (lock_exclusive) {
+ r = _lock();
+ if (r < 0) {
+ derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r)
+ << dendl;
+ goto out_fail;
+ }
+ }
+
+ struct stat st;
+ r = ::fstat(fd_directs[WRITE_LIFE_NOT_SET], &st);
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " fstat got " << cpp_strerror(r) << dendl;
+ goto out_fail;
+ }
+
+ // Operate as though the block size is 4 KB. The backing file
+ // blksize doesn't strictly matter except that some file systems may
+ // require a read/modify/write if we write something smaller than
+ // it.
+ block_size = cct->_conf->bdev_block_size;
+ if (block_size != (unsigned)st.st_blksize) {
+ dout(1) << __func__ << " backing device/file reports st_blksize "
+ << st.st_blksize << ", using bdev_block_size "
+ << block_size << " anyway" << dendl;
+ }
+
+
+ {
+ BlkDev blkdev_direct(fd_directs[WRITE_LIFE_NOT_SET]);
+ BlkDev blkdev_buffered(fd_buffereds[WRITE_LIFE_NOT_SET]);
+
+ // block devices report their size via ioctl; plain files via fstat
+ if (S_ISBLK(st.st_mode)) {
+ int64_t s;
+ r = blkdev_direct.get_size(&s);
+ if (r < 0) {
+ goto out_fail;
+ }
+ size = s;
+ } else {
+ size = st.st_size;
+ }
+
+ char partition[PATH_MAX], devname[PATH_MAX];
+ if ((r = blkdev_buffered.partition(partition, PATH_MAX)) ||
+ (r = blkdev_buffered.wholedisk(devname, PATH_MAX))) {
+ derr << "unable to get device name for " << path << ": "
+ << cpp_strerror(r) << dendl;
+ // assume the conservative default when the device is opaque
+ rotational = true;
+ } else {
+ dout(20) << __func__ << " devname " << devname << dendl;
+ rotational = blkdev_buffered.is_rotational();
+ support_discard = blkdev_buffered.support_discard();
+ this->devname = devname;
+ _detect_vdo();
+ }
+ }
+
+ r = _aio_start();
+ if (r < 0) {
+ goto out_fail;
+ }
+ _discard_start();
+
+ // round size down to an even block
+ size &= ~(block_size - 1);
+
+ dout(1) << __func__
+ << " size " << size
+ << " (0x" << std::hex << size << std::dec << ", "
+ << byte_u_t(size) << ")"
+ << " block_size " << block_size
+ << " (" << byte_u_t(block_size) << ")"
+ << " " << (rotational ? "rotational" : "non-rotational")
+ << " discard " << (support_discard ? "supported" : "not supported")
+ << dendl;
+ return 0;
+
+out_fail:
+ // close every fd opened so far; fds are opened direct-then-buffered
+ // per index, so a -1 means nothing later was opened either
+ for (i = 0; i < WRITE_LIFE_MAX; i++) {
+ if (fd_directs[i] >= 0) {
+ VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
+ fd_directs[i] = -1;
+ } else {
+ break;
+ }
+ if (fd_buffereds[i] >= 0) {
+ VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
+ fd_buffereds[i] = -1;
+ } else {
+ break;
+ }
+ }
+ return r;
+}
+
+int KernelDevice::get_devices(std::set<std::string> *ls) const
+{
+  // Report the raw kernel device name(s) backing this device, if any.
+  // When no devname was discovered (e.g. a plain file), leave *ls alone.
+  if (!devname.empty()) {
+    get_raw_devices(devname, ls);
+  }
+  return 0;
+}
+
+void KernelDevice::close()
+{
+  // Tear down the device: the service threads must be stopped *before*
+  // the fds they operate on are closed.
+  dout(1) << __func__ << dendl;
+  _aio_stop();
+  _discard_stop();
+
+  if (vdo_fd >= 0) {
+    VOID_TEMP_FAILURE_RETRY(::close(vdo_fd));
+    vdo_fd = -1;
+  }
+
+  // close both the direct and the buffered fd for every write-life hint
+  for (int i = 0; i < WRITE_LIFE_MAX; i++) {
+    assert(fd_directs[i] >= 0);
+    VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
+    fd_directs[i] = -1;
+
+    assert(fd_buffereds[i] >= 0);
+    VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
+    fd_buffereds[i] = -1;
+  }
+  path.clear();
+}
+
+int KernelDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
+{
+  // Describe this device (size, type, topology, ...) for metadata
+  // reporting.  Every value is stringified into *pm under the given
+  // key prefix.
+  (*pm)[prefix + "support_discard"] = stringify((int)(bool)support_discard);
+  (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational);
+  (*pm)[prefix + "size"] = stringify(get_size());
+  (*pm)[prefix + "block_size"] = stringify(get_block_size());
+  (*pm)[prefix + "driver"] = "KernelDevice";
+  if (rotational) {
+    (*pm)[prefix + "type"] = "hdd";
+  } else {
+    (*pm)[prefix + "type"] = "ssd";
+  }
+  if (vdo_fd >= 0) {
+    (*pm)[prefix + "vdo"] = "true";
+    uint64_t total, avail;
+    get_vdo_utilization(vdo_fd, &total, &avail);
+    (*pm)[prefix + "vdo_physical_size"] = stringify(total);
+  }
+
+  {
+    // comma-separated list of the underlying raw devices
+    string res_names;
+    std::set<std::string> devnames;
+    if (get_devices(&devnames) == 0) {
+      for (auto& dev : devnames) {
+	if (!res_names.empty()) {
+	  res_names += ",";
+	}
+	res_names += dev;
+      }
+      if (res_names.size()) {
+	(*pm)[prefix + "devices"] = res_names;
+      }
+    }
+  }
+
+  struct stat st;
+  int r = ::fstat(fd_buffereds[WRITE_LIFE_NOT_SET], &st);
+  if (r < 0)
+    return -errno;
+  if (S_ISBLK(st.st_mode)) {
+    (*pm)[prefix + "access_mode"] = "blk";
+
+    char buffer[1024] = {0};
+    BlkDev blkdev{fd_buffereds[WRITE_LIFE_NOT_SET]};
+    if (r = blkdev.partition(buffer, sizeof(buffer)); r) {
+      (*pm)[prefix + "partition_path"] = "unknown";
+    } else {
+      (*pm)[prefix + "partition_path"] = buffer;
+    }
+    buffer[0] = '\0';
+    // fix: dev_node must come from wholedisk(); this previously called
+    // partition() a second time and reported the partition path twice.
+    if (r = blkdev.wholedisk(buffer, sizeof(buffer)); r) {
+      (*pm)[prefix + "dev_node"] = "unknown";
+    } else {
+      (*pm)[prefix + "dev_node"] = buffer;
+    }
+    // fix: the early-return condition was inverted (!r); we only skip the
+    // model/dev/serial/numa details when the disk node lookup failed.
+    if (r) {
+      return 0;
+    }
+    buffer[0] = '\0';
+    blkdev.model(buffer, sizeof(buffer));
+    (*pm)[prefix + "model"] = buffer;
+
+    buffer[0] = '\0';
+    blkdev.dev(buffer, sizeof(buffer));
+    (*pm)[prefix + "dev"] = buffer;
+
+    // nvme exposes a serial number
+    buffer[0] = '\0';
+    blkdev.serial(buffer, sizeof(buffer));
+    (*pm)[prefix + "serial"] = buffer;
+
+    // numa
+    int node;
+    r = blkdev.get_numa_node(&node);
+    if (r >= 0) {
+      (*pm)[prefix + "numa_node"] = stringify(node);
+    }
+  } else {
+    (*pm)[prefix + "access_mode"] = "file";
+    (*pm)[prefix + "path"] = path;
+  }
+  return 0;
+}
+
+void KernelDevice::_detect_vdo()
+{
+  // Probe for a VDO volume mapping to our kernel device.  On success
+  // vdo_fd stays open for later utilization queries.
+  vdo_fd = get_vdo_stats_handle(devname.c_str(), &vdo_name);
+  if (vdo_fd < 0) {
+    dout(20) << __func__ << " no VDO volume maps to " << devname << dendl;
+    return;
+  }
+  dout(1) << __func__ << " VDO volume " << vdo_name
+	  << " maps to " << devname << dendl;
+}
+
+bool KernelDevice::get_thin_utilization(uint64_t *total, uint64_t *avail) const
+{
+  // Thin-provisioning stats are only available when a VDO volume
+  // backs this device; otherwise report "not supported".
+  return (vdo_fd >= 0) && get_vdo_utilization(vdo_fd, total, avail);
+}
+
+int KernelDevice::choose_fd(bool buffered, int write_hint) const
+{
+  // Map (buffered, write_hint) to the matching fd.  Hints collapse to
+  // WRITE_LIFE_NOT_SET when write-life support is disabled.
+  // consistency: use ceph_assert like the rest of this file, so the
+  // check is not compiled out in NDEBUG builds.
+  ceph_assert(write_hint >= WRITE_LIFE_NOT_SET && write_hint < WRITE_LIFE_MAX);
+  if (!enable_wrt)
+    write_hint = WRITE_LIFE_NOT_SET;
+  return buffered ? fd_buffereds[write_hint] : fd_directs[write_hint];
+}
+
+int KernelDevice::flush()
+{
+  // protect flush with a mutex. note that we are not really protecting
+  // data here. instead, we're ensuring that if any flush() caller
+  // sees that io_since_flush is true, they block any racing callers
+  // until the flush is observed. that allows racing threads to be
+  // calling flush while still ensuring that *any* of them that got an
+  // aio completion notification will not return before that aio is
+  // stable on disk: whichever thread sees the flag first will block
+  // followers until the aio is stable.
+  std::lock_guard l(flush_mutex);
+
+  bool expect = true;
+  if (!io_since_flush.compare_exchange_strong(expect, false)) {
+    // nothing was written since the last flush; fdatasync is unnecessary
+    dout(10) << __func__ << " no-op (no ios since last flush), flag is "
+	     << (int)io_since_flush.load() << dendl;
+    return 0;
+  }
+
+  dout(10) << __func__ << " start" << dendl;
+  if (cct->_conf->bdev_inject_crash) {
+    ++injecting_crash;
+    // sleep for a moment to give other threads a chance to submit or
+    // wait on io that races with a flush.
+    derr << __func__ << " injecting crash. first we sleep..." << dendl;
+    sleep(cct->_conf->bdev_inject_crash_flush_delay);
+    derr << __func__ << " and now we die" << dendl;
+    cct->_log->flush();
+    _exit(1);
+  }
+  utime_t start = ceph_clock_now();
+  int r = ::fdatasync(fd_directs[WRITE_LIFE_NOT_SET]);
+  utime_t end = ceph_clock_now();
+  utime_t dur = end - start;
+  if (r < 0) {
+    r = -errno;
+    derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl;
+    ceph_abort();
+  }
+  // cleanup: dropped a stray second semicolon after dendl
+  dout(5) << __func__ << " in " << dur << dendl;
+  return r;
+}
+
+int KernelDevice::_aio_start()
+{
+  // Initialize the aio queue and spawn the completion thread.  A no-op
+  // when aio is disabled for this device.
+  if (aio) {
+    dout(10) << __func__ << dendl;
+    int r = io_queue->init(fd_directs);
+    if (r < 0) {
+      if (r == -EAGAIN) {
+	derr << __func__ << " io_setup(2) failed with EAGAIN; "
+	     << "try increasing /proc/sys/fs/aio-max-nr" << dendl;
+      } else {
+	derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl;
+      }
+      return r;
+    }
+    aio_thread.create("bstore_aio");
+  }
+  return 0;
+}
+
+void KernelDevice::_aio_stop()
+{
+  // Ask the completion thread to exit, join it, then tear down the
+  // queue.  aio_stop is reset so the device could be started again.
+  if (aio) {
+    dout(10) << __func__ << dendl;
+    aio_stop = true;
+    aio_thread.join();
+    aio_stop = false;
+    io_queue->shutdown();
+  }
+}
+
+int KernelDevice::_discard_start()
+{
+  // Spawn the background thread that services queued discards.
+  discard_thread.create("bstore_discard");
+  return 0;
+}
+
+void KernelDevice::_discard_stop()
+{
+  // Shut down the discard thread.  We first wait for it to signal
+  // discard_started so that a stop racing with startup cannot set
+  // discard_stop before the thread begins its loop (which would hang
+  // the join).
+  dout(10) << __func__ << dendl;
+  {
+    std::unique_lock l(discard_lock);
+    while (!discard_started) {
+      discard_cond.wait(l);
+    }
+    discard_stop = true;
+    discard_cond.notify_all();
+  }
+  discard_thread.join();
+  {
+    std::lock_guard l(discard_lock);
+    discard_stop = false;
+  }
+  dout(10) << __func__ << " stopped" << dendl;
+}
+
+void KernelDevice::discard_drain()
+{
+  // Block until every queued discard has been issued and the discard
+  // thread is idle; the thread notifies discard_cond on each pass.
+  dout(10) << __func__ << dendl;
+  std::unique_lock l(discard_lock);
+  while (!discard_queued.empty() || discard_running) {
+    discard_cond.wait(l);
+  }
+}
+
+// Return true for errno values the kernel block layer is known to hand
+// back for IO failures (as opposed to programming errors on our side);
+// callers use this to decide between returning EIO and aborting.
+static bool is_expected_ioerr(const int r)
+{
+  // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
+  return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
+	  r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
+	  r == -ENODATA || r == -EILSEQ || r == -ENOMEM ||
+#if defined(__linux__)
+	  r == -EREMCHG || r == -EBADE
+#elif defined(__FreeBSD__)
+	  r == - BSM_ERRNO_EREMCHG || r == -BSM_ERRNO_EBADE
+#endif
+	  );
+}
+
+void KernelDevice::_aio_thread()
+{
+  // Completion loop: poll io_queue for finished aios, propagate their
+  // results to the owning IOContext, and run the debug-stall and
+  // crash-injection checks, until _aio_stop() raises aio_stop.
+  dout(10) << __func__ << " start" << dendl;
+  int inject_crash_count = 0;
+  while (!aio_stop) {
+    dout(40) << __func__ << " polling" << dendl;
+    int max = cct->_conf->bdev_aio_reap_max;
+    aio_t *aio[max];
+    int r = io_queue->get_next_completed(cct->_conf->bdev_aio_poll_ms,
+					 aio, max);
+    if (r < 0) {
+      derr << __func__ << " got " << cpp_strerror(r) << dendl;
+      ceph_abort_msg("got unexpected error from io_getevents");
+    }
+    if (r > 0) {
+      dout(30) << __func__ << " got " << r << " completed aios" << dendl;
+      for (int i = 0; i < r; ++i) {
+	IOContext *ioc = static_cast<IOContext*>(aio[i]->priv);
+	_aio_log_finish(ioc, aio[i]->offset, aio[i]->length);
+	if (aio[i]->queue_item.is_linked()) {
+	  std::lock_guard l(debug_queue_lock);
+	  debug_aio_unlink(*aio[i]);
+	}
+
+	// set flag indicating new ios have completed.  we do this *before*
+	// any completion or notifications so that any user flush() that
+	// follows the observed io completion will include this io.  Note
+	// that an earlier, racing flush() could observe and clear this
+	// flag, but that also ensures that the IO will be stable before the
+	// later flush() occurs.
+	io_since_flush.store(true);
+
+	// note: this inner (long) r shadows the completion count above
+	// for the rest of the iteration.
+	long r = aio[i]->get_return_value();
+	if (r < 0) {
+	  derr << __func__ << " got r=" << r << " (" << cpp_strerror(r) << ")"
+	       << dendl;
+	  if (ioc->allow_eio && is_expected_ioerr(r)) {
+	    derr << __func__ << " translating the error to EIO for upper layer"
+		 << dendl;
+	    ioc->set_return_value(-EIO);
+	  } else {
+	    // not allowed to surface the error; abort either way
+	    if (is_expected_ioerr(r)) {
+	      note_io_error_event(
+		devname.c_str(),
+		path.c_str(),
+		r,
+#if defined(HAVE_POSIXAIO)
+		aio[i]->aio.aiocb.aio_lio_opcode,
+#else
+		aio[i]->iocb.aio_lio_opcode,
+#endif
+		aio[i]->offset,
+		aio[i]->length);
+	      ceph_abort_msg(
+		"Unexpected IO error. "
+		"This may suggest a hardware issue. "
+		"Please check your kernel log!");
+	    }
+	    ceph_abort_msg(
+	      "Unexpected IO error. "
+	      "This may suggest HW issue. Please check your dmesg!");
+	  }
+	} else if (aio[i]->length != (uint64_t)r) {
+	  // short read/write: we never expect partial aio completion
+	  derr << "aio to 0x" << std::hex << aio[i]->offset
+	       << "~" << aio[i]->length << std::dec
+	       << " but returned: " << r << dendl;
+	  ceph_abort_msg("unexpected aio return value: does not match length");
+	}
+
+	dout(10) << __func__ << " finished aio " << aio[i] << " r " << r
+		 << " ioc " << ioc
+		 << " with " << (ioc->num_running.load() - 1)
+		 << " aios left" << dendl;
+
+	// NOTE: once num_running and we either call the callback or
+	// call aio_wake we cannot touch ioc or aio[] as the caller
+	// may free it.
+	if (ioc->priv) {
+	  if (--ioc->num_running == 0) {
+	    aio_callback(aio_callback_priv, ioc->priv);
+	  }
+	} else {
+	  ioc->try_aio_wake();
+	}
+      }
+    }
+    if (cct->_conf->bdev_debug_aio) {
+      // watchdog: abort if the oldest queued aio has been stuck longer
+      // than the configured suicide timeout.
+      utime_t now = ceph_clock_now();
+      std::lock_guard l(debug_queue_lock);
+      if (debug_oldest) {
+	if (debug_stall_since == utime_t()) {
+	  debug_stall_since = now;
+	} else {
+	  if (cct->_conf->bdev_debug_aio_suicide_timeout) {
+	    utime_t cutoff = now;
+	    cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout;
+	    if (debug_stall_since < cutoff) {
+	      derr << __func__ << " stalled aio " << debug_oldest
+		   << " since " << debug_stall_since << ", timeout is "
+		   << cct->_conf->bdev_debug_aio_suicide_timeout
+		   << "s, suicide" << dendl;
+	      ceph_abort_msg("stalled aio... buggy kernel or bad device?");
+	    }
+	  }
+	}
+      }
+    }
+    reap_ioc();
+    if (cct->_conf->bdev_inject_crash) {
+      // fault injection: die after roughly bdev_inject_crash (+flush
+      // delay) seconds of polling.
+      ++inject_crash_count;
+      if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 >
+	  cct->_conf->bdev_inject_crash + cct->_conf->bdev_inject_crash_flush_delay) {
+	derr << __func__ << " bdev_inject_crash trigger from aio thread"
+	     << dendl;
+	cct->_log->flush();
+	_exit(1);
+      }
+    }
+  }
+  reap_ioc();
+  dout(10) << __func__ << " end" << dendl;
+}
+
+void KernelDevice::_discard_thread()
+{
+  // Background loop: batch up queued discard extents and issue them
+  // outside discard_lock.  discard_started/discard_stop implement the
+  // startup/shutdown handshake with _discard_stop().
+  std::unique_lock l(discard_lock);
+  ceph_assert(!discard_started);
+  discard_started = true;
+  discard_cond.notify_all();
+  while (true) {
+    ceph_assert(discard_finishing.empty());
+    if (discard_queued.empty()) {
+      if (discard_stop)
+	break;
+      dout(20) << __func__ << " sleep" << dendl;
+      discard_cond.notify_all(); // for the thread trying to drain...
+      discard_cond.wait(l);
+      dout(20) << __func__ << " wake" << dendl;
+    } else {
+      // grab the whole queue, then drop the lock while issuing discards
+      discard_finishing.swap(discard_queued);
+      discard_running = true;
+      l.unlock();
+      dout(20) << __func__ << " finishing" << dendl;
+      for (auto p = discard_finishing.begin();p != discard_finishing.end(); ++p) {
+	discard(p.get_start(), p.get_len());
+      }
+
+      discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing));
+      discard_finishing.clear();
+      l.lock();
+      discard_running = false;
+    }
+  }
+  dout(10) << __func__ << " finish" << dendl;
+  discard_started = false;
+}
+
+int KernelDevice::queue_discard(interval_set<uint64_t> &to_release)
+{
+  // Hand a set of extents to the background discard thread.  Returns
+  // -1 when the device does not support discard so the caller can fall
+  // back to other handling; 0 otherwise.
+  if (!support_discard)
+    return -1;
+
+  if (to_release.empty())
+    return 0;
+
+  std::lock_guard l(discard_lock);
+  discard_queued.insert(to_release);
+  discard_cond.notify_all();
+  return 0;
+}
+
+// Record the start of an IO against [offset, offset+length).  With
+// bdev_debug_inflight_ios enabled this also aborts on any overlap with
+// an already in-flight range, catching racing IOs to the same extent.
+void KernelDevice::_aio_log_start(
+  IOContext *ioc,
+  uint64_t offset,
+  uint64_t length)
+{
+  dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
+	   << std::dec << dendl;
+  if (cct->_conf->bdev_debug_inflight_ios) {
+    std::lock_guard l(debug_lock);
+    if (debug_inflight.intersects(offset, length)) {
+      derr << __func__ << " inflight overlap of 0x"
+	   << std::hex
+	   << offset << "~" << length << std::dec
+	   << " with " << debug_inflight << dendl;
+      ceph_abort();
+    }
+    debug_inflight.insert(offset, length);
+  }
+}
+
+void KernelDevice::debug_aio_link(aio_t& aio)
+{
+  // Track this aio for stall detection.  Caller holds debug_queue_lock.
+  const bool first_entry = debug_queue.empty();
+  debug_queue.push_back(aio);
+  if (first_entry) {
+    debug_oldest = &aio;
+  }
+}
+
+void KernelDevice::debug_aio_unlink(aio_t& aio)
+{
+  // Remove a completed aio from the stall-tracking queue; caller holds
+  // debug_queue_lock.  If it was the oldest entry, log when it stalled
+  // past bdev_debug_aio_log_age and promote the next-oldest.
+  if (aio.queue_item.is_linked()) {
+    debug_queue.erase(debug_queue.iterator_to(aio));
+    if (debug_oldest == &aio) {
+      auto age = cct->_conf->bdev_debug_aio_log_age;
+      if (age && debug_stall_since != utime_t()) {
+	utime_t cutoff = ceph_clock_now();
+	cutoff -= age;
+	if (debug_stall_since < cutoff) {
+	  derr << __func__ << " stalled aio " << debug_oldest
+	       << " since " << debug_stall_since << ", timeout is "
+	       << age
+	       << "s" << dendl;
+	}
+      }
+
+      if (debug_queue.empty()) {
+	debug_oldest = nullptr;
+      } else {
+	debug_oldest = &debug_queue.front();
+      }
+      debug_stall_since = utime_t();
+    }
+  }
+}
+
+// Record the completion of an IO previously logged by _aio_log_start,
+// clearing the range from the in-flight debug set.
+void KernelDevice::_aio_log_finish(
+  IOContext *ioc,
+  uint64_t offset,
+  uint64_t length)
+{
+  // NOTE: "<< aio" here prints the member flag (aio enabled), not a
+  // particular aio object.
+  dout(20) << __func__ << " " << aio << " 0x"
+	   << std::hex << offset << "~" << length << std::dec << dendl;
+  if (cct->_conf->bdev_debug_inflight_ios) {
+    std::lock_guard l(debug_lock);
+    debug_inflight.erase(offset, length);
+  }
+}
+
+void KernelDevice::aio_submit(IOContext *ioc)
+{
+  // Submit all of ioc's pending aios to the kernel in one batch.  Only
+  // one thread may drive a given IOContext through here at a time.
+  dout(20) << __func__ << " ioc " << ioc
+	   << " pending " << ioc->num_pending.load()
+	   << " running " << ioc->num_running.load()
+	   << dendl;
+
+  if (ioc->num_pending.load() == 0) {
+    return;
+  }
+
+  // move these aside, and get our end iterator position now, as the
+  // aios might complete as soon as they are submitted and queue more
+  // wal aio's.
+  list<aio_t>::iterator e = ioc->running_aios.begin();
+  ioc->running_aios.splice(e, ioc->pending_aios);
+
+  int pending = ioc->num_pending.load();
+  ioc->num_running += pending;
+  ioc->num_pending -= pending;
+  ceph_assert(ioc->num_pending.load() == 0); // we should be only thread doing this
+  ceph_assert(ioc->pending_aios.size() == 0);
+
+  if (cct->_conf->bdev_debug_aio) {
+    // register each aio for stall tracking before it goes to the kernel
+    list<aio_t>::iterator p = ioc->running_aios.begin();
+    while (p != e) {
+      dout(30) << __func__ << " " << *p << dendl;
+      std::lock_guard l(debug_queue_lock);
+      debug_aio_link(*p++);
+    }
+  }
+
+  void *priv = static_cast<void*>(ioc);
+  int r, retries = 0;
+  // num of pending aios should not overflow when passed to submit_batch()
+  // consistency: ceph_assert, like the rest of this file
+  ceph_assert(pending <= std::numeric_limits<uint16_t>::max());
+  r = io_queue->submit_batch(ioc->running_aios.begin(), e,
+			     pending, priv, &retries);
+
+  if (retries)
+    derr << __func__ << " retries " << retries << dendl;
+  if (r < 0) {
+    derr << " aio submit got " << cpp_strerror(r) << dendl;
+    ceph_assert(r == 0);
+  }
+}
+
+// Synchronously write bl at offset off via pwritev, retrying/advancing
+// the iov list until every byte is written.  For buffered writes, the
+// dirty range is then pushed out with sync_file_range (data is still not
+// durable until flush()).  Returns 0 or a negative errno.
+int KernelDevice::_sync_write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
+{
+  uint64_t len = bl.length();
+  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
+	  << std::dec << (buffered ? " (buffered)" : " (direct)") << dendl;
+  if (cct->_conf->bdev_inject_crash &&
+      rand() % cct->_conf->bdev_inject_crash == 0) {
+    derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
+	 << off << "~" << len << std::dec << dendl;
+    ++injecting_crash;
+    return 0;
+  }
+  vector<iovec> iov;
+  bl.prepare_iov(&iov);
+
+  auto left = len;
+  auto o = off;
+  size_t idx = 0;
+  do {
+    auto r = ::pwritev(choose_fd(buffered, write_hint),
+		       &iov[idx], iov.size() - idx, o);
+
+    if (r < 0) {
+      r = -errno;
+      derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl;
+      return r;
+    }
+    o += r;
+    left -= r;
+    if (left) {
+      // short write: advance the iov cursor past what the kernel took
+      // skip fully processed IOVs
+      while (idx < iov.size() && (size_t)r >= iov[idx].iov_len) {
+	r -= iov[idx++].iov_len;
+      }
+      // update partially processed one if any
+      if (r) {
+	ceph_assert(idx < iov.size());
+	ceph_assert((size_t)r < iov[idx].iov_len);
+	iov[idx].iov_base = static_cast<char*>(iov[idx].iov_base) + r;
+	iov[idx].iov_len -= r;
+	r = 0;
+      }
+      ceph_assert(r == 0);
+    }
+  } while (left);
+
+#ifdef HAVE_SYNC_FILE_RANGE
+  if (buffered) {
+    // initiate IO and wait till it completes
+    auto r = ::sync_file_range(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER|SYNC_FILE_RANGE_WAIT_BEFORE);
+    if (r < 0) {
+      r = -errno;
+      derr << __func__ << " sync_file_range error: " << cpp_strerror(r) << dendl;
+      return r;
+    }
+  }
+#endif
+
+  // a subsequent flush() must actually fdatasync
+  io_since_flush.store(true);
+
+  return 0;
+}
+
+// Synchronous write entry point: validate the IO, realign the buffer
+// for direct IO (or when there are too many segments for writev), and
+// delegate to _sync_write.
+int KernelDevice::write(
+  uint64_t off,
+  bufferlist &bl,
+  bool buffered,
+  int write_hint)
+{
+  uint64_t len = bl.length();
+  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+	   << (buffered ? " (buffered)" : " (direct)")
+	   << dendl;
+  ceph_assert(is_valid_io(off, len));
+  if (cct->_conf->objectstore_blackhole) {
+    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
+	       << dendl;
+    return 0;
+  }
+
+  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
+      bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
+    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
+  }
+  dout(40) << "data: ";
+  bl.hexdump(*_dout);
+  *_dout << dendl;
+
+  return _sync_write(off, bl, buffered, write_hint);
+}
+
+// Queue an asynchronous write on ioc.  With libaio + direct IO the
+// write is split into RW_IO_MAX-sized aio_t entries on pending_aios
+// (submitted later by aio_submit); otherwise it degrades to a
+// synchronous _sync_write.
+int KernelDevice::aio_write(
+  uint64_t off,
+  bufferlist &bl,
+  IOContext *ioc,
+  bool buffered,
+  int write_hint)
+{
+  uint64_t len = bl.length();
+  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+	   << (buffered ? " (buffered)" : " (direct)")
+	   << dendl;
+  ceph_assert(is_valid_io(off, len));
+  if (cct->_conf->objectstore_blackhole) {
+    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
+	       << dendl;
+    return 0;
+  }
+
+  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
+      bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
+    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
+  }
+  dout(40) << "data: ";
+  bl.hexdump(*_dout);
+  *_dout << dendl;
+
+  _aio_log_start(ioc, off, len);
+
+#ifdef HAVE_LIBAIO
+  if (aio && dio && !buffered) {
+    if (cct->_conf->bdev_inject_crash &&
+	rand() % cct->_conf->bdev_inject_crash == 0) {
+      derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
+	   << off << "~" << len << std::dec
+	   << dendl;
+      // generate a real io so that aio_wait behaves properly, but make it
+      // a read instead of write, and toss the result.
+      ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
+      ++ioc->num_pending;
+      auto& aio = ioc->pending_aios.back();
+      bufferptr p = ceph::buffer::create_small_page_aligned(len);
+      aio.bl.append(std::move(p));
+      aio.bl.prepare_iov(&aio.iov);
+      aio.preadv(off, len);
+      ++injecting_crash;
+    } else {
+      if (bl.length() <= RW_IO_MAX) {
+	// fast path (non-huge write)
+	ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
+	++ioc->num_pending;
+	auto& aio = ioc->pending_aios.back();
+	bl.prepare_iov(&aio.iov);
+	aio.bl.claim_append(bl);
+	aio.pwritev(off, len);
+	dout(30) << aio << dendl;
+	dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
+		<< std::dec << " aio " << &aio << dendl;
+      } else {
+	// write in RW_IO_MAX-sized chunks
+	uint64_t prev_len = 0;
+	while (prev_len < bl.length()) {
+	  bufferlist tmp;
+	  if (prev_len + RW_IO_MAX < bl.length()) {
+	    tmp.substr_of(bl, prev_len, RW_IO_MAX);
+	  } else {
+	    tmp.substr_of(bl, prev_len, bl.length() - prev_len);
+	  }
+	  auto len = tmp.length();
+	  ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
+	  ++ioc->num_pending;
+	  auto& aio = ioc->pending_aios.back();
+	  tmp.prepare_iov(&aio.iov);
+	  aio.bl.claim_append(tmp);
+	  aio.pwritev(off + prev_len, len);
+	  dout(30) << aio << dendl;
+	  dout(5) << __func__ << " 0x" << std::hex << off + prev_len
+		  << "~" << len
+		  << std::dec << " aio " << &aio << " (piece)" << dendl;
+	  prev_len += len;
+	}
+      }
+    }
+  } else
+#endif
+  {
+    // no aio available: perform the write synchronously right here
+    int r = _sync_write(off, bl, buffered, write_hint);
+    _aio_log_finish(ioc, off, len);
+    if (r < 0)
+      return r;
+  }
+  return 0;
+}
+
+// Issue a blocking discard (TRIM) for [offset, offset+len).  Silently a
+// no-op when the device lacks discard support or blackhole is enabled.
+int KernelDevice::discard(uint64_t offset, uint64_t len)
+{
+  int r = 0;
+  if (cct->_conf->objectstore_blackhole) {
+    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
+	       << dendl;
+    return 0;
+  }
+  if (support_discard) {
+    dout(10) << __func__
+	     << " 0x" << std::hex << offset << "~" << len << std::dec
+	     << dendl;
+
+    r = BlkDev{fd_directs[WRITE_LIFE_NOT_SET]}.discard((int64_t)offset, (int64_t)len);
+  }
+  return r;
+}
+
+// Synchronous aligned read of len bytes at off into *pbl, via the
+// buffered or direct fd.  Slow reads past bdev_debug_aio_log_age are
+// logged.  Returns 0 or a negative errno.
+int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
+		      IOContext *ioc,
+		      bool buffered)
+{
+  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+	  << (buffered ? " (buffered)" : " (direct)")
+	  << dendl;
+  ceph_assert(is_valid_io(off, len));
+
+  _aio_log_start(ioc, off, len);
+
+  auto start1 = mono_clock::now();
+
+  auto p = ceph::buffer::ptr_node::create(ceph::buffer::create_small_page_aligned(len));
+  int r = ::pread(buffered ? fd_buffereds[WRITE_LIFE_NOT_SET] : fd_directs[WRITE_LIFE_NOT_SET],
+		  p->c_str(), len, off);
+  auto age = cct->_conf->bdev_debug_aio_log_age;
+  if (mono_clock::now() - start1 >= make_timespan(age)) {
+    derr << __func__ << " stalled read "
+	 << " 0x" << std::hex << off << "~" << len << std::dec
+	 << (buffered ? " (buffered)" : " (direct)")
+	 << " since " << start1 << ", timeout is "
+	 << age
+	 << "s" << dendl;
+  }
+
+  if (r < 0) {
+    // NOTE(review): r is pread's raw return (-1) here, not -errno, so
+    // is_expected_ioerr(r) is testing the wrong value — verify whether
+    // this should be is_expected_ioerr(-errno).
+    if (ioc->allow_eio && is_expected_ioerr(r)) {
+      r = -EIO;
+    } else {
+      r = -errno;
+    }
+    goto out;
+  }
+  ceph_assert((uint64_t)r == len);
+  pbl->push_back(std::move(p));
+
+  dout(40) << "data: ";
+  pbl->hexdump(*_dout);
+  *_dout << dendl;
+
+ out:
+  _aio_log_finish(ioc, off, len);
+  return r < 0 ? r : 0;
+}
+
+// Queue an asynchronous read on ioc when libaio + direct IO are in use;
+// otherwise fall back to a synchronous read().  The destination buffer
+// is appended to *pbl before submission.
+int KernelDevice::aio_read(
+  uint64_t off,
+  uint64_t len,
+  bufferlist *pbl,
+  IOContext *ioc)
+{
+  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+	  << dendl;
+
+  int r = 0;
+#ifdef HAVE_LIBAIO
+  if (aio && dio) {
+    ceph_assert(is_valid_io(off, len));
+    _aio_log_start(ioc, off, len);
+    ioc->pending_aios.push_back(aio_t(ioc, fd_directs[WRITE_LIFE_NOT_SET]));
+    ++ioc->num_pending;
+    aio_t& aio = ioc->pending_aios.back();
+    bufferptr p = ceph::buffer::create_small_page_aligned(len);
+    aio.bl.append(std::move(p));
+    aio.bl.prepare_iov(&aio.iov);
+    aio.preadv(off, len);
+    dout(30) << aio << dendl;
+    pbl->append(aio.bl);
+    dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
+	    << std::dec << " aio " << &aio << dendl;
+  } else
+#endif
+  {
+    r = read(off, len, pbl, ioc, false);
+  }
+
+  return r;
+}
+
+// Service an unaligned read on the O_DIRECT fd by reading the enclosing
+// block-aligned range into a bounce buffer and copying out the
+// requested slice.  Returns 0 or a negative errno.
+int KernelDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf)
+{
+  uint64_t aligned_off = p2align(off, block_size);
+  uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
+  bufferptr p = ceph::buffer::create_small_page_aligned(aligned_len);
+  int r = 0;
+
+  auto start1 = mono_clock::now();
+  r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], p.c_str(), aligned_len, aligned_off);
+  auto age = cct->_conf->bdev_debug_aio_log_age;
+  if (mono_clock::now() - start1 >= make_timespan(age)) {
+    derr << __func__ << " stalled read "
+	 << " 0x" << std::hex << off << "~" << len << std::dec
+	 << " since " << start1 << ", timeout is "
+	 << age
+	 << "s" << dendl;
+  }
+
+  if (r < 0) {
+    r = -errno;
+    derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+	 << " error: " << cpp_strerror(r) << dendl;
+    goto out;
+  }
+  ceph_assert((uint64_t)r == aligned_len);
+  // copy just the caller's slice out of the aligned bounce buffer
+  memcpy(buf, p.c_str() + (off - aligned_off), len);
+
+  dout(40) << __func__ << " data: ";
+  bufferlist bl;
+  bl.append(buf, len);
+  bl.hexdump(*_dout);
+  *_dout << dendl;
+
+ out:
+  return r < 0 ? r : 0;
+}
+
+// Read len bytes at off directly into a caller-supplied buffer.  A
+// direct-IO request that is misaligned (offset, length, or buffer
+// address) is redirected through direct_read_unaligned; buffered reads
+// loop until all bytes arrive.  Returns 0 or a negative errno.
+int KernelDevice::read_random(uint64_t off, uint64_t len, char *buf,
+                       bool buffered)
+{
+  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+	  << "buffered " << buffered
+	  << dendl;
+  ceph_assert(len > 0);
+  ceph_assert(off < size);
+  ceph_assert(off + len <= size);
+  int r = 0;
+  auto age = cct->_conf->bdev_debug_aio_log_age;
+
+  //if it's direct io and unaligned, we have to use a internal buffer
+  if (!buffered && ((off % block_size != 0)
+                    || (len % block_size != 0)
+                    || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0)))
+    return direct_read_unaligned(off, len, buf);
+
+  auto start1 = mono_clock::now();
+  if (buffered) {
+    //buffered read
+    auto off0 = off;
+    char *t = buf;
+    uint64_t left = len;
+    // pread on a buffered fd may return short; loop until done
+    while (left > 0) {
+      r = ::pread(fd_buffereds[WRITE_LIFE_NOT_SET], t, left, off);
+      if (r < 0) {
+	r = -errno;
+	derr << __func__ << " 0x" << std::hex << off << "~" << left
+	     << std::dec << " error: " << cpp_strerror(r) << dendl;
+	goto out;
+      }
+      off += r;
+      t += r;
+      left -= r;
+    }
+    if (mono_clock::now() - start1 >= make_timespan(age)) {
+      derr << __func__ << " stalled read "
+	   << " 0x" << std::hex << off0 << "~" << len << std::dec
+	   << " (buffered) since " << start1 << ", timeout is "
+	   << age
+	   << "s" << dendl;
+    }
+  } else {
+    //direct and aligned read
+    r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], buf, len, off);
+    if (mono_clock::now() - start1 >= make_timespan(age)) {
+      derr << __func__ << " stalled read "
+	   << " 0x" << std::hex << off << "~" << len << std::dec
+	   << " (direct) since " << start1 << ", timeout is "
+	   << age
+	   << "s" << dendl;
+    }
+    if (r < 0) {
+      r = -errno;
+      derr << __func__ << " direct_aligned_read" << " 0x" << std::hex
+	   << off << "~" << std::left << std::dec << " error: " << cpp_strerror(r)
+	   << dendl;
+      goto out;
+    }
+    ceph_assert((uint64_t)r == len);
+  }
+
+  dout(40) << __func__ << " data: ";
+  bufferlist bl;
+  bl.append(buf, len);
+  bl.hexdump(*_dout);
+  *_dout << dendl;
+
+ out:
+  return r < 0 ? r : 0;
+}
+
+// Drop the page cache for the block-aligned range [off, off+len) on the
+// buffered fd (POSIX_FADV_DONTNEED).  Returns 0 or a negative errno;
+// note posix_fadvise returns the error directly rather than via errno.
+int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
+{
+  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+	  << dendl;
+  ceph_assert(off % block_size == 0);
+  ceph_assert(len % block_size == 0);
+  int r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, POSIX_FADV_DONTNEED);
+  if (r) {
+    r = -r;
+    derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+	 << " error: " << cpp_strerror(r) << dendl;
+  }
+  return r;
+}
diff --git a/src/blk/kernel/KernelDevice.h b/src/blk/kernel/KernelDevice.h
new file mode 100644
index 000000000..7ac9b1e7e
--- /dev/null
+++ b/src/blk/kernel/KernelDevice.h
@@ -0,0 +1,150 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_BLK_KERNELDEVICE_H
+#define CEPH_BLK_KERNELDEVICE_H
+
+#include <atomic>
+
+#include "include/types.h"
+#include "include/interval_set.h"
+#include "common/Thread.h"
+#include "include/utime.h"
+
+#include "aio/aio.h"
+#include "BlockDevice.h"
+
+#define RW_IO_MAX (INT_MAX & CEPH_PAGE_MASK)
+
+
+/// BlockDevice implementation backed by a kernel block device (or a
+/// regular file), using O_DIRECT + libaio/io_uring where available and
+/// buffered fds otherwise, plus a background thread for discards.
+class KernelDevice : public BlockDevice {
+  // one direct (O_DIRECT) and one buffered fd per write-life hint;
+  // index WRITE_LIFE_NOT_SET is the default when hints are disabled
+  std::vector<int> fd_directs, fd_buffereds;
+  bool enable_wrt = true;   // whether write-life hints select distinct fds
+  std::string path;
+  bool aio, dio;
+
+  int vdo_fd = -1;      ///< fd for vdo sysfs directory
+  std::string vdo_name;
+
+  std::string devname;  ///< kernel dev name (/sys/block/$devname), if any
+
+  ceph::mutex debug_lock = ceph::make_mutex("KernelDevice::debug_lock");
+  interval_set<uint64_t> debug_inflight;  // in-flight ranges (debug only)
+
+  // set when an io completes/syncs; flush() clears it to decide whether
+  // an fdatasync is actually needed
+  std::atomic<bool> io_since_flush = {false};
+  ceph::mutex flush_mutex = ceph::make_mutex("KernelDevice::flush_mutex");
+
+  std::unique_ptr<io_queue_t> io_queue;
+  aio_callback_t discard_callback;
+  void *discard_callback_priv;
+  bool aio_stop;         // signals _aio_thread() to exit
+  bool discard_started;  // discard thread startup/shutdown handshake
+  bool discard_stop;
+
+  ceph::mutex discard_lock = ceph::make_mutex("KernelDevice::discard_lock");
+  ceph::condition_variable discard_cond;
+  bool discard_running = false;  // discard thread busy outside the lock
+  interval_set<uint64_t> discard_queued;     // extents awaiting discard
+  interval_set<uint64_t> discard_finishing;  // extents being discarded now
+
+  // reaps aio completions; body is KernelDevice::_aio_thread()
+  struct AioCompletionThread : public Thread {
+    KernelDevice *bdev;
+    explicit AioCompletionThread(KernelDevice *b) : bdev(b) {}
+    void *entry() override {
+      bdev->_aio_thread();
+      return NULL;
+    }
+  } aio_thread;
+
+  // services queued discards; body is KernelDevice::_discard_thread()
+  struct DiscardThread : public Thread {
+    KernelDevice *bdev;
+    explicit DiscardThread(KernelDevice *b) : bdev(b) {}
+    void *entry() override {
+      bdev->_discard_thread();
+      return NULL;
+    }
+  } discard_thread;
+
+  std::atomic_int injecting_crash;
+
+  void _aio_thread();
+  void _discard_thread();
+  int queue_discard(interval_set<uint64_t> &to_release) override;
+
+  int _aio_start();
+  void _aio_stop();
+
+  int _discard_start();
+  void _discard_stop();
+
+  // debug bookkeeping around each submitted/completed io
+  void _aio_log_start(IOContext *ioc, uint64_t offset, uint64_t length);
+  void _aio_log_finish(IOContext *ioc, uint64_t offset, uint64_t length);
+
+  int _sync_write(uint64_t off, ceph::buffer::list& bl, bool buffered, int write_hint);
+
+  int _lock();
+
+  int direct_read_unaligned(uint64_t off, uint64_t len, char *buf);
+
+  // stalled aio debugging
+  aio_list_t debug_queue;
+  ceph::mutex debug_queue_lock = ceph::make_mutex("KernelDevice::debug_queue_lock");
+  aio_t *debug_oldest = nullptr;
+  utime_t debug_stall_since;
+  void debug_aio_link(aio_t& aio);
+  void debug_aio_unlink(aio_t& aio);
+
+  void _detect_vdo();
+  int choose_fd(bool buffered, int write_hint) const;
+
+public:
+  KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv);
+
+  void aio_submit(IOContext *ioc) override;
+  void discard_drain() override;
+
+  int collect_metadata(const std::string& prefix, std::map<std::string,std::string> *pm) const override;
+  int get_devname(std::string *s) const override {
+    if (devname.empty()) {
+      return -ENOENT;
+    }
+    *s = devname;
+    return 0;
+  }
+  int get_devices(std::set<std::string> *ls) const override;
+
+  bool get_thin_utilization(uint64_t *total, uint64_t *avail) const override;
+
+  int read(uint64_t off, uint64_t len, ceph::buffer::list *pbl,
+	   IOContext *ioc,
+	   bool buffered) override;
+  int aio_read(uint64_t off, uint64_t len, ceph::buffer::list *pbl,
+	       IOContext *ioc) override;
+  int read_random(uint64_t off, uint64_t len, char *buf, bool buffered) override;
+
+  int write(uint64_t off, ceph::buffer::list& bl, bool buffered, int write_hint = WRITE_LIFE_NOT_SET) override;
+  int aio_write(uint64_t off, ceph::buffer::list& bl,
+		IOContext *ioc,
+		bool buffered,
+		int write_hint = WRITE_LIFE_NOT_SET) override;
+  int flush() override;
+  int discard(uint64_t offset, uint64_t len) override;
+
+  // for managing buffered readers/writers
+  int invalidate_cache(uint64_t off, uint64_t len) override;
+  int open(const std::string& path) override;
+  void close() override;
+};
+
+#endif
diff --git a/src/blk/kernel/io_uring.cc b/src/blk/kernel/io_uring.cc
new file mode 100644
index 000000000..17bd35bb4
--- /dev/null
+++ b/src/blk/kernel/io_uring.cc
@@ -0,0 +1,261 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "io_uring.h"
+
+#if defined(HAVE_LIBURING)
+
+#include "liburing.h"
+#include <sys/epoll.h>
+
// Per-queue io_uring state. cq_mutex/sq_mutex serialize reaping and
// submission respectively; epoll_fd watches the ring fd for completions;
// fixed_fds_map translates real fds to the indices registered with
// io_uring_register_files().
struct ioring_data {
  struct io_uring io_uring;
  pthread_mutex_t cq_mutex;
  pthread_mutex_t sq_mutex;
  int epoll_fd = -1;
  std::map<int, int> fixed_fds_map;
};
+
// Reap up to `max` completed CQEs, storing the finished aio_t pointers into
// `paio`. Returns the number reaped (possibly 0). Caller must hold cq_mutex.
static int ioring_get_cqe(struct ioring_data *d, unsigned int max,
			  struct aio_t **paio)
{
  struct io_uring *ring = &d->io_uring;
  struct io_uring_cqe *cqe;

  unsigned nr = 0;
  unsigned head;
  io_uring_for_each_cqe(ring, head, cqe) {
    // The user_data we attached in init_sqe() is the aio_t itself.
    struct aio_t *io = (struct aio_t *)(uintptr_t) io_uring_cqe_get_data(cqe);
    io->rval = cqe->res;

    paio[nr++] = io;

    if (nr == max)
      break;
  }
  // Tell the kernel we have consumed `nr` entries of the CQ ring.
  io_uring_cq_advance(ring, nr);

  return nr;
}
+
+static int find_fixed_fd(struct ioring_data *d, int real_fd)
+{
+ auto it = d->fixed_fds_map.find(real_fd);
+ if (it == d->fixed_fds_map.end())
+ return -1;
+
+ return it->second;
+}
+
// Fill one SQE from an aio_t. Only PWRITEV/PREADV opcodes are supported;
// the fd must have been registered up front (IOSQE_FIXED_FILE is set).
static void init_sqe(struct ioring_data *d, struct io_uring_sqe *sqe,
		     struct aio_t *io)
{
  int fixed_fd = find_fixed_fd(d, io->fd);

  // All fds used for I/O must have been passed to init(); submitting with
  // an unregistered fd is a programming error.
  ceph_assert(fixed_fd != -1);

  if (io->iocb.aio_lio_opcode == IO_CMD_PWRITEV)
    io_uring_prep_writev(sqe, fixed_fd, &io->iov[0],
			 io->iov.size(), io->offset);
  else if (io->iocb.aio_lio_opcode == IO_CMD_PREADV)
    io_uring_prep_readv(sqe, fixed_fd, &io->iov[0],
			io->iov.size(), io->offset);
  else
    ceph_assert(0);

  // Stash the aio_t so ioring_get_cqe() can recover it on completion.
  io_uring_sqe_set_data(sqe, io);
  io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
}
+
// Queue the aios in [beg, end) onto the SQ ring and submit them. If the SQ
// ring fills up before anything was queued, returns 0 so the caller can
// reap completions and retry. Caller must hold sq_mutex.
static int ioring_queue(struct ioring_data *d, void *priv,
			list<aio_t>::iterator beg, list<aio_t>::iterator end)
{
  struct io_uring *ring = &d->io_uring;
  struct aio_t *io = nullptr;

  ceph_assert(beg != end);

  do {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe)
      break;

    io = &*beg;
    io->priv = priv;

    init_sqe(d, sqe, io);

  } while (++beg != end);

  // io stays nullptr only if the very first io_uring_get_sqe() failed.
  if (!io)
    /* Queue is full, go and reap something first */
    return 0;

  // Returns the number of SQEs submitted, or a negative errno.
  return io_uring_submit(ring);
}
+
+static void build_fixed_fds_map(struct ioring_data *d,
+ std::vector<int> &fds)
+{
+ int fixed_fd = 0;
+ for (int real_fd : fds) {
+ d->fixed_fds_map[real_fd] = fixed_fd++;
+ }
+}
+
// Construction only records parameters; the ring itself is created in
// init() so that failures can be reported as an error code.
ioring_queue_t::ioring_queue_t(unsigned iodepth_, bool hipri_, bool sq_thread_) :
  d(make_unique<ioring_data>()),
  iodepth(iodepth_),
  hipri(hipri_),
  sq_thread(sq_thread_)
{
}

// Teardown is done explicitly via shutdown(); nothing to do here.
ioring_queue_t::~ioring_queue_t()
{
}
+
+int ioring_queue_t::init(std::vector<int> &fds)
+{
+ unsigned flags = 0;
+
+ pthread_mutex_init(&d->cq_mutex, NULL);
+ pthread_mutex_init(&d->sq_mutex, NULL);
+
+ if (hipri)
+ flags |= IORING_SETUP_IOPOLL;
+ if (sq_thread)
+ flags |= IORING_SETUP_SQPOLL;
+
+ int ret = io_uring_queue_init(iodepth, &d->io_uring, flags);
+ if (ret < 0)
+ return ret;
+
+ ret = io_uring_register_files(&d->io_uring,
+ &fds[0], fds.size());
+ if (ret < 0) {
+ ret = -errno;
+ goto close_ring_fd;
+ }
+
+ build_fixed_fds_map(d.get(), fds);
+
+ d->epoll_fd = epoll_create1(0);
+ if (d->epoll_fd < 0) {
+ ret = -errno;
+ goto close_ring_fd;
+ }
+
+ struct epoll_event ev;
+ ev.events = EPOLLIN;
+ ret = epoll_ctl(d->epoll_fd, EPOLL_CTL_ADD, d->io_uring.ring_fd, &ev);
+ if (ret < 0) {
+ ret = -errno;
+ goto close_epoll_fd;
+ }
+
+ return 0;
+
+close_epoll_fd:
+ close(d->epoll_fd);
+close_ring_fd:
+ io_uring_queue_exit(&d->io_uring);
+
+ return ret;
+}
+
// Tear down in reverse order of init(): forget the fd translation table,
// close the epoll instance, then destroy the ring.
void ioring_queue_t::shutdown()
{
  d->fixed_fds_map.clear();
  close(d->epoll_fd);
  d->epoll_fd = -1;
  io_uring_queue_exit(&d->io_uring);
}
+
// Submit a batch of aios under sq_mutex. aios_size/retries are part of the
// io_queue_t interface but unused by the io_uring backend. Returns the
// number submitted (0 means the SQ ring was full - reap and retry), or a
// negative errno.
int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
                                 uint16_t aios_size, void *priv,
                                 int *retries)
{
  (void)aios_size;
  (void)retries;

  pthread_mutex_lock(&d->sq_mutex);
  int rc = ioring_queue(d.get(), priv, beg, end);
  pthread_mutex_unlock(&d->sq_mutex);

  return rc;
}
+
+int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
+{
+get_cqe:
+ pthread_mutex_lock(&d->cq_mutex);
+ int events = ioring_get_cqe(d.get(), max, paio);
+ pthread_mutex_unlock(&d->cq_mutex);
+
+ if (events == 0) {
+ struct epoll_event ev;
+ int ret = TEMP_FAILURE_RETRY(epoll_wait(d->epoll_fd, &ev, 1, timeout_ms));
+ if (ret < 0)
+ events = -errno;
+ else if (ret > 0)
+ /* Time to reap */
+ goto get_cqe;
+ }
+
+ return events;
+}
+
+bool ioring_queue_t::supported()
+{
+ struct io_uring ring;
+ int ret = io_uring_queue_init(16, &ring, 0);
+ if (ret) {
+ return false;
+ }
+ io_uring_queue_exit(&ring);
+ return true;
+}
+
+#else // #if defined(HAVE_LIBURING)
+
// Stub implementations used when the build has no liburing. supported()
// honestly reports false; every other entry point aborts, since callers
// must check supported() before constructing an ioring_queue_t.
struct ioring_data {};

ioring_queue_t::ioring_queue_t(unsigned iodepth_, bool hipri_, bool sq_thread_)
{
  ceph_assert(0);
}

ioring_queue_t::~ioring_queue_t()
{
  ceph_assert(0);
}

int ioring_queue_t::init(std::vector<int> &fds)
{
  ceph_assert(0);
}

void ioring_queue_t::shutdown()
{
  ceph_assert(0);
}

int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
                                 uint16_t aios_size, void *priv,
                                 int *retries)
{
  ceph_assert(0);
}

int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
{
  ceph_assert(0);
}

bool ioring_queue_t::supported()
{
  return false;
}
+
+#endif // #if defined(HAVE_LIBURING)
diff --git a/src/blk/kernel/io_uring.h b/src/blk/kernel/io_uring.h
new file mode 100644
index 000000000..e7d0acde0
--- /dev/null
+++ b/src/blk/kernel/io_uring.h
@@ -0,0 +1,33 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "acconfig.h"
+
+#include "include/types.h"
+#include "aio/aio.h"
+
+struct ioring_data;
+
// io_uring-backed implementation of the io_queue_t interface. The opaque
// ioring_data holds the ring, locks and fd-registration state; see
// io_uring.cc for semantics of each entry point.
struct ioring_queue_t final : public io_queue_t {
  std::unique_ptr<ioring_data> d;
  unsigned iodepth = 0;     // SQ/CQ ring size requested at init()
  bool hipri = false;       // use IORING_SETUP_IOPOLL
  bool sq_thread = false;   // use IORING_SETUP_SQPOLL

  typedef std::list<aio_t>::iterator aio_iter;

  // Returns true if arch is x86-64 and kernel supports io_uring
  static bool supported();

  ioring_queue_t(unsigned iodepth_, bool hipri_, bool sq_thread_);
  ~ioring_queue_t() final;

  int init(std::vector<int> &fds) final;
  void shutdown() final;

  int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
                   void *priv, int *retries) final;
  int get_next_completed(int timeout_ms, aio_t **paio, int max) final;
};
diff --git a/src/blk/pmem/PMEMDevice.cc b/src/blk/pmem/PMEMDevice.cc
new file mode 100644
index 000000000..247ed0692
--- /dev/null
+++ b/src/blk/pmem/PMEMDevice.cc
@@ -0,0 +1,282 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Intel <jianpeng.ma@intel.com>
+ *
+ * Author: Jianpeng Ma <jianpeng.ma@intel.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "PMEMDevice.h"
+#include "libpmem.h"
+#include "include/types.h"
+#include "include/compat.h"
+#include "include/stringify.h"
+#include "common/errno.h"
+#include "common/debug.h"
+#include "common/blkdev.h"
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev-PMEM(" << path << ") "
+
// The device is unusable until open() maps the backing file; fd/addr are
// set there.
PMEMDevice::PMEMDevice(CephContext *cct, aio_callback_t cb, void *cbpriv)
  : BlockDevice(cct, cb, cbpriv),
    fd(-1), addr(0),
    injecting_crash(0)
{
}
+
+int PMEMDevice::_lock()
+{
+ struct flock l;
+ memset(&l, 0, sizeof(l));
+ l.l_type = F_WRLCK;
+ l.l_whence = SEEK_SET;
+ l.l_start = 0;
+ l.l_len = 0;
+ int r = ::fcntl(fd, F_SETLK, &l);
+ if (r < 0)
+ return -errno;
+ return 0;
+}
+
+int PMEMDevice::open(const string& p)
+{
+ path = p;
+ int r = 0;
+ dout(1) << __func__ << " path " << path << dendl;
+
+ fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC);
+ if (fd < 0) {
+ r = -errno;
+ derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ r = _lock();
+ if (r < 0) {
+ derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r)
+ << dendl;
+ goto out_fail;
+ }
+
+ struct stat st;
+ r = ::fstat(fd, &st);
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " fstat got " << cpp_strerror(r) << dendl;
+ goto out_fail;
+ }
+
+ size_t map_len;
+ addr = (char *)pmem_map_file(path.c_str(), 0, PMEM_FILE_EXCL, O_RDWR, &map_len, NULL);
+ if (addr == NULL) {
+ derr << __func__ << " pmem_map_file failed: " << pmem_errormsg() << dendl;
+ goto out_fail;
+ }
+ size = map_len;
+
+ // Operate as though the block size is 4 KB. The backing file
+ // blksize doesn't strictly matter except that some file systems may
+ // require a read/modify/write if we write something smaller than
+ // it.
+ block_size = g_conf()->bdev_block_size;
+ if (block_size != (unsigned)st.st_blksize) {
+ dout(1) << __func__ << " backing device/file reports st_blksize "
+ << st.st_blksize << ", using bdev_block_size "
+ << block_size << " anyway" << dendl;
+ }
+
+ dout(1) << __func__
+ << " size " << size
+ << " (" << byte_u_t(size) << ")"
+ << " block_size " << block_size
+ << " (" << byte_u_t(block_size) << ")"
+ << dendl;
+ return 0;
+
+ out_fail:
+ VOID_TEMP_FAILURE_RETRY(::close(fd));
+ fd = -1;
+ return r;
+}
+
// Unmap the pmem region and release the fd. Requires a successful open();
// the asserts enforce that.
void PMEMDevice::close()
{
  dout(1) << __func__ << dendl;

  ceph_assert(addr != NULL);
  pmem_unmap(addr, size);
  ceph_assert(fd >= 0);
  VOID_TEMP_FAILURE_RETRY(::close(fd));
  fd = -1;

  path.clear();
}
+
// Populate *pm with device description key/values, all prefixed with
// `prefix`. Block devices additionally report model/dev/serial via BlkDev;
// regular files report their path instead. Returns 0 or -errno from fstat.
int PMEMDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
{
  (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational);
  (*pm)[prefix + "size"] = stringify(get_size());
  (*pm)[prefix + "block_size"] = stringify(get_block_size());
  (*pm)[prefix + "driver"] = "PMEMDevice";
  (*pm)[prefix + "type"] = "ssd";

  struct stat st;
  int r = ::fstat(fd, &st);
  if (r < 0)
    return -errno;
  if (S_ISBLK(st.st_mode)) {
    (*pm)[prefix + "access_mode"] = "blk";
    char buffer[1024] = {0};
    BlkDev blkdev(fd);

    // Errors from these probes are ignored; the fields just stay empty.
    blkdev.model(buffer, sizeof(buffer));
    (*pm)[prefix + "model"] = buffer;

    buffer[0] = '\0';
    blkdev.dev(buffer, sizeof(buffer));
    (*pm)[prefix + "dev"] = buffer;

    // nvme exposes a serial number
    buffer[0] = '\0';
    blkdev.serial(buffer, sizeof(buffer));
    (*pm)[prefix + "serial"] = buffer;

  } else {
    (*pm)[prefix + "access_mode"] = "file";
    (*pm)[prefix + "path"] = path;
  }
  return 0;
}
+
+bool PMEMDevice::support(const std::string &path)
+{
+ int is_pmem = 0;
+ size_t map_len = 0;
+ void *addr = pmem_map_file(path.c_str(), 0, PMEM_FILE_EXCL, O_RDONLY, &map_len, &is_pmem);
+ if (addr != NULL) {
+ if (is_pmem) {
+ return true;
+ }
+ pmem_unmap(addr, map_len);
+ }
+ return false;
+}
+
// No-op: write() uses pmem_memcpy_persist(), so every completed write is
// already durable and there is nothing left to flush.
int PMEMDevice::flush()
{
  //Because all write is persist. So no need
  return 0;
}
+
+
// "Submit" queued aio. Since PMEM I/O completes synchronously in
// aio_write()/aio_read(), nothing is pending here: just fire the
// completion callback (if the context carries one) or wake a waiter.
void PMEMDevice::aio_submit(IOContext *ioc)
{
  if (ioc->priv) {
    ceph_assert(ioc->num_running == 0);
    aio_callback(aio_callback_priv, ioc->priv);
  } else {
    ioc->try_aio_wake();
  }
  return;
}
+
// Synchronously and durably copy bl into the mapped region at `off`.
// `buffered` and `write_hint` are accepted for interface compatibility but
// have no effect on a memory-mapped pmem device. Always returns 0.
int PMEMDevice::write(uint64_t off, bufferlist& bl, bool buffered, int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " " << off << "~" << len << dendl;
  ceph_assert(is_valid_io(off, len));

  dout(40) << "data: ";
  bl.hexdump(*_dout);
  *_dout << dendl;

  // Test hook: pretend the write was dropped by a crash.
  if (g_conf()->bdev_inject_crash &&
      rand() % g_conf()->bdev_inject_crash == 0) {
    derr << __func__ << " bdev_inject_crash: dropping io " << off << "~" << len
	 << dendl;
    ++injecting_crash;
    return 0;
  }

  // Walk the bufferlist segment by segment; pmem_memcpy_persist makes each
  // copy durable before returning.
  bufferlist::iterator p = bl.begin();
  uint64_t off1 = off;
  while (len) {
    const char *data;
    uint32_t l = p.get_ptr_and_advance(len, &data);
    pmem_memcpy_persist(addr + off1, data, l);
    len -= l;
    off1 += l;
  }
  return 0;
}
+
+int PMEMDevice::aio_write(
+ uint64_t off,
+ bufferlist &bl,
+ IOContext *ioc,
+ bool buffered,
+ int write_hint)
+{
+ return write(off, bl, buffered);
+}
+
+
// Copy `len` bytes at `off` from the mapped region into a freshly
// allocated page-aligned buffer appended to *pbl. `ioc`/`buffered` are
// unused on pmem. Always returns 0.
int PMEMDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
		     IOContext *ioc,
		     bool buffered)
{
  dout(5) << __func__ << " " << off << "~" << len << dendl;
  ceph_assert(is_valid_io(off, len));

  bufferptr p = buffer::create_small_page_aligned(len);
  memcpy(p.c_str(), addr + off, len);

  pbl->clear();
  pbl->push_back(std::move(p));

  dout(40) << "data: ";
  pbl->hexdump(*_dout);
  *_dout << dendl;

  return 0;
}
+
// "Async" read: completes synchronously via read(); buffered mode is
// meaningless for a memory-mapped device, so pass false.
int PMEMDevice::aio_read(uint64_t off, uint64_t len, bufferlist *pbl,
			 IOContext *ioc)
{
  return read(off, len, pbl, ioc, false);
}
+
// Read directly into a caller-supplied buffer; no alignment requirements
// beyond is_valid_io(). Always returns 0.
int PMEMDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
{
  dout(5) << __func__ << " " << off << "~" << len << dendl;
  ceph_assert(is_valid_io(off, len));

  memcpy(buf, addr + off, len);
  return 0;
}


// No-op: there is no page cache to invalidate for a pmem mapping.
int PMEMDevice::invalidate_cache(uint64_t off, uint64_t len)
{
  dout(5) << __func__ << " " << off << "~" << len << dendl;
  return 0;
}
+
+
diff --git a/src/blk/pmem/PMEMDevice.h b/src/blk/pmem/PMEMDevice.h
new file mode 100644
index 000000000..a240d2a7b
--- /dev/null
+++ b/src/blk/pmem/PMEMDevice.h
@@ -0,0 +1,75 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Intel <jianpeng.ma@intel.com>
+ *
+ * Author: Jianpeng Ma <jianpeng.ma@intel.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_BLK_PMEMDEVICE_H
+#define CEPH_BLK_PMEMDEVICE_H
+
+#include <atomic>
+
+#include "os/fs/FS.h"
+#include "include/interval_set.h"
+#include "aio/aio.h"
+#include "BlockDevice.h"
+
+class PMEMDevice : public BlockDevice {
+ int fd;
+ char *addr; //the address of mmap
+ std::string path;
+
+ ceph::mutex debug_lock = ceph::make_mutex("PMEMDevice::debug_lock");
+ interval_set<uint64_t> debug_inflight;
+
+ std::atomic_int injecting_crash;
+ int _lock();
+
+public:
+ PMEMDevice(CephContext *cct, aio_callback_t cb, void *cbpriv);
+
+
+ void aio_submit(IOContext *ioc) override;
+
+ int collect_metadata(const std::string& prefix, map<std::string,std::string> *pm) const override;
+
+ static bool support(const std::string& path);
+
+ int read(uint64_t off, uint64_t len, bufferlist *pbl,
+ IOContext *ioc,
+ bool buffered) override;
+ int aio_read(uint64_t off, uint64_t len, bufferlist *pbl,
+ IOContext *ioc) override;
+
+ int read_random(uint64_t off, uint64_t len, char *buf, bool buffered) override;
+ int write(uint64_t off, bufferlist& bl, bool buffered, int write_hint = WRITE_LIFE_NOT_SET) override;
+ int aio_write(uint64_t off, bufferlist& bl,
+ IOContext *ioc,
+ bool buffered,
+ int write_hint = WRITE_LIFE_NOT_SET) override;
+ int flush() override;
+
+ // for managing buffered readers/writers
+ int invalidate_cache(uint64_t off, uint64_t len) override;
+ int open(const std::string &path) override;
+ void close() override;
+
+private:
+ bool is_valid_io(uint64_t off, uint64_t len) const {
+ return (len > 0 &&
+ off < size &&
+ off + len <= size);
+ }
+};
+
+#endif
diff --git a/src/blk/spdk/NVMEDevice.cc b/src/blk/spdk/NVMEDevice.cc
new file mode 100644
index 000000000..935d3bbd1
--- /dev/null
+++ b/src/blk/spdk/NVMEDevice.cc
@@ -0,0 +1,971 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+//
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <strings.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+#include <chrono>
+#include <fstream>
+#include <functional>
+#include <map>
+#include <thread>
+#include <boost/intrusive/slist.hpp>
+
+#include <spdk/nvme.h>
+
+#include "include/intarith.h"
+#include "include/stringify.h"
+#include "include/types.h"
+#include "include/compat.h"
+#include "common/errno.h"
+#include "common/debug.h"
+#include "common/perf_counters.h"
+
+#include "NVMEDevice.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev(" << sn << ") "
+
+static constexpr uint16_t data_buffer_default_num = 1024;
+
+static constexpr uint32_t data_buffer_size = 8192;
+
+static constexpr uint16_t inline_segment_num = 32;
+
+static void io_complete(void *t, const struct spdk_nvme_cpl *completion);
+
+struct IORequest {
+ uint16_t cur_seg_idx = 0;
+ uint16_t nseg;
+ uint32_t cur_seg_left = 0;
+ void *inline_segs[inline_segment_num];
+ void **extra_segs = nullptr;
+};
+
+namespace bi = boost::intrusive;
+struct data_cache_buf : public bi::slist_base_hook<bi::link_mode<bi::normal_link>>
+{};
+
+struct Task;
+
// One attached NVMe controller/namespace shared by every NVMEDevice opened
// against it. Geometry (block_size/size) is read from the namespace once at
// construction.
class SharedDriverData {
  unsigned id;
  spdk_nvme_transport_id trid;
  spdk_nvme_ctrlr *ctrlr;
  spdk_nvme_ns *ns;
  uint32_t block_size = 0;
  uint64_t size = 0;

  public:
  std::vector<NVMEDevice*> registered_devices;
  friend class SharedDriverQueueData;
  SharedDriverData(unsigned id_, const spdk_nvme_transport_id& trid_,
                   spdk_nvme_ctrlr *c, spdk_nvme_ns *ns_)
      : id(id_),
        trid(trid_),
        ctrlr(c),
        ns(ns_) {
    block_size = spdk_nvme_ns_get_extended_sector_size(ns);
    size = spdk_nvme_ns_get_size(ns);
  }

  // True if this driver instance is bound to the same transport address.
  bool is_equal(const spdk_nvme_transport_id& trid2) const {
    return spdk_nvme_transport_id_compare(&trid, &trid2) == 0;
  }
  ~SharedDriverData() {
  }

  void register_device(NVMEDevice *device) {
    registered_devices.push_back(device);
  }

  // Remove by rebuilding the vector without `device`.
  void remove_device(NVMEDevice *device) {
    std::vector<NVMEDevice*> new_devices;
    for (auto &&it : registered_devices) {
      if (it != device)
        new_devices.push_back(it);
    }
    registered_devices.swap(new_devices);
  }

  uint32_t get_block_size() {
    return block_size;
  }
  uint64_t get_size() {
    return size;
  }
};
+
// Per-thread I/O queue on a shared controller: owns an SPDK qpair plus a
// pool of DMA-able data buffers that I/O is staged through. The first
// queue created for a device also takes on reaping finished IOContexts.
class SharedDriverQueueData {
  NVMEDevice *bdev;
  SharedDriverData *driver;
  spdk_nvme_ctrlr *ctrlr;
  spdk_nvme_ns *ns;
  std::string sn;
  uint32_t block_size;
  uint32_t max_queue_depth;
  struct spdk_nvme_qpair *qpair;
  bool reap_io = false;    // set on the first queue per device
  int alloc_buf_from_pool(Task *t, bool write);

  public:
  uint32_t current_queue_depth = 0;
  std::atomic_ulong completed_op_seq, queue_op_seq;
  // Free-list of data_buffer_size DMA buffers; intrusive to avoid allocs.
  bi::slist<data_cache_buf, bi::constant_time_size<true>> data_buf_list;
  void _aio_handle(Task *t, IOContext *ioc);

  SharedDriverQueueData(NVMEDevice *bdev, SharedDriverData *driver)
    : bdev(bdev),
      driver(driver) {
    ctrlr = driver->ctrlr;
    ns = driver->ns;
    block_size = driver->block_size;

    struct spdk_nvme_io_qpair_opts opts = {};
    spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
    opts.qprio = SPDK_NVME_QPRIO_URGENT;
    // usable queue depth should minus 1 to aovid overflow.
    max_queue_depth = opts.io_queue_size - 1;
    qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts));
    ceph_assert(qpair != NULL);

    // allocate spdk dma memory
    for (uint16_t i = 0; i < data_buffer_default_num; i++) {
      void *b = spdk_dma_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
      if (!b) {
        derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
        ceph_assert(b);
      }
      data_buf_list.push_front(*reinterpret_cast<data_cache_buf *>(b));
    }

    bdev->queue_number++;
    if (bdev->queue_number.load() == 1)
      reap_io = true;
  }

  ~SharedDriverQueueData() {
    if (qpair) {
      spdk_nvme_ctrlr_free_io_qpair(qpair);
      bdev->queue_number--;
    }

    // Return all DMA buffers to SPDK.
    data_buf_list.clear_and_dispose(spdk_dma_free);
  }
};
+
+struct Task {
+ NVMEDevice *device;
+ IOContext *ctx = nullptr;
+ IOCommand command;
+ uint64_t offset;
+ uint64_t len;
+ bufferlist bl;
+ std::function<void()> fill_cb;
+ Task *next = nullptr;
+ int64_t return_code;
+ Task *primary = nullptr;
+ ceph::coarse_real_clock::time_point start;
+ IORequest io_request = {};
+ ceph::mutex lock = ceph::make_mutex("Task::lock");
+ ceph::condition_variable cond;
+ SharedDriverQueueData *queue = nullptr;
+ // reference count by subtasks.
+ int ref = 0;
+ Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0,
+ Task *p = nullptr)
+ : device(dev), command(c), offset(off), len(l),
+ return_code(rc), primary(p),
+ start(ceph::coarse_real_clock::now()) {
+ if (primary) {
+ primary->ref++;
+ return_code = primary->return_code;
+ }
+ }
+ ~Task() {
+ if (primary)
+ primary->ref--;
+ ceph_assert(!io_request.nseg);
+ }
+ void release_segs(SharedDriverQueueData *queue_data) {
+ if (io_request.extra_segs) {
+ for (uint16_t i = 0; i < io_request.nseg; i++) {
+ auto buf = reinterpret_cast<data_cache_buf *>(io_request.extra_segs[i]);
+ queue_data->data_buf_list.push_front(*buf);
+ }
+ delete io_request.extra_segs;
+ } else if (io_request.nseg) {
+ for (uint16_t i = 0; i < io_request.nseg; i++) {
+ auto buf = reinterpret_cast<data_cache_buf *>(io_request.inline_segs[i]);
+ queue_data->data_buf_list.push_front(*buf);
+ }
+ }
+ ctx->total_nseg -= io_request.nseg;
+ io_request.nseg = 0;
+ }
+
+ void copy_to_buf(char *buf, uint64_t off, uint64_t len) {
+ uint64_t copied = 0;
+ uint64_t left = len;
+ void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs;
+ uint16_t i = 0;
+ while (left > 0) {
+ char *src = static_cast<char*>(segs[i++]);
+ uint64_t need_copy = std::min(left, data_buffer_size-off);
+ memcpy(buf+copied, src+off, need_copy);
+ off = 0;
+ left -= need_copy;
+ copied += need_copy;
+ }
+ }
+};
+
// SPDK SGL reset callback: position the task's segment cursor at the
// segment containing byte `sgl_offset` of the transfer, and record how many
// bytes of that segment remain from sgl_offset onward (cur_seg_left).
static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
{
  Task *t = static_cast<Task*>(cb_arg);
  uint32_t i = sgl_offset / data_buffer_size;
  uint32_t offset = i * data_buffer_size;
  ceph_assert(i <= t->io_request.nseg);

  // Advance to the end-offset of the segment that covers sgl_offset,
  // clamping the final segment to the transfer length.
  for (; i < t->io_request.nseg; i++) {
    offset += data_buffer_size;
    if (offset > sgl_offset) {
      if (offset > t->len)
        offset = t->len;
      break;
    }
  }

  t->io_request.cur_seg_idx = i;
  t->io_request.cur_seg_left = offset - sgl_offset;
  return ;
}
+
// SPDK SGL iterator callback: emit the next scatter-gather element for the
// task. Honors a partial first element left over from data_buf_reset_sgl()
// (cur_seg_left) and trims the final segment to the transfer tail.
static int data_buf_next_sge(void *cb_arg, void **address, uint32_t *length)
{
  uint32_t size;
  void *addr;
  Task *t = static_cast<Task*>(cb_arg);
  // Past the last segment: report an empty element.
  if (t->io_request.cur_seg_idx >= t->io_request.nseg) {
    *length = 0;
    *address = 0;
    return 0;
  }

  addr = t->io_request.extra_segs ? t->io_request.extra_segs[t->io_request.cur_seg_idx] : t->io_request.inline_segs[t->io_request.cur_seg_idx];

  size = data_buffer_size;
  // Last segment only carries the remainder of the transfer.
  if (t->io_request.cur_seg_idx == t->io_request.nseg - 1) {
    uint64_t tail = t->len % data_buffer_size;
    if (tail) {
      size = (uint32_t) tail;
    }
  }

  if (t->io_request.cur_seg_left) {
    // Resume mid-segment: emit only the unconsumed suffix.
    *address = (void *)((uint64_t)addr + size - t->io_request.cur_seg_left);
    *length = t->io_request.cur_seg_left;
    t->io_request.cur_seg_left = 0;
  } else {
    *address = addr;
    *length = size;
  }

  t->io_request.cur_seg_idx++;
  return 0;
}
+
// Reserve ceil(t->len / data_buffer_size) DMA buffers from the free-list
// for task t, switching to a heap-allocated pointer array when the count
// exceeds the inline capacity. For writes, also copies the payload into the
// buffers. Returns 0 or -ENOMEM when the pool cannot cover the request
// (caller reaps completions and retries).
int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write)
{
  uint64_t count = t->len / data_buffer_size;
  if (t->len % data_buffer_size)
    ++count;
  void **segs;
  if (count > data_buf_list.size())
    return -ENOMEM;
  if (count <= inline_segment_num) {
    segs = t->io_request.inline_segs;
  } else {
    t->io_request.extra_segs = new void*[count];
    segs = t->io_request.extra_segs;
  }
  for (uint16_t i = 0; i < count; i++) {
    ceph_assert(!data_buf_list.empty());
    segs[i] = &data_buf_list.front();
    ceph_assert(segs[i] != nullptr);
    data_buf_list.pop_front();
  }
  // NOTE(review): nseg is uint16_t but count is uint64_t; a transfer larger
  // than 65535 * data_buffer_size would truncate here - confirm upper
  // layers bound the I/O size.
  t->io_request.nseg = count;
  t->ctx->total_nseg += count;
  if (write) {
    // Stage the write payload: full buffers for all but the last segment,
    // then whatever remains of the bufferlist.
    auto blp = t->bl.begin();
    uint32_t len = 0;
    uint16_t i = 0;
    for (; i < count - 1; ++i) {
      blp.copy(data_buffer_size, static_cast<char*>(segs[i]));
      len += data_buffer_size;
    }
    blp.copy(t->bl.length() - len, static_cast<char*>(segs[i]));
  }

  return 0;
}
+
// Drive the task chain `t` to completion on this qpair: submit each task as
// a writev/readv/flush NVMe command and poll for completions until the
// IOContext has no running I/O left. When submission resources run out
// (queue depth or DMA buffers), jumps back to `again` to reap completions
// first. Any submission error aborts the process.
void SharedDriverQueueData::_aio_handle(Task *t, IOContext *ioc)
{
  dout(20) << __func__ << " start" << dendl;

  int r = 0;
  uint64_t lba_off, lba_count;
  uint32_t max_io_completion = (uint32_t)g_conf().get_val<uint64_t>("bluestore_spdk_max_io_completion");
  uint64_t io_sleep_in_us = g_conf().get_val<uint64_t>("bluestore_spdk_io_sleep");

  while (ioc->num_running) {
 again:
    dout(40) << __func__ << " polling" << dendl;
    if (current_queue_depth) {
      r = spdk_nvme_qpair_process_completions(qpair, max_io_completion);
      if (r < 0) {
        ceph_abort();
      } else if (r == 0) {
        // Nothing completed; back off briefly before polling again.
        usleep(io_sleep_in_us);
      }
    }

    for (; t; t = t->next) {
      if (current_queue_depth == max_queue_depth) {
        // no slots
        goto again;
      }

      t->queue = this;
      // Convert the byte-addressed request into LBAs; offsets/lengths are
      // assumed block-aligned by the caller.
      lba_off = t->offset / block_size;
      lba_count = t->len / block_size;
      switch (t->command) {
      case IOCommand::WRITE_COMMAND:
      {
        dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl;
        r = alloc_buf_from_pool(t, true);
        if (r < 0) {
          // Out of DMA buffers: reap completions to free some, then retry.
          goto again;
        }

        r = spdk_nvme_ns_cmd_writev(
          ns, qpair, lba_off, lba_count, io_complete, t, 0,
          data_buf_reset_sgl, data_buf_next_sge);
        if (r < 0) {
          derr << __func__ << " failed to do write command: " << cpp_strerror(r) << dendl;
          t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
          t->release_segs(this);
          delete t;
          ceph_abort();
        }
        break;
      }
      case IOCommand::READ_COMMAND:
      {
        dout(20) << __func__ << " read command issued " << lba_off << "~" << lba_count << dendl;
        r = alloc_buf_from_pool(t, false);
        if (r < 0) {
          goto again;
        }

        r = spdk_nvme_ns_cmd_readv(
          ns, qpair, lba_off, lba_count, io_complete, t, 0,
          data_buf_reset_sgl, data_buf_next_sge);
        if (r < 0) {
          derr << __func__ << " failed to read: " << cpp_strerror(r) << dendl;
          t->release_segs(this);
          delete t;
          ceph_abort();
        }
        break;
      }
      case IOCommand::FLUSH_COMMAND:
      {
        dout(20) << __func__ << " flush command issueed " << dendl;
        r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t);
        if (r < 0) {
          derr << __func__ << " failed to flush: " << cpp_strerror(r) << dendl;
          t->release_segs(this);
          delete t;
          ceph_abort();
        }
        break;
      }
      }
      current_queue_depth++;
    }
  }

  // Only the designated queue for this device reaps finished IOContexts.
  if (reap_io)
    bdev->reap_ioc();
  dout(20) << __func__ << " end" << dendl;
}
+
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev "
+
// Process-wide registry of attached SPDK controllers. Probing must happen
// on the dedicated dpdk_thread (created lazily in try_get()), so requests
// are handed over via probe_queue and answered through ProbeContext.
class NVMEManager {
 public:
  // One pending probe request: filled in by the dpdk thread, which sets
  // `driver` on success and flips `done` when finished.
  struct ProbeContext {
    spdk_nvme_transport_id trid;
    NVMEManager *manager;
    SharedDriverData *driver;
    bool done;
  };

 private:
  ceph::mutex lock = ceph::make_mutex("NVMEManager::lock");
  bool stopping = false;
  std::vector<SharedDriverData*> shared_driver_datas;
  std::thread dpdk_thread;
  ceph::mutex probe_queue_lock = ceph::make_mutex("NVMEManager::probe_queue_lock");
  ceph::condition_variable probe_queue_cond;
  std::list<ProbeContext*> probe_queue;

 public:
  NVMEManager() {}
  ~NVMEManager() {
    if (!dpdk_thread.joinable())
      return;
    // Wake the probe loop so it can observe `stopping` and exit.
    {
      std::lock_guard guard(probe_queue_lock);
      stopping = true;
      probe_queue_cond.notify_all();
    }
    dpdk_thread.join();
  }

  int try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver);
  // Called from attach_cb on the dpdk thread once a controller is attached;
  // wraps its first namespace in a SharedDriverData.
  void register_ctrlr(const spdk_nvme_transport_id& trid, spdk_nvme_ctrlr *c, SharedDriverData **driver) {
    ceph_assert(ceph_mutex_is_locked(lock));
    spdk_nvme_ns *ns;
    int num_ns = spdk_nvme_ctrlr_get_num_ns(c);
    ceph_assert(num_ns >= 1);
    if (num_ns > 1) {
      dout(0) << __func__ << " namespace count larger than 1, currently only use the first namespace" << dendl;
    }
    // SPDK namespace ids are 1-based.
    ns = spdk_nvme_ctrlr_get_ns(c, 1);
    if (!ns) {
      derr << __func__ << " failed to get namespace at 1" << dendl;
      ceph_abort();
    }
    dout(1) << __func__ << " successfully attach nvme device at" << trid.traddr << dendl;

    // only support one device per osd now!
    ceph_assert(shared_driver_datas.empty());
    // index 0 is occurred by master thread
    shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, trid, c, ns));
    *driver = shared_driver_datas.back();
  }
};
+
+static NVMEManager manager;
+
// SPDK probe filter: accept only the local PCIe controller whose transport
// id matches the one we are looking for. Returning true asks SPDK to
// attach the controller (attach_cb will then fire).
static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts)
{
  NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);

  if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
    dout(0) << __func__ << " only probe local nvme device" << dendl;
    return false;
  }

  dout(0) << __func__ << " found device at: "
	  << "trtype=" << spdk_nvme_transport_id_trtype_str(trid->trtype) << ", "
          << "traddr=" << trid->traddr << dendl;
  if (spdk_nvme_transport_id_compare(&ctx->trid, trid)) {
    dout(0) << __func__ << " device traddr (" << ctx->trid.traddr << ") not match " << trid->traddr << dendl;
    return false;
  }

  // Request the largest possible I/O queue; the controller will clamp it.
  opts->io_queue_size = UINT16_MAX;

  return true;
}
+
// SPDK attach callback: hand the newly attached controller to the manager,
// which records the resulting SharedDriverData in the probe context.
static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
                      struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
{
  auto ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
  ctx->manager->register_ctrlr(ctx->trid, ctrlr, &ctx->driver);
}
+
// Convert a single hexadecimal digit to its numeric value. The caller is
// expected to have validated the character with isxdigit() first.
static int hex2dec(unsigned char c)
{
  if (isdigit(c))
    return c - '0';
  return isupper(c) ? (c - 'A' + 10) : (c - 'a' + 10);
}
+
// Return the 1-based position of the least-significant set bit in the hex
// string s (an optional 0x/0X prefix is skipped), 0 when no bit is set or
// the string is empty, and -EINVAL on a non-hex character.
static int find_first_bitset(const std::string& s)
{
  auto stop = s.rend();
  if (s.compare(0, 2, "0x") == 0 ||
      s.compare(0, 2, "0X") == 0) {
    advance(stop, -2);   // exclude the two prefix characters
  }
  int bit_base = 0;
  for (auto digit = s.rbegin(); digit != stop; ++digit, bit_base += 4) {
    if (!isxdigit(*digit)) {
      return -EINVAL;
    }
    // Hex digit -> value (helper inlined; caller-validated as xdigit).
    int val;
    if (isdigit(*digit))
      val = *digit - '0';
    else if (isupper(*digit))
      val = *digit - 'A' + 10;
    else
      val = *digit - 'a' + 10;
    if (val != 0) {
      return bit_base + ffs(val);  // ffs() is 1-based
    }
  }
  return 0;
}
+
// Return (via *driver) the SharedDriverData for the controller at `trid`,
// probing and attaching it on the dedicated dpdk thread if this is the
// first request. Returns 0 on success, -ENOENT on bad address/coremask,
// -1 if the probe found no matching device.
int NVMEManager::try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver)
{
  std::lock_guard l(lock);
  // Fast path: controller already attached.
  for (auto &&it : shared_driver_datas) {
    if (it->is_equal(trid)) {
      *driver = it;
      return 0;
    }
  }

  struct spdk_pci_addr pci_addr;
  int rc = spdk_pci_addr_parse(&pci_addr, trid.traddr);
  if (rc < 0) {
    derr << __func__ << " invalid transport address: " << trid.traddr << dendl;
    return -ENOENT;
  }
  auto coremask_arg = g_conf().get_val<std::string>("bluestore_spdk_coremask");
  int m_core_arg = find_first_bitset(coremask_arg);
  // at least one core is needed for using spdk
  if (m_core_arg <= 0) {
    derr << __func__ << " invalid bluestore_spdk_coremask, "
	 << "at least one core is needed" << dendl;
    return -ENOENT;
  }
  // find_first_bitset() is 1-based; SPDK wants a 0-based core index.
  m_core_arg -= 1;

  uint32_t mem_size_arg = (uint32_t)g_conf().get_val<Option::size_t>("bluestore_spdk_mem");

  // Lazily start the thread that owns the SPDK/DPDK environment and
  // services probe requests until shutdown.
  if (!dpdk_thread.joinable()) {
    dpdk_thread = std::thread(
      [this, coremask_arg, m_core_arg, mem_size_arg, pci_addr]() {
        struct spdk_env_opts opts;
        struct spdk_pci_addr addr = pci_addr;
        int r;

        spdk_env_opts_init(&opts);
        opts.name = "nvme-device-manager";
        opts.core_mask = coremask_arg.c_str();
        opts.master_core = m_core_arg;
        opts.mem_size = mem_size_arg;
        opts.pci_whitelist = &addr;
        opts.num_pci_addr = 1;
        spdk_env_init(&opts);
        spdk_unaffinitize_thread();

        // Serve probe requests until asked to stop.
        std::unique_lock l(probe_queue_lock);
        while (!stopping) {
          if (!probe_queue.empty()) {
            ProbeContext* ctxt = probe_queue.front();
            probe_queue.pop_front();
            r = spdk_nvme_probe(NULL, ctxt, probe_cb, attach_cb, NULL);
            if (r < 0) {
              ceph_assert(!ctxt->driver);
              derr << __func__ << " device probe nvme failed" << dendl;
            }
            ctxt->done = true;
            probe_queue_cond.notify_all();
          } else {
            probe_queue_cond.wait(l);
          }
        }
        // Drain: mark any queued requests done so waiters can return.
        for (auto p : probe_queue)
          p->done = true;
        probe_queue_cond.notify_all();
      }
    );
  }

  // Queue our request and wait for the dpdk thread to answer it.
  ProbeContext ctx{trid, this, nullptr, false};
  {
    std::unique_lock l(probe_queue_lock);
    probe_queue.push_back(&ctx);
    while (!ctx.done)
      probe_queue_cond.wait(l);
  }
  if (!ctx.driver)
    return -1;
  *driver = ctx.driver;

  return 0;
}
+
// SPDK completion callback for all three command types. Frees the queue
// slot, returns DMA buffers, signals the IOContext (callback or waiter),
// and disposes of the Task. For split reads the sub-task propagates success
// to its primary; synchronous reads/flushes instead flag completion via
// return_code for the submitting thread to observe.
void io_complete(void *t, const struct spdk_nvme_cpl *completion)
{
  Task *task = static_cast<Task*>(t);
  IOContext *ctx = task->ctx;
  SharedDriverQueueData *queue = task->queue;

  ceph_assert(queue != NULL);
  ceph_assert(ctx != NULL);
  --queue->current_queue_depth;
  if (task->command == IOCommand::WRITE_COMMAND) {
    ceph_assert(!spdk_nvme_cpl_is_error(completion));
    dout(20) << __func__ << " write/zero op successfully, left "
	     << queue->queue_op_seq - queue->completed_op_seq << dendl;
    // check waiting count before doing callback (which may
    // destroy this ioc).
    if (ctx->priv) {
      if (!--ctx->num_running) {
        task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
      }
    } else {
      ctx->try_aio_wake();
    }
    task->release_segs(queue);
    delete task;
  } else if (task->command == IOCommand::READ_COMMAND) {
    ceph_assert(!spdk_nvme_cpl_is_error(completion));
    dout(20) << __func__ << " read op successfully" << dendl;
    // Copy the data out of the DMA buffers before releasing them.
    task->fill_cb();
    task->release_segs(queue);
    // read submitted by AIO
    if (!task->return_code) {
      if (ctx->priv) {
        if (!--ctx->num_running) {
          task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
        }
      } else {
        ctx->try_aio_wake();
      }
      delete task;
    } else {
      // Synchronous read: signal completion through return_code; for a
      // split read the primary task completes when its last subtask does.
      if (Task* primary = task->primary; primary != nullptr) {
        delete task;
        if (!primary->ref)
          primary->return_code = 0;
      } else {
	task->return_code = 0;
      }
      --ctx->num_running;
    }
  } else {
    ceph_assert(task->command == IOCommand::FLUSH_COMMAND);
    ceph_assert(!spdk_nvme_cpl_is_error(completion));
    dout(20) << __func__ << " flush op successfully" << dendl;
    task->return_code = 0;
  }
}
+
+// ----------------
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev(" << name << ") "
+
+// Construct an unopened SPDK-backed device; `driver` stays null until open()
+// resolves the transport id and attaches the controller.
+NVMEDevice::NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
+  : BlockDevice(cct, cb, cbpriv),
+    driver(nullptr)
+{
+}
+
+// An NVMe/SPDK device is selected by pointing a symlink at a file whose
+// basename starts with SPDK_PREFIX; report whether `path` is such a link.
+bool NVMEDevice::support(const std::string& path)
+{
+  char target[PATH_MAX + 1];
+  const int n = ::readlink(path.c_str(), target, sizeof(target) - 1);
+  if (n < 0) {
+    // not a symlink (or unreadable): not an SPDK device
+    return false;
+  }
+  target[n] = '\0';
+  char *base = ::basename(target);
+  return strncmp(base, SPDK_PREFIX, sizeof(SPDK_PREFIX) - 1) == 0;
+}
+
+// Open an SPDK "device": `p` is a regular file whose first line holds an
+// SPDK transport id string (e.g. "trtype:PCIe traddr:0000:02:00.0").
+// Attaches the controller via the global manager and caches its geometry.
+int NVMEDevice::open(const string& p)
+{
+  dout(1) << __func__ << " path " << p << dendl;
+
+  std::ifstream ifs(p);
+  if (!ifs) {
+    derr << __func__ << " unable to open " << p << dendl;
+    return -1;
+  }
+  string val;
+  std::getline(ifs, val);
+  spdk_nvme_transport_id trid;
+  if (int r = spdk_nvme_transport_id_parse(&trid, val.c_str()); r) {
+    derr << __func__ << " unable to read " << p << ": " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+  // may spawn the shared DPDK thread and probe the controller on first use
+  if (int r = manager.try_get(trid, &driver); r < 0) {
+    derr << __func__ << " failed to get nvme device with transport address " << trid.traddr << dendl;
+    return r;
+  }
+
+  driver->register_device(this);
+  block_size = driver->get_block_size();
+  size = driver->get_size();
+  name = trid.traddr;
+
+  //nvme is non-rotational device.
+  rotational = false;
+
+  // round size down to an even block
+  size &= ~(block_size - 1);
+
+  dout(1) << __func__ << " size " << size << " (" << byte_u_t(size) << ")"
+          << " block_size " << block_size << " (" << byte_u_t(block_size)
+          << ")" << dendl;
+
+
+  return 0;
+}
+
+// Detach from the shared driver.  NOTE(review): assumes open() succeeded
+// (driver non-null) — close() is only called on an opened device.
+void NVMEDevice::close()
+{
+  dout(1) << __func__ << dendl;
+
+  name.clear();
+  driver->remove_device(this);
+
+  dout(1) << __func__ << " end" << dendl;
+}
+
+// Report device identity/geometry key-value pairs under `prefix`.
+int NVMEDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
+{
+  (*pm)[prefix + "rotational"] = "0";
+  (*pm)[prefix + "size"] = stringify(get_size());
+  (*pm)[prefix + "block_size"] = stringify(get_block_size());
+  (*pm)[prefix + "driver"] = "NVMEDevice";
+  (*pm)[prefix + "type"] = "nvme";
+  (*pm)[prefix + "access_mode"] = "spdk";
+  // NOTE(review): `name` holds the transport address (set in open()), not the
+  // controller serial number — key name is misleading; confirm consumers.
+  (*pm)[prefix + "nvme_serial_number"] = name;
+
+  return 0;
+}
+
+// No-op: write completion is only reported by io_complete() once SPDK signals
+// the command done, so there is nothing buffered here to flush — presumably
+// the device's volatile cache behavior is handled at the queue level; confirm.
+int NVMEDevice::flush()
+{
+  return 0;
+}
+
+// Submit all tasks queued on `ioc` (by write_split/make_read_tasks) to the
+// per-thread SPDK queue.  Only the submitting thread may drain num_pending.
+void NVMEDevice::aio_submit(IOContext *ioc)
+{
+  dout(20) << __func__ << " ioc " << ioc << " pending "
+           << ioc->num_pending.load() << " running "
+           << ioc->num_running.load() << dendl;
+  int pending = ioc->num_pending.load();
+  Task *t = static_cast<Task*>(ioc->nvme_task_first);
+  if (pending && t) {
+    ioc->num_running += pending;
+    ioc->num_pending -= pending;
+    ceph_assert(ioc->num_pending.load() == 0); // we should be only thread doing this
+    // Only need to push the first entry
+    ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
+
+    // one SPDK queue pair per submitting thread; constructed lazily on the
+    // first submission from this thread and reused thereafter
+    thread_local SharedDriverQueueData queue_t = SharedDriverQueueData(this, driver);
+    queue_t._aio_handle(t, ioc);
+  }
+}
+
+// Append `t` to the singly-linked pending-task list hanging off `ioc` and
+// bump the pending counter.  Caller serializes access to the list.
+static void ioc_append_task(IOContext *ioc, Task *t)
+{
+  Task *head = static_cast<Task*>(ioc->nvme_task_first);
+  Task *tail = static_cast<Task*>(ioc->nvme_task_last);
+
+  if (tail) {
+    tail->next = t;
+  }
+  if (head == nullptr) {
+    ioc->nvme_task_first = t;
+  }
+  ioc->nvme_task_last = t;
+  ++ioc->num_pending;
+}
+
+// Split a write into 128KB-sized Tasks queued on `ioc`.  NOTE: consumes `bl`
+// via splice(), so the caller's bufferlist is empty on return.
+static void write_split(
+    NVMEDevice *dev,
+    uint64_t off,
+    bufferlist &bl,
+    IOContext *ioc)
+{
+  uint64_t remain_len = bl.length(), begin = 0, write_size;
+  Task *t;
+  // This value may need to be got from configuration later.
+  uint64_t split_size = 131072; // 128KB.
+
+  while (remain_len > 0) {
+    write_size = std::min(remain_len, split_size);
+    t = new Task(dev, IOCommand::WRITE_COMMAND, off + begin, write_size);
+    // TODO: if upper layer alloc memory with known physical address,
+    // we can reduce this copy
+    bl.splice(0, write_size, &t->bl);
+    remain_len -= write_size;
+    t->ctx = ioc;
+    ioc_append_task(ioc, t);
+    begin += write_size;
+  }
+}
+
+// Queue READ tasks on `ioc` covering [aligned_off, aligned_off+aligned_len),
+// splitting at 128KB.  Each task's fill_cb copies only the caller-visible
+// sub-range [orig_off, orig_off+orig_len) into `buf`.  When the whole read
+// fits in a single split and a caller-owned `primary` task is given, that
+// task is used directly (sync-read path); otherwise heap tasks are created
+// with `primary` as their parent for completion accounting in io_complete().
+static void make_read_tasks(
+    NVMEDevice *dev,
+    uint64_t aligned_off,
+    IOContext *ioc, char *buf, uint64_t aligned_len, Task *primary,
+    uint64_t orig_off, uint64_t orig_len)
+{
+  // This value may need to be got from configuration later.
+  uint64_t split_size = 131072; // 128KB.
+  // tmp_off: offset of the requested data within the first aligned chunk
+  uint64_t tmp_off = orig_off - aligned_off, remain_orig_len = orig_len;
+  auto begin = aligned_off;
+  const auto aligned_end = begin + aligned_len;
+
+  for (; begin < aligned_end; begin += split_size) {
+    auto read_size = std::min(aligned_end - begin, split_size);
+    auto tmp_len = std::min(remain_orig_len, read_size - tmp_off);
+    Task *t = nullptr;
+
+    if (primary && (aligned_len <= split_size)) {
+      t = primary;
+    } else {
+      t = new Task(dev, IOCommand::READ_COMMAND, begin, read_size, 0, primary);
+    }
+
+    t->ctx = ioc;
+
+    // TODO: if upper layer alloc memory with known physical address,
+    // we can reduce this copy
+    t->fill_cb = [buf, t, tmp_off, tmp_len] {
+      t->copy_to_buf(buf, tmp_off, tmp_len);
+    };
+
+    ioc_append_task(ioc, t);
+    remain_orig_len -= tmp_len;
+    buf += tmp_len;
+    tmp_off = 0;  // only the first chunk can start mid-block
+  }
+}
+
+// Queue an asynchronous write on `ioc`; actual submission happens in
+// aio_submit().  `buffered` and `write_hint` are ignored on the SPDK path.
+int NVMEDevice::aio_write(
+    uint64_t off,
+    bufferlist &bl,
+    IOContext *ioc,
+    bool buffered,
+    int write_hint)
+{
+  uint64_t len = bl.length();
+  dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc
+           << " buffered " << buffered << dendl;
+  ceph_assert(is_valid_io(off, len));
+
+  write_split(this, off, bl, ioc);
+  dout(5) << __func__ << " " << off << "~" << len << dendl;
+
+  return 0;
+}
+
+// Synchronous write: queue on a local IOContext, submit, and block until all
+// split tasks complete.
+int NVMEDevice::write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
+{
+  uint64_t len = bl.length();
+  dout(20) << __func__ << " " << off << "~" << len << " buffered "
+           << buffered << dendl;
+  // same validity conditions that is_valid_io() enforces elsewhere
+  ceph_assert(off % block_size == 0);
+  ceph_assert(len % block_size == 0);
+  ceph_assert(len > 0);
+  ceph_assert(off < size);
+  ceph_assert(off + len <= size);
+
+  IOContext ioc(cct, NULL);
+  write_split(this, off, bl, &ioc);
+  dout(5) << __func__ << " " << off << "~" << len << dendl;
+  aio_submit(&ioc);
+  ioc.aio_wait();
+  return 0;
+}
+
+// Synchronous read.  `t.return_code` starts at 1 (in-flight); io_complete()
+// clears it to 0 once the read (and any sub-reads) finish — presumably
+// _aio_handle() drains completions before aio_submit() returns here; confirm.
+int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
+                     IOContext *ioc,
+                     bool buffered)
+{
+  dout(5) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
+  ceph_assert(is_valid_io(off, len));
+
+  Task t(this, IOCommand::READ_COMMAND, off, len, 1);
+  bufferptr p = buffer::create_small_page_aligned(len);
+  char *buf = p.c_str();
+
+  // for sync read, need to control IOContext in itself
+  IOContext read_ioc(cct, nullptr);
+  make_read_tasks(this, off, &read_ioc, buf, len, &t, off, len);
+  dout(5) << __func__ << " " << off << "~" << len << dendl;
+  aio_submit(&read_ioc);
+
+  pbl->push_back(std::move(p));
+  return t.return_code;
+}
+
+// Queue an asynchronous read into a freshly allocated page-aligned buffer
+// appended to *pbl; tasks are submitted later via aio_submit(ioc).
+int NVMEDevice::aio_read(
+    uint64_t off,
+    uint64_t len,
+    bufferlist *pbl,
+    IOContext *ioc)
+{
+  dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
+  ceph_assert(is_valid_io(off, len));
+  bufferptr p = buffer::create_small_page_aligned(len);
+  // append shares the underlying buffer, so filling `buf` below fills *pbl
+  pbl->append(p);
+  char* buf = p.c_str();
+
+  make_read_tasks(this, off, ioc, buf, len, NULL, off, len);
+  dout(5) << __func__ << " " << off << "~" << len << dendl;
+  return 0;
+}
+
+// Synchronous, possibly unaligned read: align the range to block_size, issue
+// split reads, and copy only the requested sub-range into `buf`.
+// `buffered` is ignored on the SPDK path.
+int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
+{
+  ceph_assert(len > 0);
+  ceph_assert(off < size);
+  ceph_assert(off + len <= size);
+
+  uint64_t aligned_off = p2align(off, block_size);
+  uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
+  dout(5) << __func__ << " " << off << "~" << len
+          << " aligned " << aligned_off << "~" << aligned_len << dendl;
+  // use the device's own CephContext, consistent with read()/write() above
+  // (was the global g_ceph_context)
+  IOContext ioc(cct, nullptr);
+  Task t(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);
+
+  make_read_tasks(this, aligned_off, &ioc, buf, aligned_len, &t, off, len);
+  aio_submit(&ioc);
+
+  return t.return_code;
+}
+
+// No-op: the SPDK path performs no buffered/page-cache I/O to invalidate.
+int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len)
+{
+  dout(5) << __func__ << " " << off << "~" << len << dendl;
+  return 0;
+}
diff --git a/src/blk/spdk/NVMEDevice.h b/src/blk/spdk/NVMEDevice.h
new file mode 100644
index 000000000..352856093
--- /dev/null
+++ b/src/blk/spdk/NVMEDevice.h
@@ -0,0 +1,85 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_BLK_NVMEDEVICE
+#define CEPH_BLK_NVMEDEVICE
+
+#include <queue>
+#include <map>
+#include <limits>
+
+// since _Static_assert introduced in c11
+#define _Static_assert static_assert
+
+
+#include "include/interval_set.h"
+#include "common/ceph_time.h"
+#include "BlockDevice.h"
+
+enum class IOCommand {
+ READ_COMMAND,
+ WRITE_COMMAND,
+ FLUSH_COMMAND
+};
+
+class SharedDriverData;
+class SharedDriverQueueData;
+
+// BlockDevice implementation backed by a userspace SPDK NVMe driver.
+class NVMEDevice : public BlockDevice {
+  // Shared per-controller driver state (queues, size, block size) owned by
+  // the global manager, not by this object; null until open() succeeds.
+  // NOTE(review): the previous comment here described a pinned 4KB IDENTIFY
+  // buffer, which does not match this member — confirm against git history.
+  SharedDriverData *driver;
+  // transport address of the controller (set by open())
+  string name;
+
+ public:
+  std::atomic_int queue_number = {0};
+  SharedDriverData *get_driver() { return driver; }
+
+  NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv);
+
+  bool supported_bdev_label() override { return false; }
+
+  // true if `path` is a symlink whose basename carries the SPDK prefix
+  static bool support(const std::string& path);
+
+  void aio_submit(IOContext *ioc) override;
+
+  int read(uint64_t off, uint64_t len, bufferlist *pbl,
+           IOContext *ioc,
+           bool buffered) override;
+  int aio_read(
+    uint64_t off,
+    uint64_t len,
+    bufferlist *pbl,
+    IOContext *ioc) override;
+  int aio_write(uint64_t off, bufferlist& bl,
+                IOContext *ioc,
+                bool buffered,
+                int write_hint = WRITE_LIFE_NOT_SET) override;
+  int write(uint64_t off, bufferlist& bl, bool buffered, int write_hint = WRITE_LIFE_NOT_SET) override;
+  int flush() override;
+  int read_random(uint64_t off, uint64_t len, char *buf, bool buffered) override;
+
+  // for managing buffered readers/writers
+  int invalidate_cache(uint64_t off, uint64_t len) override;
+  int open(const string& path) override;
+  void close() override;
+  int collect_metadata(const string& prefix, map<string,string> *pm) const override;
+};
+
+#endif
diff --git a/src/blk/zoned/HMSMRDevice.cc b/src/blk/zoned/HMSMRDevice.cc
new file mode 100644
index 000000000..8a30be9b0
--- /dev/null
+++ b/src/blk/zoned/HMSMRDevice.cc
@@ -0,0 +1,1217 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Red Hat
+ * Copyright (C) 2020 Abutalib Aghayev
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/file.h>
+
+#include "HMSMRDevice.h"
+#include "include/intarith.h"
+#include "include/types.h"
+#include "include/compat.h"
+#include "include/scope_guard.h"
+#include "include/stringify.h"
+#include "common/blkdev.h"
+#include "common/errno.h"
+#if defined(__FreeBSD__)
+#include "bsm/audit_errno.h"
+#endif
+#include "common/debug.h"
+#include "common/numa.h"
+
+#include "kernel/io_uring.h"
+
+extern "C" {
+#include <libzbd/zbd.h>
+}
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "smrbdev(" << this << " " << path << ") "
+
+// Construct an unopened host-managed SMR device.  Chooses io_uring when
+// configured and supported by the kernel, otherwise falls back to libaio.
+HMSMRDevice::HMSMRDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
+  : BlockDevice(cct, cb, cbpriv),
+    aio(false), dio(false),
+    discard_callback(d_cb),
+    discard_callback_priv(d_cbpriv),
+    aio_stop(false),
+    discard_started(false),
+    discard_stop(false),
+    aio_thread(this),
+    discard_thread(this),
+    injecting_crash(0)
+{
+  // one direct and one buffered fd per write-lifetime hint; -1 = unopened
+  fd_directs.resize(WRITE_LIFE_MAX, -1);
+  fd_buffereds.resize(WRITE_LIFE_MAX, -1);
+
+  bool use_ioring = cct->_conf.get_val<bool>("bdev_ioring");
+  unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth;
+
+  if (use_ioring && ioring_queue_t::supported()) {
+    bool use_ioring_hipri = cct->_conf.get_val<bool>("bdev_ioring_hipri");
+    bool use_ioring_sqthread_poll = cct->_conf.get_val<bool>("bdev_ioring_sqthread_poll");
+    io_queue = std::make_unique<ioring_queue_t>(iodepth, use_ioring_hipri, use_ioring_sqthread_poll);
+  } else {
+    // warn only once per process about the io_uring fallback
+    static bool once;
+    if (use_ioring && !once) {
+      derr << "WARNING: io_uring API is not supported! Fallback to libaio!"
+           << dendl;
+      once = true;
+    }
+    io_queue = std::make_unique<aio_queue_t>(iodepth);
+  }
+}
+
+// A device qualifies for this backend iff libzbd reports it as zoned.
+bool HMSMRDevice::support(const std::string& path)
+{
+  return 1 == zbd_device_is_zoned(path.c_str());
+}
+
+// Take a non-blocking exclusive flock on the device so two OSDs cannot open
+// the same disk; returns -errno (e.g. -EWOULDBLOCK) if already locked.
+int HMSMRDevice::_lock()
+{
+  dout(10) << __func__ << " " << fd_directs[WRITE_LIFE_NOT_SET] << dendl;
+  int r = ::flock(fd_directs[WRITE_LIFE_NOT_SET], LOCK_EX | LOCK_NB);
+  if (r < 0) {
+    derr << __func__ << " flock failed on " << path << dendl;
+    return -errno;
+  }
+  return 0;
+}
+
+// Discover SMR geometry via libzbd: zone size and the size of the
+// conventional (non-write-pointer) region at the start of the disk.
+// Returns false if the device cannot be opened or reports no usable zones.
+bool HMSMRDevice::set_smr_params(const std::string& path) {
+  dout(10) << __func__ << " opening " << path << dendl;
+
+  int dev = zbd_open(path.c_str(), O_RDWR | O_DIRECT | O_LARGEFILE, nullptr);
+  if (dev < 0) {
+    return false;
+  }
+  auto close_dev = make_scope_guard([dev] { zbd_close(dev); });
+
+  unsigned int nr_zones = 0;
+  if (zbd_report_nr_zones(dev, 0, 0, ZBD_RO_NOT_WP, &nr_zones) != 0) {
+    return false;
+  }
+
+  // A device reporting zero conventional zones previously caused an
+  // out-of-bounds read of zones[0] below; treat it as unsupported instead.
+  if (nr_zones == 0) {
+    return false;
+  }
+
+  std::vector<zbd_zone> zones(nr_zones);
+  if (zbd_report_zones(dev, 0, 0, ZBD_RO_NOT_WP, zones.data(), &nr_zones) != 0) {
+    return false;
+  }
+
+  // NOTE(review): assumes every zone shares the first zone's length — true
+  // for typical HM-SMR disks, but not guaranteed by the spec; confirm.
+  zone_size = zbd_zone_len(&zones[0]);
+  conventional_region_size = nr_zones * zone_size;
+
+  dout(10) << __func__ << " setting zone size to " << zone_size
+           << " and conventional region size to " << conventional_region_size
+           << dendl;
+
+  return true;
+}
+
+// Open the zoned device at `p`: one O_DIRECT and one buffered fd per
+// write-lifetime hint, SMR geometry discovery, optional exclusive lock,
+// size/rotational probing, and startup of the aio and discard threads.
+// On any failure, jumps to out_fail which closes every opened fd.
+int HMSMRDevice::open(const string& p)
+{
+  path = p;
+  int r = 0, i = 0;
+  dout(1) << __func__ << " path " << path << dendl;
+
+  for (i = 0; i < WRITE_LIFE_MAX; i++) {
+    int fd = ::open(path.c_str(), O_RDWR | O_DIRECT);
+    if (fd < 0) {
+      r = -errno;
+      break;
+    }
+    fd_directs[i] = fd;
+
+    fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC);
+    if (fd < 0) {
+      r = -errno;
+      break;
+    }
+    fd_buffereds[i] = fd;
+  }
+
+  // i < WRITE_LIFE_MAX means one of the opens above failed
+  if (i != WRITE_LIFE_MAX) {
+    derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
+    goto out_fail;
+  }
+
+  if (!set_smr_params(p)) {
+    derr << __func__ << " failed to set HM-SMR parameters" << dendl;
+    goto out_fail;
+  }
+
+#if defined(F_SET_FILE_RW_HINT)
+  // tag each fd with its write-lifetime hint; on any failure disable hints
+  // and fall back to WRITE_LIFE_NOT_SET for everything
+  for (i = WRITE_LIFE_NONE; i < WRITE_LIFE_MAX; i++) {
+    if (fcntl(fd_directs[i], F_SET_FILE_RW_HINT, &i) < 0) {
+      r = -errno;
+      break;
+    }
+    if (fcntl(fd_buffereds[i], F_SET_FILE_RW_HINT, &i) < 0) {
+      r = -errno;
+      break;
+    }
+  }
+  if (i != WRITE_LIFE_MAX) {
+    enable_wrt = false;
+    dout(0) << "ioctl(F_SET_FILE_RW_HINT) on " << path << " failed: " << cpp_strerror(r) << dendl;
+  }
+#endif
+
+  dio = true;
+  aio = cct->_conf->bdev_aio;
+  if (!aio) {
+    ceph_abort_msg("non-aio not supported");
+  }
+
+  // disable readahead as it will wreak havoc on our mix of
+  // directio/aio and buffered io.
+  r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], 0, 0, POSIX_FADV_RANDOM);
+  if (r) {
+    r = -r;
+    derr << __func__ << " posix_fadvise got: " << cpp_strerror(r) << dendl;
+    goto out_fail;
+  }
+
+  if (lock_exclusive) {
+    r = _lock();
+    if (r < 0) {
+      derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r)
+           << dendl;
+      goto out_fail;
+    }
+  }
+
+  struct stat st;
+  r = ::fstat(fd_directs[WRITE_LIFE_NOT_SET], &st);
+  if (r < 0) {
+    r = -errno;
+    derr << __func__ << " fstat got " << cpp_strerror(r) << dendl;
+    goto out_fail;
+  }
+
+  // Operate as though the block size is 4 KB.  The backing file
+  // blksize doesn't strictly matter except that some file systems may
+  // require a read/modify/write if we write something smaller than
+  // it.
+  block_size = cct->_conf->bdev_block_size;
+  if (block_size != (unsigned)st.st_blksize) {
+    dout(1) << __func__ << " backing device/file reports st_blksize "
+            << st.st_blksize << ", using bdev_block_size "
+            << block_size << " anyway" << dendl;
+  }
+
+
+  {
+    BlkDev blkdev_direct(fd_directs[WRITE_LIFE_NOT_SET]);
+    BlkDev blkdev_buffered(fd_buffereds[WRITE_LIFE_NOT_SET]);
+
+    if (S_ISBLK(st.st_mode)) {
+      int64_t s;
+      r = blkdev_direct.get_size(&s);
+      if (r < 0) {
+        goto out_fail;
+      }
+      size = s;
+    } else {
+      size = st.st_size;
+    }
+
+    char partition[PATH_MAX], devname[PATH_MAX];
+    if ((r = blkdev_buffered.partition(partition, PATH_MAX)) ||
+        (r = blkdev_buffered.wholedisk(devname, PATH_MAX))) {
+      derr << "unable to get device name for " << path << ": "
+           << cpp_strerror(r) << dendl;
+      // conservatively assume rotational if we cannot identify the disk
+      rotational = true;
+    } else {
+      dout(20) << __func__ << " devname " << devname << dendl;
+      rotational = blkdev_buffered.is_rotational();
+      support_discard = blkdev_buffered.support_discard();
+      this->devname = devname;
+      _detect_vdo();
+    }
+  }
+
+  r = _aio_start();
+  if (r < 0) {
+    goto out_fail;
+  }
+  _discard_start();
+
+  // round size down to an even block
+  size &= ~(block_size - 1);
+
+  dout(1) << __func__
+          << " size " << size
+          << " (0x" << std::hex << size << std::dec << ", "
+          << byte_u_t(size) << ")"
+          << " block_size " << block_size
+          << " (" << byte_u_t(block_size) << ")"
+          << " " << (rotational ? "rotational" : "non-rotational")
+          << " discard " << (support_discard ? "supported" : "not supported")
+          << dendl;
+  return 0;
+
+out_fail:
+  // fds are opened in direct/buffered pairs, so the first -1 in either
+  // array marks the end of the successfully opened prefix
+  for (i = 0; i < WRITE_LIFE_MAX; i++) {
+    if (fd_directs[i] >= 0) {
+      VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
+      fd_directs[i] = -1;
+    } else {
+      break;
+    }
+    if (fd_buffereds[i] >= 0) {
+      VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
+      fd_buffereds[i] = -1;
+    } else {
+      break;
+    }
+  }
+  return r;
+}
+
+// Collect the names of the raw devices backing this bdev; nothing to report
+// until open() has resolved the underlying disk name.
+int HMSMRDevice::get_devices(std::set<std::string> *ls) const
+{
+  if (!devname.empty()) {
+    get_raw_devices(devname, ls);
+  }
+  return 0;
+}
+
+// Stop the service threads and close every fd opened by open().
+void HMSMRDevice::close()
+{
+  dout(1) << __func__ << dendl;
+  _aio_stop();
+  _discard_stop();
+
+  if (vdo_fd >= 0) {
+    VOID_TEMP_FAILURE_RETRY(::close(vdo_fd));
+    vdo_fd = -1;
+  }
+
+  for (int i = 0; i < WRITE_LIFE_MAX; i++) {
+    // ceph_assert for consistency with the rest of this file: plain assert()
+    // compiles out under NDEBUG and would silently skip these sanity checks
+    ceph_assert(fd_directs[i] >= 0);
+    VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
+    fd_directs[i] = -1;
+
+    ceph_assert(fd_buffereds[i] >= 0);
+    VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
+    fd_buffereds[i] = -1;
+  }
+  path.clear();
+}
+
+// Report device identity/geometry key-value pairs under `prefix`.
+int HMSMRDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
+{
+  (*pm)[prefix + "support_discard"] = stringify((int)(bool)support_discard);
+  (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational);
+  (*pm)[prefix + "size"] = stringify(get_size());
+  (*pm)[prefix + "block_size"] = stringify(get_block_size());
+  (*pm)[prefix + "driver"] = "HMSMRDevice";
+  if (rotational) {
+    (*pm)[prefix + "type"] = "hdd";
+  } else {
+    (*pm)[prefix + "type"] = "ssd";
+  }
+  if (vdo_fd >= 0) {
+    (*pm)[prefix + "vdo"] = "true";
+    uint64_t total, avail;
+    get_vdo_utilization(vdo_fd, &total, &avail);
+    (*pm)[prefix + "vdo_physical_size"] = stringify(total);
+  }
+
+  {
+    // comma-separated list of backing raw devices
+    string res_names;
+    std::set<std::string> devnames;
+    if (get_devices(&devnames) == 0) {
+      for (auto& dev : devnames) {
+        if (!res_names.empty()) {
+          res_names += ",";
+        }
+        res_names += dev;
+      }
+      if (res_names.size()) {
+        (*pm)[prefix + "devices"] = res_names;
+      }
+    }
+  }
+
+  struct stat st;
+  int r = ::fstat(fd_buffereds[WRITE_LIFE_NOT_SET], &st);
+  if (r < 0)
+    return -errno;
+  if (S_ISBLK(st.st_mode)) {
+    (*pm)[prefix + "access_mode"] = "blk";
+
+    char buffer[1024] = {0};
+    BlkDev blkdev{fd_buffereds[WRITE_LIFE_NOT_SET]};
+    if (r = blkdev.partition(buffer, sizeof(buffer)); r) {
+      (*pm)[prefix + "partition_path"] = "unknown";
+    } else {
+      (*pm)[prefix + "partition_path"] = buffer;
+    }
+    buffer[0] = '\0';
+    // was a second partition() call (copy-paste), which made "dev_node" a
+    // duplicate of "partition_path"; the device node is the whole disk,
+    // matching the wholedisk() lookup in open()
+    if (r = blkdev.wholedisk(buffer, sizeof(buffer)); r) {
+      (*pm)[prefix + "dev_node"] = "unknown";
+    } else {
+      (*pm)[prefix + "dev_node"] = buffer;
+    }
+    // (removed an inverted `if (!r) return 0;` that skipped collecting
+    // model/dev/serial/numa_node exactly when the lookup above succeeded)
+    buffer[0] = '\0';
+    blkdev.model(buffer, sizeof(buffer));
+    (*pm)[prefix + "model"] = buffer;
+
+    buffer[0] = '\0';
+    blkdev.dev(buffer, sizeof(buffer));
+    (*pm)[prefix + "dev"] = buffer;
+
+    // nvme exposes a serial number
+    buffer[0] = '\0';
+    blkdev.serial(buffer, sizeof(buffer));
+    (*pm)[prefix + "serial"] = buffer;
+
+    // numa
+    int node;
+    r = blkdev.get_numa_node(&node);
+    if (r >= 0) {
+      (*pm)[prefix + "numa_node"] = stringify(node);
+    }
+  } else {
+    (*pm)[prefix + "access_mode"] = "file";
+    (*pm)[prefix + "path"] = path;
+  }
+  return 0;
+}
+
+// Probe whether devname is backed by a VDO (dedup/compression) volume; on
+// success vdo_fd stays open for later utilization queries.
+void HMSMRDevice::_detect_vdo()
+{
+  vdo_fd = get_vdo_stats_handle(devname.c_str(), &vdo_name);
+  if (vdo_fd < 0) {
+    dout(20) << __func__ << " no VDO volume maps to " << devname << dendl;
+  } else {
+    dout(1) << __func__ << " VDO volume " << vdo_name
+            << " maps to " << devname << dendl;
+  }
+}
+
+// Thin-provisioning stats are only meaningful when a VDO volume was detected.
+bool HMSMRDevice::get_thin_utilization(uint64_t *total, uint64_t *avail) const
+{
+  return vdo_fd >= 0 && get_vdo_utilization(vdo_fd, total, avail);
+}
+
+// Pick the fd matching the requested write-lifetime hint, falling back to
+// WRITE_LIFE_NOT_SET when hints were disabled at open() time.
+int HMSMRDevice::choose_fd(bool buffered, int write_hint) const
+{
+  // ceph_assert for consistency with the rest of this file (plain assert()
+  // compiles out under NDEBUG)
+  ceph_assert(write_hint >= WRITE_LIFE_NOT_SET && write_hint < WRITE_LIFE_MAX);
+  if (!enable_wrt)
+    write_hint = WRITE_LIFE_NOT_SET;
+  return buffered ? fd_buffereds[write_hint] : fd_directs[write_hint];
+}
+
+// Make completed aios durable with fdatasync, deduplicating concurrent
+// callers via the io_since_flush flag (see comment below for the ordering
+// argument).  Aborts the process on fdatasync failure.
+int HMSMRDevice::flush()
+{
+  // protect flush with a mutex.  note that we are not really protecting
+  // data here.  instead, we're ensuring that if any flush() caller
+  // sees that io_since_flush is true, they block any racing callers
+  // until the flush is observed.  that allows racing threads to be
+  // calling flush while still ensuring that *any* of them that got an
+  // aio completion notification will not return before that aio is
+  // stable on disk: whichever thread sees the flag first will block
+  // followers until the aio is stable.
+  std::lock_guard l(flush_mutex);
+
+  bool expect = true;
+  if (!io_since_flush.compare_exchange_strong(expect, false)) {
+    dout(10) << __func__ << " no-op (no ios since last flush), flag is "
+             << (int)io_since_flush.load() << dendl;
+    return 0;
+  }
+
+  dout(10) << __func__ << " start" << dendl;
+  if (cct->_conf->bdev_inject_crash) {
+    ++injecting_crash;
+    // sleep for a moment to give other threads a chance to submit or
+    // wait on io that races with a flush.
+    derr << __func__ << " injecting crash. first we sleep..." << dendl;
+    sleep(cct->_conf->bdev_inject_crash_flush_delay);
+    derr << __func__ << " and now we die" << dendl;
+    cct->_log->flush();
+    _exit(1);
+  }
+  utime_t start = ceph_clock_now();
+  int r = ::fdatasync(fd_directs[WRITE_LIFE_NOT_SET]);
+  utime_t end = ceph_clock_now();
+  utime_t dur = end - start;
+  if (r < 0) {
+    r = -errno;
+    derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl;
+    ceph_abort();
+  }
+  dout(5) << __func__ << " in " << dur << dendl;;
+  return r;
+}
+
+// Initialize the io queue (libaio or io_uring) and spawn the completion
+// polling thread.  Returns a negative errno on queue init failure.
+int HMSMRDevice::_aio_start()
+{
+  if (aio) {
+    dout(10) << __func__ << dendl;
+    int r = io_queue->init(fd_directs);
+    if (r < 0) {
+      if (r == -EAGAIN) {
+        derr << __func__ << " io_setup(2) failed with EAGAIN; "
+             << "try increasing /proc/sys/fs/aio-max-nr" << dendl;
+      } else {
+        derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl;
+      }
+      return r;
+    }
+    aio_thread.create("bstore_aio");
+  }
+  return 0;
+}
+
+// Signal the polling thread to exit, join it, then tear down the io queue.
+void HMSMRDevice::_aio_stop()
+{
+  if (aio) {
+    dout(10) << __func__ << dendl;
+    aio_stop = true;
+    aio_thread.join();
+    aio_stop = false;
+    io_queue->shutdown();
+  }
+}
+
+// Spawn the background thread that services queued discard ranges.
+int HMSMRDevice::_discard_start()
+{
+  discard_thread.create("bstore_discard");
+  return 0;
+}
+
+// Stop the discard thread: wait until it has actually started (so the stop
+// flag is not missed), signal it, join, then clear the flag for reuse.
+void HMSMRDevice::_discard_stop()
+{
+  dout(10) << __func__ << dendl;
+  {
+    std::unique_lock l(discard_lock);
+    while (!discard_started) {
+      discard_cond.wait(l);
+    }
+    discard_stop = true;
+    discard_cond.notify_all();
+  }
+  discard_thread.join();
+  {
+    std::lock_guard l(discard_lock);
+    discard_stop = false;
+  }
+  dout(10) << __func__ << " stopped" << dendl;
+}
+
+// Block the caller until every queued and in-flight discard has completed.
+void HMSMRDevice::discard_drain()
+{
+  dout(10) << __func__ << dendl;
+  std::unique_lock l(discard_lock);
+  while (!discard_queued.empty() || discard_running) {
+    discard_cond.wait(l);
+  }
+}
+
+// True if `r` is an errno the kernel block layer is known to surface for
+// hardware/transport failures (as opposed to a software bug on our side).
+static bool is_expected_ioerr(const int r)
+{
+  // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
+  return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
+          r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
+          r == -ENODATA || r == -EILSEQ || r == -ENOMEM ||
+#if defined(__linux__)
+          r == -EREMCHG || r == -EBADE
+#elif defined(__FreeBSD__)
+          r == - BSM_ERRNO_EREMCHG || r == -BSM_ERRNO_EBADE
+#endif
+    );
+}
+
+// Completion-reaping thread: polls the io queue, handles debug/stall
+// accounting and crash injection, aborts on unexpected I/O errors, and wakes
+// (or calls back) the owning IOContext for each completed aio.
+void HMSMRDevice::_aio_thread()
+{
+  dout(10) << __func__ << " start" << dendl;
+  int inject_crash_count = 0;
+  while (!aio_stop) {
+    dout(40) << __func__ << " polling" << dendl;
+    int max = cct->_conf->bdev_aio_reap_max;
+    aio_t *aio[max];  // NOTE(review): VLA is a GNU extension, not standard C++
+    int r = io_queue->get_next_completed(cct->_conf->bdev_aio_poll_ms,
+                                         aio, max);
+    if (r < 0) {
+      derr << __func__ << " got " << cpp_strerror(r) << dendl;
+      ceph_abort_msg("got unexpected error from io_getevents");
+    }
+    if (r > 0) {
+      dout(30) << __func__ << " got " << r << " completed aios" << dendl;
+      for (int i = 0; i < r; ++i) {
+        IOContext *ioc = static_cast<IOContext*>(aio[i]->priv);
+        _aio_log_finish(ioc, aio[i]->offset, aio[i]->length);
+        if (aio[i]->queue_item.is_linked()) {
+          std::lock_guard l(debug_queue_lock);
+          debug_aio_unlink(*aio[i]);
+        }
+
+        // set flag indicating new ios have completed.  we do this *before*
+        // any completion or notifications so that any user flush() that
+        // follows the observed io completion will include this io.  Note
+        // that an earlier, racing flush() could observe and clear this
+        // flag, but that also ensures that the IO will be stable before the
+        // later flush() occurs.
+        io_since_flush.store(true);
+
+        // NOTE: shadows the outer (count) r with this aio's return value
+        long r = aio[i]->get_return_value();
+        if (r < 0) {
+          derr << __func__ << " got r=" << r << " (" << cpp_strerror(r) << ")"
+               << dendl;
+          if (ioc->allow_eio && is_expected_ioerr(r)) {
+            derr << __func__ << " translating the error to EIO for upper layer"
+                 << dendl;
+            ioc->set_return_value(-EIO);
+          } else {
+            if (is_expected_ioerr(r)) {
+              note_io_error_event(
+                devname.c_str(),
+                path.c_str(),
+                r,
+#if defined(HAVE_POSIXAIO)
+                aio[i]->aio.aiocb.aio_lio_opcode,
+#else
+                aio[i]->iocb.aio_lio_opcode,
+#endif
+                aio[i]->offset,
+                aio[i]->length);
+              ceph_abort_msg(
+                "Unexpected IO error. "
+                "This may suggest a hardware issue. "
+                "Please check your kernel log!");
+            }
+            ceph_abort_msg(
+              "Unexpected IO error. "
+              "This may suggest HW issue. Please check your dmesg!");
+          }
+        } else if (aio[i]->length != (uint64_t)r) {
+          // short read/write: we never expect partial completion
+          derr << "aio to 0x" << std::hex << aio[i]->offset
+               << "~" << aio[i]->length << std::dec
+               << " but returned: " << r << dendl;
+          ceph_abort_msg("unexpected aio return value: does not match length");
+        }
+
+        dout(10) << __func__ << " finished aio " << aio[i] << " r " << r
+                 << " ioc " << ioc
+                 << " with " << (ioc->num_running.load() - 1)
+                 << " aios left" << dendl;
+
+        // NOTE: once num_running and we either call the callback or
+        // call aio_wake we cannot touch ioc or aio[] as the caller
+        // may free it.
+        if (ioc->priv) {
+          if (--ioc->num_running == 0) {
+            aio_callback(aio_callback_priv, ioc->priv);
+          }
+        } else {
+          ioc->try_aio_wake();
+        }
+      }
+    }
+    if (cct->_conf->bdev_debug_aio) {
+      utime_t now = ceph_clock_now();
+      std::lock_guard l(debug_queue_lock);
+      if (debug_oldest) {
+        if (debug_stall_since == utime_t()) {
+          debug_stall_since = now;
+        } else {
+          if (cct->_conf->bdev_debug_aio_suicide_timeout) {
+            utime_t cutoff = now;
+            cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout;
+            if (debug_stall_since < cutoff) {
+              derr << __func__ << " stalled aio " << debug_oldest
+                   << " since " << debug_stall_since << ", timeout is "
+                   << cct->_conf->bdev_debug_aio_suicide_timeout
+                   << "s, suicide" << dendl;
+              ceph_abort_msg("stalled aio... buggy kernel or bad device?");
+            }
+          }
+        }
+      }
+    }
+    reap_ioc();
+    if (cct->_conf->bdev_inject_crash) {
+      ++inject_crash_count;
+      if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 >
+          cct->_conf->bdev_inject_crash + cct->_conf->bdev_inject_crash_flush_delay) {
+        derr << __func__ << " bdev_inject_crash trigger from aio thread"
+             << dendl;
+        cct->_log->flush();
+        _exit(1);
+      }
+    }
+  }
+  reap_ioc();
+  dout(10) << __func__ << " end" << dendl;
+}
+
+// Background discard servicing loop: swap out the queued interval set,
+// discard each range with the lock dropped, then notify the upper layer.
+void HMSMRDevice::_discard_thread()
+{
+  std::unique_lock l(discard_lock);
+  ceph_assert(!discard_started);
+  discard_started = true;
+  discard_cond.notify_all();
+  while (true) {
+    ceph_assert(discard_finishing.empty());
+    if (discard_queued.empty()) {
+      if (discard_stop)
+        break;
+      dout(20) << __func__ << " sleep" << dendl;
+      discard_cond.notify_all(); // for the thread trying to drain...
+      discard_cond.wait(l);
+      dout(20) << __func__ << " wake" << dendl;
+    } else {
+      discard_finishing.swap(discard_queued);
+      discard_running = true;
+      // drop the lock while issuing discards so producers are not blocked
+      l.unlock();
+      dout(20) << __func__ << " finishing" << dendl;
+      for (auto p = discard_finishing.begin();p != discard_finishing.end(); ++p) {
+        discard(p.get_start(), p.get_len());
+      }
+
+      discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing));
+      discard_finishing.clear();
+      l.lock();
+      discard_running = false;
+    }
+  }
+  dout(10) << __func__ << " finish" << dendl;
+  discard_started = false;
+}
+
+// Hand a set of freed extents to the discard thread.  Returns -1 when the
+// device does not support discard, 0 otherwise (including the empty case).
+int HMSMRDevice::queue_discard(interval_set<uint64_t> &to_release)
+{
+  if (!support_discard) {
+    return -1;
+  }
+  if (to_release.empty()) {
+    return 0;
+  }
+
+  std::lock_guard l(discard_lock);
+  discard_queued.insert(to_release);
+  discard_cond.notify_all();
+  return 0;
+}
+
+// Debug bookkeeping: record an in-flight io range and abort if it overlaps
+// another outstanding io (only active with bdev_debug_inflight_ios).
+void HMSMRDevice::_aio_log_start(
+  IOContext *ioc,
+  uint64_t offset,
+  uint64_t length)
+{
+  dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
+           << std::dec << dendl;
+  if (cct->_conf->bdev_debug_inflight_ios) {
+    std::lock_guard l(debug_lock);
+    if (debug_inflight.intersects(offset, length)) {
+      derr << __func__ << " inflight overlap of 0x"
+           << std::hex
+           << offset << "~" << length << std::dec
+           << " with " << debug_inflight << dendl;
+      ceph_abort();
+    }
+    debug_inflight.insert(offset, length);
+  }
+}
+
+// Track a submitted aio for stall detection; the first queued entry becomes
+// the oldest outstanding aio.  Caller holds debug_queue_lock.
+void HMSMRDevice::debug_aio_link(aio_t& aio)
+{
+  if (debug_queue.empty()) {
+    debug_oldest = &aio;
+  }
+  debug_queue.push_back(aio);
+}
+
// Remove an aio from the debug stall-tracking queue.  If it was the
// oldest outstanding aio, report (once) if it had been stalled longer
// than bdev_debug_aio_log_age, then promote the next queue entry to
// "oldest" and reset the stall clock.
// Caller must hold debug_queue_lock.
void HMSMRDevice::debug_aio_unlink(aio_t& aio)
{
  if (aio.queue_item.is_linked()) {
    debug_queue.erase(debug_queue.iterator_to(aio));
    if (debug_oldest == &aio) {
      auto age = cct->_conf->bdev_debug_aio_log_age;
      // debug_stall_since is set elsewhere when a stall is first seen;
      // utime_t() means no stall was recorded for this aio.
      if (age && debug_stall_since != utime_t()) {
	utime_t cutoff = ceph_clock_now();
	cutoff -= age;
	if (debug_stall_since < cutoff) {
	  derr << __func__ << " stalled aio " << debug_oldest
	       << " since " << debug_stall_since << ", timeout is "
	       << age
	       << "s" << dendl;
	}
      }

      if (debug_queue.empty()) {
	debug_oldest = nullptr;
      } else {
	debug_oldest = &debug_queue.front();
      }
      debug_stall_since = utime_t();
    }
  }
}
+
// Mark [offset, offset+length) as no longer in flight (pairs with
// _aio_log_start).
void HMSMRDevice::_aio_log_finish(
  IOContext *ioc,
  uint64_t offset,
  uint64_t length)
{
  // NOTE(review): 'aio' here is the bool member flag (libaio enabled),
  // not an aio_t — presumably a copy/paste artifact; confirm the log
  // output is intended.
  dout(20) << __func__ << " " << aio << " 0x"
	   << std::hex << offset << "~" << length << std::dec << dendl;
  if (cct->_conf->bdev_debug_inflight_ios) {
    std::lock_guard l(debug_lock);
    debug_inflight.erase(offset, length);
  }
}
+
// Submit all pending aios on this IOContext to the io queue.
// Must be called by a single thread per IOContext at a time.
void HMSMRDevice::aio_submit(IOContext *ioc)
{
  dout(20) << __func__ << " ioc " << ioc
	   << " pending " << ioc->num_pending.load()
	   << " running " << ioc->num_running.load()
	   << dendl;

  if (ioc->num_pending.load() == 0) {
    return;
  }

  // move these aside, and get our end iterator position now, as the
  // aios might complete as soon as they are submitted and queue more
  // wal aio's.
  // Note: splice inserts the pending aios *before* e, so the newly
  // moved elements occupy [running_aios.begin(), e).
  list<aio_t>::iterator e = ioc->running_aios.begin();
  ioc->running_aios.splice(e, ioc->pending_aios);

  int pending = ioc->num_pending.load();
  ioc->num_running += pending;
  ioc->num_pending -= pending;
  ceph_assert(ioc->num_pending.load() == 0);  // we should be only thread doing this
  ceph_assert(ioc->pending_aios.size() == 0);

  if (cct->_conf->bdev_debug_aio) {
    // Register each submitted aio with the stall tracker; the lock is
    // taken per-iteration to avoid holding it across the dout.
    list<aio_t>::iterator p = ioc->running_aios.begin();
    while (p != e) {
      dout(30) << __func__ << " " << *p << dendl;
      std::lock_guard l(debug_queue_lock);
      debug_aio_link(*p++);
    }
  }

  void *priv = static_cast<void*>(ioc);
  int r, retries = 0;
  r = io_queue->submit_batch(ioc->running_aios.begin(), e,
			     pending, priv, &retries);

  if (retries)
    derr << __func__ << " retries " << retries << dendl;
  if (r < 0) {
    derr << " aio submit got " << cpp_strerror(r) << dendl;
    ceph_assert(r == 0);  // submission failure is fatal
  }
}
+
// Synchronous write path: pwritev to the fd chosen by (buffered,
// write_hint), then for buffered IO force the range to stable storage.
// Returns 0 on success or a negative errno.
int HMSMRDevice::_sync_write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
{
  uint64_t len = bl.length();
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
	  << std::dec << (buffered ? " (buffered)" : " (direct)") << dendl;
  // Optional fault injection: silently drop the IO (pretend success)
  // to exercise crash-consistency testing.
  if (cct->_conf->bdev_inject_crash &&
      rand() % cct->_conf->bdev_inject_crash == 0) {
    derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
	 << off << "~" << len << std::dec << dendl;
    ++injecting_crash;
    return 0;
  }
  vector<iovec> iov;
  bl.prepare_iov(&iov);
  int r = ::pwritev(choose_fd(buffered, write_hint),
		    &iov[0], iov.size(), off);

  if (r < 0) {
    r = -errno;
    derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl;
    return r;
  }
#ifdef HAVE_SYNC_FILE_RANGE
  if (buffered) {
    // initiate IO and wait till it completes
    r = ::sync_file_range(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER|SYNC_FILE_RANGE_WAIT_BEFORE);
    if (r < 0) {
      r = -errno;
      derr << __func__ << " sync_file_range error: " << cpp_strerror(r) << dendl;
      return r;
    }
  }
#endif

  // Remember that data has been written since the last flush() so the
  // next flush actually issues an fdatasync.
  io_since_flush.store(true);

  return 0;
}
+
+int HMSMRDevice::write(
+ uint64_t off,
+ bufferlist &bl,
+ bool buffered,
+ int write_hint)
+{
+ uint64_t len = bl.length();
+ dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+ << (buffered ? " (buffered)" : " (direct)")
+ << dendl;
+ ceph_assert(is_valid_io(off, len));
+ if (cct->_conf->objectstore_blackhole) {
+ lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
+ << dendl;
+ return 0;
+ }
+
+ if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
+ bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
+ dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
+ }
+ dout(40) << "data: ";
+ bl.hexdump(*_dout);
+ *_dout << dendl;
+
+ return _sync_write(off, bl, buffered, write_hint);
+}
+
// Queue an asynchronous write on the IOContext (submitted later via
// aio_submit).  Falls back to the synchronous path when libaio/direct
// IO is unavailable or the write is buffered.
int HMSMRDevice::aio_write(
  uint64_t off,
  bufferlist &bl,
  IOContext *ioc,
  bool buffered,
  int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	   << (buffered ? " (buffered)" : " (direct)")
	   << dendl;
  ceph_assert(is_valid_io(off, len));
  if (cct->_conf->objectstore_blackhole) {
    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
	       << dendl;
    return 0;
  }

  // Direct IO needs aligned memory; rebuild only if required.
  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
      bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
  }
  dout(40) << "data: ";
  bl.hexdump(*_dout);
  *_dout << dendl;

  _aio_log_start(ioc, off, len);

#ifdef HAVE_LIBAIO
  if (aio && dio && !buffered) {
    if (cct->_conf->bdev_inject_crash &&
	rand() % cct->_conf->bdev_inject_crash == 0) {
      derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
	   << off << "~" << len << std::dec
	   << dendl;
      // generate a real io so that aio_wait behaves properly, but make it
      // a read instead of write, and toss the result.
      ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
      ++ioc->num_pending;
      auto& aio = ioc->pending_aios.back();
      bufferptr p = buffer::create_small_page_aligned(len);
      aio.bl.append(std::move(p));
      aio.bl.prepare_iov(&aio.iov);
      aio.preadv(off, len);
      ++injecting_crash;
    } else {
      if (bl.length() <= RW_IO_MAX) {
	// fast path (non-huge write)
	ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
	++ioc->num_pending;
	auto& aio = ioc->pending_aios.back();
	bl.prepare_iov(&aio.iov);
	// claim_append moves bl's buffers into the aio; bl is consumed.
	aio.bl.claim_append(bl);
	aio.pwritev(off, len);
	dout(30) << aio << dendl;
	dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
		<< std::dec << " aio " << &aio << dendl;
      } else {
	// write in RW_IO_MAX-sized chunks
	uint64_t prev_len = 0;
	while (prev_len < bl.length()) {
	  bufferlist tmp;
	  if (prev_len + RW_IO_MAX < bl.length()) {
	    tmp.substr_of(bl, prev_len, RW_IO_MAX);
	  } else {
	    tmp.substr_of(bl, prev_len, bl.length() - prev_len);
	  }
	  // inner 'len' shadows the outer total length on purpose: it is
	  // this chunk's length.
	  auto len = tmp.length();
	  ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
	  ++ioc->num_pending;
	  auto& aio = ioc->pending_aios.back();
	  tmp.prepare_iov(&aio.iov);
	  aio.bl.claim_append(tmp);
	  aio.pwritev(off + prev_len, len);
	  dout(30) << aio << dendl;
	  dout(5) << __func__ << " 0x" << std::hex << off + prev_len
		  << "~" << len
		  << std::dec << " aio " << &aio << " (piece)" << dendl;
	  prev_len += len;
	}
      }
    }
  } else
#endif
  {
    // Synchronous fallback; log-finish immediately since no aio was queued.
    int r = _sync_write(off, bl, buffered, write_hint);
    _aio_log_finish(ioc, off, len);
    if (r < 0)
      return r;
  }
  return 0;
}
+
+int HMSMRDevice::discard(uint64_t offset, uint64_t len)
+{
+ int r = 0;
+ if (cct->_conf->objectstore_blackhole) {
+ lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
+ << dendl;
+ return 0;
+ }
+ if (support_discard) {
+ dout(10) << __func__
+ << " 0x" << std::hex << offset << "~" << len << std::dec
+ << dendl;
+
+ r = BlkDev{fd_directs[WRITE_LIFE_NOT_SET]}.discard((int64_t)offset, (int64_t)len);
+ }
+ return r;
+}
+
+int HMSMRDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
+ IOContext *ioc,
+ bool buffered)
+{
+ dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+ << (buffered ? " (buffered)" : " (direct)")
+ << dendl;
+ ceph_assert(is_valid_io(off, len));
+
+ _aio_log_start(ioc, off, len);
+
+ auto start1 = mono_clock::now();
+
+ auto p = buffer::ptr_node::create(buffer::create_small_page_aligned(len));
+ int r = ::pread(buffered ? fd_buffereds[WRITE_LIFE_NOT_SET] : fd_directs[WRITE_LIFE_NOT_SET],
+ p->c_str(), len, off);
+ auto age = cct->_conf->bdev_debug_aio_log_age;
+ if (mono_clock::now() - start1 >= make_timespan(age)) {
+ derr << __func__ << " stalled read "
+ << " 0x" << std::hex << off << "~" << len << std::dec
+ << (buffered ? " (buffered)" : " (direct)")
+ << " since " << start1 << ", timeout is "
+ << age
+ << "s" << dendl;
+ }
+
+ if (r < 0) {
+ if (ioc->allow_eio && is_expected_ioerr(r)) {
+ r = -EIO;
+ } else {
+ r = -errno;
+ }
+ goto out;
+ }
+ ceph_assert((uint64_t)r == len);
+ pbl->push_back(std::move(p));
+
+ dout(40) << "data: ";
+ pbl->hexdump(*_dout);
+ *_dout << dendl;
+
+ out:
+ _aio_log_finish(ioc, off, len);
+ return r < 0 ? r : 0;
+}
+
// Queue an asynchronous read into *pbl (submitted via aio_submit), or
// fall back to the synchronous read path when libaio/direct IO is
// unavailable.
int HMSMRDevice::aio_read(
  uint64_t off,
  uint64_t len,
  bufferlist *pbl,
  IOContext *ioc)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	  << dendl;

  int r = 0;
#ifdef HAVE_LIBAIO
  if (aio && dio) {
    ceph_assert(is_valid_io(off, len));
    _aio_log_start(ioc, off, len);
    ioc->pending_aios.push_back(aio_t(ioc, fd_directs[WRITE_LIFE_NOT_SET]));
    ++ioc->num_pending;
    aio_t& aio = ioc->pending_aios.back();
    bufferptr p = buffer::create_small_page_aligned(len);
    aio.bl.append(std::move(p));
    aio.bl.prepare_iov(&aio.iov);
    aio.preadv(off, len);
    dout(30) << aio << dendl;
    // pbl shares the aio's buffers; the data appears in them once the
    // aio completes.
    pbl->append(aio.bl);
    dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
	    << std::dec << " aio " << &aio << dendl;
  } else
#endif
  {
    r = read(off, len, pbl, ioc, false);
  }

  return r;
}
+
// Direct-IO read of an unaligned range: read the enclosing block-aligned
// range into a bounce buffer, then copy out the requested bytes.
int HMSMRDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf)
{
  uint64_t aligned_off = p2align(off, block_size);
  uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
  bufferptr p = buffer::create_small_page_aligned(aligned_len);
  int r = 0;

  auto start1 = mono_clock::now();
  r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], p.c_str(), aligned_len, aligned_off);
  auto age = cct->_conf->bdev_debug_aio_log_age;
  // NOTE(review): the derr below may clobber errno before it is read in
  // the r < 0 branch — same pattern as the other read paths; confirm.
  if (mono_clock::now() - start1 >= make_timespan(age)) {
    derr << __func__ << " stalled read "
	 << " 0x" << std::hex << off << "~" << len << std::dec
	 << " since " << start1 << ", timeout is "
	 << age
	 << "s" << dendl;
  }

  if (r < 0) {
    r = -errno;
    derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	 << " error: " << cpp_strerror(r) << dendl;
    goto out;
  }
  ceph_assert((uint64_t)r == aligned_len);
  memcpy(buf, p.c_str() + (off - aligned_off), len);

  dout(40) << __func__ << " data: ";
  bufferlist bl;
  bl.append(buf, len);
  bl.hexdump(*_dout);
  *_dout << dendl;

 out:
  return r < 0 ? r : 0;
}
+
+int HMSMRDevice::read_random(uint64_t off, uint64_t len, char *buf,
+ bool buffered)
+{
+ dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+ << "buffered " << buffered
+ << dendl;
+ ceph_assert(len > 0);
+ ceph_assert(off < size);
+ ceph_assert(off + len <= size);
+ int r = 0;
+ auto age = cct->_conf->bdev_debug_aio_log_age;
+
+ //if it's direct io and unaligned, we have to use a internal buffer
+ if (!buffered && ((off % block_size != 0)
+ || (len % block_size != 0)
+ || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0)))
+ return direct_read_unaligned(off, len, buf);
+
+ auto start1 = mono_clock::now();
+ if (buffered) {
+ //buffered read
+ auto off0 = off;
+ char *t = buf;
+ uint64_t left = len;
+ while (left > 0) {
+ r = ::pread(fd_buffereds[WRITE_LIFE_NOT_SET], t, left, off);
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " 0x" << std::hex << off << "~" << left
+ << std::dec << " error: " << cpp_strerror(r) << dendl;
+ goto out;
+ }
+ off += r;
+ t += r;
+ left -= r;
+ }
+ if (mono_clock::now() - start1 >= make_timespan(age)) {
+ derr << __func__ << " stalled read "
+ << " 0x" << std::hex << off0 << "~" << len << std::dec
+ << " (buffered) since " << start1 << ", timeout is "
+ << age
+ << "s" << dendl;
+ }
+ } else {
+ //direct and aligned read
+ r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], buf, len, off);
+ if (mono_clock::now() - start1 >= make_timespan(age)) {
+ derr << __func__ << " stalled read "
+ << " 0x" << std::hex << off << "~" << len << std::dec
+ << " (direct) since " << start1 << ", timeout is "
+ << age
+ << "s" << dendl;
+ }
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " direct_aligned_read" << " 0x" << std::hex
+ << off << "~" << left << std::dec << " error: " << cpp_strerror(r)
+ << dendl;
+ goto out;
+ }
+ ceph_assert((uint64_t)r == len);
+ }
+
+ dout(40) << __func__ << " data: ";
+ bufferlist bl;
+ bl.append(buf, len);
+ bl.hexdump(*_dout);
+ *_dout << dendl;
+
+ out:
+ return r < 0 ? r : 0;
+}
+
+int HMSMRDevice::invalidate_cache(uint64_t off, uint64_t len)
+{
+ dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+ << dendl;
+ ceph_assert(off % block_size == 0);
+ ceph_assert(len % block_size == 0);
+ int r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, POSIX_FADV_DONTNEED);
+ if (r) {
+ r = -r;
+ derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+ << " error: " << cpp_strerror(r) << dendl;
+ }
+ return r;
+}
diff --git a/src/blk/zoned/HMSMRDevice.h b/src/blk/zoned/HMSMRDevice.h
new file mode 100644
index 000000000..30941f2f9
--- /dev/null
+++ b/src/blk/zoned/HMSMRDevice.h
@@ -0,0 +1,163 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Red Hat
+ * Copyright (C) 2020 Abutalib Aghayev
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+// Copied from KernelDevice with HM-SMR specific functionality added. Will be
+// further specialized for HM-SMR.
+
+#ifndef CEPH_BLK_HMSMRDEVICE_H
+#define CEPH_BLK_HMSMRDEVICE_H
+
+#include <atomic>
+
+#include "include/types.h"
+#include "include/interval_set.h"
+#include "common/Thread.h"
+#include "include/utime.h"
+
+#include "aio/aio.h"
+#include "BlockDevice.h"
+
+#define RW_IO_MAX (INT_MAX & CEPH_PAGE_MASK)
+
/// KernelDevice clone with HM-SMR (host-managed shingled) specifics.
/// IO is queued on IOContexts and submitted via the io_queue (libaio or
/// io_uring); discards run on a dedicated background thread.
class HMSMRDevice final : public BlockDevice {
  // Per-write-hint fds: O_DIRECT and buffered variants of the same device.
  std::vector<int> fd_directs, fd_buffereds;
  bool enable_wrt = true;
  std::string path;
  // aio: libaio usable; dio: device opened with O_DIRECT.
  bool aio, dio;

  int vdo_fd = -1;      ///< fd for vdo sysfs directory
  string vdo_name;

  std::string devname;  ///< kernel dev name (/sys/block/$devname), if any

  // Guards debug_inflight (bdev_debug_inflight_ios overlap tracking).
  ceph::mutex debug_lock = ceph::make_mutex("HMSMRDevice::debug_lock");
  interval_set<uint64_t> debug_inflight;

  // Set on every write; lets flush() skip the fdatasync when clean.
  std::atomic<bool> io_since_flush = {false};
  ceph::mutex flush_mutex = ceph::make_mutex("HMSMRDevice::flush_mutex");

  std::unique_ptr<io_queue_t> io_queue;
  aio_callback_t discard_callback;
  void *discard_callback_priv;
  bool aio_stop;
  bool discard_started;
  bool discard_stop;

  // Guards the discard_* queues and flags below.
  ceph::mutex discard_lock = ceph::make_mutex("HMSMRDevice::discard_lock");
  ceph::condition_variable discard_cond;
  bool discard_running = false;
  interval_set<uint64_t> discard_queued;    ///< extents awaiting discard
  interval_set<uint64_t> discard_finishing; ///< extents being discarded

  // Reaps completed aios and invokes the completion callback.
  struct AioCompletionThread : public Thread {
    HMSMRDevice *bdev;
    explicit AioCompletionThread(HMSMRDevice *b) : bdev(b) {}
    void *entry() override {
      bdev->_aio_thread();
      return NULL;
    }
  } aio_thread;

  // Issues queued discards off the IO submission path.
  struct DiscardThread : public Thread {
    HMSMRDevice *bdev;
    explicit DiscardThread(HMSMRDevice *b) : bdev(b) {}
    void *entry() override {
      bdev->_discard_thread();
      return NULL;
    }
  } discard_thread;

  // Counts IOs deliberately dropped by bdev_inject_crash.
  std::atomic_int injecting_crash;

  void _aio_thread();
  void _discard_thread();
  int queue_discard(interval_set<uint64_t> &to_release) final;

  int _aio_start();
  void _aio_stop();

  int _discard_start();
  void _discard_stop();

  void _aio_log_start(IOContext *ioc, uint64_t offset, uint64_t length);
  void _aio_log_finish(IOContext *ioc, uint64_t offset, uint64_t length);

  int _sync_write(uint64_t off, bufferlist& bl, bool buffered, int write_hint);

  int _lock();

  int direct_read_unaligned(uint64_t off, uint64_t len, char *buf);

  // stalled aio debugging
  aio_list_t debug_queue;
  ceph::mutex debug_queue_lock =
    ceph::make_mutex("HMSMRDevice::debug_queue_lock");
  aio_t *debug_oldest = nullptr;
  utime_t debug_stall_since;
  void debug_aio_link(aio_t& aio);
  void debug_aio_unlink(aio_t& aio);

  void _detect_vdo();
  int choose_fd(bool buffered, int write_hint) const;

  // Configure zone/SMR parameters from sysfs for the given path.
  bool set_smr_params(const std::string& path);

public:
  HMSMRDevice(CephContext* cct, aio_callback_t cb, void *cbpriv,
	      aio_callback_t d_cb, void *d_cbpriv);
  // True if the device at 'path' is a host-managed SMR device.
  static bool support(const std::string& path);

  void aio_submit(IOContext *ioc) final;
  void discard_drain() final;

  int collect_metadata(const std::string& prefix,
		       map<std::string,std::string> *pm) const final;
  int get_devname(std::string *s) const final {
    if (devname.empty()) {
      return -ENOENT;
    }
    *s = devname;
    return 0;
  }
  int get_devices(std::set<std::string> *ls) const final;

  bool is_smr() const final { return true; }

  bool get_thin_utilization(uint64_t *total, uint64_t *avail) const final;

  int read(uint64_t off, uint64_t len, bufferlist *pbl,
	   IOContext *ioc,
	   bool buffered) final;
  int aio_read(uint64_t off, uint64_t len, bufferlist *pbl,
	       IOContext *ioc) final;
  int read_random(uint64_t off, uint64_t len, char *buf,
		  bool buffered) final;

  int write(uint64_t off, bufferlist& bl, bool buffered,
	    int write_hint = WRITE_LIFE_NOT_SET) final;
  int aio_write(uint64_t off, bufferlist& bl,
		IOContext *ioc,
		bool buffered,
		int write_hint = WRITE_LIFE_NOT_SET) final;
  int flush() final;
  int discard(uint64_t offset, uint64_t len) final;

  // for managing buffered readers/writers
  int invalidate_cache(uint64_t off, uint64_t len) final;
  int open(const std::string& path) final;
  void close() final;
};
+
+#endif //CEPH_BLK_HMSMRDEVICE_H
diff --git a/src/blkin/CMakeLists.txt b/src/blkin/CMakeLists.txt
new file mode 100644
index 000000000..d9a25b1cb
--- /dev/null
+++ b/src/blkin/CMakeLists.txt
@@ -0,0 +1,9 @@
cmake_minimum_required(VERSION 2.8.11)

# blkin traces via LTTng; locate it with the project's find module.
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/modules/")
find_package(LTTng REQUIRED)

# make && make test
enable_testing()

add_subdirectory(blkin-lib)
diff --git a/src/blkin/COPYRIGHT b/src/blkin/COPYRIGHT
new file mode 100644
index 000000000..55828c9af
--- /dev/null
+++ b/src/blkin/COPYRIGHT
@@ -0,0 +1,27 @@
+Copyright 2014 Marios Kogias <marioskogias@gmail.com>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+ 1. Redistributions of source code must retain the above
+ copyright notice, this list of conditions and the following
+ disclaimer.
+ 2. Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials
+ provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS
+OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
diff --git a/src/blkin/README.md b/src/blkin/README.md
new file mode 100644
index 000000000..2c8a6c99e
--- /dev/null
+++ b/src/blkin/README.md
@@ -0,0 +1,59 @@
+# blkin
+
+BlkKin is a project that enables you to trace low-overhead applications using
+LTTng following the tracing semantics that are described in Google's [Dapper
+Paper](http://static.googleusercontent.com/media/research.google.com/el//pubs/archive/36356.pdf)
+
+According to this paper the logged information is called `Annotation` and
+belongs to a specific span and trace. Each trace is comprised of multiple spans
+which are related with each other with causal relationships. So, the BlkKin
+library gives the user the API to easily instrument C/C++ applications. In
+order to instrument applications you should take a look at ``blkin-lib/tests``
+for some testcases and at the ``blkin-lib/zipkin_c.h`` file
+
+As a tracing backend BlkKin uses LTTng. So you must have LTTng installed.
+
+In order to build and install the lib, go to blkin-lib folder and:
+
+```
+make
+make install
+```
+
+You should take a look at the examples to find out how to link the blkin lib
+with your instrumented application.
+
+In order to visualize the aggregated information you can use Twitter's
+[Zipkin](http://twitter.github.io/zipkin/) and send the data that you created,
+by running the equivalent babeltrace plugin. In order to do you can run
+
+```
+./zipkin/src/babeltrace_zipkin.py </path/to/lttng/traces> -s <server_ip>
+-p <port_number>
+
+```
+
+within the babeltrace-plugins directory.
+
+In case you have not used the blkin-lib to instrument your application, you can
+still send your data to a Scribe server. To do that you can use another
+Babeltrace plugin. This plugin transforms LTTng trace data to a JSON format and
+sends it to a Scribe server. To do so, we can equivalently run
+
+```
+./json/src/babeltrace_json.py </path/to/lttng/traces> -s <server_ip>
+-p <port_number>
+```
+within the babeltrace-plugins directory
+
+Both of these plugins require that you have installed Babeltrace with its
+Python bindings enabled.
+The path to the lttng traces should not be the root directory but the directory
+where the channel directories are included.
+
+## Dependencies
+
+* libboost-all-dev
+* lttng-tools
+
+Note that BlkKin is tested only with LTTng 2.4
diff --git a/src/blkin/babeltrace-plugins/.gitignore b/src/blkin/babeltrace-plugins/.gitignore
new file mode 100644
index 000000000..c9b568f7e
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/.gitignore
@@ -0,0 +1,2 @@
+*.pyc
+*.swp
diff --git a/src/blkin/babeltrace-plugins/json/README.md b/src/blkin/babeltrace-plugins/json/README.md
new file mode 100644
index 000000000..be0bbe2c4
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/json/README.md
@@ -0,0 +1,5 @@
+babeltrace-json plugin
+========================
+
+This plugin enables us to send LTTng trace data to a Scribe server in a valid
+json format
diff --git a/src/blkin/babeltrace-plugins/json/src/babeltrace_json.py b/src/blkin/babeltrace-plugins/json/src/babeltrace_json.py
new file mode 100755
index 000000000..0d21e0181
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/json/src/babeltrace_json.py
@@ -0,0 +1,88 @@
+#!/usr/bin/python
+# babeltrace_zipkin.py
+
+import sys
+sys.path.append("../../babeltrace-plugins")
+import json
+import getopt
+from babeltrace import *
+from scribe_client import ScribeClient
+
+HELP = "Usage: python babeltrace_zipkin.py path/to/file -s <server> -p <port>"
+CATEGORY = "LTTng"
+
+
def main(argv):
    """Convert LTTng CTF trace events to JSON and ship them to Scribe.

    argv: [trace_path, '-s', server, '-p', port]; server defaults to
    localhost and port to 1463.  Raises TypeError on bad usage and
    IOError if the trace cannot be opened.
    """
    try:
        path = argv[0]
    except:
        raise TypeError(HELP)

    try:
        opts, args = getopt.getopt(argv[1:], "hs:p:")
    except getopt.GetoptError:
        raise TypeError(HELP)

    server = None
    port = None
    for opt, arg in opts:
        if opt == '-h':
            raise TypeError(HELP)
        elif opt == '-s':
            server = arg
        elif opt == '-p':
            port = arg

    if not server:
        server = "localhost"
    if not port:
        port = 1463

    # Open connection with scribe
    scribe_client = ScribeClient(port, server)

    # Create TraceCollection and add trace:
    traces = TraceCollection()
    trace_handle = traces.add_trace(path, "ctf")
    if trace_handle is None:
        raise IOError("Error adding trace")

    # iterate over events, forwarding one JSON dict per event
    for event in traces.events:
        data = dict()

        data["parent_span_id"] = event["parent_span_id"]
        data['name'] = event["trace_name"]
        data["trace_id"] = event["trace_id"]
        data["span_id"] = event["span_id"]
        data['port'] = event['port_no']
        data['service_name'] = event['service_name']
        data['ip'] = event['ip']
        # bug fix: the key was misspelled 'evemt'
        data['event'] = event['event']
        data['timestamp'] = event.timestamp
        '''
        for k, v in event.items():
            field_type = event._field(k).type
            data[k] = format_value(field_type, v)
        '''
        json_data = json.dumps(data)

        # send data to scribe
        scribe_client.log(CATEGORY, json_data)

    scribe_client.close()
+
+
def format_value(field_type, value):
    """Coerce a babeltrace field value based on its numeric field-type id.

    1 -> int, 2 -> float, 8 -> list; anything else is stringified.
    """
    if field_type == 1:
        return int(value)
    if field_type == 2:
        return float(value)
    if field_type == 8:
        return list(value)
    return str(value)
+
+if __name__ == "__main__":
+ main(sys.argv[1:])
diff --git a/src/blkin/babeltrace-plugins/scribe_client/__init__.py b/src/blkin/babeltrace-plugins/scribe_client/__init__.py
new file mode 100644
index 000000000..503803846
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/scribe_client/__init__.py
@@ -0,0 +1 @@
+from scribe_client import *
diff --git a/src/blkin/babeltrace-plugins/scribe_client/scribe_client.py b/src/blkin/babeltrace-plugins/scribe_client/scribe_client.py
new file mode 100644
index 000000000..b382ed995
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/scribe_client/scribe_client.py
@@ -0,0 +1,31 @@
+#!/usr/bin/python
+# scribe_client.py
+
+from scribe import scribe
+from thrift.transport import TTransport, TSocket
+from thrift.protocol import TBinaryProtocol
+
class ScribeClient(object):
    """Thin wrapper around a Thrift framed-transport Scribe connection.

    NOTE(review): Python 2 only (uses the `print` statement below).
    """

    def __init__(self, port, host):
        # Python 2 print statement; left as-is since the module is py2-era.
        print host
        self.port = port
        self.host = host
        self.openConnection()

    def openConnection(self):
        # Scribe expects a framed transport with a non-strict binary protocol.
        socket = TSocket.TSocket(host=self.host, port=self.port)
        self.transport = TTransport.TFramedTransport(socket)
        protocol = TBinaryProtocol.TBinaryProtocol(trans=self.transport,
                                                   strictRead=False,
                                                   strictWrite=False)
        self.client = scribe.Client(protocol)
        self.transport.open()

    def log(self, category, message):
        # Forward one message; returns the scribe ResultCode.
        log_entry = scribe.LogEntry(category, message)
        result = self.client.Log(messages=[log_entry])
        return result  # 0 for success

    def close(self):
        self.transport.close()
diff --git a/src/blkin/babeltrace-plugins/zipkin/README.md b/src/blkin/babeltrace-plugins/zipkin/README.md
new file mode 100644
index 000000000..95cebe87b
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/README.md
@@ -0,0 +1,6 @@
+babeltrace-zipkin plugin
+========================
+
+In order to use this plugin, the traces created by LTTng should follow a
+specific format. This format is provided in zipkin_trace.h file. If this
+format is not followed the traces will be dropped.
diff --git a/src/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py b/src/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py
new file mode 100755
index 000000000..5677338c9
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py
@@ -0,0 +1,69 @@
+#!/usr/bin/python
+# babeltrace_zipkin.py
+
+import sys
+sys.path.append("../../babeltrace-plugins")
+import sys
+import getopt
+from babeltrace import *
+from zipkin_logic.zipkin_client import ZipkinClient
+HELP = "Usage: python babeltrace_zipkin.py path/to/file -s <server> -p <port>"
+
+
def main(argv):
    """Read LTTng CTF traces and forward zipkin:* events to a Zipkin collector.

    argv: [trace_path, '-s', server, '-p', port]; port defaults to 1463.
    Raises TypeError on bad usage and IOError if the trace cannot be opened.
    """
    try:
        path = argv[0]
    except:
        raise TypeError(HELP)

    try:
        opts, args = getopt.getopt(argv[1:], "hs:p:")
    except getopt.GetoptError:
        raise TypeError(HELP)

    server = None
    port = None
    for opt, arg in opts:
        if opt == '-h':
            raise TypeError(HELP)
        elif opt == '-s':
            server = arg
        elif opt == '-p':
            port = arg

    if not server:
        # NOTE(review): hardcoded fallback collector IP; confirm it is
        # still meaningful or switch to localhost.
        server = "83.212.113.88"
    if not port:
        port = 1463

    # Open connection with scribe
    zipkin = ZipkinClient(port, server)

    # Create TraceCollection and add trace:
    traces = TraceCollection()
    trace_handle = traces.add_trace(path, "ctf")
    if trace_handle is None:
        raise IOError("Error adding trace")

    for event in traces.events:
        name = event.name
        # Skip anything that is not a "zipkin:<kind>" event; the bare
        # raise is only used to jump into the except -> continue.
        try:
            provider, kind = name.split(":")
            if provider != "zipkin":
                raise
        except:
            continue

        #create a zipkin trace from event info
        trace = zipkin.create_trace(event)

        #create a zipkin annotation from event info
        annotation = zipkin.create_annotation(event, kind)

        #record the trace
        zipkin.record(trace, annotation)

    zipkin.close()
diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py
diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py
new file mode 100644
index 000000000..70874c0b6
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py
@@ -0,0 +1,131 @@
+# Copyright 2012 Rackspace Hosting, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import struct
+import socket
+
+from thrift.protocol import TBinaryProtocol
+from thrift.transport import TTransport
+
+import ttypes
+
+
def hex_str(n):
    # Format a span/trace id as a zero-padded 16-digit hex string
    # (Zipkin's id wire format).
    return '%0.16x' % (n,)
+
+
def json_formatter(traces, *json_args, **json_kwargs):
    """Serialize (trace, annotations) pairs as a Zipkin-style JSON array.

    Extra positional/keyword arguments are forwarded to json.dumps.
    """
    encoded = []

    for trace, annotations in traces:
        entry = {
            'trace_id': hex_str(trace.trace_id),
            'span_id': hex_str(trace.span_id),
            'name': trace.name,
            'annotations': [],
        }

        # parent_span_id is optional; omit it for root spans.
        if trace.parent_span_id:
            entry['parent_span_id'] = hex_str(trace.parent_span_id)

        for ann in annotations:
            ann_json = {
                'key': ann.name,
                'value': ann.value,
                'type': ann.annotation_type,
            }

            if ann.endpoint:
                ann_json['host'] = {
                    'ipv4': ann.endpoint.ipv4,
                    'port': ann.endpoint.port,
                    'service_name': ann.endpoint.service_name,
                }

            entry['annotations'].append(ann_json)

        encoded.append(entry)

    return json.dumps(encoded, *json_args, **json_kwargs)
+
+
def ipv4_to_int(ipv4):
    """Pack a dotted-quad IPv4 string into a signed 32-bit big-endian int."""
    packed = socket.inet_aton(ipv4)
    return struct.unpack('!i', packed)[0]
+
+
def base64_thrift(thrift_obj):
    """Thrift-serialize thrift_obj and return it base64-encoded.

    The result is a single line (no embedded newlines) suitable for a
    Scribe log entry.
    """
    import base64  # local import; works on both Python 2 and 3

    trans = TTransport.TMemoryBuffer()
    tbp = TBinaryProtocol.TBinaryProtocol(trans)

    thrift_obj.write(tbp)
    # bug fix: str.encode('base64') is Python 2 only; base64.b64encode
    # is portable and, unlike the py2 codec, inserts no newlines.
    res = base64.b64encode(trans.getvalue())
    if not isinstance(res, str):
        res = res.decode('ascii')  # Python 3: bytes -> str
    return res.replace("\n", "").strip()
+
+
def binary_annotation_formatter(annotation, host=None):
    """Convert a non-timestamp annotation into a thrift BinaryAnnotation.

    Only 'string' and 'bytes' annotation types are supported; any other
    annotation_type raises KeyError.
    """
    annotation_types = {
        'string': ttypes.AnnotationType.STRING,
        'bytes': ttypes.AnnotationType.BYTES,
    }

    annotation_type = annotation_types[annotation.annotation_type]

    value = annotation.value

    # NOTE(review): `unicode` exists only on Python 2; this module
    # predates Python 3 (see also the base64 codec usage nearby).
    if isinstance(value, unicode):
        value = value.encode('utf-8')

    return ttypes.BinaryAnnotation(
        annotation.name,
        value,
        annotation_type,
        host)
+
+
def base64_thrift_formatter(trace, annotations):
    """Build a thrift Span from a trace and its annotations and return it
    base64-encoded (the payload format Zipkin's Scribe collector expects).
    """
    thrift_annotations = []
    binary_annotations = []

    for annotation in annotations:
        host = None
        if annotation.endpoint:
            host = ttypes.Endpoint(
                ipv4=ipv4_to_int(annotation.endpoint.ipv4),
                port=annotation.endpoint.port,
                service_name=annotation.endpoint.service_name)

        # Timestamp annotations become regular Annotations; everything
        # else (string/bytes) becomes a BinaryAnnotation.
        if annotation.annotation_type == 'timestamp':
            thrift_annotations.append(ttypes.Annotation(
                timestamp=annotation.value,
                value=annotation.name,
                host=host))
        else:
            binary_annotations.append(
                binary_annotation_formatter(annotation, host))

    thrift_trace = ttypes.Span(
        trace_id=trace.trace_id,
        name=trace.name,
        id=trace.span_id,
        parent_id=trace.parent_span_id,
        annotations=thrift_annotations,
        binary_annotations=binary_annotations
    )

    return base64_thrift(thrift_trace)
diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py
new file mode 100644
index 000000000..e0a3ed2c3
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py
@@ -0,0 +1,137 @@
+import math
+import time
+import random
+
+
+class Trace(object):
+    """
+    A single Zipkin span under construction: holds the trace/span/parent
+    identifiers plus an optional default endpoint.
+
+    @ivar _tracers: C{list} of tracer providers, or C{None}.
+    @ivar _endpoint: default endpoint for annotations, or C{None}.
+    """
+    def __init__(self, name, trace_id=None, span_id=None,
+                 parent_span_id=None, tracers=None):
+        """
+        @param name: C{str} describing the current span.
+        @param trace_id: C{int} or C{None}
+        @param span_id: C{int} or C{None}
+        @param parent_span_id: C{int} or C{None}
+
+        @param tracers: C{list} of tracer providers, primarily useful
+            for unit testing.
+        """
+        self.name = name
+        # NOTE(review): ids are stored exactly as given.  Unlike some
+        # upstream variants, no random 64-bit ids are generated when
+        # these are None -- callers must supply real ids.
+        self.trace_id = trace_id
+        self.span_id = span_id
+
+        # A parent_span_id of None marks this as a root span.
+        self.parent_span_id = parent_span_id
+
+        # NOTE(review): stored verbatim; there is no fallback to a
+        # global tracer list in this implementation.
+        self._tracers = tracers
+
+        # By default no endpoint will be associated with annotations
+        # recorded to this trace.
+        self._endpoint = None
+
+    def __ne__(self, other):
+        # Python 2 does not derive __ne__ from __eq__; mirror the
+        # (default, identity-based) equality explicitly.
+        return not self == other
+
+    def __repr__(self):
+        return (
+            '{0.__class__.__name__}({0.name!r}, trace_id={0.trace_id!r}, '
+            'span_id={0.span_id!r}, parent_span_id={0.parent_span_id!r})'
+        ).format(self)
+
+    def set_endpoint(self, endpoint):
+        """
+        Set a default endpoint for the current L{Trace}.
+        All annotations recorded after this endpoint is set will use it,
+        unless they provide their own endpoint.
+        """
+        self._endpoint = endpoint
+
+
+class Endpoint(object):
+
+ def __init__(self, ipv4, port, service_name):
+ """
+ @param ipv4: C{str} ipv4 address.
+ @param port: C{int} port number.
+ @param service_name: C{str} service name.
+ """
+ self.ipv4 = ipv4
+ self.port = port
+ self.service_name = service_name
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __repr__(self):
+ return ('{0.__class__.__name__}({0.ipv4!r}, {0.port!r}, '
+ '{0.service_name!r})').format(self)
+
+
+class Annotation(object):
+
+ def __init__(self, name, value, annotation_type, endpoint=None):
+ """
+ @param name: C{str} name of this annotation.
+
+ @param value: A value of the appropriate type based on
+ C{annotation_type}.
+
+ @param annotation_type: C{str} the expected type of our C{value}.
+
+ @param endpoint: An optional L{IEndpoint} provider to associate with
+ this annotation or C{None}
+ """
+ self.name = name
+ self.value = value
+ self.annotation_type = annotation_type
+ self.endpoint = endpoint
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __repr__(self):
+ return (
+ '{0.__class__.__name__}({0.name!r}, {0.value!r}, '
+ '{0.annotation_type!r}, {0.endpoint})'
+ ).format(self)
+
+ @classmethod
+ def timestamp(cls, name, timestamp=None):
+ if timestamp is None:
+ timestamp = math.trunc(time.time() * 1000 * 1000)
+
+ return cls(name, timestamp, 'timestamp')
+
+ @classmethod
+ def client_send(cls, timestamp=None):
+ return cls.timestamp(constants.CLIENT_SEND, timestamp)
+
+ @classmethod
+ def client_recv(cls, timestamp=None):
+ return cls.timestamp(constants.CLIENT_RECV, timestamp)
+
+ @classmethod
+ def server_send(cls, timestamp=None):
+ return cls.timestamp(constants.SERVER_SEND, timestamp)
+
+ @classmethod
+ def server_recv(cls, timestamp=None):
+ return cls.timestamp(constants.SERVER_RECV, timestamp)
+
+ @classmethod
+ def string(cls, name, value):
+ return cls(name, value, 'string')
+
+ @classmethod
+ def bytes(cls, name, value):
+ return cls(name, value, 'bytes')
diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py
new file mode 100644
index 000000000..8605559b7
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py
@@ -0,0 +1,453 @@
+#
+# Autogenerated by Thrift Compiler (0.8.0)
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+# options string: py:twisted
+#
+
+from thrift.Thrift import TType, TMessageType, TException
+
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol, TProtocol
+try:
+ from thrift.protocol import fastbinary
+except:
+ fastbinary = None
+
+
+class AnnotationType:
+  """
+  Thrift-generated enum (Thrift 0.8): wire codes for the value type of
+  a BinaryAnnotation.  Do not hand-edit.
+  """
+  BOOL = 0
+  BYTES = 1
+  I16 = 2
+  I32 = 3
+  I64 = 4
+  DOUBLE = 5
+  STRING = 6
+
+  _VALUES_TO_NAMES = {
+    0: "BOOL",
+    1: "BYTES",
+    2: "I16",
+    3: "I32",
+    4: "I64",
+    5: "DOUBLE",
+    6: "STRING",
+  }
+
+  _NAMES_TO_VALUES = {
+    "BOOL": 0,
+    "BYTES": 1,
+    "I16": 2,
+    "I32": 3,
+    "I64": 4,
+    "DOUBLE": 5,
+    "STRING": 6,
+  }
+
+
+class Endpoint:
+  """
+  Thrift-generated struct (Thrift 0.8, py:twisted); do not hand-edit.
+  The network host that recorded an annotation.
+
+  Attributes:
+   - ipv4
+   - port
+   - service_name
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.I32, 'ipv4', None, None, ), # 1
+    (2, TType.I16, 'port', None, None, ), # 2
+    (3, TType.STRING, 'service_name', None, None, ), # 3
+  )
+
+  def __init__(self, ipv4=None, port=None, service_name=None,):
+    self.ipv4 = ipv4
+    self.port = port
+    self.service_name = service_name
+
+  def read(self, iprot):
+    # Fast path: decode with the C extension when the accelerated
+    # binary protocol and a compatible transport are in use.
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.I32:
+          self.ipv4 = iprot.readI32();
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.I16:
+          self.port = iprot.readI16();
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.STRING:
+          self.service_name = iprot.readString();
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    # Fast-path mirror of read(): encode via the C extension if possible.
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('Endpoint')
+    if self.ipv4 is not None:
+      oprot.writeFieldBegin('ipv4', TType.I32, 1)
+      oprot.writeI32(self.ipv4)
+      oprot.writeFieldEnd()
+    if self.port is not None:
+      oprot.writeFieldBegin('port', TType.I16, 2)
+      oprot.writeI16(self.port)
+      oprot.writeFieldEnd()
+    if self.service_name is not None:
+      oprot.writeFieldBegin('service_name', TType.STRING, 3)
+      oprot.writeString(self.service_name)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __repr__(self):
+    # Python 2: dict.iteritems().
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class Annotation:
+  """
+  Thrift-generated struct (Thrift 0.8, py:twisted); do not hand-edit.
+  A timestamped event on a span, optionally bound to a host Endpoint.
+
+  Attributes:
+   - timestamp
+   - value
+   - host
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.I64, 'timestamp', None, None, ), # 1
+    (2, TType.STRING, 'value', None, None, ), # 2
+    (3, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 3
+  )
+
+  def __init__(self, timestamp=None, value=None, host=None,):
+    self.timestamp = timestamp
+    self.value = value
+    self.host = host
+
+  def read(self, iprot):
+    # Fast path: decode with the C extension when available.
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.I64:
+          self.timestamp = iprot.readI64();
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.value = iprot.readString();
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.STRUCT:
+          self.host = Endpoint()
+          self.host.read(iprot)
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    # Fast-path mirror of read().
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('Annotation')
+    if self.timestamp is not None:
+      oprot.writeFieldBegin('timestamp', TType.I64, 1)
+      oprot.writeI64(self.timestamp)
+      oprot.writeFieldEnd()
+    if self.value is not None:
+      oprot.writeFieldBegin('value', TType.STRING, 2)
+      oprot.writeString(self.value)
+      oprot.writeFieldEnd()
+    if self.host is not None:
+      oprot.writeFieldBegin('host', TType.STRUCT, 3)
+      self.host.write(oprot)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __repr__(self):
+    # Python 2: dict.iteritems().
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class BinaryAnnotation:
+  """
+  Thrift-generated struct (Thrift 0.8, py:twisted); do not hand-edit.
+  A typed key/value annotation on a span (type codes in AnnotationType).
+
+  Attributes:
+   - key
+   - value
+   - annotation_type
+   - host
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'key', None, None, ), # 1
+    (2, TType.STRING, 'value', None, None, ), # 2
+    (3, TType.I32, 'annotation_type', None, None, ), # 3
+    (4, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 4
+  )
+
+  def __init__(self, key=None, value=None, annotation_type=None, host=None,):
+    self.key = key
+    self.value = value
+    self.annotation_type = annotation_type
+    self.host = host
+
+  def read(self, iprot):
+    # Fast path: decode with the C extension when available.
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.key = iprot.readString();
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.value = iprot.readString();
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I32:
+          self.annotation_type = iprot.readI32();
+        else:
+          iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.STRUCT:
+          self.host = Endpoint()
+          self.host.read(iprot)
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    # Fast-path mirror of read().
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('BinaryAnnotation')
+    if self.key is not None:
+      oprot.writeFieldBegin('key', TType.STRING, 1)
+      oprot.writeString(self.key)
+      oprot.writeFieldEnd()
+    if self.value is not None:
+      oprot.writeFieldBegin('value', TType.STRING, 2)
+      oprot.writeString(self.value)
+      oprot.writeFieldEnd()
+    if self.annotation_type is not None:
+      oprot.writeFieldBegin('annotation_type', TType.I32, 3)
+      oprot.writeI32(self.annotation_type)
+      oprot.writeFieldEnd()
+    if self.host is not None:
+      oprot.writeFieldBegin('host', TType.STRUCT, 4)
+      self.host.write(oprot)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __repr__(self):
+    # Python 2: dict.iteritems().
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class Span:
+  """
+  Thrift-generated struct (Thrift 0.8, py:twisted); do not hand-edit.
+  A complete zipkin span: ids plus timestamped and binary annotations.
+  Field ids 2 and 7 are intentionally unused in the IDL.
+
+  Attributes:
+   - trace_id
+   - name
+   - id
+   - parent_id
+   - annotations
+   - binary_annotations
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.I64, 'trace_id', None, None, ), # 1
+    None, # 2
+    (3, TType.STRING, 'name', None, None, ), # 3
+    (4, TType.I64, 'id', None, None, ), # 4
+    (5, TType.I64, 'parent_id', None, None, ), # 5
+    (6, TType.LIST, 'annotations', (TType.STRUCT,(Annotation, Annotation.thrift_spec)), None, ), # 6
+    None, # 7
+    (8, TType.LIST, 'binary_annotations', (TType.STRUCT,(BinaryAnnotation, BinaryAnnotation.thrift_spec)), None, ), # 8
+  )
+
+  def __init__(self, trace_id=None, name=None, id=None, parent_id=None, annotations=None, binary_annotations=None,):
+    self.trace_id = trace_id
+    self.name = name
+    self.id = id
+    self.parent_id = parent_id
+    self.annotations = annotations
+    self.binary_annotations = binary_annotations
+
+  def read(self, iprot):
+    # Fast path: decode with the C extension when available.
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.I64:
+          self.trace_id = iprot.readI64();
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.STRING:
+          self.name = iprot.readString();
+        else:
+          iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.I64:
+          self.id = iprot.readI64();
+        else:
+          iprot.skip(ftype)
+      elif fid == 5:
+        if ftype == TType.I64:
+          self.parent_id = iprot.readI64();
+        else:
+          iprot.skip(ftype)
+      elif fid == 6:
+        if ftype == TType.LIST:
+          self.annotations = []
+          (_etype3, _size0) = iprot.readListBegin()
+          # Python 2: xrange.
+          for _i4 in xrange(_size0):
+            _elem5 = Annotation()
+            _elem5.read(iprot)
+            self.annotations.append(_elem5)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 8:
+        if ftype == TType.LIST:
+          self.binary_annotations = []
+          (_etype9, _size6) = iprot.readListBegin()
+          for _i10 in xrange(_size6):
+            _elem11 = BinaryAnnotation()
+            _elem11.read(iprot)
+            self.binary_annotations.append(_elem11)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    # Fast-path mirror of read().
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('Span')
+    if self.trace_id is not None:
+      oprot.writeFieldBegin('trace_id', TType.I64, 1)
+      oprot.writeI64(self.trace_id)
+      oprot.writeFieldEnd()
+    if self.name is not None:
+      oprot.writeFieldBegin('name', TType.STRING, 3)
+      oprot.writeString(self.name)
+      oprot.writeFieldEnd()
+    if self.id is not None:
+      oprot.writeFieldBegin('id', TType.I64, 4)
+      oprot.writeI64(self.id)
+      oprot.writeFieldEnd()
+    if self.parent_id is not None:
+      oprot.writeFieldBegin('parent_id', TType.I64, 5)
+      oprot.writeI64(self.parent_id)
+      oprot.writeFieldEnd()
+    if self.annotations is not None:
+      oprot.writeFieldBegin('annotations', TType.LIST, 6)
+      oprot.writeListBegin(TType.STRUCT, len(self.annotations))
+      for iter12 in self.annotations:
+        iter12.write(oprot)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    if self.binary_annotations is not None:
+      oprot.writeFieldBegin('binary_annotations', TType.LIST, 8)
+      oprot.writeListBegin(TType.STRUCT, len(self.binary_annotations))
+      for iter13 in self.binary_annotations:
+        iter13.write(oprot)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __repr__(self):
+    # Python 2: dict.iteritems().
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py
new file mode 100644
index 000000000..28118facb
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py
@@ -0,0 +1,70 @@
+#!/usr/bin/python
+
+from scribe_client import ScribeClient
+from trace import Annotation, Trace, Endpoint
+from collections import defaultdict
+from formatters import base64_thrift_formatter
+
+
+class ZipkinClient(ScribeClient):
+    """
+    Turns babeltrace event records into zipkin Traces/Annotations and
+    ships them to a scribe endpoint under the 'zipkin' category.
+    Python 2 only (uses the print statement).
+    """
+
+    # Annotation names that traditionally close a span (server-send,
+    # client-receive, and this library's custom "end"); only used by
+    # the disabled batching path in record().
+    DEFAULT_END_ANNOTATIONS = ("ss", "cr", "end")
+
+    def __init__(self, port, host):
+        super(ZipkinClient, self).__init__(port, host)
+        # trace-key -> list of annotations, for the disabled batching
+        # strategy in record() below.
+        self._annotations_for_trace = defaultdict(list)
+
+    def create_trace(self, event):
+        """Build a Trace from an event dict; parent span id 0 means root."""
+        service = event["trace_name"]
+        trace_id = event["trace_id"]
+        span_id = event["span_id"]
+        parent_span = event["parent_span_id"]
+        if parent_span == 0:
+            parent_span = None
+        trace = Trace(service, trace_id, span_id, parent_span)
+        return trace
+
+    def create_annotation(self, event, kind):
+        """
+        Build an Annotation (with its Endpoint) from an event.
+
+        @param kind: "keyval_string", "keyval_integer" or "timestamp".
+            NOTE(review): any other value leaves `annotation` unbound
+            and raises UnboundLocalError below.
+        """
+        if kind == "keyval_string":
+            key = event["key"]
+            val = event["val"]
+            annotation = Annotation.string(key, val)
+        elif kind == "keyval_integer":
+            key = event["key"]
+            val = str(event["val"])
+            annotation = Annotation.string(key, val)
+        elif kind == "timestamp":
+            timestamp = event.timestamp
+            # timestamp has different digit length: drop the last three
+            # digits (nanoseconds -> microseconds) before handing the
+            # value to zipkin.
+            timestamp = str(timestamp)
+            timestamp = timestamp[:-3]
+            event_name = event["event"]
+            annotation = Annotation.timestamp(event_name, int(timestamp))
+
+        # create and set endpoint
+        port = event["port_no"]
+        service = event["service_name"]
+        ip = event["ip"]
+        endpoint = Endpoint(ip, int(port), service)
+        annotation.endpoint = endpoint
+
+        print annotation
+        return annotation
+
+    def record(self, trace, annotation):
+        # Log each annotation immediately; the quoted block below is the
+        # original batch-until-end-annotation strategy, kept disabled.
+        self.scribe_log(trace, [annotation])
+        '''
+        trace_key = (trace.trace_id, trace.span_id)
+        self._annotations_for_trace[trace_key].append(annotation)
+        if (annotation.name in self.DEFAULT_END_ANNOTATIONS):
+            saved_annotations = self._annotations_for_trace[trace_key]
+            del self._annotations_for_trace[trace_key]
+            self.scribe_log(trace, saved_annotations)
+        print "Record event"
+        '''
+
+    def scribe_log(self, trace, annotations):
+        """Encode trace+annotations as base64 thrift and send to scribe."""
+        # Endpoints travel on each annotation; the trace-level default
+        # endpoint is cleared so it cannot leak into the encoding.
+        trace._endpoint = None
+        message = base64_thrift_formatter(trace, annotations)
+        category = 'zipkin'
+        return self.log(category, message)
diff --git a/src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h b/src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h
new file mode 100644
index 000000000..4abc87b87
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h
@@ -0,0 +1,67 @@
+/*
+ * Zipkin lttng-ust tracepoint provider.
+ */
+
+#undef TRACEPOINT_PROVIDER
+#define TRACEPOINT_PROVIDER zipkin
+
+#undef TRACEPOINT_INCLUDE
+#define TRACEPOINT_INCLUDE "./zipkin_trace.h"
+
+#if !defined(_ZIPKIN_H) || defined(TRACEPOINT_HEADER_MULTI_READ)
+#define _ZIPKIN_H
+
+#include <lttng/tracepoint.h>
+
+TRACEPOINT_EVENT(
+ zipkin,
+ keyval,
+ TP_ARGS(char *, service, char *, trace_name,
+ int, port, char *, ip, long, trace,
+ long, span, long, parent_span,
+ char *, key, char *, val ),
+
+ TP_FIELDS(
+ ctf_string(trace_name, trace_name)
+ ctf_string(service_name, service)
+ ctf_integer(int, port_no, port)
+ ctf_string(ip, ip)
+ ctf_integer(long, trace_id, trace)
+ ctf_integer(long, span_id, span)
+ ctf_integer(long, parent_span_id, parent_span)
+ ctf_string(key, key)
+ ctf_string(val, val)
+ )
+)
+TRACEPOINT_LOGLEVEL(
+ zipkin,
+ keyval,
+ TRACE_WARNING)
+
+
+TRACEPOINT_EVENT(
+ zipkin,
+ timestamp,
+ TP_ARGS(char *, service, char *, trace_name,
+ int, port, char *, ip, long, trace,
+ long, span, long, parent_span,
+ char *, event),
+
+ TP_FIELDS(
+ ctf_string(trace_name, trace_name)
+ ctf_string(service_name, service)
+ ctf_integer(int, port_no, port)
+ ctf_string(ip, ip)
+ ctf_integer(long, trace_id, trace)
+ ctf_integer(long, span_id, span)
+ ctf_integer(long, parent_span_id, parent_span)
+ ctf_string(event, event)
+ )
+)
+TRACEPOINT_LOGLEVEL(
+ zipkin,
+ timestamp,
+ TRACE_WARNING)
+#endif /* _ZIPKIN_H */
+
+#include <lttng/tracepoint-event.h>
diff --git a/src/blkin/blkin-lib/CMakeLists.txt b/src/blkin/blkin-lib/CMakeLists.txt
new file mode 100644
index 000000000..d2e1d0f73
--- /dev/null
+++ b/src/blkin/blkin-lib/CMakeLists.txt
@@ -0,0 +1,18 @@
+include_directories(.)
+
+# blkin core library: the zipkin C API plus the lttng-ust tracepoint
+# provider (tp.c).  Built PIC so it can be linked into shared objects.
+set(blkin_srcs
+  zipkin_c.c
+  tp.c
+)
+add_library(blkin ${blkin_srcs})
+set_target_properties(blkin PROPERTIES POSITION_INDEPENDENT_CODE ON)
+target_link_libraries(blkin dl ${LTTNG_LIBRARIES})
+
+# Public headers (installed by the parent project, not from here).
+set(blkin_headers
+  zipkin_c.h
+  zipkin_trace.h
+  ztracer.hpp
+)
+
+add_subdirectory(tests)
diff --git a/src/blkin/blkin-lib/Makefile b/src/blkin/blkin-lib/Makefile
new file mode 100644
index 000000000..52852f449
--- /dev/null
+++ b/src/blkin/blkin-lib/Makefile
@@ -0,0 +1,92 @@
+# Copyright 2014 Marios Kogias <marioskogias@gmail.com>
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+.PHONY: default clean distclean run run_c run_pp
+
+# Library version (SONAME-style major.minor).
+MAJOR=0
+MINOR=1
+LIBS= -ldl -llttng-ust
+DLIB=libblkin
+
+LIB_DIR=$(shell pwd)
+TEST_DIR=$(shell pwd)/tests
+prefix= /usr/local
+libdir= $(prefix)/lib
+incdir= $(prefix)/include
+
+# Public headers installed by `make install`.
+H_FILES= zipkin_c.h zipkin_trace.h ztracer.hpp
+
+# Build the shared library plus all three test binaries.
+default: $(DLIB).so test testpp testppp
+
+# Symlink chain: libblkin.so -> libblkin.0.so -> libblkin.0.1.so
+$(DLIB).so: $(DLIB).$(MAJOR).so
+	ln -sf $< $@
+
+$(DLIB).$(MAJOR).so: $(DLIB).$(MAJOR).$(MINOR).so
+	ln -sf $< $@
+
+# Link with g++ since the library exposes a C++ wrapper (ztracer.hpp).
+$(DLIB).$(MAJOR).$(MINOR).so: zipkin_c.o tp.o
+	g++ -shared -o $@ $^ $(LIBS)
+
+zipkin_c.o: zipkin_c.c zipkin_c.h zipkin_trace.h
+	gcc -I. -Wall -fpic -g -c -o $@ $<
+
+# tp.c expands the lttng tracepoint definitions from zipkin_trace.h.
+tp.o: tp.c zipkin_trace.h
+	gcc -I. -fpic -g -c -o $@ $<
+
+# Test binaries are built by the Makefile in tests/.
+test: $(TEST_DIR)/test.c $(DLIB).so
+	make -C tests test
+
+testpp: $(TEST_DIR)/test.cc $(DLIB).so
+	make -C tests testpp
+
+testppp: $(TEST_DIR)/test_p.cc $(DLIB).so
+	make -C tests testppp
+
+run_c:
+	make -C tests run_c
+
+run_pp:
+	make -C tests run_pp
+
+run_ppp:
+	make -C tests run_ppp
+
+run: run_c run_pp
+
+install:
+	install -m 644 $(DLIB).$(MAJOR).$(MINOR).so $(DESTDIR)/$(libdir)
+	cp -P $(DLIB).$(MAJOR).so $(DESTDIR)/$(libdir)
+	cp -P $(DLIB).so $(DESTDIR)/$(libdir)
+	install -m 644 $(H_FILES) $(DESTDIR)/$(incdir)
+
+clean:
+	rm -f *.o *.so
+	make -C tests clean
+
+# "socket" is the unix-domain socket file the tests leave behind.
+distclean: clean
+	rm -f socket
diff --git a/src/blkin/blkin-lib/tests/CMakeLists.txt b/src/blkin/blkin-lib/tests/CMakeLists.txt
new file mode 100644
index 000000000..d69cec3da
--- /dev/null
+++ b/src/blkin/blkin-lib/tests/CMakeLists.txt
@@ -0,0 +1,15 @@
+# C API smoke test (two processes over a unix socket).
+add_executable(testc test.c)
+target_link_libraries(testc blkin lttng-ust)
+add_test(NAME testc COMMAND $<TARGET_FILE:testc>)
+
+# C++ wrapper (ztracer.hpp) test; needs C++11 and threads.
+add_executable(testpp test.cc)
+set_target_properties(testpp PROPERTIES COMPILE_FLAGS "-std=c++11")
+target_link_libraries(testpp blkin lttng-ust pthread)
+add_test(NAME testpp COMMAND $<TARGET_FILE:testpp>)
+
+# C++ wrapper test using fork()ed processes.
+add_executable(testppp test_p.cc)
+target_link_libraries(testppp blkin lttng-ust)
+add_test(NAME testppp COMMAND $<TARGET_FILE:testppp>)
diff --git a/src/blkin/blkin-lib/tests/Makefile b/src/blkin/blkin-lib/tests/Makefile
new file mode 100644
index 000000000..bed3fc869
--- /dev/null
+++ b/src/blkin/blkin-lib/tests/Makefile
@@ -0,0 +1,55 @@
+# Copyright 2014 Marios Kogias <marioskogias@gmail.com>
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+.PHONY: default clean distclean run run_c run_pp
+
+# The library is built one directory up.
+LIB_DIR=$(shell pwd)/..
+DLIB=$(LIB_DIR)/libblkin
+
+test: test.c $(DLIB).so
+	gcc test.c -o test -g -I$(LIB_DIR) -L$(LIB_DIR) -lblkin
+
+testpp: test.cc $(DLIB).so
+	LD_LIBRARY_PATH=$(LIB_DIR) g++ $< -o testpp -std=c++11 -g -I$(LIB_DIR) -L$(LIB_DIR) -lblkin -lpthread
+
+testppp: test_p.cc $(DLIB).so
+	LD_LIBRARY_PATH=$(LIB_DIR) g++ $< -o testppp -g -I$(LIB_DIR) -L$(LIB_DIR) -lblkin
+
+# Run targets point LD_LIBRARY_PATH at the freshly-built library.
+run_c:
+	LD_LIBRARY_PATH=$(LIB_DIR) ./test
+
+run_pp:
+	LD_LIBRARY_PATH=$(LIB_DIR) ./testpp
+
+run_ppp:
+	LD_LIBRARY_PATH=$(LIB_DIR) ./testppp
+
+run: run_c run_pp
+
+# "socket" is the unix-domain socket file left behind by the tests.
+clean:
+	rm -f *.o *.so test testpp testppp socket
diff --git a/src/blkin/blkin-lib/tests/test.c b/src/blkin/blkin-lib/tests/test.c
new file mode 100644
index 000000000..2cabd99bf
--- /dev/null
+++ b/src/blkin/blkin-lib/tests/test.c
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2014 Marios Kogias <marioskogias@gmail.com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+/*
+ * In this example we have 2 processes communicating over a unix socket.
+ * We are going to trace the communication with our library
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <zipkin_c.h>
+
+#define SOCK_PATH "/tmp/socket"
+
+struct message {
+ char actual_message[20];
+ struct blkin_trace_info trace_info;
+};
+
+/*
+ * Server side of the demo: bind a unix socket, accept one connection,
+ * then run 10 round-trips.  Each iteration starts a fresh root trace,
+ * timestamps "start", ships the trace ids to process B inside the
+ * message, waits for the 1-byte ack and timestamps "end".
+ */
+void process_a()
+{
+	int i, r;
+	printf("I am process A: %d\n", getpid());
+
+	r = blkin_init();
+	if (r < 0) {
+		fprintf(stderr, "Could not initialize blkin\n");
+		exit(1);
+	}
+
+	/* initialize endpoint describing this service */
+	struct blkin_endpoint endp;
+	blkin_init_endpoint(&endp, "10.0.0.1", 5000, "service a");
+
+	struct blkin_trace trace;
+	struct blkin_annotation ant;
+	struct message msg = {.actual_message = "message"};
+	char ack;
+
+	/* create and bind socket */
+	int s, s2, len;
+	socklen_t t;
+	struct sockaddr_un local, remote;
+
+	if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
+		perror("socket");
+		exit(1);
+	}
+
+	local.sun_family = AF_UNIX;
+	strcpy(local.sun_path, SOCK_PATH);
+	unlink(local.sun_path);	/* remove stale socket from a previous run */
+	len = strlen(local.sun_path) + sizeof(local.sun_family);
+	if (bind(s, (struct sockaddr *)&local, len) == -1) {
+		perror("bind");
+		exit(1);
+	}
+
+	if (listen(s, 5) == -1) {
+		perror("listen");
+		exit(1);
+	}
+
+	printf("Waiting for a connection...\n");
+	t = sizeof(remote);
+	if ((s2 = accept(s, (struct sockaddr *)&remote, &t)) == -1) {
+		perror("accept");
+		exit(1);
+	}
+
+	printf("Connected.\n");
+
+	/* NOTE(review): send/recv return values are unchecked below --
+	 * acceptable for a demo, not for production code. */
+	for (i=0;i<10;i++) {
+
+		/* create a new root trace for this round-trip */
+		blkin_init_new_trace(&trace, "process a", &endp);
+
+		blkin_init_timestamp_annotation(&ant, "start", &endp);
+		blkin_record(&trace, &ant);
+
+		/* copy trace ids into the message so B can continue the trace */
+		blkin_get_trace_info(&trace, &msg.trace_info);
+
+		/* send */
+		send(s2, &msg, sizeof(struct message), 0);
+
+		/* wait for ack */
+		recv(s2, &ack, 1, 0);
+
+		/* create annotation and log */
+		blkin_init_timestamp_annotation(&ant, "end", &endp);
+		blkin_record(&trace, &ant);
+	}
+	close(s2);
+}
+
+/*
+ * Client side of the demo: connect to process A's unix socket and, for
+ * each of the 10 messages, continue A's trace as a child span, record
+ * "start"/"end" timestamps around a simulated workload, and ack with a
+ * single byte.
+ */
+void process_b()
+{
+	int i, r;
+	printf("I am process B: %d\n", getpid());
+
+	r = blkin_init();
+	if (r < 0) {
+		fprintf(stderr, "Could not initialize blkin\n");
+		exit(1);
+	}
+	/* initialize endpoint describing this service */
+	struct blkin_endpoint endp;
+	blkin_init_endpoint(&endp, "10.0.0.2", 5001, "service b");
+
+	struct blkin_trace trace;
+	struct blkin_annotation ant;
+	struct message msg;
+	int s, len;
+	struct sockaddr_un remote;
+
+	/* connect */
+	if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
+		perror("socket");
+		exit(1);
+	}
+
+	printf("Trying to connect...\n");
+
+	remote.sun_family = AF_UNIX;
+	strcpy(remote.sun_path, SOCK_PATH);
+	len = strlen(remote.sun_path) + sizeof(remote.sun_family);
+	if (connect(s, (struct sockaddr *)&remote, len) == -1) {
+		perror("connect");
+		exit(1);
+	}
+
+	printf("Connected.\n");
+
+	/* NOTE(review): send/recv return values are unchecked below. */
+	for (i=0;i<10;i++) {
+		recv(s, &msg, sizeof(struct message), 0);
+
+		/* continue A's trace as a child span using the shipped ids */
+		blkin_init_child_info(&trace, &msg.trace_info, &endp, "process b");
+
+		/* create annotation and log */
+		blkin_init_timestamp_annotation(&ant, "start", &endp);
+		blkin_record(&trace, &ant);
+
+		/* simulate some work between the two timestamps */
+		usleep(10);
+		printf("Message received %s\n", msg.actual_message);
+
+		/* create annotation and log */
+		blkin_init_timestamp_annotation(&ant, "end", &endp);
+		blkin_record(&trace, &ant);
+
+		/* send ack */
+		send(s, "*", 1, 0);
+	}
+}
+
+
+/*
+ * Fork and run the two halves of the demo: parent acts as the server
+ * (process A), child as the client (process B).
+ */
+int main()
+{
+	pid_t pid = fork();
+
+	if (pid < 0) {
+		/* The original treated a failed fork() (-1) as the parent
+		 * path and ran the server anyway; report the error instead. */
+		perror("fork");
+		return 1;
+	}
+	if (pid)
+		process_a();
+	else
+		process_b();
+	return 0;
+}
diff --git a/src/blkin/blkin-lib/tests/test.cc b/src/blkin/blkin-lib/tests/test.cc
new file mode 100644
index 000000000..f93597dd3
--- /dev/null
+++ b/src/blkin/blkin-lib/tests/test.cc
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2014 Marios Kogias <marioskogias@gmail.com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#include <thread>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <ztracer.hpp>
+#include <iostream>
+
+#define SOCK_PATH "socket"
+
/*
 * Wire format exchanged between Parent and Child: the payload plus the
 * blkin trace info that lets the receiver continue the same trace.
 */
struct message {
	int actual_message;                  // test payload (a random int)
	struct blkin_trace_info trace_info;  // trace/span ids propagated to the peer
	message(int s) : actual_message(s) {};
	message() {}
};
+
/*
 * Server half of the test. Binds/listens on the SOCK_PATH UNIX socket
 * in the constructor, then operator() accepts one connection and runs
 * 10 traced request/ack rounds against the Child.
 */
class Parent {
	private:
	int s, s2;              // s: listening socket, s2: accepted connection
	ZTracer::Endpoint e;    // endpoint identifying this service in traces
	public:
	Parent() : e("0.0.0.0", 1, "parent")
	{
		connect();
	}
	// Thread entry point: accept the Child, then send 10 traced messages.
	void operator()()
	{
		struct sockaddr_un remote;
		int t;
		std::cout << "I am parent process : " << getpid() << std::endl;

		/* Wait for connection */
		t = sizeof(remote);
		if ((s2 = accept(s, (struct sockaddr *)&remote, (socklen_t *)&t)) == -1) {
			std::cerr << "accept" << std::endl;
			exit(1);
		}

		std::cerr << "Connected" << std::endl;

		for (int i=0;i<10;i++) {
			/* new root trace for each round */
			ZTracer::Trace tr("parent process", &e);

			process(tr);

			wait_response();

			/* annotate once the ack is back */
			tr.event("parent end");
		}
	}

	// Annotate the start of a round and ship the message (with trace info).
	void process(ZTracer::Trace &tr)
	{
		struct message msg(rand());
		/*Annotate*/
		tr.event("parent start");
		/*Set trace info to the message*/
		msg.trace_info = *tr.get_info();

		/*send*/
		send(s2, &msg, sizeof(struct message), 0);
	}

	// Block until the Child's 1-byte ack arrives.
	void wait_response()
	{
		char ack;
		recv(s2, &ack, 1, 0);
	}

	// Create, bind and listen on the UNIX socket (despite the name, this
	// is the passive side; the Child does the actual connect()).
	void connect()
	{
		/*create and bind socket*/
		int len;
		struct sockaddr_un local;

		if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
			std::cerr << "socket" << std::endl;
			exit(1);
		}

		local.sun_family = AF_UNIX;
		strcpy(local.sun_path, SOCK_PATH);
		unlink(local.sun_path);  // remove stale socket file from a prior run
		len = strlen(local.sun_path) + sizeof(local.sun_family);
		if (bind(s, (struct sockaddr *)&local, len) == -1) {
			std::cerr << "bind" << std::endl;
			exit(1);
		}

		if (listen(s, 5) == -1) {
			std::cerr << "listen" << std::endl;
			exit(1);
		}

		std::cout << "Waiting for a connection..." << std::endl;
	}

};
+
+class Child {
+ private:
+ int s;
+ ZTracer::Endpoint e;
+ public:
+ Child() : e("0.0.0.1", 2, "child")
+ {
+ }
+ void operator()()
+ {
+ /*Connect to the socket*/
+ soc_connect();
+
+ for (int i=0;i<10;i++)
+ process();
+ }
+
+ void process()
+ {
+ struct message msg;
+ recv(s, &msg, sizeof(struct message), 0);
+
+ ZTracer::Trace tr("Child process", &e, &msg.trace_info, true);
+ tr.event("child start");
+
+ usleep(10);
+ std::cout << "Message received : " << msg.actual_message << ::std::endl;
+ tr.event("child end");
+
+ send(s, "*", 1, 0);
+ }
+
+
+ void soc_connect()
+ {
+ int len;
+ struct sockaddr_un remote;
+
+ if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
+ std::cerr << "socket" << std::endl;
+ exit(1);
+ }
+
+ std::cout << "Trying to connect...\n" << std::endl;
+
+ remote.sun_family = AF_UNIX;
+ strcpy(remote.sun_path, SOCK_PATH);
+ len = strlen(remote.sun_path) + sizeof(remote.sun_family);
+ if (connect(s, (struct sockaddr *)&remote, len) == -1) {
+ std::cerr << "connect" << std::endl;;
+ exit(1);
+ }
+
+ std::cout << "Connected" << std::endl;
+ }
+
+};
/*
 * Run Parent and Child concurrently as two threads in one process
 * (contrast with test_p.cc, which forks instead). blkin is initialized
 * once, before either thread starts.
 */
int main(int argc, const char *argv[])
{
	int r = ZTracer::ztrace_init();
	if (r < 0) {
		std::cout << "Error initializing blkin" << std::endl;
		return -1;
	}
	// Parent's constructor binds+listens before Child's thread connects,
	// so construction order here matters.
	Parent p;
	Child c;
	std::thread workerThread1(p);
	std::thread workerThread2(c);
	workerThread1.join();
	workerThread2.join();

	return 0;
}
diff --git a/src/blkin/blkin-lib/tests/test_p.cc b/src/blkin/blkin-lib/tests/test_p.cc
new file mode 100644
index 000000000..ec59da3b6
--- /dev/null
+++ b/src/blkin/blkin-lib/tests/test_p.cc
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2014 Marios Kogias <marioskogias@gmail.com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <ztracer.hpp>
+#include <iostream>
+#include <cstdlib>
+
+#define SOCK_PATH "socket"
+
/*
 * Wire format exchanged between the forked Parent and Child processes:
 * the payload plus the blkin trace info continuing the trace.
 */
struct message {
	int actual_message;                  // test payload (a random int)
	struct blkin_trace_info trace_info;  // trace/span ids propagated to the peer
	message(int s) : actual_message(s) {};
	message() {}
};
+
/*
 * Server half of the multi-process test (same logic as test.cc's
 * Parent, but driven from a forked process). Binds/listens on
 * SOCK_PATH in the constructor; operator() accepts one connection and
 * runs 10 traced request/ack rounds.
 */
class Parent {
	private:
	int s, s2;              // s: listening socket, s2: accepted connection
	ZTracer::Endpoint e;    // endpoint identifying this service in traces
	public:
	Parent() : e("0.0.0.0", 1, "parent")
	{
		connect();
	}
	// Entry point: accept the Child, then send 10 traced messages.
	void operator()()
	{
		struct sockaddr_un remote;
		int t;
		std::cout << "I am parent process : " << getpid() << std::endl;

		/* Wait for connection */
		t = sizeof(remote);
		if ((s2 = accept(s, (struct sockaddr *)&remote, (socklen_t *)&t)) == -1) {
			std::cerr << "accept" << std::endl;
			exit(1);
		}

		std::cerr << "Connected" << std::endl;

		for (int i=0;i<10;i++) {
			/* new root trace for each round */
			ZTracer::Trace tr("parent process", &e);
			process(tr);

			wait_response();

			/* annotate once the ack is back */
			tr.event("parent end");
		}
	}

	// Annotate the start of a round and ship the message (with trace info).
	void process(ZTracer::Trace &tr)
	{
		struct message msg(rand());
		/*Annotate*/
		tr.event("parent start");
		/*Set trace info to the message*/
		msg.trace_info = *tr.get_info();

		/*send*/
		send(s2, &msg, sizeof(struct message), 0);
	}

	// Block until the Child's 1-byte ack arrives.
	void wait_response()
	{
		char ack;
		recv(s2, &ack, 1, 0);
	}

	// Create, bind and listen on the UNIX socket (the passive side;
	// the Child performs the actual connect()).
	void connect()
	{
		/*create and bind socket*/
		int len;
		struct sockaddr_un local;

		if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
			std::cerr << "socket" << std::endl;
			exit(1);
		}

		local.sun_family = AF_UNIX;
		strcpy(local.sun_path, SOCK_PATH);
		unlink(local.sun_path);  // remove stale socket file from a prior run
		len = strlen(local.sun_path) + sizeof(local.sun_family);
		if (bind(s, (struct sockaddr *)&local, len) == -1) {
			std::cerr << "bind" << std::endl;
			exit(1);
		}

		if (listen(s, 5) == -1) {
			std::cerr << "listen" << std::endl;
			exit(1);
		}

		std::cout << "Waiting for a connection..." << std::endl;
	}

};
+
/*
 * Client half of the multi-process test. Connects to the Parent's
 * UNIX socket and runs 10 rounds: receive a traced message, continue
 * the trace as a child span, log start/end events, ack with one byte.
 */
class Child {
	private:
	int s;                  // connected socket to the Parent
	ZTracer::Endpoint e;    // endpoint identifying this service in traces
	public:
	Child() : e("0.0.0.1", 2, "child")
	{
	}
	// Entry point.
	void operator()()
	{
		/*Connect to the socket*/
		soc_connect();

		for (int i=0;i<10;i++)
			process();
	}

	// One round: receive, trace, ack.
	// NOTE(review): the recv result is unchecked — on a failed read msg
	// is used uninitialized; consider validating the byte count.
	void process()
	{
		struct message msg;
		recv(s, &msg, sizeof(struct message), 0);

		/* child span of the trace carried in the message */
		ZTracer::Trace tr("Child process", &e, &msg.trace_info, true);
		tr.event("child start");

		usleep(10);
		std::cout << "Message received : " << msg.actual_message << ::std::endl;
		tr.event("child end");

		send(s, "*", 1, 0);
	}


	// Connect to the Parent's listening UNIX socket.
	void soc_connect()
	{
		int len;
		struct sockaddr_un remote;

		if ((s = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
			std::cerr << "socket" << std::endl;
			exit(1);
		}

		std::cout << "Trying to connect...\n" << std::endl;

		remote.sun_family = AF_UNIX;
		strcpy(remote.sun_path, SOCK_PATH);
		len = strlen(remote.sun_path) + sizeof(remote.sun_family);
		if (connect(s, (struct sockaddr *)&remote, len) == -1) {
			std::cerr << "connect" << std::endl;;
			exit(1);
		}

		std::cout << "Connected" << std::endl;
	}

};
/*
 * Run Parent and Child as two separate processes via fork() (contrast
 * with test.cc, which uses two threads). ztrace_init() is deliberately
 * called in EACH process after the fork: the library seeds its RNG per
 * process, so initializing once before forking would give both sides
 * the same random sequence and colliding span ids.
 */
int main(int argc, const char *argv[])
{
	if (fork()) {
		int r = ZTracer::ztrace_init();
		if (r < 0) {
			std::cout << "Error initializing blkin" << std::endl;
			exit(1);
		}
		Parent p;
		p();
		exit(0);
	} else {
		int r = ZTracer::ztrace_init();
		if (r < 0) {
			std::cout << "Error initializing blkin" << std::endl;
			exit(1);
		}
		Child c;
		c();
		exit(0);
	}
	return 0;
}
diff --git a/src/blkin/blkin-lib/tp.c b/src/blkin/blkin-lib/tp.c
new file mode 100644
index 000000000..e72177e57
--- /dev/null
+++ b/src/blkin/blkin-lib/tp.c
@@ -0,0 +1,35 @@
+/*
+ * tp.c
+ *
+ * Copyright (c) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy,
+ * modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+/*
+ * Defining macro creates the code objects of the traceprobes, only do
+ * it once per file
+ */
+#define TRACEPOINT_CREATE_PROBES
+/*
+ * The header containing our TRACEPOINT_EVENTs.
+ */
+#include <zipkin_trace.h>
diff --git a/src/blkin/blkin-lib/zipkin_c.c b/src/blkin/blkin-lib/zipkin_c.c
new file mode 100644
index 000000000..a68182b55
--- /dev/null
+++ b/src/blkin/blkin-lib/zipkin_c.c
@@ -0,0 +1,356 @@
+/*
+ * Copyright 2014 Marios Kogias <marioskogias@gmail.com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#include "zipkin_c.h"
+#include <pthread.h>
+#include <errno.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+#define TRACEPOINT_DEFINE
+#include <zipkin_trace.h>
+
+const char *default_ip = "NaN";
+const char *default_name = "NoName";
+
+const char* const CLIENT_SEND = "cs";
+const char* const CLIENT_RECV = "cr";
+const char* const SERVER_SEND = "ss";
+const char* const SERVER_RECV = "sr";
+const char* const WIRE_SEND = "ws";
+const char* const WIRE_RECV = "wr";
+const char* const CLIENT_SEND_FRAGMENT = "csf";
+const char* const CLIENT_RECV_FRAGMENT = "crf";
+const char* const SERVER_SEND_FRAGMENT = "ssf";
+const char* const SERVER_RECV_FRAGMENT = "srf";
+
/*
 * Build a random non-negative 64-bit identifier out of two rand()
 * calls (a single rand() yields at most ~31 bits of entropy).
 *
 * Returns a value in [0, INT64_MAX]; used for trace and span ids.
 */
static int64_t random_big()
{
	int64_t a;
	a = rand();
	a = a << 32;
	int b = rand();
	a = a + b;
	if (a < 0)
		a = -a;	/* was `a = !a`, which collapsed the id to 0 or 1 */
	return a;
}
+
/*
 * One-time library initialization: seed srand() from /dev/urandom.
 *
 * time() is not a good seed here: several daemons started at the same
 * moment would share a timestamp, hence a seed, hence colliding ids.
 * Thread-safe and idempotent; always returns 0.
 */
int blkin_init()
{
	static pthread_mutex_t blkin_init_mutex = PTHREAD_MUTEX_INITIALIZER;
	static int initialized = 0;

	pthread_mutex_lock(&blkin_init_mutex);
	if (!initialized) {
		unsigned int seed = 0;
		int inf = open("/dev/urandom", O_RDONLY);
		/* previously both open() and read() results were ignored */
		if (inf < 0 || read(inf, &seed, sizeof(seed)) != sizeof(seed)) {
			/* fallback: pid still differs between daemons */
			seed = (unsigned int)getpid();
		}
		if (inf >= 0)
			close(inf);
		srand(seed);
		initialized = 1;
	}
	pthread_mutex_unlock(&blkin_init_mutex);
	return 0;
}
+
+int blkin_init_new_trace(struct blkin_trace *new_trace, const char *service,
+ const struct blkin_endpoint *endpoint)
+{
+ int res;
+ if (!new_trace) {
+ res = -EINVAL;
+ goto OUT;
+ }
+ new_trace->name = service;
+ blkin_init_trace_info(&(new_trace->info));
+ new_trace->endpoint = endpoint;
+ res = 0;
+
+OUT:
+ return res;
+}
+
/*
 * Fill in the identifiers of a root span: the span id doubles as the
 * trace id and there is no parent.
 */
void blkin_init_trace_info(struct blkin_trace_info *trace_info)
{
	const int64_t id = random_big();

	trace_info->trace_id = id;
	trace_info->span_id = id;
	trace_info->parent_span_id = 0;
}
+
+int blkin_init_child_info(struct blkin_trace *child,
+ const struct blkin_trace_info *parent_info,
+ const struct blkin_endpoint *endpoint,
+ const char *child_name)
+{
+ int res;
+ if ((!child) || (!parent_info) || (!endpoint)){
+ res = -EINVAL;
+ goto OUT;
+ }
+ child->info.trace_id = parent_info->trace_id;
+ child->info.span_id = random_big();
+ child->info.parent_span_id = parent_info->span_id;
+ child->name = child_name;
+ child->endpoint = endpoint;
+ res = 0;
+
+OUT:
+ return res;
+}
+
+int blkin_init_child(struct blkin_trace *child,
+ const struct blkin_trace *parent,
+ const struct blkin_endpoint *endpoint,
+ const char *child_name)
+{
+ int res;
+ if (!parent) {
+ res = -EINVAL;
+ goto OUT;
+ }
+ if (!endpoint)
+ endpoint = parent->endpoint;
+ if (blkin_init_child_info(child, &parent->info, endpoint, child_name) != 0){
+ res = -EINVAL;
+ goto OUT;
+ }
+ res = 0;
+
+OUT:
+ return res;
+}
+
/*
 * Fill in an endpoint descriptor. A NULL ip falls back to the module
 * default placeholder ("NaN").
 *
 * Returns 0 on success, -EINVAL if endp is NULL.
 */
int blkin_init_endpoint(struct blkin_endpoint *endp, const char *ip,
		int16_t port, const char *name)
{
	if (!endp)
		return -EINVAL;

	endp->ip = ip ? ip : default_ip;
	endp->port = port;
	endp->name = name;
	return 0;
}
+
+int blkin_init_string_annotation(struct blkin_annotation *annotation,
+ const char *key, const char *val, const struct blkin_endpoint *endpoint)
+{
+ int res;
+ if ((!annotation) || (!key) || (!val)){
+ res = -EINVAL;
+ goto OUT;
+ }
+ annotation->type = ANNOT_STRING;
+ annotation->key = key;
+ annotation->val_str = val;
+ annotation->endpoint = endpoint;
+ res = 0;
+
+OUT:
+ return res;
+}
+
/*
 * Build a key/integer-value annotation. The endpoint may be NULL, in
 * which case blkin_record() falls back to the trace's endpoint.
 *
 * Returns 0 on success, -EINVAL if annotation or key is NULL.
 */
int blkin_init_integer_annotation(struct blkin_annotation *annotation,
		const char *key, int64_t val, const struct blkin_endpoint *endpoint)
{
	if (!annotation || !key)
		return -EINVAL;

	annotation->type = ANNOT_INTEGER;
	annotation->key = key;
	annotation->val_int = val;
	annotation->endpoint = endpoint;
	return 0;
}
+
+int blkin_init_timestamp_annotation(struct blkin_annotation *annotation,
+ const char *event, const struct blkin_endpoint *endpoint)
+{
+ int res;
+ if ((!annotation) || (!event)){
+ res = -EINVAL;
+ goto OUT;
+ }
+ annotation->type = ANNOT_TIMESTAMP;
+ annotation->val_str = event;
+ annotation->endpoint = endpoint;
+ res = 0;
+
+OUT:
+ return res;
+}
+
/*
 * Emit one annotation as an LTTng-UST tracepoint in the context of the
 * given trace. Dispatches on the annotation type to the matching
 * zipkin tracepoint (keyval_string / keyval_integer / timestamp).
 *
 * Returns 0 on success, -EINVAL if the trace, annotation, resolved
 * endpoint or the fields required by the annotation type are missing.
 */
int blkin_record(const struct blkin_trace *trace,
		const struct blkin_annotation *annotation)
{
	int res;
	if (!annotation || !trace || !trace->name) {
		res = -EINVAL;
		goto OUT;
	}

	/*
	 * An annotation without its own endpoint inherits the trace's
	 * (GCC `?:` extension: `a ?: b` is `a ? a : b`).
	 */
	const struct blkin_endpoint *endpoint =
		annotation->endpoint ? : trace->endpoint;
	if (!endpoint || !endpoint->ip || !endpoint->name) {
		res = -EINVAL;
		goto OUT;
	}

	if (annotation->type == ANNOT_STRING) {
		if ((!annotation->key) || (!annotation->val_str)) {
			res = -EINVAL;
			goto OUT;
		}
		tracepoint(zipkin, keyval_string, trace->name,
			   endpoint->name, endpoint->port, endpoint->ip,
			   trace->info.trace_id, trace->info.span_id,
			   trace->info.parent_span_id,
			   annotation->key, annotation->val_str);
	}
	else if (annotation->type == ANNOT_INTEGER) {
		if (!annotation->key) {
			res = -EINVAL;
			goto OUT;
		}
		tracepoint(zipkin, keyval_integer, trace->name,
			   endpoint->name, endpoint->port, endpoint->ip,
			   trace->info.trace_id, trace->info.span_id,
			   trace->info.parent_span_id,
			   annotation->key, annotation->val_int);
	}
	else {
		/* ANNOT_TIMESTAMP: only the event string is required */
		if (!annotation->val_str) {
			res = -EINVAL;
			goto OUT;
		}
		tracepoint(zipkin, timestamp , trace->name,
			   endpoint->name, endpoint->port, endpoint->ip,
			   trace->info.trace_id, trace->info.span_id,
			   trace->info.parent_span_id,
			   annotation->val_str);
	}
	res = 0;
OUT:
	return res;
}
+
+int blkin_get_trace_info(const struct blkin_trace *trace,
+ struct blkin_trace_info *info)
+{
+ int res;
+ if ((!trace) || (!info)){
+ res = -EINVAL;
+ goto OUT;
+ }
+
+ res = 0;
+ *info = trace->info;
+OUT:
+ return res;
+}
+
+int blkin_set_trace_info(struct blkin_trace *trace,
+ const struct blkin_trace_info *info)
+{
+ int res;
+ if ((!trace) || (!info)){
+ res = -EINVAL;
+ goto OUT;
+ }
+
+ res = 0;
+ trace->info = *info;
+OUT:
+ return res;
+}
+
+int blkin_set_trace_properties(struct blkin_trace *trace,
+ const struct blkin_trace_info *info,
+ const char *name,
+ const struct blkin_endpoint *endpoint)
+{
+ int res;
+ if ((!trace) || (!info) || (!endpoint) || (!name)){
+ res = -EINVAL;
+ goto OUT;
+ }
+
+ res = 0;
+ trace->info = *info;
+ trace->name = name;
+ trace->endpoint = endpoint;
+
+OUT:
+ return res;
+}
+
/*
 * Convert trace info to the packed, big-endian wire representation
 * (struct blkin_trace_info_packed) so it can be sent between hosts of
 * differing endianness.
 *
 * Returns 0 on success, -EINVAL on a NULL argument.
 */
int blkin_pack_trace_info(const struct blkin_trace_info *info,
		struct blkin_trace_info_packed *pinfo)
{
	if (!info || !pinfo) {
		return -EINVAL;
	}

	/* __cpu_to_be64 comes from <asm/byteorder.h> (Linux-specific) */
	pinfo->trace_id = __cpu_to_be64(info->trace_id);
	pinfo->span_id = __cpu_to_be64(info->span_id);
	pinfo->parent_span_id = __cpu_to_be64(info->parent_span_id);

	return 0;
}
+
/*
 * Inverse of blkin_pack_trace_info(): convert the packed big-endian
 * wire representation back to host byte order.
 *
 * Returns 0 on success, -EINVAL on a NULL argument.
 */
int blkin_unpack_trace_info(const struct blkin_trace_info_packed *pinfo,
		struct blkin_trace_info *info)
{
	if (!info || !pinfo) {
		return -EINVAL;
	}

	/* __be64_to_cpu comes from <asm/byteorder.h> (Linux-specific) */
	info->trace_id = __be64_to_cpu(pinfo->trace_id);
	info->span_id = __be64_to_cpu(pinfo->span_id);
	info->parent_span_id = __be64_to_cpu(pinfo->parent_span_id);

	return 0;
}
diff --git a/src/blkin/blkin-lib/zipkin_c.h b/src/blkin/blkin-lib/zipkin_c.h
new file mode 100644
index 000000000..77b5bc6d7
--- /dev/null
+++ b/src/blkin/blkin-lib/zipkin_c.h
@@ -0,0 +1,334 @@
+/*
+ * Copyright 2014 Marios Kogias <marioskogias@gmail.com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef ZIPKIN_C_H_
+#define ZIPKIN_C_H_
+
+#include <stdint.h>
+#include <asm/byteorder.h>
+
/*
 * Convenience wrappers that build a stack-local annotation and record
 * it in one statement. The do { } while (0) wrapper must NOT carry a
 * trailing semicolon: with one, `if (x) BLKIN_TIMESTAMP(...); else ...`
 * fails to compile because the macro expands to two statements.
 */
#define BLKIN_TIMESTAMP(trace, endp, event)                             \
	do {                                                            \
		struct blkin_annotation __annot;                        \
		blkin_init_timestamp_annotation(&__annot, event, endp); \
		blkin_record(trace, &__annot);                          \
	} while (0)

#define BLKIN_KEYVAL_STRING(trace, endp, key, val)                      \
	do {                                                            \
		struct blkin_annotation __annot;                        \
		blkin_init_string_annotation(&__annot, key, val, endp); \
		blkin_record(trace, &__annot);                          \
	} while (0)

#define BLKIN_KEYVAL_INTEGER(trace, endp, key, val)                     \
	do {                                                            \
		struct blkin_annotation __annot;                        \
		blkin_init_integer_annotation(&__annot, key, val, endp);\
		blkin_record(trace, &__annot);                          \
	} while (0)
+
+/**
+ * Core annotations used by Zipkin used to denote the beginning and end of
+ * client and server spans.
+ * For more information refer to
+ * https://github.com/openzipkin/zipkin/blob/master/zipkin-thrift/src/main/thrift/com/twitter/zipkin/zipkinCore.thrift
+ */
+extern const char* const CLIENT_SEND;
+extern const char* const CLIENT_RECV;
+extern const char* const SERVER_SEND;
+extern const char* const SERVER_RECV;
+extern const char* const WIRE_SEND;
+extern const char* const WIRE_RECV;
+extern const char* const CLIENT_SEND_FRAGMENT;
+extern const char* const CLIENT_RECV_FRAGMENT;
+extern const char* const SERVER_SEND_FRAGMENT;
+extern const char* const SERVER_RECV_FRAGMENT;
+
+/**
+ * @struct blkin_endpoint
+ * Information about an endpoint of our instrumented application where
+ * annotations take place
+ */
+struct blkin_endpoint {
+ const char *ip;
+ int16_t port;
+ const char *name;
+};
+
+/**
+ * @struct blkin_trace_info
+ * The information exchanged between different layers offering the needed
+ * trace semantics
+ */
+struct blkin_trace_info {
+ int64_t trace_id;
+ int64_t span_id;
+ int64_t parent_span_id;
+};
+
+/**
+ * @struct blkin_trace_info_packed
+ *
+ * Packed version of the struct blkin_trace_info. Useful when sending over
+ * the network.
+ *
+ */
+struct blkin_trace_info_packed {
+ __be64 trace_id;
+ __be64 span_id;
+ __be64 parent_span_id;
+} __attribute__((packed));
+
+
+/**
+ * @struct blkin_trace
+ * Struct used to define the context in which an annotation happens
+ */
+struct blkin_trace {
+ const char *name;
+ struct blkin_trace_info info;
+ const struct blkin_endpoint *endpoint;
+};
+
+/**
+ * @typedef blkin_annotation_type
+ * There are 2 kinds of annotation key-val and timestamp
+ */
+typedef enum {
+ ANNOT_STRING = 0,
+ ANNOT_INTEGER,
+ ANNOT_TIMESTAMP
+} blkin_annotation_type;
+
+/**
+ * @struct blkin_annotation
+ * Struct carrying information about an annotation. This information can either
+ * be key-val or that a specific event happened
+ */
+struct blkin_annotation {
+ blkin_annotation_type type;
+ const char *key;
+ union {
+ const char *val_str;
+ int64_t val_int;
+ };
+ const struct blkin_endpoint *endpoint;
+};
+
+/**
+ * Initialize the zipkin library.
+ *
+ * @return 0 on success
+ */
+int blkin_init();
+
+/**
+ * Initialize a new blkin_trace with the information given. The new trace will
+ * have no parent so the parent id will be zero.
+ *
+ * @param new_trace the blkin_trace to be initialized
+ * @param name the trace's name
+ * @param endpoint a pointer to a blkin_endpoint struct that contains
+ * info about where the specific trace takes place
+ *
+ * @returns 0 on success or negative error code
+ */
+int blkin_init_new_trace(struct blkin_trace *new_trace, const char *name,
+ const struct blkin_endpoint *endpoint);
+
+/**
+ * Initialize blkin_trace_info for a root span. The new trace will
+ * have no parent so the parent id will be zero, and the id and trace id will
+ * be the same.
+ *
+ * @param trace_info the blkin_trace_info to be initialized
+ */
+void blkin_init_trace_info(struct blkin_trace_info *trace_info);
+
+/**
+ * Initialize a blkin_trace as a child of the given parent
+ * blkin_trace. The child trace will have the same trace_id, new
+ * span_id and parent_span_id its parent's span_id.
+ *
+ * @param child the blkin_trace to be initialized
+ * @param parent the parent blkin_trace
+ * @param child_name the blkin_trace name of the child
+ *
+ * @returns 0 on success or negative error code
+ */
+int blkin_init_child(struct blkin_trace *child,
+ const struct blkin_trace *parent,
+ const struct blkin_endpoint *endpoint,
+ const char *child_name);
+
+/**
+ * Initialize a blkin_trace struct and set the blkin_trace_info field to be
+ * child of the given blkin_trace_info. This means
+ * Same trace_id
+ * Different span_id
+ * Child's parent_span_id == parent's span_id
+ *
+ * @param child the new child blkin_trace_info
+ * @param info the parent's blkin_trace_info struct
+ * @param child_name the blkin_trace struct name field
+ *
+ * @returns 0 on success or negative error code
+ */
+int blkin_init_child_info(struct blkin_trace *child,
+ const struct blkin_trace_info *info,
+ const struct blkin_endpoint *endpoint,
+ const char *child_name);
+
+/**
+ * Initialize a blkin_endpoint struct with the information given
+ *
+ * @param endp the endpoint to be initialized
+ * @param ip the ip address of the specific endpoint
+ * @param port the TCP/UDP port of the specific endpoint
+ * @param name the name of the service running on the specific endpoint
+ *
+ * @returns 0 on success or negative error code
+ */
+int blkin_init_endpoint(struct blkin_endpoint *endpoint,
+ const char *ip, int16_t port, const char *name);
+
+/**
+ * Initialize a key-value blkin_annotation
+ *
+ * @param annotation the annotation to be initialized
+ * @param key the annotation's key
+ * @param val the annotation's string value
+ * @param endpoint where this annotation occurred
+ *
+ * @returns 0 on success or negative error code
+ */
+int blkin_init_string_annotation(struct blkin_annotation *annotation,
+ const char *key, const char *val,
+ const struct blkin_endpoint *endpoint);
+/**
+ * Initialize a key-value blkin_annotation
+ *
+ * @param annotation the annotation to be initialized
+ * @param key the annotation's key
+ * @param val the annotation's int value
+ * @param endpoint where this annotation occurred
+ *
+ * @returns 0 on success or negative error code
+ */
+
+int blkin_init_integer_annotation(struct blkin_annotation *annotation,
+ const char *key, int64_t val,
+ const struct blkin_endpoint *endpoint);
+
+/**
+ * Initialize a timestamp blkin_annotation
+ *
+ * @param annotation the annotation to be initialized
+ * @param event the event happened to be annotated
+ * @param endpoint where this annotation occurred
+ *
+ * @returns 0 on success or negative error code
+ */
+
+int blkin_init_timestamp_annotation(struct blkin_annotation *annot,
+ const char *event,
+ const struct blkin_endpoint *endpoint);
+
+/**
+ * Log an annotation in terms of a specific trace
+ *
+ * @param trace the trace to which the annotation belongs
+ * @param annotation the annotation to be logged
+ *
+ * @returns 0 on success or negative error code
+ */
+int blkin_record(const struct blkin_trace *trace,
+ const struct blkin_annotation *annotation);
+
+/**
+ * Copy a blkin_trace_info struct into a the field info of a blkin_trace struct
+ *
+ * @param trace the destination
+ * @param info where to copy from
+ *
+ * @returns 0 on success or negative error code
+ */
+int blkin_get_trace_info(const struct blkin_trace *trace,
+ struct blkin_trace_info *info);
+
+/**
+ * Copy the blkin_trace_info from a blkin_trace to another blkin_trace_info
+ *
+ * @param trace the trace with the essential info
+ * @param info the destination
+ *
+ * @returns 0 on success or negative error code
+ */
+int blkin_set_trace_info(struct blkin_trace *trace,
+ const struct blkin_trace_info *info);
+
+/**
+ * Set the trace information, name and endpoint of a trace.
+ *
+ * @param trace the trace to which the properties will be assigned
+ * @param info blkin_trace_information with the trace identifiers
+ * @param name span name
+ * @param endpoint associated host
+ *
+ * @returns 0 on success or negative error code
+ */
+int blkin_set_trace_properties(struct blkin_trace *trace,
+ const struct blkin_trace_info *info,
+ const char *name,
+ const struct blkin_endpoint *endpoint);
+
+/**
+ * Convert a blkin_trace_info to the packed version.
+ *
+ * @param info The unpacked trace info.
+ * @param pinfo The provided packed version to be initialized.
+ *
+ * @returns 0 on success or negative error code
+ */
+int blkin_pack_trace_info(const struct blkin_trace_info *info,
+ struct blkin_trace_info_packed *pinfo);
+
+/**
+ * Convert a packed blkin_trace_info to the unpacked version.
+ *
+ * @param pinfo The provided packed version to be unpacked.
+ * @param info The unpacked trace info.
+ *
+ * @returns 0 on success or negative error code
+ */
+int blkin_unpack_trace_info(const struct blkin_trace_info_packed *pinfo,
+ struct blkin_trace_info *info);
+
+#endif /* ZIPKIN_C_H_ */
diff --git a/src/blkin/blkin-lib/zipkin_trace.h b/src/blkin/blkin-lib/zipkin_trace.h
new file mode 100644
index 000000000..2a72d47bd
--- /dev/null
+++ b/src/blkin/blkin-lib/zipkin_trace.h
@@ -0,0 +1,130 @@
+/*
+ * Zipkin lttng-ust tracepoint provider.
+ */
+
+#undef TRACEPOINT_PROVIDER
+#define TRACEPOINT_PROVIDER zipkin
+
+#undef TRACEPOINT_INCLUDE
+#define TRACEPOINT_INCLUDE "./zipkin_trace.h"
+
+#if !defined(_ZIPKIN_H) || defined(TRACEPOINT_HEADER_MULTI_READ)
+#define _ZIPKIN_H
+
+#include <lttng/tracepoint.h>
+
+TRACEPOINT_EVENT(
+ zipkin,
+ keyval_string,
+ TP_ARGS(const char *, trace_name, const char *, service,
+ int, port, const char *, ip, long, trace,
+ long, span, long, parent_span,
+ const char *, key, const char *, val ),
+
+ TP_FIELDS(
+ /*
+ * Each span has a name mentioned on it in the UI
+ * This is the trace name
+ */
+ ctf_string(trace_name, trace_name)
+ /*
+ * Each trace takes place in a specific machine-endpoint
+ * This is identified by a name, a port number and an ip
+ */
+ ctf_string(service_name, service)
+ ctf_integer(int, port_no, port)
+ ctf_string(ip, ip)
+ /*
+ * According to the tracing semantics each trace should have
+ * a trace id, a span id and a parent span id
+ */
+ ctf_integer(long, trace_id, trace)
+ ctf_integer(long, span_id, span)
+ ctf_integer(long, parent_span_id, parent_span)
+ /*
+ * The following is the real annotated information
+ */
+ ctf_string(key, key)
+ ctf_string(val, val)
+ )
+ )
+TRACEPOINT_LOGLEVEL(
+ zipkin,
+ keyval_string,
+ TRACE_WARNING)
+
+/*
+ * This tracepoint allows integer values to be emitted as keyval annotations
+ */
+
+TRACEPOINT_EVENT(
+ zipkin,
+ keyval_integer,
+ TP_ARGS(const char *, trace_name, const char *, service,
+ int, port, const char *, ip, long, trace,
+ long, span, long, parent_span,
+ const char *, key, int64_t, val ),
+
+ TP_FIELDS(
+ /*
+ * Each span has a name mentioned on it in the UI
+ * This is the trace name
+ */
+ ctf_string(trace_name, trace_name)
+ /*
+ * Each trace takes place in a specific machine-endpoint
+ * This is identified by a name, a port number and an ip
+ */
+ ctf_string(service_name, service)
+ ctf_integer(int, port_no, port)
+ ctf_string(ip, ip)
+ /*
+ * According to the tracing semantics each trace should have
+ * a trace id, a span id and a parent span id
+ */
+ ctf_integer(long, trace_id, trace)
+ ctf_integer(long, span_id, span)
+ ctf_integer(long, parent_span_id, parent_span)
+ /*
+ * The following is the real annotated information
+ */
+ ctf_string(key, key)
+ ctf_integer(int64_t, val, val)
+ )
+ )
+TRACEPOINT_LOGLEVEL(
+ zipkin,
+ keyval_integer,
+ TRACE_WARNING)
+/*
+ * In this event we follow the same semantics but we trace timestamp
+ * annotations
+ */
+
+TRACEPOINT_EVENT(
+ zipkin,
+ timestamp,
+ TP_ARGS(const char *, trace_name, const char *, service,
+ int, port, const char *, ip, long, trace,
+ long, span, long, parent_span,
+ const char *, event),
+
+ TP_FIELDS(
+ ctf_string(trace_name, trace_name)
+ ctf_string(service_name, service)
+ ctf_integer(int, port_no, port)
+ ctf_string(ip, ip)
+ ctf_integer(long, trace_id, trace)
+ ctf_integer(long, span_id, span)
+ ctf_integer(long, parent_span_id, parent_span)
+ ctf_string(event, event)
+ )
+ )
+TRACEPOINT_LOGLEVEL(
+ zipkin,
+ timestamp,
+ TRACE_WARNING)
+#endif /* _ZIPKIN_H */
+
+#include <lttng/tracepoint-event.h>
+
diff --git a/src/blkin/blkin-lib/ztracer.hpp b/src/blkin/blkin-lib/ztracer.hpp
new file mode 100644
index 000000000..3c4707ea4
--- /dev/null
+++ b/src/blkin/blkin-lib/ztracer.hpp
@@ -0,0 +1,248 @@
+/*
+ * Copyright 2014 Marios Kogias <marioskogias@gmail.com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef ZTRACER_H
+
+#define ZTRACER_H
+
+#include <string>
+extern "C" {
+#include <zipkin_c.h>
+}
+
+namespace ZTracer {
+ using std::string;
+
+ const char* const CLIENT_SEND = "cs";
+ const char* const CLIENT_RECV = "cr";
+ const char* const SERVER_SEND = "ss";
+ const char* const SERVER_RECV = "sr";
+ const char* const WIRE_SEND = "ws";
+ const char* const WIRE_RECV = "wr";
+ const char* const CLIENT_SEND_FRAGMENT = "csf";
+ const char* const CLIENT_RECV_FRAGMENT = "crf";
+ const char* const SERVER_SEND_FRAGMENT = "ssf";
+ const char* const SERVER_RECV_FRAGMENT = "srf";
+
+ static inline int ztrace_init() { return blkin_init(); }
+
+ class Endpoint : private blkin_endpoint {
+ private:
+ string _ip; // storage for blkin_endpoint.ip, see copy_ip()
+ string _name; // storage for blkin_endpoint.name, see copy_name()
+
+ friend class Trace;
+ public:
+ Endpoint(const char *name)
+ {
+ blkin_init_endpoint(this, "0.0.0.0", 0, name);
+ }
+ Endpoint(const char *ip, int port, const char *name)
+ {
+ blkin_init_endpoint(this, ip, port, name);
+ }
+
+ // copy constructor and operator need to account for ip/name storage
+ Endpoint(const Endpoint &rhs) : _ip(rhs._ip), _name(rhs._name)
+ {
+ blkin_init_endpoint(this, _ip.size() ? _ip.c_str() : rhs.ip,
+ rhs.port,
+ _name.size() ? _name.c_str(): rhs.name);
+ }
+ const Endpoint& operator=(const Endpoint &rhs)
+ {
+ _ip.assign(rhs._ip);
+ _name.assign(rhs._name);
+ blkin_init_endpoint(this, _ip.size() ? _ip.c_str() : rhs.ip,
+ rhs.port,
+ _name.size() ? _name.c_str() : rhs.name);
+ return *this;
+ }
+
+ // Endpoint assumes that ip and name will be string literals, and
+ // avoids making a copy and freeing on destruction. if you need to
+ // initialize Endpoint with a temporary string, copy_ip/copy_name()
+ // will store it in a std::string and assign from that
+ void copy_ip(const string &newip)
+ {
+ _ip.assign(newip);
+ ip = _ip.c_str();
+ }
+ void copy_name(const string &newname)
+ {
+ _name.assign(newname);
+ name = _name.c_str();
+ }
+
+ void copy_address_from(const Endpoint *endpoint)
+ {
+ _ip.assign(endpoint->ip);
+ ip = _ip.c_str();
+ port = endpoint->port;
+ }
+ void share_address_from(const Endpoint *endpoint)
+ {
+ ip = endpoint->ip;
+ port = endpoint->port;
+ }
+ void set_port(int p) { port = p; }
+ };
+
+ class Trace : private blkin_trace {
+ private:
+ string _name; // storage for blkin_trace.name, see copy_name()
+
+ public:
+ // default constructor zero-initializes blkin_trace valid()
+ // will return false on a default-constructed Trace until
+ // init()
+ Trace()
+ {
+ // zero-initialize so valid() returns false
+ name = NULL;
+ info.trace_id = 0;
+ info.span_id = 0;
+ info.parent_span_id = 0;
+ endpoint = NULL;
+ }
+
+ // construct a Trace with an optional parent
+ Trace(const char *name, const Endpoint *ep, const Trace *parent = NULL)
+ {
+ if (parent && parent->valid()) {
+ blkin_init_child(this, parent, ep ? : parent->endpoint,
+ name);
+ } else {
+ blkin_init_new_trace(this, name, ep);
+ }
+ }
+
+ // construct a Trace from blkin_trace_info
+ Trace(const char *name, const Endpoint *ep,
+ const blkin_trace_info *i, bool child=false)
+ {
+ if (child)
+ blkin_init_child_info(this, i, ep, name);
+ else {
+ blkin_init_new_trace(this, name, ep);
+ set_info(i);
+ }
+ }
+
+ // copy constructor and operator need to account for name storage
+ Trace(const Trace &rhs) : _name(rhs._name)
+ {
+ name = _name.size() ? _name.c_str() : rhs.name;
+ info = rhs.info;
+ endpoint = rhs.endpoint;
+ }
+ const Trace& operator=(const Trace &rhs)
+ {
+ _name.assign(rhs._name);
+ name = _name.size() ? _name.c_str() : rhs.name;
+ info = rhs.info;
+ endpoint = rhs.endpoint;
+ return *this;
+ }
+
+ // return true if the Trace has been initialized
+ bool valid() const { return info.trace_id != 0; }
+ operator bool() const { return valid(); }
+
+ // (re)initialize a Trace with an optional parent
+ int init(const char *name, const Endpoint *ep,
+ const Trace *parent = NULL)
+ {
+ if (parent && parent->valid())
+ return blkin_init_child(this, parent,
+ ep ? : parent->endpoint, name);
+
+ return blkin_init_new_trace(this, name, ep);
+ }
+
+ // (re)initialize a Trace from blkin_trace_info
+ int init(const char *name, const Endpoint *ep,
+ const blkin_trace_info *i, bool child=false)
+ {
+ if (child)
+ return blkin_init_child_info(this, i, ep, _name.c_str());
+
+ return blkin_set_trace_properties(this, i, _name.c_str(), ep);
+ }
+
+ // Trace assumes that name will be a string literal, and avoids
+ // making a copy and freeing on destruction. if you need to
+ // initialize Trace with a temporary string, copy_name() will store
+ // it in a std::string and assign from that
+ void copy_name(const string &newname)
+ {
+ _name.assign(newname);
+ name = _name.c_str();
+ }
+
+ const blkin_trace_info* get_info() const { return &info; }
+ void set_info(const blkin_trace_info *i) { info = *i; }
+
+ // record key-value annotations
+ void keyval(const char *key, const char *val) const
+ {
+ if (valid())
+ BLKIN_KEYVAL_STRING(this, endpoint, key, val);
+ }
+ void keyval(const char *key, int64_t val) const
+ {
+ if (valid())
+ BLKIN_KEYVAL_INTEGER(this, endpoint, key, val);
+ }
+ void keyval(const char *key, const char *val, const Endpoint *ep) const
+ {
+ if (valid())
+ BLKIN_KEYVAL_STRING(this, ep, key, val);
+ }
+ void keyval(const char *key, int64_t val, const Endpoint *ep) const
+ {
+ if (valid())
+ BLKIN_KEYVAL_INTEGER(this, ep, key, val);
+ }
+
+ // record timestamp annotations
+ void event(const char *event) const
+ {
+ if (valid())
+ BLKIN_TIMESTAMP(this, endpoint, event);
+ }
+ void event(const char *event, const Endpoint *ep) const
+ {
+ if (valid())
+ BLKIN_TIMESTAMP(this, ep, event);
+ }
+ };
+
+}
+#endif /* end of include guard: ZTRACER_H */
diff --git a/src/blkin/cmake/modules/FindLTTng.cmake b/src/blkin/cmake/modules/FindLTTng.cmake
new file mode 100644
index 000000000..d0462a187
--- /dev/null
+++ b/src/blkin/cmake/modules/FindLTTng.cmake
@@ -0,0 +1,64 @@
+# - Find LTTng
+# Find the Linux Trace Toolkit - next generation with associated includes path.
+# See http://lttng.org/
+#
+# This module accepts the following optional variables:
+# LTTNG_PATH_HINT = A hint on LTTNG install path.
+#
+# This module defines the following variables:
+# LTTNG_FOUND = Was LTTng found or not?
+# LTTNG_EXECUTABLE = The path to lttng command
+# LTTNG_LIBRARIES = The list of libraries to link to when using LTTng
+# LTTNG_INCLUDE_DIR = The path to LTTng include directory
+#
+# One can set LTTNG_PATH_HINT before using find_package(LTTng) and the
+# module will use the PATH as a hint to find LTTng.
+#
+# The hint can be given on the command line too:
+# cmake -DLTTNG_PATH_HINT=/DATA/ERIC/LTTng /path/to/source
+
+find_package(PkgConfig)
+pkg_check_modules(PC_LTTNG QUIET lttng-ust)
+
+find_path(LTTNG_INCLUDE_DIR
+ NAMES lttng/tracepoint.h
+ HINTS ${PC_LTTNG_INCLUDEDIR} ${PC_LTTNG_INCLUDE_DIRS}
+ PATH_SUFFIXES include
+ DOC "The LTTng include headers")
+
+find_path(LTTNG_LIBRARY_DIR
+ NAMES liblttng-ust.so
+ HINTS ${PC_LTTNG_LIBDIR} ${PC_LTTNG_LIBRARY_DIRS}
+ DOC "The LTTng libraries")
+
+find_library(LTTNG_UST_LIBRARY lttng-ust PATHS ${LTTNG_LIBRARY_DIR})
+find_library(URCU_LIBRARY urcu-bp PATHS ${LTTNG_LIBRARY_DIR})
+find_library(UUID_LIBRARY uuid)
+
+set(LTTNG_LIBRARIES ${LTTNG_UST_LIBRARY} ${URCU_LIBRARY} ${UUID_LIBRARY})
+
+message(STATUS "Looking for lttng executable...")
+set(LTTNG_NAMES "lttng;lttng-ctl")
+# FIND_PROGRAM twice using NO_DEFAULT_PATH on first shot
+find_program(LTTNG_EXECUTABLE
+ NAMES ${LTTNG_NAMES}
+ PATHS ${LTTNG_PATH_HINT}/bin
+ NO_DEFAULT_PATH
+ DOC "The LTTNG command line tool")
+find_program(LTTNG_PROGRAM
+ NAMES ${LTTNG_NAMES}
+ PATHS ${LTTNG_PATH_HINT}/bin
+ DOC "The LTTNG command line tool")
+
+# handle the QUIETLY and REQUIRED arguments and set LTTNG_FOUND to TRUE if
+# all listed variables are TRUE
+include(FindPackageHandleStandardArgs)
+FIND_PACKAGE_HANDLE_STANDARD_ARGS(LTTNG DEFAULT_MSG LTTNG_INCLUDE_DIR LTTNG_LIBRARY_DIR)
+if (NOT LTTNG_FOUND)
+ if (LTTng_FIND_REQUIRED)
+ message(FATAL_ERROR "LTTng not found")
+ endif ()
+endif ()
+
+mark_as_advanced(LTTNG_INCLUDE_DIR)
+mark_as_advanced(LTTNG_LIBRARY_DIR)