summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/rbd_support/task.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/rbd_support/task.py')
-rw-r--r--  src/pybind/mgr/rbd_support/task.py  832
1 file changed, 832 insertions, 0 deletions
diff --git a/src/pybind/mgr/rbd_support/task.py b/src/pybind/mgr/rbd_support/task.py
new file mode 100644
index 000000000..ff096fd9b
--- /dev/null
+++ b/src/pybind/mgr/rbd_support/task.py
@@ -0,0 +1,832 @@
import errno
import json
import re
import traceback
import uuid

from contextlib import contextmanager
from datetime import datetime, timedelta
from functools import partial, wraps
from threading import Condition, Lock, Thread

import rados
import rbd

from .common import (GLOBAL_POOL_KEY, authorize_request, extract_pool_key,
                     get_rbd_pools, is_authorized)
+
+
+RBD_TASK_OID = "rbd_task"
+
+TASK_SEQUENCE = "sequence"
+TASK_ID = "id"
+TASK_REFS = "refs"
+TASK_MESSAGE = "message"
+TASK_RETRY_ATTEMPTS = "retry_attempts"
+TASK_RETRY_TIME = "retry_time"
+TASK_RETRY_MESSAGE = "retry_message"
+TASK_IN_PROGRESS = "in_progress"
+TASK_PROGRESS = "progress"
+TASK_CANCELED = "canceled"
+
+TASK_REF_POOL_NAME = "pool_name"
+TASK_REF_POOL_NAMESPACE = "pool_namespace"
+TASK_REF_IMAGE_NAME = "image_name"
+TASK_REF_IMAGE_ID = "image_id"
+TASK_REF_ACTION = "action"
+
+TASK_REF_ACTION_FLATTEN = "flatten"
+TASK_REF_ACTION_REMOVE = "remove"
+TASK_REF_ACTION_TRASH_REMOVE = "trash remove"
+TASK_REF_ACTION_MIGRATION_EXECUTE = "migrate execute"
+TASK_REF_ACTION_MIGRATION_COMMIT = "migrate commit"
+TASK_REF_ACTION_MIGRATION_ABORT = "migrate abort"
+
+VALID_TASK_ACTIONS = [TASK_REF_ACTION_FLATTEN,
+ TASK_REF_ACTION_REMOVE,
+ TASK_REF_ACTION_TRASH_REMOVE,
+ TASK_REF_ACTION_MIGRATION_EXECUTE,
+ TASK_REF_ACTION_MIGRATION_COMMIT,
+ TASK_REF_ACTION_MIGRATION_ABORT]
+
+TASK_RETRY_INTERVAL = timedelta(seconds=30)
+TASK_MAX_RETRY_INTERVAL = timedelta(seconds=300)
+MAX_COMPLETED_TASKS = 50
+
+
class Throttle:
    """Rate-limiting decorator: run the wrapped callable at most once per
    ``throttle_period``; calls inside the window are dropped and return None.

    State lives on the decorator instance, so one instance applied in a
    class body is shared by every caller of the decorated method.
    """

    def __init__(self, throttle_period):
        self.throttle_period = throttle_period
        self.time_of_last_call = datetime.min

    def __call__(self, fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            invoked_at = datetime.now()
            if invoked_at < self.time_of_last_call + self.throttle_period:
                # still inside the throttle window -- drop the call
                return None
            self.time_of_last_call = invoked_at
            return fn(*args, **kwargs)
        return wrapper
+
+
class Task:
    """In-memory representation of a single queued maintenance task."""

    def __init__(self, sequence, task_id, message, refs):
        self.sequence = sequence
        self.task_id = task_id
        self.message = message
        self.refs = refs
        self.retry_message = None
        self.retry_attempts = 0
        self.retry_time = None
        self.in_progress = False
        self.progress = 0.0
        self.canceled = False
        self.failed = False
        self.progress_posted = False

    def __str__(self):
        return self.to_json()

    @property
    def sequence_key(self):
        """Zero-padded hex key used to order tasks inside the omap object."""
        return "{0:016X}".format(self.sequence)

    def cancel(self):
        """Mark the task canceled; a canceled task is also a failed task."""
        self.canceled = True
        self.fail("Operation canceled")

    def fail(self, message):
        self.failed = True
        self.failure_message = message

    def to_dict(self):
        """Serialize to a dict, omitting optional fields that are unset."""
        result = {TASK_SEQUENCE: self.sequence,
                  TASK_ID: self.task_id,
                  TASK_MESSAGE: self.message,
                  TASK_REFS: self.refs}
        if self.retry_message:
            result[TASK_RETRY_MESSAGE] = self.retry_message
        if self.retry_attempts:
            result[TASK_RETRY_ATTEMPTS] = self.retry_attempts
        if self.retry_time:
            result[TASK_RETRY_TIME] = self.retry_time.isoformat()
        if self.in_progress:
            result[TASK_IN_PROGRESS] = True
            result[TASK_PROGRESS] = self.progress
        if self.canceled:
            result[TASK_CANCELED] = True
        return result

    def to_json(self):
        return str(json.dumps(self.to_dict()))

    @classmethod
    def from_json(cls, val):
        """Deserialize a task, raising ValueError on malformed input."""
        try:
            data = json.loads(val)
        except json.JSONDecodeError as e:
            raise ValueError("Invalid JSON ({})".format(str(e)))

        action = data.get(TASK_REFS, {}).get(TASK_REF_ACTION)
        if action not in VALID_TASK_ACTIONS:
            raise ValueError("Invalid task action: {}".format(action))

        try:
            return cls(data[TASK_SEQUENCE], data[TASK_ID],
                       data[TASK_MESSAGE], data[TASK_REFS])
        except KeyError as e:
            raise ValueError("Invalid task format (missing key {})".format(str(e)))
+
+
class TaskHandler:
    """Queues and executes long-running RBD maintenance tasks.

    Tasks are persisted in a per-pool/namespace "rbd_task" omap object so
    they survive mgr restarts, and are executed in sequence order by a
    background thread started from ``__init__``.
    """

    # NOTE(review): these are class-level (shared) attributes.  The mgr
    # module presumably creates a single TaskHandler, so sharing is moot in
    # practice -- confirm before ever instantiating more than one.
    lock = Lock()
    condition = Condition(lock)
    thread = None

    in_progress_task = None      # task currently executing, if any
    tasks_by_sequence = dict()   # sequence number -> Task (pending queue)
    tasks_by_id = dict()         # task uuid -> Task

    completed_tasks = []         # bounded history used to absorb command replays

    sequence = 0                 # last allocated task sequence number

    def __init__(self, module):
        # module: the owning mgr module (provides rados handle, options, log)
        self.module = module
        self.log = module.log

        with self.lock:
            self.init_task_queue()

        # start the background executor once the persisted queue is loaded
        self.thread = Thread(target=self.run)
        self.thread.start()
+
    @property
    def default_pool_name(self):
        """Name of the cluster default RBD pool (``rbd_default_pool`` option)."""
        return self.module.get_ceph_option("rbd_default_pool")
+
+ def extract_pool_spec(self, pool_spec):
+ pool_spec = extract_pool_key(pool_spec)
+ if pool_spec == GLOBAL_POOL_KEY:
+ pool_spec = (self.default_pool_name, '')
+ return pool_spec
+
+ def extract_image_spec(self, image_spec):
+ match = re.match(r'^(?:([^/]+)/(?:([^/]+)/)?)?([^/@]+)$',
+ image_spec or '')
+ if not match:
+ raise ValueError("Invalid image spec: {}".format(image_spec))
+ return (match.group(1) or self.default_pool_name, match.group(2) or '',
+ match.group(3))
+
    def run(self):
        """Background thread loop: execute due tasks, then wait for work.

        Wakes up when notified via ``self.condition`` or at least every 5
        seconds, and executes -- in ascending sequence order -- every queued
        task whose retry time (if any) has arrived.
        """
        try:
            self.log.info("TaskHandler: starting")
            while True:
                with self.lock:
                    now = datetime.now()
                    # snapshot + sort the due sequences first so that
                    # execute_task() may mutate tasks_by_sequence safely
                    for sequence in sorted([sequence for sequence, task
                                            in self.tasks_by_sequence.items()
                                            if not task.retry_time or task.retry_time <= now]):
                        self.execute_task(sequence)

                    self.condition.wait(5)
                    self.log.debug("TaskHandler: tick")

        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))
