1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
|
#!/usr/bin/python3
# Copyright (C) 2017
import collections.abc
import logging
import os # requires posix
from augeas import Augeas
AUGEAS_LOAD_PATH = '/augeas/load/'
AUGEAS_FILES_PATH = '/files/'
AUGEAS_ERROR_PATH = '//error'
log = logging.getLogger('augeas')
def join(*paths):
"""
join two Augeas tree paths
FIXME: Beware: // is normalized to /
"""
norm_paths = [os.path.normpath(path) for path in paths]
# first path must be absolute
assert norm_paths[0][0] == '/'
new_paths = [norm_paths[0]]
# relativize all other paths so join works as expected
for path in norm_paths[1:]:
if path.startswith('/'):
path = path[1:]
new_paths.append(path)
new_path = os.path.join(*new_paths)
log.debug("join: new_path %s", new_path)
return os.path.normpath(new_path)
class AugeasWrapper:
"""python-augeas higher-level wrapper.
Load single augeas lens and configuration file.
Exposes configuration file as AugeasNode object with dict-like interface.
AugeasWrapper can be used in with statement in the same way as file does.
"""
def __init__(self, confpath, lens, root=None, loadpath=None,
flags=Augeas.NO_MODL_AUTOLOAD | Augeas.NO_LOAD | Augeas.ENABLE_SPAN):
"""Parse configuration file using given lens.
Params:
confpath (str): Absolute path to the configuration file
lens (str): Name of module containing Augeas lens
root: passed down to original Augeas
flags: passed down to original Augeas
loadpath: passed down to original Augeas
flags: passed down to original Augeas
"""
log.debug('loadpath: %s', loadpath)
log.debug('confpath: %s', confpath)
self._aug = Augeas(root=root, loadpath=loadpath, flags=flags)
# /augeas/load/{lens}
aug_load_path = join(AUGEAS_LOAD_PATH, lens)
# /augeas/load/{lens}/lens = {lens}.lns
self._aug.set(join(aug_load_path, 'lens'), '%s.lns' % lens)
# /augeas/load/{lens}/incl[0] = {confpath}
self._aug.set(join(aug_load_path, 'incl[0]'), confpath)
self._aug.load()
errors = self._aug.match(AUGEAS_ERROR_PATH)
if errors:
err_msg = '\n'.join(
["{}: {}".format(e, self._aug.get(e)) for e in errors]
)
raise RuntimeError(err_msg)
path = join(AUGEAS_FILES_PATH, confpath)
paths = self._aug.match(path)
if len(paths) != 1:
raise ValueError('path %s did not match exactly once' % path)
self.tree = AugeasNode(self._aug, path)
self._loaded = True
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.save()
self.close()
def save(self):
"""Save Augeas tree to its original file."""
assert self._loaded
try:
self._aug.save()
except IOError as exc:
log.exception(exc)
for err_path in self._aug.match('//error'):
log.error('%s: %s', err_path,
self._aug.get(os.path.join(err_path, 'message')))
raise
def close(self):
"""
close Augeas library
After calling close() the object must not be used anymore.
"""
assert self._loaded
self._aug.close()
del self._aug
self._loaded = False
def match(self, path):
"""Yield AugeasNodes matching given expression."""
assert self._loaded
assert path
log.debug('tree match %s', path)
for matched_path in self._aug.match(path):
yield AugeasNode(self._aug, matched_path)
class AugeasNode(collections.abc.MutableMapping):
"""One Augeas tree node with dict-like interface."""
def __init__(self, aug, path):
"""
Args:
aug (AugeasWrapper or Augeas): Augeas library instance
path (str): absolute path in Augeas tree matching single node
BEWARE: There are no sanity checks of given path for performance reasons.
"""
assert aug
assert path
assert path.startswith('/')
self._aug = aug
self._path = path
self._span = None
@property
def path(self):
"""canonical path in Augeas tree, read-only"""
return self._path
@property
def value(self):
"""
get value of this node in Augeas tree
"""
value = self._aug.get(self._path)
log.debug('tree get: %s = %s', self._path, value)
return value
@value.setter
def value(self, value):
"""
set value of this node in Augeas tree
"""
log.debug('tree set: %s = %s', self._path, value)
self._aug.set(self._path, value)
@property
def span(self):
if self._span is None:
self._span = "char position %s" % self._aug.span(self._path)[5]
return self._span
@property
def char(self):
return self._aug.span(self._path)[5]
def __len__(self):
"""
number of items matching this path
It is always 1 after __init__() but it may change
as Augeas tree changes.
"""
return len(self._aug.match(self._path))
def __getitem__(self, key):
if isinstance(key, int):
# int is a shortcut to write [int]
target_path = '%s[%s]' % (self._path, key)
else:
target_path = self._path + key
log.debug('tree getitem: target_path %s', target_path)
paths = self._aug.match(target_path)
if len(paths) != 1:
raise KeyError('path %s did not match exactly once' % target_path)
return AugeasNode(self._aug, target_path)
def __delitem__(self, key):
log.debug('tree delitem: %s + %s', self._path, key)
target_path = self._path + key
log.debug('tree delitem: target_path %s', target_path)
self._aug.remove(target_path)
def __setitem__(self, key, value):
assert isinstance(value, AugeasNode)
target_path = self.path + key
self._aug.copy(value.path, target_path)
def __iter__(self):
self_path_len = len(self._path)
assert self_path_len > 0
log.debug('tree iter: %s', self._path)
for new_path in self._aug.match(self._path):
if len(new_path) == self_path_len:
yield ''
else:
yield new_path[self_path_len - 1:]
def match(self, subpath):
"""Yield AugeasNodes matching given sub-expression."""
assert subpath.startswith("/")
match_path = "%s%s" % (self._path, subpath)
log.debug('tree match %s: %s', match_path, self._path)
for matched_path in self._aug.match(match_path):
yield AugeasNode(self._aug, matched_path)
def __repr__(self):
return 'AugeasNode(%s)' % self._path
|