summaryrefslogtreecommitdiffstats
path: root/qa/tasks/cephfs/fuse_mount.py
blob: 0b9b17403863fd96e26a8777d2c3dbbd6a50f057 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
import json
import time
import logging

from io import StringIO
from textwrap import dedent

from teuthology.contextutil import MaxWhileTries
from teuthology.contextutil import safe_while
from teuthology.orchestra import run
from teuthology.exceptions import CommandFailedError
from tasks.ceph_manager import get_valgrind_args
from tasks.cephfs.mount import CephFSMount, UMOUNT_TIMEOUT

# Module-level logger; FuseMount.__init__ derives a per-client child logger
# ('ceph-fuse.<client_id>') from it for the mount command's output.
log = logging.getLogger(__name__)

# Refer to mount.py for docstrings.
class FuseMount(CephFSMount):
    def __init__(self, ctx, test_dir, client_id, client_remote,
                 client_keyring_path=None, cephfs_name=None,
                 cephfs_mntpt=None, hostfs_mntpt=None, brxnet=None,
                 client_config=None):
        """
        Initialise the ceph-fuse specific state on top of CephFSMount.

        All arguments are forwarded unchanged to CephFSMount.__init__.

        :param client_config: per-client configuration mapping; when omitted
                              (or None) a fresh empty dict is used for this
                              instance.
        """
        # A literal ``{}`` default is created once at definition time and
        # would be shared by every FuseMount built without an explicit
        # config, so build a new dict per instance instead.
        if client_config is None:
            client_config = {}

        super(FuseMount, self).__init__(ctx=ctx, test_dir=test_dir,
            client_id=client_id, client_remote=client_remote,
            client_keyring_path=client_keyring_path, hostfs_mntpt=hostfs_mntpt,
            cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet,
            client_config=client_config)

        # Handle of the running ceph-fuse process, set by _run_mount_cmd().
        self.fuse_daemon = None
        # Number of our entry under /sys/fs/fuse/connections, set by
        # _record_our_fuse_conn() and used to abort the mount in umount().
        self._fuse_conn = None
        # Client identity details, filled in by gather_mount_info().
        self.id = None
        self.inst = None
        self.addr = None
        self.mount_timeout = int(self.client_config.get('mount_timeout', 30))

        self._mount_bin = [
            'ceph-fuse', "-f",
            "--admin-socket", "/var/run/ceph/$cluster-$name.$pid.asok"]
        self._mount_cmd_cwd = self.test_dir
        if self.client_config.get('valgrind') is not None:
            # NOTE(review): the mount command is run with
            # self._mount_cmd_cwd, not self.cwd, and _add_valgrind_args()
            # passes cd=False -- confirm whether this assignment (and its
            # original comment below) is still what is intended.
            self.cwd = None # get_valgrind_args chdir for us
        self._mount_cmd_logger = log.getChild('ceph-fuse.{id}'.format(id=self.client_id))
        # A pipe on stdin lets umount_wait()/teardown() stop the daemon by
        # closing it.
        self._mount_cmd_stdin = run.PIPE

    def mount(self, mntopts=None, check_status=True, mntargs=None, **kwargs):
        """
        Mount the filesystem through ceph-fuse.

        Extra keyword arguments are handed to update_attrs() first. The
        return value is whatever _mount() returns.
        """
        self.update_attrs(**kwargs)
        self.assert_and_log_minimum_mount_details()
        self.setup_netns()

        try:
            outcome = self._mount(mntopts, mntargs, check_status)
        except RuntimeError:
            # A RuntimeError comes from the mount() logic itself rather than
            # from a failed remote command. Tear the mount down again so no
            # half-up (zombie) mount point is left behind for anyone
            # traversing cephtest/ to hang on.
            log.warning("Trying to clean up after failed mount")
            self.umount_wait(force=True)
            raise
        return outcome

    def _mount(self, mntopts, mntargs, check_status):
        """
        Create the mount point, start ceph-fuse and collect client details.

        Returns the truthy result of _run_mount_cmd() if there is one (in
        which case no client details are gathered), otherwise None.
        """
        config_summary = "Client client.%s config is %s" % (
            self.client_id, self.client_config)
        log.info(config_summary)

        self._create_mntpt()

        failure = self._run_mount_cmd(mntopts, mntargs, check_status)
        if not failure:
            self.gather_mount_info()
            return None
        return failure

    def _run_mount_cmd(self, mntopts, mntargs, check_status):
        """
        Launch ceph-fuse in the background and wait for its fuse connection.

        The process handle is stored in self.fuse_daemon; the return value is
        that of _wait_and_record_our_fuse_conn().
        """
        cmd_args = self._get_mount_cmd(mntopts, mntargs)
        out_buf = StringIO()
        err_buf = StringIO()

        # Snapshot /sys/fs/fuse/connections before ceph-fuse starts so the
        # connection it creates can be told apart afterwards.
        conns_before = self._list_fuse_conns()
        log.info("Pre-mount connections: {0}".format(conns_before))

        run_kwargs = dict(
            args=cmd_args,
            cwd=self._mount_cmd_cwd,
            logger=self._mount_cmd_logger,
            stdin=self._mount_cmd_stdin,
            stdout=out_buf,
            stderr=err_buf,
            wait=False,
        )
        self.fuse_daemon = self.client_remote.run(**run_kwargs)

        return self._wait_and_record_our_fuse_conn(
            check_status, conns_before, out_buf, err_buf)

    def _get_mount_cmd(self, mntopts, mntargs):
        """
        Assemble the full argument list used to start ceph-fuse.

        :param mntopts: optional iterable of mount options, joined with ','
                        and passed after '-o'
        :param mntargs: optional iterable of extra arguments appended last
        :return: list of command arguments
        """
        config = self.client_config
        # daemon-helper is told to use 'term' instead of 'kill' when coverage
        # or valgrind is configured.
        wants_term = (config.get('coverage') or
                      config.get('valgrind') is not None)
        daemon_signal = 'term' if wants_term else 'kill'

        helper_cmd = [
            'sudo', 'adjust-ulimits', 'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=self.test_dir),
            'daemon-helper', daemon_signal,
        ]
        helper_cmd = self._add_valgrind_args(helper_cmd)

        mount_cmd = ['sudo'] + self._nsenter_args + helper_cmd
        mount_cmd.extend(self._mount_bin)
        mount_cmd.append(self.hostfs_mntpt)

        if self.client_id:
            mount_cmd.extend(['--id', self.client_id])
            # The keyring is only passed along together with a client id.
            if self.client_keyring_path:
                mount_cmd.extend(['-k', self.client_keyring_path])

        self.validate_subvol_options()

        if self.cephfs_mntpt:
            mount_cmd.append("--client_mountpoint=" + self.cephfs_mntpt)
        if self.cephfs_name:
            mount_cmd.append("--client_fs=" + self.cephfs_name)
        if mntopts:
            mount_cmd += ['-o', ','.join(mntopts)]
        if mntargs:
            mount_cmd.extend(mntargs)

        return mount_cmd

    def _add_valgrind_args(self, mount_cmd):
        """
        Pass ``mount_cmd`` through get_valgrind_args() when the client
        config has a 'valgrind' entry; otherwise return it untouched.
        """
        valgrind_conf = self.client_config.get('valgrind')
        if valgrind_conf is None:
            return mount_cmd

        client_name = 'client.{id}'.format(id=self.client_id)
        return get_valgrind_args(
            self.test_dir,
            client_name,
            mount_cmd,
            valgrind_conf,
            cd=False
        )

    def _list_fuse_conns(self):
        """
        Return the entries of /sys/fs/fuse/connections on the client as a
        list of ints; an empty list if the directory is empty or cannot be
        listed.
        """
        conn_dir = "/sys/fs/fuse/connections"
        remote = self.client_remote

        # Best effort: load the fuse module and mount fusectl; failures of
        # either command are ignored (check_status=False).
        remote.run(args=['sudo', 'modprobe', 'fuse'], check_status=False)
        remote.run(
            args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir],
            check_status=False, timeout=30)

        try:
            listing = remote.sh("ls " + conn_dir, stdout=StringIO(),
                                timeout=300).strip()
        except CommandFailedError:
            return []

        if not listing:
            return []
        return [int(entry) for entry in listing.split("\n")]

    def _wait_and_record_our_fuse_conn(self, check_status, pre_mount_conns,
                                       mountcmd_stdout, mountcmd_stderr):
        """
        Wait for the connection reference to appear in /sys and record it.

        Polls /sys/fs/fuse/connections once per second until it lists more
        entries than ``pre_mount_conns``.

        :param check_status: when true, a failed mount command re-raises its
                             CommandFailedError
        :param pre_mount_conns: connection numbers seen before ceph-fuse was
                                started
        :param mountcmd_stdout: buffer capturing the mount command's stdout
        :param mountcmd_stderr: buffer capturing the mount command's stderr
        :return: None on success; if the mount command failed and
                 ``check_status`` is false, the tuple
                 (exception, stdout text, stderr text)
        :raises CommandFailedError: mount command failed and ``check_status``
                                    is true
        :raises RuntimeError: no new connection appeared within the timeout,
                              or the new connection could not be identified
        """
        # Read the timeout exactly once, before polling starts. The property
        # sleeps for the configured 'mount_wait' on every access, so reading
        # it inside the loop would repeat that delay on each iteration
        # without it being counted in `waited`.
        timeout = self._fuse_conn_check_timeout
        waited = 0

        post_mount_conns = self._list_fuse_conns()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                try:
                    self.fuse_daemon.wait()
                except CommandFailedError as e:
                    log.info('mount command failed.')
                    if check_status:
                        raise
                    return (e, mountcmd_stdout.getvalue(),
                            mountcmd_stderr.getvalue())
            time.sleep(1)
            waited += 1
            if waited > timeout:
                raise RuntimeError(
                    "Fuse mount failed to populate/sys/ after {} "
                    "seconds".format(waited))
            post_mount_conns = self._list_fuse_conns()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        self._record_our_fuse_conn(pre_mount_conns, post_mount_conns)

    @property
    def _fuse_conn_check_timeout(self):
        """
        Number of seconds to wait for the fuse connection to show up in
        /sys, taken from the 'mount_timeout' client config (default 30).

        Note that this is not a pure getter: on every access, a positive
        'mount_wait' client config value is logged and slept for before the
        timeout is returned.
        """
        config = self.client_config
        initial_delay = config.get('mount_wait', 0)
        if initial_delay > 0:
            log.info("Fuse mount waits {0} seconds before checking /sys/".format(initial_delay))
            time.sleep(initial_delay)
        return int(config.get('mount_timeout', 30))

    def _record_our_fuse_conn(self, pre_mount_conns, post_mount_conns):
        """
        Remember which fuse connection number belongs to this mount so that
        it can be used when an unmount has to be forced.

        Exactly one connection must be present in ``post_mount_conns`` that
        was not in ``pre_mount_conns``; otherwise RuntimeError is raised.
        """
        fresh = list(set(post_mount_conns) - set(pre_mount_conns))

        if not fresh:
            raise RuntimeError("New fuse connection directory not found ({0})".format(fresh))
        if len(fresh) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(fresh))

        self._fuse_conn = fresh[0]

    def gather_mount_info(self):
        """
        Query the client's admin socket 'status' and store id, pid, inst and
        addr on this object.

        :raises RuntimeError: the status output lacks 'inst_str'/'addr_str'
                              and self.inst is still None after scanning the
                              'session ls' output
        """
        client_status = self.admin_socket(['status'])
        self.id = client_status['id']
        self.client_pid = client_status['metadata']['pid']
        try:
            self.inst = client_status['inst_str']
            self.addr = client_status['addr_str']
        except KeyError:
            # The status output did not carry the instance/address strings;
            # fall back to the entry with our id in 'session ls'.
            for session in self.fs.rank_asok(['session', 'ls']):
                if session['id'] != self.id:
                    continue
                self.inst = session['inst']
                self.addr = self.inst.split()[1]
            if self.inst is None:
                raise RuntimeError("cannot find client session")

    def check_mounted_state(self):
        """
        Report whether something fuse-like is mounted at the mount point.

        Runs ``stat --file-system`` on self.hostfs_mntpt. Returns True when
        the reported filesystem type is 'fuseblk' or when stat fails with an
        error indicating a stale mount; False otherwise.
        """
        stat_cmd = [
            'stat',
            '--file-system',
            '--printf=%T\n',
            '--',
            self.hostfs_mntpt,
        ]
        proc = self.client_remote.run(
            args=stat_cmd,
            stdout=StringIO(),
            stderr=StringIO(),
            wait=False,
            timeout=300
        )
        try:
            proc.wait()
        except CommandFailedError:
            error = proc.stderr.getvalue()
            stale_markers = ("endpoint is not connected",
                             "Software caused connection abort")
            if not any(marker in error for marker in stale_markers):
                # This happens if the mount directory doesn't exist
                log.info('mount point does not exist: %s', self.hostfs_mntpt)
                return False
            # This happens if fuse is killed without unmount
            log.warning("Found stale mount point at {0}".format(self.hostfs_mntpt))
            return True

        fstype = proc.stdout.getvalue().rstrip('\n')
        if fstype != 'fuseblk':
            log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
                fstype=fstype))
            return False

        log.info('ceph-fuse is mounted on %s', self.hostfs_mntpt)
        return True

    def wait_until_mounted(self):
        """
        Check to make sure that fuse is mounted on mountpoint.  If not,
        sleep for 5 seconds and check again.

        Once mounted, chmod the mount point to 1777. The chmod is retried up
        to 10 times (5 seconds apart) on "permission denied", is skipped on a
        read-only file system, and any other failure is re-raised. If every
        attempt is denied a warning is logged and the method returns.
        """

        while not self.check_mounted_state():
            # Even if it's not mounted, it should at least
            # be running: catch simple failures where it has terminated.
            assert not self.fuse_daemon.poll()

            time.sleep(5)

        # Now that we're mounted, set permissions so that the rest of the test
        # will have unrestricted access to the filesystem mount.
        for _ in range(10):
            chmod_stderr = StringIO()
            try:
                self.client_remote.run(args=['sudo', 'chmod', '1777',
                                             self.hostfs_mntpt],
                                       timeout=300,
                                       stderr=chmod_stderr, omit_sudo=False)
                break
            except run.CommandFailedError:
                err_text = chmod_stderr.getvalue().lower()
                if "read-only file system" in err_text:
                    break
                elif "permission denied" in err_text:
                    time.sleep(5)
                else:
                    raise
        else:
            # Every attempt was denied. Keep the original best-effort
            # behaviour (no exception) but do not hide the fact.
            log.warning("Giving up on chmod 1777 of {0}: permission denied "
                        "on every attempt".format(self.hostfs_mntpt))

    def _mountpoint_exists(self):
        """Return True if ``ls -d`` on the mount point exits with status 0."""
        probe = self.client_remote.run(
            args=["ls", "-d", self.hostfs_mntpt],
            check_status=False,
            timeout=300)
        return probe.exitstatus == 0

    def umount(self, cleanup=True):
        """
        Unmount via ``fusermount -u``, escalating to a fuse connection abort
        followed by _run_umount_lf() if that fails for an unexpected reason.

        umount() must not run cleanup() when it's called by umount_wait()
        since "run.wait([self.fuse_daemon], timeout)" would hang otherwise.
        """
        if not self.is_mounted():
            nothing_more_to_do = True
        elif self.is_blocked():
            self._run_umount_lf()
            nothing_more_to_do = True
        else:
            nothing_more_to_do = False

        if nothing_more_to_do:
            if cleanup:
                self.cleanup()
            return

        remote = self.client_remote
        stderr = StringIO()
        try:
            log.info('Running fusermount -u on {name}...'.format(name=remote.name))
            remote.run(
                args=['sudo', 'fusermount', '-u', self.hostfs_mntpt],
                stderr=stderr, timeout=UMOUNT_TIMEOUT, omit_sudo=False)
        except run.CommandFailedError:
            failure_text = stderr.getvalue()
            if "mountpoint not found" in failure_text:
                # This happens if the mount directory doesn't exist
                log.info('mount point does not exist: %s', self.mountpoint)
            elif "not mounted" in failure_text:
                # This happens if the mount directory is already unmounted
                log.info('mount point not mounted: %s', self.mountpoint)
            else:
                log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=remote.name))

                # Dump open files and the process tree for debugging.
                remote.run(
                    args=['sudo', run.Raw('PATH=/usr/sbin:$PATH'), 'lsof',
                    run.Raw(';'), 'ps', 'auxf'],
                    timeout=UMOUNT_TIMEOUT, omit_sudo=False)

                # abort the fuse mount, killing all hung processes
                if self._fuse_conn:
                    self.run_python(dedent("""
                    import os
                    path = "/sys/fs/fuse/connections/{0}/abort"
                    if os.path.exists(path):
                        open(path, "w").write("1")
                    """).format(self._fuse_conn))
                    self._fuse_conn = None

                # make sure it's unmounted
                self._run_umount_lf()

        # Forget everything that identified the now-gone mount.
        self._fuse_conn = None
        self.id = None
        self.inst = None
        self.addr = None
        if cleanup:
            self.cleanup()

    def umount_wait(self, force=False, require_clean=False,
                    timeout=UMOUNT_TIMEOUT):
        """
        Unmount and wait for the ceph-fuse process to exit.

        :param force: Complete cleanly even if the MDS is offline
        :param require_clean: re-raise a CommandFailedError from the
                              ceph-fuse process instead of ignoring it;
                              mutually exclusive with ``force``
        :param timeout: passed to run.wait() when waiting for the process
        """
        if not self.is_mounted() or not self.fuse_daemon:
            log.debug('ceph-fuse client.{id} is not mounted at {remote} '
                      '{mnt}'.format(id=self.client_id,
                                     remote=self.client_remote,
                                     mnt=self.hostfs_mntpt))
            self.cleanup()
            return

        if force:
            assert not require_clean  # mutually exclusive

            # When a forced unmount is expected, stop the ceph-fuse process
            # directly by closing its stdin. This should avoid the more
            # aggressive fallback killing in umount(), which can affect
            # other mounts too.
            self.fuse_daemon.stdin.close()

            # The aggressive path is still taken if a mount -o remount is
            # in flight (especially one stuck because MDSs are unavailable).

        if self.is_blocked():
            self._run_umount_lf()
            self.cleanup()
            return

        # cleanup is set to False since cleanup must happen only after the
        # umount is complete; otherwise the run.wait() call below hangs.
        self.umount(cleanup=False)

        try:
            # Permit a timeout, so that we do not block forever
            run.wait([self.fuse_daemon], timeout)
        except MaxWhileTries:
            log.error("process failed to terminate after unmount. This probably"
                      " indicates a bug within ceph-fuse.")
            raise
        except CommandFailedError:
            # A non-clean exit only matters when the caller asked for it.
            if require_clean:
                raise

        self.cleanup()

    def teardown(self):
        """
        Whatever the state of the mount, get it gone.

        Runs the parent teardown, unmounts, and if the ceph-fuse process is
        still running closes its stdin and waits for it, ignoring a
        CommandFailedError from the wait.
        """
        super(FuseMount, self).teardown()

        self.umount()

        daemon = self.fuse_daemon
        if not daemon or daemon.finished:
            return

        daemon.stdin.close()
        try:
            daemon.wait()
        except CommandFailedError:
            pass

    def _asok_path(self):
        """
        Return the glob pattern for this client's admin socket path; the
        '*' stands for the middle component of the socket file name.
        """
        pattern = "/var/run/ceph/ceph-client.{0}.*.asok"
        return pattern.format(self.client_id)

    @property
    def _prefix(self):
        # String that admin_socket() prepends to the 'ceph' executable name
        # when building its command line; empty here, so plain 'ceph' is run.
        return ""

    def find_admin_socket(self):
        pyscript = """
import glob
import re
import os
import subprocess

def _find_admin_socket(client_name):
        asok_path = "{asok_path}"
        files = glob.glob(asok_path)
        mountpoint = "{mountpoint}"

        # Given a non-glob path, it better be there
        if "*" not in asok_path:
            assert(len(files) == 1)
            return files[0]

        for f in files:
                pid = re.match(".*\.(\d+)\.asok$", f).group(1)
                if os.path.exists("/proc/{{0}}".format(pid)):
                    with open("/proc/{{0}}/cmdline".format(pid), 'r') as proc_f:
                        contents = proc_f.read()
                        if mountpoint in contents:
                            return f
        raise RuntimeError("Client socket {{0}} not found".format(client_name))

print(_find_admin_socket("{client_name}"))
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id),
            mountpoint=self.mountpoint)

        asok_path = self.run_python(pyscript, sudo=True)
        log.info("Found client admin socket at {0}".format(asok_path))
        return asok_path

    def admin_socket(self, args):
        """
        Run ``ceph --admin-daemon <socket> <args...>`` on the client and
        return its stdout parsed as JSON.

        :param args: list of admin socket command words
        """
        sock = self.find_admin_socket()

        # The admin socket may not be ready yet: try up to 10 times,
        # 2 seconds apart.
        with safe_while(sleep=2, tries=10) as proceed:
            while proceed():
                try:
                    cmd = ['sudo', self._prefix + 'ceph',
                           '--admin-daemon', sock] + args
                    proc = self.client_remote.run(
                        args=cmd,
                        stdout=StringIO(), stderr=StringIO(), wait=False,
                        timeout=300)
                    proc.wait()
                    break
                except CommandFailedError:
                    # NOTE(review): there is no else/raise here, so every
                    # CommandFailedError is retried, not only "connection
                    # refused" -- confirm whether that is intended.
                    if "connection refused" in proc.stderr.getvalue().lower():
                        pass

        return json.loads(proc.stdout.getvalue().strip())

    def get_global_id(self):
        """
        Look up the CephFS client ID for this mount, as reported by the
        admin socket's 'mds_sessions' command.
        """
        sessions = self.admin_socket(['mds_sessions'])
        return sessions['id']

    def get_global_inst(self):
        """
        Look up the CephFS client instance for this mount

        Returns the cached self.inst, which gather_mount_info() fills in and
        umount() resets to None; no query is made here.
        """
        return self.inst

    def get_global_addr(self):
        """
        Look up the CephFS client addr for this mount

        Returns the cached self.addr, which gather_mount_info() fills in and
        umount() resets to None; no query is made here.
        """
        return self.addr

    def get_client_pid(self):
        """
        return pid of ceph-fuse process, read from the admin socket's
        'status' output
        """
        return self.admin_socket(['status'])['metadata']['pid']

    def get_osd_epoch(self):
        """
        Return 2-tuple of osd_epoch, osd_epoch_barrier, both taken from one
        admin socket 'status' query
        """
        status = self.admin_socket(['status'])
        epoch = status['osd_epoch']
        barrier = status['osd_epoch_barrier']
        return epoch, barrier

    def get_dentry_count(self):
        """
        Return 2-tuple of dentry_count, dentry_pinned_count, both taken from
        one admin socket 'status' query
        """
        status = self.admin_socket(['status'])
        total = status['dentry_count']
        pinned = status['dentry_pinned_count']
        return total, pinned

    def set_cache_size(self, size):
        """
        Set 'client_cache_size' to ``size`` through the admin socket and
        return the parsed reply.
        """
        command = ['config', 'set', 'client_cache_size', str(size)]
        return self.admin_socket(command)

    def get_op_read_count(self):
        """
        Return the 'osdop_read' counter from the admin socket's
        'perf dump objecter' output.
        """
        perf = self.admin_socket(['perf', 'dump', 'objecter'])
        return perf['objecter']['osdop_read']