~ubuntu-branches/ubuntu/precise/ubuntuone-client/precise-security

« back to all changes in this revision

Viewing changes to .pc/01_getnodebyid_fix.patch/tests/syncdaemon/test_fsm.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2011-10-11 14:59:28 UTC
  • Revision ID: james.westby@ubuntu.com-20111011145928-wii0ehii12wmzvv5
Tags: 2.0.0-0ubuntu2
* debian/patches/01_getnodebyid_fix.patch:
  - Fix filter by share and path (LP: #807737)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# Author: Facundo Batista <facundo@canonical.com>
 
3
# Author: Natalia Bidart <natalia.bidart@canonical.com>
 
4
# Author: Manuel de la Pena <manuel@canonical.com>
 
5
#
 
6
# Copyright 2009 Canonical Ltd.
 
7
#
 
8
# This program is free software: you can redistribute it and/or modify it
 
9
# under the terms of the GNU General Public License version 3, as published
 
10
# by the Free Software Foundation.
 
11
#
 
12
# This program is distributed in the hope that it will be useful, but
 
13
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
14
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
15
# PURPOSE.  See the GNU General Public License for more details.
 
16
#
 
17
# You should have received a copy of the GNU General Public License along
 
18
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
19
 
 
20
"""Tests for the File System Manager."""
 
21
 
 
22
from __future__ import with_statement
 
23
 
 
24
import errno
 
25
import os
 
26
import time
 
27
 
 
28
from mocker import MockerTestCase, ANY
 
29
from twisted.internet import defer
 
30
 
 
31
from contrib.testing.testcase import (
 
32
    BaseTwistedTestCase,
 
33
    FakeVolumeManager,
 
34
    FakeMain,
 
35
    Listener,
 
36
    skip_if_win32_and_uses_metadata_older_than_5,
 
37
    skip_if_win32_and_uses_readonly,
 
38
)
 
39
 
 
40
from ubuntuone.devtools.handlers import MementoHandler
 
41
from ubuntuone.platform import (
 
42
    listdir,
 
43
    make_dir,
 
44
    make_link,
 
45
    open_file,
 
46
    path_exists,
 
47
    remove_dir,
 
48
    remove_file,
 
49
    set_dir_readonly,
 
50
    set_dir_readwrite,
 
51
    stat_path,
 
52
)
 
53
from ubuntuone.syncdaemon.filesystem_manager import (
 
54
    DirectoryNotRemovable,
 
55
    EnableShareWrite,
 
56
    FileSystemManager,
 
57
    InconsistencyError,
 
58
    METADATA_VERSION,
 
59
    TrashFileShelf,
 
60
    TrashTritcaskShelf,
 
61
    TRASH_ROW_TYPE,
 
62
)
 
63
from ubuntuone.syncdaemon import filesystem_manager, config, logger
 
64
from ubuntuone.syncdaemon.file_shelf import FileShelf
 
65
from ubuntuone.syncdaemon.tritcask import Tritcask
 
66
from ubuntuone.syncdaemon.event_queue import EventQueue
 
67
from ubuntuone.syncdaemon.interfaces import IMarker
 
68
from ubuntuone.syncdaemon.marker import MDMarker
 
69
from ubuntuone.syncdaemon.volume_manager import Share, allow_writes
 
70
 
 
71
BROKEN_PICKLE = '\axb80\x02}q\x01(U\x01aU\x04testq\x02U\x01bU\x06brokenq\x03u.'
 
72
 
 
73
 
 
74
@defer.inlineCallbacks
 
75
def _create_share(share_id, share_name, fsm, shares_dir,
 
76
                  access_level='Modify'):
 
77
    """Create a share."""
 
78
    assert isinstance(share_name, unicode)
 
79
    share_path = os.path.join(shares_dir, share_name.encode('utf8'))
 
80
    make_dir(share_path, recursive=True)
 
81
    share = Share(path=share_path, volume_id=share_id,
 
82
                  access_level=access_level)
 
83
    yield fsm.vm.add_share(share)
 
84
    defer.returnValue(share)
 
85
 
 
86
 
 
87
class FSMTestCase(BaseTwistedTestCase):
 
88
    """Base test case for FSM."""
 
89
 
 
90
    @defer.inlineCallbacks
 
91
    def setUp(self):
 
92
        """Setup the test."""
 
93
        yield super(FSMTestCase, self).setUp()
 
94
        self.shares_dir = self.mktemp('shares')
 
95
        self.root_dir = self.mktemp('root')
 
96
        self.fsmdir = self.mktemp("fsmdir")
 
97
        self.partials_dir = self.mktemp("partials")
 
98
        self.tritcask_path = self.mktemp("tritcask")
 
99
 
 
100
        self.db = Tritcask(self.tritcask_path)
 
101
        self.addCleanup(self.db.shutdown)
 
102
        self.fsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
103
                                     FakeVolumeManager(self.root_dir), self.db)
 
104
        self.eq = EventQueue(self.fsm)
 
105
        self.addCleanup(self.eq.shutdown)
 
106
        self.fsm.register_eq(self.eq)
 
107
        self.share = yield self.create_share('share', u'share_name')
 
108
        self.share_path = self.share.path
 
109
        config.get_user_config().set_use_trash(True)
 
110
 
 
111
        # add a in-memory logger handler
 
112
        self.handler = MementoHandler()
 
113
        self.handler.setLevel(0)
 
114
        logger.root_logger.addHandler(self.handler)
 
115
        self.addCleanup(logger.root_logger.removeHandler, self.handler)
 
116
 
 
117
    @defer.inlineCallbacks
 
118
    def create_share(self, share_id, share_name, fsm=None, shares_dir=None,
 
119
                     access_level='Modify'):
 
120
        """Create a share."""
 
121
        assert isinstance(share_name, unicode)
 
122
        if fsm is None:
 
123
            fsm = self.fsm
 
124
        if shares_dir is None:
 
125
            shares_dir = self.shares_dir
 
126
        share = yield _create_share(share_id, share_name, fsm, shares_dir,
 
127
                                    access_level)
 
128
        defer.returnValue(share)
 
129
 
 
130
    def create_node(self, name, is_dir=False, share=None):
 
131
        """Create a node."""
 
132
        if share is None:
 
133
            share = self.share
 
134
        path = os.path.join(share.path, name)
 
135
        mdid = self.fsm.create(path, share.volume_id, is_dir=is_dir)
 
136
        self.fsm.set_node_id(path, "uuid1")
 
137
        mdobj = self.fsm.get_by_mdid(mdid)
 
138
        return mdobj
 
139
 
 
140
 
 
141
class StartupTests(BaseTwistedTestCase):
 
142
    """Test the basic startup behaviour."""
 
143
 
 
144
    def test_basic_startup(self):
 
145
        """Test the init interface."""
 
146
        # only one arg
 
147
        self.assertRaises(TypeError, FileSystemManager)
 
148
        self.assertRaises(TypeError, FileSystemManager, 1, 2)
 
149
 
 
150
        # that creates the dir
 
151
        fsmdir = self.mktemp("a_fsmdir")
 
152
        partials_dir = self.mktemp("a_partials_dir")
 
153
        db = Tritcask(fsmdir)
 
154
        self.addCleanup(db.shutdown)
 
155
        FileSystemManager(fsmdir, partials_dir,
 
156
                          FakeVolumeManager(fsmdir), db)
 
157
        self.assertTrue(path_exists(fsmdir))
 
158
 
 
159
    @defer.inlineCallbacks
 
160
    def test_complex_startup(self):
 
161
        """Test startup after having data."""
 
162
        # pylint: disable-msg=W0212
 
163
        # open an empty one
 
164
        fsmdir = self.mktemp("fsmdir")
 
165
        partials_dir = self.mktemp("a_partials_dir")
 
166
 
 
167
        db = Tritcask(fsmdir)
 
168
        self.addCleanup(db.shutdown)
 
169
        fsm = FileSystemManager(fsmdir, partials_dir,
 
170
                                FakeVolumeManager(fsmdir), db)
 
171
        share = yield _create_share('share', u'share_name',
 
172
                                    fsm=fsm, shares_dir=fsmdir)
 
173
        self.assertEqual(fsm._idx_path, {})
 
174
        self.assertEqual(fsm._idx_node_id, {})
 
175
 
 
176
        # write some data, one with node_id
 
177
        path1 = os.path.join(share.path, 'path1')
 
178
        fsm.create(path1, "share")
 
179
        created_mdid1 = fsm._idx_path[path1]
 
180
        self.assertEqual(fsm._idx_path, {path1:created_mdid1})
 
181
        fsm.set_node_id(path1, "uuid1")
 
182
        self.assertEqual(fsm._idx_node_id, {("share","uuid1"):created_mdid1})
 
183
 
 
184
        # ...and one without
 
185
        path2 = os.path.join(share.path, 'path2')
 
186
        fsm.create(path2, "share")
 
187
        created_mdid2 = fsm._idx_path[path2]
 
188
        self.assertEqual(fsm._idx_path,
 
189
                         {path1:created_mdid1, path2:created_mdid2})
 
190
 
 
191
        # open a second one to see if everything is ok
 
192
        fsm = FileSystemManager(fsmdir, partials_dir,
 
193
                                fsm.vm, db)
 
194
        self.assertEqual(fsm._idx_path,
 
195
                         {path1:created_mdid1, path2:created_mdid2})
 
196
        self.assertEqual(fsm._idx_node_id, {("share","uuid1"):created_mdid1})
 
197
        self.assertTrue(fsm.get_by_mdid(created_mdid1))
 
198
        self.assertTrue(fsm.get_by_mdid(created_mdid2))
 
199
 
 
200
 
 
201
class CreationTests(FSMTestCase):
 
202
    """Test the creation behaviour."""
 
203
 
 
204
    def test_simple(self):
 
205
        """Test simple creation."""
 
206
        # create, but not twice
 
207
        path = os.path.join(self.share.path, 'path')
 
208
        self.fsm.create(path, "share")
 
209
        self.assertRaises(ValueError, self.fsm.create, path, "share")
 
210
        self.assertRaises(ValueError, self.fsm.create, path, "other")
 
211
        mdobj = self.fsm.get_by_path(path)
 
212
        self.assertEqual(mdobj.path, "path")
 
213
        self.assertEqual(mdobj.share_id, "share")
 
214
        self.assertEqual(mdobj.generation, None)
 
215
        self.assertEqual(mdobj.crc32, None)
 
216
        self.assertEqual(mdobj.size, None)
 
217
        when = mdobj.info.created
 
218
        now = time.time()
 
219
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
220
 
 
221
        # set uuid using valid path, but not twice
 
222
        self.fsm.set_node_id(path, "uuid")
 
223
        self.assertRaises(ValueError, self.fsm.set_node_id, path, "whatever")
 
224
        mdobj = self.fsm.get_by_path(path)
 
225
        when = mdobj.info.node_id_assigned
 
226
        now = time.time()
 
227
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
228
 
 
229
    def test_with_node_id(self):
 
230
        """Test creation with node_id"""
 
231
        # create, but not twice
 
232
        path = os.path.join(self.share.path, 'path')
 
233
        self.fsm.create(path, "share", node_id='a_node_id')
 
234
        self.assertRaises(ValueError, self.fsm.create, path, "share")
 
235
        self.assertRaises(ValueError, self.fsm.create, path, "other")
 
236
        mdobj = self.fsm.get_by_path(path)
 
237
        self.assertEqual(mdobj.path, "path")
 
238
        self.assertEqual(mdobj.share_id, "share")
 
239
        self.assertEqual(mdobj.node_id, "a_node_id")
 
240
        self.assertEqual(mdobj.generation, None)
 
241
        self.assertEqual(mdobj.crc32, None)
 
242
        self.assertEqual(mdobj.size, None)
 
243
        when = mdobj.info.created
 
244
        now = time.time()
 
245
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
246
 
 
247
        # set uuid using valid path, but not twice
 
248
        self.assertRaises(ValueError, self.fsm.set_node_id, path, "whatever")
 
249
        mdobj = self.fsm.get_by_path(path)
 
250
        when = mdobj.info.node_id_assigned
 
251
        now = time.time()
 
252
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
253
 
 
254
    def test_invalid_args(self):
 
255
        """Test using invalid args in set_node_id."""
 
256
        path = os.path.join(self.share.path, 'path')
 
257
        self.fsm.create(path, "share")
 
258
 
 
259
        # set uuid using an invalid path
 
260
        self.assertRaises(KeyError, self.fsm.set_node_id, "no-path", "whatevr")
 
261
 
 
262
        # set uuid using an invalid node_id
 
263
        self.assertRaises(ValueError, self.fsm.set_node_id, path, None)
 
264
 
 
265
    def test_twice_sameid_ok(self):
 
266
        """Test that uuid can be set twice, if the uuid is same."""
 
267
        # using the first FSM
 
268
        path = os.path.join(self.share.path, 'path')
 
269
        self.fsm.create(path, "share")
 
270
        self.fsm.set_node_id(path, "uuid")
 
271
        self.fsm.set_node_id(path, "uuid")
 
272
 
 
273
        # opening another FSM
 
274
        fsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
275
                                self.fsm.vm, self.db)
 
276
        fsm.set_node_id(path, "uuid")
 
277
 
 
278
    def test_twice_different_bad(self):
 
279
        """Test that assignments must be done once, even in different FSMs."""
 
280
        # using the first FSM
 
281
        path = os.path.join(self.share.path, 'path')
 
282
        self.fsm.create(path, "share")
 
283
        self.fsm.set_node_id(path, "uuid")
 
284
        self.assertRaises(ValueError, self.fsm.set_node_id, path, "other_uuid")
 
285
 
 
286
        # opening another FSM
 
287
        fsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
288
                                self.fsm.vm, self.db)
 
289
        self.assertRaises(ValueError, fsm.create, path, "share")
 
290
        self.assertRaises(ValueError, fsm.set_node_id, path, "other_uuid")
 
291
 
 
292
    def test_fresh_metadata(self):
 
293
        """Initing with nothing in the metadata, it should leave it right."""
 
294
        with open_file(os.path.join(self.fsmdir, "metadata_version")) as f:
 
295
            md_version = f.read()
 
296
        self.assertEqual(md_version, METADATA_VERSION)
 
297
 
 
298
    @skip_if_win32_and_uses_metadata_older_than_5
 
299
    @defer.inlineCallbacks
 
300
    def test_old_metadata_None(self):
 
301
        """Test old metadata situation, in None."""
 
302
        # create some stuff
 
303
        path = os.path.join(self.share.path, 'path')
 
304
        open_file(path, "w").close()
 
305
        mdid = self.fsm.create(path, "share")
 
306
        self.fsm.set_node_id(path, "uuid")
 
307
        # create a path with the old layout
 
308
        other_share = yield self.create_share('share1', u'share1_name')
 
309
        share_mdid = self.fsm.create(other_share.path, "share1")
 
310
        self.fsm.set_node_id(other_share.path, "uuid1")
 
311
        make_dir(os.path.join(self.root_dir, 'Ubuntu One'), recursive=True)
 
312
        old_shares_path = os.path.join(self.root_dir, 'Ubuntu One',
 
313
                            'Shared With Me')
 
314
        old_path = os.path.join(old_shares_path, 'share1_name')
 
315
        make_link(self.shares_dir, old_shares_path)
 
316
 
 
317
        # put the old path in the mdobj
 
318
        share_md = self.fsm.fs[share_mdid]
 
319
        share_md['path'] = old_path
 
320
        self.fsm.fs[share_mdid] = share_md
 
321
 
 
322
        # break the node on purpose
 
323
        real_mdobj = self.fsm.fs[mdid]
 
324
        del real_mdobj["stat"]
 
325
        del real_mdobj["generation"]
 
326
        real_mdobj["path"] = unicode(real_mdobj["path"])
 
327
        real_mdobj["local_hash"] = None
 
328
        real_mdobj["server_hash"] = None
 
329
        self.fsm.fs[mdid] = real_mdobj
 
330
 
 
331
        # delete the version that should have left the previous fsm
 
332
        version_file = os.path.join(self.fsmdir, "metadata_version")
 
333
        remove_file(version_file)
 
334
 
 
335
        # create a old-style fs with the data:
 
336
        old_fs = FileShelf(self.fsm.old_fs._path)
 
337
        for k, v in self.fsm.fs.iteritems():
 
338
            old_fs[k] = v
 
339
 
 
340
        # start up again, and check
 
341
        db = Tritcask(self.tritcask_path+'.new')
 
342
        self.addCleanup(db.shutdown)
 
343
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
344
                                   self.fsm.vm, db)
 
345
        md_version = open_file(version_file).read()
 
346
        self.assertEqual(md_version, METADATA_VERSION)
 
347
        newmdobj = newfsm.get_by_path(path)
 
348
        self.assertEqual(newmdobj.mdid, mdid)
 
349
        self.assertEqual(newmdobj.stat, stat_path(path))
 
350
        self.assertEqual(newmdobj.generation, None)
 
351
        self.assertEqual(newmdobj.local_hash, "")
 
352
        self.assertEqual(newmdobj.server_hash, "")
 
353
        self.assertTrue(isinstance(newmdobj.path, str))
 
354
        self.assertTrue(other_share.path in newfsm._idx_path)
 
355
        self.assertFalse(old_path in self.fsm._idx_path)
 
356
        self.assertFalse(old_path in newfsm._idx_path)
 
357
 
 
358
    @skip_if_win32_and_uses_metadata_older_than_5
 
359
    @defer.inlineCallbacks
 
360
    def test_old_metadata_1(self):
 
361
        """Test old metadata situation, in v1."""
 
362
        # create some stuff
 
363
        path1 = os.path.join(self.share.path, 'path1')
 
364
        path2 = os.path.join(self.share.path, 'path2')
 
365
        mdid1 = self.fsm.create(path1, "share")
 
366
        self.fsm.set_node_id(path1, "uuid1")
 
367
        mdid2 = self.fsm.create(path2, "share")
 
368
        self.fsm.set_node_id(path2, "uuid2")
 
369
 
 
370
        # create a path with the old layout
 
371
        other_share = yield self.create_share('share1', u'share1_name')
 
372
        share_mdid = self.fsm.create(other_share.path, "share1")
 
373
        self.fsm.set_node_id(other_share.path, "uuid1")
 
374
        make_dir(os.path.join(self.root_dir, 'Ubuntu One'), recursive=True)
 
375
        old_shares_path = os.path.join(self.root_dir, 'Ubuntu One',
 
376
                            'Shared With Me')
 
377
        old_path = os.path.join(old_shares_path, 'share1_name')
 
378
        make_link(self.shares_dir, old_shares_path)
 
379
 
 
380
        # put the old path in the mdobj
 
381
        share_md = self.fsm.fs[share_mdid]
 
382
        share_md['path'] = old_path
 
383
        self.fsm.fs[share_mdid] = share_md
 
384
 
 
385
        # break the node on purpose, with unicode valid and not
 
386
        real_mdobj = self.fsm.fs[mdid1]
 
387
        real_mdobj["path"] = unicode(real_mdobj["path"])
 
388
        real_mdobj["local_hash"] = None
 
389
        real_mdobj["server_hash"] = None
 
390
        del real_mdobj["generation"]
 
391
        self.fsm.fs[mdid1] = real_mdobj
 
392
        real_mdobj = self.fsm.fs[mdid2]
 
393
        real_mdobj["path"] = "asdas\x00\xff\xffasd"
 
394
        self.fsm.fs[mdid2] = real_mdobj
 
395
 
 
396
        # put the old version in file
 
397
        version_file = os.path.join(self.fsmdir, "metadata_version")
 
398
        with open_file(version_file, "w") as fh:
 
399
            fh.write("1")
 
400
 
 
401
        # create a old-style fs with the data:
 
402
        old_fs = FileShelf(self.fsm.old_fs._path)
 
403
        for k, v in self.fsm.fs.iteritems():
 
404
            old_fs[k] = v
 
405
 
 
406
        # start up again, and check
 
407
        db = Tritcask(self.tritcask_path+'.new')
 
408
        self.addCleanup(db.shutdown)
 
409
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
410
                                   self.fsm.vm, db)
 
411
        md_version = open_file(version_file).read()
 
412
        self.assertEqual(md_version, METADATA_VERSION)
 
413
        newmdobj = newfsm.get_by_path(path1)
 
414
        self.assertEqual(newmdobj.mdid, mdid1)
 
415
        self.assertEqual(newmdobj.local_hash, "")
 
416
        self.assertEqual(newmdobj.server_hash, "")
 
417
        self.assertEqual(newmdobj.generation, None)
 
418
        self.assertTrue(isinstance(newmdobj.path, str))
 
419
        # pylint: disable-msg=W0212
 
420
        self.assertEqual(2, len(newfsm._idx_node_id))
 
421
        self.assertTrue(other_share.path in newfsm._idx_path)
 
422
        self.assertFalse(old_path in newfsm._idx_path)
 
423
 
 
424
    @skip_if_win32_and_uses_metadata_older_than_5
 
425
    @defer.inlineCallbacks
 
426
    def test_old_metadata_2(self):
 
427
        """Test old metadata situation, in v2."""
 
428
        # create some stuff
 
429
        path = os.path.join(self.share.path, 'path')
 
430
        mdid = self.fsm.create(path, "share")
 
431
        self.fsm.set_node_id(path, "uuid")
 
432
        # create a path with the old layout
 
433
        other_share = yield self.create_share('share1', u'share1_name')
 
434
        share_mdid = self.fsm.create(other_share.path, "share1")
 
435
        self.fsm.set_node_id(other_share.path, "uuid1")
 
436
        make_dir(os.path.join(self.root_dir, 'Ubuntu One'), recursive=True)
 
437
        old_shares_path = os.path.join(self.root_dir, 'Ubuntu One',
 
438
                            'Shared With Me')
 
439
        old_path = os.path.join(old_shares_path, 'share1_name')
 
440
        make_link(self.shares_dir, old_shares_path)
 
441
 
 
442
        # put the old path in the mdobj
 
443
        share_md = self.fsm.fs[share_mdid]
 
444
        share_md['path'] = old_path
 
445
        self.fsm.fs[share_mdid] = share_md
 
446
 
 
447
        # break the node on purpose, with hashes in None
 
448
        real_mdobj = self.fsm.fs[mdid]
 
449
        real_mdobj["local_hash"] = None
 
450
        real_mdobj["server_hash"] = None
 
451
        del real_mdobj["generation"]
 
452
        self.fsm.fs[mdid] = real_mdobj
 
453
 
 
454
        # put the old version in file
 
455
        version_file = os.path.join(self.fsmdir, "metadata_version")
 
456
        with open_file(version_file, "w") as fh:
 
457
            fh.write("2")
 
458
 
 
459
        # create a old-style fs with the data:
 
460
        old_fs = FileShelf(self.fsm.old_fs._path)
 
461
        for k, v in self.fsm.fs.iteritems():
 
462
            old_fs[k] = v
 
463
 
 
464
        # start up again, and check
 
465
        db = Tritcask(self.tritcask_path+'.new')
 
466
        self.addCleanup(db.shutdown)
 
467
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
468
                                   self.fsm.vm, db)
 
469
        md_version = open_file(version_file).read()
 
470
        self.assertEqual(md_version, METADATA_VERSION)
 
471
        newmdobj = newfsm.get_by_path(path)
 
472
        self.assertEqual(newmdobj.mdid, mdid)
 
473
        self.assertEqual(newmdobj.local_hash, "")
 
474
        self.assertEqual(newmdobj.server_hash, "")
 
475
        self.assertEqual(newmdobj.generation, None)
 
476
        self.assertTrue(isinstance(newmdobj.path, str))
 
477
        # pylint: disable-msg=W0212
 
478
        self.assertEqual(2, len(newfsm._idx_node_id))
 
479
        self.assertTrue(other_share.path in newfsm._idx_path)
 
480
        self.assertFalse(old_path in newfsm._idx_path)
 
481
 
 
482
    @skip_if_win32_and_uses_metadata_older_than_5
 
483
    @defer.inlineCallbacks
 
484
    def test_old_metadata_3(self):
 
485
        """Test old metadata situation, in v3."""
 
486
        # create a path with the old layout and metadata
 
487
        # the root
 
488
        root_mdid = self.fsm.create(self.root_dir, "")
 
489
        self.fsm.set_node_id(self.root_dir, "uuid")
 
490
        # a share
 
491
        other_share = yield self.create_share('share1', u'share1_name')
 
492
        share_mdid = self.fsm.create(other_share.path, "share1")
 
493
        self.fsm.set_node_id(other_share.path, "uuid1")
 
494
        make_dir(os.path.join(self.root_dir, 'Ubuntu One'), recursive=True)
 
495
        old_shares_path = os.path.join(self.root_dir, 'Ubuntu One',
 
496
                            'Shared With Me')
 
497
        old_path = os.path.join(old_shares_path, 'share1_name')
 
498
        make_link(self.shares_dir, old_shares_path)
 
499
        old_root_path = os.path.join(os.path.dirname(self.root_dir),
 
500
                                     'Ubuntu One', 'My Files')
 
501
 
 
502
        # simulate old data in the mdobjs
 
503
        share_md = self.fsm.fs[share_mdid]
 
504
        share_md['path'] = old_path
 
505
        self.fsm.fs[share_mdid] = share_md
 
506
        root_md = self.fsm.fs[root_mdid]
 
507
        root_md['path'] = old_root_path
 
508
        del root_md["generation"]
 
509
        self.fsm.fs[root_mdid] = root_md
 
510
 
 
511
        # put the old version in file
 
512
        version_file = os.path.join(self.fsmdir, "metadata_version")
 
513
        with open_file(version_file, "w") as fh:
 
514
            fh.write("3")
 
515
 
 
516
        # create a old-style fs with the data:
 
517
        old_fs = FileShelf(self.fsm.old_fs._path)
 
518
        for k, v in self.fsm.fs.iteritems():
 
519
            old_fs[k] = v
 
520
 
 
521
        # start up again, and check
 
522
        db = Tritcask(self.tritcask_path+'.new')
 
523
        self.addCleanup(db.shutdown)
 
524
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
525
                                   self.fsm.vm, db)
 
526
        md_version = open_file(version_file).read()
 
527
        self.assertEqual(md_version, METADATA_VERSION)
 
528
        newmdobj = newfsm.get_by_path(other_share.path)
 
529
        self.assertEquals(newmdobj.mdid, share_md['mdid'])
 
530
        self.assertNotEquals(newmdobj.path, share_md['path'])
 
531
 
 
532
        root_dir = os.path.dirname(old_root_path)
 
533
        rootmdobj = newfsm.get_by_path(root_dir)
 
534
        self.assertEqual(rootmdobj.mdid, root_md['mdid'])
 
535
        self.assertEqual(rootmdobj.path, root_dir)
 
536
        self.assertEqual(rootmdobj.generation, None)
 
537
        # pylint: disable-msg=W0212
 
538
        self.assertEqual(2, len(newfsm._idx_node_id))
 
539
        self.assertTrue(other_share.path in newfsm._idx_path)
 
540
        self.assertFalse(old_path in newfsm._idx_path)
 
541
        self.assertTrue(root_dir in newfsm._idx_path)
 
542
        self.assertFalse(old_root_path in newfsm._idx_path)
 
543
 
 
544
    def test_old_metadata_4(self):
 
545
        """Test old metadata situation, in v4."""
 
546
        # create some stuff
 
547
        path = os.path.join(self.share.path, 'path')
 
548
        mdid = self.fsm.create(path, "share")
 
549
        self.fsm.set_node_id(path, "uuid")
 
550
 
 
551
        path_1 = os.path.join(self.share.path, 'path_1')
 
552
        mdid_1 = self.fsm.create(path_1, "share")
 
553
        self.fsm.set_node_id(path_1, "uuid_1")
 
554
 
 
555
        # break the node on purpose, without generation
 
556
        real_mdobj = self.fsm.fs[mdid]
 
557
        del real_mdobj["generation"]
 
558
        self.fsm.fs[mdid] = real_mdobj
 
559
        real_mdobj = self.fsm.fs[mdid_1]
 
560
        del real_mdobj["generation"]
 
561
        self.fsm.fs[mdid_1] = real_mdobj
 
562
 
 
563
        # add a node to the trash
 