+
    @contextmanager
    def open_ioctx(self, spec):
        """Yield an ioctx for ``spec`` == (pool_name, namespace).

        Logs and re-raises ``rados.ObjectNotFound`` when the pool does not
        exist.
        """
        try:
            with self.module.rados.open_ioctx(spec[0]) as ioctx:
                ioctx.set_namespace(spec[1])
                yield ioctx
        except rados.ObjectNotFound:
            self.log.error("Failed to locate pool {}".format(spec[0]))
            raise
+
+ @classmethod
+ def format_image_spec(cls, image_spec):
+ image = image_spec[2]
+ if image_spec[1]:
+ image = "{}/{}".format(image_spec[1], image)
+ if image_spec[0]:
+ image = "{}/{}".format(image_spec[0], image)
+ return image
+
+ def init_task_queue(self):
+ for pool_id, pool_name in get_rbd_pools(self.module).items():
+ try:
+ with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+ self.load_task_queue(ioctx, pool_name)
+
+ try:
+ namespaces = rbd.RBD().namespace_list(ioctx)
+ except rbd.OperationNotSupported:
+ self.log.debug("Namespaces not supported")
+ continue
+
+ for namespace in namespaces:
+ ioctx.set_namespace(namespace)
+ self.load_task_queue(ioctx, pool_name)
+
+ except rados.ObjectNotFound:
+ # pool DNE
+ pass
+
+ if self.tasks_by_sequence:
+ self.sequence = list(sorted(self.tasks_by_sequence.keys()))[-1]
+
+ self.log.debug("sequence={}, tasks_by_sequence={}, tasks_by_id={}".format(
+ self.sequence, str(self.tasks_by_sequence), str(self.tasks_by_id)))
+
+ def load_task_queue(self, ioctx, pool_name):
+ pool_spec = pool_name
+ if ioctx.nspace:
+ pool_spec += "/{}".format(ioctx.nspace)
+
+ start_after = ''
+ try:
+ while True:
+ with rados.ReadOpCtx() as read_op:
+ self.log.info("load_task_task: {}, start_after={}".format(
+ pool_spec, start_after))
+ it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
+ ioctx.operate_read_op(read_op, RBD_TASK_OID)
+
+ it = list(it)
+ for k, v in it:
+ start_after = k
+ v = v.decode()
+ self.log.info("load_task_task: task={}".format(v))
+
+ try:
+ task = Task.from_json(v)
+ self.append_task(task)
+ except ValueError:
+ self.log.error("Failed to decode task: pool_spec={}, task={}".format(pool_spec, v))
+
+ if not it:
+ break
+
+ except StopIteration:
+ pass
+ except rados.ObjectNotFound:
+ # rbd_task DNE
+ pass
+
+ def append_task(self, task):
+ self.tasks_by_sequence[task.sequence] = task
+ self.tasks_by_id[task.task_id] = task
+
+ def task_refs_match(self, task_refs, refs):
+ if TASK_REF_IMAGE_ID not in refs and TASK_REF_IMAGE_ID in task_refs:
+ task_refs = task_refs.copy()
+ del task_refs[TASK_REF_IMAGE_ID]
+
+ self.log.debug("task_refs_match: ref1={}, ref2={}".format(task_refs, refs))
+ return task_refs == refs
+
+ def find_task(self, refs):
+ self.log.debug("find_task: refs={}".format(refs))
+
+ # search for dups and return the original
+ for task_id in reversed(sorted(self.tasks_by_id.keys())):
+ task = self.tasks_by_id[task_id]
+ if self.task_refs_match(task.refs, refs):
+ return task
+
+ # search for a completed task (message replay)
+ for task in reversed(self.completed_tasks):
+ if self.task_refs_match(task.refs, refs):
+ return task
+
    def add_task(self, ioctx, message, refs):
        """Create a task, persist it to the rbd_task omap, and queue it.

        Must be called with ``self.lock`` held -- ``condition.notify()``
        below requires the lock.  Returns the task encoded as JSON.
        """
        self.log.debug("add_task: message={}, refs={}".format(message, refs))

        # ensure unique uuid across all pools
        while True:
            task_id = str(uuid.uuid4())
            if task_id not in self.tasks_by_id:
                break

        self.sequence += 1
        task = Task(self.sequence, task_id, message, refs)

        # add the task to the rbd_task omap
        task_json = task.to_json()
        omap_keys = (task.sequence_key, )
        omap_vals = (str.encode(task_json), )
        self.log.info("adding task: {} {}".format(omap_keys[0], omap_vals[0]))

        with rados.WriteOpCtx() as write_op:
            ioctx.set_omap(write_op, omap_keys, omap_vals)
            ioctx.operate_write_op(write_op, RBD_TASK_OID)
        self.append_task(task)

        # wake the runner thread to pick up the new work
        self.condition.notify()
        return task_json
