summaryrefslogtreecommitdiffstats
path: root/dbparse.py
diff options
context:
space:
mode:
Diffstat (limited to 'dbparse.py')
-rwxr-xr-xdbparse.py515
1 files changed, 515 insertions, 0 deletions
diff --git a/dbparse.py b/dbparse.py
new file mode 100755
index 0000000..5f7e082
--- /dev/null
+++ b/dbparse.py
@@ -0,0 +1,515 @@
+#!/usr/bin/env python
+
+from functools import total_ordering
+import sys, math
+from math import ceil, log
+from collections import defaultdict, OrderedDict
+
+# must match <linux/nl80211.h> enum nl80211_reg_rule_flags
+
+flag_definitions = {
+ 'NO-OFDM': 1<<0,
+ 'NO-CCK': 1<<1,
+ 'NO-INDOOR': 1<<2,
+ 'NO-OUTDOOR': 1<<3,
+ 'DFS': 1<<4,
+ 'PTP-ONLY': 1<<5,
+ 'PTMP-ONLY': 1<<6,
+ 'NO-IR': 1<<7,
+ # hole at bit 8
+ # hole at bit 9. FIXME: Where is NO-HT40 defined?
+ 'NO-HT40': 1<<10,
+ 'AUTO-BW': 1<<11,
+}
+
+dfs_regions = {
+ 'DFS-FCC': 1,
+ 'DFS-ETSI': 2,
+ 'DFS-JP': 3,
+}
+
+@total_ordering
+
+class WmmRule(object):
+
+ def __init__(self, vo_c, vi_c, be_c, bk_c, vo_ap, vi_ap, be_ap, bk_ap):
+ self.vo_c = vo_c
+ self.vi_c = vi_c
+ self.be_c = be_c
+ self.bk_c = bk_c
+ self.vo_ap = vo_ap
+ self.vi_ap = vi_ap
+ self.be_ap = be_ap
+ self.bk_ap = bk_ap
+
+ def _as_tuple(self):
+ return (self.vo_c, self.vi_c, self.be_c, self.bk_c,
+ self.vo_ap, self.vi_ap, self.be_ap, self.bk_ap)
+
+ def __eq__(self, other):
+ if other is None:
+ return False
+ return (self._as_tuple() == other._as_tuple())
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __lt__(self, other):
+ if other is None:
+ return False
+ return (self._as_tuple() < other._as_tuple())
+
+ def __hash__(self):
+ return hash(self._as_tuple())
+
+class FreqBand(object):
+ def __init__(self, start, end, bw, comments=None):
+ self.start = start
+ self.end = end
+ self.maxbw = bw
+ self.comments = comments or []
+
+ def _as_tuple(self):
+ return (self.start, self.end, self.maxbw)
+
+ def __eq__(self, other):
+ return (self._as_tuple() == other._as_tuple())
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __lt__(self, other):
+ return (self._as_tuple() < other._as_tuple())
+
+ def __hash__(self):
+ return hash(self._as_tuple())
+
+ def __str__(self):
+ return '<FreqBand %.3f - %.3f @ %.3f>' % (
+ self.start, self.end, self.maxbw)
+
+@total_ordering
+class PowerRestriction(object):
+ def __init__(self, max_ant_gain, max_eirp, comments = None):
+ self.max_ant_gain = max_ant_gain
+ self.max_eirp = max_eirp
+ self.comments = comments or []
+
+ def _as_tuple(self):
+ return (self.max_ant_gain, self.max_eirp)
+
+ def __eq__(self, other):
+ return (self._as_tuple() == other._as_tuple())
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __lt__(self, other):
+ return (self._as_tuple() < other._as_tuple())
+
+ def __hash__(self):
+ return hash(self._as_tuple())
+
+ def __str__(self):
+ return '<PowerRestriction ...>'
+
+class DFSRegionError(Exception):
+ def __init__(self, dfs_region):
+ self.dfs_region = dfs_region
+
+class FlagError(Exception):
+ def __init__(self, flag):
+ self.flag = flag
+
+@total_ordering
+class Permission(object):
+ def __init__(self, freqband, power, flags, wmmrule):
+ assert isinstance(freqband, FreqBand)
+ assert isinstance(power, PowerRestriction)
+ assert isinstance(wmmrule, WmmRule) or wmmrule is None
+ self.freqband = freqband
+ self.power = power
+ self.wmmrule = wmmrule
+ self.flags = 0
+ for flag in flags:
+ if not flag in flag_definitions:
+ raise FlagError(flag)
+ self.flags |= flag_definitions[flag]
+ self.textflags = flags
+
+ def _as_tuple(self):
+ return (self.freqband, self.power, self.flags, self.wmmrule)
+
+ def __eq__(self, other):
+ return (self._as_tuple() == other._as_tuple())
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __lt__(self, other):
+ return (self._as_tuple() < other._as_tuple())
+
+ def __hash__(self):
+ return hash(self._as_tuple())
+
+ def __str__(self):
+ return str(self.freqband) + str(self.power) + str(self.wmmrule)
+
+class Country(object):
+ def __init__(self, dfs_region, permissions=None, comments=None):
+ self._permissions = permissions or []
+ self.comments = comments or []
+ self.dfs_region = 0
+
+ if dfs_region:
+ if not dfs_region in dfs_regions:
+ raise DFSRegionError(dfs_region)
+ self.dfs_region = dfs_regions[dfs_region]
+
+ def add(self, perm):
+ assert isinstance(perm, Permission)
+ self._permissions.append(perm)
+ self._permissions.sort()
+
+ def __contains__(self, perm):
+ assert isinstance(perm, Permission)
+ return perm in self._permissions
+
+ def __str__(self):
+ r = ['(%s, %s)' % (str(b), str(p)) for b, p in self._permissions]
+ return '<Country (%s)>' % (', '.join(r))
+
+ def _get_permissions_tuple(self):
+ return tuple(self._permissions)
+ permissions = property(_get_permissions_tuple)
+
+class SyntaxError(Exception):
+ pass
+
+class DBParser(object):
+ def __init__(self, warn=None):
+ self._warn_callout = warn or sys.stderr.write
+
+ def _syntax_error(self, txt=None):
+ txt = txt and ' (%s)' % txt or ''
+ raise SyntaxError("Syntax error in line %d%s" % (self._lineno, txt))
+
+ def _warn(self, txt):
+ self._warn_callout("Warning (line %d): %s\n" % (self._lineno, txt))
+
+ def _parse_band_def(self, bname, banddef, dupwarn=True):
+ try:
+ freqs, bw = banddef.split('@')
+ bw = float(bw)
+ except ValueError:
+ bw = 20.0
+
+ try:
+ start, end = freqs.split('-')
+ start = float(start)
+ end = float(end)
+ # The kernel will reject these, so might as well reject this
+ # upon building it.
+ if start <= 0:
+ self._syntax_error("Invalid start freq (%d)" % start)
+ if end <= 0:
+ self._syntax_error("Invalid end freq (%d)" % end)
+ if start > end:
+ self._syntax_error("Inverted freq range (%d - %d)" % (start, end))
+ if start == end:
+ self._syntax_error("Start and end freqs are equal (%d)" % start)
+ except ValueError:
+ self._syntax_error("band must have frequency range")
+
+ b = FreqBand(start, end, bw, comments=self._comments)
+ self._comments = []
+ self._banddup[bname] = bname
+ if b in self._bandrev:
+ if dupwarn:
+ self._warn('Duplicate band definition ("%s" and "%s")' % (
+ bname, self._bandrev[b]))
+ self._banddup[bname] = self._bandrev[b]
+ self._bands[bname] = b
+ self._bandrev[b] = bname
+ self._bandline[bname] = self._lineno
+
+ def _parse_band(self, line):
+ try:
+ bname, line = line.split(':', 1)
+ if not bname:
+ self._syntax_error("'band' keyword must be followed by name")
+ except ValueError:
+ self._syntax_error("band name must be followed by colon")
+
+ if bname in flag_definitions:
+ self._syntax_error("Invalid band name")
+
+ self._parse_band_def(bname, line)
+
+ def _parse_power(self, line):
+ try:
+ pname, line = line.split(':', 1)
+ if not pname:
+ self._syntax_error("'power' keyword must be followed by name")
+ except ValueError:
+ self._syntax_error("power name must be followed by colon")
+
+ if pname in flag_definitions:
+ self._syntax_error("Invalid power name")
+
+ self._parse_power_def(pname, line)
+
+ def _parse_power_def(self, pname, line, dupwarn=True):
+ try:
+ max_eirp = line
+ if max_eirp == 'N/A':
+ max_eirp = '0'
+ max_ant_gain = float(0)
+ def conv_pwr(pwr):
+ if pwr.endswith('mW'):
+ pwr = float(pwr[:-2])
+ return 10.0 * math.log10(pwr)
+ else:
+ return float(pwr)
+ max_eirp = conv_pwr(max_eirp)
+ except ValueError:
+ self._syntax_error("invalid power data")
+
+ p = PowerRestriction(max_ant_gain, max_eirp,
+ comments=self._comments)
+ self._comments = []
+ self._powerdup[pname] = pname
+ if p in self._powerrev:
+ if dupwarn:
+ self._warn('Duplicate power definition ("%s" and "%s")' % (
+ pname, self._powerrev[p]))
+ self._powerdup[pname] = self._powerrev[p]
+ self._power[pname] = p
+ self._powerrev[p] = pname
+ self._powerline[pname] = self._lineno
+
+ def _parse_wmmrule(self, line):
+ regions = line[:-1].strip()
+ if not regions:
+ self._syntax_error("'wmmrule' keyword must be followed by region")
+
+ regions = regions.split(',')
+
+ self._current_regions = {}
+ for region in regions:
+ if region in self._wmm_rules:
+ self._warn("region %s was added already to wmm rules" % region)
+ self._current_regions[region] = 1
+ self._comments = []
+
+ def _validate_input(self, cw_min, cw_max, aifsn, cot):
+ if cw_min < 1:
+ self._syntax_error("Invalid cw_min value (%d)" % cw_min)
+ if cw_max < 1:
+ self._syntax_error("Invalid cw_max value (%d)" % cw_max)
+ if cw_min > cw_max:
+ self._syntax_error("Inverted contention window (%d - %d)" %
+ (cw_min, cw_max))
+ if not (bin(cw_min + 1).count('1') == 1 and cw_min < 2**15):
+ self._syntax_error("Invalid cw_min value should be power of 2 - 1 (%d)"
+ % cw_min)
+ if not (bin(cw_max + 1).count('1') == 1 and cw_max < 2**15):
+ self._syntax_error("Invalid cw_max value should be power of 2 - 1 (%d)"
+ % cw_max)
+ if aifsn < 1:
+ self._syntax_error("Invalid aifsn value (%d)" % aifsn)
+ if cot < 0:
+ self._syntax_error("Invalid cot value (%d)" % cot)
+
+
+ def _validate_size(self, var, bytcnt):
+ return bytcnt < ceil(len(bin(var)[2:]) / 8.0)
+
+ def _parse_wmmrule_item(self, line):
+ bytcnt = (2.0, 2.0, 1.0, 2.0)
+ try:
+ ac, cval = line.split(':')
+ if not ac:
+ self._syntax_error("wmm item must have ac prefix")
+ except ValueError:
+ self._syntax_error("access category must be followed by colon")
+ p = tuple([int(v.split('=', 1)[1]) for v in cval.split(',')])
+ self._validate_input(*p)
+ for v, b in zip(p, bytcnt):
+ if self._validate_size(v, b):
+ self._syntax_error("unexpected input size expect %d got %d"
+ % (b, v))
+
+ for r in self._current_regions:
+ self._wmm_rules[r][ac] = p
+
+ def _parse_country(self, line):
+ try:
+ cname, cvals= line.split(':', 1)
+ dfs_region = cvals.strip()
+ if not cname:
+ self._syntax_error("'country' keyword must be followed by name")
+ except ValueError:
+ self._syntax_error("country name must be followed by colon")
+
+ cnames = cname.split(',')
+
+ self._current_countries = {}
+ for cname in cnames:
+ if len(cname) != 2:
+ self._warn("country '%s' not alpha2" % cname)
+ cname = cname.encode('ascii')
+ if not cname in self._countries:
+ self._countries[cname] = Country(dfs_region, comments=self._comments)
+ self._current_countries[cname] = self._countries[cname]
+ self._comments = []
+
+ def _parse_country_item(self, line):
+ if line[0] == '(':
+ try:
+ band, line = line[1:].split('),', 1)
+ bname = 'UNNAMED %d' % self._lineno
+ self._parse_band_def(bname, band, dupwarn=False)
+ except:
+ self._syntax_error("Badly parenthesised band definition")
+ else:
+ try:
+ bname, line = line.split(',', 1)
+ if not bname:
+ self._syntax_error("country definition must have band")
+ if not line:
+ self._syntax_error("country definition must have power")
+ except ValueError:
+ self._syntax_error("country definition must have band and power")
+
+ if line[0] == '(':
+ items = line.split('),', 1)
+ if len(items) == 1:
+ pname = items[0]
+ line = ''
+ if not pname[-1] == ')':
+ self._syntax_error("Badly parenthesised power definition")
+ pname = pname[:-1]
+ flags = []
+ else:
+ pname = items[0]
+ flags = items[1].split(',')
+ power = pname[1:]
+ pname = 'UNNAMED %d' % self._lineno
+ self._parse_power_def(pname, power, dupwarn=False)
+ else:
+ line = line.split(',')
+ pname = line[0]
+ flags = line[1:]
+ w = None
+ if flags and 'wmmrule' in flags[-1]:
+ try:
+ region = flags.pop().split('=', 1)[1]
+ if region not in self._wmm_rules.keys():
+ self._syntax_error("No wmm rule for %s" % region)
+ except IndexError:
+ self._syntax_error("flags is empty list or no region was found")
+ w = WmmRule(*self._wmm_rules[region].values())
+
+ if not bname in self._bands:
+ self._syntax_error("band does not exist")
+ if not pname in self._power:
+ self._syntax_error("power does not exist")
+ self._bands_used[bname] = True
+ self._power_used[pname] = True
+ # de-duplicate so binary database is more compact
+ bname = self._banddup[bname]
+ pname = self._powerdup[pname]
+ b = self._bands[bname]
+ p = self._power[pname]
+ try:
+ perm = Permission(b, p, flags, w)
+ except FlagError as e:
+ self._syntax_error("Invalid flag '%s'" % e.flag)
+ for cname, c in self._current_countries.items():
+ if perm in c:
+ self._warn('Rule "%s, %s" added to "%s" twice' % (
+ bname, pname, cname))
+ else:
+ c.add(perm)
+
+ def parse(self, f):
+ self._current_countries = None
+ self._current_regions = None
+ self._bands = {}
+ self._power = {}
+ self._countries = {}
+ self._bands_used = {}
+ self._power_used = {}
+ self._bandrev = {}
+ self._powerrev = {}
+ self._banddup = {}
+ self._powerdup = {}
+ self._bandline = {}
+ self._powerline = {}
+ self._wmm_rules = defaultdict(lambda: OrderedDict())
+
+ self._comments = []
+
+ self._lineno = 0
+ for line in f:
+ self._lineno += 1
+ line = line.strip()
+ if line[0:1] == '#':
+ self._comments.append(line[1:].strip())
+ line = line.replace(' ', '').replace('\t', '')
+ if not line:
+ self._current_regions = None
+ self._comments = []
+ line = line.split('#')[0]
+ if not line:
+ continue
+ if line[0:4] == 'band':
+ self._parse_band(line[4:])
+ self._current_countries = None
+ self._current_regions = None
+ self._comments = []
+ elif line[0:5] == 'power':
+ self._parse_power(line[5:])
+ self._current_countries = None
+ self._current_regions = None
+ self._comments = []
+ elif line[0:7] == 'country':
+ self._parse_country(line[7:])
+ self._comments = []
+ self._current_regions = None
+ elif self._current_countries is not None:
+ self._current_regions = None
+ self._parse_country_item(line)
+ self._comments = []
+ elif line[0:7] == 'wmmrule':
+ self._parse_wmmrule(line[7:])
+ self._current_countries = None
+ self._comments = []
+ elif self._current_regions is not None:
+ self._parse_wmmrule_item(line)
+ self._current_countries = None
+ self._comments = []
+ else:
+ self._syntax_error("Expected band, power or country definition")
+
+ countries = self._countries
+ bands = {}
+ for k, v in self._bands.items():
+ if k in self._bands_used:
+ bands[self._banddup[k]] = v
+ continue
+ # we de-duplicated, but don't warn again about the dupes
+ if self._banddup[k] == k:
+ self._lineno = self._bandline[k]
+ self._warn('Unused band definition "%s"' % k)
+ power = {}
+ for k, v in self._power.items():
+ if k in self._power_used:
+ power[self._powerdup[k]] = v
+ continue
+ # we de-duplicated, but don't warn again about the dupes
+ if self._powerdup[k] == k:
+ self._lineno = self._powerline[k]
+ self._warn('Unused power definition "%s"' % k)
+ return countries