1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
|
/* This is the bit of VQSIM that runs in the forked process.
It represents a single votequorum instance or, if you like,
a 'node' in the cluster.
*/
#include <sys/types.h>
#include <qb/qblog.h>
#include <qb/qbloop.h>
#include <qb/qbipc_common.h>
#include <netinet/in.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <stdio.h>
#include "../exec/votequorum.h"
#include "../exec/service.h"
#include "../include/corosync/corotypes.h"
#include "../include/corosync/votequorum.h"
#include "../include/corosync/ipc_votequorum.h"
#include <corosync/logsys.h>
#include <corosync/coroapi.h>
#include "icmap.h"
#include "vqsim.h"
#define QDEVICE_NAME "VQsim_qdevice"
/* Static variables here are per-instance because we are forked */
static struct corosync_service_engine *engine;
static int parent_socket; /* Our end of the socket */
static char buffer[8192];
static int our_nodeid;
static char *private_data;
static qb_loop_t *poll_loop;
static qb_loop_timer_handle sync_timer;
static qb_loop_timer_handle qdevice_timer;
static int we_are_quorate;
static void *fake_conn = (void*)1;
static cs_error_t last_lib_error;
static struct memb_ring_id current_ring_id;
static int qdevice_registered;
static unsigned int qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
/* 'Keep the compiler happy' time */
char *get_run_dir(void);
int api_timer_add_duration (
unsigned long long nanosec_duration,
void *data,
void (*timer_fn) (void *data),
corosync_timer_handle_t *handle);
static void api_error_memory_failure(void) __attribute__((noreturn));
static void api_error_memory_failure()
{
fprintf(stderr, "Out of memory error\n");
exit(-1);
}
static void api_timer_delete(corosync_timer_handle_t th)
{
qb_loop_timer_del(poll_loop, th);
}
int api_timer_add_duration (
unsigned long long nanosec_duration,
void *data,
void (*timer_fn) (void *data),
corosync_timer_handle_t *handle)
{
return qb_loop_timer_add(poll_loop,
QB_LOOP_MED,
nanosec_duration,
data,
timer_fn,
handle);
}
static unsigned int api_totem_nodeid_get(void)
{
return our_nodeid;
}
static int api_totem_mcast(const struct iovec *iov, unsigned int iovlen, unsigned int type)
{
struct vqsim_msg_header header;
struct iovec iovec[iovlen+1];
int total = sizeof(header);
int res;
int i;
header.type = VQMSG_EXEC;
header.from_nodeid = our_nodeid;
header.param = 0;
iovec[0].iov_base = &header;
iovec[0].iov_len = sizeof(header);
for (i=0; i<iovlen; i++) {
iovec[i+1].iov_base = iov[i].iov_base;
iovec[i+1].iov_len = iov[i].iov_len;
total += iov[i].iov_len;
}
res = writev(parent_socket, iovec, iovlen+1);
if (res != total) {
fprintf(stderr, "writev wrote only %d of %d bytes\n", res, total);
}
return 0;
}
static void *api_ipc_private_data_get(void *conn)
{
return private_data;
}
static int api_ipc_response_send(void *conn, const void *msg, size_t len)
{
struct qb_ipc_response_header *qb_header = (void*)msg;
/* Save the error so we can return it */
last_lib_error = qb_header->error;
return 0;
}
static struct corosync_api_v1 corosync_api = {
.error_memory_failure = api_error_memory_failure,
.timer_delete = api_timer_delete,
.timer_add_duration = api_timer_add_duration,
.totem_nodeid_get = api_totem_nodeid_get,
.totem_mcast = api_totem_mcast,
.ipc_private_data_get = api_ipc_private_data_get,
.ipc_response_send = api_ipc_response_send,
};
/* -------------------- Above is all for providing the corosync_api support routines --------------------------------------------*/
/* They need to be in the same file as the engine as they use the local 'poll_loop' variable which is per-process */
static void start_qdevice_poll(int longwait);
static void start_sync_timer(void);
/* Callback from Votequorum to tell us about the quorum state */
static void quorum_fn(const unsigned int *view_list,
size_t view_list_entries,
int quorate, struct memb_ring_id *ring_id)
{
char msgbuf[8192];
int len;
struct vqsim_quorum_msg *quorum_msg = (void*) msgbuf;
we_are_quorate = quorate;
/* Send back to parent */
quorum_msg->header.type = VQMSG_QUORUM;
quorum_msg->header.from_nodeid = our_nodeid;
quorum_msg->header.param = 0;
quorum_msg->quorate = quorate;
memcpy(&quorum_msg->ring_id, ring_id, sizeof(*ring_id));
quorum_msg->view_list_entries = view_list_entries;
memcpy(quorum_msg->view_list, view_list, sizeof(unsigned int)*view_list_entries);
if ( (len=write(parent_socket, msgbuf, sizeof(*quorum_msg) + sizeof(unsigned int)*view_list_entries)) <= 0) {
perror("write (view list to parent) failed");
}
memcpy(¤t_ring_id, ring_id, sizeof(*ring_id));
}
char *corosync_service_link_and_init(struct corosync_api_v1 *api,
struct default_service *service_engine)
{
/* dummy */
return NULL;
}
/* For votequorum */
char *get_run_dir()
{
static char cwd_buffer[PATH_MAX];
return getcwd(cwd_buffer, PATH_MAX);
}
/* This is different to the one in totemconfig.c in that we already
* know the 'local' node ID, so we can just search for that.
* It needs to be here rather than at main config read time as it's
* (obviously) going to be different for each instance.
*/
static void set_local_node_pos(struct corosync_api_v1 *api)
{
icmap_iter_t iter;
uint32_t node_pos;
char name_str[ICMAP_KEYNAME_MAXLEN];
uint32_t nodeid;
const char *iter_key;
int res;
int found = 0;
iter = icmap_iter_init("nodelist.node.");
while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) {
res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, name_str);
if (res != 2) {
continue;
}
if (strcmp(name_str, "nodeid")) {
continue;
}
res = icmap_get_uint32(iter_key, &nodeid);
if (res == CS_OK) {
if (nodeid == our_nodeid) {
found = 1;
res = icmap_set_uint32("nodelist.local_node_pos", node_pos);
assert(res == CS_OK);
}
}
}
if (!found) {
/* This probably indicates a dynamically-added node
* set the pos to zero and use the votes of the
* first node in corosync.conf
*/
res = icmap_set_uint32("nodelist.local_node_pos", 0);
assert(res == CS_OK);
}
}
static int load_quorum_instance(struct corosync_api_v1 *api)
{
const char *error_string;
int res;
error_string = votequorum_init(api, quorum_fn);
if (error_string) {
fprintf(stderr, "Votequorum init failed: %s\n", error_string);
return -1;
}
engine = votequorum_get_service_engine_ver0();
error_string = engine->exec_init_fn(api);
if (error_string) {
fprintf(stderr, "votequorum exec init failed: %s\n", error_string);
return -1;
}
private_data = malloc(engine->private_data_size);
if (!private_data) {
perror("Malloc in child failed");
return -1;
}
res = engine->lib_init_fn(fake_conn);
return res;
}
static void sync_dispatch_fn(void *data)
{
if (engine->sync_process()) {
start_sync_timer();
}
else {
engine->sync_activate();
}
}
static void start_sync_timer()
{
qb_loop_timer_add(poll_loop,
QB_LOOP_MED,
10000000,
NULL,
sync_dispatch_fn,
&sync_timer);
}
static void send_sync(char *buf, int len)
{
struct vqsim_sync_msg *msg = (void*)buf;
/* Votequorum doesn't use the transitional node list :-) */
engine->sync_init(NULL, 0,
msg->view_list, msg->view_list_entries,
&msg->ring_id);
start_sync_timer();
}
static void send_exec_msg(char *buf, int len)
{
struct vqsim_exec_msg *execmsg = (void*)buf;
struct qb_ipc_request_header *qb_header = (void*)execmsg->execmsg;
engine->exec_engine[qb_header->id & 0xFFFF].exec_handler_fn(execmsg->execmsg, execmsg->header.from_nodeid);
}
static int send_lib_msg(int type, void *msg)
{
/* Clear this as not all lib functions return a response immediately */
last_lib_error = CS_OK;
engine->lib_engine[type].lib_handler_fn(fake_conn, msg);
return last_lib_error;
}
static int poll_qdevice(int onoff)
{
struct req_lib_votequorum_qdevice_poll pollmsg;
int res;
pollmsg.cast_vote = onoff;
pollmsg.ring_id.nodeid = current_ring_id.nodeid;
pollmsg.ring_id.seq = current_ring_id.seq;
strcpy(pollmsg.name, QDEVICE_NAME);
res = send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_POLL, &pollmsg);
if (res != CS_OK) {
fprintf(stderr, CS_PRI_NODE_ID ": qdevice poll failed: %d\n", our_nodeid, res);
}
return res;
}
static void qdevice_dispatch_fn(void *data)
{
if (poll_qdevice(1) == CS_OK) {
start_qdevice_poll(0);
}
}
static void start_qdevice_poll(int longwait)
{
unsigned long long timeout;
timeout = (unsigned long long)qdevice_timeout*500000; /* Half the corosync timeout */
if (longwait) {
timeout *= 2;
}
qb_loop_timer_add(poll_loop,
QB_LOOP_MED,
timeout,
NULL,
qdevice_dispatch_fn,
&qdevice_timer);
}
static void stop_qdevice_poll(void)
{
qb_loop_timer_del(poll_loop, qdevice_timer);
qdevice_timer = 0;
}
static void do_qdevice(int onoff)
{
int res;
if (onoff) {
if (!qdevice_registered) {
struct req_lib_votequorum_qdevice_register regmsg;
strcpy(regmsg.name, QDEVICE_NAME);
if ( (res=send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_REGISTER, ®msg)) == CS_OK) {
qdevice_registered = 1;
start_qdevice_poll(1);
}
else {
fprintf(stderr, CS_PRI_NODE_ID ": qdevice registration failed: %d\n", our_nodeid, res);
}
}
else {
if (!qdevice_timer) {
start_qdevice_poll(0);
}
}
}
else {
poll_qdevice(0);
stop_qdevice_poll();
}
}
/* From controller */
static int parent_pipe_read_fn(int32_t fd, int32_t revents, void *data)
{
struct vqsim_msg_header *header = (void*)buffer;
int len;
len = read(fd, buffer, sizeof(buffer));
if (len > 0) {
/* Check header and route */
switch (header->type) {
case VQMSG_QUIT:
exit(0);
break;
case VQMSG_EXEC: /* For votequorum exec messages */
send_exec_msg(buffer, len);
break;
case VQMSG_SYNC:
send_sync(buffer, len);
break;
case VQMSG_QDEVICE:
do_qdevice(header->param);
break;
case VQMSG_QUORUMQUIT:
if (!we_are_quorate) {
exit(1);
}
break;
case VQMSG_QUORUM:
/* not used here */
break;
}
}
return 0;
}
static void initial_sync(int nodeid)
{
unsigned int trans_list[1] = {nodeid};
unsigned int member_list[1] = {nodeid};
struct memb_ring_id ring_id;
ring_id.nodeid = our_nodeid;
ring_id.seq = 1;
/* cluster with just us in it */
engine->sync_init(trans_list, 1,
member_list, 1,
&ring_id);
start_sync_timer();
}
/* Return pipe FDs & child PID if sucessful */
int fork_new_instance(int nodeid, int *vq_sock, pid_t *childpid)
{
int pipes[2];
pid_t pid;
if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0, pipes)) {
return -1;
}
parent_socket = pipes[0];
switch ( (pid=fork()) ) {
case -1:
perror("fork failed");
return -1;
case 0:
/* child process - continue below */
break;
default:
/* parent process */
*vq_sock = pipes[1];
*childpid = pid;
return 0;
}
our_nodeid = nodeid;
poll_loop = qb_loop_create();
if (icmap_get_uint32("quorum.device.timeout", &qdevice_timeout) != CS_OK) {
qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
}
set_local_node_pos(&corosync_api);
load_quorum_instance(&corosync_api);
qb_loop_poll_add(poll_loop,
QB_LOOP_MED,
parent_socket,
POLLIN,
NULL,
parent_pipe_read_fn);
/* Start it up! */
initial_sync(nodeid);
qb_loop_run(poll_loop);
return 0;
}
|