#! /usr/bin/env python3
# Copyright © 2020, RedHat Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see .
# Authors:
# Benjamin Berg
import os
import sys
import fcntl
import io
import re
import time
import threading
import select
import errno
class OutputChecker(object):
def __init__(self, out=sys.stdout):
self._output = out
self._pipe_fd_r, self._pipe_fd_w = os.pipe()
self._eof = False
self._partial_buf = b''
self._lines_sem = threading.Semaphore()
self._lines = []
self._reader_io = io.StringIO()
# Just to be sure, shouldn't be a problem even if we didn't set it
fcntl.fcntl(self._pipe_fd_r, fcntl.F_SETFL,
fcntl.fcntl(self._pipe_fd_r, fcntl.F_GETFL) | os.O_CLOEXEC | os.O_NONBLOCK)
fcntl.fcntl(self._pipe_fd_w, fcntl.F_SETFL,
fcntl.fcntl(self._pipe_fd_w, fcntl.F_GETFL) | os.O_CLOEXEC)
# Start copier thread
self._thread = threading.Thread(target=self._copy, daemon=True)
self._thread.start()
def _copy(self):
p = select.poll()
p.register(self._pipe_fd_r)
while True:
try:
# Be lazy and wake up occasionally in case _pipe_fd_r became invalid
# The reason to do this is because os.read() will *not* return if the
# FD is forcefully closed.
p.poll(0.1)
r = os.read(self._pipe_fd_r, 1024)
if not r:
self._eof = True
self._lines_sem.release()
return
except OSError as e:
if e.errno == errno.EWOULDBLOCK:
continue
# We get a bad file descriptor error when the outside closes the FD
self._lines_sem.release()
return
l = r.split(b'\n')
l[0] = self._partial_buf + l[0]
self._lines.extend(l[:-1])
self._partial_buf = l[-1]
self._lines_sem.release()
os.write(self._output.fileno(), r)
def check_line_re(self, needle_re, timeout=0, failmsg=None):
deadline = time.time() + timeout
if isinstance(needle_re, str):
needle_re = needle_re.encode('ascii')
r = re.compile(needle_re)
ret = []
while True:
try:
l = self._lines.pop(0)
except IndexError:
# EOF, throw error
if self._eof:
if failmsg:
raise AssertionError("No further messages: " % failmsg)
else:
raise AssertionError('No client waiting for needle %s' % (str(needle_re)))
# Check if should wake up
if not self._lines_sem.acquire(timeout = deadline - time.time()):
if failmsg:
raise AssertionError(failmsg)
else:
raise AssertionError('Timed out waiting for needle %s (timeout: %0.2f)' % (str(needle_re), timeout))
continue
ret.append(l)
if r.search(l):
return ret
def check_line(self, needle, timeout=0, failmsg=None):
if isinstance(needle, str):
needle = needle.encode('ascii')
needle_re = re.escape(needle)
return self.check_line_re(needle_re, timeout=timeout, failmsg=failmsg)
def check_no_line_re(self, needle_re, wait=0, failmsg=None):
deadline = time.time() + wait
if isinstance(needle_re, str):
needle_re = needle_re.encode('ascii')
r = re.compile(needle_re)
ret = []
while True:
try:
l = self._lines.pop(0)
except IndexError:
# EOF, so everything good
if self._pipe_fd_r == -1:
break
# Check if should wake up
if not self._lines_sem.acquire(timeout = deadline - time.time()):
# Timed out, so everything is good
break
continue
ret.append(l)
if r.search(l):
if failmsg:
raise AssertionError(failmsg)
else:
raise AssertionError('Found needle %s but shouldn\'t have been there (timeout: %0.2f)' % (str(needle_re), wait))
return ret
def check_no_line(self, needle, wait=0, failmsg=None):
if isinstance(needle, str):
needle = needle.encode('ascii')
needle_re = re.escape(needle)
return self.check_no_line_re(needle_re, wait=wait, failmsg=failmsg)
def clear(self):
ret = self._lines
self._lines = []
return ret
def assert_closed(self, timeout=1):
self._thread.join(timeout)
if self._thread.is_alive() != False:
raise AssertionError("OutputCheck: Write side has not been closed yet!")
def force_close(self):
fd = self._pipe_fd_r
self._pipe_fd_r = -1
if fd >= 0:
os.close(fd)
self._thread.join()
@property
def fd(self):
return self._pipe_fd_w
def writer_attached(self):
os.close(self._pipe_fd_w)
self._pipe_fd_w = -1
def __del__(self):
if self._pipe_fd_r >= 0:
os.close(self._pipe_fd_r)
self._pipe_fd_r = -1
if self._pipe_fd_w >= 0:
os.close(self._pipe_fd_w)
self._pipe_fd_w = -1