+
    def remove_task(self, ioctx, task, remove_in_memory=True):
        """Remove *task* from the rbd_task omap and, optionally, memory.

        ``ioctx`` may be None when the pool no longer exists; only the
        in-memory state is touched in that case.  Successful tasks are
        kept in ``completed_tasks`` (bounded) to absorb command replays.
        """
        self.log.info("remove_task: task={}".format(str(task)))
        if ioctx:
            try:
                with rados.WriteOpCtx() as write_op:
                    omap_keys = (task.sequence_key, )
                    ioctx.remove_omap_keys(write_op, omap_keys)
                    ioctx.operate_write_op(write_op, RBD_TASK_OID)
            except rados.ObjectNotFound:
                # rbd_task object already gone -- nothing to remove
                pass

        if remove_in_memory:
            try:
                del self.tasks_by_id[task.task_id]
                del self.tasks_by_sequence[task.sequence]

                # keep a record of the last N tasks to help avoid command replay
                # races
                if not task.failed and not task.canceled:
                    self.log.debug("remove_task: moving to completed tasks")
                    self.completed_tasks.append(task)
                    self.completed_tasks = self.completed_tasks[-MAX_COMPLETED_TASKS:]

            except KeyError:
                pass
+
    def execute_task(self, sequence):
        """Run one queued task, or schedule it for retry on failure.

        Must be called with ``self.lock`` held.  The lock is deliberately
        dropped for the duration of the (long-running) librbd call and
        re-acquired afterwards, so new commands can be handled meanwhile.
        """
        task = self.tasks_by_sequence[sequence]
        self.log.info("execute_task: task={}".format(str(task)))

        # distinguishes "pool missing" from errors after the pool was opened
        pool_valid = False
        try:
            with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
                                  task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
                pool_valid = True

                action = task.refs[TASK_REF_ACTION]
                execute_fn = {TASK_REF_ACTION_FLATTEN: self.execute_flatten,
                              TASK_REF_ACTION_REMOVE: self.execute_remove,
                              TASK_REF_ACTION_TRASH_REMOVE: self.execute_trash_remove,
                              TASK_REF_ACTION_MIGRATION_EXECUTE: self.execute_migration_execute,
                              TASK_REF_ACTION_MIGRATION_COMMIT: self.execute_migration_commit,
                              TASK_REF_ACTION_MIGRATION_ABORT: self.execute_migration_abort
                              }.get(action)
                if not execute_fn:
                    self.log.error("Invalid task action: {}".format(action))
                else:
                    task.in_progress = True
                    self.in_progress_task = task

                    # drop the handler lock while librbd does the real work
                    self.lock.release()
                    try:
                        execute_fn(ioctx, task)

                    except rbd.OperationCanceled:
                        self.log.info("Operation canceled: task={}".format(
                            str(task)))

                    finally:
                        self.lock.acquire()

                        task.in_progress = False
                        self.in_progress_task = None

                    self.complete_progress(task)
                    self.remove_task(ioctx, task)

        except rados.ObjectNotFound as e:
            self.log.error("execute_task: {}".format(e))
            if pool_valid:
                # object (not pool) missing -- leave queued for retry
                task.retry_message = "{}".format(e)
                self.update_progress(task, 0)
            else:
                # pool DNE -- remove in-memory task
                self.complete_progress(task)
                self.remove_task(None, task)

        except (rados.Error, rbd.Error) as e:
            self.log.error("execute_task: {}".format(e))
            task.retry_message = "{}".format(e)
            self.update_progress(task, 0)

        finally:
            # linear backoff, capped; harmless for tasks already removed
            task.in_progress = False
            task.retry_attempts += 1
            task.retry_time = datetime.now() + min(
                TASK_RETRY_INTERVAL * task.retry_attempts,
                TASK_MAX_RETRY_INTERVAL)
