diff options
Diffstat (limited to 'python/libknot')
-rw-r--r-- | python/libknot/__init__.py.in | 3 | ||||
-rw-r--r-- | python/libknot/control.py | 441 |
2 files changed, 444 insertions, 0 deletions
diff --git a/python/libknot/__init__.py.in b/python/libknot/__init__.py.in new file mode 100644 index 0000000..5984340 --- /dev/null +++ b/python/libknot/__init__.py.in @@ -0,0 +1,3 @@ +"""Python libknot interface.""" + +LIBKNOT_VERSION = "@libknot_SOVERSION@" diff --git a/python/libknot/control.py b/python/libknot/control.py new file mode 100644 index 0000000..ced1be6 --- /dev/null +++ b/python/libknot/control.py @@ -0,0 +1,441 @@ +"""Libknot server control interface wrapper. + +Example: + import json + from libknot.control import * + + #load_lib("/usr/lib/libknot.so") + + ctl = KnotCtl() + ctl.connect("/var/run/knot/knot.sock") + + try: + ctl.send_block(cmd="conf-begin") + resp = ctl.receive_block() + + ctl.send_block(cmd="conf-set", section="zone", item="domain", data="test") + resp = ctl.receive_block() + + ctl.send_block(cmd="conf-commit") + resp = ctl.receive_block() + + ctl.send_block(cmd="conf-read", section="zone", item="domain") + resp = ctl.receive_block() + print(json.dumps(resp, indent=4)) + finally: + ctl.send(KnotCtlType.END) + ctl.close() +""" + +from ctypes import cdll, c_void_p, c_int, c_char_p, c_uint, byref +from enum import IntEnum +import sys + +CTL_ALLOC = None +CTL_FREE = None +CTL_SET_TIMEOUT = None +CTL_CONNECT = None +CTL_CLOSE = None +CTL_SEND = None +CTL_RECEIVE = None +CTL_ERROR = None + + +def load_lib(path=None): + """Loads the libknot library.""" + + if path is None: + version = "" + try: + from libknot import LIBKNOT_VERSION + version = ".%u" % int(LIBKNOT_VERSION) + except: + pass + + if sys.platform == "darwin": + path = "libknot%s.dylib" % version + else: + path = "libknot.so%s" % version + LIB = cdll.LoadLibrary(path) + + global CTL_ALLOC + CTL_ALLOC = LIB.knot_ctl_alloc + CTL_ALLOC.restype = c_void_p + + global CTL_FREE + CTL_FREE = LIB.knot_ctl_free + CTL_FREE.argtypes = [c_void_p] + + global CTL_SET_TIMEOUT + CTL_SET_TIMEOUT = LIB.knot_ctl_set_timeout + CTL_SET_TIMEOUT.argtypes = [c_void_p, c_int] + + global CTL_CONNECT + CTL_CONNECT = LIB.knot_ctl_connect + CTL_CONNECT.restype = c_int + CTL_CONNECT.argtypes = [c_void_p, c_char_p] + + global CTL_CLOSE + CTL_CLOSE = LIB.knot_ctl_close + CTL_CLOSE.argtypes = [c_void_p] + + global CTL_SEND + CTL_SEND = LIB.knot_ctl_send + CTL_SEND.restype = c_int + CTL_SEND.argtypes = [c_void_p, c_uint, c_void_p] + + global CTL_RECEIVE + CTL_RECEIVE = LIB.knot_ctl_receive + CTL_RECEIVE.restype = c_int + CTL_RECEIVE.argtypes = [c_void_p, c_void_p, c_void_p] + + global CTL_ERROR + CTL_ERROR = LIB.knot_strerror + CTL_ERROR.restype = c_char_p + CTL_ERROR.argtypes = [c_int] + + +class KnotCtlError(Exception): + """Libknot server control error.""" + + def __init__(self, message, data=None): + """ + @type message: str + @type data: KnotCtlData + """ + + self.message = message + self.data = data + + def __str__(self): + return "%s (data: %s)" % (self.message, self.data) + + +class KnotCtlType(IntEnum): + """Libknot server control data unit types.""" + + END = 0 + DATA = 1 + EXTRA = 2 + BLOCK = 3 + + +class KnotCtlDataIdx(IntEnum): + """Libknot server control data unit indices.""" + + COMMAND = 0 + FLAGS = 1 + ERROR = 2 + SECTION = 3 + ITEM = 4 + ID = 5 + ZONE = 6 + OWNER = 7 + TTL = 8 + TYPE = 9 + DATA = 10 + FILTER = 11 + + +class KnotCtlData(object): + """Libknot server control data unit.""" + + DataArray = c_char_p * len(KnotCtlDataIdx) + + def __init__(self): + self.data = self.DataArray() + + def __str__(self): + string = str() + + for idx in KnotCtlDataIdx: + if self.data[idx]: + if string: + string += ", " + string += "%s = %s" % (idx.name, self.data[idx]) + + return string + + def __getitem__(self, index): + """Data unit item getter. + + @type index: KnotCtlDataIdx + @rtype: str + """ + + value = self.data[index] + if not value: + value = str() + return value if isinstance(value, str) else value.decode() + + def __setitem__(self, index, value): + """Data unit item setter. + + @type index: KnotCtlDataIdx + @type value: str + """ + + self.data[index] = c_char_p(value.encode()) if value else c_char_p() + +class KnotCtl(object): + """Libknot server control interface.""" + + def __init__(self): + if not CTL_ALLOC: + load_lib() + self.obj = CTL_ALLOC() + + def __del__(self): + CTL_FREE(self.obj) + + def set_timeout(self, timeout): + """Sets control socket operations timeout in seconds. + + @type timeout: int + """ + + CTL_SET_TIMEOUT(self.obj, timeout * 1000) + + def connect(self, path): + """Connect to a specified control UNIX socket. + + @type path: str + """ + + ret = CTL_CONNECT(self.obj, path.encode()) + if ret != 0: + err = CTL_ERROR(ret) + raise KnotCtlError(err if isinstance(err, str) else err.decode()) + + def close(self): + """Disconnects from the current control socket.""" + + CTL_CLOSE(self.obj) + + def send(self, data_type, data=None): + """Sends a data unit to the connected control socket. + + @type data_type: KnotCtlType + @type data: KnotCtlData + """ + + ret = CTL_SEND(self.obj, data_type, + data.data if data else c_char_p()) + if ret != 0: + err = CTL_ERROR(ret) + raise KnotCtlError(err if isinstance(err, str) else err.decode()) + + def receive(self, data=None): + """Receives a data unit from the connected control socket. + + @type data: KnotCtlData + @rtype: KnotCtlType + """ + + data_type = c_uint() + ret = CTL_RECEIVE(self.obj, byref(data_type), + data.data if data else c_char_p()) + if ret != 0: + err = CTL_ERROR(ret) + raise KnotCtlError(err if isinstance(err, str) else err.decode()) + return KnotCtlType(data_type.value) + + def send_block(self, cmd, section=None, item=None, identifier=None, zone=None, + owner=None, ttl=None, rtype=None, data=None, flags=None, + filter=None): + """Sends a control query block. + + @type cmd: str + @type section: str + @type item: str + @type identifier: str + @type zone: str + @type owner: str + @type ttl: str + @type rtype: str + @type data: str + @type filter: str + """ + + query = KnotCtlData() + query[KnotCtlDataIdx.COMMAND] = cmd + query[KnotCtlDataIdx.SECTION] = section + query[KnotCtlDataIdx.ITEM] = item + query[KnotCtlDataIdx.ID] = identifier + query[KnotCtlDataIdx.ZONE] = zone + query[KnotCtlDataIdx.OWNER] = owner + query[KnotCtlDataIdx.TTL] = ttl + query[KnotCtlDataIdx.TYPE] = rtype + query[KnotCtlDataIdx.DATA] = data + query[KnotCtlDataIdx.FLAGS] = flags + query[KnotCtlDataIdx.FILTER] = filter + + self.send(KnotCtlType.DATA, query) + self.send(KnotCtlType.BLOCK) + + def _receive_conf(self, out, reply): + + section = reply[KnotCtlDataIdx.SECTION] + ident = reply[KnotCtlDataIdx.ID] + item = reply[KnotCtlDataIdx.ITEM] + data = reply[KnotCtlDataIdx.DATA] + + # Add the section if not exists. + if section not in out: + out[section] = dict() + + # Add the identifier if not exists. + if ident and ident not in out[section]: + out[section][ident] = dict() + + # Return if no item/value. + if not item: + return + + item_level = out[section][ident] if ident else out[section] + + # Treat alone identifier item differently. + if item in ["id", "domain", "target"]: + if data not in out[section]: + out[section][data] = dict() + else: + if item not in item_level: + item_level[item] = list() + + if data: + item_level[item].append(data) + + def _receive_zone_status(self, out, reply): + + zone = reply[KnotCtlDataIdx.ZONE] + rtype = reply[KnotCtlDataIdx.TYPE] + data = reply[KnotCtlDataIdx.DATA] + + # Add the zone if not exists. + if zone not in out: + out[zone] = dict() + + out[zone][rtype] = data + + def _receive_zone(self, out, reply): + + zone = reply[KnotCtlDataIdx.ZONE] + owner = reply[KnotCtlDataIdx.OWNER] + ttl = reply[KnotCtlDataIdx.TTL] + rtype = reply[KnotCtlDataIdx.TYPE] + data = reply[KnotCtlDataIdx.DATA] + + # Add the zone if not exists. + if zone not in out: + out[zone] = dict() + + if owner not in out[zone]: + out[zone][owner] = dict() + + if rtype not in out[zone][owner]: + out[zone][owner][rtype] = dict() + + # Add the key/value. + out[zone][owner][rtype]["ttl"] = ttl + + if not "data" in out[zone][owner][rtype]: + out[zone][owner][rtype]["data"] = [data] + else: + out[zone][owner][rtype]["data"].append(data) + + def _receive_stats(self, out, reply): + + zone = reply[KnotCtlDataIdx.ZONE] + section = reply[KnotCtlDataIdx.SECTION] + item = reply[KnotCtlDataIdx.ITEM] + idx = reply[KnotCtlDataIdx.ID] + data = int(reply[KnotCtlDataIdx.DATA]) + + # Add the zone if not exists. + if zone: + if "zone" not in out: + out["zone"] = dict() + + if zone not in out["zone"]: + out["zone"][zone] = dict() + + section_level = out["zone"][zone] if zone else out + + if section not in section_level: + section_level[section] = dict() + + if idx: + if item not in section_level[section]: + section_level[section][item] = dict() + + section_level[section][item][idx] = data + else: + section_level[section][item] = data + + def receive_stats(self): + """Receives statistics answer and returns it as a structured dictionary. + + @rtype: dict + """ + + out = dict() + err_reply = None + + while True: + reply = KnotCtlData() + reply_type = self.receive(reply) + + # Stop if not data type. + if reply_type not in [KnotCtlType.DATA, KnotCtlType.EXTRA]: + break + + # Check for an error. + if reply[KnotCtlDataIdx.ERROR]: + err_reply = reply + continue + + self._receive_stats(out, reply) + + if err_reply: + raise KnotCtlError(err_reply[KnotCtlDataIdx.ERROR], err_reply) + + return out + + def receive_block(self): + """Receives a control answer and returns it as a structured dictionary. + + @rtype: dict + """ + + out = dict() + err_reply = None + + while True: + reply = KnotCtlData() + reply_type = self.receive(reply) + + # Stop if not data type. + if reply_type not in [KnotCtlType.DATA, KnotCtlType.EXTRA]: + break + + # Check for an error. + if reply[KnotCtlDataIdx.ERROR]: + err_reply = reply + continue + + # Check for config data. + if reply[KnotCtlDataIdx.SECTION]: + self._receive_conf(out, reply) + # Check for zone data. + elif reply[KnotCtlDataIdx.ZONE]: + if reply[KnotCtlDataIdx.OWNER]: + self._receive_zone(out, reply) + else: + self._receive_zone_status(out, reply) + else: + continue + + if err_reply: + raise KnotCtlError(err_reply[KnotCtlDataIdx.ERROR], err_reply) + + return out |