summaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/samba/tests/blackbox/http_chunk.py129
-rw-r--r--python/samba/tests/blackbox/http_content.py95
-rw-r--r--python/samba/tests/blackbox/smbcacls_propagate_inhertance.py108
3 files changed, 332 insertions, 0 deletions
diff --git a/python/samba/tests/blackbox/http_chunk.py b/python/samba/tests/blackbox/http_chunk.py
new file mode 100644
index 0000000..6745c8c
--- /dev/null
+++ b/python/samba/tests/blackbox/http_chunk.py
@@ -0,0 +1,129 @@
+# Blackbox tests for http_test
+#
+# Copyright (C) Noel Power noel.power@suse.com
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import time
+import threading
+import logging
+import json
+from http.server import HTTPServer, BaseHTTPRequestHandler
+from samba.logger import get_samba_logger
+from samba.tests import BlackboxTestCase, BlackboxProcessError
+
+logger = get_samba_logger(name=__name__)
+COMMAND = "bin/http_test"
+def make_chunks(msg, chunk_size):
+ chunks = []
+ while len(msg) > chunk_size:
+ chunk = msg[:chunk_size]
+ chunks.append(chunk)
+ msg = msg[chunk_size:]
+ if len(msg):
+ chunks.append(msg)
+ return chunks
+
+# simple handler, spits back the 'path' passed in
+# GET or POST and a chunked encoded http response
+# where the chunk size is 10 octets
+class ChunkHTTPRequestHandler(BaseHTTPRequestHandler):
+ def handle_req(self):
+ msg = bytes(self.path, encoding="utf-8")
+ chunks = make_chunks(msg, 10)
+
+ self.send_response(200)
+ self.send_header('content-type', 'application/json; charset=UTF-8')
+ if self.path == "usegziptransferencoding":
+ self.send_header('Transfer-Encoding', 'gzip')
+ else:
+ self.send_header('Transfer-Encoding', 'chunked')
+ self.end_headers()
+ resp = bytes()
+ for chunk in chunks:
+ resp = resp + ("%x" % len(chunk)).encode("utf-8") + b'\r\n' + chunk + b'\r\n'
+ resp += b'0\r\n\r\n'
+ self.wfile.write(resp)
+
+ def do_POST(self):
+ self.handle_req()
+ def do_GET(self):
+ self.handle_req()
+
+class HttpChunkBlackboxTests(BlackboxTestCase):
+ def setUp(self):
+ self.server = HTTPServer((os.getenv("SERVER_IP", "localhost"), 8080),
+ ChunkHTTPRequestHandler,
+ bind_and_activate=False)
+ self.t = threading.Thread(target=HttpChunkBlackboxTests.http_server, args=(self,))
+ self.t.setDaemon(True)
+ self.t.start()
+ time.sleep(1)
+
+ def tearDown(self):
+ super().tearDown()
+
+ def http_server(self):
+ self.server.server_bind()
+ self.server.server_activate()
+ self.server.serve_forever()
+
+ def test_single_chunk(self):
+ try:
+ msg = "one_chunk"
+ resp = self.check_output("%s -U%% -I%s --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg))
+ self.assertEqual(msg,resp.decode('utf-8'))
+ except BlackboxProcessError as e:
+ print("Failed with: %s" % e)
+ self.fail(str(e))
+
+ def test_multi_chunks(self):
+ try:
+ msg = "snglechunksnglechunksnglechunksnglechunksnglechunk"
+ resp = self.check_output("%s -U%% -I%s --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg))
+ self.assertEqual(msg, resp.decode('utf-8'))
+ except BlackboxProcessError as e:
+ print("Failed with: %s" % e)
+ self.fail(str(e))
+
+ def test_exceed_request_size(self):
+ try:
+ msg = "snglechunksnglechunksnglechunksnglechunksnglechunk"
+ resp = self.check_output("%s -d11 -U%% -I%s --rsize 49 --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg))
+ self.fail("unexpected success")
+ except BlackboxProcessError as e:
+ if "http_read_chunk: size 50 exceeds max content len 49 skipping body" not in e.stderr.decode('utf-8'):
+ self.fail(str(e))
+ if "unexpected 0 len response" not in e.stdout.decode('utf-8'):
+ self.fail(str(e))
+
+ def test_exact_request_size(self):
+ try:
+ msg = "snglechunksnglechunksnglechunksnglechunksnglechunk"
+ resp = self.check_output("%s -U%% -I%s --rsize 50 --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg))
+ self.assertEqual(msg, resp.decode('utf-8'))
+ except BlackboxProcessError as e:
+ print("Failed with: %s" % e)
+ self.fail(str(e))
+
+ def test_gzip_transfer_encoding(self):
+ try:
+ msg = "usegziptransferencoding"
+ resp = self.check_output("%s -U%% -I%s --rsize 50 --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg))
+ self.assertEqual(msg, resp.decode('utf-8'))
+ self.fail("unexpected success")
+ except BlackboxProcessError as e:
+ if "http_response_needs_body: Unsupported transfer encoding type gzip" not in e.stderr.decode('utf-8'):
+ self.fail(str(e))
diff --git a/python/samba/tests/blackbox/http_content.py b/python/samba/tests/blackbox/http_content.py
new file mode 100644
index 0000000..3d674aa
--- /dev/null
+++ b/python/samba/tests/blackbox/http_content.py
@@ -0,0 +1,95 @@
+# Blackbox tests for http_test
+#
+# Copyright (C) Noel Power noel.power@suse.com
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import time
+import threading
+import logging
+import json
+from http.server import HTTPServer, BaseHTTPRequestHandler
+from samba.logger import get_samba_logger
+from samba.tests import BlackboxTestCase, BlackboxProcessError
+
+logger = get_samba_logger(name=__name__)
+COMMAND = "bin/http_test"
+
+# simple handler, spits back the 'path' passed in
+# GET or POST and a chunked encoded http response
+# where the chunk size is 10 octets
+class ContentHTTPRequestHandler(BaseHTTPRequestHandler):
+ def handle_req(self):
+ msg = bytes(self.path, encoding="utf-8")
+
+ self.send_response(200)
+ self.send_header('content-type', 'application/json; charset=UTF-8')
+ self.send_header('content-length', len(msg))
+ self.end_headers()
+ self.wfile.write(msg)
+
+ def do_POST(self):
+ self.handle_req()
+ def do_GET(self):
+ self.handle_req()
+
+class HttpContentBlackboxTests(BlackboxTestCase):
+ def setUp(self):
+ self.server = HTTPServer((os.getenv("SERVER_IP", "localhost"), 8080),
+ ContentHTTPRequestHandler,
+ bind_and_activate=False)
+ self.t = threading.Thread(target=HttpContentBlackboxTests.http_server, args=(self,))
+ self.t.setDaemon(True)
+ self.t.start()
+ time.sleep(1)
+
+ def tearDown(self):
+ super().tearDown()
+
+ def http_server(self):
+ self.server.server_bind()
+ self.server.server_activate()
+ self.server.serve_forever()
+
+ def notest_simple_msg(self):
+ try:
+ msg = "simplemessage"
+ resp = self.check_output("%s -U%% -I%s --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg))
+ self.assertEqual(msg, resp.decode('utf-8'))
+ except BlackboxProcessError as e:
+ print("Failed with: %s" % e)
+ self.fail(str(e))
+
+ def test_exceed_request_size(self):
+ try:
+ msg = "012345678" # 9 bytes
+ # limit response to 8 bytes
+ resp = self.check_output("%s -d11 -U%% -I%s --rsize 8 --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg))
+ self.fail("unexpected success")
+ except BlackboxProcessError as e:
+ if "unexpected 0 len response" not in e.stdout.decode('utf-8'):
+ self.fail(str(e))
+ if "http_parse_headers: Skipping body for code 200" not in e.stderr.decode('utf-8'):
+ self.fail(str(e))
+
+ def test_exact_request_size(self):
+ try:
+ msg = "012345678" # 9 bytes
+ # limit response to 9 bytes
+ resp = self.check_output("%s -U%% -I%s --rsize 9 --uri %s" % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg))
+ self.assertEqual(msg, resp.decode('utf-8'))
+ except BlackboxProcessError as e:
+ print("Failed with: %s" % e)
+ self.fail(str(e))
diff --git a/python/samba/tests/blackbox/smbcacls_propagate_inhertance.py b/python/samba/tests/blackbox/smbcacls_propagate_inhertance.py
index cc13727..5b3a271 100644
--- a/python/samba/tests/blackbox/smbcacls_propagate_inhertance.py
+++ b/python/samba/tests/blackbox/smbcacls_propagate_inhertance.py
@@ -1288,3 +1288,111 @@ class InheritanceSmbCaclsTests(SmbCaclsBlockboxTestBase):
except BlackboxProcessError as e:
self.fail(str(e))
+
+ def test_simple_iocioi_add(self):
+ """test smbcacls '--propagate-inheritance --add' which attempts to add the ACL
+ for the file and additionally use inheritance rules to propagate appropriate
+ changes to children
+
+ This test adds an ACL with (IO)(CI)(OI)(READ)
+
+ before:
+
+ +-tar_test_dir/ (OI)(CI)(I)(F)
+ +-oi_dir/ (OI)(CI)(I)(F)
+ | +-file.1 (I)(F)
+ | +-nested/ (OI)(CI)(I)(F)
+ | +-file.2 (I)(F)
+ | +-nested_again/ (OI)(CI)(I)(F)
+ | +-file.3 (I)(F)
+
+ after/expected:
+
+ +-tar_test_dir/ (OI)(CI)(I)(F)
+ +-oi_dir/ (OI)(CI)(I)(F), (IO)(CI)(OI)(READ)
+ | +-file.1 (I)(F), (I)(READ)
+ | +-nested/ (OI)(CI)(I)(F), (I)(CI)(OI)(READ)
+ | +-file.2 (I)(F), (I)(READ)
+ | +-nested_again/ (OI)(CI)(I)(F), (I)(CI)(OI)(READ)
+ | +-file.3 (I)(F), (I)(READ)"""
+
+ dir_add_acl_str = "ACL:%s:ALLOWED/OI|CI|IO/READ" % self.user
+ obj_inherited_ace_str = "ACL:%s:ALLOWED/I/READ" % self.user
+ dir_inherited_ace_str = "ACL:%s:ALLOWED/OI|CI|I/READ" % self.user
+
+ try:
+
+ self.smb_cacls(["--propagate-inheritance", "--add",
+ dir_add_acl_str, self.oi_dir])
+
+ # check top level container 'oi_dir' has IO|CI|OI/READ
+ dir_ace = self.ace_parse_str(dir_add_acl_str)
+ self.assertTrue(self.file_ace_check(self.oi_dir, dir_ace))
+
+ # file 'oi_dir/file-1' should have inherited I/READ
+ child_file_ace = self.ace_parse_str(obj_inherited_ace_str)
+ self.assertTrue(self.file_ace_check(self.f1, child_file_ace))
+
+ # nested dir 'oi_dir/nested/' should have I|CI|OI/READ
+ child_dir_ace = self.ace_parse_str(dir_inherited_ace_str)
+ self.assertTrue(self.file_ace_check(self.nested_dir, child_dir_ace))
+
+ # nested file 'oi_dir/nested/file-2' should have inherited I/READ
+ self.assertTrue(self.file_ace_check(self.f2, child_file_ace))
+
+ # nested_again dir 'oi_dir/nested/nested_again' should have I|CI|OI/READ
+ child_dir_ace = self.ace_parse_str(dir_inherited_ace_str)
+ self.assertTrue(self.file_ace_check(self.nested_again_dir, child_dir_ace))
+ # nested_again file 'oi_dir/nested/nested_again/file-3' should have inherited I/READ
+ self.assertTrue(self.file_ace_check(self.f3, child_file_ace))
+ except BlackboxProcessError as e:
+ self.fail(str(e))
+
+ def test_simple_ioci_add(self):
+ """test smbcacls '--propagate-inheritance --add' which attempts to add the ACL
+ for the file and additionally use inheritance rules to propagate appropriate
+ changes to children
+
+ This test adds an ACL with (IO)(CI)(READ)
+
+ before:
+
+ +-tar_test_dir/ (OI)(CI)(I)(F)
+ +-oi_dir/ (OI)(CI)(I)(F)
+ | +-file.1 (I)(F)
+ | +-nested/ (OI)(CI)(I)(F)
+ | +-file.2 (I)(F)
+ | +-nested_again/ (OI)(CI)(I)(F)
+ | +-file.3 (I)(F)
+
+ after/expected:
+
+ +-tar_test_dir/ (OI)(CI)(I)(F)
+ +-oi_dir/ (OI)(CI)(I)(F), (IO)(CI)(READ)
+ | +-file.1 (I)(F)
+ | +-nested/ (OI)(CI)(I)(F), (I)(CI)(READ)
+ | +-file.2 (I)(F)
+ | +-nested_again/ (OI)(CI)(I)(F), (I)(CI)(READ)
+ | +-file.3 (I)(F)"""
+
+ dir_add_acl_str = "ACL:%s:ALLOWED/CI|IO/READ" % self.user
+ dir_inherited_ace_str = "ACL:%s:ALLOWED/CI|I/READ" % self.user
+
+ try:
+
+ self.smb_cacls(["--propagate-inheritance", "--add",
+ dir_add_acl_str, self.oi_dir])
+
+ # check top level container 'oi_dir' has IO|CI/READ
+ dir_ace = self.ace_parse_str(dir_add_acl_str)
+ self.assertTrue(self.file_ace_check(self.oi_dir, dir_ace))
+
+ # nested dir 'oi_dir/nested/' should have I|CI/READ
+ child_dir_ace = self.ace_parse_str(dir_inherited_ace_str)
+ self.assertTrue(self.file_ace_check(self.nested_dir, child_dir_ace))
+
+ # nested_again dir 'oi_dir/nested/nested_again' should have I|CI/READ
+ child_dir_ace = self.ace_parse_str(dir_inherited_ace_str)
+ self.assertTrue(self.file_ace_check(self.nested_again_dir, child_dir_ace))
+ except BlackboxProcessError as e:
+ self.fail(str(e))