+
    def progress_callback(self, task, current, total):
        """librbd progress callback: mirror progress into the task/event.

        Returns 0 to let the operation continue, or -ECANCELED to make
        librbd abort it (when the task was canceled).
        """
        progress = float(current) / float(total)
        self.log.debug("progress_callback: task={}, progress={}".format(
            str(task), progress))

        # avoid deadlocking when a new command comes in during a progress callback
        if not self.lock.acquire(False):
            # couldn't get the lock without blocking -- skip this update
            return 0

        try:
            if not self.in_progress_task or self.in_progress_task.canceled:
                return -rbd.ECANCELED
            self.in_progress_task.progress = progress
        finally:
            self.lock.release()

        if not task.progress_posted:
            # delayed creation of progress event until first callback
            self.post_progress(task, progress)
        else:
            self.throttled_update_progress(task, progress)

        return 0
+
+ def execute_flatten(self, ioctx, task):
+ self.log.info("execute_flatten: task={}".format(str(task)))
+
+ try:
+ with rbd.Image(ioctx, task.refs[TASK_REF_IMAGE_NAME]) as image:
+ image.flatten(on_progress=partial(self.progress_callback, task))
+ except rbd.InvalidArgument:
+ task.fail("Image does not have parent")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+ except rbd.ImageNotFound:
+ task.fail("Image does not exist")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+
+ def execute_remove(self, ioctx, task):
+ self.log.info("execute_remove: task={}".format(str(task)))
+
+ try:
+ rbd.RBD().remove(ioctx, task.refs[TASK_REF_IMAGE_NAME],
+ on_progress=partial(self.progress_callback, task))
+ except rbd.ImageNotFound:
+ task.fail("Image does not exist")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+
+ def execute_trash_remove(self, ioctx, task):
+ self.log.info("execute_trash_remove: task={}".format(str(task)))
+
+ try:
+ rbd.RBD().trash_remove(ioctx, task.refs[TASK_REF_IMAGE_ID],
+ on_progress=partial(self.progress_callback, task))
+ except rbd.ImageNotFound:
+ task.fail("Image does not exist")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+
+ def execute_migration_execute(self, ioctx, task):
+ self.log.info("execute_migration_execute: task={}".format(str(task)))
+
+ try:
+ rbd.RBD().migration_execute(ioctx, task.refs[TASK_REF_IMAGE_NAME],
+ on_progress=partial(self.progress_callback, task))
+ except rbd.ImageNotFound:
+ task.fail("Image does not exist")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+ except rbd.InvalidArgument:
+ task.fail("Image is not migrating")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+
+ def execute_migration_commit(self, ioctx, task):
+ self.log.info("execute_migration_commit: task={}".format(str(task)))
+
+ try:
+ rbd.RBD().migration_commit(ioctx, task.refs[TASK_REF_IMAGE_NAME],
+ on_progress=partial(self.progress_callback, task))
+ except rbd.ImageNotFound:
+ task.fail("Image does not exist")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+ except rbd.InvalidArgument:
+ task.fail("Image is not migrating or migration not executed")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+
+ def execute_migration_abort(self, ioctx, task):
+ self.log.info("execute_migration_abort: task={}".format(str(task)))
+
+ try:
+ rbd.RBD().migration_abort(ioctx, task.refs[TASK_REF_IMAGE_NAME],
+ on_progress=partial(self.progress_callback, task))
+ except rbd.ImageNotFound:
+ task.fail("Image does not exist")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+ except rbd.InvalidArgument:
+ task.fail("Image is not migrating")
+ self.log.info("{}: task={}".format(task.failure_message, str(task)))
+
    def complete_progress(self, task):
        """Complete (or fail) the task's event in the progress mgr module."""
        if not task.progress_posted:
            # ensure progress event exists before we complete/fail it
            self.post_progress(task, 0)

        self.log.debug("complete_progress: task={}".format(str(task)))
        try:
            if task.failed:
                self.module.remote("progress", "fail", task.task_id,
                                   task.failure_message)
            else:
                self.module.remote("progress", "complete", task.task_id)
        except ImportError:
            # progress module is disabled
            pass
