summaryrefslogtreecommitdiffstats
path: root/deluge/tests/test_httpdownloader.py
diff options
context:
space:
mode:
Diffstat (limited to 'deluge/tests/test_httpdownloader.py')
-rw-r--r--deluge/tests/test_httpdownloader.py239
1 files changed, 132 insertions, 107 deletions
diff --git a/deluge/tests/test_httpdownloader.py b/deluge/tests/test_httpdownloader.py
index a503e46..8c491b6 100644
--- a/deluge/tests/test_httpdownloader.py
+++ b/deluge/tests/test_httpdownloader.py
@@ -1,26 +1,23 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
+import os
import tempfile
from email.utils import formatdate
+import pytest
+import pytest_twisted
from twisted.internet import reactor
from twisted.internet.error import CannotListenError
-from twisted.python.failure import Failure
-from twisted.trial import unittest
-from twisted.web.error import PageRedirect
+from twisted.web.error import Error, PageRedirect
from twisted.web.http import NOT_MODIFIED
from twisted.web.resource import EncodingResourceWrapper, Resource
from twisted.web.server import GzipEncoderFactory, Site
from twisted.web.util import redirectTo
-from deluge.common import windows_check
from deluge.httpdownloader import download_file
from deluge.log import setup_logger
@@ -28,7 +25,7 @@ temp_dir = tempfile.mkdtemp()
def fname(name):
- return '%s/%s' % (temp_dir, name)
+ return os.path.join(temp_dir, name)
class RedirectResource(Resource):
@@ -47,9 +44,30 @@ class RenameResource(Resource):
class AttachmentResource(Resource):
def render(self, request):
- request.setHeader(b'Content-Type', b'text/plain')
+ content_type = b'text/plain'
+ charset = request.getHeader(b'content-charset')
+ if charset:
+ content_type += b'; charset=' + charset
+ request.setHeader(b'Content-Type', content_type)
request.setHeader(b'Content-Disposition', b'attachment')
- return b'Attachement with no filename set'
+ append = request.getHeader(b'content-append') or b''
+ content = 'Attachment with no filename set{}'.format(append.decode('utf8'))
+ return (
+ content.encode(charset.decode('utf8'))
+ if charset
+ else content.encode('utf8')
+ )
+
+
+class TorrentResource(Resource):
+ def render(self, request):
+ content_type = b'application/x-bittorrent'
+ charset = request.getHeader(b'content-charset')
+ if charset:
+ content_type += b'; charset=' + charset
+ request.setHeader(b'Content-Type', content_type)
+ request.setHeader(b'Content-Disposition', b'attachment; filename=test.torrent')
+ return 'Binary attachment ignore charset 世丕且\n'.encode()
class CookieResource(Resource):
@@ -101,6 +119,7 @@ class TopLevelResource(Resource):
self.putChild(b'redirect', self.redirect_rsrc)
self.putChild(b'rename', RenameResource())
self.putChild(b'attachment', AttachmentResource())
+ self.putChild(b'torrent', TorrentResource())
self.putChild(b'partial', PartialDownloadResource())
def getChild(self, path, request): # NOQA: N802
@@ -110,16 +129,18 @@ class TopLevelResource(Resource):
return Resource.getChild(self, path, request)
def render(self, request):
- if request.getHeader('If-Modified-Since'):
+ if request.getHeader(b'If-Modified-Since'):
request.setResponseCode(NOT_MODIFIED)
return b'<h1>Deluge HTTP Downloader tests webserver here</h1>'
-class DownloadFileTestCase(unittest.TestCase):
+class TestDownloadFile:
def get_url(self, path=''):
return 'http://localhost:%d/%s' % (self.listen_port, path)
- def setUp(self): # NOQA
+ @pytest_twisted.async_yield_fixture(autouse=True)
+ async def setUp(self, request): # NOQA
+ self = request.instance
setup_logger('warning', fname('log_file'))
self.website = Site(TopLevelResource())
self.listen_port = 51242
@@ -135,132 +156,136 @@ class DownloadFileTestCase(unittest.TestCase):
else:
raise error
- def tearDown(self): # NOQA
- return self.webserver.stopListening()
+ yield
+
+ await self.webserver.stopListening()
- def assertContains(self, filename, contents): # NOQA
- with open(filename) as _file:
+ def assert_contains(self, filename, contents):
+ with open(filename, encoding='utf8') as _file:
try:
- self.assertEqual(_file.read(), contents)
+ assert _file.read() == contents
except Exception as ex:
- self.fail(ex)
+ pytest.fail(ex)
return filename
- def assertNotContains(self, filename, contents, file_mode=''): # NOQA
- with open(filename, file_mode) as _file:
+ def assert_not_contains(self, filename, contents, file_mode=''):
+ with open(filename, encoding='utf8') as _file:
try:
- self.assertNotEqual(_file.read(), contents)
+ assert _file.read() != contents
except Exception as ex:
- self.fail(ex)
+ pytest.fail(ex)
return filename
- def test_download(self):
- d = download_file(self.get_url(), fname('index.html'))
- d.addCallback(self.assertEqual, fname('index.html'))
- return d
+ @pytest_twisted.ensureDeferred
+ async def test_download(self):
+ filename = await download_file(self.get_url(), fname('index.html'))
+ assert filename == fname('index.html')
- def test_download_without_required_cookies(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_without_required_cookies(self):
url = self.get_url('cookie')
- d = download_file(url, fname('none'))
- d.addCallback(self.fail)
- d.addErrback(self.assertIsInstance, Failure)
- return d
+ filename = await download_file(url, fname('none'))
+ self.assert_contains(filename, 'Password cookie not set!')
- def test_download_with_required_cookies(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_required_cookies(self):
url = self.get_url('cookie')
cookie = {'cookie': 'password=deluge'}
- d = download_file(url, fname('monster'), headers=cookie)
- d.addCallback(self.assertEqual, fname('monster'))
- d.addCallback(self.assertContains, 'COOKIE MONSTER!')
- return d
-
- def test_download_with_rename(self):
-
- if windows_check():
- raise unittest.SkipTest('on windows \\ != / for path names')
+ filename = await download_file(url, fname('monster'), headers=cookie)
+ assert filename == fname('monster')
+ self.assert_contains(filename, 'COOKIE MONSTER!')
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_rename(self):
url = self.get_url('rename?filename=renamed')
- d = download_file(url, fname('original'))
- d.addCallback(self.assertEqual, fname('renamed'))
- d.addCallback(self.assertContains, 'This file should be called renamed')
- return d
-
- def test_download_with_rename_exists(self):
-
- if windows_check():
- raise unittest.SkipTest('on windows \\ != / for path names')
+ filename = await download_file(url, fname('original'))
+ assert filename == fname('renamed')
+ self.assert_contains(filename, 'This file should be called renamed')
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_rename_exists(self):
open(fname('renamed'), 'w').close()
url = self.get_url('rename?filename=renamed')
- d = download_file(url, fname('original'))
- d.addCallback(self.assertEqual, fname('renamed-1'))
- d.addCallback(self.assertContains, 'This file should be called renamed')
- return d
-
- def test_download_with_rename_sanitised(self):
-
- if windows_check():
- raise unittest.SkipTest('on windows \\ != / for path names')
+ filename = await download_file(url, fname('original'))
+ assert filename == fname('renamed-1')
+ self.assert_contains(filename, 'This file should be called renamed')
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_rename_sanitised(self):
url = self.get_url('rename?filename=/etc/passwd')
- d = download_file(url, fname('original'))
- d.addCallback(self.assertEqual, fname('passwd'))
- d.addCallback(self.assertContains, 'This file should be called /etc/passwd')
- return d
+ filename = await download_file(url, fname('original'))
+ assert filename == fname('passwd')
+ self.assert_contains(filename, 'This file should be called /etc/passwd')
- def test_download_with_attachment_no_filename(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_attachment_no_filename(self):
url = self.get_url('attachment')
- d = download_file(url, fname('original'))
- d.addCallback(self.assertEqual, fname('original'))
- d.addCallback(self.assertContains, 'Attachement with no filename set')
- return d
+ filename = await download_file(url, fname('original'))
+ assert filename == fname('original')
+ self.assert_contains(filename, 'Attachment with no filename set')
- def test_download_with_rename_prevented(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_rename_prevented(self):
url = self.get_url('rename?filename=spam')
- d = download_file(url, fname('forced'), force_filename=True)
- d.addCallback(self.assertEqual, fname('forced'))
- d.addCallback(self.assertContains, 'This file should be called spam')
- return d
+ filename = await download_file(url, fname('forced'), force_filename=True)
+ assert filename == fname('forced')
+ self.assert_contains(filename, 'This file should be called spam')
- def test_download_with_gzip_encoding(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_gzip_encoding(self):
url = self.get_url('gzip?msg=success')
- d = download_file(url, fname('gzip_encoded'))
- d.addCallback(self.assertContains, 'success')
- return d
+ filename = await download_file(url, fname('gzip_encoded'))
+ self.assert_contains(filename, 'success')
- def test_download_with_gzip_encoding_disabled(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_gzip_encoding_disabled(self):
url = self.get_url('gzip?msg=unzip')
- d = download_file(url, fname('gzip_encoded'), allow_compression=False)
- d.addCallback(self.assertContains, 'unzip')
- return d
+ filename = await download_file(
+ url, fname('gzip_encoded'), allow_compression=False
+ )
+ self.assert_contains(filename, 'unzip')
- def test_page_redirect_unhandled(self):
+ @pytest_twisted.ensureDeferred
+ async def test_page_redirect_unhandled(self):
url = self.get_url('redirect')
- d = download_file(url, fname('none'))
- d.addCallback(self.fail)
+ with pytest.raises(PageRedirect):
+ await download_file(url, fname('none'), handle_redirects=False)
- def on_redirect(failure):
- self.assertTrue(type(failure), PageRedirect)
+ @pytest_twisted.ensureDeferred
+ async def test_page_redirect(self):
+ url = self.get_url('redirect')
+ filename = await download_file(url, fname('none'), handle_redirects=True)
+ assert filename == fname('none')
- d.addErrback(on_redirect)
- return d
+ @pytest_twisted.ensureDeferred
+ async def test_page_not_found(self):
+ with pytest.raises(Error):
+ await download_file(self.get_url('page/not/found'), fname('none'))
- def test_page_redirect(self):
- url = self.get_url('redirect')
- d = download_file(url, fname('none'), handle_redirects=True)
- d.addCallback(self.assertEqual, fname('none'))
- d.addErrback(self.fail)
- return d
-
- def test_page_not_found(self):
- d = download_file(self.get_url('page/not/found'), fname('none'))
- d.addCallback(self.fail)
- d.addErrback(self.assertIsInstance, Failure)
- return d
-
- def test_page_not_modified(self):
+ @pytest.mark.xfail(reason="Doesn't seem like httpdownloader ever implemented this.")
+ @pytest_twisted.ensureDeferred
+ async def test_page_not_modified(self):
headers = {'If-Modified-Since': formatdate(usegmt=True)}
- d = download_file(self.get_url(), fname('index.html'), headers=headers)
- d.addCallback(self.fail)
- d.addErrback(self.assertIsInstance, Failure)
- return d
+ with pytest.raises(Error) as exc_info:
+ await download_file(self.get_url(), fname('index.html'), headers=headers)
+ assert exc_info.value.status == NOT_MODIFIED
+
+ @pytest_twisted.ensureDeferred
+ async def test_download_text_reencode_charset(self):
+ """Re-encode as UTF-8 specified charset for text content-type header"""
+ url = self.get_url('attachment')
+ filepath = fname('test.txt')
+ headers = {'content-charset': 'Windows-1251', 'content-append': 'бвгде'}
+ filename = await download_file(url, filepath, headers=headers)
+ assert filename == filepath
+ self.assert_contains(filename, 'Attachment with no filename setбвгде')
+
+ @pytest_twisted.ensureDeferred
+ async def test_download_binary_ignore_charset(self):
+ """Ignore charset for binary content-type header e.g. torrent files"""
+ url = self.get_url('torrent')
+ headers = {'content-charset': 'Windows-1251'}
+ filepath = fname('test.torrent')
+ filename = await download_file(url, fname('test.torrent'), headers=headers)
+ assert filename == filepath
+ self.assert_contains(filename, 'Binary attachment ignore charset 世丕且\n')