summaryrefslogtreecommitdiffstats
path: root/plugin/handler_socket/handlersocket/hstcpsvr.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/handler_socket/handlersocket/hstcpsvr.cpp')
-rw-r--r--plugin/handler_socket/handlersocket/hstcpsvr.cpp147
1 files changed, 147 insertions, 0 deletions
diff --git a/plugin/handler_socket/handlersocket/hstcpsvr.cpp b/plugin/handler_socket/handlersocket/hstcpsvr.cpp
new file mode 100644
index 00000000..250ef2c7
--- /dev/null
+++ b/plugin/handler_socket/handlersocket/hstcpsvr.cpp
@@ -0,0 +1,147 @@
+
+// vim:sw=2:ai
+
+/*
+ * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
+ * See COPYRIGHT.txt for details.
+ */
+
+#include <my_global.h>
+#include <vector>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <sys/resource.h>
+
+#include "hstcpsvr.hpp"
+#include "hstcpsvr_worker.hpp"
+#include "thread.hpp"
+#include "fatal.hpp"
+#include "auto_ptrcontainer.hpp"
+
+#define DBG(x)
+
+namespace dena {
+
+struct worker_throbj {
+ worker_throbj(const hstcpsvr_worker_arg& arg)
+ : worker(hstcpsvr_worker_i::create(arg)) { }
+ void operator ()() {
+ worker->run();
+ }
+ hstcpsvr_worker_ptr worker;
+};
+
+struct hstcpsvr : public hstcpsvr_i, private noncopyable {
+ hstcpsvr(const config& c);
+ ~hstcpsvr();
+ virtual std::string start_listen();
+ private:
+ hstcpsvr_shared_c cshared;
+ volatile hstcpsvr_shared_v vshared;
+ typedef thread<worker_throbj> worker_thread_type;
+ typedef auto_ptrcontainer< std::vector<worker_thread_type *> > threads_type;
+ threads_type threads;
+ std::vector<unsigned int> thread_num_conns_vec;
+ private:
+ void stop_workers();
+};
+
+namespace {
+
+void
+check_nfile(size_t nfile)
+{
+ struct rlimit rl;
+ const int r = getrlimit(RLIMIT_NOFILE, &rl);
+ if (r != 0) {
+ fatal_abort("check_nfile: getrlimit failed");
+ }
+ if (rl.rlim_cur < static_cast<rlim_t>(nfile + 1000)) {
+ fprintf(stderr,
+ "[Warning] handlersocket: open_files_limit is too small.\n");
+ }
+}
+
+};
+
+hstcpsvr::hstcpsvr(const config& c)
+ : cshared(), vshared()
+{
+ vshared.shutdown = 0;
+ cshared.conf = c; /* copy */
+ if (cshared.conf["port"] == "") {
+ cshared.conf["port"] = "9999";
+ }
+ cshared.num_threads = cshared.conf.get_int("num_threads", 32);
+ cshared.sockargs.nonblocking = cshared.conf.get_int("nonblocking", 1);
+ cshared.sockargs.use_epoll = cshared.conf.get_int("use_epoll", 1);
+ if (cshared.sockargs.use_epoll) {
+ cshared.sockargs.nonblocking = 1;
+ }
+ cshared.readsize = cshared.conf.get_int("readsize", 1);
+ cshared.nb_conn_per_thread = cshared.conf.get_int("conn_per_thread", 1024);
+ cshared.for_write_flag = cshared.conf.get_int("for_write", 0);
+ cshared.plain_secret = cshared.conf.get_str("plain_secret", "");
+ cshared.require_auth = !cshared.plain_secret.empty();
+ cshared.sockargs.set(cshared.conf);
+ cshared.dbptr = database_i::create(c);
+ check_nfile(cshared.num_threads * cshared.nb_conn_per_thread);
+ thread_num_conns_vec.resize(cshared.num_threads);
+ cshared.thread_num_conns = thread_num_conns_vec.empty()
+ ? 0 : &thread_num_conns_vec[0];
+}
+
+hstcpsvr::~hstcpsvr()
+{
+ stop_workers();
+}
+
+std::string
+hstcpsvr::start_listen()
+{
+ std::string err;
+ if (threads.size() != 0) {
+ return "start_listen: already running";
+ }
+ if (socket_bind(cshared.listen_fd, cshared.sockargs, err) != 0) {
+ return "bind: " + err;
+ }
+ DENA_VERBOSE(20, fprintf(stderr, "bind done\n"));
+ const size_t stack_size = std::max(
+ cshared.conf.get_int("stack_size", 1 * 1024LL * 1024), 8 * 1024LL * 1024);
+ for (long i = 0; i < cshared.num_threads; ++i) {
+ hstcpsvr_worker_arg arg;
+ arg.cshared = &cshared;
+ arg.vshared = &vshared;
+ arg.worker_id = i;
+ std::auto_ptr< thread<worker_throbj> > thr(
+ new thread<worker_throbj>(arg, stack_size));
+ threads.push_back_ptr(thr);
+ }
+ DENA_VERBOSE(20, fprintf(stderr, "threads created\n"));
+ for (size_t i = 0; i < threads.size(); ++i) {
+ threads[i]->start();
+ }
+ DENA_VERBOSE(20, fprintf(stderr, "threads started\n"));
+ return std::string();
+}
+
+void
+hstcpsvr::stop_workers()
+{
+ vshared.shutdown = 1;
+ for (size_t i = 0; i < threads.size(); ++i) {
+ threads[i]->join();
+ }
+ threads.clear();
+}
+
+hstcpsvr_ptr
+hstcpsvr_i::create(const config& conf)
+{
+ return hstcpsvr_ptr(new hstcpsvr(conf));
+}
+
+};
+