// vim:sw=2:ai /* * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. * See COPYRIGHT.txt for details. */ #include #include #include #include #include #include #include "hstcpsvr.hpp" #include "hstcpsvr_worker.hpp" #include "thread.hpp" #include "fatal.hpp" #include "auto_ptrcontainer.hpp" #define DBG(x) namespace dena { struct worker_throbj { worker_throbj(const hstcpsvr_worker_arg& arg) : worker(hstcpsvr_worker_i::create(arg)) { } void operator ()() { worker->run(); } hstcpsvr_worker_ptr worker; }; struct hstcpsvr : public hstcpsvr_i, private noncopyable { hstcpsvr(const config& c); ~hstcpsvr(); virtual std::string start_listen(); private: hstcpsvr_shared_c cshared; volatile hstcpsvr_shared_v vshared; typedef thread worker_thread_type; typedef auto_ptrcontainer< std::vector > threads_type; threads_type threads; std::vector thread_num_conns_vec; private: void stop_workers(); }; namespace { void check_nfile(size_t nfile) { struct rlimit rl; const int r = getrlimit(RLIMIT_NOFILE, &rl); if (r != 0) { fatal_abort("check_nfile: getrlimit failed"); } if (rl.rlim_cur < static_cast(nfile + 1000)) { fprintf(stderr, "[Warning] handlersocket: open_files_limit is too small.\n"); } } }; hstcpsvr::hstcpsvr(const config& c) : cshared(), vshared() { vshared.shutdown = 0; cshared.conf = c; /* copy */ if (cshared.conf["port"] == "") { cshared.conf["port"] = "9999"; } cshared.num_threads = cshared.conf.get_int("num_threads", 32); cshared.sockargs.nonblocking = cshared.conf.get_int("nonblocking", 1); cshared.sockargs.use_epoll = cshared.conf.get_int("use_epoll", 1); if (cshared.sockargs.use_epoll) { cshared.sockargs.nonblocking = 1; } cshared.readsize = cshared.conf.get_int("readsize", 1); cshared.nb_conn_per_thread = cshared.conf.get_int("conn_per_thread", 1024); cshared.for_write_flag = cshared.conf.get_int("for_write", 0); cshared.plain_secret = cshared.conf.get_str("plain_secret", ""); cshared.require_auth = !cshared.plain_secret.empty(); cshared.sockargs.set(cshared.conf); cshared.dbptr = database_i::create(c); check_nfile(cshared.num_threads * cshared.nb_conn_per_thread); thread_num_conns_vec.resize(cshared.num_threads); cshared.thread_num_conns = thread_num_conns_vec.empty() ? 0 : &thread_num_conns_vec[0]; } hstcpsvr::~hstcpsvr() { stop_workers(); } std::string hstcpsvr::start_listen() { std::string err; if (threads.size() != 0) { return "start_listen: already running"; } if (socket_bind(cshared.listen_fd, cshared.sockargs, err) != 0) { return "bind: " + err; } DENA_VERBOSE(20, fprintf(stderr, "bind done\n")); const size_t stack_size = std::max( cshared.conf.get_int("stack_size", 1 * 1024LL * 1024), 8 * 1024LL * 1024); for (long i = 0; i < cshared.num_threads; ++i) { hstcpsvr_worker_arg arg; arg.cshared = &cshared; arg.vshared = &vshared; arg.worker_id = i; std::auto_ptr< thread > thr( new thread(arg, stack_size)); threads.push_back_ptr(thr); } DENA_VERBOSE(20, fprintf(stderr, "threads created\n")); for (size_t i = 0; i < threads.size(); ++i) { threads[i]->start(); } DENA_VERBOSE(20, fprintf(stderr, "threads started\n")); return std::string(); } void hstcpsvr::stop_workers() { vshared.shutdown = 1; for (size_t i = 0; i < threads.size(); ++i) { threads[i]->join(); } threads.clear(); } hstcpsvr_ptr hstcpsvr_i::create(const config& conf) { return hstcpsvr_ptr(new hstcpsvr(conf)); } };