summaryrefslogtreecommitdiffstats
path: root/src/common/Thread.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/common/Thread.cc230
1 files changed, 230 insertions, 0 deletions
diff --git a/src/common/Thread.cc b/src/common/Thread.cc
new file mode 100644
index 000000000..aaee01aef
--- /dev/null
+++ b/src/common/Thread.cc
@@ -0,0 +1,230 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2011 New Dream Network
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <signal.h>
+#include <unistd.h>
+#ifdef __linux__
+#include <sys/syscall.h> /* For SYS_xxx definitions */
+#endif
+
+#ifdef WITH_SEASTAR
+#include "crimson/os/alienstore/alien_store.h"
+#endif
+
+#include "common/Thread.h"
+#include "common/code_environment.h"
+#include "common/debug.h"
+#include "common/signal.h"
+
+#ifdef HAVE_SCHED
+#include <sched.h>
+#endif
+
+
+pid_t ceph_gettid(void)
+{
+#ifdef __linux__
+ return syscall(SYS_gettid);
+#else
+ return -ENOSYS;
+#endif
+}
+
+static int _set_affinity(int id)
+{
+#ifdef HAVE_SCHED
+ if (id >= 0 && id < CPU_SETSIZE) {
+ cpu_set_t cpuset;
+ CPU_ZERO(&cpuset);
+
+ CPU_SET(id, &cpuset);
+
+ if (sched_setaffinity(0, sizeof(cpuset), &cpuset) < 0)
+ return -errno;
+ /* guaranteed to take effect immediately */
+ sched_yield();
+ }
+#endif
+ return 0;
+}
+
+Thread::Thread()
+ : thread_id(0),
+ pid(0),
+ cpuid(-1)
+{
+}
+
+Thread::~Thread()
+{
+}
+
+void *Thread::_entry_func(void *arg) {
+ void *r = ((Thread*)arg)->entry_wrapper();
+ return r;
+}
+
+void *Thread::entry_wrapper()
+{
+ int p = ceph_gettid(); // may return -ENOSYS on other platforms
+ if (p > 0)
+ pid = p;
+ if (pid && cpuid >= 0)
+ _set_affinity(cpuid);
+
+ ceph_pthread_setname(pthread_self(), thread_name.c_str());
+ return entry();
+}
+
+const pthread_t &Thread::get_thread_id() const
+{
+ return thread_id;
+}
+
+bool Thread::is_started() const
+{
+ return thread_id != 0;
+}
+
+bool Thread::am_self() const
+{
+ return (pthread_self() == thread_id);
+}
+
+int Thread::kill(int signal)
+{
+ if (thread_id)
+ return pthread_kill(thread_id, signal);
+ else
+ return -EINVAL;
+}
+
+int Thread::try_create(size_t stacksize)
+{
+ pthread_attr_t *thread_attr = NULL;
+ pthread_attr_t thread_attr_loc;
+
+ stacksize &= CEPH_PAGE_MASK; // must be multiple of page
+ if (stacksize) {
+ thread_attr = &thread_attr_loc;
+ pthread_attr_init(thread_attr);
+ pthread_attr_setstacksize(thread_attr, stacksize);
+ }
+
+ int r;
+
+ // The child thread will inherit our signal mask. Set our signal mask to
+ // the set of signals we want to block. (It's ok to block signals more
+ // signals than usual for a little while-- they will just be delivered to
+ // another thread or delieverd to this thread later.)
+
+ #ifndef _WIN32
+ sigset_t old_sigset;
+ if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
+ block_signals(NULL, &old_sigset);
+ }
+ else {
+ int to_block[] = { SIGPIPE , 0 };
+ block_signals(to_block, &old_sigset);
+ }
+ r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
+ restore_sigset(&old_sigset);
+ #else
+ r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
+ #endif
+
+ if (thread_attr) {
+ pthread_attr_destroy(thread_attr);
+ }
+
+ return r;
+}
+
+void Thread::create(const char *name, size_t stacksize)
+{
+ ceph_assert(strlen(name) < 16);
+ thread_name = name;
+
+ int ret = try_create(stacksize);
+ if (ret != 0) {
+ char buf[256];
+ snprintf(buf, sizeof(buf), "Thread::try_create(): pthread_create "
+ "failed with error %d", ret);
+ dout_emergency(buf);
+ ceph_assert(ret == 0);
+ }
+}
+
+int Thread::join(void **prval)
+{
+ if (thread_id == 0) {
+ ceph_abort_msg("join on thread that was never started");
+ return -EINVAL;
+ }
+
+ int status = pthread_join(thread_id, prval);
+ if (status != 0) {
+ char buf[256];
+ snprintf(buf, sizeof(buf), "Thread::join(): pthread_join "
+ "failed with error %d\n", status);
+ dout_emergency(buf);
+ ceph_assert(status == 0);
+ }
+
+ thread_id = 0;
+ return status;
+}
+
+int Thread::detach()
+{
+ return pthread_detach(thread_id);
+}
+
+int Thread::set_affinity(int id)
+{
+ int r = 0;
+ cpuid = id;
+ if (pid && ceph_gettid() == pid)
+ r = _set_affinity(id);
+ return r;
+}
+
+// Functions for std::thread
+// =========================
+
+void set_thread_name(std::thread& t, const std::string& s) {
+ int r = ceph_pthread_setname(t.native_handle(), s.c_str());
+ if (r != 0) {
+ throw std::system_error(r, std::generic_category());
+ }
+}
+std::string get_thread_name(const std::thread& t) {
+ std::string s(256, '\0');
+
+ int r = ceph_pthread_getname(const_cast<std::thread&>(t).native_handle(),
+ s.data(), s.length());
+ if (r != 0) {
+ throw std::system_error(r, std::generic_category());
+ }
+ s.resize(std::strlen(s.data()));
+ return s;
+}
+
+void kill(std::thread& t, int signal)
+{
+ auto r = pthread_kill(t.native_handle(), signal);
+ if (r != 0) {
+ throw std::system_error(r, std::generic_category());
+ }
+}