~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/lockfile/test/compliancetest.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import os
 
2
import threading
 
3
import shutil
 
4
 
 
5
import lockfile
 
6
 
 
7
class ComplianceTest(object):
    """Shared behavioural tests for ``lockfile.LockFile`` implementations.

    ``setup()`` installs ``self.class_to_test`` as ``lockfile.LockFile`` and
    ``teardown()`` restores the class that was saved at construction time.
    ``class_to_test`` is not defined in this class; presumably a subclass
    supplies it -- confirm against the concrete test classes.
    """

    def __init__(self):
        # Remember the current LockFile so teardown() can undo setup().
        self.saved_class = lockfile.LockFile

    def _testfile(self):
        """Return platform-appropriate file.  Helper for tests."""
        import tempfile
        return os.path.join(tempfile.gettempdir(), 'trash-%s' % os.getpid())

    def setup(self):
        """Make ``lockfile.LockFile`` refer to the class under test."""
        lockfile.LockFile = self.class_to_test

    def teardown(self):
        """Remove the test path if it exists and restore ``LockFile``."""
        tf = self._testfile()
        # The test path may be left behind as a directory or a plain file.
        if os.path.isdir(tf):
            shutil.rmtree(tf)
        elif os.path.isfile(tf):
            os.unlink(tf)
        lockfile.LockFile = self.saved_class

    def _test_acquire_helper(self, tbool):
        """Acquire then release a lock, checking ``is_locked()`` each time.

        ``tbool`` is passed through as the ``threaded`` argument.
        """
        # As simple as it gets.
        lock = lockfile.LockFile(self._testfile(), threaded=tbool)
        lock.acquire()
        assert lock.is_locked()
        lock.release()
        assert not lock.is_locked()

    def test_acquire_basic_threaded(self):
        self._test_acquire_helper(True)

    def test_acquire_basic_unthreaded(self):
        self._test_acquire_helper(False)

    def _test_acquire_no_timeout_helper(self, tbool):
        """Check ``acquire(timeout=-1)`` raises ``AlreadyLocked``.

        A helper thread holds the lock while this thread tries to take it.
        ``tbool`` is passed through as the ``threaded`` argument.
        """
        # No timeout test
        e1, e2 = threading.Event(), threading.Event()
        t = _in_thread(self._lock_wait_unlock, e1, e2)
        e1.wait()         # wait for thread t to acquire lock
        try:
            lock2 = lockfile.LockFile(self._testfile(), threaded=tbool)
            assert lock2.is_locked()
            assert not lock2.i_am_locking()

            try:
                lock2.acquire(timeout=-1)
            except lockfile.AlreadyLocked:
                pass
            else:
                lock2.release()
                raise AssertionError("did not raise AlreadyLocked in"
                                     " thread %s" %
                                     threading.current_thread().name)
        finally:
            # Runs even when a check above failed, so thread t never keeps
            # holding the lock after this test is over.
            e2.set()          # tell thread t to release lock
            t.join()

    def test_acquire_no_timeout_threaded(self):
        self._test_acquire_no_timeout_helper(True)

    def test_acquire_no_timeout_unthreaded(self):
        self._test_acquire_no_timeout_helper(False)

    def _test_acquire_timeout_helper(self, tbool):
        """Check ``acquire(timeout=0.1)`` raises ``LockTimeout``.

        A helper thread holds the lock while this thread tries to take it.
        ``tbool`` is passed through as the ``threaded`` argument.
        """
        # Timeout test
        e1, e2 = threading.Event(), threading.Event()
        t = _in_thread(self._lock_wait_unlock, e1, e2)
        e1.wait()                # wait for thread t to acquire lock
        try:
            lock2 = lockfile.LockFile(self._testfile(), threaded=tbool)
            assert lock2.is_locked()
            try:
                lock2.acquire(timeout=0.1)
            except lockfile.LockTimeout:
                pass
            else:
                lock2.release()
                raise AssertionError("did not raise LockTimeout in thread %s" %
                                     threading.current_thread().name)
        finally:
            # Runs even when a check above failed, so thread t never keeps
            # holding the lock after this test is over.
            e2.set()
            t.join()

    def test_acquire_timeout_threaded(self):
        self._test_acquire_timeout_helper(True)

    def test_acquire_timeout_unthreaded(self):
        self._test_acquire_timeout_helper(False)

    def _test_release_basic_helper(self, tbool):
        """Check a second ``release()`` raises ``NotLocked``.

        ``tbool`` is passed through as the ``threaded`` argument.
        """
        lock = lockfile.LockFile(self._testfile(), threaded=tbool)
        lock.acquire()
        assert lock.is_locked()
        lock.release()
        assert not lock.is_locked()
        assert not lock.i_am_locking()
        try:
            lock.release()
        except lockfile.NotLocked:
            pass
        except lockfile.NotMyLock:
            raise AssertionError('unexpected exception: %s' %
                                 lockfile.NotMyLock)
        else:
            raise AssertionError('erroneously unlocked file')

    def test_release_basic_threaded(self):
        self._test_release_basic_helper(True)

    def test_release_basic_unthreaded(self):
        self._test_release_basic_helper(False)

    def _test_release_from_thread_helper(self, tbool):
        """Check releasing a lock held by another thread raises ``NotMyLock``.

        ``tbool`` is passed through as the ``threaded`` argument.
        """
        e1, e2 = threading.Event(), threading.Event()
        t = _in_thread(self._lock_wait_unlock, e1, e2)
        e1.wait()
        try:
            lock2 = lockfile.LockFile(self._testfile(), threaded=tbool)
            assert lock2.is_locked()
            assert not lock2.i_am_locking()
            try:
                lock2.release()
            except lockfile.NotMyLock:
                pass
            else:
                raise AssertionError('erroneously unlocked a file locked'
                                     ' by another thread.')
        finally:
            # Runs even when a check above failed, so thread t never keeps
            # holding the lock after this test is over.
            e2.set()
            t.join()

    def test_release_from_thread_threaded(self):
        self._test_release_from_thread_helper(True)

    def test_release_from_thread_unthreaded(self):
        self._test_release_from_thread_helper(False)

    def _test_is_locked_helper(self, tbool):
        """Check ``is_locked()`` is true after acquire, false after release.

        ``tbool`` is passed through as the ``threaded`` argument.
        """
        lock = lockfile.LockFile(self._testfile(), threaded=tbool)
        lock.acquire()
        assert lock.is_locked()
        lock.release()
        assert not lock.is_locked()

    def test_is_locked_threaded(self):
        self._test_is_locked_helper(True)

    def test_is_locked_unthreaded(self):
        self._test_is_locked_helper(False)

    def test_i_am_locking(self):
        """Check ``i_am_locking()`` follows ownership across ``break_lock()``.

        lock1 takes the lock; lock2 is expected to time out, break the lock
        and then acquire it, after which only lock2 reports ownership.
        """
        lock1 = lockfile.LockFile(self._testfile(), threaded=False)
        lock1.acquire()
        try:
            assert lock1.is_locked()
            lock2 = lockfile.LockFile(self._testfile())
            try:
                assert lock1.i_am_locking()
                assert not lock2.i_am_locking()
                try:
                    lock2.acquire(timeout=2)
                except lockfile.LockTimeout:
                    lock2.break_lock()
                    assert not lock2.is_locked()
                    assert not lock1.is_locked()
                    lock2.acquire()
                else:
                    raise AssertionError('expected LockTimeout...')
                assert not lock1.i_am_locking()
                assert lock2.i_am_locking()
            finally:
                if lock2.i_am_locking():
                    lock2.release()
        finally:
            if lock1.i_am_locking():
                lock1.release()

    def _test_break_lock_helper(self, tbool):
        """Check ``break_lock()`` from a second object clears the lock.

        After the break, releasing through the first object must raise
        ``NotLocked``.  ``tbool`` is passed through as ``threaded``.
        """
        lock = lockfile.LockFile(self._testfile(), threaded=tbool)
        lock.acquire()
        assert lock.is_locked()
        lock2 = lockfile.LockFile(self._testfile(), threaded=tbool)
        assert lock2.is_locked()
        lock2.break_lock()
        assert not lock2.is_locked()
        try:
            lock.release()
        except lockfile.NotLocked:
            pass
        else:
            raise AssertionError('break lock failed')

    def test_break_lock_threaded(self):
        self._test_break_lock_helper(True)

    def test_break_lock_unthreaded(self):
        self._test_break_lock_helper(False)

    def _lock_wait_unlock(self, event1, event2):
        """Lock from another thread.  Helper for tests."""
        l = lockfile.LockFile(self._testfile())
        l.acquire()
        try:
            event1.set()  # we're in,
            event2.wait() # wait for boss's permission to leave
        finally:
            l.release()

    def test_enter(self):
        """Check the lock state around an acquire/release pair.

        NOTE(review): despite the name, this does not use a ``with`` block;
        confirm whether the context-manager protocol should be tested here.
        """
        lock = lockfile.LockFile(self._testfile())
        lock.acquire()
        try:
            assert lock.is_locked(), "Not locked after acquire!"
        finally:
            lock.release()
        assert not lock.is_locked(), "still locked after release!"
 
219
 
 
220
def _in_thread(func, *args, **kwargs):
 
221
    """Execute func(*args, **kwargs) after dt seconds. Helper for tests."""
 
222
    def _f():
 
223
        func(*args, **kwargs)
 
224
    t = threading.Thread(target=_f, name='/*/*')
 
225
    t.setDaemon(True)
 
226
    t.start()
 
227
    return t
 
228