1
# test_server.py -- Tests for the git server
2
# Copyright (C) 2010 Google, Inc.
4
# This program is free software; you can redistribute it and/or
5
# modify it under the terms of the GNU General Public License
6
# as published by the Free Software Foundation; version 2
7
# or (at your option) any later version of the License.
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
20
"""Tests for the smart protocol server."""
23
from cStringIO import StringIO
24
from unittest import TestCase
26
from dulwich.errors import (
29
from dulwich.server import (
32
SingleAckGraphWalkerImpl,
33
MultiAckGraphWalkerImpl,
34
MultiAckDetailedGraphWalkerImpl,
37
from dulwich.protocol import (
48
class TestProto(object):
51
self._received = {0: [], 1: [], 2: [], 3: []}
53
def set_output(self, output_lines):
54
self._output = ['%s\n' % line.rstrip() for line in output_lines]
56
def read_pkt_line(self):
58
return self._output.pop(0)
62
def write_sideband(self, band, data):
63
self._received[band].append(data)
65
def write_pkt_line(self, data):
68
self._received[0].append(data)
70
def get_received_line(self, band=0):
71
lines = self._received[band]
78
class UploadPackHandlerTestCase(TestCase):
80
self._handler = UploadPackHandler(None, None, None)
82
def test_set_client_capabilities(self):
84
self._handler.set_client_capabilities([])
85
except GitProtocolError:
89
self._handler.set_client_capabilities([
90
'multi_ack', 'side-band-64k', 'thin-pack', 'ofs-delta'])
91
except GitProtocolError:
94
def test_set_client_capabilities_error(self):
95
self.assertRaises(GitProtocolError,
96
self._handler.set_client_capabilities,
97
['weird_ack_level', 'ofs-delta'])
99
self._handler.set_client_capabilities(['include-tag'])
100
except GitProtocolError:
104
class TestCommit(object):
105
def __init__(self, sha, parents, commit_time):
107
self._parents = parents
108
self.commit_time = commit_time
110
def get_parents(self):
114
return '%s(%s)' % (self.__class__.__name__, self._sha)
117
class TestBackend(object):
118
def __init__(self, objects):
119
self.object_store = objects
122
class TestHandler(object):
123
def __init__(self, objects, proto):
124
self.backend = TestBackend(objects)
126
self.stateless_rpc = False
127
self.advertise_refs = False
129
def capabilities(self):
133
class ProtocolGraphWalkerTestCase(TestCase):
135
# Create the following commit tree:
140
ONE: TestCommit(ONE, [], 111),
141
TWO: TestCommit(TWO, [ONE], 222),
142
THREE: TestCommit(THREE, [ONE], 333),
143
FOUR: TestCommit(FOUR, [TWO], 444),
144
FIVE: TestCommit(FIVE, [THREE], 555),
146
self._walker = ProtocolGraphWalker(
147
TestHandler(self._objects, TestProto()))
149
def test_is_satisfied_no_haves(self):
150
self.assertFalse(self._walker._is_satisfied([], ONE, 0))
151
self.assertFalse(self._walker._is_satisfied([], TWO, 0))
152
self.assertFalse(self._walker._is_satisfied([], THREE, 0))
154
def test_is_satisfied_have_root(self):
155
self.assertTrue(self._walker._is_satisfied([ONE], ONE, 0))
156
self.assertTrue(self._walker._is_satisfied([ONE], TWO, 0))
157
self.assertTrue(self._walker._is_satisfied([ONE], THREE, 0))
159
def test_is_satisfied_have_branch(self):
160
self.assertTrue(self._walker._is_satisfied([TWO], TWO, 0))
162
self.assertFalse(self._walker._is_satisfied([TWO], THREE, 0))
164
def test_all_wants_satisfied(self):
165
self._walker.set_wants([FOUR, FIVE])
166
# trivial case: wants == haves
167
self.assertTrue(self._walker.all_wants_satisfied([FOUR, FIVE]))
168
# cases that require walking the commit tree
169
self.assertTrue(self._walker.all_wants_satisfied([ONE]))
170
self.assertFalse(self._walker.all_wants_satisfied([TWO]))
171
self.assertFalse(self._walker.all_wants_satisfied([THREE]))
172
self.assertTrue(self._walker.all_wants_satisfied([TWO, THREE]))
174
def test_read_proto_line(self):
175
self._walker.proto.set_output([
183
self.assertEquals(('want', ONE), self._walker.read_proto_line())
184
self.assertEquals(('want', TWO), self._walker.read_proto_line())
185
self.assertEquals(('have', THREE), self._walker.read_proto_line())
186
self.assertRaises(GitProtocolError, self._walker.read_proto_line)
187
self.assertRaises(GitProtocolError, self._walker.read_proto_line)
188
self.assertEquals(('done', None), self._walker.read_proto_line())
189
self.assertEquals((None, None), self._walker.read_proto_line())
191
def test_determine_wants(self):
192
self.assertRaises(GitProtocolError, self._walker.determine_wants, {})
194
self._walker.proto.set_output([
195
'want %s multi_ack' % ONE,
198
heads = {'ref1': ONE, 'ref2': TWO, 'ref3': THREE}
199
self.assertEquals([ONE, TWO], self._walker.determine_wants(heads))
201
self._walker.proto.set_output(['want %s multi_ack' % FOUR])
202
self.assertRaises(GitProtocolError, self._walker.determine_wants, heads)
204
self._walker.proto.set_output([])
205
self.assertEquals([], self._walker.determine_wants(heads))
207
self._walker.proto.set_output(['want %s multi_ack' % ONE, 'foo'])
208
self.assertRaises(GitProtocolError, self._walker.determine_wants, heads)
210
self._walker.proto.set_output(['want %s multi_ack' % FOUR])
211
self.assertRaises(GitProtocolError, self._walker.determine_wants, heads)
213
# TODO: test commit time cutoff
216
class TestProtocolGraphWalker(object):
221
self.stateless_rpc = False
222
self.advertise_refs = False
224
def read_proto_line(self):
225
return self.lines.pop(0)
227
def send_ack(self, sha, ack_type=''):
228
self.acks.append((sha, ack_type))
231
self.acks.append((None, 'nak'))
233
def all_wants_satisfied(self, haves):
239
return self.acks.pop(0)
242
class AckGraphWalkerImplTestCase(TestCase):
243
"""Base setup and asserts for AckGraphWalker tests."""
245
self._walker = TestProtocolGraphWalker()
246
self._walker.lines = [
252
self._impl = self.impl_cls(self._walker)
254
def assertNoAck(self):
255
self.assertEquals(None, self._walker.pop_ack())
257
def assertAcks(self, acks):
258
for sha, ack_type in acks:
259
self.assertEquals((sha, ack_type), self._walker.pop_ack())
262
def assertAck(self, sha, ack_type=''):
263
self.assertAcks([(sha, ack_type)])
266
self.assertAck(None, 'nak')
268
def assertNextEquals(self, sha):
269
self.assertEquals(sha, self._impl.next())
272
class SingleAckGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
273
impl_cls = SingleAckGraphWalkerImpl
275
def test_single_ack(self):
276
self.assertNextEquals(TWO)
279
self.assertNextEquals(ONE)
280
self._walker.done = True
284
self.assertNextEquals(THREE)
285
self._impl.ack(THREE)
288
self.assertNextEquals(None)
291
def test_single_ack_flush(self):
292
# same as ack test but ends with a flush-pkt instead of done
293
self._walker.lines[-1] = (None, None)
295
self.assertNextEquals(TWO)
298
self.assertNextEquals(ONE)
299
self._walker.done = True
303
self.assertNextEquals(THREE)
306
self.assertNextEquals(None)
309
def test_single_ack_nak(self):
310
self.assertNextEquals(TWO)
313
self.assertNextEquals(ONE)
316
self.assertNextEquals(THREE)
319
self.assertNextEquals(None)
322
def test_single_ack_nak_flush(self):
323
# same as nak test but ends with a flush-pkt instead of done
324
self._walker.lines[-1] = (None, None)
326
self.assertNextEquals(TWO)
329
self.assertNextEquals(ONE)
332
self.assertNextEquals(THREE)
335
self.assertNextEquals(None)
338
class MultiAckGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
339
impl_cls = MultiAckGraphWalkerImpl
341
def test_multi_ack(self):
342
self.assertNextEquals(TWO)
345
self.assertNextEquals(ONE)
346
self._walker.done = True
348
self.assertAck(ONE, 'continue')
350
self.assertNextEquals(THREE)
351
self._impl.ack(THREE)
352
self.assertAck(THREE, 'continue')
354
self.assertNextEquals(None)
355
self.assertAck(THREE)
357
def test_multi_ack_partial(self):
358
self.assertNextEquals(TWO)
361
self.assertNextEquals(ONE)
363
self.assertAck(ONE, 'continue')
365
self.assertNextEquals(THREE)
368
self.assertNextEquals(None)
369
# done, re-send ack of last common
372
def test_multi_ack_flush(self):
373
self._walker.lines = [
380
self.assertNextEquals(TWO)
383
self.assertNextEquals(ONE)
384
self.assertNak() # nak the flush-pkt
386
self._walker.done = True
388
self.assertAck(ONE, 'continue')
390
self.assertNextEquals(THREE)
391
self._impl.ack(THREE)
392
self.assertAck(THREE, 'continue')
394
self.assertNextEquals(None)
395
self.assertAck(THREE)
397
def test_multi_ack_nak(self):
398
self.assertNextEquals(TWO)
401
self.assertNextEquals(ONE)
404
self.assertNextEquals(THREE)
407
self.assertNextEquals(None)
410
class MultiAckDetailedGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
411
impl_cls = MultiAckDetailedGraphWalkerImpl
413
def test_multi_ack(self):
414
self.assertNextEquals(TWO)
417
self.assertNextEquals(ONE)
418
self._walker.done = True
420
self.assertAcks([(ONE, 'common'), (ONE, 'ready')])
422
self.assertNextEquals(THREE)
423
self._impl.ack(THREE)
424
self.assertAck(THREE, 'ready')
426
self.assertNextEquals(None)
427
self.assertAck(THREE)
429
def test_multi_ack_partial(self):
430
self.assertNextEquals(TWO)
433
self.assertNextEquals(ONE)
435
self.assertAck(ONE, 'common')
437
self.assertNextEquals(THREE)
440
self.assertNextEquals(None)
441
# done, re-send ack of last common
444
def test_multi_ack_flush(self):
445
# same as ack test but contains a flush-pkt in the middle
446
self._walker.lines = [
453
self.assertNextEquals(TWO)
456
self.assertNextEquals(ONE)
457
self.assertNak() # nak the flush-pkt
459
self._walker.done = True
461
self.assertAcks([(ONE, 'common'), (ONE, 'ready')])
463
self.assertNextEquals(THREE)
464
self._impl.ack(THREE)
465
self.assertAck(THREE, 'ready')
467
self.assertNextEquals(None)
468
self.assertAck(THREE)
470
def test_multi_ack_nak(self):
471
self.assertNextEquals(TWO)
474
self.assertNextEquals(ONE)
477
self.assertNextEquals(THREE)
480
self.assertNextEquals(None)
483
def test_multi_ack_nak_flush(self):
484
# same as nak test but contains a flush-pkt in the middle
485
self._walker.lines = [
492
self.assertNextEquals(TWO)
495
self.assertNextEquals(ONE)
498
self.assertNextEquals(THREE)
501
self.assertNextEquals(None)
504
def test_multi_ack_stateless(self):
505
# transmission ends with a flush-pkt
506
self._walker.lines[-1] = (None, None)
507
self._walker.stateless_rpc = True
509
self.assertNextEquals(TWO)
512
self.assertNextEquals(ONE)
515
self.assertNextEquals(THREE)
518
self.assertNextEquals(None)