564
        self.fsm.delete_to_trash(mdid_1, "parent")
 
565
        # and to the move limbo
 
566
        self.fsm.add_to_move_limbo("share", "uuid_1", "old_parent",
 
567
                                   "new_parent", "new_name", "pfrom", "pto")
 
568
 
 
569
        # put the old version in file
 
570
        version_file = os.path.join(self.fsmdir, "metadata_version")
 
571
        with open_file(version_file, "w") as fh:
 
572
            fh.write("4")
 
573
 
 
574
        # create a old-style fs with the data:
 
575
        old_fs = FileShelf(self.fsm.old_fs._path)
 
576
        for k, v in self.fsm.fs.iteritems():
 
577
            old_fs[k] = v
 
578
        # create a old-style trash
 
579
        old_trash = TrashFileShelf(self.fsm._trash_dir)
 
580
        for k, v in self.fsm.trash.iteritems():
 
581
            old_trash[k] = v
 
582
        # create a old-style move_limbo
 
583
        old_mvlimbo = TrashFileShelf(self.fsm._movelimbo_dir)
 
584
        for k, v in self.fsm.move_limbo.iteritems():
 
585
            old_mvlimbo[k] = v
 
586
 
 
587
        # start up again, and check
 
588
        db = Tritcask(self.tritcask_path+'.new')
 
589
        self.addCleanup(db.shutdown)
 
590
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
591
                                   self.fsm.vm, db)
 
592
        md_version = open_file(version_file).read()
 
593
        self.assertEqual(md_version, METADATA_VERSION)
 
594
        newmdobj = newfsm.get_by_path(path)
 
595
        self.assertEqual(newmdobj.mdid, mdid)
 
596
        self.assertEqual(newmdobj.generation, None)
 
597
        # check that the trash is the same:
 
598
        self.assertEqual(self.fsm.trash,
 
599
                         {("share", "uuid_1"):
 
600
                                (mdid_1, "parent", path_1, False)})
 
601
        self.assertEqual(list(self.fsm.get_iter_trash()),
 
602
                         [("share", "uuid_1", "parent", path_1, False)])
 
603
        # check the move limbo
 
604
        expected = [(("share", "uuid_1"),
 
605
                    ("old_parent", "new_parent", "new_name", "pfrom", "pto"))]
 
606
        self.assertEquals(expected, self.fsm.move_limbo.items())
 
607
        r = [("share", "uuid_1", "old_parent", "new_parent",
 
608
              "new_name", "pfrom", "pto")]
 
609
        self.assertEqual(list(self.fsm.get_iter_move_limbo()), r)
 
610
 
 
611
    def test_old_metadata_5(self):
 
612
        """Test old metadata situation, in v5."""
 
613
        # create some stuff
 
614
        path = os.path.join(self.share.path, 'path')
 
615
        mdid = self.fsm.create(path, "share")
 
616
        self.fsm.set_node_id(path, "uuid")
 
617
 
 
618
        path_1 = os.path.join(self.share.path, 'path_1')
 
619
        mdid_1 = self.fsm.create(path_1, "share")
 
620
        self.fsm.set_node_id(path_1, "uuid_1")
 
621
 
 
622
        # add a node to the trash
 
623
        self.fsm.delete_to_trash(mdid_1, "parent")
 
624
        # and to the move limbo
 
625
        self.fsm.add_to_move_limbo("share", "uuid_1", "old_parent",
 
626
                                   "new_parent", "new_name", "pfrom", "pto")
 
627
 
 
628
        # put the old version in file
 
629
        version_file = os.path.join(self.fsmdir, "metadata_version")
 
630
        with open_file(version_file, "w") as fh:
 
631
            fh.write("4")
 
632
 
 
633
        # create a old-style fs with the data:
 
634
        old_fs = FileShelf(self.fsm.old_fs._path)
 
635
        for k, v in self.fsm.fs.iteritems():
 
636
            old_fs[k] = v
 
637
        # create a old-style trash
 
638
        old_trash = TrashFileShelf(self.fsm._trash_dir)
 
639
        for k, v in self.fsm.trash.iteritems():
 
640
            old_trash[k] = v
 
641
        # create a old-style move_limbo
 
642
        old_mvlimbo = TrashFileShelf(self.fsm._movelimbo_dir)
 
643
        for k, v in self.fsm.move_limbo.iteritems():
 
644
            old_mvlimbo[k] = v
 
645
 
 
646
        # start up again, and check
 
647
        db = Tritcask(self.tritcask_path+'.new')
 
648
        self.addCleanup(db.shutdown)
 
649
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
650
                                   self.fsm.vm, db)
 
651
        md_version = open_file(version_file).read()
 
652
        self.assertEqual(md_version, METADATA_VERSION)
 
653
        newmdobj = newfsm.get_by_path(path)
 
654
        self.assertEqual(newmdobj.share_id, 'share')
 
655
        self.assertEqual(newmdobj.mdid, mdid)
 
656
        self.assertEqual(newmdobj.generation, None)
 
657
        # check that the trash is the same:
 
658
        self.assertEqual(self.fsm.trash,
 
659
                         {("share", "uuid_1"):
 
660
                                (mdid_1, "parent", path_1, False)})
 
661
        self.assertEqual(list(self.fsm.get_iter_trash()),
 
662
                         [("share", "uuid_1", "parent", path_1, False)])
 
663
        # check the move limbo
 
664
        expected = [(("share", "uuid_1"),
 
665
                    ("old_parent", "new_parent", "new_name", "pfrom", "pto"))]
 
666
        self.assertEquals(expected, self.fsm.move_limbo.items())
 
667
        r = [("share", "uuid_1", "old_parent", "new_parent",
 
668
              "new_name", "pfrom", "pto")]
 
669
        self.assertEqual(list(self.fsm.get_iter_move_limbo()), r)
 
670
 
 
671
    @skip_if_win32_and_uses_metadata_older_than_5
 
672
    def test_old_metadata_None_broken_pickle_without_backup(self):
 
673
        """Test old metadata situation, in None with broken metadata values."""
 
674
        # create some stuff
 
675
        path = os.path.join(self.share.path, 'path')
 
676
        path1 = os.path.join(self.share.path, 'path1')
 
677
        path2 = os.path.join(self.share.path, 'path2')
 
678
        for p in [path, path1, path2]:
 
679
            open_file(p, "w").close()
 
680
        mdid = self.fsm.create(path, "share", node_id='uuid')
 
681
        mdid1 = self.fsm.create(path1, "share", node_id='uuid1')
 
682
        mdid2 = self.fsm.create(path2, "share", node_id='uuid2')
 
683
 
 
684
        # create a old-style fs with the data:
 
685
        old_fs = FileShelf(self.fsm.old_fs._path)
 
686
        for k, v in self.fsm.fs.iteritems():
 
687
            old_fs[k] = v
 
688
 
 
689
        # break the node on purpose
 
690
        with open_file(old_fs.key_file(mdid1), 'w') as f:
 
691
            f.write(BROKEN_PICKLE)
 
692
            os.fsync(f.fileno())
 
693
 
 
694
        #break the node by creating a 0 byte pickle
 
695
        with open_file(old_fs.key_file(mdid2), 'w') as f:
 
696
            os.fsync(f.fileno())
 
697
 
 
698
        # delete the version that should have left the previous fsm
 
699
        version_file = os.path.join(self.fsmdir, "metadata_version")
 
700
        remove_file(version_file)
 
701
 
 
702
        # start up again, and check
 
703
        db = Tritcask(self.tritcask_path+'.new')
 
704
        self.addCleanup(db.shutdown)
 
705
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
706
                                   self.fsm.vm, db)
 
707
        md_version = open_file(version_file).read()
 
708
        self.assertEqual(md_version, METADATA_VERSION)
 
709
        self.assertTrue(newfsm.get_by_mdid(mdid) is not None)
 
710
        self.assertRaises(KeyError, newfsm.get_by_mdid, mdid1)
 
711
        self.assertRaises(KeyError, newfsm.get_by_mdid, mdid2)
 
712
 
 
713
    @skip_if_win32_and_uses_metadata_older_than_5
 
714
    def test_old_metadata_1_broken_pickle_without_backup(self):
 
715
        """Test old metadata situation, in v1 with broken metadata values."""
 
716
        # create some stuff
 
717
        path = os.path.join(self.share.path, 'path')
 
718
        path1 = os.path.join(self.share.path, 'path1')
 
719
        path2 = os.path.join(self.share.path, 'path2')
 
720
        mdid = self.fsm.create(path, "share", node_id='uuid')
 
721
        mdid1 = self.fsm.create(path1, "share", node_id='uuid1')
 
722
        mdid2 = self.fsm.create(path2, "share", node_id='uuid2')
 
723
 
 
724
        # break the node on purpose
 
725
        real_mdobj = self.fsm.fs[mdid]
 
726
        real_mdobj["path"] = unicode(real_mdobj["path"])
 
727
        real_mdobj["local_hash"] = None
 
728
        real_mdobj["server_hash"] = None
 
729
        self.fsm.fs[mdid] = real_mdobj
 
730
 
 
731
        # create a old-style fs with the data:
 
732
        old_fs = FileShelf(self.fsm.old_fs._path)
 
733
        for k, v in self.fsm.fs.iteritems():
 
734
            old_fs[k] = v
 
735
 
 
736
        # break the second node on purpose but with an invalid pickle
 
737
        with open_file(old_fs.key_file(mdid1), 'w') as f:
 
738
            f.write(BROKEN_PICKLE)
 
739
            os.fsync(f.fileno())
 
740
        #break the third node by creating a 0 byte pickle
 
741
        with open_file(old_fs.key_file(mdid2), 'w') as f:
 
742
            os.fsync(f.fileno())
 
743
 
 
744
        # put the version file in 1
 
745
        version_file = os.path.join(self.fsmdir, "metadata_version")
 
746
        with open_file(version_file, "w") as fh:
 
747
            fh.write("1")
 
748
 
 
749
        # start up again, and check
 
750
        db = Tritcask(self.tritcask_path+'.new')
 
751
        self.addCleanup(db.shutdown)
 
752
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
753
                                   self.fsm.vm, db)
 
754
        md_version = open_file(version_file).read()
 
755
        self.assertEqual(md_version, METADATA_VERSION)
 
756
        newmdobj = newfsm.get_by_path(path)
 
757
        self.assertEqual(newmdobj.mdid, mdid)
 
758
        self.assertEqual(newmdobj.local_hash, "")
 
759
        self.assertEqual(newmdobj.server_hash, "")
 
760
        self.assertTrue(isinstance(newmdobj.path, str))
 
761
        self.assertRaises(KeyError, newfsm.get_by_mdid, mdid1)
 
762
        self.assertRaises(KeyError, newfsm.get_by_mdid, mdid2)
 
763
        # pylint: disable-msg=W0212
 
764
        self.assertEqual(1, len(newfsm._idx_node_id))
 
765
 
 
766
    @skip_if_win32_and_uses_metadata_older_than_5
 
767
    def test_old_metadata_2_broken_pickle_without_backup(self):
 
768
        """Test old metadata situation, in v2 with broken metadata values."""
 
769
        # create some stuff
 
770
        path = os.path.join(self.share.path, 'path')
 
771
        path1 = os.path.join(self.share.path, 'path1')
 
772
        path2 = os.path.join(self.share.path, 'path2')
 
773
        for p in [path, path1, path2]:
 
774
            open_file(p, "w").close()
 
775
        mdid = self.fsm.create(path, "share", node_id='uuid')
 
776
        mdid1 = self.fsm.create(path1, "share", node_id='uuid1')
 
777
        mdid2 = self.fsm.create(path2, "share", node_id='uuid2')
 
778
 
 
779
        # break the node on purpose, with hashes in None
 
780
        real_mdobj = self.fsm.fs[mdid]
 
781
        real_mdobj["local_hash"] = None
 
782
        real_mdobj["server_hash"] = None
 
783
        self.fsm.fs[mdid] = real_mdobj
 
784
 
 
785
        # create a old-style fs with the data:
 
786
        old_fs = FileShelf(self.fsm.old_fs._path)
 
787
        for k, v in self.fsm.fs.iteritems():
 
788
            old_fs[k] = v
 
789
 
 
790
        # put the version file in 1
 
791
        version_file = os.path.join(self.fsmdir, "metadata_version")
 
792
        with open_file(version_file, "w") as fh:
 
793
            fh.write("2")
 
794
        # break the second node on purpose but with an invalid pickle
 
795
        with open_file(old_fs.key_file(mdid1), 'w') as f:
 
796
            f.write(BROKEN_PICKLE)
 
797
            os.fsync(f.fileno())
 
798
        #break the third node by creating a 0 byte pickle
 
799
        with open_file(old_fs.key_file(mdid2), 'w') as f:
 
800
            os.fsync(f.fileno())
 
801
 
 
802
        # start up again, and check
 
803
        db = Tritcask(self.tritcask_path+'.new')
 
804
        self.addCleanup(db.shutdown)
 
805
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
806
                                   self.fsm.vm, db)
 
807
        md_version = open_file(version_file).read()
 
808
        self.assertEqual(md_version, METADATA_VERSION)
 
809
        newmdobj = newfsm.get_by_path(path)
 
810
        self.assertEqual(newmdobj.mdid, mdid)
 
811
        self.assertEqual(newmdobj.local_hash, "")
 
812
        self.assertEqual(newmdobj.server_hash, "")
 
813
        self.assertTrue(isinstance(newmdobj.path, str))
 
814
        self.assertRaises(KeyError, newfsm.get_by_mdid, mdid1)
 
815
        self.assertRaises(KeyError, newfsm.get_by_mdid, mdid2)
 
816
        # pylint: disable-msg=W0212
 
817
        self.assertEqual(1, len(newfsm._idx_node_id))
 
818
 
 
819
    @skip_if_win32_and_uses_metadata_older_than_5
 
820
    def test_old_metadata_None_broken_pickle_with_backup(self):
 
821
        """Test old metadata situation, in None with broken metadata values."""
 
822
        # create some stuff
 
823
        path = os.path.join(self.share.path, 'path')
 
824
        path1 = os.path.join(self.share.path, 'path1')
 
825
        path2 = os.path.join(self.share.path, 'path2')
 
826
        for p in [path, path1, path2]:
 
827
            open_file(p, "w").close()
 
828
        mdid = self.fsm.create(path, "share")
 
829
        self.fsm.set_node_id(path, "uuid")
 
830
        mdid1 = self.fsm.create(path1, "share")
 
831
        self.fsm.set_node_id(path1, "uuid1")
 
832
        mdid2 = self.fsm.create(path2, "share")
 
833
        self.fsm.set_node_id(path2, "uuid2")
 
834
 
 
835
        # create a old-style fs with the data:
 
836
        old_fs = FileShelf(self.fsm.old_fs._path)
 
837
        for k, v in self.fsm.fs.iteritems():
 
838
            old_fs[k] = v
 
839
        # fake version 2 with a backup
 
840
        mdobj = old_fs[mdid1]
 
841
        mdobj['node_id'] = None
 
842
        old_fs[mdid1] = mdobj
 
843
        old_fs[mdid1] = mdobj
 
844
        mdobj = old_fs[mdid2]
 
845
        mdobj['node_id'] = None
 
846
        old_fs[mdid2] = mdobj
 
847
        old_fs[mdid2] = mdobj
 
848
        # break the node on purpose
 
849
        with open_file(old_fs.key_file(mdid1), 'w') as f:
 
850
            f.write(BROKEN_PICKLE)
 
851
            os.fsync(f.fileno())
 
852
 
 
853
        #break the node by creating a 0 byte pickle
 
854
        with open_file(old_fs.key_file(mdid2), 'w') as f:
 
855
            os.fsync(f.fileno())
 
856
 
 
857
        # delete the version that should have left the previous fsm
 
858
        version_file = os.path.join(self.fsmdir, "metadata_version")
 
859
        remove_file(version_file)
 
860
 
 
861
        # start up again, and check
 
862
        db = Tritcask(self.tritcask_path+'.new')
 
863
        self.addCleanup(db.shutdown)
 
864
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
865
                                   self.fsm.vm, db)
 
866
        md_version = open_file(version_file).read()
 
867
        self.assertEqual(md_version, METADATA_VERSION)
 
868
        self.assertTrue(newfsm.get_by_mdid(mdid) is not None)
 
869
        # pylint: disable-msg=W0212
 
870
        self.assertEqual(1, len(newfsm._idx_node_id))
 
871
        self.assertEqual(3, len(newfsm._idx_path))
 
872
        # check that the broken mdid's load the old metadata
 
873
        self.assertEquals(None, newfsm.get_by_mdid(mdid1).node_id)
 
874
        self.assertEquals(None, newfsm.get_by_mdid(mdid2).node_id)
 
875
 
 
876
    @skip_if_win32_and_uses_metadata_older_than_5
 
877
    def test_old_metadata_1_broken_pickle_with_backup(self):
 
878
        """Test old metadata situation, in v1 with broken metadata values."""
 
879
        # create some stuff
 
880
        path = os.path.join(self.share.path, 'path')
 
881
        path1 = os.path.join(self.share.path, 'path1')
 
882
        path2 = os.path.join(self.share.path, 'path2')
 
883
        mdid = self.fsm.create(path, "share")
 
884
        self.fsm.set_node_id(path, "uuid")
 
885
        mdid1 = self.fsm.create(path1, "share")
 
886
        self.fsm.set_node_id(path1, "uuid1")
 
887
        mdid2 = self.fsm.create(path2, "share")
 
888
        self.fsm.set_node_id(path2, "uuid2")
 
889
 
 
890
        # break the node on purpose
 
891
        real_mdobj = self.fsm.fs[mdid]
 
892
        real_mdobj["path"] = unicode(real_mdobj["path"])
 
893
        real_mdobj["local_hash"] = None
 
894
        real_mdobj["server_hash"] = None
 
895
        self.fsm.fs[mdid] = real_mdobj
 
896
 
 
897
        # create a old-style fs with the data:
 
898
        old_fs = FileShelf(self.fsm.old_fs._path)
 
899
        for k, v in self.fsm.fs.iteritems():
 
900
            old_fs[k] = v
 
901
        # fake version 2 with a backup
 
902
        mdobj = old_fs[mdid1]
 
903
        mdobj['node_id'] = None
 
904
        old_fs[mdid1] = mdobj
 
905
        old_fs[mdid1] = mdobj
 
906
        mdobj = old_fs[mdid2]
 
907
        mdobj['node_id'] = None
 
908
        old_fs[mdid2] = mdobj
 
909
        old_fs[mdid2] = mdobj
 
910
        # break the second node on purpose but with an invalid pickle
 
911
        with open_file(old_fs.key_file(mdid1), 'w') as f:
 
912
            f.write(BROKEN_PICKLE)
 
913
            os.fsync(f.fileno())
 
914
        #break the third node by creating a 0 byte pickle
 
915
        with open_file(old_fs.key_file(mdid2), 'w') as f:
 
916
            os.fsync(f.fileno())
 
917
 
 
918
        # put the version file in 1
 
919
        version_file = os.path.join(self.fsmdir, "metadata_version")
 
920
        with open_file(version_file, "w") as fh:
 
921
            fh.write("1")
 
922
 
 
923
        # start up again, and check
 
924
        db = Tritcask(self.tritcask_path+'.new')
 
925
        self.addCleanup(db.shutdown)
 
926
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
927
                                   self.fsm.vm, db)
 
928
        md_version = open_file(version_file).read()
 
929
        self.assertEqual(md_version, METADATA_VERSION)
 
930
        newmdobj = newfsm.get_by_path(path)
 
931
        self.assertEqual(newmdobj.mdid, mdid)
 
932
        self.assertEqual(newmdobj.local_hash, "")
 
933
        self.assertEqual(newmdobj.server_hash, "")
 
934
        self.assertTrue(isinstance(newmdobj.path, str))
 
935
        # pylint: disable-msg=W0212
 
936
        self.assertEqual(1, len(newfsm._idx_node_id))
 
937
        self.assertEqual(3, len(newfsm._idx_path))
 
938
        # check that the broken mdid's load the old metadata
 
939
        self.assertEquals(None, newfsm.get_by_mdid(mdid1).node_id)
 
940
        self.assertEquals(None, newfsm.get_by_mdid(mdid2).node_id)
 
941
 
 
942
    @skip_if_win32_and_uses_metadata_older_than_5
 
943
    def test_old_metadata_2_broken_pickle_with_backup(self):
 
944
        """Test old metadata situation, in v2 with broken metadata values."""
 
945
        # create some stuff
 
946
        path = os.path.join(self.share.path, 'path')
 
947
        path1 = os.path.join(self.share.path, 'path1')
 
948
        path2 = os.path.join(self.share.path, 'path2')
 
949
        for p in [path, path1, path2]:
 
950
            open_file(p, "w").close()
 
951
        mdid = self.fsm.create(path, "share")
 
952
        self.fsm.set_node_id(path, "uuid")
 
953
        mdid1 = self.fsm.create(path1, "share")
 
954
        self.fsm.set_node_id(path1, "uuid1")
 
955
        mdid2 = self.fsm.create(path2, "share")
 
956
        self.fsm.set_node_id(path2, "uuid2")
 
957
 
 
958
        # break the node on purpose, with hashes in None
 
959
        real_mdobj = self.fsm.fs[mdid]
 
960
        real_mdobj["local_hash"] = None
 
961
        real_mdobj["server_hash"] = None
 
962
        self.fsm.fs[mdid] = real_mdobj
 
963
 
 
964
        # put the version file in 1
 
965
        version_file = os.path.join(self.fsmdir, "metadata_version")
 
966
        with open_file(version_file, "w") as fh:
 
967
            fh.write("2")
 
968
 
 
969
        # create a old-style fs with the data
 
970
        old_fs = FileShelf(self.fsm.old_fs._path)
 
971
        for k, v in self.fsm.fs.iteritems():
 
972
            old_fs[k] = v
 
973
 
 
974
        # fake version 2 with a backup
 
975
        mdobj = old_fs[mdid1]
 
976
        mdobj['node_id'] = None
 
977
        old_fs[mdid1] = mdobj
 
978
        old_fs[mdid1] = mdobj
 
979
        mdobj = old_fs[mdid2]
 
980
        mdobj['node_id'] = None
 
981
        old_fs[mdid2] = mdobj
 
982
        old_fs[mdid2] = mdobj
 
983
 
 
984
        # break the second node on purpose but with an invalid pickle
 
985
        with open_file(old_fs.key_file(mdid1), 'w') as f:
 
986
            f.write(BROKEN_PICKLE)
 
987
            os.fsync(f.fileno())
 
988
        #break the third node by creating a 0 byte pickle
 
989
        with open_file(old_fs.key_file(mdid2), 'w') as f:
 
990
            os.fsync(f.fileno())
 
991
 
 
992
        # start up again, and check
 
993
        db = Tritcask(self.tritcask_path+'.new')
 
994
        self.addCleanup(db.shutdown)
 
995
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
996
                                   self.fsm.vm, db)
 
997
        md_version = open_file(version_file).read()
 
998
        self.assertEqual(md_version, METADATA_VERSION)
 
999
        newmdobj = newfsm.get_by_path(path)
 
1000
        self.assertEqual(newmdobj.mdid, mdid)
 
1001
        self.assertEqual(newmdobj.local_hash, "")
 
1002
        self.assertEqual(newmdobj.server_hash, "")
 
1003
        self.assertTrue(isinstance(newmdobj.path, str))
 
1004
        # pylint: disable-msg=W0212
 
1005
        self.assertEqual(1, len(newfsm._idx_node_id))
 
1006
        self.assertEqual(3, len(newfsm._idx_path))
 
1007
        # check that the broken mdid's load the old metadata
 
1008
        self.assertEquals(None, newfsm.get_by_mdid(mdid1).node_id)
 
1009
        self.assertEquals(None, newfsm.get_by_mdid(mdid2).node_id)
 
1010
 
 
1011
    def test_current_metadata_phantom_node_older(self):
 
1012
        """Test current metadata with a phantom node."""
 
1013
        # create some stuff
 
1014
        path = os.path.join(self.share.path, 'path')
 
1015
        mdid = self.fsm.create(path, "share")
 
1016
        self.fsm.set_node_id(path, "uuid")
 
1017
        # keep the mdobj around
 
1018
        mdobj = self.fsm.fs[mdid]
 
1019
        # delete it from metadata and add it again to fake a phantom node.
 
1020
        self.fsm.delete_metadata(path)
 
1021
        self.fsm.fs[mdid] = mdobj
 
1022
 
 
1023
        # create a new node with the same path
 
1024
        mdid_1 = self.fsm.create(path, "share")
 
1025
        self.fsm.set_node_id(path, "uuid_1")
 
1026
 
 
1027
        # shutdown old self.db so files are closed
 
1028
        self.db.shutdown()
 
1029
 
 
1030
        # start up again, and check
 
1031
        self.db = Tritcask(self.tritcask_path)
 
1032
        self.addCleanup(self.db.shutdown)
 
1033
 
 
1034
        # patch this tritcask instance to return the keys ordered by tstamp
 
1035
        # (reversed), in order to make this test deterministic.
 
1036
        def rsorted_keys():
 
1037
            """Custom keys function to sort in reverse order by tstamp."""
 
1038
            return [k for k, _ in sorted(self.db._keydir.items(),
 
1039
                            key=lambda v: v[1].tstamp, reverse=True)]
 
1040
        self.patch(self.db, 'keys', rsorted_keys)
 
1041
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
1042
                                   self.fsm.vm, self.db)
 
1043
        newmdobj = newfsm.get_by_path(path)
 
1044
        # check that the mdobj is the new one.
 
1045
        self.assertEqual(newmdobj.mdid, mdid_1)
 
1046
        self.assertNotEqual(newmdobj.mdid, mdid)
 
1047
        self.assertEqual(1, len(self.db.keys()))
 
1048
        self.handler.check_warning("Path already in the index: %s" % (path))
 
1049
        self.handler.check_debug("Replacing and deleting node %s witth newer "
 
1050
                                 "node: %s" % (mdid, mdid_1))
 
1051
 
 
1052
    def test_current_metadata_phantom_node_newer(self):
 
1053
        """Test current metadata with a phantom node."""
 
1054
        # create some stuff
 
1055
        path = os.path.join(self.share.path, 'path')
 
1056
        mdid = self.fsm.create(path, "share")
 
1057
        self.fsm.set_node_id(path, "uuid")
 
1058
        # keep the mdobj around
 
1059
        mdobj = self.fsm.fs[mdid]
 
1060
        # delete it from metadata and add it again to fake a phantom node.
 
1061
        self.fsm.delete_metadata(path)
 
1062
        self.fsm.fs[mdid] = mdobj
 
1063
        # verterok: hack to ensure time moves forward
 
1064
        time.sleep(.1)
 
1065
 
 
1066
        # create a new node with the same path
 
1067
        mdid_1 = self.fsm.create(path, "share")
 
1068
        self.fsm.set_node_id(path, "uuid_1")
 
1069
 
 
1070
        # shutdown old self.db so files are closed
 
1071
        self.db.shutdown()
 
1072
 
 
1073
        # start up again, and check
 
1074
        self.db = Tritcask(self.tritcask_path)
 
1075
        self.addCleanup(self.db.shutdown)
 
1076
 
 
1077
        # patch this tritcask instance to return the keys ordered by tstamp
 
1078
        # (reversed), in order to make this test deterministic.
 
1079
        def sorted_keys():
 
1080
            """Custom keys function to sort ordered by tstamp."""
 
1081
            return [k for k, _ in sorted(self.db._keydir.items(),
 
1082
                            key=lambda v: v[1].tstamp, reverse=False)]
 
1083
        self.patch(self.db, 'keys', sorted_keys)
 
1084
        newfsm = FileSystemManager(self.fsmdir, self.partials_dir,
 
1085
                                   self.fsm.vm, self.db)
 
1086
        newmdobj = newfsm.get_by_path(path)
 
1087
 
 
1088
        # check that the mdobj is the new one.
 
1089
        self.assertEqual(newmdobj.mdid, mdid_1)
 
1090
        self.assertNotEqual(newmdobj.mdid, mdid)
 
1091
        self.assertEqual(1, len(self.db.keys()))
 
1092
        self.handler.check_warning("Path already in the index: %s" % (path))
 
