1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# This file implements a shared lock that lets us ensure that the test cases in
# this directory run serially. Each test case obtains this lock as its first
# step, and releases it as its last. (The nel_test helper function in
# nel.sub.js automates this process.) Because the lock needs to be shared
# across all of the test cases, we use a hard-coded stash key. This hard-coded
# key is a random UUID, which should not conflict with any other auto-generated
# stash keys.
import time
# Stash key under which the ID of the current lock holder is stored.
_LOCK_KEY = b"67966d2e-a847-41d8-b7c3-5f6aee3375ba"
_TIMEOUT = 5  # seconds to keep trying before giving up on the lock
_POLL_INTERVAL = 0.5  # seconds to sleep between attempts


def _lock_is_free(stash):
    """Report whether no holder is currently recorded under _LOCK_KEY.

    The stash is probed with take(), which removes the value it returns.
    A holder that is found is therefore written straight back; without
    that, merely checking the lock would release it on the owner's behalf.

    The caller must hold stash.lock so the take/put pair cannot interleave
    with another request.
    """
    holder = stash.take(key=_LOCK_KEY)
    if holder is None:
        return True
    stash.put(key=_LOCK_KEY, value=holder)
    return False


def _poll_until(attempt, timeout, poll_interval):
    """Call attempt() until it returns true or `timeout` seconds elapse.

    attempt() is always tried at least once, and is tried before any
    sleeping, so the uncontended case returns immediately.
    """
    deadline = time.time() + timeout
    while True:
        if attempt():
            return True
        if time.time() >= deadline:
            return False
        time.sleep(poll_interval)


def wait_for_lock(request, timeout=None, poll_interval=None):
    """Poll until the shared test lock is free.

    This function does not acquire request.server.stash.lock itself; a
    caller that needs the check to be atomic with a later put() must hold
    it. Be aware that unlock() needs that same stash lock, so holding it
    across a long wait keeps the current owner from releasing.

    Args:
        request: the wptserve request; only request.server.stash is used.
        timeout: seconds to keep polling; defaults to _TIMEOUT.
        poll_interval: seconds between polls; defaults to _POLL_INTERVAL.

    Returns:
        True if the lock was observed free, False if the timeout expired.
        The stash contents are left unchanged in both cases.
    """
    if timeout is None:
        timeout = _TIMEOUT
    if poll_interval is None:
        poll_interval = _POLL_INTERVAL
    stash = request.server.stash
    return _poll_until(lambda: _lock_is_free(stash), timeout, poll_interval)


def lock(request, report_id, timeout=None, poll_interval=None):
    """Acquire the shared test lock on behalf of `report_id`.

    Each attempt checks and claims the lock inside a single stash.lock
    critical section, so two requests can never both claim it. The stash
    lock is dropped between attempts so that the current owner's unlock()
    request can get in and actually free the lock while we wait.

    Args:
        request: the wptserve request; only request.server.stash is used.
        report_id: bytes identifying the test that wants the lock.
        timeout: seconds to keep trying; defaults to _TIMEOUT.
        poll_interval: seconds between attempts; defaults to _POLL_INTERVAL.

    Returns:
        A bytes success message, or a (503, [], message) tuple if the lock
        could not be obtained before the timeout.
    """
    if timeout is None:
        timeout = _TIMEOUT
    if poll_interval is None:
        poll_interval = _POLL_INTERVAL
    stash = request.server.stash

    def try_claim():
        with stash.lock:
            if not _lock_is_free(stash):
                return False
            stash.put(key=_LOCK_KEY, value=report_id)
            return True

    if not _poll_until(try_claim, timeout, poll_interval):
        return (503, [], b"Cannot obtain lock")
    return b"Obtained lock for %s" % report_id
def unlock(request, report_id):
    """Release the shared test lock if it is held by `report_id`.

    Args:
        request: the wptserve request; only request.server.stash is used.
        report_id: bytes identifying the test that believes it holds the
            lock.

    Returns:
        A bytes success message when the lock was held by `report_id` and
        has now been released; otherwise a (503, [], message) tuple. When
        the lock is held by a different ID, that holder is kept in place.
    """
    stash = request.server.stash
    with stash.lock:
        lock_holder = stash.take(key=_LOCK_KEY)
        if lock_holder is None:
            # Nobody holds the lock. There is nothing to put back, and
            # %-formatting None into a bytes message would raise TypeError,
            # so report the failure explicitly instead of crashing.
            return (503, [], b"Cannot release lock that is not held")
        if lock_holder != report_id:
            # take() removed the entry; return the lock holder to the stash.
            stash.put(key=_LOCK_KEY, value=lock_holder)
            return (503, [], b"Cannot release lock held by %s" % lock_holder)
    return b"Released lock for %s" % report_id
def main(request, response):
    """Handle a lock or unlock request for the shared test lock.

    Query parameters:
        op: b"lock" or b"unlock".
        reportID: identifier of the test acquiring or releasing the lock.

    Returns:
        Whatever lock() or unlock() returns for a well-formed request, or
        a (400, [], message) tuple when `op` is missing or unknown, or
        when `reportID` is missing.
    """
    # Read with an explicit default so that a missing parameter produces a
    # 400 response rather than an unhandled lookup error.
    op = request.GET.first(b"op", None)
    if op not in (b"lock", b"unlock"):
        return (400, [], b"Invalid op")

    report_id = request.GET.first(b"reportID", None)
    if report_id is None:
        return (400, [], b"Missing reportID")

    if op == b"lock":
        return lock(request, report_id)
    return unlock(request, report_id)
|