summaryrefslogtreecommitdiffstats
path: root/src/ceph-volume/ceph_volume/tests/devices/lvm/test_zap.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ceph-volume/ceph_volume/tests/devices/lvm/test_zap.py')
-rw-r--r--src/ceph-volume/ceph_volume/tests/devices/lvm/test_zap.py241
1 files changed, 241 insertions, 0 deletions
diff --git a/src/ceph-volume/ceph_volume/tests/devices/lvm/test_zap.py b/src/ceph-volume/ceph_volume/tests/devices/lvm/test_zap.py
new file mode 100644
index 000000000..2446c5ed6
--- /dev/null
+++ b/src/ceph-volume/ceph_volume/tests/devices/lvm/test_zap.py
@@ -0,0 +1,241 @@
+import os
+import pytest
+from copy import deepcopy
+from mock.mock import patch, call
+from ceph_volume import process
+from ceph_volume.api import lvm as api
+from ceph_volume.devices.lvm import zap
+
+
+class TestZap(object):
+ def test_invalid_osd_id_passed(self):
+ with pytest.raises(SystemExit):
+ zap.Zap(argv=['--osd-id', 'foo']).main()
+
+class TestFindAssociatedDevices(object):
+
+ def test_no_lvs_found_that_match_id(self, monkeypatch, device_info):
+ tags = 'ceph.osd_id=9,ceph.journal_uuid=x,ceph.type=data'
+ osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg',
+ lv_tags=tags, lv_path='/dev/VolGroup/lv')
+ volumes = []
+ volumes.append(osd)
+ monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs: {})
+
+ with pytest.raises(RuntimeError):
+ zap.find_associated_devices(osd_id=10)
+
+ def test_no_lvs_found_that_match_fsid(self, monkeypatch, device_info):
+ tags = 'ceph.osd_id=9,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,'+\
+ 'ceph.type=data'
+ osd = api.Volume(lv_name='volume1', lv_uuid='y', lv_tags=tags,
+ vg_name='vg', lv_path='/dev/VolGroup/lv')
+ volumes = []
+ volumes.append(osd)
+ monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs: {})
+
+ with pytest.raises(RuntimeError):
+ zap.find_associated_devices(osd_fsid='aaaa-lkjh')
+
+ def test_no_lvs_found_that_match_id_fsid(self, monkeypatch, device_info):
+ tags = 'ceph.osd_id=9,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,'+\
+ 'ceph.type=data'
+ osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg',
+ lv_tags=tags, lv_path='/dev/VolGroup/lv')
+ volumes = []
+ volumes.append(osd)
+ monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs: {})
+
+ with pytest.raises(RuntimeError):
+ zap.find_associated_devices(osd_id='9', osd_fsid='aaaa-lkjh')
+
+ def test_no_ceph_lvs_found(self, monkeypatch):
+ osd = api.Volume(lv_name='volume1', lv_uuid='y', lv_tags='',
+ lv_path='/dev/VolGroup/lv')
+ volumes = []
+ volumes.append(osd)
+ monkeypatch.setattr(zap.api, 'get_lvs', lambda **kwargs: {})
+
+ with pytest.raises(RuntimeError):
+ zap.find_associated_devices(osd_id=100)
+
+ def test_lv_is_matched_id(self, monkeypatch):
+ tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
+ osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='',
+ lv_path='/dev/VolGroup/lv', lv_tags=tags)
+ volumes = []
+ volumes.append(osd)
+ monkeypatch.setattr(zap.api, 'get_lvs', lambda **kw: volumes)
+ monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
+
+ result = zap.find_associated_devices(osd_id='0')
+ assert result[0].path == '/dev/VolGroup/lv'
+
+ def test_lv_is_matched_fsid(self, monkeypatch):
+ tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,' +\
+ 'ceph.type=data'
+ osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='',
+ lv_path='/dev/VolGroup/lv', lv_tags=tags)
+ volumes = []
+ volumes.append(osd)
+ monkeypatch.setattr(zap.api, 'get_lvs', lambda **kw: deepcopy(volumes))
+ monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
+
+ result = zap.find_associated_devices(osd_fsid='asdf-lkjh')
+ assert result[0].path == '/dev/VolGroup/lv'
+
+ def test_lv_is_matched_id_fsid(self, monkeypatch):
+ tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,' +\
+ 'ceph.type=data'
+ osd = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='',
+ lv_path='/dev/VolGroup/lv', lv_tags=tags)
+ volumes = []
+ volumes.append(osd)
+ monkeypatch.setattr(zap.api, 'get_lvs', lambda **kw: volumes)
+ monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
+
+ result = zap.find_associated_devices(osd_id='0', osd_fsid='asdf-lkjh')
+ assert result[0].path == '/dev/VolGroup/lv'
+
+
+class TestEnsureAssociatedLVs(object):
+
+ def test_nothing_is_found(self):
+ volumes = []
+ result = zap.ensure_associated_lvs(volumes)
+ assert result == []
+
+ def test_data_is_found(self, fake_call):
+ tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=data'
+ osd = api.Volume(
+ lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/data', lv_tags=tags)
+ volumes = []
+ volumes.append(osd)
+ result = zap.ensure_associated_lvs(volumes)
+ assert result == ['/dev/VolGroup/data']
+
+ def test_block_is_found(self, fake_call):
+ tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=block'
+ osd = api.Volume(
+ lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/block', lv_tags=tags)
+ volumes = []
+ volumes.append(osd)
+ result = zap.ensure_associated_lvs(volumes)
+ assert result == ['/dev/VolGroup/block']
+
+ def test_success_message_for_fsid(self, factory, is_root, capsys):
+ cli_zap = zap.Zap([])
+ args = factory(devices=[], osd_id=None, osd_fsid='asdf-lkjh')
+ cli_zap.args = args
+ cli_zap.zap()
+ out, err = capsys.readouterr()
+ assert "Zapping successful for OSD: asdf-lkjh" in err
+
+ def test_success_message_for_id(self, factory, is_root, capsys):
+ cli_zap = zap.Zap([])
+ args = factory(devices=[], osd_id='1', osd_fsid=None)
+ cli_zap.args = args
+ cli_zap.zap()
+ out, err = capsys.readouterr()
+ assert "Zapping successful for OSD: 1" in err
+
+ def test_block_and_partition_are_found(self, monkeypatch):
+ monkeypatch.setattr(zap.disk, 'get_device_from_partuuid', lambda x: '/dev/sdb1')
+ tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=block'
+ osd = api.Volume(
+ lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/block', lv_tags=tags)
+ volumes = []
+ volumes.append(osd)
+ result = zap.ensure_associated_lvs(volumes)
+ assert '/dev/sdb1' in result
+ assert '/dev/VolGroup/block' in result
+
+ def test_journal_is_found(self, fake_call):
+ tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=journal'
+ osd = api.Volume(
+ lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv', lv_tags=tags)
+ volumes = []
+ volumes.append(osd)
+ result = zap.ensure_associated_lvs(volumes)
+ assert result == ['/dev/VolGroup/lv']
+
+ def test_multiple_journals_are_found(self):
+ tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=journal'
+ volumes = []
+ for i in range(3):
+ osd = api.Volume(
+ lv_name='volume%s' % i, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % i, lv_tags=tags)
+ volumes.append(osd)
+ result = zap.ensure_associated_lvs(volumes)
+ assert '/dev/VolGroup/lv0' in result
+ assert '/dev/VolGroup/lv1' in result
+ assert '/dev/VolGroup/lv2' in result
+
+ def test_multiple_dbs_are_found(self):
+ tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=db'
+ volumes = []
+ for i in range(3):
+ osd = api.Volume(
+ lv_name='volume%s' % i, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % i, lv_tags=tags)
+ volumes.append(osd)
+ result = zap.ensure_associated_lvs(volumes)
+ assert '/dev/VolGroup/lv0' in result
+ assert '/dev/VolGroup/lv1' in result
+ assert '/dev/VolGroup/lv2' in result
+
+ def test_multiple_wals_are_found(self):
+ tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.wal_uuid=x,ceph.type=wal'
+ volumes = []
+ for i in range(3):
+ osd = api.Volume(
+ lv_name='volume%s' % i, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % i, lv_tags=tags)
+ volumes.append(osd)
+ result = zap.ensure_associated_lvs(volumes)
+ assert '/dev/VolGroup/lv0' in result
+ assert '/dev/VolGroup/lv1' in result
+ assert '/dev/VolGroup/lv2' in result
+
+ def test_multiple_backing_devs_are_found(self):
+ volumes = []
+ for _type in ['journal', 'db', 'wal']:
+ tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.wal_uuid=x,ceph.type=%s' % _type
+ osd = api.Volume(
+ lv_name='volume%s' % _type, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % _type, lv_tags=tags)
+ volumes.append(osd)
+ result = zap.ensure_associated_lvs(volumes)
+ assert '/dev/VolGroup/lvjournal' in result
+ assert '/dev/VolGroup/lvwal' in result
+ assert '/dev/VolGroup/lvdb' in result
+
+ @patch('ceph_volume.devices.lvm.zap.api.get_lvs')
+ def test_ensure_associated_lvs(self, m_get_lvs):
+ zap.ensure_associated_lvs([], lv_tags={'ceph.osd_id': '1'})
+ calls = [
+ call(tags={'ceph.type': 'journal', 'ceph.osd_id': '1'}),
+ call(tags={'ceph.type': 'db', 'ceph.osd_id': '1'}),
+ call(tags={'ceph.type': 'wal', 'ceph.osd_id': '1'})
+ ]
+ m_get_lvs.assert_has_calls(calls, any_order=True)
+
+
+class TestWipeFs(object):
+
+ def setup_method(self):
+ os.environ['CEPH_VOLUME_WIPEFS_INTERVAL'] = '0'
+
+ def test_works_on_second_try(self, stub_call):
+ os.environ['CEPH_VOLUME_WIPEFS_TRIES'] = '2'
+ stub_call([('wiping /dev/sda', '', 1), ('', '', 0)])
+ result = zap.wipefs('/dev/sda')
+ assert result is None
+
+ def test_does_not_work_after_several_tries(self, stub_call):
+ os.environ['CEPH_VOLUME_WIPEFS_TRIES'] = '2'
+ stub_call([('wiping /dev/sda', '', 1), ('', '', 1)])
+ with pytest.raises(RuntimeError):
+ zap.wipefs('/dev/sda')
+
+ def test_does_not_work_default_tries(self, stub_call):
+ stub_call([('wiping /dev/sda', '', 1)]*8)
+ with pytest.raises(RuntimeError):
+ zap.wipefs('/dev/sda')