summaryrefslogtreecommitdiffstats
path: root/src/ceph-volume/ceph_volume/tests/devices/simple/test_activate.py
blob: 152ac9b09e23bc28ced5ddf09f62df98f23ad736 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import os
import pytest
from ceph_volume.devices.simple import activate


class TestActivate(object):
    '''
    Tests for the ``simple activate`` entry points: ``Activate.main`` and
    ``Activate.activate``.
    '''

    def test_no_data_uuid(self, factory, is_root, monkeypatch, capture, fake_filesystem):
        '''
        activating from an empty JSON config is expected to raise a
        RuntimeError
        '''
        fake_filesystem.create_file('/tmp/json-config', contents='{}')
        args = factory(osd_id='0', osd_fsid='1234', json_config='/tmp/json-config')
        with pytest.raises(RuntimeError):
            activate.Activate([]).activate(args)

    def test_invalid_json_path(self, monkeypatch):
        '''
        pointing CEPH_VOLUME_SIMPLE_JSON_DIR at a missing directory is
        expected to raise a RuntimeError that mentions the missing path
        '''
        # Use monkeypatch instead of assigning to os.environ directly: the
        # variable is restored when the test finishes, so the bogus directory
        # cannot leak into tests that run afterwards.
        monkeypatch.setenv('CEPH_VOLUME_SIMPLE_JSON_DIR', '/non/existing/path')
        with pytest.raises(RuntimeError) as error:
            activate.Activate(['1', 'asdf']).main()
        assert 'Expected JSON config path not found' in str(error.value)

    def test_main_spits_help_with_no_arguments(self, capsys):
        '''
        calling main() without any arguments prints the sub-command help to
        stdout
        '''
        activate.Activate([]).main()
        stdout, stderr = capsys.readouterr()
        assert 'Activate OSDs by mounting devices previously configured' in stdout

    def test_activate_all(self, is_root, monkeypatch):
        '''
        make sure Activate calls activate for each file returned by glob
        '''
        # Paths handed out by the fake glob; compared against what activate
        # received at the end of the test.
        mocked_glob = []
        def mock_glob(glob):
            # Build the fake results in whatever directory main() asked for
            path = os.path.dirname(glob)
            mocked_glob.extend(['{}/{}.json'.format(path, file_) for file_ in
                                ['1', '2', '3']])
            return mocked_glob
        # JSON config paths that the stubbed activate() was called with
        activate_files = []
        def mock_activate(self, args):
            activate_files.append(args.json_config)
        monkeypatch.setattr('glob.glob', mock_glob)
        monkeypatch.setattr(activate.Activate, 'activate', mock_activate)
        activate.Activate(['--all']).main()
        assert activate_files == mocked_glob




