summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/volumes/fs/async_job.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/pybind/mgr/volumes/fs/async_job.py
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/pybind/mgr/volumes/fs/async_job.py')
-rw-r--r--src/pybind/mgr/volumes/fs/async_job.py303
1 files changed, 303 insertions, 0 deletions
diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py
new file mode 100644
index 000000000..f1d998c85
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/async_job.py
@@ -0,0 +1,303 @@
+import sys
+import time
+import logging
+import threading
+import traceback
+from collections import deque
+from mgr_util import lock_timeout_log, CephfsClient
+
+from .exception import NotImplementedException
+
+log = logging.getLogger(__name__)
+
+
+class JobThread(threading.Thread):
+ # this is "not" configurable and there is no need for it to be
+ # configurable. if a thread encounters an exception, we retry
+ # until it hits this many consecutive exceptions.
+ MAX_RETRIES_ON_EXCEPTION = 10
+
+ def __init__(self, async_job, volume_client, name):
+ self.vc = volume_client
+ self.async_job = async_job
+ # event object to cancel jobs
+ self.cancel_event = threading.Event()
+ threading.Thread.__init__(self, name=name)
+
+ def run(self):
+ retries = 0
+ thread_id = threading.currentThread()
+ assert isinstance(thread_id, JobThread)
+ thread_name = thread_id.getName()
+ log.debug("thread [{0}] starting".format(thread_name))
+
+ while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
+ vol_job = None
+ try:
+ # fetch next job to execute
+ with lock_timeout_log(self.async_job.lock):
+ while True:
+ if self.should_reconfigure_num_threads():
+ log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
+ self.async_job.threads.remove(self)
+ return
+ vol_job = self.async_job.get_job()
+ if vol_job:
+ break
+ self.async_job.cv.wait()
+ self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)
+
+ # execute the job (outside lock)
+ self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel())
+ retries = 0
+ except NotImplementedException:
+ raise
+ except Exception:
+ # unless the jobs fetching and execution routines are not implemented
+ # retry till we hit cap limit.
+ retries += 1
+ log.warning("thread [{0}] encountered fatal error: (attempt#"
+ " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ log.warning("traceback: {0}".format("".join(
+ traceback.format_exception(exc_type, exc_value, exc_traceback))))
+ finally:
+ # when done, unregister the job
+ if vol_job:
+ with lock_timeout_log(self.async_job.lock):
+ self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
+ time.sleep(1)
+ log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
+ self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+ with lock_timeout_log(self.async_job.lock):
+ self.async_job.threads.remove(self)
+
+ def should_reconfigure_num_threads(self):
+ # reconfigure of max_concurrent_clones
+ return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs
+
+ def cancel_job(self):
+ self.cancel_event.set()
+
+ def should_cancel(self):
+ return self.cancel_event.is_set()
+
+ def reset_cancel(self):
+ self.cancel_event.clear()
+
+
+class AsyncJobs(threading.Thread):
+ """
+ Class providing asynchronous execution of jobs via worker threads.
+ `jobs` are grouped by `volume`, so a `volume` can have N number of
+ `jobs` executing concurrently (capped by number of concurrent jobs).
+
+ Usability is simple: subclass this and implement the following:
+ - get_next_job(volname, running_jobs)
+ - execute_job(volname, job, should_cancel)
+
+ ... and do not forget to invoke base class constructor.
+
+ Job cancelation is for a volume as a whole, i.e., all executing jobs
+ for a volume are canceled. Cancelation is poll based -- jobs need to
+ periodically check if cancelation is requested, after which the job
+ should return as soon as possible. Cancelation check is provided
+ via `should_cancel()` lambda passed to `execute_job()`.
+ """
+
+ def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
+ threading.Thread.__init__(self, name="{0}.tick".format(name_pfx))
+ self.vc = volume_client
+ # queue of volumes for starting async jobs
+ self.q = deque() # type: deque
+ # volume => job tracking
+ self.jobs = {}
+ # lock, cv for kickstarting jobs
+ self.lock = threading.Lock()
+ self.cv = threading.Condition(self.lock)
+ # cv for job cancelation
+ self.waiting = False
+ self.stopping = threading.Event()
+ self.cancel_cv = threading.Condition(self.lock)
+ self.nr_concurrent_jobs = nr_concurrent_jobs
+ self.name_pfx = name_pfx
+ # each async job group uses its own libcephfs connection (pool)
+ self.fs_client = CephfsClient(self.vc.mgr)
+
+ self.threads = []
+ for i in range(self.nr_concurrent_jobs):
+ self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(self.name_pfx, i)))
+ self.threads[-1].start()
+ self.start()
+
+ def run(self):
+ log.debug("tick thread {} starting".format(self.name))
+ with lock_timeout_log(self.lock):
+ while not self.stopping.is_set():
+ c = len(self.threads)
+ if c > self.nr_concurrent_jobs:
+ # Decrease concurrency: notify threads which are waiting for a job to terminate.
+ log.debug("waking threads to terminate due to job reduction")
+ self.cv.notifyAll()
+ elif c < self.nr_concurrent_jobs:
+ # Increase concurrency: create more threads.
+ log.debug("creating new threads to job increase")
+ for i in range(c, self.nr_concurrent_jobs):
+ self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(self.name_pfx, time.time(), i)))
+ self.threads[-1].start()
+ self.cv.wait(timeout=5)
+
+ def shutdown(self):
+ self.stopping.set()
+ self.cancel_all_jobs()
+ with lock_timeout_log(self.lock):
+ self.cv.notifyAll()
+ self.join()
+
+ def reconfigure_max_async_threads(self, nr_concurrent_jobs):
+ """
+ reconfigure number of cloner threads
+ """
+ self.nr_concurrent_jobs = nr_concurrent_jobs
+
+ def get_job(self):
+ log.debug("processing {0} volume entries".format(len(self.q)))
+ nr_vols = len(self.q)
+ to_remove = []
+ next_job = None
+ while nr_vols > 0:
+ volname = self.q[0]
+ # do this now so that the other thread pick up jobs for other volumes
+ self.q.rotate(1)
+ running_jobs = [j[0] for j in self.jobs[volname]]
+ (ret, job) = self.get_next_job(volname, running_jobs)
+ if job:
+ next_job = (volname, job)
+ break
+ # this is an optimization when for a given volume there are no more
+ # jobs and no jobs are in progress. in such cases we remove the volume
+ # from the tracking list so as to:
+ #
+ # a. not query the filesystem for jobs over and over again
+ # b. keep the filesystem connection idle so that it can be freed
+ # from the connection pool
+ #
+ # if at all there are jobs for a volume, the volume gets added again
+ # to the tracking list and the jobs get kickstarted.
+ # note that, we do not iterate the volume list fully if there is a
+ # jobs to process (that will take place eventually).
+ if ret == 0 and not job and not running_jobs:
+ to_remove.append(volname)
+ nr_vols -= 1
+ for vol in to_remove:
+ log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
+ self.q.remove(vol)
+ self.jobs.pop(vol)
+ return next_job
+
+ def register_async_job(self, volname, job, thread_id):
+ log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
+ self.jobs[volname].append((job, thread_id))
+
+ def unregister_async_job(self, volname, job, thread_id):
+ log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
+ self.jobs[volname].remove((job, thread_id))
+
+ cancelled = thread_id.should_cancel()
+ thread_id.reset_cancel()
+
+ # wake up cancellation waiters if needed
+ if cancelled:
+ logging.info("waking up cancellation waiters")
+ self.cancel_cv.notifyAll()
+
+ def queue_job(self, volname):
+ """
+ queue a volume for asynchronous job execution.
+ """
+ log.info("queuing job for volume '{0}'".format(volname))
+ with lock_timeout_log(self.lock):
+ if volname not in self.q:
+ self.q.append(volname)
+ self.jobs[volname] = []
+ self.cv.notifyAll()
+
+ def _cancel_jobs(self, volname):
+ """
+ cancel all jobs for the volume. do nothing is the no jobs are
+ executing for the given volume. this would wait until all jobs
+ get interrupted and finish execution.
+ """
+ log.info("cancelling jobs for volume '{0}'".format(volname))
+ try:
+ if volname not in self.q and volname not in self.jobs:
+ return
+ self.q.remove(volname)
+ # cancel in-progress operation and wait until complete
+ for j in self.jobs[volname]:
+ j[1].cancel_job()
+ # wait for cancellation to complete
+ while self.jobs[volname]:
+ log.debug("waiting for {0} in-progress jobs for volume '{1}' to "
+ "cancel".format(len(self.jobs[volname]), volname))
+ self.cancel_cv.wait()
+ self.jobs.pop(volname)
+ except (KeyError, ValueError):
+ pass
+
+ def _cancel_job(self, volname, job):
+ """
+ cancel a executing job for a given volume. return True if canceled, False
+ otherwise (volume/job not found).
+ """
+ canceled = False
+ log.info("canceling job {0} for volume {1}".format(job, volname))
+ try:
+ vol_jobs = [j[0] for j in self.jobs.get(volname, [])]
+ if volname not in self.q and job not in vol_jobs:
+ return canceled
+ for j in self.jobs[volname]:
+ if j[0] == job:
+ j[1].cancel_job()
+ # be safe against _cancel_jobs() running concurrently
+ while j in self.jobs.get(volname, []):
+ self.cancel_cv.wait()
+ canceled = True
+ break
+ except (KeyError, ValueError):
+ pass
+ return canceled
+
+ def cancel_job(self, volname, job):
+ with lock_timeout_log(self.lock):
+ return self._cancel_job(volname, job)
+
+ def cancel_jobs(self, volname):
+ """
+ cancel all executing jobs for a given volume.
+ """
+ with lock_timeout_log(self.lock):
+ self._cancel_jobs(volname)
+
+ def cancel_all_jobs(self):
+ """
+ call all executing jobs for all volumes.
+ """
+ with lock_timeout_log(self.lock):
+ for volname in list(self.q):
+ self._cancel_jobs(volname)
+
+ def get_next_job(self, volname, running_jobs):
+ """
+ get the next job for asynchronous execution as (retcode, job) tuple. if no
+ jobs are available return (0, None) else return (0, job). on error return
+ (-ret, None). called under `self.lock`.
+ """
+ raise NotImplementedException()
+
+ def execute_job(self, volname, job, should_cancel):
+ """
+ execute a job for a volume. the job can block on I/O operations, sleep for long
+ hours and do all kinds of synchronous work. called outside `self.lock`.
+ """
+ raise NotImplementedException()