~ubuntu-branches/ubuntu/lucid/bzr/lucid-proposed

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport_implementations.py

  • Committer: Bazaar Package Importer
  • Author(s): Jeff Bailey
  • Date: 2006-03-20 08:31:00 UTC
  • mfrom: (1.1.2 upstream)
  • mto: This revision was merged to the branch mainline in revision 4.
  • Revision ID: james.westby@ubuntu.com-20060320083100-ovdi2ssuw0epcx8s
Tags: 0.8~200603200831-0ubuntu1
* Snapshot uploaded to Dapper at Martin Pool's request.

* Disable testsuite for upload.  Fakeroot and the testsuite don't
  play along.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Transport implementations.
 
18
 
 
19
Transport implementations tested here are supplied by
 
20
TransportTestProviderAdapter.
 
21
"""
 
22
 
 
23
import os
 
24
from cStringIO import StringIO
 
25
import stat
 
26
import sys
 
27
 
 
28
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
 
29
                           LockError,
 
30
                           PathError,
 
31
                           TransportNotPossible, ConnectionError)
 
32
from bzrlib.tests import TestCaseInTempDir, TestSkipped
 
33
from bzrlib.transport import memory, urlescape
 
34
import bzrlib.transport
 
35
 
 
36
 
 
37
def _append(fn, txt):
 
38
    """Append the given text (file-like object) to the supplied filename."""
 
39
    f = open(fn, 'ab')
 
40
    try:
 
41
        f.write(txt.read())
 
42
    finally:
 
43
        f.close()
 
44
 
 
45
 
 
46
class TestTransportImplementation(TestCaseInTempDir):
 
47
    """Implementation verification for transports.
 
48
    
 
49
    To verify a transport we need a server factory, which is a callable
 
50
    that accepts no parameters and returns an implementation of
 
51
    bzrlib.transport.Server.
 
52
    
 
53
    That Server is then used to construct transport instances and test
 
54
    the transport via loopback activity.
 
55
 
 
56
    Currently this assumes that the Transport object is connected to the 
 
57
    current working directory.  So that whatever is done 
 
58
    through the transport, should show up in the working 
 
59
    directory, and vice-versa. This is a bug, because its possible to have
 
60
    URL schemes which provide access to something that may not be 
 
61
    result in storage on the local disk, i.e. due to file system limits, or 
 
62
    due to it being a database or some other non-filesystem tool.
 
63
 
 
64
    This also tests to make sure that the functions work with both
 
65
    generators and lists (assuming iter(list) is effectively a generator)
 
66
    """
 
67
    
 
68
    def setUp(self):
 
69
        super(TestTransportImplementation, self).setUp()
 
70
        self._server = self.transport_server()
 
71
        self._server.setUp()
 
72
 
 
73
    def tearDown(self):
 
74
        super(TestTransportImplementation, self).tearDown()
 
75
        self._server.tearDown()
 
76
        
 
77
    def check_transport_contents(self, content, transport, relpath):
 
78
        """Check that transport.get(relpath).read() == content."""
 
79
        self.assertEqualDiff(content, transport.get(relpath).read())
 
80
 
 
81
    def get_transport(self):
 
82
        """Return a connected transport to the local directory."""
 
83
        t = bzrlib.transport.get_transport(self._server.get_url())
 
84
        self.failUnless(isinstance(t, self.transport_class), 
 
85
                        "Got the wrong class from get_transport"
 
86
                        "(%r, expected %r)" % (t.__class__, 
 
87
                                               self.transport_class))
 
88
        return t
 
89
 
 
90
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
91
        """Fail unless excClass is raised when the iterator from func is used.
 
92
        
 
93
        Many transport functions can return generators this makes sure
 
94
        to wrap them in a list() call to make sure the whole generator
 
95
        is run, and that the proper exception is raised.
 
