2
# Copyright 2013 Canonical Ltd.
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License version 3, as
6
# published by the Free Software Foundation.
8
# This program is distributed in the hope that it will be useful, but
9
# WITHOUT ANY WARRANTY; without even the implied warranties of
10
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
11
# PURPOSE. See the GNU Affero General Public License for more details.
13
# You should have received a copy of the GNU Affero General Public License
14
# along with this program. If not, see <http://www.gnu.org/licenses/>.
25
from django.conf import settings
26
from django.db import connection, transaction
27
from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS
28
from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
30
from ci_utils.tastypie.test import TastypieTestCase
31
from ppa_assigner.models import PPA, LaunchpadError
34
class TestModel(TestCase):
37
super(TestModel, self).setUp()
39
PPA.objects.create(name='ci_pool-%s' % str(n).zfill(3))
41
@mock.patch('ppa_assigner.models.PPA.clean_artifacts')
42
def testReserveNoClean(self, clean):
43
p = PPA.reserve('testlockname')
44
self.assertIsNotNone(p)
45
self.assertFalse(clean.called)
46
self.assertEqual('testlockname', p.lockname)
48
reserved = PPA.objects.filter(reserved=True).count()
49
self.assertEqual(1, reserved)
51
def testReserveTimestamp(self):
53
delta = datetime.datetime.now() - p.timestamp
54
# the lock should have happened in less than a second
55
self.assertLess(delta.seconds, 1)
57
@mock.patch('ppa_assigner.models.PPA.clean_artifacts')
58
def testReserveClean(self, clean):
59
p = PPA.reserve('testlockname', True)
60
self.assertIsNotNone(p)
61
self.assertTrue(clean.called)
62
self.assertEqual('testlockname', p.lockname)
64
reserved = PPA.objects.filter(reserved=True).count()
65
self.assertEqual(1, reserved)
67
@mock.patch('ppa_assigner.models.PPA.clean_artifacts')
68
def testReserveCleanFail(self, clean):
69
'''ensure ppa isn't locked if clean operation fails.'''
70
clean.side_effect = RuntimeError
71
locked_before = PPA.objects.filter(reserved=True).count()
72
with self.assertRaises(LaunchpadError):
73
PPA.reserve(clean=True)
75
locked_after = PPA.objects.filter(reserved=True).count()
76
self.assertEqual(locked_before, locked_after)
78
def testReserveNoFree(self):
79
PPA.objects.update(reserved=True)
80
with self.assertRaises(PPA.DoesNotExist):
83
def testCleanOperation(self):
84
# TODO more logic for this
85
PPA.reserve(clean=True)
88
class TestConcurrency(TransactionTestCase):
89
# inspired by, but heavily modified:
90
# https://github.com/django/django/blob/master/tests/select_for_update/
93
super(TestConcurrency, self).setUp()
94
transaction.enter_transaction_management()
96
PPA.objects.create(name='ci_pool-%s' % str(n).zfill(3))
98
self.addCleanup(transaction.abort)
100
@contextlib.contextmanager
101
def blocking_transaction(self):
102
# We need another database connection to test that one connection
103
# issuing a SELECT ... FOR UPDATE will block.
104
con = ConnectionHandler(settings.DATABASES)[DEFAULT_DB_ALIAS]
105
con.enter_transaction_management()
106
cursor = con.cursor()
107
sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
108
'db_table': PPA._meta.db_table,
109
'for_update': con.ops.for_update_sql(),
111
cursor.execute(sql, ())
119
def _reserve(self, params):
120
params['signal'].set()
123
params['time'] = time.time() - start
124
# connections are per-thread, so close this
127
@skipUnlessDBFeature('has_select_for_update')
128
def testReserveConcurrency(self):
129
params = {'signal': threading.Event()}
130
t = threading.Thread(target=self._reserve, args=(params,))
132
with self.blocking_transaction():
134
# wait for thread to let us know its running
135
params['signal'].wait()
138
# it won't be exact but it should be pretty close
139
self.assertLess(abs(locktime - params['time']), 0.05)
142
class TestApi(TastypieTestCase):
145
super(TestApi, self).setUp()
147
PPA.objects.create(name='ci_pool-%s' % str(n).zfill(3))
149
TastypieTestCase.setUp(self, '/api/v1')
151
def testGetAll(self):
152
obj = self.get('ppa/')
153
self.assertEqual(PPA.objects.all().count(), len(obj['objects']))
155
def testGetQuery(self):
157
obj = self.get('ppa/', {'reserved': False})
158
count = PPA.objects.filter(reserved=False).count()
159
self.assertEqual(count, len(obj['objects']))
161
obj = self.getResource('ppa/', {'reserved': True})
162
self.assertEqual(p.name, obj['name'])
166
obj = self.getResource('ppa/', {'reserved': True})
167
self.assertEqual(p.lockname, obj['lockname'])
169
def testReserveNoClean(self):
170
params = {'lockname': 'foo'}
171
obj = self.createResource('ppa/', params)
172
self.assertEqual(obj['lockname'], params['lockname'])
173
self.assertTrue(obj['reserved'])
175
@mock.patch('ppa_assigner.models.PPA.clean_artifacts')
176
def testReserveClean(self, clean):
177
obj = self.createResource('ppa/', {'lockname': 'foo', 'clean': True})
178
self.assertTrue(obj['reserved'])
179
self.assertTrue(clean.called)
181
def testReserveNoFree(self):
182
PPA.objects.update(reserved=True)
183
params = {'lockname': 'foo'}
184
with self.assertRaises(PPA.DoesNotExist):
185
self.post('ppa/', params)
189
self.patch('ppa/%d/' % p.id, {'reserved': False})
190
p = PPA.objects.get(pk=p.id)
191
self.assertFalse(p.reserved)
193
@mock.patch('urllib2.urlopen')
194
def testPopulateFail(self, urlopen):
195
'''ensure a lp error is handled correctly.'''
196
def side_effect(arg):
197
raise urllib2.URLError('foo')
198
urlopen.side_effect = side_effect
199
resp = self.client.patch(
200
self._resource('ppa/'),
201
data={'populate': True},
202
authentication=self.auth)
203
self.assertEqual(500, resp.status_code)
204
self.assertEqual('<urlopen error foo>', resp.content)
206
@mock.patch('urllib2.urlopen')
207
def testPopulateMerge(self, urlopen):
217
resp.read.return_value = json.dumps(data)
218
urlopen.return_value = resp
219
user = settings.LAUNCHPAD_PPA_USER
220
self.patch('ppa/', {'populate': True})
221
expected = ['ppa:%s/%s' % (user, x['name']) for x in data['entries']]
222
found = [x.name for x in PPA.objects.all().order_by('name')]
223
self.assertListEqual(expected, found)