author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit    e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree      64f88b554b444a49f656b6c656111a145cbbaa28 /src/pybind/mgr/snap_schedule
parent    Initial commit. (diff)
Adding upstream version 18.2.2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/snap_schedule')
-rw-r--r--  src/pybind/mgr/snap_schedule/.gitignore                        |   1
-rw-r--r--  src/pybind/mgr/snap_schedule/__init__.py                       |  11
-rw-r--r--  src/pybind/mgr/snap_schedule/fs/__init__.py                    |   0
-rw-r--r--  src/pybind/mgr/snap_schedule/fs/schedule.py                    | 502
-rw-r--r--  src/pybind/mgr/snap_schedule/fs/schedule_client.py             | 444
-rw-r--r--  src/pybind/mgr/snap_schedule/module.py                         | 258
-rw-r--r--  src/pybind/mgr/snap_schedule/requirements.txt                  |   1
-rw-r--r--  src/pybind/mgr/snap_schedule/tests/__init__.py                 |   0
-rw-r--r--  src/pybind/mgr/snap_schedule/tests/conftest.py                 |  34
-rw-r--r--  src/pybind/mgr/snap_schedule/tests/fs/__init__.py              |   0
-rw-r--r--  src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py         | 256
-rw-r--r--  src/pybind/mgr/snap_schedule/tests/fs/test_schedule_client.py  |  37
-rw-r--r--  src/pybind/mgr/snap_schedule/tox.ini                           |  19
13 files changed, 1563 insertions, 0 deletions
diff --git a/src/pybind/mgr/snap_schedule/.gitignore b/src/pybind/mgr/snap_schedule/.gitignore
new file mode 100644
index 000000000..172bf5786
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/.gitignore
@@ -0,0 +1 @@
+.tox
diff --git a/src/pybind/mgr/snap_schedule/__init__.py b/src/pybind/mgr/snap_schedule/__init__.py
new file mode 100644
index 000000000..8001b7184
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/__init__.py
@@ -0,0 +1,11 @@
+# -*- coding: utf-8 -*-
+
+from os import environ
+
+if 'SNAP_SCHED_UNITTEST' in environ:
+ import tests
+elif 'UNITTEST' in environ:
+ import tests
+ from .module import Module
+else:
+ from .module import Module
diff --git a/src/pybind/mgr/snap_schedule/fs/__init__.py b/src/pybind/mgr/snap_schedule/fs/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/fs/__init__.py
diff --git a/src/pybind/mgr/snap_schedule/fs/schedule.py b/src/pybind/mgr/snap_schedule/fs/schedule.py
new file mode 100644
index 000000000..95e43b7e0
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/fs/schedule.py
@@ -0,0 +1,502 @@
+"""
+Copyright (C) 2020 SUSE
+
+LGPL2.1. See file COPYING.
+"""
+from datetime import datetime, timezone
+import json
+import logging
+import re
+import sqlite3
+from typing import cast, Any, Dict, List, Tuple, Optional, Union
+
+log = logging.getLogger(__name__)
+
+# Work around missing datetime.fromisoformat for < python3.7
+SNAP_DB_TS_FORMAT = '%Y-%m-%dT%H:%M:%S'
+try:
+ from backports.datetime_fromisoformat import MonkeyPatch
+ MonkeyPatch.patch_fromisoformat()
+except ImportError:
+ log.debug('backports.datetime_fromisoformat not found')
+
+try:
+ # have mypy ignore this line. We use the attribute error to detect if we
+ # have fromisoformat or not
+ ts_parser = datetime.fromisoformat # type: ignore
+ log.debug('found datetime.fromisoformat')
+except AttributeError:
+ log.info(('Couldn\'t find datetime.fromisoformat, falling back to '
+ f'static timestamp parsing ({SNAP_DB_TS_FORMAT})'))
+
+ def ts_parser(data_string: str) -> datetime: # type: ignore
+ try:
+ date = datetime.strptime(data_string, SNAP_DB_TS_FORMAT)
+ return date
+ except ValueError:
+ msg = f'''The date string {data_string} does not match the required format
+ {SNAP_DB_TS_FORMAT}. For more flexible date parsing, upgrade to
+ python3.7 or install
+ https://github.com/movermeyer/backports.datetime_fromisoformat'''
+ log.error(msg)
+ raise ValueError(msg)
+
+
+def parse_timestamp(ts: str) -> datetime:
+ date = ts_parser(ts)
+ # normalize any non-UTC timezone to UTC. If no tzinfo is supplied,
+ # assume it's already UTC
+ if date.tzinfo is not timezone.utc and date.tzinfo is not None:
+ date = date.astimezone(timezone.utc)
+ return date
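+
+# Example, with datetime.fromisoformat available:
+#   parse_timestamp('2022-04-19T05:39:00+02:00')
+#     -> 2022-04-19 03:39:00+00:00
+# a naive input like '2022-04-19T05:39:00' is returned unchanged and
+# treated as UTC by the callers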
+
+
+def parse_retention(retention: str) -> Dict[str, int]:
+ ret = {}
+ log.debug(f'parse_retention({retention})')
+ matches = re.findall(r'\d+[a-z]', retention)
+ for m in matches:
+ ret[m[-1]] = int(m[0:-1])
+ matches = re.findall(r'\d+[A-Z]', retention)
+ for m in matches:
+ ret[m[-1]] = int(m[0:-1])
+ log.debug(f'parse_retention({retention}) -> {ret}')
+ return ret
+
+
+RETENTION_MULTIPLIERS = ['n', 'M', 'h', 'd', 'w', 'm', 'y']
+
+TableRowT = Dict[str, Union[int, str]]
+
+
+def dump_retention(retention: Dict[str, str]) -> str:
+ ret = ''
+ for mult in RETENTION_MULTIPLIERS:
+ if mult in retention:
+ ret += str(retention[mult]) + mult
+ return ret
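+
+# round-trip example ('n' keeps the last n snapshots, 'M' is minutes,
+# h/d/w/m/y are hour/day/week/month/year):
+#   parse_retention('10n4w') -> {'n': 10, 'w': 4}
+#   dump_retention({'n': 10, 'w': 4}) -> '10n4w'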
+
+
+class Schedule(object):
+ '''
+ Wrapper to work with schedules stored in sqlite
+ '''
+ def __init__(self,
+ path: str,
+ schedule: str,
+ fs_name: str,
+ rel_path: str,
+ start: Optional[str] = None,
+ subvol: Optional[str] = None,
+ retention_policy: str = '{}',
+ created: Optional[str] = None,
+ first: Optional[str] = None,
+ last: Optional[str] = None,
+ last_pruned: Optional[str] = None,
+ created_count: int = 0,
+ pruned_count: int = 0,
+ active: bool = True,
+ ) -> None:
+ self.fs = fs_name
+ self.subvol = subvol
+ self.path = path
+ self.rel_path = rel_path
+ self.schedule = schedule
+ self.retention = json.loads(retention_policy)
+ if start is None:
+ now = datetime.now(timezone.utc)
+ self.start = datetime(now.year,
+ now.month,
+ now.day,
+ tzinfo=now.tzinfo)
+ else:
+ self.start = parse_timestamp(start)
+ if created is None:
+ self.created: Optional[datetime] = datetime.now(timezone.utc)
+ else:
+ self.created = parse_timestamp(created)
+ if first:
+ self.first: Optional[datetime] = parse_timestamp(first)
+ else:
+ self.first = None
+ if last:
+ self.last: Optional[datetime] = parse_timestamp(last)
+ else:
+ self.last = None
+ if last_pruned:
+ self.last_pruned: Optional[datetime] = parse_timestamp(last_pruned)
+ else:
+ self.last_pruned = None
+ self.created_count = created_count
+ self.pruned_count = pruned_count
+ self.active = bool(active)
+
+ @classmethod
+ def _from_db_row(cls, table_row: TableRowT, fs: str) -> 'Schedule':
+ return cls(cast(str, table_row['path']),
+ cast(str, table_row['schedule']),
+ fs,
+ cast(str, table_row['rel_path']),
+ cast(str, table_row['start']),
+ cast(str, table_row['subvol']),
+ cast(str, table_row['retention']),
+ cast(str, table_row['created']),
+ cast(str, table_row['first']),
+ cast(str, table_row['last']),
+ cast(str, table_row['last_pruned']),
+ cast(int, table_row['created_count']),
+ cast(int, table_row['pruned_count']),
+ cast(bool, table_row['active']),
+ )
+
+ def __str__(self) -> str:
+ return f'{self.path} {self.schedule} {dump_retention(self.retention)}'
+
+ def json_list(self) -> str:
+ return json.dumps({'path': self.path, 'schedule': self.schedule,
+ 'retention': dump_retention(self.retention)})
+
+ CREATE_TABLES = '''CREATE TABLE IF NOT EXISTS schedules(
+ id INTEGER PRIMARY KEY ASC,
+ path TEXT NOT NULL UNIQUE,
+ subvol TEXT,
+ retention TEXT DEFAULT '{}',
+ rel_path TEXT NOT NULL
+ );
+ CREATE TABLE IF NOT EXISTS schedules_meta(
+ id INTEGER PRIMARY KEY ASC,
+ schedule_id INT,
+ start TEXT NOT NULL,
+ first TEXT,
+ last TEXT,
+ last_pruned TEXT,
+ created TEXT NOT NULL,
+ repeat INT NOT NULL,
+ schedule TEXT NOT NULL,
+ created_count INT DEFAULT 0,
+ pruned_count INT DEFAULT 0,
+ active INT NOT NULL,
+ FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE,
+ UNIQUE (schedule_id, start, repeat)
+ );'''
+
+ EXEC_QUERY = '''SELECT
+ s.retention,
+ sm.repeat - (strftime("%s", "now") - strftime("%s", sm.start)) %
+ sm.repeat "until",
+ sm.start, sm.repeat, sm.schedule
+ FROM schedules s
+ INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
+ WHERE
+ s.path = ? AND
+ strftime("%s", "now") - strftime("%s", sm.start) > 0 AND
+ sm.active = 1
+ ORDER BY until;'''
+
+ PROTO_GET_SCHEDULES = '''SELECT
+ s.path, s.subvol, s.rel_path, sm.active,
+ sm.schedule, s.retention, sm.start, sm.first, sm.last,
+ sm.last_pruned, sm.created, sm.created_count, sm.pruned_count
+ FROM schedules s
+ INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
+ WHERE'''
+
+ GET_SCHEDULES = PROTO_GET_SCHEDULES + ' s.path = ?'
+
+ @classmethod
+ def get_db_schedules(cls,
+ path: str,
+ db: sqlite3.Connection,
+ fs: str,
+ schedule: Optional[str] = None,
+ start: Optional[str] = None,
+ repeat: Optional[str] = None) -> List['Schedule']:
+ query = cls.GET_SCHEDULES
+ data: Tuple[Any, ...] = (path,)
+ if repeat:
+ query += ' AND sm.repeat = ?'
+ data += (repeat,)
+ if schedule:
+ query += ' AND sm.schedule = ?'
+ data += (schedule,)
+ if start:
+ query += ' AND sm.start = ?'
+ data += (start,)
+ with db:
+ c = db.execute(query, data)
+ return [cls._from_db_row(row, fs) for row in c.fetchall()]
+
+ @classmethod
+ def list_schedules(cls,
+ path: str,
+ db: sqlite3.Connection,
+ fs: str, recursive: bool) -> List['Schedule']:
+ with db:
+ if recursive:
+ c = db.execute(cls.PROTO_GET_SCHEDULES + ' path LIKE ?',
+ (f'{path}%',))
+ else:
+ c = db.execute(cls.PROTO_GET_SCHEDULES + ' path = ?',
+ (path,))
+ return [cls._from_db_row(row, fs) for row in c.fetchall()]
+
+ @classmethod
+ def list_all_schedules(cls,
+ db: sqlite3.Connection,
+ fs: str) -> List['Schedule']:
+ with db:
+ c = db.execute(cls.PROTO_GET_SCHEDULES + " path LIKE '%'")
+ return [cls._from_db_row(row, fs) for row in c.fetchall()]
+
+ INSERT_SCHEDULE = '''INSERT INTO
+ schedules(path, subvol, retention, rel_path)
+ Values(?, ?, ?, ?);'''
+ INSERT_SCHEDULE_META = '''INSERT INTO
+ schedules_meta(schedule_id, start, created, repeat, schedule,
+ active)
+ SELECT ?, ?, ?, ?, ?, ?'''
+
+ def store_schedule(self, db: sqlite3.Connection) -> None:
+ sched_id = None
+ with db:
+ try:
+ log.debug(f'schedule with retention {self.retention}')
+ c = db.execute(self.INSERT_SCHEDULE,
+ (self.path,
+ self.subvol,
+ json.dumps(self.retention),
+ self.rel_path,))
+ sched_id = c.lastrowid
+ except sqlite3.IntegrityError:
+ # might be adding another schedule, retrieve sched id
+ log.debug((f'found schedule entry for {self.path}, '
+ 'trying to add meta'))
+ c = db.execute('SELECT id FROM schedules where path = ?',
+ (self.path,))
+ sched_id = c.fetchone()[0]
+ assert self.created, "self.created should be set"
+ db.execute(self.INSERT_SCHEDULE_META,
+ (sched_id,
+ self.start.strftime(SNAP_DB_TS_FORMAT),
+ self.created.strftime(SNAP_DB_TS_FORMAT),
+ self.repeat,
+ self.schedule,
+ 1))
+
+ @classmethod
+ def rm_schedule(cls,
+ db: sqlite3.Connection,
+ path: str,
+ repeat: Optional[str],
+ start: Optional[str]) -> None:
+ with db:
+ cur = db.execute('SELECT id FROM schedules WHERE path = ?',
+ (path,))
+ row = cur.fetchone()
+
+ if row is None:
+ log.info(f'no schedule for {path} found')
+ raise ValueError('SnapSchedule for {} not found'.format(path))
+
+ id_ = tuple(row)
+
+ if repeat or start:
+ meta_delete = ('DELETE FROM schedules_meta '
+ 'WHERE schedule_id = ?')
+ delete_param = id_
+ if repeat:
+ meta_delete += ' AND schedule = ?'
+ delete_param += (repeat,)
+ if start:
+ meta_delete += ' AND start = ?'
+ delete_param += (start,)
+ # maybe only delete meta entry
+ log.debug(f'executing {meta_delete}, {delete_param}')
+ res = db.execute(meta_delete + ';', delete_param).rowcount
+ if res < 1:
+ raise ValueError(f'No schedule found for {repeat} {start}')
+ db.execute('COMMIT;')
+ # now check if we have schedules in meta left, if not delete
+ # the schedule as well
+ meta_count = db.execute(
+ 'SELECT COUNT() FROM schedules_meta WHERE schedule_id = ?',
+ id_)
+ if meta_count.fetchone() == (0,):
+ log.debug(
+ 'no more schedules left, cleaning up schedules table')
+ db.execute('DELETE FROM schedules WHERE id = ?;', id_)
+ else:
+ # just delete the schedule CASCADE DELETE takes care of the
+ # rest
+ db.execute('DELETE FROM schedules WHERE id = ?;', id_)
+
+ GET_RETENTION = '''SELECT retention FROM schedules
+ WHERE path = ?'''
+ UPDATE_RETENTION = '''UPDATE schedules
+ SET retention = ?
+ WHERE path = ?'''
+
+ @classmethod
+ def add_retention(cls,
+ db: sqlite3.Connection,
+ path: str,
+ retention_spec: str) -> None:
+ with db:
+ row = db.execute(cls.GET_RETENTION, (path,)).fetchone()
+ if row is None:
+ raise ValueError(f'No schedule found for {path}')
+ retention = parse_retention(retention_spec)
+ if not retention:
+ raise ValueError(f'Retention spec {retention_spec} is invalid')
+ log.debug(f'db result is {tuple(row)}')
+ current = row['retention']
+ current_retention = json.loads(current)
+ for r, v in retention.items():
+ if r in current_retention:
+ msg = (f'Retention for {r} is already present with value '
+ f'{current_retention[r]}. Please remove it first')
+ raise ValueError(msg)
+ current_retention.update(retention)
+ db.execute(cls.UPDATE_RETENTION,
+ (json.dumps(current_retention), path))
+
+ @classmethod
+ def rm_retention(cls,
+ db: sqlite3.Connection,
+ path: str,
+ retention_spec: str) -> None:
+ with db:
+ row = db.execute(cls.GET_RETENTION, (path,)).fetchone()
+ if row is None:
+ raise ValueError(f'No schedule found for {path}')
+ retention = parse_retention(retention_spec)
+ current = row['retention']
+ current_retention = json.loads(current)
+ for r, v in retention.items():
+ if r not in current_retention or current_retention[r] != v:
+ msg = (f'Retention for {r}: {v} was not set for {path}, '
+ 'cannot remove it')
+ raise ValueError(msg)
+ current_retention.pop(r)
+ db.execute(cls.UPDATE_RETENTION,
+ (json.dumps(current_retention), path))
+
+ def report(self) -> str:
+ return self.report_json()
+
+ def report_json(self) -> str:
+ return json.dumps(dict(self.__dict__),
+ default=lambda o: o.strftime(SNAP_DB_TS_FORMAT))
+
+ @classmethod
+ def parse_schedule(cls, schedule: str) -> Tuple[int, str]:
+ return int(schedule[0:-1]), schedule[-1]
+
+ @property
+ def repeat(self) -> int:
+ period, mult = self.parse_schedule(self.schedule)
+ if mult == 'M':
+ return period * 60
+ elif mult == 'h':
+ return period * 60 * 60
+ elif mult == 'd':
+ return period * 60 * 60 * 24
+ elif mult == 'w':
+ return period * 60 * 60 * 24 * 7
+ else:
+ raise ValueError(f'schedule multiplier "{mult}" not recognized')
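+
+ # e.g. parse_schedule('6h') -> (6, 'h'), so a '6h' schedule repeats
+ # every 6 * 60 * 60 = 21600 seconds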
+
+ UPDATE_LAST = '''UPDATE schedules_meta
+ SET
+ last = ?,
+ created_count = created_count + 1,
+ first = CASE WHEN first IS NULL THEN ? ELSE first END
+ WHERE EXISTS(
+ SELECT id
+ FROM schedules s
+ WHERE s.id = schedules_meta.schedule_id
+ AND s.path = ?
+ AND schedules_meta.start = ?
+ AND schedules_meta.repeat = ?);'''
+
+ def update_last(self, time: datetime, db: sqlite3.Connection) -> None:
+ with db:
+ db.execute(self.UPDATE_LAST,
+ (time.strftime(SNAP_DB_TS_FORMAT),
+ time.strftime(SNAP_DB_TS_FORMAT),
+ self.path,
+ self.start.strftime(SNAP_DB_TS_FORMAT),
+ self.repeat))
+ self.created_count += 1
+ self.last = time
+ if not self.first:
+ self.first = time
+
+ UPDATE_INACTIVE = '''UPDATE schedules_meta
+ SET
+ active = 0
+ WHERE EXISTS(
+ SELECT id
+ FROM schedules s
+ WHERE s.id = schedules_meta.schedule_id
+ AND s.path = ?
+ AND schedules_meta.start = ?
+ AND schedules_meta.repeat = ?);'''
+
+ def set_inactive(self, db: sqlite3.Connection) -> None:
+ with db:
+ log.debug((f'Deactivating schedule ({self.repeat}, '
+ f'{self.start}) on path {self.path}'))
+ db.execute(self.UPDATE_INACTIVE,
+ (self.path,
+ self.start.strftime(SNAP_DB_TS_FORMAT),
+ self.repeat))
+ self.active = False
+
+ UPDATE_ACTIVE = '''UPDATE schedules_meta
+ SET
+ active = 1
+ WHERE EXISTS(
+ SELECT id
+ FROM schedules s
+ WHERE s.id = schedules_meta.schedule_id
+ AND s.path = ?
+ AND schedules_meta.start = ?
+ AND schedules_meta.repeat = ?);'''
+
+ def set_active(self, db: sqlite3.Connection) -> None:
+ with db:
+ log.debug(f'Activating schedule ({self.repeat}, {self.start}) '
+ f'on path {self.path}')
+ db.execute(self.UPDATE_ACTIVE,
+ (self.path,
+ self.start.strftime(SNAP_DB_TS_FORMAT),
+ self.repeat))
+ self.active = True
+
+ UPDATE_PRUNED = '''UPDATE schedules_meta
+ SET
+ last_pruned = ?,
+ pruned_count = pruned_count + ?
+ WHERE EXISTS(
+ SELECT id
+ FROM schedules s
+ WHERE s.id = schedules_meta.schedule_id
+ AND s.path = ?
+ AND schedules_meta.start = ?
+ AND schedules_meta.repeat = ?);'''
+
+ def update_pruned(self,
+ time: datetime,
+ db: sqlite3.Connection,
+ pruned: int) -> None:
+ with db:
+ db.execute(self.UPDATE_PRUNED,
+ (time.strftime(SNAP_DB_TS_FORMAT), pruned,
+ self.path,
+ self.start.strftime(SNAP_DB_TS_FORMAT),
+ self.repeat))
+ self.pruned_count += pruned
+ self.last_pruned = time
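
The Schedule class above depends only on sqlite3, so it can be exercised
against an in-memory database, mirroring the test fixtures further down.
A minimal sketch, assuming the package is importable as snap_schedule:

    import sqlite3
    from snap_schedule.fs.schedule import Schedule

    db = sqlite3.connect(':memory:', check_same_thread=False)
    db.row_factory = sqlite3.Row           # _from_db_row expects dict-style rows
    db.execute('PRAGMA FOREIGN_KEYS = 1')  # enable ON DELETE CASCADE
    db.executescript(Schedule.CREATE_TABLES)

    sched = Schedule('/foo', '6h', 'fs_name', '/foo')
    sched.store_schedule(db)

    for s in Schedule.get_db_schedules('/foo', db, 'fs_name'):
        print(s)         # -> '/foo 6h ' (path, schedule, empty retention)
    print(sched.repeat)  # -> 21600, the schedule period in seconds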
diff --git a/src/pybind/mgr/snap_schedule/fs/schedule_client.py b/src/pybind/mgr/snap_schedule/fs/schedule_client.py
new file mode 100644
index 000000000..28d54639a
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/fs/schedule_client.py
@@ -0,0 +1,444 @@
+"""
+Copyright (C) 2020 SUSE
+
+LGPL2.1. See file COPYING.
+"""
+import cephfs
+import rados
+from contextlib import contextmanager
+from mgr_util import CephfsClient, open_filesystem
+from collections import OrderedDict
+from datetime import datetime, timezone
+import logging
+from threading import Timer, Lock
+from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \
+ Tuple, TypeVar, Union, Type
+from types import TracebackType
+import sqlite3
+from .schedule import Schedule
+import traceback
+
+
+SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
+SNAP_DB_PREFIX = 'snap_db'
+# increment this every time the db schema changes and provide upgrade code
+SNAP_DB_VERSION = '0'
+SNAP_DB_OBJECT_NAME = f'{SNAP_DB_PREFIX}_v{SNAP_DB_VERSION}'
+# scheduled snapshots are tz suffixed
+SNAPSHOT_TS_FORMAT_TZ = '%Y-%m-%d-%H_%M_%S_%Z'
+# for backward compat snapshot name parsing
+SNAPSHOT_TS_FORMAT = '%Y-%m-%d-%H_%M_%S'
+# length of timestamp format (without tz suffix)
+# e.g.: scheduled-2022-04-19-05_39_00_UTC (len = "2022-04-19-05_39_00")
+SNAPSHOT_TS_FORMAT_LEN = 19
+SNAPSHOT_PREFIX = 'scheduled'
+
+log = logging.getLogger(__name__)
+
+
+CephfsClientT = TypeVar('CephfsClientT', bound=CephfsClient)
+
+
+@contextmanager
+def open_ioctx(self: CephfsClientT,
+ pool: Union[int, str]) -> Iterator[rados.Ioctx]:
+ try:
+ if type(pool) is int:
+ with self.mgr.rados.open_ioctx2(pool) as ioctx:
+ ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
+ yield ioctx
+ else:
+ with self.mgr.rados.open_ioctx(pool) as ioctx:
+ ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
+ yield ioctx
+ except rados.ObjectNotFound:
+ log.error("Failed to locate pool {}".format(pool))
+ raise
+
+
+FuncT = TypeVar('FuncT', bound=Callable[..., None])
+
+
+def updates_schedule_db(func: FuncT) -> FuncT:
+ def f(self: 'SnapSchedClient', fs: str, schedule_or_path: Union[str, Schedule], *args: Any) -> None:
+ ret = func(self, fs, schedule_or_path, *args)
+ path = schedule_or_path
+ if isinstance(schedule_or_path, Schedule):
+ path = schedule_or_path.path
+ self.refresh_snap_timers(fs, path)
+ return ret
+ return cast(FuncT, f)
+
+
+def get_prune_set(candidates: Set[Tuple[cephfs.DirEntry, datetime]],
+ retention: Dict[str, int],
+ max_snaps_to_retain: int) -> Set:
+ PRUNING_PATTERNS = OrderedDict([
+ # n is for keep last n snapshots, uses the snapshot name timestamp
+ # format for lowest granularity
+ # NOTE: prune set has tz suffix stripped out.
+ ("n", SNAPSHOT_TS_FORMAT),
+ # TODO remove M for release
+ ("M", '%Y-%m-%d-%H_%M'),
+ ("h", '%Y-%m-%d-%H'),
+ ("d", '%Y-%m-%d'),
+ ("w", '%G-%V'),
+ ("m", '%Y-%m'),
+ ("y", '%Y'),
+ ])
+ keep = []
+ if not retention:
+ log.info(f'no retention set, assuming n: {max_snaps_to_retain}')
+ retention = {'n': max_snaps_to_retain}
+ for period, date_pattern in PRUNING_PATTERNS.items():
+ log.debug(f'compiling keep set for period {period}')
+ period_count = retention.get(period, 0)
+ if not period_count:
+ continue
+ last = None
+ kept_for_this_period = 0
+ for snap in sorted(candidates, key=lambda x: x[0].d_name,
+ reverse=True):
+ snap_ts = snap[1].strftime(date_pattern)
+ if snap_ts != last:
+ last = snap_ts
+ if snap not in keep:
+ log.debug((f'keeping {snap[0].d_name} due to '
+ f'{period_count}{period}'))
+ keep.append(snap)
+ kept_for_this_period += 1
+ if kept_for_this_period == period_count:
+ log.debug(('found enough snapshots for '
+ f'{period_count}{period}'))
+ break
+ if len(keep) > max_snaps_to_retain:
+ log.info(f'Pruning keep set; would retain first {max_snaps_to_retain}'
+ f' out of {len(keep)} snaps')
+ keep = keep[:max_snaps_to_retain]
+ return candidates - set(keep)
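+
+# e.g. with retention {'h': 6, 'd': 2} the newest snapshot in each of the
+# last 6 hour-buckets and the last 2 day-buckets is kept; a candidate may
+# count towards several periods, and everything outside the keep set is
+# returned for pruning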
+
+def snap_name_to_timestamp(scheduled_snap_name: str) -> str:
+ """ extract timestamp from a schedule snapshot with tz suffix stripped out """
+ ts = scheduled_snap_name.lstrip(f'{SNAPSHOT_PREFIX}-')
+ return ts[0:SNAPSHOT_TS_FORMAT_LEN]
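+
+# e.g. snap_name_to_timestamp('scheduled-2022-04-19-05_39_00_UTC')
+#   -> '2022-04-19-05_39_00', parseable with SNAPSHOT_TS_FORMAT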
+
+class DBInfo():
+ def __init__(self, fs: str, db: sqlite3.Connection):
+ self.fs: str = fs
+ self.lock: Lock = Lock()
+ self.db: sqlite3.Connection = db
+
+
+# context manager for serializing db connection usage
+class DBConnectionManager():
+ def __init__(self, info: DBInfo):
+ self.dbinfo: DBInfo = info
+
+ # using string as return type hint since __future__.annotations is not
+ # available with Python 3.6; it's available starting from Python 3.7
+ def __enter__(self) -> 'DBConnectionManager':
+ log.debug(f'locking db connection for {self.dbinfo.fs}')
+ self.dbinfo.lock.acquire()
+ log.debug(f'locked db connection for {self.dbinfo.fs}')
+ return self
+
+ def __exit__(self,
+ exception_type: Optional[Type[BaseException]],
+ exception_value: Optional[BaseException],
+ traceback: Optional[TracebackType]) -> None:
+ log.debug(f'unlocking db connection for {self.dbinfo.fs}')
+ self.dbinfo.lock.release()
+ log.debug(f'unlocked db connection for {self.dbinfo.fs}')
+
+
+class SnapSchedClient(CephfsClient):
+
+ def __init__(self, mgr: Any) -> None:
+ super(SnapSchedClient, self).__init__(mgr)
+ # Each db connection is now guarded by a Lock; this is required to
+ # avoid concurrent DB transactions when more than one path in a
+ # file system is scheduled at the same interval, e.g. 1h; without the
+ # lock, there are races to use the same connection, causing nested
+ # transactions to be aborted
+ self.sqlite_connections: Dict[str, DBInfo] = {}
+ self.active_timers: Dict[Tuple[str, str], List[Timer]] = {}
+ self.conn_lock: Lock = Lock() # lock to protect add/lookup db connections
+
+ # restart old schedules
+ for fs_name in self.get_all_filesystems():
+ with self.get_schedule_db(fs_name) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ sched_list = Schedule.list_all_schedules(db, fs_name)
+ for sched in sched_list:
+ self.refresh_snap_timers(fs_name, sched.path, db)
+
+ @property
+ def allow_minute_snaps(self) -> Optional[bool]:
+ return self.mgr.get_module_option('allow_m_granularity')
+
+ @property
+ def dump_on_update(self) -> Optional[bool]:
+ return self.mgr.get_module_option('dump_on_update')
+
+ def get_schedule_db(self, fs: str) -> DBConnectionManager:
+ dbinfo = None
+ self.conn_lock.acquire()
+ if fs not in self.sqlite_connections:
+ poolid = self.get_metadata_pool(fs)
+ assert poolid, f'fs "{fs}" not found'
+ uri = f"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph"
+ log.debug(f"using uri {uri}")
+ db = sqlite3.connect(uri, check_same_thread=False, uri=True)
+ db.execute('PRAGMA FOREIGN_KEYS = 1')
+ db.execute('PRAGMA JOURNAL_MODE = PERSIST')
+ db.execute('PRAGMA PAGE_SIZE = 65536')
+ db.execute('PRAGMA CACHE_SIZE = 256')
+ db.execute('PRAGMA TEMP_STORE = memory')
+ db.row_factory = sqlite3.Row
+ # check for legacy dump store
+ pool_param = cast(Union[int, str], poolid)
+ with open_ioctx(self, pool_param) as ioctx:
+ try:
+ size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
+ dump = ioctx.read(SNAP_DB_OBJECT_NAME, size).decode('utf-8')
+ db.executescript(dump)
+ ioctx.remove_object(SNAP_DB_OBJECT_NAME)
+ except rados.ObjectNotFound:
+ log.debug(f'No legacy schedule DB found in {fs}')
+ db.executescript(Schedule.CREATE_TABLES)
+ self.sqlite_connections[fs] = DBInfo(fs, db)
+ dbinfo = self.sqlite_connections[fs]
+ self.conn_lock.release()
+ return DBConnectionManager(dbinfo)
+
+ def _is_allowed_repeat(self, exec_row: Dict[str, str], path: str) -> bool:
+ if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M':
+ if self.allow_minute_snaps:
+ log.debug(('Minute repeats allowed, '
+ f'scheduling snapshot on path {path}'))
+ return True
+ else:
+ log.info(('Minute repeats disabled, '
+ f'skipping snapshot on path {path}'))
+ return False
+ else:
+ return True
+
+ def fetch_schedules(self, db: sqlite3.Connection, path: str) -> List[sqlite3.Row]:
+ with db:
+ if self.dump_on_update:
+ dump = [line for line in db.iterdump()]
+ dump = "\n".join(dump)
+ log.debug(f"db dump:\n{dump}")
+ cur = db.execute(Schedule.EXEC_QUERY, (path,))
+ all_rows = cur.fetchall()
+ rows = [r for r in all_rows
+ if self._is_allowed_repeat(r, path)][0:1]
+ return rows
+
+ def refresh_snap_timers(self, fs: str, path: str, olddb: Optional[sqlite3.Connection] = None) -> None:
+ try:
+ log.debug((f'SnapDB on {fs} changed for {path}, '
+ 'updating next Timer'))
+ rows = []
+ # olddb is passed in the case where we land here without a timer
+ # the lock on the db connection has already been taken
+ if olddb:
+ rows = self.fetch_schedules(olddb, path)
+ else:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ rows = self.fetch_schedules(db, path)
+ timers = self.active_timers.get((fs, path), [])
+ for timer in timers:
+ timer.cancel()
+ timers = []
+ for row in rows:
+ log.debug(f'Creating new snapshot timer for {path}')
+ t = Timer(row[1],
+ self.create_scheduled_snapshot,
+ args=[fs, path, row[0], row[2], row[3]])
+ t.start()
+ timers.append(t)
+ log.debug(f'Will snapshot {path} in fs {fs} in {row[1]}s')
+ self.active_timers[(fs, path)] = timers
+ except Exception:
+ self._log_exception('refresh_snap_timers')
+
+ def _log_exception(self, fct: str) -> None:
+ log.error(f'{fct} raised an exception:')
+ log.error(traceback.format_exc())
+
+ def create_scheduled_snapshot(self,
+ fs_name: str,
+ path: str,
+ retention: str,
+ start: str,
+ repeat: str) -> None:
+ log.debug(f'Scheduled snapshot of {path} triggered')
+ set_schedule_to_inactive = False
+ try:
+ with self.get_schedule_db(fs_name) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ try:
+ sched = Schedule.get_db_schedules(path,
+ db,
+ fs_name,
+ repeat=repeat,
+ start=start)[0]
+ time = datetime.now(timezone.utc)
+ with open_filesystem(self, fs_name) as fs_handle:
+ snap_ts = time.strftime(SNAPSHOT_TS_FORMAT_TZ)
+ snap_dir = self.mgr.rados.conf_get('client_snapdir')
+ snap_name = f'{path}/{snap_dir}/{SNAPSHOT_PREFIX}-{snap_ts}'
+ fs_handle.mkdir(snap_name, 0o755)
+ log.info(f'created scheduled snapshot of {path}')
+ log.debug(f'created scheduled snapshot {snap_name}')
+ sched.update_last(time, db)
+ except cephfs.ObjectNotFound:
+ # maybe path is missing or wrong
+ self._log_exception('create_scheduled_snapshot')
+ log.debug(f'path {path} is probably missing or wrong; '
+ 'remember to strip off the mount point path '
+ 'prefix to provide the correct path')
+ set_schedule_to_inactive = True
+ except cephfs.Error:
+ self._log_exception('create_scheduled_snapshot')
+ except Exception:
+ # catch all exceptions cause otherwise we'll never know since this
+ # is running in a thread
+ self._log_exception('create_scheduled_snapshot')
+ finally:
+ if set_schedule_to_inactive:
+ sched.set_inactive(db)
+ finally:
+ with self.get_schedule_db(fs_name) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ self.refresh_snap_timers(fs_name, path, db)
+ self.prune_snapshots(sched)
+
+ def prune_snapshots(self, sched: Schedule) -> None:
+ try:
+ log.debug('Pruning snapshots')
+ ret = sched.retention
+ path = sched.path
+ prune_candidates = set()
+ time = datetime.now(timezone.utc)
+ mds_max_snaps_per_dir = self.mgr.get_ceph_option('mds_max_snaps_per_dir')
+ with open_filesystem(self, sched.fs) as fs_handle:
+ snap_dir = self.mgr.rados.conf_get('client_snapdir')
+ with fs_handle.opendir(f'{path}/{snap_dir}') as d_handle:
+ dir_ = fs_handle.readdir(d_handle)
+ while dir_:
+ if dir_.d_name.decode('utf-8').startswith(f'{SNAPSHOT_PREFIX}-'):
+ log.debug(f'add {dir_.d_name} to pruning')
+ ts = datetime.strptime(
+ snap_name_to_timestamp(dir_.d_name.decode('utf-8')), SNAPSHOT_TS_FORMAT)
+ prune_candidates.add((dir_, ts))
+ else:
+ log.debug(f'skipping dir entry {dir_.d_name}')
+ dir_ = fs_handle.readdir(d_handle)
+ # Limit ourselves to one snapshot less than allowed by config to allow for
+ # snapshot creation before pruning
+ to_prune = get_prune_set(prune_candidates, ret, mds_max_snaps_per_dir - 1)
+ for k in to_prune:
+ dirname = k[0].d_name.decode('utf-8')
+ log.debug(f'rmdir on {dirname}')
+ fs_handle.rmdir(f'{path}/{snap_dir}/{dirname}')
+ if to_prune:
+ with self.get_schedule_db(sched.fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ sched.update_pruned(time, db, len(to_prune))
+ except Exception:
+ self._log_exception('prune_snapshots')
+
+ def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ return Schedule.get_db_schedules(path, db, fs)
+
+ def list_snap_schedules(self,
+ fs: str,
+ path: str,
+ recursive: bool) -> List[Schedule]:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ return Schedule.list_schedules(path, db, fs, recursive)
+
+ @updates_schedule_db
+ # TODO improve interface
+ def store_snap_schedule(self,
+ fs: str, path_: str,
+ args: Tuple[str, str, str, str,
+ Optional[str], Optional[str]]) -> None:
+ sched = Schedule(*args)
+ log.debug(f'repeat is {sched.repeat}')
+ if sched.parse_schedule(sched.schedule)[1] == 'M' and not self.allow_minute_snaps:
+ log.error('not allowed')
+ raise ValueError('no minute snaps allowed')
+ log.debug(f'attempting to add schedule {sched}')
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ sched.store_schedule(db)
+
+ @updates_schedule_db
+ def rm_snap_schedule(self,
+ fs: str, path: str,
+ schedule: Optional[str],
+ start: Optional[str]) -> None:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ Schedule.rm_schedule(db, path, schedule, start)
+
+ @updates_schedule_db
+ def add_retention_spec(self,
+ fs: str,
+ path: str,
+ retention_spec_or_period: str,
+ retention_count: Optional[str]) -> None:
+ retention_spec = retention_spec_or_period
+ if retention_count:
+ retention_spec = retention_count + retention_spec
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ Schedule.add_retention(db, path, retention_spec)
+
+ @updates_schedule_db
+ def rm_retention_spec(self,
+ fs: str,
+ path: str,
+ retention_spec_or_period: str,
+ retention_count: Optional[str]) -> None:
+ retention_spec = retention_spec_or_period
+ if retention_count:
+ retention_spec = retention_count + retention_spec
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ Schedule.rm_retention(db, path, retention_spec)
+
+ @updates_schedule_db
+ def activate_snap_schedule(self,
+ fs: str,
+ path: str,
+ schedule: Optional[str],
+ start: Optional[str]) -> None:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ schedules = Schedule.get_db_schedules(path, db, fs,
+ schedule=schedule,
+ start=start)
+ for s in schedules:
+ s.set_active(db)
+
+ @updates_schedule_db
+ def deactivate_snap_schedule(self,
+ fs: str, path: str,
+ schedule: Optional[str],
+ start: Optional[str]) -> None:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ schedules = Schedule.get_db_schedules(path, db, fs,
+ schedule=schedule,
+ start=start)
+ for s in schedules:
+ s.set_inactive(db)
diff --git a/src/pybind/mgr/snap_schedule/module.py b/src/pybind/mgr/snap_schedule/module.py
new file mode 100644
index 000000000..b691572b6
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/module.py
@@ -0,0 +1,258 @@
+"""
+Copyright (C) 2019 SUSE
+
+LGPL2.1. See file COPYING.
+"""
+import errno
+import json
+import sqlite3
+from typing import Any, Dict, Optional, Tuple
+from .fs.schedule_client import SnapSchedClient
+from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand, Option
+from mgr_util import CephfsConnectionException
+from threading import Event
+
+
+class Module(MgrModule):
+ MODULE_OPTIONS = [
+ Option(
+ 'allow_m_granularity',
+ type='bool',
+ default=False,
+ desc='allow minute scheduled snapshots',
+ runtime=True,
+ ),
+ Option(
+ 'dump_on_update',
+ type='bool',
+ default=False,
+ desc='dump database to debug log on update',
+ runtime=True,
+ ),
+
+ ]
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ super(Module, self).__init__(*args, **kwargs)
+ self._initialized = Event()
+ self.client = SnapSchedClient(self)
+
+ @property
+ def _default_fs(self) -> Tuple[int, str, str]:
+ fs_map = self.get('fs_map')
+ if len(fs_map['filesystems']) > 1:
+ return -errno.EINVAL, '', "filesystem argument is required when there is more than one file system"
+ elif len(fs_map['filesystems']) == 1:
+ return 0, fs_map['filesystems'][0]['mdsmap']['fs_name'], "Success"
+ else:
+ self.log.error('No filesystem instance could be found.')
+ return -errno.ENOENT, "", "no filesystem found"
+
+ def _validate_fs(self, fs: Optional[str]) -> Tuple[int, str, str]:
+ if not fs:
+ rc, fs, err = self._default_fs
+ if rc < 0:
+ return rc, fs, err
+ if not self.has_fs(fs):
+ return -errno.EINVAL, '', f"no such file system: {fs}"
+ return 0, fs, 'Success'
+
+ def has_fs(self, fs_name: str) -> bool:
+ return fs_name in self.client.get_all_filesystems()
+
+ def serve(self) -> None:
+ self._initialized.set()
+
+ def handle_command(self, inbuf: str, cmd: Dict[str, str]) -> Tuple[int, str, str]:
+ self._initialized.wait()
+ return -errno.EINVAL, "", "Unknown command"
+
+ @CLIReadCommand('fs snap-schedule status')
+ def snap_schedule_get(self,
+ path: str = '/',
+ fs: Optional[str] = None,
+ format: Optional[str] = 'plain') -> Tuple[int, str, str]:
+ '''
+ List current snapshot schedules
+ '''
+ rc, fs, err = self._validate_fs(fs)
+ if rc < 0:
+ return rc, fs, err
+ try:
+ ret_scheds = self.client.get_snap_schedules(fs, path)
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ if format == 'json':
+ json_report = ','.join([ret_sched.report_json() for ret_sched in ret_scheds])
+ return 0, f'[{json_report}]', ''
+ return 0, '\n===\n'.join([ret_sched.report() for ret_sched in ret_scheds]), ''
+
+ @CLIReadCommand('fs snap-schedule list')
+ def snap_schedule_list(self, path: str,
+ recursive: bool = False,
+ fs: Optional[str] = None,
+ format: Optional[str] = 'plain') -> Tuple[int, str, str]:
+ '''
+ Get current snapshot schedule for <path>
+ '''
+ rc, fs, err = self._validate_fs(fs)
+ if rc < 0:
+ return rc, fs, err
+ try:
+ scheds = self.client.list_snap_schedules(fs, path, recursive)
+ self.log.debug(f'recursive is {recursive}')
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ if not scheds:
+ if format == 'json':
+ output: Dict[str, str] = {}
+ return 0, json.dumps(output), ''
+ return -errno.ENOENT, '', f'SnapSchedule for {path} not found'
+ if format == 'json':
+ schedule_list = [sched.schedule for sched in scheds]
+ retention_list = [sched.retention for sched in scheds]
+ out = {'path': path, 'schedule': schedule_list, 'retention': retention_list}
+ return 0, json.dumps(out), ''
+ return 0, '\n'.join([str(sched) for sched in scheds]), ''
+
+ @CLIWriteCommand('fs snap-schedule add')
+ def snap_schedule_add(self,
+ path: str,
+ snap_schedule: str,
+ start: Optional[str] = None,
+ fs: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Set a snapshot schedule for <path>
+ '''
+ rc, fs, err = self._validate_fs(fs)
+ if rc < 0:
+ return rc, fs, err
+ try:
+ abs_path = path
+ subvol = None
+ self.client.store_snap_schedule(fs,
+ abs_path,
+ (abs_path, snap_schedule,
+ fs, path, start, subvol))
+ suc_msg = f'Schedule set for path {path}'
+ except sqlite3.IntegrityError:
+ existing_scheds = self.client.get_snap_schedules(fs, path)
+ report = [s.report() for s in existing_scheds]
+ error_msg = f'Found existing schedule {report}'
+ self.log.error(error_msg)
+ return -errno.EEXIST, '', error_msg
+ except ValueError as e:
+ return -errno.ENOENT, '', str(e)
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ return 0, suc_msg, ''
+
+ @CLIWriteCommand('fs snap-schedule remove')
+ def snap_schedule_rm(self,
+ path: str,
+ repeat: Optional[str] = None,
+ start: Optional[str] = None,
+ fs: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Remove a snapshot schedule for <path>
+ '''
+ rc, fs, err = self._validate_fs(fs)
+ if rc < 0:
+ return rc, fs, err
+ try:
+ abs_path = path
+ self.client.rm_snap_schedule(fs, abs_path, repeat, start)
+ except ValueError as e:
+ return -errno.ENOENT, '', str(e)
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ return 0, 'Schedule removed for path {}'.format(path), ''
+
+ @CLIWriteCommand('fs snap-schedule retention add')
+ def snap_schedule_retention_add(self,
+ path: str,
+ retention_spec_or_period: str,
+ retention_count: Optional[str] = None,
+ fs: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Set a retention specification for <path>
+ '''
+ rc, fs, err = self._validate_fs(fs)
+ if rc < 0:
+ return rc, fs, err
+ try:
+ abs_path = path
+ self.client.add_retention_spec(fs, abs_path,
+ retention_spec_or_period,
+ retention_count)
+ except ValueError as e:
+ return -errno.ENOENT, '', str(e)
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ return 0, 'Retention added to path {}'.format(path), ''
+
+ @CLIWriteCommand('fs snap-schedule retention remove')
+ def snap_schedule_retention_rm(self,
+ path: str,
+ retention_spec_or_period: str,
+ retention_count: Optional[str] = None,
+ fs: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Remove a retention specification for <path>
+ '''
+ rc, fs, err = self._validate_fs(fs)
+ if rc < 0:
+ return rc, fs, err
+ try:
+ abs_path = path
+ self.client.rm_retention_spec(fs, abs_path,
+ retention_spec_or_period,
+ retention_count)
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ except ValueError as e:
+ return -errno.ENOENT, '', str(e)
+ return 0, 'Retention removed from path {}'.format(path), ''
+
+ @CLIWriteCommand('fs snap-schedule activate')
+ def snap_schedule_activate(self,
+ path: str,
+ repeat: Optional[str] = None,
+ start: Optional[str] = None,
+ fs: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Activate a snapshot schedule for <path>
+ '''
+ rc, fs, err = self._validate_fs(fs)
+ if rc < 0:
+ return rc, fs, err
+ try:
+ abs_path = path
+ self.client.activate_snap_schedule(fs, abs_path, repeat, start)
+ except ValueError as e:
+ return -errno.ENOENT, '', str(e)
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ return 0, 'Schedule activated for path {}'.format(path), ''
+
+ @CLIWriteCommand('fs snap-schedule deactivate')
+ def snap_schedule_deactivate(self,
+ path: str,
+ repeat: Optional[str] = None,
+ start: Optional[str] = None,
+ fs: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Deactivate a snapshot schedule for <path>
+ '''
+ rc, fs, err = self._validate_fs(fs)
+ if rc < 0:
+ return rc, fs, err
+ try:
+ abs_path = path
+ self.client.deactivate_snap_schedule(fs, abs_path, repeat, start)
+ except ValueError as e:
+ return -errno.ENOENT, '', str(e)
+ except CephfsConnectionException as e:
+ return e.to_tuple()
+ return 0, 'Schedule deactivated for path {}'.format(path), ''
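
Together the handlers above register the 'fs snap-schedule' command family
(status, list, add, remove, retention add/remove, activate, deactivate),
each returning the usual mgr (rc, stdout, stderr) tuple. A sketch of the
JSON shape snap_schedule_list assembles when format is 'json' (values
illustrative):

    import json

    # one object per queried path; 'schedule' and 'retention' are parallel
    # lists with one element per schedule stored on that path
    out = {'path': '/foo',
           'schedule': ['6h', '1d'],
           'retention': [{'h': 6}, {'d': 7, 'w': 4}]}
    print(json.dumps(out))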
diff --git a/src/pybind/mgr/snap_schedule/requirements.txt b/src/pybind/mgr/snap_schedule/requirements.txt
new file mode 100644
index 000000000..e079f8a60
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/requirements.txt
@@ -0,0 +1 @@
+pytest
diff --git a/src/pybind/mgr/snap_schedule/tests/__init__.py b/src/pybind/mgr/snap_schedule/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/tests/__init__.py
diff --git a/src/pybind/mgr/snap_schedule/tests/conftest.py b/src/pybind/mgr/snap_schedule/tests/conftest.py
new file mode 100644
index 000000000..35255b8d4
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/tests/conftest.py
@@ -0,0 +1,34 @@
+import pytest
+import sqlite3
+from ..fs.schedule import Schedule
+
+
+# simple_schedule fixture returns schedules without any timing arguments
+# the tuple values correspond to the ctor args of Schedule
+_simple_schedules = [
+ ('/foo', '6h', 'fs_name', '/foo'),
+ ('/foo', '24h', 'fs_name', '/foo'),
+ ('/bar', '1d', 'fs_name', '/bar'),
+ ('/fnord', '1w', 'fs_name', '/fnord'),
+]
+
+
+@pytest.fixture(params=_simple_schedules)
+def simple_schedule(request):
+ return Schedule(*request.param)
+
+
+@pytest.fixture
+def simple_schedules():
+ return [Schedule(*s) for s in _simple_schedules]
+
+
+@pytest.fixture
+def db():
+ db = sqlite3.connect(':memory:',
+ check_same_thread=False)
+ with db:
+ db.row_factory = sqlite3.Row
+ db.execute("PRAGMA FOREIGN_KEYS = 1")
+ db.executescript(Schedule.CREATE_TABLES)
+ return db
diff --git a/src/pybind/mgr/snap_schedule/tests/fs/__init__.py b/src/pybind/mgr/snap_schedule/tests/fs/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/tests/fs/__init__.py
diff --git a/src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py b/src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py
new file mode 100644
index 000000000..1e984ab64
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py
@@ -0,0 +1,256 @@
+import datetime
+import json
+import pytest
+import random
+import sqlite3
+from ...fs.schedule import Schedule, parse_retention
+
+SELECT_ALL = ('select * from schedules s'
+ ' INNER JOIN schedules_meta sm'
+ ' ON sm.schedule_id = s.id')
+
+
+def assert_updated(new, old, update_expected={}):
+ '''
+ This helper asserts that the object new has been updated in the
+ attributes listed in update_expected AND has not changed in any other
+ attribute compared to old.
+ If update_expected is the empty dict, plain equality is checked.
+ '''
+
+ for var in vars(new):
+ if var in update_expected:
+ expected_val = update_expected.get(var)
+ new_val = getattr(new, var)
+ if isinstance(expected_val, datetime.datetime):
+ assert new_val.year == expected_val.year
+ assert new_val.month == expected_val.month
+ assert new_val.day == expected_val.day
+ assert new_val.hour == expected_val.hour
+ assert new_val.minute == expected_val.minute
+ assert new_val.second == expected_val.second
+ else:
+ assert new_val == expected_val, f'new did not update value for {var}'
+ else:
+ expected_val = getattr(old, var)
+ new_val = getattr(new, var)
+ if isinstance(expected_val, datetime.datetime):
+ assert new_val.year == expected_val.year
+ assert new_val.month == expected_val.month
+ assert new_val.day == expected_val.day
+ assert new_val.hour == expected_val.hour
+ assert new_val.minute == expected_val.minute
+ assert new_val.second == expected_val.second
+ else:
+ assert new_val == expected_val, f'new changed unexpectedly in value for {var}'
+
+
+class TestSchedule(object):
+ '''
+ Test the schedule class basics and that its methods update self as expected
+ '''
+
+ def test_start_default_midnight(self, simple_schedule):
+ now = datetime.datetime.now(datetime.timezone.utc)
+ assert simple_schedule.start.second == 0
+ assert simple_schedule.start.minute == 0
+ assert simple_schedule.start.hour == 0
+ assert simple_schedule.start.day == now.day
+ assert simple_schedule.start.month == now.month
+ assert simple_schedule.start.year == now.year
+ assert simple_schedule.start.tzinfo == now.tzinfo
+
+ def test_created_now(self, simple_schedule):
+ now = datetime.datetime.now(datetime.timezone.utc)
+ assert simple_schedule.created.minute == now.minute
+ assert simple_schedule.created.hour == now.hour
+ assert simple_schedule.created.day == now.day
+ assert simple_schedule.created.month == now.month
+ assert simple_schedule.created.year == now.year
+ assert simple_schedule.created.tzinfo == now.tzinfo
+
+ def test_repeat_valid(self, simple_schedule):
+ repeat = simple_schedule.repeat
+ assert isinstance(repeat, int)
+
+ def test_store_single(self, db, simple_schedule):
+ simple_schedule.store_schedule(db)
+ row = ()
+ with db:
+ row = db.execute(SELECT_ALL).fetchone()
+
+ db_schedule = Schedule._from_db_row(row, simple_schedule.fs)
+
+ assert_updated(db_schedule, simple_schedule)
+
+ def test_store_multiple(self, db, simple_schedules):
+ [s.store_schedule(db) for s in simple_schedules]
+
+ rows = []
+ with db:
+ rows = db.execute(SELECT_ALL).fetchall()
+
+ assert len(rows) == len(simple_schedules)
+
+ def test_update_last(self, db, simple_schedule):
+ simple_schedule.store_schedule(db)
+
+ with db:
+ _ = db.execute(SELECT_ALL).fetchone()
+
+ first_time = datetime.datetime.now(datetime.timezone.utc)
+ simple_schedule.update_last(first_time, db)
+
+ with db:
+ after = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+ simple_schedule)
+
+ second_time = datetime.datetime.now(datetime.timezone.utc)
+ simple_schedule.update_last(second_time, db)
+
+ with db:
+ after2 = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after2, simple_schedule.fs),
+ simple_schedule)
+
+ def test_set_inactive_active(self, db, simple_schedule):
+ simple_schedule.store_schedule(db)
+
+ with db:
+ _ = db.execute(SELECT_ALL).fetchone()
+
+ simple_schedule.set_inactive(db)
+
+ with db:
+ after = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+ simple_schedule)
+
+ simple_schedule.set_active(db)
+
+ with db:
+ after2 = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after2, simple_schedule.fs),
+ simple_schedule)
+
+ def test_update_pruned(self, db, simple_schedule):
+ simple_schedule.store_schedule(db)
+
+ with db:
+ _ = db.execute(SELECT_ALL).fetchone()
+
+ now = datetime.datetime.now(datetime.timezone.utc)
+ pruned_count = random.randint(1, 1000)
+
+ simple_schedule.update_pruned(now, db, pruned_count)
+
+ with db:
+ after = db.execute(SELECT_ALL).fetchone()
+
+ assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+ simple_schedule)
+
+ # TODO test get_schedules and list_schedules
+
+
+class TestScheduleDB(object):
+ '''
+ This class tests that Schedules methods update the DB correctly
+ '''
+
+ def test_update_last(self, db, simple_schedule):
+ simple_schedule.store_schedule(db)
+
+ with db:
+ before = db.execute(SELECT_ALL).fetchone()
+
+ first_time = datetime.datetime.now(datetime.timezone.utc)
+ simple_schedule.update_last(first_time, db)
+
+ with db:
+ after = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+ Schedule._from_db_row(before, simple_schedule.fs),
+ {'created_count': 1,
+ 'last': first_time,
+ 'first': first_time})
+
+ second_time = datetime.datetime.now(datetime.timezone.utc)
+ simple_schedule.update_last(second_time, db)
+
+ with db:
+ after2 = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after2, simple_schedule.fs),
+ Schedule._from_db_row(after, simple_schedule.fs),
+ {'created_count': 2, 'last': second_time})
+
+ def test_set_inactive_active(self, db, simple_schedule):
+ simple_schedule.store_schedule(db)
+
+ with db:
+ before = db.execute(SELECT_ALL).fetchone()
+
+ simple_schedule.set_inactive(db)
+
+ with db:
+ after = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+ Schedule._from_db_row(before, simple_schedule.fs),
+ {'active': 0})
+
+ simple_schedule.set_active(db)
+
+ with db:
+ after2 = db.execute(SELECT_ALL).fetchone()
+ assert_updated(Schedule._from_db_row(after2, simple_schedule.fs),
+ Schedule._from_db_row(after, simple_schedule.fs),
+ {'active': 1})
+
+ def test_update_pruned(self, db, simple_schedule):
+ simple_schedule.store_schedule(db)
+
+ with db:
+ before = db.execute(SELECT_ALL).fetchone()
+
+ now = datetime.datetime.now(datetime.timezone.utc)
+ pruned_count = random.randint(1, 1000)
+
+ simple_schedule.update_pruned(now, db, pruned_count)
+
+ with db:
+ after = db.execute(SELECT_ALL).fetchone()
+
+ assert_updated(Schedule._from_db_row(after, simple_schedule.fs),
+ Schedule._from_db_row(before, simple_schedule.fs),
+ {'last_pruned': now, 'pruned_count': pruned_count})
+
+ def test_add_retention(self, db, simple_schedule):
+ simple_schedule.store_schedule(db)
+
+ with db:
+ before = db.execute(SELECT_ALL).fetchone()
+
+ retention = "7d12m"
+ simple_schedule.add_retention(db, simple_schedule.path, retention)
+
+ with db:
+ after = db.execute(SELECT_ALL).fetchone()
+
+ assert after['retention'] == json.dumps(parse_retention(retention))
+
+ retention2 = "4w"
+ simple_schedule.add_retention(db, simple_schedule.path, retention2)
+
+ with db:
+ after = db.execute(SELECT_ALL).fetchone()
+
+ assert after['retention'] == json.dumps(parse_retention(retention + retention2))
+
+ def test_per_path_and_repeat_uniqueness(self, db):
+ s1 = Schedule(*('/foo', '24h', 'fs_name', '/foo'))
+ s2 = Schedule(*('/foo', '1d', 'fs_name', '/foo'))
+
+ s1.store_schedule(db)
+ with pytest.raises(sqlite3.IntegrityError):
+ s2.store_schedule(db)
diff --git a/src/pybind/mgr/snap_schedule/tests/fs/test_schedule_client.py b/src/pybind/mgr/snap_schedule/tests/fs/test_schedule_client.py
new file mode 100644
index 000000000..177e8cd9f
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/tests/fs/test_schedule_client.py
@@ -0,0 +1,37 @@
+from datetime import datetime, timedelta
+from unittest.mock import MagicMock
+import pytest
+from ...fs.schedule_client import get_prune_set, SNAPSHOT_TS_FORMAT
+
+
+class TestScheduleClient(object):
+
+ def test_get_prune_set_empty_retention_no_prune(self):
+ now = datetime.now()
+ candidates = set()
+ for i in range(10):
+ ts = now - timedelta(minutes=i*5)
+ fake_dir = MagicMock()
+ fake_dir.d_name = f'scheduled-{ts.strftime(SNAPSHOT_TS_FORMAT)}'
+ candidates.add((fake_dir, ts))
+ ret = {}
+ prune_set = get_prune_set(candidates, ret, 99)
+ assert prune_set == set(), 'candidates are pruned despite empty retention'
+
+ def test_get_prune_set_two_retention_specs(self):
+ now = datetime.now()
+ candidates = set()
+ for i in range(10):
+ ts = now - timedelta(hours=i*1)
+ fake_dir = MagicMock()
+ fake_dir.d_name = f'scheduled-{ts.strftime(SNAPSHOT_TS_FORMAT)}'
+ candidates.add((fake_dir, ts))
+ for i in range(10):
+ ts = now - timedelta(days=i*1)
+ fake_dir = MagicMock()
+ fake_dir.d_name = f'scheduled-{ts.strftime(SNAPSHOT_TS_FORMAT)}'
+ candidates.add((fake_dir, ts))
+ # should keep 8 snapshots
+ ret = {'h': 6, 'd': 2}
+ prune_set = get_prune_set(candidates, ret, 99)
+ assert len(prune_set) == len(candidates) - 8, 'wrong size of prune set'
diff --git a/src/pybind/mgr/snap_schedule/tox.ini b/src/pybind/mgr/snap_schedule/tox.ini
new file mode 100644
index 000000000..fbf894b06
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/tox.ini
@@ -0,0 +1,19 @@
+[tox]
+envlist = py36, py3
+skipsdist = true
+; toxworkdir = {env:CEPH_BUILD_DIR}/snap-schedule
+; minversion = 2.8.1
+
+[testenv]
+setenv=
+ LD_LIBRARY_PATH = {toxinidir}/../../../../build/lib
+ PATH = {toxinidir}/../../../../build/bin:$PATH
+ py27: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.2:..
+ py3: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.3:{toxinidir}
+ SNAP_SCHED_UNITTEST = true
+deps =
+ pytest
+ mock
+ py27: pathlib
+commands=
+ pytest {posargs}
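
Both tox environments export SNAP_SCHED_UNITTEST, so the package-level
__init__.py above imports the tests package instead of the mgr-dependent
Module. With the Ceph build tree in place, the suite runs from this
directory with 'tox -e py3', or with plain 'pytest' if the same
environment variables are set manually.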