summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/snap_schedule/fs/schedule_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/snap_schedule/fs/schedule_client.py')
-rw-r--r--src/pybind/mgr/snap_schedule/fs/schedule_client.py421
1 files changed, 421 insertions, 0 deletions
diff --git a/src/pybind/mgr/snap_schedule/fs/schedule_client.py b/src/pybind/mgr/snap_schedule/fs/schedule_client.py
new file mode 100644
index 000000000..dff14966e
--- /dev/null
+++ b/src/pybind/mgr/snap_schedule/fs/schedule_client.py
@@ -0,0 +1,421 @@
+"""
+Copyright (C) 2020 SUSE
+
+LGPL2.1. See file COPYING.
+"""
+import cephfs
+import rados
+from contextlib import contextmanager
+from mgr_util import CephfsClient, open_filesystem, CephfsConnectionException
+from collections import OrderedDict
+from datetime import datetime, timezone
+import logging
+from threading import Timer, Lock
+from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \
+ Tuple, TypeVar, Union, Type
+from types import TracebackType
+import sqlite3
+from .schedule import Schedule, parse_retention
+import traceback
+import errno
+
+
+MAX_SNAPS_PER_PATH = 50
+SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
+SNAP_DB_PREFIX = 'snap_db'
+# increment this every time the db schema changes and provide upgrade code
+SNAP_DB_VERSION = '0'
+SNAP_DB_OBJECT_NAME = f'{SNAP_DB_PREFIX}_v{SNAP_DB_VERSION}'
+# scheduled snapshots are tz suffixed
+SNAPSHOT_TS_FORMAT_TZ = '%Y-%m-%d-%H_%M_%S_%Z'
+# for backward compat snapshot name parsing
+SNAPSHOT_TS_FORMAT = '%Y-%m-%d-%H_%M_%S'
+# length of timestamp format (without tz suffix)
+# e.g.: scheduled-2022-04-19-05_39_00_UTC (len = "2022-04-19-05_39_00")
+SNAPSHOT_TS_FORMAT_LEN = 19
+SNAPSHOT_PREFIX = 'scheduled'
+
+log = logging.getLogger(__name__)
+
+
+@contextmanager
+def open_ioctx(self, pool):
+ try:
+ if type(pool) is int:
+ with self.mgr.rados.open_ioctx2(pool) as ioctx:
+ ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
+ yield ioctx
+ else:
+ with self.mgr.rados.open_ioctx(pool) as ioctx:
+ ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
+ yield ioctx
+ except rados.ObjectNotFound:
+ log.error("Failed to locate pool {}".format(pool))
+ raise
+
+
+def updates_schedule_db(func):
+ def f(self, fs, schedule_or_path, *args):
+ func(self, fs, schedule_or_path, *args)
+ path = schedule_or_path
+ if isinstance(schedule_or_path, Schedule):
+ path = schedule_or_path.path
+ self.refresh_snap_timers(fs, path)
+ return f
+
+
+def get_prune_set(candidates, retention):
+ PRUNING_PATTERNS = OrderedDict([
+ # n is for keep last n snapshots, uses the snapshot name timestamp
+ # format for lowest granularity
+ # NOTE: prune set has tz suffix stripped out.
+ ("n", SNAPSHOT_TS_FORMAT),
+ # TODO remove M for release
+ ("M", '%Y-%m-%d-%H_%M'),
+ ("h", '%Y-%m-%d-%H'),
+ ("d", '%Y-%m-%d'),
+ ("w", '%G-%V'),
+ ("m", '%Y-%m'),
+ ("y", '%Y'),
+ ])
+ keep = []
+ if not retention:
+ log.info(f'no retention set, assuming n: {MAX_SNAPS_PER_PATH}')
+ retention = {'n': MAX_SNAPS_PER_PATH}
+ for period, date_pattern in PRUNING_PATTERNS.items():
+ log.debug(f'compiling keep set for period {period}')
+ period_count = retention.get(period, 0)
+ if not period_count:
+ continue
+ last = None
+ kept_for_this_period = 0
+ for snap in sorted(candidates, key=lambda x: x[0].d_name,
+ reverse=True):
+ snap_ts = snap[1].strftime(date_pattern)
+ if snap_ts != last:
+ last = snap_ts
+ if snap not in keep:
+ log.debug(f'keeping {snap[0].d_name} due to {period_count}{period}')
+ keep.append(snap)
+ kept_for_this_period += 1
+ if kept_for_this_period == period_count:
+ log.debug(('found enough snapshots for '
+ f'{period_count}{period}'))
+ break
+ if len(keep) > MAX_SNAPS_PER_PATH:
+ log.info(f'Would keep more then {MAX_SNAPS_PER_PATH}, pruning keep set')
+ keep = keep[:MAX_SNAPS_PER_PATH]
+ return candidates - set(keep)
+
+def snap_name_to_timestamp(scheduled_snap_name: str) -> str:
+ """ extract timestamp from a schedule snapshot with tz suffix stripped out """
+ ts = scheduled_snap_name.lstrip(f'{SNAPSHOT_PREFIX}-')
+ return ts[0:SNAPSHOT_TS_FORMAT_LEN]
+
+class DBInfo():
+ def __init__(self, fs: str, db: sqlite3.Connection):
+ self.fs: str = fs
+ self.lock: Lock = Lock()
+ self.db: sqlite3.Connection = db
+
+
+# context manager for serializing db connection usage
+class DBConnectionManager():
+ def __init__(self, info: DBInfo):
+ self.dbinfo: DBInfo = info
+
+ # using string as return type hint since __future__.annotations is not
+ # available with Python 3.6; its avaialbe starting from Pytohn 3.7
+ def __enter__(self) -> 'DBConnectionManager':
+ log.debug(f'locking db connection for {self.dbinfo.fs}')
+ self.dbinfo.lock.acquire()
+ log.debug(f'locked db connection for {self.dbinfo.fs}')
+ return self
+
+ def __exit__(self,
+ exception_type: Optional[Type[BaseException]],
+ exception_value: Optional[BaseException],
+ traceback: Optional[TracebackType]) -> None:
+ log.debug(f'unlocking db connection for {self.dbinfo.fs}')
+ self.dbinfo.lock.release()
+ log.debug(f'unlocked db connection for {self.dbinfo.fs}')
+
+
+class SnapSchedClient(CephfsClient):
+
+ def __init__(self, mgr):
+ super(SnapSchedClient, self).__init__(mgr)
+ # Each db connection is now guarded by a Lock; this is required to
+ # avoid concurrent DB transactions when more than one paths in a
+ # file-system are scheduled at the same interval eg. 1h; without the
+ # lock, there are races to use the same connection, causing nested
+ # transactions to be aborted
+ self.sqlite_connections: Dict[str, DBInfo] = {}
+ self.active_timers: Dict[Tuple[str, str], List[Timer]] = {}
+ self.conn_lock: Lock = Lock() # lock to protect add/lookup db connections
+
+ # restart old schedules
+ for fs_name in self.get_all_filesystems():
+ with self.get_schedule_db(fs_name) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ sched_list = Schedule.list_all_schedules(db, fs_name)
+ for sched in sched_list:
+ self.refresh_snap_timers(fs_name, sched.path, db)
+
+ @property
+ def allow_minute_snaps(self):
+ return self.mgr.get_module_option('allow_m_granularity')
+
+ @property
+ def dump_on_update(self) -> None:
+ return self.mgr.get_module_option('dump_on_update')
+
+ def get_schedule_db(self, fs):
+ dbinfo = None
+ self.conn_lock.acquire()
+ if fs not in self.sqlite_connections:
+ db = sqlite3.connect(':memory:', check_same_thread=False)
+ with db:
+ db.row_factory = sqlite3.Row
+ db.execute("PRAGMA FOREIGN_KEYS = 1")
+ pool = self.get_metadata_pool(fs)
+ with open_ioctx(self, pool) as ioctx:
+ try:
+ size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
+ ddl = ioctx.read(SNAP_DB_OBJECT_NAME,
+ size).decode('utf-8')
+ db.executescript(ddl)
+ except rados.ObjectNotFound:
+ log.debug(f'No schedule DB found in {fs}, creating one.')
+ db.executescript(Schedule.CREATE_TABLES)
+ self.sqlite_connections[fs] = DBInfo(fs, db)
+ dbinfo = self.sqlite_connections[fs]
+ self.conn_lock.release()
+ return DBConnectionManager(dbinfo)
+
+ def store_schedule_db(self, fs, db):
+ # only store db is it exists, otherwise nothing to do
+ metadata_pool = self.get_metadata_pool(fs)
+ if not metadata_pool:
+ raise CephfsConnectionException(
+ -errno.ENOENT, "Filesystem {} does not exist".format(fs))
+ db_content = []
+ for row in db.iterdump():
+ db_content.append(row)
+ with open_ioctx(self, metadata_pool) as ioctx:
+ ioctx.write_full(SNAP_DB_OBJECT_NAME,
+ '\n'.join(db_content).encode('utf-8'))
+
+ def _is_allowed_repeat(self, exec_row, path):
+ if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M':
+ if self.allow_minute_snaps:
+ log.debug(f'Minute repeats allowed, scheduling snapshot on path {path}')
+ return True
+ else:
+ log.info(f'Minute repeats disabled, skipping snapshot on path {path}')
+ return False
+ else:
+ return True
+
+ def fetch_schedules(self, db: sqlite3.Connection, path: str) -> List[sqlite3.Row]:
+ with db:
+ if self.dump_on_update:
+ dump = [line for line in db.iterdump()]
+ dump = "\n".join(dump)
+ log.debug(f"db dump:\n{dump}")
+ cur = db.execute(Schedule.EXEC_QUERY, (path,))
+ all_rows = cur.fetchall()
+ rows = [r for r in all_rows
+ if self._is_allowed_repeat(r, path)][0:1]
+ return rows
+
+ def refresh_snap_timers(self, fs: str, path: str, olddb: Optional[sqlite3.Connection] = None) -> None:
+ try:
+ log.debug((f'SnapDB on {fs} changed for {path}, '
+ 'updating next Timer'))
+ rows = []
+ # olddb is passed in the case where we land here without a timer
+ # the lock on the db connection has already been taken
+ if olddb:
+ rows = self.fetch_schedules(olddb, path)
+ else:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ rows = self.fetch_schedules(db, path)
+ timers = self.active_timers.get((fs, path), [])
+ for timer in timers:
+ timer.cancel()
+ timers = []
+ for row in rows:
+ log.debug(f'Creating new snapshot timer for {path}')
+ t = Timer(row[1],
+ self.create_scheduled_snapshot,
+ args=[fs, path, row[0], row[2], row[3]])
+ t.start()
+ timers.append(t)
+ log.debug(f'Will snapshot {path} in fs {fs} in {row[1]}s')
+ self.active_timers[(fs, path)] = timers
+ except Exception:
+ self._log_exception('refresh_snap_timers')
+
+ def _log_exception(self, fct):
+ log.error(f'{fct} raised an exception:')
+ log.error(traceback.format_exc())
+
+ def create_scheduled_snapshot(self, fs_name, path, retention, start, repeat):
+ log.debug(f'Scheduled snapshot of {path} triggered')
+ with self.get_schedule_db(fs_name) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ try:
+ try:
+ sched = Schedule.get_db_schedules(path,
+ db,
+ fs_name,
+ repeat=repeat,
+ start=start)[0]
+ time = datetime.now(timezone.utc)
+ with open_filesystem(self, fs_name) as fs_handle:
+ snap_ts = time.strftime(SNAPSHOT_TS_FORMAT_TZ)
+ snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}'
+ fs_handle.mkdir(snap_name, 0o755)
+ log.info(f'created scheduled snapshot of {path}')
+ log.debug(f'created scheduled snapshot {snap_name}')
+ sched.update_last(time, db)
+ except cephfs.Error:
+ self._log_exception('create_scheduled_snapshot: '
+ 'unexpected exception; '
+ f'deactivating schedule fs:"{fs_name}" '
+ f'path:"{path}" repeat:"{repeat}" '
+ f'retention:"{retention}"')
+ sched.set_inactive(db)
+ except Exception:
+ # catch all exceptions cause otherwise we'll never know since this
+ # is running in a thread
+ self._log_exception('create_scheduled_snapshot')
+ finally:
+ self.refresh_snap_timers(fs_name, path, db)
+ self.prune_snapshots(sched, db)
+ self.store_schedule_db(fs_name, db)
+
+ def prune_snapshots(self, sched, db):
+ try:
+ log.debug('Pruning snapshots')
+ ret = sched.retention
+ path = sched.path
+ prune_candidates = set()
+ time = datetime.now(timezone.utc)
+ with open_filesystem(self, sched.fs) as fs_handle:
+ with fs_handle.opendir(f'{path}/.snap') as d_handle:
+ dir_ = fs_handle.readdir(d_handle)
+ while dir_:
+ if dir_.d_name.decode('utf-8').startswith(f'{SNAPSHOT_PREFIX}-'):
+ log.debug(f'add {dir_.d_name} to pruning')
+ ts = datetime.strptime(
+ snap_name_to_timestamp(dir_.d_name.decode('utf-8')), SNAPSHOT_TS_FORMAT)
+ prune_candidates.add((dir_, ts))
+ else:
+ log.debug(f'skipping dir entry {dir_.d_name}')
+ dir_ = fs_handle.readdir(d_handle)
+ to_prune = get_prune_set(prune_candidates, ret)
+ for k in to_prune:
+ dirname = k[0].d_name.decode('utf-8')
+ log.debug(f'rmdir on {dirname}')
+ fs_handle.rmdir(f'{path}/.snap/{dirname}')
+ if to_prune:
+ sched.update_pruned(time, db, len(to_prune))
+ except Exception:
+ self._log_exception('prune_snapshots')
+
+ def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ return Schedule.get_db_schedules(path, db, fs)
+
+ def list_snap_schedules(self,
+ fs: str,
+ path: str,
+ recursive: bool) -> List[Schedule]:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ return Schedule.list_schedules(path, db, fs, recursive)
+
+ @updates_schedule_db
+ # TODO improve interface
+ def store_snap_schedule(self, fs, path_, args):
+ sched = Schedule(*args)
+ log.debug(f'repeat is {sched.repeat}')
+ if sched.parse_schedule(sched.schedule)[1] == 'M' and not self.allow_minute_snaps:
+ log.error('not allowed')
+ raise ValueError('no minute snaps allowed')
+ log.debug(f'attempting to add schedule {sched}')
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ sched.store_schedule(db)
+ self.store_schedule_db(sched.fs, db)
+
+ @updates_schedule_db
+ def rm_snap_schedule(self,
+ fs: str, path: str,
+ schedule: Optional[str],
+ start: Optional[str]) -> None:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ Schedule.rm_schedule(db, path, schedule, start)
+ self.store_schedule_db(fs, db)
+
+ @updates_schedule_db
+ def add_retention_spec(self,
+ fs,
+ path,
+ retention_spec_or_period,
+ retention_count):
+ retention_spec = retention_spec_or_period
+ if retention_count:
+ retention_spec = retention_count + retention_spec
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ Schedule.add_retention(db, path, retention_spec)
+ self.store_schedule_db(fs, db)
+
+ @updates_schedule_db
+ def rm_retention_spec(self,
+ fs,
+ path,
+ retention_spec_or_period,
+ retention_count):
+ retention_spec = retention_spec_or_period
+ if retention_count:
+ retention_spec = retention_count + retention_spec
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ Schedule.rm_retention(db, path, retention_spec)
+ self.store_schedule_db(fs, db)
+
+ @updates_schedule_db
+ def activate_snap_schedule(self,
+ fs: str,
+ path: str,
+ schedule: Optional[str],
+ start: Optional[str]) -> None:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ schedules = Schedule.get_db_schedules(path, db, fs,
+ schedule=schedule,
+ start=start)
+ for s in schedules:
+ s.set_active(db)
+ self.store_schedule_db(fs, db)
+
+ @updates_schedule_db
+ def deactivate_snap_schedule(self,
+ fs: str, path: str,
+ schedule: Optional[str],
+ start: Optional[str]) -> None:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ schedules = Schedule.get_db_schedules(path, db, fs,
+ schedule=schedule,
+ start=start)
+ for s in schedules:
+ s.set_inactive(db)
+ self.store_schedule_db(fs, db)