path: root/src/ceph-volume/ceph_volume/devices/lvm
diff options
Diffstat (limited to 'src/ceph-volume/ceph_volume/devices/lvm')
11 files changed, 2537 insertions, 0 deletions
diff --git a/src/ceph-volume/ceph_volume/devices/lvm/ b/src/ceph-volume/ceph_volume/devices/lvm/
new file mode 100644
index 00000000..3c147123
--- /dev/null
+++ b/src/ceph-volume/ceph_volume/devices/lvm/
@@ -0,0 +1 @@
+from .main import LVM # noqa
diff --git a/src/ceph-volume/ceph_volume/devices/lvm/ b/src/ceph-volume/ceph_volume/devices/lvm/
new file mode 100644
index 00000000..e4ac074a
--- /dev/null
+++ b/src/ceph-volume/ceph_volume/devices/lvm/
@@ -0,0 +1,370 @@
+from __future__ import print_function
+import argparse
+import logging
+import os
+from textwrap import dedent
+from ceph_volume import process, conf, decorators, terminal, __release__, configuration
+from ceph_volume.util import system, disk
+from ceph_volume.util import prepare as prepare_utils
+from ceph_volume.util import encryption as encryption_utils
+from ceph_volume.systemd import systemctl
+from ceph_volume.api import lvm as api
+from .listing import direct_report
+logger = logging.getLogger(__name__)
+def activate_filestore(osd_lvs, no_systemd=False):
+ # find the osd
+ for osd_lv in osd_lvs:
+ if osd_lv.tags.get('ceph.type') == 'data':
+ data_lv = osd_lv
+ break
+ else:
+ raise RuntimeError('Unable to find a data LV for filestore activation')
+ is_encrypted = data_lv.tags.get('ceph.encrypted', '0') == '1'
+ is_vdo = data_lv.tags.get('ceph.vdo', '0')
+ osd_id = data_lv.tags['ceph.osd_id']
+ configuration.load_ceph_conf_path(data_lv.tags['ceph.cluster_name'])
+ configuration.load()
+ # it may have a volume with a journal
+ for osd_lv in osd_lvs:
+ if osd_lv.tags.get('ceph.type') == 'journal':
+ osd_journal_lv = osd_lv
+ break
+ else:
+ osd_journal_lv = None
+ # TODO: add sensible error reporting if this is ever the case
+ # blow up with a KeyError if this doesn't exist
+ osd_fsid = data_lv.tags['ceph.osd_fsid']
+ if not osd_journal_lv:
+ # must be a disk partition, by querying blkid by the uuid we are ensuring that the
+ # device path is always correct
+ journal_uuid = data_lv.tags['ceph.journal_uuid']
+ osd_journal = disk.get_device_from_partuuid(journal_uuid)
+ else:
+ journal_uuid = osd_journal_lv.lv_uuid
+ osd_journal = data_lv.tags['ceph.journal_device']
+ if not osd_journal:
+ raise RuntimeError('unable to detect an lv or device journal for OSD %s' % osd_id)
+ # this is done here, so that previous checks that ensure path availability
+ # and correctness can still be enforced, and report if any issues are found
+ if is_encrypted:
+ lockbox_secret = data_lv.tags['ceph.cephx_lockbox_secret']
+ # this keyring writing is idempotent
+ encryption_utils.write_lockbox_keyring(osd_id, osd_fsid, lockbox_secret)
+ dmcrypt_secret = encryption_utils.get_dmcrypt_key(osd_id, osd_fsid)
+ encryption_utils.luks_open(dmcrypt_secret, data_lv.lv_path, data_lv.lv_uuid)
+ encryption_utils.luks_open(dmcrypt_secret, osd_journal, journal_uuid)
+ osd_journal = '/dev/mapper/%s' % journal_uuid
+ source = '/dev/mapper/%s' % data_lv.lv_uuid
+ else:
+ source = data_lv.lv_path
+ # mount the osd
+ destination = '/var/lib/ceph/osd/%s-%s' % (conf.cluster, osd_id)
+ if not system.device_is_mounted(source, destination=destination):
+ prepare_utils.mount_osd(source, osd_id, is_vdo=is_vdo)
+ # ensure that the OSD destination is always chowned properly
+ system.chown(destination)
+ # always re-do the symlink regardless if it exists, so that the journal
+ # device path that may have changed can be mapped correctly every time
+ destination = '/var/lib/ceph/osd/%s-%s/journal' % (conf.cluster, osd_id)
+['ln', '-snf', osd_journal, destination])
+ # make sure that the journal has proper permissions
+ system.chown(osd_journal)
+ if no_systemd is False:
+ # enable the ceph-volume unit for this OSD
+ systemctl.enable_volume(osd_id, osd_fsid, 'lvm')
+ # enable the OSD
+ systemctl.enable_osd(osd_id)
+ # start the OSD
+ systemctl.start_osd(osd_id)
+ terminal.success("ceph-volume lvm activate successful for osd ID: %s" % osd_id)
+def get_osd_device_path(osd_lvs, device_type, dmcrypt_secret=None):
+ """
+ ``device_type`` can be one of ``db``, ``wal`` or ``block`` so that we can
+ query LVs on system and fallback to querying the uuid if that is not
+ present.
+ Return a path if possible, failing to do that a ``None``, since some of
+ these devices are optional.
+ """
+ osd_block_lv = None
+ for lv in osd_lvs:
+ if lv.tags.get('ceph.type') == 'block':
+ osd_block_lv = lv
+ break
+ if osd_block_lv:
+ is_encrypted = osd_block_lv.tags.get('ceph.encrypted', '0') == '1'
+ logger.debug('Found block device (%s) with encryption: %s',, is_encrypted)
+ uuid_tag = 'ceph.%s_uuid' % device_type
+ device_uuid = osd_block_lv.tags.get(uuid_tag)
+ if not device_uuid:
+ return None
+ device_lv = None
+ for lv in osd_lvs:
+ if lv.tags.get('ceph.type') == device_type:
+ device_lv = lv
+ break
+ if device_lv:
+ if is_encrypted:
+ encryption_utils.luks_open(dmcrypt_secret, device_lv.lv_path, device_uuid)
+ return '/dev/mapper/%s' % device_uuid
+ return device_lv.lv_path
+ # this could be a regular device, so query it with blkid
+ physical_device = disk.get_device_from_partuuid(device_uuid)
+ if physical_device:
+ if is_encrypted:
+ encryption_utils.luks_open(dmcrypt_secret, physical_device, device_uuid)
+ return '/dev/mapper/%s' % device_uuid
+ return physical_device
+ raise RuntimeError('could not find %s with uuid %s' % (device_type, device_uuid))
+def activate_bluestore(osd_lvs, no_systemd=False):
+ for lv in osd_lvs:
+ if lv.tags.get('ceph.type') == 'block':
+ osd_block_lv = lv
+ break
+ else:
+ raise RuntimeError('could not find a bluestore OSD to activate')
+ is_encrypted = osd_block_lv.tags.get('ceph.encrypted', '0') == '1'
+ dmcrypt_secret = None
+ osd_id = osd_block_lv.tags['ceph.osd_id']
+ conf.cluster = osd_block_lv.tags['ceph.cluster_name']
+ osd_fsid = osd_block_lv.tags['ceph.osd_fsid']
+ # mount on tmpfs the osd directory
+ osd_path = '/var/lib/ceph/osd/%s-%s' % (conf.cluster, osd_id)
+ if not system.path_is_mounted(osd_path):
+ # mkdir -p and mount as tmpfs
+ prepare_utils.create_osd_path(osd_id, tmpfs=True)
+ # XXX This needs to be removed once ceph-bluestore-tool can deal with
+ # symlinks that exist in the osd dir
+ for link_name in ['block', 'block.db', 'block.wal']:
+ link_path = os.path.join(osd_path, link_name)
+ if os.path.exists(link_path):
+ os.unlink(os.path.join(osd_path, link_name))
+ # encryption is handled here, before priming the OSD dir
+ if is_encrypted:
+ osd_lv_path = '/dev/mapper/%s' % osd_block_lv.lv_uuid
+ lockbox_secret = osd_block_lv.tags['ceph.cephx_lockbox_secret']
+ encryption_utils.write_lockbox_keyring(osd_id, osd_fsid, lockbox_secret)
+ dmcrypt_secret = encryption_utils.get_dmcrypt_key(osd_id, osd_fsid)
+ encryption_utils.luks_open(dmcrypt_secret, osd_block_lv.lv_path, osd_block_lv.lv_uuid)
+ else:
+ osd_lv_path = osd_block_lv.lv_path
+ db_device_path = get_osd_device_path(osd_lvs, 'db', dmcrypt_secret=dmcrypt_secret)
+ wal_device_path = get_osd_device_path(osd_lvs, 'wal', dmcrypt_secret=dmcrypt_secret)
+ # Once symlinks are removed, the osd dir can be 'primed again. chown first,
+ # regardless of what currently exists so that ``prime-osd-dir`` can succeed
+ # even if permissions are somehow messed up
+ system.chown(osd_path)
+ prime_command = [
+ 'ceph-bluestore-tool', '--cluster=%s' % conf.cluster,
+ 'prime-osd-dir', '--dev', osd_lv_path,
+ '--path', osd_path]
+ if __release__ != "luminous":
+ # mon-config changes are not available in Luminous
+ prime_command.append('--no-mon-config')
+ # always re-do the symlink regardless if it exists, so that the block,
+ # block.wal, and block.db devices that may have changed can be mapped
+ # correctly every time
+['ln', '-snf', osd_lv_path, os.path.join(osd_path, 'block')])
+ system.chown(os.path.join(osd_path, 'block'))
+ system.chown(osd_path)
+ if db_device_path:
+ destination = os.path.join(osd_path, 'block.db')
+['ln', '-snf', db_device_path, destination])
+ system.chown(db_device_path)
+ system.chown(destination)
+ if wal_device_path:
+ destination = os.path.join(osd_path, 'block.wal')
+['ln', '-snf', wal_device_path, destination])
+ system.chown(wal_device_path)
+ system.chown(destination)
+ if no_systemd is False:
+ # enable the ceph-volume unit for this OSD
+ systemctl.enable_volume(osd_id, osd_fsid, 'lvm')
+ # enable the OSD
+ systemctl.enable_osd(osd_id)
+ # start the OSD
+ systemctl.start_osd(osd_id)
+ terminal.success("ceph-volume lvm activate successful for osd ID: %s" % osd_id)
+class Activate(object):
+ help = 'Discover and mount the LVM device associated with an OSD ID and start the Ceph OSD'
+ def __init__(self, argv):
+ self.argv = argv
+ @decorators.needs_root
+ def activate_all(self, args):
+ listed_osds = direct_report()
+ osds = {}
+ for osd_id, devices in listed_osds.items():
+ # the metadata for all devices in each OSD will contain
+ # the FSID which is required for activation
+ for device in devices:
+ fsid = device.get('tags', {}).get('ceph.osd_fsid')
+ if fsid:
+ osds[fsid] = osd_id
+ break
+ if not osds:
+ terminal.warning('Was unable to find any OSDs to activate')
+ terminal.warning('Verify OSDs are present with "ceph-volume lvm list"')
+ return
+ for osd_fsid, osd_id in osds.items():
+ if systemctl.osd_is_active(osd_id):
+ terminal.warning(
+ 'OSD ID %s FSID %s process is active. Skipping activation' % (osd_id, osd_fsid)
+ )
+ else:
+'Activating OSD ID %s FSID %s' % (osd_id, osd_fsid))
+ self.activate(args, osd_id=osd_id, osd_fsid=osd_fsid)
+ @decorators.needs_root
+ def activate(self, args, osd_id=None, osd_fsid=None):
+ """
+ :param args: The parsed arguments coming from the CLI
+ :param osd_id: When activating all, this gets populated with an
+ existing OSD ID
+ :param osd_fsid: When activating all, this gets populated with an
+ existing OSD FSID
+ """
+ osd_id = osd_id if osd_id else args.osd_id
+ osd_fsid = osd_fsid if osd_fsid else args.osd_fsid
+ if osd_id and osd_fsid:
+ tags = {'ceph.osd_id': osd_id, 'ceph.osd_fsid': osd_fsid}
+ elif not osd_id and osd_fsid:
+ tags = {'ceph.osd_fsid': osd_fsid}
+ lvs = api.get_lvs(tags=tags)
+ if not lvs:
+ raise RuntimeError('could not find osd.%s with osd_fsid %s' %
+ (osd_id, osd_fsid))
+ # This argument is only available when passed in directly or via
+ # systemd, not when ``create`` is being used
+ if getattr(args, 'auto_detect_objectstore', False):
+'auto detecting objectstore')
+ # may get multiple lvs, so can't do get_the_lvs() calls here
+ for lv in lvs:
+ has_journal = lv.tags.get('ceph.journal_uuid')
+ if has_journal:
+'found a journal associated with the OSD, '
+ 'assuming filestore')
+ return activate_filestore(lvs, args.no_systemd)
+'unable to find a journal associated with the OSD, '
+ 'assuming bluestore')
+ return activate_bluestore(lvs, args.no_systemd)
+ if args.bluestore:
+ activate_bluestore(lvs, args.no_systemd)
+ elif args.filestore:
+ activate_filestore(lvs, args.no_systemd)
+ def main(self):
+ sub_command_help = dedent("""
+ Activate OSDs by discovering them with LVM and mounting them in their
+ appropriate destination:
+ ceph-volume lvm activate {ID} {FSID}
+ The lvs associated with the OSD need to have been prepared previously,
+ so that all needed tags and metadata exist.
+ When migrating OSDs, or a multiple-osd activation is needed, the
+ ``--all`` flag can be used instead of the individual ID and FSID:
+ ceph-volume lvm activate --all
+ """)
+ parser = argparse.ArgumentParser(
+ prog='ceph-volume lvm activate',
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ description=sub_command_help,
+ )
+ parser.add_argument(
+ 'osd_id',
+ metavar='ID',
+ nargs='?',
+ help='The ID of the OSD, usually an integer, like 0'
+ )
+ parser.add_argument(
+ 'osd_fsid',
+ metavar='FSID',
+ nargs='?',
+ help='The FSID of the OSD, similar to a SHA1'
+ )
+ parser.add_argument(
+ '--auto-detect-objectstore',
+ action='store_true',
+ help='Autodetect the objectstore by inspecting the OSD',
+ )
+ parser.add_argument(
+ '--bluestore',
+ action='store_true',
+ help='bluestore objectstore (default)',
+ )
+ parser.add_argument(
+ '--filestore',
+ action='store_true',
+ help='filestore objectstore',
+ )
+ parser.add_argument(
+ '--all',
+ dest='activate_all',
+ action='store_true',
+ help='Activate all OSDs found in the system',
+ )
+ parser.add_argument(
+ '--no-systemd',
+ dest='no_systemd',
+ action='store_true',
+ help='Skip creating and enabling systemd units and starting OSD services',
+ )
+ if len(self.argv) == 0:
+ print(sub_command_help)
+ return
+ args = parser.parse_args(self.argv)
+ # Default to bluestore here since defaulting it in add_argument may
+ # cause both to be True
+ if not args.bluestore and not args.filestore:
+ args.bluestore = True
+ if args.activate_all:
+ self.activate_all(args)
+ else:
+ self.activate(args)
diff --git a/src/ceph-volume/ceph_volume/devices/lvm/ b/src/ceph-volume/ceph_volume/devices/lvm/
new file mode 100644
index 00000000..40c0fea4
--- /dev/null
+++ b/src/ceph-volume/ceph_volume/devices/lvm/
@@ -0,0 +1,628 @@
+import argparse
+from collections import namedtuple
+import json
+import logging
+from textwrap import dedent
+from ceph_volume import terminal, decorators
+from ceph_volume.util import disk, prompt_bool, arg_validators, templates
+from ceph_volume.util import prepare
+from . import common
+from .create import Create
+from .prepare import Prepare
+mlogger = terminal.MultiLogger(__name__)
+logger = logging.getLogger(__name__)
+device_list_template = """
+ * {path: <25} {size: <10} {state}"""
+def device_formatter(devices):
+ lines = []
+ for path, details in devices:
+ lines.append(device_list_template.format(
+ path=path, size=details['human_readable_size'],
+ state='solid' if details['rotational'] == '0' else 'rotational')
+ )
+ return ''.join(lines)
+def ensure_disjoint_device_lists(data, db=[], wal=[], journal=[]):
+ # check that all device lists are disjoint with each other
+ if not all([set(data).isdisjoint(set(db)),
+ set(data).isdisjoint(set(wal)),
+ set(data).isdisjoint(set(journal)),
+ set(db).isdisjoint(set(wal))]):
+ raise Exception('Device lists are not disjoint')
+def separate_devices_from_lvs(devices):
+ phys = []
+ lvm = []
+ for d in devices:
+ phys.append(d) if d.is_device else lvm.append(d)
+ return phys, lvm
+def get_physical_osds(devices, args):
+ '''
+ Goes through passed physical devices and assigns OSDs
+ '''
+ data_slots = args.osds_per_device
+ if args.data_slots:
+ data_slots = max(args.data_slots, args.osds_per_device)
+ rel_data_size = 1.0 / data_slots
+ mlogger.debug('relative data size: {}'.format(rel_data_size))
+ ret = []
+ for dev in devices:
+ if dev.available_lvm:
+ dev_size = dev.vg_size[0]
+ abs_size = disk.Size(b=int(dev_size * rel_data_size))
+ free_size = dev.vg_free[0]
+ for _ in range(args.osds_per_device):
+ if abs_size > free_size:
+ break
+ free_size -= abs_size.b
+ osd_id = None
+ if args.osd_ids:
+ osd_id = args.osd_ids.pop()
+ ret.append(Batch.OSD(dev.path,
+ rel_data_size,
+ abs_size,
+ args.osds_per_device,
+ osd_id,
+ 'dmcrypt' if args.dmcrypt else None))
+ return ret
+def get_lvm_osds(lvs, args):
+ '''
+ Goes through passed LVs and assigns planned osds
+ '''
+ ret = []
+ for lv in lvs:
+ if lv.used_by_ceph:
+ continue
+ osd_id = None
+ if args.osd_ids:
+ osd_id = args.osd_ids.pop()
+ osd = Batch.OSD("{}/{}".format(lv.vg_name, lv.lv_name),
+ 100.0,
+ disk.Size(b=int(lv.lvs[0].lv_size)),
+ 1,
+ osd_id,
+ 'dmcrypt' if args.dmcrypt else None)
+ ret.append(osd)
+ return ret
+def get_physical_fast_allocs(devices, type_, fast_slots_per_device, new_osds, args):
+ requested_slots = getattr(args, '{}_slots'.format(type_))
+ if not requested_slots or requested_slots < fast_slots_per_device:
+ if requested_slots:
+'{}_slots argument is too small, ignoring'.format(type_))
+ requested_slots = fast_slots_per_device
+ requested_size = getattr(args, '{}_size'.format(type_), 0)
+ if requested_size == 0:
+ # no size argument was specified, check ceph.conf
+ get_size_fct = getattr(prepare, 'get_{}_size'.format(type_))
+ requested_size = get_size_fct(lv_format=False)
+ ret = []
+ for dev in devices:
+ if not dev.available_lvm:
+ continue
+ # any LV present is considered a taken slot
+ occupied_slots = len(dev.lvs)
+ # this only looks at the first vg on device, unsure if there is a better
+ # way
+ dev_size = dev.vg_size[0]
+ abs_size = disk.Size(b=int(dev_size / requested_slots))
+ free_size = dev.vg_free[0]
+ relative_size = int(abs_size) / dev_size
+ if requested_size:
+ if requested_size <= abs_size:
+ abs_size = requested_size
+ else:
+ mlogger.error(
+ '{} was requested for {}, but only {} can be fulfilled'.format(
+ requested_size,
+ '{}_size'.format(type_),
+ abs_size,
+ ))
+ exit(1)
+ while abs_size <= free_size and len(ret) < new_osds and occupied_slots < fast_slots_per_device:
+ free_size -= abs_size.b
+ occupied_slots += 1
+ ret.append((dev.path, relative_size, abs_size, requested_slots))
+ return ret
+def get_lvm_fast_allocs(lvs):
+ return [("{}/{}".format(d.vg_name, d.lv_name), 100.0,
+ disk.Size(b=int(d.lvs[0].lv_size)), 1) for d in lvs if not
+ d.used_by_ceph]
+class Batch(object):
+ help = 'Automatically size devices for multi-OSD provisioning with minimal interaction'
+ _help = dedent("""
+ Automatically size devices ready for OSD provisioning based on default strategies.
+ Usage:
+ ceph-volume lvm batch [DEVICE...]
+ Devices can be physical block devices or LVs.
+ Optional reporting on possible outcomes is enabled with --report
+ ceph-volume lvm batch --report [DEVICE...]
+ """)
+ def __init__(self, argv):
+ parser = argparse.ArgumentParser(
+ prog='ceph-volume lvm batch',
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ description=self._help,
+ )
+ parser.add_argument(
+ 'devices',
+ metavar='DEVICES',
+ nargs='*',
+ type=arg_validators.ValidBatchDevice(),
+ default=[],
+ help='Devices to provision OSDs',
+ )
+ parser.add_argument(
+ '--db-devices',
+ nargs='*',
+ type=arg_validators.ValidBatchDevice(),
+ default=[],
+ help='Devices to provision OSDs db volumes',
+ )
+ parser.add_argument(
+ '--wal-devices',
+ nargs='*',
+ type=arg_validators.ValidBatchDevice(),
+ default=[],
+ help='Devices to provision OSDs wal volumes',
+ )
+ parser.add_argument(
+ '--journal-devices',
+ nargs='*',
+ type=arg_validators.ValidBatchDevice(),
+ default=[],
+ help='Devices to provision OSDs journal volumes',
+ )
+ parser.add_argument(
+ '--auto',
+ action='store_true',
+ help=('deploy multi-device OSDs if rotational and non-rotational drives '
+ 'are passed in DEVICES'),
+ default=True
+ )
+ parser.add_argument(
+ '--no-auto',
+ action='store_false',
+ dest='auto',
+ help=('deploy standalone OSDs if rotational and non-rotational drives '
+ 'are passed in DEVICES'),
+ )
+ parser.add_argument(
+ '--bluestore',
+ action='store_true',
+ help='bluestore objectstore (default)',
+ )
+ parser.add_argument(
+ '--filestore',
+ action='store_true',
+ help='filestore objectstore',
+ )
+ parser.add_argument(
+ '--report',
+ action='store_true',
+ help='Only report on OSD that would be created and exit',
+ )
+ parser.add_argument(
+ '--yes',
+ action='store_true',
+ help='Avoid prompting for confirmation when provisioning',
+ )
+ parser.add_argument(
+ '--format',
+ help='output format, defaults to "pretty"',
+ default='pretty',
+ choices=['json', 'json-pretty', 'pretty'],
+ )
+ parser.add_argument(
+ '--dmcrypt',
+ action='store_true',
+ help='Enable device encryption via dm-crypt',
+ )
+ parser.add_argument(
+ '--crush-device-class',
+ dest='crush_device_class',
+ help='Crush device class to assign this OSD to',
+ )
+ parser.add_argument(
+ '--no-systemd',
+ dest='no_systemd',
+ action='store_true',
+ help='Skip creating and enabling systemd units and starting OSD services',
+ )
+ parser.add_argument(
+ '--osds-per-device',
+ type=int,
+ default=1,
+ help='Provision more than 1 (the default) OSD per device',
+ )
+ parser.add_argument(
+ '--data-slots',
+ type=int,
+ help=('Provision more than 1 (the default) OSD slot per device'
+ ' if more slots then osds-per-device are specified, slots'
+ 'will stay unoccupied'),
+ )
+ parser.add_argument(
+ '--block-db-size',
+ type=disk.Size.parse,
+ help='Set (or override) the "bluestore_block_db_size" value, in bytes'
+ )
+ parser.add_argument(
+ '--block-db-slots',
+ type=int,
+ help='Provision slots on DB device, can remain unoccupied'
+ )
+ parser.add_argument(
+ '--block-wal-size',
+ type=disk.Size.parse,
+ help='Set (or override) the "bluestore_block_wal_size" value, in bytes'
+ )
+ parser.add_argument(
+ '--block-wal-slots',
+ type=int,
+ help='Provision slots on WAL device, can remain unoccupied'
+ )
+ def journal_size_in_mb_hack(size):
+ # TODO give user time to adjust, then remove this
+ if size and size[-1].isdigit():
+ mlogger.warning('DEPRECATION NOTICE')
+ mlogger.warning('--journal-size as integer is parsed as megabytes')
+ mlogger.warning('A future release will parse integers as bytes')
+ mlogger.warning('Add a "M" to explicitly pass a megabyte size')
+ size += 'M'
+ return disk.Size.parse(size)
+ parser.add_argument(
+ '--journal-size',
+ type=journal_size_in_mb_hack,
+ help='Override the "osd_journal_size" value, in megabytes'
+ )
+ parser.add_argument(
+ '--journal-slots',
+ type=int,
+ help='Provision slots on journal device, can remain unoccupied'
+ )
+ parser.add_argument(
+ '--prepare',
+ action='store_true',
+ help='Only prepare all OSDs, do not activate',
+ )
+ parser.add_argument(
+ '--osd-ids',
+ nargs='*',
+ default=[],
+ help='Reuse existing OSD ids',
+ )
+ self.args = parser.parse_args(argv)
+ self.parser = parser
+ for dev_list in ['', 'db_', 'wal_', 'journal_']:
+ setattr(self, '{}usable'.format(dev_list), [])
+ def report(self, plan):
+ report = self._create_report(plan)
+ print(report)
+ def _create_report(self, plan):
+ if self.args.format == 'pretty':
+ report = ''
+ report += templates.total_osds.format(total_osds=len(plan))
+ report += templates.osd_component_titles
+ for osd in plan:
+ report += templates.osd_header
+ report +=
+ return report
+ else:
+ json_report = []
+ for osd in plan:
+ json_report.append(osd.report_json())
+ if self.args.format == 'json':
+ return json.dumps(json_report)
+ elif self.args.format == 'json-pretty':
+ return json.dumps(json_report, indent=4,
+ sort_keys=True)
+ def _check_slot_args(self):
+ '''
+ checking if -slots args are consistent with other arguments
+ '''
+ if self.args.data_slots and self.args.osds_per_device:
+ if self.args.data_slots < self.args.osds_per_device:
+ raise ValueError('data_slots is smaller then osds_per_device')
+ def _sort_rotational_disks(self):
+ '''
+ Helper for legacy auto behaviour.
+ Sorts drives into rotating and non-rotating, the latter being used for
+ db or journal.
+ '''
+ mlogger.warning('DEPRECATION NOTICE')
+ mlogger.warning('You are using the legacy automatic disk sorting behavior')
+ mlogger.warning('The Pacific release will change the default to --no-auto')
+ rotating = []
+ ssd = []
+ for d in self.args.devices:
+ rotating.append(d) if d.rotational else ssd.append(d)
+ if ssd and not rotating:
+ # no need for additional sorting, we'll only deploy standalone on ssds
+ return
+ self.args.devices = rotating
+ if self.args.filestore:
+ self.args.journal_devices = ssd
+ else:
+ self.args.db_devices = ssd
+ @decorators.needs_root
+ def main(self):
+ if not self.args.devices:
+ return self.parser.print_help()
+ # Default to bluestore here since defaulting it in add_argument may
+ # cause both to be True
+ if not self.args.bluestore and not self.args.filestore:
+ self.args.bluestore = True
+ if ( and not self.args.db_devices and not
+ self.args.wal_devices and not self.args.journal_devices):
+ self._sort_rotational_disks()
+ self._check_slot_args()
+ ensure_disjoint_device_lists(self.args.devices,
+ self.args.db_devices,
+ self.args.wal_devices,
+ self.args.journal_devices)
+ plan = self.get_plan(self.args)
+ if
+ return 0
+ if not self.args.yes:
+'The above OSDs would be created if the operation continues')
+ if not prompt_bool('do you want to proceed? (yes/no)'):
+ terminal.error('aborting OSD provisioning')
+ raise SystemExit(0)
+ self._execute(plan)
+ def _execute(self, plan):
+ defaults = common.get_default_args()
+ global_args = [
+ 'bluestore',
+ 'filestore',
+ 'dmcrypt',
+ 'crush_device_class',
+ 'no_systemd',
+ ]
+ defaults.update({arg: getattr(self.args, arg) for arg in global_args})
+ for osd in plan:
+ args = osd.get_args(defaults)
+ if self.args.prepare:
+ p = Prepare([])
+ p.safe_prepare(argparse.Namespace(**args))
+ else:
+ c = Create([])
+ c.create(argparse.Namespace(**args))
+ def get_plan(self, args):
+ if args.bluestore:
+ plan = self.get_deployment_layout(args, args.devices, args.db_devices,
+ args.wal_devices)
+ elif args.filestore:
+ plan = self.get_deployment_layout(args, args.devices, args.journal_devices)
+ return plan
+ def get_deployment_layout(self, args, devices, fast_devices=[],
+ very_fast_devices=[]):
+ '''
+ The methods here are mostly just organization, error reporting and
+ setting up of (default) args. The heavy lifting code for the deployment
+ layout can be found in the static get_*_osds and get_*_fast_allocs
+ functions.
+ '''
+ plan = []
+ phys_devs, lvm_devs = separate_devices_from_lvs(devices)
+ mlogger.debug(('passed data devices: {} physical,'
+ ' {} LVM').format(len(phys_devs), len(lvm_devs)))
+ plan.extend(get_physical_osds(phys_devs, args))
+ plan.extend(get_lvm_osds(lvm_devs, args))
+ num_osds = len(plan)
+ if num_osds == 0:
+'All data devices are unavailable')
+ return plan
+ requested_osds = args.osds_per_device * len(phys_devs) + len(lvm_devs)
+ fast_type = 'block_db' if args.bluestore else 'journal'
+ fast_allocations = self.fast_allocations(fast_devices,
+ requested_osds,
+ num_osds,
+ fast_type)
+ if fast_devices and not fast_allocations:
+'{} fast devices were passed, but none are available'.format(len(fast_devices)))
+ return []
+ if fast_devices and not len(fast_allocations) == num_osds:
+ mlogger.error('{} fast allocations != {} num_osds'.format(
+ len(fast_allocations), num_osds))
+ exit(1)
+ very_fast_allocations = self.fast_allocations(very_fast_devices,
+ requested_osds,
+ num_osds,
+ 'block_wal')
+ if very_fast_devices and not very_fast_allocations:
+'{} very fast devices were passed, but none are available'.format(len(very_fast_devices)))
+ return []
+ if very_fast_devices and not len(very_fast_allocations) == num_osds:
+ mlogger.error('{} very fast allocations != {} num_osds'.format(
+ len(very_fast_allocations), num_osds))
+ exit(1)
+ for osd in plan:
+ if fast_devices:
+ osd.add_fast_device(*fast_allocations.pop(),
+ type_=fast_type)
+ if very_fast_devices and args.bluestore:
+ osd.add_very_fast_device(*very_fast_allocations.pop())
+ return plan
+ def fast_allocations(self, devices, requested_osds, new_osds, type_):
+ ret = []
+ if not devices:
+ return ret
+ phys_devs, lvm_devs = separate_devices_from_lvs(devices)
+ mlogger.debug(('passed {} devices: {} physical,'
+ ' {} LVM').format(type_, len(phys_devs), len(lvm_devs)))
+ ret.extend(get_lvm_fast_allocs(lvm_devs))
+ # fill up uneven distributions across fast devices: 5 osds and 2 fast
+ # devices? create 3 slots on each device rather then deploying
+ # heterogeneous osds
+ if (requested_osds - len(lvm_devs)) % len(phys_devs):
+ fast_slots_per_device = int((requested_osds - len(lvm_devs)) / len(phys_devs)) + 1
+ else:
+ fast_slots_per_device = int((requested_osds - len(lvm_devs)) / len(phys_devs))
+ ret.extend(get_physical_fast_allocs(phys_devs,
+ type_,
+ fast_slots_per_device,
+ new_osds,
+ self.args))
+ return ret
+ class OSD(object):
+ '''
+ This class simply stores info about to-be-deployed OSDs and provides an
+ easy way to retrieve the necessary create arguments.
+ '''
+ VolSpec = namedtuple('VolSpec',
+ ['path',
+ 'rel_size',
+ 'abs_size',
+ 'slots',
+ 'type_'])
+ def __init__(self,
+ data_path,
+ rel_size,
+ abs_size,
+ slots,
+ id_,
+ encryption):
+ self.id_ = id_
+ = self.VolSpec(path=data_path,
+ rel_size=rel_size,
+ abs_size=abs_size,
+ slots=slots,
+ type_='data')
+ = None
+ self.very_fast = None
+ self.encryption = encryption
+ def add_fast_device(self, path, rel_size, abs_size, slots, type_):
+ = self.VolSpec(path=path,
+ rel_size=rel_size,
+ abs_size=abs_size,
+ slots=slots,
+ type_=type_)
+ def add_very_fast_device(self, path, rel_size, abs_size, slots):
+ self.very_fast = self.VolSpec(path=path,
+ rel_size=rel_size,
+ abs_size=abs_size,
+ slots=slots,
+ type_='block_wal')
+ def _get_osd_plan(self):
+ plan = {
+ 'data':,
+ 'data_size':,
+ 'encryption': self.encryption,
+ }
+ if
+ type_ ='.', '_')
+ plan.update(
+ {
+ type_:,
+ '{}_size'.format(type_):,
+ })
+ if self.very_fast:
+ plan.update(
+ {
+ 'block_wal': self.very_fast.path,
+ 'block_wal_size': self.very_fast.abs_size,
+ })
+ if self.id_:
+ plan.update({'osd_id': self.id_})
+ return plan
+ def get_args(self, defaults):
+ my_defaults = defaults.copy()
+ my_defaults.update(self._get_osd_plan())
+ return my_defaults
+ def report(self):
+ report = ''
+ if self.id_:
+ report += templates.osd_reused_id.format(
+ id_=self.id_)
+ if self.encryption:
+ report += templates.osd_encryption.format(
+ enc=self.encryption)
+ report += templates.osd_component.format(
+ if
+ report += templates.osd_component.format(
+ if self.very_fast:
+ report += templates.osd_component.format(
+ _type=self.very_fast.type_,
+ path=self.very_fast.path,
+ size=self.very_fast.abs_size,
+ percent=self.very_fast.rel_size)
+ return report
+ def report_json(self):
+ # cast all values to string so that the report can be dumped in to
+ # json.dumps
+ return {k: str(v) for k, v in self._get_osd_plan().items()}
diff --git a/src/ceph-volume/ceph_volume/devices/lvm/ b/src/ceph-volume/ceph_volume/devices/lvm/
new file mode 100644
index 00000000..06369e47
--- /dev/null
+++ b/src/ceph-volume/ceph_volume/devices/lvm/
@@ -0,0 +1,184 @@
+from ceph_volume.util import arg_validators, disk
+from ceph_volume import process, conf
+from ceph_volume import terminal
+import argparse
+def rollback_osd(args, osd_id=None):
+ """
+ When the process of creating or preparing fails, the OSD needs to be
+ destroyed so that the ID cane be reused. This is prevents leaving the ID
+ around as "used" on the monitor, which can cause confusion if expecting
+ sequential OSD IDs.
+ The usage of `destroy-new` allows this to be done without requiring the
+ admin keyring (otherwise needed for destroy and purge commands)
+ """
+ if not osd_id:
+ # it means that it wasn't generated, so there is nothing to rollback here
+ return
+ # once here, this is an error condition that needs to be rolled back
+ terminal.error('Was unable to complete a new OSD, will rollback changes')
+ osd_name = 'osd.%s'
+ bootstrap_keyring = '/var/lib/ceph/bootstrap-osd/%s.keyring' % conf.cluster
+ cmd = [
+ 'ceph',
+ '--cluster', conf.cluster,
+ '--name', 'client.bootstrap-osd',
+ '--keyring', bootstrap_keyring,
+ 'osd', 'purge-new', osd_name % osd_id,
+ '--yes-i-really-mean-it',
+ ]
+common_args = {
+ '--data': {
+ 'help': 'OSD data path. A physical device or logical volume',
+ 'required': True,
+ 'type': arg_validators.ValidDevice(as_string=True),
+ #'default':,
+ #'type':,
+ },
+ '--data-size': {
+ 'help': 'Size of data LV in case a device was passed in --data',
+ 'default': '0',
+ 'type': disk.Size.parse
+ },
+ '--data-slots': {
+ 'help': ('Intended number of slots on data device. The new OSD gets one'
+ 'of those slots or 1/nth of the available capacity'),
+ 'type': int,
+ 'default': 1,
+ },
+ '--osd-id': {
+ 'help': 'Reuse an existing OSD id',
+ 'default': None,
+ },
+ '--osd-fsid': {
+ 'help': 'Reuse an existing OSD fsid',
+ 'default': None,
+ },
+ '--cluster-fsid': {
+ 'help': 'Specify the cluster fsid, useful when no ceph.conf is available',
+ 'default': None,
+ },
+ '--crush-device-class': {
+ 'dest': 'crush_device_class',
+ 'help': 'Crush device class to assign this OSD to',
+ 'default': None,
+ },
+ '--dmcrypt': {
+ 'action': 'store_true',
+ 'help': 'Enable device encryption via dm-crypt',
+ },
+ '--no-systemd': {
+ 'dest': 'no_systemd',
+ 'action': 'store_true',
+ 'help': 'Skip creating and enabling systemd units and starting OSD services when activating',
+ },
+bluestore_args = {
+ '--bluestore': {
+ 'action': 'store_true',
+ 'help': 'Use the bluestore objectstore',
+ },
+ '--block.db': {
+ 'dest': 'block_db',
+ 'help': 'Path to bluestore block.db logical volume or device',
+ },
+ '--block.db-size': {
+ 'dest': 'block_db_size',
+ 'help': 'Size of block.db LV in case device was passed in --block.db',
+ 'default': '0',
+ 'type': disk.Size.parse
+ },
+ '--block.db-slots': {
+ 'dest': 'block_db_slots',
+ 'help': ('Intended number of slots on db device. The new OSD gets one'
+ 'of those slots or 1/nth of the available capacity'),
+ 'type': int,
+ 'default': 1,
+ },
+ '--block.wal': {
+ 'dest': 'block_wal',
+ 'help': 'Path to bluestore block.wal logical volume or device',
+ },
+ '--block.wal-size': {
+ 'dest': 'block_wal_size',
+ 'help': 'Size of block.wal LV in case device was passed in --block.wal',
+ 'default': '0',
+ 'type': disk.Size.parse
+ },
+ '--block.wal-slots': {
+ 'dest': 'block_wal_slots',
+ 'help': ('Intended number of slots on wal device. The new OSD gets one'
+ 'of those slots or 1/nth of the available capacity'),
+ 'type': int,
+ 'default': 1,
+ },
+filestore_args = {
+ '--filestore': {
+ 'action': 'store_true',
+ 'help': 'Use the filestore objectstore',
+ },
+ '--journal': {
+ 'help': 'A logical volume (vg_name/lv_name), or path to a device',
+ },
+ '--journal-size': {
+ 'help': 'Size of journal LV in case a raw block device was passed in --journal',
+ 'default': '0',
+ 'type': disk.Size.parse
+ },
+ '--journal-slots': {
+ 'help': ('Intended number of slots on journal device. The new OSD gets one'
+ 'of those slots or 1/nth of the available capacity'),
+ 'type': int,
+ 'default': 1,
+ },
+def get_default_args():
+ defaults = {}
+ def format_name(name):
+ return name.strip('-').replace('-', '_').replace('.', '_')
+ for argset in (common_args, filestore_args, bluestore_args):
+ defaults.update({format_name(name): val.get('default', None) for name, val in argset.items()})
+ return defaults
+def common_parser(prog, description):
+ """
+ Both prepare and create share the same parser, those are defined here to
+ avoid duplication
+ """
+ parser = argparse.ArgumentParser(
+ prog=prog,
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ description=description,
+ )
+ filestore_group = parser.add_argument_group('filestore')
+ bluestore_group = parser.add_argument_group('bluestore')
+ for name, kwargs in common_args.items():
+ parser.add_argument(name, **kwargs)
+ for name, kwargs in bluestore_args.items():
+ bluestore_group.add_argument(name, **kwargs)
+ for name, kwargs in filestore_args.items():
+ filestore_group.add_argument(name, **kwargs)
+ # Do not parse args, so that consumers can do something before the args get
+ # parsed triggering argparse behavior
+ return parser
+create_parser = common_parser # noqa
+prepare_parser = common_parser # noqa
diff --git a/src/ceph-volume/ceph_volume/devices/lvm/ b/src/ceph-volume/ceph_volume/devices/lvm/
new file mode 100644
index 00000000..af2cd96c
--- /dev/null
+++ b/src/ceph-volume/ceph_volume/devices/lvm/
@@ -0,0 +1,77 @@
+from __future__ import print_function
+from textwrap import dedent
+import logging
+from ceph_volume.util import system
+from ceph_volume.util.arg_validators import exclude_group_options
+from ceph_volume import decorators, terminal
+from .common import create_parser, rollback_osd
+from .prepare import Prepare
+from .activate import Activate
+logger = logging.getLogger(__name__)
+class Create(object):
+ help = 'Create a new OSD from an LVM device'
+ def __init__(self, argv):
+ self.argv = argv
+ @decorators.needs_root
+ def create(self, args):
+ if not args.osd_fsid:
+ args.osd_fsid = system.generate_uuid()
+ prepare_step = Prepare([])
+ prepare_step.safe_prepare(args)
+ osd_id = prepare_step.osd_id
+ try:
+ # we try this for activate only when 'creating' an OSD, because a rollback should not
+ # happen when doing normal activation. For example when starting an OSD, systemd will call
+ # activate, which would never need to be rolled back.
+ Activate([]).activate(args)
+ except Exception:
+ logger.exception('lvm activate was unable to complete, while creating the OSD')
+'will rollback OSD ID creation')
+ rollback_osd(args, osd_id)
+ raise
+ terminal.success("ceph-volume lvm create successful for: %s" %
+ def main(self):
+ sub_command_help = dedent("""
+ Create an OSD by assigning an ID and FSID, registering them with the
+ cluster with an ID and FSID, formatting and mounting the volume, adding
+ all the metadata to the logical volumes using LVM tags, and starting
+ the OSD daemon. This is a convenience command that combines the prepare
+ and activate steps.
+ Encryption is supported via dmcrypt and the --dmcrypt flag.
+ Existing logical volume (lv):
+ ceph-volume lvm create --data {vg/lv}
+ Existing block device (a logical volume will be created):
+ ceph-volume lvm create --data /path/to/device
+ Optionally, can consume db and wal block devices, partitions or logical
+ volumes. A device will get a logical volume, partitions and existing
+ logical volumes will be used as is:
+ ceph-volume lvm create --data {vg/lv} --block.wal {partition} --block.db {/path/to/device}
+ """)
+ parser = create_parser(
+ prog='ceph-volume lvm create',
+ description=sub_command_help,
+ )
+ if len(self.argv) == 0:
+ print(sub_command_help)
+ return
+ exclude_group_options(parser, groups=['filestore', 'bluestore'], argv=self.argv)
+ args = parser.parse_args(self.argv)
+ # Default to bluestore here since defaulting it in add_argument may
+ # cause both to be True
+ if not args.bluestore and not args.filestore:
+ args.bluestore = True
+ self.create(args)
diff --git a/src/ceph-volume/ceph_volume/devices/lvm/ b/src/ceph-volume/ceph_volume/devices/lvm/
new file mode 100644
index 00000000..5de6dbe3
--- /dev/null
+++ b/src/ceph-volume/ceph_volume/devices/lvm/
@@ -0,0 +1,90 @@
+import argparse
+import logging
+import sys
+from textwrap import dedent
+from ceph_volume import conf
+from ceph_volume.util import encryption, system
+from ceph_volume.api.lvm import get_lvs_by_tag
+logger = logging.getLogger(__name__)
+def deactivate_osd(osd_id=None, osd_uuid=None):
+ lvs = []
+ if osd_uuid is not None:
+ lvs = get_lvs_by_tag('ceph.osd_fsid={}'.format(osd_uuid))
+ osd_id = next(lv.tags['ceph.osd_id'] for lv in lvs)
+ else:
+ lvs = get_lvs_by_tag('ceph.osd_id={}'.format(osd_id))
+ data_lv = next(lv for lv in lvs if lv.tags['ceph.type'] in ['data', 'block'])
+ conf.cluster = data_lv.tags['ceph.cluster_name']
+ logger.debug('Found cluster name {}'.format(conf.cluster))
+ tmpfs_path = '/var/lib/ceph/osd/{}-{}'.format(conf.cluster, osd_id)
+ system.unmount_tmpfs(tmpfs_path)
+ for lv in lvs:
+ if lv.tags.get('ceph.encrypted', '0') == '1':
+ encryption.dmcrypt_close(lv.lv_uuid)
+class Deactivate(object):
+ help = 'Deactivate OSDs'
+ def deactivate(self, args=None):
+ if args:
+ self.args = args
+ try:
+ deactivate_osd(self.args.osd_id, self.args.osd_uuid)
+ except StopIteration:
+ logger.error(('No data or block LV found for OSD'
+ '{}').format(self.args.osd_id))
+ sys.exit(1)
+ def __init__(self, argv):
+ self.argv = argv
+ def main(self):
+ sub_command_help = dedent("""
+ Deactivate unmounts and OSDs tmpfs and closes any crypt devices.
+ ceph-volume lvm deactivate {ID} {FSID}
+ To deactivate all volumes use the --all flag.
+ ceph-volume lvm deactivate --all
+ """)
+ parser = argparse.ArgumentParser(
+ prog='ceph-volume lvm deactivate',
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ description=sub_command_help,
+ )
+ parser.add_argument(
+ 'osd_id',
+ nargs='?',
+ help='The ID of the OSD'
+ )
+ parser.add_argument(
+ 'osd_uuid',
+ nargs='?',
+ help='The UUID of the OSD, similar to a SHA1, takes precedence over osd_id'
+ )
+ # parser.add_argument(
+ # '--all',
+ # action='store_true',
+ # help='Deactivate all OSD volumes found in the system',
+ # )
+ if len(self.argv) == 0:
+ print(sub_command_help)
+ return
+ args = parser.parse_args(self.argv)
+ # Default to bluestore here since defaulting it in add_argument may
+ # cause both to be True
+ if not args.osd_id and not args.osd_uuid:
+ raise ValueError(('Can not identify OSD, pass either all or'
+ 'osd_id or osd_uuid'))
+ self.deactivate(args)
diff --git a/src/ceph-volume/ceph_volume/devices/lvm/ b/src/ceph-volume/ceph_volume/devices/lvm/
new file mode 100644
index 00000000..1ae8489f
--- /dev/null
+++ b/src/ceph-volume/ceph_volume/devices/lvm/
@@ -0,0 +1,223 @@
+from __future__ import print_function
+import argparse
+import json
+import logging
+import os.path
+from textwrap import dedent
+from ceph_volume import decorators
+from ceph_volume.api import lvm as api
+logger = logging.getLogger(__name__)
+osd_list_header_template = """\n
+osd_device_header_template = """
+ {type: <13} {path}
+device_metadata_item_template = """
+ {tag_name: <25} {value}"""
+def readable_tag(tag):
+ actual_name = tag.split('.')[-1]
+ return actual_name.replace('_', ' ')
+def pretty_report(report):
+ output = []
+ for osd_id, devices in sorted(report.items()):
+ output.append(
+ osd_list_header_template.format(osd_id=" osd.%s " % osd_id)
+ )
+ for device in devices:
+ output.append(
+ osd_device_header_template.format(
+ type='[%s]' % device['type'],
+ path=device['path']
+ )
+ )
+ for tag_name, value in sorted(device.get('tags', {}).items()):
+ output.append(
+ device_metadata_item_template.format(
+ tag_name=readable_tag(tag_name),
+ value=value
+ )
+ )
+ if not device.get('devices'):
+ continue
+ else:
+ output.append(
+ device_metadata_item_template.format(
+ tag_name='devices',
+ value=','.join(device['devices'])
+ )
+ )
+ print(''.join(output))
+def direct_report():
+ """
+ Other non-cli consumers of listing information will want to consume the
+ report without the need to parse arguments or other flags. This helper
+ bypasses the need to deal with the class interface which is meant for cli
+ handling.
+ """
+ return List([]).full_report()
+# TODO: Perhaps, get rid of this class and simplify this module further?
+class List(object):
+ help = 'list logical volumes and devices associated with Ceph'
+ def __init__(self, argv):
+ self.argv = argv
+ @decorators.needs_root
+ def list(self, args):
+ report = self.single_report(args.device) if args.device else \
+ self.full_report()
+ if args.format == 'json':
+ # If the report is empty, we don't return a non-zero exit status
+ # because it is assumed this is going to be consumed by automated
+ # systems like ceph-ansible which would be forced to ignore the
+ # non-zero exit status if all they need is the information in the
+ # JSON object
+ print(json.dumps(report, indent=4, sort_keys=True))
+ else:
+ if not report:
+ raise SystemExit('No valid Ceph lvm devices found')
+ pretty_report(report)
+ def create_report(self, lvs):
+ """
+ Create a report for LVM dev(s) passed. Returns '{}' to denote failure.
+ """
+ report = {}
+ for lv in lvs:
+ if not api.is_ceph_device(lv):
+ continue
+ osd_id = lv.tags['ceph.osd_id']
+ report.setdefault(osd_id, [])
+ lv_report = lv.as_dict()
+ pvs = api.get_pvs(filters={'lv_uuid': lv.lv_uuid})
+ lv_report['devices'] = [ for pv in pvs] if pvs else []
+ report[osd_id].append(lv_report)
+ phys_devs = self.create_report_non_lv_device(lv)
+ if phys_devs:
+ report[osd_id].append(phys_devs)
+ return report
+ def create_report_non_lv_device(self, lv):
+ report = {}
+ if lv.tags.get('ceph.type', '') in ['data', 'block']:
+ for dev_type in ['journal', 'wal', 'db']:
+ dev = lv.tags.get('ceph.{}_device'.format(dev_type), '')
+ # counting / in the device name seems brittle but should work,
+ # lvs will have 3
+ if dev and dev.count('/') == 2:
+ device_uuid = lv.tags.get('ceph.{}_uuid'.format(dev_type))
+ report = {'tags': {'PARTUUID': device_uuid},
+ 'type': dev_type,
+ 'path': dev}
+ return report
+ def full_report(self):
+ """
+ Create a report of all Ceph LVs. Returns '{}' to denote failure.
+ """
+ return self.create_report(api.get_lvs())
+ def single_report(self, device):
+ """
+ Generate a report for a single device. This can be either a logical
+ volume in the form of vg/lv or a device with an absolute path like
+ /dev/sda1 or /dev/sda. Returns '{}' to denote failure.
+ """
+ lvs = []
+ if os.path.isabs(device):
+ # we have a block device
+ lvs = api.get_device_lvs(device)
+ if not lvs:
+ # maybe this was a LV path /dev/vg_name/lv_name or /dev/mapper/
+ lvs = api.get_lvs(filters={'path': device})
+ else:
+ # vg_name/lv_name was passed
+ vg_name, lv_name = device.split('/')
+ lvs = api.get_lvs(filters={'lv_name': lv_name,
+ 'vg_name': vg_name})
+ report = self.create_report(lvs)
+ if not report:
+ # check if device is a non-lvm journals or wal/db
+ for dev_type in ['journal', 'wal', 'db']:
+ lvs = api.get_lvs(tags={
+ 'ceph.{}_device'.format(dev_type): device})
+ if lvs:
+ # just taking the first lv here should work
+ lv = lvs[0]
+ phys_dev = self.create_report_non_lv_device(lv)
+ osd_id = lv.tags.get('ceph.osd_id')
+ if osd_id:
+ report[osd_id] = [phys_dev]
+ return report
+ def main(self):
+ sub_command_help = dedent("""
+ List devices or logical volumes associated with Ceph. An association is
+ determined if a device has information relating to an OSD. This is
+ verified by querying LVM's metadata and correlating it with devices.
+ The lvs associated with the OSD need to have been prepared previously,
+ so that all needed tags and metadata exist.
+ Full listing of all system devices associated with a cluster::
+ ceph-volume lvm list
+ List a particular device, reporting all metadata about it::
+ ceph-volume lvm list /dev/sda1
+ List a logical volume, along with all its metadata (vg is a volume
+ group, and lv the logical volume name)::
+ ceph-volume lvm list {vg/lv}
+ """)
+ parser = argparse.ArgumentParser(
+ prog='ceph-volume lvm list',
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ description=sub_command_help,
+ )
+ parser.add_argument(
+ 'device',
+ metavar='DEVICE',
+ nargs='?',
+ help='Path to an lv (as vg/lv) or to a device like /dev/sda1'
+ )
+ parser.add_argument(
+ '--format',
+ help='output format, defaults to "pretty"',
+ default='pretty',
+ choices=['json', 'pretty'],
+ )
+ args = parser.parse_args(self.argv)
+ self.list(args)
diff --git a/src/ceph-volume/ceph_volume/devices/lvm/ b/src/ceph-volume/ceph_volume/devices/lvm/
new file mode 100644
index 00000000..3ef3c111
--- /dev/null
+++ b/src/ceph-volume/ceph_volume/devices/lvm/
@@ -0,0 +1,50 @@
+import argparse
+from textwrap import dedent
+from ceph_volume import terminal
+from . import activate
+from . import deactivate
+from . import prepare
+from . import create
+from . import trigger
+from . import listing
+from . import zap
+from . import batch
+class LVM(object):
+ help = 'Use LVM and LVM-based technologies to deploy OSDs'
+ _help = dedent("""
+ Use LVM and LVM-based technologies to deploy OSDs
+ {sub_help}
+ """)
+ mapper = {
+ 'activate': activate.Activate,
+ 'deactivate': deactivate.Deactivate,
+ 'batch': batch.Batch,
+ 'prepare': prepare.Prepare,
+ 'create': create.Create,
+ 'trigger': trigger.Trigger,
+ 'list': listing.List,
+ 'zap': zap.Zap,
+ }
+ def __init__(self, argv):
+ self.argv = argv
+ def print_help(self, sub_help):
+ return self._help.format(sub_help=sub_help)
+ def main(self):
+ terminal.dispatch(self.mapper, self.argv)
+ parser = argparse.ArgumentParser(
+ prog='ceph-volume lvm',
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ description=self.print_help(terminal.subhelp(self.mapper)),
+ )
+ parser.parse_args(self.argv)
+ if len(self.argv) <= 1:
+ return parser.print_help()
diff --git a/src/ceph-volume/ceph_volume/devices/lvm/ b/src/ceph-volume/ceph_volume/devices/lvm/
new file mode 100644
index 00000000..f0c3959a
--- /dev/null
+++ b/src/ceph-volume/ceph_volume/devices/lvm/
@@ -0,0 +1,441 @@
+from __future__ import print_function
+import json
+import logging
+from textwrap import dedent
+from ceph_volume.util import prepare as prepare_utils
+from ceph_volume.util import encryption as encryption_utils
+from ceph_volume.util import system, disk
+from ceph_volume.util.arg_validators import exclude_group_options
+from ceph_volume import conf, decorators, terminal
+from ceph_volume.api import lvm as api
+from .common import prepare_parser, rollback_osd
+logger = logging.getLogger(__name__)
+def prepare_dmcrypt(key, device, device_type, tags):
+ """
+ Helper for devices that are encrypted. The operations needed for
+ block, db, wal, or data/journal devices are all the same
+ """
+ if not device:
+ return ''
+ tag_name = 'ceph.%s_uuid' % device_type
+ uuid = tags[tag_name]
+ # format data device
+ encryption_utils.luks_format(
+ key,
+ device
+ )
+ encryption_utils.luks_open(
+ key,
+ device,
+ uuid
+ )
+ return '/dev/mapper/%s' % uuid
+def prepare_filestore(device, journal, secrets, tags, osd_id, fsid):
+ """
+ :param device: The name of the logical volume to work with
+ :param journal: similar to device but can also be a regular/plain disk
+ :param secrets: A dict with the secrets needed to create the osd (e.g. cephx)
+ :param id_: The OSD id
+ :param fsid: The OSD fsid, also known as the OSD UUID
+ """
+ cephx_secret = secrets.get('cephx_secret', prepare_utils.create_key())
+ # encryption-only operations
+ if secrets.get('dmcrypt_key'):
+ # format and open ('decrypt' devices) and re-assign the device and journal
+ # variables so that the rest of the process can use the mapper paths
+ key = secrets['dmcrypt_key']
+ device = prepare_dmcrypt(key, device, 'data', tags)
+ journal = prepare_dmcrypt(key, journal, 'journal', tags)
+ # vdo detection
+ is_vdo = api.is_vdo(device)
+ # create the directory
+ prepare_utils.create_osd_path(osd_id)
+ # format the device
+ prepare_utils.format_device(device)
+ # mount the data device
+ prepare_utils.mount_osd(device, osd_id, is_vdo=is_vdo)
+ # symlink the journal
+ prepare_utils.link_journal(journal, osd_id)
+ # get the latest monmap
+ prepare_utils.get_monmap(osd_id)
+ # prepare the osd filesystem
+ prepare_utils.osd_mkfs_filestore(osd_id, fsid, cephx_secret)
+ # write the OSD keyring if it doesn't exist already
+ prepare_utils.write_keyring(osd_id, cephx_secret)
+ if secrets.get('dmcrypt_key'):
+ # if the device is going to get activated right away, this can be done
+ # here, otherwise it will be recreated
+ encryption_utils.write_lockbox_keyring(
+ osd_id,
+ fsid,
+ tags['ceph.cephx_lockbox_secret']
+ )
+def prepare_bluestore(block, wal, db, secrets, tags, osd_id, fsid):
+ """
+ :param block: The name of the logical volume for the bluestore data
+ :param wal: a regular/plain disk or logical volume, to be used for block.wal
+ :param db: a regular/plain disk or logical volume, to be used for block.db
+ :param secrets: A dict with the secrets needed to create the osd (e.g. cephx)
+ :param id_: The OSD id
+ :param fsid: The OSD fsid, also known as the OSD UUID
+ """
+ cephx_secret = secrets.get('cephx_secret', prepare_utils.create_key())
+ # encryption-only operations
+ if secrets.get('dmcrypt_key'):
+ # If encrypted, there is no need to create the lockbox keyring file because
+ # bluestore re-creates the files and does not have support for other files
+ # like the custom lockbox one. This will need to be done on activation.
+ # format and open ('decrypt' devices) and re-assign the device and journal
+ # variables so that the rest of the process can use the mapper paths
+ key = secrets['dmcrypt_key']
+ block = prepare_dmcrypt(key, block, 'block', tags)
+ wal = prepare_dmcrypt(key, wal, 'wal', tags)
+ db = prepare_dmcrypt(key, db, 'db', tags)
+ # create the directory
+ prepare_utils.create_osd_path(osd_id, tmpfs=True)
+ # symlink the block
+ prepare_utils.link_block(block, osd_id)
+ # get the latest monmap
+ prepare_utils.get_monmap(osd_id)
+ # write the OSD keyring if it doesn't exist already
+ prepare_utils.write_keyring(osd_id, cephx_secret)
+ # prepare the osd filesystem
+ prepare_utils.osd_mkfs_bluestore(
+ osd_id, fsid,
+ keyring=cephx_secret,
+ wal=wal,
+ db=db
+ )
+class Prepare(object):
+ help = 'Format an LVM device and associate it with an OSD'
+ def __init__(self, argv):
+ self.argv = argv
+ self.osd_id = None
+ def get_ptuuid(self, argument):
+ uuid = disk.get_partuuid(argument)
+ if not uuid:
+ terminal.error('blkid could not detect a PARTUUID for device: %s' % argument)
+ raise RuntimeError('unable to use device')
+ return uuid
+ def setup_device(self, device_type, device_name, tags, size, slots):
+ """
+ Check if ``device`` is an lv, if so, set the tags, making sure to
+ update the tags with the lv_uuid and lv_path which the incoming tags
+ will not have.
+ If the device is not a logical volume, then retrieve the partition UUID
+ by querying ``blkid``
+ """
+ if device_name is None:
+ return '', '', tags
+ tags['ceph.type'] = device_type
+ tags['ceph.vdo'] = api.is_vdo(device_name)
+ try:
+ vg_name, lv_name = device_name.split('/')
+ lv = api.get_first_lv(filters={'lv_name': lv_name,
+ 'vg_name': vg_name})
+ except ValueError:
+ lv = None
+ if lv:
+ lv_uuid = lv.lv_uuid
+ path = lv.lv_path
+ tags['ceph.%s_uuid' % device_type] = lv_uuid
+ tags['ceph.%s_device' % device_type] = path
+ lv.set_tags(tags)
+ elif disk.is_device(device_name):
+ # We got a disk, create an lv
+ lv_type = "osd-{}".format(device_type)
+ name_uuid = system.generate_uuid()
+ kwargs = {
+ 'device': device_name,
+ 'tags': tags,
+ 'slots': slots
+ }
+ #TODO use get_block_db_size and co here to get configured size in
+ #conf file
+ if size != 0:
+ kwargs['size'] = size
+ lv = api.create_lv(
+ lv_type,
+ name_uuid,
+ **kwargs)
+ path = lv.lv_path
+ tags['ceph.{}_device'.format(device_type)] = path
+ tags['ceph.{}_uuid'.format(device_type)] = lv.lv_uuid
+ lv_uuid = lv.lv_uuid
+ lv.set_tags(tags)
+ else:
+ # otherwise assume this is a regular disk partition
+ name_uuid = self.get_ptuuid(device_name)
+ path = device_name
+ tags['ceph.%s_uuid' % device_type] = name_uuid
+ tags['ceph.%s_device' % device_type] = path
+ lv_uuid = name_uuid
+ return path, lv_uuid, tags
+ def prepare_data_device(self, device_type, osd_uuid):
+ """
+ Check if ``arg`` is a device or partition to create an LV out of it
+ with a distinct volume group name, assigning LV tags on it and
+ ultimately, returning the logical volume object. Failing to detect
+ a device or partition will result in error.
+ :param arg: The value of ``--data`` when parsing args
+ :param device_type: Usually, either ``data`` or ``block`` (filestore vs. bluestore)
+ :param osd_uuid: The OSD uuid
+ """
+ device =
+ if disk.is_partition(device) or disk.is_device(device):
+ # we must create a vg, and then a single lv
+ lv_name_prefix = "osd-{}".format(device_type)
+ kwargs = {'device': device,
+ 'tags': {'ceph.type': device_type},
+ 'slots': self.args.data_slots,
+ }
+ logger.debug('data device size: {}'.format(self.args.data_size))
+ if self.args.data_size != 0:
+ kwargs['size'] = self.args.data_size
+ return api.create_lv(
+ lv_name_prefix,
+ osd_uuid,
+ **kwargs)
+ else:
+ error = [
+ 'Cannot use device ({}).'.format(device),
+ 'A vg/lv path or an existing device is needed']
+ raise RuntimeError(' '.join(error))
+ raise RuntimeError('no data logical volume found with: {}'.format(device))
+ def safe_prepare(self, args=None):
+ """
+ An intermediate step between `main()` and `prepare()` so that we can
+ capture the `self.osd_id` in case we need to rollback
+ :param args: Injected args, usually from `lvm create` which compounds
+ both `prepare` and `create`
+ """
+ if args is not None:
+ self.args = args
+ try:
+ vgname, lvname ='/')
+ lv = api.get_first_lv(filters={'lv_name': lvname,
+ 'vg_name': vgname})
+ except ValueError:
+ lv = None
+ if api.is_ceph_device(lv):
+"device {} is already used".format(
+ raise RuntimeError("skipping {}, it is already prepared".format(
+ try:
+ self.prepare()
+ except Exception:
+ logger.exception('lvm prepare was unable to complete')
+'will rollback OSD ID creation')
+ rollback_osd(self.args, self.osd_id)
+ raise
+ terminal.success("ceph-volume lvm prepare successful for: %s" %
+ def get_cluster_fsid(self):
+ """
+ Allows using --cluster-fsid as an argument, but can fallback to reading
+ from ceph.conf if that is unset (the default behavior).
+ """
+ if self.args.cluster_fsid:
+ return self.args.cluster_fsid
+ else:
+ return conf.ceph.get('global', 'fsid')
+ @decorators.needs_root
+ def prepare(self):
+ # FIXME we don't allow re-using a keyring, we always generate one for the
+ # OSD, this needs to be fixed. This could either be a file (!) or a string
+ # (!!) or some flags that we would need to compound into a dict so that we
+ # can convert to JSON (!!!)
+ secrets = {'cephx_secret': prepare_utils.create_key()}
+ cephx_lockbox_secret = ''
+ encrypted = 1 if self.args.dmcrypt else 0
+ cephx_lockbox_secret = '' if not encrypted else prepare_utils.create_key()
+ if encrypted:
+ secrets['dmcrypt_key'] = encryption_utils.create_dmcrypt_key()
+ secrets['cephx_lockbox_secret'] = cephx_lockbox_secret
+ cluster_fsid = self.get_cluster_fsid()
+ osd_fsid = self.args.osd_fsid or system.generate_uuid()
+ crush_device_class = self.args.crush_device_class
+ if crush_device_class:
+ secrets['crush_device_class'] = crush_device_class
+ # reuse a given ID if it exists, otherwise create a new ID
+ self.osd_id = prepare_utils.create_id(osd_fsid, json.dumps(secrets), osd_id=self.args.osd_id)
+ tags = {
+ 'ceph.osd_fsid': osd_fsid,
+ 'ceph.osd_id': self.osd_id,
+ 'ceph.cluster_fsid': cluster_fsid,
+ 'ceph.cluster_name': conf.cluster,
+ 'ceph.crush_device_class': crush_device_class,
+ 'ceph.osdspec_affinity': prepare_utils.get_osdspec_affinity()
+ }
+ if self.args.filestore:
+ if not self.args.journal:
+'no journal was specifed, creating journal lv '
+ 'on {}').format(
+ self.args.journal =
+ self.args.journal_size = disk.Size(g=5)
+ # need to adjust data size/slots for colocated journal
+ if self.args.data_size:
+ self.args.data_size -= self.args.journal_size
+ if self.args.data_slots == 1:
+ self.args.data_slots = 0
+ else:
+ raise RuntimeError('Can\'t handle multiple filestore OSDs '
+ 'with colocated journals yet. Please '
+ 'create journal LVs manually')
+ tags['ceph.cephx_lockbox_secret'] = cephx_lockbox_secret
+ tags['ceph.encrypted'] = encrypted
+ journal_device, journal_uuid, tags = self.setup_device(
+ 'journal',
+ self.args.journal,
+ tags,
+ self.args.journal_size,
+ self.args.journal_slots)
+ try:
+ vg_name, lv_name ='/')
+ data_lv = api.get_first_lv(filters={'lv_name': lv_name,
+ 'vg_name': vg_name})
+ except ValueError:
+ data_lv = None
+ if not data_lv:
+ data_lv = self.prepare_data_device('data', osd_fsid)
+ tags['ceph.data_device'] = data_lv.lv_path
+ tags['ceph.data_uuid'] = data_lv.lv_uuid
+ tags['ceph.vdo'] = api.is_vdo(data_lv.lv_path)
+ tags['ceph.type'] = 'data'
+ data_lv.set_tags(tags)
+ if not journal_device.startswith('/'):
+ # we got a journal lv, set rest of the tags
+ api.get_first_lv(filters={'lv_name': lv_name,
+ 'vg_name': vg_name}).set_tags(tags)
+ prepare_filestore(
+ data_lv.lv_path,
+ journal_device,
+ secrets,
+ tags,
+ self.osd_id,
+ osd_fsid,
+ )
+ elif self.args.bluestore:
+ try:
+ vg_name, lv_name ='/')
+ block_lv = api.get_first_lv(filters={'lv_name': lv_name,
+ 'vg_name': vg_name})
+ except ValueError:
+ block_lv = None
+ if not block_lv:
+ block_lv = self.prepare_data_device('block', osd_fsid)
+ tags['ceph.block_device'] = block_lv.lv_path
+ tags['ceph.block_uuid'] = block_lv.lv_uuid
+ tags['ceph.cephx_lockbox_secret'] = cephx_lockbox_secret
+ tags['ceph.encrypted'] = encrypted
+ tags['ceph.vdo'] = api.is_vdo(block_lv.lv_path)
+ wal_device, wal_uuid, tags = self.setup_device(
+ 'wal',
+ self.args.block_wal,
+ tags,
+ self.args.block_wal_size,
+ self.args.block_wal_slots)
+ db_device, db_uuid, tags = self.setup_device(
+ 'db',
+ self.args.block_db,
+ tags,
+ self.args.block_db_size,
+ self.args.block_db_slots)
+ tags['ceph.type'] = 'block'
+ block_lv.set_tags(tags)
+ prepare_bluestore(
+ block_lv.lv_path,
+ wal_device,
+ db_device,
+ secrets,
+ tags,
+ self.osd_id,
+ osd_fsid,
+ )
+ def main(self):
+ sub_command_help = dedent("""
+ Prepare an OSD by assigning an ID and FSID, registering them with the
+ cluster with an ID and FSID, formatting and mounting the volume, and
+ finally by adding all the metadata to the logical volumes using LVM
+ tags, so that it can later be discovered.
+ Once the OSD is ready, an ad-hoc systemd unit will be enabled so that
+ it can later get activated and the OSD daemon can get started.
+ Encryption is supported via dmcrypt and the --dmcrypt flag.
+ Existing logical volume (lv):
+ ceph-volume lvm prepare --data {vg/lv}
+ Existing block device (a logical volume will be created):
+ ceph-volume lvm prepare --data /path/to/device
+ Optionally, can consume db and wal devices, partitions or logical
+ volumes. A device will get a logical volume, partitions and existing
+ logical volumes will be used as is:
+ ceph-volume lvm prepare --data {vg/lv} --block.wal {partition} --block.db {/path/to/device}
+ """)
+ parser = prepare_parser(
+ prog='ceph-volume lvm prepare',
+ description=sub_command_help,
+ )
+ if len(self.argv) == 0:
+ print(sub_command_help)
+ return
+ exclude_group_options(parser, argv=self.argv, groups=['filestore', 'bluestore'])
+ self.args = parser.parse_args(self.argv)
+ # the unfortunate mix of one superset for both filestore and bluestore
+ # makes this validation cumbersome
+ if self.args.filestore:
+ if not self.args.journal:
+ raise SystemExit('--journal is required when using --filestore')
+ # Default to bluestore here since defaulting it in add_argument may
+ # cause both to be True
+ if not self.args.bluestore and not self.args.filestore:
+ self.args.bluestore = True
+ self.safe_prepare()
diff --git a/src/ceph-volume/ceph_volume/devices/lvm/ b/src/ceph-volume/ceph_volume/devices/lvm/
new file mode 100644
index 00000000..dc57011d
--- /dev/null
+++ b/src/ceph-volume/ceph_volume/devices/lvm/
@@ -0,0 +1,70 @@
+from __future__ import print_function
+import argparse
+from textwrap import dedent
+from ceph_volume.exceptions import SuffixParsingError
+from ceph_volume import decorators
+from .activate import Activate
+def parse_osd_id(string):
+ osd_id = string.split('-', 1)[0]
+ if not osd_id:
+ raise SuffixParsingError('OSD id', string)
+ if osd_id.isdigit():
+ return osd_id
+ raise SuffixParsingError('OSD id', string)
+def parse_osd_uuid(string):
+ osd_id = '%s-' % parse_osd_id(string)
+ # remove the id first
+ osd_uuid = string.split(osd_id, 1)[-1]
+ if not osd_uuid:
+ raise SuffixParsingError('OSD uuid', string)
+ return osd_uuid
+class Trigger(object):
+ help = 'systemd helper to activate an OSD'
+ def __init__(self, argv):
+ self.argv = argv
+ @decorators.needs_root
+ def main(self):
+ sub_command_help = dedent("""
+ This tool is meant to help the systemd unit that knows about OSDs.
+ Proxy OSD activation to ``ceph-volume lvm activate`` by parsing the
+ input from systemd, detecting the UUID and ID associated with an OSD::
+ ceph-volume lvm trigger {SYSTEMD-DATA}
+ The systemd "data" is expected to be in the format of::
+ The lvs associated with the OSD need to have been prepared previously,
+ so that all needed tags and metadata exist.
+ """)
+ parser = argparse.ArgumentParser(
+ prog='ceph-volume lvm trigger',
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ description=sub_command_help,
+ )
+ parser.add_argument(
+ 'systemd_data',
+ metavar='SYSTEMD_DATA',
+ nargs='?',
+ help='Data from a systemd unit containing ID and UUID of the OSD, like asdf-lkjh-0'
+ )
+ if len(self.argv) == 0:
+ print(sub_command_help)
+ return
+ args = parser.parse_args(self.argv)
+ osd_id = parse_osd_id(args.systemd_data)
+ osd_uuid = parse_osd_uuid(args.systemd_data)
+ Activate(['--auto-detect-objectstore', osd_id, osd_uuid]).main()
diff --git a/src/ceph-volume/ceph_volume/devices/lvm/ b/src/ceph-volume/ceph_volume/devices/lvm/
new file mode 100644
index 00000000..21b54b6c
--- /dev/null
+++ b/src/ceph-volume/ceph_volume/devices/lvm/
@@ -0,0 +1,403 @@
+import argparse
+import os
+import logging
+import time
+from textwrap import dedent
+from ceph_volume import decorators, terminal, process
+from ceph_volume.api import lvm as api
+from ceph_volume.util import system, encryption, disk, arg_validators, str_to_int, merge_dict
+from ceph_volume.util.device import Device
+from ceph_volume.systemd import systemctl
+logger = logging.getLogger(__name__)
+mlogger = terminal.MultiLogger(__name__)
+def wipefs(path):
+ """
+ Removes the filesystem from an lv or partition.
+ Environment variables supported::
+ * ``CEPH_VOLUME_WIPEFS_TRIES``: Defaults to 8
+ * ``CEPH_VOLUME_WIPEFS_INTERVAL``: Defaults to 5
+ """
+ tries = str_to_int(
+ os.environ.get('CEPH_VOLUME_WIPEFS_TRIES', 8)
+ )
+ interval = str_to_int(
+ os.environ.get('CEPH_VOLUME_WIPEFS_INTERVAL', 5)
+ )
+ for trying in range(tries):
+ stdout, stderr, exit_code =[
+ 'wipefs',
+ '--all',
+ path
+ ])
+ if exit_code != 0:
+ # this could narrow the retry by poking in the stderr of the output
+ # to verify that 'probing initialization failed' appears, but
+ # better to be broad in this retry to prevent missing on
+ # a different message that needs to be retried as well
+ terminal.warning(
+ 'failed to wipefs device, will try again to workaround probable race condition'
+ )
+ time.sleep(interval)
+ else:
+ return
+ raise RuntimeError("could not complete wipefs on device: %s" % path)
+def zap_data(path):
+ """
+ Clears all data from the given path. Path should be
+ an absolute path to an lv or partition.
+ 10M of data is written to the path to make sure that
+ there is no trace left of any previous Filesystem.
+ """
+ 'dd',
+ 'if=/dev/zero',
+ 'of={path}'.format(path=path),
+ 'bs=1M',
+ 'count=10',
+ 'conv=fsync'
+ ])
+def find_associated_devices(osd_id=None, osd_fsid=None):
+ """
+ From an ``osd_id`` and/or an ``osd_fsid``, filter out all the LVs in the
+ system that match those tag values, further detect if any partitions are
+ part of the OSD, and then return the set of LVs and partitions (if any).
+ """
+ lv_tags = {}
+ if osd_id:
+ lv_tags['ceph.osd_id'] = osd_id
+ if osd_fsid:
+ lv_tags['ceph.osd_fsid'] = osd_fsid
+ lvs = api.get_lvs(tags=lv_tags)
+ if not lvs:
+ raise RuntimeError('Unable to find any LV for zapping OSD: '
+ '%s' % osd_id or osd_fsid)
+ devices_to_zap = ensure_associated_lvs(lvs, lv_tags)
+ return [Device(path) for path in set(devices_to_zap) if path]
+def ensure_associated_lvs(lvs, lv_tags={}):
+ """
+ Go through each LV and ensure if backing devices (journal, wal, block)
+ are LVs or partitions, so that they can be accurately reported.
+ """
+ # look for many LVs for each backing type, because it is possible to
+ # receive a filtering for osd.1, and have multiple failed deployments
+ # leaving many journals with osd.1 - usually, only a single LV will be
+ # returned
+ journal_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'journal'}))
+ db_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'db'}))
+ wal_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'wal'}))
+ backing_devices = [(journal_lvs, 'journal'), (db_lvs, 'db'),
+ (wal_lvs, 'wal')]
+ verified_devices = []
+ for lv in lvs:
+ # go through each lv and append it, otherwise query `blkid` to find
+ # a physical device. Do this for each type (journal,db,wal) regardless
+ # if they have been processed in the previous LV, so that bad devices
+ # with the same ID can be caught
+ for ceph_lvs, _type in backing_devices:
+ if ceph_lvs:
+ verified_devices.extend([l.lv_path for l in ceph_lvs])
+ continue
+ # must be a disk partition, by querying blkid by the uuid we are
+ # ensuring that the device path is always correct
+ try:
+ device_uuid = lv.tags['ceph.%s_uuid' % _type]
+ except KeyError:
+ # Bluestore will not have ceph.journal_uuid, and Filestore
+ # will not not have ceph.db_uuid
+ continue
+ osd_device = disk.get_device_from_partuuid(device_uuid)
+ if not osd_device:
+ # if the osd_device is not found by the partuuid, then it is
+ # not possible to ensure this device exists anymore, so skip it
+ continue
+ verified_devices.append(osd_device)
+ verified_devices.append(lv.lv_path)
+ # reduce the list from all the duplicates that were added
+ return list(set(verified_devices))
+class Zap(object):
+ help = 'Removes all data and filesystems from a logical volume or partition.'
+ def __init__(self, argv):
+ self.argv = argv
+ def unmount_lv(self, lv):
+ if lv.tags.get('ceph.cluster_name') and lv.tags.get('ceph.osd_id'):
+ lv_path = "/var/lib/ceph/osd/{}-{}".format(lv.tags['ceph.cluster_name'], lv.tags['ceph.osd_id'])
+ else:
+ lv_path = lv.lv_path
+ dmcrypt_uuid = lv.lv_uuid
+ dmcrypt = lv.encrypted
+ if system.path_is_mounted(lv_path):
+"Unmounting %s", lv_path)
+ system.unmount(lv_path)
+ if dmcrypt and dmcrypt_uuid:
+ self.dmcrypt_close(dmcrypt_uuid)
+ def zap_lv(self, device):
+ """
+ Device examples: vg-name/lv-name, /dev/vg-name/lv-name
+ Requirements: Must be a logical volume (LV)
+ """
+ lv = api.get_first_lv(filters={'lv_name': device.lv_name, 'vg_name':
+ device.vg_name})
+ self.unmount_lv(lv)
+ wipefs(device.abspath)
+ zap_data(device.abspath)
+ if self.args.destroy:
+ lvs = api.get_lvs(filters={'vg_name': device.vg_name})
+ if lvs == []:
+'No LVs left, exiting', device.vg_name)
+ return
+ elif len(lvs) <= 1:
+'Only 1 LV left in VG, will proceed to destroy '
+ 'volume group %s', device.vg_name)
+ api.remove_vg(device.vg_name)
+ else:
+'More than 1 LV left in VG, will proceed to '
+ 'destroy LV only')
+'Removing LV because --destroy was given: %s',
+ device.abspath)
+ api.remove_lv(device.abspath)
+ elif lv:
+ # just remove all lvm metadata, leaving the LV around
+ lv.clear_tags()
+ def zap_partition(self, device):
+ """
+ Device example: /dev/sda1
+ Requirements: Must be a partition
+ """
+ if device.is_encrypted:
+ # find the holder
+ holders = [
+ '/dev/%s' % holder for holder in device.sys_api.get('holders', [])
+ ]
+ for mapper_uuid in os.listdir('/dev/mapper'):
+ mapper_path = os.path.join('/dev/mapper', mapper_uuid)
+ if os.path.realpath(mapper_path) in holders:
+ self.dmcrypt_close(mapper_uuid)
+ if system.device_is_mounted(device.abspath):
+"Unmounting %s", device.abspath)
+ system.unmount(device.abspath)
+ wipefs(device.abspath)
+ zap_data(device.abspath)
+ if self.args.destroy:
+"Destroying partition since --destroy was used: %s" % device.abspath)
+ disk.remove_partition(device)
+ def zap_lvm_member(self, device):
+ """
+ An LVM member may have more than one LV and or VG, for example if it is
+ a raw device with multiple partitions each belonging to a different LV
+ Device example: /dev/sda
+ Requirements: An LV or VG present in the device, making it an LVM member
+ """
+ for lv in device.lvs:
+ if lv.lv_name:
+'Zapping lvm member {}. lv_path is {}'.format(device.abspath, lv.lv_path))
+ self.zap_lv(Device(lv.lv_path))
+ else:
+ vg = api.get_first_vg(filters={'vg_name': lv.vg_name})
+ if vg:
+'Found empty VG {}, removing'.format(vg.vg_name))
+ api.remove_vg(vg.vg_name)
+ def zap_raw_device(self, device):
+ """
+ Any whole (raw) device passed in as input will be processed here,
+ checking for LVM membership and partitions (if any).
+ Device example: /dev/sda
+ Requirements: None
+ """
+ if not self.args.destroy:
+ # the use of dd on a raw device causes the partition table to be
+ # destroyed
+ mlogger.warning(
+ '--destroy was not specified, but zapping a whole device will remove the partition table'
+ )
+ # look for partitions and zap those
+ for part_name in device.sys_api.get('partitions', {}).keys():
+ self.zap_partition(Device('/dev/%s' % part_name))
+ wipefs(device.abspath)
+ zap_data(device.abspath)
+ @decorators.needs_root
+ def zap(self, devices=None):
+ devices = devices or self.args.devices
+ for device in devices:
+"Zapping: %s", device.abspath)
+ if device.is_mapper:
+ terminal.error("Refusing to zap the mapper device: {}".format(device))
+ raise SystemExit(1)
+ if device.is_lvm_member:
+ self.zap_lvm_member(device)
+ if device.is_lv:
+ self.zap_lv(device)
+ if device.is_partition:
+ self.zap_partition(device)
+ if device.is_device:
+ self.zap_raw_device(device)
+ if self.args.devices:
+ terminal.success(
+ "Zapping successful for: %s" % ", ".join([str(d) for d in self.args.devices])
+ )
+ else:
+ identifier = self.args.osd_id or self.args.osd_fsid
+ terminal.success(
+ "Zapping successful for OSD: %s" % identifier
+ )
+ @decorators.needs_root
+ def zap_osd(self):
+ if self.args.osd_id and not self.args.no_systemd:
+ osd_is_running = systemctl.osd_is_active(self.args.osd_id)
+ if osd_is_running:
+ mlogger.error("OSD ID %s is running, stop it with:" % self.args.osd_id)
+ mlogger.error("systemctl stop ceph-osd@%s" % self.args.osd_id)
+ raise SystemExit("Unable to zap devices associated with OSD ID: %s" % self.args.osd_id)
+ devices = find_associated_devices(self.args.osd_id, self.args.osd_fsid)
+ self.zap(devices)
+ def dmcrypt_close(self, dmcrypt_uuid):
+ dmcrypt_path = "/dev/mapper/{}".format(dmcrypt_uuid)
+"Closing encrypted path %s", dmcrypt_path)
+ encryption.dmcrypt_close(dmcrypt_path)
+ def main(self):
+ sub_command_help = dedent("""
+ Zaps the given logical volume(s), raw device(s) or partition(s) for reuse by ceph-volume.
+ If given a path to a logical volume it must be in the format of vg/lv. Any
+ filesystems present on the given device, vg/lv, or partition will be removed and
+ all data will be purged.
+ If the logical volume, raw device or partition is being used for any ceph related
+ mount points they will be unmounted.
+ However, the lv or partition will be kept intact.
+ Example calls for supported scenarios:
+ Zapping a logical volume:
+ ceph-volume lvm zap {vg name/lv name}
+ Zapping a partition:
+ ceph-volume lvm zap /dev/sdc1
+ Zapping many raw devices:
+ ceph-volume lvm zap /dev/sda /dev/sdb /db/sdc
+ Zapping devices associated with an OSD ID:
+ ceph-volume lvm zap --osd-id 1
+ Optionally include the OSD FSID
+ ceph-volume lvm zap --osd-id 1 --osd-fsid 55BD4219-16A7-4037-BC20-0F158EFCC83D
+ If the --destroy flag is given and you are zapping a raw device or partition
+ then all vgs and lvs that exist on that raw device or partition will be destroyed.
+ This is especially useful if a raw device or partition was used by ceph-volume lvm create
+ or ceph-volume lvm prepare commands previously and now you want to reuse that device.
+ For example:
+ ceph-volume lvm zap /dev/sda --destroy
+ If the --destroy flag is given and you are zapping an lv then the lv is still
+ kept intact for reuse.
+ """)
+ parser = argparse.ArgumentParser(
+ prog='ceph-volume lvm zap',
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ description=sub_command_help,
+ )
+ parser.add_argument(
+ 'devices',
+ metavar='DEVICES',
+ nargs='*',
+ type=arg_validators.ValidDevice(gpt_ok=True),
+ default=[],
+ help='Path to one or many lv (as vg/lv), partition (as /dev/sda1) or device (as /dev/sda)'
+ )
+ parser.add_argument(
+ '--destroy',
+ action='store_true',
+ default=False,
+ help='Destroy all volume groups and logical volumes if you are zapping a raw device or partition',
+ )
+ parser.add_argument(
+ '--osd-id',
+ help='Specify an OSD ID to detect associated devices for zapping',
+ )
+ parser.add_argument(
+ '--osd-fsid',
+ help='Specify an OSD FSID to detect associated devices for zapping',
+ )
+ parser.add_argument(
+ '--no-systemd',
+ dest='no_systemd',
+ action='store_true',
+ help='Skip systemd unit checks',
+ )
+ if len(self.argv) == 0:
+ print(sub_command_help)
+ return
+ self.args = parser.parse_args(self.argv)
+ if self.args.osd_id or self.args.osd_fsid:
+ self.zap_osd()
+ else:
+ self.zap()