"""Run various standard Linux commands on remote host.""" from __future__ import annotations from typing import Any import jc from pytest_mh import MultihostHost, MultihostUtility from pytest_mh.conn import Process __all__ = [ "UnixObject", "UnixUser", "UnixGroup", "IdEntry", "PasswdEntry", "GroupEntry", "InitgroupsEntry", "LinuxToolsUtils", "KillCommand", "GetentUtils", ] class UnixObject(object): """ Generic Unix object. """ def __init__(self, id: int | None, name: str | None) -> None: """ :param id: Object ID. :type id: int | None :param name: Object name. :type name: str | None """ self.id: int | None = id """ ID. """ self.name: str | None = name """ Name. """ def __str__(self) -> str: return f'({self.id},"{self.name}")' def __repr__(self) -> str: return str(self) def __eq__(self, o: object) -> bool: if isinstance(o, str): return o == self.name elif isinstance(o, int): return o == self.id elif isinstance(o, tuple): if len(o) != 2 or not isinstance(o[0], int) or not isinstance(o[1], str): raise NotImplementedError(f"Unable to compare {type(o)} with {self.__class__}") (id, name) = o return id == self.id and name == self.name elif isinstance(o, UnixObject): # Fallback to identity comparison return NotImplemented raise NotImplementedError(f"Unable to compare {type(o)} with {self.__class__}") class UnixUser(UnixObject): """ Unix user. """ pass class UnixGroup(UnixObject): """ Unix group. """ pass class IdEntry(object): """ Result of ``id`` """ def __init__(self, user: UnixUser, group: UnixGroup, groups: list[UnixGroup]) -> None: self.user: UnixUser = user """ User information. """ self.group: UnixGroup = group """ Primary group. """ self.groups: list[UnixGroup] = groups """ Secondary groups. """ def memberof(self, groups: int | str | tuple[int, str] | list[int | str | tuple[int, str]]) -> bool: """ Check if the user is member of give group(s). Group specification can be either a single gid or group name. But it can be also a tuple of (gid, name) where both gid and name must match or list of groups where the user must be member of all given groups. :param groups: _description_ :type groups: int | str | tuple :return: _description_ :rtype: bool """ if isinstance(groups, (int, str, tuple)): return groups in self.groups return all(x in self.groups for x in groups) def __str__(self) -> str: return f"{{user={str(self.user)},group={str(self.group)},groups={str(self.groups)}}}" def __repr__(self) -> str: return str(self) @classmethod def FromDict(cls, d: dict[str, Any]) -> IdEntry: user = UnixUser(d["uid"]["id"], d["uid"].get("name", None)) group = UnixGroup(d["gid"]["id"], d["gid"].get("name", None)) groups = [] for secondary_group in d["groups"]: groups.append(UnixGroup(secondary_group["id"], secondary_group.get("name", None))) return cls(user, group, groups) @classmethod def FromOutput(cls, stdout: str) -> IdEntry: jcresult = jc.parse("id", stdout) if not isinstance(jcresult, dict): raise TypeError(f"Unexpected type: {type(jcresult)}, expecting dict") return cls.FromDict(jcresult) class PasswdEntry(object): """ Result of ``getent passwd`` """ def __init__(self, name: str, password: str, uid: int, gid: int, gecos: str, home: str, shell: str) -> None: self.name: str | None = name """ User name. """ self.password: str | None = password """ User password. """ self.uid: int = uid """ User id. """ self.gid: int = gid """ Group id. """ self.gecos: str | None = gecos """ GECOS. """ self.home: str | None = home """ Home directory. """ self.shell: str | None = shell """ Login shell. """ def __str__(self) -> str: return f"({self.name}:{self.password}:{self.uid}:{self.gid}:{self.gecos}:{self.home}:{self.shell})" def __repr__(self) -> str: return str(self) @classmethod def FromDict(cls, d: dict[str, Any]) -> PasswdEntry: return cls( name=d.get("username", None), password=d.get("password", None), uid=d.get("uid", None), gid=d.get("gid", None), gecos=d.get("comment", None), home=d.get("home", None), shell=d.get("shell", None), ) @classmethod def FromOutput(cls, stdout: str) -> PasswdEntry: result = jc.parse("passwd", stdout) if not isinstance(result, list): raise TypeError(f"Unexpected type: {type(result)}, expecting list") if len(result) != 1: raise ValueError("More then one entry was returned") return cls.FromDict(result[0]) class GroupEntry(object): """ Result of ``getent group`` """ def __init__(self, name: str, password: str, gid: int, members: list[str]) -> None: self.name: str | None = name """ Group name. """ self.password: str | None = password """ Group password. """ self.gid: int = gid """ Group id. """ self.members: list[str] = members """ Group members. """ def __str__(self) -> str: return f'({self.name}:{self.password}:{self.gid}:{",".join(self.members)})' def __repr__(self) -> str: return str(self) @classmethod def FromDict(cls, d: dict[str, Any]) -> GroupEntry: return cls( name=d.get("group_name", None), password=d.get("password", None), gid=d.get("gid", None), members=d.get("members", []), ) @classmethod def FromOutput(cls, stdout: str) -> GroupEntry: result = jc.parse("group", stdout) if not isinstance(result, list): raise TypeError(f"Unexpected type: {type(result)}, expecting list") if len(result) != 1: raise ValueError("More then one entry was returned") return cls.FromDict(result[0]) class InitgroupsEntry(object): """ Result of ``getent initgroups`` If user does not exist or does not have any supplementary groups then ``self.groups`` is empty. """ def __init__(self, name: str, groups: list[int]) -> None: self.name: str = name """ Exact username for which ``initgroups`` was called """ self.groups: list[int] = groups """ Group ids that ``name`` is member of. """ def __str__(self) -> str: return f'({self.name}:{",".join([str(i) for i in self.groups])})' def __repr__(self) -> str: return str(self) def memberof(self, groups: list[int]) -> bool: """ Check if the user is member of given groups. This method checks only supplementary groups not the primary group. :param groups: List of group ids :type groups: list[int] :return: If user is member of all given groups True, otherwise False. :rtype: bool """ return all(x in self.groups for x in groups) @classmethod def FromDict(cls, d: dict[str, Any]) -> InitgroupsEntry: return cls( name=d["name"], groups=d.get("groups", []), ) @classmethod def FromOutput(cls, stdout: str) -> InitgroupsEntry: result: list[str] = stdout.split() dictionary: dict[str, str | list[int]] = {} dictionary["name"] = result[0] if len(result) > 1: dictionary["groups"] = [int(x) for x in result[1:]] return cls.FromDict(dictionary) class LinuxToolsUtils(MultihostUtility[MultihostHost]): """ Run various standard commands on remote host. """ def __init__(self, host: MultihostHost) -> None: """ :param host: Remote host. :type host: MultihostHost """ super().__init__(host) self.getent: GetentUtils = GetentUtils(host) """ Run ``getent`` command. """ def id(self, name: str | int) -> IdEntry | None: """ Run ``id`` command. :param name: User name or id. :type name: str | int :return: id data, None if not found :rtype: IdEntry | None """ command = self.host.conn.exec(["id", name], raise_on_error=False) if command.rc != 0: return None return IdEntry.FromOutput(command.stdout) def grep(self, pattern: str, paths: str | list[str], args: list[str] | None = None) -> bool: """ Run ``grep`` command. :param pattern: Pattern to match. :type pattern: str :param paths: Paths to search. :type paths: str | list[str] :param args: Additional arguments to ``grep`` command, defaults to None. :type args: list[str] | None, optional :return: True if grep returned 0, False otherwise. :rtype: bool """ if args is None: args = [] paths = [paths] if isinstance(paths, str) else paths command = self.host.conn.exec(["grep", *args, pattern, *paths]) return command.rc == 0 class KillCommand(object): def __init__(self, host: MultihostHost, process: Process, pid: int) -> None: self.host = host self.process = process self.pid = pid self.__killed: bool = False def kill(self) -> None: if self.__killed: return self.host.conn.exec(["kill", self.pid]) self.__killed = True def __enter__(self) -> KillCommand: return self def __exit__(self, exception_type, exception_value, traceback) -> None: self.kill() self.process.wait() class GetentUtils(MultihostUtility[MultihostHost]): """ Interface to getent command. """ def __init__(self, host: MultihostHost) -> None: """ :param host: Remote host. :type host: MultihostHost """ super().__init__(host) def passwd(self, name: str | int, *, service: str | None = None) -> PasswdEntry | None: """ Call ``getent passwd $name`` :param name: User name or id. :type name: str | int :param service: Service used, defaults to None :type service: str | None :return: passwd data, None if not found :rtype: PasswdEntry | None """ return self.__exec(PasswdEntry, "passwd", name, service) def group(self, name: str | int, *, service: str | None = None) -> GroupEntry | None: """ Call ``getent group $name`` :param name: Group name or id. :type name: str | int :param service: Service used, defaults to None :type service: str | None :return: group data, None if not found :rtype: PasswdEntry | None """ return self.__exec(GroupEntry, "group", name, service) def initgroups(self, name: str, *, service: str | None = None) -> InitgroupsEntry: """ Call ``getent initgroups $name`` If ``name`` does not exist, group list is empty. This is standard behavior of ``getent initgroups`` :param name: User name. :type name: str :param service: Service used, defaults to None :type service: str | None :return: Initgroups data :rtype: InitgroupsEntry """ return self.__exec(InitgroupsEntry, "initgroups", name, service) def __exec(self, cls, cmd: str, name: str | int, service: str | None = None) -> Any: args = [] if service is not None: args = ["-s", service] command = self.host.conn.exec(["getent", *args, cmd, name], raise_on_error=False) if command.rc != 0: return None return cls.FromOutput(command.stdout)