96
        """
 
97
        try:
 
98
            list(func(*args, **kwargs))
 
99
        except excClass:
 
100
            return
 
101
        else:
 
102
            if hasattr(excClass,'__name__'): excName = excClass.__name__
 
103
            else: excName = str(excClass)
 
104
            raise self.failureException, "%s not raised" % excName
 
105
 
 
106
    def test_has(self):
 
107
        t = self.get_transport()
 
108
 
 
109
        files = ['a', 'b', 'e', 'g', '%']
 
110
        self.build_tree(files, transport=t)
 
111
        self.assertEqual(True, t.has('a'))
 
112
        self.assertEqual(False, t.has('c'))
 
113
        self.assertEqual(True, t.has(urlescape('%')))
 
114
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
 
115
                [True, True, False, False, True, False, True, False])
 
116
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
117
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlescape('%%')]))
 
118
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
 
119
                [True, True, False, False, True, False, True, False])
 
120
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
121
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
 
122
 
 
123
    def test_get(self):
 
124
        t = self.get_transport()
 
125
 
 
126
        files = ['a', 'b', 'e', 'g']
 
127
        contents = ['contents of a\n',
 
128
                    'contents of b\n',
 
129
                    'contents of e\n',
 
130
                    'contents of g\n',
 
131
                    ]
 
132
        self.build_tree(files, transport=t)
 
133
        self.check_transport_contents('contents of a\n', t, 'a')
 
134
        content_f = t.get_multi(files)
 
135
        for content, f in zip(contents, content_f):
 
136
            self.assertEqual(content, f.read())
 
137
 
 
138
        content_f = t.get_multi(iter(files))
 
139
        for content, f in zip(contents, content_f):
 
140
            self.assertEqual(content, f.read())
 
141
 
 
142
        self.assertRaises(NoSuchFile, t.get, 'c')
 
143
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
144
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
145
 
 
146
    def test_put(self):
 
147
        t = self.get_transport()
 
148
 
 
149
        if t.is_readonly():
 
150
            self.assertRaises(TransportNotPossible,
 
151
                    t.put, 'a', 'some text for a\n')
 
152
            return
 
153
 
 
154
        t.put('a', StringIO('some text for a\n'))
 
155
        self.failUnless(t.has('a'))
 
156
        self.check_transport_contents('some text for a\n', t, 'a')
 
157
        # Make sure 'has' is updated
 
158
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
159
                [True, False, False, False, False])
 
160
        # Put also replaces contents
 
161
        self.assertEqual(t.put_multi([('a', StringIO('new\ncontents for\na\n')),
 
162
                                      ('d', StringIO('contents\nfor d\n'))]),
 
163
                         2)
 
164
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
165
                [True, False, False, True, False])
 
166
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
167
        self.check_transport_contents('contents\nfor d\n', t, 'd')
 
168
 
 
169
        self.assertEqual(
 
170
            t.put_multi(iter([('a', StringIO('diff\ncontents for\na\n')),
 
171
                              ('d', StringIO('another contents\nfor d\n'))])),
 
172
                        2)
 
173
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
 
174
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
 
175
 
 
176
        self.assertRaises(NoSuchFile,
 
177
                          t.put, 'path/doesnt/exist/c', 'contents')
 
178
 
 
179
    def test_put_permissions(self):
 
180
        t = self.get_transport()
 
181
 
 
182
        if t.is_readonly():
 
183
            return
 
184
        t.put('mode644', StringIO('test text\n'), mode=0644)
 
185
        self.assertTransportMode(t, 'mode644', 0644)
 
186
        t.put('mode666', StringIO('test text\n'), mode=0666)
 
187
        self.assertTransportMode(t, 'mode666', 0666)
 
188
        t.put('mode600', StringIO('test text\n'), mode=0600)
 
189
        self.assertTransportMode(t, 'mode600', 0600)
 
190
        # Yes, you can put a file such that it becomes readonly
 
191
        t.put('mode400', StringIO('test text\n'), mode=0400)
 
192
        self.assertTransportMode(t, 'mode400', 0400)
 
193
        t.put_multi([('mmode644', StringIO('text\n'))], mode=0644)
 
194
        self.assertTransportMode(t, 'mmode644', 0644)
 
195
        
 
196
    def test_mkdir(self):
 
197
        t = self.get_transport()
 
198
 
 
199
        if t.is_readonly():
 
200
            # cannot mkdir on readonly transports. We're not testing for 
 
201
            # cache coherency because cache behaviour is not currently
 
202
            # defined for the transport interface.
 
203
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
 
204
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
 
205
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
 
206
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
 
207
            return
 
208
        # Test mkdir
 
209
        t.mkdir('dir_a')
 
210
        self.assertEqual(t.has('dir_a'), True)
 
211
        self.assertEqual(t.has('dir_b'), False)
 
212
 
 
213
        t.mkdir('dir_b')
 
214
        self.assertEqual(t.has('dir_b'), True)
 
215
 
 
216
        t.mkdir_multi(['dir_c', 'dir_d'])
 
217
 
 
218
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
219
        self.assertEqual(list(t.has_multi(
 
220
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
221
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
222
            [True, True, True, False,
 
223
             True, True, True, True])
 
224
 
 
225
        # we were testing that a local mkdir followed by a transport
 
226
        # mkdir failed thusly, but given that we * in one process * do not
 
227
        # concurrently fiddle with disk dirs and then use transport to do 
 
228
        # things, the win here seems marginal compared to the constraint on
 
229
        # the interface. RBC 20051227
 
230
        t.mkdir('dir_g')
 
231
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
232
 
 
233
        # Test get/put in sub-directories
 
234
        self.assertEqual(
 
235
            t.put_multi([('dir_a/a', StringIO('contents of dir_a/a')),
 
236
                         ('dir_b/b', StringIO('contents of dir_b/b'))])
 
237
                        , 2)
 
238
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
 
239
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
 
240
 
 
241
        # mkdir of a dir with an absent parent
 
242
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
 
243
 
 
244
    def test_mkdir_permissions(self):
 
245
        t = self.get_transport()
 
246
        if t.is_readonly():
 
247
            return
 
248
        # Test mkdir with a mode
 
249
        t.mkdir('dmode755', mode=0755)
 
250
        self.assertTransportMode(t, 'dmode755', 0755)
 
251
        t.mkdir('dmode555', mode=0555)
 
252
        self.assertTransportMode(t, 'dmode555', 0555)
 
253
        t.mkdir('dmode777', mode=0777)
 
254
        self.assertTransportMode(t, 'dmode777', 0777)
 
255
        t.mkdir('dmode700', mode=0700)
 
256
        self.assertTransportMode(t, 'dmode700', 0700)
 
257
        # TODO: jam 20051215 test mkdir_multi with a mode
 
258
        t.mkdir_multi(['mdmode755'], mode=0755)
 
259
        self.assertTransportMode(t, 'mdmode755', 0755)
 
260
 
 
261
    def test_copy_to(self):
 
262
        # FIXME: test:   same server to same server (partly done)
 
263
        # same protocol two servers
 
264
        # and    different protocols (done for now except for MemoryTransport.
 
265
        # - RBC 20060122
 
266
        from bzrlib.transport.memory import MemoryTransport
 
267
 
 
268
        def simple_copy_files(transport_from, transport_to):
 
269
            files = ['a', 'b', 'c', 'd']
 
270
            self.build_tree(files, transport=transport_from)
 
271
            transport_from.copy_to(files, transport_to)
 
272
            for f in files:
 
273
                self.check_transport_contents(transport_to.get(f).read(),
 
274
                                              transport_from, f)
 
275
 
 
276
        t = self.get_transport()
 
277
        temp_transport = MemoryTransport('memory:/')
 
278
        simple_copy_files(t, temp_transport)
 
279
        if not t.is_readonly():
 
280
            t.mkdir('copy_to_simple')
 
281
            t2 = t.clone('copy_to_simple')
 
282
            simple_copy_files(t, t2)
 
283
 
 
284
 
 
285
        # Test that copying into a missing directory raises
 
286
        # NoSuchFile
 
287
        if t.is_readonly():
 
288
            self.build_tree(['e/', 'e/f'])
 
289
        else:
 
290
            t.mkdir('e')
 
291
            t.put('e/f', StringIO('contents of e'))
 
292
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
 
293
        temp_transport.mkdir('e')
 
294
        t.copy_to(['e/f'], temp_transport)
 
295
 
 
296
        del temp_transport
 
297
        temp_transport = MemoryTransport('memory:/')
 
298
 
 
299
        files = ['a', 'b', 'c', 'd']
 
300
        t.copy_to(iter(files), temp_transport)
 
301
        for f in files:
 
302
            self.check_transport_contents(temp_transport.get(f).read(),
 
303
                                          t, f)
 
304
        del temp_transport
 
305
 
 
306
        for mode in (0666, 0644, 0600, 0400):
 
307
            temp_transport = MemoryTransport("memory:/")
 
308
            t.copy_to(files, temp_transport, mode=mode)
 
309
            for f in files:
 
310
                self.assertTransportMode(temp_transport, f, mode)
 
311
 
 
312
    def test_append(self):
 
313
        t = self.get_transport()
 
314
 
 
315
        if t.is_readonly():
 
316
            open('a', 'wb').write('diff\ncontents for\na\n')
 
317
            open('b', 'wb').write('contents\nfor b\n')
 
318
        else:
 
319
            t.put_multi([
 
320
                    ('a', StringIO('diff\ncontents for\na\n')),
 
321
                    ('b', StringIO('contents\nfor b\n'))
 
322
                    ])
 
323
 
 
324
        if t.is_readonly():
 
325
            self.assertRaises(TransportNotPossible,
 
326
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
 
327
            _append('a', StringIO('add\nsome\nmore\ncontents\n'))
 
328
        else:
 
329
            t.append('a', StringIO('add\nsome\nmore\ncontents\n'))
 
330
 
 
331
        self.check_transport_contents(
 
332
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
333
            t, 'a')
 
334
 
 
335
        if t.is_readonly():
 
336
            self.assertRaises(TransportNotPossible,
 
337
                    t.append_multi,
 
338
                        [('a', 'and\nthen\nsome\nmore\n'),
 
339
                         ('b', 'some\nmore\nfor\nb\n')])
 
340
            _append('a', StringIO('and\nthen\nsome\nmore\n'))
 
341
            _append('b', StringIO('some\nmore\nfor\nb\n'))
 
342
        else:
 
343
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
 
344
                    ('b', StringIO('some\nmore\nfor\nb\n'))])
 
345
        self.check_transport_contents(
 
346
            'diff\ncontents for\na\n'
 
347
            'add\nsome\nmore\ncontents\n'
 
348
            'and\nthen\nsome\nmore\n',
 
349
            t, 'a')
 
350
        self.check_transport_contents(
 
351
                'contents\nfor b\n'
 
352
                'some\nmore\nfor\nb\n',
 
353
                t, 'b')
 
354
 
 
355
        if t.is_readonly():
 
356
            _append('a', StringIO('a little bit more\n'))
 
357
            _append('b', StringIO('from an iterator\n'))
 
358
        else:
 
359
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
 
360
                    ('b', StringIO('from an iterator\n'))]))
 
361
        self.check_transport_contents(
 
362
            'diff\ncontents for\na\n'
 
363
            'add\nsome\nmore\ncontents\n'
 
364
            'and\nthen\nsome\nmore\n'
 
365
            'a little bit more\n',
 
366
            t, 'a')
 
367
        self.check_transport_contents(
 
368
                'contents\nfor b\n'
 
369
                'some\nmore\nfor\nb\n'
 
370
                'from an iterator\n',
 
371
                t, 'b')
 
372
 
 
373
        if t.is_readonly():
 
374
            _append('c', StringIO('some text\nfor a missing file\n'))
 
375
            _append('a', StringIO('some text in a\n'))
 
376
            _append('d', StringIO('missing file r\n'))
 
377
        else:
 
378
            t.append('c', StringIO('some text\nfor a missing file\n'))
 
379
            t.append_multi([('a', StringIO('some text in a\n')),
 
380
                            ('d', StringIO('missing file r\n'))])
 
381
        self.check_transport_contents(
 
382
            'diff\ncontents for\na\n'
 
383
            'add\nsome\nmore\ncontents\n'
 
384
            'and\nthen\nsome\nmore\n'
 
385
            'a little bit more\n'
 
386
            'some text in a\n',
 
387
            t, 'a')
 
388
        self.check_transport_contents('some text\nfor a missing file\n',
 
389
                                      t, 'c')
 
390
        self.check_transport_contents('missing file r\n', t, 'd')
 
391
        
 
392
        # a file with no parent should fail..
 
393
        if not t.is_readonly():
 
394
            self.assertRaises(NoSuchFile,
 
395
                              t.append, 'missing/path', 
 
396
                              StringIO('content'))
 
397
 
 
398
    def test_append_file(self):
 
399
        t = self.get_transport()
 
400
 
 
401
        contents = [
 
402
            ('f1', StringIO('this is a string\nand some more stuff\n')),
 
403
            ('f2', StringIO('here is some text\nand a bit more\n')),
 
404
            ('f3', StringIO('some text for the\nthird file created\n')),
 
405
            ('f4', StringIO('this is a string\nand some more stuff\n')),
 
406
            ('f5', StringIO('here is some text\nand a bit more\n')),
 
407
            ('f6', StringIO('some text for the\nthird file created\n'))
 
408
        ]
 
409
        
 
410
        if t.is_readonly():
 
411
            for f, val in contents:
 
412
                open(f, 'wb').write(val.read())
 
413
        else:
 
414
            t.put_multi(contents)
 
415
 
 
416
        a1 = StringIO('appending to\none\n')
 
417
        if t.is_readonly():
 
418
            _append('f1', a1)
 
419
        else:
 
420
            t.append('f1', a1)
 
421
 
 
422
        del a1
 
423
 
 
424
        self.check_transport_contents(
 
425
                'this is a string\nand some more stuff\n'
 
426
                'appending to\none\n',
 
427
                t, 'f1')
 
428
 
 
429
        a2 = StringIO('adding more\ntext to two\n')
 
430
        a3 = StringIO('some garbage\nto put in three\n')
 
431
 
 
432
        if t.is_readonly():
 
433
            _append('f2', a2)
 
434
            _append('f3', a3)
 
435
        else:
 
436
            t.append_multi([('f2', a2), ('f3', a3)])
 
437
 
 
438
        del a2, a3
 
439
 
 
440
        self.check_transport_contents(
 
441
                'here is some text\nand a bit more\n'
 
442
                'adding more\ntext to two\n',
 
443
                t, 'f2')
 
444
        self.check_transport_contents( 
 
445
                'some text for the\nthird file created\n'
 
446
                'some garbage\nto put in three\n',
 
447
                t, 'f3')
 
448
 
 
449
        # Test that an actual file object can be used with put
 
450
        a4 = t.get('f1')
 
451
        if t.is_readonly():
 
452
            _append('f4', a4)
 
453
        else:
 
454
            t.append('f4', a4)
 
455
 
 
456
        del a4
 
457
 
 
458
        self.check_transport_contents(
 
459
                'this is a string\nand some more stuff\n'
 
460
                'this is a string\nand some more stuff\n'
 
461
                'appending to\none\n',
 
462
                t, 'f4')
 
463
 
 
464
        a5 = t.get('f2')
 
465
        a6 = t.get('f3')
 
466
        if t.is_readonly():
 
467
            _append('f5', a5)
 
468
            _append('f6', a6)
 
469
        else:
 
470
            t.append_multi([('f5', a5), ('f6', a6)])
 
471
 
 
472
        del a5, a6
 
473
 
 
474
        self.check_transport_contents(
 
475
                'here is some text\nand a bit more\n'
 
476
                'here is some text\nand a bit more\n'
 
477
                'adding more\ntext to two\n',
 
478
                t, 'f5')
 
479
        self.check_transport_contents(
 
480
                'some text for the\nthird file created\n'
 
481
                'some text for the\nthird file created\n'
 
482
                'some garbage\nto put in three\n',
 
483
                t, 'f6')
 
484
 
 
485
        a5 = t.get('f2')
 
486
        a6 = t.get('f2')
 
487
        a7 = t.get('f3')
 
488
        if t.is_readonly():
 
489
            _append('c', a5)
 
490
            _append('a', a6)
 
491
            _append('d', a7)
 
492
        else:
 
493
            t.append('c', a5)
 
494
            t.append_multi([('a', a6), ('d', a7)])
 
495
        del a5, a6, a7
 
496
        self.check_transport_contents(t.get('f2').read(), t, 'c')
 
497
        self.check_transport_contents(t.get('f3').read(), t, 'd')
 
498
 
 
499
    def test_delete(self):
 
500
        # TODO: Test Transport.delete
 
501
        t = self.get_transport()
 
502
 
 
503
        # Not much to do with a readonly transport
 
504
        if t.is_readonly():
 
505
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
 
506
            return
 
507
 
 
508
        t.put('a', StringIO('a little bit of text\n'))
 
509
        self.failUnless(t.has('a'))
 
510
        t.delete('a')
 
511
        self.failIf(t.has('a'))
 
512
 
 
513
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
514
 
 
515
        t.put('a', StringIO('a text\n'))
 
516
        t.put('b', StringIO('b text\n'))
 
517
        t.put('c', StringIO('c text\n'))
 
518
        self.assertEqual([True, True, True],
 
519
                list(t.has_multi(['a', 'b', 'c'])))
 
520
        t.delete_multi(['a', 'c'])
 
521
        self.assertEqual([False, True, False],
 
522
                list(t.has_multi(['a', 'b', 'c'])))
 
523
        self.failIf(t.has('a'))
 
524
        self.failUnless(t.has('b'))
 
525
        self.failIf(t.has('c'))
 
526
 
 
527
        self.assertRaises(NoSuchFile,
 
528
                t.delete_multi, ['a', 'b', 'c'])
 
529
 
 
530
        self.assertRaises(NoSuchFile,
 
531
                t.delete_multi, iter(['a', 'b', 'c']))
 
532
 
 
533
        t.put('a', StringIO('another a text\n'))
 
534
        t.put('c', StringIO('another c text\n'))
 
535
        t.delete_multi(iter(['a', 'b', 'c']))
 
536
 
 
537
        # We should have deleted everything
 
538
        # SftpServer creates control files in the
 
539
        # working directory, so we can just do a
 
540
        # plain "listdir".
 
541
        # self.assertEqual([], os.listdir('.'))
 
542
 
 
543
    def test_rmdir(self):
 
544
        t = self.get_transport()
 
545
        # Not much to do with a readonly transport
 
546
        if t.is_readonly():
 
547
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
 
548
            return
 
549
        t.mkdir('adir')
 
550
        t.mkdir('adir/bdir')
 
551
        t.rmdir('adir/bdir')
 
552
        self.assertRaises(NoSuchFile, t.stat, 'adir/bdir')
 
553
        t.rmdir('adir')
 
554
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
555
 
 
556
    def test_rmdir_not_empty(self):
 
557
        """Deleting a non-empty directory raises an exception
 