+
+ def _update_progress(self, task, progress):
+ self.log.debug("update_progress: task={}, progress={}".format(str(task), progress))
+ try:
+ refs = {"origin": "rbd_support"}
+ refs.update(task.refs)
+
+ self.module.remote("progress", "update", task.task_id,
+ task.message, progress, refs)
+ except ImportError:
+ # progress module is disabled
+ pass
+
    def post_progress(self, task, progress):
        """Create the initial progress event and mark it as posted.

        The flag is set only after ``_update_progress`` returns, so an
        exception there leaves the event un-posted.
        """
        self._update_progress(task, progress)
        task.progress_posted = True
+
+ def update_progress(self, task, progress):
+ if task.progress_posted:
+ self._update_progress(task, progress)
+
    @Throttle(timedelta(seconds=1))
    def throttled_update_progress(self, task, progress):
        """Rate-limited update_progress (at most one update per second).

        NOTE(review): the Throttle instance is created once at class
        definition time, so its timer is shared across all tasks.
        """
        self.update_progress(task, progress)
+
    def queue_flatten(self, image_spec):
        """Queue a flatten for *image_spec*; returns (0, task-json, '').

        A matching queued or recently completed task is returned instead of
        queuing a duplicate.  Raises ``rbd.ImageNotFound`` if the image is
        absent or has no parent.
        """
        image_spec = self.extract_image_spec(image_spec)

        authorize_request(self.module, image_spec[0], image_spec[1])
        self.log.info("queue_flatten: {}".format(image_spec))

        refs = {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN,
                TASK_REF_POOL_NAME: image_spec[0],
                TASK_REF_POOL_NAMESPACE: image_spec[1],
                TASK_REF_IMAGE_NAME: image_spec[2]}

        with self.open_ioctx(image_spec) as ioctx:
            try:
                with rbd.Image(ioctx, image_spec[2]) as image:
                    refs[TASK_REF_IMAGE_ID] = image.id()

                    try:
                        parent_image_id = image.parent_id()
                    except rbd.ImageNotFound:
                        parent_image_id = None

            except rbd.ImageNotFound:
                pass

            task = self.find_task(refs)
            if task:
                return 0, task.to_json(), ''

            # IMAGE_ID missing means the open above failed; raising here
            # also guarantees parent_image_id is bound when checked below
            if TASK_REF_IMAGE_ID not in refs:
                raise rbd.ImageNotFound("Image {} does not exist".format(
                    self.format_image_spec(image_spec)), errno=errno.ENOENT)
            if not parent_image_id:
                raise rbd.ImageNotFound("Image {} does not have a parent".format(
                    self.format_image_spec(image_spec)), errno=errno.ENOENT)

            return 0, self.add_task(ioctx,
                                    "Flattening image {}".format(
                                        self.format_image_spec(image_spec)),
                                    refs), ""
