summaryrefslogtreecommitdiffstats
path: root/third_party/python/dlmanager/dlmanager/manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/dlmanager/dlmanager/manager.py')
-rw-r--r--third_party/python/dlmanager/dlmanager/manager.py323
1 files changed, 323 insertions, 0 deletions
diff --git a/third_party/python/dlmanager/dlmanager/manager.py b/third_party/python/dlmanager/dlmanager/manager.py
new file mode 100644
index 0000000000..3dce3b7838
--- /dev/null
+++ b/third_party/python/dlmanager/dlmanager/manager.py
@@ -0,0 +1,323 @@
+import os
+import requests
+import six
+import sys
+import tempfile
+import threading
+
+from contextlib import closing
+from six.moves.urllib.parse import urlparse
+
+from dlmanager import fs
+from dlmanager.persist_limit import PersistLimit
+
+
+class DownloadInterrupt(Exception):
+ "Raised when a download is interrupted."
+
+
+class Download(object):
+ """
+ Download is reponsible of downloading one file in the background.
+
+ Example of use: ::
+
+ dl = Download(url, dest)
+ dl.start()
+ dl.wait() # this will block until completion / cancel / error
+
+ If a download fail or is canceled, the temporary dest is removed from
+ the disk.
+
+ Usually, Downloads are created by using :meth:`DownloadManager.download`.
+
+ :param url: the url of the file to download
+ :param dest: the local file path destination
+ :param finished_callback: a callback that will be called in the thread
+ when the thread work is done. Takes the download
+ instance as a parameter.
+ :param chunk_size: size of the chunk that will be read. The thread can
+ not be stopped while we are reading that chunk size.
+ :param session: a requests.Session instance that will do do the real
+ downloading work. If None, `requests` module is used.
+ :param progress: A callable to report the progress (default to None).
+ see :meth:`set_progress`.
+ """
+ def __init__(self, url, dest, finished_callback=None,
+ chunk_size=16 * 1024, session=None, progress=None):
+ self.thread = threading.Thread(
+ target=self._download,
+ args=(url, dest, finished_callback, chunk_size,
+ session or requests)
+ )
+ self._lock = threading.Lock()
+ self.__url = url
+ self.__dest = dest
+ self.__progress = progress
+ self.__canceled = False
+ self.__error = None
+
+ def start(self):
+ """
+ Start the thread that will do the download.
+ """
+ self.thread.start()
+
+ def cancel(self):
+ """
+ Cancel a previously started download.
+ """
+ self.__canceled = True
+
+ def is_canceled(self):
+ """
+ Returns True if we canceled this download.
+ """
+ return self.__canceled
+
+ def is_running(self):
+ """
+ Returns True if the downloading thread is running.
+ """
+ return self.thread.is_alive()
+
+ def wait(self, raise_if_error=True):
+ """
+ Block until the downloading thread is finished.
+
+ :param raise_if_error: if True (the default), :meth:`raise_if_error`
+ will be called and raise an error if any.
+ """
+ while self.thread.is_alive():
+ try:
+ # in case of exception here (like KeyboardInterrupt),
+ # cancel the task.
+ self.thread.join(0.02)
+ except:
+ self.cancel()
+ raise
+ # this will raise exception that may happen inside the thread.
+ if raise_if_error:
+ self.raise_if_error()
+
+ def error(self):
+ """
+ Returns None or a tuple of three values (type, value, traceback)
+ that give information about the exception.
+ """
+ return self.__error
+
+ def raise_if_error(self):
+ """
+ Raise an error if any. If the download was canceled, raise
+ :class:`DownloadInterrupt`.
+ """
+ if self.__error:
+ six.reraise(*self.__error)
+ if self.__canceled:
+ raise DownloadInterrupt()
+
+ def set_progress(self, progress):
+ """
+ set a callable to report the progress of the download, or None to
+ disable any report.
+
+ The callable must take three parameters (download, current, total).
+ Note that this method is thread safe, you can call it during a
+ download.
+ """
+ with self._lock:
+ self.__progress = progress
+
+ def get_dest(self):
+ """
+ Returns the dest.
+ """
+ return self.__dest
+
+ def get_url(self):
+ """
+ Returns the url.
+ """
+ return self.__url
+
+ def _update_progress(self, current, total):
+ with self._lock:
+ if self.__progress:
+ self.__progress(self, current, total)
+
+ def _download(self, url, dest, finished_callback, chunk_size, session):
+ # save the file under a temporary name
+ # this allow to not use a broken file in case things went really bad
+ # while downloading the file (ie the python interpreter is killed
+ # abruptly)
+ temp = None
+ bytes_so_far = 0
+ try:
+ with closing(session.get(url, stream=True)) as response:
+ total_size = response.headers.get('Content-length', '').strip()
+ total_size = int(total_size) if total_size else None
+ self._update_progress(bytes_so_far, total_size)
+ # we use NamedTemporaryFile as raw open() call was causing
+ # issues on windows - see:
+ # https://bugzilla.mozilla.org/show_bug.cgi?id=1185756
+ with tempfile.NamedTemporaryFile(
+ delete=False,
+ suffix='.tmp',
+ dir=os.path.dirname(dest)) as temp:
+ for chunk in response.iter_content(chunk_size):
+ if self.is_canceled():
+ break
+ if chunk:
+ temp.write(chunk)
+ bytes_so_far += len(chunk)
+ self._update_progress(bytes_so_far, total_size)
+ response.raise_for_status()
+ except:
+ self.__error = sys.exc_info()
+ try:
+ if temp is None:
+ pass # not even opened the temp file, nothing to do
+ elif self.is_canceled() or self.__error:
+ fs.remove(temp.name)
+ else:
+ # if all goes well, then rename the file to the real dest
+ fs.remove(dest) # just in case it already existed
+ fs.move(temp.name, dest)
+ finally:
+ if finished_callback:
+ finished_callback(self)
+
+
+class DownloadManager(object):
+ """
+ DownloadManager is responsible of starting and managing downloads inside
+ a given directory. It will download a file only if a given filename
+ is not already there.
+
+ Note that background downloads needs to be stopped. For example, if
+ you have an exception while a download is occuring, python will only
+ exit when the download will finish. To get rid of that, there is a
+ possible idiom: ::
+
+ def download_things(manager):
+ # do things with the manager
+ manager.download(url1, f1)
+ manager.download(url2, f2)
+ ...
+
+ manager = DownloadManager(destdir)
+ try:
+ download_things(manager)
+ finally:
+ # ensure we cancel all background downloads to ask the end
+ # of possible remainings threads
+ manager.cancel()
+
+ :param destdir: a directory where files are downloaded. It will be created
+ if it does not exists.
+ :param session: a requests session. If None, one will be created for you.
+ :param persist_limit: an instance of :class:`PersistLimit`, to allow
+ limiting the size of the download dir. Defaults
+ to None, meaning no limit.
+ """
+ def __init__(self, destdir, session=None, persist_limit=None):
+ self.destdir = destdir
+ self.session = session or requests.Session()
+ self._downloads = {}
+ self._lock = threading.Lock()
+ self.persist_limit = persist_limit or PersistLimit(0)
+ self.persist_limit.register_dir_content(self.destdir)
+
+ # if persist folder does not exist, create it
+ if not os.path.isdir(destdir):
+ os.makedirs(destdir)
+
+ def get_dest(self, fname):
+ return os.path.join(self.destdir, fname)
+
+ def cancel(self, cancel_if=None):
+ """
+ Cancel downloads, if any.
+
+ if cancel_if is given, it must be a callable that take the download
+ instance as parameter, and return True if the download needs to be
+ canceled.
+
+ Note that download threads won't be stopped directly.
+ """
+ with self._lock:
+ for download in six.itervalues(self._downloads):
+ if cancel_if is None or cancel_if(download):
+ if download.is_running():
+ download.cancel()
+
+ def wait(self, raise_if_error=True):
+ """
+ Wait for all downloads to be finished.
+ """
+ for download in self._downloads.values():
+ download.wait(raise_if_error=raise_if_error)
+
+ def download(self, url, fname=None, progress=None):
+ """
+ Returns a started :class:`Download` instance, or None if fname is
+ already present in destdir.
+
+ if a download is already running for the given fname, it is just
+ returned. Else the download is created, started and returned.
+
+ :param url: url of the file to download.
+ :param fname: name to give for the downloaded file. If None, it will
+ be the name extracted in the url.
+ :param progress: a callable to report the download progress, or None.
+ See :meth:`Download.set_progress`.
+ """
+ if fname is None:
+ fname = urlparse(url).path.split('/')[-1]
+ dest = self.get_dest(fname)
+ with self._lock:
+ # if we are downloading, returns the instance
+ if dest in self._downloads:
+ dl = self._downloads[dest]
+ if progress:
+ dl.set_progress(progress)
+ return dl
+
+ if os.path.exists(dest):
+ return None
+
+ # else create the download (will be automatically removed of
+ # the list on completion) start it, and returns that.
+ with self._lock:
+ download = Download(url, dest,
+ session=self.session,
+ finished_callback=self._download_finished,
+ progress=progress)
+ self._downloads[dest] = download
+ download.start()
+ self._download_started(download)
+ return download
+
+ def _download_started(self, dl):
+ """
+ Useful when sub-classing. Report the start event of a download.
+
+ :param dl: The :class:`Download` instance.
+ """
+ pass
+
+ def _download_finished(self, dl):
+ """
+ Useful when sub-classing. Report the end of a download.
+
+ Note that this is executed in the download thread. Also, you should
+ make sure to call the base implementation.
+
+ :param dl: The :class:`Download` instance.
+ """
+ with self._lock:
+ dest = dl.get_dest()
+ del self._downloads[dest]
+ self.persist_limit.register_file(dest)
+ self.persist_limit.remove_old_files()