26
26
except ImportError:
29
from cdimage.semaphore import Semaphore, SemaphoreError
29
from cdimage.multipidfile import MultiPIDFile, MultiPIDFileError
30
30
from cdimage.tests.helpers import TestCase, mkfile
32
32
__metaclass__ = type
35
class TestSemaphore(TestCase):
35
class TestMultiPIDFile(TestCase):
37
super(TestSemaphore, self).setUp()
37
super(TestMultiPIDFile, self).setUp()
38
38
self.use_temp_dir()
39
self.semaphore = Semaphore(os.path.join(self.temp_dir, "sem"))
39
self.multipidfile = MultiPIDFile(os.path.join(self.temp_dir, "pids"))
41
41
def test_str(self):
42
"""A Semaphore stringifies to 'semaphore PATH'."""
42
"""A MultiPIDFile stringifies to 'multipidfile PATH'."""
44
"semaphore %s" % self.semaphore.path, str(self.semaphore))
44
"multipidfile %s" % self.multipidfile.path, str(self.multipidfile))
46
46
def test_context_manager(self):
47
"""A Semaphore operates as a context manager, with locking."""
48
self.assertFalse(os.path.exists(self.semaphore.lock_path))
50
self.assertTrue(os.path.exists(self.semaphore.lock_path))
51
self.assertFalse(os.path.exists(self.semaphore.lock_path))
47
"""A MultiPIDFile operates as a context manager, with locking."""
48
self.assertFalse(os.path.exists(self.multipidfile.lock_path))
49
with self.multipidfile:
50
self.assertTrue(os.path.exists(self.multipidfile.lock_path))
51
self.assertFalse(os.path.exists(self.multipidfile.lock_path))
53
53
@mock.patch("subprocess.call", return_value=1)
54
54
def test_lock_failure(self, mock_call):
55
"""__enter__ raises SemaphoreError if the lock is already held."""
56
self.assertRaises(SemaphoreError, self.semaphore.__enter__)
55
"""__enter__ raises MultiPIDFileError if the lock is already held."""
56
self.assertRaises(MultiPIDFileError, self.multipidfile.__enter__)
58
58
def test_read_requires_lock(self):
59
59
"""_read must be called within the lock."""
60
self.assertRaises(AssertionError, self.semaphore._read)
60
self.assertRaises(AssertionError, self.multipidfile._read)
62
62
def test_read_missing(self):
63
"""A missing semaphore file reads as zero."""
65
self.assertEqual(0, self.semaphore._read())
67
def test_read_existing(self):
68
"""An existing semaphore file reads as its integer value."""
69
with mkfile(self.semaphore.path) as fd:
72
self.assertEqual(10, self.semaphore._read())
74
def test_add_requires_lock(self):
75
"""_add must be called within the lock."""
76
self.assertRaises(AssertionError, self.semaphore._add, 1)
78
def test_add_missing(self):
79
"""Adding to a missing semaphore file treats it as initially zero."""
81
self.assertEqual(1, self.semaphore._add(1))
82
self.assertEqual(1, self.semaphore._read())
84
def test_add_existing(self):
85
"""Adding to an existing semaphore file adjusts its contents."""
86
with mkfile(self.semaphore.path) as fd:
89
self.assertEqual(9, self.semaphore._add(-1))
90
self.assertEqual(9, self.semaphore._read())
63
"""A missing MultiPIDFile reads as empty."""
64
with self.multipidfile:
65
self.assertEqual(set(), self.multipidfile._read())
67
@mock.patch("cdimage.osextras.pid_exists", return_value=True)
68
def test_read_existing(self, mock_pid_exists):
69
"""An existing MultiPIDFile reads as the set of PIDs it contains."""
70
with mkfile(self.multipidfile.path) as fd:
73
with self.multipidfile:
74
self.assertEqual(set([1, 2]), self.multipidfile._read())
76
@mock.patch("cdimage.osextras.pid_exists")
77
def test_read_skips_dead_pids(self, mock_pid_exists):
78
"""Only live PIDs are returned."""
79
with mkfile(self.multipidfile.path) as fd:
82
mock_pid_exists.side_effect = lambda pid: pid == 2
83
with self.multipidfile:
84
self.assertEqual(set([2]), self.multipidfile._read())
86
def test_write_requires_lock(self):
87
"""_write must be called within the lock."""
88
self.assertRaises(AssertionError, self.multipidfile._write, 1)
90
@mock.patch("cdimage.osextras.pid_exists", return_value=True)
91
def test_write_missing(self, mock_pid_exists):
92
"""Writing to a missing MultiPIDFile works."""
93
with self.multipidfile:
94
self.multipidfile._write(set([1, 2]))
95
self.assertEqual(set([1, 2]), self.multipidfile._read())
97
@mock.patch("cdimage.osextras.pid_exists", return_value=True)
98
def test_write_existing(self, mock_pid_exists):
99
"""Writing to an existing MultiPIDFile adjusts its contents."""
100
with mkfile(self.multipidfile.path) as fd:
102
with self.multipidfile:
103
self.multipidfile._write(set([1, 2]))
104
self.assertEqual(set([1, 2]), self.multipidfile._read())
92
106
def test_state(self):
93
107
"""The state can be fetched without explicit locking."""
94
self.assertEqual(0, self.semaphore.state)
96
def test_test_increment(self):
97
self.assertEqual(0, self.semaphore.test_increment())
98
self.assertEqual(1, self.semaphore.state)
99
self.assertEqual(1, self.semaphore.test_increment())
100
self.assertEqual(2, self.semaphore.state)
102
def test_decrement_test(self):
103
with mkfile(self.semaphore.path) as fd:
108
self.assertEqual(set(), self.multipidfile.state)
110
@mock.patch("cdimage.osextras.pid_exists", return_value=True)
111
def test_test_add(self, mock_pid_exists):
112
self.assertFalse(self.multipidfile.test_add(1))
113
self.assertEqual(set([1]), self.multipidfile.state)
114
self.assertTrue(self.multipidfile.test_add(2))
115
self.assertEqual(set([1, 2]), self.multipidfile.state)
117
@mock.patch("cdimage.osextras.pid_exists", return_value=True)
118
def test_test_add_error_on_existing_pid(self, mock_pid_exists):
119
with mkfile(self.multipidfile.path) as fd:
121
self.assertRaises(MultiPIDFileError, self.multipidfile.test_add, 1)
123
@mock.patch("cdimage.osextras.pid_exists", return_value=True)
124
def test_remove_test(self, mock_pid_exists):
125
with mkfile(self.multipidfile.path) as fd:
104
127
print(2, file=fd)
105
self.assertEqual(1, self.semaphore.decrement_test())
106
self.assertEqual(1, self.semaphore.state)
107
self.assertEqual(0, self.semaphore.decrement_test())
108
self.assertEqual(0, self.semaphore.state)
110
def test_decrement_test_error_on_zero(self):
111
"""decrement_test raises SemaphoreError if already zero."""
112
self.assertRaises(SemaphoreError, self.semaphore.decrement_test)
114
def test_round_trip(self):
115
"""After a +/- round-trip, the semaphore path is missing."""
116
self.assertEqual(0, self.semaphore.test_increment())
117
self.assertEqual(0, self.semaphore.decrement_test())
118
self.assertFalse(os.path.exists(self.semaphore.path))
121
self.assertEqual(0, self.semaphore.state)
122
with self.semaphore.held() as state_zero:
123
self.assertEqual(0, state_zero)
124
self.assertEqual(1, self.semaphore.state)
125
with self.semaphore.held() as state_one:
126
self.assertEqual(1, state_one)
127
self.assertEqual(2, self.semaphore.state)
128
self.assertEqual(1, self.semaphore.state)
129
self.assertEqual(0, self.semaphore.state)
128
self.assertTrue(self.multipidfile.remove_test(2))
129
self.assertEqual(set([1]), self.multipidfile.state)
130
self.assertFalse(self.multipidfile.remove_test(1))
131
self.assertEqual(set(), self.multipidfile.state)
133
def test_remove_test_error_on_empty(self):
134
"""remove_test raises MultiPIDFileError if already empty."""
135
self.assertRaises(MultiPIDFileError, self.multipidfile.remove_test, 1)
137
@mock.patch("cdimage.osextras.pid_exists", return_value=True)
138
def test_remove_test_error_on_missing_pid(self, mock_pid_exists):
139
with mkfile(self.multipidfile.path) as fd:
141
self.assertRaises(MultiPIDFileError, self.multipidfile.remove_test, 2)
143
@mock.patch("cdimage.osextras.pid_exists", return_value=True)
144
def test_round_trip(self, mock_pid_exists):
145
"""After a +/- round-trip, the MultiPIDFile path is missing."""
146
self.assertFalse(self.multipidfile.test_add(1))
147
self.assertFalse(self.multipidfile.remove_test(1))
148
self.assertFalse(os.path.exists(self.multipidfile.path))
150
@mock.patch("cdimage.osextras.pid_exists", return_value=True)
151
def test_held(self, mock_pid_exists):
152
self.assertEqual(set(), self.multipidfile.state)
153
with self.multipidfile.held(1) as state_zero:
154
self.assertFalse(state_zero)
155
self.assertEqual(set([1]), self.multipidfile.state)
156
with self.multipidfile.held(2) as state_one:
157
self.assertTrue(state_one)
158
self.assertEqual(set([1, 2]), self.multipidfile.state)
159
self.assertEqual(set([1]), self.multipidfile.state)
160
self.assertEqual(set(), self.multipidfile.state)