63
62
helpful_msg = (_("The following migrations are missing a downgrade:"
64
63
"\n\t%s") % '\n\t'.join(sorted(missing_downgrade)))
65
64
self.assert_(not missing_downgrade, helpful_msg)
68
class LockTestCase(test.TestCase):
69
def test_synchronized_wrapped_function_metadata(self):
70
@utils.synchronized('whatever')
74
self.assertEquals(foo.__doc__, 'Bar', "Wrapped function's docstring "
76
self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
79
def test_synchronized_internally(self):
80
"""We can lock across multiple green threads"""
81
saved_sem_num = len(utils._semaphores)
84
@utils.synchronized('testlock2', external=False)
87
seen_threads.append(id)
91
pool = greenpool.GreenPool(10)
93
threads.append(pool.spawn(f, i))
95
for thread in threads:
98
self.assertEquals(len(seen_threads), 100)
99
# Looking at the seen threads, split it into chunks of 10, and verify
100
# that the last 9 match the first in each chunk.
103
self.assertEquals(seen_threads[i * 10],
104
seen_threads[i * 10 + 1 + j])
106
self.assertEqual(saved_sem_num, len(utils._semaphores),
107
"Semaphore leak detected")
109
def test_nested_external_works(self):
110
"""We can nest external syncs"""
111
with utils.tempdir() as tempdir:
112
self.flags(lock_path=tempdir)
115
@utils.synchronized('testlock1', external=True)
118
@utils.synchronized('testlock2', external=True)
123
self.assertEqual(sentinel, outer_lock())
125
def test_synchronized_externally(self):
126
"""We can lock across multiple processes"""
127
with utils.tempdir() as tempdir:
128
self.flags(lock_path=tempdir)
129
rpipe1, wpipe1 = os.pipe()
130
rpipe2, wpipe2 = os.pipe()
132
@utils.synchronized('testlock1', external=True)
135
os.write(wpipe, "foo")
137
self.assertEquals(e.errno, errno.EPIPE)
140
rfds, _wfds, _efds = select.select([rpipe], [], [], 1)
141
self.assertEquals(len(rfds), 0, "The other process, which was"
142
" supposed to be locked, "
143
"wrote on its end of the "