summaryrefslogtreecommitdiffstats
path: root/fs/dlm/dir.c
blob: b1ab0adbd9d023c21a6e8e18404577c3473c0556 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "lowcomms.h"
#include "rcom.h"
#include "config.h"
#include "memory.h"
#include "recover.h"
#include "util.h"
#include "lock.h"
#include "dir.h"

/*
 * We use the upper 16 bits of the hash value to select the directory node.
 * Low bits are used for distribution of rsb's among hash buckets on each node.
 *
 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
 * num_nodes to the hash value.  This value in the desired range is used as an
 * offset into the sorted list of nodeid's to give the particular nodeid.
 */

int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
{
	uint32_t node;

	if (ls->ls_num_nodes == 1)
		return dlm_our_nodeid();
	else {
		node = (hash >> 16) % ls->ls_total_weight;
		return ls->ls_node_array[node];
	}
}

int dlm_dir_nodeid(struct dlm_rsb *r)
{
	return r->res_dir_nodeid;
}

void dlm_recover_dir_nodeid(struct dlm_ls *ls, const struct list_head *root_list)
{
	struct dlm_rsb *r;

	list_for_each_entry(r, root_list, res_root_list) {
		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
	}
}

int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
{
	struct dlm_member *memb;
	char *b, *last_name = NULL;
	int error = -ENOMEM, last_len, nodeid, result;
	uint16_t namelen;
	unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;

	log_rinfo(ls, "dlm_recover_directory");

	if (dlm_no_directory(ls))
		goto out_status;

	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
	if (!last_name)
		goto out;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->nodeid == dlm_our_nodeid())
			continue;

		memset(last_name, 0, DLM_RESNAME_MAXLEN);
		last_len = 0;

		for (;;) {
			int left;
			if (dlm_recovery_stopped(ls)) {
				error = -EINTR;
				goto out_free;
			}

			error = dlm_rcom_names(ls, memb->nodeid,
					       last_name, last_len, seq);
			if (error)
				goto out_free;

			cond_resched();

			/*
			 * pick namelen/name pairs out of received buffer
			 */

			b = ls->ls_recover_buf->rc_buf;
			left = le16_to_cpu(ls->ls_recover_buf->rc_header.h_length);
			left -= sizeof(struct dlm_rcom);

			for (;;) {
				__be16 v;

				error = -EINVAL;
				if (left < sizeof(__be16))
					goto out_free;

				memcpy(&v, b, sizeof(__be16));
				namelen = be16_to_cpu(v);
				b += sizeof(__be16);
				left -= sizeof(__be16);

				/* namelen of 0xFFFFF marks end of names for
				   this node; namelen of 0 marks end of the
				   buffer */

				if (namelen == 0xFFFF)
					goto done;
				if (!namelen)
					break;

				if (namelen > left)
					goto out_free;

				if (namelen > DLM_RESNAME_MAXLEN)
					goto out_free;

				error = dlm_master_lookup(ls, memb->nodeid,
							  b, namelen,
							  DLM_LU_RECOVER_DIR,
							  &nodeid, &result);
				if (error) {
					log_error(ls, "recover_dir lookup %d",
						  error);
					goto out_free;
				}

				/* The name was found in rsbtbl, but the
				 * master nodeid is different from
				 * memb->nodeid which says it is the master.
				 * This should not happen. */

				if (result == DLM_LU_MATCH &&
				    nodeid != memb->nodeid) {
					count_bad++;
					log_error(ls, "recover_dir lookup %d "
						  "nodeid %d memb %d bad %u",
						  result, nodeid, memb->nodeid,
						  count_bad);
					print_hex_dump_bytes("dlm_recover_dir ",
							     DUMP_PREFIX_NONE,
							     b, namelen);
				}

				/* The name was found in rsbtbl, and the
				 * master nodeid matches memb->nodeid. */

				if (result == DLM_LU_MATCH &&
				    nodeid == memb->nodeid) {
					count_match++;
				}

				/* The name was not found in rsbtbl and was
				 * added with memb->nodeid as the master. */

				if (result == DLM_LU_ADD) {
					count_add++;
				}

				last_len = namelen;
				memcpy(last_name, b, namelen);
				b += namelen;
				left -= namelen;
				count++;
			}
		}
	 done:
		;
	}

 out_status:
	error = 0;
	dlm_set_recover_status(ls, DLM_RS_DIR);

	log_rinfo(ls, "dlm_recover_directory %u in %u new",
		  count, count_add);
 out_free:
	kfree(last_name);
 out:
	return error;
}

static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
				     int len)
{
	struct dlm_rsb *r;
	int rv;

	read_lock_bh(&ls->ls_rsbtbl_lock);
	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	read_unlock_bh(&ls->ls_rsbtbl_lock);
	if (!rv)
		return r;

	list_for_each_entry(r, &ls->ls_masters_list, res_masters_list) {
		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
			log_debug(ls, "find_rsb_root revert to root_list %s",
				  r->res_name);
			return r;
		}
	}
	return NULL;
}

