summaryrefslogtreecommitdiffstats
path: root/src/ceph-volume/ceph_volume/tests/util/test_device.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/ceph-volume/ceph_volume/tests/util/test_device.py704
1 files changed, 704 insertions, 0 deletions
diff --git a/src/ceph-volume/ceph_volume/tests/util/test_device.py b/src/ceph-volume/ceph_volume/tests/util/test_device.py
new file mode 100644
index 000000000..8eef3ff00
--- /dev/null
+++ b/src/ceph-volume/ceph_volume/tests/util/test_device.py
@@ -0,0 +1,704 @@
+import os
+import pytest
+from copy import deepcopy
+from ceph_volume.util import device
+from ceph_volume.api import lvm as api
+from mock.mock import patch, mock_open
+
+
+class TestDevice(object):
+
+ def test_sys_api(self, monkeypatch, device_info):
+ volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
+ lv_tags={}, lv_path='/dev/VolGroup/lv')
+ volumes = []
+ volumes.append(volume)
+ monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
+ deepcopy(volumes))
+
+ data = {"/dev/sda": {"foo": "bar"}}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert disk.sys_api
+ assert "foo" in disk.sys_api
+
+ def test_lvm_size(self, monkeypatch, device_info):
+ volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
+ lv_tags={}, lv_path='/dev/VolGroup/lv')
+ volumes = []
+ volumes.append(volume)
+ monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
+ deepcopy(volumes))
+
+ # 5GB in size
+ data = {"/dev/sda": {"size": "5368709120"}}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert disk.lvm_size.gb == 4
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_lvm_size_rounds_down(self, fake_call, device_info):
+ # 5.5GB in size
+ data = {"/dev/sda": {"size": "5905580032"}}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert disk.lvm_size.gb == 4
+
+ def test_is_lv(self, fake_call, device_info):
+ data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
+ lsblk = {"TYPE": "lvm", "NAME": "vg-lv"}
+ device_info(lv=data,lsblk=lsblk)
+ disk = device.Device("vg/lv")
+ assert disk.is_lv
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_vgs_is_empty(self, fake_call, device_info, monkeypatch):
+ BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
+ pv_tags={})
+ pvolumes = []
+ pvolumes.append(BarPVolume)
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(lsblk=lsblk)
+ monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: {})
+
+ disk = device.Device("/dev/nvme0n1")
+ assert disk.vgs == []
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_vgs_is_not_empty(self, fake_call, device_info, monkeypatch):
+ vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=6,
+ vg_extent_size=1073741824)
+ monkeypatch.setattr(api, 'get_all_devices_vgs', lambda : [vg])
+ lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
+ device_info(lsblk=lsblk)
+ disk = device.Device("/dev/nvme0n1")
+ assert len(disk.vgs) == 1
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_device_is_device(self, fake_call, device_info):
+ data = {"/dev/sda": {"foo": "bar"}}
+ lsblk = {"TYPE": "device", "NAME": "sda"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert disk.is_device is True
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_loop_device_is_not_device(self, fake_call, device_info):
+ data = {"/dev/loop0": {"foo": "bar"}}
+ lsblk = {"TYPE": "loop"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/loop0")
+ assert disk.is_device is False
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_loop_device_is_device(self, fake_call, device_info):
+ data = {"/dev/loop0": {"foo": "bar"}}
+ lsblk = {"TYPE": "loop"}
+ os.environ["CEPH_VOLUME_ALLOW_LOOP_DEVICES"] = "1"
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/loop0")
+ assert disk.is_device is True
+ del os.environ["CEPH_VOLUME_ALLOW_LOOP_DEVICES"]
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_device_is_rotational(self, fake_call, device_info):
+ data = {"/dev/sda": {"rotational": "1"}}
+ lsblk = {"TYPE": "device", "NAME": "sda"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert disk.rotational
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_device_is_not_rotational(self, fake_call, device_info):
+ data = {"/dev/sda": {"rotational": "0"}}
+ lsblk = {"TYPE": "device", "NAME": "sda"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert not disk.rotational
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_device_is_rotational_lsblk(self, fake_call, device_info):
+ data = {"/dev/sda": {"foo": "bar"}}
+ lsblk = {"TYPE": "device", "ROTA": "1", "NAME": "sda"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert disk.rotational
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_device_is_not_rotational_lsblk(self, fake_call, device_info):
+ data = {"/dev/sda": {"rotational": "0"}}
+ lsblk = {"TYPE": "device", "ROTA": "0", "NAME": "sda"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert not disk.rotational
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_device_is_rotational_defaults_true(self, fake_call, device_info):
+ # rotational will default true if no info from sys_api or lsblk is found
+ data = {"/dev/sda": {"foo": "bar"}}
+ lsblk = {"TYPE": "device", "foo": "bar", "NAME": "sda"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert disk.rotational
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_disk_is_device(self, fake_call, device_info):
+ data = {"/dev/sda": {"foo": "bar"}}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert disk.is_device is True
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_is_partition(self, fake_call, device_info):
+ data = {"/dev/sda1": {"foo": "bar"}}
+ lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/sda1")
+ assert disk.is_partition
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_mpath_device_is_device(self, fake_call, device_info):
+ data = {"/dev/foo": {"foo": "bar"}}
+ lsblk = {"TYPE": "mpath", "NAME": "foo"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/foo")
+ assert disk.is_device is True
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_is_not_lvm_member(self, fake_call, device_info):
+ data = {"/dev/sda1": {"foo": "bar"}}
+ lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/sda1")
+ assert not disk.is_lvm_member
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_is_lvm_member(self, fake_call, device_info):
+ data = {"/dev/sda1": {"foo": "bar"}}
+ lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/sda1")
+ assert not disk.is_lvm_member
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_is_mapper_device(self, fake_call, device_info):
+ lsblk = {"TYPE": "lvm", "NAME": "foo"}
+ device_info(lsblk=lsblk)
+ disk = device.Device("/dev/mapper/foo")
+ assert disk.is_mapper
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_dm_is_mapper_device(self, fake_call, device_info):
+ lsblk = {"TYPE": "lvm", "NAME": "dm-4"}
+ device_info(lsblk=lsblk)
+ disk = device.Device("/dev/dm-4")
+ assert disk.is_mapper
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_is_not_mapper_device(self, fake_call, device_info):
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert not disk.is_mapper
+
+ @pytest.mark.usefixtures("lsblk_ceph_disk_member",
+ "disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_is_ceph_disk_lsblk(self, fake_call, monkeypatch, patch_bluestore_label):
+ disk = device.Device("/dev/sda")
+ assert disk.is_ceph_disk_member
+
+ @pytest.mark.usefixtures("blkid_ceph_disk_member",
+ "lsblk_ceph_disk_member",
+ "disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_is_ceph_disk_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
+ disk = device.Device("/dev/sda")
+ assert disk.is_ceph_disk_member
+
+ @pytest.mark.usefixtures("lsblk_ceph_disk_member",
+ "disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_is_ceph_disk_member_not_available_lsblk(self, fake_call, monkeypatch, patch_bluestore_label):
+ disk = device.Device("/dev/sda")
+ assert disk.is_ceph_disk_member
+ assert not disk.available
+ assert "Used by ceph-disk" in disk.rejected_reasons
+
+ @pytest.mark.usefixtures("blkid_ceph_disk_member",
+ "lsblk_ceph_disk_member",
+ "disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_is_ceph_disk_member_not_available_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
+ disk = device.Device("/dev/sda")
+ assert disk.is_ceph_disk_member
+ assert not disk.available
+ assert "Used by ceph-disk" in disk.rejected_reasons
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_removable_device(self, fake_call, device_info):
+ data = {"/dev/sdb": {"removable": 1}}
+ lsblk = {"TYPE": "disk", "NAME": "sdb"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/sdb")
+ assert not disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_device_with_gpt_headers(self, fake_call, device_info):
+ data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
+ lsblk = {"TYPE": "disk", "NAME": "sdb"}
+ blkid= {"PTTYPE": "gpt"}
+ device_info(
+ devices=data,
+ blkid=blkid,
+ lsblk=lsblk,
+ )
+ disk = device.Device("/dev/sdb")
+ assert not disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_accept_non_removable_device(self, fake_call, device_info):
+ data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
+ lsblk = {"TYPE": "disk", "NAME": "sdb"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/sdb")
+ assert disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_not_acceptable_device(self, fake_call, device_info):
+ data = {"/dev/dm-0": {"foo": "bar"}}
+ lsblk = {"TYPE": "mpath", "NAME": "dm-0"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/dm-0")
+ assert not disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ @patch('ceph_volume.util.device.os.path.realpath')
+ @patch('ceph_volume.util.device.os.path.islink')
+ def test_accept_symlink_to_device(self,
+ m_os_path_islink,
+ m_os_path_realpath,
+ device_info,
+ fake_call):
+ m_os_path_islink.return_value = True
+ m_os_path_realpath.return_value = '/dev/sdb'
+ data = {"/dev/sdb": {"ro": 0, "size": 5368709120}}
+ lsblk = {"TYPE": "disk"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/test_symlink")
+ print(disk)
+ print(disk.sys_api)
+ assert disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ @patch('ceph_volume.util.device.os.readlink')
+ @patch('ceph_volume.util.device.os.path.islink')
+ def test_reject_symlink_to_device_mapper(self,
+ m_os_path_islink,
+ m_os_readlink,
+ device_info,
+ fake_call):
+ m_os_path_islink.return_value = True
+ m_os_readlink.return_value = '/dev/dm-0'
+ data = {"/dev/mapper/mpatha": {"ro": 0, "size": 5368709120}}
+ lsblk = {"TYPE": "disk"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/mapper/mpatha")
+ assert disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_readonly_device(self, fake_call, device_info):
+ data = {"/dev/cdrom": {"ro": 1}}
+ lsblk = {"TYPE": "disk", "NAME": "cdrom"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/cdrom")
+ assert not disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_smaller_than_5gb(self, fake_call, device_info):
+ data = {"/dev/sda": {"size": 5368709119}}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert not disk.available, 'too small device is available'
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_accept_non_readonly_device(self, fake_call, device_info):
+ data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_bluestore_device(self, fake_call, monkeypatch, patch_bluestore_label, device_info):
+ patch_bluestore_label.return_value = True
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert not disk.available
+ assert "Has BlueStore device label" in disk.rejected_reasons
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_device_with_oserror(self, fake_call, monkeypatch, patch_bluestore_label, device_info):
+ patch_bluestore_label.side_effect = OSError('test failure')
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert not disk.available
+ assert "Failed to determine if device is BlueStore" in disk.rejected_reasons
+
+ @pytest.mark.usefixtures("lsblk_ceph_disk_member",
+ "device_info_not_ceph_disk_member",
+ "disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_is_not_ceph_disk_member_lsblk(self, fake_call, patch_bluestore_label):
+ disk = device.Device("/dev/sda")
+ assert disk.is_ceph_disk_member is False
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_existing_vg_available(self, fake_call, monkeypatch, device_info):
+ vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=1536,
+ vg_extent_size=4194304)
+ monkeypatch.setattr(api, 'get_all_devices_vgs', lambda : [vg])
+ lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
+ data = {"/dev/nvme0n1": {"size": "6442450944"}}
+ lv = {"tags": {"ceph.osd_id": "1"}}
+ device_info(devices=data, lsblk=lsblk, lv=lv)
+ disk = device.Device("/dev/nvme0n1")
+ assert disk.available_lvm
+ assert not disk.available
+ assert not disk.available_raw
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_existing_vg_too_small(self, fake_call, monkeypatch, device_info):
+ vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=4,
+ vg_extent_size=1073741824)
+ monkeypatch.setattr(api, 'get_all_devices_vgs', lambda : [vg])
+ lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
+ data = {"/dev/nvme0n1": {"size": "6442450944"}}
+ lv = {"tags": {"ceph.osd_id": "1"}}
+ device_info(devices=data, lsblk=lsblk, lv=lv)
+ disk = device.Device("/dev/nvme0n1")
+ assert not disk.available_lvm
+ assert not disk.available
+ assert not disk.available_raw
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_multiple_existing_vgs(self, fake_call, monkeypatch, device_info):
+ vg1 = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=1000,
+ vg_extent_size=4194304)
+ vg2 = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=536,
+ vg_extent_size=4194304)
+ monkeypatch.setattr(api, 'get_all_devices_vgs', lambda : [vg1, vg2])
+ lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
+ data = {"/dev/nvme0n1": {"size": "6442450944"}}
+ lv = {"tags": {"ceph.osd_id": "1"}}
+ device_info(devices=data, lsblk=lsblk, lv=lv)
+ disk = device.Device("/dev/nvme0n1")
+ assert disk.available_lvm
+ assert not disk.available
+ assert not disk.available_raw
+
+ @pytest.mark.parametrize("ceph_type", ["data", "block"])
+ def test_used_by_ceph(self, fake_call, device_info,
+ monkeypatch, ceph_type):
+ data = {"/dev/sda": {"foo": "bar"}}
+ lsblk = {"TYPE": "part", "NAME": "sda", "PKNAME": "sda"}
+ FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
+ lv_uuid="0000", pv_tags={}, vg_name="vg")
+ pvolumes = []
+ pvolumes.append(FooPVolume)
+ lv_data = {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg",
+ "lv_uuid": "0000", "lv_tags":
+ "ceph.osd_id=0,ceph.type="+ceph_type}
+ volumes = []
+ lv = api.Volume(**lv_data)
+ volumes.append(lv)
+ monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
+ monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
+ deepcopy(volumes))
+
+ device_info(devices=data, lsblk=lsblk, lv=lv_data)
+ vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
+ vg_extent_size=1073741824)
+ monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
+ disk = device.Device("/dev/sda")
+ assert disk.used_by_ceph
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_not_used_by_ceph(self, fake_call, device_info, monkeypatch):
+ FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
+ pvolumes = []
+ pvolumes.append(FooPVolume)
+ data = {"/dev/sda": {"foo": "bar"}}
+ lsblk = {"TYPE": "part", "NAME": "sda", "PKNAME": "sda"}
+ lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
+ monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
+
+ device_info(devices=data, lsblk=lsblk, lv=lv_data)
+ disk = device.Device("/dev/sda")
+ assert not disk.used_by_ceph
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_get_device_id(self, fake_call, device_info):
+ udev = {k:k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(udevadm=udev,lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'
+
+ def test_has_bluestore_label(self):
+ # patch device.Device __init__ function to do nothing since we want to only test the
+ # low-level behavior of has_bluestore_label
+ with patch.object(device.Device, "__init__", lambda self, path, with_lsm=False: None):
+ disk = device.Device("/dev/sda")
+ disk.path = "/dev/sda"
+ with patch('builtins.open', mock_open(read_data=b'bluestore block device\n')):
+ assert disk.has_bluestore_label
+ with patch('builtins.open', mock_open(read_data=b'not a bluestore block device\n')):
+ assert not disk.has_bluestore_label
+
+
+class TestDeviceEncryption(object):
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_partition_is_not_encrypted_lsblk(self, fake_call, device_info):
+ lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs', 'NAME': 'sda', 'PKNAME': 'sda'}
+ device_info(lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert disk.is_encrypted is False
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_partition_is_encrypted_lsblk(self, fake_call, device_info):
+ lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS', 'NAME': 'sda', 'PKNAME': 'sda'}
+ device_info(lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert disk.is_encrypted is True
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_partition_is_not_encrypted_blkid(self, fake_call, device_info):
+ lsblk = {'TYPE': 'part', 'NAME': 'sda', 'PKNAME': 'sda'}
+ blkid = {'TYPE': 'ceph data'}
+ device_info(lsblk=lsblk, blkid=blkid)
+ disk = device.Device("/dev/sda")
+ assert disk.is_encrypted is False
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_partition_is_encrypted_blkid(self, fake_call, device_info):
+ lsblk = {'TYPE': 'part', 'NAME': 'sda' ,'PKNAME': 'sda'}
+ blkid = {'TYPE': 'crypto_LUKS'}
+ device_info(lsblk=lsblk, blkid=blkid)
+ disk = device.Device("/dev/sda")
+ assert disk.is_encrypted is True
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_mapper_is_encrypted_luks1(self, fake_call, device_info, monkeypatch):
+ status = {'type': 'LUKS1'}
+ monkeypatch.setattr(device, 'encryption_status', lambda x: status)
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid','TYPE': 'lvm'}
+ blkid = {'TYPE': 'mapper'}
+ device_info(lsblk=lsblk, blkid=blkid)
+ disk = device.Device("/dev/mapper/uuid")
+ assert disk.is_encrypted is True
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_mapper_is_encrypted_luks2(self, fake_call, device_info, monkeypatch):
+ status = {'type': 'LUKS2'}
+ monkeypatch.setattr(device, 'encryption_status', lambda x: status)
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'}
+ blkid = {'TYPE': 'mapper'}
+ device_info(lsblk=lsblk, blkid=blkid)
+ disk = device.Device("/dev/mapper/uuid")
+ assert disk.is_encrypted is True
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_mapper_is_encrypted_plain(self, fake_call, device_info, monkeypatch):
+ status = {'type': 'PLAIN'}
+ monkeypatch.setattr(device, 'encryption_status', lambda x: status)
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'}
+ blkid = {'TYPE': 'mapper'}
+ device_info(lsblk=lsblk, blkid=blkid)
+ disk = device.Device("/dev/mapper/uuid")
+ assert disk.is_encrypted is True
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_mapper_is_not_encrypted_plain(self, fake_call, device_info, monkeypatch):
+ monkeypatch.setattr(device, 'encryption_status', lambda x: {})
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'}
+ blkid = {'TYPE': 'mapper'}
+ device_info(lsblk=lsblk, blkid=blkid)
+ disk = device.Device("/dev/mapper/uuid")
+ assert disk.is_encrypted is False
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_lv_is_encrypted_blkid(self, fake_call, device_info):
+ lsblk = {'TYPE': 'lvm', 'NAME': 'sda'}
+ blkid = {'TYPE': 'crypto_LUKS'}
+ device_info(lsblk=lsblk, blkid=blkid)
+ disk = device.Device("/dev/sda")
+ disk.lv_api = {}
+ assert disk.is_encrypted is True
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_lv_is_not_encrypted_blkid(self, fake_call, factory, device_info):
+ lsblk = {'TYPE': 'lvm', 'NAME': 'sda'}
+ blkid = {'TYPE': 'xfs'}
+ device_info(lsblk=lsblk, blkid=blkid)
+ disk = device.Device("/dev/sda")
+ disk.lv_api = factory(encrypted=None)
+ assert disk.is_encrypted is False
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_lv_is_encrypted_lsblk(self, fake_call, device_info):
+ lsblk = {'FSTYPE': 'crypto_LUKS', 'NAME': 'sda', 'TYPE': 'lvm'}
+ blkid = {'TYPE': 'mapper'}
+ device_info(lsblk=lsblk, blkid=blkid)
+ disk = device.Device("/dev/sda")
+ disk.lv_api = {}
+ assert disk.is_encrypted is True
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_lv_is_not_encrypted_lsblk(self, fake_call, factory, device_info):
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
+ blkid = {'TYPE': 'mapper'}
+ device_info(lsblk=lsblk, blkid=blkid)
+ disk = device.Device("/dev/sda")
+ disk.lv_api = factory(encrypted=None)
+ assert disk.is_encrypted is False
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_lv_is_encrypted_lvm_api(self, fake_call, factory, device_info):
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
+ blkid = {'TYPE': 'mapper'}
+ device_info(lsblk=lsblk, blkid=blkid)
+ disk = device.Device("/dev/sda")
+ disk.lv_api = factory(encrypted=True)
+ assert disk.is_encrypted is True
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_lv_is_not_encrypted_lvm_api(self, fake_call, factory, device_info):
+ lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
+ blkid = {'TYPE': 'mapper'}
+ device_info(lsblk=lsblk, blkid=blkid)
+ disk = device.Device("/dev/sda")
+ disk.lv_api = factory(encrypted=False)
+ assert disk.is_encrypted is False
+
+
+class TestDeviceOrdering(object):
+
+ def setup(self):
+ self.data = {
+ "/dev/sda": {"removable": 0},
+ "/dev/sdb": {"removable": 1}, # invalid
+ "/dev/sdc": {"removable": 0},
+ "/dev/sdd": {"removable": 1}, # invalid
+ }
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_valid_before_invalid(self, fake_call, device_info):
+ lsblk_sda = {"NAME": "sda", "TYPE": "disk"}
+ lsblk_sdb = {"NAME": "sdb", "TYPE": "disk"}
+ device_info(devices=self.data,lsblk=lsblk_sda)
+ sda = device.Device("/dev/sda")
+ device_info(devices=self.data,lsblk=lsblk_sdb)
+ sdb = device.Device("/dev/sdb")
+
+ assert sda < sdb
+ assert sdb > sda
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_valid_alphabetical_ordering(self, fake_call, device_info):
+ lsblk_sda = {"NAME": "sda", "TYPE": "disk"}
+ lsblk_sdc = {"NAME": "sdc", "TYPE": "disk"}
+ device_info(devices=self.data,lsblk=lsblk_sda)
+ sda = device.Device("/dev/sda")
+ device_info(devices=self.data,lsblk=lsblk_sdc)
+ sdc = device.Device("/dev/sdc")
+
+ assert sda < sdc
+ assert sdc > sda
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_invalid_alphabetical_ordering(self, fake_call, device_info):
+ lsblk_sdb = {"NAME": "sdb", "TYPE": "disk"}
+ lsblk_sdd = {"NAME": "sdd", "TYPE": "disk"}
+ device_info(devices=self.data,lsblk=lsblk_sdb)
+ sdb = device.Device("/dev/sdb")
+ device_info(devices=self.data,lsblk=lsblk_sdd)
+ sdd = device.Device("/dev/sdd")
+
+ assert sdb < sdd
+ assert sdd > sdb
+
+
+class TestCephDiskDevice(object):
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_partlabel_lsblk(self, fake_call, device_info):
+ lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": ""}
+ device_info(lsblk=lsblk)
+ disk = device.CephDiskDevice(device.Device("/dev/sda"))
+
+ assert disk.partlabel == ''
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_partlabel_blkid(self, fake_call, device_info):
+ lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "ceph data"}
+ blkid = {"TYPE": "disk", "PARTLABEL": "ceph data"}
+ device_info(blkid=blkid, lsblk=lsblk)
+ disk = device.CephDiskDevice(device.Device("/dev/sda"))
+
+ assert disk.partlabel == 'ceph data'
+
+ @pytest.mark.usefixtures("lsblk_ceph_disk_member",
+ "blkid_ceph_disk_member",
+ "disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_is_member_blkid(self, fake_call, monkeypatch):
+ disk = device.CephDiskDevice(device.Device("/dev/sda"))
+
+ assert disk.is_member is True
+
+ @pytest.mark.usefixtures("lsblk_ceph_disk_member",
+ "disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_is_member_lsblk(self, fake_call, patch_bluestore_label, device_info):
+ lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "ceph"}
+ device_info(lsblk=lsblk)
+ disk = device.CephDiskDevice(device.Device("/dev/sda"))
+
+ assert disk.is_member is True
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_unknown_type(self, fake_call, device_info):
+ lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "gluster"}
+ device_info(lsblk=lsblk)
+ disk = device.CephDiskDevice(device.Device("/dev/sda"))
+
+ assert disk.type == 'unknown'
+
+ ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ @pytest.mark.usefixtures("lsblk_ceph_disk_member",
+ "blkid_ceph_disk_member",
+ "disable_kernel_queries")
+ def test_type_blkid(self, monkeypatch, fake_call, device_info, ceph_partlabel):
+ disk = device.CephDiskDevice(device.Device("/dev/sda"))
+
+ assert disk.type in self.ceph_types
+
+ @pytest.mark.usefixtures("blkid_ceph_disk_member",
+ "lsblk_ceph_disk_member",
+ "disable_kernel_queries")
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_type_lsblk(self, fake_call, device_info, ceph_partlabel):
+ disk = device.CephDiskDevice(device.Device("/dev/sda"))
+
+ assert disk.type in self.ceph_types