From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- .../ceph_volume/tests/util/test_device.py | 704 +++++++++++++++++++++ 1 file changed, 704 insertions(+) create mode 100644 src/ceph-volume/ceph_volume/tests/util/test_device.py (limited to 'src/ceph-volume/ceph_volume/tests/util/test_device.py') diff --git a/src/ceph-volume/ceph_volume/tests/util/test_device.py b/src/ceph-volume/ceph_volume/tests/util/test_device.py new file mode 100644 index 000000000..8eef3ff00 --- /dev/null +++ b/src/ceph-volume/ceph_volume/tests/util/test_device.py @@ -0,0 +1,704 @@ +import os +import pytest +from copy import deepcopy +from ceph_volume.util import device +from ceph_volume.api import lvm as api +from mock.mock import patch, mock_open + + +class TestDevice(object): + + def test_sys_api(self, monkeypatch, device_info): + volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg', + lv_tags={}, lv_path='/dev/VolGroup/lv') + volumes = [] + volumes.append(volume) + monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: + deepcopy(volumes)) + + data = {"/dev/sda": {"foo": "bar"}} + lsblk = {"TYPE": "disk", "NAME": "sda"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/sda") + assert disk.sys_api + assert "foo" in disk.sys_api + + def test_lvm_size(self, monkeypatch, device_info): + volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg', + lv_tags={}, lv_path='/dev/VolGroup/lv') + volumes = [] + volumes.append(volume) + monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: + deepcopy(volumes)) + + # 5GB in size + data = {"/dev/sda": {"size": "5368709120"}} + lsblk = {"TYPE": "disk", "NAME": "sda"} + device_info(devices=data,lsblk=lsblk) + disk = device.Device("/dev/sda") + assert disk.lvm_size.gb == 4 + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_lvm_size_rounds_down(self, fake_call, device_info): + # 5.5GB in size + data = {"/dev/sda": {"size": "5905580032"}} + lsblk = {"TYPE": "disk", "NAME": "sda"} + device_info(devices=data,lsblk=lsblk) + disk = device.Device("/dev/sda") + assert disk.lvm_size.gb == 4 + + def test_is_lv(self, fake_call, device_info): + data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"} + lsblk = {"TYPE": "lvm", "NAME": "vg-lv"} + device_info(lv=data,lsblk=lsblk) + disk = device.Device("vg/lv") + assert disk.is_lv + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_vgs_is_empty(self, fake_call, device_info, monkeypatch): + BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", + pv_tags={}) + pvolumes = [] + pvolumes.append(BarPVolume) + lsblk = {"TYPE": "disk", "NAME": "sda"} + device_info(lsblk=lsblk) + monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: {}) + + disk = device.Device("/dev/nvme0n1") + assert disk.vgs == [] + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_vgs_is_not_empty(self, fake_call, device_info, monkeypatch): + vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=6, + vg_extent_size=1073741824) + monkeypatch.setattr(api, 'get_all_devices_vgs', lambda : [vg]) + lsblk = {"TYPE": "disk", "NAME": "nvme0n1"} + device_info(lsblk=lsblk) + disk = device.Device("/dev/nvme0n1") + assert len(disk.vgs) == 1 + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_device_is_device(self, fake_call, device_info): + data = {"/dev/sda": {"foo": "bar"}} + lsblk = {"TYPE": "device", "NAME": "sda"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/sda") + assert disk.is_device is True + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_loop_device_is_not_device(self, fake_call, device_info): + data = {"/dev/loop0": {"foo": "bar"}} + lsblk = {"TYPE": "loop"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/loop0") + assert disk.is_device is False + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_loop_device_is_device(self, fake_call, device_info): + data = {"/dev/loop0": {"foo": "bar"}} + lsblk = {"TYPE": "loop"} + os.environ["CEPH_VOLUME_ALLOW_LOOP_DEVICES"] = "1" + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/loop0") + assert disk.is_device is True + del os.environ["CEPH_VOLUME_ALLOW_LOOP_DEVICES"] + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_device_is_rotational(self, fake_call, device_info): + data = {"/dev/sda": {"rotational": "1"}} + lsblk = {"TYPE": "device", "NAME": "sda"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/sda") + assert disk.rotational + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_device_is_not_rotational(self, fake_call, device_info): + data = {"/dev/sda": {"rotational": "0"}} + lsblk = {"TYPE": "device", "NAME": "sda"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/sda") + assert not disk.rotational + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_device_is_rotational_lsblk(self, fake_call, device_info): + data = {"/dev/sda": {"foo": "bar"}} + lsblk = {"TYPE": "device", "ROTA": "1", "NAME": "sda"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/sda") + assert disk.rotational + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_device_is_not_rotational_lsblk(self, fake_call, device_info): + data = {"/dev/sda": {"rotational": "0"}} + lsblk = {"TYPE": "device", "ROTA": "0", "NAME": "sda"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/sda") + assert not disk.rotational + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_device_is_rotational_defaults_true(self, fake_call, device_info): + # rotational will default true if no info from sys_api or lsblk is found + data = {"/dev/sda": {"foo": "bar"}} + lsblk = {"TYPE": "device", "foo": "bar", "NAME": "sda"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/sda") + assert disk.rotational + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_disk_is_device(self, fake_call, device_info): + data = {"/dev/sda": {"foo": "bar"}} + lsblk = {"TYPE": "disk", "NAME": "sda"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/sda") + assert disk.is_device is True + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_is_partition(self, fake_call, device_info): + data = {"/dev/sda1": {"foo": "bar"}} + lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/sda1") + assert disk.is_partition + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_mpath_device_is_device(self, fake_call, device_info): + data = {"/dev/foo": {"foo": "bar"}} + lsblk = {"TYPE": "mpath", "NAME": "foo"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/foo") + assert disk.is_device is True + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_is_not_lvm_member(self, fake_call, device_info): + data = {"/dev/sda1": {"foo": "bar"}} + lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/sda1") + assert not disk.is_lvm_member + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_is_lvm_member(self, fake_call, device_info): + data = {"/dev/sda1": {"foo": "bar"}} + lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/sda1") + assert not disk.is_lvm_member + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_is_mapper_device(self, fake_call, device_info): + lsblk = {"TYPE": "lvm", "NAME": "foo"} + device_info(lsblk=lsblk) + disk = device.Device("/dev/mapper/foo") + assert disk.is_mapper + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_dm_is_mapper_device(self, fake_call, device_info): + lsblk = {"TYPE": "lvm", "NAME": "dm-4"} + device_info(lsblk=lsblk) + disk = device.Device("/dev/dm-4") + assert disk.is_mapper + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_is_not_mapper_device(self, fake_call, device_info): + lsblk = {"TYPE": "disk", "NAME": "sda"} + device_info(lsblk=lsblk) + disk = device.Device("/dev/sda") + assert not disk.is_mapper + + @pytest.mark.usefixtures("lsblk_ceph_disk_member", + "disable_kernel_queries") + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_is_ceph_disk_lsblk(self, fake_call, monkeypatch, patch_bluestore_label): + disk = device.Device("/dev/sda") + assert disk.is_ceph_disk_member + + @pytest.mark.usefixtures("blkid_ceph_disk_member", + "lsblk_ceph_disk_member", + "disable_kernel_queries") + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_is_ceph_disk_blkid(self, fake_call, monkeypatch, patch_bluestore_label): + disk = device.Device("/dev/sda") + assert disk.is_ceph_disk_member + + @pytest.mark.usefixtures("lsblk_ceph_disk_member", + "disable_kernel_queries") + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_is_ceph_disk_member_not_available_lsblk(self, fake_call, monkeypatch, patch_bluestore_label): + disk = device.Device("/dev/sda") + assert disk.is_ceph_disk_member + assert not disk.available + assert "Used by ceph-disk" in disk.rejected_reasons + + @pytest.mark.usefixtures("blkid_ceph_disk_member", + "lsblk_ceph_disk_member", + "disable_kernel_queries") + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_is_ceph_disk_member_not_available_blkid(self, fake_call, monkeypatch, patch_bluestore_label): + disk = device.Device("/dev/sda") + assert disk.is_ceph_disk_member + assert not disk.available + assert "Used by ceph-disk" in disk.rejected_reasons + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_reject_removable_device(self, fake_call, device_info): + data = {"/dev/sdb": {"removable": 1}} + lsblk = {"TYPE": "disk", "NAME": "sdb"} + device_info(devices=data,lsblk=lsblk) + disk = device.Device("/dev/sdb") + assert not disk.available + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_reject_device_with_gpt_headers(self, fake_call, device_info): + data = {"/dev/sdb": {"removable": 0, "size": 5368709120}} + lsblk = {"TYPE": "disk", "NAME": "sdb"} + blkid= {"PTTYPE": "gpt"} + device_info( + devices=data, + blkid=blkid, + lsblk=lsblk, + ) + disk = device.Device("/dev/sdb") + assert not disk.available + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_accept_non_removable_device(self, fake_call, device_info): + data = {"/dev/sdb": {"removable": 0, "size": 5368709120}} + lsblk = {"TYPE": "disk", "NAME": "sdb"} + device_info(devices=data,lsblk=lsblk) + disk = device.Device("/dev/sdb") + assert disk.available + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_reject_not_acceptable_device(self, fake_call, device_info): + data = {"/dev/dm-0": {"foo": "bar"}} + lsblk = {"TYPE": "mpath", "NAME": "dm-0"} + device_info(devices=data, lsblk=lsblk) + disk = device.Device("/dev/dm-0") + assert not disk.available + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + @patch('ceph_volume.util.device.os.path.realpath') + @patch('ceph_volume.util.device.os.path.islink') + def test_accept_symlink_to_device(self, + m_os_path_islink, + m_os_path_realpath, + device_info, + fake_call): + m_os_path_islink.return_value = True + m_os_path_realpath.return_value = '/dev/sdb' + data = {"/dev/sdb": {"ro": 0, "size": 5368709120}} + lsblk = {"TYPE": "disk"} + device_info(devices=data,lsblk=lsblk) + disk = device.Device("/dev/test_symlink") + print(disk) + print(disk.sys_api) + assert disk.available + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + @patch('ceph_volume.util.device.os.readlink') + @patch('ceph_volume.util.device.os.path.islink') + def test_reject_symlink_to_device_mapper(self, + m_os_path_islink, + m_os_readlink, + device_info, + fake_call): + m_os_path_islink.return_value = True + m_os_readlink.return_value = '/dev/dm-0' + data = {"/dev/mapper/mpatha": {"ro": 0, "size": 5368709120}} + lsblk = {"TYPE": "disk"} + device_info(devices=data,lsblk=lsblk) + disk = device.Device("/dev/mapper/mpatha") + assert disk.available + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_reject_readonly_device(self, fake_call, device_info): + data = {"/dev/cdrom": {"ro": 1}} + lsblk = {"TYPE": "disk", "NAME": "cdrom"} + device_info(devices=data,lsblk=lsblk) + disk = device.Device("/dev/cdrom") + assert not disk.available + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_reject_smaller_than_5gb(self, fake_call, device_info): + data = {"/dev/sda": {"size": 5368709119}} + lsblk = {"TYPE": "disk", "NAME": "sda"} + device_info(devices=data,lsblk=lsblk) + disk = device.Device("/dev/sda") + assert not disk.available, 'too small device is available' + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_accept_non_readonly_device(self, fake_call, device_info): + data = {"/dev/sda": {"ro": 0, "size": 5368709120}} + lsblk = {"TYPE": "disk", "NAME": "sda"} + device_info(devices=data,lsblk=lsblk) + disk = device.Device("/dev/sda") + assert disk.available + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_reject_bluestore_device(self, fake_call, monkeypatch, patch_bluestore_label, device_info): + patch_bluestore_label.return_value = True + lsblk = {"TYPE": "disk", "NAME": "sda"} + device_info(lsblk=lsblk) + disk = device.Device("/dev/sda") + assert not disk.available + assert "Has BlueStore device label" in disk.rejected_reasons + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_reject_device_with_oserror(self, fake_call, monkeypatch, patch_bluestore_label, device_info): + patch_bluestore_label.side_effect = OSError('test failure') + lsblk = {"TYPE": "disk", "NAME": "sda"} + device_info(lsblk=lsblk) + disk = device.Device("/dev/sda") + assert not disk.available + assert "Failed to determine if device is BlueStore" in disk.rejected_reasons + + @pytest.mark.usefixtures("lsblk_ceph_disk_member", + "device_info_not_ceph_disk_member", + "disable_kernel_queries") + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_is_not_ceph_disk_member_lsblk(self, fake_call, patch_bluestore_label): + disk = device.Device("/dev/sda") + assert disk.is_ceph_disk_member is False + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_existing_vg_available(self, fake_call, monkeypatch, device_info): + vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=1536, + vg_extent_size=4194304) + monkeypatch.setattr(api, 'get_all_devices_vgs', lambda : [vg]) + lsblk = {"TYPE": "disk", "NAME": "nvme0n1"} + data = {"/dev/nvme0n1": {"size": "6442450944"}} + lv = {"tags": {"ceph.osd_id": "1"}} + device_info(devices=data, lsblk=lsblk, lv=lv) + disk = device.Device("/dev/nvme0n1") + assert disk.available_lvm + assert not disk.available + assert not disk.available_raw + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_existing_vg_too_small(self, fake_call, monkeypatch, device_info): + vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=4, + vg_extent_size=1073741824) + monkeypatch.setattr(api, 'get_all_devices_vgs', lambda : [vg]) + lsblk = {"TYPE": "disk", "NAME": "nvme0n1"} + data = {"/dev/nvme0n1": {"size": "6442450944"}} + lv = {"tags": {"ceph.osd_id": "1"}} + device_info(devices=data, lsblk=lsblk, lv=lv) + disk = device.Device("/dev/nvme0n1") + assert not disk.available_lvm + assert not disk.available + assert not disk.available_raw + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_multiple_existing_vgs(self, fake_call, monkeypatch, device_info): + vg1 = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=1000, + vg_extent_size=4194304) + vg2 = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=536, + vg_extent_size=4194304) + monkeypatch.setattr(api, 'get_all_devices_vgs', lambda : [vg1, vg2]) + lsblk = {"TYPE": "disk", "NAME": "nvme0n1"} + data = {"/dev/nvme0n1": {"size": "6442450944"}} + lv = {"tags": {"ceph.osd_id": "1"}} + device_info(devices=data, lsblk=lsblk, lv=lv) + disk = device.Device("/dev/nvme0n1") + assert disk.available_lvm + assert not disk.available + assert not disk.available_raw + + @pytest.mark.parametrize("ceph_type", ["data", "block"]) + def test_used_by_ceph(self, fake_call, device_info, + monkeypatch, ceph_type): + data = {"/dev/sda": {"foo": "bar"}} + lsblk = {"TYPE": "part", "NAME": "sda", "PKNAME": "sda"} + FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", + lv_uuid="0000", pv_tags={}, vg_name="vg") + pvolumes = [] + pvolumes.append(FooPVolume) + lv_data = {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg", + "lv_uuid": "0000", "lv_tags": + "ceph.osd_id=0,ceph.type="+ceph_type} + volumes = [] + lv = api.Volume(**lv_data) + volumes.append(lv) + monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes) + monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: + deepcopy(volumes)) + + device_info(devices=data, lsblk=lsblk, lv=lv_data) + vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6, + vg_extent_size=1073741824) + monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg]) + disk = device.Device("/dev/sda") + assert disk.used_by_ceph + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_not_used_by_ceph(self, fake_call, device_info, monkeypatch): + FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg") + pvolumes = [] + pvolumes.append(FooPVolume) + data = {"/dev/sda": {"foo": "bar"}} + lsblk = {"TYPE": "part", "NAME": "sda", "PKNAME": "sda"} + lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}} + monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes) + + device_info(devices=data, lsblk=lsblk, lv=lv_data) + disk = device.Device("/dev/sda") + assert not disk.used_by_ceph + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_get_device_id(self, fake_call, device_info): + udev = {k:k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']} + lsblk = {"TYPE": "disk", "NAME": "sda"} + device_info(udevadm=udev,lsblk=lsblk) + disk = device.Device("/dev/sda") + assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL' + + def test_has_bluestore_label(self): + # patch device.Device __init__ function to do nothing since we want to only test the + # low-level behavior of has_bluestore_label + with patch.object(device.Device, "__init__", lambda self, path, with_lsm=False: None): + disk = device.Device("/dev/sda") + disk.path = "/dev/sda" + with patch('builtins.open', mock_open(read_data=b'bluestore block device\n')): + assert disk.has_bluestore_label + with patch('builtins.open', mock_open(read_data=b'not a bluestore block device\n')): + assert not disk.has_bluestore_label + + +class TestDeviceEncryption(object): + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_partition_is_not_encrypted_lsblk(self, fake_call, device_info): + lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs', 'NAME': 'sda', 'PKNAME': 'sda'} + device_info(lsblk=lsblk) + disk = device.Device("/dev/sda") + assert disk.is_encrypted is False + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_partition_is_encrypted_lsblk(self, fake_call, device_info): + lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS', 'NAME': 'sda', 'PKNAME': 'sda'} + device_info(lsblk=lsblk) + disk = device.Device("/dev/sda") + assert disk.is_encrypted is True + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_partition_is_not_encrypted_blkid(self, fake_call, device_info): + lsblk = {'TYPE': 'part', 'NAME': 'sda', 'PKNAME': 'sda'} + blkid = {'TYPE': 'ceph data'} + device_info(lsblk=lsblk, blkid=blkid) + disk = device.Device("/dev/sda") + assert disk.is_encrypted is False + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_partition_is_encrypted_blkid(self, fake_call, device_info): + lsblk = {'TYPE': 'part', 'NAME': 'sda' ,'PKNAME': 'sda'} + blkid = {'TYPE': 'crypto_LUKS'} + device_info(lsblk=lsblk, blkid=blkid) + disk = device.Device("/dev/sda") + assert disk.is_encrypted is True + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_mapper_is_encrypted_luks1(self, fake_call, device_info, monkeypatch): + status = {'type': 'LUKS1'} + monkeypatch.setattr(device, 'encryption_status', lambda x: status) + lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid','TYPE': 'lvm'} + blkid = {'TYPE': 'mapper'} + device_info(lsblk=lsblk, blkid=blkid) + disk = device.Device("/dev/mapper/uuid") + assert disk.is_encrypted is True + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_mapper_is_encrypted_luks2(self, fake_call, device_info, monkeypatch): + status = {'type': 'LUKS2'} + monkeypatch.setattr(device, 'encryption_status', lambda x: status) + lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'} + blkid = {'TYPE': 'mapper'} + device_info(lsblk=lsblk, blkid=blkid) + disk = device.Device("/dev/mapper/uuid") + assert disk.is_encrypted is True + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_mapper_is_encrypted_plain(self, fake_call, device_info, monkeypatch): + status = {'type': 'PLAIN'} + monkeypatch.setattr(device, 'encryption_status', lambda x: status) + lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'} + blkid = {'TYPE': 'mapper'} + device_info(lsblk=lsblk, blkid=blkid) + disk = device.Device("/dev/mapper/uuid") + assert disk.is_encrypted is True + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_mapper_is_not_encrypted_plain(self, fake_call, device_info, monkeypatch): + monkeypatch.setattr(device, 'encryption_status', lambda x: {}) + lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'} + blkid = {'TYPE': 'mapper'} + device_info(lsblk=lsblk, blkid=blkid) + disk = device.Device("/dev/mapper/uuid") + assert disk.is_encrypted is False + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_lv_is_encrypted_blkid(self, fake_call, device_info): + lsblk = {'TYPE': 'lvm', 'NAME': 'sda'} + blkid = {'TYPE': 'crypto_LUKS'} + device_info(lsblk=lsblk, blkid=blkid) + disk = device.Device("/dev/sda") + disk.lv_api = {} + assert disk.is_encrypted is True + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_lv_is_not_encrypted_blkid(self, fake_call, factory, device_info): + lsblk = {'TYPE': 'lvm', 'NAME': 'sda'} + blkid = {'TYPE': 'xfs'} + device_info(lsblk=lsblk, blkid=blkid) + disk = device.Device("/dev/sda") + disk.lv_api = factory(encrypted=None) + assert disk.is_encrypted is False + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_lv_is_encrypted_lsblk(self, fake_call, device_info): + lsblk = {'FSTYPE': 'crypto_LUKS', 'NAME': 'sda', 'TYPE': 'lvm'} + blkid = {'TYPE': 'mapper'} + device_info(lsblk=lsblk, blkid=blkid) + disk = device.Device("/dev/sda") + disk.lv_api = {} + assert disk.is_encrypted is True + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_lv_is_not_encrypted_lsblk(self, fake_call, factory, device_info): + lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'} + blkid = {'TYPE': 'mapper'} + device_info(lsblk=lsblk, blkid=blkid) + disk = device.Device("/dev/sda") + disk.lv_api = factory(encrypted=None) + assert disk.is_encrypted is False + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_lv_is_encrypted_lvm_api(self, fake_call, factory, device_info): + lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'} + blkid = {'TYPE': 'mapper'} + device_info(lsblk=lsblk, blkid=blkid) + disk = device.Device("/dev/sda") + disk.lv_api = factory(encrypted=True) + assert disk.is_encrypted is True + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_lv_is_not_encrypted_lvm_api(self, fake_call, factory, device_info): + lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'} + blkid = {'TYPE': 'mapper'} + device_info(lsblk=lsblk, blkid=blkid) + disk = device.Device("/dev/sda") + disk.lv_api = factory(encrypted=False) + assert disk.is_encrypted is False + + +class TestDeviceOrdering(object): + + def setup(self): + self.data = { + "/dev/sda": {"removable": 0}, + "/dev/sdb": {"removable": 1}, # invalid + "/dev/sdc": {"removable": 0}, + "/dev/sdd": {"removable": 1}, # invalid + } + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_valid_before_invalid(self, fake_call, device_info): + lsblk_sda = {"NAME": "sda", "TYPE": "disk"} + lsblk_sdb = {"NAME": "sdb", "TYPE": "disk"} + device_info(devices=self.data,lsblk=lsblk_sda) + sda = device.Device("/dev/sda") + device_info(devices=self.data,lsblk=lsblk_sdb) + sdb = device.Device("/dev/sdb") + + assert sda < sdb + assert sdb > sda + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_valid_alphabetical_ordering(self, fake_call, device_info): + lsblk_sda = {"NAME": "sda", "TYPE": "disk"} + lsblk_sdc = {"NAME": "sdc", "TYPE": "disk"} + device_info(devices=self.data,lsblk=lsblk_sda) + sda = device.Device("/dev/sda") + device_info(devices=self.data,lsblk=lsblk_sdc) + sdc = device.Device("/dev/sdc") + + assert sda < sdc + assert sdc > sda + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_invalid_alphabetical_ordering(self, fake_call, device_info): + lsblk_sdb = {"NAME": "sdb", "TYPE": "disk"} + lsblk_sdd = {"NAME": "sdd", "TYPE": "disk"} + device_info(devices=self.data,lsblk=lsblk_sdb) + sdb = device.Device("/dev/sdb") + device_info(devices=self.data,lsblk=lsblk_sdd) + sdd = device.Device("/dev/sdd") + + assert sdb < sdd + assert sdd > sdb + + +class TestCephDiskDevice(object): + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_partlabel_lsblk(self, fake_call, device_info): + lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": ""} + device_info(lsblk=lsblk) + disk = device.CephDiskDevice(device.Device("/dev/sda")) + + assert disk.partlabel == '' + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_partlabel_blkid(self, fake_call, device_info): + lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "ceph data"} + blkid = {"TYPE": "disk", "PARTLABEL": "ceph data"} + device_info(blkid=blkid, lsblk=lsblk) + disk = device.CephDiskDevice(device.Device("/dev/sda")) + + assert disk.partlabel == 'ceph data' + + @pytest.mark.usefixtures("lsblk_ceph_disk_member", + "blkid_ceph_disk_member", + "disable_kernel_queries") + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_is_member_blkid(self, fake_call, monkeypatch): + disk = device.CephDiskDevice(device.Device("/dev/sda")) + + assert disk.is_member is True + + @pytest.mark.usefixtures("lsblk_ceph_disk_member", + "disable_kernel_queries") + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_is_member_lsblk(self, fake_call, patch_bluestore_label, device_info): + lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "ceph"} + device_info(lsblk=lsblk) + disk = device.CephDiskDevice(device.Device("/dev/sda")) + + assert disk.is_member is True + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_unknown_type(self, fake_call, device_info): + lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "gluster"} + device_info(lsblk=lsblk) + disk = device.CephDiskDevice(device.Device("/dev/sda")) + + assert disk.type == 'unknown' + + ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block'] + + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + @pytest.mark.usefixtures("lsblk_ceph_disk_member", + "blkid_ceph_disk_member", + "disable_kernel_queries") + def test_type_blkid(self, monkeypatch, fake_call, device_info, ceph_partlabel): + disk = device.CephDiskDevice(device.Device("/dev/sda")) + + assert disk.type in self.ceph_types + + @pytest.mark.usefixtures("blkid_ceph_disk_member", + "lsblk_ceph_disk_member", + "disable_kernel_queries") + @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False) + def test_type_lsblk(self, fake_call, device_info, ceph_partlabel): + disk = device.CephDiskDevice(device.Device("/dev/sda")) + + assert disk.type in self.ceph_types -- cgit v1.2.3