558
        
 
559
        sftp (and possibly others) don't give us a specific "directory not
 
560
        empty" exception -- we can just see that the operation failed.
 
561
        """
 
562
        t = self.get_transport()
 
563
        if t.is_readonly():
 
564
            return
 
565
        t.mkdir('adir')
 
566
        t.mkdir('adir/bdir')
 
567
        self.assertRaises(PathError, t.rmdir, 'adir')
 
568
 
 
569
    def test_rename_dir_succeeds(self):
 
570
        t = self.get_transport()
 
571
        if t.is_readonly():
 
572
            raise TestSkipped("transport is readonly")
 
573
        t.mkdir('adir')
 
574
        t.mkdir('adir/asubdir')
 
575
        t.rename('adir', 'bdir')
 
576
        self.assertTrue(t.has('bdir/asubdir'))
 
577
        self.assertFalse(t.has('adir'))
 
578
 
 
579
    def test_rename_dir_nonempty(self):
 
580
        """Attempting to replace a nonemtpy directory should fail"""
 
581
        t = self.get_transport()
 
582
        if t.is_readonly():
 
583
            raise TestSkipped("transport is readonly")
 
584
        t.mkdir('adir')
 
585
        t.mkdir('adir/asubdir')
 
586
        t.mkdir('bdir')
 
587
        t.mkdir('bdir/bsubdir')
 
588
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
 
589
        # nothing was changed so it should still be as before
 
590
        self.assertTrue(t.has('bdir/bsubdir'))
 
591
        self.assertFalse(t.has('adir/bdir'))
 
592
        self.assertFalse(t.has('adir/bsubdir'))
 
593
 
 
594
    def test_delete_tree(self):
 
595
        t = self.get_transport()
 
596
 
 
597
        # Not much to do with a readonly transport
 
598
        if t.is_readonly():
 
599
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
 
600
            return
 
601
 
 
602
        # and does it like listing ?
 
603
        t.mkdir('adir')
 
604
        try:
 
605
            t.delete_tree('adir')
 
606
        except TransportNotPossible:
 
607
            # ok, this transport does not support delete_tree
 
608
            return
 
609
        
 
610
        # did it delete that trivial case?
 
611
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
612
 
 
613
        self.build_tree(['adir/',
 
614
                         'adir/file', 
 
615
                         'adir/subdir/', 
 
616
                         'adir/subdir/file', 
 
617
                         'adir/subdir2/',
 
618
                         'adir/subdir2/file',
 
619
                         ], transport=t)
 
620
 
 
621
        t.delete_tree('adir')
 
622
        # adir should be gone now.
 
623
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
624
 
 
625
    def test_move(self):
 
626
        t = self.get_transport()
 
627
 
 
628
        if t.is_readonly():
 
629
            return
 
630
 
 
631
        # TODO: I would like to use os.listdir() to
 
632
        # make sure there are no extra files, but SftpServer
 
633
        # creates control files in the working directory
 
634
        # perhaps all of this could be done in a subdirectory
 
635
 
 
636
        t.put('a', StringIO('a first file\n'))
 
637
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
638
 
 
639
        t.move('a', 'b')
 
640
        self.failUnless(t.has('b'))
 
641
        self.failIf(t.has('a'))
 
642
 
 
643
        self.check_transport_contents('a first file\n', t, 'b')
 
644
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
645
 
 
646
        # Overwrite a file
 
647
        t.put('c', StringIO('c this file\n'))
 
648
        t.move('c', 'b')
 
649
        self.failIf(t.has('c'))
 
650
        self.check_transport_contents('c this file\n', t, 'b')
 
651
 
 
652
        # TODO: Try to write a test for atomicity
 
653
        # TODO: Test moving into a non-existant subdirectory
 
654
        # TODO: Test Transport.move_multi
 
655
 
 
656
    def test_copy(self):
 
657
        t = self.get_transport()
 
658
 
 
659
        if t.is_readonly():
 
660
            return
 
661
 
 
662
        t.put('a', StringIO('a file\n'))
 
663
        t.copy('a', 'b')
 
664
        self.check_transport_contents('a file\n', t, 'b')
 
665
 
 
666
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
667
        os.mkdir('c')
 
668
        # What should the assert be if you try to copy a
 
669
        # file over a directory?
 
670
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
671
        t.put('d', StringIO('text in d\n'))
 
672
        t.copy('d', 'b')
 
673
        self.check_transport_contents('text in d\n', t, 'b')
 
674
 
 
675
        # TODO: test copy_multi
 
676
 
 
677
    def test_connection_error(self):
 
678
        """ConnectionError is raised when connection is impossible"""
 
679
        try:
 
680
            url = self._server.get_bogus_url()
 
681
        except NotImplementedError:
 
682
            raise TestSkipped("Transport %s has no bogus URL support." %
 
683
                              self._server.__class__)
 
684
        t = bzrlib.transport.get_transport(url)
 
685
        try:
 
686
            t.get('.bzr/branch')
 
687
        except (ConnectionError, NoSuchFile), e:
 
688
            pass
 
689
        except (Exception), e:
 
690
            self.failIf(True, 'Wrong exception thrown: %s' % e)
 
691
        else:
 
692
            self.failIf(True, 'Did not get the expected exception.')
 
693
 
 
694
    def test_stat(self):
 
695
        # TODO: Test stat, just try once, and if it throws, stop testing
 
696
        from stat import S_ISDIR, S_ISREG
 
697
 
 
698
        t = self.get_transport()
 
699
 
 
700
        try:
 
701
            st = t.stat('.')
 
702
        except TransportNotPossible, e:
 
703
            # This transport cannot stat
 
704
            return
 
705
 
 
706
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
707
        sizes = [14, 0, 16, 0, 18] 
 
708
        self.build_tree(paths, transport=t)
 
709
 
 
710
        for path, size in zip(paths, sizes):
 
711
            st = t.stat(path)
 
712
            if path.endswith('/'):
 
713
                self.failUnless(S_ISDIR(st.st_mode))
 
714
                # directory sizes are meaningless
 
715
            else:
 
716
                self.failUnless(S_ISREG(st.st_mode))
 
717
                self.assertEqual(size, st.st_size)
 
718
 
 
719
        remote_stats = list(t.stat_multi(paths))
 
720
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
721
 
 
722
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
723
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
724
 
 
725
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
726
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
 
727
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
 
728
        subdir = t.clone('subdir')
 
729
        subdir.stat('./file')
 
730
        subdir.stat('.')
 
731
 
 
732
    def test_list_dir(self):
 
733
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
734
        t = self.get_transport()
 
735
        
 
736
        if not t.listable():
 
737
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
738
            return
 
739
 
 
740
        def sorted_list(d):
 
741
            l = list(t.list_dir(d))
 
742
            l.sort()
 
743
            return l
 
744
 
 
745
        # SftpServer creates control files in the working directory
 
746
        # so lets move down a directory to avoid those.
 
747
        if not t.is_readonly():
 
748
            t.mkdir('wd')
 
749
        else:
 
750
            os.mkdir('wd')
 
751
        t = t.clone('wd')
 
752
 
 
753
        self.assertEqual([], sorted_list(u'.'))
 
754
        # c2 is precisely one letter longer than c here to test that
 
755
        # suffixing is not confused.
 
756
        if not t.is_readonly():
 
757
            self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'], transport=t)
 
758
        else:
 
759
            self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e', 'wd/c2/'])
 
760
 
 
761
        self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
 
762
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))
 
763
 
 
764
        if not t.is_readonly():
 
765
            t.delete('c/d')
 
766
            t.delete('b')
 
767
        else:
 
768
            os.unlink('wd/c/d')
 
769
            os.unlink('wd/b')
 
770
            
 
771
        self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
 
772
        self.assertEqual([u'e'], sorted_list(u'c'))
 
773
 
 
774
        self.assertListRaises(NoSuchFile, t.list_dir, 'q')
 
775
        self.assertListRaises(NoSuchFile, t.list_dir, 'c/f')
 
776
        self.assertListRaises(NoSuchFile, t.list_dir, 'a')
 
777
 
 
778
    def test_clone(self):
 
779
        # TODO: Test that clone moves up and down the filesystem
 
780
        t1 = self.get_transport()
 
781
 
 
782
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
 
783
 
 
784
        self.failUnless(t1.has('a'))
 
785
        self.failUnless(t1.has('b/c'))
 
786
        self.failIf(t1.has('c'))
 
787
 
 
788
        t2 = t1.clone('b')
 
789
        self.assertEqual(t1.base + 'b/', t2.base)
 
790
 
 
791
        self.failUnless(t2.has('c'))
 
792
        self.failIf(t2.has('a'))
 
793
 
 
794
        t3 = t2.clone('..')
 
795
        self.failUnless(t3.has('a'))
 
796
        self.failIf(t3.has('c'))
 
797
 
 
798
        self.failIf(t1.has('b/d'))
 
799
        self.failIf(t2.has('d'))
 
800
        self.failIf(t3.has('b/d'))
 
801
 
 
802
        if t1.is_readonly():
 
803
            open('b/d', 'wb').write('newfile\n')
 
804
        else:
 
805
            t2.put('d', StringIO('newfile\n'))
 
806
 
 
807
        self.failUnless(t1.has('b/d'))
 
808
        self.failUnless(t2.has('d'))
 
809
        self.failUnless(t3.has('b/d'))
 
810
 
 
811
    def test_relpath(self):
 
812
        t = self.get_transport()
 
813
        self.assertEqual('', t.relpath(t.base))
 
814
        # base ends with /
 
815
        self.assertEqual('', t.relpath(t.base[:-1]))
 
816
        # subdirs which dont exist should still give relpaths.
 
817
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
 
818
        # trailing slash should be the same.
 
819
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
 
820
 
 
821
    def test_abspath(self):
 
822
        # smoke test for abspath. Corner cases for backends like unix fs's
 
823
        # that have aliasing problems like symlinks should go in backend
 
824
        # specific test cases.
 
825
        transport = self.get_transport()
 
826
        self.assertEqual(transport.base + 'relpath',
 
827
                         transport.abspath('relpath'))
 
828
 
 
829
    def test_iter_files_recursive(self):
 
830
        transport = self.get_transport()
 
831
        if not transport.listable():
 
832
            self.assertRaises(TransportNotPossible,
 
833
                              transport.iter_files_recursive)
 
834
            return
 
835
        self.build_tree(['isolated/',
 
836
                         'isolated/dir/',
 
837
                         'isolated/dir/foo',
 
838
                         'isolated/dir/bar',
 
839
                         'isolated/bar'],
 
840
                        transport=transport)
 
841
        paths = set(transport.iter_files_recursive())
 
842
        # nb the directories are not converted
 
843
        self.assertEqual(paths,
 
844
                    set(['isolated/dir/foo',
 
845
                         'isolated/dir/bar',
 
846
                         'isolated/bar']))
 
847
        sub_transport = transport.clone('isolated')
 
848
        paths = set(sub_transport.iter_files_recursive())
 
849
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
850
 
 
851
    def test_connect_twice_is_same_content(self):
 
852
        # check that our server (whatever it is) is accessable reliably
 
853
        # via get_transport and multiple connections share content.
 
854
        transport = self.get_transport()
 
855
        if transport.is_readonly():
 
856
            return
 
857
        transport.put('foo', StringIO('bar'))
 
858
        transport2 = self.get_transport()
 
859
        self.check_transport_contents('bar', transport2, 'foo')
 
860
        # its base should be usable.
 
861
        transport2 = bzrlib.transport.get_transport(transport.base)
 
862
        self.check_transport_contents('bar', transport2, 'foo')
 
863
 
 
864
        # now opening at a relative url should give use a sane result:
 
865
        transport.mkdir('newdir')
 
866
        transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
 
867
        transport2 = transport2.clone('..')
 
868
        self.check_transport_contents('bar', transport2, 'foo')
 
869
 
 
870
    def test_lock_write(self):
 
871
        transport = self.get_transport()
 
872
        if transport.is_readonly():
 
873
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
 
874
            return
 
875
        transport.put('lock', StringIO())
 
876
        lock = transport.lock_write('lock')
 
877
        # TODO make this consistent on all platforms:
 
878
        # self.assertRaises(LockError, transport.lock_write, 'lock')
 
879
        lock.unlock()
 
880
 
 
881
    def test_lock_read(self):
 
882
        transport = self.get_transport()
 
883
        if transport.is_readonly():
 
884
            file('lock', 'w').close()
 
885
        else:
 
886
            transport.put('lock', StringIO())
 
887
        lock = transport.lock_read('lock')
 
888
        # TODO make this consistent on all platforms:
 
889
        # self.assertRaises(LockError, transport.lock_read, 'lock')
 
890
        lock.unlock()