~ubuntu-branches/ubuntu/precise/ubuntuone-client/precise-security

« back to all changes in this revision

Viewing changes to .pc/01_getnodebyid_fix.patch/ubuntuone/syncdaemon/filesystem_manager.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2011-10-11 14:59:28 UTC
  • Revision ID: james.westby@ubuntu.com-20111011145928-wii0ehii12wmzvv5
Tags: 2.0.0-0ubuntu2
* debian/patches/01_getnodebyid_fix.patch:
  - Fix filter by share and path (LP: #807737)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# ubuntuone.syncdaemon.filesystem_manager - FSM
 
2
#
 
3
# Author: Facundo Batista <facundo@canonical.com>
 
4
#
 
5
# Copyright 2009 Canonical Ltd.
 
6
#
 
7
# This program is free software: you can redistribute it and/or modify it
 
8
# under the terms of the GNU General Public License version 3, as published
 
9
# by the Free Software Foundation.
 
10
#
 
11
# This program is distributed in the hope that it will be useful, but
 
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
14
# PURPOSE.  See the GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License along
 
17
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
"""Module that implements the File System Manager."""
 
19
from __future__ import with_statement
 
20
 
 
21
import os
 
22
import time
 
23
import functools
 
24
import itertools
 
25
import logging
 
26
import contextlib
 
27
import errno
 
28
import stat
 
29
import uuid
 
30
 
 
31
from ubuntuone.syncdaemon import file_shelf, config
 
32
from ubuntuone.syncdaemon.volume_manager import VolumeDoesNotExist
 
33
from ubuntuone.syncdaemon.interfaces import IMarker
 
34
from ubuntuone.syncdaemon.marker import MDMarker
 
35
from ubuntuone.syncdaemon.tritcask import TritcaskShelf
 
36
from ubuntuone.platform import (
 
37
    listdir,
 
38
    make_dir,
 
39
    normpath,
 
40
    move_to_trash,
 
41
    path_exists,
 
42
    remove_dir,
 
43
    remove_file,
 
44
    remove_tree,
 
45
    rename,
 
46
    recursive_move,
 
47
    set_dir_readonly,
 
48
    set_dir_readwrite,
 
49
    set_file_readonly,
 
50
    set_file_readwrite,
 
51
    stat_path,
 
52
    walk,
 
53
)
 
54
from ubuntuone.platform import open_file as os_open
 
55
 
 
56
METADATA_VERSION = "6"
 
57
 
 
58
# tritcask row types
 
59
FSM_ROW_TYPE = 0
 
60
TRASH_ROW_TYPE = 1
 
61
MOVE_LIMBO_ROW_TYPE = 2
 
62
 
 
63
#
 
64
# File System Manager  (FSM)
 
65
# --------------------------
 
66
#
 
67
# The FileSystemManager is the one that interacts with the filesystem, and
 
68
# keeps a storage with metadata.  This storage is verterok's FileShelf.
 
69
#
 
70
# The metadata, in disk, is a dictionary, where the keys are 'mdid's (metadata
 
71
# ids), and the values are different parameters, some of them can be modified
 
72
# from outside, and some can not.
 
73
#
 
74
# There're two very important values: path and node_id. The path is the
 
75
# pointer to where the file or directory is located in the filesystem, and
 
76
# the node_id is the unique identifier from the server. When a new file is
 
77
# created (with the .create() method), a mdid is assigned to the path and the
 
78
# share, but no node_id yet. When the server assigns the node_id, it needs to
 
79
# be set here with the .set_node_id() method.
 
80
#
 
81
# All the data can be retrieved generally using this three values (mdid, path,
 
82
# and node_id/share) using specific get_by_*() methods. For this to be fast,
 
83
# two indexes are created at init time, two dictionaries that hold the
 
84
# relationships path->mdid, and (share,node_id)->mdid.  In any case, KeyError
 
85
# is raised if an incorrect value is passed to the getters. Note that a mdid
 
86
# identifies uniquely the MD Object, like also the path; but for the node_id it
 
87
# also needs to have the share, as the same "server file" can live in different
 
88
# directories in the "client disk".
 
89
#
 
90
# Once assigned, the path, share and node_id values can not be changed. For any
 
91
# other value (except another special one, 'info', see below), three methods
 
92
# are provided to set them: set_by_*() (symmetric to the getters). These
 
93
# methods receive a first argument to indicate what is modified, and then
 
94
# several keyword arguments with all the values to be set.
 
95
#
 
96
# The 'info' is a special value set by the FileSystemManager itself, that
 
97
# records operations and changes made to each node, and as I said before,
 
98
# it can only be accesses from outside, not modified.
 
99
#
 
100
# Another method is provided to retrieve the created objects:
 
101
# get_mdobjs_by_share_id, that returns all the objects in that share and it
 
102
# path starts with the base_path argument.
 
103
#
 
104
# When asked for data, the FSM returns an object that is a thin wrapper to the
 
105
# info, only to be easily accessible, like using "mdobj.path", or
 
106
# "mdobj.info.is_partial". This object is not alive: it does not get updated
 
107
# if something changes in the metadata, and any change in the object is not
 
108
# written back to the metadata (this is by design, because of how the
 
109
# information flows in the system).
 
110
#
 
111
# As I said before, the FileSystemManager not only keeps the metadata, but also
 
112
# interacts with the filesystem itself. As such, it provides several operations
 
113
# on files and directories.
 
114
#
 
115
# In the process of downloading a file from the server, FSM handles the
 
116
# .partial files. With the .create_partial() method the system creates this
 
117
# special file where the new content will be downloaded. When it finishes ok,
 
118
# the .commit_partial() is called, and that content is moved into the old file.
 
119
# If the download fails for any reason, .remove_partial() is called and all is
 
120
# back to clean.
 
121
#
 
122
# Other services are provided:
 
123
#
 
124
#    .move_to_conflict(): moves a file or dir in problem to the same name but
 
125
# adding a .conflict to the name (if .conflict already exists, it will try with
 
126
# .conflict.1, .conflict.2, and so on).
 
127
#
 
128
#    .upload_finished(): sets a new hash in the metadata, marking that the
 
129
# new content was uploaded to the server.
 
130
#
 
131
#    .move_file(): moves a file or directory from one pathname to other.
 
132
#
 
133
#    .delete_file(): removes a file or directory from disk.
 
134
#
 
135
# Finally, the FSM has three methods that provides high level information,
 
136
# in some cases synthesising their values using some internal values:
 
137
#
 
138
#    .has_metadata(): returns True if the system has metadata for that path,
 
139
# node_id or mdid (note that we may don't have metadata even to an old mdid,
 
140
# because it was deleted in the middle)
 
141
#
 
142
#    .changed(): returns 'local', 'server', or 'none', depending of what
 
143
# changed for that node
 
144
#
 
145
#    .is_dir: returns if the node is a directory.
 
146
#
 
147
#
 
148
 
 
149
 
 
150
# fsm logger
 
151
fsm_logger = logging.getLogger('ubuntuone.SyncDaemon.fsm')
 
152
logger = functools.partial(fsm_logger.log, logging.INFO)
 
153
log_warning = functools.partial(fsm_logger.log, logging.WARNING)
 
154
log_debug = functools.partial(fsm_logger.log, logging.DEBUG)
 
155
 
 
156
is_forbidden = set("info path node_id share_id is_dir".split()
 
157
                  ).intersection
 
158
 
 
159
 
 
160
class InconsistencyError(Exception):
 
161
    """Inconsistency between internal records and filesystem itself."""
 
162
 
 
163
 
 
164
class Despair(Exception):
 
165
    """This should never happen, we're in an impossible condition!"""
 
166
 
 
167
 
 
168
class DirectoryNotRemovable(Exception):
 
169
    """The directory can not be emptied to delete."""
 
170
 
 
171
 
 
172
class _MDObject(object):
 
173
    """Wrapper around MD dict."""
 
174
    def __init__(self, **mdobj):
 
175
        self.__dict__.update(mdobj)
 
176
 
 
177
        # info is a special one
 
178
        if "info" in mdobj:
 
179
            self.info = _MDObject(**mdobj["info"])
 
180
 
 
181
    def __eq__(self, other):
 
182
        return self.__dict__ == other.__dict__
 
183
 
 
184
 
 
185
class ShareNodeDict(dict):
 
186
    """Cache for node_id and share."""
 
187
    # pylint: disable-msg=W0612
 
188
    def __getitem__(self, key):
 
189
        share_id, node_id = key
 
190
        if node_id is None:
 
191
            raise ValueError("The node_id can not be None")
 
192
        return dict.__getitem__(self, key)
 
193
 
 
194
    def __setitem__(self, key, value):
 
195
        share_id, node_id = key
 
196
        if node_id is None:
 
197
            raise ValueError("The node_id can not be None")
 
198
        return dict.__setitem__(self, key, value)
 
199
 
 
200
    def __contains__(self, key):
 
201
        share_id, node_id = key
 
202
        if node_id is None:
 
203
            raise ValueError("The node_id can not be None")
 
204
        return dict.__contains__(self, key)
 
205
 
 
206
 
 
207
class TrashFileShelf(file_shelf.CachedFileShelf):
 
208
    """Custom file shelf that supports share and node as keys."""
 
209
 
 
210
    _marker_flag = 'marker'
 
211
    _marker_len = len(_marker_flag)
 
212
 
 
213
    def key_file(self, key):
 
214
        """Support share and node as keys."""
 
215
        share_id, node_id = key
 
216
 
 
217
        # convert the markers to a string that flags them
 
218
        if IMarker.providedBy(share_id):
 
219
            share_id = str(share_id) + self._marker_flag
 
220
        if IMarker.providedBy(node_id):
 
221
            node_id = str(node_id) + self._marker_flag
 
222
 
 
223
        # build a string with the node_id first to have a more sparse
 
224
        # layout in disk
 
225
        key = "%s|%s" % (node_id, share_id)
 
226
        return super(TrashFileShelf, self).key_file(key)
 
227
 
 
228
    def keys(self):
 
229
        """Restore the share/node pair"""
 
230
        for key in super(TrashFileShelf, self).keys():
 
231
            node_id, share_id = key.split("|")
 
232
            if node_id == 'None':
 
233
                node_id = None
 
234
            elif node_id.endswith(self._marker_flag):
 
235
                node_id = MDMarker(node_id[:-self._marker_len])
 
236
            if share_id.endswith(self._marker_flag):
 
237
                share_id = MDMarker(share_id[:-self._marker_len])
 
238
            yield (share_id, node_id)
 
239
 
 
240
 
 
241
class TrashTritcaskShelf(TritcaskShelf):
 
242
    """Custom TritcaskShelf that supports share and node as keys."""
 
243
 
 
244
    _marker_flag = 'marker'
 
245
    _marker_len = len(_marker_flag)
 
246
 
 
247
    def _get_key(self, key):
 
248
        """Support share and node as keys."""
 
249
        share_id, node_id = key
 
250
 
 
251
        # convert the markers to a string that flags them
 
252
        if IMarker.providedBy(share_id):
 
253
            share_id = str(share_id) + self._marker_flag
 
254
        if IMarker.providedBy(node_id):
 
255
            node_id = str(node_id) + self._marker_flag
 
256
 
 
257
        # build a string from the (share_id, node_id)
 
258
        return "%s|%s" % (share_id, node_id)
 
259
 
 
260
    def __setitem__(self, key, value):
 
261
        """dict protocol."""
 
262
        raw_key = self._get_key(key)
 
263
        super(TrashTritcaskShelf, self).__setitem__(raw_key, value)
 
264
 
 
265
    def __getitem__(self, key):
 
266
        """dict protocol."""
 
267
        raw_key = self._get_key(key)
 
268
        return super(TrashTritcaskShelf, self).__getitem__(raw_key)
 
269
 
 
270
    def __delitem__(self, key):
 
271
        """dict protocol."""
 
272
        raw_key = self._get_key(key)
 
273
        return super(TrashTritcaskShelf, self).__delitem__(raw_key)
 
274
 
 
275
    def __contains__(self, key):
 
276
        """dict protocol."""
 
277
        raw_key = self._get_key(key)
 
278
        return super(TrashTritcaskShelf, self).__contains__(raw_key)
 
279
 
 
280
    def keys(self):
 
281
        """Restore the share/node pair"""
 
282
        for key in super(TrashTritcaskShelf, self).keys():
 
283
            share_id, node_id = key.split("|")
 
284
            if node_id == 'None':
 
285
                node_id = None
 
286
            elif node_id.endswith(self._marker_flag):
 
287
                node_id = MDMarker(node_id[:-self._marker_len])
 
288
            if share_id.endswith(self._marker_flag):
 
289
                share_id = MDMarker(share_id[:-self._marker_len])
 
290
            yield (share_id, node_id)
 
291
 
 
292
 
 
293
class FileSystemManager(object):
 
294
    """Keeps the files/dirs metadata and interacts with the filesystem.
 
295
 
 
296
    It has a FileShelf where all the metadata is stored, using 'mdid's as
 
297
    keys.  'mdid' is 'metadata id'... it's actually an uuid, but we call it
 
298
    mdid to don't get confused about names, as we also have the node_id is
 
299
    the one assigned by the server.
 
300
 
 
301
    At init time two indexes are built in memory:
 
302
 
 
303
      - idx_path: relationship path -> mdid
 
304
      - idx_node_id: relationship (share_id, node_id) -> mdid
 
305
    """
 
306
 
 
307
    CONFLICT_SUFFIX = '.u1conflict'
 
308
    CHANGED_LOCAL = 'LOCAL'
 
309
    CHANGED_SERVER = 'SERVER'
 
310
    CHANGED_NONE = 'NONE'
 
311
 
 
312
    def __init__(self, data_dir, partials_dir, vm, db):
 
313
        if not isinstance(data_dir, basestring):
 
314
            raise TypeError("data_dir should be a string instead of %s" % \
 
315
                            type(data_dir))
 
316
        fsmdir = os.path.join(data_dir, 'fsm')
 
317
        self._trash_dir = os.path.join(data_dir, 'trash')
 
318
        self._movelimbo_dir = os.path.join(data_dir, 'move_limbo')
 
319
        self.partials_dir = partials_dir
 
320
        if not path_exists(self.partials_dir):
 
321
            make_dir(self.partials_dir, recursive=True)
 
322
        else:
 
323
            # ensure that we can write in the partials_dir
 
324
            set_dir_readwrite(self.partials_dir)
 
325
        self.fs = TritcaskShelf(FSM_ROW_TYPE, db)
 
326
        self.old_fs = file_shelf.CachedFileShelf(fsmdir, cache_size=1500,
 
327
                                             cache_compact_threshold=4)
 
328
        self.trash = TrashTritcaskShelf(TRASH_ROW_TYPE, db)
 
329
        self.move_limbo = TrashTritcaskShelf(MOVE_LIMBO_ROW_TYPE, db)
 
330
        self.shares = {}
 
331
        self.vm = vm
 
332
        self.eq = None  # this will be registered later
 
333
 
 
334
        # create the indexes
 
335
        self._idx_path = {}
 
336
        self._idx_node_id = ShareNodeDict()
 
337
 
 
338
        # get the metadata version
 
339
        self._version_file = os.path.join(data_dir, "metadata_version")
 
340
        if path_exists(self._version_file):
 
341
            with os_open(self._version_file) as fh:
 
342
                md_version = fh.read().strip()
 
343
        else:
 
344
            md_version = None
 
345
 
 
346
        # load the info from the metadata
 
347
        if md_version == METADATA_VERSION:
 
348
            self._load_metadata_updated()
 
349
        else:
 
350
            load_method = getattr(self, "_load_metadata_%s" % md_version)
 
351
            load_method(md_version)
 
352
 
 
353
        # load some config
 
354
        self.user_config = config.get_user_config()
 
355
 
 
356
        logger("initialized: idx_path: %s, idx_node_id: %s, shares: %s",
 
357
               len(self._idx_path), len(self._idx_node_id), len(self.shares))
 
358
 
 
359
    def register_eq(self, eq):
 
360
        """Registers an EventQueue here."""
 
361
        self.eq = eq
 
362
 
 
363
    def _safe_old_fs_iteritems(self):
 
364
        """Returns a 'safe' iterator over the items of the FileShelf.
 
365
 
 
366
        It's 'safe' because broken metadata objects are deleted and not
 
367
        returned.
 
368
        """
 
369
        def safeget_mdobj(mdid):
 
370
            """check if the mdobj is valid and return mdid, mdobj.
 
371
            If a KeyError is raised, returns False.
 
372
            """
 
373
            try:
 
374
                mdobj = self.old_fs[mdid]
 
375
            except KeyError:
 
376
                # oops, we have a key but don't have the value, possibly broken
 
377
                # metadata, remove it and keep going
 
378
                del self.old_fs[mdid]
 
379
                # return False, in order to be filtered later
 
380
                return False
 
381
            else:
 
382
                # check if the share exists
 
383
                try:
 
384
                    self._get_share(mdobj["share_id"])
 
385
                except VolumeDoesNotExist:
 
386
                    # oops, the share is gone!, invalidate this mdid
 
387
                    log_warning('Share %r disappeared! deleting mdid: %s',
 
388
                                mdobj['share_id'], mdid)
 
389
                    del self.old_fs[mdid]
 
390
                    return False
 
391
                else:
 
392
                    return mdid, mdobj
 
393
        safe_iteritems = itertools.imap(safeget_mdobj, self.old_fs.keys())
 
394
        # filter all False values
 
395
        return itertools.ifilter(None, safe_iteritems)
 
396
 
 
397
    def _fix_path_for_new_layout(self, mdobj):
 
398
        """fix the mdobj path for the new layout, only for shares root"""
 
399
        base_path, name = os.path.split(mdobj['path'])
 
400
        if base_path.startswith('/') and \
 
401
           base_path.endswith('Ubuntu One/Shared With Me'):
 
402
            realpath = os.path.realpath(mdobj['path'])
 
403
            mdobj['path'] = realpath
 
404
        if base_path.startswith('/') and \
 
405
            base_path.endswith('Ubuntu One') and name == 'My Files':
 
406
            mdobj['path'] = base_path
 
407
 
 
408
    def _migrate_trash_to_tritcask(self):
 
409
        """Migrate trash from FileShelf to Tritcask."""
 
410
        old_trash = TrashFileShelf(self._trash_dir, cache_size=100,
 
411
                                   cache_compact_threshold=4)
 
412
        for key, value in old_trash.iteritems():
 
413
            self.trash[key] = value
 
414
        # delete the old trash
 
415
        remove_tree(self._trash_dir)
 
416
 
 
417
    def _migrate_movelimbo_to_tritcask(self):
 
418
        """Migrate move limbo from FileShelf to Tritcask."""
 
419
        old_move_limbo = TrashFileShelf(self._movelimbo_dir, cache_size=100,
 
420
                                        cache_compact_threshold=4)
 
421
        for key, value in old_move_limbo.iteritems():
 
422
            self.move_limbo[key] = value
 
423
        # delete the old move limbo
 
424
        remove_tree(self._movelimbo_dir)
 
425
 
 
426
    def _load_metadata_None(self, old_version):
 
427
        """Loads metadata from when it wasn't even versioned."""
 
428
        logger("loading metadata from old version %r", old_version)
 
429
 
 
430
        for mdid, mdobj in self._safe_old_fs_iteritems():
 
431
            # assure path are bytes (new to version 2)
 
432
            try:
 
433
                mdobj["path"] = mdobj["path"].encode("utf8")
 
434
            except UnicodeDecodeError:
 
435
                # this is an invalid path, we shouldn't have it
 
436
                del self.fs[mdid]
 
437
                continue
 
438
 
 
439
            # fix the path
 
440
            self._fix_path_for_new_layout(mdobj)
 
441
            abspath = self.get_abspath(mdobj["share_id"], mdobj["path"])
 
442
            # of course, load the metadata
 
443
            self._idx_path[abspath] = mdid
 
444
            if mdobj["node_id"] is not None:
 
445
                self._idx_node_id[(mdobj["share_id"], mdobj["node_id"])] = mdid
 
446
 
 
447
            # assure we have stat info (new to version 1)
 
448
            mdobj["stat"] = get_stat(abspath)
 
449
 
 
450
            # convert the "yet without content" hashes to "" (new to v3)
 
451
            if mdobj["local_hash"] is None:
 
452
                mdobj["local_hash"] = ""
 
453
            if mdobj["server_hash"] is None:
 
454
                mdobj["server_hash"] = ""
 
455
 
 
456
            # add the generation number (new to v5)
 
457
            mdobj["generation"] = None
 
458
 
 
459
            # write back the object
 
460
            self.fs[mdid] = mdobj
 
461
 
 
462
        self._migrate_trash_to_tritcask()
 
463
        self._migrate_movelimbo_to_tritcask()
 
464
        # set new version
 
465
        with os_open(self._version_file, "w") as fh:
 
466
            fh.write(METADATA_VERSION)
 
467
            os.fsync(fh.fileno())
 
468
        # remove the old metadata
 
469
        remove_tree(self.old_fs._path)
 
470
 
 
471
    def _load_metadata_1(self, old_version):
 
472
        """Loads metadata from version 1."""
 
473
        logger("loading metadata from old version %r", old_version)
 
474
 
 
475
        for mdid, mdobj in self._safe_old_fs_iteritems():
 
476
            # assure path are bytes (new to version 2)
 
477
            try:
 
478
                mdobj["path"] = mdobj["path"].encode("utf8")
 
479
            except UnicodeDecodeError:
 
480
                # this is an invalid path, we shouldn't have it
 
481
                del self.old_fs[mdid]
 
482
                continue
 
483
 
 
484
            # convert the "yet without content" hashes to "" (new to v3)
 
485
            if mdobj["local_hash"] is None:
 
486
                mdobj["local_hash"] = ""
 
487
            if mdobj["server_hash"] is None:
 
488
                mdobj["server_hash"] = ""
 
489
 
 
490
            # fix the path
 
491
            self._fix_path_for_new_layout(mdobj)
 
492
 
 
493
            # add the generation number (new to v5)
 
494
            mdobj["generation"] = None
 
495
 
 
496
            # write back the object
 
497
            self.fs[mdid] = mdobj
 
498
 
 
499
            # and of course, load the metadata
 
500
            abspath = self.get_abspath(mdobj["share_id"], mdobj["path"])
 
501
            self._idx_path[abspath] = mdid
 
502
            if mdobj["node_id"] is not None:
 
503
                self._idx_node_id[(mdobj["share_id"], mdobj["node_id"])] = mdid
 
504
 
 
505
        self._migrate_trash_to_tritcask()
 
506
        self._migrate_movelimbo_to_tritcask()
 
507
        # set new version
 
508
        with os_open(self._version_file, "w") as fh:
 
509
            fh.write(METADATA_VERSION)
 
510
            os.fsync(fh.fileno())
 
511
        # remove the old metadata
 
512
        remove_tree(self.old_fs._path)
 
513
 
 
514
    def _load_metadata_2(self, old_version):
 
515
        """Loads metadata from version 2."""
 
516
        logger("loading metadata from old version %r", old_version)
 
517
 
 
518
        for mdid, mdobj in self._safe_old_fs_iteritems():
 
519
            # convert the "yet without content" hashes to "" (new to v3)
 
520
            if mdobj["local_hash"] is None:
 
521
                mdobj["local_hash"] = ""
 
522
            if mdobj["server_hash"] is None:
 
523
                mdobj["server_hash"] = ""
 
524
 
 
525
            # fix the path
 
526
            self._fix_path_for_new_layout(mdobj)
 
527
 
 
528
            # add the generation number (new to v5)
 
529
            mdobj["generation"] = None
 
530
 
 
531
            # write back the object
 
532
            self.fs[mdid] = mdobj
 
533
 
 
534
            # and of course, load the metadata
 
535
            abspath = self.get_abspath(mdobj["share_id"], mdobj["path"])
 
536
            self._idx_path[abspath] = mdid
 
537
            if mdobj["node_id"] is not None:
 
538
                self._idx_node_id[(mdobj["share_id"], mdobj["node_id"])] = mdid
 
539
 
 
540
        self._migrate_trash_to_tritcask()
 
541
        self._migrate_movelimbo_to_tritcask()
 
542
        # set new version
 
543
        with os_open(self._version_file, "w") as fh:
 
544
            fh.write(METADATA_VERSION)
 
545
            os.fsync(fh.fileno())
 
546
        # remove the old metadata
 
547
        remove_tree(self.old_fs._path)
 
548
 
 
549
    def _load_metadata_3(self, old_version):
 
550
        """Loads metadata from version 3."""
 
551
        logger("loading metadata from old version %r", old_version)
 
552
 
 
553
        for mdid, mdobj in self._safe_old_fs_iteritems():
 
554
            # fix the path
 
555
            self._fix_path_for_new_layout(mdobj)
 
556
 
 
557
            # add the generation number (new to v5)
 
558
            mdobj["generation"] = None
 
559
 
 
560
            # write back the object
 
561
            self.fs[mdid] = mdobj
 
562
 
 
563
            # and of course, load the metadata
 
564
            abspath = self.get_abspath(mdobj["share_id"], mdobj["path"])
 
565
            self._idx_path[abspath] = mdid
 
566
            if mdobj["node_id"] is not None:
 
567
                self._idx_node_id[(mdobj["share_id"], mdobj["node_id"])] = mdid
 
568
 
 
569
        self._migrate_trash_to_tritcask()
 
570
        self._migrate_movelimbo_to_tritcask()
 
571
        # set new version
 
572
        with os_open(self._version_file, "w") as fh:
 
573
            fh.write(METADATA_VERSION)
 
574
            os.fsync(fh.fileno())
 
575
        # remove the old metadata
 
576
        remove_tree(self.old_fs._path)
 
577
 
 
578
    def _load_metadata_4(self, old_version):
 
579
        """Loads metadata from version 4."""
 
580
        logger("loading metadata from old version %r", old_version)
 
581
 
 
582
        for mdid, mdobj in self._safe_old_fs_iteritems():
 
583
            # add the generation number (new to v5)
 
584
            mdobj["generation"] = None
 
585
 
 
586
            # write back the object
 
587
            self.fs[mdid] = mdobj
 
588
 
 
589
            # and of course, load the metadata
 
590
            abspath = self.get_abspath(mdobj["share_id"], mdobj["path"])
 
591
            self._idx_path[abspath] = mdid
 
592
            if mdobj["node_id"] is not None:
 
593
                self._idx_node_id[(mdobj["share_id"], mdobj["node_id"])] = mdid
 
594
 
 
595
        self._migrate_trash_to_tritcask()
 
596
        self._migrate_movelimbo_to_tritcask()
 
597
        # set new version
 
598
        with os_open(self._version_file, "w") as fh:
 
599
            fh.write(METADATA_VERSION)
 
600
            os.fsync(fh.fileno())
 
601
        # remove the old metadata
 
602
        remove_tree(self.old_fs._path)
 
603
 
 
604
    def _load_metadata_5(self, old_version):
 
605
        """Loads metadata of last version."""
 
606
        logger("loading metadata from old version %r", old_version)
 
607
 
 
608
        for mdid, mdobj in self._safe_old_fs_iteritems():
 
609
            abspath = self.get_abspath(mdobj["share_id"], mdobj["path"])
 
610
            # write back the object
 
611
            self.fs[mdid] = mdobj
 
612
            self._idx_path[abspath] = mdid
 
613
            if mdobj["node_id"] is not None:
 
614
                self._idx_node_id[(mdobj["share_id"], mdobj["node_id"])] = mdid
 
615
 
 
616
        self._migrate_trash_to_tritcask()
 
617
        self._migrate_movelimbo_to_tritcask()
 
618
        # set new version
 
619
        with os_open(self._version_file, "w") as fh:
 
620
            fh.write(METADATA_VERSION)
 
621
            os.fsync(fh.fileno())
 
622
        # remove the old metadata
 
623
        remove_tree(self.old_fs._path)
 
624
 
 
625
    def _load_metadata_updated(self):
 
626
        """Loads metadata of last version."""
 
627
        logger("loading updated metadata")
 
628
        for mdid, mdobj in self.fs.items():
 
629
            try:
 
630
                abspath = self.get_abspath(mdobj["share_id"], mdobj["path"])
 
631
            except VolumeDoesNotExist:
 
632
                # the share is gone!
 
633
                del self.fs[mdid]
 
634
                continue
 
635
            if abspath in self._idx_path:
 
636
                # oh, we already have this path in the idx.
 
637
                log_warning("Path already in the index: %s", abspath)
 
638
                current_mdobj = self.fs[self._idx_path[abspath]]
 
639
                if current_mdobj['info']['created'] < mdobj['info']['created']:
 
640
                    log_debug("Replacing and deleting node: %s with newer "
 
641
                              "node: %s", current_mdobj['mdid'], mdid)
 
642
                    self._idx_path[abspath] = mdid
 
643
                    # and delete the old node
 
644
                    del self.fs[current_mdobj['mdid']]
 
645
                else:
 
646
                    # do nothing if the current mdobj is newer
 
647
                    log_debug("The node: %s is newer than: %s, "
 
648
                              "leaving it alone and deleting the old one.",
 
649
                              current_mdobj['mdid'], mdid)
 
650
                    # but delete the old node
 
651
                    del self.fs[mdid]
 
652
            else:
 
653
                self._idx_path[abspath] = mdid
 
654
            if mdobj["node_id"] is not None:
 
655
                self._idx_node_id[(mdobj["share_id"], mdobj["node_id"])] = mdid
 
656
 
 
657
    def create(self, path, share_id, node_id=None, is_dir=False):
 
658
        """Creates a new md object."""
 
659
        if not path.strip():
 
660
            raise ValueError("Empty paths are not allowed (got %r)" % path)
 
661
 
 
662
        path = normpath(path)
 
663
        if path in self._idx_path:
 
664
            raise ValueError("The path %r is already created!" % path)
 
665
 
 
666
        # create it
 
667
        mdid = str(uuid.uuid4())
 
668
        # make path relative to the share_id
 
669
        relpath = self._share_relative_path(share_id, path)
 
670
        newobj = dict(path=relpath, node_id=None, share_id=share_id,
 
671
                      is_dir=is_dir, local_hash="", server_hash="",
 
672
                      mdid=mdid, generation=None, crc32=None, size=None)
 
673
        newobj["info"] = dict(created=time.time(), is_partial=False)
 
674
        # only one stat, (instead of path_exists & os.stat)
 
675
        newobj["stat"] = get_stat(path)
 
676
        if node_id is not None:
 
677
            self._set_node_id(newobj, node_id, path)
 
678
 
 
679
        log_debug("create: path=%r mdid=%r share_id=%r node_id=%r is_dir=%r",
 
680
                  path, mdid, share_id, None, is_dir)
 
681
        self.fs[mdid] = newobj
 
682
 
 
683
        # adjust the index
 
684
        self._idx_path[path] = mdid
 
685
 
 
686
        return mdid
 
687
 
 
688
    def set_node_id(self, path, node_id):
 
689
        """Sets the node_id to a md object."""
 
690
        path = normpath(path)
 
691
        mdid = self._idx_path[path]
 
692
        mdobj = self.fs[mdid]
 
693
        self._set_node_id(mdobj, node_id, path)
 
694
        self.fs[mdid] = mdobj
 
695
 
 
696
    def _set_node_id(self, mdobj, node_id, path):
 
697
        """Set the node_id to the mdobj, but don't 'save' the mdobj"""
 
698
        if mdobj["node_id"] is not None:
 
699
            # the object is already there! it's ok if it has the same id
 
700
            if mdobj["node_id"] == node_id:
 
701
                log_warning("set_node_id (repeated!): path=%r mdid=%r "
 
702
                            "node_id=%r", path, mdobj['mdid'], node_id)
 
703
                return
 
704
            msg = "The path %r already has node_id (%r)" % (path, node_id)
 
705
            raise ValueError(msg)
 
706
        # adjust the index
 
707
        share_id = mdobj["share_id"]
 
708
        self._idx_node_id[(share_id, node_id)] = mdobj['mdid']
 
709
 
 
710
        # set the node_id
 
711
        mdobj["node_id"] = node_id
 
712
        mdobj["info"]["node_id_assigned"] = time.time()
 
713
 
 
714
        log_debug("set_node_id: path=%r mdid=%r share_id=%r node_id=%r",
 
715
                  path, mdobj['mdid'], share_id, node_id)
 
716
 
 
717
    def get_mdobjs_by_share_id(self, share_id, base_path=None):
 
718
        """Returns all the mdobj that belongs to a share and it path
 
719
        startswith base_path.
 
720
        """
 
721
        if base_path is None:
 
722
            base_path = self._get_share(share_id).path
 
723
        all_mdobjs = []
 
724
        # filter by path first, so we don't touch disk
 
725
        for path in self._idx_path:
 
726
            if path.startswith(base_path):
 
727
                mdid = self._idx_path[path]
 
728
                mdobj = self.fs[mdid]
 
729
                if mdobj["share_id"] == share_id:
 
730
                    all_mdobjs.append(_MDObject(**mdobj))
 
731
        return all_mdobjs
 
732
 
 
733
    def get_mdobjs_in_dir(self, base_path):
 
734
        """Return all the mdobjs which dir is base_path."""
 
735
        all_mdobjs = []
 
736
        sep = os.path.sep
 
737
        base_path += sep
 
738
        len_base = len(base_path)
 
739
        for path, mdid in self._idx_path.iteritems():
 
740
            if path[:len_base] == base_path and sep not in path[len_base:]:
 
741
                mdobj = self.fs[mdid]
 
742
                all_mdobjs.append(_MDObject(**mdobj))
 
743
        return all_mdobjs
 
744
 
 
745
    def get_data_for_server_rescan(self):
 
746
        """Generates all the (share, node, hash) tuples needed for rescan"""
 
747
        all_data = []
 
748
        for _, v in self.fs.items():
 
749
            if v['node_id']:
 
750
                all_data.append(
 
751
                        (v['share_id'], v['node_id'], v['server_hash']))
 
752
        return all_data
 
753
 
 
754
    def get_for_server_rescan_by_path(self, base_path):
 
755
        """
 
756
        Generates all the (share, node, hash) tuples, for the nodes
 
757
        starting with 'path', needed for rescan.
 
758
 
 
759
        """
 
760
        all_data = []
 
761
        for path, _ in self.get_paths_starting_with(base_path):
 
762
            mdid = self._idx_path[path]
 
763
            mdobj = self.fs[mdid]
 
764
            if mdobj['node_id']:
 
765
                all_data.append((mdobj['share_id'],
 
766
                                 mdobj['node_id'],
 
767
                                 mdobj['server_hash']))
 
768
        return all_data
 
769
 
 
770
    def get_by_mdid(self, mdid):
 
771
        """Returns the md object according to the mdid."""
 
772
        mdobj = self.fs[mdid]
 
773
        return _MDObject(**mdobj)
 
774
 
 
775
    def get_by_path(self, path):
 
776
        """Returns the md object according to the path."""
 
777
        path = normpath(path)
 
778
        mdid = self._idx_path[path]
 
779
        mdobj = self.fs[mdid]
 
780
        return _MDObject(**mdobj)
 
781
 
 
782
    def get_by_node_id(self, share_id, node_id):
 
783
        """Returns the md object according to the node_id and share_id."""
 
784
        mdid = self._idx_node_id[(share_id, node_id)]
 
785
        mdobj = self.fs[mdid]
 
786
        return _MDObject(**mdobj)
 
787
 
 
788
    def set_by_mdid(self, mdid, **kwargs):
 
789
        """Set some values to the md object with that mdid."""
 
790
        forbidden = is_forbidden(set(kwargs))
 
791
        if forbidden:
 
792
            raise ValueError("The following attributes can not be set "
 
793
                             "externally: %s" % forbidden)
 
794
 
 
795
        log_debug("set mdid=%r: %s", mdid, kwargs)
 
796
        mdobj = self.fs[mdid]
 
797
        for k, v in kwargs.items():
 
798
            mdobj[k] = v
 
799
        self.fs[mdid] = mdobj
 
800
 
 
801
    def set_by_path(self, path, **kwargs):
 
802
        """Set some values to the md object with that path."""
 
803
        if "mdid" in kwargs:
 
804
            raise ValueError("The mdid is forbidden to set externally")
 
805
        path = normpath(path)
 
806
        mdid = self._idx_path[path]
 
807
        self.set_by_mdid(mdid, **kwargs)
 
808
 
 
809
    def set_by_node_id(self, node_id, share_id, **kwargs):
 
810
        """Set some values to the md object with that node_id/share_id."""
 
811
        if "mdid" in kwargs:
 
812
            raise ValueError("The mdid is forbidden to set externally")
 
813
        mdid = self._idx_node_id[(share_id, node_id)]
 
814
        self.set_by_mdid(mdid, **kwargs)
 
815
 
 
816
    def move_file(self, new_share_id, path_from, path_to):
 
817
        """Move a file/dir from one point to the other."""
 
818
        path_from = normpath(path_from)
 
819
        path_to = normpath(path_to)
 
820
        mdid = self._idx_path[path_from]
 
821
        mdobj = self.fs[mdid]
 
822
 
 
823
        # move the file in the fs
 
824
        from_context = self._enable_share_write(mdobj['share_id'], path_from)
 
825
        to_context = self._enable_share_write(new_share_id, path_to)
 
826
 
 
827
        # pylint: disable-msg=W0704
 
828
        if mdobj["is_dir"]:
 
829
            expected_event = "FS_DIR_MOVE"
 
830
        else:
 
831
            expected_event = "FS_FILE_MOVE"
 
832
        try:
 
833
            with contextlib.nested(from_context, to_context):
 
834
                self.eq.add_to_mute_filter(expected_event, path_from=path_from,
 
835
                                           path_to=path_to)
 
836
                recursive_move(path_from, path_to)
 
837
        except IOError, e:
 
838
            # file was not yet created
 
839
            self.eq.rm_from_mute_filter(expected_event,
 
840
                                        path_from=path_from, path_to=path_to)
 
841
            m = "IOError %s when trying to move file/dir %r"
 
842
            log_warning(m, e, path_from)
 
843
        self.moved(new_share_id, path_from, path_to)
 
844
 
 
845
    def moved(self, new_share_id, path_from, path_to):
 
846
        """Change the metadata of a moved file."""
 
847
        path_from = normpath(path_from)
 
848
        path_to = normpath(path_to)
 
849
        mdid = self._idx_path.pop(path_from)
 
850
        log_debug("move_file: mdid=%r path_from=%r path_to=%r",
 
851
                                                    mdid, path_from, path_to)
 
852
 
 
853
        # if the move overwrites other file, send it to trash
 
854
        if path_to in self._idx_path:
 
855
            to_mdid = self._idx_path[path_to]
 
856
            parent_path = os.path.dirname(path_to)
 
857
            parent_mdid = self._idx_path[parent_path]
 
858
            parent_mdobj = self.fs[parent_mdid]
 
859
            self.delete_to_trash(to_mdid, parent_mdobj['node_id'])
 
860
 
 
861
        # adjust the metadata of "from" file
 
862
        mdobj = self.fs[mdid]
 
863
        self._idx_path[path_to] = mdid
 
864
 
 
865
        # change the path, make it relative to the share_id
 
866
        relpath = self._share_relative_path(new_share_id, path_to)
 
867
        mdobj["path"] = relpath
 
868
        mdobj['share_id'] = new_share_id
 
869
        mdobj["info"]["last_moved_from"] = path_from
 
870
        mdobj["info"]["last_moved_time"] = time.time()
 
871
        # we try to stat, if we fail, so what?
 
872
        #pylint: disable-msg=W0704
 
873
        try:
 
874
            mdobj["stat"] = stat_path(path_to)  # needed if not the same FS
 
875
        except OSError:
 
876
            log_warning("Got an OSError while getting the stat of %r", path_to)
 
877
        self.fs[mdid] = mdobj
 
878
 
 
879
        if mdobj["is_dir"]:
 
880
            # change the path for all the children of that node
 
881
            path_from = path_from + os.path.sep
 
882
            len_from = len(path_from)
 
883
            pathstofix = [x for x in self._idx_path if x.startswith(path_from)]
 
884
            for path in pathstofix:
 
885
                newpath = os.path.join(path_to, path[len_from:])
 
886
 
 
887
                # change in the index
 
888
                mdid = self._idx_path.pop(path)
 
889
                self._idx_path[newpath] = mdid
 
890
 
 
891
                # and in the object itself
 
892
                mdobj = self.fs[mdid]
 
893
                relpath = self._share_relative_path(new_share_id, newpath)
 
894
                mdobj["path"] = relpath
 
895
                self.fs[mdid] = mdobj
 
896
 
 
897
    def delete_metadata(self, path):
 
898
        """Delete the metadata."""
 
899
        path = normpath(path)
 
900
        mdid = self._idx_path[path]
 
901
        mdobj = self.fs[mdid]
 
902
        log_debug("delete metadata: path=%r mdid=%r", path, mdid)
 
903
 
 
904
        # adjust all
 
905
        del self._idx_path[path]
 
906
        if mdobj["node_id"] is not None:
 
907
            del self._idx_node_id[(mdobj["share_id"], mdobj["node_id"])]
 
908
        del self.fs[mdid]
 
909
 
 
910
    def _delete_dir_tree(self, path):
 
911
        """Tell if it's ok to delete a dir tree.
 
912
 
 
913
        Raise an exception if the directory can not be removed.
 
914
        """
 
915
        # check metadata to see if any node in LOCAL
 
916
        subtree = self.get_paths_starting_with(path, include_base=False)
 
917
        for p, is_dir in subtree:
 
918
            if self.changed(path=p) == self.CHANGED_LOCAL:
 
919
                logger("Conflicting dir on remove because %r is local", p)
 
920
                raise DirectoryNotRemovable()
 
921
 
 
922
        # check disk searching for previous conflicts
 
923
        for (dirpath, dirnames, filenames) in walk(path):
 
924
            for fname in filenames + dirnames:
 
925
                if fname.endswith(self.CONFLICT_SUFFIX):
 
926
                    logger("Conflicting dir on remove because of previous "
 
927
                           "conflict on: %r", os.path.join(dirpath, fname))
 
928
                    raise DirectoryNotRemovable()
 
929
 
 
930
        return subtree
 
931
 
 
932
    def delete_file(self, path):
 
933
        """Delete a file/dir and the metadata."""
 
934
        # adjust the metadata
 
935
        path = normpath(path)
 
936
        mdid = self._idx_path[path]
 
937
        mdobj = self.fs[mdid]
 
938
        log_debug("delete: path=%r mdid=%r", path, mdid)
 
939
 
 
940
        is_dir = self.is_dir(path=path)
 
941
        if is_dir:
 
942
            filter_event = "FS_DIR_DELETE"
 
943
        else:
 
944
            filter_event = "FS_FILE_DELETE"
 
945
        self.eq.add_to_mute_filter(filter_event, path=path)
 
946
 
 
947
        try:
 
948
            if is_dir:
 
949
                if listdir(path):
 
950
                    # not empty, need to check if we can delete it
 
951
                    subtree = self._delete_dir_tree(path=path)
 
952
                    for p, is_dir in subtree:
 
953
                        filter_name = "FS_DIR_DELETE" if is_dir \
 
954
                                                      else "FS_FILE_DELETE"
 
955
                        self.eq.add_to_mute_filter(filter_name, path=p)
 
956
                        self.delete_metadata(p)
 
957
 
 
958
                    with self._enable_share_write(mdobj['share_id'], path,
 
959
                                                  recursive=True):
 
960
                        if self.user_config.get_use_trash():
 
961
                            move_to_trash(path)
 
962
                        else:
 
963
                            remove_tree(path)
 
964
                else:
 
965
                    # empty, just delete it
 
966
                    with self._enable_share_write(mdobj['share_id'], path):
 
967
                        if self.user_config.get_use_trash():
 
968
                            move_to_trash(path)
 
969
                        else:
 
970
                            remove_dir(path)
 
971
            else:
 
972
                # it's a file, just delete it
 
973
                with self._enable_share_write(mdobj['share_id'], path):
 
974
                    if self.user_config.get_use_trash():
 
975
                        move_to_trash(path)
 
976
                    else:
 
977
                        remove_file(path)
 
978
 
 
979
        except OSError, e:
 
980
            self.eq.rm_from_mute_filter(filter_event, path=path)
 
981
            log_warning("OSError %s when trying to remove file/dir %r",
 
982
                        e, path)
 
983
 
 
984
        self.delete_metadata(path)
 
985
 
 
986
    def move_to_conflict(self, mdid):
 
987
        """Move a file/dir to its .conflict."""
 
988
        mdobj = self.fs[mdid]
 
989
        path = self.get_abspath(mdobj['share_id'], mdobj['path'])
 
990
        log_debug("move_to_conflict: path=%r mdid=%r", path, mdid)
 
991
        base_to_path = to_path =  path + self.CONFLICT_SUFFIX
 
992
        ind = 0
 
993
        while path_exists(to_path):
 
994
            ind += 1
 
995
            to_path = base_to_path + "." + str(ind)
 
996
        is_dir = mdobj["is_dir"]
 
997
        if is_dir:
 
998
            expected_event = "FS_DIR_DELETE"
 
999
        else:
 
1000
            expected_event = "FS_FILE_DELETE"
 
1001
        with self._enable_share_write(mdobj['share_id'], path):
 
1002
            try:
 
1003
                self.eq.add_to_mute_filter(expected_event, path=path)
 
1004
                rename(path, to_path)
 
1005
                event = "FSM_DIR_CONFLICT" if is_dir else "FSM_FILE_CONFLICT"
 
1006
                self.eq.push(event, old_name=path, new_name=to_path)
 
1007
            except OSError, e:
 
1008
                self.eq.rm_from_mute_filter(expected_event, path=path)
 
1009
                if e.errno == errno.ENOENT:
 
1010
                    m = "Already removed when trying to move to conflict: %r"
 
1011
                    log_warning(m, path)
 
1012
                else:
 
1013
                    raise
 
1014
 
 
1015
        for p, is_dir in self.get_paths_starting_with(path, include_base=False):
 
1016
            if is_dir:
 
1017
                # remove inotify watch
 
1018
                try:
 
1019
                    self.vm.m.event_q.rm_watch(p)
 
1020
                except TypeError, e:
 
1021
                    # pyinotify has an ugly error management, if we can call
 
1022
                    # it that, :(. We handle this here because it's possible
 
1023
                    # and correct that the path is not there anymore
 
1024
                    m = "Error %s when trying to remove the watch on %r"
 
1025
                    log_warning(m, e, path)
 
1026
 
 
1027
            self.delete_metadata(p)
 
1028
        mdobj["info"]["last_conflicted"] = time.time()
 
1029
        self.fs[mdid] = mdobj
 
1030
 
 
1031
    def _check_partial(self, mdid):
 
1032
        """Check consistency between internal flag and FS regarding partial"""
 
1033
        # get the values
 
1034
        mdobj = self.fs[mdid]
 
1035
        partial_in_md = mdobj["info"]["is_partial"]
 
1036
        partial_in_disk = path_exists(self._get_partial_path(mdobj))
 
1037
 
 
1038
        # check and return
 
1039
        if partial_in_md != partial_in_disk:
 
1040
            msg = "'partial' inconsistency for object with mdid %r!  In disk:"\
 
1041
                  " %s, In MD: %s" % (mdid, partial_in_disk, partial_in_md)
 
1042
            raise InconsistencyError(msg)
 
1043
        return partial_in_md
 
1044
 
 
1045
    def _get_partial_path(self, mdobj, trim=None):
 
1046
        """Gets the path of the .partial file for a given mdobj"""
 
1047
        if trim is None and "partial_path" in mdobj["info"]:
 
1048
            return mdobj["info"]["partial_path"]
 
1049
 
 
1050
        path = self.get_abspath(mdobj['share_id'], mdobj['path'])
 
1051
        partial_path = os.path.join(self.partials_dir, mdobj['mdid'] + '.u1partial')
 
1052
        dirname, filename = os.path.split(path)
 
1053
 
 
1054
        if trim is not None:
 
1055
            filename = filename[:-10*trim]
 
1056
            mdobj["info"]["partial_path"] = partial_path + '.' + filename
 
1057
 
 
1058
        return partial_path + '.' + filename
 
1059
 
 
1060
    def create_partial(self, node_id, share_id):
 
1061
        """Create a .partial in disk and set the flag in metadata."""
 
1062
        mdid = self._idx_node_id[(share_id, node_id)]
 
1063
        log_debug("create_partial: mdid=%r share_id=%r node_id=%r",
 
1064
                  mdid, share_id, node_id)
 
1065
        if self._check_partial(mdid):
 
1066
            raise ValueError("The object with share_id %r and node_id %r is "
 
1067
                             "already partial!", share_id, node_id)
 
1068
 
 
1069
        # create an empty partial and set the flag
 
1070
        mdobj = self.fs[mdid]
 
1071
        is_dir = mdobj["is_dir"]
 
1072
        path = self.get_abspath(mdobj['share_id'], mdobj['path'])
 
1073
        with self._enable_share_write(share_id, os.path.dirname(path)):
 
1074
            # if we don't have the dir yet, create it
 
1075
            if is_dir and not path_exists(path):
 
1076
                self.eq.add_to_mute_filter("FS_DIR_CREATE", path=path)
 
1077
                make_dir(path)
 
1078
 
 
1079
        mdobj["info"]["last_partial_created"] = time.time()
 
1080
        mdobj["info"]["is_partial"] = True
 
1081
 
 
1082
        # create the partial path, trimming the name until fits
 
1083
        # in the filesystem
 
1084
        partial_path = self._get_partial_path(mdobj)
 
1085
        trim = 0
 
1086
        try:
 
1087
            while True:
 
1088
                try:
 
1089
                    # don't alert EQ, partials are in other directory, not watched
 
1090
                    os_open(partial_path, "w").close()
 
1091
                except IOError, e:
 
1092
                    # linux will give you too long, windows will say invalid
 
1093
                    if e.errno in (errno.ENAMETOOLONG, errno.EINVAL):
 
1094
                        trim += 1
 
1095
                        partial_path = self._get_partial_path(mdobj, trim=trim)
 
1096
                    else:
 
1097
                        raise
 
1098
                else:
 
1099
                    break
 
1100
        finally:
 
1101
            self.fs[mdid] = mdobj
 
1102
 
 
1103
    def get_partial_for_writing(self, node_id, share_id):
 
1104
        """Get a write-only fd to a partial file"""
 
1105
        mdid = self._idx_node_id[(share_id, node_id)]
 
1106
        log_debug("get_partial_for_writing: mdid=%r share_id=%r node_id=%r",
 
1107
                  mdid, share_id, node_id)
 
1108
 
 
1109
        mdobj = self.fs[mdid]
 
1110
        partial_path = self._get_partial_path(mdobj)
 
1111
        return os_open(partial_path, "wb")
 
1112
 
 
1113
    def get_partial(self, node_id, share_id):
 
1114
        """Get a read-only fd to a partial file."""
 
1115
        mdid = self._idx_node_id[(share_id, node_id)]
 
1116
        if not self._check_partial(mdid):
 
1117
            raise ValueError("The object with share_id %r and node_id %r is "
 
1118
                             "not partial!" % (share_id, node_id))
 
1119
 
 
1120
        partial_path = self._get_partial_path(self.fs[mdid])
 
1121
        fd = os_open(partial_path, "rb")
 
1122
        return fd
 
1123
 
 
1124
    def commit_partial(self, node_id, share_id, local_hash):
 
1125
        """Create a .partial in disk and set the flag in metadata."""
 
1126
        mdid = self._idx_node_id[(share_id, node_id)]
 
1127
        mdobj = self.fs[mdid]
 
1128
        if mdobj["is_dir"]:
 
1129
            raise ValueError("Directory partials can not be commited!")
 
1130
        if not self._check_partial(mdid):
 
1131
            raise ValueError("The object with share_id %r and node_id %r is "
 
1132
                             "not partial!" % (share_id, node_id))
 
1133
 
 
1134
        # move the .partial to the real path, and set the md info
 
1135
        path = self.get_abspath(mdobj['share_id'], mdobj['path'])
 
1136
        log_debug("commit_partial: path=%r mdid=%r share_id=%r node_id=%r",
 
1137
                  path, mdid, share_id, node_id)
 
1138
 
 
1139
        partial_path = self._get_partial_path(mdobj)
 
1140
        with self._enable_share_write(share_id, path):
 
1141
            self.eq.add_to_mute_filter("FS_FILE_CREATE", path=path)
 
1142
            self.eq.add_to_mute_filter("FS_FILE_CLOSE_WRITE", path=path)
 
1143
            recursive_move(partial_path, path)
 
1144
        mdobj["local_hash"] = local_hash
 
1145
        mdobj["info"]["last_downloaded"] = time.time()
 
1146
        mdobj["info"]["is_partial"] = False
 
1147
        mdobj["stat"] = get_stat(path)
 
1148
        self.fs[mdid] = mdobj
 
1149
 
 
1150
    def remove_partial(self, node_id, share_id):
 
1151
        """Remove a .partial in disk and set the flag in metadata."""
 
1152
        mdid = self._idx_node_id[(share_id, node_id)]
 
1153
 
 
1154
        # delete the .partial, and set the md info
 
1155
        mdobj = self.fs[mdid]
 
1156
        path = self.get_abspath(mdobj['share_id'], mdobj['path'])
 
1157
        log_debug("remove_partial: path=%r mdid=%r share_id=%r node_id=%r",
 
1158
                  path, mdid, share_id, node_id)
 
1159
        partial_path = self._get_partial_path(mdobj)
 
1160
        #pylint: disable-msg=W0704
 
1161
        try:
 
1162
            # don't alert EQ, partials are in other directory, not watched
 
1163
            remove_file(partial_path)
 
1164
        except OSError, e:
 
1165
            # we only remove it if its there.
 
1166
            m = "OSError %s when trying to remove partial_path %r"
 
1167
            log_warning(m, e, partial_path)
 
1168
        mdobj["info"]["last_partial_removed"] = time.time()
 
1169
        mdobj["info"]["is_partial"] = False
 
1170
        self.fs[mdid] = mdobj
 
1171
 
 
1172
    def upload_finished(self, mdid, server_hash):
 
1173
        """Set the metadata with timestamp and server hash."""
 
1174
        mdobj = self.fs[mdid]
 
1175
        mdobj["info"]["last_uploaded"] = time.time()
 
1176
        mdobj["server_hash"] = server_hash
 
1177
        self.fs[mdid] = mdobj
 
1178
 
 
1179
    def _get_mdid_from_args(self, kwargs, parent):
 
1180
        """Parse the kwargs and gets the mdid."""
 
1181
        if len(kwargs) == 1 and "path" in kwargs:
 
1182
            path = normpath(kwargs["path"])
 
1183
            return self._idx_path[path]
 
1184
        if len(kwargs) == 1 and "mdid" in kwargs:
 
1185
            return kwargs["mdid"]
 
1186
        if len(kwargs) == 2 and "node_id" in kwargs and "share_id" in kwargs:
 
1187
            return self._idx_node_id[(kwargs["share_id"], kwargs["node_id"])]
 
1188
        raise TypeError("Incorrect arguments for %r: %r" % (parent, kwargs))
 
1189
 
 
1190
    def is_dir(self, **kwargs):
 
1191
        """Return True if the path of a given object is a directory."""
 
1192
        mdid = self._get_mdid_from_args(kwargs, "is_dir")
 
1193
        mdobj = self.fs[mdid]
 
1194
        return mdobj["is_dir"]
 
1195
 
 
1196
    def has_metadata(self, **kwargs):
 
1197
        """Return True if there's metadata for a given object."""
 
1198
        if len(kwargs) == 1 and "path" in kwargs:
 
1199
            path = normpath(kwargs["path"])
 
1200
            return path in self._idx_path
 
1201
        if len(kwargs) == 1 and "mdid" in kwargs:
 
1202
            return kwargs["mdid"] in self.fs
 
1203
        if len(kwargs) == 2 and "node_id" in kwargs and "share_id" in kwargs:
 
1204
            return (kwargs["share_id"], kwargs["node_id"]) in self._idx_node_id
 
1205
        raise TypeError("Incorrect arguments for 'has_metadata': %r" % kwargs)
 
1206
 
 
1207
    def changed(self, **kwargs):
 
1208
        """Return whether a given node has changed or not.
 
1209
 
 
1210
        The node can be defined by any of the following:
 
1211
            - path
 
1212
            - metadata's id (mdid)
 
1213
            - node_id and share_id
 
1214
 
 
1215
        Return:
 
1216
            - LOCAL if the node has local modifications that the server is
 
1217
              not aware of.
 
1218
            - SERVER if the node is not fully downloaded.
 
1219
            - NONE the node has not changed.
 
1220
 
 
1221
        """
 
1222
        # get the values
 
1223
        mdid = self._get_mdid_from_args(kwargs, "changed")
 
1224
        mdobj = self.fs[mdid]
 
1225
        is_partial = mdobj["info"]["is_partial"]
 
1226
        local_hash = mdobj.get("local_hash", False)
 
1227
        server_hash = mdobj.get("server_hash", False)
 
1228
 
 
1229
        # return the status
 
1230
        if local_hash == server_hash:
 
1231
            if is_partial:
 
1232
                return "We broke the Universe! local_hash %r, server_hash %r,"\
 
1233
                       " is_partial %r" % (local_hash, server_hash, is_partial)
 
1234
            else:
 
1235
                return self.CHANGED_NONE
 
1236
        else:
 
1237
            if is_partial:
 
1238
                return self.CHANGED_SERVER
 
1239
            else:
 
1240
                return self.CHANGED_LOCAL
 
1241
 
 
1242
    def local_changed(self, path):
 
1243
        """Return whether a given node have locally changed or not.
 
1244
 
 
1245
        Return True if the node at `path' (or any of its children) has
 
1246
        been locally modified.
 
1247
 
 
1248
        """
 
1249
        has_changed = False
 
1250
        for p, is_dir in self.get_paths_starting_with(path):
 
1251
            if self.changed(path=p) == self.CHANGED_LOCAL:
 
1252
                has_changed = True
 
1253
                break
 
1254
        return has_changed
 
1255
 
 
1256
    def dir_content(self, path):
 
1257
        """Return the content of the directory in a server-comparable way."""
 
1258
        path = normpath(path)
 
1259
        mdid = self._idx_path[path]
 
1260
        mdobj = self.fs[mdid]
 
1261
        if not mdobj["is_dir"]:
 
1262
            raise ValueError("You can ask dir_content only on a directory.")
 
1263
 
 
1264
        def _get_all():
 
1265
            """find the mdids that match"""
 
1266
            for p, m in self._idx_path.iteritems():
 
1267
                if os.path.dirname(p) == path and p != path:
 
1268
                    mdobj = self.fs[m]
 
1269
                    yield (os.path.basename(p), mdobj["is_dir"],
 
1270
                                                            mdobj["node_id"])
 
1271
 
 
1272
        return sorted(_get_all())
 
1273
 
 
1274
    def open_file(self, mdid):
 
1275
        """Return a file like object for reading the contents of the file."""
 
1276
        mdobj = self.fs[mdid]
 
1277
        if mdobj["is_dir"]:
 
1278
            raise ValueError("You can only open files, not directories.")
 
1279
 
 
1280
        return os_open(self.get_abspath(mdobj['share_id'], mdobj['path']),
 
1281
                       'rb')
 
1282
 
 
1283
    def _share_relative_path(self, share_id, path):
 
1284
        """Return the relative path from the share_id."""
 
1285
        share = self._get_share(share_id)
 
1286
        if path == share.path:
 
1287
            # the relaitve path is the fullpath
 
1288
            return share.path
 
1289
        # pylint: disable-msg=W0612
 
1290
        head, sep, tail = path.rpartition(share.path)
 
1291
        if sep == '':
 
1292
            raise ValueError("'%s' isn't a child of '%s'" % (path, share.path))
 
1293
        relpath = tail.lstrip(os.path.sep)
 
1294
        # remove the initial os.path.sep
 
1295
        return relpath.lstrip(os.path.sep)
 
1296
 
 
1297
    def _get_share(self, id):
 
1298
        """Returns the share/udf with share or volume id: id."""
 
1299
        # TODO: refactor fsm to use volume instead of share
 
1300
        share = self.shares.get(id, None)
 
1301
        if share is None:
 
1302
            share = self.vm.get_volume(id)
 
1303
            self.shares[id] = share
 
1304
        return share
 
1305
 
 
1306
    def get_abspath(self, share_id, path):
 
1307
        """Return the absolute path: share.path + path."""
 
1308
        share_path = self._get_share(share_id).path
 
1309
        if share_path == path:
 
1310
            # the relaitve path is the fullpath
 
1311
            return share_path
 
1312
        else:
 
1313
            return os.path.abspath(os.path.join(share_path, path))
 
1314
 
 
1315
    def _enable_share_write(self, share_id, path, recursive=False):
 
1316
        """Helper to create a EnableShareWrite context manager."""
 
1317
        share = self._get_share(share_id)
 
1318
        return EnableShareWrite(share, path, recursive)
 
1319
 
 
1320
    def get_paths_starting_with(self, base_path, include_base=True):
 
1321
        """Return a list of paths that are starts with base_path.
 
1322
 
 
1323
        base_path should be a directory.
 
1324
        If include_base, base_path is added to the resulting list.
 
1325
 
 
1326
        """
 
1327
        all_paths = []
 
1328
 
 
1329
        base_mdid = self._idx_path.get(base_path)
 
1330
        if base_mdid is not None and include_base:
 
1331
            mdobj = self.fs[base_mdid]
 
1332
            all_paths.append((base_path, mdobj['is_dir']))
 
1333
 
 
1334
        # add sep, to always match children in the tree and not partial names
 
1335
        base_path += os.path.sep
 
1336
 
 
1337
        for path, mdid in self._idx_path.iteritems():
 
1338
            if path.startswith(base_path):
 
1339
                mdobj = self.fs[mdid]
 
1340
                all_paths.append((path, mdobj['is_dir']))
 
1341
 
 
1342
        return all_paths
 
1343
 
 
1344
    def delete_to_trash(self, mdid, parent_id):
 
1345
        """Move the node to the trash."""
 
1346
        mdobj = self.fs[mdid]
 
1347
        node_id = mdobj["node_id"]
 
1348
        if node_id is None:
 
1349
            node_id = MDMarker(mdid)
 
1350
        share_id = mdobj["share_id"]
 
1351
        path = self.get_abspath(mdobj['share_id'], mdobj['path'])
 
1352
        is_dir = mdobj["is_dir"]
 
1353
        log_debug("delete_to_trash: mdid=%r, parent=%r, share=%r, node=%r, "
 
1354
                  "path=%r is_dir=%r", mdid, parent_id, share_id, node_id,
 
1355
                  path, is_dir)
 
1356
        self.delete_metadata(path)
 
1357
        self.trash[(share_id, node_id)] = (mdid, parent_id, path, is_dir)
 
1358
 
 
1359
    def remove_from_trash(self, share_id, node_id):
 
1360
        """Delete the node from the trash."""
 
1361
        log_debug("remove_from_trash: share=%r, node=%r", share_id, node_id)
 
1362
        if (share_id, node_id) in self.trash:
 
1363
            del self.trash[(share_id, node_id)]
 
1364
 
 
1365
    def node_in_trash(self, share_id, node_id):
 
1366
        """Return if the node is in the trash."""
 
1367
        return (share_id, node_id) in self.trash
 
1368
 
 
1369
    def get_iter_trash(self):
 
1370
        """Return the trash element by element."""
 
1371
        for (share_id, node_id), node_info in self.trash.iteritems():
 
1372
            parent_id = node_info[1]
 
1373
            if len(node_info) <= 2:
 
1374
                # old trash, use a fake path to not block the unlink
 
1375
                # that LR generates
 
1376
                path = "fake_unblocking_path"
 
1377
            else:
 
1378
                path = node_info[2]
 
1379
            if len(node_info) <= 3:
 
1380
                is_dir = False
 
1381
            else:
 
1382
                is_dir = node_info[3]
 
1383
            yield share_id, node_id, parent_id, path, is_dir
 
1384
 
 
1385
    def get_dirty_nodes(self):
 
1386
        """Return the mdid of the dirty nodes, one by one."""
 
1387
        for _, v in self.fs.items():
 
1388
            if v.get('dirty'):
 
1389
                yield _MDObject(**v)
 
1390
 
 
1391
    def add_to_move_limbo(self, share_id, node_id, old_parent_id,
 
1392
                          new_parent_id, new_name, path_from, path_to):
 
1393
        """Add the operation info to the move limbo."""
 
1394
        log_debug("add to move limbo: share=%r, node=%r, old_parent=%r, "
 
1395
                  "new_parent=%r, new_name=%r", share_id, node_id,
 
1396
                  old_parent_id, new_parent_id, new_name)
 
1397
        self.move_limbo[(share_id, node_id)] = (old_parent_id, new_parent_id,
 
1398
                                                new_name, path_from, path_to)
 
1399
 
 
1400
    def remove_from_move_limbo(self, share_id, node_id):
 
1401
        """Remove the node from the move limbo."""
 
1402
        log_debug("remove from move limbo: share=%r, node=%r",
 
1403
                  share_id, node_id)
 
1404
        if (share_id, node_id) in self.move_limbo:
 
1405
            del self.move_limbo[(share_id, node_id)]
 
1406
 
 
1407
    def get_iter_move_limbo(self):
 
1408
        """Return the move limbo node by node."""
 
1409
        for k, v in self.move_limbo.iteritems():
 
1410
            share_id, node_id = k
 
1411
            if len(v) == 3:
 
1412
                # old move limbo, use fakes path to not block the move
 
1413
                # that LR generates
 
1414
                path_from = "fake_path_from"
 
1415
                path_to = "fake_path_to"
 
1416
                old_parent_id, new_parent_id, new_name = v
 
1417
            else:
 
1418
                old_parent_id, new_parent_id, new_name, path_from, path_to = v
 
1419
            yield (share_id, node_id, old_parent_id, new_parent_id,
 
1420
                   new_name, path_from, path_to)
 
1421
 
 
1422
    def make_dir(self, mdid):
 
1423
        """Create the dir in disk."""
 
1424
        mdobj = self.get_by_mdid(mdid)
 
1425
        if not mdobj.is_dir:
 
1426
            raise ValueError("make_dir must be on a file (mdid: %r)" % (mdid,))
 
1427
 
 
1428
        full_path = self.get_abspath(mdobj.share_id, mdobj.path)
 
1429
        with self._enable_share_write(mdobj.share_id, full_path) as enable:
 
1430
            if not enable.ro:
 
1431
                self.eq.add_to_mute_filter('FS_DIR_CREATE', path=full_path)
 
1432
 
 
1433
            try:
 
1434
                make_dir(full_path)
 
1435
            except OSError, e:
 
1436
                if not e.errno == 17: #already exists
 
1437
                    raise
 
1438
 
 
1439
            if not enable.ro:
 
1440
                # add the watch: we hope the user wont have time to add a file
 
1441
                # just after *we* created the directory; see bug #373940
 
1442
                self.eq.add_watch(full_path)
 
1443
 
 
1444
    def dereference_ok_limbos(self, marker, value):
 
1445
        """Dereference markers in the limbos with a value."""
 
1446
        for (share, node), (mdid, parent, path, is_dir) in \
 
1447
                self.trash.iteritems():
 
1448
            if node == marker:
 
1449
                del self.trash[(share, node)]
 
1450
                self.trash[(share, value)] = (mdid, parent, path, is_dir)
 
1451
                log_debug("dereference ok trash: share=%r  marker=%r  "
 
1452
                          "new node=%r", share, marker, value)
 
1453
            elif parent == marker:
 
1454
                self.trash[(share, node)] = (mdid, value, path, is_dir)
 
1455
                log_debug("dereference ok trash: share=%r  node=%r  marker=%r"
 
1456
                          "  new parent=%r", share, node, marker, value)
 
1457
 
 
1458
        for k, v in self.move_limbo.iteritems():
 
1459
            share, node = k
 
1460
            old_parent, new_parent, new_name, path_from, path_to = v
 
1461
 
 
1462
            if node == marker:
 
1463
                del self.move_limbo[(share, node)]
 
1464
                self.move_limbo[(share, value)] = v
 
1465
                log_debug("dereference ok move limbo: share=%r  marker=%r  "
 
1466
                          "new node=%r", share, marker, value)
 
1467
            else:
 
1468
                # both parents can be the same marker at the same time
 
1469
                if old_parent == marker or new_parent == marker:
 
1470
                    if old_parent == marker:
 
1471
                        old_parent = value
 
1472
                    if new_parent == marker:
 
1473
                        new_parent = value
 
1474
                log_debug("dereference ok move limbo: share=%r  node=%r  "
 
1475
                          "marker=%r  old_parent=%r  new_parent=%r",
 
1476
                          share, node, marker, old_parent, new_parent)
 
1477
                self.move_limbo[k] = (old_parent, new_parent, new_name,
 
1478
                                      path_from, path_to)
 
1479
 
 
1480
    def dereference_err_limbos(self, marker):
 
1481
        """Dereference markers in the limbos with an error.
 
1482
 
 
1483
        As the dependency is not valid, we just remove the item.
 
1484
        """
 
1485
        for (share, node), (_, parent, _, _) in self.trash.iteritems():
 
1486
            if node == marker or parent == marker:
 
1487
                log_debug("dereference err trash: share=%r  node=%r  "
 
1488
                          "marker=%r", share, node, marker)
 
1489
                del self.trash[(share, node)]
 
1490
 
 
1491
        move_items = self.move_limbo.iteritems()
 
1492
        for (share, node), (old_parent, new_parent, _, _, _) in move_items:
 
1493
            if node == marker or old_parent == marker or new_parent == marker:
 
1494
                log_debug("dereference err move limbo: share=%r  node=%r  "
 
1495
                          "marker=%r", share, node, marker)
 
1496
                del self.move_limbo[(share, node)]
 
1497
 
 
1498
 
 
1499
class EnableShareWrite(object):
 
1500
    """Context manager to allow write in ro-shares."""
 
1501
 
 
1502
    def __init__(self, share, path, recursive=False):
 
1503
        self.share = share
 
1504
        self.path = path
 
1505
        self.ro = not self.share.can_write()
 
1506
        self.recursive = recursive
 
1507
 
 
1508
        # list of (path, isdir) to restore permissions
 
1509
        self._changed_nodes = []
 
1510
 
 
1511
    def __enter__(self):
 
1512
        """Change the nodes to be writable."""
 
1513
        if not self.ro:
 
1514
            return self
 
1515
 
 
1516
        # the parent should be writable for us to change path
 
1517
        parent = os.path.dirname(self.path)
 
1518
        parent_stat = get_stat(parent)
 
1519
        if parent_stat is None:
 
1520
            # if we don't have the parent yet, create it
 
1521
            with EnableShareWrite(self.share, parent):
 
1522
                make_dir(parent)
 
1523
        set_dir_readwrite(parent)
 
1524
        self._changed_nodes.append((parent, True))
 
1525
 
 
1526
        # so, change path if exists
 
1527
        path_stat = get_stat(self.path)
 
1528
        if path_stat is not None:
 
1529
            if stat.S_ISDIR(path_stat.st_mode):
 
1530
                set_dir_readwrite(self.path)
 
1531
                self._changed_nodes.append((self.path, True))
 
1532
            else:
 
1533
                set_file_readwrite(self.path)
 
1534
                self._changed_nodes.append((self.path, False))
 
1535
 
 
1536
        # if needed, change the whole subtree
 
1537
        if self.recursive:
 
1538
            for dirpath, dirnames, filenames in walk(self.path, topdown=False):
 
1539
                for dname in dirnames:
 
1540
                    path = os.path.join(dirpath, dname)
 
1541
                    set_dir_readwrite(path)
 
1542
                    self._changed_nodes.append((path, True))
 
1543
                for fname in filenames:
 
1544
                    path = os.path.join(dirpath, fname)
 
1545
                    set_file_readwrite(path)
 
1546
                    self._changed_nodes.append((path, False))
 
1547
        return self
 
1548
 
 
1549
    def __exit__(self, *exc_info):
 
1550
        """Restore node permissions.
 
1551
 
 
1552
        Note that this is done backwards, from the leaf to the root.
 
1553
        """
 
1554
        if not self.ro:
 
1555
            return
 
1556
 
 
1557
        # restore self.path, that may not have existed at __enter__ time
 
1558
        path_stat = get_stat(self.path)
 
1559
        if path_stat is not None:
 
1560
            if stat.S_ISDIR(path_stat.st_mode):
 
1561
                set_dir_readonly(self.path)
 
1562
            else:
 
1563
                set_file_readonly(self.path)
 
1564
 
 
1565
        # restore all saved ones
 
1566
        exists = path_exists
 
1567
        for path, isdir in self._changed_nodes[::-1]:
 
1568
            if exists(path):
 
1569
                if isdir:
 
1570
                    set_dir_readonly(path)
 
1571
                else:
 
1572
                    set_file_readonly(path)
 
1573
 
 
1574
 
 
1575
def get_stat(path):
 
1576
    """Return os.lstat or None if errno == ENOENT.
 
1577
 
 
1578
    os.lstat is used as we don't support symlinks
 
1579
    """
 
1580
    try:
 
1581
        return stat_path(path)
 
1582
    except OSError, e:
 
1583
        if e.errno == errno.ENOENT:
 
1584
            return None
 
1585
        else:
 
1586
            raise