summaryrefslogtreecommitdiffstats
path: root/plugin/handler_socket/handlersocket/hstcpsvr.cpp
blob: 250ef2c7be581728152a1c6ddf4af4a2e7cba912 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// vim:sw=2:ai

/*
 * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
 * See COPYRIGHT.txt for details.
 */

#include <my_global.h>
#include <vector>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/resource.h>

#include "hstcpsvr.hpp"
#include "hstcpsvr_worker.hpp"
#include "thread.hpp"
#include "fatal.hpp"
#include "auto_ptrcontainer.hpp"

#define DBG(x)

namespace dena {

/*
 * Function object wrapping a single worker. It is the template argument
 * of thread<> in this file: construction creates the worker from the
 * supplied argument, and calling the object runs that worker.
 */
struct worker_throbj {
  hstcpsvr_worker_ptr worker;

  worker_throbj(const hstcpsvr_worker_arg& arg)
    : worker(hstcpsvr_worker_i::create(arg))
  {
  }

  /* Delegates to the worker's run(). */
  void operator ()()
  {
    worker->run();
  }
};

/*
 * Concrete implementation of hstcpsvr_i. Owns the state shared with the
 * workers (cshared / vshared) and the worker thread objects.
 */
struct hstcpsvr : public hstcpsvr_i, private noncopyable {
  hstcpsvr(const config& c);
  ~hstcpsvr();
  virtual std::string start_listen();

 private:
  typedef thread<worker_throbj> worker_thread_type;
  typedef auto_ptrcontainer< std::vector<worker_thread_type *> > threads_type;

  /* Sets the shutdown flag, joins every worker thread and drops them. */
  void stop_workers();

  /*
   * Data members. The declaration order below is also the construction
   * order (and the reverse of the destruction order), so keep it as is.
   */
  hstcpsvr_shared_c cshared;           /* settings handed to each worker */
  volatile hstcpsvr_shared_v vshared;  /* holds the shutdown flag */
  threads_type threads;                /* owned worker thread objects */
  std::vector<unsigned int> thread_num_conns_vec;
                                       /* storage behind
                                          cshared.thread_num_conns */
};

namespace {

/*
 * Prints a warning to stderr when the current (soft) RLIMIT_NOFILE value
 * is below nfile + 1000. Calls fatal_abort() if getrlimit() fails.
 */
void
check_nfile(size_t nfile)
{
  struct rlimit lim;
  if (getrlimit(RLIMIT_NOFILE, &lim) != 0) {
    fatal_abort("check_nfile: getrlimit failed");
  }
  const rlim_t wanted = static_cast<rlim_t>(nfile + 1000);
  if (lim.rlim_cur >= wanted) {
    return; /* limit is large enough, nothing to report */
  }
  fprintf(stderr,
    "[Warning] handlersocket: open_files_limit is too small.\n");
}

};

/*
 * Builds the server state from configuration c.
 *
 * A private copy of c is stored in cshared.conf; an empty "port" entry is
 * replaced by "9999". The remaining settings are read with these defaults:
 *   num_threads=32, nonblocking=1, use_epoll=1, readsize=1,
 *   conn_per_thread=1024, for_write=0, plain_secret="".
 * Authentication is required exactly when plain_secret is non-empty.
 * The database object is created from the caller's c, not from the copy.
 *
 * NOTE: the statements are kept in this exact order; the use_epoll
 * override in particular must come after both socket flags are read.
 */
hstcpsvr::hstcpsvr(const config& c)
  : cshared(), vshared()
{
  vshared.shutdown = 0;

  cshared.conf = c; /* copy */
  config& conf = cshared.conf;
  if (conf["port"] == "") {
    conf["port"] = "9999";
  }

  cshared.num_threads = conf.get_int("num_threads", 32);
  cshared.sockargs.nonblocking = conf.get_int("nonblocking", 1);
  cshared.sockargs.use_epoll = conf.get_int("use_epoll", 1);
  if (cshared.sockargs.use_epoll) {
    /* epoll mode always forces non-blocking sockets */
    cshared.sockargs.nonblocking = 1;
  }
  cshared.readsize = conf.get_int("readsize", 1);
  cshared.nb_conn_per_thread = conf.get_int("conn_per_thread", 1024);
  cshared.for_write_flag = conf.get_int("for_write", 0);
  cshared.plain_secret = conf.get_str("plain_secret", "");
  cshared.require_auth = !cshared.plain_secret.empty();
  cshared.sockargs.set(conf);
  cshared.dbptr = database_i::create(c);

  /* warn if the fd limit looks too low for the worst-case connection count */
  check_nfile(cshared.num_threads * cshared.nb_conn_per_thread);

  /* one counter slot per worker; expose the raw array via cshared */
  thread_num_conns_vec.resize(cshared.num_threads);
  cshared.thread_num_conns = 0;
  if (!thread_num_conns_vec.empty()) {
    cshared.thread_num_conns = &thread_num_conns_vec[0];
  }
}

/*
 * Stops and joins all worker threads (via stop_workers) before the
 * members are destroyed.
 */
hstcpsvr::~hstcpsvr()
{
  this->stop_workers();
}

std::string
hstcpsvr::start_listen()
{
  std::string err;
  if (threads.size() != 0) {
    return "start_listen: already running";
  }
  if (socket_bind(cshared.listen_fd, cshared.sockargs, err) != 0) {
    return "bind: " + err;
  }
  DENA_VERBOSE(20, fprintf(stderr, "bind done\n"));
  const size_t stack_size = std::max(
    cshared.conf.get_int("stack_size", 1 * 1024LL * 1024), 8 * 1024LL * 1024);
  for (long i = 0; i < cshared.num_threads; ++i) {
    hstcpsvr_worker_arg arg;
    arg.cshared = &cshared;
    arg.vshared = &vshared;
    arg.worker_id = i;
    std::auto_ptr< thread<worker_throbj> > thr(
      new thread<worker_throbj>(arg, stack_size));
    threads.push_back_ptr(thr);
  }
  DENA_VERBOSE(20, fprintf(stderr, "threads created\n"));
  for (size_t i = 0; i < threads.size(); ++i) {
    threads[i]->start();
  }
  DENA_VERBOSE(20, fprintf(stderr, "threads started\n"));
  return std::string();
}

void
hstcpsvr::stop_workers()
{
  vshared.shutdown = 1;
  for (size_t i = 0; i < threads.size(); ++i) {
    threads[i]->join();
  }
  threads.clear();
}

/*
 * Factory: allocates a hstcpsvr configured from conf and returns it
 * wrapped in the owning hstcpsvr_ptr.
 */
hstcpsvr_ptr
hstcpsvr_i::create(const config& conf)
{
  hstcpsvr_ptr svr(new hstcpsvr(conf));
  return svr;
}

};