1093
        self.handler.check_debug("The node: %s is newer than: %s, "
 
1094
                                 "leaving it alone and deleting the old one.",
 
1095
                                 mdid, mdid_1)
 
1096
 
 
1097
 
 
1098
class GetSetTests(FSMTestCase):
 
1099
    """Test the get/set interface."""
 
1100
 
 
1101
    def test_bad_data(self):
 
1102
        """No such info is allowed as path."""
 
1103
        self.assertRaises(ValueError, self.fsm.create, "", "share")
 
1104
        self.assertRaises(ValueError, self.fsm.create, " ", "share")
 
1105
 
 
1106
    def test_basic(self):
 
1107
        """Test basic retrieval."""
 
1108
        # pylint: disable-msg=W0212
 
1109
        # write some data
 
1110
        path1 = os.path.join(self.share_path, "path1")
 
1111
        newmdid = self.fsm.create(path1, "share")
 
1112
        mdid1 = self.fsm._idx_path[path1]
 
1113
        self.fsm.set_node_id(path1, "uuid1")
 
1114
        self.assertEqual(newmdid, mdid1)
 
1115
 
 
1116
        mdobj = self.fsm.get_by_mdid(mdid1)
 
1117
        self.assertEqual(mdobj.node_id, "uuid1")
 
1118
        self.assertEqual(mdobj.path, "path1")
 
1119
        self.assertEqual(mdobj.share_id, "share")
 
1120
        self.assertEqual(mdobj.local_hash, "")
 
1121
        self.assertEqual(mdobj.server_hash, "")
 
1122
        self.assertEqual(mdobj.info.is_partial, False)
 
1123
        self.assertEqual(mdobj.is_dir, False)
 
1124
        self.assertEqual(mdobj.mdid, mdid1)
 
1125
        self.assertEqual(self.fsm.get_by_node_id("share", "uuid1"), mdobj)
 
1126
        self.assertEqual(self.fsm.get_by_path(path1), mdobj)
 
1127
 
 
1128
        # write more data
 
1129
        path2 = os.path.join(self.share_path, "path2")
 
1130
        newmdid = self.fsm.create(path2, "share", is_dir=True)
 
1131
        mdid2 = self.fsm._idx_path[path2]
 
1132
        self.fsm.set_node_id(path2, "uuid2")
 
1133
        self.assertEqual(newmdid, mdid2)
 
1134
 
 
1135
        # check that is not mixed
 
1136
        mdobj = self.fsm.get_by_mdid(mdid1)
 
1137
        self.assertEqual(mdobj.node_id, "uuid1")
 
1138
        self.assertEqual(mdobj.path, "path1")
 
1139
        self.assertEqual(mdobj.share_id, "share")
 
1140
        self.assertEqual(mdobj.mdid, mdid1)
 
1141
        self.assertEqual(mdobj.is_dir, False)
 
1142
        self.assertEqual(self.fsm.get_by_node_id("share", "uuid1"), mdobj)
 
1143
        self.assertEqual(self.fsm.get_by_path(path1), mdobj)
 
1144
        mdobj = self.fsm.get_by_mdid(mdid2)
 
1145
        self.assertEqual(mdobj.node_id, "uuid2")
 
1146
        self.assertEqual(mdobj.path, "path2")
 
1147
        self.assertEqual(mdobj.share_id, "share")
 
1148
        self.assertEqual(mdobj.mdid, mdid2)
 
1149
        self.assertEqual(mdobj.is_dir, True)
 
1150
        self.assertEqual(self.fsm.get_by_node_id("share", "uuid2"), mdobj)
 
1151
        self.assertEqual(self.fsm.get_by_path(path2), mdobj)
 
1152
 
 
1153
    def test_iteration(self):
 
1154
        """Test basic retrieval."""
 
1155
        # create a few objects
 
1156
        mdids = []
 
1157
        path_names = "path1 path2 path3".split()
 
1158
        paths = []
 
1159
        for path in path_names:
 
1160
            path = os.path.join(self.share.path, path)
 
1161
            paths.append(path)
 
1162
            mdid = self.fsm.create(path, "share")
 
1163
            mdids.append(mdid)
 
1164
        mdids.sort()
 
1165
 
 
1166
        # get them
 
1167
        retrieved_mdids = []
 
1168
        retrieved_paths = []
 
1169
        for mdobj in self.fsm.get_mdobjs_by_share_id("share"):
 
1170
            retrieved_mdids.append(mdobj.mdid)
 
1171
            retrieved_paths.append(mdobj.path)
 
1172
        retrieved_mdids.sort()
 
1173
        retrieved_paths.sort()
 
1174
 
 
1175
        # check them
 
1176
        self.assertEqual(mdids, retrieved_mdids)
 
1177
        self.assertEqual(path_names, retrieved_paths)
 
1178
 
 
1179
    def test_getacopy(self):
 
1180
        """Test that we receive only a copy."""
 
1181
        # write some data
 
1182
        path = os.path.join(self.share_path, "path")
 
1183
        mdid = self.fsm.create(path, "share")
 
1184
        self.fsm.set_node_id(path, "uuid")
 
1185
        self.fsm.set_by_mdid(mdid, newarg="foo")
 
1186
 
 
1187
        # test getting a copy with mdid
 
1188
        d = self.fsm.get_by_mdid(mdid)
 
1189
        # XXX http://bugs.launchpad.net/bugs/400331
 
1190
        d.newarg = "bar"
 
1191
        d = self.fsm.get_by_mdid(mdid)
 
1192
        self.assertEqual(d.newarg, "foo")
 
1193
 
 
1194
        # test getting a copy with uuid
 
1195
        d = self.fsm.get_by_node_id("share", "uuid")
 
1196
        d.newarg = "bar"
 
1197
        d = self.fsm.get_by_node_id("share", "uuid")
 
1198
        self.assertEqual(d.newarg, "foo")
 
1199
 
 
1200
        # test getting a copy with path
 
1201
        d = self.fsm.get_by_path(path)
 
1202
        d.newarg = "bar"
 
1203
        d = self.fsm.get_by_path(path)
 
1204
        self.assertEqual(d.newarg, "foo")
 
1205
 
 
1206
    def test_get_raises(self):
 
1207
        """Test that we get an exception if the object is not there."""
 
1208
        # write some data
 
1209
        path = os.path.join(self.share_path, "path")
 
1210
        mdid = self.fsm.create(path, "share")
 
1211
        self.fsm.set_node_id(path, "uuid")
 
1212
 
 
1213
        # with mdid, ok and bad
 
1214
        self.fsm.get_by_mdid(mdid)
 
1215
        self.assertRaises(KeyError, self.fsm.get_by_mdid, "no-such-key")
 
1216
 
 
1217
        # with uuid, ok and bad
 
1218
        self.fsm.get_by_node_id("share", "uuid")
 
1219
        self.assertRaises(KeyError, self.fsm.get_by_node_id,
 
1220
                          "share", "no-such-key")
 
1221
        self.assertRaises(ValueError, self.fsm.get_by_node_id,
 
1222
                          "share", None)
 
1223
 
 
1224
        # with path, ok and bad
 
1225
        self.fsm.get_by_path(path)
 
1226
        self.assertRaises(KeyError, self.fsm.get_by_path, "no-such-key")
 
1227
 
 
1228
    def test_setters_simple(self):
 
1229
        """Test that setters work."""
 
1230
        # create some data
 
1231
        path = os.path.join(self.share.path, 'path')
 
1232
        mdid = self.fsm.create(path, "share")
 
1233
        self.fsm.set_node_id(path, "uuid")
 
1234
 
 
1235
        # test using mdid
 
1236
        self.fsm.set_by_mdid(mdid, foo="foo1")
 
1237
        self.fsm.set_by_mdid(mdid, bar="bar1", baz="baz1")
 
1238
        mdobj = self.fsm.get_by_mdid(mdid)
 
1239
        self.assertEqual(mdobj.foo, "foo1")
 
1240
        self.assertEqual(mdobj.bar, "bar1")
 
1241
        self.assertEqual(mdobj.baz, "baz1")
 
1242
 
 
1243
        # test using uuid
 
1244
        self.assertRaises(ValueError, self.fsm.set_by_node_id, None, "sh", j=3)
 
1245
        self.fsm.set_by_node_id("uuid", "share", foo="foo2")
 
1246
        self.fsm.set_by_node_id("uuid", "share", bar="bar2", baz="baz2")
 
1247
        mdobj = self.fsm.get_by_node_id("share", "uuid")
 
1248
        self.assertEqual(mdobj.foo, "foo2")
 
1249
        self.assertEqual(mdobj.bar, "bar2")
 
1250
        self.assertEqual(mdobj.baz, "baz2")
 
1251
 
 
1252
        # test using path
 
1253
        self.fsm.set_by_path(path, foo="foo3")
 
1254
        self.fsm.set_by_path(path, bar="bar3", baz="baz3")
 
1255
        mdobj = self.fsm.get_by_path(path)
 
1256
        self.assertEqual(mdobj.foo, "foo3")
 
1257
        self.assertEqual(mdobj.bar, "bar3")
 
1258
        self.assertEqual(mdobj.baz, "baz3")
 
1259
 
 
1260
    def test_setters_mixed(self):
 
1261
        """Test the setters using different combinations."""
 
1262
        # create some data
 
1263
        path = os.path.join(self.share.path, 'path')
 
1264
        mdid = self.fsm.create(path, "share")
 
1265
        self.fsm.set_node_id(path, "uuid")
 
1266
 
 
1267
        # set with mdid, get with uuid and path
 
1268
        self.fsm.set_by_mdid(mdid, foo="foo1")
 
1269
        self.assertEqual(self.fsm.get_by_node_id("share", "uuid").foo, "foo1")
 
1270
        self.assertEqual(self.fsm.get_by_path(path).foo, "foo1")
 
1271
 
 
1272
        # set with uuid, get with mdid and path
 
1273
        self.fsm.set_by_node_id("uuid", "share", foo="foo2")
 
1274
        self.assertEqual(self.fsm.get_by_mdid(mdid).foo, "foo2")
 
1275
        self.assertEqual(self.fsm.get_by_path(path).foo, "foo2")
 
1276
 
 
1277
        # set with path, get with uuid and mdid
 
1278
        self.fsm.set_by_path(path, foo="foo3")
 
1279
        self.assertEqual(self.fsm.get_by_node_id("share", "uuid").foo, "foo3")
 
1280
        self.assertEqual(self.fsm.get_by_mdid(mdid).foo, "foo3")
 
1281
 
 
1282
    def test_setters_raises(self):
 
1283
        """Test that setters raise ok."""
 
1284
        # create some data
 
1285
        path = os.path.join(self.share.path, 'path')
 
1286
        self.fsm.create(path, "share")
 
1287
        self.fsm.set_node_id(path, "uuid")
 
1288
 
 
1289
        # test with bad values
 
1290
        self.assertRaises(KeyError, self.fsm.get_by_node_id, "share", "bad")
 
1291
        self.assertRaises(KeyError, self.fsm.get_by_mdid, "bad-value")
 
1292
        self.assertRaises(KeyError, self.fsm.get_by_path, "bad-value")
 
1293
 
 
1294
    def test_setting_forbidden_values(self):
 
1295
        """Test trying to set forbidden values."""
 
1296
        # create some data
 
1297
        path = os.path.join(self.share.path, 'path')
 
1298
        mdid = self.fsm.create(path, "share")
 
1299
        self.fsm.set_node_id(path, "uuid")
 
1300
 
 
1301
        # test with forbidden uuid
 
1302
        self.assertRaises(ValueError, self.fsm.set_by_mdid, mdid, node_id="-")
 
1303
        self.assertRaises(ValueError, self.fsm.set_by_path, path, node_id="")
 
1304
 
 
1305
        # test with forbidden path
 
1306
        self.assertRaises(ValueError, self.fsm.set_by_node_id, "uuid", "share",
 
1307
                          path="-")
 
1308
        self.assertRaises(ValueError, self.fsm.set_by_mdid, mdid, path="-")
 
1309
 
 
1310
        # test with forbidden info
 
1311
        self.assertRaises(ValueError, self.fsm.set_by_node_id, "uuid", "share",
 
1312
                          info="-")
 
1313
        self.assertRaises(ValueError, self.fsm.set_by_mdid, mdid, info="-")
 
1314
        self.assertRaises(ValueError, self.fsm.set_by_path, path, info="-")
 
1315
 
 
1316
        # test with forbidden share
 
1317
        self.assertRaises(ValueError, self.fsm.set_by_mdid, mdid, share_id="-")
 
1318
        self.assertRaises(ValueError, self.fsm.set_by_path, path, share_id="-")
 
1319
 
 
1320
        # test with forbidden mdid
 
1321
        self.assertRaises(ValueError, self.fsm.set_by_node_id, "uuid", "share",
 
1322
                          mdid="-")
 
1323
        self.assertRaises(ValueError, self.fsm.set_by_path, path, mdid="-")
 
1324
 
 
1325
        # test with forbidden is_dir
 
1326
        self.assertRaises(ValueError, self.fsm.set_by_mdid, mdid, is_dir="-")
 
1327
        self.assertRaises(ValueError, self.fsm.set_by_path, path, is_dir="-")
 
1328
        self.assertRaises(ValueError,
 
1329
                          self.fsm.set_by_node_id, "uuid", "share", is_dir="-")
 
1330
 
 
1331
    def test_setting_forbidden_mixed(self):
 
1332
        """Test that when trying with forbidden, nothing happens at all."""
 
1333
        # create some data
 
1334
        path = os.path.join(self.share.path, 'path')
 
1335
        mdid = self.fsm.create(path, "share")
 
1336
        self.fsm.set_node_id(path, "uuid")
 
1337
 
 
1338
        # test with mixed stuff
 
1339
        self.assertRaises(TypeError, self.fsm.set_by_node_id, info="n", foo="")
 
1340
        self.assertRaises(TypeError, self.fsm.set_by_mdid, path="nop", bar="?")
 
1341
        self.assertRaises(TypeError, self.fsm.set_by_path, node_id="n", baz="")
 
1342
 
 
1343
        # see that it still is unchanged
 
1344
        mdobj = self.fsm.get_by_mdid(mdid)
 
1345
        self.assertEqual(mdobj.path, "path")
 
1346
        self.assertEqual(mdobj.node_id, "uuid")
 
1347
 
 
1348
    def test_get_root(self):
 
1349
        """ Test that the root of a share is stored properly. """
 
1350
        # write some data
 
1351
        self.fsm.create(self.share_path, "share")
 
1352
        self.fsm.set_node_id(self.share_path, "uuid")
 
1353
 
 
1354
        # with path, ok and bad
 
1355
        self.fsm.get_by_path(self.share_path)
 
1356
        self.assertRaises(KeyError, self.fsm.get_by_path, "no-such-key")
 
1357
 
 
1358
        # check the path stored in the mdobj
 
1359
        mdobj = self.fsm.get_by_node_id("share", "uuid")
 
1360
        self.assertEquals(self.share_path, mdobj.path)
 
1361
 
 
1362
    @defer.inlineCallbacks
 
1363
    def test_get_all_by_share(self):
 
1364
        """ Test that it returns all the mdids in a share. """
 
1365
        # create the shares
 
1366
        share1 = yield self.create_share('share_id1', u'share_name1',
 
1367
                                         access_level='View')
 
1368
        share2 = yield self.create_share('share_id2', u'share_name2',
 
1369
                                         access_level='View')
 
1370
        self.fsm.create(share1.path, "share_id1", is_dir=True)
 
1371
        self.fsm.set_node_id(share1.path, "uuid1")
 
1372
        self.fsm.create(share2.path, "share_id2", is_dir=True)
 
1373
        self.fsm.set_node_id(share2.path, "uuid2")
 
1374
 
 
1375
        # create some nodes in share 1
 
1376
        path3 = os.path.join(share1.path, "a")
 
1377
        mdid3 = self.fsm.create(path3, "share_id1", is_dir=True)
 
1378
        self.fsm.set_node_id(path3, "uuid3")
 
1379
        path4 = os.path.join(share1.path, "a", "b")
 
1380
        mdid4 = self.fsm.create(path4, "share_id1")
 
1381
        self.fsm.set_node_id(path4, "uuid4")
 
1382
        path5 = os.path.join(share1.path, "c")
 
1383
        mdid5 = self.fsm.create(path5, "share_id1")
 
1384
        self.fsm.set_node_id(path5, "uuid5")
 
1385
 
 
1386
        # create some nodes in share 2
 
1387
        path6 = os.path.join(share2.path, "a")
 
1388
        mdid6 = self.fsm.create(path6, "share_id2", is_dir=True)
 
1389
        self.fsm.set_node_id(path6, "uuid6")
 
1390
        path7 = os.path.join(share2.path, "c")
 
1391
        mdid7 = self.fsm.create(path7, "share_id2")
 
1392
        self.fsm.set_node_id(path7, "uuid7")
 
1393
 
 
1394
        # tricky: node without node_id yet
 
1395
        path8 = os.path.join(share2.path, "d")
 
1396
        mdid8 = self.fsm.create(path8, "share_id2")
 
1397
 
 
1398
        # get them
 
1399
        all_data = set()
 
1400
        for mdobj in self.fsm.get_mdobjs_by_share_id("share_id1"):
 
1401
            all_data.add(mdobj.mdid)
 
1402
        self.assertTrue(mdid3 in all_data)
 
1403
        self.assertTrue(mdid4 in all_data)
 
1404
        self.assertTrue(mdid5 in all_data)
 
1405
        self.assertTrue(mdid6 not in all_data)
 
1406
        self.assertTrue(mdid7 not in all_data)
 
1407
        self.assertTrue(mdid8 not in all_data)
 
1408
 
 
1409
        all_data = set()
 
1410
        for mdobj in self.fsm.get_mdobjs_by_share_id("share_id2"):
 
1411
            all_data.add(mdobj.mdid)
 
1412
        self.assertTrue(mdid3 not in all_data)
 
1413
        self.assertTrue(mdid4 not in all_data)
 
1414
        self.assertTrue(mdid5 not in all_data)
 
1415
        self.assertTrue(mdid6 in all_data)
 
1416
        self.assertTrue(mdid7 in all_data)
 
1417
        self.assertTrue(mdid8 in all_data)
 
1418
 
 
1419
        all_data = set()
 
1420
        patha = os.path.join(share1.path, 'a')
 
1421
        for mdobj in self.fsm.get_mdobjs_by_share_id("share_id1", patha):
 
1422
            all_data.add(mdobj.mdid)
 
1423
        self.assertTrue(mdid3 in all_data)
 
1424
        self.assertTrue(mdid4 in all_data)
 
1425
        self.assertTrue(mdid5 not in all_data)
 
1426
        self.assertTrue(mdid6 not in all_data)
 
1427
        self.assertTrue(mdid7 not in all_data)
 
1428
        self.assertTrue(mdid8 not in all_data)
 
1429
 
 
1430
    @defer.inlineCallbacks
 
1431
    def test_get_all_by_share_mixed(self):
 
1432
        """Test that it returns all the mdids in a share with mixed nodes."""
 
1433
        # create the shares
 
1434
        share = yield self.create_share('share_id', u'sharetest',
 
1435
                                        access_level='View')
 
1436
        self.fsm.create(share.path, "share_id", is_dir=True)
 
1437
        self.fsm.set_node_id(share.path, "uuid")
 
1438
 
 
1439
        # create one real node...
 
1440
        path1 = os.path.join(share.path, "a")
 
1441
        mdid1 = self.fsm.create(path1, "share_id", is_dir=True)
 
1442
        self.fsm.set_node_id(path1, "uuid1")
 
1443
 
 
1444
        # ...and two without node_id's
 
1445
        path2 = os.path.join(share.path, "b")
 
1446
        mdid2 = self.fsm.create(path2, "share_id")
 
1447
        path3 = os.path.join(share.path, "c")
 
1448
        mdid3 = self.fsm.create(path3, "share_id")
 
1449
 
 
1450
        # get them
 
1451
        all_data = set()
 
1452
        for mdobj in self.fsm.get_mdobjs_by_share_id("share_id"):
 
1453
            all_data.add(mdobj.mdid)
 
1454
        self.assertTrue(mdid1 in all_data)
 
1455
        self.assertTrue(mdid2 in all_data)
 
1456
        self.assertTrue(mdid3 in all_data)
 
1457
 
 
1458
    def test_internal_set_node_id(self):
 
1459
        """Test _set_node_id"""
 
1460
        path = os.path.join(self.share.path, 'path')
 
1461
        mdid = self.fsm.create(path, "share")
 
1462
        mdobj = self.fsm.fs[mdid]
 
1463
        # yes, it's a unit test, I access protected members.
 
1464
        # pylint: disable-msg=W0212
 
1465
        self.fsm._set_node_id(mdobj, "uuid", path)
 
1466
 
 
1467
        self.assertEquals('uuid', mdobj['node_id'])
 
1468
        self.fsm.set_node_id(path, "uuid")
 
1469
        new_mdobj = self.fsm.get_by_node_id('share', 'uuid')
 
1470
        for k, v in mdobj.items():
 
1471
            if k == 'info':
 
1472
                for k1, v1 in v.items():
 
1473
                    self.assertEquals(int(v1),
 
1474
                                      int(getattr(new_mdobj.info, k1)))
 
1475
            else:
 
1476
                self.assertEquals(v, getattr(new_mdobj, k))
 
1477
 
 
1478
        # test using bad uuid
 
1479
        mdobj = self.fsm.fs[mdid]
 
1480
        self.assertEquals('uuid', mdobj['node_id'])
 
1481
        self.assertRaises(ValueError,
 
1482
                          self.fsm._set_node_id, mdobj, 'bad-uuid', path)
 
1483
 
 
1484
 
 
1485
class GetMDObjectsInDirTests(FSMTestCase):
 
1486
    """Test the get_mdobjs_in_dir method."""
 
1487
 
 
1488
    def create_some_contents(self, share):
 
1489
        a = 'a'
 
1490
        ab = os.path.join(a, 'b')
 
1491
        ab1 = os.path.join(a, 'b1')
 
1492
        ab2 = os.path.join(a, 'b2')
 
1493
        ac = os.path.join(a, 'c')
 
1494
        acd = os.path.join(ac, 'd')
 
1495
 
 
1496
        dirs = [a, ab, ab1, ab2, ac, acd]
 
1497
        for d in dirs:
 
1498
            self.create_node(d, is_dir=True, share=share)
 
1499
 
 
1500
        x = os.path.join(a, 'x.txt')
 
1501
        y = os.path.join(ab, 'y.txt')
 
1502
        z = os.path.join(ac, 'z.txt')
 
1503
 
 
1504
        files = [x, y, z]
 
1505
        for f in files:
 
1506
            self.create_node(f, is_dir=False, share=share)
 
1507
 
 
1508
        self.contents[share] = sorted(dirs + files)
 
1509
 
 
1510
    @defer.inlineCallbacks
 
1511
    def setUp(self):
 
1512
        """Init."""
 
1513
        yield super(GetMDObjectsInDirTests, self).setUp()
 
1514
        self.contents = {}
 
1515
        self.create_some_contents(self.share)
 
1516
 
 
1517
    def test_basic(self):
 
1518
        """Test basic retrieval."""
 
1519
        expected = ['a']
 
1520
        actual = sorted([d.path for d in
 
1521
                         self.fsm.get_mdobjs_in_dir(self.share.path)])
 
1522
        self.assertEqual(expected, actual)
 
1523
 
 
1524
    def test_no_tree(self):
 
1525
        """Test just receiving the dir and not the tree."""
 
1526
        expected = [os.path.join('a', 'b'),
 
1527
                    os.path.join('a', 'b1'),
 
1528
                    os.path.join( 'a', 'b2'),
 
1529
                    os.path.join( 'a', 'c'),
 
1530
                    os.path.join( 'a', 'x.txt')]
 
1531
        actual = sorted([d.path for d in self.fsm.get_mdobjs_in_dir(
 
1532
                                    os.path.join(self.share.path, 'a'))])
 
1533
        self.assertEqual(expected, actual)
 
1534
 
 
1535
    def test_similar_paths(self):
 
1536
        """Test having similar paths (a/b, a/b1, a/b2)."""
 
1537
        expected = [os.path.join('a', 'b', 'y.txt')]
 
1538
        actual = sorted([d.path for d in self.fsm.get_mdobjs_in_dir(
 
1539
                                    os.path.join(self.share.path, 'a', 'b'))])
 
1540
        self.assertEqual(expected, actual)
 
1541
 
 
1542
    @defer.inlineCallbacks
 
1543
    def test_with_two_shares(self):
 
1544
        """Test having 2 shares."""
 
1545
        second_share = yield self.create_share('second_share', u'the_second')
 
1546
        self.create_some_contents(second_share)
 
1547
 
 
1548
        expected = ['a']
 
1549
        actual = sorted([d.path for d in
 
1550
                         self.fsm.get_mdobjs_in_dir(second_share.path)])
 
1551
        self.assertEqual(expected, actual)
 
1552
 
 
1553
    @defer.inlineCallbacks
 
1554
    def test_both_shares(self):
 
1555
        """Test having 2 shares and asking for mdobjs in shares_dir."""
 
1556
        second_share = yield self.create_share('second_share', u'the_second')
 
1557
        self.create_some_contents(second_share)
 
1558
 
 
1559
        expected = []
 
1560
        actual = sorted([d.path for d in
 
1561
                         self.fsm.get_mdobjs_in_dir(self.shares_dir)])
 
1562
        self.assertEqual(expected, actual)
 
1563
 
 
1564
 
 
1565
class StatTests(FSMTestCase):
 
1566
    """Test all the behaviour regarding the stats."""
 
1567
 
 
1568
    def test_create_nofile(self):
 
1569
        """Test creation when there's no file."""
 
1570
        mdobj = self.create_node("foo")
 
1571
        self.assertEqual(mdobj.stat, None)
 
1572
 
 
1573
    def test_create_file(self):
 
1574
        """Test creation when there's a file."""
 
1575
        # file
 
1576
        path = os.path.join(self.share.path, "thisfile")
 
1577
        open_file(path, "w").close()
 
1578
        mdobj = self.create_node("thisfile")
 
1579
        self.assertEqual(mdobj.stat, stat_path(path))
 
1580
 
 
1581
        # dir
 
1582
        path = os.path.join(self.share.path, "thisdir")
 
1583
        make_dir(path)
 
1584
        mdobj = self.create_node("thisdir")
 
1585
        self.assertEqual(mdobj.stat, stat_path(path))
 
1586
 
 
1587
    def test_commit_partial(self):
 
1588
        """Test that it's updated in the commit."""
 
1589
        path = os.path.join(self.share.path, "thisfile")
 
1590
        open_file(path, "w").close()
 
1591
        mdobj = self.create_node("thisfile")
 
1592
        mdid = mdobj.mdid
 
1593
        oldstat = stat_path(path)
 
1594
        self.assertEqual(mdobj.stat, oldstat)
 
1595
 
 
1596
        # create a partial
 
1597
        self.fsm.create_partial(mdobj.node_id, mdobj.share_id)
 
1598
        fh = self.fsm.get_partial_for_writing(mdobj.node_id, mdobj.share_id)
 
1599
        fh.write("foobar")
 
1600
        fh.close()
 
1601
        mdobj = self.fsm.get_by_mdid(mdid)
 
1602
        self.assertEqual(mdobj.stat, oldstat)
 
1603
 
 
1604
        # commit the partial
 
1605
        self.fsm.commit_partial(mdobj.node_id, mdobj.share_id, "localhash")
 
1606
        mdobj = self.fsm.get_by_mdid(mdid)
 
1607
        self.assertEqual(mdobj.stat, stat_path(path))
 
1608
 
 
1609
    def test_move(self):
 
1610
        """Test that move refreshes stat."""
 
1611
        path1 = os.path.join(self.share.path, "thisfile1")
 
1612
        path2 = os.path.join(self.share.path, "thisfile2")
 
1613
        open_file(path1, "w").close()
 
1614
        mdobj = self.create_node(path1)
 
1615
        self.assertEqual(mdobj.stat, stat_path(path1))
 
1616
 
 
1617
        # move
 
1618
        self.fsm.move_file("share", path1, path2)
 
1619
 
 
1620
        # check
 
