diff options
Diffstat (limited to 'python/libknot/control.py')
-rw-r--r-- | python/libknot/control.py | 374 |
1 files changed, 374 insertions, 0 deletions
diff --git a/python/libknot/control.py b/python/libknot/control.py new file mode 100644 index 0000000..44aa516 --- /dev/null +++ b/python/libknot/control.py @@ -0,0 +1,374 @@ +"""Libknot server control interface wrapper.""" + +import ctypes +import enum +import warnings +import libknot + + +def load_lib(path: str = None) -> None: + """Compatibility wrapper.""" + + libknot.Knot(path) + warnings.warn("libknot.control.load_lib() is deprecated, use libknot.Knot() instead", \ + category=Warning, stacklevel=2) + + +class KnotCtlType(enum.IntEnum): + """Libknot server control data unit types.""" + + END = 0 + DATA = 1 + EXTRA = 2 + BLOCK = 3 + + +class KnotCtlDataIdx(enum.IntEnum): + """Libknot server control data unit indices.""" + + COMMAND = 0 + FLAGS = 1 + ERROR = 2 + SECTION = 3 + ITEM = 4 + ID = 5 + ZONE = 6 + OWNER = 7 + TTL = 8 + TYPE = 9 + DATA = 10 + FILTER = 11 + + +class KnotCtlData(object): + """Libknot server control data unit.""" + + DataArray = ctypes.c_char_p * len(KnotCtlDataIdx) + + def __init__(self) -> None: + self.data = self.DataArray() + + def __str__(self) -> str: + """Returns data unit in text form.""" + + string = str() + + for idx in KnotCtlDataIdx: + if self.data[idx]: + if string: + string += ", " + string += "%s = '%s'" % (idx.name, self.data[idx].decode()) + + return string + + def __getitem__(self, index: KnotCtlDataIdx) -> str: + """Data unit item getter.""" + + value = self.data[index] + return value.decode() if value else str() + + def __setitem__(self, index: KnotCtlDataIdx, value: str) -> None: + """Data unit item setter.""" + + self.data[index] = ctypes.c_char_p(value.encode()) if value else ctypes.c_char_p() + + +class KnotCtlError(Exception): + """Libknot server control error.""" + + def __init__(self, message: str, data: KnotCtlData = None) -> None: + super().__init__() + self.message = message + self.data = data + + def __str__(self) -> str: + out = "%s" % self.message + if self.data: + out += " (%s)" % self.data + return out + + +class KnotCtlErrorConnect(KnotCtlError): + """Control connection error.""" + + +class KnotCtlErrorSend(KnotCtlError): + """Control data send error.""" + + +class KnotCtlErrorReceive(KnotCtlError): + """Control data receive error.""" + + +class KnotCtlErrorRemote(KnotCtlError): + """Control error on the remote (server) side.""" + + +class KnotCtl(object): + """Libknot server control interface.""" + + ALLOC = None + FREE = None + SET_TIMEOUT = None + CONNECT = None + CLOSE = None + SEND = None + RECEIVE = None + + def __init__(self) -> None: + """Initializes a control interface instance.""" + + if not KnotCtl.ALLOC: + libknot.Knot() + + KnotCtl.ALLOC = libknot.Knot.LIBKNOT.knot_ctl_alloc + KnotCtl.ALLOC.restype = ctypes.c_void_p + + KnotCtl.FREE = libknot.Knot.LIBKNOT.knot_ctl_free + KnotCtl.FREE.argtypes = [ctypes.c_void_p] + + KnotCtl.SET_TIMEOUT = libknot.Knot.LIBKNOT.knot_ctl_set_timeout + KnotCtl.SET_TIMEOUT.argtypes = [ctypes.c_void_p, ctypes.c_int] + + KnotCtl.CONNECT = libknot.Knot.LIBKNOT.knot_ctl_connect + KnotCtl.CONNECT.restype = ctypes.c_int + KnotCtl.CONNECT.argtypes = [ctypes.c_void_p, ctypes.c_char_p] + + KnotCtl.CLOSE = libknot.Knot.LIBKNOT.knot_ctl_close + KnotCtl.CLOSE.argtypes = [ctypes.c_void_p] + + KnotCtl.SEND = libknot.Knot.LIBKNOT.knot_ctl_send + KnotCtl.SEND.restype = ctypes.c_int + KnotCtl.SEND.argtypes = [ctypes.c_void_p, ctypes.c_uint, ctypes.c_void_p] + + KnotCtl.RECEIVE = libknot.Knot.LIBKNOT.knot_ctl_receive + KnotCtl.RECEIVE.restype = ctypes.c_int + KnotCtl.RECEIVE.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] + + self.obj = KnotCtl.ALLOC() + + def __del__(self) -> None: + """Deallocates control interface instance.""" + + KnotCtl.FREE(self.obj) + + def set_timeout(self, timeout: int) -> None: + """Sets control socket operations timeout in seconds.""" + + KnotCtl.SET_TIMEOUT(self.obj, timeout * 1000) + + def connect(self, path: str) -> None: + """Connect to a specified control UNIX socket.""" + + ret = KnotCtl.CONNECT(self.obj, path.encode()) + if ret != 0: + err = libknot.Knot.STRERROR(ret) + raise KnotCtlErrorConnect(err.decode()) + + def close(self) -> None: + """Disconnects from the current control socket.""" + + KnotCtl.CLOSE(self.obj) + + def send(self, data_type: KnotCtlType, data: KnotCtlData = None) -> None: + """Sends a data unit to the connected control socket.""" + + ret = KnotCtl.SEND(self.obj, data_type, + data.data if data else ctypes.c_char_p()) + if ret != 0: + err = libknot.Knot.STRERROR(ret) + raise KnotCtlErrorSend(err.decode()) + + def receive(self, data: KnotCtlData = None) -> KnotCtlType: + """Receives a data unit from the connected control socket.""" + + data_type = ctypes.c_uint() + ret = KnotCtl.RECEIVE(self.obj, ctypes.byref(data_type), + data.data if data else ctypes.c_char_p()) + if ret != 0: + err = libknot.Knot.STRERROR(ret) + raise KnotCtlErrorReceive(err.decode()) + return KnotCtlType(data_type.value) + + def send_block(self, cmd: str, section: str = None, item: str = None, + identifier: str = None, zone: str = None, owner: str = None, + ttl: str = None, rtype: str = None, data: str = None, + flags: str = None, filters: str = None) -> None: + """Sends a control query block.""" + + query = KnotCtlData() + query[KnotCtlDataIdx.COMMAND] = cmd + query[KnotCtlDataIdx.SECTION] = section + query[KnotCtlDataIdx.ITEM] = item + query[KnotCtlDataIdx.ID] = identifier + query[KnotCtlDataIdx.ZONE] = zone + query[KnotCtlDataIdx.OWNER] = owner + query[KnotCtlDataIdx.TTL] = ttl + query[KnotCtlDataIdx.TYPE] = rtype + query[KnotCtlDataIdx.DATA] = data + query[KnotCtlDataIdx.FLAGS] = flags + query[KnotCtlDataIdx.FILTER] = filters + + self.send(KnotCtlType.DATA, query) + self.send(KnotCtlType.BLOCK) + + def _receive_conf(self, out, reply): + + section = reply[KnotCtlDataIdx.SECTION] + ident = reply[KnotCtlDataIdx.ID] + item = reply[KnotCtlDataIdx.ITEM] + data = reply[KnotCtlDataIdx.DATA] + + # Add the section if not exists. + if section not in out: + out[section] = dict() + + # Add the identifier if not exists. + if ident and ident not in out[section]: + out[section][ident] = dict() + + # Return if no item/value. + if not item: + return + + item_level = out[section][ident] if ident else out[section] + + # Treat alone identifier item differently. + if item in ["id", "domain", "target"]: + if data not in out[section]: + out[section][data] = dict() + else: + if item not in item_level: + item_level[item] = list() + + if data: + item_level[item].append(data) + + def _receive_zone_status(self, out, reply): + + zone = reply[KnotCtlDataIdx.ZONE] + rtype = reply[KnotCtlDataIdx.TYPE] + data = reply[KnotCtlDataIdx.DATA] + + # Add the zone if not exists. + if zone not in out: + out[zone] = dict() + + out[zone][rtype] = data + + def _receive_zone(self, out, reply): + + zone = reply[KnotCtlDataIdx.ZONE] + owner = reply[KnotCtlDataIdx.OWNER] + ttl = reply[KnotCtlDataIdx.TTL] + rtype = reply[KnotCtlDataIdx.TYPE] + data = reply[KnotCtlDataIdx.DATA] + + # Add the zone if not exists. + if zone not in out: + out[zone] = dict() + + if owner not in out[zone]: + out[zone][owner] = dict() + + if rtype not in out[zone][owner]: + out[zone][owner][rtype] = dict() + + # Add the key/value. + out[zone][owner][rtype]["ttl"] = ttl + + if not "data" in out[zone][owner][rtype]: + out[zone][owner][rtype]["data"] = [data] + else: + out[zone][owner][rtype]["data"].append(data) + + def _receive_stats(self, out, reply): + + zone = reply[KnotCtlDataIdx.ZONE] + section = reply[KnotCtlDataIdx.SECTION] + item = reply[KnotCtlDataIdx.ITEM] + idx = reply[KnotCtlDataIdx.ID] + data = int(reply[KnotCtlDataIdx.DATA]) + + # Add the zone if not exists. + if zone: + if "zone" not in out: + out["zone"] = dict() + + if zone not in out["zone"]: + out["zone"][zone] = dict() + + section_level = out["zone"][zone] if zone else out + + if section not in section_level: + section_level[section] = dict() + + if idx: + if item not in section_level[section]: + section_level[section][item] = dict() + + section_level[section][item][idx] = data + else: + section_level[section][item] = data + + def receive_stats(self) -> dict: + """Receives statistics answer and returns it as a structured dictionary.""" + + out = dict() + err_reply = None + + while True: + reply = KnotCtlData() + reply_type = self.receive(reply) + + # Stop if not data type. + if reply_type not in [KnotCtlType.DATA, KnotCtlType.EXTRA]: + break + + # Check for an error. + if reply[KnotCtlDataIdx.ERROR]: + err_reply = reply + continue + + self._receive_stats(out, reply) + + if err_reply: + raise KnotCtlErrorRemote(err_reply[KnotCtlDataIdx.ERROR], err_reply) + + return out + + def receive_block(self) -> dict: + """Receives a control answer and returns it as a structured dictionary.""" + + out = dict() + err_reply = None + + while True: + reply = KnotCtlData() + reply_type = self.receive(reply) + + # Stop if not data type. + if reply_type not in [KnotCtlType.DATA, KnotCtlType.EXTRA]: + break + + # Check for an error. + if reply[KnotCtlDataIdx.ERROR]: + err_reply = reply + continue + + # Check for config data. + if reply[KnotCtlDataIdx.SECTION]: + self._receive_conf(out, reply) + # Check for zone data. + elif reply[KnotCtlDataIdx.ZONE]: + if reply[KnotCtlDataIdx.OWNER]: + self._receive_zone(out, reply) + else: + self._receive_zone_status(out, reply) + else: + continue + + if err_reply: + raise KnotCtlErrorRemote(err_reply[KnotCtlDataIdx.ERROR], err_reply) + + return out |