summaryrefslogtreecommitdiffstats
path: root/third_party/python/py/testing/process
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/py/testing/process')
-rw-r--r--third_party/python/py/testing/process/__init__.py1
-rw-r--r--third_party/python/py/testing/process/test_cmdexec.py41
-rw-r--r--third_party/python/py/testing/process/test_forkedfunc.py173
-rw-r--r--third_party/python/py/testing/process/test_killproc.py18
4 files changed, 233 insertions, 0 deletions
diff --git a/third_party/python/py/testing/process/__init__.py b/third_party/python/py/testing/process/__init__.py
new file mode 100644
index 0000000000..792d600548
--- /dev/null
+++ b/third_party/python/py/testing/process/__init__.py
@@ -0,0 +1 @@
+#
diff --git a/third_party/python/py/testing/process/test_cmdexec.py b/third_party/python/py/testing/process/test_cmdexec.py
new file mode 100644
index 0000000000..98463d906d
--- /dev/null
+++ b/third_party/python/py/testing/process/test_cmdexec.py
@@ -0,0 +1,41 @@
+import py
+from py.process import cmdexec
+
+def exvalue():
+ import sys
+ return sys.exc_info()[1]
+
+
+class Test_exec_cmd:
+ def test_simple(self):
+ out = cmdexec('echo hallo')
+ assert out.strip() == 'hallo'
+ assert py.builtin._istext(out)
+
+ def test_simple_newline(self):
+ import sys
+ out = cmdexec(r"""%s -c "print ('hello')" """ % sys.executable)
+ assert out == 'hello\n'
+ assert py.builtin._istext(out)
+
+ def test_simple_error(self):
+ py.test.raises(cmdexec.Error, cmdexec, 'exit 1')
+
+ def test_simple_error_exact_status(self):
+ try:
+ cmdexec('exit 1')
+ except cmdexec.Error:
+ e = exvalue()
+ assert e.status == 1
+ assert py.builtin._istext(e.out)
+ assert py.builtin._istext(e.err)
+
+ def test_err(self):
+ try:
+ cmdexec('echoqweqwe123 hallo')
+ raise AssertionError("command succeeded but shouldn't")
+ except cmdexec.Error:
+ e = exvalue()
+ assert hasattr(e, 'err')
+ assert hasattr(e, 'out')
+ assert e.err or e.out
diff --git a/third_party/python/py/testing/process/test_forkedfunc.py b/third_party/python/py/testing/process/test_forkedfunc.py
new file mode 100644
index 0000000000..ae0d9ab7e6
--- /dev/null
+++ b/third_party/python/py/testing/process/test_forkedfunc.py
@@ -0,0 +1,173 @@
+import pytest
+import py, sys, os
+
+pytestmark = py.test.mark.skipif("not hasattr(os, 'fork')")
+
+
+def test_waitfinish_removes_tempdir():
+ ff = py.process.ForkedFunc(boxf1)
+ assert ff.tempdir.check()
+ ff.waitfinish()
+ assert not ff.tempdir.check()
+
+def test_tempdir_gets_gc_collected(monkeypatch):
+ monkeypatch.setattr(os, 'fork', lambda: os.getpid())
+ ff = py.process.ForkedFunc(boxf1)
+ assert ff.tempdir.check()
+ ff.__del__()
+ assert not ff.tempdir.check()
+
+def test_basic_forkedfunc():
+ result = py.process.ForkedFunc(boxf1).waitfinish()
+ assert result.out == "some out\n"
+ assert result.err == "some err\n"
+ assert result.exitstatus == 0
+ assert result.signal == 0
+ assert result.retval == 1
+
+def test_exitstatus():
+ def func():
+ os._exit(4)
+ result = py.process.ForkedFunc(func).waitfinish()
+ assert result.exitstatus == 4
+ assert result.signal == 0
+ assert not result.out
+ assert not result.err
+
+def test_execption_in_func():
+ def fun():
+ raise ValueError(42)
+ ff = py.process.ForkedFunc(fun)
+ result = ff.waitfinish()
+ assert result.exitstatus == ff.EXITSTATUS_EXCEPTION
+ assert result.err.find("ValueError: 42") != -1
+ assert result.signal == 0
+ assert not result.retval
+
+def test_forkedfunc_on_fds():
+ result = py.process.ForkedFunc(boxf2).waitfinish()
+ assert result.out == "someout"
+ assert result.err == "someerr"
+ assert result.exitstatus == 0
+ assert result.signal == 0
+ assert result.retval == 2
+
+def test_forkedfunc_on_fds_output():
+ result = py.process.ForkedFunc(boxf3).waitfinish()
+ assert result.signal == 11
+ assert result.out == "s"
+
+
+def test_forkedfunc_on_stdout():
+ def boxf3():
+ import sys
+ sys.stdout.write("hello\n")
+ os.kill(os.getpid(), 11)
+ result = py.process.ForkedFunc(boxf3).waitfinish()
+ assert result.signal == 11
+ assert result.out == "hello\n"
+
+def test_forkedfunc_signal():
+ result = py.process.ForkedFunc(boxseg).waitfinish()
+ assert result.retval is None
+ assert result.signal == 11
+
+def test_forkedfunc_huge_data():
+ result = py.process.ForkedFunc(boxhuge).waitfinish()
+ assert result.out
+ assert result.exitstatus == 0
+ assert result.signal == 0
+ assert result.retval == 3
+
+def test_box_seq():
+ # we run many boxes with huge data, just one after another
+ for i in range(50):
+ result = py.process.ForkedFunc(boxhuge).waitfinish()
+ assert result.out
+ assert result.exitstatus == 0
+ assert result.signal == 0
+ assert result.retval == 3
+
+def test_box_in_a_box():
+ def boxfun():
+ result = py.process.ForkedFunc(boxf2).waitfinish()
+ print (result.out)
+ sys.stderr.write(result.err + "\n")
+ return result.retval
+
+ result = py.process.ForkedFunc(boxfun).waitfinish()
+ assert result.out == "someout\n"
+ assert result.err == "someerr\n"
+ assert result.exitstatus == 0
+ assert result.signal == 0
+ assert result.retval == 2
+
+def test_kill_func_forked():
+ class A:
+ pass
+ info = A()
+ import time
+
+ def box_fun():
+ time.sleep(10) # we don't want to last forever here
+
+ ff = py.process.ForkedFunc(box_fun)
+ os.kill(ff.pid, 15)
+ result = ff.waitfinish()
+ assert result.signal == 15
+
+
+def test_hooks(monkeypatch):
+ def _boxed():
+ return 1
+
+ def _on_start():
+ sys.stdout.write("some out\n")
+ sys.stdout.flush()
+
+ def _on_exit():
+ sys.stderr.write("some err\n")
+ sys.stderr.flush()
+
+ result = py.process.ForkedFunc(_boxed, child_on_start=_on_start,
+ child_on_exit=_on_exit).waitfinish()
+ assert result.out == "some out\n"
+ assert result.err == "some err\n"
+ assert result.exitstatus == 0
+ assert result.signal == 0
+ assert result.retval == 1
+
+
+# ======================================================================
+# examples
+# ======================================================================
+#
+
+def boxf1():
+ sys.stdout.write("some out\n")
+ sys.stderr.write("some err\n")
+ return 1
+
+def boxf2():
+ os.write(1, "someout".encode('ascii'))
+ os.write(2, "someerr".encode('ascii'))
+ return 2
+
+def boxf3():
+ os.write(1, "s".encode('ascii'))
+ os.kill(os.getpid(), 11)
+
+def boxseg():
+ os.kill(os.getpid(), 11)
+
+def boxhuge():
+ s = " ".encode('ascii')
+ os.write(1, s * 10000)
+ os.write(2, s * 10000)
+ os.write(1, s * 10000)
+
+ os.write(1, s * 10000)
+ os.write(2, s * 10000)
+ os.write(2, s * 10000)
+ os.write(1, s * 10000)
+ return 3
diff --git a/third_party/python/py/testing/process/test_killproc.py b/third_party/python/py/testing/process/test_killproc.py
new file mode 100644
index 0000000000..b0d6e2f515
--- /dev/null
+++ b/third_party/python/py/testing/process/test_killproc.py
@@ -0,0 +1,18 @@
+import pytest
+import sys
+import py
+
+
+@pytest.mark.skipif("sys.platform.startswith('java')")
+def test_kill(tmpdir):
+ subprocess = pytest.importorskip("subprocess")
+ t = tmpdir.join("t.py")
+ t.write("import time ; time.sleep(100)")
+ proc = subprocess.Popen([sys.executable, str(t)])
+ assert proc.poll() is None # no return value yet
+ py.process.kill(proc.pid)
+ ret = proc.wait()
+ if sys.platform == "win32" and ret == 0:
+ pytest.skip("XXX on win32, subprocess.Popen().wait() on a killed "
+ "process does not yield return value != 0")
+ assert ret != 0