From 8daa83a594a2e98f39d764422bfbdbc62c9efd44 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 19:20:00 +0200 Subject: Adding upstream version 2:4.20.0+dfsg. Signed-off-by: Daniel Baumann --- python/samba/tests/libsmb-basic.py | 268 +++++++++++++++++++++++++++++++++++++ 1 file changed, 268 insertions(+) create mode 100644 python/samba/tests/libsmb-basic.py (limited to 'python/samba/tests/libsmb-basic.py') diff --git a/python/samba/tests/libsmb-basic.py b/python/samba/tests/libsmb-basic.py new file mode 100644 index 0000000..20d3dd4 --- /dev/null +++ b/python/samba/tests/libsmb-basic.py @@ -0,0 +1,268 @@ +# Unix SMB/CIFS implementation. +# Copyright Volker Lendecke 2012 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +"""Tests for samba.samba3.libsmb.""" + +from samba.samba3 import libsmb_samba_internal as libsmb +from samba.dcerpc import security +from samba import NTSTATUSError,ntstatus +from samba.ntstatus import NT_STATUS_DELETE_PENDING +from samba.credentials import SMB_ENCRYPTION_REQUIRED +import samba.tests.libsmb +import threading +import sys +import random + + +class LibsmbTestCase(samba.tests.libsmb.LibsmbTests): + + class OpenClose(threading.Thread): + + def __init__(self, conn, filename, num_ops): + threading.Thread.__init__(self) + self.conn = conn + self.filename = filename + self.num_ops = num_ops + self.exc = False + + def run(self): + c = self.conn + try: + for i in range(self.num_ops): + f = c.create(self.filename, CreateDisposition=3, + DesiredAccess=security.SEC_STD_DELETE) + c.delete_on_close(f, True) + c.close(f) + except Exception: + self.exc = sys.exc_info() + + def test_OpenClose(self): + + c = libsmb.Conn( + self.server_ip, + "tmp", + self.lp, + self.creds, + multi_threaded=True, + force_smb1=True) + + mythreads = [] + + for i in range(3): + t = LibsmbTestCase.OpenClose(c, "test" + str(i), 10) + mythreads.append(t) + + for t in mythreads: + t.start() + + for t in mythreads: + t.join() + if t.exc: + raise t.exc[0](t.exc[1]) + + def test_SMB3EncryptionRequired(self): + test_dir = 'testing_%d' % random.randint(0, 0xFFFF) + + self.creds.set_smb_encryption(SMB_ENCRYPTION_REQUIRED) + + c = libsmb.Conn(self.server_ip, "tmp", self.lp, self.creds) + + c.mkdir(test_dir) + c.rmdir(test_dir) + + def test_SMB1EncryptionRequired(self): + test_dir = 'testing_%d' % random.randint(0, 0xFFFF) + + self.creds.set_smb_encryption(SMB_ENCRYPTION_REQUIRED) + + c = libsmb.Conn( + self.server_ip, + "tmp", + self.lp, + self.creds, + force_smb1=True) + + c.mkdir(test_dir) + c.rmdir(test_dir) + + def test_RenameDstDelOnClose(self): + + dstdir = "\\dst-subdir" + + c1 = libsmb.Conn(self.server_ip, "tmp", self.lp, self.creds) + c2 = libsmb.Conn(self.server_ip, "tmp", self.lp, self.creds) + + try: + c1.deltree(dstdir) + except: + pass + + c1.mkdir(dstdir) + dnum = c1.create(dstdir, DesiredAccess=security.SEC_STD_DELETE) + c1.delete_on_close(dnum,1) + c2.savefile("\\src.txt", b"Content") + + with self.assertRaises(NTSTATUSError) as cm: + c2.rename("\\src.txt", dstdir + "\\dst.txt") + if (cm.exception.args[0] != NT_STATUS_DELETE_PENDING): + raise AssertionError("Rename must fail with DELETE_PENDING") + + c1.delete_on_close(dnum,0) + c1.close(dnum) + + try: + c1.deltree(dstdir) + c1.unlink("\\src.txt") + except: + pass + + def test_libsmb_CreateContexts(self): + c = libsmb.Conn(self.server_ip, "tmp", self.lp, self.creds) + cc_in = [(libsmb.SMB2_CREATE_TAG_MXAC, b'')] + fnum,cr,cc = c.create_ex("",CreateContexts=cc_in) + self.assertEqual( + cr['file_attributes'] & libsmb.FILE_ATTRIBUTE_DIRECTORY, + libsmb.FILE_ATTRIBUTE_DIRECTORY) + self.assertEqual(cc[0][0],libsmb.SMB2_CREATE_TAG_MXAC) + self.assertEqual(len(cc[0][1]),8) + c.close(fnum) + + def test_libsmb_TortureCaseSensitivity(self): + testdir = "test_libsmb_torture_case_sensitivity" + filename = "file" + filepath = testdir + "/" + filename + + c = libsmb.Conn(self.server_ip, "tmp", self.lp, self.creds) + + try: + c.deltree(testdir) + except: + pass + + c.mkdir(testdir) + + try: + # Now check for all possible upper-/lowercase combinations: + # - testdir/file + # - TESTDIR/file + # - testdir/FILE + # - TESTDIR/FILE + + dircases = [testdir, testdir, testdir.upper(), testdir.upper()] + filecases = [filename, filename.upper(), filename, filename.upper()] + tcases = [{'dir':dir, 'file':file} for dir,file in zip(dircases,filecases)] + + for tcase in tcases: + testpath = tcase['dir'] + "/" + tcase['file'] + + # Create the testfile + h = c.create(filepath, + DesiredAccess=security.SEC_FILE_ALL, + CreateDisposition=libsmb.FILE_OPEN_IF) + c.close(h) + + # Open + c.loadfile(testpath) + + # Search + ls = [f['name'] for f in c.list(tcase['dir'], mask=tcase['file'])] + self.assertIn(filename, ls, msg='When searching for "%s" not found in "%s"' % (tcase['file'], tcase['dir'])) + + # Rename + c.rename(testpath, tcase['dir'] + "/tmp") + c.rename(tcase['dir'] + "/TMP", filepath) + c.loadfile(testpath) + + # Delete + c.unlink(testpath) + + finally: + c.deltree(testdir) + + def test_libsmb_TortureDirCaseSensitive(self): + c = libsmb.Conn(self.server_ip, "lowercase", self.lp, self.creds) + c.mkdir("subdir") + c.mkdir("subdir/b") + ret = c.chkpath("SubDir/b") + c.rmdir("subdir/b") + c.rmdir("subdir") + self.assertTrue(ret) + + def test_libsmb_shadow_depot(self): + c = libsmb.Conn(self.server_ip, "shadow_depot", self.lp, self.creds) + try: + fnum=c.create("x:y",CreateDisposition=libsmb.FILE_CREATE) + c.close(fnum) + except: + self.fail() + finally: + # "c" might have crashed, get a new connection + c1 = libsmb.Conn(self.server_ip, "shadow_depot", self.lp, self.creds) + c1.unlink("x") + c1 = None + + def test_gencache_pollution_bz15481(self): + c = libsmb.Conn(self.server_ip, "tmp", self.lp, self.creds) + fh = c.create("file", + DesiredAccess=security.SEC_STD_DELETE, + CreateDisposition=libsmb.FILE_CREATE) + + # prime the gencache File->file + fh_upper = c.create("File", + DesiredAccess=security.SEC_FILE_READ_ATTRIBUTE, + CreateDisposition=libsmb.FILE_OPEN) + c.close(fh_upper) + + c.delete_on_close(fh, 1) + c.close(fh) + + fh = c.create("File", + DesiredAccess=security.SEC_STD_DELETE, + CreateDisposition=libsmb.FILE_CREATE) + + directory = c.list("\\", "File") + + c.delete_on_close(fh, 1) + c.close(fh) + + # Without the bugfix for 15481 we get 'file' not 'File' + self.assertEqual(directory[0]['name'], 'File') + + def test_stream_close_with_full_information(self): + c = libsmb.Conn(self.server_ip, "streams_xattr", self.lp, self.creds) + + try: + c.deltree("teststreams") + except: + pass + + c.mkdir("teststreams") + fh = c.create("teststreams\\stream_full_close_info.txt:Stream", + DesiredAccess=security.SEC_STD_DELETE, + CreateDisposition=libsmb.FILE_CREATE) + c.delete_on_close(fh, 1) + + try: + c.close(fh, libsmb.SMB2_CLOSE_FLAGS_FULL_INFORMATION) + except: + self.fail() + + c.deltree("teststreams") + +if __name__ == "__main__": + import unittest + unittest.main() -- cgit v1.2.3