1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
import datetime
import re
import subprocess
import sys
import time
from threading import Thread
from .env import HttpdTestEnv
class CurlPiper:
def __init__(self, env: HttpdTestEnv, url: str):
self.env = env
self.url = url
self.proc = None
self.args = None
self.headerfile = None
self._stderr = []
self._stdout = []
self.stdout_thread = None
self.stderr_thread = None
self._exitcode = -1
self._r = None
@property
def exitcode(self):
return self._exitcode
@property
def response(self):
return self._r.response if self._r else None
def __repr__(self):
return f'CurlPiper[exitcode={self._exitcode}, stderr={self._stderr}, stdout={self._stdout}]'
def start(self):
self.args, self.headerfile = self.env.curl_complete_args([self.url], timeout=5, options=[
"-T", "-", "-X", "POST", "--trace-ascii", "%", "--trace-time"
])
self.args.append(self.url)
sys.stderr.write("starting: {0}\n".format(self.args))
self.proc = subprocess.Popen(self.args, stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0)
def read_output(fh, buffer):
while True:
chunk = fh.read()
if not chunk:
break
buffer.append(chunk.decode())
# collect all stdout and stderr until we are done
# use separate threads to not block ourself
self._stderr = []
self._stdout = []
if self.proc.stderr:
self.stderr_thread = Thread(target=read_output, args=(self.proc.stderr, self._stderr))
self.stderr_thread.start()
if self.proc.stdout:
self.stdout_thread = Thread(target=read_output, args=(self.proc.stdout, self._stdout))
self.stdout_thread.start()
return self.proc
def send(self, data: str):
self.proc.stdin.write(data.encode())
self.proc.stdin.flush()
def close(self) -> ([str], [str]):
self.proc.stdin.close()
self.stdout_thread.join()
self.stderr_thread.join()
self._end()
return self._stdout, self._stderr
def _end(self):
if self.proc:
# noinspection PyBroadException
try:
if self.proc.stdin:
# noinspection PyBroadException
try:
self.proc.stdin.close()
except Exception:
pass
if self.proc.stdout:
self.proc.stdout.close()
if self.proc.stderr:
self.proc.stderr.close()
except Exception:
self.proc.terminate()
finally:
self.proc.wait()
self.stdout_thread = None
self.stderr_thread = None
self._exitcode = self.proc.returncode
self.proc = None
self._r = self.env.curl_parse_headerfile(self.headerfile)
def stutter_check(self, chunks: [str], stutter: datetime.timedelta):
if not self.proc:
self.start()
for chunk in chunks:
self.send(chunk)
time.sleep(stutter.total_seconds())
recv_out, recv_err = self.close()
# assert we got everything back
assert "".join(chunks) == "".join(recv_out)
# now the tricky part: check *when* we got everything back
recv_times = []
for line in "".join(recv_err).split('\n'):
m = re.match(r'^\s*(\d+:\d+:\d+(\.\d+)?) <= Recv data, (\d+) bytes.*', line)
if m and int(m.group(3)) > 0:
recv_times.append(datetime.time.fromisoformat(m.group(1)))
# received as many chunks as we sent
assert len(chunks) == len(recv_times), "received response not in {0} chunks, but {1}".format(
len(chunks), len(recv_times))
def microsecs(tdelta):
return ((tdelta.hour * 60 + tdelta.minute) * 60 + tdelta.second) * 1000000 + tdelta.microsecond
recv_deltas = []
last_mics = microsecs(recv_times[0])
for ts in recv_times[1:]:
mics = microsecs(ts)
delta_mics = mics - last_mics
if delta_mics < 0:
delta_mics += datetime.time(23, 59, 59, 999999)
recv_deltas.append(datetime.timedelta(microseconds=delta_mics))
last_mics = mics
stutter_td = datetime.timedelta(seconds=stutter.total_seconds() * 0.75) # 25% leeway
# TODO: the first two chunks are often close together, it seems
# there still is a little buffering delay going on
for idx, td in enumerate(recv_deltas[1:]):
assert stutter_td < td, \
f"chunk {idx} arrived too early \n{recv_deltas}\nafter {td}\n{recv_err}"
|