1621
        mdobj = self.fsm.get_by_path(path2)
 
1622
        self.assertEqual(mdobj.stat, stat_path(path2))
 
1623
 
 
1624
    def test_move_overwriting(self):
 
1625
        """Test that move refreshes stat when overwrites other file."""
 
1626
        self.fsm.create(self.share_path, self.share.id, is_dir=True)
 
1627
        path1 = os.path.join(self.share.path, "thisfile1")
 
1628
        path2 = os.path.join(self.share.path, "thisfile2")
 
1629
        open_file(path1, "w").close()
 
1630
        open_file(path2, "w").close()
 
1631
        mdobj1 = self.create_node(path1)
 
1632
        mdobj2 = self.create_node(path2)
 
1633
        self.assertEqual(mdobj1.stat, stat_path(path1))
 
1634
        self.assertEqual(mdobj2.stat, stat_path(path2))
 
1635
 
 
1636
        # move
 
1637
        self.fsm.move_file("share", path1, path2)
 
1638
 
 
1639
        # check
 
1640
        self.assertRaises(KeyError, self.fsm.get_by_path, path1)
 
1641
        mdobj2 = self.fsm.get_by_path(path2)
 
1642
        self.assertEqual(mdobj2.stat, stat_path(path2))
 
1643
 
 
1644
    def test_set_stat_by_mdid(self):
 
1645
        """Test that update_stat works."""
 
1646
        path = os.path.join(self.share.path, "thisfile")
 
1647
        open_file(path, "w").close()
 
1648
        mdobj = self.create_node("thisfile")
 
1649
        mdid = mdobj.mdid
 
1650
        oldstat = stat_path(path)
 
1651
        self.assertEqual(mdobj.stat, oldstat)
 
1652
 
 
1653
        # touch the file, it's not automagically updated
 
1654
        open_file(path, "w").close()
 
1655
        mdobj = self.fsm.get_by_mdid(mdid)
 
1656
        self.assertEqual(mdobj.stat, oldstat)
 
1657
 
 
1658
        # it's updated when asked, even if it's an old stat
 
1659
        self.fsm.set_by_mdid(mdid, stat=oldstat)
 
1660
        mdobj = self.fsm.get_by_mdid(mdid)
 
1661
        self.assertEqual(mdobj.stat, oldstat)
 
1662
 
 
1663
 
 
1664
class PartialTests(FSMTestCase):
 
1665
    """Test all the .partial nitty gritty."""
 
1666
 
 
1667
    def test_create_file(self):
 
1668
        """Test create .partial for a file."""
 
1669
        testfile = os.path.join(self.share_path, "path")
 
1670
        mdid = self.fsm.create(testfile, "share")
 
1671
        partial_path = os.path.join(self.fsm.partials_dir,
 
1672
                            mdid + ".u1partial." + os.path.basename(testfile))
 
1673
        self.fsm.set_node_id(testfile, "uuid")
 
1674
 
 
1675
        # create partial ok
 
1676
        self.fsm.create_partial("uuid", "share")
 
1677
        self.assertTrue(self.fsm.get_by_mdid(mdid).info.is_partial)
 
1678
        self.assertTrue(path_exists(partial_path))
 
1679
        mdobj = self.fsm.get_by_mdid(mdid)
 
1680
        when = mdobj.info.last_partial_created
 
1681
        now = time.time()
 
1682
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
1683
 
 
1684
        # invalid uuid
 
1685
        self.assertRaises(KeyError, self.fsm.create_partial, "foo", "share")
 
1686
        self.assertRaises(ValueError, self.fsm.create_partial, None, "share")
 
1687
 
 
1688
        # already has a partial!
 
1689
        self.assertRaises(ValueError, self.fsm.create_partial, "uuid", "share")
 
1690
 
 
1691
    def test_commit_file(self):
 
1692
        """Test commit the .partial for a file, after a successful download."""
 
1693
        testfile = os.path.join(self.share_path, "path")
 
1694
        mdid = self.fsm.create(testfile, "share")
 
1695
        partial_path = os.path.join(self.fsm.partials_dir,
 
1696
                            mdid + ".u1partial." + os.path.basename(testfile))
 
1697
        self.fsm.set_node_id(testfile, "uuid")
 
1698
 
 
1699
        # create partial
 
1700
        self.fsm.create_partial("uuid", "share")
 
1701
        self.assertTrue(self.fsm.get_by_mdid(mdid).info.is_partial)
 
1702
        with open_file(partial_path, "w") as fh:
 
1703
            fh.write("test info!")
 
1704
 
 
1705
        # commit partial, and check that the file is moved, and metadata is ok
 
1706
        self.fsm.commit_partial("uuid", "share", local_hash=9876)
 
1707
        self.assertFalse(path_exists(partial_path))
 
1708
        with open_file(testfile) as fh:
 
1709
            in_file = fh.read()
 
1710
        self.assertEqual(in_file, "test info!")
 
1711
        mdobj = self.fsm.get_by_mdid(mdid)
 
1712
        self.assertFalse(mdobj.info.is_partial)
 
1713
        self.assertEqual(mdobj.local_hash, 9876)
 
1714
        when = mdobj.info.last_downloaded
 
1715
        now = time.time()
 
1716
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
1717
 
 
1718
        # invalid uuid
 
1719
        self.assertRaises(KeyError, self.fsm.commit_partial,
 
1720
                          "foo", "share", 123)
 
1721
        self.assertRaises(ValueError, self.fsm.commit_partial,
 
1722
                          None, "share", 123)
 
1723
        # it has no partial!
 
1724
        self.assertRaises(ValueError, self.fsm.commit_partial,
 
1725
                          "uuid", "share", 1)
 
1726
 
 
1727
    def test_remove_file(self):
 
1728
        """Test removing the .partial for a file, because a bad download."""
 
1729
        testfile = os.path.join(self.share_path, "path")
 
1730
        mdid = self.fsm.create(testfile, "share")
 
1731
        partial_path = os.path.join(self.fsm.partials_dir,
 
1732
                            mdid + ".u1partial." + os.path.basename(testfile))
 
1733
        self.fsm.set_node_id(testfile, "uuid")
 
1734
 
 
1735
        # create partial
 
1736
        self.fsm.create_partial("uuid", "share")
 
1737
        self.assertTrue(self.fsm.get_by_mdid(mdid).info.is_partial)
 
1738
        with open_file(partial_path, "w") as fh:
 
1739
            fh.write("test info!")
 
1740
        with open_file(testfile, "w") as fh:
 
1741
            fh.write("previous stuff!")
 
1742
 
 
1743
        # remove partial, and check that the file is gone, and metadata is ok
 
1744
        self.fsm.remove_partial("uuid", "share")
 
1745
        self.assertFalse(path_exists(partial_path))
 
1746
        with open_file(testfile) as fh:
 
1747
            in_file = fh.read()
 
1748
        self.assertEqual(in_file, "previous stuff!")
 
1749
        mdobj = self.fsm.get_by_mdid(mdid)
 
1750
        self.assertFalse(mdobj.info.is_partial)
 
1751
        when = mdobj.info.last_partial_removed
 
1752
        now = time.time()
 
1753
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
1754
 
 
1755
        # invalid uuid
 
1756
        self.assertRaises(KeyError, self.fsm.remove_partial, "foo", "share")
 
1757
        self.assertRaises(ValueError, self.fsm.remove_partial, None, "share")
 
1758
 
 
1759
        # it has no partial!
 
1760
        self.fsm.remove_partial("uuid", "share")
 
1761
 
 
1762
    def test_create_dir_previous(self):
 
1763
        """Test create .partial for a dir when the dir existed."""
 
1764
        testdir = os.path.join(self.share_path, "path")
 
1765
        mdid = self.fsm.create(testdir, "share", is_dir=True)
 
1766
        self.fsm.set_node_id(testdir, "uuid")
 
1767
        make_dir(testdir)
 
1768
 
 
1769
        # create partial ok
 
1770
        self.fsm.create_partial("uuid", "share")
 
1771
        self.assertTrue(self.fsm.get_by_mdid(mdid).info.is_partial)
 
1772
        partial_path = os.path.join(self.fsm.partials_dir,
 
1773
                            mdid + ".u1partial." + os.path.basename(testdir))
 
1774
        self.assertTrue(path_exists(partial_path))
 
1775
        mdobj = self.fsm.get_by_mdid(mdid)
 
1776
        when = mdobj.info.last_partial_created
 
1777
        now = time.time()
 
1778
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
1779
 
 
1780
        # invalid uuid
 
1781
        self.assertRaises(KeyError, self.fsm.create_partial, "foo", "share")
 
1782
 
 
1783
        # already has a partial!
 
1784
        self.assertRaises(ValueError, self.fsm.create_partial, "uuid", "share")
 
1785
 
 
1786
    def test_create_dir_notprevious(self):
 
1787
        """Test create .partial for a dir when the dir didn't exist."""
 
1788
        testdir = os.path.join(self.share_path, "path")
 
1789
        mdid = self.fsm.create(testdir, "share", is_dir=True)
 
1790
        self.fsm.set_node_id(testdir, "uuid")
 
1791
 
 
1792
        # create partial ok
 
1793
        self.fsm.create_partial("uuid", "share")
 
1794
        self.assertTrue(self.fsm.get_by_mdid(mdid).info.is_partial)
 
1795
        self.assertTrue(path_exists(testdir))
 
1796
        partial_path = os.path.join(self.fsm.partials_dir,
 
1797
                            mdid + ".u1partial." + os.path.basename(testdir))
 
1798
        self.assertTrue(path_exists(partial_path))
 
1799
        mdobj = self.fsm.get_by_mdid(mdid)
 
1800
        when = mdobj.info.last_partial_created
 
1801
        now = time.time()
 
1802
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
1803
 
 
1804
        # invalid uuid
 
1805
        self.assertRaises(KeyError, self.fsm.create_partial, "foo", "share")
 
1806
 
 
1807
        # already has a partial!
 
1808
        self.assertRaises(ValueError, self.fsm.create_partial, "uuid", "share")
 
1809
 
 
1810
    def test_commit_dir(self):
 
1811
        """Test commit the .partial for a dir, after a successful download."""
 
1812
        testdir = os.path.join(self.share_path, "path")
 
1813
        mdid = self.fsm.create(testdir, "share", is_dir=True)
 
1814
        self.fsm.set_node_id(testdir, "uuid")
 
1815
 
 
1816
        # create partial
 
1817
        self.fsm.create_partial("uuid", "share")
 
1818
        self.assertTrue(self.fsm.get_by_mdid(mdid).info.is_partial)
 
1819
 
 
1820
        # commit is forbidden for directories
 
1821
        self.assertRaises(ValueError, self.fsm.commit_partial, "uuid", "share",
 
1822
                          local_hash=9876)
 
1823
 
 
1824
    def test_remove_dir(self):
 
1825
        """Test removing the .partial for a dir, because a bad download."""
 
1826
        testdir = os.path.join(self.share_path, "path")
 
1827
        mdid = self.fsm.create(testdir, "share", is_dir=True)
 
1828
        self.fsm.set_node_id(testdir, "uuid")
 
1829
 
 
1830
        # create partial
 
1831
        self.fsm.create_partial("uuid", "share")
 
1832
        self.assertTrue(self.fsm.get_by_mdid(mdid).info.is_partial)
 
1833
 
 
1834
        # remove partial, and check that the file is gone, and metadata is ok
 
1835
        self.fsm.remove_partial("uuid", "share")
 
1836
        partial_path = os.path.join(self.fsm.partials_dir,
 
1837
                            mdid + ".u1partial." + os.path.basename(testdir))
 
1838
        self.assertFalse(path_exists(partial_path))
 
1839
        mdobj = self.fsm.get_by_mdid(mdid)
 
1840
        self.assertFalse(mdobj.info.is_partial)
 
1841
        when = mdobj.info.last_partial_removed
 
1842
        now = time.time()
 
1843
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
1844
 
 
1845
        # invalid uuid
 
1846
        self.assertRaises(KeyError, self.fsm.remove_partial, "foo", "share")
 
1847
 
 
1848
        # it has no partial!
 
1849
        self.fsm.remove_partial("uuid", "share")
 
1850
 
 
1851
    @defer.inlineCallbacks
 
1852
    def test_ro_share(self):
 
1853
        """Test creating a partial of a RO share.
 
1854
 
 
1855
        It should leave the partials dir permissions intact.
 
1856
        """
 
1857
        share = yield self.create_share('ro_share', u'ro_share_name',
 
1858
                                        access_level='View')
 
1859
        testdir = os.path.join(share.path, "path")
 
1860
        mdid = self.fsm.create(testdir, share.volume_id, is_dir=False)
 
1861
        self.fsm.set_node_id(testdir, "uuid")
 
1862
        # create partial
 
1863
        self.fsm.create_partial("uuid", share.volume_id)
 
1864
        self.assertTrue(self.fsm.get_by_mdid(mdid).info.is_partial)
 
1865
        # commit the partial
 
1866
        self.fsm.commit_partial('uuid', share.volume_id, '')
 
1867
        # create a new partial, this time for a rw share
 
1868
        testdir = os.path.join(self.share.path, "path")
 
1869
        mdid = self.fsm.create(testdir, "share", is_dir=True)
 
1870
        self.fsm.set_node_id(testdir, "uuid1")
 
1871
        self.fsm.create_partial("uuid1", "share")
 
1872
        self.assertTrue(self.fsm.get_by_mdid(mdid).info.is_partial)
 
1873
 
 
1874
    def test_disk_problem(self):
 
1875
        """On any disk problem, the node should be left in partial state."""
 
1876
        testfile = os.path.join(self.share_path, "path")
 
1877
        mdid = self.fsm.create(testfile, "share")
 
1878
        self.fsm.set_node_id(testfile, "uuid")
 
1879
 
 
1880
        # ugly problem not handled
 
1881
        remove_dir(self.partials_dir)
 
1882
        try:
 
1883
            self.fsm.create_partial("uuid", "share")
 
1884
        except IOError, e:
 
1885
            if e.errno == errno.ENOENT:
 
1886
                # expected
 
1887
                pass
 
1888
        else:
 
1889
            raise
 
1890
 
 
1891
        # the node should still be in partial internally
 
1892
        # for LR to handle it ok
 
1893
        mdobj = self.fsm.get_by_mdid(mdid)
 
1894
        self.assertTrue(mdobj.info.is_partial)
 
1895
 
 
1896
    def test_filename_too_long(self):
 
1897
        """Handle the filename being too long."""
 
1898
        # find a almost too long file
 
1899
        repeat = 300
 
1900
        while True:
 
1901
            testfile = os.path.join(self.share_path, "x"*repeat)
 
1902
            try:
 
1903
                fh = open_file(testfile, 'w')
 
1904
            except IOError, e:
 
1905
                # linux will give you "too long", windows will say "invalid"
 
1906
                if e.errno in (errno.ENAMETOOLONG, errno.EINVAL):
 
1907
                    repeat -= 10
 
1908
            else:
 
1909
                fh.close()
 
1910
                remove_file(testfile)
 
1911
                break
 
1912
        mdid = self.fsm.create(testfile, "share")
 
1913
        self.fsm.set_node_id(testfile, "uuid")
 
1914
 
 
1915
        # create partial ok, even knowing that "full partial path" won't fit
 
1916
        self.fsm.create_partial("uuid", "share")
 
1917
        self.assertTrue(self.fsm.get_by_mdid(mdid).info.is_partial)
 
1918
 
 
1919
        # check the path
 
1920
        partial_path = self.fsm._get_partial_path(self.fsm.fs[mdid])
 
1921
        self.assertTrue(path_exists(partial_path))
 
1922
 
 
1923
    def test_get_partial_path_notrim(self):
 
1924
        """Create the partial path."""
 
1925
        testfile = os.path.join(self.share_path, "foo")
 
1926
        mdid = self.fsm.create(testfile, "share")
 
1927
        partial_path = self.fsm._get_partial_path(self.fsm.fs[mdid])
 
1928
        partial_name = os.path.basename(partial_path)
 
1929
        self.assertEqual(partial_name, mdid + ".u1partial.foo")
 
1930
 
 
1931
    def test_get_partial_path_trim1(self):
 
1932
        """Create the partial path trimmed some chars."""
 
1933
        longname = "longnamethatistoolong"
 
1934
        testfile = os.path.join(self.share_path, longname)
 
1935
        mdid = self.fsm.create(testfile, "share")
 
1936
        partial_path = self.fsm._get_partial_path(self.fsm.fs[mdid], trim=1)
 
1937
        partial_name = os.path.basename(partial_path)
 
1938
        self.assertEqual(partial_name, mdid + ".u1partial." + longname[:-10])
 
1939
 
 
1940
    def test_get_partial_path_trim2(self):
 
1941
        """Create the partial path trimmed more chars."""
 
1942
        longname = "longnamethatistoolong"
 
1943
        testfile = os.path.join(self.share_path, longname)
 
1944
        mdid = self.fsm.create(testfile, "share")
 
1945
        partial_path = self.fsm._get_partial_path(self.fsm.fs[mdid], trim=2)
 
1946
        partial_name = os.path.basename(partial_path)
 
1947
        self.assertEqual(partial_name, mdid + ".u1partial." + longname[:-20])
 
1948
 
 
1949
    def test_get_partial_path_dontcache_when_notrim(self):
 
1950
        """Normal behaviour, no partial path cached."""
 
1951
        testfile = os.path.join(self.share_path, "longnamethatistoolong")
 
1952
        mdid = self.fsm.create(testfile, "share")
 
1953
        self.fsm._get_partial_path(self.fsm.fs[mdid])
 
1954
 
 
1955
        # check
 
1956
        mdobj = self.fsm.get_by_mdid(mdid)
 
1957
        self.assertFalse(hasattr(mdobj.info, 'partial_path'))
 
1958
 
 
1959
    def test_get_partial_path_caches_when_trim(self):
 
1960
        """If trimming is necessary, it will cache the name."""
 
1961
        testfile = os.path.join(self.share_path, "longnamethatistoolong")
 
1962
        mdid = self.fsm.create(testfile, "share")
 
1963
        mdobj = self.fsm.fs[mdid]
 
1964
        partial_path = self.fsm._get_partial_path(mdobj, trim=1)
 
1965
 
 
1966
        # check
 
1967
        self.assertEqual(mdobj['info']['partial_path'], partial_path)
 
1968
 
 
1969
    def test_get_partial_path_cached_normal(self):
 
1970
        """Return the cached partial path if there."""
 
1971
        testfile = os.path.join(self.share_path, "foo")
 
1972
        mdid = self.fsm.create(testfile, "share")
 
1973
 
 
1974
        # fake the cache
 
1975
        mdobj = self.fsm.fs[mdid]
 
1976
        mdobj['info']['partial_path'] = "bar"
 
1977
 
 
1978
        # check
 
1979
        partial_path = self.fsm._get_partial_path(mdobj)
 
1980
        partial_name = os.path.basename(partial_path)
 
1981
        self.assertEqual(partial_name, "bar")
 
1982
 
 
1983
    def test_get_partial_path_cached_trimming(self):
 
1984
        """Do not return the cached partial path if there when trimming."""
 
1985
        testfile = os.path.join(self.share_path, "foobarlongone")
 
1986
        mdid = self.fsm.create(testfile, "share")
 
1987
 
 
1988
        # fake the cache
 
1989
        mdobj = self.fsm.fs[mdid]
 
1990
        mdobj['info']['partial_path'] = "bar"
 
1991
 
 
1992
        # check
 
1993
        partial_path = self.fsm._get_partial_path(mdobj, trim=1)
 
1994
        partial_name = os.path.basename(partial_path)
 
1995
        self.assertEqual(partial_name, mdid + ".u1partial.foo")
 
1996
 
 
1997
 
 
1998
class FileHandlingTests(FSMTestCase):
 
1999
    """Test the file handling services."""
 
2000
 
 
2001
    def assert_no_metadata(self, mdid, path, share_id, node_id):
 
2002
        """The node has no metadata registered."""
 
2003
        self.assertRaises(KeyError, self.fsm.get_by_mdid, mdid)
 
2004
        self.assertRaises(KeyError, self.fsm.get_by_path, path)
 
2005
        self.assertRaises(KeyError, self.fsm.get_by_node_id, share_id, node_id)
 
2006
 
 
2007
    def test_move_to_conflict(self):
 
2008
        """Test that the conflict stuff works."""
 
2009
        testfile = os.path.join(self.share_path, "path")
 
2010
        mdid = self.fsm.create(testfile, "share")
 
2011
        self.fsm.set_node_id(testfile, "uuid")
 
2012
        with open_file(testfile, "w") as fh:
 
2013
            fh.write("test!")
 
2014
 
 
2015
        # move first time
 
2016
        self.fsm.move_to_conflict(mdid)
 
2017
        self.assertFalse(path_exists(testfile))
 
2018
        with open_file(testfile + self.fsm.CONFLICT_SUFFIX) as fh:
 
2019
            in_file = fh.read()
 
2020
        self.assertEqual(in_file, "test!")
 
2021
        mdobj = self.fsm.get_by_mdid(mdid)
 
2022
        when = mdobj.info.last_conflicted
 
2023
        now = time.time()
 
2024
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
2025
 
 
2026
        # move second time, start the .N serie
 
2027
        with open_file(testfile, "w") as fh:
 
2028
            fh.write("test 1!")
 
2029
        self.fsm.move_to_conflict(mdid)
 
2030
        self.assertFalse(path_exists(testfile))
 
2031
        with open_file(testfile + ".u1conflict.1") as fh:
 
2032
            in_file = fh.read()
 
2033
        self.assertEqual(in_file, "test 1!")
 
2034
 
 
2035
        # create a few more, test a higher one
 
2036
        open_file(testfile + ".u1conflict.2", "w").close()
 
2037
        open_file(testfile + ".u1conflict.3", "w").close()
 
2038
        open_file(testfile + ".u1conflict.4", "w").close()
 
2039
        open_file(testfile + ".u1conflict.5", "w").close()
 
2040
        with open_file(testfile, "w") as fh:
 
2041
            fh.write("test 6!")
 
2042
        self.fsm.move_to_conflict(mdid)
 
2043
        self.assertFalse(path_exists(testfile))
 
2044
        with open_file(testfile + ".u1conflict.6") as fh:
 
2045
            in_file = fh.read()
 
2046
        self.assertEqual(in_file, "test 6!")
 
2047
 
 
2048
        # invalid uuid
 
2049
        self.assertRaises(KeyError, self.fsm.move_to_conflict, "no-such-mdid")
 
2050
 
 
2051
    def test_conflict_file_pushes_event(self):
 
2052
        """A conflict with a file pushes FSM_FILE_CONFLICT."""
 
2053
        listener = Listener()
 
2054
        self.eq.subscribe(listener)
 
2055
 
 
2056
        testfile = os.path.join(self.share_path, "path")
 
2057
        mdid = self.fsm.create(testfile, "share")
 
2058
        self.fsm.set_node_id(testfile, "uuid")
 
2059
        with open_file(testfile, "w") as fh:
 
2060
            fh.write("test!")
 
2061
 
 
2062
        self.fsm.move_to_conflict(mdid)
 
2063
 
 
2064
        new_name = testfile + self.fsm.CONFLICT_SUFFIX
 
2065
        kwargs = dict(old_name=testfile, new_name=new_name)
 
2066
 
 
2067
        self.assertTrue(("FSM_FILE_CONFLICT", kwargs) in listener.events)
 
2068
 
 
2069
    def test_conflict_dir_pushes_event(self):
 
2070
        """A conflict with a dir pushes FSM_DIR_CONFLICT."""
 
2071
        listener = Listener()
 
2072
        self.eq.subscribe(listener)
 
2073
 
 
2074
        testdir = os.path.join(self.share_path, "path")
 
2075
        mdid = self.fsm.create(testdir, "share", is_dir=True)
 
2076
        self.fsm.set_node_id(testdir, "uuid")
 
2077
        make_dir(testdir)
 
2078
 
 
2079
        self.fsm.move_to_conflict(mdid)
 
2080
 
 
2081
        new_name = testdir + self.fsm.CONFLICT_SUFFIX
 
2082
        kwargs = dict(old_name=testdir, new_name=new_name)
 
2083
 
 
2084
        self.assertTrue(("FSM_DIR_CONFLICT", kwargs) in listener.events)
 
2085
 
 
2086
    def test_upload_finished(self):
 
2087
        """Test upload finished."""
 
2088
        path = os.path.join(self.share_path, "path")
 
2089
        mdid = self.fsm.create(path, "share")
 
2090
        self.fsm.set_node_id(path, "uuid")
 
2091
 
 
2092
        # finish the upload!
 
2093
        self.fsm.upload_finished(mdid, server_hash=1234567890)
 
2094
        mdobj = self.fsm.get_by_mdid(mdid)
 
2095
        self.assertEqual(mdobj.server_hash, 1234567890)
 
2096
        when = mdobj.info.last_uploaded
 
2097
        now = time.time()
 
2098
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
2099
 
 
2100
        # invalid mdid
 
2101
        self.assertRaises(KeyError, self.fsm.upload_finished,
 
2102
                          "no-such-mdid", 123)
 
2103
 
 
2104
        # bad arguments
 
2105
        self.assertRaises(TypeError, self.fsm.upload_finished, mdid)
 
2106
 
 
2107
    def test_move_file_withfile(self):
 
2108
        """Test that a file is moved from one point to the other."""
 
2109
        testfile = os.path.join(self.share_path, "path")
 
2110
        mdid = self.fsm.create(testfile, "share")
 
2111
        self.fsm.set_node_id(testfile, "uuid")
 
2112
        with open_file(testfile, "w") as fh:
 
2113
            fh.write("test!")
 
2114
 
 
2115
        # move the file
 
2116
        to_path = os.path.join(self.share_path, "path2")
 
2117
        self.fsm.move_file("share", testfile, to_path)
 
2118
        self.assertFalse(path_exists(testfile))
 
2119
        with open_file(to_path) as fh:
 
2120
            in_file = fh.read()
 
2121
        self.assertEqual(in_file, "test!")
 
2122
        mdobj = self.fsm.get_by_mdid(mdid)
 
2123
        self.assertEqual(mdobj.info.last_moved_from, testfile)
 
2124
        when = mdobj.info.last_moved_time
 
2125
        now = time.time()
 
2126
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
2127
 
 
2128
        # move again, to a directory
 
2129
        from_path = to_path
 
2130
        make_dir(os.path.join(self.share_path, "testdir"))
 
2131
        to_path = os.path.join(self.share_path, "testdir", "path3")
 
2132
        self.fsm.move_file("share", from_path, to_path)
 
2133
        self.assertFalse(path_exists(from_path))
 
2134
        with open_file(to_path) as fh:
 
2135
            in_file = fh.read()
 
2136
        self.assertEqual(in_file, "test!")
 
2137
 
 
2138
        # invalid path
 
2139
        self.assertRaises(KeyError, self.fsm.move_file,
 
2140
                          "share", "no-path", "dest")
 
2141
 
 
2142
        # other share
 
2143
        self.assertRaises(KeyError, self.fsm.move_file,
 
2144
                                            "othershare", testfile, to_path)
 
2145
 
 
2146
        # invalid args
 
2147
        self.assertRaises(TypeError, self.fsm.move_file, "one-path")
 
2148
 
 
2149
    def test_move_file_overwrite(self):
 
2150
        """Test that a file is moved over other one."""
 
2151
        self.fsm.create(self.share_path, self.share.id,
 
2152
                        self.share.node_id, is_dir=True)
 
2153
        testfile1 = os.path.join(self.share_path, "path1")
 
2154
        mdid1 = self.fsm.create(testfile1, "share")
 
2155
        self.fsm.set_node_id(testfile1, "uuid1")
 
2156
        with open_file(testfile1, "w") as fh:
 
2157
            fh.write("test 1")
 
2158
 
 
2159
        testfile2 = os.path.join(self.share_path, "path2")
 
2160
        mdid2 = self.fsm.create(testfile2, "share")
 
2161
        self.fsm.set_node_id(testfile2, "uuid2")
 
2162
        with open_file(testfile2, "w") as fh:
 
2163
            fh.write("test 2")
 
2164
 
 
2165
        # move the file
 