+
    def queue_remove(self, image_spec):
        """Queue removal of *image_spec*; returns (0, task-json, '').

        A matching queued or recently completed task is returned instead of
        queuing a duplicate.  Raises ``rbd.ImageNotFound`` if the image is
        absent and ``rbd.ImageBusy`` if it still has snapshots.
        """
        image_spec = self.extract_image_spec(image_spec)

        authorize_request(self.module, image_spec[0], image_spec[1])
        self.log.info("queue_remove: {}".format(image_spec))

        refs = {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE,
                TASK_REF_POOL_NAME: image_spec[0],
                TASK_REF_POOL_NAMESPACE: image_spec[1],
                TASK_REF_IMAGE_NAME: image_spec[2]}

        with self.open_ioctx(image_spec) as ioctx:
            try:
                with rbd.Image(ioctx, image_spec[2]) as image:
                    refs[TASK_REF_IMAGE_ID] = image.id()
                    snaps = list(image.list_snaps())

            except rbd.ImageNotFound:
                pass

            task = self.find_task(refs)
            if task:
                return 0, task.to_json(), ''

            # IMAGE_ID missing means the open above failed; raising here
            # also guarantees snaps is bound when checked below
            if TASK_REF_IMAGE_ID not in refs:
                raise rbd.ImageNotFound("Image {} does not exist".format(
                    self.format_image_spec(image_spec)), errno=errno.ENOENT)
            if snaps:
                raise rbd.ImageBusy("Image {} has snapshots".format(
                    self.format_image_spec(image_spec)), errno=errno.EBUSY)

            return 0, self.add_task(ioctx,
                                    "Removing image {}".format(
                                        self.format_image_spec(image_spec)),
                                    refs), ''
