summaryrefslogtreecommitdiffstats
path: root/python/libknot/control.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/libknot/control.py')
-rw-r--r--python/libknot/control.py441
1 files changed, 441 insertions, 0 deletions
diff --git a/python/libknot/control.py b/python/libknot/control.py
new file mode 100644
index 0000000..ced1be6
--- /dev/null
+++ b/python/libknot/control.py
@@ -0,0 +1,441 @@
+"""Libknot server control interface wrapper.
+
+Example:
+ import json
+ from libknot.control import *
+
+ #load_lib("/usr/lib/libknot.so")
+
+ ctl = KnotCtl()
+ ctl.connect("/var/run/knot/knot.sock")
+
+ try:
+ ctl.send_block(cmd="conf-begin")
+ resp = ctl.receive_block()
+
+ ctl.send_block(cmd="conf-set", section="zone", item="domain", data="test")
+ resp = ctl.receive_block()
+
+ ctl.send_block(cmd="conf-commit")
+ resp = ctl.receive_block()
+
+ ctl.send_block(cmd="conf-read", section="zone", item="domain")
+ resp = ctl.receive_block()
+ print(json.dumps(resp, indent=4))
+ finally:
+ ctl.send(KnotCtlType.END)
+ ctl.close()
+"""
+
+from ctypes import cdll, c_void_p, c_int, c_char_p, c_uint, byref
+from enum import IntEnum
+import sys
+
+CTL_ALLOC = None
+CTL_FREE = None
+CTL_SET_TIMEOUT = None
+CTL_CONNECT = None
+CTL_CLOSE = None
+CTL_SEND = None
+CTL_RECEIVE = None
+CTL_ERROR = None
+
+
+def load_lib(path=None):
+ """Loads the libknot library."""
+
+ if path is None:
+ version = ""
+ try:
+ from libknot import LIBKNOT_VERSION
+ version = ".%u" % int(LIBKNOT_VERSION)
+ except:
+ pass
+
+ if sys.platform == "darwin":
+ path = "libknot%s.dylib" % version
+ else:
+ path = "libknot.so%s" % version
+ LIB = cdll.LoadLibrary(path)
+
+ global CTL_ALLOC
+ CTL_ALLOC = LIB.knot_ctl_alloc
+ CTL_ALLOC.restype = c_void_p
+
+ global CTL_FREE
+ CTL_FREE = LIB.knot_ctl_free
+ CTL_FREE.argtypes = [c_void_p]
+
+ global CTL_SET_TIMEOUT
+ CTL_SET_TIMEOUT = LIB.knot_ctl_set_timeout
+ CTL_SET_TIMEOUT.argtypes = [c_void_p, c_int]
+
+ global CTL_CONNECT
+ CTL_CONNECT = LIB.knot_ctl_connect
+ CTL_CONNECT.restype = c_int
+ CTL_CONNECT.argtypes = [c_void_p, c_char_p]
+
+ global CTL_CLOSE
+ CTL_CLOSE = LIB.knot_ctl_close
+ CTL_CLOSE.argtypes = [c_void_p]
+
+ global CTL_SEND
+ CTL_SEND = LIB.knot_ctl_send
+ CTL_SEND.restype = c_int
+ CTL_SEND.argtypes = [c_void_p, c_uint, c_void_p]
+
+ global CTL_RECEIVE
+ CTL_RECEIVE = LIB.knot_ctl_receive
+ CTL_RECEIVE.restype = c_int
+ CTL_RECEIVE.argtypes = [c_void_p, c_void_p, c_void_p]
+
+ global CTL_ERROR
+ CTL_ERROR = LIB.knot_strerror
+ CTL_ERROR.restype = c_char_p
+ CTL_ERROR.argtypes = [c_int]
+
+
+class KnotCtlError(Exception):
+ """Libknot server control error."""
+
+ def __init__(self, message, data=None):
+ """
+ @type message: str
+ @type data: KnotCtlData
+ """
+
+ self.message = message
+ self.data = data
+
+ def __str__(self):
+ return "%s (data: %s)" % (self.message, self.data)
+
+
+class KnotCtlType(IntEnum):
+ """Libknot server control data unit types."""
+
+ END = 0
+ DATA = 1
+ EXTRA = 2
+ BLOCK = 3
+
+
+class KnotCtlDataIdx(IntEnum):
+ """Libknot server control data unit indices."""
+
+ COMMAND = 0
+ FLAGS = 1
+ ERROR = 2
+ SECTION = 3
+ ITEM = 4
+ ID = 5
+ ZONE = 6
+ OWNER = 7
+ TTL = 8
+ TYPE = 9
+ DATA = 10
+ FILTER = 11
+
+
+class KnotCtlData(object):
+ """Libknot server control data unit."""
+
+ DataArray = c_char_p * len(KnotCtlDataIdx)
+
+ def __init__(self):
+ self.data = self.DataArray()
+
+ def __str__(self):
+ string = str()
+
+ for idx in KnotCtlDataIdx:
+ if self.data[idx]:
+ if string:
+ string += ", "
+ string += "%s = %s" % (idx.name, self.data[idx])
+
+ return string
+
+ def __getitem__(self, index):
+ """Data unit item getter.
+
+ @type index: KnotCtlDataIdx
+ @rtype: str
+ """
+
+ value = self.data[index]
+ if not value:
+ value = str()
+ return value if isinstance(value, str) else value.decode()
+
+ def __setitem__(self, index, value):
+ """Data unit item setter.
+
+ @type index: KnotCtlDataIdx
+ @type value: str
+ """
+
+ self.data[index] = c_char_p(value.encode()) if value else c_char_p()
+
+class KnotCtl(object):
+ """Libknot server control interface."""
+
+ def __init__(self):
+ if not CTL_ALLOC:
+ load_lib()
+ self.obj = CTL_ALLOC()
+
+ def __del__(self):
+ CTL_FREE(self.obj)
+
+ def set_timeout(self, timeout):
+ """Sets control socket operations timeout in seconds.
+
+ @type timeout: int
+ """
+
+ CTL_SET_TIMEOUT(self.obj, timeout * 1000)
+
+ def connect(self, path):
+ """Connect to a specified control UNIX socket.
+
+ @type path: str
+ """
+
+ ret = CTL_CONNECT(self.obj, path.encode())
+ if ret != 0:
+ err = CTL_ERROR(ret)
+ raise KnotCtlError(err if isinstance(err, str) else err.decode())
+
+ def close(self):
+ """Disconnects from the current control socket."""
+
+ CTL_CLOSE(self.obj)
+
+ def send(self, data_type, data=None):
+ """Sends a data unit to the connected control socket.
+
+ @type data_type: KnotCtlType
+ @type data: KnotCtlData
+ """
+
+ ret = CTL_SEND(self.obj, data_type,
+ data.data if data else c_char_p())
+ if ret != 0:
+ err = CTL_ERROR(ret)
+ raise KnotCtlError(err if isinstance(err, str) else err.decode())
+
+ def receive(self, data=None):
+ """Receives a data unit from the connected control socket.
+
+ @type data: KnotCtlData
+ @rtype: KnotCtlType
+ """
+
+ data_type = c_uint()
+ ret = CTL_RECEIVE(self.obj, byref(data_type),
+ data.data if data else c_char_p())
+ if ret != 0:
+ err = CTL_ERROR(ret)
+ raise KnotCtlError(err if isinstance(err, str) else err.decode())
+ return KnotCtlType(data_type.value)
+
+ def send_block(self, cmd, section=None, item=None, identifier=None, zone=None,
+ owner=None, ttl=None, rtype=None, data=None, flags=None,
+ filter=None):
+ """Sends a control query block.
+
+ @type cmd: str
+ @type section: str
+ @type item: str
+ @type identifier: str
+ @type zone: str
+ @type owner: str
+ @type ttl: str
+ @type rtype: str
+ @type data: str
+ @type filter: str
+ """
+
+ query = KnotCtlData()
+ query[KnotCtlDataIdx.COMMAND] = cmd
+ query[KnotCtlDataIdx.SECTION] = section
+ query[KnotCtlDataIdx.ITEM] = item
+ query[KnotCtlDataIdx.ID] = identifier
+ query[KnotCtlDataIdx.ZONE] = zone
+ query[KnotCtlDataIdx.OWNER] = owner
+ query[KnotCtlDataIdx.TTL] = ttl
+ query[KnotCtlDataIdx.TYPE] = rtype
+ query[KnotCtlDataIdx.DATA] = data
+ query[KnotCtlDataIdx.FLAGS] = flags
+ query[KnotCtlDataIdx.FILTER] = filter
+
+ self.send(KnotCtlType.DATA, query)
+ self.send(KnotCtlType.BLOCK)
+
+ def _receive_conf(self, out, reply):
+
+ section = reply[KnotCtlDataIdx.SECTION]
+ ident = reply[KnotCtlDataIdx.ID]
+ item = reply[KnotCtlDataIdx.ITEM]
+ data = reply[KnotCtlDataIdx.DATA]
+
+ # Add the section if not exists.
+ if section not in out:
+ out[section] = dict()
+
+ # Add the identifier if not exists.
+ if ident and ident not in out[section]:
+ out[section][ident] = dict()
+
+ # Return if no item/value.
+ if not item:
+ return
+
+ item_level = out[section][ident] if ident else out[section]
+
+ # Treat alone identifier item differently.
+ if item in ["id", "domain", "target"]:
+ if data not in out[section]:
+ out[section][data] = dict()
+ else:
+ if item not in item_level:
+ item_level[item] = list()
+
+ if data:
+ item_level[item].append(data)
+
+ def _receive_zone_status(self, out, reply):
+
+ zone = reply[KnotCtlDataIdx.ZONE]
+ rtype = reply[KnotCtlDataIdx.TYPE]
+ data = reply[KnotCtlDataIdx.DATA]
+
+ # Add the zone if not exists.
+ if zone not in out:
+ out[zone] = dict()
+
+ out[zone][rtype] = data
+
+ def _receive_zone(self, out, reply):
+
+ zone = reply[KnotCtlDataIdx.ZONE]
+ owner = reply[KnotCtlDataIdx.OWNER]
+ ttl = reply[KnotCtlDataIdx.TTL]
+ rtype = reply[KnotCtlDataIdx.TYPE]
+ data = reply[KnotCtlDataIdx.DATA]
+
+ # Add the zone if not exists.
+ if zone not in out:
+ out[zone] = dict()
+
+ if owner not in out[zone]:
+ out[zone][owner] = dict()
+
+ if rtype not in out[zone][owner]:
+ out[zone][owner][rtype] = dict()
+
+ # Add the key/value.
+ out[zone][owner][rtype]["ttl"] = ttl
+
+ if not "data" in out[zone][owner][rtype]:
+ out[zone][owner][rtype]["data"] = [data]
+ else:
+ out[zone][owner][rtype]["data"].append(data)
+
+ def _receive_stats(self, out, reply):
+
+ zone = reply[KnotCtlDataIdx.ZONE]
+ section = reply[KnotCtlDataIdx.SECTION]
+ item = reply[KnotCtlDataIdx.ITEM]
+ idx = reply[KnotCtlDataIdx.ID]
+ data = int(reply[KnotCtlDataIdx.DATA])
+
+ # Add the zone if not exists.
+ if zone:
+ if "zone" not in out:
+ out["zone"] = dict()
+
+ if zone not in out["zone"]:
+ out["zone"][zone] = dict()
+
+ section_level = out["zone"][zone] if zone else out
+
+ if section not in section_level:
+ section_level[section] = dict()
+
+ if idx:
+ if item not in section_level[section]:
+ section_level[section][item] = dict()
+
+ section_level[section][item][idx] = data
+ else:
+ section_level[section][item] = data
+
+ def receive_stats(self):
+ """Receives statistics answer and returns it as a structured dictionary.
+
+ @rtype: dict
+ """
+
+ out = dict()
+ err_reply = None
+
+ while True:
+ reply = KnotCtlData()
+ reply_type = self.receive(reply)
+
+ # Stop if not data type.
+ if reply_type not in [KnotCtlType.DATA, KnotCtlType.EXTRA]:
+ break
+
+ # Check for an error.
+ if reply[KnotCtlDataIdx.ERROR]:
+ err_reply = reply
+ continue
+
+ self._receive_stats(out, reply)
+
+ if err_reply:
+ raise KnotCtlError(err_reply[KnotCtlDataIdx.ERROR], err_reply)
+
+ return out
+
+ def receive_block(self):
+ """Receives a control answer and returns it as a structured dictionary.
+
+ @rtype: dict
+ """
+
+ out = dict()
+ err_reply = None
+
+ while True:
+ reply = KnotCtlData()
+ reply_type = self.receive(reply)
+
+ # Stop if not data type.
+ if reply_type not in [KnotCtlType.DATA, KnotCtlType.EXTRA]:
+ break
+
+ # Check for an error.
+ if reply[KnotCtlDataIdx.ERROR]:
+ err_reply = reply
+ continue
+
+ # Check for config data.
+ if reply[KnotCtlDataIdx.SECTION]:
+ self._receive_conf(out, reply)
+ # Check for zone data.
+ elif reply[KnotCtlDataIdx.ZONE]:
+ if reply[KnotCtlDataIdx.OWNER]:
+ self._receive_zone(out, reply)
+ else:
+ self._receive_zone_status(out, reply)
+ else:
+ continue
+
+ if err_reply:
+ raise KnotCtlError(err_reply[KnotCtlDataIdx.ERROR], err_reply)
+
+ return out