2166
        self.fsm.move_file("share", testfile1, testfile2)
 
2167
        self.assertFalse(path_exists(testfile1))
 
2168
        with open_file(testfile2) as fh:
 
2169
            in_file = fh.read()
 
2170
        self.assertEqual(in_file, "test 1")
 
2171
        mdobj = self.fsm.get_by_mdid(mdid1)
 
2172
        self.assertEqual(mdobj.path, "path2")
 
2173
        mdobj = self.fsm.get_by_path(testfile2)
 
2174
        self.assertEqual(mdobj.mdid, mdid1)
 
2175
 
 
2176
        # check that the info for the overwritten one is gone to trash
 
2177
        self.assert_no_metadata(mdid2, testfile1, "share", "uuid2")
 
2178
        self.assertEqual(self.fsm.trash[(self.share.id, "uuid2")],
 
2179
                         (mdid2, self.share.node_id, testfile2, False))
 
2180
 
 
2181
    def test_move_file_withdir(self):
 
2182
        """Test that a dir is moved from one point to the other."""
 
2183
        from_path = os.path.join(self.share_path, "path")
 
2184
        mdid = self.fsm.create(from_path, "share", is_dir=True)
 
2185
        self.fsm.set_node_id(from_path, "uuid")
 
2186
 
 
2187
        # move the file
 
2188
        make_dir(from_path)
 
2189
        to_path = os.path.join(self.share_path, "path2")
 
2190
        self.fsm.move_file("share", from_path, to_path)
 
2191
        self.assertFalse(path_exists(from_path))
 
2192
        self.assertTrue(path_exists(to_path))
 
2193
        mdobj = self.fsm.get_by_mdid(mdid)
 
2194
        self.assertEqual(mdobj.info.last_moved_from, from_path)
 
2195
        when = mdobj.info.last_moved_time
 
2196
        now = time.time()
 
2197
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
2198
 
 
2199
        # move again, to a directory
 
2200
        from_path = to_path
 
2201
        make_dir(os.path.join(self.share_path, "testdir"))
 
2202
        to_path = os.path.join(self.share_path, "testdir", "path3")
 
2203
        self.fsm.move_file("share", from_path, to_path)
 
2204
        self.assertFalse(path_exists(from_path))
 
2205
        self.assertTrue(path_exists(to_path))
 
2206
        mdobj = self.fsm.get_by_mdid(mdid)
 
2207
        self.assertEqual(mdobj.info.last_moved_from, from_path)
 
2208
        when = mdobj.info.last_moved_time
 
2209
        now = time.time()
 
2210
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
2211
 
 
2212
    def test_move_file_withfulldir(self):
 
2213
        """Test that a dir is moved from even having a file inside."""
 
2214
        # the containing dir
 
2215
        from_path = os.path.join(self.share_path, "path")
 
2216
        mdid = self.fsm.create(from_path, "share", is_dir=True)
 
2217
        self.fsm.set_node_id(from_path, "uuid")
 
2218
        make_dir(from_path)
 
2219
 
 
2220
        # the file outside, with a similar name just to confuse
 
2221
        otherfile = os.path.join(self.share_path, "pa")
 
2222
        self.fsm.create(otherfile, "share", is_dir=False)
 
2223
        self.fsm.set_node_id(otherfile, "otheruuid")
 
2224
        open_file(otherfile, "w").close()
 
2225
 
 
2226
        # the file inside
 
2227
        filepath = os.path.join(from_path, "file.txt")
 
2228
        fileid = self.fsm.create(filepath, "share", is_dir=False)
 
2229
        self.fsm.set_node_id(filepath, "fileuuid")
 
2230
        open_file(filepath, "w").close()
 
2231
 
 
2232
        # move the dir
 
2233
        to_path = os.path.join(self.share_path, "path2")
 
2234
        self.fsm.move_file("share", from_path, to_path)
 
2235
        self.assertFalse(path_exists(from_path))
 
2236
        self.assertTrue(path_exists(to_path))
 
2237
        mdobj = self.fsm.get_by_mdid(mdid)
 
2238
        self.assertEqual(mdobj.info.last_moved_from, from_path)
 
2239
        when = mdobj.info.last_moved_time
 
2240
        now = time.time()
 
2241
        self.assertTrue(now-3 <= when <= now) # 3 seconds test range
 
2242
 
 
2243
        # check that file inside is ok
 
2244
        newfilepath = os.path.join(to_path, "file.txt")
 
2245
        self.assertFalse(path_exists(filepath))
 
2246
        self.assertTrue(path_exists(newfilepath))
 
2247
        mdobj = self.fsm.get_by_path(newfilepath)
 
2248
        self.assertEqual(mdobj.mdid, fileid)
 
2249
        self.assertEqual(mdobj.path, os.path.join('path2', 'file.txt'))
 
2250
 
 
2251
        # check the outer file
 
2252
        self.assertTrue(path_exists(otherfile))
 
2253
        mdobj = self.fsm.get_by_path(otherfile)
 
2254
        self.assertEqual(mdobj.path, "pa")
 
2255
 
 
2256
    def _delete_file(self):
 
2257
        """Helper to test that a file is deleted."""
 
2258
        testfile = os.path.join(self.share_path, "path")
 
2259
        open_file(testfile, "w").close()
 
2260
        mdid = self.fsm.create(testfile, "share")
 
2261
        self.fsm.set_node_id(testfile, "uuid")
 
2262
 
 
2263
        # delete the file
 
2264
        self.fsm.delete_file(testfile)
 
2265
        self.assertFalse(path_exists(testfile))
 
2266
        self.assert_no_metadata(mdid, testfile, "share", "uuid")
 
2267
 
 
2268
    def test_delete_file_directly(self):
 
2269
        """Really delete the file."""
 
2270
        config.get_user_config().set_use_trash(False)
 
2271
 
 
2272
        # check it was sent to trash, not just deleted
 
2273
        called = []
 
2274
        orig_call = filesystem_manager.remove_file
 
2275
        self.patch(filesystem_manager, 'remove_file',
 
2276
                   lambda path: called.append(True) or orig_call(path))
 
2277
 
 
2278
        self._delete_file()
 
2279
        self.assertTrue(called)
 
2280
 
 
2281
    def test_delete_file_trash(self):
 
2282
        """Move the file to trash."""
 
2283
        config.get_user_config().set_use_trash(True)
 
2284
 
 
2285
        # check it was sent to trash, not just deleted
 
2286
        called = []
 
2287
        orig_call = filesystem_manager.move_to_trash
 
2288
        self.patch(filesystem_manager, 'move_to_trash',
 
2289
                   lambda path: called.append(True) or orig_call(path))
 
2290
 
 
2291
        self._delete_file()
 
2292
        self.assertTrue(called)
 
2293
 
 
2294
    def _delete_dir(self):
 
2295
        """Helper to test that an empty dir is deleted."""
 
2296
        testdir = os.path.join(self.share.path, "path")
 
2297
        make_dir(testdir)
 
2298
        mdid = self.fsm.create(testdir, "share", is_dir=True)
 
2299
        self.fsm.set_node_id(testdir, "uuid")
 
2300
 
 
2301
        # try to delete the dir, but has files on it
 
2302
        open_file(os.path.join(testdir, "foo"), "w").close()
 
2303
        self.assertEqual(self.fsm.get_by_mdid(mdid).path, "path")
 
2304
        self.assertEqual(self.fsm.get_by_path(testdir).path, "path")
 
2305
        self.assertEqual(self.fsm.get_by_node_id("share", "uuid").path, "path")
 
2306
        remove_file(os.path.join(testdir, "foo"))
 
2307
 
 
2308
        # really delete the dir
 
2309
        self.fsm.delete_file(testdir)
 
2310
 
 
2311
        self.assertFalse(path_exists(testdir))
 
2312
        self.assert_no_metadata(mdid, testdir, "share", "uuid")
 
2313
 
 
2314
    def test_delete_dir_directly(self):
 
2315
        """Really delete the dir."""
 
2316
        config.get_user_config().set_use_trash(False)
 
2317
 
 
2318
        # check it was sent to trash, not just deleted
 
2319
        called = []
 
2320
        orig_call = filesystem_manager.remove_dir
 
2321
        self.patch(filesystem_manager, 'remove_dir',
 
2322
                   lambda path: called.append(True) or orig_call(path))
 
2323
 
 
2324
        self._delete_dir()
 
2325
        self.assertTrue(called)
 
2326
 
 
2327
    def test_delete_dir_trash(self):
 
2328
        """Move the dir to trash."""
 
2329
        config.get_user_config().set_use_trash(True)
 
2330
 
 
2331
        # check it was sent to trash, not just deleted
 
2332
        called = []
 
2333
        orig_call = filesystem_manager.move_to_trash
 
2334
        self.patch(filesystem_manager, 'move_to_trash',
 
2335
                   lambda path: called.append(True) or orig_call(path))
 
2336
 
 
2337
        self._delete_dir()
 
2338
        self.assertTrue(called)
 
2339
 
 
2340
    def _delete_dir_when_non_empty_and_no_modifications(self):
 
2341
        """Helper to test that a dir is deleted, non empty but ok to clean."""
 
2342
        local_dir = os.path.join(self.root_dir, "foo")
 
2343
        make_dir(local_dir)
 
2344
        mdid = self.fsm.create(local_dir, "", is_dir=True)
 
2345
        self.fsm.set_node_id(local_dir, "uuid")
 
2346
 
 
2347
        local_file = os.path.join(local_dir, "bar.txt")
 
2348
        open_file(local_file, 'w').close() # touch bar.txt so it exists
 
2349
        mdid_file = self.fsm.create(local_file, "")
 
2350
        self.fsm.set_node_id(local_file, "uuid_file")
 
2351
 
 
2352
        assert len(listdir(local_dir)) > 0 # local_dir is not empty
 
2353
        assert not self.fsm.local_changed(path=local_dir)
 
2354
 
 
2355
        self.fsm.delete_file(local_dir)
 
2356
 
 
2357
        self.assertFalse(path_exists(local_file))
 
2358
        self.assert_no_metadata(mdid_file, local_file, "", "uuid_file")
 
2359
 
 
2360
        self.assertFalse(path_exists(local_dir))
 
2361
        self.assert_no_metadata(mdid, local_dir, "", "uuid")
 
2362
 
 
2363
    def test_delete_nonempty_cleanable_dir_directly(self):
 
2364
        """Really delete the non empty but cleanable dir."""
 
2365
        config.get_user_config().set_use_trash(False)
 
2366
 
 
2367
        # check it was sent to trash, not just deleted
 
2368
        called = []
 
2369
        orig_call = filesystem_manager.remove_tree
 
2370
        self.patch(filesystem_manager, 'remove_tree',
 
2371
                   lambda path: called.append(True) or orig_call(path))
 
2372
 
 
2373
        self._delete_dir_when_non_empty_and_no_modifications()
 
2374
        self.assertTrue(called)
 
2375
 
 
2376
    def test_delete_nonempty_cleanable_dir_trash(self):
 
2377
        """Move the non empty but cleanable dir to trash."""
 
2378
        config.get_user_config().set_use_trash(True)
 
2379
 
 
2380
        # check it was sent to trash, not just deleted
 
2381
        called = []
 
2382
        orig_call = filesystem_manager.move_to_trash
 
2383
        self.patch(filesystem_manager, 'move_to_trash',
 
2384
                   lambda path: called.append(True) or orig_call(path))
 
2385
 
 
2386
        self._delete_dir_when_non_empty_and_no_modifications()
 
2387
        self.assertTrue(called)
 
2388
 
 
2389
    def test_delete_dir_when_non_empty_and_modifications_prior_delete(self):
 
2390
        """Test that a dir is deleted, when is not empty and modified."""
 
2391
        local_dir = os.path.join(self.root_dir, "foo")
 
2392
        make_dir(local_dir)
 
2393
        self.fsm.create(local_dir, "", is_dir=True)
 
2394
        self.fsm.set_node_id(local_dir, "uuid")
 
2395
 
 
2396
        local_file = os.path.join(local_dir, "bar.txt")
 
2397
        open_file(local_file, 'w').close() # touch bar.txt so it exists
 
2398
        mdid_file = self.fsm.create(local_file, "")
 
2399
        self.fsm.set_node_id(local_file, "uuid_file")
 
2400
        self.fsm.set_by_mdid(mdid_file, local_hash=98765)
 
2401
 
 
2402
        assert len(listdir(local_dir)) > 0 # local_dir is not empty
 
2403
        assert self.fsm.changed(path=local_file) == self.fsm.CHANGED_LOCAL
 
2404
        self.assertRaises(DirectoryNotRemovable,
 
2405
                          self.fsm.delete_file, local_dir)
 
2406
 
 
2407
    def test_delete_dir_when_non_empty_and_prior_conflict_on_file(self):
 
2408
        """Test that a dir is not deleted, when there is a conflicted file."""
 
2409
        # local directory
 
2410
        local_dir = os.path.join(self.root_dir, "foo")
 
2411
        make_dir(local_dir)
 
2412
        self.fsm.create(local_dir, "", is_dir=True)
 
2413
        self.fsm.set_node_id(local_dir, "uuid")
 
2414
 
 
2415
        local_file = os.path.join(local_dir,
 
2416
                                  "bar.txt" + self.fsm.CONFLICT_SUFFIX)
 
2417
        open_file(local_file, 'w').close() # touch bar.txt.u1conflict
 
2418
 
 
2419
        assert not local_file in self.fsm._idx_path
 
2420
        self.assertRaises(DirectoryNotRemovable,
 
2421
                          self.fsm.delete_file, local_dir)
 
2422
 
 
2423
        infos = [record.message for record in self.handler.records
 
2424
                 if record.levelname == 'INFO']
 
2425
        self.assertTrue(len(infos) == 1)
 
2426
        self.assertTrue(repr(local_file) in infos[0])
 
2427
 
 
2428
    def test_delete_dir_when_non_empty_and_prior_conflict_on_subdir(self):
 
2429
        """Test that a dir is not deleted, when there is a conflicted dir."""
 
2430
        # local directory
 
2431
        local_dir = os.path.join(self.root_dir, "foo")
 
2432
        make_dir(local_dir)
 
2433
        self.fsm.create(local_dir, "", is_dir=True)
 
2434
        self.fsm.set_node_id(local_dir, "uuid")
 
2435
 
 
2436
        local_subdir = os.path.join(local_dir,
 
2437
                                    "subdir_bar" + self.fsm.CONFLICT_SUFFIX)
 
2438
        make_dir(local_subdir)
 
2439
 
 
2440
        assert not local_subdir in self.fsm._idx_path
 
2441
        self.assertRaises(DirectoryNotRemovable,
 
2442
                          self.fsm.delete_file, local_dir)
 
2443
 
 
2444
        infos = [record.message for record in self.handler.records
 
2445
                 if record.levelname == 'INFO']
 
2446
        self.assertTrue(len(infos) == 1)
 
2447
        self.assertTrue(repr(local_subdir) in infos[0])
 
2448
 
 
2449
    def test_no_warning_on_log_file_when_recursive_delete(self):
 
2450
        """Test that sucessfully deleted dir does not log OSError."""
 
2451
        local_dir = os.path.join(self.root_dir, "foo")
 
2452
        make_dir(local_dir)
 
2453
        self.fsm.create(local_dir, "", is_dir=True)
 
2454
        self.fsm.set_node_id(local_dir, "uuid")
 
2455
 
 
2456
        local_file = os.path.join(local_dir, "bar.txt")
 
2457
        open_file(local_file, 'w').close() # touch bar.txt so it exists
 
2458
        self.fsm.create(local_file, "")
 
2459
        self.fsm.set_node_id(local_file, "uuid_file")
 
2460
 
 
2461
        previous = self.handler.records
 
2462
        self.fsm.delete_file(local_dir)
 
2463
 
 
2464
        # no logs were added
 
2465
        self.assertEquals(previous, self.handler.records)
 
2466
 
 
2467
    def test_warning_on_log_file_when_failing_delete(self):
 
2468
        """Test that sucessfully deleted dir does not log OSError."""
 
2469
 
 
2470
        local_dir = os.path.join(self.root_dir, "foo")
 
2471
        self.fsm.create(local_dir, "", is_dir=True)
 
2472
        self.fsm.set_node_id(local_dir, "uuid")
 
2473
 
 
2474
        # local_dir does not exist on the file system
 
2475
        self.fsm.delete_file(local_dir)
 
2476
 
 
2477
        warnings = [record.message for record in self.handler.records
 
2478
                    if record.levelname == 'WARNING']
 
2479
        self.assertTrue(len(warnings) == 1)
 
2480
        # On linux, we get a [Errno 2], but in windows, [Error 3]
 
2481
        self.assertTrue('OSError' in warnings[0])
 
2482
        self.assertTrue(repr(local_dir) in warnings[0])
 
2483
 
 
2484
    def test_move_dir_to_conflict(self):
 
2485
        """Test that the conflict to a dir removes children metadata."""
 
2486
        tdir = os.path.join(self.share_path, "adir")
 
2487
        mdid1 = self.fsm.create(tdir, "share", is_dir=True)
 
2488
        self.fsm.set_node_id(tdir, "uuid1")
 
2489
        make_dir(tdir)
 
2490
 
 
2491
        testfile = os.path.join(tdir, "path")
 
2492
        mdid2 = self.fsm.create(testfile, "share")
 
2493
        self.fsm.set_node_id(testfile, "uuid2")
 
2494
        with open_file(testfile, "w") as fh:
 
2495
            fh.write("test!")
 
2496
 
 
2497
        # move the dir to conflict, the file is still there, but with no MD
 
2498
        self.fsm.move_to_conflict(mdid1)
 
2499
        self.assertFalse(path_exists(tdir))
 
2500
        self.assertTrue(path_exists(tdir + self.fsm.CONFLICT_SUFFIX))
 
2501
        testfile = os.path.join(self.share_path,
 
2502
                                tdir + self.fsm.CONFLICT_SUFFIX, "path")
 
2503
        self.assertTrue(path_exists(testfile))
 
2504
        self.assertTrue(self.fsm.get_by_mdid(mdid1))
 
2505
        self.assert_no_metadata(mdid2, testfile, "share", "uuid2")
 
2506
 
 
2507
    def test_move_dir_to_conflict_similar_path(self):
 
2508
        """Test that the conflict to a dir removes children metadata."""
 
2509
        tdir1 = os.path.join(self.share_path, "adirectory")
 
2510
        mdid1 = self.fsm.create(tdir1, "share", is_dir=True)
 
2511
        self.fsm.set_node_id(tdir1, "uuid1")
 
2512
        make_dir(tdir1)
 
2513
 
 
2514
        tdir2 = os.path.join(self.share_path, "adir")
 
2515
        mdid2 = self.fsm.create(tdir2, "share", is_dir=True)
 
2516
        self.fsm.set_node_id(tdir2, "uuid2")
 
2517
        make_dir(tdir2)
 
2518
 
 
2519
        testfile = os.path.join(tdir2, "path")
 
2520
        mdid3 = self.fsm.create(testfile, "share")
 
2521
        self.fsm.set_node_id(testfile, "uuid3")
 
2522
        with open_file(testfile, "w") as fh:
 
2523
            fh.write("test!")
 
2524
 
 
2525
        # move the dir2 to conflict, see dir2 and file inside it went ok
 
2526
        self.fsm.move_to_conflict(mdid2)
 
2527
        self.assertFalse(path_exists(tdir2))
 
2528
        self.assertTrue(path_exists(tdir2 + self.fsm.CONFLICT_SUFFIX))
 
2529
        testfile = os.path.join(self.share_path,
 
2530
                                tdir2 + self.fsm.CONFLICT_SUFFIX, "path")
 
2531
        self.assertTrue(path_exists(testfile))
 
2532
        self.assertTrue(self.fsm.get_by_mdid(mdid2))
 
2533
        self.assertRaises(KeyError, self.fsm.get_by_mdid, mdid3)
 
2534
 
 
2535
        # and check that the one with similar path is untouched
 
2536
        self.assertTrue(path_exists(tdir1))
 
2537
        self.assertTrue(self.fsm.get_by_mdid(mdid1))
 
2538
 
 
2539
 
 
2540
class LimboTests(FSMTestCase):
 
2541
    """Test related to trash and move limbo."""
 
2542
 
 
2543
    def test_movelimbo_normal(self):
 
2544
        """Test that a node is sent to and removed from the move limbo."""
 
2545
        # move to limbo
 
2546
        self.fsm.add_to_move_limbo("share", "uuid", "old_parent",
 
2547
                                   "new_parent", "new_name", "pfrom", "pto")
 
2548
        d = {("share", "uuid"):
 
2549
             ("old_parent", "new_parent", "new_name", "pfrom", "pto")}
 
2550
        self.assertEqual(self.fsm.move_limbo, d)
 
2551
        r = [("share", "uuid", "old_parent", "new_parent",
 
2552
              "new_name", "pfrom", "pto")]
 
2553
        self.assertEqual(list(self.fsm.get_iter_move_limbo()), r)
 
2554
 
 
2555
        # remove from limbo
 
2556
        self.fsm.remove_from_move_limbo("share", "uuid")
 
2557
        self.assertEqual(self.fsm.move_limbo, {})
 
2558
        self.assertEqual(list(self.fsm.get_iter_move_limbo()), [])
 
2559
 
 
2560
    def test_movelimbo_no_paths(self):
 
2561
        """For old limbos, faked paths appear."""
 
2562
        # fake old limbo info (note: no paths!)
 
2563
        self.fsm.move_limbo = {
 
2564
            ("share", "uuid"): ("old_parent", "new_parent", "new_name"),
 
2565
        }
 
2566
        r = [("share", "uuid", "old_parent", "new_parent",
 
2567
              "new_name", "fake_path_from", "fake_path_to")]
 
2568
        self.assertEqual(list(self.fsm.get_iter_move_limbo()), r)
 
2569
 
 
2570
    def test_trash_normal(self):
 
2571
        """Test that a node is sent to and removed from trash."""
 
2572
        testfile = os.path.join(self.share_path, "path")
 
2573
        open_file(testfile, "w").close()
 
2574
        mdid = self.fsm.create(testfile, "share")
 
2575
        self.fsm.set_node_id(testfile, "uuid")
 
2576
        self.assertTrue(self.fsm.has_metadata(mdid=mdid))
 
2577
        self.assertEqual(self.fsm.trash, {})
 
2578
        self.assertEqual(list(self.fsm.get_iter_trash()), [])
 
2579
        self.assertFalse(self.fsm.node_in_trash("share", "uuid"))
 
2580
 
 
2581
        # delete to trash
 
2582
        self.fsm.delete_to_trash(mdid, "parent")
 
2583
        self.assertFalse(self.fsm.has_metadata(mdid=mdid))
 
2584
        self.assertEqual(self.fsm.trash,
 
2585
                         {("share", "uuid"):
 
2586
                                (mdid, "parent", testfile, False)})
 
2587
        self.assertEqual(list(self.fsm.get_iter_trash()),
 
2588
                         [("share", "uuid", "parent", testfile, False)])
 
2589
        self.assertTrue(self.fsm.node_in_trash("share", "uuid"))
 
2590
 
 
2591
        # remove from trash
 
2592
        self.fsm.remove_from_trash("share", "uuid")
 
2593
        self.assertFalse(self.fsm.has_metadata(mdid=mdid))
 
2594
        self.assertEqual(self.fsm.trash, {})
 
2595
        self.assertEqual(list(self.fsm.get_iter_trash()), [])
 
2596
        self.assertFalse(self.fsm.node_in_trash("share", "uuid"))
 
2597
 
 
2598
    def test_trash_older(self):
 
2599
        """get_iter_trash supports older trash (no is_dir)."""
 
2600
        self.fsm.trash  = {("share", "uuid"): ("mdid", "parent", "path1")}
 
2601
        self.assertEqual(list(self.fsm.get_iter_trash()),
 
2602
                         [("share", "uuid", "parent", "path1", False)])
 
2603
 
 
2604
    def test_trash_oldest(self):
 
2605
        """get_iter_trash supports oldest trash (no is_dir nor path)."""
 
2606
        self.fsm.trash  = {("share", "uuid"): ("mdid", "parent")}
 
2607
        self.assertEqual(list(self.fsm.get_iter_trash()),
 
2608
                         [("share", "uuid", "parent", "fake_unblocking_path",
 
2609
                           False)])
 
2610
 
 
2611
    def test_trash_with_node_in_none(self):
 
2612
        """Test that in trash is saved the marker if node_id is None."""
 
2613
        testfile = os.path.join(self.share_path, "path")
 
2614
        open_file(testfile, "w").close()
 
2615
        mdid = self.fsm.create(testfile, "share")
 
2616
 
 
2617
        # delete to trash and check the marker
 
2618
        self.fsm.delete_to_trash(mdid, "parent")
 
2619
        marker = MDMarker(mdid)
 
2620
        self.assertEqual(self.fsm.trash,
 
2621
                         {("share", marker):
 
2622
                                (mdid, "parent", testfile, False)})
 
2623
 
 
2624
    def test_dereference_ok_limbos_none(self):
 
2625
        """Limbos' markers ok dereferencing is fine if no marker at all."""
 
2626
        self.fsm.dereference_ok_limbos('nothing', "foo")
 
2627
 
 
2628
    def test_dereference_err_limbos_none(self):
 
2629
        """Limbos' markers err dereferencing is fine if no marker at all."""
 
2630
        self.fsm.dereference_err_limbos('nothing')
 
2631
 
 
2632
    def test_dereference_ok_trash_node(self):
 
2633
        """Dereference possible marker in trash, node."""
 
2634
        # set up
 
2635
        testfile = os.path.join(self.share_path, "path")
 
2636
        mdid = self.fsm.create(testfile, "share")
 
2637
        self.fsm.set_node_id(testfile, "mrkr")
 
2638
        self.fsm.delete_to_trash(mdid, "parent")
 
2639
 
 
2640
        # dereference and test
 
2641
        self.fsm.dereference_ok_limbos("mrkr", "final")
 
2642
        self.assertFalse(self.fsm.node_in_trash("share", "mrkr"))
 
2643
        self.assertTrue(self.fsm.node_in_trash("share", "final"))
 
2644
        self.assertTrue(self.handler.check_debug("dereference ok trash",
 
2645
                                                 "marker", "node"))
 
2646
 
 
2647
    def test_dereference_ok_trash_parent(self):
 
2648
        """Dereference possible marker in trash, parent."""
 
2649
        # set up
 
2650
        testfile = os.path.join(self.share_path, "path")
 
2651
        mdid = self.fsm.create(testfile, "share")
 
2652
        self.fsm.set_node_id(testfile, "node")
 
2653
        self.fsm.delete_to_trash(mdid, "mrkr")
 
2654
 
 
2655
        # dereference and test
 
2656
        self.fsm.dereference_ok_limbos("mrkr", "final")
 
2657
        self.assertEqual(self.fsm.trash[("share", "node")][1], "final")
 
2658
        self.assertTrue(self.handler.check_debug("dereference ok trash",
 
2659
                                                 "marker", "parent"))
 
2660
 
 
2661
    def test_dereference_ok_trash_parent_node(self):
 
2662
        """An unlinked node can be a parent of other."""
 
2663
        # set up one node
 
2664
        testfile = os.path.join(self.share_path, "path1")
 
2665
        mdid = self.fsm.create(testfile, "share")
 
2666
        self.fsm.set_node_id(testfile, "mrkr")
 
2667
        self.fsm.delete_to_trash(mdid, "parent")
 
2668
 
 
2669
        # set up child node
 
2670
        testfile = os.path.join(self.share_path, "path2")
 
2671
        mdid = self.fsm.create(testfile, "share")
 
2672
        self.fsm.set_node_id(testfile, "node")
 
2673
        self.fsm.delete_to_trash(mdid, "mrkr")
 
2674
 
 
2675
        # dereference and test
 
2676
        self.fsm.dereference_ok_limbos("mrkr", "final")
 
2677
        self.assertFalse(self.fsm.node_in_trash("share", "mrkr"))
 
2678
        self.assertTrue(self.fsm.node_in_trash("share", "final"))
 
2679
        self.assertEqual(self.fsm.trash[("share", "node")][1], "final")
 
2680
 
 
2681
    def test_dereference_err_trash_node(self):
 
