1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
from testtools import TestCase
from testtools.matchers import Equals
from jlp import SocketLock
import socket
from lockfile import LockTimeout, AlreadyLocked
from multiprocessing import Process
from autopilot.matchers import Eventually
import sys
import os
import time
from jlp import get_config_option
# Exit codes that run_lock_process() passes to sys.exit() so the parent test
# can tell, from Process.exitcode, how the child's locking attempt ended.
(LOCK_STATUS_ACQUIRED,
 LOCK_STATUS_TIMEOUT,
 LOCK_STATUS_ALREADYLOCKED) = (0, 1, 2)
def run_lock_process(timeout=None, sleeptime=5, kill_process=False):
    """Child-process body: take the configured lock, hold it, then exit.

    The process always terminates from inside this function; the outcome is
    reported through the exit status so the parent can read it from
    ``Process.exitcode``.

    :param timeout: forwarded unchanged to ``SocketLock(timeout=...)``.
    :param sleeptime: seconds to sleep while the lock is held.
    :param kill_process: when true, the process sends signal 9 to itself
        after sleeping, while still inside the ``with`` block, so the lock's
        normal exit path never runs.
    :raises SystemExit: with LOCK_STATUS_ACQUIRED when the ``with`` block
        completed, LOCK_STATUS_TIMEOUT on ``LockTimeout``, or
        LOCK_STATUS_ALREADYLOCKED on ``AlreadyLocked``.
    """
    exit_status = LOCK_STATUS_ACQUIRED
    try:
        with SocketLock(get_config_option('lock_name'), timeout=timeout):
            time.sleep(sleeptime)
            if kill_process:
                # Die abruptly while still holding the lock.
                os.kill(os.getpid(), 9)
    except LockTimeout:
        exit_status = LOCK_STATUS_TIMEOUT
    except AlreadyLocked:
        exit_status = LOCK_STATUS_ALREADYLOCKED
    sys.exit(exit_status)
def is_locked():
    """Report whether the configured lock address is currently taken.

    Probes by binding a throw-away AF_UNIX datagram socket to
    ``'\\0' + get_config_option('lock_name')`` (a leading NUL byte selects
    the Linux abstract socket namespace). Presumably this is the same
    address SocketLock binds -- verify against the jlp implementation.

    :returns: ``False`` if the bind succeeded (nobody holds the address),
        ``True`` if creating or binding the socket raised ``socket.error``.
    """
    try:
        probe = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            probe.bind('\0' + get_config_option('lock_name'))
        finally:
            # Always release the probe socket, including when bind() fails
            # because the address is in use; otherwise every "locked" poll
            # leaves an open descriptor behind.
            probe.close()
    except socket.error:
        return True
    return False
class TestLocking(TestCase):
    """Cross-process tests of SocketLock, driven via run_lock_process()."""

    def setUp(self):
        super(TestLocking, self).setUp()
        # Each test starts from a free lock; a holder left over from a
        # previous test would make the results meaningless.
        self.assertThat(is_locked(), Equals(False))

    def _start_lock_process(self, *args):
        """Start a child running run_lock_process(*args) and return it.

        A cleanup is registered so the child is terminated and joined even
        when an assertion fails before the test joins it; otherwise a
        long-lived holder would keep the lock into the next test.
        """
        process = Process(target=run_lock_process, args=args)
        process.start()
        self.addCleanup(self._reap, process)
        return process

    @staticmethod
    def _reap(process):
        """Terminate *process* if it is still running, then join it."""
        if process.is_alive():
            process.terminate()
        process.join()

    def test_lock_can_be_acquired(self):
        """Create a process and check the lock exists."""
        lock_process = self._start_lock_process()
        self.assertThat(is_locked, Eventually(Equals(True)))
        lock_process.join()
        self.assertThat(lock_process.exitcode, Equals(LOCK_STATUS_ACQUIRED))

    def test_lock_timeouts(self):
        """Create a process that holds the lock for 20s. Then create another
        process that times out after trying to acquire the lock for 1s.
        Make sure the timeout happens."""
        # first create a process that holds the lock for 20 seconds
        holder = self._start_lock_process(120, 20)
        # give the holder some time to acquire the lock
        self.assertThat(is_locked, Eventually(Equals(True)))
        # now create a process that will try to acquire the lock with a 1s
        # timeout
        acquirer = self._start_lock_process(1, 5)
        acquirer.join()
        self.assertThat(acquirer.exitcode, Equals(LOCK_STATUS_TIMEOUT))
        holder.join()
        self.assertThat(holder.exitcode, Equals(LOCK_STATUS_ACQUIRED))

    def test_consecutive_holders(self):
        """Spawn 5 processes each holding the lock for 1s and make sure
        they all exit with LOCK_STATUS_ACQUIRED."""
        holders = [self._start_lock_process(120, 1) for _ in range(5)]
        for holder in holders:
            holder.join()
        for holder in holders:
            self.assertThat(holder.exitcode, Equals(LOCK_STATUS_ACQUIRED))

    def test_double_locking(self):
        """Spawn 1 holder and then 5 processes trying to lock the same lock
        with 0 timeout. They should all exit with
        LOCK_STATUS_ALREADYLOCKED."""
        holder = self._start_lock_process(120, 20)
        # give the holder some time to acquire the lock
        self.assertThat(is_locked, Eventually(Equals(True)))
        acquirers = [self._start_lock_process(0, 1) for _ in range(5)]
        for acquirer in acquirers:
            acquirer.join()
        for acquirer in acquirers:
            self.assertThat(acquirer.exitcode,
                            Equals(LOCK_STATUS_ALREADYLOCKED))
        holder.join()
        self.assertThat(holder.exitcode, Equals(LOCK_STATUS_ACQUIRED))

    def test_nonrunning_holder(self):
        """Create a lock which is not released properly and check that the
        next process can reclaim it."""
        for _ in range(5):
            # This holder kills itself with signal 9 while holding the lock.
            holder = self._start_lock_process(120, 1, True)
            holder.join()
            # multiprocessing reports death by signal N as exitcode -N.
            self.assertThat(holder.exitcode, Equals(-9))
            # With a 0 timeout, the next process only succeeds if the dead
            # holder no longer blocks the lock.
            reclaimer = self._start_lock_process(0, 1)
            reclaimer.join()
            self.assertThat(reclaimer.exitcode,
                            Equals(LOCK_STATUS_ACQUIRED))
|