From 0ebf5bdf043a27fd3dfb7f92e0cb63d88954c44d Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 03:47:29 +0200 Subject: Adding upstream version 115.8.0esr. Signed-off-by: Daniel Baumann --- third_party/python/dlmanager/tests/test_manager.py | 251 +++++++++++++++++++++ 1 file changed, 251 insertions(+) create mode 100644 third_party/python/dlmanager/tests/test_manager.py (limited to 'third_party/python/dlmanager/tests/test_manager.py') diff --git a/third_party/python/dlmanager/tests/test_manager.py b/third_party/python/dlmanager/tests/test_manager.py new file mode 100644 index 0000000000..f0ade9021f --- /dev/null +++ b/third_party/python/dlmanager/tests/test_manager.py @@ -0,0 +1,251 @@ +try: + import unittest2 as unittest # python < 2.7 compat +except ImportError: + import unittest +import tempfile +import shutil +import os +import time +import six +from mock import Mock + +from dlmanager import manager as download_manager + + +def mock_session(): + response = Mock() + session = Mock(get=Mock(return_value=response)) + return session, response + + +def mock_response(response, data, wait=0): + data = six.b(data) + + def iter_content(chunk_size=4): + rest = data + while rest: + time.sleep(wait) + chunk = rest[:chunk_size] + rest = rest[chunk_size:] + yield chunk + + response.headers = {'Content-length': str(len(data))} + response.iter_content = iter_content + + +class TestDownload(unittest.TestCase): + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tempdir) + self.finished = Mock() + self.session, self.session_response = mock_session() + self.tempfile = os.path.join(self.tempdir, 'dest') + self.dl = download_manager.Download('http://url', self.tempfile, + finished_callback=self.finished, + chunk_size=4, + session=self.session) + + def test_creation(self): + self.assertFalse(self.dl.is_canceled()) + self.assertFalse(self.dl.is_running()) + self.assertIsNone(self.dl.error()) + self.assertEquals(self.dl.get_url(), 'http://url') + self.assertEquals(self.dl.get_dest(), self.tempfile) + + def create_response(self, data, wait=0): + mock_response(self.session_response, data, wait) + + def test_download(self): + self.create_response('1234' * 4, 0.01) + + # no file present yet + self.assertFalse(os.path.exists(self.tempfile)) + + self.dl.start() + self.assertTrue(self.dl.is_running()) + self.dl.wait() + + self.assertFalse(self.dl.is_running()) + self.finished.assert_called_with(self.dl) + # file has been downloaded + with open(self.tempfile) as f: + self.assertEquals(f.read(), '1234' * 4) + + def test_download_cancel(self): + self.create_response('1234' * 1000, wait=0.01) + + start = time.time() + self.dl.start() + time.sleep(0.1) + self.dl.cancel() + + with self.assertRaises(download_manager.DownloadInterrupt): + self.dl.wait() + + self.assertTrue(self.dl.is_canceled()) + + # response generation should have taken 1000 * 0.01 = 10 seconds. + # since we canceled, this must be lower. + self.assertTrue((time.time() - start) < 1.0) + + # file was deleted + self.assertFalse(os.path.exists(self.tempfile)) + # finished callback was called + self.finished.assert_called_with(self.dl) + + def test_download_with_progress(self): + data = [] + + def update_progress(_dl, current, total): + data.append((_dl, current, total)) + + self.create_response('1234' * 4) + + self.dl.set_progress(update_progress) + self.dl.start() + self.dl.wait() + + self.assertEquals(data, [ + (self.dl, 0, 16), + (self.dl, 4, 16), + (self.dl, 8, 16), + (self.dl, 12, 16), + (self.dl, 16, 16), + ]) + # file has been downloaded + with open(self.tempfile) as f: + self.assertEquals(f.read(), '1234' * 4) + # finished callback was called + self.finished.assert_called_with(self.dl) + + def test_download_error_in_thread(self): + self.session_response.headers = {'Content-length': '24'} + self.session_response.iter_content.side_effect = IOError + + self.dl.start() + with self.assertRaises(IOError): + self.dl.wait() + + self.assertEquals(self.dl.error()[0], IOError) + # finished callback was called + self.finished.assert_called_with(self.dl) + + def test_wait_does_not_block_on_exception(self): + # this test the case when a user may hit CTRL-C for example + # during a dl.wait() call. + self.create_response('1234' * 1000, wait=0.01) + + original_join = self.dl.thread.join + it = iter('123') + + def join(timeout=None): + next(it) # will throw StopIteration after a few calls + original_join(timeout) + + self.dl.thread.join = join + + start = time.time() + self.dl.start() + + with self.assertRaises(StopIteration): + self.dl.wait() + + self.assertTrue(self.dl.is_canceled()) + # wait for the thread to finish + original_join() + + # response generation should have taken 1000 * 0.01 = 10 seconds. + # since we got an error, this must be lower. + self.assertTrue((time.time() - start) < 1.0) + + # file was deleted + self.assertFalse(os.path.exists(self.tempfile)) + # finished callback was called + self.finished.assert_called_with(self.dl) + + +class TestDownloadManager(unittest.TestCase): + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tempdir) + + self.dl_manager = download_manager.DownloadManager(self.tempdir) + + def do_download(self, url, fname, data, wait=0): + session, response = mock_session() + mock_response(response, data, wait) + # patch the session, so the download will use that + self.dl_manager.session = session + return self.dl_manager.download(url, fname) + + def test_download(self): + dl1 = self.do_download('http://foo', 'foo', 'hello' * 4, wait=0.02) + self.assertIsInstance(dl1, download_manager.Download) + self.assertTrue(dl1.is_running()) + + # with the same fname, no new download is started. The same instance + # is returned since the download is running. + dl2 = self.do_download('http://bar', 'foo', 'hello2' * 4, wait=0.02) + self.assertEquals(dl1, dl2) + + # starting a download with another fname will trigger a new download + dl3 = self.do_download('http://bar', 'foo2', 'hello you' * 4) + self.assertIsInstance(dl3, download_manager.Download) + self.assertNotEquals(dl3, dl1) + + # let's wait for the downloads to finish + dl3.wait() + dl1.wait() + + # now if we try to download a fname that exists, None is returned + dl4 = self.do_download('http://bar', 'foo', 'hello2' * 4, wait=0.02) + self.assertIsNone(dl4) + + # downloaded files are what is expected + def content(fname): + with open(os.path.join(self.tempdir, fname)) as f: + return f.read() + self.assertEquals(content('foo'), 'hello' * 4) + self.assertEquals(content('foo2'), 'hello you' * 4) + + # download instances are removed from the manager (internal test) + self.assertEquals(self.dl_manager._downloads, {}) + + def test_cancel(self): + dl1 = self.do_download('http://foo', 'foo', 'foo' * 50000, wait=0.02) + dl2 = self.do_download('http://foo', 'bar', 'bar' * 50000, wait=0.02) + dl3 = self.do_download('http://foo', 'foobar', 'foobar' * 4) + + # let's cancel only one + def cancel_if(dl): + if os.path.basename(dl.get_dest()) == 'foo': + return True + self.dl_manager.cancel(cancel_if=cancel_if) + + self.assertTrue(dl1.is_canceled()) + self.assertFalse(dl2.is_canceled()) + self.assertFalse(dl3.is_canceled()) + + # wait for dl3 + dl3.wait() + + # cancel everything + self.dl_manager.cancel() + + self.assertTrue(dl1.is_canceled()) + self.assertTrue(dl2.is_canceled()) + # dl3 is not canceled since it finished before + self.assertFalse(dl3.is_canceled()) + + # wait for the completion of dl1 and dl2 threads + dl1.wait(raise_if_error=False) + dl2.wait(raise_if_error=False) + + # at the end, only dl3 has been downloaded + self.assertEquals(os.listdir(self.tempdir), ["foobar"]) + + with open(os.path.join(self.tempdir, 'foobar')) as f: + self.assertEquals(f.read(), 'foobar' * 4) + + # download instances are removed from the manager (internal test) + self.assertEquals(self.dl_manager._downloads, {}) -- cgit v1.2.3