2682
        """Dereference with error possible marker in trash, node."""
 
2683
        # set up
 
2684
        testfile = os.path.join(self.share_path, "path")
 
2685
        mdid = self.fsm.create(testfile, "share")
 
2686
        self.fsm.set_node_id(testfile, "mrkr")
 
2687
        self.fsm.delete_to_trash(mdid, "parent")
 
2688
 
 
2689
        # dereference and test
 
2690
        self.fsm.dereference_err_limbos("mrkr")
 
2691
        self.assertFalse(self.fsm.node_in_trash("share", "mrkr"))
 
2692
        self.assertTrue(self.handler.check_debug("dereference err trash", "marker"))
 
2693
 
 
2694
    def test_dereference_err_trash_parent(self):
 
2695
        """Dereference with error possible marker in trash, parent."""
 
2696
        # set up
 
2697
        testfile = os.path.join(self.share_path, "path")
 
2698
        mdid = self.fsm.create(testfile, "share")
 
2699
        self.fsm.set_node_id(testfile, "node")
 
2700
        self.fsm.delete_to_trash(mdid, "mrkr")
 
2701
 
 
2702
        # dereference and test
 
2703
        self.fsm.dereference_err_limbos("mrkr")
 
2704
        self.assertFalse(self.fsm.node_in_trash("share", "node"))
 
2705
        self.assertTrue(self.handler.check_debug(
 
2706
            "dereference err trash", "marker"))
 
2707
 
 
2708
    def test_dereference_err_trash_parent_node(self):
 
2709
        """An unlinked node can be a parent of other, both with failure."""
 
2710
        # set up one node
 
2711
        testfile = os.path.join(self.share_path, "path1")
 
2712
        mdid = self.fsm.create(testfile, "share")
 
2713
        self.fsm.set_node_id(testfile, "mrkr")
 
2714
        self.fsm.delete_to_trash(mdid, "parent")
 
2715
 
 
2716
        # set up child node
 
2717
        testfile = os.path.join(self.share_path, "path2")
 
2718
        mdid = self.fsm.create(testfile, "share")
 
2719
        self.fsm.set_node_id(testfile, "node")
 
2720
        self.fsm.delete_to_trash(mdid, "mrkr")
 
2721
 
 
2722
        # dereference and test
 
2723
        self.fsm.dereference_err_limbos("mrkr")
 
2724
        self.assertFalse(self.fsm.node_in_trash("share", "mrkr"))
 
2725
        self.assertFalse(self.fsm.node_in_trash("share", "node"))
 
2726
 
 
2727
    def test_dereference_ok_movelimbo_node(self):
 
2728
        """Dereference possible marker in move limbo, node."""
 
2729
        # set up
 
2730
        self.fsm.add_to_move_limbo("sh", "mrkr", "oldparent", "newparent", "x",
 
2731
                                   "path_from", "path_to")
 
2732
 
 
2733
        # dereference and test
 
2734
        self.fsm.dereference_ok_limbos("mrkr", "final")
 
2735
        self.assertFalse(("sh", "mrkr") in self.fsm.move_limbo)
 
2736
        self.assertTrue(("sh", "final") in self.fsm.move_limbo)
 
2737
        self.assertTrue(self.handler.check_debug("dereference ok move limbo",
 
2738
                                                 "marker", "node"))
 
2739
 
 
2740
    def test_dereference_ok_movelimbo_oldparent(self):
 
2741
        """Dereference possible marker in move limbo, oldparent."""
 
2742
        # set up
 
2743
        self.fsm.add_to_move_limbo("sh", "node", "mrkr", "newparent", "x",
 
2744
                                   "path_from", "path_to")
 
2745
 
 
2746
        # dereference and test
 
2747
        self.fsm.dereference_ok_limbos("mrkr", "final")
 
2748
        self.assertEqual(self.fsm.move_limbo[("sh", "node")],
 
2749
                         ("final", "newparent", "x", "path_from", "path_to"))
 
2750
        self.assertTrue(self.handler.check_debug("dereference ok move limbo",
 
2751
                                                 "marker", "old_parent"))
 
2752
 
 
2753
    def test_dereference_ok_movelimbo_newparent(self):
 
2754
        """Dereference possible marker in move limbo, newparent."""
 
2755
        # set up
 
2756
        self.fsm.add_to_move_limbo("sh", "node", "oldparent", "mrkr", "x",
 
2757
                                   "path_from", "path_to")
 
2758
 
 
2759
        # dereference and test
 
2760
        self.fsm.dereference_ok_limbos("mrkr", "final")
 
2761
        self.assertEqual(self.fsm.move_limbo[("sh", "node")],
 
2762
                         ("oldparent", "final", "x", "path_from", "path_to"))
 
2763
        self.assertTrue(self.handler.check_debug("dereference ok move limbo",
 
2764
                                                 "marker", "new_parent"))
 
2765
 
 
2766
    def test_dereference_ok_movelimbo_bothparents(self):
 
2767
        """Dereference possible marker in move limbo, both parents."""
 
2768
        # set up
 
2769
        self.fsm.add_to_move_limbo("sh", "node", "mrkr", "mrkr", "x",
 
2770
                                   "path_from", "path_to")
 
2771
 
 
2772
        # dereference and test
 
2773
        self.fsm.dereference_ok_limbos("mrkr", "final")
 
2774
        self.assertEqual(self.fsm.move_limbo[("sh", "node")],
 
2775
                         ("final", "final", "x", "path_from", "path_to"))
 
2776
 
 
2777
    def test_dereference_err_movelimbo_node(self):
 
2778
        """Dereference with error possible marker in move limbo, node."""
 
2779
        # set up
 
2780
        self.fsm.add_to_move_limbo("sh", "mrkr", "oldparent", "newparent", "x",
 
2781
                                   "path_from", "path_to")
 
2782
 
 
2783
        # dereference and test
 
2784
        self.fsm.dereference_err_limbos("mrkr")
 
2785
        self.assertFalse(("sh", "mrkr") in self.fsm.move_limbo)
 
2786
        self.assertTrue(self.handler.check_debug("dereference err move limbo",
 
2787
                                                 "marker"))
 
2788
 
 
2789
    def test_dereference_err_movelimbo_oldparent(self):
 
2790
        """Dereference with error possible marker in move limbo, oldparent."""
 
2791
        # set up
 
2792
        self.fsm.add_to_move_limbo("sh", "node", "mrkr", "newparent", "x",
 
2793
                                   "path_from", "path_to")
 
2794
 
 
2795
        # dereference and test
 
2796
        self.fsm.dereference_err_limbos("mrkr")
 
2797
        self.assertFalse(("sh", "node") in self.fsm.move_limbo)
 
2798
        self.assertTrue(self.handler.check_debug("dereference err move limbo",
 
2799
                                                 "marker"))
 
2800
 
 
2801
    def test_dereference_err_movelimbo_newparent(self):
 
2802
        """Dereference with error possible marker in move limbo, newparent."""
 
2803
        # set up
 
2804
        self.fsm.add_to_move_limbo("sh", "node", "oldparent", "mrkr", "x",
 
2805
                                   "path_from", "path_to")
 
2806
 
 
2807
        # dereference and test
 
2808
        self.fsm.dereference_err_limbos("mrkr")
 
2809
        self.assertFalse(("sh", "node") in self.fsm.move_limbo)
 
2810
        self.assertTrue(self.handler.check_debug("dereference err move limbo",
 
2811
                                                 "marker"))
 
2812
 
 
2813
    def test_dereference_err_movelimbo_bothparents(self):
 
2814
        """Dereference with error possible marker in move limbo, both."""
 
2815
        # set up
 
2816
        self.fsm.add_to_move_limbo("sh", "node", "mrkr", "mrkr", "x",
 
2817
                                   "path_from", "path_to")
 
2818
 
 
2819
        # dereference and test
 
2820
        self.fsm.dereference_err_limbos("mrkr")
 
2821
        self.assertFalse(("sh", "node") in self.fsm.move_limbo)
 
2822
 
 
2823
    def test_make_dir_all_ok(self):
 
2824
        """Create the dir, add a watch, mute the event."""
 
2825
        called = []
 
2826
 
 
2827
        def add_watch(path):
 
2828
            """Fake it."""
 
2829
            called.append(path)
 
2830
            return defer.succeed(True)
 
2831
 
 
2832
        self.eq.add_watch = add_watch
 
2833
        self.eq.add_to_mute_filter = lambda e, **a: called.append((e, a))
 
2834
        local_dir = os.path.join(self.root_dir, "foo")
 
2835
        mdid = self.fsm.create(local_dir, "", is_dir=True)
 
2836
 
 
2837
        # create the dir and check
 
2838
        self.fsm.make_dir(mdid)
 
2839
        self.assertTrue(os.path.isdir(local_dir))
 
2840
        self.assertEqual(called,
 
2841
                         [('FS_DIR_CREATE', dict(path=local_dir)), local_dir])
 
2842
 
 
2843
    def test_make_dir_not_a_file(self):
 
2844
        """Create the dir, add a watch."""
 
2845
        local_dir = os.path.join(self.root_dir, "foo")
 
2846
        mdid = self.fsm.create(local_dir, "", is_dir=False)
 
2847
        self.assertRaises(ValueError, self.fsm.make_dir, mdid)
 
2848
 
 
2849
    def test_make_dir_already_there(self):
 
2850
        """If the dir already exist, don't raise an error."""
 
2851
        local_dir = os.path.join(self.root_dir, "foo")
 
2852
        mdid = self.fsm.create(local_dir, "", is_dir=True)
 
2853
        make_dir(local_dir)
 
2854
        self.fsm.make_dir(mdid)
 
2855
        self.assertTrue(path_exists(local_dir))
 
2856
 
 
2857
    @defer.inlineCallbacks
 
2858
    def test_make_dir_in_ro_share(self):
 
2859
        """Also works in a read only share."""
 
2860
        share = yield self.create_share('ro_share_id', u'ro',
 
2861
                                        access_level='View')
 
2862
        testdir = os.path.join(share.path, "foo")
 
2863
        mdid = self.fsm.create(testdir, 'ro_share_id', is_dir=True)
 
2864
        self.fsm.make_dir(mdid)
 
2865
        self.assertTrue(path_exists(testdir))
 
2866
 
 
2867
    @defer.inlineCallbacks
 
2868
    def test_make_dir_ro_watch(self):
 
2869
        """Don't add the watch nor the mute on a RO share."""
 
2870
        called = []
 
2871
 
 
2872
        def add_watch(path):
 
2873
            """Fake it."""
 
2874
            called.append(path)
 
2875
            return defer.succeed(True)
 
2876
 
 
2877
        self.eq.add_watch = add_watch
 
2878
        self.eq.add_to_mute_filter = lambda *a: called.append(a)
 
2879
        share = yield self.create_share('ro_share_id', u'ro',
 
2880
                                        access_level='View')
 
2881
        testdir = os.path.join(share.path, "foo")
 
2882
        mdid = self.fsm.create(testdir, 'ro_share_id', is_dir=True)
 
2883
 
 
2884
        # create the dir and check
 
2885
        self.fsm.make_dir(mdid)
 
2886
        self.assertFalse(called)
 
2887
 
 
2888
 
 
2889
class SyntheticInfoTests(FSMTestCase):
 
2890
    """Test the methods that generates attributes."""
 
2891
 
 
2892
    def test_has_metadata(self):
 
2893
        """Test the has_metadata option."""
 
2894
        # not yet
 
2895
        self.assertFalse(self.fsm.has_metadata(path="path"))
 
2896
        self.assertFalse(self.fsm.has_metadata(node_id="uuid",
 
2897
                                                            share_id="share"))
 
2898
        self.assertFalse(self.fsm.has_metadata(mdid="garbage"))
 
2899
 
 
2900
        # path created
 
2901
        path = os.path.join(self.share.path, 'path')
 
2902
        mdid = self.fsm.create(path, "share")
 
2903
        self.assertTrue(self.fsm.has_metadata(path=path))
 
2904
        self.assertFalse(self.fsm.has_metadata(node_id="uuid",
 
2905
                                               share_id="share"))
 
2906
        self.assertTrue(self.fsm.has_metadata(mdid=mdid))
 
2907
 
 
2908
        # uuid set
 
2909
        self.fsm.set_node_id(path, "uuid")
 
2910
        self.assertTrue(self.fsm.has_metadata(path=path))
 
2911
        self.assertTrue(self.fsm.has_metadata(node_id="uuid",
 
2912
                                              share_id="share"))
 
2913
        self.assertTrue(self.fsm.has_metadata(mdid=mdid))
 
2914
        self.assertRaises(ValueError, self.fsm.has_metadata, node_id=None,
 
2915
                                                             share_id="share")
 
2916
 
 
2917
    def test_is_dir(self):
 
2918
        """Test the is_directory option."""
 
2919
        # standard file
 
2920
        testfiledir = os.path.join(self.share_path, "path1")
 
2921
        mdid = self.fsm.create(testfiledir, "share", is_dir=False)
 
2922
        self.fsm.set_node_id(testfiledir, "uuid1")
 
2923
        self.assertFalse(self.fsm.is_dir(path=testfiledir))
 
2924
        self.assertFalse(self.fsm.is_dir(node_id="uuid1", share_id="share"))
 
2925
        self.assertFalse(self.fsm.is_dir(mdid=mdid))
 
2926
 
 
2927
        # directory
 
2928
        testfiledir = os.path.join(self.share_path, "path2")
 
2929
        mdid = self.fsm.create(testfiledir, "share", is_dir=True)
 
2930
        self.fsm.set_node_id(testfiledir, "uuid2")
 
2931
        self.assertTrue(self.fsm.is_dir(path=testfiledir))
 
2932
        self.assertTrue(self.fsm.is_dir(node_id="uuid2", share_id="share"))
 
2933
        self.assertTrue(self.fsm.is_dir(mdid=mdid))
 
2934
 
 
2935
    def test_changed_server(self):
 
2936
        """Test the changed option when in SERVER state."""
 
2937
        # SERVER means: local_hash != server_hash and is_partial
 
2938
        testfile = os.path.join(self.share_path, "path")
 
2939
        mdid = self.fsm.create(testfile, "share")
 
2940
        partial_path = os.path.join(self.fsm.partials_dir,
 
2941
                            mdid + ".u1partial." + os.path.basename(testfile))
 
2942
        self.fsm.set_node_id(testfile, "uuid")
 
2943
 
 
2944
        # set conditions and test
 
2945
        self.fsm.set_by_mdid(mdid, server_hash=98765)
 
2946
        self.fsm.create_partial("uuid", "share")
 
2947
        # local_hash is None so far
 
2948
        self.assertEqual(self.fsm.changed(mdid=mdid), self.fsm.CHANGED_SERVER)
 
2949
        self.assertEqual(self.fsm.changed(node_id="uuid", share_id="share"),
 
2950
                        self.fsm.CHANGED_SERVER)
 
2951
        self.assertEqual(self.fsm.changed(path=testfile),
 
2952
                         self.fsm.CHANGED_SERVER)
 
2953
 
 
2954
        # remove the .partial by hand, to see it crash
 
2955
        remove_file(partial_path)
 
2956
        # pylint: disable-msg=W0212
 
2957
        self.assertRaises(InconsistencyError,
 
2958
                          self.fsm._check_partial, mdid=mdid)
 
2959
 
 
2960
    def test_changed_none(self):
 
2961
        """Test the changed option when in NONE state."""
 
2962
        # NONE means: local_hash == server_hash and is_partial == False
 
2963
        testfile = os.path.join(self.share_path, "path")
 
2964
        mdid = self.fsm.create(testfile, "share")
 
2965
        partial_path = os.path.join(self.fsm.partials_dir,
 
2966
                            mdid + ".u1partial." + os.path.basename(testfile))
 
2967
        self.fsm.set_node_id(testfile, "uuid")
 
2968
 
 
2969
        # all conditions are set: by default, local_hash and server_hash
 
2970
        # are both None
 
2971
        self.assertEqual(self.fsm.changed(mdid=mdid), self.fsm.CHANGED_NONE)
 
2972
        self.assertEqual(self.fsm.changed(node_id="uuid", share_id="share"),
 
2973
                        self.fsm.CHANGED_NONE)
 
2974
        self.assertEqual(self.fsm.changed(path=testfile), self.fsm.CHANGED_NONE)
 
2975
 
 
2976
        # put a .partial by hand, to see it crash
 
2977
        open_file(partial_path, "w").close()
 
2978
        # pylint: disable-msg=W0212
 
2979
        self.assertRaises(InconsistencyError,
 
2980
                          self.fsm._check_partial, mdid=mdid)
 
2981
 
 
2982
    def test_changed_local(self):
 
2983
        """Test the changed option when in LOCAL state."""
 
2984
        # LOCAL means: local_hash != server_hash and is not partial
 
2985
        testfile = os.path.join(self.share_path, "path")
 
2986
        mdid = self.fsm.create(testfile, "share")
 
2987
        partial_path = os.path.join(self.fsm.partials_dir,
 
2988
                            mdid + ".u1partial." + os.path.basename(testfile))
 
2989
        self.fsm.set_node_id(testfile, "uuid")
 
2990
 
 
2991
        # set conditions and test
 
2992
        self.fsm.set_by_mdid(mdid, server_hash=98765)
 
2993
        # local_hash is None so far
 
2994
        self.assertEqual(self.fsm.changed(mdid=mdid), self.fsm.CHANGED_LOCAL)
 
2995
        self.assertEqual(self.fsm.changed(node_id="uuid", share_id="share"),
 
2996
                        self.fsm.CHANGED_LOCAL)
 
2997
        self.assertEqual(self.fsm.changed(path=testfile),
 
2998
                         self.fsm.CHANGED_LOCAL)
 
2999
 
 
3000
        # put a .partial by hand, to see it crash
 
3001
        open_file(partial_path, "w").close()
 
3002
        # pylint: disable-msg=W0212
 
3003
        self.assertRaises(InconsistencyError,
 
3004
                          self.fsm._check_partial, mdid=mdid)
 
3005
 
 
3006
    def test_dir_content(self):
 
3007
        """Test the dir_content method."""
 
3008
        # create a structure in md
 
3009
        to_create = []
 
3010
        dir1 = os.path.join(self.share_path, "foo")
 
3011
        to_create.append((dir1, True))
 
3012
        to_create.append((os.path.join(dir1, "file2"), False))
 
3013
        to_create.append((os.path.join(dir1, "file1"), False))
 
3014
 
 
3015
        dir2 = os.path.join(dir1, "bar")
 
3016
        to_create.append((dir2, True))
 
3017
        to_create.append((os.path.join(dir2, "file3"), False))
 
3018
        to_create.append((os.path.join(dir2, "file5"), False))
 
3019
        to_create.append((os.path.join(dir2, "file4"), False))
 
3020
        to_create.append((os.path.join(dir2, "file6"), False))
 
3021
 
 
3022
        dir3 = os.path.join(dir2, "baz")
 
3023
        to_create.append((dir3, True))
 
3024
        to_create.append((os.path.join(dir3, "file7"), False))
 
3025
        to_create.append((os.path.join(dir3, "file9"), False))
 
3026
        to_create.append((os.path.join(dir3, "file8"), False))
 
3027
 
 
3028
        dir4 = os.path.join(dir2, "other")
 
3029
        to_create.append((dir4, True))
 
3030
 
 
3031
        for i, (path, is_dir) in enumerate(to_create):
 
3032
            self.fsm.create(path, "share", is_dir=is_dir)
 
3033
            self.fsm.set_node_id(path, "uuid" + str(i))
 
3034
 
 
3035
        # ask for the info for dir1
 
3036
        should_be = [
 
3037
            ("bar", True, "uuid3"),
 
3038
            ("file1", False, "uuid2"),
 
3039
            ("file2", False, "uuid1"),
 
3040
        ]
 
3041
        content = self.fsm.dir_content(dir1)
 
3042
        self.assertEqual(should_be, content)
 
3043
 
 
3044
        # ask for the info for dir2
 
3045
        should_be = [
 
3046
            ("baz", True, "uuid8"),
 
3047
            ("file3", False, "uuid4"),
 
3048
            ("file4", False, "uuid6"),
 
3049
            ("file5", False, "uuid5"),
 
3050
            ("file6", False, "uuid7"),
 
3051
            ("other", True, "uuid12"),
 
3052
        ]
 
3053
        content = self.fsm.dir_content(dir2)
 
3054
        self.assertEqual(should_be, content)
 
3055
 
 
3056
        # ask for the info for dir3
 
3057
        should_be = [
 
3058
            ("file7", False, "uuid9"),
 
3059
            ("file8", False, "uuid11"),
 
3060
            ("file9", False, "uuid10"),
 
3061
        ]
 
3062
        content = self.fsm.dir_content(dir3)
 
3063
        self.assertEqual(should_be, content)
 
3064
 
 
3065
        # ask for the info for an empty dir
 
3066
        content = self.fsm.dir_content(dir4)
 
3067
        self.assertEqual([], content)
 
3068
 
 
3069
        # ask for the info for an inexistant dir
 
3070
        self.assertRaises(KeyError, self.fsm.dir_content, "no-such-dir")
 
3071
 
 
3072
        # ask for the info for file
 
3073
        just_a_file = os.path.join(dir3, "file9")
 
3074
        self.assertRaises(ValueError, self.fsm.dir_content, just_a_file)
 
3075
 
 
3076
 
 
3077
class SharesTests(FSMTestCase):
 
3078
    """Test fsm with ro and rw shares."""
 
3079
 
 
3080
    @skip_if_win32_and_uses_readonly
 
3081
    @defer.inlineCallbacks
 
3082
    def test_file_ro_share_fail(self):
 
3083
        """ Test that manual creation of a file, fails on a ro-share. """
 
3084
        share = yield self.create_share('ro_share', u'ro_share_name',
 
3085
                                        access_level='View')
 
3086
        testfile = os.path.join(share.path, "a_file")
 
3087
        self.assertRaises(IOError, open_file, testfile, 'w')
 
3088
 
 
3089
    @defer.inlineCallbacks
 
3090
    def test_dir_ro_share(self):
 
3091
        """ Test that the creation of a file using fsm, works on a ro-share."""
 
3092
        share = yield self.create_share('ro_share', u'ro_share_name',
 
3093
                                        access_level='View')
 
3094
        testdir = os.path.join(share.path, "path2")
 
3095
        self.fsm.create(testdir, share.volume_id, is_dir=True)
 
3096
        self.fsm.set_node_id(testdir, "uuid2")
 
3097
        self.fsm.create_partial('uuid2', share.volume_id)
 
3098
        fd = self.fsm.get_partial_for_writing('uuid2', share.volume_id)
 
3099
        fd.flush()
 
3100
        fd.close()
 
3101
        self.assertTrue(path_exists(testdir))
 
3102
 
 
3103
    @defer.inlineCallbacks
 
3104
    def test_file_ro_share(self):
 
3105
        """ Test that the creation of a file using fsm, works on a ro-share."""
 
3106
        self.share = yield self.create_share('ro_share', u'ro_share_name',
 
3107
                                             access_level='View')
 
3108
        testfile = os.path.join(self.share.path, "a_file")
 
3109
        self.fsm.create(testfile, self.share.volume_id, is_dir=False)
 
3110
        self.fsm.set_node_id(testfile, "uuid3")
 
3111
        self.fsm.create_partial('uuid3', self.share.volume_id)
 
3112
        fd = self.fsm.get_partial_for_writing('uuid3', self.share.volume_id)
 
3113
        fd.flush()
 
3114
        fd.close()
 
3115
        self.fsm.commit_partial('uuid3', self.share.volume_id, None)
 
3116
        self.assertTrue(path_exists(testfile))
 
3117
 
 
3118
    @defer.inlineCallbacks
 
3119
    def test_delete_dir_ro_share(self):
 
3120
        """ Test that fsm is able to delete a dir in a ro.share. """
 
3121
        share = yield self.create_share('ro_share', u'ro_share_name',
 
3122
                                        access_level='View')
 
3123
        testdir = os.path.join(share.path, "path2")
 
3124
        self.fsm.create(testdir, share.volume_id, is_dir=True)
 
3125
        self.fsm.set_node_id(testdir, "uuid2")
 
3126
        self.fsm.create_partial('uuid2', share.volume_id)
 
3127
        fd = self.fsm.get_partial_for_writing('uuid2', share.volume_id)
 
3128
        fd.flush()
 
3129
        fd.close()
 
3130
        self.fsm.remove_partial('uuid2', share.volume_id)
 
3131
        self.assertTrue(path_exists(testdir))
 
3132
        self.fsm.delete_file(testdir)
 
3133
        self.assertFalse(path_exists(testdir))
 
3134
 
 
3135
    @defer.inlineCallbacks
 
3136
    def test_delete_non_empty_dir_ro_share(self):
 
3137
        """Test that fsm is able to delete a non-empty dir in a ro.share."""
 
3138
        share = yield self.create_share('ro_share', u'ro_share_name',
 
3139
                                        access_level='View')
 
3140
        testdir = os.path.join(share.path, "path2")
 
3141
        mdid = self.fsm.create(testdir, share.volume_id, is_dir=True)
 
3142
        self.fsm.set_node_id(testdir, "uuid2")
 
3143
        self.fsm.create_partial('uuid2', share.volume_id)
 
3144
        fd = self.fsm.get_partial_for_writing('uuid2', share.volume_id)
 
3145
        fd.flush()
 
3146
        fd.close()
 
3147
        self.fsm.remove_partial('uuid2', share.volume_id)
 
3148
        self.fsm.upload_finished(mdid, self.fsm.get_by_mdid(mdid).local_hash)
 
3149
        # crete a file inside the testdir
 
3150
        testfile = os.path.join(testdir, "a_file")
 
3151
        mdid = self.fsm.create(testfile, share.volume_id, is_dir=False)
 
3152
        self.fsm.set_node_id(testfile, "uuid3")
 
3153
        self.fsm.create_partial('uuid3', share.volume_id)
 
3154
        fd = self.fsm.get_partial_for_writing('uuid3', share.volume_id)
 
3155
        fd.flush()
 
3156
        fd.close()
 
3157
        self.fsm.commit_partial('uuid3', share.volume_id, None)
 
3158
        self.fsm.upload_finished(mdid, self.fsm.get_by_mdid(mdid).local_hash)
 
3159
        self.assertTrue(path_exists(testdir))
 
3160
        self.assertTrue(path_exists(testfile))
 
3161
        self.fsm.delete_file(testdir)
 
3162
        self.assertFalse(path_exists(testdir))
 
3163
        self.assertFalse(path_exists(testfile))
 
3164
 
 
3165
    @defer.inlineCallbacks
 
3166
    def test_delete_non_empty_dir_rw_share(self):
 
3167
        """Test that fsm is able to delete a non-empty dir in a rw.share."""
 
3168
        share = yield self.create_share('rw_share', u'rw_share_name',
 
3169
                                        access_level='Modify')
 
3170
        testdir = os.path.join(share.path, "path2")
 
3171
        mdid = self.fsm.create(testdir, share.volume_id, is_dir=True)
 
3172
        self.fsm.set_node_id(testdir, "uuid2")
 
3173
        self.fsm.create_partial('uuid2', share.volume_id)
 
3174
        fd = self.fsm.get_partial_for_writing('uuid2', share.volume_id)
 
3175
        fd.flush()
 
3176
        fd.close()
 
3177
        self.fsm.remove_partial('uuid2', share.volume_id)
 
3178
        self.fsm.upload_finished(mdid, self.fsm.get_by_mdid(mdid).local_hash)
 
3179
        # crete a file inside the testdir
 
3180
        testfile = os.path.join(testdir, "a_file")
 
3181
        mdid = self.fsm.create(testfile, share.volume_id, is_dir=False)
 
3182
        self.fsm.set_node_id(testfile, "uuid3")
 
3183
        self.fsm.create_partial('uuid3', share.volume_id)
 
3184
        fd = self.fsm.get_partial_for_writing('uuid3', share.volume_id)
 
3185
        fd.flush()
 
3186
        fd.close()
 
3187
        self.fsm.commit_partial('uuid3', share.volume_id, None)
 
3188
        self.fsm.upload_finished(mdid, self.fsm.get_by_mdid(mdid).local_hash)
 
3189
        self.assertTrue(path_exists(testdir))
 
