diff options
Diffstat (limited to 'src/ceph-volume/ceph_volume/tests/devices/lvm/test_zap.py')
-rw-r--r-- | src/ceph-volume/ceph_volume/tests/devices/lvm/test_zap.py | 241 |
1 files changed, 241 insertions, 0 deletions
diff --git a/src/ceph-volume/ceph_volume/tests/devices/lvm/test_zap.py b/src/ceph-volume/ceph_volume/tests/devices/lvm/test_zap.py new file mode 100644 index 000000000..64016111c --- /dev/null +++ b/src/ceph-volume/ceph_volume/tests/devices/lvm/test_zap.py @@ -0,0 +1,241 @@ +import os +import pytest +from copy import deepcopy +from mock.mock import patch, call +from ceph_volume import process +from ceph_volume.api import lvm as api +from ceph_volume.devices.lvm import zap + + +class TestZap(object): + def test_invalid_osd_id_passed(self): + with pytest.raises(SystemExit): + zap.Zap(argv=['--osd-id', 'foo']).main() + +class TestFindAssociatedDevices(object): + + def test_no_lvs_found_that_match_id(self, monkeypatch, device_info): + tags = 'ceph.osd_id=9,ceph.journal_uuid=x,ceph.type=data' + osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg', + lv_tags=tags, lv_path='/dev/VolGroup/lv') + volumes = [] + volumes.append(osd) + monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs: {}) + + with pytest.raises(RuntimeError): + zap.find_associated_devices(osd_id=10) + + def test_no_lvs_found_that_match_fsid(self, monkeypatch, device_info): + tags = 'ceph.osd_id=9,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,'+\ + 'ceph.type=data' + osd = api.Volume(lv_name='volume1', lv_uuid='y', lv_tags=tags, + vg_name='vg', lv_path='/dev/VolGroup/lv') + volumes = [] + volumes.append(osd) + monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs: {}) + + with pytest.raises(RuntimeError): + zap.find_associated_devices(osd_fsid='aaaa-lkjh') + + def test_no_lvs_found_that_match_id_fsid(self, monkeypatch, device_info): + tags = 'ceph.osd_id=9,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,'+\ + 'ceph.type=data' + osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg', + lv_tags=tags, lv_path='/dev/VolGroup/lv') + volumes = [] + volumes.append(osd) + monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs: {}) + + with pytest.raises(RuntimeError): + zap.find_associated_devices(osd_id='9', osd_fsid='aaaa-lkjh') + + def test_no_ceph_lvs_found(self, monkeypatch): + osd = api.Volume(lv_name='volume1', lv_uuid='y', lv_tags='', + lv_path='/dev/VolGroup/lv') + volumes = [] + volumes.append(osd) + monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs: {}) + + with pytest.raises(RuntimeError): + zap.find_associated_devices(osd_id=100) + + def test_lv_is_matched_id(self, monkeypatch): + tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data' + osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='', + lv_path='/dev/VolGroup/lv', lv_tags=tags) + volumes = [] + volumes.append(osd) + monkeypatch.setattr(zap.api, 'get_lvs', lambda **kw: volumes) + monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0)) + + result = zap.find_associated_devices(osd_id='0') + assert result[0].path == '/dev/VolGroup/lv' + + def test_lv_is_matched_fsid(self, monkeypatch): + tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,' +\ + 'ceph.type=data' + osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='', + lv_path='/dev/VolGroup/lv', lv_tags=tags) + volumes = [] + volumes.append(osd) + monkeypatch.setattr(zap.api, 'get_lvs', lambda **kw: deepcopy(volumes)) + monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0)) + + result = zap.find_associated_devices(osd_fsid='asdf-lkjh') + assert result[0].path == '/dev/VolGroup/lv' + + def test_lv_is_matched_id_fsid(self, monkeypatch): + tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,' +\ + 'ceph.type=data' + osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='', + lv_path='/dev/VolGroup/lv', lv_tags=tags) + volumes = [] + volumes.append(osd) + monkeypatch.setattr(zap.api, 'get_lvs', lambda **kw: volumes) + monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0)) + + result = zap.find_associated_devices(osd_id='0', osd_fsid='asdf-lkjh') + assert result[0].path == '/dev/VolGroup/lv' + + +class TestEnsureAssociatedLVs(object): + + def test_nothing_is_found(self): + volumes = [] + result = zap.ensure_associated_lvs(volumes) + assert result == [] + + def test_data_is_found(self, fake_call): + tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=data' + osd = api.Volume( + lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/data', lv_tags=tags) + volumes = [] + volumes.append(osd) + result = zap.ensure_associated_lvs(volumes) + assert result == ['/dev/VolGroup/data'] + + def test_block_is_found(self, fake_call): + tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=block' + osd = api.Volume( + lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/block', lv_tags=tags) + volumes = [] + volumes.append(osd) + result = zap.ensure_associated_lvs(volumes) + assert result == ['/dev/VolGroup/block'] + + def test_success_message_for_fsid(self, factory, is_root, capsys): + cli_zap = zap.Zap([]) + args = factory(devices=[], osd_id=None, osd_fsid='asdf-lkjh') + cli_zap.args = args + cli_zap.zap() + out, err = capsys.readouterr() + assert "Zapping successful for OSD: asdf-lkjh" in err + + def test_success_message_for_id(self, factory, is_root, capsys): + cli_zap = zap.Zap([]) + args = factory(devices=[], osd_id='1', osd_fsid=None) + cli_zap.args = args + cli_zap.zap() + out, err = capsys.readouterr() + assert "Zapping successful for OSD: 1" in err + + def test_block_and_partition_are_found(self, monkeypatch): + monkeypatch.setattr(zap.disk, 'get_device_from_partuuid', lambda x: '/dev/sdb1') + tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=block' + osd = api.Volume( + lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/block', lv_tags=tags) + volumes = [] + volumes.append(osd) + result = zap.ensure_associated_lvs(volumes) + assert '/dev/sdb1' in result + assert '/dev/VolGroup/block' in result + + def test_journal_is_found(self, fake_call): + tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=journal' + osd = api.Volume( + lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv', lv_tags=tags) + volumes = [] + volumes.append(osd) + result = zap.ensure_associated_lvs(volumes) + assert result == ['/dev/VolGroup/lv'] + + def test_multiple_journals_are_found(self): + tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=journal' + volumes = [] + for i in range(3): + osd = api.Volume( + lv_name='volume%s' % i, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % i, lv_tags=tags) + volumes.append(osd) + result = zap.ensure_associated_lvs(volumes) + assert '/dev/VolGroup/lv0' in result + assert '/dev/VolGroup/lv1' in result + assert '/dev/VolGroup/lv2' in result + + def test_multiple_dbs_are_found(self): + tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=db' + volumes = [] + for i in range(3): + osd = api.Volume( + lv_name='volume%s' % i, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % i, lv_tags=tags) + volumes.append(osd) + result = zap.ensure_associated_lvs(volumes) + assert '/dev/VolGroup/lv0' in result + assert '/dev/VolGroup/lv1' in result + assert '/dev/VolGroup/lv2' in result + + def test_multiple_wals_are_found(self): + tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.wal_uuid=x,ceph.type=wal' + volumes = [] + for i in range(3): + osd = api.Volume( + lv_name='volume%s' % i, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % i, lv_tags=tags) + volumes.append(osd) + result = zap.ensure_associated_lvs(volumes) + assert '/dev/VolGroup/lv0' in result + assert '/dev/VolGroup/lv1' in result + assert '/dev/VolGroup/lv2' in result + + def test_multiple_backing_devs_are_found(self): + volumes = [] + for _type in ['journal', 'db', 'wal']: + tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.wal_uuid=x,ceph.type=%s' % _type + osd = api.Volume( + lv_name='volume%s' % _type, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % _type, lv_tags=tags) + volumes.append(osd) + result = zap.ensure_associated_lvs(volumes) + assert '/dev/VolGroup/lvjournal' in result + assert '/dev/VolGroup/lvwal' in result + assert '/dev/VolGroup/lvdb' in result + + @patch('ceph_volume.devices.lvm.zap.api.get_lvs') + def test_ensure_associated_lvs(self, m_get_lvs): + zap.ensure_associated_lvs([], lv_tags={'ceph.osd_id': '1'}) + calls = [ + call(tags={'ceph.type': 'journal', 'ceph.osd_id': '1'}), + call(tags={'ceph.type': 'db', 'ceph.osd_id': '1'}), + call(tags={'ceph.type': 'wal', 'ceph.osd_id': '1'}) + ] + m_get_lvs.assert_has_calls(calls, any_order=True) + + +class TestWipeFs(object): + + def setup(self): + os.environ['CEPH_VOLUME_WIPEFS_INTERVAL'] = '0' + + def test_works_on_second_try(self, stub_call): + os.environ['CEPH_VOLUME_WIPEFS_TRIES'] = '2' + stub_call([('wiping /dev/sda', '', 1), ('', '', 0)]) + result = zap.wipefs('/dev/sda') + assert result is None + + def test_does_not_work_after_several_tries(self, stub_call): + os.environ['CEPH_VOLUME_WIPEFS_TRIES'] = '2' + stub_call([('wiping /dev/sda', '', 1), ('', '', 1)]) + with pytest.raises(RuntimeError): + zap.wipefs('/dev/sda') + + def test_does_not_work_default_tries(self, stub_call): + stub_call([('wiping /dev/sda', '', 1)]*8) + with pytest.raises(RuntimeError): + zap.wipefs('/dev/sda') |