+
+ def queue_trash_remove(self, image_id_spec):
+ image_id_spec = self.extract_image_spec(image_id_spec)
+
+ authorize_request(self.module, image_id_spec[0], image_id_spec[1])
+ self.log.info("queue_trash_remove: {}".format(image_id_spec))
+
+ refs = {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
+ TASK_REF_POOL_NAME: image_id_spec[0],
+ TASK_REF_POOL_NAMESPACE: image_id_spec[1],
+ TASK_REF_IMAGE_ID: image_id_spec[2]}
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
+ # verify that image exists in trash
+ with self.open_ioctx(image_id_spec) as ioctx:
+ rbd.RBD().trash_get(ioctx, image_id_spec[2])
+
+ return 0, self.add_task(ioctx,
+ "Removing image {} from trash".format(
+ self.format_image_spec(image_id_spec)),
+ refs), ''
+
+ def get_migration_status(self, ioctx, image_spec):
+ try:
+ return rbd.RBD().migration_status(ioctx, image_spec[2])
+ except (rbd.InvalidArgument, rbd.ImageNotFound):
+ return None
+
+ def validate_image_migrating(self, image_spec, migration_status):
+ if not migration_status:
+ raise rbd.InvalidArgument("Image {} is not migrating".format(
+ self.format_image_spec(image_spec)), errno=errno.EINVAL)
+
+ def resolve_pool_name(self, pool_id):
+ osd_map = self.module.get('osd_map')
+ for pool in osd_map['pools']:
+ if pool['pool'] == pool_id:
+ return pool['pool_name']
+ return '<unknown>'
+
    def queue_migration_execute(self, image_spec):
        """Queue execution of a prepared migration; returns (0, task-json, '').

        Raises ``rbd.InvalidArgument`` if the image is not migrating or the
        migration is not in the PREPARED/EXECUTING state.
        """
        image_spec = self.extract_image_spec(image_spec)

        authorize_request(self.module, image_spec[0], image_spec[1])
        self.log.info("queue_migration_execute: {}".format(image_spec))

        refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
                TASK_REF_POOL_NAME: image_spec[0],
                TASK_REF_POOL_NAMESPACE: image_spec[1],
                TASK_REF_IMAGE_NAME: image_spec[2]}

        with self.open_ioctx(image_spec) as ioctx:
            status = self.get_migration_status(ioctx, image_spec)
            if status:
                refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

            # replayed requests return the already-queued task
            task = self.find_task(refs)
            if task:
                return 0, task.to_json(), ''

            self.validate_image_migrating(image_spec, status)
            if status['state'] not in [rbd.RBD_IMAGE_MIGRATION_STATE_PREPARED,
                                       rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTING]:
                raise rbd.InvalidArgument("Image {} is not in ready state".format(
                    self.format_image_spec(image_spec)), errno=errno.EINVAL)

            # describe both endpoints of the migration in the task message
            source_pool = self.resolve_pool_name(status['source_pool_id'])
            dest_pool = self.resolve_pool_name(status['dest_pool_id'])
            return 0, self.add_task(ioctx,
                                    "Migrating image {} to {}".format(
                                        self.format_image_spec((source_pool,
                                                                status['source_pool_namespace'],
                                                                status['source_image_name'])),
                                        self.format_image_spec((dest_pool,
                                                                status['dest_pool_namespace'],
                                                                status['dest_image_name']))),
                                    refs), ''
+
    def queue_migration_commit(self, image_spec):
        """Queue commit of an executed migration; returns (0, task-json, '').

        Raises ``rbd.InvalidArgument`` if the image is not migrating or the
        migration has not reached the EXECUTED state.
        """
        image_spec = self.extract_image_spec(image_spec)

        authorize_request(self.module, image_spec[0], image_spec[1])
        self.log.info("queue_migration_commit: {}".format(image_spec))

        refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
                TASK_REF_POOL_NAME: image_spec[0],
                TASK_REF_POOL_NAMESPACE: image_spec[1],
                TASK_REF_IMAGE_NAME: image_spec[2]}

        with self.open_ioctx(image_spec) as ioctx:
            status = self.get_migration_status(ioctx, image_spec)
            if status:
                refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

            # replayed requests return the already-queued task
            task = self.find_task(refs)
            if task:
                return 0, task.to_json(), ''

            self.validate_image_migrating(image_spec, status)
            if status['state'] != rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTED:
                raise rbd.InvalidArgument("Image {} has not completed migration".format(
                    self.format_image_spec(image_spec)), errno=errno.EINVAL)

            return 0, self.add_task(ioctx,
                                    "Committing image migration for {}".format(
                                        self.format_image_spec(image_spec)),
                                    refs), ''