3190
        self.assertTrue(path_exists(testfile))
 
3191
        self.fsm.delete_file(testdir)
 
3192
        self.assertFalse(path_exists(testdir))
 
3193
        self.assertFalse(path_exists(testfile))
 
3194
 
 
3195
    @skip_if_win32_and_uses_readonly
 
3196
    @defer.inlineCallbacks
 
3197
    def test_delete_non_empty_dir_bad_perms_rw_share(self):
 
3198
        """Test that fsm is able to delete a non-empty dir in a rw.share."""
 
3199
        share = yield self.create_share('rw_share', u'rw_share_name',
 
3200
                                        access_level='Modify')
 
3201
        testdir = os.path.join(share.path, "path2")
 
3202
        mdid = self.fsm.create(testdir, share.volume_id, is_dir=True)
 
3203
        self.fsm.set_node_id(testdir, "uuid2")
 
3204
        self.fsm.create_partial('uuid2', share.volume_id)
 
3205
        fd = self.fsm.get_partial_for_writing('uuid2', share.volume_id)
 
3206
        fd.flush()
 
3207
        fd.close()
 
3208
        self.fsm.remove_partial('uuid2', share.volume_id)
 
3209
        self.fsm.upload_finished(mdid, self.fsm.get_by_mdid(mdid).local_hash)
 
3210
        # crete a file inside the testdir
 
3211
        testfile = os.path.join(testdir, "a_file")
 
3212
        mdid = self.fsm.create(testfile, share.volume_id, is_dir=False)
 
3213
        self.fsm.set_node_id(testfile, "uuid3")
 
3214
        self.fsm.create_partial('uuid3', share.volume_id)
 
3215
        fd = self.fsm.get_partial_for_writing('uuid3', share.volume_id)
 
3216
        fd.flush()
 
3217
        fd.close()
 
3218
        self.fsm.commit_partial('uuid3', share.volume_id, None)
 
3219
        self.fsm.upload_finished(mdid, self.fsm.get_by_mdid(mdid).local_hash)
 
3220
        self.assertTrue(path_exists(testdir))
 
3221
        self.assertTrue(path_exists(testfile))
 
3222
 
 
3223
        # make the dir read-only, the error should be logged
 
3224
        set_dir_readonly(testdir)
 
3225
        self.addCleanup(set_dir_readwrite, testdir)
 
3226
 
 
3227
        self.fsm.delete_file(testdir)
 
3228
        self.assertTrue(self.handler.check_warning("OSError", testdir,
 
3229
                                                   "when trying to remove"))
 
3230
        self.assertTrue(path_exists(testdir))
 
3231
        self.assertTrue(path_exists(testfile))
 
3232
 
 
3233
    @defer.inlineCallbacks
 
3234
    def test_delete_file_ro_share(self):
 
3235
        """ Test that fsm is able to delete a file in a ro-share. """
 
3236
        self.share = yield self.create_share('ro_share', u'ro_share_name',
 
3237
                                        access_level='View')
 
3238
        testfile = os.path.join(self.share.path, "a_file")
 
3239
        self.fsm.create(testfile, self.share.volume_id, is_dir=False)
 
3240
        self.fsm.set_node_id(testfile, "uuid3")
 
3241
        self.fsm.create_partial('uuid3', self.share.volume_id)
 
3242
        fd = self.fsm.get_partial_for_writing('uuid3', self.share.volume_id)
 
3243
        fd.flush()
 
3244
        fd.close()
 
3245
        self.fsm.commit_partial('uuid3', self.share.volume_id, None)
 
3246
        self.assertTrue(path_exists(testfile))
 
3247
        self.fsm.delete_file(testfile)
 
3248
        self.assertFalse(path_exists(testfile))
 
3249
 
 
3250
    @defer.inlineCallbacks
 
3251
    def test_move_to_conflict_ro_share(self):
 
3252
        """ Test that fsm is able to handle move_to_conflict in a ro-share. """
 
3253
        self.share = yield self.create_share('ro_share', u'ro_share_name',
 
3254
                                             access_level='View')
 
3255
        testfile = os.path.join(self.share.path, "a_file")
 
3256
        file_mdid = self.fsm.create(testfile, self.share.volume_id,
 
3257
                                    is_dir=False)
 
3258
        self.fsm.set_node_id(testfile, "uuid3")
 
3259
        self.fsm.create_partial('uuid3', self.share.volume_id)
 
3260
        fd = self.fsm.get_partial_for_writing('uuid3', self.share.volume_id)
 
3261
        fd.flush()
 
3262
        fd.close()
 
3263
        self.fsm.commit_partial('uuid3', self.share.volume_id, None)
 
3264
        self.assertTrue(path_exists(testfile))
 
3265
        self.fsm.move_to_conflict(file_mdid)
 
3266
        self.assertTrue(path_exists(testfile + self.fsm.CONFLICT_SUFFIX))
 
3267
 
 
3268
    @defer.inlineCallbacks
 
3269
    def test_file_rw_share_no_fail(self):
 
3270
        """ Test that manual creation of a file, ona  rw-share. """
 
3271
        share = yield self.create_share('ro_share', u'ro_share_name')
 
3272
        testfile = os.path.join(share.path, "a_file")
 
3273
        open_file(testfile, 'w').close()
 
3274
        self.assertTrue(path_exists(testfile))
 
3275
 
 
3276
    @defer.inlineCallbacks
 
3277
    def test_dir_rw_share(self):
 
3278
        """ Test that the creation of a file using fsm, works on a rw-share."""
 
3279
        share = yield self.create_share('ro_share', u'ro_share_name')
 
3280
        testdir = os.path.join(share.path, "path2")
 
3281
        self.fsm.create(testdir, share.volume_id, is_dir=True)
 
3282
        self.fsm.set_node_id(testdir, "uuid2")
 
3283
        self.fsm.create_partial('uuid2', share.volume_id)
 
3284
        fd = self.fsm.get_partial_for_writing('uuid2', share.volume_id)
 
3285
        fd.flush()
 
3286
        fd.close()
 
3287
        self.assertTrue(path_exists(testdir))
 
3288
 
 
3289
    @defer.inlineCallbacks
 
3290
    def test_file_rw_share(self):
 
3291
        """Test that the creation of a file using fsm, works on a rw-share."""
 
3292
        self.share = yield self.create_share('ro_share', u'ro_share_name')
 
3293
        testfile = os.path.join(self.share.path, "a_file")
 
3294
        self.fsm.create(testfile, self.share.volume_id, is_dir=False)
 
3295
        self.fsm.set_node_id(testfile, "uuid3")
 
3296
        self.fsm.create_partial('uuid3', self.share.volume_id)
 
3297
        fd = self.fsm.get_partial_for_writing('uuid3', self.share.volume_id)
 
3298
        fd.flush()
 
3299
        fd.close()
 
3300
        self.fsm.commit_partial('uuid3', self.share.volume_id, None)
 
3301
        self.assertTrue(path_exists(testfile))
 
3302
 
 
3303
    def test_share_and_root(self):
 
3304
        """ Test the creation of a file with the same relative path in a share
 
3305
        and in the root.
 
3306
        """
 
3307
        a_dir_root = os.path.join(self.root_dir, "a_dir")
 
3308
        a_dir_share = os.path.join(self.share.path, "a_dir")
 
3309
        self.fsm.create(a_dir_root, "", is_dir=True)
 
3310
        self.fsm.set_node_id(a_dir_root, "uuid1")
 
3311
        self.fsm.create(a_dir_share, self.share.volume_id, is_dir=True)
 
3312
        self.fsm.set_node_id(a_dir_share, "uuid2")
 
3313
        self.fsm.create_partial('uuid1', "")
 
3314
        fd = self.fsm.get_partial_for_writing('uuid1', "")
 
3315
        fd.flush()
 
3316
        fd.close()
 
3317
        self.fsm.create_partial('uuid2', self.share.volume_id)
 
3318
        fd = self.fsm.get_partial_for_writing('uuid2', self.share.volume_id)
 
3319
        fd.flush()
 
3320
        fd.close()
 
3321
        self.assertTrue(path_exists(self.fsm.get_abspath("", a_dir_root)))
 
3322
        self.assertTrue(path_exists(a_dir_share))
 
3323
 
 
3324
 
 
3325
class TestEnableShareWrite(FSMTestCase):
 
3326
    """Tests for EnableShareWrite context manager"""
 
3327
 
 
3328
    @defer.inlineCallbacks
 
3329
    def setUp(self):
 
3330
        """Test setup"""
 
3331
        yield super(TestEnableShareWrite, self).setUp()
 
3332
        # create a ro share
 
3333
        self.share_ro = yield self.create_share('share_ro', u'share_ro_name',
 
3334
                                                access_level='View')
 
3335
        self.share_ro_path = self.share_ro.path
 
3336
 
 
3337
    @skip_if_win32_and_uses_readonly
 
3338
    def test_write_in_ro_share(self):
 
3339
        """Test the EnableShareWrite context manager in a ro share."""
 
3340
        path = os.path.join(self.share_ro_path, 'foo', 'a_file_in_a_ro_share')
 
3341
        data = 'yes I can write!'
 
3342
        can_write_parent = os.access(os.path.dirname(self.share_ro_path),
 
3343
                                     os.W_OK)
 
3344
        with EnableShareWrite(self.share_ro, path) as enabled:
 
3345
            self.assertTrue(enabled.ro)
 
3346
            with open_file(path, 'w') as f:
 
3347
                f.write(data)
 
3348
        self.assertEqual(data, open_file(path, 'r').read())
 
3349
        self.assertFalse(os.access(self.share_ro_path, os.W_OK))
 
3350
        # check that the parent permissions are ok
 
3351
        self.assertEqual(can_write_parent,
 
3352
                         os.access(os.path.dirname(self.share_ro_path),
 
3353
                                   os.W_OK))
 
3354
        # fail to write directly in the share
 
3355
        self.assertRaises(IOError, open, path, 'w')
 
3356
 
 
3357
    def test_write_in_rw_share(self):
 
3358
        """test the EnableShareWrite context manager in a rw share"""
 
3359
        path = os.path.join(self.share_path, 'a_file_in_a_rw_share')
 
3360
        data = 'yes I can write!'
 
3361
        can_write_parent = os.access(os.path.dirname(self.share_path), os.W_OK)
 
3362
        with EnableShareWrite(self.share, path) as enabled:
 
3363
            self.assertFalse(enabled.ro)
 
3364
            with open_file(path, 'w') as f:
 
3365
                f.write(data)
 
3366
        self.assertEquals(data, open_file(path, 'r').read())
 
3367
        self.assertTrue(os.access(self.share_path, os.W_OK))
 
3368
        # check that the parent permissions are ok
 
3369
        self.assertEquals(can_write_parent, os.access(self.share_path, os.W_OK))
 
3370
 
 
3371
 
 
3372
class RealVMTestCase(FSMTestCase):
 
3373
 
 
3374
    @defer.inlineCallbacks
 
3375
    def setUp(self):
 
3376
        """Setup the test."""
 
3377
        yield super(RealVMTestCase, self).setUp()
 
3378
        self.shares_dir = self.mktemp('shares')
 
3379
        self.root_dir = self.mktemp('root')
 
3380
        self.data_dir = self.mktemp("data")
 
3381
        self.partials_dir = self.mktemp("partials")
 
3382
        self.main = FakeMain(self.root_dir, self.shares_dir,
 
3383
                             self.data_dir, self.partials_dir)
 
3384
        self.addCleanup(self.main.shutdown)
 
3385
        self.fsm = self.main.fs
 
3386
        self.share = yield self.create_share('share', u'share_name')
 
3387
        self.share_path = self.share.path
 
3388
 
 
3389
    @defer.inlineCallbacks
 
3390
    def create_share(self, share_id, share_name,
 
3391
                     access_level='Modify'):
 
3392
        with allow_writes(self.shares_dir):
 
3393
            share = yield _create_share(share_id, share_name, self.fsm,
 
3394
                                        self.shares_dir, access_level)
 
3395
 
 
3396
        defer.returnValue(share)
 
3397
 
 
3398
    @skip_if_win32_and_uses_metadata_older_than_5
 
3399
    @defer.inlineCallbacks
 
3400
    def test_old_metadata_None_missing_share(self):
 
3401
        """test loading metadata v0. that points to a share that
 
3402
        we don't have
 
3403
        """
 
3404
        # create some stuff
 
3405
        path = os.path.join(self.share.path, 'path')
 
3406
        open_file(path, "w").close()
 
3407
        mdid = self.fsm.create(path, "share")
 
3408
        self.fsm.set_node_id(path, "uuid")
 
3409
        # create a path with the old layout
 
3410
        other_share = yield self.create_share('share1', u'share1_name')
 
3411
        share_mdid = self.fsm.create(other_share.path, "share1")
 
3412
        self.fsm.set_node_id(other_share.path, "uuid1")
 
3413
        make_dir(os.path.join(self.root_dir, 'Ubuntu One'), recursive=True)
 
3414
        old_shares_path = os.path.join(self.root_dir, 'Ubuntu One',
 
3415
                            'Shared With Me')
 
3416
        old_path = os.path.join(old_shares_path, u'share1_name')
 
3417
        make_link(self.shares_dir, old_shares_path)
 
3418
 
 
3419
        # put the old path in the mdobj
 
3420
        share_md = self.fsm.fs[share_mdid]
 
3421
        share_md['path'] = old_path
 
3422
        self.fsm.fs[share_mdid] = share_md
 
3423
 
 
3424
        # break the node on purpose
 
3425
        real_mdobj = self.fsm.fs[mdid]
 
3426
        del real_mdobj["stat"]
 
3427
        real_mdobj["path"] = unicode(real_mdobj["path"])
 
3428
        real_mdobj["local_hash"] = None
 
3429
        real_mdobj["server_hash"] = None
 
3430
        self.fsm.fs[mdid] = real_mdobj
 
3431
 
 
3432
        # delete the version that should have left the previous fsm
 
3433
        version_file = os.path.join(self.data_dir, "metadata_version")
 
3434
        remove_file(version_file)
 
3435
 
 
3436
        # create a old-style fs with the data:
 
3437
        old_fs = FileShelf(self.fsm.old_fs._path)
 
3438
        for k, v in self.fsm.fs.iteritems():
 
3439
            old_fs[k] = v
 
3440
 
 
3441
        # remove the share!
 
3442
        del self.fsm.vm.shares[other_share.volume_id]
 
3443
 
 
3444
        # start up again, and check
 
3445
        db = Tritcask(os.path.join(self.main.data_dir, 'tritcask.new'))
 
3446
        self.addCleanup(db.shutdown)
 
3447
        newfsm = FileSystemManager(self.data_dir, self.partials_dir,
 
3448
                                   self.fsm.vm, db)
 
3449
        md_version = open_file(version_file).read()
 
3450
        self.assertEqual(md_version, METADATA_VERSION)
 
3451
        newmdobj = newfsm.get_by_path(path)
 
3452
        self.assertEqual(newmdobj.mdid, mdid)
 
3453
        self.assertEqual(newmdobj.stat, stat_path(path))
 
3454
        self.assertEqual(newmdobj.local_hash, "")
 
3455
        self.assertEqual(newmdobj.server_hash, "")
 
3456
        self.assertTrue(isinstance(newmdobj.path, str))
 
3457
        self.assertTrue(other_share.path not in newfsm._idx_path)
 
3458
        self.assertFalse(old_path in self.fsm._idx_path)
 
3459
        self.assertFalse(old_path in newfsm._idx_path)
 
3460
        self.assertRaises(KeyError, newfsm.get_by_mdid, share_mdid)
 
3461
 
 
3462
    @skip_if_win32_and_uses_metadata_older_than_5
 
3463
    @defer.inlineCallbacks
 
3464
    def test_old_metadata_1_missing_share(self):
 
3465
        """test loading metadata v1. that points to a share that
 
3466
        we don't have
 
3467
        """
 
3468
        # create some stuff
 
3469
        path1 = os.path.join(self.share.path, 'path1')
 
3470
        path2 = os.path.join(self.share.path, 'path2')
 
3471
        mdid1 = self.fsm.create(path1, "share")
 
3472
        self.fsm.set_node_id(path1, "uuid1")
 
3473
        mdid2 = self.fsm.create(path2, "share")
 
3474
        self.fsm.set_node_id(path2, "uuid2")
 
3475
 
 
3476
        # create a path with the old layout
 
3477
        other_share = yield self.create_share('share1', u'share1_name')
 
3478
        share_mdid = self.fsm.create(other_share.path, "share1")
 
3479
        self.fsm.set_node_id(other_share.path, "uuid3")
 
3480
        make_dir(os.path.join(self.root_dir, 'Ubuntu One'), recursive=True)
 
3481
        old_shares_path = os.path.join(self.root_dir, 'Ubuntu One',
 
3482
                            'Shared With Me')
 
3483
        old_path = os.path.join(old_shares_path, u'share1_name')
 
3484
        make_link(self.shares_dir, old_shares_path)
 
3485
 
 
3486
        # put the old path in the mdobj
 
3487
        share_md = self.fsm.fs[share_mdid]
 
3488
        share_md['path'] = old_path
 
3489
        self.fsm.fs[share_mdid] = share_md
 
3490
 
 
3491
        # break the node on purpose, with unicode valid and not
 
3492
        real_mdobj = self.fsm.fs[mdid1]
 
3493
        real_mdobj["path"] = unicode(real_mdobj["path"])
 
3494
        real_mdobj["local_hash"] = None
 
3495
        real_mdobj["server_hash"] = None
 
3496
        self.fsm.fs[mdid1] = real_mdobj
 
3497
        real_mdobj = self.fsm.fs[mdid2]
 
3498
        real_mdobj["path"] = "asdas\x00\xff\xffasd"
 
3499
        self.fsm.fs[mdid2] = real_mdobj
 
3500
 
 
3501
        # put the version file in 1
 
3502
        version_file = os.path.join(self.data_dir, "metadata_version")
 
3503
        with open_file(version_file, "w") as fh:
 
3504
            fh.write("1")
 
3505
 
 
3506
        # create a old-style fs with the data:
 
3507
        old_fs = FileShelf(self.fsm.old_fs._path)
 
3508
        for k, v in self.fsm.fs.iteritems():
 
3509
            old_fs[k] = v
 
3510
 
 
3511
        # remove the share!
 
3512
        del self.fsm.vm.shares[other_share.volume_id]
 
3513
 
 
3514
        # start up again, and check
 
3515
        db = Tritcask(os.path.join(self.main.data_dir, 'tritcask.new'))
 
3516
        self.addCleanup(db.shutdown)
 
3517
        newfsm = FileSystemManager(self.data_dir, self.partials_dir,
 
3518
                                   self.fsm.vm, db)
 
3519
        version_file = os.path.join(self.data_dir, "metadata_version")
 
3520
        md_version = open_file(version_file).read()
 
3521
        self.assertEqual(md_version, METADATA_VERSION)
 
3522
        # pylint: disable-msg=W0212
 
3523
        self.assertEqual(1, len(newfsm._idx_node_id))
 
3524
        self.assertEqual(2, len(newfsm._idx_path))
 
3525
        self.assertEquals('uuid1', newfsm.get_by_mdid(mdid1).node_id)
 
3526
        self.assertRaises(KeyError, newfsm.get_by_mdid, share_mdid)
 
3527
 
 
3528
    @skip_if_win32_and_uses_metadata_older_than_5
 
3529
    @defer.inlineCallbacks
 
3530
    def test_old_metadata_2_missing_share(self):
 
3531
        """test loading metadata v2. that points to a share that
 
3532
        we don't have
 
3533
        """
 
3534
        # create some stuff
 
3535
        path = os.path.join(self.share.path, 'path')
 
3536
        mdid = self.fsm.create(path, "share")
 
3537
        self.fsm.set_node_id(path, "uuid")
 
3538
        # create a path with the old layout
 
3539
        other_share = yield self.create_share('share1', u'share1_name')
 
3540
        share_mdid = self.fsm.create(other_share.path, "share1")
 
3541
        self.fsm.set_node_id(other_share.path, "uuid3")
 
3542
        make_dir(os.path.join(self.root_dir, 'Ubuntu One'), recursive=True)
 
3543
        old_shares_path = os.path.join(self.root_dir, 'Ubuntu One',
 
3544
                            'Shared With Me')
 
3545
        old_path = os.path.join(old_shares_path, u'share1_name')
 
3546
        make_link(self.shares_dir, old_shares_path)
 
3547
 
 
3548
        # put the old path in the mdobj
 
3549
        share_md = self.fsm.fs[share_mdid]
 
3550
        share_md['path'] = old_path
 
3551
        self.fsm.fs[share_mdid] = share_md
 
3552
 
 
3553
        # break the node on purpose, with hashes in None
 
3554
        real_mdobj = self.fsm.fs[mdid]
 
3555
        real_mdobj["local_hash"] = None
 
3556
        real_mdobj["server_hash"] = None
 
3557
        self.fsm.fs[mdid] = real_mdobj
 
3558
 
 
3559
        # put the version file in 1
 
3560
        version_file = os.path.join(self.data_dir, "metadata_version")
 
3561
        with open_file(version_file, "w") as fh:
 
3562
            fh.write("2")
 
3563
 
 
3564
        # create a old-style fs with the data:
 
3565
        old_fs = FileShelf(self.fsm.old_fs._path)
 
3566
        for k, v in self.fsm.fs.iteritems():
 
3567
            old_fs[k] = v
 
3568
 
 
3569
        # remove the share!
 
3570
        del self.fsm.vm.shares[other_share.volume_id]
 
3571
 
 
3572
        # start up again, and check
 
3573
        db = Tritcask(os.path.join(self.main.data_dir, 'tritcask.new'))
 
3574
        self.addCleanup(db.shutdown)
 
3575
        newfsm = FileSystemManager(self.data_dir, self.partials_dir,
 
3576
                                   self.fsm.vm, db)
 
3577
        version_file = os.path.join(self.data_dir, "metadata_version")
 
3578
        md_version = open_file(version_file).read()
 
3579
        self.assertEqual(md_version, METADATA_VERSION)
 
3580
        self.assertTrue(newfsm.get_by_mdid(mdid) is not None)
 
3581
        # pylint: disable-msg=W0212
 
3582
        self.assertEqual(1, len(newfsm._idx_node_id))
 
3583
        self.assertEqual(2, len(newfsm._idx_path))
 
3584
        self.assertRaises(KeyError, newfsm.get_by_mdid, share_mdid)
 
3585
 
 
3586
    @skip_if_win32_and_uses_metadata_older_than_5
 
3587
    @defer.inlineCallbacks
 
3588
    def test_old_metadata_3_missing_share(self):
 
3589
        """test loading metadata v3. that points to a share that
 
3590
        we don't have
 
3591
        """
 
3592
        # create a path with the old layout and metadata
 
3593
        # the root
 
3594
        root_mdid = self.fsm.get_by_path(self.root_dir).mdid
 
3595
        self.fsm.set_node_id(self.root_dir, "uuid")
 
3596
        # a share
 
3597
        other_share = yield self.create_share('share1', u'share1_name')
 
3598
        share_mdid = self.fsm.create(other_share.path, "share1")
 
3599
        self.fsm.set_node_id(other_share.path, "uuid1")
 
3600
        make_dir(os.path.join(self.root_dir, 'Ubuntu One'), recursive=True)
 
3601
        old_shares_path = os.path.join(self.root_dir, 'Ubuntu One',
 
3602
                            'Shared With Me')
 
3603
        old_path = os.path.join(old_shares_path, u'share1_name')
 
3604
        make_link(self.shares_dir, old_shares_path)
 
3605
        old_root_path = os.path.join(os.path.dirname(self.root_dir),
 
3606
                                     'Ubuntu One', 'My Files')
 
3607
 
 
3608
        # put the old path in the mdobjs
 
3609
        share_md = self.fsm.fs[share_mdid]
 
3610
        share_md['path'] = old_path
 
3611
        self.fsm.fs[share_mdid] = share_md
 
3612
        root_md = self.fsm.fs[root_mdid]
 
3613
        root_md['path'] = old_root_path
 
3614
        self.fsm.fs[root_mdid] = root_md
 
3615
 
 
3616
        # put the version file in 1
 
3617
        version_file = os.path.join(self.data_dir, "metadata_version")
 
3618
        with open_file(version_file, "w") as fh:
 
3619
            fh.write("3")
 
3620
 
 
3621
        # create a old-style fs with the data:
 
3622
        old_fs = FileShelf(self.fsm.old_fs._path)
 
3623
        for k, v in self.fsm.fs.iteritems():
 
3624
            old_fs[k] = v
 
3625
 
 
3626
        # remove the share!
 
3627
        del self.fsm.vm.shares[other_share.volume_id]
 
3628
 
 
3629
        # start up again, and check
 
3630
        db = Tritcask(os.path.join(self.main.data_dir, 'tritcask.new'))
 
3631
        self.addCleanup(db.shutdown)
 
3632
        newfsm = FileSystemManager(self.data_dir, self.partials_dir,
 
3633
                                   self.fsm.vm, db)
 
3634
        version_file = os.path.join(self.data_dir, "metadata_version")
 
3635
        md_version = open_file(version_file).read()
 
3636
        self.assertEqual(md_version, METADATA_VERSION)
 
3637
        self.assertTrue(newfsm.get_by_mdid(root_mdid) is not None)
 
3638
        # pylint: disable-msg=W0212
 
3639
        self.assertEqual(1, len(newfsm._idx_node_id))
 
3640
        self.assertEqual(1, len(newfsm._idx_path))
 
3641
        self.assertRaises(KeyError, newfsm.get_by_mdid, share_mdid)
 
3642
 
 
3643
    @defer.inlineCallbacks
 
3644
    def test_metadata_missing_share(self):
 
3645
        """test loading current metadata that points to a share
 
3646
        that we don't have
 
3647
        """
 
3648
        md_version = open_file(os.path.join(self.data_dir,
 
3649
                                       "metadata_version")).read()
 
3650
        self.assertEqual(md_version, METADATA_VERSION)
 
3651
        path = os.path.join(self.share.path, 'path')
 
3652
        path1 = os.path.join(self.share.path, 'path1')
 
3653
        other_share = yield self.create_share('share1', u'share1_name')
 
3654
 
 
3655
        path2 = os.path.join(other_share.path, 'broken_path2')
 
3656
        for p in [path, path1, path2]:
 
3657
            open_file(p, "w").close()
 
3658
        mdid = self.fsm.create(path, "share")
 
3659
        self.fsm.set_node_id(path, "uuid")
 
3660
        mdid1 = self.fsm.create(path1, "share")
 
3661
        self.fsm.set_node_id(path1, "uuid1")
 
3662
        mdid2 = self.fsm.create(path2, "share1")
 
3663
        self.fsm.set_node_id(path2, "uuid2")
 
3664
 
 
3665
        # remove the share!
 
3666
        del self.fsm.vm.shares[other_share.volume_id]
 
3667
 
 
3668
        # start up again, and check
 
3669
        newfsm = FileSystemManager(self.data_dir, self.partials_dir,
 
3670
                                   self.fsm.vm, self.main.db)
 
3671
        version_file = os.path.join(self.data_dir, "metadata_version")
 
3672
        md_version = open_file(version_file).read()
 
3673
        self.assertEqual(md_version, METADATA_VERSION)
 
3674
        self.assertTrue(newfsm.get_by_mdid(mdid) is not None)
 
3675
        # pylint: disable-msg=W0212
 
3676
        self.assertEqual(2, len(newfsm._idx_node_id))
 
3677
        self.assertEqual(3, len(newfsm._idx_path))
 
3678
        # check that the broken mdid's load the old metadata
 
3679
        self.assertEquals('uuid', newfsm.get_by_mdid(mdid).node_id)
 
3680
        self.assertEquals('uuid1', newfsm.get_by_mdid(mdid1).node_id)
 
3681
        self.assertRaises(KeyError, newfsm.get_by_mdid, mdid2)
 
3682
 
 
3683
 
 
3684
class PathsStartingWithTestCase(FSMTestCase):
 
3685
    """Test FSM.get_paths_starting_with utility."""
 
3686
 
 
3687
    @defer.inlineCallbacks
 
3688
    def setUp(self):
 
