diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-08 13:04:44 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-08 13:04:44 +0000 |
commit | 394ebc3350b1ede6028b6ecfa4dcac6e4ec5eed6 (patch) | |
tree | 3eecf214cd57fecedd32d49a6705af9cd491c634 /python | |
parent | Adding debian version 2:4.20.0+dfsg-1~exp2. (diff) | |
download | samba-394ebc3350b1ede6028b6ecfa4dcac6e4ec5eed6.tar.xz samba-394ebc3350b1ede6028b6ecfa4dcac6e4ec5eed6.zip |
Merging upstream version 2:4.20.1+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'python')
-rw-r--r-- | python/samba/tests/blackbox/http_chunk.py | 129 | ||||
-rw-r--r-- | python/samba/tests/blackbox/http_content.py | 95 | ||||
-rw-r--r-- | python/samba/tests/blackbox/smbcacls_propagate_inhertance.py | 108 |
3 files changed, 332 insertions, 0 deletions
diff --git a/python/samba/tests/blackbox/http_chunk.py b/python/samba/tests/blackbox/http_chunk.py new file mode 100644 index 0000000..6745c8c --- /dev/null +++ b/python/samba/tests/blackbox/http_chunk.py @@ -0,0 +1,129 @@ +# Blackbox tests for http_test +# +# Copyright (C) Noel Power noel.power@suse.com +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import time +import threading +import logging +import json +from http.server import HTTPServer, BaseHTTPRequestHandler +from samba.logger import get_samba_logger +from samba.tests import BlackboxTestCase, BlackboxProcessError + +logger = get_samba_logger(name=__name__) +COMMAND = "bin/http_test" +def make_chunks(msg, chunk_size): + chunks = [] + while len(msg) > chunk_size: + chunk = msg[:chunk_size] + chunks.append(chunk) + msg = msg[chunk_size:] + if len(msg): + chunks.append(msg) + return chunks + +# simple handler, spits back the 'path' passed in +# GET or POST and a chunked encoded http response +# where the chunk size is 10 octets +class ChunkHTTPRequestHandler(BaseHTTPRequestHandler): + def handle_req(self): + msg = bytes(self.path, encoding="utf-8") + chunks = make_chunks(msg, 10) + + self.send_response(200) + self.send_header('content-type', 'application/json; charset=UTF-8') + if self.path == "usegziptransferencoding": + self.send_header('Transfer-Encoding', 'gzip') + else: + self.send_header('Transfer-Encoding', 'chunked') + self.end_headers() + resp = bytes() + for chunk in chunks: + resp = resp + ("%x" % len(chunk)).encode("utf-8") + b'\r\n' + chunk + b'\r\n' + resp += b'0\r\n\r\n' + self.wfile.write(resp) + + def do_POST(self): + self.handle_req() + def do_GET(self): + self.handle_req() + +class HttpChunkBlackboxTests(BlackboxTestCase): + def setUp(self): + self.server = HTTPServer((os.getenv("SERVER_IP", "localhost"), 8080), + ChunkHTTPRequestHandler, + bind_and_activate=False) + self.t = threading.Thread(target=HttpChunkBlackboxTests.http_server, args=(self,)) + self.t.setDaemon(True) + self.t.start() + time.sleep(1) + + def tearDown(self): + super().tearDown() + + def http_server(self): + self.server.server_bind() + self.server.server_activate() + self.server.serve_forever() + + def test_single_chunk(self): + try: + msg = "one_chunk" + resp = self.check_output("%s -U%% -I%s --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg)) + self.assertEqual(msg,resp.decode('utf-8')) + except BlackboxProcessError as e: + print("Failed with: %s" % e) + self.fail(str(e)) + + def test_multi_chunks(self): + try: + msg = "snglechunksnglechunksnglechunksnglechunksnglechunk" + resp = self.check_output("%s -U%% -I%s --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg)) + self.assertEqual(msg, resp.decode('utf-8')) + except BlackboxProcessError as e: + print("Failed with: %s" % e) + self.fail(str(e)) + + def test_exceed_request_size(self): + try: + msg = "snglechunksnglechunksnglechunksnglechunksnglechunk" + resp = self.check_output("%s -d11 -U%% -I%s --rsize 49 --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg)) + self.fail("unexpected success") + except BlackboxProcessError as e: + if "http_read_chunk: size 50 exceeds max content len 49 skipping body" not in e.stderr.decode('utf-8'): + self.fail(str(e)) + if "unexpected 0 len response" not in e.stdout.decode('utf-8'): + self.fail(str(e)) + + def test_exact_request_size(self): + try: + msg = "snglechunksnglechunksnglechunksnglechunksnglechunk" + resp = self.check_output("%s -U%% -I%s --rsize 50 --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg)) + self.assertEqual(msg, resp.decode('utf-8')) + except BlackboxProcessError as e: + print("Failed with: %s" % e) + self.fail(str(e)) + + def test_gzip_transfer_encoding(self): + try: + msg = "usegziptransferencoding" + resp = self.check_output("%s -U%% -I%s --rsize 50 --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg)) + self.assertEqual(msg, resp.decode('utf-8')) + self.fail("unexpected success") + except BlackboxProcessError as e: + if "http_response_needs_body: Unsupported transfer encoding type gzip" not in e.stderr.decode('utf-8'): + self.fail(str(e)) diff --git a/python/samba/tests/blackbox/http_content.py b/python/samba/tests/blackbox/http_content.py new file mode 100644 index 0000000..3d674aa --- /dev/null +++ b/python/samba/tests/blackbox/http_content.py @@ -0,0 +1,95 @@ +# Blackbox tests for http_test +# +# Copyright (C) Noel Power noel.power@suse.com +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import time +import threading +import logging +import json +from http.server import HTTPServer, BaseHTTPRequestHandler +from samba.logger import get_samba_logger +from samba.tests import BlackboxTestCase, BlackboxProcessError + +logger = get_samba_logger(name=__name__) +COMMAND = "bin/http_test" + +# simple handler, spits back the 'path' passed in +# GET or POST and a chunked encoded http response +# where the chunk size is 10 octets +class ContentHTTPRequestHandler(BaseHTTPRequestHandler): + def handle_req(self): + msg = bytes(self.path, encoding="utf-8") + + self.send_response(200) + self.send_header('content-type', 'application/json; charset=UTF-8') + self.send_header('content-length', len(msg)) + self.end_headers() + self.wfile.write(msg) + + def do_POST(self): + self.handle_req() + def do_GET(self): + self.handle_req() + +class HttpContentBlackboxTests(BlackboxTestCase): + def setUp(self): + self.server = HTTPServer((os.getenv("SERVER_IP", "localhost"), 8080), + ContentHTTPRequestHandler, + bind_and_activate=False) + self.t = threading.Thread(target=HttpContentBlackboxTests.http_server, args=(self,)) + self.t.setDaemon(True) + self.t.start() + time.sleep(1) + + def tearDown(self): + super().tearDown() + + def http_server(self): + self.server.server_bind() + self.server.server_activate() + self.server.serve_forever() + + def notest_simple_msg(self): + try: + msg = "simplemessage" + resp = self.check_output("%s -U%% -I%s --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg)) + self.assertEqual(msg, resp.decode('utf-8')) + except BlackboxProcessError as e: + print("Failed with: %s" % e) + self.fail(str(e)) + + def test_exceed_request_size(self): + try: + msg = "012345678" # 9 bytes + # limit response to 8 bytes + resp = self.check_output("%s -d11 -U%% -I%s --rsize 8 --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg)) + self.fail("unexpected success") + except BlackboxProcessError as e: + if "unexpected 0 len response" not in e.stdout.decode('utf-8'): + self.fail(str(e)) + if "http_parse_headers: Skipping body for code 200" not in e.stderr.decode('utf-8'): + self.fail(str(e)) + + def test_exact_request_size(self): + try: + msg = "012345678" # 9 bytes + # limit response to 9 bytes + resp = self.check_output("%s -U%% -I%s --rsize 9 --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg)) + self.assertEqual(msg, resp.decode('utf-8')) + except BlackboxProcessError as e: + print("Failed with: %s" % e) + self.fail(str(e)) diff --git a/python/samba/tests/blackbox/smbcacls_propagate_inhertance.py b/python/samba/tests/blackbox/smbcacls_propagate_inhertance.py index cc13727..5b3a271 100644 --- a/python/samba/tests/blackbox/smbcacls_propagate_inhertance.py +++ b/python/samba/tests/blackbox/smbcacls_propagate_inhertance.py @@ -1288,3 +1288,111 @@ class InheritanceSmbCaclsTests(SmbCaclsBlockboxTestBase): except BlackboxProcessError as e: self.fail(str(e)) + + def test_simple_iocioi_add(self): + """test smbcacls '--propagate-inheritance --add' which attempts to add the ACL + for the file and additionally use inheritance rules to propagate appropriate + changes to children + + This test adds an ACL with (IO)(CI)(OI)(READ) + + before: + + +-tar_test_dir/ (OI)(CI)(I)(F) + +-oi_dir/ (OI)(CI)(I)(F) + | +-file.1 (I)(F) + | +-nested/ (OI)(CI)(I)(F) + | +-file.2 (I)(F) + | +-nested_again/ (OI)(CI)(I)(F) + | +-file.3 (I)(F) + + after/expected: + + +-tar_test_dir/ (OI)(CI)(I)(F) + +-oi_dir/ (OI)(CI)(I)(F), (IO)(CI)(OI)(READ) + | +-file.1 (I)(F), (I)(READ) + | +-nested/ (OI)(CI)(I)(F), (I)(CI)(OI)(READ) + | +-file.2 (I)(F), (I)(READ) + | +-nested_again/ (OI)(CI)(I)(F), (I)(CI)(OI)(READ) + | +-file.3 (I)(F), (I)(READ)""" + + dir_add_acl_str = "ACL:%s:ALLOWED/OI|CI|IO/READ" % self.user + obj_inherited_ace_str = "ACL:%s:ALLOWED/I/READ" % self.user + dir_inherited_ace_str = "ACL:%s:ALLOWED/OI|CI|I/READ" % self.user + + try: + + self.smb_cacls(["--propagate-inheritance", "--add", + dir_add_acl_str, self.oi_dir]) + + # check top level container 'oi_dir' has IO|CI|OI/READ + dir_ace = self.ace_parse_str(dir_add_acl_str) + self.assertTrue(self.file_ace_check(self.oi_dir, dir_ace)) + + # file 'oi_dir/file-1' should have inherited I/READ + child_file_ace = self.ace_parse_str(obj_inherited_ace_str) + self.assertTrue(self.file_ace_check(self.f1, child_file_ace)) + + # nested dir 'oi_dir/nested/' should have I|CI|OI/READ + child_dir_ace = self.ace_parse_str(dir_inherited_ace_str) + self.assertTrue(self.file_ace_check(self.nested_dir, child_dir_ace)) + + # nested file 'oi_dir/nested/file-2' should have inherited I/READ + self.assertTrue(self.file_ace_check(self.f2, child_file_ace)) + + # nested_again dir 'oi_dir/nested/nested_again' should have I|CI|OI/READ + child_dir_ace = self.ace_parse_str(dir_inherited_ace_str) + self.assertTrue(self.file_ace_check(self.nested_again_dir, child_dir_ace)) + # nested_again file 'oi_dir/nested/nested_again/file-3' should have inherited I/READ + self.assertTrue(self.file_ace_check(self.f3, child_file_ace)) + except BlackboxProcessError as e: + self.fail(str(e)) + + def test_simple_ioci_add(self): + """test smbcacls '--propagate-inheritance --add' which attempts to add the ACL + for the file and additionally use inheritance rules to propagate appropriate + changes to children + + This test adds an ACL with (IO)(CI)(READ) + + before: + + +-tar_test_dir/ (OI)(CI)(I)(F) + +-oi_dir/ (OI)(CI)(I)(F) + | +-file.1 (I)(F) + | +-nested/ (OI)(CI)(I)(F) + | +-file.2 (I)(F) + | +-nested_again/ (OI)(CI)(I)(F) + | +-file.3 (I)(F) + + after/expected: + + +-tar_test_dir/ (OI)(CI)(I)(F) + +-oi_dir/ (OI)(CI)(I)(F), (IO)(CI)(READ) + | +-file.1 (I)(F) + | +-nested/ (OI)(CI)(I)(F), (I)(CI)(READ) + | +-file.2 (I)(F) + | +-nested_again/ (OI)(CI)(I)(F), (I)(CI)(READ) + | +-file.3 (I)(F)""" + + dir_add_acl_str = "ACL:%s:ALLOWED/CI|IO/READ" % self.user + dir_inherited_ace_str = "ACL:%s:ALLOWED/CI|I/READ" % self.user + + try: + + self.smb_cacls(["--propagate-inheritance", "--add", + dir_add_acl_str, self.oi_dir]) + + # check top level container 'oi_dir' has IO|CI/READ + dir_ace = self.ace_parse_str(dir_add_acl_str) + self.assertTrue(self.file_ace_check(self.oi_dir, dir_ace)) + + # nested dir 'oi_dir/nested/' should have I|CI/READ + child_dir_ace = self.ace_parse_str(dir_inherited_ace_str) + self.assertTrue(self.file_ace_check(self.nested_dir, child_dir_ace)) + + # nested_again dir 'oi_dir/nested/nested_again' should have I|CI/READ + child_dir_ace = self.ace_parse_str(dir_inherited_ace_str) + self.assertTrue(self.file_ace_check(self.nested_again_dir, child_dir_ace)) + except BlackboxProcessError as e: + self.fail(str(e)) |