+
    def queue_migration_abort(self, image_spec):
        """Queue abort of an in-progress migration; returns (0, task-json, '').

        Raises ``rbd.InvalidArgument`` if the image is not migrating.
        """
        image_spec = self.extract_image_spec(image_spec)

        authorize_request(self.module, image_spec[0], image_spec[1])
        self.log.info("queue_migration_abort: {}".format(image_spec))

        refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
                TASK_REF_POOL_NAME: image_spec[0],
                TASK_REF_POOL_NAMESPACE: image_spec[1],
                TASK_REF_IMAGE_NAME: image_spec[2]}

        with self.open_ioctx(image_spec) as ioctx:
            status = self.get_migration_status(ioctx, image_spec)
            if status:
                refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

            # replayed requests return the already-queued task
            task = self.find_task(refs)
            if task:
                return 0, task.to_json(), ''

            self.validate_image_migrating(image_spec, status)
            return 0, self.add_task(ioctx,
                                    "Aborting image migration for {}".format(
                                        self.format_image_spec(image_spec)),
                                    refs), ''
+
    def task_cancel(self, task_id):
        """Cancel a queued or in-progress task; returns (rc, '', msg).

        A task that does not exist, or that the caller is not authorized
        for, yields -ENOENT.
        """
        self.log.info("task_cancel: {}".format(task_id))

        task = self.tasks_by_id.get(task_id)
        if not task or not is_authorized(self.module,
                                         task.refs[TASK_REF_POOL_NAME],
                                         task.refs[TASK_REF_POOL_NAMESPACE]):
            return -errno.ENOENT, '', "No such task {}".format(task_id)

        task.cancel()

        # an in-progress task stays in memory: progress_callback observes
        # the canceled flag and makes librbd abort the operation
        remove_in_memory = True
        if self.in_progress_task and self.in_progress_task.task_id == task_id:
            self.log.info("Attempting to cancel in-progress task: {}".format(str(self.in_progress_task)))
            remove_in_memory = False

        # complete any associated event in the progress module
        self.complete_progress(task)

        # remove from rbd_task omap
        with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
                              task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
            self.remove_task(ioctx, task, remove_in_memory)

        return 0, "", ""
+
+ def task_list(self, task_id):
+ self.log.info("task_list: {}".format(task_id))
+
+ if task_id:
+ task = self.tasks_by_id.get(task_id)
+ if not task or not is_authorized(self.module,
+ task.refs[TASK_REF_POOL_NAME],
+ task.refs[TASK_REF_POOL_NAMESPACE]):
+ return -errno.ENOENT, '', "No such task {}".format(task_id)
+
+ result = task.to_dict()
+ else:
+ result = []
+ for sequence in sorted(self.tasks_by_sequence.keys()):
+ task = self.tasks_by_sequence[sequence]
+ if is_authorized(self.module,
+ task.refs[TASK_REF_POOL_NAME],
+ task.refs[TASK_REF_POOL_NAMESPACE]):
+ result.append(task.to_dict())
+
+ return 0, json.dumps(result, indent=4, sort_keys=True), ""
+
+ def handle_command(self, inbuf, prefix, cmd):
+ with self.lock:
+ if prefix == 'add flatten':
+ return self.queue_flatten(cmd['image_spec'])
+ elif prefix == 'add remove':
+ return self.queue_remove(cmd['image_spec'])
+ elif prefix == 'add trash remove':
+ return self.queue_trash_remove(cmd['image_id_spec'])
+ elif prefix == 'add migration execute':
+ return self.queue_migration_execute(cmd['image_spec'])
+ elif prefix == 'add migration commit':
+ return self.queue_migration_commit(cmd['image_spec'])
+ elif prefix == 'add migration abort':
+ return self.queue_migration_abort(cmd['image_spec'])
+ elif prefix == 'cancel':
+ return self.task_cancel(cmd['task_id'])
+ elif prefix == 'list':
+ return self.task_list(cmd.get('task_id'))
+
+ raise NotImplementedError(cmd['prefix'])