class TestEnableSystemdUnits(object):
    '''
    Exercise ``Activate.enable_systemd_units`` and the ``--no-systemd`` flag
    by running ``Activate.main`` with the ``activate`` step stubbed out.
    '''

    # Dotted paths of every systemctl helper replaced by the tests below, in
    # the order they get patched.
    systemctl_targets = (
        'ceph_volume.systemd.systemctl.mask_ceph_disk',
        'ceph_volume.systemd.systemctl.enable_volume',
        'ceph_volume.systemd.systemctl.enable_osd',
        'ceph_volume.systemd.systemctl.start_osd',
    )

    def _stub_systemctl(self, monkeypatch, capture, recorded):
        '''
        replace every systemctl helper with a no-op returning True, except
        ``recorded`` which is replaced with the ``capture`` fixture
        '''
        for target in self.systemctl_targets:
            if target == recorded:
                monkeypatch.setattr(target, capture)
            else:
                monkeypatch.setattr(target, lambda *a: True)

    def _run_main(self, fake_filesystem, argv, from_trigger):
        '''
        create the fake JSON config, run main() with activate() stubbed out
        and hand back the Activate instance
        '''
        fake_filesystem.create_file('/tmp/json-config', contents='{}')
        activation = activate.Activate(argv, from_trigger=from_trigger)
        activation.activate = lambda x: True
        activation.main()
        return activation

    def test_nothing_is_activated(self, is_root, capsys, fake_filesystem):
        '''
        with --no-systemd every systemd step is expected to be reported as
        skipped on stderr
        '''
        argv = ['--no-systemd', '--file', '/tmp/json-config', '0', '1234']
        activation = self._run_main(fake_filesystem, argv, from_trigger=True)
        activation.enable_systemd_units('0', '1234')
        _, err = capsys.readouterr()
        expected_messages = (
            'Skipping enabling of `simple`',
            'Skipping masking of ceph-disk',
            'Skipping enabling and starting OSD simple',
        )
        for message in expected_messages:
            assert message in err

    def test_no_systemd_flag_is_true(self, is_root, fake_filesystem):
        '''
        passing --no-systemd sets skip_systemd to True
        '''
        argv = ['--no-systemd', '--file', '/tmp/json-config', '0', '1234']
        activation = self._run_main(fake_filesystem, argv, from_trigger=True)
        assert activation.skip_systemd is True

    def test_no_systemd_flag_is_false(self, is_root, fake_filesystem):
        '''
        omitting --no-systemd leaves skip_systemd set to False
        '''
        argv = ['--file', '/tmp/json-config', '0', '1234']
        activation = self._run_main(fake_filesystem, argv, from_trigger=True)
        assert activation.skip_systemd is False

    def test_masks_ceph_disk(self, is_root, monkeypatch, capture, fake_filesystem):
        '''
        enable_systemd_units calls mask_ceph_disk exactly once
        '''
        self._stub_systemctl(
            monkeypatch, capture, 'ceph_volume.systemd.systemctl.mask_ceph_disk')
        argv = ['--file', '/tmp/json-config', '0', '1234']
        activation = self._run_main(fake_filesystem, argv, from_trigger=False)
        activation.enable_systemd_units('0', '1234')
        assert len(capture.calls) == 1

    def test_enables_simple_unit(self, is_root, monkeypatch, capture, fake_filesystem):
        '''
        enable_systemd_units calls enable_volume once with the OSD id, the
        OSD fsid and the ``simple`` sub-command name
        '''
        self._stub_systemctl(
            monkeypatch, capture, 'ceph_volume.systemd.systemctl.enable_volume')
        argv = ['--file', '/tmp/json-config', '0', '1234']
        activation = self._run_main(fake_filesystem, argv, from_trigger=False)
        activation.enable_systemd_units('0', '1234')
        recorded_calls = capture.calls
        assert len(recorded_calls) == 1
        assert recorded_calls[0]['args'] == ('0', '1234', 'simple')

    def test_enables_osd_unit(self, is_root, monkeypatch, capture, fake_filesystem):
        '''
        enable_systemd_units calls enable_osd once with only the OSD id
        '''
        self._stub_systemctl(
            monkeypatch, capture, 'ceph_volume.systemd.systemctl.enable_osd')
        argv = ['--file', '/tmp/json-config', '0', '1234']
        activation = self._run_main(fake_filesystem, argv, from_trigger=False)
        activation.enable_systemd_units('0', '1234')
        recorded_calls = capture.calls
        assert len(recorded_calls) == 1
        assert recorded_calls[0]['args'] == ('0',)

    def test_starts_osd_unit(self, is_root, monkeypatch, capture, fake_filesystem):
        '''
        enable_systemd_units calls start_osd once with only the OSD id
        '''
        self._stub_systemctl(
            monkeypatch, capture, 'ceph_volume.systemd.systemctl.start_osd')
        argv = ['--file', '/tmp/json-config', '0', '1234']
        activation = self._run_main(fake_filesystem, argv, from_trigger=False)
        activation.enable_systemd_units('0', '1234')
        recorded_calls = capture.calls
        assert len(recorded_calls) == 1
        assert recorded_calls[0]['args'] == ('0',)


class TestValidateDevices(object):
    '''
    Tests for ``Activate.validate_devices``, fed with dictionaries listing
    the devices (and optionally the ``type``) of an OSD.
    '''

    def test_bluestore_with_all_devices(self):
        '''
        an explicit bluestore type with both data and block validates
        '''
        osd_metadata = {'type': 'bluestore', 'data': {}, 'block': {}}
        assert activate.Activate([]).validate_devices(osd_metadata) is True

    def test_bluestore_without_type(self):
        '''
        data and block validate even when no type is given
        '''
        osd_metadata = {'data': {}, 'block': {}}
        assert activate.Activate([]).validate_devices(osd_metadata) is True

    def test_bluestore_is_default(self):
        '''
        metadata without a type key validates with just data and block
        '''
        osd_metadata = {'data': {}, 'block': {}}
        assert activate.Activate([]).validate_devices(osd_metadata) is True

    def test_bluestore_data_device_found(self, capsys):
        '''
        with only data present, validation raises and stderr lists data as
        the one device found
        '''
        validator = activate.Activate([])
        with pytest.raises(RuntimeError):
            validator.validate_devices({'data': {}})
        _, err = capsys.readouterr()
        assert "devices found: ['data']" in err

    def test_bluestore_missing_data(self):
        '''
        a bluestore OSD without data raises an error about missing devices
        '''
        validator = activate.Activate([])
        with pytest.raises(RuntimeError) as excinfo:
            validator.validate_devices({'type': 'bluestore', 'block': {}})
        message = str(excinfo.value)
        assert 'Unable to activate bluestore OSD due to missing devices' in message

    def test_bluestore_block_device_found(self, capsys):
        '''
        with only block present, validation raises and stderr lists block as
        the one device found
        '''
        validator = activate.Activate([])
        with pytest.raises(RuntimeError):
            validator.validate_devices({'block': {}})
        _, err = capsys.readouterr()
        assert "devices found: ['block']" in err