diff options
Diffstat (limited to '')
-rw-r--r-- | src/pybind/mgr/snap_schedule/.gitignore | 1 | ||||
-rw-r--r-- | src/pybind/mgr/snap_schedule/__init__.py | 11 | ||||
-rw-r--r-- | src/pybind/mgr/snap_schedule/fs/__init__.py | 0 | ||||
-rw-r--r-- | src/pybind/mgr/snap_schedule/fs/schedule.py | 468 | ||||
-rw-r--r-- | src/pybind/mgr/snap_schedule/fs/schedule_client.py | 421 | ||||
-rw-r--r-- | src/pybind/mgr/snap_schedule/module.py | 245 | ||||
-rw-r--r-- | src/pybind/mgr/snap_schedule/requirements.txt | 1 | ||||
-rw-r--r-- | src/pybind/mgr/snap_schedule/tests/__init__.py | 0 | ||||
-rw-r--r-- | src/pybind/mgr/snap_schedule/tests/conftest.py | 34 | ||||
-rw-r--r-- | src/pybind/mgr/snap_schedule/tests/fs/__init__.py | 0 | ||||
-rw-r--r-- | src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py | 256 | ||||
-rw-r--r-- | src/pybind/mgr/snap_schedule/tests/fs/test_schedule_client.py | 37 | ||||
-rw-r--r-- | src/pybind/mgr/snap_schedule/tox.ini | 19 |
13 files changed, 1493 insertions, 0 deletions
diff --git a/src/pybind/mgr/snap_schedule/.gitignore b/src/pybind/mgr/snap_schedule/.gitignore new file mode 100644 index 000000000..172bf5786 --- /dev/null +++ b/src/pybind/mgr/snap_schedule/.gitignore @@ -0,0 +1 @@ +.tox diff --git a/src/pybind/mgr/snap_schedule/__init__.py b/src/pybind/mgr/snap_schedule/__init__.py new file mode 100644 index 000000000..8001b7184 --- /dev/null +++ b/src/pybind/mgr/snap_schedule/__init__.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- + +from os import environ + +if 'SNAP_SCHED_UNITTEST' in environ: + import tests +elif 'UNITTEST' in environ: + import tests + from .module import Module +else: + from .module import Module diff --git a/src/pybind/mgr/snap_schedule/fs/__init__.py b/src/pybind/mgr/snap_schedule/fs/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/pybind/mgr/snap_schedule/fs/__init__.py diff --git a/src/pybind/mgr/snap_schedule/fs/schedule.py b/src/pybind/mgr/snap_schedule/fs/schedule.py new file mode 100644 index 000000000..773a0694a --- /dev/null +++ b/src/pybind/mgr/snap_schedule/fs/schedule.py @@ -0,0 +1,468 @@ +""" +Copyright (C) 2020 SUSE + +LGPL2.1. See file COPYING. +""" +from datetime import datetime, timezone +import json +import logging +import re +import sqlite3 +from typing import Tuple, Any, List + +log = logging.getLogger(__name__) + +# Work around missing datetime.fromisoformat for < python3.7 +SNAP_DB_TS_FORMAT = '%Y-%m-%dT%H:%M:%S' +try: + from backports.datetime_fromisoformat import MonkeyPatch + MonkeyPatch.patch_fromisoformat() +except ImportError: + log.debug('backports.datetime_fromisoformat not found') + +try: + # have mypy ignore this line. We use the attribute error to detect if we + # have fromisoformat or not + ts_parser = datetime.fromisoformat # type: ignore + log.debug('found datetime.fromisoformat') +except AttributeError: + log.info(('Couldn\'t find datetime.fromisoformat, falling back to ' + f'static timestamp parsing ({SNAP_DB_TS_FORMAT}')) + + def ts_parser(date_string): # type: ignore + try: + date = datetime.strptime(date_string, SNAP_DB_TS_FORMAT) + return date + except ValueError: + msg = f'''The date string {date_string} does not match the required format + {SNAP_DB_TS_FORMAT}. For more flexibel date parsing upgrade to + python3.7 or install + https://github.com/movermeyer/backports.datetime_fromisoformat''' + log.error(msg) + raise ValueError(msg) + + +def parse_timestamp(ts): + date = ts_parser(ts) + # normalize any non utc timezone to utc. If no tzinfo is supplied, assume + # its already utc + # import pdb; pdb.set_trace() + if date.tzinfo is not timezone.utc and date.tzinfo is not None: + date = date.astimezone(timezone.utc) + return date + + +def parse_retention(retention): + ret = {} + log.debug(f'parse_retention({retention})') + matches = re.findall(r'\d+[a-z]', retention) + for m in matches: + ret[m[-1]] = int(m[0:-1]) + matches = re.findall(r'\d+[A-Z]', retention) + for m in matches: + ret[m[-1]] = int(m[0:-1]) + log.debug(f'parse_retention({retention}) -> {ret}') + return ret + + +RETENTION_MULTIPLIERS = ['n', 'M', 'h', 'd', 'w', 'm', 'y'] + + +def dump_retention(retention): + ret = '' + for mult in RETENTION_MULTIPLIERS: + if mult in retention: + ret += str(retention[mult]) + mult + return ret + + +class Schedule(object): + ''' + Wrapper to work with schedules stored in sqlite + ''' + def __init__(self, + path, + schedule, + fs_name, + rel_path, + start=None, + subvol=None, + retention_policy='{}', + created=None, + first=None, + last=None, + last_pruned=None, + created_count=0, + pruned_count=0, + active=True, + ): + self.fs = fs_name + self.subvol = subvol + self.path = path + self.rel_path = rel_path + self.schedule = schedule + self.retention = json.loads(retention_policy) + if start is None: + now = datetime.now(timezone.utc) + self.start = datetime(now.year, + now.month, + now.day, + tzinfo=now.tzinfo) + else: + self.start = parse_timestamp(start) + if created is None: + self.created = datetime.now(timezone.utc) + else: + self.created = parse_timestamp(created) + if first: + self.first = parse_timestamp(first) + else: + self.first = first + if last: + self.last = parse_timestamp(last) + else: + self.last = last + if last_pruned: + self.last_pruned = parse_timestamp(last_pruned) + else: + self.last_pruned = last_pruned + self.created_count = created_count + self.pruned_count = pruned_count + self.active = bool(active) + + @classmethod + def _from_db_row(cls, table_row, fs): + return cls(table_row['path'], + table_row['schedule'], + fs, + table_row['rel_path'], + table_row['start'], + table_row['subvol'], + table_row['retention'], + table_row['created'], + table_row['first'], + table_row['last'], + table_row['last_pruned'], + table_row['created_count'], + table_row['pruned_count'], + table_row['active'], + ) + + def __str__(self): + return f'''{self.path} {self.schedule} {dump_retention(self.retention)}''' + + def json_list(self): + return json.dumps({'path': self.path, 'schedule': self.schedule, + 'retention': dump_retention(self.retention)}) + + CREATE_TABLES = '''CREATE TABLE schedules( + id INTEGER PRIMARY KEY ASC, + path TEXT NOT NULL UNIQUE, + subvol TEXT, + retention TEXT DEFAULT '{}', + rel_path TEXT NOT NULL + ); + CREATE TABLE schedules_meta( + id INTEGER PRIMARY KEY ASC, + schedule_id INT, + start TEXT NOT NULL, + first TEXT, + last TEXT, + last_pruned TEXT, + created TEXT NOT NULL, + repeat INT NOT NULL, + schedule TEXT NOT NULL, + created_count INT DEFAULT 0, + pruned_count INT DEFAULT 0, + active INT NOT NULL, + FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE, + UNIQUE (schedule_id, start, repeat) + );''' + + EXEC_QUERY = '''SELECT + s.retention, + sm.repeat - (strftime("%s", "now") - strftime("%s", sm.start)) % + sm.repeat "until", + sm.start, sm.repeat, sm.schedule + FROM schedules s + INNER JOIN schedules_meta sm ON sm.schedule_id = s.id + WHERE + s.path = ? AND + strftime("%s", "now") - strftime("%s", sm.start) > 0 AND + sm.active = 1 + ORDER BY until;''' + + PROTO_GET_SCHEDULES = '''SELECT + s.path, s.subvol, s.rel_path, sm.active, + sm.schedule, s.retention, sm.start, sm.first, sm.last, + sm.last_pruned, sm.created, sm.created_count, sm.pruned_count + FROM schedules s + INNER JOIN schedules_meta sm ON sm.schedule_id = s.id + WHERE''' + + GET_SCHEDULES = PROTO_GET_SCHEDULES + ' s.path = ?''' + + @classmethod + def get_db_schedules(cls, path, db, fs, + schedule=None, + start=None, + repeat=None): + query = cls.GET_SCHEDULES + data: Tuple[Any, ...] = (path,) + if repeat: + query += ' AND sm.repeat = ?' + data += (repeat,) + if schedule: + query += ' AND sm.schedule = ?' + data += (schedule,) + if start: + query += ' AND sm.start = ?' + data += (start,) + with db: + c = db.execute(query, data) + return [cls._from_db_row(row, fs) for row in c.fetchall()] + + @classmethod + def list_schedules(cls, path, db, fs, recursive): + with db: + if recursive: + c = db.execute(cls.PROTO_GET_SCHEDULES + ' path LIKE ?', + (f'{path}%',)) + else: + c = db.execute(cls.PROTO_GET_SCHEDULES + ' path = ?', + (f'{path}',)) + return [cls._from_db_row(row, fs) for row in c.fetchall()] + + @classmethod + def list_all_schedules(cls, + db: sqlite3.Connection, + fs: str) -> List['Schedule']: + with db: + c = db.execute(cls.PROTO_GET_SCHEDULES + " path LIKE '%'") + return [cls._from_db_row(row, fs) for row in c.fetchall()] + + INSERT_SCHEDULE = '''INSERT INTO + schedules(path, subvol, retention, rel_path) + Values(?, ?, ?, ?);''' + INSERT_SCHEDULE_META = '''INSERT INTO + schedules_meta(schedule_id, start, created, repeat, schedule, + active) + SELECT ?, ?, ?, ?, ?, ?''' + + def store_schedule(self, db): + sched_id = None + with db: + try: + log.debug(f'schedule with retention {self.retention}') + c = db.execute(self.INSERT_SCHEDULE, + (self.path, + self.subvol, + json.dumps(self.retention), + self.rel_path,)) + sched_id = c.lastrowid + except sqlite3.IntegrityError: + # might be adding another schedule, retrieve sched id + log.debug(f'found schedule entry for {self.path}, trying to add meta') + c = db.execute('SELECT id FROM schedules where path = ?', + (self.path,)) + sched_id = c.fetchone()[0] + pass + db.execute(self.INSERT_SCHEDULE_META, + (sched_id, + self.start.strftime(SNAP_DB_TS_FORMAT), + self.created.strftime(SNAP_DB_TS_FORMAT), + self.repeat, + self.schedule, + 1)) + + @classmethod + def rm_schedule(cls, db, path, repeat, start): + with db: + cur = db.execute('SELECT id FROM schedules WHERE path = ?', + (path,)) + row = cur.fetchone() + + if len(row) == 0: + log.info(f'no schedule for {path} found') + raise ValueError('SnapSchedule for {} not found'.format(path)) + + id_ = tuple(row) + + if repeat or start: + meta_delete = 'DELETE FROM schedules_meta WHERE schedule_id = ?' + delete_param = id_ + if repeat: + meta_delete += ' AND schedule = ?' + delete_param += (repeat,) + if start: + meta_delete += ' AND start = ?' + delete_param += (start,) + # maybe only delete meta entry + log.debug(f'executing {meta_delete}, {delete_param}') + res = db.execute(meta_delete + ';', delete_param).rowcount + if res < 1: + raise ValueError(f'No schedule found for {repeat} {start}') + db.execute('COMMIT;') + # now check if we have schedules in meta left, if not delete + # the schedule as well + meta_count = db.execute( + 'SELECT COUNT() FROM schedules_meta WHERE schedule_id = ?', + id_) + if meta_count.fetchone() == (0,): + log.debug( + f'no more schedules left, cleaning up schedules table') + db.execute('DELETE FROM schedules WHERE id = ?;', id_) + else: + # just delete the schedule CASCADE DELETE takes care of the + # rest + db.execute('DELETE FROM schedules WHERE id = ?;', id_) + + GET_RETENTION = '''SELECT retention FROM schedules + WHERE path = ?''' + UPDATE_RETENTION = '''UPDATE schedules + SET retention = ? + WHERE path = ?''' + + @classmethod + def add_retention(cls, db, path, retention_spec): + with db: + row = db.execute(cls.GET_RETENTION, (path,)).fetchone() + if not row: + raise ValueError(f'No schedule found for {path}') + retention = parse_retention(retention_spec) + if not retention: + raise ValueError(f'Retention spec {retention_spec} is invalid') + log.debug(f'db result is {tuple(row)}') + current = row['retention'] + current_retention = json.loads(current) + for r, v in retention.items(): + if r in current_retention: + raise ValueError((f'Retention for {r} is already present ' + 'with value {current_retention[r]}. Please remove first')) + current_retention.update(retention) + db.execute(cls.UPDATE_RETENTION, (json.dumps(current_retention), path)) + + @classmethod + def rm_retention(cls, db, path, retention_spec): + with db: + row = db.execute(cls.GET_RETENTION, (path,)).fetchone() + if not row: + raise ValueError(f'No schedule found for {path}') + retention = parse_retention(retention_spec) + current = row['retention'] + current_retention = json.loads(current) + for r, v in retention.items(): + if r not in current_retention or current_retention[r] != v: + raise ValueError((f'Retention for {r}: {v} was not set for {path} ' + 'can\'t remove')) + current_retention.pop(r) + db.execute(cls.UPDATE_RETENTION, (json.dumps(current_retention), path)) + + def report(self): + return self.report_json() + + def report_json(self): + return json.dumps(dict(self.__dict__), + default=lambda o: o.strftime(SNAP_DB_TS_FORMAT)) + + @classmethod + def parse_schedule(cls, schedule): + return int(schedule[0:-1]), schedule[-1] + + @property + def repeat(self): + period, mult = self.parse_schedule(self.schedule) + if mult == 'M': + return period * 60 + elif mult == 'h': + return period * 60 * 60 + elif mult == 'd': + return period * 60 * 60 * 24 + elif mult == 'w': + return period * 60 * 60 * 24 * 7 + else: + raise ValueError(f'schedule multiplier "{mult}" not recognized') + + UPDATE_LAST = '''UPDATE schedules_meta + SET + last = ?, + created_count = created_count + 1, + first = CASE WHEN first IS NULL THEN ? ELSE first END + WHERE EXISTS( + SELECT id + FROM schedules s + WHERE s.id = schedules_meta.schedule_id + AND s.path = ? + AND schedules_meta.start = ? + AND schedules_meta.repeat = ?);''' + + def update_last(self, time, db): + with db: + db.execute(self.UPDATE_LAST, (time.strftime(SNAP_DB_TS_FORMAT), + time.strftime(SNAP_DB_TS_FORMAT), + self.path, + self.start.strftime(SNAP_DB_TS_FORMAT), + self.repeat)) + self.created_count += 1 + self.last = time + if not self.first: + self.first = time + + UPDATE_INACTIVE = '''UPDATE schedules_meta + SET + active = 0 + WHERE EXISTS( + SELECT id + FROM schedules s + WHERE s.id = schedules_meta.schedule_id + AND s.path = ? + AND schedules_meta.start = ? + AND schedules_meta.repeat = ?);''' + + def set_inactive(self, db): + with db: + log.debug(f'Deactivating schedule ({self.repeat}, {self.start}) on path {self.path}') + db.execute(self.UPDATE_INACTIVE, (self.path, + self.start.strftime(SNAP_DB_TS_FORMAT), + self.repeat)) + self.active = False + + UPDATE_ACTIVE = '''UPDATE schedules_meta + SET + active = 1 + WHERE EXISTS( + SELECT id + FROM schedules s + WHERE s.id = schedules_meta.schedule_id + AND s.path = ? + AND schedules_meta.start = ? + AND schedules_meta.repeat = ?);''' + + def set_active(self, db): + with db: + log.debug(f'Activating schedule ({self.repeat}, {self.start}) on path {self.path}') + db.execute(self.UPDATE_ACTIVE, (self.path, + self.start.strftime(SNAP_DB_TS_FORMAT), + self.repeat)) + self.active = True + + UPDATE_PRUNED = '''UPDATE schedules_meta + SET + last_pruned = ?, + pruned_count = pruned_count + ? + WHERE EXISTS( + SELECT id + FROM schedules s + WHERE s.id = schedules_meta.schedule_id + AND s.path = ? + AND schedules_meta.start = ? + AND schedules_meta.repeat = ?);''' + + def update_pruned(self, time, db, pruned): + with db: + db.execute(self.UPDATE_PRUNED, (time.strftime(SNAP_DB_TS_FORMAT), pruned, + self.path, + self.start.strftime(SNAP_DB_TS_FORMAT), + self.repeat)) + self.pruned_count += pruned + self.last_pruned = time diff --git a/src/pybind/mgr/snap_schedule/fs/schedule_client.py b/src/pybind/mgr/snap_schedule/fs/schedule_client.py new file mode 100644 index 000000000..dff14966e --- /dev/null +++ b/src/pybind/mgr/snap_schedule/fs/schedule_client.py @@ -0,0 +1,421 @@ +""" +Copyright (C) 2020 SUSE + +LGPL2.1. See file COPYING. +""" +import cephfs +import rados +from contextlib import contextmanager +from mgr_util import CephfsClient, open_filesystem, CephfsConnectionException +from collections import OrderedDict +from datetime import datetime, timezone +import logging +from threading import Timer, Lock +from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \ + Tuple, TypeVar, Union, Type +from types import TracebackType +import sqlite3 +from .schedule import Schedule, parse_retention +import traceback +import errno + + +MAX_SNAPS_PER_PATH = 50 +SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule' +SNAP_DB_PREFIX = 'snap_db' +# increment this every time the db schema changes and provide upgrade code +SNAP_DB_VERSION = '0' +SNAP_DB_OBJECT_NAME = f'{SNAP_DB_PREFIX}_v{SNAP_DB_VERSION}' +# scheduled snapshots are tz suffixed +SNAPSHOT_TS_FORMAT_TZ = '%Y-%m-%d-%H_%M_%S_%Z' +# for backward compat snapshot name parsing +SNAPSHOT_TS_FORMAT = '%Y-%m-%d-%H_%M_%S' +# length of timestamp format (without tz suffix) +# e.g.: scheduled-2022-04-19-05_39_00_UTC (len = "2022-04-19-05_39_00") +SNAPSHOT_TS_FORMAT_LEN = 19 +SNAPSHOT_PREFIX = 'scheduled' + +log = logging.getLogger(__name__) + + +@contextmanager +def open_ioctx(self, pool): + try: + if type(pool) is int: + with self.mgr.rados.open_ioctx2(pool) as ioctx: + ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE) + yield ioctx + else: + with self.mgr.rados.open_ioctx(pool) as ioctx: + ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE) + yield ioctx + except rados.ObjectNotFound: + log.error("Failed to locate pool {}".format(pool)) + raise + + +def updates_schedule_db(func): + def f(self, fs, schedule_or_path, *args): + func(self, fs, schedule_or_path, *args) + path = schedule_or_path + if isinstance(schedule_or_path, Schedule): + path = schedule_or_path.path + self.refresh_snap_timers(fs, path) + return f + + +def get_prune_set(candidates, retention): + PRUNING_PATTERNS = OrderedDict([ + # n is for keep last n snapshots, uses the snapshot name timestamp + # format for lowest granularity + # NOTE: prune set has tz suffix stripped out. + ("n", SNAPSHOT_TS_FORMAT), + # TODO remove M for release + ("M", '%Y-%m-%d-%H_%M'), + ("h", '%Y-%m-%d-%H'), + ("d", '%Y-%m-%d'), + ("w", '%G-%V'), + ("m", '%Y-%m'), + ("y", '%Y'), + ]) + keep = [] + if not retention: + log.info(f'no retention set, assuming n: {MAX_SNAPS_PER_PATH}') + retention = {'n': MAX_SNAPS_PER_PATH} + for period, date_pattern in PRUNING_PATTERNS.items(): + log.debug(f'compiling keep set for period {period}') + period_count = retention.get(period, 0) + if not period_count: + continue + last = None + kept_for_this_period = 0 + for snap in sorted(candidates, key=lambda x: x[0].d_name, + reverse=True): + snap_ts = snap[1].strftime(date_pattern) + if snap_ts != last: + last = snap_ts + if snap not in keep: + log.debug(f'keeping {snap[0].d_name} due to {period_count}{period}') + keep.append(snap) + kept_for_this_period += 1 + if kept_for_this_period == period_count: + log.debug(('found enough snapshots for ' + f'{period_count}{period}')) + break + if len(keep) > MAX_SNAPS_PER_PATH: + log.info(f'Would keep more then {MAX_SNAPS_PER_PATH}, pruning keep set') + keep = keep[:MAX_SNAPS_PER_PATH] + return candidates - set(keep) + +def snap_name_to_timestamp(scheduled_snap_name: str) -> str: + """ extract timestamp from a schedule snapshot with tz suffix stripped out """ + ts = scheduled_snap_name.lstrip(f'{SNAPSHOT_PREFIX}-') + return ts[0:SNAPSHOT_TS_FORMAT_LEN] + +class DBInfo(): + def __init__(self, fs: str, db: sqlite3.Connection): + self.fs: str = fs + self.lock: Lock = Lock() + self.db: sqlite3.Connection = db + + +# context manager for serializing db connection usage +class DBConnectionManager(): + def __init__(self, info: DBInfo): + self.dbinfo: DBInfo = info + + # using string as return type hint since __future__.annotations is not + # available with Python 3.6; its avaialbe starting from Pytohn 3.7 + def __enter__(self) -> 'DBConnectionManager': + log.debug(f'locking db connection for {self.dbinfo.fs}') + self.dbinfo.lock.acquire() + log.debug(f'locked db connection for {self.dbinfo.fs}') + return self + + def __exit__(self, + exception_type: Optional[Type[BaseException]], + exception_value: Optional[BaseException], + traceback: Optional[TracebackType]) -> None: + log.debug(f'unlocking db connection for {self.dbinfo.fs}') + self.dbinfo.lock.release() + log.debug(f'unlocked db connection for {self.dbinfo.fs}') + + +class SnapSchedClient(CephfsClient): + + def __init__(self, mgr): + super(SnapSchedClient, self).__init__(mgr) + # Each db connection is now guarded by a Lock; this is required to + # avoid concurrent DB transactions when more than one paths in a + # file-system are scheduled at the same interval eg. 1h; without the + # lock, there are races to use the same connection, causing nested + # transactions to be aborted + self.sqlite_connections: Dict[str, DBInfo] = {} + self.active_timers: Dict[Tuple[str, str], List[Timer]] = {} + self.conn_lock: Lock = Lock() # lock to protect add/lookup db connections + + # restart old schedules + for fs_name in self.get_all_filesystems(): + with self.get_schedule_db(fs_name) as conn_mgr: + db = conn_mgr.dbinfo.db + sched_list = Schedule.list_all_schedules(db, fs_name) + for sched in sched_list: + self.refresh_snap_timers(fs_name, sched.path, db) + + @property + def allow_minute_snaps(self): + return self.mgr.get_module_option('allow_m_granularity') + + @property + def dump_on_update(self) -> None: + return self.mgr.get_module_option('dump_on_update') + + def get_schedule_db(self, fs): + dbinfo = None + self.conn_lock.acquire() + if fs not in self.sqlite_connections: + db = sqlite3.connect(':memory:', check_same_thread=False) + with db: + db.row_factory = sqlite3.Row + db.execute("PRAGMA FOREIGN_KEYS = 1") + pool = self.get_metadata_pool(fs) + with open_ioctx(self, pool) as ioctx: + try: + size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME) + ddl = ioctx.read(SNAP_DB_OBJECT_NAME, + size).decode('utf-8') + db.executescript(ddl) + except rados.ObjectNotFound: + log.debug(f'No schedule DB found in {fs}, creating one.') + db.executescript(Schedule.CREATE_TABLES) + self.sqlite_connections[fs] = DBInfo(fs, db) + dbinfo = self.sqlite_connections[fs] + self.conn_lock.release() + return DBConnectionManager(dbinfo) + + def store_schedule_db(self, fs, db): + # only store db is it exists, otherwise nothing to do + metadata_pool = self.get_metadata_pool(fs) + if not metadata_pool: + raise CephfsConnectionException( + -errno.ENOENT, "Filesystem {} does not exist".format(fs)) + db_content = [] + for row in db.iterdump(): + db_content.append(row) + with open_ioctx(self, metadata_pool) as ioctx: + ioctx.write_full(SNAP_DB_OBJECT_NAME, + '\n'.join(db_content).encode('utf-8')) + + def _is_allowed_repeat(self, exec_row, path): + if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M': + if self.allow_minute_snaps: + log.debug(f'Minute repeats allowed, scheduling snapshot on path {path}') + return True + else: + log.info(f'Minute repeats disabled, skipping snapshot on path {path}') + return False + else: + return True + + def fetch_schedules(self, db: sqlite3.Connection, path: str) -> List[sqlite3.Row]: + with db: + if self.dump_on_update: + dump = [line for line in db.iterdump()] + dump = "\n".join(dump) + log.debug(f"db dump:\n{dump}") + cur = db.execute(Schedule.EXEC_QUERY, (path,)) + all_rows = cur.fetchall() + rows = [r for r in all_rows + if self._is_allowed_repeat(r, path)][0:1] + return rows + + def refresh_snap_timers(self, fs: str, path: str, olddb: Optional[sqlite3.Connection] = None) -> None: + try: + log.debug((f'SnapDB on {fs} changed for {path}, ' + 'updating next Timer')) + rows = [] + # olddb is passed in the case where we land here without a timer + # the lock on the db connection has already been taken + if olddb: + rows = self.fetch_schedules(olddb, path) + else: + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + rows = self.fetch_schedules(db, path) + timers = self.active_timers.get((fs, path), []) + for timer in timers: + timer.cancel() + timers = [] + for row in rows: + log.debug(f'Creating new snapshot timer for {path}') + t = Timer(row[1], + self.create_scheduled_snapshot, + args=[fs, path, row[0], row[2], row[3]]) + t.start() + timers.append(t) + log.debug(f'Will snapshot {path} in fs {fs} in {row[1]}s') + self.active_timers[(fs, path)] = timers + except Exception: + self._log_exception('refresh_snap_timers') + + def _log_exception(self, fct): + log.error(f'{fct} raised an exception:') + log.error(traceback.format_exc()) + + def create_scheduled_snapshot(self, fs_name, path, retention, start, repeat): + log.debug(f'Scheduled snapshot of {path} triggered') + with self.get_schedule_db(fs_name) as conn_mgr: + db = conn_mgr.dbinfo.db + try: + try: + sched = Schedule.get_db_schedules(path, + db, + fs_name, + repeat=repeat, + start=start)[0] + time = datetime.now(timezone.utc) + with open_filesystem(self, fs_name) as fs_handle: + snap_ts = time.strftime(SNAPSHOT_TS_FORMAT_TZ) + snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}' + fs_handle.mkdir(snap_name, 0o755) + log.info(f'created scheduled snapshot of {path}') + log.debug(f'created scheduled snapshot {snap_name}') + sched.update_last(time, db) + except cephfs.Error: + self._log_exception('create_scheduled_snapshot: ' + 'unexpected exception; ' + f'deactivating schedule fs:"{fs_name}" ' + f'path:"{path}" repeat:"{repeat}" ' + f'retention:"{retention}"') + sched.set_inactive(db) + except Exception: + # catch all exceptions cause otherwise we'll never know since this + # is running in a thread + self._log_exception('create_scheduled_snapshot') + finally: + self.refresh_snap_timers(fs_name, path, db) + self.prune_snapshots(sched, db) + self.store_schedule_db(fs_name, db) + + def prune_snapshots(self, sched, db): + try: + log.debug('Pruning snapshots') + ret = sched.retention + path = sched.path + prune_candidates = set() + time = datetime.now(timezone.utc) + with open_filesystem(self, sched.fs) as fs_handle: + with fs_handle.opendir(f'{path}/.snap') as d_handle: + dir_ = fs_handle.readdir(d_handle) + while dir_: + if dir_.d_name.decode('utf-8').startswith(f'{SNAPSHOT_PREFIX}-'): + log.debug(f'add {dir_.d_name} to pruning') + ts = datetime.strptime( + snap_name_to_timestamp(dir_.d_name.decode('utf-8')), SNAPSHOT_TS_FORMAT) + prune_candidates.add((dir_, ts)) + else: + log.debug(f'skipping dir entry {dir_.d_name}') + dir_ = fs_handle.readdir(d_handle) + to_prune = get_prune_set(prune_candidates, ret) + for k in to_prune: + dirname = k[0].d_name.decode('utf-8') + log.debug(f'rmdir on {dirname}') + fs_handle.rmdir(f'{path}/.snap/{dirname}') + if to_prune: + sched.update_pruned(time, db, len(to_prune)) + except Exception: + self._log_exception('prune_snapshots') + + def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]: + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + return Schedule.get_db_schedules(path, db, fs) + + def list_snap_schedules(self, + fs: str, + path: str, + recursive: bool) -> List[Schedule]: + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + return Schedule.list_schedules(path, db, fs, recursive) + + @updates_schedule_db + # TODO improve interface + def store_snap_schedule(self, fs, path_, args): + sched = Schedule(*args) + log.debug(f'repeat is {sched.repeat}') + if sched.parse_schedule(sched.schedule)[1] == 'M' and not self.allow_minute_snaps: + log.error('not allowed') + raise ValueError('no minute snaps allowed') + log.debug(f'attempting to add schedule {sched}') + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + sched.store_schedule(db) + self.store_schedule_db(sched.fs, db) + + @updates_schedule_db + def rm_snap_schedule(self, + fs: str, path: str, + schedule: Optional[str], + start: Optional[str]) -> None: + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + Schedule.rm_schedule(db, path, schedule, start) + self.store_schedule_db(fs, db) + + @updates_schedule_db + def add_retention_spec(self, + fs, + path, + retention_spec_or_period, + retention_count): + retention_spec = retention_spec_or_period + if retention_count: + retention_spec = retention_count + retention_spec + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + Schedule.add_retention(db, path, retention_spec) + self.store_schedule_db(fs, db) + + @updates_schedule_db + def rm_retention_spec(self, + fs, + path, + retention_spec_or_period, + retention_count): + retention_spec = retention_spec_or_period + if retention_count: + retention_spec = retention_count + retention_spec + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + Schedule.rm_retention(db, path, retention_spec) + self.store_schedule_db(fs, db) + + @updates_schedule_db + def activate_snap_schedule(self, + fs: str, + path: str, + schedule: Optional[str], + start: Optional[str]) -> None: + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + schedules = Schedule.get_db_schedules(path, db, fs, + schedule=schedule, + start=start) + for s in schedules: + s.set_active(db) + self.store_schedule_db(fs, db) + + @updates_schedule_db + def deactivate_snap_schedule(self, + fs: str, path: str, + schedule: Optional[str], + start: Optional[str]) -> None: + with self.get_schedule_db(fs) as conn_mgr: + db = conn_mgr.dbinfo.db + schedules = Schedule.get_db_schedules(path, db, fs, + schedule=schedule, + start=start) + for s in schedules: + s.set_inactive(db) + self.store_schedule_db(fs, db) diff --git a/src/pybind/mgr/snap_schedule/module.py b/src/pybind/mgr/snap_schedule/module.py new file mode 100644 index 000000000..93770f350 --- /dev/null +++ b/src/pybind/mgr/snap_schedule/module.py @@ -0,0 +1,245 @@ +""" +Copyright (C) 2019 SUSE + +LGPL2.1. See file COPYING. +""" +import errno +import json +import sqlite3 +from typing import Dict, Sequence, Optional, cast, Optional, Tuple +from .fs.schedule_client import SnapSchedClient +from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand, Option +from mgr_util import CephfsConnectionException +from threading import Event + + +class Module(MgrModule): + MODULE_OPTIONS = [ + Option( + 'allow_m_granularity', + type='bool', + default=False, + desc='allow minute scheduled snapshots', + runtime=True, + ), + Option( + 'dump_on_update', + type='bool', + default=False, + desc='dump database to debug log on update', + runtime=True, + ), + ] + + def __init__(self, *args, **kwargs): + super(Module, self).__init__(*args, **kwargs) + self._initialized = Event() + self.client = SnapSchedClient(self) + + @property + def default_fs(self): + fs_map = self.get('fs_map') + if fs_map['filesystems']: + return fs_map['filesystems'][0]['mdsmap']['fs_name'] + else: + self.log.error('No filesystem instance could be found.') + raise CephfsConnectionException( + -errno.ENOENT, "no filesystem found") + + def has_fs(self, fs_name: str) -> bool: + return fs_name in self.client.get_all_filesystems() + + def serve(self): + self._initialized.set() + + def handle_command(self, inbuf, cmd): + self._initialized.wait() + return -errno.EINVAL, "", "Unknown command" + + @CLIReadCommand('fs snap-schedule status') + def snap_schedule_get(self, + path: str = '/', + fs: Optional[str] = None, + format: Optional[str] = 'plain'): + ''' + List current snapshot schedules + ''' + use_fs = fs if fs else self.default_fs + try: + path = cast(str, path) + ret_scheds = self.client.get_snap_schedules(use_fs, path) + except CephfsConnectionException as e: + return e.to_tuple() + if format == 'json': + json_report = ','.join([ret_sched.report_json() for ret_sched in ret_scheds]) + return 0, f'[{json_report}]', '' + return 0, '\n===\n'.join([ret_sched.report() for ret_sched in ret_scheds]), '' + + @CLIReadCommand('fs snap-schedule list') + def snap_schedule_list(self, path: str, + recursive: bool = False, + fs: Optional[str] = None, + format: Optional[str] = 'plain'): + ''' + Get current snapshot schedule for <path> + ''' + try: + use_fs = fs if fs else self.default_fs + recursive = cast(bool, recursive) + scheds = self.client.list_snap_schedules(use_fs, path, recursive) + self.log.debug(f'recursive is {recursive}') + except CephfsConnectionException as e: + return e.to_tuple() + if not scheds: + if format == 'json': + output = {} # type: Dict[str,str] + return 0, json.dumps(output), '' + return -errno.ENOENT, '', f'SnapSchedule for {path} not found' + if format == 'json': + # json_list = ','.join([sched.json_list() for sched in scheds]) + schedule_list = [sched.schedule for sched in scheds] + retention_list = [sched.retention for sched in scheds] + out = {'path': path, 'schedule': schedule_list, 'retention': retention_list} + return 0, json.dumps(out), '' + return 0, '\n'.join([str(sched) for sched in scheds]), '' + + @CLIWriteCommand('fs snap-schedule add') + def snap_schedule_add(self, + path: str, + snap_schedule: Optional[str], + start: Optional[str] = None, + fs: Optional[str] = None) -> Tuple[int, str, str]: + ''' + Set a snapshot schedule for <path> + ''' + try: + use_fs = fs if fs else self.default_fs + if not self.has_fs(use_fs): + return -errno.EINVAL, '', f"no such filesystem: {use_fs}" + abs_path = path + subvol = None + self.client.store_snap_schedule(use_fs, + abs_path, + (abs_path, snap_schedule, + use_fs, path, start, subvol)) + suc_msg = f'Schedule set for path {path}' + except sqlite3.IntegrityError: + existing_scheds = self.client.get_snap_schedules(use_fs, path) + report = [s.report() for s in existing_scheds] + error_msg = f'Found existing schedule {report}' + self.log.error(error_msg) + return -errno.EEXIST, '', error_msg + except ValueError as e: + return -errno.ENOENT, '', str(e) + except CephfsConnectionException as e: + return e.to_tuple() + return 0, suc_msg, '' + + @CLIWriteCommand('fs snap-schedule remove') + def snap_schedule_rm(self, + path: str, + repeat: Optional[str] = None, + start: Optional[str] = None, + fs: Optional[str] = None) -> Tuple[int, str, str]: + ''' + Remove a snapshot schedule for <path> + ''' + try: + use_fs = fs if fs else self.default_fs + if not self.has_fs(use_fs): + return -errno.EINVAL, '', f"no such filesystem: {use_fs}" + abs_path = path + self.client.rm_snap_schedule(use_fs, abs_path, repeat, start) + except CephfsConnectionException as e: + return e.to_tuple() + except ValueError as e: + return -errno.ENOENT, '', str(e) + return 0, 'Schedule removed for path {}'.format(path), '' + + @CLIWriteCommand('fs snap-schedule retention add') + def snap_schedule_retention_add(self, + path: str, + retention_spec_or_period: str, + retention_count: Optional[str] = None, + fs: Optional[str] = None) -> Tuple[int, str, str]: + ''' + Set a retention specification for <path> + ''' + try: + use_fs = fs if fs else self.default_fs + if not self.has_fs(use_fs): + return -errno.EINVAL, '', f"no such filesystem: {use_fs}" + abs_path = path + self.client.add_retention_spec(use_fs, abs_path, + retention_spec_or_period, + retention_count) + except CephfsConnectionException as e: + return e.to_tuple() + except ValueError as e: + return -errno.ENOENT, '', str(e) + return 0, 'Retention added to path {}'.format(path), '' + + @CLIWriteCommand('fs snap-schedule retention remove') + def snap_schedule_retention_rm(self, + path: str, + retention_spec_or_period: str, + retention_count: Optional[str] = None, + fs: Optional[str] = None) -> Tuple[int, str, str]: + ''' + Remove a retention specification for <path> + ''' + try: + use_fs = fs if fs else self.default_fs + if not self.has_fs(use_fs): + return -errno.EINVAL, '', f"no such filesystem: {use_fs}" + abs_path = path + self.client.rm_retention_spec(use_fs, abs_path, + retention_spec_or_period, + retention_count) + except CephfsConnectionException as e: + return e.to_tuple() + except ValueError as e: + return -errno.ENOENT, '', str(e) + return 0, 'Retention removed from path {}'.format(path), '' + + @CLIWriteCommand('fs snap-schedule activate') + def snap_schedule_activate(self, + path: str, + repeat: Optional[str] = None, + start: Optional[str] = None, + fs: Optional[str] = None) -> Tuple[int, str, str]: + ''' + Activate a snapshot schedule for <path> + ''' + try: + use_fs = fs if fs else self.default_fs + if not self.has_fs(use_fs): + return -errno.EINVAL, '', f"no such filesystem: {use_fs}" + abs_path = path + self.client.activate_snap_schedule(use_fs, abs_path, repeat, start) + except CephfsConnectionException as e: + return e.to_tuple() + except ValueError as e: + return -errno.ENOENT, '', str(e) + return 0, 'Schedule activated for path {}'.format(path), '' + + @CLIWriteCommand('fs snap-schedule deactivate') + def snap_schedule_deactivate(self, + path: str, + repeat: Optional[str] = None, + start: Optional[str] = None, + fs: Optional[str] = None) -> Tuple[int, str, str]: + ''' + Deactivate a snapshot schedule for <path> + ''' + try: + use_fs = fs if fs else self.default_fs + if not self.has_fs(use_fs): + return -errno.EINVAL, '', f"no such filesystem: {use_fs}" + abs_path = path + self.client.deactivate_snap_schedule(use_fs, abs_path, repeat, start) + except CephfsConnectionException as e: + return e.to_tuple() + except ValueError as e: + return -errno.ENOENT, '', str(e) + return 0, 'Schedule deactivated for path {}'.format(path), '' diff --git a/src/pybind/mgr/snap_schedule/requirements.txt b/src/pybind/mgr/snap_schedule/requirements.txt new file mode 100644 index 000000000..e079f8a60 --- /dev/null +++ b/src/pybind/mgr/snap_schedule/requirements.txt @@ -0,0 +1 @@ +pytest diff --git a/src/pybind/mgr/snap_schedule/tests/__init__.py b/src/pybind/mgr/snap_schedule/tests/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/pybind/mgr/snap_schedule/tests/__init__.py diff --git a/src/pybind/mgr/snap_schedule/tests/conftest.py b/src/pybind/mgr/snap_schedule/tests/conftest.py new file mode 100644 index 000000000..35255b8d4 --- /dev/null +++ b/src/pybind/mgr/snap_schedule/tests/conftest.py @@ -0,0 +1,34 @@ +import pytest +import sqlite3 +from ..fs.schedule import Schedule + + +# simple_schedule fixture returns schedules without any timing arguments +# the tuple values correspong to ctor args for Schedule +_simple_schedules = [ + ('/foo', '6h', 'fs_name', '/foo'), + ('/foo', '24h', 'fs_name', '/foo'), + ('/bar', '1d', 'fs_name', '/bar'), + ('/fnord', '1w', 'fs_name', '/fnord'), +] + + +@pytest.fixture(params=_simple_schedules) +def simple_schedule(request): + return Schedule(*request.param) + + +@pytest.fixture +def simple_schedules(): + return [Schedule(*s) for s in _simple_schedules] + + +@pytest.fixture +def db(): + db = sqlite3.connect(':memory:', + check_same_thread=False) + with db: + db.row_factory = sqlite3.Row + db.execute("PRAGMA FOREIGN_KEYS = 1") + db.executescript(Schedule.CREATE_TABLES) + return db diff --git a/src/pybind/mgr/snap_schedule/tests/fs/__init__.py b/src/pybind/mgr/snap_schedule/tests/fs/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/pybind/mgr/snap_schedule/tests/fs/__init__.py diff --git a/src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py b/src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py new file mode 100644 index 000000000..1e984ab64 --- /dev/null +++ b/src/pybind/mgr/snap_schedule/tests/fs/test_schedule.py @@ -0,0 +1,256 @@ +import datetime +import json +import pytest +import random +import sqlite3 +from ...fs.schedule import Schedule, parse_retention + +SELECT_ALL = ('select * from schedules s' + ' INNER JOIN schedules_meta sm' + ' ON sm.schedule_id = s.id') + + +def assert_updated(new, old, update_expected={}): + ''' + This helper asserts that an object new has been updated in the + attributes in the dict updated AND has not changed in other attributes + compared to old. + if update expected is the empty dict, equality is checked + ''' + + for var in vars(new): + if var in update_expected: + expected_val = update_expected.get(var) + new_val = getattr(new, var) + if isinstance(expected_val, datetime.datetime): + assert new_val.year == expected_val.year + assert new_val.month == expected_val.month + assert new_val.day == expected_val.day + assert new_val.hour == expected_val.hour + assert new_val.minute == expected_val.minute + assert new_val.second == expected_val.second + else: + assert new_val == expected_val, f'new did not update value for {var}' + else: + expected_val = getattr(old, var) + new_val = getattr(new, var) + if isinstance(expected_val, datetime.datetime): + assert new_val.year == expected_val.year + assert new_val.month == expected_val.month + assert new_val.day == expected_val.day + assert new_val.hour == expected_val.hour + assert new_val.minute == expected_val.minute + assert new_val.second == expected_val.second + else: + assert new_val == expected_val, f'new changed unexpectedly in value for {var}' + + +class TestSchedule(object): + ''' + Test the schedule class basics and that its methods update self as expected + ''' + + def test_start_default_midnight(self, simple_schedule): + now = datetime.datetime.now(datetime.timezone.utc) + assert simple_schedule.start.second == 0 + assert simple_schedule.start.minute == 0 + assert simple_schedule.start.hour == 0 + assert simple_schedule.start.day == now.day + assert simple_schedule.start.month == now.month + assert simple_schedule.start.year == now.year + assert simple_schedule.start.tzinfo == now.tzinfo + + def test_created_now(self, simple_schedule): + now = datetime.datetime.now(datetime.timezone.utc) + assert simple_schedule.created.minute == now.minute + assert simple_schedule.created.hour == now.hour + assert simple_schedule.created.day == now.day + assert simple_schedule.created.month == now.month + assert simple_schedule.created.year == now.year + assert simple_schedule.created.tzinfo == now.tzinfo + + def test_repeat_valid(self, simple_schedule): + repeat = simple_schedule.repeat + assert isinstance(repeat, int) + + def test_store_single(self, db, simple_schedule): + simple_schedule.store_schedule(db) + row = () + with db: + row = db.execute(SELECT_ALL).fetchone() + + db_schedule = Schedule._from_db_row(row, simple_schedule.fs) + + assert_updated(db_schedule, simple_schedule) + + def test_store_multiple(self, db, simple_schedules): + [s.store_schedule(db) for s in simple_schedules] + + rows = [] + with db: + rows = db.execute(SELECT_ALL).fetchall() + + assert len(rows) == len(simple_schedules) + + def test_update_last(self, db, simple_schedule): + simple_schedule.store_schedule(db) + + with db: + _ = db.execute(SELECT_ALL).fetchone() + + first_time = datetime.datetime.now(datetime.timezone.utc) + simple_schedule.update_last(first_time, db) + + with db: + after = db.execute(SELECT_ALL).fetchone() + assert_updated(Schedule._from_db_row(after, simple_schedule.fs), + simple_schedule) + + second_time = datetime.datetime.now(datetime.timezone.utc) + simple_schedule.update_last(second_time, db) + + with db: + after2 = db.execute(SELECT_ALL).fetchone() + assert_updated(Schedule._from_db_row(after2, simple_schedule.fs), + simple_schedule) + + def test_set_inactive_active(self, db, simple_schedule): + simple_schedule.store_schedule(db) + + with db: + _ = db.execute(SELECT_ALL).fetchone() + + simple_schedule.set_inactive(db) + + with db: + after = db.execute(SELECT_ALL).fetchone() + assert_updated(Schedule._from_db_row(after, simple_schedule.fs), + simple_schedule) + + simple_schedule.set_active(db) + + with db: + after2 = db.execute(SELECT_ALL).fetchone() + assert_updated(Schedule._from_db_row(after2, simple_schedule.fs), + simple_schedule) + + def test_update_pruned(self, db, simple_schedule): + simple_schedule.store_schedule(db) + + with db: + _ = db.execute(SELECT_ALL).fetchone() + + now = datetime.datetime.now(datetime.timezone.utc) + pruned_count = random.randint(1, 1000) + + simple_schedule.update_pruned(now, db, pruned_count) + + with db: + after = db.execute(SELECT_ALL).fetchone() + + assert_updated(Schedule._from_db_row(after, simple_schedule.fs), + simple_schedule) + + # TODO test get_schedules and list_schedules + + +class TestScheduleDB(object): + ''' + This class tests that Schedules methods update the DB correctly + ''' + + def test_update_last(self, db, simple_schedule): + simple_schedule.store_schedule(db) + + with db: + before = db.execute(SELECT_ALL).fetchone() + + first_time = datetime.datetime.now(datetime.timezone.utc) + simple_schedule.update_last(first_time, db) + + with db: + after = db.execute(SELECT_ALL).fetchone() + assert_updated(Schedule._from_db_row(after, simple_schedule.fs), + Schedule._from_db_row(before, simple_schedule.fs), + {'created_count': 1, + 'last': first_time, + 'first': first_time}) + + second_time = datetime.datetime.now(datetime.timezone.utc) + simple_schedule.update_last(second_time, db) + + with db: + after2 = db.execute(SELECT_ALL).fetchone() + assert_updated(Schedule._from_db_row(after2, simple_schedule.fs), + Schedule._from_db_row(after, simple_schedule.fs), + {'created_count': 2, 'last': second_time}) + + def test_set_inactive_active(self, db, simple_schedule): + simple_schedule.store_schedule(db) + + with db: + before = db.execute(SELECT_ALL).fetchone() + + simple_schedule.set_inactive(db) + + with db: + after = db.execute(SELECT_ALL).fetchone() + assert_updated(Schedule._from_db_row(after, simple_schedule.fs), + Schedule._from_db_row(before, simple_schedule.fs), + {'active': 0}) + + simple_schedule.set_active(db) + + with db: + after2 = db.execute(SELECT_ALL).fetchone() + assert_updated(Schedule._from_db_row(after2, simple_schedule.fs), + Schedule._from_db_row(after, simple_schedule.fs), + {'active': 1}) + + def test_update_pruned(self, db, simple_schedule): + simple_schedule.store_schedule(db) + + with db: + before = db.execute(SELECT_ALL).fetchone() + + now = datetime.datetime.now(datetime.timezone.utc) + pruned_count = random.randint(1, 1000) + + simple_schedule.update_pruned(now, db, pruned_count) + + with db: + after = db.execute(SELECT_ALL).fetchone() + + assert_updated(Schedule._from_db_row(after, simple_schedule.fs), + Schedule._from_db_row(before, simple_schedule.fs), + {'last_pruned': now, 'pruned_count': pruned_count}) + + def test_add_retention(self, db, simple_schedule): + simple_schedule.store_schedule(db) + + with db: + before = db.execute(SELECT_ALL).fetchone() + + retention = "7d12m" + simple_schedule.add_retention(db, simple_schedule.path, retention) + + with db: + after = db.execute(SELECT_ALL).fetchone() + + assert after['retention'] == json.dumps(parse_retention(retention)) + + retention2 = "4w" + simple_schedule.add_retention(db, simple_schedule.path, retention2) + + with db: + after = db.execute(SELECT_ALL).fetchone() + + assert after['retention'] == json.dumps(parse_retention(retention + retention2)) + + def test_per_path_and_repeat_uniqness(self, db): + s1 = Schedule(*('/foo', '24h', 'fs_name', '/foo')) + s2 = Schedule(*('/foo', '1d', 'fs_name', '/foo')) + + s1.store_schedule(db) + with pytest.raises(sqlite3.IntegrityError): + s2.store_schedule(db) diff --git a/src/pybind/mgr/snap_schedule/tests/fs/test_schedule_client.py b/src/pybind/mgr/snap_schedule/tests/fs/test_schedule_client.py new file mode 100644 index 000000000..02996146b --- /dev/null +++ b/src/pybind/mgr/snap_schedule/tests/fs/test_schedule_client.py @@ -0,0 +1,37 @@ +from datetime import datetime, timedelta +from unittest.mock import MagicMock +import pytest +from ...fs.schedule_client import get_prune_set, SNAPSHOT_TS_FORMAT + + +class TestScheduleClient(object): + + def test_get_prune_set_empty_retention_no_prune(self): + now = datetime.now() + candidates = set() + for i in range(10): + ts = now - timedelta(minutes=i*5) + fake_dir = MagicMock() + fake_dir.d_name = f'scheduled-{ts.strftime(SNAPSHOT_TS_FORMAT)}' + candidates.add((fake_dir, ts)) + ret = {} + prune_set = get_prune_set(candidates, ret) + assert prune_set == set(), 'candidates are pruned despite empty retention' + + def test_get_prune_set_two_retention_specs(self): + now = datetime.now() + candidates = set() + for i in range(10): + ts = now - timedelta(hours=i*1) + fake_dir = MagicMock() + fake_dir.d_name = f'scheduled-{ts.strftime(SNAPSHOT_TS_FORMAT)}' + candidates.add((fake_dir, ts)) + for i in range(10): + ts = now - timedelta(days=i*1) + fake_dir = MagicMock() + fake_dir.d_name = f'scheduled-{ts.strftime(SNAPSHOT_TS_FORMAT)}' + candidates.add((fake_dir, ts)) + # should keep 8 snapshots + ret = {'h': 6, 'd': 2} + prune_set = get_prune_set(candidates, ret) + assert len(prune_set) == len(candidates) - 8, 'wrong size of prune set' diff --git a/src/pybind/mgr/snap_schedule/tox.ini b/src/pybind/mgr/snap_schedule/tox.ini new file mode 100644 index 000000000..fbf894b06 --- /dev/null +++ b/src/pybind/mgr/snap_schedule/tox.ini @@ -0,0 +1,19 @@ +[tox] +envlist = py36, py3 +skipsdist = true +; toxworkdir = {env:CEPH_BUILD_DIR}/snap-schedule +; minversion = 2.8.1 + +[testenv] +setenv= + LD_LIBRARY_PATH = {toxinidir}/../../../../build/lib + PATH = {toxinidir}/../../../../build/bin:$PATH + py27: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.2:.. + py3: PYTHONPATH = {toxinidir}/../../../../build/lib/cython_modules/lib.3:{toxinidir} + SNAP_SCHED_UNITTEST = true +deps = + pytest + mock + py27: pathlib +commands= + pytest {posargs} |