1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
|
#
# Authors:
# Pavel Brezina <pbrezina@redhat.com>
#
# Copyright (C) 2017 Red Hat
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from collections import OrderedDict
from sbus_Introspection import SBus
class Invoker:
""" Invoker is a piece of C code that takes care of executing a method,
signal and property handlers and returning their output values.
SBus code generator tries to reduce amount of generated code by
reusing invokers whenever possible. Therefore we must ensure that
invoker for each input and output signature type is generated only
once.
Each invoker is associated with its input and output SBus signatures
extended by a reserved keyword "raw" that says that the invoker input
or output parameters are handled by its handler and caller respectively
and are passed through as D-Bus iterators.
"""
def __init__(self, sbus_input, sbus_output):
self.input = self.getSignature(sbus_input,
self.IsCustomInputHandler(sbus_input))
self.output = self.getSignature(
sbus_output,
self.IsCustomOutputHandler(sbus_output)
)
def getSignature(self, sbus_signature, is_custom_handler):
if sbus_signature is None:
return InvokerSignature("", {}, {})
invoker_signature = sbus_signature.signature
if is_custom_handler:
invoker_signature = "raw"
return InvokerSignature(invoker_signature,
sbus_signature.arguments,
sbus_signature.annotations)
@staticmethod
def GatherInvokers(interfaces):
"""
Gather all required invokers for given interfaces.
"""
dict = {}
for iface in interfaces.values():
for method in iface.methods.values():
Invoker.Add(dict, method.input, method.output)
for signal in iface.signals.values():
Invoker.Add(dict, signal.input, signal.output)
for property in iface.properties.values():
if property.isReadable():
Invoker.Add(dict, None, property.output)
if property.isWritable():
Invoker.Add(dict, property.input, None)
return OrderedDict(sorted(dict.items()))
@staticmethod
def Add(dict, input, output):
"""
Add a new invoker to dictionary if possible.
"""
invoker = Invoker(input, output)
key = "in:%s, out:%s" % (invoker.input.invokerSignature,
invoker.output.invokerSignature)
if key in dict:
return
dict[key] = invoker
@staticmethod
def IsCustomHandler(type, sbus_signature):
if type == "input":
return Invoker.IsCustomInputHandler(sbus_signature)
elif type == "output":
return Invoker.IsCustomOutputHandler(sbus_signature)
else:
raise ValueError("Invalid type: %s" % type)
@staticmethod
def IsCustomInputHandler(sbus_signature):
names = ["codegen.CustomHandler",
"codegen.CustomInputHandler"]
if sbus_signature is None:
return False
return SBus.Annotation.CheckIfTrue(names, sbus_signature.annotations)
@staticmethod
def IsCustomOutputHandler(sbus_signature):
names = ["codegen.CustomHandler",
"codegen.CustomOutputHandler"]
if sbus_signature is None:
return False
return SBus.Annotation.CheckIfTrue(names, sbus_signature.annotations)
class InvokerSignature:
""" Contains information about Invoker signature and SBus arguments
and annotations. Do not confuse with SBus.Signature.
"""
def __init__(self, invoker_signature, sbus_arguments, sbus_annotations):
self.invokerSignature = invoker_signature
self.arguments = sbus_arguments
self.annotations = sbus_annotations
class InvokerArgumentType:
""" Argument reader/writer is a piece of C code that takes care of
parsing D-Bus methods into C types.
SBus code generator tries to reduce amount of generated code by
reusing reades and writers whenever possible. Therefore we must ensure
that only one reader and writer is generated for each input and output
signature.
"""
@staticmethod
def GatherArgumentTypes(interfaces):
"""
Gather all invoker argument types for given interfaces.
"""
dict = {}
for iface in interfaces.values():
InvokerArgumentType.AddObjects(dict, iface.methods)
InvokerArgumentType.AddObjects(dict, iface.signals)
InvokerArgumentType.AddObjects(dict, iface.properties)
return OrderedDict(sorted(dict.items()))
@staticmethod
def AddObjects(dict, objects):
for object in objects.values():
InvokerArgumentType.AddType(dict, "input", object.input)
InvokerArgumentType.AddType(dict, "output", object.output)
@staticmethod
def AddType(dict, type, sbus_signature):
"""
Add a new argument type to dictionary if possible.
"""
# We don't generate readers and writers for empty arguments
if sbus_signature is None or not sbus_signature.arguments:
return
# We don't generate readers and writers for custom handlers
if Invoker.IsCustomHandler(type, sbus_signature):
return
# We generate each reader and writer only once
if sbus_signature.signature in dict:
return
dict[sbus_signature.signature] = sbus_signature.arguments
class InvokerKeygen:
""" Invoker Keygen is a piece of C code that takes care of
chaining same request into one.
SBus code generator tries to reduce amount of generated code by
reusing keygens whenever possible. Therefore we must ensure
that only one keygen is generated for each signature.
"""
@staticmethod
def BuildKey(sbus_member, sbus_signature, Args=None):
"""
Return dictionary key for given SBUS member and signature or None
if no keying is supported for this member.
"""
args = Args if not None else \
InvokerKeygen.GatherKeyArguments(sbus_member, sbus_signature)
if args is None:
return None
key = sbus_signature.signature if args else "<no-arguments>"
for idx, arg in args.items():
key += ',%d' % idx
return key
@staticmethod
def BuildKeygenName(sbus_member, sbus_signature):
args = InvokerKeygen.GatherKeyArguments(sbus_member, sbus_signature)
if args is None:
return "NULL"
keygen = "_sbus_key_%s" % sbus_signature.signature
for idx, arg in args.items():
keygen += '_%d' % idx
return keygen
@staticmethod
def GatherKeyArguments(sbus_member, sbus_signature):
"""
Gather list of key arguments for an SBus member with given
signature.
Return dictionary of <argument-index, argument> sorted by
its key index or an empty dictionary if no arguments are
necessary to construct a key.
Return None for SBus members that do not allow keying.
"""
keys = {}
if sbus_signature is not None:
for idx, arg in enumerate(sbus_signature.arguments.values()):
if arg.key is not None:
keys[idx] = arg
if not keys and sbus_member.key is None:
return None
return OrderedDict(sorted(keys.items(),
key=lambda p: p[1].key))
@staticmethod
def GatherKeygens(interfaces):
"""
Gather all keygens needed to implement given interfaces.
"""
dict = {}
for iface in interfaces.values():
for method in iface.methods.values():
InvokerKeygen.Add(dict, method, method.input)
for signal in iface.signals.values():
InvokerKeygen.Add(dict, signal, signal.input)
return OrderedDict(sorted(dict.items()))
@staticmethod
def Add(dict, sbus_member, sbus_signature):
"""
Add a new keygen to dictionary if possible.
"""
args = InvokerKeygen.GatherKeyArguments(sbus_member, sbus_signature)
if args is None:
return
key = InvokerKeygen.BuildKey(sbus_member, sbus_signature, Args=args)
dict[key] = InvokerKeygen.KeygenPair(sbus_signature, args)
class KeygenPair:
def __init__(self, sbus_signature, arguments):
self.signature = sbus_signature.signature
self.arguments = arguments
class InvokerCaller:
""" Caller invoker is a piece of C code that takes care of executing
an outgoing method, signal or property and returning their output.
SBus code generator tries to reduce amount of generated code by
reusing invokers whenever possible. Therefore we must ensure that
invoker for each input and output signature type is generated only
once.
"""
@staticmethod
def GatherMethodInvokers(interfaces, type):
"""
Gather all required method invokers for given interfaces.
"""
dict = {}
for iface in interfaces.values():
for method in iface.methods.values():
if not InvokerCaller.IsWanted(iface, method, type):
continue
InvokerCaller.Add(dict, method.input, method.output)
return OrderedDict(sorted(dict.items()))
@staticmethod
def GatherSignalInvokers(interfaces, type):
"""
Gather all required signal invokers for given interfaces.
"""
dict = {}
for iface in interfaces.values():
for signal in iface.signals.values():
if not InvokerCaller.IsWanted(iface, signal, type):
continue
InvokerCaller.Add(dict, signal.input, signal.output)
return OrderedDict(sorted(dict.items()))
@staticmethod
def GatherGetInvokers(interfaces, type):
"""
Gather all required property getters for given interfaces.
"""
dict = {}
for iface in interfaces.values():
for property in iface.properties.values():
if not InvokerCaller.IsWanted(iface, property, type):
continue
if not property.isReadable():
continue
InvokerCaller.Add(dict, None, property.output)
return OrderedDict(sorted(dict.items()))
@staticmethod
def GatherSetInvokers(interfaces, type):
"""
Gather all required property setters for given interfaces.
"""
dict = {}
for iface in interfaces.values():
for property in iface.properties.values():
if not InvokerCaller.IsWanted(iface, property, type):
continue
if not property.isWritable():
continue
InvokerCaller.Add(dict, property.input, None)
return OrderedDict(sorted(dict.items()))
@staticmethod
def Add(dict, input, output):
"""
Add a new invoker to dictionary if possible.
"""
invoker = Invoker(input, output)
key = "in:%s, out:%s" % (invoker.input.invokerSignature,
invoker.output.invokerSignature)
if key in dict:
return
dict[key] = invoker
@staticmethod
def IsWantedSync(interface, member):
names = ["codegen.Caller", "codegen.SyncCaller"]
# First see if the member has one of these annotations
if SBus.Annotation.AtleastOneIsSet(names, member.annotations):
return SBus.Annotation.CheckIfFalse(names, member.annotations)
return SBus.Annotation.CheckIfFalse(names, interface.annotations)
@staticmethod
def IsWantedAsync(interface, member):
names = ["codegen.Caller", "codegen.AsyncCaller"]
# First see if the member has one of these annotations
if SBus.Annotation.AtleastOneIsSet(names, member.annotations):
return SBus.Annotation.CheckIfFalse(names, member.annotations)
return SBus.Annotation.CheckIfFalse(names, interface.annotations)
@staticmethod
def IsWanted(interface, member, type):
if type == "sync":
return InvokerCaller.IsWantedSync(interface, member)
elif type == "async":
return InvokerCaller.IsWantedAsync(interface, member)
wanted_sync = InvokerCaller.IsWantedSync(interface, member)
wanted_async = InvokerCaller.IsWantedAsync(interface, member)
return wanted_sync or wanted_async
|