summaryrefslogtreecommitdiffstats
path: root/python/libknot/control.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/libknot/control.py')
-rw-r--r--python/libknot/control.py374
1 files changed, 374 insertions, 0 deletions
diff --git a/python/libknot/control.py b/python/libknot/control.py
new file mode 100644
index 0000000..44aa516
--- /dev/null
+++ b/python/libknot/control.py
@@ -0,0 +1,374 @@
+"""Libknot server control interface wrapper."""
+
+import ctypes
+import enum
+import warnings
+import libknot
+
+
+def load_lib(path: str = None) -> None:
+ """Compatibility wrapper."""
+
+ libknot.Knot(path)
+ warnings.warn("libknot.control.load_lib() is deprecated, use libknot.Knot() instead", \
+ category=Warning, stacklevel=2)
+
+
+class KnotCtlType(enum.IntEnum):
+ """Libknot server control data unit types."""
+
+ END = 0
+ DATA = 1
+ EXTRA = 2
+ BLOCK = 3
+
+
+class KnotCtlDataIdx(enum.IntEnum):
+ """Libknot server control data unit indices."""
+
+ COMMAND = 0
+ FLAGS = 1
+ ERROR = 2
+ SECTION = 3
+ ITEM = 4
+ ID = 5
+ ZONE = 6
+ OWNER = 7
+ TTL = 8
+ TYPE = 9
+ DATA = 10
+ FILTER = 11
+
+
+class KnotCtlData(object):
+ """Libknot server control data unit."""
+
+ DataArray = ctypes.c_char_p * len(KnotCtlDataIdx)
+
+ def __init__(self) -> None:
+ self.data = self.DataArray()
+
+ def __str__(self) -> str:
+ """Returns data unit in text form."""
+
+ string = str()
+
+ for idx in KnotCtlDataIdx:
+ if self.data[idx]:
+ if string:
+ string += ", "
+ string += "%s = '%s'" % (idx.name, self.data[idx].decode())
+
+ return string
+
+ def __getitem__(self, index: KnotCtlDataIdx) -> str:
+ """Data unit item getter."""
+
+ value = self.data[index]
+ return value.decode() if value else str()
+
+ def __setitem__(self, index: KnotCtlDataIdx, value: str) -> None:
+ """Data unit item setter."""
+
+ self.data[index] = ctypes.c_char_p(value.encode()) if value else ctypes.c_char_p()
+
+
+class KnotCtlError(Exception):
+ """Libknot server control error."""
+
+ def __init__(self, message: str, data: KnotCtlData = None) -> None:
+ super().__init__()
+ self.message = message
+ self.data = data
+
+ def __str__(self) -> str:
+ out = "%s" % self.message
+ if self.data:
+ out += " (%s)" % self.data
+ return out
+
+
+class KnotCtlErrorConnect(KnotCtlError):
+ """Control connection error."""
+
+
+class KnotCtlErrorSend(KnotCtlError):
+ """Control data send error."""
+
+
+class KnotCtlErrorReceive(KnotCtlError):
+ """Control data receive error."""
+
+
+class KnotCtlErrorRemote(KnotCtlError):
+ """Control error on the remote (server) side."""
+
+
+class KnotCtl(object):
+ """Libknot server control interface."""
+
+ ALLOC = None
+ FREE = None
+ SET_TIMEOUT = None
+ CONNECT = None
+ CLOSE = None
+ SEND = None
+ RECEIVE = None
+
+ def __init__(self) -> None:
+ """Initializes a control interface instance."""
+
+ if not KnotCtl.ALLOC:
+ libknot.Knot()
+
+ KnotCtl.ALLOC = libknot.Knot.LIBKNOT.knot_ctl_alloc
+ KnotCtl.ALLOC.restype = ctypes.c_void_p
+
+ KnotCtl.FREE = libknot.Knot.LIBKNOT.knot_ctl_free
+ KnotCtl.FREE.argtypes = [ctypes.c_void_p]
+
+ KnotCtl.SET_TIMEOUT = libknot.Knot.LIBKNOT.knot_ctl_set_timeout
+ KnotCtl.SET_TIMEOUT.argtypes = [ctypes.c_void_p, ctypes.c_int]
+
+ KnotCtl.CONNECT = libknot.Knot.LIBKNOT.knot_ctl_connect
+ KnotCtl.CONNECT.restype = ctypes.c_int
+ KnotCtl.CONNECT.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
+
+ KnotCtl.CLOSE = libknot.Knot.LIBKNOT.knot_ctl_close
+ KnotCtl.CLOSE.argtypes = [ctypes.c_void_p]
+
+ KnotCtl.SEND = libknot.Knot.LIBKNOT.knot_ctl_send
+ KnotCtl.SEND.restype = ctypes.c_int
+ KnotCtl.SEND.argtypes = [ctypes.c_void_p, ctypes.c_uint, ctypes.c_void_p]
+
+ KnotCtl.RECEIVE = libknot.Knot.LIBKNOT.knot_ctl_receive
+ KnotCtl.RECEIVE.restype = ctypes.c_int
+ KnotCtl.RECEIVE.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
+
+ self.obj = KnotCtl.ALLOC()
+
+ def __del__(self) -> None:
+ """Deallocates control interface instance."""
+
+ KnotCtl.FREE(self.obj)
+
+ def set_timeout(self, timeout: int) -> None:
+ """Sets control socket operations timeout in seconds."""
+
+ KnotCtl.SET_TIMEOUT(self.obj, timeout * 1000)
+
+ def connect(self, path: str) -> None:
+ """Connect to a specified control UNIX socket."""
+
+ ret = KnotCtl.CONNECT(self.obj, path.encode())
+ if ret != 0:
+ err = libknot.Knot.STRERROR(ret)
+ raise KnotCtlErrorConnect(err.decode())
+
+ def close(self) -> None:
+ """Disconnects from the current control socket."""
+
+ KnotCtl.CLOSE(self.obj)
+
+ def send(self, data_type: KnotCtlType, data: KnotCtlData = None) -> None:
+ """Sends a data unit to the connected control socket."""
+
+ ret = KnotCtl.SEND(self.obj, data_type,
+ data.data if data else ctypes.c_char_p())
+ if ret != 0:
+ err = libknot.Knot.STRERROR(ret)
+ raise KnotCtlErrorSend(err.decode())
+
+ def receive(self, data: KnotCtlData = None) -> KnotCtlType:
+ """Receives a data unit from the connected control socket."""
+
+ data_type = ctypes.c_uint()
+ ret = KnotCtl.RECEIVE(self.obj, ctypes.byref(data_type),
+ data.data if data else ctypes.c_char_p())
+ if ret != 0:
+ err = libknot.Knot.STRERROR(ret)
+ raise KnotCtlErrorReceive(err.decode())
+ return KnotCtlType(data_type.value)
+
+ def send_block(self, cmd: str, section: str = None, item: str = None,
+ identifier: str = None, zone: str = None, owner: str = None,
+ ttl: str = None, rtype: str = None, data: str = None,
+ flags: str = None, filters: str = None) -> None:
+ """Sends a control query block."""
+
+ query = KnotCtlData()
+ query[KnotCtlDataIdx.COMMAND] = cmd
+ query[KnotCtlDataIdx.SECTION] = section
+ query[KnotCtlDataIdx.ITEM] = item
+ query[KnotCtlDataIdx.ID] = identifier
+ query[KnotCtlDataIdx.ZONE] = zone
+ query[KnotCtlDataIdx.OWNER] = owner
+ query[KnotCtlDataIdx.TTL] = ttl
+ query[KnotCtlDataIdx.TYPE] = rtype
+ query[KnotCtlDataIdx.DATA] = data
+ query[KnotCtlDataIdx.FLAGS] = flags
+ query[KnotCtlDataIdx.FILTER] = filters
+
+ self.send(KnotCtlType.DATA, query)
+ self.send(KnotCtlType.BLOCK)
+
+ def _receive_conf(self, out, reply):
+
+ section = reply[KnotCtlDataIdx.SECTION]
+ ident = reply[KnotCtlDataIdx.ID]
+ item = reply[KnotCtlDataIdx.ITEM]
+ data = reply[KnotCtlDataIdx.DATA]
+
+ # Add the section if not exists.
+ if section not in out:
+ out[section] = dict()
+
+ # Add the identifier if not exists.
+ if ident and ident not in out[section]:
+ out[section][ident] = dict()
+
+ # Return if no item/value.
+ if not item:
+ return
+
+ item_level = out[section][ident] if ident else out[section]
+
+ # Treat alone identifier item differently.
+ if item in ["id", "domain", "target"]:
+ if data not in out[section]:
+ out[section][data] = dict()
+ else:
+ if item not in item_level:
+ item_level[item] = list()
+
+ if data:
+ item_level[item].append(data)
+
+ def _receive_zone_status(self, out, reply):
+
+ zone = reply[KnotCtlDataIdx.ZONE]
+ rtype = reply[KnotCtlDataIdx.TYPE]
+ data = reply[KnotCtlDataIdx.DATA]
+
+ # Add the zone if not exists.
+ if zone not in out:
+ out[zone] = dict()
+
+ out[zone][rtype] = data
+
+ def _receive_zone(self, out, reply):
+
+ zone = reply[KnotCtlDataIdx.ZONE]
+ owner = reply[KnotCtlDataIdx.OWNER]
+ ttl = reply[KnotCtlDataIdx.TTL]
+ rtype = reply[KnotCtlDataIdx.TYPE]
+ data = reply[KnotCtlDataIdx.DATA]
+
+ # Add the zone if not exists.
+ if zone not in out:
+ out[zone] = dict()
+
+ if owner not in out[zone]:
+ out[zone][owner] = dict()
+
+ if rtype not in out[zone][owner]:
+ out[zone][owner][rtype] = dict()
+
+ # Add the key/value.
+ out[zone][owner][rtype]["ttl"] = ttl
+
+ if not "data" in out[zone][owner][rtype]:
+ out[zone][owner][rtype]["data"] = [data]
+ else:
+ out[zone][owner][rtype]["data"].append(data)
+
+ def _receive_stats(self, out, reply):
+
+ zone = reply[KnotCtlDataIdx.ZONE]
+ section = reply[KnotCtlDataIdx.SECTION]
+ item = reply[KnotCtlDataIdx.ITEM]
+ idx = reply[KnotCtlDataIdx.ID]
+ data = int(reply[KnotCtlDataIdx.DATA])
+
+ # Add the zone if not exists.
+ if zone:
+ if "zone" not in out:
+ out["zone"] = dict()
+
+ if zone not in out["zone"]:
+ out["zone"][zone] = dict()
+
+ section_level = out["zone"][zone] if zone else out
+
+ if section not in section_level:
+ section_level[section] = dict()
+
+ if idx:
+ if item not in section_level[section]:
+ section_level[section][item] = dict()
+
+ section_level[section][item][idx] = data
+ else:
+ section_level[section][item] = data
+
+ def receive_stats(self) -> dict:
+ """Receives statistics answer and returns it as a structured dictionary."""
+
+ out = dict()
+ err_reply = None
+
+ while True:
+ reply = KnotCtlData()
+ reply_type = self.receive(reply)
+
+ # Stop if not data type.
+ if reply_type not in [KnotCtlType.DATA, KnotCtlType.EXTRA]:
+ break
+
+ # Check for an error.
+ if reply[KnotCtlDataIdx.ERROR]:
+ err_reply = reply
+ continue
+
+ self._receive_stats(out, reply)
+
+ if err_reply:
+ raise KnotCtlErrorRemote(err_reply[KnotCtlDataIdx.ERROR], err_reply)
+
+ return out
+
+ def receive_block(self) -> dict:
+ """Receives a control answer and returns it as a structured dictionary."""
+
+ out = dict()
+ err_reply = None
+
+ while True:
+ reply = KnotCtlData()
+ reply_type = self.receive(reply)
+
+ # Stop if not data type.
+ if reply_type not in [KnotCtlType.DATA, KnotCtlType.EXTRA]:
+ break
+
+ # Check for an error.
+ if reply[KnotCtlDataIdx.ERROR]:
+ err_reply = reply
+ continue
+
+ # Check for config data.
+ if reply[KnotCtlDataIdx.SECTION]:
+ self._receive_conf(out, reply)
+ # Check for zone data.
+ elif reply[KnotCtlDataIdx.ZONE]:
+ if reply[KnotCtlDataIdx.OWNER]:
+ self._receive_zone(out, reply)
+ else:
+ self._receive_zone_status(out, reply)
+ else:
+ continue
+
+ if err_reply:
+ raise KnotCtlErrorRemote(err_reply[KnotCtlDataIdx.ERROR], err_reply)
+
+ return out