diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 16:40:16 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 16:40:16 +0000 |
commit | 3f25952c13d5847d510c0cae22a8ba876638d570 (patch) | |
tree | 02f505f016ed5a1029277dcae520d5e2a75906fb /powerline/lint/markedjson/reader.py | |
parent | Initial commit. (diff) | |
download | powerline-3f25952c13d5847d510c0cae22a8ba876638d570.tar.xz powerline-3f25952c13d5847d510c0cae22a8ba876638d570.zip |
Adding upstream version 2.8.3. upstream/2.8.3 upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'powerline/lint/markedjson/reader.py')
-rw-r--r-- | powerline/lint/markedjson/reader.py | 141 |
1 files changed, 141 insertions, 0 deletions
diff --git a/powerline/lint/markedjson/reader.py b/powerline/lint/markedjson/reader.py new file mode 100644 index 0000000..0ca4516 --- /dev/null +++ b/powerline/lint/markedjson/reader.py @@ -0,0 +1,141 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import codecs + +from powerline.lint.markedjson.error import MarkedError, Mark, NON_PRINTABLE_RE +from powerline.lib.unicode import unicode + + +# This module contains abstractions for the input stream. You don’t have to +# looks further, there are no pretty code. + + +class ReaderError(MarkedError): + pass + + +class Reader(object): + # Reader: + # - determines the data encoding and converts it to a unicode string, + # - checks if characters are in allowed range, + # - adds '\0' to the end. + + # Reader accepts + # - a file-like object with its `read` method returning `str`, + + # Yeah, it’s ugly and slow. + def __init__(self, stream): + self.name = None + self.stream = None + self.stream_pointer = 0 + self.eof = True + self.buffer = '' + self.pointer = 0 + self.full_buffer = unicode('') + self.full_pointer = 0 + self.raw_buffer = None + self.raw_decode = codecs.utf_8_decode + self.encoding = 'utf-8' + self.index = 0 + self.line = 0 + self.column = 0 + + self.stream = stream + self.name = getattr(stream, 'name', '<file>') + self.eof = False + self.raw_buffer = None + + while not self.eof and (self.raw_buffer is None or len(self.raw_buffer) < 2): + self.update_raw() + self.update(1) + + def peek(self, index=0): + try: + return self.buffer[self.pointer + index] + except IndexError: + self.update(index + 1) + return self.buffer[self.pointer + index] + + def prefix(self, length=1): + if self.pointer + length >= len(self.buffer): + self.update(length) + return self.buffer[self.pointer:self.pointer + length] + + def update_pointer(self, length): + while length: + ch = self.buffer[self.pointer] + self.pointer += 1 + self.full_pointer += 1 + self.index 
+= 1 + if ch == '\n': + self.line += 1 + self.column = 0 + else: + self.column += 1 + length -= 1 + + def forward(self, length=1): + if self.pointer + length + 1 >= len(self.buffer): + self.update(length + 1) + self.update_pointer(length) + + def get_mark(self): + return Mark(self.name, self.line, self.column, self.full_buffer, self.full_pointer) + + def check_printable(self, data): + match = NON_PRINTABLE_RE.search(data) + if match: + self.update_pointer(match.start()) + raise ReaderError( + 'while reading from stream', None, + 'found special characters which are not allowed', + Mark(self.name, self.line, self.column, self.full_buffer, self.full_pointer) + ) + + def update(self, length): + if self.raw_buffer is None: + return + self.buffer = self.buffer[self.pointer:] + self.pointer = 0 + while len(self.buffer) < length: + if not self.eof: + self.update_raw() + try: + data, converted = self.raw_decode(self.raw_buffer, 'strict', self.eof) + except UnicodeDecodeError as exc: + character = self.raw_buffer[exc.start] + position = self.stream_pointer - len(self.raw_buffer) + exc.start + data, converted = self.raw_decode(self.raw_buffer[:exc.start], 'strict', self.eof) + self.buffer += data + self.full_buffer += data + '<' + str(ord(character)) + '>' + self.raw_buffer = self.raw_buffer[converted:] + self.update_pointer(exc.start - 1) + raise ReaderError( + 'while reading from stream', None, + 'found character #x%04x that cannot be decoded by UTF-8 codec' % ord(character), + Mark(self.name, self.line, self.column, self.full_buffer, position) + ) + self.buffer += data + self.full_buffer += data + self.raw_buffer = self.raw_buffer[converted:] + self.check_printable(data) + if self.eof: + self.buffer += '\0' + self.raw_buffer = None + break + + def update_raw(self, size=-1): + # Was size=4096 + assert(size < 0) + # WARNING: reading the whole stream at once. 
To change this behaviour to + # former reading N characters at once one must make sure that reading + # never ends at partial unicode character. + data = self.stream.read(size) + if self.raw_buffer is None: + self.raw_buffer = data + else: + self.raw_buffer += data + self.stream_pointer += len(data) + if not data: + self.eof = True |