struct dlm_dir_dump {
	/* init values to match if whole
	 * dump fits to one seq. Sanity check only.
	 */
	uint64_t seq_init;
	uint64_t nodeid_init;
	/* compare local pointer with last lookup,
	 * just a sanity check.
	 */
	struct list_head *last;

	unsigned int sent_res; /* for log info */
	unsigned int sent_msg; /* for log info */

	struct list_head list;
};

static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
{
	struct dlm_dir_dump *dd, *safe;

	write_lock_bh(&ls->ls_dir_dump_lock);
	list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) {
		if (dd->nodeid_init == nodeid) {
			log_error(ls, "drop dump seq %llu",
				 (unsigned long long)dd->seq_init);
			list_del(&dd->list);
			kfree(dd);
		}
	}
	write_unlock_bh(&ls->ls_dir_dump_lock);
}

static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid)
{
	struct dlm_dir_dump *iter, *dd = NULL;

	read_lock_bh(&ls->ls_dir_dump_lock);
	list_for_each_entry(iter, &ls->ls_dir_dump_list, list) {
		if (iter->nodeid_init == nodeid) {
			dd = iter;
			break;
		}
	}
	read_unlock_bh(&ls->ls_dir_dump_lock);

	return dd;
}

static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid)
{
	struct dlm_dir_dump *dd;

	dd = lookup_dir_dump(ls, nodeid);
	if (dd) {
		log_error(ls, "found ongoing dir dump for node %d, will drop it",
			  nodeid);
		drop_dir_ctx(ls, nodeid);
	}

	dd = kzalloc(sizeof(*dd), GFP_ATOMIC);
	if (!dd)
		return NULL;

	dd->seq_init = ls->ls_recover_seq;
	dd->nodeid_init = nodeid;

	write_lock_bh(&ls->ls_dir_dump_lock);
	list_add(&dd->list, &ls->ls_dir_dump_list);
	write_unlock_bh(&ls->ls_dir_dump_lock);

	return dd;
}

/* Find the rsb where we left off (or start again), then send rsb names
   for rsb's we're master of and whose directory node matches the requesting
   node.  inbuf is the rsb name last sent, inlen is the name's length */

void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
 			   char *outbuf, int outlen, int nodeid)
{
	struct list_head *list;
	struct dlm_rsb *r;
	int offset = 0, dir_nodeid;
	struct dlm_dir_dump *dd;
	__be16 be_namelen;

	read_lock_bh(&ls->ls_masters_lock);

	if (inlen > 1) {
		dd = lookup_dir_dump(ls, nodeid);
		if (!dd) {
			log_error(ls, "failed to lookup dir dump context nodeid: %d",
				  nodeid);
			goto out;
		}

		/* next chunk in dump */
		r = find_rsb_root(ls, inbuf, inlen);
		if (!r) {
			log_error(ls, "copy_master_names from %d start %d %.*s",
				  nodeid, inlen, inlen, inbuf);
			goto out;
		}
		list = r->res_masters_list.next;

		/* sanity checks */
		if (dd->last != &r->res_masters_list ||
		    dd->seq_init != ls->ls_recover_seq) {
			log_error(ls, "failed dir dump sanity check seq_init: %llu seq: %llu",
				  (unsigned long long)dd->seq_init,
				  (unsigned long long)ls->ls_recover_seq);
			goto out;
		}
	} else {
		dd = init_dir_dump(ls, nodeid);
		if (!dd) {
			log_error(ls, "failed to allocate dir dump context");
			goto out;
		}

		/* start dump */
		list = ls->ls_masters_list.next;
		dd->last = list;
	}

	for (offset = 0; list != &ls->ls_masters_list; list = list->next) {
		r = list_entry(list, struct dlm_rsb, res_masters_list);
		dir_nodeid = dlm_dir_nodeid(r);
		if (dir_nodeid != nodeid)
			continue;

		/*
		 * The block ends when we can't fit the following in the
		 * remaining buffer space:
		 * namelen (uint16_t) +
		 * name (r->res_length) +
		 * end-of-block record 0x0000 (uint16_t)
		 */

		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
			/* Write end-of-block record */
			be_namelen = cpu_to_be16(0);
			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
			offset += sizeof(__be16);
			dd->sent_msg++;
			goto out;
		}

		be_namelen = cpu_to_be16(r->res_length);
		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
		offset += sizeof(__be16);
		memcpy(outbuf + offset, r->res_name, r->res_length);
		offset += r->res_length;
		dd->sent_res++;
		dd->last = list;
	}

	/*
	 * If we've reached the end of the list (and there's room) write a
	 * terminating record.
	 */

	if ((list == &ls->ls_masters_list) &&
	    (offset + sizeof(uint16_t) <= outlen)) {
		/* end dump */
		be_namelen = cpu_to_be16(0xFFFF);
		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
		offset += sizeof(__be16);
		dd->sent_msg++;
		log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
			  nodeid, dd->sent_res, dd->sent_msg);

		write_lock_bh(&ls->ls_dir_dump_lock);
		list_del_init(&dd->list);
		write_unlock_bh(&ls->ls_dir_dump_lock);
		kfree(dd);
	}
 out:
	read_unlock_bh(&ls->ls_masters_lock);
}