summaryrefslogtreecommitdiffstats
path: root/tests/_stub_sftp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/_stub_sftp.py')
-rw-r--r--tests/_stub_sftp.py232
1 files changed, 232 insertions, 0 deletions
diff --git a/tests/_stub_sftp.py b/tests/_stub_sftp.py
new file mode 100644
index 0000000..0c0372e
--- /dev/null
+++ b/tests/_stub_sftp.py
@@ -0,0 +1,232 @@
+# Copyright (C) 2003-2009 Robey Pointer <robeypointer@gmail.com>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""
+A stub SFTP server for loopback SFTP testing.
+"""
+
+import os
+
+from paramiko import (
+ AUTH_SUCCESSFUL,
+ OPEN_SUCCEEDED,
+ SFTPAttributes,
+ SFTPHandle,
+ SFTPServer,
+ SFTPServerInterface,
+ SFTP_FAILURE,
+ SFTP_OK,
+ ServerInterface,
+)
+from paramiko.common import o666
+
+
+class StubServer(ServerInterface):
+ def check_auth_password(self, username, password):
+ # all are allowed
+ return AUTH_SUCCESSFUL
+
+ def check_channel_request(self, kind, chanid):
+ return OPEN_SUCCEEDED
+
+
+class StubSFTPHandle(SFTPHandle):
+ def stat(self):
+ try:
+ return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno()))
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+
+ def chattr(self, attr):
+ # python doesn't have equivalents to fchown or fchmod, so we have to
+ # use the stored filename
+ try:
+ SFTPServer.set_file_attr(self.filename, attr)
+ return SFTP_OK
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+
+
+class StubSFTPServer(SFTPServerInterface):
+ # assume current folder is a fine root
+ # (the tests always create and eventually delete a subfolder, so there
+ # shouldn't be any mess)
+ ROOT = os.getcwd()
+
+ def _realpath(self, path):
+ return self.ROOT + self.canonicalize(path)
+
+ def list_folder(self, path):
+ path = self._realpath(path)
+ try:
+ out = []
+ flist = os.listdir(path)
+ for fname in flist:
+ attr = SFTPAttributes.from_stat(
+ os.stat(os.path.join(path, fname))
+ )
+ attr.filename = fname
+ out.append(attr)
+ return out
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+
+ def stat(self, path):
+ path = self._realpath(path)
+ try:
+ return SFTPAttributes.from_stat(os.stat(path))
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+
+ def lstat(self, path):
+ path = self._realpath(path)
+ try:
+ return SFTPAttributes.from_stat(os.lstat(path))
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+
+ def open(self, path, flags, attr):
+ path = self._realpath(path)
+ try:
+ binary_flag = getattr(os, "O_BINARY", 0)
+ flags |= binary_flag
+ mode = getattr(attr, "st_mode", None)
+ if mode is not None:
+ fd = os.open(path, flags, mode)
+ else:
+ # os.open() defaults to 0777 which is
+ # an odd default mode for files
+ fd = os.open(path, flags, o666)
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+ if (flags & os.O_CREAT) and (attr is not None):
+ attr._flags &= ~attr.FLAG_PERMISSIONS
+ SFTPServer.set_file_attr(path, attr)
+ if flags & os.O_WRONLY:
+ if flags & os.O_APPEND:
+ fstr = "ab"
+ else:
+ fstr = "wb"
+ elif flags & os.O_RDWR:
+ if flags & os.O_APPEND:
+ fstr = "a+b"
+ else:
+ fstr = "r+b"
+ else:
+ # O_RDONLY (== 0)
+ fstr = "rb"
+ try:
+ f = os.fdopen(fd, fstr)
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+ fobj = StubSFTPHandle(flags)
+ fobj.filename = path
+ fobj.readfile = f
+ fobj.writefile = f
+ return fobj
+
+ def remove(self, path):
+ path = self._realpath(path)
+ try:
+ os.remove(path)
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def rename(self, oldpath, newpath):
+ oldpath = self._realpath(oldpath)
+ newpath = self._realpath(newpath)
+ if os.path.exists(newpath):
+ return SFTP_FAILURE
+ try:
+ os.rename(oldpath, newpath)
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def posix_rename(self, oldpath, newpath):
+ oldpath = self._realpath(oldpath)
+ newpath = self._realpath(newpath)
+ try:
+ os.rename(oldpath, newpath)
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def mkdir(self, path, attr):
+ path = self._realpath(path)
+ try:
+ os.mkdir(path)
+ if attr is not None:
+ SFTPServer.set_file_attr(path, attr)
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def rmdir(self, path):
+ path = self._realpath(path)
+ try:
+ os.rmdir(path)
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def chattr(self, path, attr):
+ path = self._realpath(path)
+ try:
+ SFTPServer.set_file_attr(path, attr)
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def symlink(self, target_path, path):
+ path = self._realpath(path)
+ if (len(target_path) > 0) and (target_path[0] == "/"):
+ # absolute symlink
+ target_path = os.path.join(self.ROOT, target_path[1:])
+ if target_path[:2] == "//":
+ # bug in os.path.join
+ target_path = target_path[1:]
+ else:
+ # compute relative to path
+ abspath = os.path.join(os.path.dirname(path), target_path)
+ if abspath[: len(self.ROOT)] != self.ROOT:
+ # this symlink isn't going to work anyway -- just break it
+ # immediately
+ target_path = "<error>"
+ try:
+ os.symlink(target_path, path)
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def readlink(self, path):
+ path = self._realpath(path)
+ try:
+ symlink = os.readlink(path)
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+ # if it's absolute, remove the root
+ if os.path.isabs(symlink):
+ if symlink[: len(self.ROOT)] == self.ROOT:
+ symlink = symlink[len(self.ROOT) :]
+ if (len(symlink) == 0) or (symlink[0] != "/"):
+ symlink = "/" + symlink
+ else:
+ symlink = "<error>"
+ return symlink