diff options
Diffstat (limited to 'third_party/python/dlmanager/dlmanager')
-rw-r--r-- | third_party/python/dlmanager/dlmanager/__init__.py | 18 | ||||
-rw-r--r-- | third_party/python/dlmanager/dlmanager/fs.py | 116 | ||||
-rw-r--r-- | third_party/python/dlmanager/dlmanager/manager.py | 323 | ||||
-rw-r--r-- | third_party/python/dlmanager/dlmanager/persist_limit.py | 65 |
4 files changed, 522 insertions, 0 deletions
diff --git a/third_party/python/dlmanager/dlmanager/__init__.py b/third_party/python/dlmanager/dlmanager/__init__.py
new file mode 100644
index 0000000000..0890af484a
--- /dev/null
+++ b/third_party/python/dlmanager/dlmanager/__init__.py
@@ -0,0 +1,18 @@
+import logging
+
+__version__ = "0.1.1"
+
+
+try:  # Python 2.7+
+    from logging import NullHandler
+except ImportError:
+    class NullHandler(logging.Handler):
+        def emit(self, record):
+            pass
+
+# Set default logging handler to avoid "No handler found" warnings.
+logging.getLogger(__name__).addHandler(NullHandler())
+
+# exported api
+from dlmanager.manager import Download, DownloadInterrupt, DownloadManager  # noqa
+from dlmanager.persist_limit import PersistLimit  # noqa
diff --git a/third_party/python/dlmanager/dlmanager/fs.py b/third_party/python/dlmanager/dlmanager/fs.py
new file mode 100644
index 0000000000..8908b5efce
--- /dev/null
+++ b/third_party/python/dlmanager/dlmanager/fs.py
@@ -0,0 +1,116 @@
+import errno
+import logging
+import os
+import shutil
+import stat
+import time
+
+"""
+File system utilities, copied from mozfile.
+"""
+
+LOG = logging.getLogger(__name__)
+
+
+def _call_windows_retry(func, args=(), retry_max=5, retry_delay=0.5):
+    """
+    It's possible to see spurious errors on Windows due to various things
+    keeping a handle to the directory open (explorer, virus scanners, etc)
+    So we try a few times if it fails with a known error.
+    """
+    retry_count = 0
+    while True:
+        try:
+            func(*args)
+        except OSError as e:
+            # Error codes are defined in:
+            # http://docs.python.org/2/library/errno.html#module-errno
+            if e.errno not in (errno.EACCES, errno.ENOTEMPTY):
+                raise
+
+            if retry_count == retry_max:
+                raise
+
+            retry_count += 1
+
+            LOG.info('%s() failed for "%s". Reason: %s (%s). Retrying...',
+                     func.__name__, args, e.strerror, e.errno)
+            time.sleep(retry_delay)
+        else:
+            # If no exception has been thrown it should be done
+            break
+
+
+def remove(path):
+    """Removes the specified file, link, or directory tree.
+
+    This is a replacement for shutil.rmtree that works better under
+    windows. It does the following things:
+
+    - check path access for the current user before trying to remove
+    - retry operations on some known errors due to various things keeping
+      a handle on file paths - like explorer, virus scanners, etc. The
+      known errors are errno.EACCES and errno.ENOTEMPTY, and it will
+      retry up to 5 five times with a delay of 0.5 seconds between each
+      attempt.
+
+    Note that no error will be raised if the given path does not exists.
+
+    :param path: path to be removed
+    """
+
+    def _call_with_windows_retry(*args, **kwargs):
+        try:
+            _call_windows_retry(*args, **kwargs)
+        except OSError as e:
+            # The file or directory to be removed doesn't exist anymore
+            if e.errno != errno.ENOENT:
+                raise
+
+    def _update_permissions(path):
+        """Sets specified pemissions depending on filetype"""
+        if os.path.islink(path):
+            # Path is a symlink which we don't have to modify
+            # because it should already have all the needed permissions
+            return
+
+        stats = os.stat(path)
+
+        if os.path.isfile(path):
+            mode = stats.st_mode | stat.S_IWUSR
+        elif os.path.isdir(path):
+            mode = stats.st_mode | stat.S_IWUSR | stat.S_IXUSR
+        else:
+            # Not supported type
+            return
+
+        _call_with_windows_retry(os.chmod, (path, mode))
+
+    if not os.path.exists(path):
+        return
+
+    if os.path.isfile(path) or os.path.islink(path):
+        # Verify the file or link is read/write for the current user
+        _update_permissions(path)
+        _call_with_windows_retry(os.remove, (path,))
+
+    elif os.path.isdir(path):
+        # Verify the directory is read/write/execute for the current user
+        _update_permissions(path)
+
+        # We're ensuring that every nested item has writable permission.
+        for root, dirs, files in os.walk(path):
+            for entry in dirs + files:
+                _update_permissions(os.path.join(root, entry))
+        _call_with_windows_retry(shutil.rmtree, (path,))
+
+
+def move(src, dst):
+    """
+    Move a file or directory path.
+
+    This is a replacement for shutil.move that works better under windows,
+    retrying operations on some known errors due to various things keeping
+    a handle on file paths.
+    """
+    _call_windows_retry(shutil.move, (src, dst))
diff --git a/third_party/python/dlmanager/dlmanager/manager.py b/third_party/python/dlmanager/dlmanager/manager.py
new file mode 100644
index 0000000000..3dce3b7838
--- /dev/null
+++ b/third_party/python/dlmanager/dlmanager/manager.py
@@ -0,0 +1,323 @@
+import os
+import requests
+import six
+import sys
+import tempfile
+import threading
+
+from contextlib import closing
+from six.moves.urllib.parse import urlparse
+
+from dlmanager import fs
+from dlmanager.persist_limit import PersistLimit
+
+
+class DownloadInterrupt(Exception):
+    "Raised when a download is interrupted."
+
+
+class Download(object):
+    """
+    Download is reponsible of downloading one file in the background.
+
+    Example of use: ::
+
+      dl = Download(url, dest)
+      dl.start()
+      dl.wait()  # this will block until completion / cancel / error
+
+    If a download fail or is canceled, the temporary dest is removed from
+    the disk.
+
+    Usually, Downloads are created by using :meth:`DownloadManager.download`.
+
+    :param url: the url of the file to download
+    :param dest: the local file path destination
+    :param finished_callback: a callback that will be called in the thread
+                              when the thread work is done. Takes the download
+                              instance as a parameter.
+    :param chunk_size: size of the chunk that will be read. The thread can
+                       not be stopped while we are reading that chunk size.
+    :param session: a requests.Session instance that will do do the real
+                    downloading work. If None, `requests` module is used.
+    :param progress: A callable to report the progress (default to None).
+                     see :meth:`set_progress`.
+    """
+    def __init__(self, url, dest, finished_callback=None,
+                 chunk_size=16 * 1024, session=None, progress=None):
+        self.thread = threading.Thread(
+            target=self._download,
+            args=(url, dest, finished_callback, chunk_size,
+                  session or requests)
+        )
+        self._lock = threading.Lock()
+        self.__url = url
+        self.__dest = dest
+        self.__progress = progress
+        self.__canceled = False
+        self.__error = None
+
+    def start(self):
+        """
+        Start the thread that will do the download.
+        """
+        self.thread.start()
+
+    def cancel(self):
+        """
+        Cancel a previously started download.
+        """
+        self.__canceled = True
+
+    def is_canceled(self):
+        """
+        Returns True if we canceled this download.
+        """
+        return self.__canceled
+
+    def is_running(self):
+        """
+        Returns True if the downloading thread is running.
+        """
+        return self.thread.is_alive()
+
+    def wait(self, raise_if_error=True):
+        """
+        Block until the downloading thread is finished.
+
+        :param raise_if_error: if True (the default), :meth:`raise_if_error`
+                               will be called and raise an error if any.
+        """
+        while self.thread.is_alive():
+            try:
+                # in case of exception here (like KeyboardInterrupt),
+                # cancel the task.
+                self.thread.join(0.02)
+            except:
+                self.cancel()
+                raise
+        # this will raise exception that may happen inside the thread.
+        if raise_if_error:
+            self.raise_if_error()
+
+    def error(self):
+        """
+        Returns None or a tuple of three values (type, value, traceback)
+        that give information about the exception.
+        """
+        return self.__error
+
+    def raise_if_error(self):
+        """
+        Raise an error if any. If the download was canceled, raise
+        :class:`DownloadInterrupt`.
+        """
+        if self.__error:
+            six.reraise(*self.__error)
+        if self.__canceled:
+            raise DownloadInterrupt()
+
+    def set_progress(self, progress):
+        """
+        set a callable to report the progress of the download, or None to
+        disable any report.
+
+        The callable must take three parameters (download, current, total).
+        Note that this method is thread safe, you can call it during a
+        download.
+        """
+        with self._lock:
+            self.__progress = progress
+
+    def get_dest(self):
+        """
+        Returns the dest.
+        """
+        return self.__dest
+
+    def get_url(self):
+        """
+        Returns the url.
+        """
+        return self.__url
+
+    def _update_progress(self, current, total):
+        with self._lock:
+            if self.__progress:
+                self.__progress(self, current, total)
+
+    def _download(self, url, dest, finished_callback, chunk_size, session):
+        # save the file under a temporary name
+        # this allow to not use a broken file in case things went really bad
+        # while downloading the file (ie the python interpreter is killed
+        # abruptly)
+        temp = None
+        bytes_so_far = 0
+        try:
+            with closing(session.get(url, stream=True)) as response:
+                total_size = response.headers.get('Content-length', '').strip()
+                total_size = int(total_size) if total_size else None
+                self._update_progress(bytes_so_far, total_size)
+                # we use NamedTemporaryFile as raw open() call was causing
+                # issues on windows - see:
+                # https://bugzilla.mozilla.org/show_bug.cgi?id=1185756
+                with tempfile.NamedTemporaryFile(
+                        delete=False,
+                        suffix='.tmp',
+                        dir=os.path.dirname(dest)) as temp:
+                    for chunk in response.iter_content(chunk_size):
+                        if self.is_canceled():
+                            break
+                        if chunk:
+                            temp.write(chunk)
+                        bytes_so_far += len(chunk)
+                        self._update_progress(bytes_so_far, total_size)
+            response.raise_for_status()
+        except:
+            self.__error = sys.exc_info()
+        try:
+            if temp is None:
+                pass  # not even opened the temp file, nothing to do
+            elif self.is_canceled() or self.__error:
+                fs.remove(temp.name)
+            else:
+                # if all goes well, then rename the file to the real dest
+                fs.remove(dest)  # just in case it already existed
+                fs.move(temp.name, dest)
+        finally:
+            if finished_callback:
+                finished_callback(self)
+
+
+class DownloadManager(object):
+    """
+    DownloadManager is responsible of starting and managing downloads inside
+    a given directory. It will download a file only if a given filename
+    is not already there.
+
+    Note that background downloads needs to be stopped. For example, if
+    you have an exception while a download is occuring, python will only
+    exit when the download will finish. To get rid of that, there is a
+    possible idiom: ::
+
+      def download_things(manager):
+          # do things with the manager
+          manager.download(url1, f1)
+          manager.download(url2, f2)
+          ...
+
+      manager = DownloadManager(destdir)
+      try:
+          download_things(manager)
+      finally:
+          # ensure we cancel all background downloads to ask the end
+          # of possible remainings threads
+          manager.cancel()
+
+    :param destdir: a directory where files are downloaded. It will be created
+                    if it does not exists.
+    :param session: a requests session. If None, one will be created for you.
+    :param persist_limit: an instance of :class:`PersistLimit`, to allow
+                          limiting the size of the download dir. Defaults
+                          to None, meaning no limit.
+    """
+    def __init__(self, destdir, session=None, persist_limit=None):
+        self.destdir = destdir
+        self.session = session or requests.Session()
+        self._downloads = {}
+        self._lock = threading.Lock()
+        self.persist_limit = persist_limit or PersistLimit(0)
+        self.persist_limit.register_dir_content(self.destdir)
+
+        # if persist folder does not exist, create it
+        if not os.path.isdir(destdir):
+            os.makedirs(destdir)
+
+    def get_dest(self, fname):
+        return os.path.join(self.destdir, fname)
+
+    def cancel(self, cancel_if=None):
+        """
+        Cancel downloads, if any.
+
+        if cancel_if is given, it must be a callable that take the download
+        instance as parameter, and return True if the download needs to be
+        canceled.
+
+        Note that download threads won't be stopped directly.
+        """
+        with self._lock:
+            for download in six.itervalues(self._downloads):
+                if cancel_if is None or cancel_if(download):
+                    if download.is_running():
+                        download.cancel()
+
+    def wait(self, raise_if_error=True):
+        """
+        Wait for all downloads to be finished.
+        """
+        for download in self._downloads.values():
+            download.wait(raise_if_error=raise_if_error)
+
+    def download(self, url, fname=None, progress=None):
+        """
+        Returns a started :class:`Download` instance, or None if fname is
+        already present in destdir.
+
+        if a download is already running for the given fname, it is just
+        returned. Else the download is created, started and returned.
+
+        :param url: url of the file to download.
+        :param fname: name to give for the downloaded file. If None, it will
+                      be the name extracted in the url.
+        :param progress: a callable to report the download progress, or None.
+                         See :meth:`Download.set_progress`.
+        """
+        if fname is None:
+            fname = urlparse(url).path.split('/')[-1]
+        dest = self.get_dest(fname)
+        with self._lock:
+            # if we are downloading, returns the instance
+            if dest in self._downloads:
+                dl = self._downloads[dest]
+                if progress:
+                    dl.set_progress(progress)
+                return dl
+
+        if os.path.exists(dest):
+            return None
+
+        # else create the download (will be automatically removed of
+        # the list on completion) start it, and returns that.
+        with self._lock:
+            download = Download(url, dest,
+                                session=self.session,
+                                finished_callback=self._download_finished,
+                                progress=progress)
+            self._downloads[dest] = download
+            download.start()
+            self._download_started(download)
+            return download
+
+    def _download_started(self, dl):
+        """
+        Useful when sub-classing. Report the start event of a download.
+
+        :param dl: The :class:`Download` instance.
+        """
+        pass
+
+    def _download_finished(self, dl):
+        """
+        Useful when sub-classing. Report the end of a download.
+
+        Note that this is executed in the download thread. Also, you should
+        make sure to call the base implementation.
+
+        :param dl: The :class:`Download` instance.
+        """
+        with self._lock:
+            dest = dl.get_dest()
+            del self._downloads[dest]
+            self.persist_limit.register_file(dest)
+            self.persist_limit.remove_old_files()
diff --git a/third_party/python/dlmanager/dlmanager/persist_limit.py b/third_party/python/dlmanager/dlmanager/persist_limit.py
new file mode 100644
index 0000000000..03a1829f70
--- /dev/null
+++ b/third_party/python/dlmanager/dlmanager/persist_limit.py
@@ -0,0 +1,65 @@
+import os
+import stat
+
+from collections import namedtuple
+from glob import glob
+
+from dlmanager import fs
+
+
+File = namedtuple('File', ('path', 'stat'))
+
+
+class PersistLimit(object):
+    """
+    Keep a list of files, removing the oldest ones when the size_limit
+    is reached.
+
+    The access time of a file is used to determine the oldests, e.g. the
+    last time a file was read.
+
+    :param size_limit: the size limit in bytes. A value of 0 means no limit.
+    :param file_limit: even if the size limit is reached, this force
+                       to keep at least *file_limit* files.
+    """
+    def __init__(self, size_limit, file_limit=5):
+        self.size_limit = size_limit
+        self.file_limit = file_limit
+        self.files = []
+        self._files_size = 0
+
+    def register_file(self, path):
+        """
+        register a single file.
+        """
+        try:
+            fstat = os.stat(path)
+        except OSError:
+            # file do not exists probably, just skip it
+            # note this happen when backgound files are canceled
+            return
+        if stat.S_ISREG(fstat.st_mode):
+            self.files.append(File(path=path, stat=fstat))
+            self._files_size += fstat.st_size
+
+    def register_dir_content(self, directory, pattern="*"):
+        """
+        Register every files in a directory that match *pattern*.
+        """
+        for path in glob(os.path.join(directory, pattern)):
+            self.register_file(path)
+
+    def remove_old_files(self):
+        """
+        remove oldest registered files.
+        """
+        if self.size_limit <= 0 or self.file_limit <= 0:
+            return
+        # sort by creation time, oldest first
+        files = sorted(self.files, key=lambda f: f.stat.st_atime)
+        while len(files) > self.file_limit and \
+                self._files_size >= self.size_limit:
+            f = files.pop(0)
+            fs.remove(f.path)
+            self._files_size -= f.stat.st_size
+        self.files = files