3689
        """Basic setup."""
 
3690
        yield super(PathsStartingWithTestCase, self).setUp()
 
3691
 
 
3692
        self.some_dir = os.path.join(self.root_dir, 'foo')
 
3693
        self.sub_dir = os.path.join(self.some_dir, 'baz')
 
3694
        self.some_file = os.path.join(self.sub_dir, 'bar.txt')
 
3695
 
 
3696
        for d in (self.some_dir, self.sub_dir):
 
3697
            make_dir(d)
 
3698
            self.addCleanup(self.rmtree, d)
 
3699
            self.fsm.create(d, '', is_dir=True)
 
3700
            self.fsm.set_node_id(d, 'uuid')
 
3701
 
 
3702
        open_file(self.some_file, 'w').close()
 
3703
        self.fsm.create(self.some_file, "")
 
3704
        self.fsm.set_node_id(self.some_file, "uuid_file")
 
3705
 
 
3706
    def test_with_self(self):
 
3707
        """Check paths starting with including some_dir."""
 
3708
        expected = sorted([(self.some_dir, True), (self.sub_dir, True),
 
3709
                           (self.some_file, False)])
 
3710
        actual = self.fsm.get_paths_starting_with(self.some_dir)
 
3711
        self.assertEqual(expected, sorted(actual))
 
3712
 
 
3713
    def test_dir_names_only(self):
 
3714
        """Check paths starting with excluding directories with same prefix."""
 
3715
        similar_dir = os.path.join(self.root_dir, 'fooo')
 
3716
        make_dir(similar_dir)
 
3717
        self.fsm.create(similar_dir, '', is_dir=True)
 
3718
        self.fsm.set_node_id(similar_dir, 'uuid')
 
3719
 
 
3720
        expected = sorted([(self.some_dir, True), (self.sub_dir, True),
 
3721
                           (self.some_file, False)])
 
3722
        actual = self.fsm.get_paths_starting_with(self.some_dir)
 
3723
 
 
3724
        self.assertEqual(expected, sorted(actual))
 
3725
 
 
3726
    def test_without_self(self):
 
3727
        """Check paths starting with excluding some_dir."""
 
3728
        expected = sorted([(self.sub_dir, True), (self.some_file, False)])
 
3729
        actual = self.fsm.get_paths_starting_with(self.some_dir,
 
3730
                                                  include_base=False)
 
3731
        self.assertEqual(expected, sorted(actual))
 
3732
 
 
3733
 
 
3734
class ServerRescanDataTestCase(FSMTestCase):
 
3735
    """Test FSM services to get server rescan data."""
 
3736
 
 
3737
    def test_get_for_server_rescan_by_path(self):
 
3738
        """Test FSM.get_for_server_rescan_by_path method"""
 
3739
        # create the share fsm object
 
3740
        self.fsm.create(self.share_path, self.share.volume_id)
 
3741
        self.fsm.set_node_id(self.share_path, "share_uuid")
 
3742
        # create a few nodes
 
3743
        path1 = os.path.join(self.share_path, "path1")
 
3744
        path2 = os.path.join(self.share_path, "path1", "path2")
 
3745
        path_out = os.path.join(self.root_dir, "path1")
 
3746
        self.fsm.create(path1, "share", is_dir=True)
 
3747
        self.fsm.create(path2, "share")
 
3748
        self.fsm.create(path_out, "")
 
3749
        self.fsm.set_node_id(path1, "uuid1")
 
3750
        self.fsm.set_node_id(path2, "uuid2")
 
3751
        self.fsm.set_node_id(path_out, "uuid3")
 
3752
        data = list(self.fsm.get_for_server_rescan_by_path(self.share.path))
 
3753
        self.assertEquals(len(data), 3)
 
3754
        self.assertTrue(("share", "uuid1", "") in data)
 
3755
        self.assertTrue(("share", "uuid2", "") in data)
 
3756
        self.assertTrue(("share", "share_uuid", "") in data)
 
3757
        self.assertFalse(("", "uuid3", "") in data)
 
3758
 
 
3759
 
 
3760
class MutingTestCase(FSMTestCase):
 
3761
    """Test FSM interaction with mutes."""
 
3762
 
 
3763
    @defer.inlineCallbacks
 
3764
    def setUp(self):
 
3765
        """Set up the test infrastructure."""
 
3766
        yield super(MutingTestCase, self).setUp()
 
3767
        self.muted = []
 
3768
 
 
3769
        # in-the-middle add
 
3770
        self._orig_add_mute = self.eq.add_to_mute_filter
 
3771
        def _fake_add(event, **data):
 
3772
            """Store what is added."""
 
3773
            self.muted.append((event, data))
 
3774
            return self._orig_add_mute(event, **data)
 
3775
        self.eq.add_to_mute_filter = _fake_add
 
3776
 
 
3777
        # in-the-middle remove
 
3778
        self._orig_rm_mute = self.eq.rm_from_mute_filter
 
3779
        def _fake_rm(event, **data):
 
3780
            """Store what is deleted."""
 
3781
            self.muted.remove((event, data))
 
3782
            return self._orig_rm_mute(event, **data)
 
3783
        self.eq.rm_from_mute_filter = _fake_rm
 
3784
 
 
3785
    @defer.inlineCallbacks
 
3786
    def tearDown(self):
 
3787
        """Put original stuff back in."""
 
3788
        self.eq.add_to_mute_filter = self._orig_add_mute
 
3789
        self.eq.rm_from_mute_filter = self._orig_rm_mute
 
3790
        yield super(MutingTestCase, self).tearDown()
 
3791
 
 
3792
    def test_movefile_ok(self):
 
3793
        """Move file adds a mute filter."""
 
3794
        path1 = os.path.join(self.share.path, "thisfile1")
 
3795
        path2 = os.path.join(self.share.path, "thisfile2")
 
3796
        open_file(path1, "w").close()
 
3797
        self.create_node(path1)
 
3798
 
 
3799
        # move and check
 
3800
        self.fsm.move_file("share", path1, path2)
 
3801
        self.assertEqual(self.muted, [('FS_FILE_MOVE',
 
3802
                                      dict(path_from=path1, path_to=path2))])
 
3803
 
 
3804
    def test_movefile_error(self):
 
3805
        """Move file adds and removes a mute filter."""
 
3806
        path1 = os.path.join(self.share.path, "thisfile1")
 
3807
        path2 = os.path.join(self.share.path, "thisfile2")
 
3808
        self.create_node(path1)
 
3809
 
 
3810
        # move and check
 
3811
        self.fsm.move_file("share", path1, path2)
 
3812
        self.assertEqual(self.muted, [])
 
3813
 
 
3814
    def test_movedir_ok(self):
 
3815
        """Move dir adds a mute filter."""
 
3816
        path1 = os.path.join(self.share.path, "thisfile1")
 
3817
        path2 = os.path.join(self.share.path, "thisfile2")
 
3818
        make_dir(path1)
 
3819
        self.create_node(path1, is_dir=True)
 
3820
 
 
3821
        # move and check
 
3822
        self.fsm.move_file("share", path1, path2)
 
3823
        self.assertEqual(self.muted, [('FS_DIR_MOVE',
 
3824
                                      dict(path_from=path1, path_to=path2))])
 
3825
 
 
3826
    def test_movedir_error(self):
 
3827
        """Move dir adds and removes a mute filter."""
 
3828
        path1 = os.path.join(self.share.path, "thisfile1")
 
3829
        path2 = os.path.join(self.share.path, "thisfile2")
 
3830
        self.create_node(path1, is_dir=True)
 
3831
 
 
3832
        # move and check
 
3833
        self.fsm.move_file("share", path1, path2)
 
3834
        self.assertEqual(self.muted, [])
 
3835
 
 
3836
    def test_deletefile_ok(self):
 
3837
        """Delete file adds a mute filter."""
 
3838
        testfile = os.path.join(self.share_path, "path")
 
3839
        open_file(testfile, "w").close()
 
3840
        self.fsm.create(testfile, "share")
 
3841
        self.fsm.set_node_id(testfile, "uuid")
 
3842
 
 
3843
        # delete and check
 
3844
        self.fsm.delete_file(testfile)
 
3845
        self.assertEqual(self.muted, [('FS_FILE_DELETE', dict(path=testfile))])
 
3846
 
 
3847
    def test_deletefile_error(self):
 
3848
        """Delete file adds and removes a mute filter."""
 
3849
        testfile = os.path.join(self.share_path, "path")
 
3850
        self.fsm.create(testfile, "share")
 
3851
        self.fsm.set_node_id(testfile, "uuid")
 
3852
 
 
3853
        # delete and check
 
3854
        self.fsm.delete_file(testfile)
 
3855
        self.assertEqual(self.muted, [])
 
3856
 
 
3857
    def test_deletedir_ok(self):
 
3858
        """Delete dir adds a mute filter."""
 
3859
        testfile = os.path.join(self.share_path, "path")
 
3860
        make_dir(testfile)
 
3861
        self.fsm.create(testfile, "share", is_dir=True)
 
3862
        self.fsm.set_node_id(testfile, "uuid")
 
3863
 
 
3864
        # delete and check
 
3865
        self.fsm.delete_file(testfile)
 
3866
        self.assertEqual(self.muted, [('FS_DIR_DELETE', dict(path=testfile))])
 
3867
 
 
3868
    def test_deletedir_error(self):
 
3869
        """Delete dir adds and removes a mute filter."""
 
3870
        testfile = os.path.join(self.share_path, "path")
 
3871
        self.fsm.create(testfile, "share", is_dir=True)
 
3872
        self.fsm.set_node_id(testfile, "uuid")
 
3873
 
 
3874
        # delete and check
 
3875
        self.fsm.delete_file(testfile)
 
3876
        self.assertEqual(self.muted, [])
 
3877
 
 
3878
    def test_conflict_movefile_ok(self):
 
3879
        """Move file when conflict adds a mute filter.
 
3880
 
 
3881
        The muted event is a DELETE, because the .u1conflict is
 
3882
        ignored (and that transforms the MOVE into DELETE).
 
3883
        """
 
3884
        path1 = os.path.join(self.share.path, "thisfile1")
 
3885
        open_file(path1, "w").close()
 
3886
        mdobj = self.create_node(path1)
 
3887
 
 
3888
        # move and check
 
3889
        self.fsm.move_to_conflict(mdobj.mdid)
 
3890
        self.assertEqual(self.muted, [('FS_FILE_DELETE', dict(path=path1))])
 
3891
 
 
3892
    def test_conflict_movefile_error(self):
 
3893
        """Move file when conflict adds and removes a mute filter."""
 
3894
        path1 = os.path.join(self.share.path, "thisfile1")
 
3895
        mdobj = self.create_node(path1)
 
3896
 
 
3897
        # move and check
 
3898
        self.fsm.move_to_conflict(mdobj.mdid)
 
3899
        self.assertEqual(self.muted, [])
 
3900
 
 
3901
    def test_conflict_movedir_ok(self):
 
3902
        """Move dir when conflict adds a mute filter.
 
3903
 
 
3904
        The muted event is a DELETE, because the .u1conflict is
 
3905
        ignored (and that transforms the MOVE into DELETE).
 
3906
        """
 
3907
        path1 = os.path.join(self.share.path, "thisfile1")
 
3908
        make_dir(path1)
 
3909
        mdobj = self.create_node(path1, is_dir=True)
 
3910
 
 
3911
        # move and check
 
3912
        self.fsm.move_to_conflict(mdobj.mdid)
 
3913
        self.assertEqual(self.muted, [('FS_DIR_DELETE', dict(path=path1))])
 
3914
 
 
3915
    def test_conflict_movedir_error(self):
 
3916
        """Move dir when conflict adds and removes a mute filter."""
 
3917
        path1 = os.path.join(self.share.path, "thisfile1")
 
3918
        mdobj = self.create_node(path1, is_dir=True)
 
3919
 
 
3920
        # move and check
 
3921
        self.fsm.move_to_conflict(mdobj.mdid)
 
3922
        self.assertEqual(self.muted, [])
 
3923
 
 
3924
 
 
3925
class DirtyNodesTests(FSMTestCase):
 
3926
    """Test all related to dirty nodes."""
 
3927
 
 
3928
    def test_get_set_dirty(self):
 
3929
        """Dirty flag is allowed to be set."""
 
3930
        path = os.path.join(self.root_dir, "path")
 
3931
        mdid = self.fsm.create(path, "")
 
3932
 
 
3933
        self.fsm.set_by_mdid(mdid, dirty=True)
 
3934
        self.assertTrue(self.fsm.get_by_mdid(mdid).dirty)
 
3935
 
 
3936
    def test_get_dirty_nodes_allempty(self):
 
3937
        """No dirty nodes when no nodes at all."""
 
3938
        self.assertEqual(list(self.fsm.get_dirty_nodes()), [])
 
3939
 
 
3940
    def test_get_dirty_nodes_all_ok(self):
 
3941
        """No dirty nodes when other nodes are ok."""
 
3942
        # create a node but keep it clean
 
3943
        path = os.path.join(self.root_dir, "path")
 
3944
        self.fsm.create(path, "")
 
3945
 
 
3946
        self.assertEqual(list(self.fsm.get_dirty_nodes()), [])
 
3947
 
 
3948
    def test_get_dirty_nodes_mixed(self):
 
3949
        """Some dirty nodes betweeen the others."""
 
3950
        # create a node but keep it clean
 
3951
        path1 = os.path.join(self.root_dir, "path1")
 
3952
        self.fsm.create(path1, "")
 
3953
        path2 = os.path.join(self.root_dir, "path2")
 
3954
        mdid2 = self.fsm.create(path2, "")
 
3955
        path3 = os.path.join(self.root_dir, "path3")
 
3956
        self.fsm.create(path3, "")
 
3957
        path4 = os.path.join(self.root_dir, "path4")
 
3958
        mdid4 = self.fsm.create(path4, "")
 
3959
 
 
3960
        # dirty some
 
3961
        self.fsm.set_by_mdid(mdid2, dirty=True)
 
3962
        self.fsm.set_by_mdid(mdid4, dirty=True)
 
3963
 
 
3964
        # get and compare
 
3965
        all_dirty = list(self.fsm.get_dirty_nodes())
 
3966
        dirty_mdids = [n.mdid for n in all_dirty]
 
3967
        self.assertEqual(len(all_dirty), 2)
 
3968
        self.assertTrue(mdid2 in dirty_mdids)
 
3969
        self.assertTrue(mdid4 in dirty_mdids)
 
3970
 
 
3971
 
 
3972
class TrashFileShelfTests(BaseTwistedTestCase):
 
3973
    """Test the customized file shelf."""
 
3974
 
 
3975
    @defer.inlineCallbacks
 
3976
    def setUp(self):
 
3977
        """Set up."""
 
3978
        yield super(TrashFileShelfTests, self).setUp()
 
3979
        self.trash_dir = self.mktemp('trash')
 
3980
        self.tfs = TrashFileShelf(self.trash_dir)
 
3981
 
 
3982
    def test_one_value(self):
 
3983
        """Test the file shelf with one value."""
 
3984
        self.tfs[("foo", "bar")] = 'value'
 
3985
 
 
3986
        self.assertEqual(self.tfs[("foo", "bar")], 'value')
 
3987
        self.assertEqual(list(self.tfs.keys()), [("foo", "bar")])
 
3988
 
 
3989
    def test_two_values(self):
 
3990
        """Test the file shelf with two values."""
 
3991
        self.tfs[("foo", "bar")] = 'value1'
 
3992
        self.tfs[("xyz", "hfb")] = 'value2'
 
3993
 
 
3994
        self.assertEqual(self.tfs[("foo", "bar")], 'value1')
 
3995
        self.assertEqual(self.tfs[("xyz", "hfb")], 'value2')
 
3996
        self.assertEqual(sorted(self.tfs.keys()),
 
3997
                         [("foo", "bar"), ("xyz", "hfb")])
 
3998
 
 
3999
    def test_node_id_None(self):
 
4000
        """node_id can be None."""
 
4001
        self.tfs[("foo", None)] = 'value'
 
4002
        self.assertEqual(self.tfs[("foo", None)], 'value')
 
4003
        self.assertEqual(list(self.tfs.keys()), [("foo", None)])
 
4004
 
 
4005
    def test_node_id_marker(self):
 
4006
        """node_id can be a marker."""
 
4007
        marker = MDMarker("bar")
 
4008
        self.tfs[("foo", marker)] = 'value'
 
4009
        self.assertEqual(self.tfs[("foo", marker)], 'value')
 
4010
        self.assertEqual(list(self.tfs.keys()), [("foo", marker)])
 
4011
        node_id = list(self.tfs.keys())[0][1]
 
4012
        self.assertTrue(IMarker.providedBy(node_id))
 
4013
 
 
4014
    def test_share_id_marker(self):
 
4015
        """share_id can be a marker."""
 
4016
        marker = MDMarker("bar")
 
4017
        self.tfs[(marker, "foo")] = 'value'
 
4018
        self.assertEqual(self.tfs[(marker, "foo")], 'value')
 
4019
        self.assertEqual(list(self.tfs.keys()), [(marker, "foo")])
 
4020
        share_id = list(self.tfs.keys())[0][0]
 
4021
        self.assertTrue(IMarker.providedBy(share_id))
 
4022
 
 
4023
 
 
4024
class TrashTritcaskShelfTests(TrashFileShelfTests):
 
4025
 
 
4026
    @defer.inlineCallbacks
 
4027
    def setUp(self):
 
4028
        """Set up."""
 
4029
        yield super(TrashTritcaskShelfTests, self).setUp()
 
4030
        self.trash_dir = self.mktemp('trash')
 
4031
        self.db = Tritcask(self.trash_dir)
 
4032
        self.addCleanup(self.db.shutdown)
 
4033
        self.tfs = TrashTritcaskShelf(TRASH_ROW_TYPE, self.db)
 
4034
 
 
4035
 
 
4036
class OsIntegrationTests(FSMTestCase, MockerTestCase):
 
4037
    """Ensure that the correct os_helper methods are used."""
 
4038
 
 
4039
    @defer.inlineCallbacks
 
4040
    def setUp(self):
 
4041
        """Set up."""
 
4042
        yield super(OsIntegrationTests, self).setUp()
 
4043
        self.open_file = self.mocker.replace('ubuntuone.platform.open_file')
 
4044
        self.normpath = self.mocker.replace('ubuntuone.platform.normpath')
 
4045
        self.listdir = self.mocker.replace('ubuntuone.platform.listdir')
 
4046
 
 
4047
    def test_get_partial_for_writing(self):
 
4048
        """Test that the get partial does use the correct os_helper method."""
 
4049
        fd = 'file_descriptor'
 
4050
        path = 'path'
 
4051
        mdid = 'id'
 
4052
        node_id = 'node_id'
 
4053
        share_id = 'share_id'
 
4054
        self.fsm._idx_node_id = {(share_id, node_id): mdid}
 
4055
        self.fsm.fs = {'id': path}
 
4056
        self.fsm._get_partial_path = self.mocker.mock()
 
4057
        self.fsm._get_partial_path(path)
 
4058
        self.mocker.result(path)
 
4059
        self.open_file(path, 'wb')
 
4060
        self.mocker.result(fd)
 
4061
        self.mocker.replay()
 
4062
        self.assertEqual(fd, self.fsm.get_partial_for_writing(node_id, share_id))
 
4063
 
 
4064
    def test_get_partial(self):
 
4065
        """Test that the get partial does use the correct os_helper method."""
 
4066
        fd = 'file_descriptor'
 
4067
        path = 'path'
 
4068
        mdid = 'id'
 
4069
        node_id = 'node_id'
 
4070
        share_id = 'share_id'
 
4071
        self.fsm._idx_node_id = {(share_id, node_id): mdid}
 
4072
        self.fsm.fs = {'id': path}
 
4073
        self.fsm._get_partial_path = self.mocker.mock()
 
4074
        self.fsm._check_partial = self.mocker.mock()
 
4075
        # set the expectations
 
4076
        self.fsm._check_partial(mdid)
 
4077
        self.mocker.result(True)
 
4078
        self.fsm._get_partial_path(path)
 
4079
        self.mocker.result(path)
 
4080
        self.open_file(path, 'rb')
 
4081
        self.mocker.result(fd)
 
4082
        self.mocker.replay()
 
4083
        self.assertEqual(fd, self.fsm.get_partial(node_id, share_id))
 
4084
 
 
4085
    def test_open_file(self):
 
4086
        """Test that the open file uses the correct os_helper method."""
 
4087
        fd = 'file_descriptor'
 
4088
        mdid = 'id'
 
4089
        mdobj = dict(is_dir=False, share_id='share_id', path='path')
 
4090
        self.fsm.get_abspath = self.mocker.mock()
 
4091
        self.fsm.fs = dict(id=mdobj)
 
4092
        self.fsm.get_abspath(mdobj['share_id'], mdobj['path'])
 
4093
        self.mocker.result(mdobj['path'])
 
4094
        self.open_file(mdobj['path'], 'rb')
 
4095
        self.mocker.result(fd)
 
4096
        self.mocker.replay()
 
4097
        self.assertEqual(fd, self.fsm.open_file(mdid))
 
4098
 
 
4099
    def test_create(self):
 
4100
        """Test that create uses the correct os_helper functions."""
 
4101
        # we do not care about the entire method, lets force and error and
 
4102
        # test that the methods are called
 
4103
        path = 'path'
 
4104
        share_id = 'share_id'
 
4105
        mdid = 'id'
 
4106
        self.fsm._idx_path = {path: mdid}
 
4107
        # expectations
 
4108
        self.normpath(path)
 
4109
        self.mocker.result(path)
 
4110
        self.mocker.replay()
 
4111
        self.assertRaises(ValueError, self.fsm.create, path, share_id)
 
4112
 
 
4113
    def test_set_node_id(self):
 
4114
        """Test that set_node_id uses the correct os_helper function."""
 
4115
        # we do not care about the entire method, lets force and error and
 
4116
        # test that the methods are called
 
4117
        path = 'path'
 
4118
        node_id = 'id'
 
4119
        self.fsm._idx_path = {}
 
4120
        # expectations
 
4121
        self.normpath(path)
 
4122
        self.mocker.result(path)
 
4123
        self.mocker.replay()
 
4124
        self.assertRaises(KeyError, self.fsm.set_node_id, path, node_id)
 
4125
 
 
4126
    def test_get_by_path(self):
 
4127
        """Test that the get_by_path uses the correct os_helper function."""
 
4128
        # we do not care about the entire method, lets force and error and
 
4129
        # test that the methods are called
 
4130
        path = 'path'
 
4131
        self.fsm._idx_path = {}
 
4132
        # expectations
 
4133
        self.normpath(path)
 
4134
        self.mocker.result(path)
 
4135
        self.mocker.replay()
 
4136
        self.assertRaises(KeyError, self.fsm.get_by_path, path)
 
4137
 
 
4138
    def test_set_by_path(self):
 
4139
        """Test that set_by_path uses the correct os_helper function."""
 
4140
        # we do not care about the entire method, lets force and error and
 
4141
        # test that the methods are called
 
4142
        path = 'path'
 
4143
        self.fsm._idx_path = {}
 
4144
        # expectations
 
4145
        self.normpath(path)
 
4146
        self.mocker.result(path)
 
4147
        self.mocker.replay()
 
4148
        self.assertRaises(KeyError, self.fsm.set_by_path, path)
 
4149
 
 
4150
    def test_move_file(self):
 
4151
        """Test that move_file uses the correct os_helper function."""
 
4152
        # we do not care about the entire method, lets force and error and
 
4153
        # test that the methods are called
 
4154
        new_share_id = 'id'
 
4155
        moved_from = 'path_moved_from'
 
4156
        moved_to = 'path_moved_to'
 
4157
        self.fsm._idx_path = {}
 
4158
        # set the expectations
 
4159
        self.normpath(moved_from)
 
4160
        self.mocker.result(moved_from)
 
4161
        self.normpath(moved_to)
 
4162
        self.mocker.result(moved_to)
 
4163
        self.mocker.replay()
 
4164
        self.assertRaises(KeyError, self.fsm.move_file, new_share_id,
 
4165
                          moved_from, moved_to)
 
4166
 
 
4167
    def test_moved(self):
 
4168
        """Test that moved uses the correct os_helper function."""
 
4169
        # we do not care about the entire method, lets force and error and
 
4170
        # test that the methods are called
 
4171
        new_share_id = 'id'
 
4172
        moved_from = 'path_moved_from'
 
4173
        moved_to = 'path_moved_to'
 
4174
        self.fsm._idx_path = {}
 
4175
        # set the expectations
 
4176
        self.normpath(moved_from)
 
4177
        self.mocker.result(moved_from)
 
4178
        self.normpath(moved_to)
 
4179
        self.mocker.result(moved_to)
 
4180
        self.mocker.replay()
 
4181
        self.assertRaises(KeyError, self.fsm.moved, new_share_id, moved_from,
 
4182
                          moved_to)
 
4183
 
 
4184
    def test_delete_metadata(self):
 
4185
        """Test that delete_metadata uses the correct os_helper function."""
 
4186
        mdid = 'id'
 
4187
        path = 'path'
 
4188
        mdobj = {'node_id': None}
 
4189
        self.fsm._idx_path = {path: mdid}
 
4190
        self.fsm.fs = {mdid: mdobj}
 
4191
        self.normpath(path)
 
4192
        self.mocker.result(path)
 
4193
        self.mocker.replay()
 
4194
        self.fsm.delete_metadata(path)
 
4195
 
 
4196
    def test_delete_file(self):
 
4197
        """Test that delete_files uses the correct os_helper function."""
 
4198
        # we do not care about the entire method, lets force and error and
 
4199
        # test that the methods are called
 
4200
        mdid = 'id'
 
4201
        mdobj = {}
 
4202
        path = 'path'
 
4203
        self.fsm._idx_path = {path: mdid}
 
4204
        self.fsm.fs = {mdid: mdobj}
 
4205
        self.fsm.is_dir = self.mocker.mock()
 
4206
        self.fsm.eq = self.mocker.mock()
 
4207
        # expectations
 
4208
        self.normpath(path)
 
4209
        self.mocker.result(path)
 
4210
        self.fsm.is_dir(path=path)
 
4211
        self.mocker.result(True)
 
4212
        self.fsm.eq.add_to_mute_filter(ANY, path=path)
 
4213
        self.listdir(path)
 
4214
        self.mocker.throw(ValueError)
 
4215
        self.mocker.replay()
 
4216
        self.assertRaises(ValueError, self.fsm.delete_file, path)
 
4217
 
 
4218
    def test_get_mdid_from_args(self):
 
4219
        """Test that get_mdid_from_args uses the correct os_helper function."""
 
4220
        mdid = 'id'
 
4221
        path = 'path'
 
4222
        parent = None
 
4223
        self.fsm._idx_path = {path: mdid}
 
4224
        args = {'path': path}
 
4225
        # expectations
 
4226
        self.normpath(path)
 
4227
        self.mocker.result(path)
 
4228
        self.mocker.replay()
 
4229
        self.assertEqual(mdid, self.fsm._get_mdid_from_args(args, parent))
 
4230
 
 
4231
    def test_has_metadata(self):
 
4232
        """Test that has_metadata uses the correct os_helper function."""
 
4233
        path = 'path'
 
4234
        self.fsm._idx_path = {}
 
4235
        # expectations
 
4236
        self.normpath(path)
 
4237
        self.mocker.result(path)
 
4238
        self.mocker.replay()
 
4239
        self.assertFalse(self.fsm.has_metadata(path=path))
 
4240
 
 
4241
    def test_dir_content(self):
 
4242
        """Test that dir_content uses the correct os_helper function."""
 
4243
        path = 'path'
 
4244
        mdid = 'id'
 
4245
        # we are not testing the entire method, just that we cal the correct
 
4246
        # os_helper, lets pass the wrong value and get an exception
 
4247
        mdobj = {'is_dir': False}
 
4248
        self.fsm._idx_path = {mdid: path}
 
4249
        self.fsm.fs = {mdid: mdobj}
 
4250
        # set the expectations
 
4251
        self.normpath(path)
 
4252
        self.mocker.result(path)
 
4253
        self.mocker.replay()
 
4254
        self.assertRaises(KeyError, self.fsm.dir_content, path)