summaryrefslogtreecommitdiffstats
path: root/third_party/python/pyasn1/pyasn1/codec/cer/encoder.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/pyasn1/pyasn1/codec/cer/encoder.py')
-rw-r--r--third_party/python/pyasn1/pyasn1/codec/cer/encoder.py313
1 files changed, 313 insertions, 0 deletions
diff --git a/third_party/python/pyasn1/pyasn1/codec/cer/encoder.py b/third_party/python/pyasn1/pyasn1/codec/cer/encoder.py
new file mode 100644
index 0000000000..935b696561
--- /dev/null
+++ b/third_party/python/pyasn1/pyasn1/codec/cer/encoder.py
@@ -0,0 +1,313 @@
+#
+# This file is part of pyasn1 software.
+#
+# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
+# License: http://snmplabs.com/pyasn1/license.html
+#
+from pyasn1 import error
+from pyasn1.codec.ber import encoder
+from pyasn1.compat.octets import str2octs, null
+from pyasn1.type import univ
+from pyasn1.type import useful
+
+__all__ = ['encode']
+
+
+class BooleanEncoder(encoder.IntegerEncoder):
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ if value == 0:
+ substrate = (0,)
+ else:
+ substrate = (255,)
+ return substrate, False, False
+
+
+class RealEncoder(encoder.RealEncoder):
+ def _chooseEncBase(self, value):
+ m, b, e = value
+ return self._dropFloatingPoint(m, b, e)
+
+
+# specialized GeneralStringEncoder here
+
+class TimeEncoderMixIn(object):
+ Z_CHAR = ord('Z')
+ PLUS_CHAR = ord('+')
+ MINUS_CHAR = ord('-')
+ COMMA_CHAR = ord(',')
+ DOT_CHAR = ord('.')
+ ZERO_CHAR = ord('0')
+
+ MIN_LENGTH = 12
+ MAX_LENGTH = 19
+
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ # CER encoding constraints:
+ # - minutes are mandatory, seconds are optional
+ # - sub-seconds must NOT be zero / no meaningless zeros
+ # - no hanging fraction dot
+ # - time in UTC (Z)
+ # - only dot is allowed for fractions
+
+ if asn1Spec is not None:
+ value = asn1Spec.clone(value)
+
+ numbers = value.asNumbers()
+
+ if self.PLUS_CHAR in numbers or self.MINUS_CHAR in numbers:
+ raise error.PyAsn1Error('Must be UTC time: %r' % value)
+
+ if numbers[-1] != self.Z_CHAR:
+ raise error.PyAsn1Error('Missing "Z" time zone specifier: %r' % value)
+
+ if self.COMMA_CHAR in numbers:
+ raise error.PyAsn1Error('Comma in fractions disallowed: %r' % value)
+
+ if self.DOT_CHAR in numbers:
+
+ isModified = False
+
+ numbers = list(numbers)
+
+ searchIndex = min(numbers.index(self.DOT_CHAR) + 4, len(numbers) - 1)
+
+ while numbers[searchIndex] != self.DOT_CHAR:
+ if numbers[searchIndex] == self.ZERO_CHAR:
+ del numbers[searchIndex]
+ isModified = True
+
+ searchIndex -= 1
+
+ searchIndex += 1
+
+ if searchIndex < len(numbers):
+ if numbers[searchIndex] == self.Z_CHAR:
+ # drop hanging comma
+ del numbers[searchIndex - 1]
+ isModified = True
+
+ if isModified:
+ value = value.clone(numbers)
+
+ if not self.MIN_LENGTH < len(numbers) < self.MAX_LENGTH:
+ raise error.PyAsn1Error('Length constraint violated: %r' % value)
+
+ options.update(maxChunkSize=1000)
+
+ return encoder.OctetStringEncoder.encodeValue(
+ self, value, asn1Spec, encodeFun, **options
+ )
+
+
+class GeneralizedTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
+ MIN_LENGTH = 12
+ MAX_LENGTH = 20
+
+
+class UTCTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
+ MIN_LENGTH = 10
+ MAX_LENGTH = 14
+
+
+class SetOfEncoder(encoder.SequenceOfEncoder):
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ chunks = self._encodeComponents(
+ value, asn1Spec, encodeFun, **options)
+
+ # sort by serialised and padded components
+ if len(chunks) > 1:
+ zero = str2octs('\x00')
+ maxLen = max(map(len, chunks))
+ paddedChunks = [
+ (x.ljust(maxLen, zero), x) for x in chunks
+ ]
+ paddedChunks.sort(key=lambda x: x[0])
+
+ chunks = [x[1] for x in paddedChunks]
+
+ return null.join(chunks), True, True
+
+
+class SequenceOfEncoder(encoder.SequenceOfEncoder):
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+
+ if options.get('ifNotEmpty', False) and not len(value):
+ return null, True, True
+
+ chunks = self._encodeComponents(
+ value, asn1Spec, encodeFun, **options)
+
+ return null.join(chunks), True, True
+
+
+class SetEncoder(encoder.SequenceEncoder):
+ @staticmethod
+ def _componentSortKey(componentAndType):
+ """Sort SET components by tag
+
+ Sort regardless of the Choice value (static sort)
+ """
+ component, asn1Spec = componentAndType
+
+ if asn1Spec is None:
+ asn1Spec = component
+
+ if asn1Spec.typeId == univ.Choice.typeId and not asn1Spec.tagSet:
+ if asn1Spec.tagSet:
+ return asn1Spec.tagSet
+ else:
+ return asn1Spec.componentType.minTagSet
+ else:
+ return asn1Spec.tagSet
+
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+
+ substrate = null
+
+ comps = []
+ compsMap = {}
+
+ if asn1Spec is None:
+ # instance of ASN.1 schema
+ inconsistency = value.isInconsistent
+ if inconsistency:
+ raise inconsistency
+
+ namedTypes = value.componentType
+
+ for idx, component in enumerate(value.values()):
+ if namedTypes:
+ namedType = namedTypes[idx]
+
+ if namedType.isOptional and not component.isValue:
+ continue
+
+ if namedType.isDefaulted and component == namedType.asn1Object:
+ continue
+
+ compsMap[id(component)] = namedType
+
+ else:
+ compsMap[id(component)] = None
+
+ comps.append((component, asn1Spec))
+
+ else:
+ # bare Python value + ASN.1 schema
+ for idx, namedType in enumerate(asn1Spec.componentType.namedTypes):
+
+ try:
+ component = value[namedType.name]
+
+ except KeyError:
+ raise error.PyAsn1Error('Component name "%s" not found in %r' % (namedType.name, value))
+
+ if namedType.isOptional and namedType.name not in value:
+ continue
+
+ if namedType.isDefaulted and component == namedType.asn1Object:
+ continue
+
+ compsMap[id(component)] = namedType
+ comps.append((component, asn1Spec[idx]))
+
+ for comp, compType in sorted(comps, key=self._componentSortKey):
+ namedType = compsMap[id(comp)]
+
+ if namedType:
+ options.update(ifNotEmpty=namedType.isOptional)
+
+ chunk = encodeFun(comp, compType, **options)
+
+ # wrap open type blob if needed
+ if namedType and namedType.openType:
+ wrapType = namedType.asn1Object
+ if wrapType.tagSet and not wrapType.isSameTypeWith(comp):
+ chunk = encodeFun(chunk, wrapType, **options)
+
+ substrate += chunk
+
+ return substrate, True, True
+
+
+class SequenceEncoder(encoder.SequenceEncoder):
+ omitEmptyOptionals = True
+
+
+tagMap = encoder.tagMap.copy()
+tagMap.update({
+ univ.Boolean.tagSet: BooleanEncoder(),
+ univ.Real.tagSet: RealEncoder(),
+ useful.GeneralizedTime.tagSet: GeneralizedTimeEncoder(),
+ useful.UTCTime.tagSet: UTCTimeEncoder(),
+ # Sequence & Set have same tags as SequenceOf & SetOf
+ univ.SetOf.tagSet: SetOfEncoder(),
+ univ.Sequence.typeId: SequenceEncoder()
+})
+
+typeMap = encoder.typeMap.copy()
+typeMap.update({
+ univ.Boolean.typeId: BooleanEncoder(),
+ univ.Real.typeId: RealEncoder(),
+ useful.GeneralizedTime.typeId: GeneralizedTimeEncoder(),
+ useful.UTCTime.typeId: UTCTimeEncoder(),
+ # Sequence & Set have same tags as SequenceOf & SetOf
+ univ.Set.typeId: SetEncoder(),
+ univ.SetOf.typeId: SetOfEncoder(),
+ univ.Sequence.typeId: SequenceEncoder(),
+ univ.SequenceOf.typeId: SequenceOfEncoder()
+})
+
+
+class Encoder(encoder.Encoder):
+ fixedDefLengthMode = False
+ fixedChunkSize = 1000
+
+#: Turns ASN.1 object into CER octet stream.
+#:
+#: Takes any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
+#: walks all its components recursively and produces a CER octet stream.
+#:
+#: Parameters
+#: ----------
+#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
+#: A Python or pyasn1 object to encode. If Python object is given, `asnSpec`
+#: parameter is required to guide the encoding process.
+#:
+#: Keyword Args
+#: ------------
+#: asn1Spec:
+#: Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
+#:
+#: Returns
+#: -------
+#: : :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
+#: Given ASN.1 object encoded into BER octet-stream
+#:
+#: Raises
+#: ------
+#: ~pyasn1.error.PyAsn1Error
+#: On encoding errors
+#:
+#: Examples
+#: --------
+#: Encode Python value into CER with ASN.1 schema
+#:
+#: .. code-block:: pycon
+#:
+#: >>> seq = SequenceOf(componentType=Integer())
+#: >>> encode([1, 2, 3], asn1Spec=seq)
+#: b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
+#:
+#: Encode ASN.1 value object into CER
+#:
+#: .. code-block:: pycon
+#:
+#: >>> seq = SequenceOf(componentType=Integer())
+#: >>> seq.extend([1, 2, 3])
+#: >>> encode(seq)
+#: b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
+#:
+encode = Encoder(tagMap, typeMap)
+
+# EncoderFactory queries class instance and builds a map of tags -> encoders