diff options
Diffstat (limited to 'deluge/tests/test_httpdownloader.py')
-rw-r--r-- | deluge/tests/test_httpdownloader.py | 239 |
1 files changed, 132 insertions, 107 deletions
diff --git a/deluge/tests/test_httpdownloader.py b/deluge/tests/test_httpdownloader.py index a503e46..8c491b6 100644 --- a/deluge/tests/test_httpdownloader.py +++ b/deluge/tests/test_httpdownloader.py @@ -1,26 +1,23 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - +import os import tempfile from email.utils import formatdate +import pytest +import pytest_twisted from twisted.internet import reactor from twisted.internet.error import CannotListenError -from twisted.python.failure import Failure -from twisted.trial import unittest -from twisted.web.error import PageRedirect +from twisted.web.error import Error, PageRedirect from twisted.web.http import NOT_MODIFIED from twisted.web.resource import EncodingResourceWrapper, Resource from twisted.web.server import GzipEncoderFactory, Site from twisted.web.util import redirectTo -from deluge.common import windows_check from deluge.httpdownloader import download_file from deluge.log import setup_logger @@ -28,7 +25,7 @@ temp_dir = tempfile.mkdtemp() def fname(name): - return '%s/%s' % (temp_dir, name) + return os.path.join(temp_dir, name) class RedirectResource(Resource): @@ -47,9 +44,30 @@ class RenameResource(Resource): class AttachmentResource(Resource): def render(self, request): - request.setHeader(b'Content-Type', b'text/plain') + content_type = b'text/plain' + charset = request.getHeader(b'content-charset') + if charset: + content_type += b'; charset=' + charset + request.setHeader(b'Content-Type', content_type) request.setHeader(b'Content-Disposition', b'attachment') - return b'Attachement with no filename set' + append = request.getHeader(b'content-append') or b'' + content = 'Attachment with no filename set{}'.format(append.decode('utf8')) + return ( + content.encode(charset.decode('utf8')) + if charset + else content.encode('utf8') + ) + + +class TorrentResource(Resource): + def render(self, request): + content_type = b'application/x-bittorrent' + charset = request.getHeader(b'content-charset') + if charset: + content_type += b'; charset=' + charset + request.setHeader(b'Content-Type', content_type) + request.setHeader(b'Content-Disposition', b'attachment; filename=test.torrent') + return 'Binary attachment ignore charset 世丕且\n'.encode() class CookieResource(Resource): @@ -101,6 +119,7 @@ class TopLevelResource(Resource): self.putChild(b'redirect', self.redirect_rsrc) self.putChild(b'rename', RenameResource()) self.putChild(b'attachment', AttachmentResource()) + self.putChild(b'torrent', TorrentResource()) self.putChild(b'partial', PartialDownloadResource()) def getChild(self, path, request): # NOQA: N802 @@ -110,16 +129,18 @@ class TopLevelResource(Resource): return Resource.getChild(self, path, request) def render(self, request): - if request.getHeader('If-Modified-Since'): + if request.getHeader(b'If-Modified-Since'): request.setResponseCode(NOT_MODIFIED) return b'<h1>Deluge HTTP Downloader tests webserver here</h1>' -class DownloadFileTestCase(unittest.TestCase): +class TestDownloadFile: def get_url(self, path=''): return 'http://localhost:%d/%s' % (self.listen_port, path) - def setUp(self): # NOQA + @pytest_twisted.async_yield_fixture(autouse=True) + async def setUp(self, request): # NOQA + self = request.instance setup_logger('warning', fname('log_file')) self.website = Site(TopLevelResource()) self.listen_port = 51242 @@ -135,132 +156,136 @@ class DownloadFileTestCase(unittest.TestCase): else: raise error - def tearDown(self): # NOQA - return self.webserver.stopListening() + yield + + await self.webserver.stopListening() - def assertContains(self, filename, contents): # NOQA - with open(filename) as _file: + def assert_contains(self, filename, contents): + with open(filename, encoding='utf8') as _file: try: - self.assertEqual(_file.read(), contents) + assert _file.read() == contents except Exception as ex: - self.fail(ex) + pytest.fail(ex) return filename - def assertNotContains(self, filename, contents, file_mode=''): # NOQA - with open(filename, file_mode) as _file: + def assert_not_contains(self, filename, contents, file_mode=''): + with open(filename, encoding='utf8') as _file: try: - self.assertNotEqual(_file.read(), contents) + assert _file.read() != contents except Exception as ex: - self.fail(ex) + pytest.fail(ex) return filename - def test_download(self): - d = download_file(self.get_url(), fname('index.html')) - d.addCallback(self.assertEqual, fname('index.html')) - return d + @pytest_twisted.ensureDeferred + async def test_download(self): + filename = await download_file(self.get_url(), fname('index.html')) + assert filename == fname('index.html') - def test_download_without_required_cookies(self): + @pytest_twisted.ensureDeferred + async def test_download_without_required_cookies(self): url = self.get_url('cookie') - d = download_file(url, fname('none')) - d.addCallback(self.fail) - d.addErrback(self.assertIsInstance, Failure) - return d + filename = await download_file(url, fname('none')) + self.assert_contains(filename, 'Password cookie not set!') - def test_download_with_required_cookies(self): + @pytest_twisted.ensureDeferred + async def test_download_with_required_cookies(self): url = self.get_url('cookie') cookie = {'cookie': 'password=deluge'} - d = download_file(url, fname('monster'), headers=cookie) - d.addCallback(self.assertEqual, fname('monster')) - d.addCallback(self.assertContains, 'COOKIE MONSTER!') - return d - - def test_download_with_rename(self): - - if windows_check(): - raise unittest.SkipTest('on windows \\ != / for path names') + filename = await download_file(url, fname('monster'), headers=cookie) + assert filename == fname('monster') + self.assert_contains(filename, 'COOKIE MONSTER!') + @pytest_twisted.ensureDeferred + async def test_download_with_rename(self): url = self.get_url('rename?filename=renamed') - d = download_file(url, fname('original')) - d.addCallback(self.assertEqual, fname('renamed')) - d.addCallback(self.assertContains, 'This file should be called renamed') - return d - - def test_download_with_rename_exists(self): - - if windows_check(): - raise unittest.SkipTest('on windows \\ != / for path names') + filename = await download_file(url, fname('original')) + assert filename == fname('renamed') + self.assert_contains(filename, 'This file should be called renamed') + @pytest_twisted.ensureDeferred + async def test_download_with_rename_exists(self): open(fname('renamed'), 'w').close() url = self.get_url('rename?filename=renamed') - d = download_file(url, fname('original')) - d.addCallback(self.assertEqual, fname('renamed-1')) - d.addCallback(self.assertContains, 'This file should be called renamed') - return d - - def test_download_with_rename_sanitised(self): - - if windows_check(): - raise unittest.SkipTest('on windows \\ != / for path names') + filename = await download_file(url, fname('original')) + assert filename == fname('renamed-1') + self.assert_contains(filename, 'This file should be called renamed') + @pytest_twisted.ensureDeferred + async def test_download_with_rename_sanitised(self): url = self.get_url('rename?filename=/etc/passwd') - d = download_file(url, fname('original')) - d.addCallback(self.assertEqual, fname('passwd')) - d.addCallback(self.assertContains, 'This file should be called /etc/passwd') - return d + filename = await download_file(url, fname('original')) + assert filename == fname('passwd') + self.assert_contains(filename, 'This file should be called /etc/passwd') - def test_download_with_attachment_no_filename(self): + @pytest_twisted.ensureDeferred + async def test_download_with_attachment_no_filename(self): url = self.get_url('attachment') - d = download_file(url, fname('original')) - d.addCallback(self.assertEqual, fname('original')) - d.addCallback(self.assertContains, 'Attachement with no filename set') - return d + filename = await download_file(url, fname('original')) + assert filename == fname('original') + self.assert_contains(filename, 'Attachment with no filename set') - def test_download_with_rename_prevented(self): + @pytest_twisted.ensureDeferred + async def test_download_with_rename_prevented(self): url = self.get_url('rename?filename=spam') - d = download_file(url, fname('forced'), force_filename=True) - d.addCallback(self.assertEqual, fname('forced')) - d.addCallback(self.assertContains, 'This file should be called spam') - return d + filename = await download_file(url, fname('forced'), force_filename=True) + assert filename == fname('forced') + self.assert_contains(filename, 'This file should be called spam') - def test_download_with_gzip_encoding(self): + @pytest_twisted.ensureDeferred + async def test_download_with_gzip_encoding(self): url = self.get_url('gzip?msg=success') - d = download_file(url, fname('gzip_encoded')) - d.addCallback(self.assertContains, 'success') - return d + filename = await download_file(url, fname('gzip_encoded')) + self.assert_contains(filename, 'success') - def test_download_with_gzip_encoding_disabled(self): + @pytest_twisted.ensureDeferred + async def test_download_with_gzip_encoding_disabled(self): url = self.get_url('gzip?msg=unzip') - d = download_file(url, fname('gzip_encoded'), allow_compression=False) - d.addCallback(self.assertContains, 'unzip') - return d + filename = await download_file( + url, fname('gzip_encoded'), allow_compression=False + ) + self.assert_contains(filename, 'unzip') - def test_page_redirect_unhandled(self): + @pytest_twisted.ensureDeferred + async def test_page_redirect_unhandled(self): url = self.get_url('redirect') - d = download_file(url, fname('none')) - d.addCallback(self.fail) + with pytest.raises(PageRedirect): + await download_file(url, fname('none'), handle_redirects=False) - def on_redirect(failure): - self.assertTrue(type(failure), PageRedirect) + @pytest_twisted.ensureDeferred + async def test_page_redirect(self): + url = self.get_url('redirect') + filename = await download_file(url, fname('none'), handle_redirects=True) + assert filename == fname('none') - d.addErrback(on_redirect) - return d + @pytest_twisted.ensureDeferred + async def test_page_not_found(self): + with pytest.raises(Error): + await download_file(self.get_url('page/not/found'), fname('none')) - def test_page_redirect(self): - url = self.get_url('redirect') - d = download_file(url, fname('none'), handle_redirects=True) - d.addCallback(self.assertEqual, fname('none')) - d.addErrback(self.fail) - return d - - def test_page_not_found(self): - d = download_file(self.get_url('page/not/found'), fname('none')) - d.addCallback(self.fail) - d.addErrback(self.assertIsInstance, Failure) - return d - - def test_page_not_modified(self): + @pytest.mark.xfail(reason="Doesn't seem like httpdownloader ever implemented this.") + @pytest_twisted.ensureDeferred + async def test_page_not_modified(self): headers = {'If-Modified-Since': formatdate(usegmt=True)} - d = download_file(self.get_url(), fname('index.html'), headers=headers) - d.addCallback(self.fail) - d.addErrback(self.assertIsInstance, Failure) - return d + with pytest.raises(Error) as exc_info: + await download_file(self.get_url(), fname('index.html'), headers=headers) + assert exc_info.value.status == NOT_MODIFIED + + @pytest_twisted.ensureDeferred + async def test_download_text_reencode_charset(self): + """Re-encode as UTF-8 specified charset for text content-type header""" + url = self.get_url('attachment') + filepath = fname('test.txt') + headers = {'content-charset': 'Windows-1251', 'content-append': 'бвгде'} + filename = await download_file(url, filepath, headers=headers) + assert filename == filepath + self.assert_contains(filename, 'Attachment with no filename setбвгде') + + @pytest_twisted.ensureDeferred + async def test_download_binary_ignore_charset(self): + """Ignore charset for binary content-type header e.g. torrent files""" + url = self.get_url('torrent') + headers = {'content-charset': 'Windows-1251'} + filepath = fname('test.torrent') + filename = await download_file(url, fname('test.torrent'), headers=headers) + assert filename == filepath + self.assert_contains(filename, 'Binary attachment ignore charset 世丕且\n') |