1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 John Spray <john.spray@inktank.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*/
#include "MDSUtility.h"
#include "mon/MonClient.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
MDSUtility::MDSUtility() :
Dispatcher(g_ceph_context),
objecter(NULL),
finisher(g_ceph_context, "MDSUtility", "fn_mds_utility"),
waiting_for_mds_map(NULL),
inited(false)
{
monc = new MonClient(g_ceph_context, poolctx);
messenger = Messenger::create_client_messenger(g_ceph_context, "mds");
fsmap = new FSMap();
objecter = new Objecter(g_ceph_context, messenger, monc, poolctx);
}
MDSUtility::~MDSUtility()
{
if (inited) {
shutdown();
}
delete objecter;
delete monc;
delete messenger;
delete fsmap;
ceph_assert(waiting_for_mds_map == NULL);
}
int MDSUtility::init()
{
// Initialize Messenger
poolctx.start(1);
messenger->start();
objecter->set_client_incarnation(0);
objecter->init();
// Connect dispatchers before starting objecter
messenger->add_dispatcher_tail(objecter);
messenger->add_dispatcher_tail(this);
// Initialize MonClient
if (monc->build_initial_monmap() < 0) {
objecter->shutdown();
messenger->shutdown();
messenger->wait();
return -1;
}
monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD|CEPH_ENTITY_TYPE_MDS);
monc->set_messenger(messenger);
monc->init();
int r = monc->authenticate();
if (r < 0) {
derr << "Authentication failed, did you specify an MDS ID with a valid keyring?" << dendl;
monc->shutdown();
objecter->shutdown();
messenger->shutdown();
messenger->wait();
return r;
}
client_t whoami = monc->get_global_id();
messenger->set_myname(entity_name_t::CLIENT(whoami.v));
// Start Objecter and wait for OSD map
objecter->start();
objecter->wait_for_osd_map();
// Prepare to receive MDS map and request it
ceph::mutex init_lock = ceph::make_mutex("MDSUtility:init");
ceph::condition_variable cond;
bool done = false;
ceph_assert(!fsmap->get_epoch());
lock.lock();
waiting_for_mds_map = new C_SafeCond(init_lock, cond, &done, NULL);
lock.unlock();
monc->sub_want("fsmap", 0, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
// Wait for MDS map
dout(4) << "waiting for MDS map..." << dendl;
{
std::unique_lock locker{init_lock};
cond.wait(locker, [&done] { return done; });
}
dout(4) << "Got MDS map " << fsmap->get_epoch() << dendl;
finisher.start();
inited = true;
return 0;
}
void MDSUtility::shutdown()
{
finisher.stop();
lock.lock();
objecter->shutdown();
lock.unlock();
monc->shutdown();
messenger->shutdown();
messenger->wait();
poolctx.finish();
}
bool MDSUtility::ms_dispatch(Message *m)
{
std::lock_guard locker{lock};
switch (m->get_type()) {
case CEPH_MSG_FS_MAP:
handle_fs_map((MFSMap*)m);
break;
case CEPH_MSG_OSD_MAP:
break;
default:
return false;
}
m->put();
return true;
}
void MDSUtility::handle_fs_map(MFSMap* m)
{
*fsmap = m->get_fsmap();
if (waiting_for_mds_map) {
waiting_for_mds_map->complete(0);
waiting_for_mds_map = NULL;
}
}
|