diff options
Diffstat (limited to 'src/pybind/mgr/volumes/fs/async_job.py')
-rw-r--r-- | src/pybind/mgr/volumes/fs/async_job.py | 303 |
1 files changed, 303 insertions, 0 deletions
diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py new file mode 100644 index 000000000..f1d998c85 --- /dev/null +++ b/src/pybind/mgr/volumes/fs/async_job.py @@ -0,0 +1,303 @@ +import sys +import time +import logging +import threading +import traceback +from collections import deque +from mgr_util import lock_timeout_log, CephfsClient + +from .exception import NotImplementedException + +log = logging.getLogger(__name__) + + +class JobThread(threading.Thread): + # this is "not" configurable and there is no need for it to be + # configurable. if a thread encounters an exception, we retry + # until it hits this many consecutive exceptions. + MAX_RETRIES_ON_EXCEPTION = 10 + + def __init__(self, async_job, volume_client, name): + self.vc = volume_client + self.async_job = async_job + # event object to cancel jobs + self.cancel_event = threading.Event() + threading.Thread.__init__(self, name=name) + + def run(self): + retries = 0 + thread_id = threading.currentThread() + assert isinstance(thread_id, JobThread) + thread_name = thread_id.getName() + log.debug("thread [{0}] starting".format(thread_name)) + + while retries < JobThread.MAX_RETRIES_ON_EXCEPTION: + vol_job = None + try: + # fetch next job to execute + with lock_timeout_log(self.async_job.lock): + while True: + if self.should_reconfigure_num_threads(): + log.info("thread [{0}] terminating due to reconfigure".format(thread_name)) + self.async_job.threads.remove(self) + return + vol_job = self.async_job.get_job() + if vol_job: + break + self.async_job.cv.wait() + self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id) + + # execute the job (outside lock) + self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel()) + retries = 0 + except NotImplementedException: + raise + except Exception: + # unless the jobs fetching and execution routines are not implemented + # retry till we hit cap limit. + retries += 1 + log.warning("thread [{0}] encountered fatal error: (attempt#" + " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION)) + exc_type, exc_value, exc_traceback = sys.exc_info() + log.warning("traceback: {0}".format("".join( + traceback.format_exception(exc_type, exc_value, exc_traceback)))) + finally: + # when done, unregister the job + if vol_job: + with lock_timeout_log(self.async_job.lock): + self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id) + time.sleep(1) + log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name)) + self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name)) + with lock_timeout_log(self.async_job.lock): + self.async_job.threads.remove(self) + + def should_reconfigure_num_threads(self): + # reconfigure of max_concurrent_clones + return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs + + def cancel_job(self): + self.cancel_event.set() + + def should_cancel(self): + return self.cancel_event.is_set() + + def reset_cancel(self): + self.cancel_event.clear() + + +class AsyncJobs(threading.Thread): + """ + Class providing asynchronous execution of jobs via worker threads. + `jobs` are grouped by `volume`, so a `volume` can have N number of + `jobs` executing concurrently (capped by number of concurrent jobs). + + Usability is simple: subclass this and implement the following: + - get_next_job(volname, running_jobs) + - execute_job(volname, job, should_cancel) + + ... and do not forget to invoke base class constructor. + + Job cancelation is for a volume as a whole, i.e., all executing jobs + for a volume are canceled. Cancelation is poll based -- jobs need to + periodically check if cancelation is requested, after which the job + should return as soon as possible. Cancelation check is provided + via `should_cancel()` lambda passed to `execute_job()`. + """ + + def __init__(self, volume_client, name_pfx, nr_concurrent_jobs): + threading.Thread.__init__(self, name="{0}.tick".format(name_pfx)) + self.vc = volume_client + # queue of volumes for starting async jobs + self.q = deque() # type: deque + # volume => job tracking + self.jobs = {} + # lock, cv for kickstarting jobs + self.lock = threading.Lock() + self.cv = threading.Condition(self.lock) + # cv for job cancelation + self.waiting = False + self.stopping = threading.Event() + self.cancel_cv = threading.Condition(self.lock) + self.nr_concurrent_jobs = nr_concurrent_jobs + self.name_pfx = name_pfx + # each async job group uses its own libcephfs connection (pool) + self.fs_client = CephfsClient(self.vc.mgr) + + self.threads = [] + for i in range(self.nr_concurrent_jobs): + self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(self.name_pfx, i))) + self.threads[-1].start() + self.start() + + def run(self): + log.debug("tick thread {} starting".format(self.name)) + with lock_timeout_log(self.lock): + while not self.stopping.is_set(): + c = len(self.threads) + if c > self.nr_concurrent_jobs: + # Decrease concurrency: notify threads which are waiting for a job to terminate. + log.debug("waking threads to terminate due to job reduction") + self.cv.notifyAll() + elif c < self.nr_concurrent_jobs: + # Increase concurrency: create more threads. + log.debug("creating new threads to job increase") + for i in range(c, self.nr_concurrent_jobs): + self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(self.name_pfx, time.time(), i))) + self.threads[-1].start() + self.cv.wait(timeout=5) + + def shutdown(self): + self.stopping.set() + self.cancel_all_jobs() + with lock_timeout_log(self.lock): + self.cv.notifyAll() + self.join() + + def reconfigure_max_async_threads(self, nr_concurrent_jobs): + """ + reconfigure number of cloner threads + """ + self.nr_concurrent_jobs = nr_concurrent_jobs + + def get_job(self): + log.debug("processing {0} volume entries".format(len(self.q))) + nr_vols = len(self.q) + to_remove = [] + next_job = None + while nr_vols > 0: + volname = self.q[0] + # do this now so that the other thread pick up jobs for other volumes + self.q.rotate(1) + running_jobs = [j[0] for j in self.jobs[volname]] + (ret, job) = self.get_next_job(volname, running_jobs) + if job: + next_job = (volname, job) + break + # this is an optimization when for a given volume there are no more + # jobs and no jobs are in progress. in such cases we remove the volume + # from the tracking list so as to: + # + # a. not query the filesystem for jobs over and over again + # b. keep the filesystem connection idle so that it can be freed + # from the connection pool + # + # if at all there are jobs for a volume, the volume gets added again + # to the tracking list and the jobs get kickstarted. + # note that, we do not iterate the volume list fully if there is a + # jobs to process (that will take place eventually). + if ret == 0 and not job and not running_jobs: + to_remove.append(volname) + nr_vols -= 1 + for vol in to_remove: + log.debug("auto removing volume '{0}' from tracked volumes".format(vol)) + self.q.remove(vol) + self.jobs.pop(vol) + return next_job + + def register_async_job(self, volname, job, thread_id): + log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id)) + self.jobs[volname].append((job, thread_id)) + + def unregister_async_job(self, volname, job, thread_id): + log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id)) + self.jobs[volname].remove((job, thread_id)) + + cancelled = thread_id.should_cancel() + thread_id.reset_cancel() + + # wake up cancellation waiters if needed + if cancelled: + logging.info("waking up cancellation waiters") + self.cancel_cv.notifyAll() + + def queue_job(self, volname): + """ + queue a volume for asynchronous job execution. + """ + log.info("queuing job for volume '{0}'".format(volname)) + with lock_timeout_log(self.lock): + if volname not in self.q: + self.q.append(volname) + self.jobs[volname] = [] + self.cv.notifyAll() + + def _cancel_jobs(self, volname): + """ + cancel all jobs for the volume. do nothing is the no jobs are + executing for the given volume. this would wait until all jobs + get interrupted and finish execution. + """ + log.info("cancelling jobs for volume '{0}'".format(volname)) + try: + if volname not in self.q and volname not in self.jobs: + return + self.q.remove(volname) + # cancel in-progress operation and wait until complete + for j in self.jobs[volname]: + j[1].cancel_job() + # wait for cancellation to complete + while self.jobs[volname]: + log.debug("waiting for {0} in-progress jobs for volume '{1}' to " + "cancel".format(len(self.jobs[volname]), volname)) + self.cancel_cv.wait() + self.jobs.pop(volname) + except (KeyError, ValueError): + pass + + def _cancel_job(self, volname, job): + """ + cancel a executing job for a given volume. return True if canceled, False + otherwise (volume/job not found). + """ + canceled = False + log.info("canceling job {0} for volume {1}".format(job, volname)) + try: + vol_jobs = [j[0] for j in self.jobs.get(volname, [])] + if volname not in self.q and job not in vol_jobs: + return canceled + for j in self.jobs[volname]: + if j[0] == job: + j[1].cancel_job() + # be safe against _cancel_jobs() running concurrently + while j in self.jobs.get(volname, []): + self.cancel_cv.wait() + canceled = True + break + except (KeyError, ValueError): + pass + return canceled + + def cancel_job(self, volname, job): + with lock_timeout_log(self.lock): + return self._cancel_job(volname, job) + + def cancel_jobs(self, volname): + """ + cancel all executing jobs for a given volume. + """ + with lock_timeout_log(self.lock): + self._cancel_jobs(volname) + + def cancel_all_jobs(self): + """ + call all executing jobs for all volumes. + """ + with lock_timeout_log(self.lock): + for volname in list(self.q): + self._cancel_jobs(volname) + + def get_next_job(self, volname, running_jobs): + """ + get the next job for asynchronous execution as (retcode, job) tuple. if no + jobs are available return (0, None) else return (0, job). on error return + (-ret, None). called under `self.lock`. + """ + raise NotImplementedException() + + def execute_job(self, volname, job, should_cancel): + """ + execute a job for a volume. the job can block on I/O operations, sleep for long + hours and do all kinds of synchronous work. called outside `self.lock`. + """ + raise NotImplementedException() |