diff options
Diffstat (limited to 'third_party/python/py/testing')
31 files changed, 6944 insertions, 0 deletions
diff --git a/third_party/python/py/testing/code/test_assertion.py b/third_party/python/py/testing/code/test_assertion.py new file mode 100644 index 0000000000..e2a7f90399 --- /dev/null +++ b/third_party/python/py/testing/code/test_assertion.py @@ -0,0 +1,305 @@ +import pytest, py +import re + +def exvalue(): + import sys + return sys.exc_info()[1] + +def f(): + return 2 + +def test_assert(): + try: + assert f() == 3 + except AssertionError: + e = exvalue() + s = str(e) + assert s.startswith('assert 2 == 3\n') + + +def test_assert_within_finally(): + excinfo = py.test.raises(ZeroDivisionError, """ + try: + 1/0 + finally: + i = 42 + """) + s = excinfo.exconly() + assert re.search("ZeroDivisionError:.*division", s) is not None + + +def test_assert_multiline_1(): + try: + assert (f() == + 3) + except AssertionError: + e = exvalue() + s = str(e) + assert s.startswith('assert 2 == 3\n') + +def test_assert_multiline_2(): + try: + assert (f() == (4, + 3)[-1]) + except AssertionError: + e = exvalue() + s = str(e) + assert s.startswith('assert 2 ==') + +def test_in(): + try: + assert "hi" in [1, 2] + except AssertionError: + e = exvalue() + s = str(e) + assert s.startswith("assert 'hi' in") + +def test_is(): + try: + assert 1 is 2 + except AssertionError: + e = exvalue() + s = str(e) + assert s.startswith("assert 1 is 2") + + +def test_attrib(): + class Foo(object): + b = 1 + i = Foo() + try: + assert i.b == 2 + except AssertionError: + e = exvalue() + s = str(e) + assert s.startswith("assert 1 == 2") + +def test_attrib_inst(): + class Foo(object): + b = 1 + try: + assert Foo().b == 2 + except AssertionError: + e = exvalue() + s = str(e) + assert s.startswith("assert 1 == 2") + +def test_len(): + l = list(range(42)) + try: + assert len(l) == 100 + except AssertionError: + e = exvalue() + s = str(e) + assert s.startswith("assert 42 == 100") + assert "where 42 = len([" in s + + +def test_assert_keyword_arg(): + def f(x=3): + return False + try: + assert f(x=5) + except AssertionError: + e = exvalue() + assert "x=5" in str(e) + +# These tests should both fail, but should fail nicely... +class WeirdRepr: + def __repr__(self): + return '<WeirdRepr\nsecond line>' + +def bug_test_assert_repr(): + v = WeirdRepr() + try: + assert v == 1 + except AssertionError: + e = exvalue() + assert str(e).find('WeirdRepr') != -1 + assert str(e).find('second line') != -1 + assert 0 + +def test_assert_non_string(): + try: + assert 0, ['list'] + except AssertionError: + e = exvalue() + assert str(e).find("list") != -1 + +def test_assert_implicit_multiline(): + try: + x = [1,2,3] + assert x != [1, + 2, 3] + except AssertionError: + e = exvalue() + assert str(e).find('assert [1, 2, 3] !=') != -1 + +@py.test.mark.xfail(py.test.__version__[0] != "2", + reason="broken on modern pytest", + run=False +) +def test_assert_with_brokenrepr_arg(): + class BrokenRepr: + def __repr__(self): 0 / 0 + e = AssertionError(BrokenRepr()) + if e.msg.find("broken __repr__") == -1: + py.test.fail("broken __repr__ not handle correctly") + +def test_multiple_statements_per_line(): + try: + a = 1; assert a == 2 + except AssertionError: + e = exvalue() + assert "assert 1 == 2" in str(e) + +def test_power(): + try: + assert 2**3 == 7 + except AssertionError: + e = exvalue() + assert "assert (2 ** 3) == 7" in str(e) + + +class TestView: + + def setup_class(cls): + cls.View = py.test.importorskip("py._code._assertionold").View + + def test_class_dispatch(self): + ### Use a custom class hierarchy with existing instances + + class Picklable(self.View): + pass + + class Simple(Picklable): + __view__ = object + def pickle(self): + return repr(self.__obj__) + + class Seq(Picklable): + __view__ = list, tuple, dict + def pickle(self): + return ';'.join( + [Picklable(item).pickle() for item in self.__obj__]) + + class Dict(Seq): + __view__ = dict + def pickle(self): + return Seq.pickle(self) + '!' + Seq(self.values()).pickle() + + assert Picklable(123).pickle() == '123' + assert Picklable([1,[2,3],4]).pickle() == '1;2;3;4' + assert Picklable({1:2}).pickle() == '1!2' + + def test_viewtype_class_hierarchy(self): + # Use a custom class hierarchy based on attributes of existing instances + class Operation: + "Existing class that I don't want to change." + def __init__(self, opname, *args): + self.opname = opname + self.args = args + + existing = [Operation('+', 4, 5), + Operation('getitem', '', 'join'), + Operation('setattr', 'x', 'y', 3), + Operation('-', 12, 1)] + + class PyOp(self.View): + def __viewkey__(self): + return self.opname + def generate(self): + return '%s(%s)' % (self.opname, ', '.join(map(repr, self.args))) + + class PyBinaryOp(PyOp): + __view__ = ('+', '-', '*', '/') + def generate(self): + return '%s %s %s' % (self.args[0], self.opname, self.args[1]) + + codelines = [PyOp(op).generate() for op in existing] + assert codelines == ["4 + 5", "getitem('', 'join')", + "setattr('x', 'y', 3)", "12 - 1"] + +def test_underscore_api(): + py.code._AssertionError + py.code._reinterpret_old # used by pypy + py.code._reinterpret + +def test_assert_customizable_reprcompare(monkeypatch): + util = pytest.importorskip("_pytest.assertion.util") + monkeypatch.setattr(util, '_reprcompare', lambda *args: 'hello') + try: + assert 3 == 4 + except AssertionError: + e = exvalue() + s = str(e) + assert "hello" in s + +def test_assert_long_source_1(): + try: + assert len == [ + (None, ['somet text', 'more text']), + ] + except AssertionError: + e = exvalue() + s = str(e) + assert 're-run' not in s + assert 'somet text' in s + +def test_assert_long_source_2(): + try: + assert(len == [ + (None, ['somet text', 'more text']), + ]) + except AssertionError: + e = exvalue() + s = str(e) + assert 're-run' not in s + assert 'somet text' in s + +def test_assert_raise_alias(testdir): + testdir.makepyfile(""" + import sys + EX = AssertionError + def test_hello(): + raise EX("hello" + "multi" + "line") + """) + result = testdir.runpytest() + result.stdout.fnmatch_lines([ + "*def test_hello*", + "*raise EX*", + "*1 failed*", + ]) + +@py.test.mark.xfail(py.test.__version__[0] != "2", + reason="broken on modern pytest", + run=False) +def test_assert_raise_subclass(): + class SomeEx(AssertionError): + def __init__(self, *args): + super(SomeEx, self).__init__() + try: + raise SomeEx("hello") + except AssertionError as e: + s = str(e) + assert 're-run' not in s + assert 'could not determine' in s + +def test_assert_raises_in_nonzero_of_object_pytest_issue10(): + class A(object): + def __nonzero__(self): + raise ValueError(42) + def __lt__(self, other): + return A() + def __repr__(self): + return "<MY42 object>" + def myany(x): + return True + try: + assert not(myany(A() < 0)) + except AssertionError: + e = exvalue() + s = str(e) + assert "<MY42 object> < 0" in s diff --git a/third_party/python/py/testing/code/test_code.py b/third_party/python/py/testing/code/test_code.py new file mode 100644 index 0000000000..28ec628b00 --- /dev/null +++ b/third_party/python/py/testing/code/test_code.py @@ -0,0 +1,159 @@ +import py +import sys + +def test_ne(): + code1 = py.code.Code(compile('foo = "bar"', '', 'exec')) + assert code1 == code1 + code2 = py.code.Code(compile('foo = "baz"', '', 'exec')) + assert code2 != code1 + +def test_code_gives_back_name_for_not_existing_file(): + name = 'abc-123' + co_code = compile("pass\n", name, 'exec') + assert co_code.co_filename == name + code = py.code.Code(co_code) + assert str(code.path) == name + assert code.fullsource is None + +def test_code_with_class(): + class A: + pass + py.test.raises(TypeError, "py.code.Code(A)") + +if True: + def x(): + pass + +def test_code_fullsource(): + code = py.code.Code(x) + full = code.fullsource + assert 'test_code_fullsource()' in str(full) + +def test_code_source(): + code = py.code.Code(x) + src = code.source() + expected = """def x(): + pass""" + assert str(src) == expected + +def test_frame_getsourcelineno_myself(): + def func(): + return sys._getframe(0) + f = func() + f = py.code.Frame(f) + source, lineno = f.code.fullsource, f.lineno + assert source[lineno].startswith(" return sys._getframe(0)") + +def test_getstatement_empty_fullsource(): + def func(): + return sys._getframe(0) + f = func() + f = py.code.Frame(f) + prop = f.code.__class__.fullsource + try: + f.code.__class__.fullsource = None + assert f.statement == py.code.Source("") + finally: + f.code.__class__.fullsource = prop + +def test_code_from_func(): + co = py.code.Code(test_frame_getsourcelineno_myself) + assert co.firstlineno + assert co.path + + + +def test_builtin_patch_unpatch(monkeypatch): + cpy_builtin = py.builtin.builtins + comp = cpy_builtin.compile + def mycompile(*args, **kwargs): + return comp(*args, **kwargs) + class Sub(AssertionError): + pass + monkeypatch.setattr(cpy_builtin, 'AssertionError', Sub) + monkeypatch.setattr(cpy_builtin, 'compile', mycompile) + py.code.patch_builtins() + assert cpy_builtin.AssertionError != Sub + assert cpy_builtin.compile != mycompile + py.code.unpatch_builtins() + assert cpy_builtin.AssertionError is Sub + assert cpy_builtin.compile == mycompile + + +def test_unicode_handling(): + value = py.builtin._totext('\xc4\x85\xc4\x87\n', 'utf-8').encode('utf8') + def f(): + raise Exception(value) + excinfo = py.test.raises(Exception, f) + s = str(excinfo) + if sys.version_info[0] < 3: + u = unicode(excinfo) + +def test_code_getargs(): + def f1(x): + pass + c1 = py.code.Code(f1) + assert c1.getargs(var=True) == ('x',) + + def f2(x, *y): + pass + c2 = py.code.Code(f2) + assert c2.getargs(var=True) == ('x', 'y') + + def f3(x, **z): + pass + c3 = py.code.Code(f3) + assert c3.getargs(var=True) == ('x', 'z') + + def f4(x, *y, **z): + pass + c4 = py.code.Code(f4) + assert c4.getargs(var=True) == ('x', 'y', 'z') + + +def test_frame_getargs(): + def f1(x): + return sys._getframe(0) + fr1 = py.code.Frame(f1('a')) + assert fr1.getargs(var=True) == [('x', 'a')] + + def f2(x, *y): + return sys._getframe(0) + fr2 = py.code.Frame(f2('a', 'b', 'c')) + assert fr2.getargs(var=True) == [('x', 'a'), ('y', ('b', 'c'))] + + def f3(x, **z): + return sys._getframe(0) + fr3 = py.code.Frame(f3('a', b='c')) + assert fr3.getargs(var=True) == [('x', 'a'), ('z', {'b': 'c'})] + + def f4(x, *y, **z): + return sys._getframe(0) + fr4 = py.code.Frame(f4('a', 'b', c='d')) + assert fr4.getargs(var=True) == [('x', 'a'), ('y', ('b',)), + ('z', {'c': 'd'})] + + +class TestExceptionInfo: + + def test_bad_getsource(self): + try: + if False: pass + else: assert False + except AssertionError: + exci = py.code.ExceptionInfo() + assert exci.getrepr() + + +class TestTracebackEntry: + + def test_getsource(self): + try: + if False: pass + else: assert False + except AssertionError: + exci = py.code.ExceptionInfo() + entry = exci.traceback[0] + source = entry.getsource() + assert len(source) == 4 + assert 'else: assert False' in source[3] diff --git a/third_party/python/py/testing/code/test_excinfo.py b/third_party/python/py/testing/code/test_excinfo.py new file mode 100644 index 0000000000..c148ab8cfb --- /dev/null +++ b/third_party/python/py/testing/code/test_excinfo.py @@ -0,0 +1,956 @@ +# -*- coding: utf-8 -*- + +import py +import pytest +import sys +from test_source import astonly + +from py._code.code import FormattedExcinfo, ReprExceptionInfo +queue = py.builtin._tryimport('queue', 'Queue') + +failsonjython = py.test.mark.xfail("sys.platform.startswith('java')") + +try: + import importlib +except ImportError: + invalidate_import_caches = None +else: + invalidate_import_caches = getattr(importlib, "invalidate_caches", None) + + +pytest_version_info = tuple(map(int, pytest.__version__.split(".")[:3])) + +broken_on_modern_pytest = pytest.mark.xfail( + pytest_version_info[0] != 2, + reason="this test hasn't been fixed after moving py.code into pytest", + run=False + ) + + +class TWMock: + def __init__(self): + self.lines = [] + + def sep(self, sep, line=None): + self.lines.append((sep, line)) + + def line(self, line, **kw): + self.lines.append(line) + + def markup(self, text, **kw): + return text + + fullwidth = 80 + + +def test_excinfo_simple(): + try: + raise ValueError + except ValueError: + info = py.code.ExceptionInfo() + assert info.type == ValueError + + +def test_excinfo_getstatement(): + def g(): + raise ValueError + + def f(): + g() + try: + f() + except ValueError: + excinfo = py.code.ExceptionInfo() + linenumbers = [ + py.code.getrawcode(f).co_firstlineno-1+3, + py.code.getrawcode(f).co_firstlineno-1+1, + py.code.getrawcode(g).co_firstlineno-1+1, + ] + l = list(excinfo.traceback) + foundlinenumbers = [x.lineno for x in l] + assert foundlinenumbers == linenumbers + #for x in info: + # print "%s:%d %s" %(x.path.relto(root), x.lineno, x.statement) + #xxx + +# testchain for getentries test below +def f(): + # + raise ValueError + # +def g(): + # + __tracebackhide__ = True + f() + # +def h(): + # + g() + # + +class TestTraceback_f_g_h: + def setup_method(self, method): + try: + h() + except ValueError: + self.excinfo = py.code.ExceptionInfo() + + def test_traceback_entries(self): + tb = self.excinfo.traceback + entries = list(tb) + assert len(tb) == 4 # maybe fragile test + assert len(entries) == 4 # maybe fragile test + names = ['f', 'g', 'h'] + for entry in entries: + try: + names.remove(entry.frame.code.name) + except ValueError: + pass + assert not names + + def test_traceback_entry_getsource(self): + tb = self.excinfo.traceback + s = str(tb[-1].getsource()) + assert s.startswith("def f():") + assert s.endswith("raise ValueError") + + @astonly + @failsonjython + def test_traceback_entry_getsource_in_construct(self): + source = py.code.Source("""\ + def xyz(): + try: + raise ValueError + except somenoname: + pass + xyz() + """) + try: + exec (source.compile()) + except NameError: + tb = py.code.ExceptionInfo().traceback + print (tb[-1].getsource()) + s = str(tb[-1].getsource()) + assert s.startswith("def xyz():\n try:") + assert s.strip().endswith("except somenoname:") + + def test_traceback_cut(self): + co = py.code.Code(f) + path, firstlineno = co.path, co.firstlineno + traceback = self.excinfo.traceback + newtraceback = traceback.cut(path=path, firstlineno=firstlineno) + assert len(newtraceback) == 1 + newtraceback = traceback.cut(path=path, lineno=firstlineno+2) + assert len(newtraceback) == 1 + + def test_traceback_cut_excludepath(self, testdir): + p = testdir.makepyfile("def f(): raise ValueError") + excinfo = py.test.raises(ValueError, "p.pyimport().f()") + basedir = py.path.local(py.test.__file__).dirpath() + newtraceback = excinfo.traceback.cut(excludepath=basedir) + for x in newtraceback: + if hasattr(x, 'path'): + assert not py.path.local(x.path).relto(basedir) + assert newtraceback[-1].frame.code.path == p + + def test_traceback_filter(self): + traceback = self.excinfo.traceback + ntraceback = traceback.filter() + assert len(ntraceback) == len(traceback) - 1 + + def test_traceback_recursion_index(self): + def f(n): + if n < 10: + n += 1 + f(n) + excinfo = py.test.raises(RuntimeError, f, 8) + traceback = excinfo.traceback + recindex = traceback.recursionindex() + assert recindex == 3 + + def test_traceback_only_specific_recursion_errors(self, monkeypatch): + def f(n): + if n == 0: + raise RuntimeError("hello") + f(n-1) + + excinfo = pytest.raises(RuntimeError, f, 100) + monkeypatch.delattr(excinfo.traceback.__class__, "recursionindex") + repr = excinfo.getrepr() + assert "RuntimeError: hello" in str(repr.reprcrash) + + def test_traceback_no_recursion_index(self): + def do_stuff(): + raise RuntimeError + + def reraise_me(): + import sys + exc, val, tb = sys.exc_info() + py.builtin._reraise(exc, val, tb) + + def f(n): + try: + do_stuff() + except: + reraise_me() + excinfo = py.test.raises(RuntimeError, f, 8) + traceback = excinfo.traceback + recindex = traceback.recursionindex() + assert recindex is None + + def test_traceback_messy_recursion(self): + # XXX: simplified locally testable version + decorator = py.test.importorskip('decorator').decorator + + def log(f, *k, **kw): + print('%s %s' % (k, kw)) + f(*k, **kw) + log = decorator(log) + + def fail(): + raise ValueError('') + + fail = log(log(fail)) + + excinfo = py.test.raises(ValueError, fail) + assert excinfo.traceback.recursionindex() is None + + def test_traceback_getcrashentry(self): + def i(): + __tracebackhide__ = True + raise ValueError + + def h(): + i() + + def g(): + __tracebackhide__ = True + h() + + def f(): + g() + + excinfo = py.test.raises(ValueError, f) + tb = excinfo.traceback + entry = tb.getcrashentry() + co = py.code.Code(h) + assert entry.frame.code.path == co.path + assert entry.lineno == co.firstlineno + 1 + assert entry.frame.code.name == 'h' + + def test_traceback_getcrashentry_empty(self): + def g(): + __tracebackhide__ = True + raise ValueError + + def f(): + __tracebackhide__ = True + g() + + excinfo = py.test.raises(ValueError, f) + tb = excinfo.traceback + entry = tb.getcrashentry() + co = py.code.Code(g) + assert entry.frame.code.path == co.path + assert entry.lineno == co.firstlineno + 2 + assert entry.frame.code.name == 'g' + + +def hello(x): + x + 5 + + +def test_tbentry_reinterpret(): + try: + hello("hello") + except TypeError: + excinfo = py.code.ExceptionInfo() + tbentry = excinfo.traceback[-1] + msg = tbentry.reinterpret() + assert msg.startswith("TypeError: ('hello' + 5)") + + +def test_excinfo_exconly(): + excinfo = py.test.raises(ValueError, h) + assert excinfo.exconly().startswith('ValueError') + excinfo = py.test.raises(ValueError, + "raise ValueError('hello\\nworld')") + msg = excinfo.exconly(tryshort=True) + assert msg.startswith('ValueError') + assert msg.endswith("world") + + +def test_excinfo_repr(): + excinfo = py.test.raises(ValueError, h) + s = repr(excinfo) + assert s == "<ExceptionInfo ValueError tblen=4>" + + +def test_excinfo_str(): + excinfo = py.test.raises(ValueError, h) + s = str(excinfo) + assert s.startswith(__file__[:-9]) # pyc file and $py.class + assert s.endswith("ValueError") + assert len(s.split(":")) >= 3 # on windows it's 4 + + +def test_excinfo_errisinstance(): + excinfo = py.test.raises(ValueError, h) + assert excinfo.errisinstance(ValueError) + + +def test_excinfo_no_sourcecode(): + try: + exec ("raise ValueError()") + except ValueError: + excinfo = py.code.ExceptionInfo() + s = str(excinfo.traceback[-1]) + assert s == " File '<string>':1 in <module>\n ???\n" + + +def test_excinfo_no_python_sourcecode(tmpdir): + #XXX: simplified locally testable version + tmpdir.join('test.txt').write("{{ h()}}:") + + jinja2 = py.test.importorskip('jinja2') + loader = jinja2.FileSystemLoader(str(tmpdir)) + env = jinja2.Environment(loader=loader) + template = env.get_template('test.txt') + excinfo = py.test.raises(ValueError, + template.render, h=h) + for item in excinfo.traceback: + print(item) # XXX: for some reason jinja.Template.render is printed in full + item.source # shouldnt fail + if item.path.basename == 'test.txt': + assert str(item.source) == '{{ h()}}:' + + +def test_entrysource_Queue_example(): + try: + queue.Queue().get(timeout=0.001) + except queue.Empty: + excinfo = py.code.ExceptionInfo() + entry = excinfo.traceback[-1] + source = entry.getsource() + assert source is not None + s = str(source).strip() + assert s.startswith("def get") + + +def test_codepath_Queue_example(): + try: + queue.Queue().get(timeout=0.001) + except queue.Empty: + excinfo = py.code.ExceptionInfo() + entry = excinfo.traceback[-1] + path = entry.path + assert isinstance(path, py.path.local) + assert path.basename.lower() == "queue.py" + assert path.check() + + +class TestFormattedExcinfo: + def pytest_funcarg__importasmod(self, request): + def importasmod(source): + source = py.code.Source(source) + tmpdir = request.getfuncargvalue("tmpdir") + modpath = tmpdir.join("mod.py") + tmpdir.ensure("__init__.py") + modpath.write(source) + if invalidate_import_caches is not None: + invalidate_import_caches() + return modpath.pyimport() + return importasmod + + def excinfo_from_exec(self, source): + source = py.code.Source(source).strip() + try: + exec (source.compile()) + except KeyboardInterrupt: + raise + except: + return py.code.ExceptionInfo() + assert 0, "did not raise" + + def test_repr_source(self): + pr = FormattedExcinfo() + source = py.code.Source(""" + def f(x): + pass + """).strip() + pr.flow_marker = "|" + lines = pr.get_source(source, 0) + assert len(lines) == 2 + assert lines[0] == "| def f(x):" + assert lines[1] == " pass" + + @broken_on_modern_pytest + def test_repr_source_excinfo(self): + """ check if indentation is right """ + pr = FormattedExcinfo() + excinfo = self.excinfo_from_exec(""" + def f(): + assert 0 + f() + """) + pr = FormattedExcinfo() + source = pr._getentrysource(excinfo.traceback[-1]) + lines = pr.get_source(source, 1, excinfo) + assert lines == [ + ' def f():', + '> assert 0', + 'E assert 0' + ] + + def test_repr_source_not_existing(self): + pr = FormattedExcinfo() + co = compile("raise ValueError()", "", "exec") + try: + exec (co) + except ValueError: + excinfo = py.code.ExceptionInfo() + repr = pr.repr_excinfo(excinfo) + assert repr.reprtraceback.reprentries[1].lines[0] == "> ???" + + def test_repr_many_line_source_not_existing(self): + pr = FormattedExcinfo() + co = compile(""" +a = 1 +raise ValueError() +""", "", "exec") + try: + exec (co) + except ValueError: + excinfo = py.code.ExceptionInfo() + repr = pr.repr_excinfo(excinfo) + assert repr.reprtraceback.reprentries[1].lines[0] == "> ???" + + def test_repr_source_failing_fullsource(self): + pr = FormattedExcinfo() + + class FakeCode(object): + class raw: + co_filename = '?' + path = '?' + firstlineno = 5 + + def fullsource(self): + return None + fullsource = property(fullsource) + + class FakeFrame(object): + code = FakeCode() + f_locals = {} + f_globals = {} + + class FakeTracebackEntry(py.code.Traceback.Entry): + def __init__(self, tb): + self.lineno = 5+3 + + @property + def frame(self): + return FakeFrame() + + class Traceback(py.code.Traceback): + Entry = FakeTracebackEntry + + class FakeExcinfo(py.code.ExceptionInfo): + typename = "Foo" + def __init__(self): + pass + + def exconly(self, tryshort): + return "EXC" + def errisinstance(self, cls): + return False + + excinfo = FakeExcinfo() + class FakeRawTB(object): + tb_next = None + tb = FakeRawTB() + excinfo.traceback = Traceback(tb) + + fail = IOError() + repr = pr.repr_excinfo(excinfo) + assert repr.reprtraceback.reprentries[0].lines[0] == "> ???" + + fail = py.error.ENOENT + repr = pr.repr_excinfo(excinfo) + assert repr.reprtraceback.reprentries[0].lines[0] == "> ???" + + + def test_repr_local(self): + p = FormattedExcinfo(showlocals=True) + loc = {'y': 5, 'z': 7, 'x': 3, '@x': 2, '__builtins__': {}} + reprlocals = p.repr_locals(loc) + assert reprlocals.lines + assert reprlocals.lines[0] == '__builtins__ = <builtins>' + assert reprlocals.lines[1] == 'x = 3' + assert reprlocals.lines[2] == 'y = 5' + assert reprlocals.lines[3] == 'z = 7' + + def test_repr_tracebackentry_lines(self, importasmod): + mod = importasmod(""" + def func1(): + raise ValueError("hello\\nworld") + """) + excinfo = py.test.raises(ValueError, mod.func1) + excinfo.traceback = excinfo.traceback.filter() + p = FormattedExcinfo() + reprtb = p.repr_traceback_entry(excinfo.traceback[-1]) + + # test as intermittent entry + lines = reprtb.lines + assert lines[0] == ' def func1():' + assert lines[1] == '> raise ValueError("hello\\nworld")' + + # test as last entry + p = FormattedExcinfo(showlocals=True) + repr_entry = p.repr_traceback_entry(excinfo.traceback[-1], excinfo) + lines = repr_entry.lines + assert lines[0] == ' def func1():' + assert lines[1] == '> raise ValueError("hello\\nworld")' + assert lines[2] == 'E ValueError: hello' + assert lines[3] == 'E world' + assert not lines[4:] + + loc = repr_entry.reprlocals is not None + loc = repr_entry.reprfileloc + assert loc.path == mod.__file__ + assert loc.lineno == 3 + #assert loc.message == "ValueError: hello" + + def test_repr_tracebackentry_lines(self, importasmod): + mod = importasmod(""" + def func1(m, x, y, z): + raise ValueError("hello\\nworld") + """) + excinfo = py.test.raises(ValueError, mod.func1, "m"*90, 5, 13, "z"*120) + excinfo.traceback = excinfo.traceback.filter() + entry = excinfo.traceback[-1] + p = FormattedExcinfo(funcargs=True) + reprfuncargs = p.repr_args(entry) + assert reprfuncargs.args[0] == ('m', repr("m"*90)) + assert reprfuncargs.args[1] == ('x', '5') + assert reprfuncargs.args[2] == ('y', '13') + assert reprfuncargs.args[3] == ('z', repr("z" * 120)) + + p = FormattedExcinfo(funcargs=True) + repr_entry = p.repr_traceback_entry(entry) + assert repr_entry.reprfuncargs.args == reprfuncargs.args + tw = TWMock() + repr_entry.toterminal(tw) + assert tw.lines[0] == "m = " + repr('m' * 90) + assert tw.lines[1] == "x = 5, y = 13" + assert tw.lines[2] == "z = " + repr('z' * 120) + + def test_repr_tracebackentry_lines_var_kw_args(self, importasmod): + mod = importasmod(""" + def func1(x, *y, **z): + raise ValueError("hello\\nworld") + """) + excinfo = py.test.raises(ValueError, mod.func1, 'a', 'b', c='d') + excinfo.traceback = excinfo.traceback.filter() + entry = excinfo.traceback[-1] + p = FormattedExcinfo(funcargs=True) + reprfuncargs = p.repr_args(entry) + assert reprfuncargs.args[0] == ('x', repr('a')) + assert reprfuncargs.args[1] == ('y', repr(('b',))) + assert reprfuncargs.args[2] == ('z', repr({'c': 'd'})) + + p = FormattedExcinfo(funcargs=True) + repr_entry = p.repr_traceback_entry(entry) + assert repr_entry.reprfuncargs.args == reprfuncargs.args + tw = TWMock() + repr_entry.toterminal(tw) + assert tw.lines[0] == "x = 'a', y = ('b',), z = {'c': 'd'}" + + def test_repr_tracebackentry_short(self, importasmod): + mod = importasmod(""" + def func1(): + raise ValueError("hello") + def entry(): + func1() + """) + excinfo = py.test.raises(ValueError, mod.entry) + p = FormattedExcinfo(style="short") + reprtb = p.repr_traceback_entry(excinfo.traceback[-2]) + lines = reprtb.lines + basename = py.path.local(mod.__file__).basename + assert lines[0] == ' func1()' + assert basename in str(reprtb.reprfileloc.path) + assert reprtb.reprfileloc.lineno == 5 + + # test last entry + p = FormattedExcinfo(style="short") + reprtb = p.repr_traceback_entry(excinfo.traceback[-1], excinfo) + lines = reprtb.lines + assert lines[0] == ' raise ValueError("hello")' + assert lines[1] == 'E ValueError: hello' + assert basename in str(reprtb.reprfileloc.path) + assert reprtb.reprfileloc.lineno == 3 + + def test_repr_tracebackentry_no(self, importasmod): + mod = importasmod(""" + def func1(): + raise ValueError("hello") + def entry(): + func1() + """) + excinfo = py.test.raises(ValueError, mod.entry) + p = FormattedExcinfo(style="no") + p.repr_traceback_entry(excinfo.traceback[-2]) + + p = FormattedExcinfo(style="no") + reprentry = p.repr_traceback_entry(excinfo.traceback[-1], excinfo) + lines = reprentry.lines + assert lines[0] == 'E ValueError: hello' + assert not lines[1:] + + def test_repr_traceback_tbfilter(self, importasmod): + mod = importasmod(""" + def f(x): + raise ValueError(x) + def entry(): + f(0) + """) + excinfo = py.test.raises(ValueError, mod.entry) + p = FormattedExcinfo(tbfilter=True) + reprtb = p.repr_traceback(excinfo) + assert len(reprtb.reprentries) == 2 + p = FormattedExcinfo(tbfilter=False) + reprtb = p.repr_traceback(excinfo) + assert len(reprtb.reprentries) == 3 + + def test_traceback_short_no_source(self, importasmod, monkeypatch): + mod = importasmod(""" + def func1(): + raise ValueError("hello") + def entry(): + func1() + """) + try: + mod.entry() + except ValueError: + excinfo = py.code.ExceptionInfo() + from py._code.code import Code + monkeypatch.setattr(Code, 'path', 'bogus') + excinfo.traceback[0].frame.code.path = "bogus" + p = FormattedExcinfo(style="short") + reprtb = p.repr_traceback_entry(excinfo.traceback[-2]) + lines = reprtb.lines + last_p = FormattedExcinfo(style="short") + last_reprtb = last_p.repr_traceback_entry(excinfo.traceback[-1], excinfo) + last_lines = last_reprtb.lines + monkeypatch.undo() + basename = py.path.local(mod.__file__).basename + assert lines[0] == ' func1()' + + assert last_lines[0] == ' raise ValueError("hello")' + assert last_lines[1] == 'E ValueError: hello' + + def test_repr_traceback_and_excinfo(self, importasmod): + mod = importasmod(""" + def f(x): + raise ValueError(x) + def entry(): + f(0) + """) + excinfo = py.test.raises(ValueError, mod.entry) + + for style in ("long", "short"): + p = FormattedExcinfo(style=style) + reprtb = p.repr_traceback(excinfo) + assert len(reprtb.reprentries) == 2 + assert reprtb.style == style + assert not reprtb.extraline + repr = p.repr_excinfo(excinfo) + assert repr.reprtraceback + assert len(repr.reprtraceback.reprentries) == len(reprtb.reprentries) + assert repr.reprcrash.path.endswith("mod.py") + assert repr.reprcrash.message == "ValueError: 0" + + def test_repr_traceback_with_invalid_cwd(self, importasmod, monkeypatch): + mod = importasmod(""" + def f(x): + raise ValueError(x) + def entry(): + f(0) + """) + excinfo = py.test.raises(ValueError, mod.entry) + + p = FormattedExcinfo() + def raiseos(): + raise OSError(2) + monkeypatch.setattr('os.getcwd', raiseos) + assert p._makepath(__file__) == __file__ + reprtb = p.repr_traceback(excinfo) + + @broken_on_modern_pytest + def test_repr_excinfo_addouterr(self, importasmod): + mod = importasmod(""" + def entry(): + raise ValueError() + """) + excinfo = py.test.raises(ValueError, mod.entry) + repr = excinfo.getrepr() + repr.addsection("title", "content") + twmock = TWMock() + repr.toterminal(twmock) + assert twmock.lines[-1] == "content" + assert twmock.lines[-2] == ("-", "title") + + def test_repr_excinfo_reprcrash(self, importasmod): + mod = importasmod(""" + def entry(): + raise ValueError() + """) + excinfo = py.test.raises(ValueError, mod.entry) + repr = excinfo.getrepr() + assert repr.reprcrash.path.endswith("mod.py") + assert repr.reprcrash.lineno == 3 + assert repr.reprcrash.message == "ValueError" + assert str(repr.reprcrash).endswith("mod.py:3: ValueError") + + def test_repr_traceback_recursion(self, importasmod): + mod = importasmod(""" + def rec2(x): + return rec1(x+1) + def rec1(x): + return rec2(x-1) + def entry(): + rec1(42) + """) + excinfo = py.test.raises(RuntimeError, mod.entry) + + for style in ("short", "long", "no"): + p = FormattedExcinfo(style="short") + reprtb = p.repr_traceback(excinfo) + assert reprtb.extraline == "!!! Recursion detected (same locals & position)" + assert str(reprtb) + + @broken_on_modern_pytest + def test_tb_entry_AssertionError(self, importasmod): + # probably this test is a bit redundant + # as py/magic/testing/test_assertion.py + # already tests correctness of + # assertion-reinterpretation logic + mod = importasmod(""" + def somefunc(): + x = 1 + assert x == 2 + """) + excinfo = py.test.raises(AssertionError, mod.somefunc) + + p = FormattedExcinfo() + reprentry = p.repr_traceback_entry(excinfo.traceback[-1], excinfo) + lines = reprentry.lines + assert lines[-1] == "E assert 1 == 2" + + def test_reprexcinfo_getrepr(self, importasmod): + mod = importasmod(""" + def f(x): + raise ValueError(x) + def entry(): + f(0) + """) + try: + mod.entry() + except ValueError: + excinfo = py.code.ExceptionInfo() + + for style in ("short", "long", "no"): + for showlocals in (True, False): + repr = excinfo.getrepr(style=style, showlocals=showlocals) + assert isinstance(repr, ReprExceptionInfo) + assert repr.reprtraceback.style == style + + def test_reprexcinfo_unicode(self): + from py._code.code import TerminalRepr + class MyRepr(TerminalRepr): + def toterminal(self, tw): + tw.line(py.builtin._totext("я", "utf-8")) + x = py.builtin._totext(MyRepr()) + assert x == py.builtin._totext("я", "utf-8") + + @broken_on_modern_pytest + def test_toterminal_long(self, importasmod): + mod = importasmod(""" + def g(x): + raise ValueError(x) + def f(): + g(3) + """) + excinfo = py.test.raises(ValueError, mod.f) + excinfo.traceback = excinfo.traceback.filter() + repr = excinfo.getrepr() + tw = TWMock() + repr.toterminal(tw) + assert tw.lines[0] == "" + tw.lines.pop(0) + assert tw.lines[0] == " def f():" + assert tw.lines[1] == "> g(3)" + assert tw.lines[2] == "" + assert tw.lines[3].endswith("mod.py:5: ") + assert tw.lines[4] == ("_ ", None) + assert tw.lines[5] == "" + assert tw.lines[6] == " def g(x):" + assert tw.lines[7] == "> raise ValueError(x)" + assert tw.lines[8] == "E ValueError: 3" + assert tw.lines[9] == "" + assert tw.lines[10].endswith("mod.py:3: ValueError") + + @broken_on_modern_pytest + def test_toterminal_long_missing_source(self, importasmod, tmpdir): + mod = importasmod(""" + def g(x): + raise ValueError(x) + def f(): + g(3) + """) + excinfo = py.test.raises(ValueError, mod.f) + tmpdir.join('mod.py').remove() + excinfo.traceback = excinfo.traceback.filter() + repr = excinfo.getrepr() + tw = TWMock() + repr.toterminal(tw) + assert tw.lines[0] == "" + tw.lines.pop(0) + assert tw.lines[0] == "> ???" + assert tw.lines[1] == "" + assert tw.lines[2].endswith("mod.py:5: ") + assert tw.lines[3] == ("_ ", None) + assert tw.lines[4] == "" + assert tw.lines[5] == "> ???" + assert tw.lines[6] == "E ValueError: 3" + assert tw.lines[7] == "" + assert tw.lines[8].endswith("mod.py:3: ValueError") + + @broken_on_modern_pytest + def test_toterminal_long_incomplete_source(self, importasmod, tmpdir): + mod = importasmod(""" + def g(x): + raise ValueError(x) + def f(): + g(3) + """) + excinfo = py.test.raises(ValueError, mod.f) + tmpdir.join('mod.py').write('asdf') + excinfo.traceback = excinfo.traceback.filter() + repr = excinfo.getrepr() + tw = TWMock() + repr.toterminal(tw) + assert tw.lines[0] == "" + tw.lines.pop(0) + assert tw.lines[0] == "> ???" + assert tw.lines[1] == "" + assert tw.lines[2].endswith("mod.py:5: ") + assert tw.lines[3] == ("_ ", None) + assert tw.lines[4] == "" + assert tw.lines[5] == "> ???" + assert tw.lines[6] == "E ValueError: 3" + assert tw.lines[7] == "" + assert tw.lines[8].endswith("mod.py:3: ValueError") + + @broken_on_modern_pytest + def test_toterminal_long_filenames(self, importasmod): + mod = importasmod(""" + def f(): + raise ValueError() + """) + excinfo = py.test.raises(ValueError, mod.f) + tw = TWMock() + path = py.path.local(mod.__file__) + old = path.dirpath().chdir() + try: + repr = excinfo.getrepr(abspath=False) + repr.toterminal(tw) + line = tw.lines[-1] + x = py.path.local().bestrelpath(path) + if len(x) < len(str(path)): + assert line == "mod.py:3: ValueError" + + repr = excinfo.getrepr(abspath=True) + repr.toterminal(tw) + line = tw.lines[-1] + assert line == "%s:3: ValueError" %(path,) + finally: + old.chdir() + + @pytest.mark.parametrize('style', ("long", "short", "no")) + @pytest.mark.parametrize('showlocals', (True, False), + ids=['locals', 'nolocals']) + @pytest.mark.parametrize('tbfilter', (True, False), + ids=['tbfilter', 'nofilter']) + @pytest.mark.parametrize('funcargs', (True, False), + ids=['funcargs', 'nofuncargs']) + def test_format_excinfo(self, importasmod, + style, showlocals, tbfilter, funcargs): + + mod = importasmod(""" + def g(x): + raise ValueError(x) + def f(): + g(3) + """) + excinfo = py.test.raises(ValueError, mod.f) + tw = py.io.TerminalWriter(stringio=True) + repr = excinfo.getrepr( + style=style, + showlocals=showlocals, + funcargs=funcargs, + tbfilter=tbfilter + ) + repr.toterminal(tw) + assert tw.stringio.getvalue() + + @broken_on_modern_pytest + def test_native_style(self): + excinfo = self.excinfo_from_exec(""" + assert 0 + """) + repr = excinfo.getrepr(style='native') + assert "assert 0" in str(repr.reprcrash) + s = str(repr) + assert s.startswith('Traceback (most recent call last):\n File') + assert s.endswith('\nAssertionError: assert 0') + assert 'exec (source.compile())' in s + assert s.count('assert 0') == 2 + + @broken_on_modern_pytest + def test_traceback_repr_style(self, importasmod): + mod = importasmod(""" + def f(): + g() + def g(): + h() + def h(): + i() + def i(): + raise ValueError() + """) + excinfo = py.test.raises(ValueError, mod.f) + excinfo.traceback = excinfo.traceback.filter() + excinfo.traceback[1].set_repr_style("short") + excinfo.traceback[2].set_repr_style("short") + r = excinfo.getrepr(style="long") + tw = TWMock() + r.toterminal(tw) + for line in tw.lines: print (line) + assert tw.lines[0] == "" + assert tw.lines[1] == " def f():" + assert tw.lines[2] == "> g()" + assert tw.lines[3] == "" + assert tw.lines[4].endswith("mod.py:3: ") + assert tw.lines[5] == ("_ ", None) + assert tw.lines[6].endswith("in g") + assert tw.lines[7] == " h()" + assert tw.lines[8].endswith("in h") + assert tw.lines[9] == " i()" + assert tw.lines[10] == ("_ ", None) + assert tw.lines[11] == "" + assert tw.lines[12] == " def i():" + assert tw.lines[13] == "> raise ValueError()" + assert tw.lines[14] == "E ValueError" + assert tw.lines[15] == "" + assert tw.lines[16].endswith("mod.py:9: ValueError") diff --git a/third_party/python/py/testing/code/test_source.py b/third_party/python/py/testing/code/test_source.py new file mode 100644 index 0000000000..3492761a4e --- /dev/null +++ b/third_party/python/py/testing/code/test_source.py @@ -0,0 +1,648 @@ +from py.code import Source +import py +import sys +import inspect + +from py._code.source import _ast +if _ast is not None: + astonly = py.test.mark.nothing +else: + astonly = py.test.mark.xfail("True", reason="only works with AST-compile") + +failsonjython = py.test.mark.xfail("sys.platform.startswith('java')") + +def test_source_str_function(): + x = Source("3") + assert str(x) == "3" + + x = Source(" 3") + assert str(x) == "3" + + x = Source(""" + 3 + """, rstrip=False) + assert str(x) == "\n3\n " + + x = Source(""" + 3 + """, rstrip=True) + assert str(x) == "\n3" + +def test_unicode(): + try: + unicode + except NameError: + return + x = Source(unicode("4")) + assert str(x) == "4" + co = py.code.compile(unicode('u"\xc3\xa5"', 'utf8'), mode='eval') + val = eval(co) + assert isinstance(val, unicode) + +def test_source_from_function(): + source = py.code.Source(test_source_str_function) + assert str(source).startswith('def test_source_str_function():') + +def test_source_from_method(): + class TestClass: + def test_method(self): + pass + source = py.code.Source(TestClass().test_method) + assert source.lines == ["def test_method(self):", + " pass"] + +def test_source_from_lines(): + lines = ["a \n", "b\n", "c"] + source = py.code.Source(lines) + assert source.lines == ['a ', 'b', 'c'] + +def test_source_from_inner_function(): + def f(): + pass + source = py.code.Source(f, deindent=False) + assert str(source).startswith(' def f():') + source = py.code.Source(f) + assert str(source).startswith('def f():') + +def test_source_putaround_simple(): + source = Source("raise ValueError") + source = source.putaround( + "try:", """\ + except ValueError: + x = 42 + else: + x = 23""") + assert str(source)=="""\ +try: + raise ValueError +except ValueError: + x = 42 +else: + x = 23""" + +def test_source_putaround(): + source = Source() + source = source.putaround(""" + if 1: + x=1 + """) + assert str(source).strip() == "if 1:\n x=1" + +def test_source_strips(): + source = Source("") + assert source == Source() + assert str(source) == '' + assert source.strip() == source + +def test_source_strip_multiline(): + source = Source() + source.lines = ["", " hello", " "] + source2 = source.strip() + assert source2.lines == [" hello"] + +def test_syntaxerror_rerepresentation(): + ex = py.test.raises(SyntaxError, py.code.compile, 'xyz xyz') + assert ex.value.lineno == 1 + assert ex.value.offset in (4,7) # XXX pypy/jython versus cpython? + assert ex.value.text.strip(), 'x x' + +def test_isparseable(): + assert Source("hello").isparseable() + assert Source("if 1:\n pass").isparseable() + assert Source(" \nif 1:\n pass").isparseable() + assert not Source("if 1:\n").isparseable() + assert not Source(" \nif 1:\npass").isparseable() + assert not Source(chr(0)).isparseable() + +class TestAccesses: + source = Source("""\ + def f(x): + pass + def g(x): + pass + """) + def test_getrange(self): + x = self.source[0:2] + assert x.isparseable() + assert len(x.lines) == 2 + assert str(x) == "def f(x):\n pass" + + def test_getline(self): + x = self.source[0] + assert x == "def f(x):" + + def test_len(self): + assert len(self.source) == 4 + + def test_iter(self): + l = [x for x in self.source] + assert len(l) == 4 + +class TestSourceParsingAndCompiling: + source = Source("""\ + def f(x): + assert (x == + 3 + + 4) + """).strip() + + def test_compile(self): + co = py.code.compile("x=3") + d = {} + exec (co, d) + assert d['x'] == 3 + + def test_compile_and_getsource_simple(self): + co = py.code.compile("x=3") + exec (co) + source = py.code.Source(co) + assert str(source) == "x=3" + + def test_compile_and_getsource_through_same_function(self): + def gensource(source): + return py.code.compile(source) + co1 = gensource(""" + def f(): + raise KeyError() + """) + co2 = gensource(""" + def f(): + raise ValueError() + """) + source1 = inspect.getsource(co1) + assert 'KeyError' in source1 + source2 = inspect.getsource(co2) + assert 'ValueError' in source2 + + def test_getstatement(self): + #print str(self.source) + ass = str(self.source[1:]) + for i in range(1, 4): + #print "trying start in line %r" % self.source[i] + s = self.source.getstatement(i) + #x = s.deindent() + assert str(s) == ass + + def test_getstatementrange_triple_quoted(self): + #print str(self.source) + source = Source("""hello(''' + ''')""") + s = source.getstatement(0) + assert s == str(source) + s = source.getstatement(1) + assert s == str(source) + + @astonly + def test_getstatementrange_within_constructs(self): + source = Source("""\ + try: + try: + raise ValueError + except SomeThing: + pass + finally: + 42 + """) + assert len(source) == 7 + # check all lineno's that could occur in a traceback + #assert source.getstatementrange(0) == (0, 7) + #assert source.getstatementrange(1) == (1, 5) + assert source.getstatementrange(2) == (2, 3) + assert source.getstatementrange(3) == (3, 4) + assert source.getstatementrange(4) == (4, 5) + #assert source.getstatementrange(5) == (0, 7) + assert source.getstatementrange(6) == (6, 7) + + def test_getstatementrange_bug(self): + source = Source("""\ + try: + x = ( + y + + z) + except: + pass + """) + assert len(source) == 6 + assert source.getstatementrange(2) == (1, 4) + + def test_getstatementrange_bug2(self): + source = Source("""\ + assert ( + 33 + == + [ + X(3, + b=1, c=2 + ), + ] + ) + """) + assert len(source) == 9 + assert source.getstatementrange(5) == (0, 9) + + def test_getstatementrange_ast_issue58(self): + source = Source("""\ + + def test_some(): + for a in [a for a in + CAUSE_ERROR]: pass + + x = 3 + """) + assert getstatement(2, source).lines == source.lines[2:3] + assert getstatement(3, source).lines == source.lines[3:4] + + def test_getstatementrange_out_of_bounds_py3(self): + source = Source("if xxx:\n from .collections import something") + r = source.getstatementrange(1) + assert r == (1,2) + + def test_getstatementrange_with_syntaxerror_issue7(self): + source = Source(":") + py.test.raises(SyntaxError, lambda: source.getstatementrange(0)) + + def test_compile_to_ast(self): + import ast + source = Source("x = 4") + mod = source.compile(flag=ast.PyCF_ONLY_AST) + assert isinstance(mod, ast.Module) + compile(mod, "<filename>", "exec") + + def test_compile_and_getsource(self): + co = self.source.compile() + py.builtin.exec_(co, globals()) + f(7) + excinfo = py.test.raises(AssertionError, "f(6)") + frame = excinfo.traceback[-1].frame + stmt = frame.code.fullsource.getstatement(frame.lineno) + #print "block", str(block) + assert str(stmt).strip().startswith('assert') + + def test_compilefuncs_and_path_sanity(self): + def check(comp, name): + co = comp(self.source, name) + if not name: + expected = "codegen %s:%d>" %(mypath, mylineno+2+1) + else: + expected = "codegen %r %s:%d>" % (name, mypath, mylineno+2+1) + fn = co.co_filename + assert fn.endswith(expected) + + mycode = py.code.Code(self.test_compilefuncs_and_path_sanity) + mylineno = mycode.firstlineno + mypath = mycode.path + + for comp in py.code.compile, py.code.Source.compile: + for name in '', None, 'my': + yield check, comp, name + + def test_offsetless_synerr(self): + py.test.raises(SyntaxError, py.code.compile, "lambda a,a: 0", mode='eval') + +def test_getstartingblock_singleline(): + class A: + def __init__(self, *args): + frame = sys._getframe(1) + self.source = py.code.Frame(frame).statement + + x = A('x', 'y') + + l = [i for i in x.source.lines if i.strip()] + assert len(l) == 1 + +def test_getstartingblock_multiline(): + class A: + def __init__(self, *args): + frame = sys._getframe(1) + self.source = py.code.Frame(frame).statement + + x = A('x', + 'y' \ + , + 'z') + + l = [i for i in x.source.lines if i.strip()] + assert len(l) == 4 + +def test_getline_finally(): + def c(): pass + excinfo = py.test.raises(TypeError, """ + teardown = None + try: + c(1) + finally: + if teardown: + teardown() + """) + source = excinfo.traceback[-1].statement + assert str(source).strip() == 'c(1)' + +def test_getfuncsource_dynamic(): + source = """ + def f(): + raise ValueError + + def g(): pass + """ + co = py.code.compile(source) + py.builtin.exec_(co, globals()) + assert str(py.code.Source(f)).strip() == 'def f():\n raise ValueError' + assert str(py.code.Source(g)).strip() == 'def g(): pass' + + +def test_getfuncsource_with_multine_string(): + def f(): + c = '''while True: + pass +''' + assert str(py.code.Source(f)).strip() == "def f():\n c = '''while True:\n pass\n'''" + + +def test_deindent(): + from py._code.source import deindent as deindent + assert deindent(['\tfoo', '\tbar', ]) == ['foo', 'bar'] + + def f(): + c = '''while True: + pass +''' + import inspect + lines = deindent(inspect.getsource(f).splitlines()) + assert lines == ["def f():", " c = '''while True:", " pass", "'''"] + + source = """ + def f(): + def g(): + pass + """ + lines = deindent(source.splitlines()) + assert lines == ['', 'def f():', ' def g():', ' pass', ' '] + +def test_source_of_class_at_eof_without_newline(tmpdir): + # this test fails because the implicit inspect.getsource(A) below + # does not return the "x = 1" last line. + source = py.code.Source(''' + class A(object): + def method(self): + x = 1 + ''') + path = tmpdir.join("a.py") + path.write(source) + s2 = py.code.Source(tmpdir.join("a.py").pyimport().A) + assert str(source).strip() == str(s2).strip() + +if True: + def x(): + pass + +def test_getsource_fallback(): + from py._code.source import getsource + expected = """def x(): + pass""" + src = getsource(x) + assert src == expected + +def test_idem_compile_and_getsource(): + from py._code.source import getsource + expected = "def x(): pass" + co = py.code.compile(expected) + src = getsource(co) + assert src == expected + +def test_findsource_fallback(): + from py._code.source import findsource + src, lineno = findsource(x) + assert 'test_findsource_simple' in str(src) + assert src[lineno] == ' def x():' + +def test_findsource(): + from py._code.source import findsource + co = py.code.compile("""if 1: + def x(): + pass +""") + + src, lineno = findsource(co) + assert 'if 1:' in str(src) + + d = {} + eval(co, d) + src, lineno = findsource(d['x']) + assert 'if 1:' in str(src) + assert src[lineno] == " def x():" + + +def test_getfslineno(): + from py.code import getfslineno + + def f(x): + pass + + fspath, lineno = getfslineno(f) + + assert fspath.basename == "test_source.py" + assert lineno == py.code.getrawcode(f).co_firstlineno-1 # see findsource + + class A(object): + pass + + fspath, lineno = getfslineno(A) + + _, A_lineno = inspect.findsource(A) + assert fspath.basename == "test_source.py" + assert lineno == A_lineno + + assert getfslineno(3) == ("", -1) + class B: + pass + B.__name__ = "B2" + assert getfslineno(B)[1] == -1 + +def test_code_of_object_instance_with_call(): + class A: + pass + py.test.raises(TypeError, lambda: py.code.Source(A())) + class WithCall: + def __call__(self): + pass + + code = py.code.Code(WithCall()) + assert 'pass' in str(code.source()) + + class Hello(object): + def __call__(self): + pass + py.test.raises(TypeError, lambda: py.code.Code(Hello)) + + +def getstatement(lineno, source): + from py._code.source import getstatementrange_ast + source = py.code.Source(source, deindent=False) + ast, start, end = getstatementrange_ast(lineno, source) + return source[start:end] + +def test_oneline(): + source = getstatement(0, "raise ValueError") + assert str(source) == "raise ValueError" + +def test_comment_and_no_newline_at_end(): + from py._code.source import getstatementrange_ast + source = Source(['def test_basic_complex():', + ' assert 1 == 2', + '# vim: filetype=pyopencl:fdm=marker']) + ast, start, end = getstatementrange_ast(1, source) + assert end == 2 + +def test_oneline_and_comment(): + source = getstatement(0, "raise ValueError\n#hello") + assert str(source) == "raise ValueError" + +def test_comments(): + source = '''def test(): + "comment 1" + x = 1 + # comment 2 + # comment 3 + + assert False + +""" +comment 4 +""" +''' + for line in range(2,6): + assert str(getstatement(line, source)) == ' x = 1' + for line in range(6,10): + assert str(getstatement(line, source)) == ' assert False' + assert str(getstatement(10, source)) == '"""' + +def test_comment_in_statement(): + source = '''test(foo=1, + # comment 1 + bar=2) +''' + for line in range(1,3): + assert str(getstatement(line, source)) == \ + 'test(foo=1,\n # comment 1\n bar=2)' + +def test_single_line_else(): + source = getstatement(1, "if False: 2\nelse: 3") + assert str(source) == "else: 3" + +def test_single_line_finally(): + source = getstatement(1, "try: 1\nfinally: 3") + assert str(source) == "finally: 3" + +def test_issue55(): + source = ('def round_trip(dinp):\n assert 1 == dinp\n' + 'def test_rt():\n round_trip("""\n""")\n') + s = getstatement(3, source) + assert str(s) == ' round_trip("""\n""")' + + +def XXXtest_multiline(): + source = getstatement(0, """\ +raise ValueError( + 23 +) +x = 3 +""") + assert str(source) == "raise ValueError(\n 23\n)" + +class TestTry: + pytestmark = astonly + source = """\ +try: + raise ValueError +except Something: + raise IndexError(1) +else: + raise KeyError() +""" + + def test_body(self): + source = getstatement(1, self.source) + assert str(source) == " raise ValueError" + + def test_except_line(self): + source = getstatement(2, self.source) + assert str(source) == "except Something:" + + def test_except_body(self): + source = getstatement(3, self.source) + assert str(source) == " raise IndexError(1)" + + def test_else(self): + source = getstatement(5, self.source) + assert str(source) == " raise KeyError()" + +class TestTryFinally: + source = """\ +try: + raise ValueError +finally: + raise IndexError(1) +""" + + def test_body(self): + source = getstatement(1, self.source) + assert str(source) == " raise ValueError" + + def test_finally(self): + source = getstatement(3, self.source) + assert str(source) == " raise IndexError(1)" + + + +class TestIf: + pytestmark = astonly + source = """\ +if 1: + y = 3 +elif False: + y = 5 +else: + y = 7 +""" + + def test_body(self): + source = getstatement(1, self.source) + assert str(source) == " y = 3" + + def test_elif_clause(self): + source = getstatement(2, self.source) + assert str(source) == "elif False:" + + def test_elif(self): + source = getstatement(3, self.source) + assert str(source) == " y = 5" + + def test_else(self): + source = getstatement(5, self.source) + assert str(source) == " y = 7" + +def test_semicolon(): + s = """\ +hello ; pytest.skip() +""" + source = getstatement(0, s) + assert str(source) == s.strip() + +def test_def_online(): + s = """\ +def func(): raise ValueError(42) + +def something(): + pass +""" + source = getstatement(0, s) + assert str(source) == "def func(): raise ValueError(42)" + +def XXX_test_expression_multiline(): + source = """\ +something +''' +'''""" + result = getstatement(1, source) + assert str(result) == "'''\n'''" + diff --git a/third_party/python/py/testing/conftest.py b/third_party/python/py/testing/conftest.py new file mode 100644 index 0000000000..0f956b3dd2 --- /dev/null +++ b/third_party/python/py/testing/conftest.py @@ -0,0 +1,3 @@ + +pytest_plugins = "pytester", + diff --git a/third_party/python/py/testing/io_/__init__.py b/third_party/python/py/testing/io_/__init__.py new file mode 100644 index 0000000000..792d600548 --- /dev/null +++ b/third_party/python/py/testing/io_/__init__.py @@ -0,0 +1 @@ +# diff --git a/third_party/python/py/testing/io_/test_capture.py b/third_party/python/py/testing/io_/test_capture.py new file mode 100644 index 0000000000..b5fedd0abc --- /dev/null +++ b/third_party/python/py/testing/io_/test_capture.py @@ -0,0 +1,501 @@ +from __future__ import with_statement + +import os, sys +import py + +needsdup = py.test.mark.skipif("not hasattr(os, 'dup')") + +from py.builtin import print_ + +if sys.version_info >= (3,0): + def tobytes(obj): + if isinstance(obj, str): + obj = obj.encode('UTF-8') + assert isinstance(obj, bytes) + return obj + def totext(obj): + if isinstance(obj, bytes): + obj = str(obj, 'UTF-8') + assert isinstance(obj, str) + return obj +else: + def tobytes(obj): + if isinstance(obj, unicode): + obj = obj.encode('UTF-8') + assert isinstance(obj, str) + return obj + def totext(obj): + if isinstance(obj, str): + obj = unicode(obj, 'UTF-8') + assert isinstance(obj, unicode) + return obj + +def oswritebytes(fd, obj): + os.write(fd, tobytes(obj)) + +class TestTextIO: + def test_text(self): + f = py.io.TextIO() + f.write("hello") + s = f.getvalue() + assert s == "hello" + f.close() + + def test_unicode_and_str_mixture(self): + f = py.io.TextIO() + if sys.version_info >= (3,0): + f.write("\u00f6") + py.test.raises(TypeError, "f.write(bytes('hello', 'UTF-8'))") + else: + f.write(unicode("\u00f6", 'UTF-8')) + f.write("hello") # bytes + s = f.getvalue() + f.close() + assert isinstance(s, unicode) + +def test_bytes_io(): + f = py.io.BytesIO() + f.write(tobytes("hello")) + py.test.raises(TypeError, "f.write(totext('hello'))") + s = f.getvalue() + assert s == tobytes("hello") + +def test_dontreadfrominput(): + from py._io.capture import DontReadFromInput + f = DontReadFromInput() + assert not f.isatty() + py.test.raises(IOError, f.read) + py.test.raises(IOError, f.readlines) + py.test.raises(IOError, iter, f) + py.test.raises(ValueError, f.fileno) + f.close() # just for completeness + +def pytest_funcarg__tmpfile(request): + testdir = request.getfuncargvalue("testdir") + f = testdir.makepyfile("").open('wb+') + request.addfinalizer(f.close) + return f + +@needsdup +def test_dupfile(tmpfile): + flist = [] + for i in range(5): + nf = py.io.dupfile(tmpfile, encoding="utf-8") + assert nf != tmpfile + assert nf.fileno() != tmpfile.fileno() + assert nf not in flist + print_(i, end="", file=nf) + flist.append(nf) + for i in range(5): + f = flist[i] + f.close() + tmpfile.seek(0) + s = tmpfile.read() + assert "01234" in repr(s) + tmpfile.close() + +def test_dupfile_no_mode(): + """ + dupfile should trap an AttributeError and return f if no mode is supplied. + """ + class SomeFileWrapper(object): + "An object with a fileno method but no mode attribute" + def fileno(self): + return 1 + tmpfile = SomeFileWrapper() + assert py.io.dupfile(tmpfile) is tmpfile + with py.test.raises(AttributeError): + py.io.dupfile(tmpfile, raising=True) + +def lsof_check(func): + pid = os.getpid() + try: + out = py.process.cmdexec("lsof -p %d" % pid) + except py.process.cmdexec.Error: + py.test.skip("could not run 'lsof'") + func() + out2 = py.process.cmdexec("lsof -p %d" % pid) + len1 = len([x for x in out.split("\n") if "REG" in x]) + len2 = len([x for x in out2.split("\n") if "REG" in x]) + assert len2 < len1 + 3, out2 + +class TestFDCapture: + pytestmark = needsdup + + def test_not_now(self, tmpfile): + fd = tmpfile.fileno() + cap = py.io.FDCapture(fd, now=False) + data = tobytes("hello") + os.write(fd, data) + f = cap.done() + s = f.read() + assert not s + cap = py.io.FDCapture(fd, now=False) + cap.start() + os.write(fd, data) + f = cap.done() + s = f.read() + assert s == "hello" + + def test_simple(self, tmpfile): + fd = tmpfile.fileno() + cap = py.io.FDCapture(fd) + data = tobytes("hello") + os.write(fd, data) + f = cap.done() + s = f.read() + assert s == "hello" + f.close() + + def test_simple_many(self, tmpfile): + for i in range(10): + self.test_simple(tmpfile) + + def test_simple_many_check_open_files(self, tmpfile): + lsof_check(lambda: self.test_simple_many(tmpfile)) + + def test_simple_fail_second_start(self, tmpfile): + fd = tmpfile.fileno() + cap = py.io.FDCapture(fd) + f = cap.done() + py.test.raises(ValueError, cap.start) + f.close() + + def test_stderr(self): + cap = py.io.FDCapture(2, patchsys=True) + print_("hello", file=sys.stderr) + f = cap.done() + s = f.read() + assert s == "hello\n" + + def test_stdin(self, tmpfile): + tmpfile.write(tobytes("3")) + tmpfile.seek(0) + cap = py.io.FDCapture(0, tmpfile=tmpfile) + # check with os.read() directly instead of raw_input(), because + # sys.stdin itself may be redirected (as py.test now does by default) + x = os.read(0, 100).strip() + f = cap.done() + assert x == tobytes("3") + + def test_writeorg(self, tmpfile): + data1, data2 = tobytes("foo"), tobytes("bar") + try: + cap = py.io.FDCapture(tmpfile.fileno()) + tmpfile.write(data1) + cap.writeorg(data2) + finally: + tmpfile.close() + f = cap.done() + scap = f.read() + assert scap == totext(data1) + stmp = open(tmpfile.name, 'rb').read() + assert stmp == data2 + + +class TestStdCapture: + def getcapture(self, **kw): + return py.io.StdCapture(**kw) + + def test_capturing_done_simple(self): + cap = self.getcapture() + sys.stdout.write("hello") + sys.stderr.write("world") + outfile, errfile = cap.done() + s = outfile.read() + assert s == "hello" + s = errfile.read() + assert s == "world" + + def test_capturing_reset_simple(self): + cap = self.getcapture() + print("hello world") + sys.stderr.write("hello error\n") + out, err = cap.reset() + assert out == "hello world\n" + assert err == "hello error\n" + + def test_capturing_readouterr(self): + cap = self.getcapture() + try: + print ("hello world") + sys.stderr.write("hello error\n") + out, err = cap.readouterr() + assert out == "hello world\n" + assert err == "hello error\n" + sys.stderr.write("error2") + finally: + out, err = cap.reset() + assert err == "error2" + + def test_capturing_readouterr_unicode(self): + cap = self.getcapture() + print ("hx\xc4\x85\xc4\x87") + out, err = cap.readouterr() + assert out == py.builtin._totext("hx\xc4\x85\xc4\x87\n", "utf8") + + @py.test.mark.skipif('sys.version_info >= (3,)', + reason='text output different for bytes on python3') + def test_capturing_readouterr_decode_error_handling(self): + cap = self.getcapture() + # triggered a internal error in pytest + print('\xa6') + out, err = cap.readouterr() + assert out == py.builtin._totext('\ufffd\n', 'unicode-escape') + + def test_capturing_mixed(self): + cap = self.getcapture(mixed=True) + sys.stdout.write("hello ") + sys.stderr.write("world") + sys.stdout.write(".") + out, err = cap.reset() + assert out.strip() == "hello world." + assert not err + + def test_reset_twice_error(self): + cap = self.getcapture() + print ("hello") + out, err = cap.reset() + py.test.raises(ValueError, cap.reset) + assert out == "hello\n" + assert not err + + def test_capturing_modify_sysouterr_in_between(self): + oldout = sys.stdout + olderr = sys.stderr + cap = self.getcapture() + sys.stdout.write("hello") + sys.stderr.write("world") + sys.stdout = py.io.TextIO() + sys.stderr = py.io.TextIO() + print ("not seen") + sys.stderr.write("not seen\n") + out, err = cap.reset() + assert out == "hello" + assert err == "world" + assert sys.stdout == oldout + assert sys.stderr == olderr + + def test_capturing_error_recursive(self): + cap1 = self.getcapture() + print ("cap1") + cap2 = self.getcapture() + print ("cap2") + out2, err2 = cap2.reset() + out1, err1 = cap1.reset() + assert out1 == "cap1\n" + assert out2 == "cap2\n" + + def test_just_out_capture(self): + cap = self.getcapture(out=True, err=False) + sys.stdout.write("hello") + sys.stderr.write("world") + out, err = cap.reset() + assert out == "hello" + assert not err + + def test_just_err_capture(self): + cap = self.getcapture(out=False, err=True) + sys.stdout.write("hello") + sys.stderr.write("world") + out, err = cap.reset() + assert err == "world" + assert not out + + def test_stdin_restored(self): + old = sys.stdin + cap = self.getcapture(in_=True) + newstdin = sys.stdin + out, err = cap.reset() + assert newstdin != sys.stdin + assert sys.stdin is old + + def test_stdin_nulled_by_default(self): + print ("XXX this test may well hang instead of crashing") + print ("XXX which indicates an error in the underlying capturing") + print ("XXX mechanisms") + cap = self.getcapture() + py.test.raises(IOError, "sys.stdin.read()") + out, err = cap.reset() + + def test_suspend_resume(self): + cap = self.getcapture(out=True, err=False, in_=False) + try: + print ("hello") + sys.stderr.write("error\n") + out, err = cap.suspend() + assert out == "hello\n" + assert not err + print ("in between") + sys.stderr.write("in between\n") + cap.resume() + print ("after") + sys.stderr.write("error_after\n") + finally: + out, err = cap.reset() + assert out == "after\n" + assert not err + +class TestStdCaptureNotNow(TestStdCapture): + def getcapture(self, **kw): + kw['now'] = False + cap = py.io.StdCapture(**kw) + cap.startall() + return cap + +class TestStdCaptureFD(TestStdCapture): + pytestmark = needsdup + + def getcapture(self, **kw): + return py.io.StdCaptureFD(**kw) + + def test_intermingling(self): + cap = self.getcapture() + oswritebytes(1, "1") + sys.stdout.write(str(2)) + sys.stdout.flush() + oswritebytes(1, "3") + oswritebytes(2, "a") + sys.stderr.write("b") + sys.stderr.flush() + oswritebytes(2, "c") + out, err = cap.reset() + assert out == "123" + assert err == "abc" + + def test_callcapture(self): + def func(x, y): + print (x) + sys.stderr.write(str(y)) + return 42 + + res, out, err = py.io.StdCaptureFD.call(func, 3, y=4) + assert res == 42 + assert out.startswith("3") + assert err.startswith("4") + + def test_many(self, capfd): + def f(): + for i in range(10): + cap = py.io.StdCaptureFD() + cap.reset() + lsof_check(f) + +class TestStdCaptureFDNotNow(TestStdCaptureFD): + pytestmark = needsdup + + def getcapture(self, **kw): + kw['now'] = False + cap = py.io.StdCaptureFD(**kw) + cap.startall() + return cap + +@needsdup +def test_stdcapture_fd_tmpfile(tmpfile): + capfd = py.io.StdCaptureFD(out=tmpfile) + os.write(1, "hello".encode("ascii")) + os.write(2, "world".encode("ascii")) + outf, errf = capfd.done() + assert outf == tmpfile + +class TestStdCaptureFDinvalidFD: + pytestmark = needsdup + def test_stdcapture_fd_invalid_fd(self, testdir): + testdir.makepyfile(""" + import py, os + def test_stdout(): + os.close(1) + cap = py.io.StdCaptureFD(out=True, err=False, in_=False) + cap.done() + def test_stderr(): + os.close(2) + cap = py.io.StdCaptureFD(out=False, err=True, in_=False) + cap.done() + def test_stdin(): + os.close(0) + cap = py.io.StdCaptureFD(out=False, err=False, in_=True) + cap.done() + """) + result = testdir.runpytest("--capture=fd") + assert result.ret == 0 + assert result.parseoutcomes()['passed'] == 3 + +def test_capture_not_started_but_reset(): + capsys = py.io.StdCapture(now=False) + capsys.done() + capsys.done() + capsys.reset() + +@needsdup +def test_capture_no_sys(): + capsys = py.io.StdCapture() + try: + cap = py.io.StdCaptureFD(patchsys=False) + sys.stdout.write("hello") + sys.stderr.write("world") + oswritebytes(1, "1") + oswritebytes(2, "2") + out, err = cap.reset() + assert out == "1" + assert err == "2" + finally: + capsys.reset() + +@needsdup +def test_callcapture_nofd(): + def func(x, y): + oswritebytes(1, "hello") + oswritebytes(2, "hello") + print (x) + sys.stderr.write(str(y)) + return 42 + + capfd = py.io.StdCaptureFD(patchsys=False) + try: + res, out, err = py.io.StdCapture.call(func, 3, y=4) + finally: + capfd.reset() + assert res == 42 + assert out.startswith("3") + assert err.startswith("4") + +@needsdup +@py.test.mark.parametrize('use', [True, False]) +def test_fdcapture_tmpfile_remains_the_same(tmpfile, use): + if not use: + tmpfile = True + cap = py.io.StdCaptureFD(out=False, err=tmpfile, now=False) + cap.startall() + capfile = cap.err.tmpfile + cap.suspend() + cap.resume() + capfile2 = cap.err.tmpfile + assert capfile2 == capfile + +@py.test.mark.parametrize('method', ['StdCapture', 'StdCaptureFD']) +def test_capturing_and_logging_fundamentals(testdir, method): + if method == "StdCaptureFD" and not hasattr(os, 'dup'): + py.test.skip("need os.dup") + # here we check a fundamental feature + p = testdir.makepyfile(""" + import sys, os + import py, logging + cap = py.io.%s(out=False, in_=False) + + logging.warn("hello1") + outerr = cap.suspend() + print ("suspend, captured %%s" %%(outerr,)) + logging.warn("hello2") + + cap.resume() + logging.warn("hello3") + + outerr = cap.suspend() + print ("suspend2, captured %%s" %% (outerr,)) + """ % (method,)) + result = testdir.runpython(p) + result.stdout.fnmatch_lines([ + "suspend, captured*hello1*", + "suspend2, captured*hello2*WARNING:root:hello3*", + ]) + assert "atexit" not in result.stderr.str() diff --git a/third_party/python/py/testing/io_/test_saferepr.py b/third_party/python/py/testing/io_/test_saferepr.py new file mode 100644 index 0000000000..97be1416fe --- /dev/null +++ b/third_party/python/py/testing/io_/test_saferepr.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- + +from __future__ import generators +import py +import sys + +saferepr = py.io.saferepr + +class TestSafeRepr: + def test_simple_repr(self): + assert saferepr(1) == '1' + assert saferepr(None) == 'None' + + def test_maxsize(self): + s = saferepr('x'*50, maxsize=25) + assert len(s) == 25 + expected = repr('x'*10 + '...' + 'x'*10) + assert s == expected + + def test_maxsize_error_on_instance(self): + class A: + def __repr__(self): + raise ValueError('...') + + s = saferepr(('*'*50, A()), maxsize=25) + assert len(s) == 25 + assert s[0] == '(' and s[-1] == ')' + + def test_exceptions(self): + class BrokenRepr: + def __init__(self, ex): + self.ex = ex + foo = 0 + def __repr__(self): + raise self.ex + class BrokenReprException(Exception): + __str__ = None + __repr__ = None + assert 'Exception' in saferepr(BrokenRepr(Exception("broken"))) + s = saferepr(BrokenReprException("really broken")) + assert 'TypeError' in s + assert 'TypeError' in saferepr(BrokenRepr("string")) + + s2 = saferepr(BrokenRepr(BrokenReprException('omg even worse'))) + assert 'NameError' not in s2 + assert 'unknown' in s2 + + def test_big_repr(self): + from py._io.saferepr import SafeRepr + assert len(saferepr(range(1000))) <= \ + len('[' + SafeRepr().maxlist * "1000" + ']') + + def test_repr_on_newstyle(self): + class Function(object): + def __repr__(self): + return "<%s>" %(self.name) + try: + s = saferepr(Function()) + except Exception: + py.test.fail("saferepr failed for newstyle class") + + def test_unicode(self): + val = py.builtin._totext('£€', 'utf-8') + reprval = py.builtin._totext("'£€'", 'utf-8') + assert saferepr(val) == reprval + +def test_unicode_handling(): + value = py.builtin._totext('\xc4\x85\xc4\x87\n', 'utf-8').encode('utf8') + def f(): + raise Exception(value) + excinfo = py.test.raises(Exception, f) + s = str(excinfo) + if sys.version_info[0] < 3: + u = unicode(excinfo) + diff --git a/third_party/python/py/testing/io_/test_terminalwriter.py b/third_party/python/py/testing/io_/test_terminalwriter.py new file mode 100644 index 0000000000..7e9ebf409e --- /dev/null +++ b/third_party/python/py/testing/io_/test_terminalwriter.py @@ -0,0 +1,292 @@ + +import py +import os, sys +from py._io import terminalwriter +import codecs +import pytest + +def test_get_terminal_width(): + x = py.io.get_terminal_width + assert x == terminalwriter.get_terminal_width + +def test_getdimensions(monkeypatch): + fcntl = py.test.importorskip("fcntl") + import struct + l = [] + monkeypatch.setattr(fcntl, 'ioctl', lambda *args: l.append(args)) + try: + terminalwriter._getdimensions() + except (TypeError, struct.error): + pass + assert len(l) == 1 + assert l[0][0] == 1 + +def test_terminal_width_COLUMNS(monkeypatch): + """ Dummy test for get_terminal_width + """ + fcntl = py.test.importorskip("fcntl") + monkeypatch.setattr(fcntl, 'ioctl', lambda *args: int('x')) + monkeypatch.setenv('COLUMNS', '42') + assert terminalwriter.get_terminal_width() == 42 + monkeypatch.delenv('COLUMNS', raising=False) + +def test_terminalwriter_defaultwidth_80(monkeypatch): + monkeypatch.setattr(terminalwriter, '_getdimensions', lambda: 0/0) + monkeypatch.delenv('COLUMNS', raising=False) + tw = py.io.TerminalWriter() + assert tw.fullwidth == 80 + +def test_terminalwriter_getdimensions_bogus(monkeypatch): + monkeypatch.setattr(terminalwriter, '_getdimensions', lambda: (10,10)) + monkeypatch.delenv('COLUMNS', raising=False) + tw = py.io.TerminalWriter() + assert tw.fullwidth == 80 + +def test_terminalwriter_getdimensions_emacs(monkeypatch): + # emacs terminal returns (0,0) but set COLUMNS properly + monkeypatch.setattr(terminalwriter, '_getdimensions', lambda: (0,0)) + monkeypatch.setenv('COLUMNS', '42') + tw = py.io.TerminalWriter() + assert tw.fullwidth == 42 + +def test_terminalwriter_computes_width(monkeypatch): + monkeypatch.setattr(terminalwriter, 'get_terminal_width', lambda: 42) + tw = py.io.TerminalWriter() + assert tw.fullwidth == 42 + +def test_terminalwriter_default_instantiation(): + tw = py.io.TerminalWriter(stringio=True) + assert hasattr(tw, 'stringio') + +def test_terminalwriter_dumb_term_no_markup(monkeypatch): + monkeypatch.setattr(os, 'environ', {'TERM': 'dumb', 'PATH': ''}) + class MyFile: + closed = False + def isatty(self): + return True + monkeypatch.setattr(sys, 'stdout', MyFile()) + try: + assert sys.stdout.isatty() + tw = py.io.TerminalWriter() + assert not tw.hasmarkup + finally: + monkeypatch.undo() + +def test_terminalwriter_file_unicode(tmpdir): + f = codecs.open(str(tmpdir.join("xyz")), "wb", "utf8") + tw = py.io.TerminalWriter(file=f) + assert tw.encoding == "utf8" + +def test_unicode_encoding(): + msg = py.builtin._totext('b\u00f6y', 'utf8') + for encoding in 'utf8', 'latin1': + l = [] + tw = py.io.TerminalWriter(l.append, encoding=encoding) + tw.line(msg) + assert l[0].strip() == msg.encode(encoding) + +@pytest.mark.parametrize("encoding", ["ascii"]) +def test_unicode_on_file_with_ascii_encoding(tmpdir, monkeypatch, encoding): + msg = py.builtin._totext('hell\xf6', "latin1") + #pytest.raises(UnicodeEncodeError, lambda: bytes(msg)) + f = codecs.open(str(tmpdir.join("x")), "w", encoding) + tw = py.io.TerminalWriter(f) + tw.line(msg) + f.close() + s = tmpdir.join("x").open("rb").read().strip() + assert encoding == "ascii" + assert s == msg.encode("unicode-escape") + + +win32 = int(sys.platform == "win32") +class TestTerminalWriter: + def pytest_generate_tests(self, metafunc): + if "tw" in metafunc.funcargnames: + metafunc.addcall(id="path", param="path") + metafunc.addcall(id="stringio", param="stringio") + metafunc.addcall(id="callable", param="callable") + def pytest_funcarg__tw(self, request): + if request.param == "path": + tmpdir = request.getfuncargvalue("tmpdir") + p = tmpdir.join("tmpfile") + f = codecs.open(str(p), 'w+', encoding='utf8') + tw = py.io.TerminalWriter(f) + def getlines(): + tw._file.flush() + return codecs.open(str(p), 'r', + encoding='utf8').readlines() + elif request.param == "stringio": + tw = py.io.TerminalWriter(stringio=True) + def getlines(): + tw.stringio.seek(0) + return tw.stringio.readlines() + elif request.param == "callable": + writes = [] + tw = py.io.TerminalWriter(writes.append) + def getlines(): + io = py.io.TextIO() + io.write("".join(writes)) + io.seek(0) + return io.readlines() + tw.getlines = getlines + tw.getvalue = lambda: "".join(getlines()) + return tw + + def test_line(self, tw): + tw.line("hello") + l = tw.getlines() + assert len(l) == 1 + assert l[0] == "hello\n" + + def test_line_unicode(self, tw): + for encoding in 'utf8', 'latin1': + tw._encoding = encoding + msg = py.builtin._totext('b\u00f6y', 'utf8') + tw.line(msg) + l = tw.getlines() + assert l[0] == msg + "\n" + + def test_sep_no_title(self, tw): + tw.sep("-", fullwidth=60) + l = tw.getlines() + assert len(l) == 1 + assert l[0] == "-" * (60-win32) + "\n" + + def test_sep_with_title(self, tw): + tw.sep("-", "hello", fullwidth=60) + l = tw.getlines() + assert len(l) == 1 + assert l[0] == "-" * 26 + " hello " + "-" * (27-win32) + "\n" + + @py.test.mark.skipif("sys.platform == 'win32'") + def test__escaped(self, tw): + text2 = tw._escaped("hello", (31)) + assert text2.find("hello") != -1 + + @py.test.mark.skipif("sys.platform == 'win32'") + def test_markup(self, tw): + for bold in (True, False): + for color in ("red", "green"): + text2 = tw.markup("hello", **{color: True, 'bold': bold}) + assert text2.find("hello") != -1 + py.test.raises(ValueError, "tw.markup('x', wronkw=3)") + py.test.raises(ValueError, "tw.markup('x', wronkw=0)") + + def test_line_write_markup(self, tw): + tw.hasmarkup = True + tw.line("x", bold=True) + tw.write("x\n", red=True) + l = tw.getlines() + if sys.platform != "win32": + assert len(l[0]) >= 2, l + assert len(l[1]) >= 2, l + + def test_attr_fullwidth(self, tw): + tw.sep("-", "hello", fullwidth=70) + tw.fullwidth = 70 + tw.sep("-", "hello") + l = tw.getlines() + assert len(l[0]) == len(l[1]) + + def test_reline(self, tw): + tw.line("hello") + tw.hasmarkup = False + pytest.raises(ValueError, lambda: tw.reline("x")) + tw.hasmarkup = True + tw.reline("0 1 2") + tw.getlines() + l = tw.getvalue().split("\n") + assert len(l) == 2 + tw.reline("0 1 3") + l = tw.getvalue().split("\n") + assert len(l) == 2 + assert l[1].endswith("0 1 3\r") + tw.line("so") + l = tw.getvalue().split("\n") + assert len(l) == 3 + assert l[-1] == "" + assert l[1] == ("0 1 2\r0 1 3\rso ") + assert l[0] == "hello" + + +def test_terminal_with_callable_write_and_flush(): + l = set() + class fil: + flush = lambda self: l.add("1") + write = lambda self, x: l.add("1") + __call__ = lambda self, x: l.add("2") + + tw = py.io.TerminalWriter(fil()) + tw.line("hello") + assert l == set(["1"]) + del fil.flush + l.clear() + tw = py.io.TerminalWriter(fil()) + tw.line("hello") + assert l == set(["2"]) + + +def test_chars_on_current_line(): + tw = py.io.TerminalWriter(stringio=True) + + written = [] + + def write_and_check(s, expected): + tw.write(s, bold=True) + written.append(s) + assert tw.chars_on_current_line == expected + assert tw.stringio.getvalue() == ''.join(written) + + write_and_check('foo', 3) + write_and_check('bar', 6) + write_and_check('\n', 0) + write_and_check('\n', 0) + write_and_check('\n\n\n', 0) + write_and_check('\nfoo', 3) + write_and_check('\nfbar\nhello', 5) + write_and_check('10', 7) + + +@pytest.mark.skipif(sys.platform == "win32", reason="win32 has no native ansi") +def test_attr_hasmarkup(): + tw = py.io.TerminalWriter(stringio=True) + assert not tw.hasmarkup + tw.hasmarkup = True + tw.line("hello", bold=True) + s = tw.stringio.getvalue() + assert len(s) > len("hello\n") + assert '\x1b[1m' in s + assert '\x1b[0m' in s + +@pytest.mark.skipif(sys.platform == "win32", reason="win32 has no native ansi") +def test_ansi_print(): + # we have no easy way to construct a file that + # represents a terminal + f = py.io.TextIO() + f.isatty = lambda: True + py.io.ansi_print("hello", 0x32, file=f) + text2 = f.getvalue() + assert text2.find("hello") != -1 + assert len(text2) >= len("hello\n") + assert '\x1b[50m' in text2 + assert '\x1b[0m' in text2 + +def test_should_do_markup_PY_COLORS_eq_1(monkeypatch): + monkeypatch.setitem(os.environ, 'PY_COLORS', '1') + tw = py.io.TerminalWriter(stringio=True) + assert tw.hasmarkup + tw.line("hello", bold=True) + s = tw.stringio.getvalue() + assert len(s) > len("hello\n") + assert '\x1b[1m' in s + assert '\x1b[0m' in s + +def test_should_do_markup_PY_COLORS_eq_0(monkeypatch): + monkeypatch.setitem(os.environ, 'PY_COLORS', '0') + f = py.io.TextIO() + f.isatty = lambda: True + tw = py.io.TerminalWriter(file=f) + assert not tw.hasmarkup + tw.line("hello", bold=True) + s = f.getvalue() + assert s == "hello\n" diff --git a/third_party/python/py/testing/log/__init__.py b/third_party/python/py/testing/log/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/third_party/python/py/testing/log/__init__.py diff --git a/third_party/python/py/testing/log/test_log.py b/third_party/python/py/testing/log/test_log.py new file mode 100644 index 0000000000..5c706d9b6a --- /dev/null +++ b/third_party/python/py/testing/log/test_log.py @@ -0,0 +1,191 @@ +import py + +from py._log.log import default_keywordmapper + +callcapture = py.io.StdCapture.call + + +def setup_module(mod): + mod._oldstate = default_keywordmapper.getstate() + +def teardown_module(mod): + default_keywordmapper.setstate(mod._oldstate) + +class TestLogProducer: + def setup_method(self, meth): + from py._log.log import default_keywordmapper + default_keywordmapper.setstate(_oldstate) + + def test_getstate_setstate(self): + state = py.log._getstate() + py.log.setconsumer("hello", [].append) + state2 = py.log._getstate() + assert state2 != state + py.log._setstate(state) + state3 = py.log._getstate() + assert state3 == state + + def test_producer_repr(self): + d = py.log.Producer("default") + assert repr(d).find('default') != -1 + + def test_produce_one_keyword(self): + l = [] + py.log.setconsumer('s1', l.append) + py.log.Producer('s1')("hello world") + assert len(l) == 1 + msg = l[0] + assert msg.content().startswith('hello world') + assert msg.prefix() == '[s1] ' + assert str(msg) == "[s1] hello world" + + def test_producer_class(self): + p = py.log.Producer('x1') + l = [] + py.log.setconsumer(p._keywords, l.append) + p("hello") + assert len(l) == 1 + assert len(l[0].keywords) == 1 + assert 'x1' == l[0].keywords[0] + + def test_producer_caching(self): + p = py.log.Producer('x1') + x2 = p.x2 + assert x2 is p.x2 + +class TestLogConsumer: + def setup_method(self, meth): + default_keywordmapper.setstate(_oldstate) + def test_log_none(self): + log = py.log.Producer("XXX") + l = [] + py.log.setconsumer('XXX', l.append) + log("1") + assert l + l[:] = [] + py.log.setconsumer('XXX', None) + log("2") + assert not l + + def test_log_default_stderr(self): + res, out, err = callcapture(py.log.Producer("default"), "hello") + assert err.strip() == "[default] hello" + + def test_simple_consumer_match(self): + l = [] + py.log.setconsumer("x1", l.append) + p = py.log.Producer("x1 x2") + p("hello") + assert l + assert l[0].content() == "hello" + + def test_simple_consumer_match_2(self): + l = [] + p = py.log.Producer("x1 x2") + py.log.setconsumer(p._keywords, l.append) + p("42") + assert l + assert l[0].content() == "42" + + def test_no_auto_producer(self): + p = py.log.Producer('x') + py.test.raises(AttributeError, "p._x") + py.test.raises(AttributeError, "p.x_y") + + def test_setconsumer_with_producer(self): + l = [] + p = py.log.Producer("hello") + py.log.setconsumer(p, l.append) + p("world") + assert str(l[0]) == "[hello] world" + + def test_multi_consumer(self): + l = [] + py.log.setconsumer("x1", l.append) + py.log.setconsumer("x1 x2", None) + p = py.log.Producer("x1 x2") + p("hello") + assert not l + py.log.Producer("x1")("hello") + assert l + assert l[0].content() == "hello" + + def test_log_stderr(self): + py.log.setconsumer("xyz", py.log.STDOUT) + res, out, err = callcapture(py.log.Producer("xyz"), "hello") + assert not err + assert out.strip() == '[xyz] hello' + + def test_log_file(self, tmpdir): + customlog = tmpdir.join('log.out') + py.log.setconsumer("default", open(str(customlog), 'w', 1)) + py.log.Producer("default")("hello world #1") + assert customlog.readlines() == ['[default] hello world #1\n'] + + py.log.setconsumer("default", py.log.Path(customlog, buffering=False)) + py.log.Producer("default")("hello world #2") + res = customlog.readlines() + assert res == ['[default] hello world #2\n'] # no append by default! + + def test_log_file_append_mode(self, tmpdir): + logfilefn = tmpdir.join('log_append.out') + + # The append mode is on by default, so we don't need to specify it for File + py.log.setconsumer("default", py.log.Path(logfilefn, append=True, + buffering=0)) + assert logfilefn.check() + py.log.Producer("default")("hello world #1") + lines = logfilefn.readlines() + assert lines == ['[default] hello world #1\n'] + py.log.setconsumer("default", py.log.Path(logfilefn, append=True, + buffering=0)) + py.log.Producer("default")("hello world #1") + lines = logfilefn.readlines() + assert lines == ['[default] hello world #1\n', + '[default] hello world #1\n'] + + def test_log_file_delayed_create(self, tmpdir): + logfilefn = tmpdir.join('log_create.out') + + py.log.setconsumer("default", py.log.Path(logfilefn, + delayed_create=True, buffering=0)) + assert not logfilefn.check() + py.log.Producer("default")("hello world #1") + lines = logfilefn.readlines() + assert lines == ['[default] hello world #1\n'] + + def test_keyword_based_log_files(self, tmpdir): + logfiles = [] + keywords = 'k1 k2 k3'.split() + for key in keywords: + path = tmpdir.join(key) + py.log.setconsumer(key, py.log.Path(path, buffering=0)) + + py.log.Producer('k1')('1') + py.log.Producer('k2')('2') + py.log.Producer('k3')('3') + + for key in keywords: + path = tmpdir.join(key) + assert path.read().strip() == '[%s] %s' % (key, key[-1]) + + # disabled for now; the syslog log file can usually be read only by root + # I manually inspected /var/log/messages and the entries were there + def no_test_log_syslog(self): + py.log.setconsumer("default", py.log.Syslog()) + py.log.default("hello world #1") + + # disabled for now until I figure out how to read entries in the + # Event Logs on Windows + # I manually inspected the Application Log and the entries were there + def no_test_log_winevent(self): + py.log.setconsumer("default", py.log.WinEvent()) + py.log.default("hello world #1") + + # disabled for now until I figure out how to properly pass the parameters + def no_test_log_email(self): + py.log.setconsumer("default", py.log.Email(mailhost="gheorghiu.net", + fromaddr="grig", + toaddrs="grig", + subject = "py.log email")) + py.log.default("hello world #1") diff --git a/third_party/python/py/testing/log/test_warning.py b/third_party/python/py/testing/log/test_warning.py new file mode 100644 index 0000000000..a460c319e8 --- /dev/null +++ b/third_party/python/py/testing/log/test_warning.py @@ -0,0 +1,86 @@ +import sys +from distutils.version import LooseVersion + +import pytest + +import py + +mypath = py.path.local(__file__).new(ext=".py") + + +win = sys.platform.startswith('win') +pytestmark = pytest.mark.skipif(win and LooseVersion(pytest.__version__) >= LooseVersion('3.1'), + reason='apiwarn is not compatible with pytest >= 3.1 (#162)') + + +@pytest.mark.xfail +def test_forwarding_to_warnings_module(): + pytest.deprecated_call(py.log._apiwarn, "1.3", "..") + +def test_apiwarn_functional(recwarn): + capture = py.io.StdCapture() + py.log._apiwarn("x.y.z", "something", stacklevel=1) + out, err = capture.reset() + py.builtin.print_("out", out) + py.builtin.print_("err", err) + assert err.find("x.y.z") != -1 + lno = py.code.getrawcode(test_apiwarn_functional).co_firstlineno + 2 + exp = "%s:%s" % (mypath, lno) + assert err.find(exp) != -1 + +def test_stacklevel(recwarn): + def f(): + py.log._apiwarn("x", "some", stacklevel=2) + # 3 + # 4 + capture = py.io.StdCapture() + f() + out, err = capture.reset() + lno = py.code.getrawcode(test_stacklevel).co_firstlineno + 6 + warning = str(err) + assert warning.find(":%s" % lno) != -1 + +def test_stacklevel_initpkg_with_resolve(testdir, recwarn): + testdir.makepyfile(modabc=""" + import py + def f(): + py.log._apiwarn("x", "some", stacklevel="apipkg123") + """) + testdir.makepyfile(apipkg123=""" + def __getattr__(): + import modabc + modabc.f() + """) + p = testdir.makepyfile(""" + import apipkg123 + apipkg123.__getattr__() + """) + capture = py.io.StdCapture() + p.pyimport() + out, err = capture.reset() + warning = str(err) + loc = 'test_stacklevel_initpkg_with_resolve.py:2' + assert warning.find(loc) != -1 + +def test_stacklevel_initpkg_no_resolve(recwarn): + def f(): + py.log._apiwarn("x", "some", stacklevel="apipkg") + capture = py.io.StdCapture() + f() + out, err = capture.reset() + lno = py.code.getrawcode(test_stacklevel_initpkg_no_resolve).co_firstlineno + 2 + warning = str(err) + assert warning.find(":%s" % lno) != -1 + + +def test_function(recwarn): + capture = py.io.StdCapture() + py.log._apiwarn("x.y.z", "something", function=test_function) + out, err = capture.reset() + py.builtin.print_("out", out) + py.builtin.print_("err", err) + assert err.find("x.y.z") != -1 + lno = py.code.getrawcode(test_function).co_firstlineno + exp = "%s:%s" % (mypath, lno) + assert err.find(exp) != -1 + diff --git a/third_party/python/py/testing/path/common.py b/third_party/python/py/testing/path/common.py new file mode 100644 index 0000000000..d69a1c39d0 --- /dev/null +++ b/third_party/python/py/testing/path/common.py @@ -0,0 +1,492 @@ +import py +import sys + +import pytest + +class CommonFSTests(object): + def test_constructor_equality(self, path1): + p = path1.__class__(path1) + assert p == path1 + + def test_eq_nonstring(self, path1): + p1 = path1.join('sampledir') + p2 = path1.join('sampledir') + assert p1 == p2 + + def test_new_identical(self, path1): + assert path1 == path1.new() + + def test_join(self, path1): + p = path1.join('sampledir') + strp = str(p) + assert strp.endswith('sampledir') + assert strp.startswith(str(path1)) + + def test_join_normalized(self, path1): + newpath = path1.join(path1.sep+'sampledir') + strp = str(newpath) + assert strp.endswith('sampledir') + assert strp.startswith(str(path1)) + newpath = path1.join((path1.sep*2) + 'sampledir') + strp = str(newpath) + assert strp.endswith('sampledir') + assert strp.startswith(str(path1)) + + def test_join_noargs(self, path1): + newpath = path1.join() + assert path1 == newpath + + def test_add_something(self, path1): + p = path1.join('sample') + p = p + 'dir' + assert p.check() + assert p.exists() + assert p.isdir() + assert not p.isfile() + + def test_parts(self, path1): + newpath = path1.join('sampledir', 'otherfile') + par = newpath.parts()[-3:] + assert par == [path1, path1.join('sampledir'), newpath] + + revpar = newpath.parts(reverse=True)[:3] + assert revpar == [newpath, path1.join('sampledir'), path1] + + def test_common(self, path1): + other = path1.join('sampledir') + x = other.common(path1) + assert x == path1 + + #def test_parents_nonexisting_file(self, path1): + # newpath = path1 / 'dirnoexist' / 'nonexisting file' + # par = list(newpath.parents()) + # assert par[:2] == [path1 / 'dirnoexist', path1] + + def test_basename_checks(self, path1): + newpath = path1.join('sampledir') + assert newpath.check(basename='sampledir') + assert newpath.check(notbasename='xyz') + assert newpath.basename == 'sampledir' + + def test_basename(self, path1): + newpath = path1.join('sampledir') + assert newpath.check(basename='sampledir') + assert newpath.basename, 'sampledir' + + def test_dirname(self, path1): + newpath = path1.join('sampledir') + assert newpath.dirname == str(path1) + + def test_dirpath(self, path1): + newpath = path1.join('sampledir') + assert newpath.dirpath() == path1 + + def test_dirpath_with_args(self, path1): + newpath = path1.join('sampledir') + assert newpath.dirpath('x') == path1.join('x') + + def test_newbasename(self, path1): + newpath = path1.join('samplefile') + newbase = newpath.new(basename="samplefile2") + assert newbase.basename == "samplefile2" + assert newbase.dirpath() == newpath.dirpath() + + def test_not_exists(self, path1): + assert not path1.join('does_not_exist').check() + assert path1.join('does_not_exist').check(exists=0) + + def test_exists(self, path1): + assert path1.join("samplefile").check() + assert path1.join("samplefile").check(exists=1) + assert path1.join("samplefile").exists() + assert path1.join("samplefile").isfile() + assert not path1.join("samplefile").isdir() + + def test_dir(self, path1): + #print repr(path1.join("sampledir")) + assert path1.join("sampledir").check(dir=1) + assert path1.join('samplefile').check(notdir=1) + assert not path1.join("samplefile").check(dir=1) + assert path1.join("samplefile").exists() + assert not path1.join("samplefile").isdir() + assert path1.join("samplefile").isfile() + + def test_fnmatch_file(self, path1): + assert path1.join("samplefile").check(fnmatch='s*e') + assert path1.join("samplefile").fnmatch('s*e') + assert not path1.join("samplefile").fnmatch('s*x') + assert not path1.join("samplefile").check(fnmatch='s*x') + + #def test_fnmatch_dir(self, path1): + + # pattern = path1.sep.join(['s*file']) + # sfile = path1.join("samplefile") + # assert sfile.check(fnmatch=pattern) + + def test_relto(self, path1): + l=path1.join("sampledir", "otherfile") + assert l.relto(path1) == l.sep.join(["sampledir", "otherfile"]) + assert l.check(relto=path1) + assert path1.check(notrelto=l) + assert not path1.check(relto=l) + + def test_bestrelpath(self, path1): + curdir = path1 + sep = curdir.sep + s = curdir.bestrelpath(curdir) + assert s == "." + s = curdir.bestrelpath(curdir.join("hello", "world")) + assert s == "hello" + sep + "world" + + s = curdir.bestrelpath(curdir.dirpath().join("sister")) + assert s == ".." + sep + "sister" + assert curdir.bestrelpath(curdir.dirpath()) == ".." + + assert curdir.bestrelpath("hello") == "hello" + + def test_relto_not_relative(self, path1): + l1=path1.join("bcde") + l2=path1.join("b") + assert not l1.relto(l2) + assert not l2.relto(l1) + + @py.test.mark.xfail("sys.platform.startswith('java')") + def test_listdir(self, path1): + l = path1.listdir() + assert path1.join('sampledir') in l + assert path1.join('samplefile') in l + py.test.raises(py.error.ENOTDIR, + "path1.join('samplefile').listdir()") + + def test_listdir_fnmatchstring(self, path1): + l = path1.listdir('s*dir') + assert len(l) + assert l[0], path1.join('sampledir') + + def test_listdir_filter(self, path1): + l = path1.listdir(lambda x: x.check(dir=1)) + assert path1.join('sampledir') in l + assert not path1.join('samplefile') in l + + def test_listdir_sorted(self, path1): + l = path1.listdir(lambda x: x.check(basestarts="sample"), sort=True) + assert path1.join('sampledir') == l[0] + assert path1.join('samplefile') == l[1] + assert path1.join('samplepickle') == l[2] + + def test_visit_nofilter(self, path1): + l = [] + for i in path1.visit(): + l.append(i.relto(path1)) + assert "sampledir" in l + assert path1.sep.join(["sampledir", "otherfile"]) in l + + def test_visit_norecurse(self, path1): + l = [] + for i in path1.visit(None, lambda x: x.basename != "sampledir"): + l.append(i.relto(path1)) + assert "sampledir" in l + assert not path1.sep.join(["sampledir", "otherfile"]) in l + + @pytest.mark.parametrize('fil', ['*dir', u'*dir', + pytest.mark.skip("sys.version_info <" + " (3,6)")(b'*dir')]) + def test_visit_filterfunc_is_string(self, path1, fil): + l = [] + for i in path1.visit(fil): + l.append(i.relto(path1)) + assert len(l), 2 + assert "sampledir" in l + assert "otherdir" in l + + @py.test.mark.xfail("sys.platform.startswith('java')") + def test_visit_ignore(self, path1): + p = path1.join('nonexisting') + assert list(p.visit(ignore=py.error.ENOENT)) == [] + + def test_visit_endswith(self, path1): + l = [] + for i in path1.visit(lambda x: x.check(endswith="file")): + l.append(i.relto(path1)) + assert path1.sep.join(["sampledir", "otherfile"]) in l + assert "samplefile" in l + + def test_endswith(self, path1): + assert path1.check(notendswith='.py') + x = path1.join('samplefile') + assert x.check(endswith='file') + + def test_cmp(self, path1): + path1 = path1.join('samplefile') + path2 = path1.join('samplefile2') + assert (path1 < path2) == ('samplefile' < 'samplefile2') + assert not (path1 < path1) + + def test_simple_read(self, path1): + x = path1.join('samplefile').read('r') + assert x == 'samplefile\n' + + def test_join_div_operator(self, path1): + newpath = path1 / '/sampledir' / '/test//' + newpath2 = path1.join('sampledir', 'test') + assert newpath == newpath2 + + def test_ext(self, path1): + newpath = path1.join('sampledir.ext') + assert newpath.ext == '.ext' + newpath = path1.join('sampledir') + assert not newpath.ext + + def test_purebasename(self, path1): + newpath = path1.join('samplefile.py') + assert newpath.purebasename == 'samplefile' + + def test_multiple_parts(self, path1): + newpath = path1.join('samplefile.py') + dirname, purebasename, basename, ext = newpath._getbyspec( + 'dirname,purebasename,basename,ext') + assert str(path1).endswith(dirname) # be careful with win32 'drive' + assert purebasename == 'samplefile' + assert basename == 'samplefile.py' + assert ext == '.py' + + def test_dotted_name_ext(self, path1): + newpath = path1.join('a.b.c') + ext = newpath.ext + assert ext == '.c' + assert newpath.ext == '.c' + + def test_newext(self, path1): + newpath = path1.join('samplefile.py') + newext = newpath.new(ext='.txt') + assert newext.basename == "samplefile.txt" + assert newext.purebasename == "samplefile" + + def test_readlines(self, path1): + fn = path1.join('samplefile') + contents = fn.readlines() + assert contents == ['samplefile\n'] + + def test_readlines_nocr(self, path1): + fn = path1.join('samplefile') + contents = fn.readlines(cr=0) + assert contents == ['samplefile', ''] + + def test_file(self, path1): + assert path1.join('samplefile').check(file=1) + + def test_not_file(self, path1): + assert not path1.join("sampledir").check(file=1) + assert path1.join("sampledir").check(file=0) + + def test_non_existent(self, path1): + assert path1.join("sampledir.nothere").check(dir=0) + assert path1.join("sampledir.nothere").check(file=0) + assert path1.join("sampledir.nothere").check(notfile=1) + assert path1.join("sampledir.nothere").check(notdir=1) + assert path1.join("sampledir.nothere").check(notexists=1) + assert not path1.join("sampledir.nothere").check(notfile=0) + + # pattern = path1.sep.join(['s*file']) + # sfile = path1.join("samplefile") + # assert sfile.check(fnmatch=pattern) + + def test_size(self, path1): + url = path1.join("samplefile") + assert url.size() > len("samplefile") + + def test_mtime(self, path1): + url = path1.join("samplefile") + assert url.mtime() > 0 + + def test_relto_wrong_type(self, path1): + py.test.raises(TypeError, "path1.relto(42)") + + def test_load(self, path1): + p = path1.join('samplepickle') + obj = p.load() + assert type(obj) is dict + assert obj.get('answer',None) == 42 + + def test_visit_filesonly(self, path1): + l = [] + for i in path1.visit(lambda x: x.check(file=1)): + l.append(i.relto(path1)) + assert not "sampledir" in l + assert path1.sep.join(["sampledir", "otherfile"]) in l + + def test_visit_nodotfiles(self, path1): + l = [] + for i in path1.visit(lambda x: x.check(dotfile=0)): + l.append(i.relto(path1)) + assert "sampledir" in l + assert path1.sep.join(["sampledir", "otherfile"]) in l + assert not ".dotfile" in l + + def test_visit_breadthfirst(self, path1): + l = [] + for i in path1.visit(bf=True): + l.append(i.relto(path1)) + for i, p in enumerate(l): + if path1.sep in p: + for j in range(i, len(l)): + assert path1.sep in l[j] + break + else: + py.test.fail("huh") + + def test_visit_sort(self, path1): + l = [] + for i in path1.visit(bf=True, sort=True): + l.append(i.relto(path1)) + for i, p in enumerate(l): + if path1.sep in p: + break + assert l[:i] == sorted(l[:i]) + assert l[i:] == sorted(l[i:]) + + def test_endswith(self, path1): + def chk(p): + return p.check(endswith="pickle") + assert not chk(path1) + assert not chk(path1.join('samplefile')) + assert chk(path1.join('somepickle')) + + def test_copy_file(self, path1): + otherdir = path1.join('otherdir') + initpy = otherdir.join('__init__.py') + copied = otherdir.join('copied') + initpy.copy(copied) + try: + assert copied.check() + s1 = initpy.read() + s2 = copied.read() + assert s1 == s2 + finally: + if copied.check(): + copied.remove() + + def test_copy_dir(self, path1): + otherdir = path1.join('otherdir') + copied = path1.join('newdir') + try: + otherdir.copy(copied) + assert copied.check(dir=1) + assert copied.join('__init__.py').check(file=1) + s1 = otherdir.join('__init__.py').read() + s2 = copied.join('__init__.py').read() + assert s1 == s2 + finally: + if copied.check(dir=1): + copied.remove(rec=1) + + def test_remove_file(self, path1): + d = path1.ensure('todeleted') + assert d.check() + d.remove() + assert not d.check() + + def test_remove_dir_recursive_by_default(self, path1): + d = path1.ensure('to', 'be', 'deleted') + assert d.check() + p = path1.join('to') + p.remove() + assert not p.check() + + def test_ensure_dir(self, path1): + b = path1.ensure_dir("001", "002") + assert b.basename == "002" + assert b.isdir() + + def test_mkdir_and_remove(self, path1): + tmpdir = path1 + py.test.raises(py.error.EEXIST, tmpdir.mkdir, 'sampledir') + new = tmpdir.join('mktest1') + new.mkdir() + assert new.check(dir=1) + new.remove() + + new = tmpdir.mkdir('mktest') + assert new.check(dir=1) + new.remove() + assert tmpdir.join('mktest') == new + + def test_move_file(self, path1): + p = path1.join('samplefile') + newp = p.dirpath('moved_samplefile') + p.move(newp) + try: + assert newp.check(file=1) + assert not p.check() + finally: + dp = newp.dirpath() + if hasattr(dp, 'revert'): + dp.revert() + else: + newp.move(p) + assert p.check() + + def test_move_dir(self, path1): + source = path1.join('sampledir') + dest = path1.join('moveddir') + source.move(dest) + assert dest.check(dir=1) + assert dest.join('otherfile').check(file=1) + assert not source.join('sampledir').check() + + def test_fspath_protocol_match_strpath(self, path1): + assert path1.__fspath__() == path1.strpath + + def test_fspath_func_match_strpath(self, path1): + try: + from os import fspath + except ImportError: + from py._path.common import fspath + assert fspath(path1) == path1.strpath + + @py.test.mark.skip("sys.version_info < (3,6)") + def test_fspath_open(self, path1): + f = path1.join('opentestfile') + open(f) + + @py.test.mark.skip("sys.version_info < (3,6)") + def test_fspath_fsencode(self, path1): + from os import fsencode + assert fsencode(path1) == fsencode(path1.strpath) + +def setuptestfs(path): + if path.join('samplefile').check(): + return + #print "setting up test fs for", repr(path) + samplefile = path.ensure('samplefile') + samplefile.write('samplefile\n') + + execfile = path.ensure('execfile') + execfile.write('x=42') + + execfilepy = path.ensure('execfile.py') + execfilepy.write('x=42') + + d = {1:2, 'hello': 'world', 'answer': 42} + path.ensure('samplepickle').dump(d) + + sampledir = path.ensure('sampledir', dir=1) + sampledir.ensure('otherfile') + + otherdir = path.ensure('otherdir', dir=1) + otherdir.ensure('__init__.py') + + module_a = otherdir.ensure('a.py') + module_a.write('from .b import stuff as result\n') + module_b = otherdir.ensure('b.py') + module_b.write('stuff="got it"\n') + module_c = otherdir.ensure('c.py') + module_c.write('''import py; +import otherdir.a +value = otherdir.a.result +''') + module_d = otherdir.ensure('d.py') + module_d.write('''import py; +from otherdir import a +value2 = a.result +''') diff --git a/third_party/python/py/testing/path/conftest.py b/third_party/python/py/testing/path/conftest.py new file mode 100644 index 0000000000..84fb5c8269 --- /dev/null +++ b/third_party/python/py/testing/path/conftest.py @@ -0,0 +1,80 @@ +import py +import sys +from py._path import svnwc as svncommon + +svnbin = py.path.local.sysfind('svn') +repodump = py.path.local(__file__).dirpath('repotest.dump') +from py.builtin import print_ + +def pytest_funcarg__repowc1(request): + if svnbin is None: + py.test.skip("svn binary not found") + + tmpdir = request.getfuncargvalue("tmpdir") + repo, repourl, wc = request.cached_setup( + setup=lambda: getrepowc(tmpdir, "path1repo", "path1wc"), + scope="module", + ) + for x in ('test_remove', 'test_move', 'test_status_deleted'): + if request.function.__name__.startswith(x): + #print >>sys.stderr, ("saving repo", repo, "for", request.function) + _savedrepowc = save_repowc(repo, wc) + request.addfinalizer(lambda: restore_repowc(_savedrepowc)) + return repo, repourl, wc + +def pytest_funcarg__repowc2(request): + tmpdir = request.getfuncargvalue("tmpdir") + name = request.function.__name__ + repo, url, wc = getrepowc(tmpdir, "%s-repo-2" % name, "%s-wc-2" % name) + return repo, url, wc + +def getsvnbin(): + if svnbin is None: + py.test.skip("svn binary not found") + return svnbin + +# make a wc directory out of a given root url +# cache previously obtained wcs! +# +def getrepowc(tmpdir, reponame='basetestrepo', wcname='wc'): + repo = tmpdir.mkdir(reponame) + wcdir = tmpdir.mkdir(wcname) + repo.ensure(dir=1) + py.process.cmdexec('svnadmin create "%s"' % + svncommon._escape_helper(repo)) + py.process.cmdexec('svnadmin load -q "%s" <"%s"' % + (svncommon._escape_helper(repo), repodump)) + print_("created svn repository", repo) + wcdir.ensure(dir=1) + wc = py.path.svnwc(wcdir) + if sys.platform == 'win32': + repourl = "file://" + '/' + str(repo).replace('\\', '/') + else: + repourl = "file://%s" % repo + wc.checkout(repourl) + print_("checked out new repo into", wc) + return (repo, repourl, wc) + + +def save_repowc(repo, wc): + assert not str(repo).startswith("file://"), repo + assert repo.check() + savedrepo = repo.dirpath(repo.basename+".1") + savedwc = wc.dirpath(wc.basename+".1") + repo.copy(savedrepo) + wc.localpath.copy(savedwc.localpath) + return savedrepo, savedwc + +def restore_repowc(obj): + savedrepo, savedwc = obj + #print >>sys.stderr, ("restoring", savedrepo) + repo = savedrepo.new(basename=savedrepo.basename[:-2]) + assert repo.check() + wc = savedwc.new(basename=savedwc.basename[:-2]) + assert wc.check() + wc.localpath.remove() + repo.remove() + savedrepo.move(repo) + savedwc.localpath.move(wc.localpath) + py.path.svnurl._lsnorevcache.clear() + py.path.svnurl._lsrevcache.clear() diff --git a/third_party/python/py/testing/path/repotest.dump b/third_party/python/py/testing/path/repotest.dump new file mode 100644 index 0000000000..c7819cad7a --- /dev/null +++ b/third_party/python/py/testing/path/repotest.dump @@ -0,0 +1,228 @@ +SVN-fs-dump-format-version: 2 + +UUID: 876a30f4-1eed-0310-aeb7-ae314d1e5934 + +Revision-number: 0 +Prop-content-length: 56 +Content-length: 56 + +K 8 +svn:date +V 27 +2005-01-07T23:55:31.755989Z +PROPS-END + +Revision-number: 1 +Prop-content-length: 118 +Content-length: 118 + +K 7 +svn:log +V 20 +testrepo setup rev 1 +K 10 +svn:author +V 3 +hpk +K 8 +svn:date +V 27 +2005-01-07T23:55:37.815386Z +PROPS-END + +Node-path: execfile +Node-kind: file +Node-action: add +Prop-content-length: 10 +Text-content-length: 4 +Text-content-md5: d4b5bc61e16310f08c5d11866eba0a22 +Content-length: 14 + +PROPS-END +x=42 + +Node-path: otherdir +Node-kind: dir +Node-action: add +Prop-content-length: 10 +Content-length: 10 + +PROPS-END + + +Node-path: otherdir/__init__.py +Node-kind: file +Node-action: add +Prop-content-length: 10 +Text-content-length: 0 +Text-content-md5: d41d8cd98f00b204e9800998ecf8427e +Content-length: 10 + +PROPS-END + + +Node-path: otherdir/a.py +Node-kind: file +Node-action: add +Prop-content-length: 10 +Text-content-length: 30 +Text-content-md5: 247c7daeb2ee5dcab0aba7bd12bad665 +Content-length: 40 + +PROPS-END +from b import stuff as result + + +Node-path: otherdir/b.py +Node-kind: file +Node-action: add +Prop-content-length: 10 +Text-content-length: 15 +Text-content-md5: c1b13503469a7711306d03a4b0721bc6 +Content-length: 25 + +PROPS-END +stuff="got it" + + +Node-path: otherdir/c.py +Node-kind: file +Node-action: add +Prop-content-length: 10 +Text-content-length: 75 +Text-content-md5: 250cdb6b5df68536152c681f48297569 +Content-length: 85 + +PROPS-END +import py; py.magic.autopath() +import otherdir.a +value = otherdir.a.result + + +Node-path: otherdir/d.py +Node-kind: file +Node-action: add +Prop-content-length: 10 +Text-content-length: 72 +Text-content-md5: 940c9c621e7b198e081459642c37f5a7 +Content-length: 82 + +PROPS-END +import py; py.magic.autopath() +from otherdir import a +value2 = a.result + + +Node-path: sampledir +Node-kind: dir +Node-action: add +Prop-content-length: 10 +Content-length: 10 + +PROPS-END + + +Node-path: sampledir/otherfile +Node-kind: file +Node-action: add +Prop-content-length: 10 +Text-content-length: 0 +Text-content-md5: d41d8cd98f00b204e9800998ecf8427e +Content-length: 10 + +PROPS-END + + +Node-path: samplefile +Node-kind: file +Node-action: add +Prop-content-length: 40 +Text-content-length: 11 +Text-content-md5: 9225ac28b32156979ab6482b8bb5fb8c +Content-length: 51 + +K 13 +svn:eol-style +V 6 +native +PROPS-END +samplefile + + +Node-path: samplepickle +Node-kind: file +Node-action: add +Prop-content-length: 10 +Text-content-length: 56 +Text-content-md5: 719d85c1329a33134bb98f56b756c545 +Content-length: 66 + +PROPS-END +(dp1 +S'answer' +p2 +I42 +sI1 +I2 +sS'hello' +p3 +S'world' +p4 +s. + +Revision-number: 2 +Prop-content-length: 108 +Content-length: 108 + +K 7 +svn:log +V 10 +second rev +K 10 +svn:author +V 3 +hpk +K 8 +svn:date +V 27 +2005-01-07T23:55:39.223202Z +PROPS-END + +Node-path: anotherfile +Node-kind: file +Node-action: add +Prop-content-length: 10 +Text-content-length: 5 +Text-content-md5: 5d41402abc4b2a76b9719d911017c592 +Content-length: 15 + +PROPS-END +hello + +Revision-number: 3 +Prop-content-length: 106 +Content-length: 106 + +K 7 +svn:log +V 9 +third rev +K 10 +svn:author +V 3 +hpk +K 8 +svn:date +V 27 +2005-01-07T23:55:41.556642Z +PROPS-END + +Node-path: anotherfile +Node-kind: file +Node-action: change +Text-content-length: 5 +Text-content-md5: 7d793037a0760186574b0282f2f435e7 +Content-length: 5 + +world + diff --git a/third_party/python/py/testing/path/svntestbase.py b/third_party/python/py/testing/path/svntestbase.py new file mode 100644 index 0000000000..8d94a9ca64 --- /dev/null +++ b/third_party/python/py/testing/path/svntestbase.py @@ -0,0 +1,31 @@ +import sys +import py +from py._path import svnwc as svncommon +from common import CommonFSTests + +class CommonSvnTests(CommonFSTests): + + def test_propget(self, path1): + url = path1.join("samplefile") + value = url.propget('svn:eol-style') + assert value == 'native' + + def test_proplist(self, path1): + url = path1.join("samplefile") + res = url.proplist() + assert res['svn:eol-style'] == 'native' + + def test_info(self, path1): + url = path1.join("samplefile") + res = url.info() + assert res.size > len("samplefile") and res.created_rev >= 0 + + def test_log_simple(self, path1): + url = path1.join("samplefile") + logentries = url.log() + for logentry in logentries: + assert logentry.rev == 1 + assert hasattr(logentry, 'author') + assert hasattr(logentry, 'date') + +#cache.repositories.put(svnrepourl, 1200, 0) diff --git a/third_party/python/py/testing/path/test_cacheutil.py b/third_party/python/py/testing/path/test_cacheutil.py new file mode 100644 index 0000000000..c9fc07463a --- /dev/null +++ b/third_party/python/py/testing/path/test_cacheutil.py @@ -0,0 +1,89 @@ +import pytest +from py._path import cacheutil + +import time + +class BasicCacheAPITest: + cache = None + def test_getorbuild(self): + val = self.cache.getorbuild(-42, lambda: 42) + assert val == 42 + val = self.cache.getorbuild(-42, lambda: 23) + assert val == 42 + + def test_cache_get_key_error(self): + pytest.raises(KeyError, "self.cache._getentry(-23)") + + def test_delentry_non_raising(self): + self.cache.getorbuild(100, lambda: 100) + self.cache.delentry(100) + pytest.raises(KeyError, "self.cache._getentry(100)") + + def test_delentry_raising(self): + self.cache.getorbuild(100, lambda: 100) + self.cache.delentry(100) + pytest.raises(KeyError, self.cache.delentry, 100, raising=True) + + def test_clear(self): + self.cache.clear() + + +class TestBuildcostAccess(BasicCacheAPITest): + cache = cacheutil.BuildcostAccessCache(maxentries=128) + + def test_cache_works_somewhat_simple(self, monkeypatch): + cache = cacheutil.BuildcostAccessCache() + # the default gettime + # BuildcostAccessCache.build can + # result into time()-time() == 0 which makes the below + # test fail randomly. Let's rather use incrementing + # numbers instead. + l = [0] + + def counter(): + l[0] = l[0] + 1 + return l[0] + monkeypatch.setattr(cacheutil, 'gettime', counter) + for x in range(cache.maxentries): + y = cache.getorbuild(x, lambda: x) + assert x == y + for x in range(cache.maxentries): + assert cache.getorbuild(x, None) == x + halfentries = int(cache.maxentries / 2) + for x in range(halfentries): + assert cache.getorbuild(x, None) == x + assert cache.getorbuild(x, None) == x + # evict one entry + val = cache.getorbuild(-1, lambda: 42) + assert val == 42 + # check that recently used ones are still there + # and are not build again + for x in range(halfentries): + assert cache.getorbuild(x, None) == x + assert cache.getorbuild(-1, None) == 42 + + +class TestAging(BasicCacheAPITest): + maxsecs = 0.10 + cache = cacheutil.AgingCache(maxentries=128, maxseconds=maxsecs) + + def test_cache_eviction(self): + self.cache.getorbuild(17, lambda: 17) + endtime = time.time() + self.maxsecs * 10 + while time.time() < endtime: + try: + self.cache._getentry(17) + except KeyError: + break + time.sleep(self.maxsecs*0.3) + else: + pytest.fail("waiting for cache eviction failed") + + +def test_prune_lowestweight(): + maxsecs = 0.05 + cache = cacheutil.AgingCache(maxentries=10, maxseconds=maxsecs) + for x in range(cache.maxentries): + cache.getorbuild(x, lambda: x) + time.sleep(maxsecs*1.1) + cache.getorbuild(cache.maxentries+1, lambda: 42) diff --git a/third_party/python/py/testing/path/test_local.py b/third_party/python/py/testing/path/test_local.py new file mode 100644 index 0000000000..ee4b9bde9c --- /dev/null +++ b/third_party/python/py/testing/path/test_local.py @@ -0,0 +1,976 @@ +# -*- coding: utf-8 -*- + +from __future__ import with_statement +import time +import py +import pytest +import os +import sys +import multiprocessing +from py.path import local +import common + +failsonjython = py.test.mark.xfail("sys.platform.startswith('java')") +failsonjywin32 = py.test.mark.xfail( + "sys.platform.startswith('java') " + "and getattr(os, '_name', None) == 'nt'") +win32only = py.test.mark.skipif( + "not (sys.platform == 'win32' or getattr(os, '_name', None) == 'nt')") +skiponwin32 = py.test.mark.skipif( + "sys.platform == 'win32' or getattr(os, '_name', None) == 'nt'") + +ATIME_RESOLUTION = 0.01 + + +@pytest.yield_fixture(scope="session") +def path1(tmpdir_factory): + path = tmpdir_factory.mktemp('path') + common.setuptestfs(path) + yield path + assert path.join("samplefile").check() + + +@pytest.fixture +def fake_fspath_obj(request): + class FakeFSPathClass(object): + def __init__(self, path): + self._path = path + + def __fspath__(self): + return self._path + + return FakeFSPathClass(os.path.join("this", "is", "a", "fake", "path")) + + +def batch_make_numbered_dirs(rootdir, repeats): + try: + for i in range(repeats): + dir_ = py.path.local.make_numbered_dir(prefix='repro-', rootdir=rootdir) + file_ = dir_.join('foo') + file_.write('%s' % i) + actual = int(file_.read()) + assert actual == i, 'int(file_.read()) is %s instead of %s' % (actual, i) + dir_.join('.lock').remove(ignore_errors=True) + return True + except KeyboardInterrupt: + # makes sure that interrupting test session won't hang it + os.exit(2) + + +class TestLocalPath(common.CommonFSTests): + def test_join_normpath(self, tmpdir): + assert tmpdir.join(".") == tmpdir + p = tmpdir.join("../%s" % tmpdir.basename) + assert p == tmpdir + p = tmpdir.join("..//%s/" % tmpdir.basename) + assert p == tmpdir + + @skiponwin32 + def test_dirpath_abs_no_abs(self, tmpdir): + p = tmpdir.join('foo') + assert p.dirpath('/bar') == tmpdir.join('bar') + assert tmpdir.dirpath('/bar', abs=True) == local('/bar') + + def test_gethash(self, tmpdir): + md5 = py.builtin._tryimport('md5', 'hashlib').md5 + lib = py.builtin._tryimport('sha', 'hashlib') + sha = getattr(lib, 'sha1', getattr(lib, 'sha', None)) + fn = tmpdir.join("testhashfile") + data = 'hello'.encode('ascii') + fn.write(data, mode="wb") + assert fn.computehash("md5") == md5(data).hexdigest() + assert fn.computehash("sha1") == sha(data).hexdigest() + py.test.raises(ValueError, fn.computehash, "asdasd") + + def test_remove_removes_readonly_file(self, tmpdir): + readonly_file = tmpdir.join('readonly').ensure() + readonly_file.chmod(0) + readonly_file.remove() + assert not readonly_file.check(exists=1) + + def test_remove_removes_readonly_dir(self, tmpdir): + readonly_dir = tmpdir.join('readonlydir').ensure(dir=1) + readonly_dir.chmod(int("500", 8)) + readonly_dir.remove() + assert not readonly_dir.check(exists=1) + + def test_remove_removes_dir_and_readonly_file(self, tmpdir): + readonly_dir = tmpdir.join('readonlydir').ensure(dir=1) + readonly_file = readonly_dir.join('readonlyfile').ensure() + readonly_file.chmod(0) + readonly_dir.remove() + assert not readonly_dir.check(exists=1) + + def test_remove_routes_ignore_errors(self, tmpdir, monkeypatch): + l = [] + monkeypatch.setattr( + 'shutil.rmtree', + lambda *args, **kwargs: l.append(kwargs)) + tmpdir.remove() + assert not l[0]['ignore_errors'] + for val in (True, False): + l[:] = [] + tmpdir.remove(ignore_errors=val) + assert l[0]['ignore_errors'] == val + + def test_initialize_curdir(self): + assert str(local()) == os.getcwd() + + @skiponwin32 + def test_chdir_gone(self, path1): + p = path1.ensure("dir_to_be_removed", dir=1) + p.chdir() + p.remove() + pytest.raises(py.error.ENOENT, py.path.local) + assert path1.chdir() is None + assert os.getcwd() == str(path1) + + def test_as_cwd(self, path1): + dir = path1.ensure("subdir", dir=1) + old = py.path.local() + with dir.as_cwd() as x: + assert x == old + assert py.path.local() == dir + assert os.getcwd() == str(old) + + def test_as_cwd_exception(self, path1): + old = py.path.local() + dir = path1.ensure("subdir", dir=1) + with pytest.raises(ValueError): + with dir.as_cwd(): + raise ValueError() + assert old == py.path.local() + + def test_initialize_reldir(self, path1): + with path1.as_cwd(): + p = local('samplefile') + assert p.check() + + def test_tilde_expansion(self, monkeypatch, tmpdir): + monkeypatch.setenv("HOME", str(tmpdir)) + p = py.path.local("~", expanduser=True) + assert p == os.path.expanduser("~") + + def test_eq_with_strings(self, path1): + path1 = path1.join('sampledir') + path2 = str(path1) + assert path1 == path2 + assert path2 == path1 + path3 = path1.join('samplefile') + assert path3 != path2 + assert path2 != path3 + + def test_eq_with_none(self, path1): + assert path1 != None # noqa + + def test_eq_non_ascii_unicode(self, path1): + path2 = path1.join(u'temp') + path3 = path1.join(u'ação') + path4 = path1.join(u'ディレクトリ') + + assert path2 != path3 + assert path2 != path4 + assert path4 != path3 + + def test_gt_with_strings(self, path1): + path2 = path1.join('sampledir') + path3 = str(path1.join("ttt")) + assert path3 > path2 + assert path2 < path3 + assert path2 < "ttt" + assert "ttt" > path2 + path4 = path1.join("aaa") + l = [path2, path4, path3] + assert sorted(l) == [path4, path2, path3] + + def test_open_and_ensure(self, path1): + p = path1.join("sub1", "sub2", "file") + with p.open("w", ensure=1) as f: + f.write("hello") + assert p.read() == "hello" + + def test_write_and_ensure(self, path1): + p = path1.join("sub1", "sub2", "file") + p.write("hello", ensure=1) + assert p.read() == "hello" + + @py.test.mark.parametrize('bin', (False, True)) + def test_dump(self, tmpdir, bin): + path = tmpdir.join("dumpfile%s" % int(bin)) + try: + d = {'answer': 42} + path.dump(d, bin=bin) + f = path.open('rb+') + import pickle + dnew = pickle.load(f) + assert d == dnew + finally: + f.close() + + @failsonjywin32 + def test_setmtime(self): + import tempfile + import time + try: + fd, name = tempfile.mkstemp() + os.close(fd) + except AttributeError: + name = tempfile.mktemp() + open(name, 'w').close() + try: + mtime = int(time.time())-100 + path = local(name) + assert path.mtime() != mtime + path.setmtime(mtime) + assert path.mtime() == mtime + path.setmtime() + assert path.mtime() != mtime + finally: + os.remove(name) + + def test_normpath(self, path1): + new1 = path1.join("/otherdir") + new2 = path1.join("otherdir") + assert str(new1) == str(new2) + + def test_mkdtemp_creation(self): + d = local.mkdtemp() + try: + assert d.check(dir=1) + finally: + d.remove(rec=1) + + def test_tmproot(self): + d = local.mkdtemp() + tmproot = local.get_temproot() + try: + assert d.check(dir=1) + assert d.dirpath() == tmproot + finally: + d.remove(rec=1) + + def test_chdir(self, tmpdir): + old = local() + try: + res = tmpdir.chdir() + assert str(res) == str(old) + assert os.getcwd() == str(tmpdir) + finally: + old.chdir() + + def test_ensure_filepath_withdir(self, tmpdir): + newfile = tmpdir.join('test1', 'test') + newfile.ensure() + assert newfile.check(file=1) + newfile.write("42") + newfile.ensure() + s = newfile.read() + assert s == "42" + + def test_ensure_filepath_withoutdir(self, tmpdir): + newfile = tmpdir.join('test1file') + t = newfile.ensure() + assert t == newfile + assert newfile.check(file=1) + + def test_ensure_dirpath(self, tmpdir): + newfile = tmpdir.join('test1', 'testfile') + t = newfile.ensure(dir=1) + assert t == newfile + assert newfile.check(dir=1) + + def test_ensure_non_ascii_unicode(self, tmpdir): + newfile = tmpdir.join(u'ação',u'ディレクトリ') + t = newfile.ensure(dir=1) + assert t == newfile + assert newfile.check(dir=1) + + def test_init_from_path(self, tmpdir): + l = local() + l2 = local(l) + assert l2 == l + + wc = py.path.svnwc('.') + l3 = local(wc) + assert l3 is not wc + assert l3.strpath == wc.strpath + assert not hasattr(l3, 'commit') + + @py.test.mark.xfail(run=False, reason="unreliable est for long filenames") + def test_long_filenames(self, tmpdir): + if sys.platform == "win32": + py.test.skip("win32: work around needed for path length limit") + # see http://codespeak.net/pipermail/py-dev/2008q2/000922.html + + # testing paths > 260 chars (which is Windows' limitation, but + # depending on how the paths are used), but > 4096 (which is the + # Linux' limitation) - the behaviour of paths with names > 4096 chars + # is undetermined + newfilename = '/test' * 60 + l = tmpdir.join(newfilename) + l.ensure(file=True) + l.write('foo') + l2 = tmpdir.join(newfilename) + assert l2.read() == 'foo' + + def test_visit_depth_first(self, tmpdir): + tmpdir.ensure("a", "1") + tmpdir.ensure("b", "2") + p3 = tmpdir.ensure("breadth") + l = list(tmpdir.visit(lambda x: x.check(file=1))) + assert len(l) == 3 + # check that breadth comes last + assert l[2] == p3 + + def test_visit_rec_fnmatch(self, tmpdir): + p1 = tmpdir.ensure("a", "123") + tmpdir.ensure(".b", "345") + l = list(tmpdir.visit("???", rec="[!.]*")) + assert len(l) == 1 + # check that breadth comes last + assert l[0] == p1 + + def test_fnmatch_file_abspath(self, tmpdir): + b = tmpdir.join("a", "b") + assert b.fnmatch(os.sep.join("ab")) + pattern = os.sep.join([str(tmpdir), "*", "b"]) + assert b.fnmatch(pattern) + + def test_sysfind(self): + name = sys.platform == "win32" and "cmd" or "test" + x = py.path.local.sysfind(name) + assert x.check(file=1) + assert py.path.local.sysfind('jaksdkasldqwe') is None + assert py.path.local.sysfind(name, paths=[]) is None + x2 = py.path.local.sysfind(name, paths=[x.dirpath()]) + assert x2 == x + + def test_fspath_protocol_other_class(self, fake_fspath_obj): + # py.path is always absolute + py_path = py.path.local(fake_fspath_obj) + str_path = fake_fspath_obj.__fspath__() + assert py_path.check(endswith=str_path) + assert py_path.join(fake_fspath_obj).strpath == os.path.join( + py_path.strpath, str_path) + + def test_make_numbered_dir_multiprocess_safe(self, tmpdir): + # https://github.com/pytest-dev/py/issues/30 + pool = multiprocessing.Pool() + results = [pool.apply_async(batch_make_numbered_dirs, [tmpdir, 100]) for _ in range(20)] + for r in results: + assert r.get() + + +class TestExecutionOnWindows: + pytestmark = win32only + + def test_sysfind_bat_exe_before(self, tmpdir, monkeypatch): + monkeypatch.setenv("PATH", str(tmpdir), prepend=os.pathsep) + tmpdir.ensure("hello") + h = tmpdir.ensure("hello.bat") + x = py.path.local.sysfind("hello") + assert x == h + + +class TestExecution: + pytestmark = skiponwin32 + + def test_sysfind_no_permisson_ignored(self, monkeypatch, tmpdir): + noperm = tmpdir.ensure('noperm', dir=True) + monkeypatch.setenv("PATH", noperm, prepend=":") + noperm.chmod(0) + assert py.path.local.sysfind('jaksdkasldqwe') is None + + def test_sysfind_absolute(self): + x = py.path.local.sysfind('test') + assert x.check(file=1) + y = py.path.local.sysfind(str(x)) + assert y.check(file=1) + assert y == x + + def test_sysfind_multiple(self, tmpdir, monkeypatch): + monkeypatch.setenv('PATH', "%s:%s" % ( + tmpdir.ensure('a'), + tmpdir.join('b')), + prepend=":") + tmpdir.ensure('b', 'a') + x = py.path.local.sysfind( + 'a', checker=lambda x: x.dirpath().basename == 'b') + assert x.basename == 'a' + assert x.dirpath().basename == 'b' + assert py.path.local.sysfind('a', checker=lambda x: None) is None + + def test_sysexec(self): + x = py.path.local.sysfind('ls') + out = x.sysexec('-a') + for x in py.path.local().listdir(): + assert out.find(x.basename) != -1 + + def test_sysexec_failing(self): + x = py.path.local.sysfind('false') + with pytest.raises(py.process.cmdexec.Error): + x.sysexec('aksjdkasjd') + + def test_make_numbered_dir(self, tmpdir): + tmpdir.ensure('base.not_an_int', dir=1) + for i in range(10): + numdir = local.make_numbered_dir(prefix='base.', rootdir=tmpdir, + keep=2, lock_timeout=0) + assert numdir.check() + assert numdir.basename == 'base.%d' % i + if i >= 1: + assert numdir.new(ext=str(i-1)).check() + if i >= 2: + assert numdir.new(ext=str(i-2)).check() + if i >= 3: + assert not numdir.new(ext=str(i-3)).check() + + def test_make_numbered_dir_case(self, tmpdir): + """make_numbered_dir does not make assumptions on the underlying + filesystem based on the platform and will assume it _could_ be case + insensitive. + + See issues: + - https://github.com/pytest-dev/pytest/issues/708 + - https://github.com/pytest-dev/pytest/issues/3451 + """ + d1 = local.make_numbered_dir( + prefix='CAse.', rootdir=tmpdir, keep=2, lock_timeout=0, + ) + d2 = local.make_numbered_dir( + prefix='caSE.', rootdir=tmpdir, keep=2, lock_timeout=0, + ) + assert str(d1).lower() != str(d2).lower() + assert str(d2).endswith('.1') + + def test_make_numbered_dir_NotImplemented_Error(self, tmpdir, monkeypatch): + def notimpl(x, y): + raise NotImplementedError(42) + monkeypatch.setattr(os, 'symlink', notimpl) + x = tmpdir.make_numbered_dir(rootdir=tmpdir, lock_timeout=0) + assert x.relto(tmpdir) + assert x.check() + + def test_locked_make_numbered_dir(self, tmpdir): + for i in range(10): + numdir = local.make_numbered_dir(prefix='base2.', rootdir=tmpdir, + keep=2) + assert numdir.check() + assert numdir.basename == 'base2.%d' % i + for j in range(i): + assert numdir.new(ext=str(j)).check() + + def test_error_preservation(self, path1): + py.test.raises(EnvironmentError, path1.join('qwoeqiwe').mtime) + py.test.raises(EnvironmentError, path1.join('qwoeqiwe').read) + + # def test_parentdirmatch(self): + # local.parentdirmatch('std', startmodule=__name__) + # + + +class TestImport: + def test_pyimport(self, path1): + obj = path1.join('execfile.py').pyimport() + assert obj.x == 42 + assert obj.__name__ == 'execfile' + + def test_pyimport_renamed_dir_creates_mismatch(self, tmpdir): + p = tmpdir.ensure("a", "test_x123.py") + p.pyimport() + tmpdir.join("a").move(tmpdir.join("b")) + with pytest.raises(tmpdir.ImportMismatchError): + tmpdir.join("b", "test_x123.py").pyimport() + + def test_pyimport_messy_name(self, tmpdir): + # http://bitbucket.org/hpk42/py-trunk/issue/129 + path = tmpdir.ensure('foo__init__.py') + path.pyimport() + + def test_pyimport_dir(self, tmpdir): + p = tmpdir.join("hello_123") + p_init = p.ensure("__init__.py") + m = p.pyimport() + assert m.__name__ == "hello_123" + m = p_init.pyimport() + assert m.__name__ == "hello_123" + + def test_pyimport_execfile_different_name(self, path1): + obj = path1.join('execfile.py').pyimport(modname="0x.y.z") + assert obj.x == 42 + assert obj.__name__ == '0x.y.z' + + def test_pyimport_a(self, path1): + otherdir = path1.join('otherdir') + mod = otherdir.join('a.py').pyimport() + assert mod.result == "got it" + assert mod.__name__ == 'otherdir.a' + + def test_pyimport_b(self, path1): + otherdir = path1.join('otherdir') + mod = otherdir.join('b.py').pyimport() + assert mod.stuff == "got it" + assert mod.__name__ == 'otherdir.b' + + def test_pyimport_c(self, path1): + otherdir = path1.join('otherdir') + mod = otherdir.join('c.py').pyimport() + assert mod.value == "got it" + + def test_pyimport_d(self, path1): + otherdir = path1.join('otherdir') + mod = otherdir.join('d.py').pyimport() + assert mod.value2 == "got it" + + def test_pyimport_and_import(self, tmpdir): + tmpdir.ensure('xxxpackage', '__init__.py') + mod1path = tmpdir.ensure('xxxpackage', 'module1.py') + mod1 = mod1path.pyimport() + assert mod1.__name__ == 'xxxpackage.module1' + from xxxpackage import module1 + assert module1 is mod1 + + def test_pyimport_check_filepath_consistency(self, monkeypatch, tmpdir): + name = 'pointsback123' + ModuleType = type(os) + p = tmpdir.ensure(name + '.py') + for ending in ('.pyc', '$py.class', '.pyo'): + mod = ModuleType(name) + pseudopath = tmpdir.ensure(name+ending) + mod.__file__ = str(pseudopath) + monkeypatch.setitem(sys.modules, name, mod) + newmod = p.pyimport() + assert mod == newmod + monkeypatch.undo() + mod = ModuleType(name) + pseudopath = tmpdir.ensure(name+"123.py") + mod.__file__ = str(pseudopath) + monkeypatch.setitem(sys.modules, name, mod) + excinfo = py.test.raises(pseudopath.ImportMismatchError, p.pyimport) + modname, modfile, orig = excinfo.value.args + assert modname == name + assert modfile == pseudopath + assert orig == p + assert issubclass(pseudopath.ImportMismatchError, ImportError) + + def test_issue131_pyimport_on__init__(self, tmpdir): + # __init__.py files may be namespace packages, and thus the + # __file__ of an imported module may not be ourselves + # see issue + p1 = tmpdir.ensure("proja", "__init__.py") + p2 = tmpdir.ensure("sub", "proja", "__init__.py") + m1 = p1.pyimport() + m2 = p2.pyimport() + assert m1 == m2 + + def test_ensuresyspath_append(self, tmpdir): + root1 = tmpdir.mkdir("root1") + file1 = root1.ensure("x123.py") + assert str(root1) not in sys.path + file1.pyimport(ensuresyspath="append") + assert str(root1) == sys.path[-1] + assert str(root1) not in sys.path[:-1] + + +def test_pypkgdir(tmpdir): + pkg = tmpdir.ensure('pkg1', dir=1) + pkg.ensure("__init__.py") + pkg.ensure("subdir/__init__.py") + assert pkg.pypkgpath() == pkg + assert pkg.join('subdir', '__init__.py').pypkgpath() == pkg + + +def test_pypkgdir_unimportable(tmpdir): + pkg = tmpdir.ensure('pkg1-1', dir=1) # unimportable + pkg.ensure("__init__.py") + subdir = pkg.ensure("subdir/__init__.py").dirpath() + assert subdir.pypkgpath() == subdir + assert subdir.ensure("xyz.py").pypkgpath() == subdir + assert not pkg.pypkgpath() + + +def test_isimportable(): + from py._path.local import isimportable + assert not isimportable("") + assert isimportable("x") + assert isimportable("x1") + assert isimportable("x_1") + assert isimportable("_") + assert isimportable("_1") + assert not isimportable("x-1") + assert not isimportable("x:1") + + +def test_homedir_from_HOME(monkeypatch): + path = os.getcwd() + monkeypatch.setenv("HOME", path) + assert py.path.local._gethomedir() == py.path.local(path) + + +def test_homedir_not_exists(monkeypatch): + monkeypatch.delenv("HOME", raising=False) + monkeypatch.delenv("HOMEDRIVE", raising=False) + homedir = py.path.local._gethomedir() + assert homedir is None + + +def test_samefile(tmpdir): + assert tmpdir.samefile(tmpdir) + p = tmpdir.ensure("hello") + assert p.samefile(p) + with p.dirpath().as_cwd(): + assert p.samefile(p.basename) + if sys.platform == "win32": + p1 = p.__class__(str(p).lower()) + p2 = p.__class__(str(p).upper()) + assert p1.samefile(p2) + + +def test_listdir_single_arg(tmpdir): + tmpdir.ensure("hello") + assert tmpdir.listdir("hello")[0].basename == "hello" + + +def test_mkdtemp_rootdir(tmpdir): + dtmp = local.mkdtemp(rootdir=tmpdir) + assert tmpdir.listdir() == [dtmp] + + +class TestWINLocalPath: + pytestmark = win32only + + def test_owner_group_not_implemented(self, path1): + py.test.raises(NotImplementedError, "path1.stat().owner") + py.test.raises(NotImplementedError, "path1.stat().group") + + def test_chmod_simple_int(self, path1): + py.builtin.print_("path1 is", path1) + mode = path1.stat().mode + # Ensure that we actually change the mode to something different. + path1.chmod(mode == 0 and 1 or 0) + try: + print(path1.stat().mode) + print(mode) + assert path1.stat().mode != mode + finally: + path1.chmod(mode) + assert path1.stat().mode == mode + + def test_path_comparison_lowercase_mixed(self, path1): + t1 = path1.join("a_path") + t2 = path1.join("A_path") + assert t1 == t1 + assert t1 == t2 + + def test_relto_with_mixed_case(self, path1): + t1 = path1.join("a_path", "fiLe") + t2 = path1.join("A_path") + assert t1.relto(t2) == "fiLe" + + def test_allow_unix_style_paths(self, path1): + t1 = path1.join('a_path') + assert t1 == str(path1) + '\\a_path' + t1 = path1.join('a_path/') + assert t1 == str(path1) + '\\a_path' + t1 = path1.join('dir/a_path') + assert t1 == str(path1) + '\\dir\\a_path' + + def test_sysfind_in_currentdir(self, path1): + cmd = py.path.local.sysfind('cmd') + root = cmd.new(dirname='', basename='') # c:\ in most installations + with root.as_cwd(): + x = py.path.local.sysfind(cmd.relto(root)) + assert x.check(file=1) + + def test_fnmatch_file_abspath_posix_pattern_on_win32(self, tmpdir): + # path-matching patterns might contain a posix path separator '/' + # Test that we can match that pattern on windows. + import posixpath + b = tmpdir.join("a", "b") + assert b.fnmatch(posixpath.sep.join("ab")) + pattern = posixpath.sep.join([str(tmpdir), "*", "b"]) + assert b.fnmatch(pattern) + + +class TestPOSIXLocalPath: + pytestmark = skiponwin32 + + def test_hardlink(self, tmpdir): + linkpath = tmpdir.join('test') + filepath = tmpdir.join('file') + filepath.write("Hello") + nlink = filepath.stat().nlink + linkpath.mklinkto(filepath) + assert filepath.stat().nlink == nlink + 1 + + def test_symlink_are_identical(self, tmpdir): + filepath = tmpdir.join('file') + filepath.write("Hello") + linkpath = tmpdir.join('test') + linkpath.mksymlinkto(filepath) + assert linkpath.readlink() == str(filepath) + + def test_symlink_isfile(self, tmpdir): + linkpath = tmpdir.join('test') + filepath = tmpdir.join('file') + filepath.write("") + linkpath.mksymlinkto(filepath) + assert linkpath.check(file=1) + assert not linkpath.check(link=0, file=1) + assert linkpath.islink() + + def test_symlink_relative(self, tmpdir): + linkpath = tmpdir.join('test') + filepath = tmpdir.join('file') + filepath.write("Hello") + linkpath.mksymlinkto(filepath, absolute=False) + assert linkpath.readlink() == "file" + assert filepath.read() == linkpath.read() + + def test_symlink_not_existing(self, tmpdir): + linkpath = tmpdir.join('testnotexisting') + assert not linkpath.check(link=1) + assert linkpath.check(link=0) + + def test_relto_with_root(self, path1, tmpdir): + y = path1.join('x').relto(py.path.local('/')) + assert y[0] == str(path1)[1] + + def test_visit_recursive_symlink(self, tmpdir): + linkpath = tmpdir.join('test') + linkpath.mksymlinkto(tmpdir) + visitor = tmpdir.visit(None, lambda x: x.check(link=0)) + assert list(visitor) == [linkpath] + + def test_symlink_isdir(self, tmpdir): + linkpath = tmpdir.join('test') + linkpath.mksymlinkto(tmpdir) + assert linkpath.check(dir=1) + assert not linkpath.check(link=0, dir=1) + + def test_symlink_remove(self, tmpdir): + linkpath = tmpdir.join('test') + linkpath.mksymlinkto(linkpath) # point to itself + assert linkpath.check(link=1) + linkpath.remove() + assert not linkpath.check() + + def test_realpath_file(self, tmpdir): + linkpath = tmpdir.join('test') + filepath = tmpdir.join('file') + filepath.write("") + linkpath.mksymlinkto(filepath) + realpath = linkpath.realpath() + assert realpath.basename == 'file' + + def test_owner(self, path1, tmpdir): + from pwd import getpwuid + from grp import getgrgid + stat = path1.stat() + assert stat.path == path1 + + uid = stat.uid + gid = stat.gid + owner = getpwuid(uid)[0] + group = getgrgid(gid)[0] + + assert uid == stat.uid + assert owner == stat.owner + assert gid == stat.gid + assert group == stat.group + + def test_stat_helpers(self, tmpdir, monkeypatch): + path1 = tmpdir.ensure("file") + stat1 = path1.stat() + stat2 = tmpdir.stat() + assert stat1.isfile() + assert stat2.isdir() + assert not stat1.islink() + assert not stat2.islink() + + def test_stat_non_raising(self, tmpdir): + path1 = tmpdir.join("file") + pytest.raises(py.error.ENOENT, lambda: path1.stat()) + res = path1.stat(raising=False) + assert res is None + + def test_atime(self, tmpdir): + import time + path = tmpdir.ensure('samplefile') + now = time.time() + atime1 = path.atime() + # we could wait here but timer resolution is very + # system dependent + path.read() + time.sleep(ATIME_RESOLUTION) + atime2 = path.atime() + time.sleep(ATIME_RESOLUTION) + duration = time.time() - now + assert (atime2-atime1) <= duration + + def test_commondir(self, path1): + # XXX This is here in local until we find a way to implement this + # using the subversion command line api. + p1 = path1.join('something') + p2 = path1.join('otherthing') + assert p1.common(p2) == path1 + assert p2.common(p1) == path1 + + def test_commondir_nocommon(self, path1): + # XXX This is here in local until we find a way to implement this + # using the subversion command line api. + p1 = path1.join('something') + p2 = py.path.local(path1.sep+'blabla') + assert p1.common(p2) == '/' + + def test_join_to_root(self, path1): + root = path1.parts()[0] + assert len(str(root)) == 1 + assert str(root.join('a')) == '/a' + + def test_join_root_to_root_with_no_abs(self, path1): + nroot = path1.join('/') + assert str(path1) == str(nroot) + assert path1 == nroot + + def test_chmod_simple_int(self, path1): + mode = path1.stat().mode + path1.chmod(int(mode/2)) + try: + assert path1.stat().mode != mode + finally: + path1.chmod(mode) + assert path1.stat().mode == mode + + def test_chmod_rec_int(self, path1): + # XXX fragile test + def recfilter(x): return x.check(dotfile=0, link=0) + oldmodes = {} + for x in path1.visit(rec=recfilter): + oldmodes[x] = x.stat().mode + path1.chmod(int("772", 8), rec=recfilter) + try: + for x in path1.visit(rec=recfilter): + assert x.stat().mode & int("777", 8) == int("772", 8) + finally: + for x, y in oldmodes.items(): + x.chmod(y) + + def test_copy_archiving(self, tmpdir): + unicode_fn = u"something-\342\200\223.txt" + f = tmpdir.ensure("a", unicode_fn) + a = f.dirpath() + oldmode = f.stat().mode + newmode = oldmode ^ 1 + f.chmod(newmode) + b = tmpdir.join("b") + a.copy(b, mode=True) + assert b.join(f.basename).stat().mode == newmode + + def test_copy_stat_file(self, tmpdir): + src = tmpdir.ensure('src') + dst = tmpdir.join('dst') + # a small delay before the copy + time.sleep(ATIME_RESOLUTION) + src.copy(dst, stat=True) + oldstat = src.stat() + newstat = dst.stat() + assert oldstat.mode == newstat.mode + assert (dst.atime() - src.atime()) < ATIME_RESOLUTION + assert (dst.mtime() - src.mtime()) < ATIME_RESOLUTION + + def test_copy_stat_dir(self, tmpdir): + test_files = ['a', 'b', 'c'] + src = tmpdir.join('src') + for f in test_files: + src.join(f).write(f, ensure=True) + dst = tmpdir.join('dst') + # a small delay before the copy + time.sleep(ATIME_RESOLUTION) + src.copy(dst, stat=True) + for f in test_files: + oldstat = src.join(f).stat() + newstat = dst.join(f).stat() + assert (newstat.atime - oldstat.atime) < ATIME_RESOLUTION + assert (newstat.mtime - oldstat.mtime) < ATIME_RESOLUTION + assert oldstat.mode == newstat.mode + + @failsonjython + def test_chown_identity(self, path1): + owner = path1.stat().owner + group = path1.stat().group + path1.chown(owner, group) + + @failsonjython + def test_chown_dangling_link(self, path1): + owner = path1.stat().owner + group = path1.stat().group + x = path1.join('hello') + x.mksymlinkto('qlwkejqwlek') + try: + path1.chown(owner, group, rec=1) + finally: + x.remove(rec=0) + + @failsonjython + def test_chown_identity_rec_mayfail(self, path1): + owner = path1.stat().owner + group = path1.stat().group + path1.chown(owner, group) + + +class TestUnicodePy2Py3: + def test_join_ensure(self, tmpdir, monkeypatch): + if sys.version_info >= (3, 0) and "LANG" not in os.environ: + pytest.skip("cannot run test without locale") + x = py.path.local(tmpdir.strpath) + part = "hällo" + y = x.ensure(part) + assert x.join(part) == y + + def test_listdir(self, tmpdir): + if sys.version_info >= (3, 0) and "LANG" not in os.environ: + pytest.skip("cannot run test without locale") + x = py.path.local(tmpdir.strpath) + part = "hällo" + y = x.ensure(part) + assert x.listdir(part)[0] == y + + @pytest.mark.xfail( + reason="changing read/write might break existing usages") + def test_read_write(self, tmpdir): + x = tmpdir.join("hello") + part = py.builtin._totext("hällo", "utf8") + x.write(part) + assert x.read() == part + x.write(part.encode(sys.getdefaultencoding())) + assert x.read() == part.encode(sys.getdefaultencoding()) + + +class TestBinaryAndTextMethods: + def test_read_binwrite(self, tmpdir): + x = tmpdir.join("hello") + part = py.builtin._totext("hällo", "utf8") + part_utf8 = part.encode("utf8") + x.write_binary(part_utf8) + assert x.read_binary() == part_utf8 + s = x.read_text(encoding="utf8") + assert s == part + assert py.builtin._istext(s) + + def test_read_textwrite(self, tmpdir): + x = tmpdir.join("hello") + part = py.builtin._totext("hällo", "utf8") + part_utf8 = part.encode("utf8") + x.write_text(part, encoding="utf8") + assert x.read_binary() == part_utf8 + assert x.read_text(encoding="utf8") == part + + def test_default_encoding(self, tmpdir): + x = tmpdir.join("hello") + # Can't use UTF8 as the default encoding (ASCII) doesn't support it + part = py.builtin._totext("hello", "ascii") + x.write_text(part, "ascii") + s = x.read_text("ascii") + assert s == part + assert type(s) == type(part) diff --git a/third_party/python/py/testing/path/test_svnauth.py b/third_party/python/py/testing/path/test_svnauth.py new file mode 100644 index 0000000000..654f033224 --- /dev/null +++ b/third_party/python/py/testing/path/test_svnauth.py @@ -0,0 +1,460 @@ +import py +from py.path import SvnAuth +import time +import sys + +svnbin = py.path.local.sysfind('svn') + + +def make_repo_auth(repo, userdata): + """ write config to repo + + user information in userdata is used for auth + userdata has user names as keys, and a tuple (password, readwrite) as + values, where 'readwrite' is either 'r' or 'rw' + """ + confdir = py.path.local(repo).join('conf') + confdir.join('svnserve.conf').write('''\ +[general] +anon-access = none +password-db = passwd +authz-db = authz +realm = TestRepo +''') + authzdata = '[/]\n' + passwddata = '[users]\n' + for user in userdata: + authzdata += '%s = %s\n' % (user, userdata[user][1]) + passwddata += '%s = %s\n' % (user, userdata[user][0]) + confdir.join('authz').write(authzdata) + confdir.join('passwd').write(passwddata) + +def serve_bg(repopath): + pidfile = py.path.local(repopath).join('pid') + port = 10000 + e = None + while port < 10010: + cmd = 'svnserve -d -T --listen-port=%d --pid-file=%s -r %s' % ( + port, pidfile, repopath) + print(cmd) + try: + py.process.cmdexec(cmd) + except py.process.cmdexec.Error: + e = sys.exc_info()[1] + else: + # XXX we assume here that the pid file gets written somewhere, I + # guess this should be relatively safe... (I hope, at least?) + counter = pid = 0 + while counter < 10: + counter += 1 + try: + pid = pidfile.read() + except py.error.ENOENT: + pass + if pid: + break + time.sleep(0.2) + return port, int(pid) + port += 1 + raise IOError('could not start svnserve: %s' % (e,)) + +class TestSvnAuth(object): + def test_basic(self): + auth = SvnAuth('foo', 'bar') + assert auth.username == 'foo' + assert auth.password == 'bar' + assert str(auth) + + def test_makecmdoptions_uname_pw_makestr(self): + auth = SvnAuth('foo', 'bar') + assert auth.makecmdoptions() == '--username="foo" --password="bar"' + + def test_makecmdoptions_quote_escape(self): + auth = SvnAuth('fo"o', '"ba\'r"') + assert auth.makecmdoptions() == '--username="fo\\"o" --password="\\"ba\'r\\""' + + def test_makecmdoptions_no_cache_auth(self): + auth = SvnAuth('foo', 'bar', cache_auth=False) + assert auth.makecmdoptions() == ('--username="foo" --password="bar" ' + '--no-auth-cache') + + def test_makecmdoptions_no_interactive(self): + auth = SvnAuth('foo', 'bar', interactive=False) + assert auth.makecmdoptions() == ('--username="foo" --password="bar" ' + '--non-interactive') + + def test_makecmdoptions_no_interactive_no_cache_auth(self): + auth = SvnAuth('foo', 'bar', cache_auth=False, + interactive=False) + assert auth.makecmdoptions() == ('--username="foo" --password="bar" ' + '--no-auth-cache --non-interactive') + +class svnwc_no_svn(py.path.svnwc): + def __new__(cls, *args, **kwargs): + self = super(svnwc_no_svn, cls).__new__(cls, *args, **kwargs) + self.commands = [] + return self + + def _svn(self, *args): + self.commands.append(args) + +class TestSvnWCAuth(object): + def setup_method(self, meth): + if not svnbin: + py.test.skip("svn binary required") + self.auth = SvnAuth('user', 'pass', cache_auth=False) + + def test_checkout(self): + wc = svnwc_no_svn('foo', auth=self.auth) + wc.checkout('url') + assert wc.commands[0][-1] == ('--username="user" --password="pass" ' + '--no-auth-cache') + + def test_commit(self): + wc = svnwc_no_svn('foo', auth=self.auth) + wc.commit('msg') + assert wc.commands[0][-1] == ('--username="user" --password="pass" ' + '--no-auth-cache') + + def test_checkout_no_cache_auth(self): + wc = svnwc_no_svn('foo', auth=self.auth) + wc.checkout('url') + assert wc.commands[0][-1] == ('--username="user" --password="pass" ' + '--no-auth-cache') + + def test_checkout_auth_from_constructor(self): + wc = svnwc_no_svn('foo', auth=self.auth) + wc.checkout('url') + assert wc.commands[0][-1] == ('--username="user" --password="pass" ' + '--no-auth-cache') + +class svnurl_no_svn(py.path.svnurl): + cmdexec_output = 'test' + popen_output = 'test' + def __new__(cls, *args, **kwargs): + self = super(svnurl_no_svn, cls).__new__(cls, *args, **kwargs) + self.commands = [] + return self + + def _cmdexec(self, cmd): + self.commands.append(cmd) + return self.cmdexec_output + + def _popen(self, cmd): + self.commands.append(cmd) + return self.popen_output + +class TestSvnURLAuth(object): + def setup_method(self, meth): + self.auth = SvnAuth('foo', 'bar') + + def test_init(self): + u = svnurl_no_svn('http://foo.bar/svn') + assert u.auth is None + + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + assert u.auth is self.auth + + def test_new(self): + u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth) + new = u.new(basename='bar') + assert new.auth is self.auth + assert new.url == 'http://foo.bar/svn/bar' + + def test_join(self): + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + new = u.join('foo') + assert new.auth is self.auth + assert new.url == 'http://foo.bar/svn/foo' + + def test_listdir(self): + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + u.cmdexec_output = '''\ + 1717 johnny 1529 Nov 04 14:32 LICENSE.txt + 1716 johnny 5352 Nov 04 14:28 README.txt +''' + paths = u.listdir() + assert paths[0].auth is self.auth + assert paths[1].auth is self.auth + assert paths[0].basename == 'LICENSE.txt' + + def test_info(self): + u = svnurl_no_svn('http://foo.bar/svn/LICENSE.txt', auth=self.auth) + def dirpath(self): + return self + u.cmdexec_output = '''\ + 1717 johnny 1529 Nov 04 14:32 LICENSE.txt + 1716 johnny 5352 Nov 04 14:28 README.txt +''' + org_dp = u.__class__.dirpath + u.__class__.dirpath = dirpath + try: + info = u.info() + finally: + u.dirpath = org_dp + assert info.size == 1529 + + def test_open(self): + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + foo = u.join('foo') + foo.check = lambda *args, **kwargs: True + ret = foo.open() + assert ret == 'test' + assert '--username="foo" --password="bar"' in foo.commands[0] + + def test_dirpath(self): + u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth) + parent = u.dirpath() + assert parent.auth is self.auth + + def test_mkdir(self): + u = svnurl_no_svn('http://foo.bar/svn/qweqwe', auth=self.auth) + assert not u.commands + u.mkdir(msg='created dir foo') + assert u.commands + assert '--username="foo" --password="bar"' in u.commands[0] + + def test_copy(self): + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + u2 = svnurl_no_svn('http://foo.bar/svn2') + u.copy(u2, 'copied dir') + assert '--username="foo" --password="bar"' in u.commands[0] + + def test_rename(self): + u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth) + u.rename('http://foo.bar/svn/bar', 'moved foo to bar') + assert '--username="foo" --password="bar"' in u.commands[0] + + def test_remove(self): + u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth) + u.remove(msg='removing foo') + assert '--username="foo" --password="bar"' in u.commands[0] + + def test_export(self): + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + target = py.path.local('/foo') + u.export(target) + assert '--username="foo" --password="bar"' in u.commands[0] + + def test_log(self): + u = svnurl_no_svn('http://foo.bar/svn/foo', auth=self.auth) + u.popen_output = py.io.TextIO(py.builtin._totext('''\ +<?xml version="1.0"?> +<log> +<logentry revision="51381"> +<author>guido</author> +<date>2008-02-11T12:12:18.476481Z</date> +<msg>Creating branch to work on auth support for py.path.svn*. +</msg> +</logentry> +</log> +''', 'ascii')) + u.check = lambda *args, **kwargs: True + ret = u.log(10, 20, verbose=True) + assert '--username="foo" --password="bar"' in u.commands[0] + assert len(ret) == 1 + assert int(ret[0].rev) == 51381 + assert ret[0].author == 'guido' + + def test_propget(self): + u = svnurl_no_svn('http://foo.bar/svn', auth=self.auth) + u.propget('foo') + assert '--username="foo" --password="bar"' in u.commands[0] + +def pytest_funcarg__setup(request): + return Setup(request) + +class Setup: + def __init__(self, request): + if not svnbin: + py.test.skip("svn binary required") + if not request.config.option.runslowtests: + py.test.skip('use --runslowtests to run these tests') + + tmpdir = request.getfuncargvalue("tmpdir") + repodir = tmpdir.join("repo") + py.process.cmdexec('svnadmin create %s' % repodir) + if sys.platform == 'win32': + repodir = '/' + str(repodir).replace('\\', '/') + self.repo = py.path.svnurl("file://%s" % repodir) + if sys.platform == 'win32': + # remove trailing slash... + repodir = repodir[1:] + self.repopath = py.path.local(repodir) + self.temppath = tmpdir.mkdir("temppath") + self.auth = SvnAuth('johnny', 'foo', cache_auth=False, + interactive=False) + make_repo_auth(self.repopath, {'johnny': ('foo', 'rw')}) + self.port, self.pid = serve_bg(self.repopath.dirpath()) + # XXX caching is too global + py.path.svnurl._lsnorevcache._dict.clear() + request.addfinalizer(lambda: py.process.kill(self.pid)) + +class TestSvnWCAuthFunctional: + def test_checkout_constructor_arg(self, setup): + wc = py.path.svnwc(setup.temppath, auth=setup.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename)) + assert wc.join('.svn').check() + + def test_checkout_function_arg(self, setup): + wc = py.path.svnwc(setup.temppath, auth=setup.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename)) + assert wc.join('.svn').check() + + def test_checkout_failing_non_interactive(self, setup): + auth = SvnAuth('johnny', 'bar', cache_auth=False, + interactive=False) + wc = py.path.svnwc(setup.temppath, auth) + py.test.raises(Exception, + ("wc.checkout('svn://localhost:%(port)s/%(repopath)s')" % + setup.__dict__)) + + def test_log(self, setup): + wc = py.path.svnwc(setup.temppath, setup.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename)) + foo = wc.ensure('foo.txt') + wc.commit('added foo.txt') + log = foo.log() + assert len(log) == 1 + assert log[0].msg == 'added foo.txt' + + def test_switch(self, setup): + import pytest + try: + import xdist + pytest.skip('#160: fails under xdist') + except ImportError: + pass + wc = py.path.svnwc(setup.temppath, auth=setup.auth) + svnurl = 'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename) + wc.checkout(svnurl) + wc.ensure('foo', dir=True).ensure('foo.txt').write('foo') + wc.commit('added foo dir with foo.txt file') + wc.ensure('bar', dir=True) + wc.commit('added bar dir') + bar = wc.join('bar') + bar.switch(svnurl + '/foo') + assert bar.join('foo.txt') + + def test_update(self, setup): + wc1 = py.path.svnwc(setup.temppath.ensure('wc1', dir=True), + auth=setup.auth) + wc2 = py.path.svnwc(setup.temppath.ensure('wc2', dir=True), + auth=setup.auth) + wc1.checkout( + 'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename)) + wc2.checkout( + 'svn://localhost:%s/%s' % (setup.port, setup.repopath.basename)) + wc1.ensure('foo', dir=True) + wc1.commit('added foo dir') + wc2.update() + assert wc2.join('foo').check() + + auth = SvnAuth('unknown', 'unknown', interactive=False) + wc2.auth = auth + py.test.raises(Exception, 'wc2.update()') + + def test_lock_unlock_status(self, setup): + port = setup.port + wc = py.path.svnwc(setup.temppath, auth=setup.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (port, setup.repopath.basename,)) + wc.ensure('foo', file=True) + wc.commit('added foo file') + foo = wc.join('foo') + foo.lock() + status = foo.status() + assert status.locked + foo.unlock() + status = foo.status() + assert not status.locked + + auth = SvnAuth('unknown', 'unknown', interactive=False) + foo.auth = auth + py.test.raises(Exception, 'foo.lock()') + py.test.raises(Exception, 'foo.unlock()') + + def test_diff(self, setup): + port = setup.port + wc = py.path.svnwc(setup.temppath, auth=setup.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (port, setup.repopath.basename,)) + wc.ensure('foo', file=True) + wc.commit('added foo file') + wc.update() + rev = int(wc.status().rev) + foo = wc.join('foo') + foo.write('bar') + diff = foo.diff() + assert '\n+bar\n' in diff + foo.commit('added some content') + diff = foo.diff() + assert not diff + diff = foo.diff(rev=rev) + assert '\n+bar\n' in diff + + auth = SvnAuth('unknown', 'unknown', interactive=False) + foo.auth = auth + py.test.raises(Exception, 'foo.diff(rev=rev)') + +class TestSvnURLAuthFunctional: + def test_listdir(self, setup): + port = setup.port + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, setup.repopath.basename), + auth=setup.auth) + u.ensure('foo') + paths = u.listdir() + assert len(paths) == 1 + assert paths[0].auth is setup.auth + + auth = SvnAuth('foo', 'bar', interactive=False) + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, setup.repopath.basename), + auth=auth) + py.test.raises(Exception, 'u.listdir()') + + def test_copy(self, setup): + port = setup.port + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, setup.repopath.basename), + auth=setup.auth) + foo = u.mkdir('foo') + assert foo.check() + bar = u.join('bar') + foo.copy(bar) + assert bar.check() + assert bar.auth is setup.auth + + auth = SvnAuth('foo', 'bar', interactive=False) + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, setup.repopath.basename), + auth=auth) + foo = u.join('foo') + bar = u.join('bar') + py.test.raises(Exception, 'foo.copy(bar)') + + def test_write_read(self, setup): + port = setup.port + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, setup.repopath.basename), + auth=setup.auth) + foo = u.ensure('foo') + fp = foo.open() + try: + data = fp.read() + finally: + fp.close() + assert data == '' + + auth = SvnAuth('foo', 'bar', interactive=False) + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, setup.repopath.basename), + auth=auth) + foo = u.join('foo') + py.test.raises(Exception, 'foo.open()') + + # XXX rinse, repeat... :| diff --git a/third_party/python/py/testing/path/test_svnurl.py b/third_party/python/py/testing/path/test_svnurl.py new file mode 100644 index 0000000000..15fbea5047 --- /dev/null +++ b/third_party/python/py/testing/path/test_svnurl.py @@ -0,0 +1,95 @@ +import py +from py._path.svnurl import InfoSvnCommand +import datetime +import time +from svntestbase import CommonSvnTests + +def pytest_funcarg__path1(request): + repo, repourl, wc = request.getfuncargvalue("repowc1") + return py.path.svnurl(repourl) + +class TestSvnURLCommandPath(CommonSvnTests): + @py.test.mark.xfail + def test_load(self, path1): + super(TestSvnURLCommandPath, self).test_load(path1) + + # the following two work on jython but not in local/svnwc + def test_listdir(self, path1): + super(TestSvnURLCommandPath, self).test_listdir(path1) + def test_visit_ignore(self, path1): + super(TestSvnURLCommandPath, self).test_visit_ignore(path1) + + def test_svnurl_needs_arg(self, path1): + py.test.raises(TypeError, "py.path.svnurl()") + + def test_svnurl_does_not_accept_None_either(self, path1): + py.test.raises(Exception, "py.path.svnurl(None)") + + def test_svnurl_characters_simple(self, path1): + py.path.svnurl("svn+ssh://hello/world") + + def test_svnurl_characters_at_user(self, path1): + py.path.svnurl("http://user@host.com/some/dir") + + def test_svnurl_characters_at_path(self, path1): + py.test.raises(ValueError, 'py.path.svnurl("http://host.com/foo@bar")') + + def test_svnurl_characters_colon_port(self, path1): + py.path.svnurl("http://host.com:8080/some/dir") + + def test_svnurl_characters_tilde_end(self, path1): + py.path.svnurl("http://host.com/some/file~") + + @py.test.mark.xfail("sys.platform == 'win32'") + def test_svnurl_characters_colon_path(self, path1): + # colons are allowed on win32, because they're part of the drive + # part of an absolute path... however, they shouldn't be allowed in + # other parts, I think + py.test.raises(ValueError, 'py.path.svnurl("http://host.com/foo:bar")') + + def test_export(self, path1, tmpdir): + tmpdir = tmpdir.join("empty") + p = path1.export(tmpdir) + assert p == tmpdir # XXX should return None + n1 = [x.basename for x in tmpdir.listdir()] + n2 = [x.basename for x in path1.listdir()] + n1.sort() + n2.sort() + assert n1 == n2 + assert not p.join('.svn').check() + rev = path1.mkdir("newdir") + tmpdir.remove() + assert not tmpdir.check() + path1.new(rev=1).export(tmpdir) + for p in tmpdir.listdir(): + assert p.basename in n2 + +class TestSvnInfoCommand: + + def test_svn_1_2(self): + line = " 2256 hpk 165 Nov 24 17:55 __init__.py" + info = InfoSvnCommand(line) + now = datetime.datetime.now() + assert info.last_author == 'hpk' + assert info.created_rev == 2256 + assert info.kind == 'file' + # we don't check for the year (2006), because that depends + # on the clock correctly being setup + assert time.gmtime(info.mtime)[1:6] == (11, 24, 17, 55, 0) + assert info.size == 165 + assert info.time == info.mtime * 1000000 + + def test_svn_1_3(self): + line =" 4784 hpk 2 Jun 01 2004 __init__.py" + info = InfoSvnCommand(line) + assert info.last_author == 'hpk' + assert info.kind == 'file' + + def test_svn_1_3_b(self): + line =" 74 autoadmi Oct 06 23:59 plonesolutions.com/" + info = InfoSvnCommand(line) + assert info.last_author == 'autoadmi' + assert info.kind == 'dir' + +def test_badchars(): + py.test.raises(ValueError, "py.path.svnurl('http://host/tmp/@@@:')") diff --git a/third_party/python/py/testing/path/test_svnwc.py b/third_party/python/py/testing/path/test_svnwc.py new file mode 100644 index 0000000000..c643d9983f --- /dev/null +++ b/third_party/python/py/testing/path/test_svnwc.py @@ -0,0 +1,557 @@ +import py +import os, sys +import pytest +from py._path.svnwc import InfoSvnWCCommand, XMLWCStatus, parse_wcinfotime +from py._path import svnwc as svncommon +from svntestbase import CommonSvnTests + + +pytestmark = pytest.mark.xfail(sys.platform.startswith('win'), + reason='#161 all tests in this file are failing on Windows', + run=False) + + +def test_make_repo(path1, tmpdir): + repo = tmpdir.join("repo") + py.process.cmdexec('svnadmin create %s' % repo) + if sys.platform == 'win32': + repo = '/' + str(repo).replace('\\', '/') + repo = py.path.svnurl("file://%s" % repo) + wc = py.path.svnwc(tmpdir.join("wc")) + wc.checkout(repo) + assert wc.rev == 0 + assert len(wc.listdir()) == 0 + p = wc.join("a_file") + p.write("test file") + p.add() + rev = wc.commit("some test") + assert p.info().rev == 1 + assert rev == 1 + rev = wc.commit() + assert rev is None + +def pytest_funcarg__path1(request): + repo, repourl, wc = request.getfuncargvalue("repowc1") + return wc + +class TestWCSvnCommandPath(CommonSvnTests): + def test_status_attributes_simple(self, path1): + def assert_nochange(p): + s = p.status() + assert not s.modified + assert not s.prop_modified + assert not s.added + assert not s.deleted + assert not s.replaced + + dpath = path1.join('sampledir') + assert_nochange(path1.join('sampledir')) + assert_nochange(path1.join('samplefile')) + + def test_status_added(self, path1): + nf = path1.join('newfile') + nf.write('hello') + nf.add() + try: + s = nf.status() + assert s.added + assert not s.modified + assert not s.prop_modified + assert not s.replaced + finally: + nf.revert() + + def test_status_change(self, path1): + nf = path1.join('samplefile') + try: + nf.write(nf.read() + 'change') + s = nf.status() + assert not s.added + assert s.modified + assert not s.prop_modified + assert not s.replaced + finally: + nf.revert() + + def test_status_added_ondirectory(self, path1): + sampledir = path1.join('sampledir') + try: + t2 = sampledir.mkdir('t2') + t1 = t2.join('t1') + t1.write('test') + t1.add() + s = sampledir.status(rec=1) + # Comparing just the file names, because paths are unpredictable + # on Windows. (long vs. 8.3 paths) + assert t1.basename in [item.basename for item in s.added] + assert t2.basename in [item.basename for item in s.added] + finally: + t2.revert(rec=1) + t2.localpath.remove(rec=1) + + def test_status_unknown(self, path1): + t1 = path1.join('un1') + try: + t1.write('test') + s = path1.status() + # Comparing just the file names, because paths are unpredictable + # on Windows. (long vs. 8.3 paths) + assert t1.basename in [item.basename for item in s.unknown] + finally: + t1.localpath.remove() + + def test_status_unchanged(self, path1): + r = path1 + s = path1.status(rec=1) + # Comparing just the file names, because paths are unpredictable + # on Windows. (long vs. 8.3 paths) + assert r.join('samplefile').basename in [item.basename + for item in s.unchanged] + assert r.join('sampledir').basename in [item.basename + for item in s.unchanged] + assert r.join('sampledir/otherfile').basename in [item.basename + for item in s.unchanged] + + def test_status_update(self, path1): + # not a mark because the global "pytestmark" will end up overwriting a mark here + pytest.xfail("svn-1.7 has buggy 'status --xml' output") + r = path1 + try: + r.update(rev=1) + s = r.status(updates=1, rec=1) + # Comparing just the file names, because paths are unpredictable + # on Windows. (long vs. 8.3 paths) + import pprint + pprint.pprint(s.allpath()) + assert r.join('anotherfile').basename in [item.basename for + item in s.update_available] + #assert len(s.update_available) == 1 + finally: + r.update() + + def test_status_replaced(self, path1): + p = path1.join("samplefile") + p.remove() + p.ensure(dir=0) + try: + s = path1.status() + assert p.basename in [item.basename for item in s.replaced] + finally: + path1.revert(rec=1) + + def test_status_ignored(self, path1): + try: + d = path1.join('sampledir') + p = py.path.local(d).join('ignoredfile') + p.ensure(file=True) + s = d.status() + assert [x.basename for x in s.unknown] == ['ignoredfile'] + assert [x.basename for x in s.ignored] == [] + d.propset('svn:ignore', 'ignoredfile') + s = d.status() + assert [x.basename for x in s.unknown] == [] + assert [x.basename for x in s.ignored] == ['ignoredfile'] + finally: + path1.revert(rec=1) + + def test_status_conflict(self, path1, tmpdir): + wc = path1 + wccopy = py.path.svnwc(tmpdir.join("conflict_copy")) + wccopy.checkout(wc.url) + p = wc.ensure('conflictsamplefile', file=1) + p.write('foo') + wc.commit('added conflictsamplefile') + wccopy.update() + assert wccopy.join('conflictsamplefile').check() + p.write('bar') + wc.commit('wrote some data') + wccopy.join('conflictsamplefile').write('baz') + wccopy.update(interactive=False) + s = wccopy.status() + assert [x.basename for x in s.conflict] == ['conflictsamplefile'] + + def test_status_external(self, path1, repowc2): + otherrepo, otherrepourl, otherwc = repowc2 + d = path1.ensure('sampledir', dir=1) + try: + d.update() + d.propset('svn:externals', 'otherwc %s' % (otherwc.url,)) + d.update() + s = d.status() + assert [x.basename for x in s.external] == ['otherwc'] + assert 'otherwc' not in [x.basename for x in s.unchanged] + s = d.status(rec=1) + assert [x.basename for x in s.external] == ['otherwc'] + assert 'otherwc' in [x.basename for x in s.unchanged] + finally: + path1.revert(rec=1) + + def test_status_deleted(self, path1): + d = path1.ensure('sampledir', dir=1) + d.remove() + d.ensure(dir=1) + path1.commit() + d.ensure('deletefile', dir=0) + d.commit() + s = d.status() + assert 'deletefile' in [x.basename for x in s.unchanged] + assert not s.deleted + p = d.join('deletefile') + p.remove() + s = d.status() + assert 'deletefile' not in s.unchanged + assert [x.basename for x in s.deleted] == ['deletefile'] + + def test_status_noauthor(self, path1): + # testing for XML without author - this used to raise an exception + xml = '''\ + <entry path="/tmp/pytest-23/wc"> + <wc-status item="normal" props="none" revision="0"> + <commit revision="0"> + <date>2008-08-19T16:50:53.400198Z</date> + </commit> + </wc-status> + </entry> + ''' + XMLWCStatus.fromstring(xml, path1) + + def test_status_wrong_xml(self, path1): + # testing for XML without author - this used to raise an exception + xml = '<entry path="/home/jean/zope/venv/projectdb/parts/development-products/DataGridField">\n<wc-status item="incomplete" props="none" revision="784">\n</wc-status>\n</entry>' + st = XMLWCStatus.fromstring(xml, path1) + assert len(st.incomplete) == 1 + + def test_diff(self, path1): + p = path1 / 'anotherfile' + out = p.diff(rev=2) + assert out.find('hello') != -1 + + def test_blame(self, path1): + p = path1.join('samplepickle') + lines = p.blame() + assert sum([l[0] for l in lines]) == len(lines) + for l1, l2 in zip(p.readlines(), [l[2] for l in lines]): + assert l1 == l2 + assert [l[1] for l in lines] == ['hpk'] * len(lines) + p = path1.join('samplefile') + lines = p.blame() + assert sum([l[0] for l in lines]) == len(lines) + for l1, l2 in zip(p.readlines(), [l[2] for l in lines]): + assert l1 == l2 + assert [l[1] for l in lines] == ['hpk'] * len(lines) + + def test_join_abs(self, path1): + s = str(path1.localpath) + n = path1.join(s, abs=1) + assert path1 == n + + def test_join_abs2(self, path1): + assert path1.join('samplefile', abs=1) == path1.join('samplefile') + + def test_str_gives_localpath(self, path1): + assert str(path1) == str(path1.localpath) + + def test_versioned(self, path1): + assert path1.check(versioned=1) + # TODO: Why does my copy of svn think .svn is versioned? + #assert path1.join('.svn').check(versioned=0) + assert path1.join('samplefile').check(versioned=1) + assert not path1.join('notexisting').check(versioned=1) + notexisting = path1.join('hello').localpath + try: + notexisting.write("") + assert path1.join('hello').check(versioned=0) + finally: + notexisting.remove() + + def test_listdir_versioned(self, path1): + assert path1.check(versioned=1) + p = path1.localpath.ensure("not_a_versioned_file") + l = [x.localpath + for x in path1.listdir(lambda x: x.check(versioned=True))] + assert p not in l + + def test_nonversioned_remove(self, path1): + assert path1.check(versioned=1) + somefile = path1.join('nonversioned/somefile') + nonwc = py.path.local(somefile) + nonwc.ensure() + assert somefile.check() + assert not somefile.check(versioned=True) + somefile.remove() # this used to fail because it tried to 'svn rm' + + def test_properties(self, path1): + try: + path1.propset('gaga', 'this') + assert path1.propget('gaga') == 'this' + # Comparing just the file names, because paths are unpredictable + # on Windows. (long vs. 8.3 paths) + assert path1.basename in [item.basename for item in + path1.status().prop_modified] + assert 'gaga' in path1.proplist() + assert path1.proplist()['gaga'] == 'this' + + finally: + path1.propdel('gaga') + + def test_proplist_recursive(self, path1): + s = path1.join('samplefile') + s.propset('gugu', 'that') + try: + p = path1.proplist(rec=1) + # Comparing just the file names, because paths are unpredictable + # on Windows. (long vs. 8.3 paths) + assert (path1 / 'samplefile').basename in [item.basename + for item in p] + finally: + s.propdel('gugu') + + def test_long_properties(self, path1): + value = """ + vadm:posix : root root 0100755 + Properties on 'chroot/dns/var/bind/db.net.xots': + """ + try: + path1.propset('gaga', value) + backvalue = path1.propget('gaga') + assert backvalue == value + #assert len(backvalue.split('\n')) == 1 + finally: + path1.propdel('gaga') + + + def test_ensure(self, path1): + newpath = path1.ensure('a', 'b', 'c') + try: + assert newpath.check(exists=1, versioned=1) + newpath.write("hello") + newpath.ensure() + assert newpath.read() == "hello" + finally: + path1.join('a').remove(force=1) + + def test_not_versioned(self, path1): + p = path1.localpath.mkdir('whatever') + f = path1.localpath.ensure('testcreatedfile') + try: + assert path1.join('whatever').check(versioned=0) + assert path1.join('testcreatedfile').check(versioned=0) + assert not path1.join('testcreatedfile').check(versioned=1) + finally: + p.remove(rec=1) + f.remove() + + def test_lock_unlock(self, path1): + root = path1 + somefile = root.join('somefile') + somefile.ensure(file=True) + # not yet added to repo + py.test.raises(Exception, 'somefile.lock()') + somefile.write('foo') + somefile.commit('test') + assert somefile.check(versioned=True) + somefile.lock() + try: + locked = root.status().locked + assert len(locked) == 1 + assert locked[0].basename == somefile.basename + assert locked[0].dirpath().basename == somefile.dirpath().basename + #assert somefile.locked() + py.test.raises(Exception, 'somefile.lock()') + finally: + somefile.unlock() + #assert not somefile.locked() + locked = root.status().locked + assert locked == [] + py.test.raises(Exception, 'somefile,unlock()') + somefile.remove() + + def test_commit_nonrecursive(self, path1): + somedir = path1.join('sampledir') + somedir.mkdir("subsubdir") + somedir.propset('foo', 'bar') + status = somedir.status() + assert len(status.prop_modified) == 1 + assert len(status.added) == 1 + + somedir.commit('non-recursive commit', rec=0) + status = somedir.status() + assert len(status.prop_modified) == 0 + assert len(status.added) == 1 + + somedir.commit('recursive commit') + status = somedir.status() + assert len(status.prop_modified) == 0 + assert len(status.added) == 0 + + def test_commit_return_value(self, path1): + testfile = path1.join('test.txt').ensure(file=True) + testfile.write('test') + rev = path1.commit('testing') + assert type(rev) == int + + anotherfile = path1.join('another.txt').ensure(file=True) + anotherfile.write('test') + rev2 = path1.commit('testing more') + assert type(rev2) == int + assert rev2 == rev + 1 + + #def test_log(self, path1): + # l = path1.log() + # assert len(l) == 3 # might need to be upped if more tests are added + +class XTestWCSvnCommandPathSpecial: + + rooturl = 'http://codespeak.net/svn/py.path/trunk/dist/py.path/test/data' + #def test_update_none_rev(self, path1): + # path = tmpdir.join('checkouttest') + # wcpath = newpath(xsvnwc=str(path), url=path1url) + # try: + # wcpath.checkout(rev=2100) + # wcpath.update() + # assert wcpath.info().rev > 2100 + # finally: + # wcpath.localpath.remove(rec=1) + +def test_parse_wcinfotime(): + assert (parse_wcinfotime('2006-05-30 20:45:26 +0200 (Tue, 30 May 2006)') == + 1149021926) + assert (parse_wcinfotime('2003-10-27 20:43:14 +0100 (Mon, 27 Oct 2003)') == + 1067287394) + +class TestInfoSvnWCCommand: + + def test_svn_1_2(self, path1): + output = """ + Path: test_svnwc.py + Name: test_svnwc.py + URL: http://codespeak.net/svn/py/dist/py/path/svn/wccommand.py + Repository UUID: fd0d7bf2-dfb6-0310-8d31-b7ecfe96aada + Revision: 28137 + Node Kind: file + Schedule: normal + Last Changed Author: jan + Last Changed Rev: 27939 + Last Changed Date: 2006-05-30 20:45:26 +0200 (Tue, 30 May 2006) + Text Last Updated: 2006-06-01 00:42:53 +0200 (Thu, 01 Jun 2006) + Properties Last Updated: 2006-05-23 11:54:59 +0200 (Tue, 23 May 2006) + Checksum: 357e44880e5d80157cc5fbc3ce9822e3 + """ + path = py.path.local(__file__).dirpath().chdir() + try: + info = InfoSvnWCCommand(output) + finally: + path.chdir() + assert info.last_author == 'jan' + assert info.kind == 'file' + assert info.mtime == 1149021926.0 + assert info.url == 'http://codespeak.net/svn/py/dist/py/path/svn/wccommand.py' + assert info.time == 1149021926000000.0 + assert info.rev == 28137 + + + def test_svn_1_3(self, path1): + output = """ + Path: test_svnwc.py + Name: test_svnwc.py + URL: http://codespeak.net/svn/py/dist/py/path/svn/wccommand.py + Repository Root: http://codespeak.net/svn + Repository UUID: fd0d7bf2-dfb6-0310-8d31-b7ecfe96aada + Revision: 28124 + Node Kind: file + Schedule: normal + Last Changed Author: jan + Last Changed Rev: 27939 + Last Changed Date: 2006-05-30 20:45:26 +0200 (Tue, 30 May 2006) + Text Last Updated: 2006-06-02 23:46:11 +0200 (Fri, 02 Jun 2006) + Properties Last Updated: 2006-06-02 23:45:28 +0200 (Fri, 02 Jun 2006) + Checksum: 357e44880e5d80157cc5fbc3ce9822e3 + """ + path = py.path.local(__file__).dirpath().chdir() + try: + info = InfoSvnWCCommand(output) + finally: + path.chdir() + assert info.last_author == 'jan' + assert info.kind == 'file' + assert info.mtime == 1149021926.0 + assert info.url == 'http://codespeak.net/svn/py/dist/py/path/svn/wccommand.py' + assert info.rev == 28124 + assert info.time == 1149021926000000.0 + + +def test_characters_at(): + py.test.raises(ValueError, "py.path.svnwc('/tmp/@@@:')") + +def test_characters_tilde(): + py.path.svnwc('/tmp/test~') + + +class TestRepo: + def test_trailing_slash_is_stripped(self, path1): + # XXX we need to test more normalizing properties + url = path1.join("/") + assert path1 == url + + #def test_different_revs_compare_unequal(self, path1): + # newpath = path1.new(rev=1199) + # assert newpath != path1 + + def test_exists_svn_root(self, path1): + assert path1.check() + + #def test_not_exists_rev(self, path1): + # url = path1.__class__(path1url, rev=500) + # assert url.check(exists=0) + + #def test_nonexisting_listdir_rev(self, path1): + # url = path1.__class__(path1url, rev=500) + # raises(py.error.ENOENT, url.listdir) + + #def test_newrev(self, path1): + # url = path1.new(rev=None) + # assert url.rev == None + # assert url.strpath == path1.strpath + # url = path1.new(rev=10) + # assert url.rev == 10 + + #def test_info_rev(self, path1): + # url = path1.__class__(path1url, rev=1155) + # url = url.join("samplefile") + # res = url.info() + # assert res.size > len("samplefile") and res.created_rev == 1155 + + # the following tests are easier if we have a path class + def test_repocache_simple(self, path1): + repocache = svncommon.RepoCache() + repocache.put(path1.strpath, 42) + url, rev = repocache.get(path1.join('test').strpath) + assert rev == 42 + assert url == path1.strpath + + def test_repocache_notimeout(self, path1): + repocache = svncommon.RepoCache() + repocache.timeout = 0 + repocache.put(path1.strpath, path1.rev) + url, rev = repocache.get(path1.strpath) + assert rev == -1 + assert url == path1.strpath + + def test_repocache_outdated(self, path1): + repocache = svncommon.RepoCache() + repocache.put(path1.strpath, 42, timestamp=0) + url, rev = repocache.get(path1.join('test').strpath) + assert rev == -1 + assert url == path1.strpath + + def _test_getreporev(self): + """ this test runs so slow it's usually disabled """ + old = svncommon.repositories.repos + try: + _repocache.clear() + root = path1.new(rev=-1) + url, rev = cache.repocache.get(root.strpath) + assert rev>=0 + assert url == svnrepourl + finally: + repositories.repos = old diff --git a/third_party/python/py/testing/process/__init__.py b/third_party/python/py/testing/process/__init__.py new file mode 100644 index 0000000000..792d600548 --- /dev/null +++ b/third_party/python/py/testing/process/__init__.py @@ -0,0 +1 @@ +# diff --git a/third_party/python/py/testing/process/test_cmdexec.py b/third_party/python/py/testing/process/test_cmdexec.py new file mode 100644 index 0000000000..98463d906d --- /dev/null +++ b/third_party/python/py/testing/process/test_cmdexec.py @@ -0,0 +1,41 @@ +import py +from py.process import cmdexec + +def exvalue(): + import sys + return sys.exc_info()[1] + + +class Test_exec_cmd: + def test_simple(self): + out = cmdexec('echo hallo') + assert out.strip() == 'hallo' + assert py.builtin._istext(out) + + def test_simple_newline(self): + import sys + out = cmdexec(r"""%s -c "print ('hello')" """ % sys.executable) + assert out == 'hello\n' + assert py.builtin._istext(out) + + def test_simple_error(self): + py.test.raises(cmdexec.Error, cmdexec, 'exit 1') + + def test_simple_error_exact_status(self): + try: + cmdexec('exit 1') + except cmdexec.Error: + e = exvalue() + assert e.status == 1 + assert py.builtin._istext(e.out) + assert py.builtin._istext(e.err) + + def test_err(self): + try: + cmdexec('echoqweqwe123 hallo') + raise AssertionError("command succeeded but shouldn't") + except cmdexec.Error: + e = exvalue() + assert hasattr(e, 'err') + assert hasattr(e, 'out') + assert e.err or e.out diff --git a/third_party/python/py/testing/process/test_forkedfunc.py b/third_party/python/py/testing/process/test_forkedfunc.py new file mode 100644 index 0000000000..ae0d9ab7e6 --- /dev/null +++ b/third_party/python/py/testing/process/test_forkedfunc.py @@ -0,0 +1,173 @@ +import pytest +import py, sys, os + +pytestmark = py.test.mark.skipif("not hasattr(os, 'fork')") + + +def test_waitfinish_removes_tempdir(): + ff = py.process.ForkedFunc(boxf1) + assert ff.tempdir.check() + ff.waitfinish() + assert not ff.tempdir.check() + +def test_tempdir_gets_gc_collected(monkeypatch): + monkeypatch.setattr(os, 'fork', lambda: os.getpid()) + ff = py.process.ForkedFunc(boxf1) + assert ff.tempdir.check() + ff.__del__() + assert not ff.tempdir.check() + +def test_basic_forkedfunc(): + result = py.process.ForkedFunc(boxf1).waitfinish() + assert result.out == "some out\n" + assert result.err == "some err\n" + assert result.exitstatus == 0 + assert result.signal == 0 + assert result.retval == 1 + +def test_exitstatus(): + def func(): + os._exit(4) + result = py.process.ForkedFunc(func).waitfinish() + assert result.exitstatus == 4 + assert result.signal == 0 + assert not result.out + assert not result.err + +def test_execption_in_func(): + def fun(): + raise ValueError(42) + ff = py.process.ForkedFunc(fun) + result = ff.waitfinish() + assert result.exitstatus == ff.EXITSTATUS_EXCEPTION + assert result.err.find("ValueError: 42") != -1 + assert result.signal == 0 + assert not result.retval + +def test_forkedfunc_on_fds(): + result = py.process.ForkedFunc(boxf2).waitfinish() + assert result.out == "someout" + assert result.err == "someerr" + assert result.exitstatus == 0 + assert result.signal == 0 + assert result.retval == 2 + +def test_forkedfunc_on_fds_output(): + result = py.process.ForkedFunc(boxf3).waitfinish() + assert result.signal == 11 + assert result.out == "s" + + +def test_forkedfunc_on_stdout(): + def boxf3(): + import sys + sys.stdout.write("hello\n") + os.kill(os.getpid(), 11) + result = py.process.ForkedFunc(boxf3).waitfinish() + assert result.signal == 11 + assert result.out == "hello\n" + +def test_forkedfunc_signal(): + result = py.process.ForkedFunc(boxseg).waitfinish() + assert result.retval is None + assert result.signal == 11 + +def test_forkedfunc_huge_data(): + result = py.process.ForkedFunc(boxhuge).waitfinish() + assert result.out + assert result.exitstatus == 0 + assert result.signal == 0 + assert result.retval == 3 + +def test_box_seq(): + # we run many boxes with huge data, just one after another + for i in range(50): + result = py.process.ForkedFunc(boxhuge).waitfinish() + assert result.out + assert result.exitstatus == 0 + assert result.signal == 0 + assert result.retval == 3 + +def test_box_in_a_box(): + def boxfun(): + result = py.process.ForkedFunc(boxf2).waitfinish() + print (result.out) + sys.stderr.write(result.err + "\n") + return result.retval + + result = py.process.ForkedFunc(boxfun).waitfinish() + assert result.out == "someout\n" + assert result.err == "someerr\n" + assert result.exitstatus == 0 + assert result.signal == 0 + assert result.retval == 2 + +def test_kill_func_forked(): + class A: + pass + info = A() + import time + + def box_fun(): + time.sleep(10) # we don't want to last forever here + + ff = py.process.ForkedFunc(box_fun) + os.kill(ff.pid, 15) + result = ff.waitfinish() + assert result.signal == 15 + + +def test_hooks(monkeypatch): + def _boxed(): + return 1 + + def _on_start(): + sys.stdout.write("some out\n") + sys.stdout.flush() + + def _on_exit(): + sys.stderr.write("some err\n") + sys.stderr.flush() + + result = py.process.ForkedFunc(_boxed, child_on_start=_on_start, + child_on_exit=_on_exit).waitfinish() + assert result.out == "some out\n" + assert result.err == "some err\n" + assert result.exitstatus == 0 + assert result.signal == 0 + assert result.retval == 1 + + +# ====================================================================== +# examples +# ====================================================================== +# + +def boxf1(): + sys.stdout.write("some out\n") + sys.stderr.write("some err\n") + return 1 + +def boxf2(): + os.write(1, "someout".encode('ascii')) + os.write(2, "someerr".encode('ascii')) + return 2 + +def boxf3(): + os.write(1, "s".encode('ascii')) + os.kill(os.getpid(), 11) + +def boxseg(): + os.kill(os.getpid(), 11) + +def boxhuge(): + s = " ".encode('ascii') + os.write(1, s * 10000) + os.write(2, s * 10000) + os.write(1, s * 10000) + + os.write(1, s * 10000) + os.write(2, s * 10000) + os.write(2, s * 10000) + os.write(1, s * 10000) + return 3 diff --git a/third_party/python/py/testing/process/test_killproc.py b/third_party/python/py/testing/process/test_killproc.py new file mode 100644 index 0000000000..b0d6e2f515 --- /dev/null +++ b/third_party/python/py/testing/process/test_killproc.py @@ -0,0 +1,18 @@ +import pytest +import sys +import py + + +@pytest.mark.skipif("sys.platform.startswith('java')") +def test_kill(tmpdir): + subprocess = pytest.importorskip("subprocess") + t = tmpdir.join("t.py") + t.write("import time ; time.sleep(100)") + proc = subprocess.Popen([sys.executable, str(t)]) + assert proc.poll() is None # no return value yet + py.process.kill(proc.pid) + ret = proc.wait() + if sys.platform == "win32" and ret == 0: + pytest.skip("XXX on win32, subprocess.Popen().wait() on a killed " + "process does not yield return value != 0") + assert ret != 0 diff --git a/third_party/python/py/testing/root/__init__.py b/third_party/python/py/testing/root/__init__.py new file mode 100644 index 0000000000..792d600548 --- /dev/null +++ b/third_party/python/py/testing/root/__init__.py @@ -0,0 +1 @@ +# diff --git a/third_party/python/py/testing/root/test_builtin.py b/third_party/python/py/testing/root/test_builtin.py new file mode 100644 index 0000000000..a6f1a3c739 --- /dev/null +++ b/third_party/python/py/testing/root/test_builtin.py @@ -0,0 +1,179 @@ +import sys +import types +import py +from py.builtin import set, frozenset, reversed, sorted + +def test_enumerate(): + l = [0,1,2] + for i,x in enumerate(l): + assert i == x + +def test_any(): + assert not py.builtin.any([0,False, None]) + assert py.builtin.any([0,False, None,1]) + +def test_all(): + assert not py.builtin.all([True, 1, False]) + assert py.builtin.all([True, 1, object]) + +def test_BaseException(): + assert issubclass(IndexError, py.builtin.BaseException) + assert issubclass(Exception, py.builtin.BaseException) + assert issubclass(KeyboardInterrupt, py.builtin.BaseException) + + class MyRandomClass(object): + pass + assert not issubclass(MyRandomClass, py.builtin.BaseException) + + assert py.builtin.BaseException.__module__ in ('exceptions', 'builtins') + assert Exception.__name__ == 'Exception' + + +def test_GeneratorExit(): + assert py.builtin.GeneratorExit.__module__ in ('exceptions', 'builtins') + assert issubclass(py.builtin.GeneratorExit, py.builtin.BaseException) + +def test_reversed(): + reversed = py.builtin.reversed + r = reversed("hello") + assert iter(r) is r + s = "".join(list(r)) + assert s == "olleh" + assert list(reversed(list(reversed("hello")))) == ['h','e','l','l','o'] + py.test.raises(TypeError, reversed, reversed("hello")) + +def test_simple(): + s = set([1, 2, 3, 4]) + assert s == set([3, 4, 2, 1]) + s1 = s.union(set([5, 6])) + assert 5 in s1 + assert 1 in s1 + +def test_frozenset(): + s = set([frozenset([0, 1]), frozenset([1, 0])]) + assert len(s) == 1 + +def test_sorted(): + if sorted == py.builtin.sorted: + return # don't test a real builtin + for s in [py.builtin.sorted]: + def test(): + assert s([3, 2, 1]) == [1, 2, 3] + assert s([1, 2, 3], reverse=True) == [3, 2, 1] + l = s([1, 2, 3, 4, 5, 6], key=lambda x: x % 2) + assert l == [2, 4, 6, 1, 3, 5] + l = s([1, 2, 3, 4], cmp=lambda x, y: -cmp(x, y)) + assert l == [4, 3, 2, 1] + l = s([1, 2, 3, 4], cmp=lambda x, y: -cmp(x, y), + key=lambda x: x % 2) + assert l == [1, 3, 2, 4] + + def compare(x, y): + assert type(x) == str + assert type(y) == str + return cmp(x, y) + data = 'The quick Brown fox Jumped over The lazy Dog'.split() + s(data, cmp=compare, key=str.lower) + yield test + + +def test_print_simple(): + from py.builtin import print_ + py.test.raises(TypeError, "print_(hello=3)") + f = py.io.TextIO() + print_("hello", "world", file=f) + s = f.getvalue() + assert s == "hello world\n" + + f = py.io.TextIO() + print_("hello", end="", file=f) + s = f.getvalue() + assert s == "hello" + + f = py.io.TextIO() + print_("xyz", "abc", sep="", end="", file=f) + s = f.getvalue() + assert s == "xyzabc" + + class X: + def __repr__(self): return "rep" + f = py.io.TextIO() + print_(X(), file=f) + assert f.getvalue() == "rep\n" + +def test_execfile(tmpdir): + test_file = tmpdir.join("test.py") + test_file.write("x = y\ndef f(): pass") + ns = {"y" : 42} + py.builtin.execfile(str(test_file), ns) + assert ns["x"] == 42 + assert py.code.getrawcode(ns["f"]).co_filename == str(test_file) + class A: + y = 3 + x = 4 + py.builtin.execfile(str(test_file)) + assert A.x == 3 + +def test_getfuncdict(): + def f(): + pass + f.x = 4 + assert py.builtin._getfuncdict(f)["x"] == 4 + assert py.builtin._getfuncdict(2) is None + +def test_callable(): + class A: pass + assert py.builtin.callable(test_callable) + assert py.builtin.callable(A) + assert py.builtin.callable(list) + assert py.builtin.callable(id) + assert not py.builtin.callable(4) + assert not py.builtin.callable("hi") + +def test_totext(): + py.builtin._totext("hello", "UTF-8") + +def test_bytes_text(): + if sys.version_info[0] < 3: + assert py.builtin.text == unicode + assert py.builtin.bytes == str + else: + assert py.builtin.text == str + assert py.builtin.bytes == bytes + +def test_totext_badutf8(): + # this was in printouts within the pytest testsuite + # totext would fail + if sys.version_info >= (3,): + errors = 'surrogateescape' + else: # old python has crappy error handlers + errors = 'replace' + py.builtin._totext("\xa6", "UTF-8", errors) + +def test_reraise(): + from py.builtin import _reraise + try: + raise Exception() + except Exception: + cls, val, tb = sys.exc_info() + excinfo = py.test.raises(Exception, "_reraise(cls, val, tb)") + +def test_exec(): + l = [] + py.builtin.exec_("l.append(1)") + assert l == [1] + d = {} + py.builtin.exec_("x=4", d) + assert d['x'] == 4 + +def test_tryimport(): + py.test.raises(ImportError, py.builtin._tryimport, 'xqwe123') + x = py.builtin._tryimport('asldkajsdl', 'py') + assert x == py + x = py.builtin._tryimport('asldkajsdl', 'py.path') + assert x == py.path + +def test_getcode(): + code = py.builtin._getcode(test_getcode) + assert isinstance(code, types.CodeType) + assert py.builtin._getcode(4) is None diff --git a/third_party/python/py/testing/root/test_error.py b/third_party/python/py/testing/root/test_error.py new file mode 100644 index 0000000000..7bfbef3bd4 --- /dev/null +++ b/third_party/python/py/testing/root/test_error.py @@ -0,0 +1,76 @@ + +import py + +import errno +import sys +import subprocess + + +def test_error_classes(): + for name in errno.errorcode.values(): + x = getattr(py.error, name) + assert issubclass(x, py.error.Error) + assert issubclass(x, EnvironmentError) + + +def test_has_name(): + assert py.error.__name__ == 'py.error' + + +def test_picklability_issue1(): + import pickle + e1 = py.error.ENOENT() + s = pickle.dumps(e1) + e2 = pickle.loads(s) + assert isinstance(e2, py.error.ENOENT) + + +def test_unknown_error(): + num = 3999 + cls = py.error._geterrnoclass(num) + assert cls.__name__ == 'UnknownErrno%d' % (num,) + assert issubclass(cls, py.error.Error) + assert issubclass(cls, EnvironmentError) + cls2 = py.error._geterrnoclass(num) + assert cls is cls2 + + +def test_error_conversion_enotdir(testdir): + p = testdir.makepyfile("") + excinfo = py.test.raises(py.error.Error, py.error.checked_call, p.listdir) + assert isinstance(excinfo.value, EnvironmentError) + assert isinstance(excinfo.value, py.error.Error) + assert "ENOTDIR" in repr(excinfo.value) + + +def test_checked_call_supports_kwargs(tmpdir): + import tempfile + py.error.checked_call(tempfile.mkdtemp, dir=str(tmpdir)) + + +def test_error_importable(): + """Regression test for #179""" + subprocess.check_call( + [sys.executable, '-c', 'from py.error import ENOENT']) + + +try: + import unittest + unittest.TestCase.assertWarns +except (ImportError, AttributeError): + pass # required interface not available +else: + import sys + import warnings + + class Case(unittest.TestCase): + def test_assert_warns(self): + # Clear everything "py.*" from sys.modules and re-import py + # as a fresh start + for mod in tuple(sys.modules.keys()): + if mod and (mod == 'py' or mod.startswith('py.')): + del sys.modules[mod] + __import__('py') + + with self.assertWarns(UserWarning): + warnings.warn('this should work') diff --git a/third_party/python/py/testing/root/test_py_imports.py b/third_party/python/py/testing/root/test_py_imports.py new file mode 100644 index 0000000000..31fe6ead81 --- /dev/null +++ b/third_party/python/py/testing/root/test_py_imports.py @@ -0,0 +1,71 @@ +import py +import sys + + +@py.test.mark.parametrize('name', [x for x in dir(py) if x[0] != '_']) +def test_dir(name): + obj = getattr(py, name) + if hasattr(obj, '__map__'): # isinstance(obj, Module): + keys = dir(obj) + assert len(keys) > 0 + print (obj.__map__) + for name in list(obj.__map__): + assert hasattr(obj, name), (obj, name) + + +def test_virtual_module_identity(): + from py import path as path1 + from py import path as path2 + assert path1 is path2 + from py.path import local as local1 + from py.path import local as local2 + assert local1 is local2 + + +def test_importall(): + base = py._pydir + nodirs = [ + ] + if sys.version_info >= (3, 0): + nodirs.append(base.join('_code', '_assertionold.py')) + else: + nodirs.append(base.join('_code', '_assertionnew.py')) + + def recurse(p): + return p.check(dotfile=0) and p.basename != "attic" + + for p in base.visit('*.py', recurse): + if p.basename == '__init__.py': + continue + relpath = p.new(ext='').relto(base) + if base.sep in relpath: # not py/*.py itself + for x in nodirs: + if p == x or p.relto(x): + break + else: + relpath = relpath.replace(base.sep, '.') + modpath = 'py.%s' % relpath + try: + check_import(modpath) + except py.test.skip.Exception: + pass + + +def check_import(modpath): + py.builtin.print_("checking import", modpath) + assert __import__(modpath) + + +def test_star_import(): + exec("from py import *") + + +def test_all_resolves(): + seen = py.builtin.set([py]) + lastlength = None + while len(seen) != lastlength: + lastlength = len(seen) + for item in py.builtin.frozenset(seen): + for value in item.__dict__.values(): + if isinstance(value, type(py.test)): + seen.add(value) diff --git a/third_party/python/py/testing/root/test_std.py b/third_party/python/py/testing/root/test_std.py new file mode 100644 index 0000000000..143556a055 --- /dev/null +++ b/third_party/python/py/testing/root/test_std.py @@ -0,0 +1,13 @@ + +import py + +def test_os(): + import os + assert py.std.os is os + +def test_import_error_converts_to_attributeerror(): + py.test.raises(AttributeError, "py.std.xyzalskdj") + +def test_std_gets_it(): + for x in py.std.sys.modules: + assert x in py.std.__dict__ diff --git a/third_party/python/py/testing/root/test_xmlgen.py b/third_party/python/py/testing/root/test_xmlgen.py new file mode 100644 index 0000000000..fc0e82665f --- /dev/null +++ b/third_party/python/py/testing/root/test_xmlgen.py @@ -0,0 +1,146 @@ + +import py +from py._xmlgen import unicode, html, raw +import sys + +class ns(py.xml.Namespace): + pass + +def test_escape(): + uvalue = py.builtin._totext('\xc4\x85\xc4\x87\n\xe2\x82\xac\n', 'utf-8') + class A: + def __unicode__(self): + return uvalue + def __str__(self): + x = self.__unicode__() + if sys.version_info[0] < 3: + return x.encode('utf-8') + return x + y = py.xml.escape(uvalue) + assert y == uvalue + x = py.xml.escape(A()) + assert x == uvalue + if sys.version_info[0] < 3: + assert isinstance(x, unicode) + assert isinstance(y, unicode) + y = py.xml.escape(uvalue.encode('utf-8')) + assert y == uvalue + + +def test_tag_with_text(): + x = ns.hello("world") + u = unicode(x) + assert u == "<hello>world</hello>" + +def test_class_identity(): + assert ns.hello is ns.hello + +def test_tag_with_text_and_attributes(): + x = ns.some(name="hello", value="world") + assert x.attr.name == 'hello' + assert x.attr.value == 'world' + u = unicode(x) + assert u == '<some name="hello" value="world"/>' + +def test_tag_with_subclassed_attr_simple(): + class my(ns.hello): + class Attr(ns.hello.Attr): + hello="world" + x = my() + assert x.attr.hello == 'world' + assert unicode(x) == '<my hello="world"/>' + +def test_tag_with_raw_attr(): + x = html.object(data=raw('&')) + assert unicode(x) == '<object data="&"></object>' + +def test_tag_nested(): + x = ns.hello(ns.world()) + unicode(x) # triggers parentifying + assert x[0].parent is x + u = unicode(x) + assert u == '<hello><world/></hello>' + +def test_list_nested(): + x = ns.hello([ns.world()]) #pass in a list here + u = unicode(x) + assert u == '<hello><world/></hello>' + +def test_tag_xmlname(): + class my(ns.hello): + xmlname = 'world' + u = unicode(my()) + assert u == '<world/>' + +def test_tag_with_text_entity(): + x = ns.hello('world & rest') + u = unicode(x) + assert u == "<hello>world & rest</hello>" + +def test_tag_with_text_and_attributes_entity(): + x = ns.some(name="hello & world") + assert x.attr.name == "hello & world" + u = unicode(x) + assert u == '<some name="hello & world"/>' + +def test_raw(): + x = ns.some(py.xml.raw("<p>literal</p>")) + u = unicode(x) + assert u == "<some><p>literal</p></some>" + + +def test_html_name_stickyness(): + class my(html.p): + pass + x = my("hello") + assert unicode(x) == '<p>hello</p>' + +def test_stylenames(): + class my: + class body(html.body): + style = html.Style(font_size = "12pt") + u = unicode(my.body()) + assert u == '<body style="font-size: 12pt"></body>' + +def test_class_None(): + t = html.body(class_=None) + u = unicode(t) + assert u == '<body></body>' + +def test_alternating_style(): + alternating = ( + html.Style(background="white"), + html.Style(background="grey"), + ) + class my(html): + class li(html.li): + def style(self): + i = self.parent.index(self) + return alternating[i%2] + style = property(style) + + x = my.ul( + my.li("hello"), + my.li("world"), + my.li("42")) + u = unicode(x) + assert u == ('<ul><li style="background: white">hello</li>' + '<li style="background: grey">world</li>' + '<li style="background: white">42</li>' + '</ul>') + +def test_singleton(): + h = html.head(html.link(href="foo")) + assert unicode(h) == '<head><link href="foo"/></head>' + + h = html.head(html.script(src="foo")) + assert unicode(h) == '<head><script src="foo"></script></head>' + +def test_inline(): + h = html.div(html.span('foo'), html.span('bar')) + assert (h.unicode(indent=2) == + '<div><span>foo</span><span>bar</span></div>') + +def test_object_tags(): + o = html.object(html.object()) + assert o.unicode(indent=0) == '<object><object></object></object>' |