~didrocks/ubuntuone-client/dont-suffer-zg-crash

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/local_rescan.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2009-06-30 12:00:00 UTC
  • Revision ID: james.westby@ubuntu.com-20090630120000-by806ovmw3193qe8
Tags: upstream-0.90.3
Import upstream version 0.90.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# ubuntuone.syncdaemon.local_rescan - local rescanning
 
2
#
 
3
# Author: Facundo Batista <facundo@canonical.com>
 
4
#
 
5
# Copyright 2009 Canonical Ltd.
 
6
#
 
7
# This program is free software: you can redistribute it and/or modify it
 
8
# under the terms of the GNU General Public License version 3, as published
 
9
# by the Free Software Foundation.
 
10
#
 
11
# This program is distributed in the hope that it will be useful, but
 
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
14
# PURPOSE.  See the GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License along
 
17
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
'''Module that implements the Local Rescan.'''
 
19
 
 
20
import os
 
21
import functools
 
22
import logging
 
23
import errno
 
24
 
 
25
from twisted.internet import defer, reactor
 
26
 
 
27
class ScanTransactionDirty(Exception):
    """Signal that the scan transaction was dirty.

    Raised when the directory being scanned changed underneath the scan,
    so the directory has to be explored again.
    """
 
29
 
 
30
class ScanNoDirectory(Exception):
    """Signal that the whole directory being scanned went away."""
 
32
 
 
33
# Logger for the local rescan, plus shortcuts bound to a fixed level:
# log_xxx(msg, *args) is lr_logger.log(logging.XXX, msg, *args).
lr_logger = logging.getLogger('ubuntuone.SyncDaemon.local_rescan')
log_debug, log_info, log_warning, log_error = (
    functools.partial(lr_logger.log, _level)
    for _level in (logging.DEBUG, logging.INFO,
                   logging.WARNING, logging.ERROR))
 
39
 
 
40
class LocalRescan(object):
    '''Local re-scanner.

    Compares the real disc with FSM's metadata, and pushes the changes to EQ.

    Scans are serialized: every directory to compare is put in an internal
    queue and processed one at a time, each in its own reactor iteration.
    The deferred returned by start() / scan_dir() fires when the queue has
    been drained.
    '''
    def __init__(self, vm, fsm, eq):
        self.vm = vm
        self.fsm = fsm
        self.eq = eq
        # pending work as (all_share_dirs, share_path, dir_path) tuples; new
        # items are inserted at the front and popped from the end (FIFO)
        self._queue = []
        # deferred of the scan run in progress, None when idle
        self._previous_deferred = None

    @staticmethod
    def _path_in(path, base):
        '''Tell if path is base itself or is located somewhere below it.

        A plain startswith() is not enough: '/x/docs2' starts with '/x/docs'
        but is not inside it.
        '''
        if path == base:
            return True
        return path.startswith(base.rstrip(os.path.sep) + os.path.sep)

    def start(self):
        '''Start the comparison of all the "Modify" shares.

        Returns the deferred of the scan run (shared with a run that may
        already be in progress).
        '''
        log_info("start scan all shares")
        for share in self._get_shares():
            all_share_dirs = self._get_share_dirs(share.id)
            self._queue.insert(0, (all_share_dirs, share.path, share.path))
        return self._queue_scan()

    def _get_shares(self, access_level="Modify"):
        '''Yield the shares from the volume manager with that access level.'''
        for sid in self.vm.shares:
            share = self.vm.shares[sid]
            if share.access_level == access_level:
                yield share

    def scan_dir(self, direct):
        '''Compares one directory between metadata and disk.

        Returns None, scanning nothing, when the directory belongs to a
        "View" (read only) share or is not on disk anymore. Otherwise
        returns the deferred of the scan run.

        Raises ValueError if the path is not inside any share, or if it
        is on disk but is not a directory.
        '''
        log_info("scan dir: %r", direct)

        # get the share to get only a subset of mdids
        for share in self._get_shares():
            if self._path_in(direct, share.path):
                break
        else:
            # not in RW shares; let's check RO shares, otherwise it's an error
            for ro_share in self._get_shares("View"):
                if self._path_in(direct, ro_share.path):
                    return
            log_error("The received path is not in any share!")
            raise ValueError("The received path is not in any share!")

        if not os.path.exists(direct):
            m = "The path is not in disk: %r" % direct
            log_warning(m)
            return

        if not os.path.isdir(direct):
            m = "The path is in disk but it's not a dir: %r" % direct
            log_error(m)
            raise ValueError(m)

        # No, 'share' is surely defined; pylint: disable-msg=W0631
        all_share_dirs = self._get_share_dirs(share.id)
        self._queue.insert(0, (all_share_dirs, share.path, direct))
        return self._queue_scan()

    def _queue_scan(self):
        '''If there's a scan in progress, queue the new one for later.

        Starts a new run if idle; either way returns the run's deferred.
        '''
        if self._previous_deferred is None:
            self._previous_deferred = defer.Deferred()
            self._process_next_queue(None)
        return self._previous_deferred

    def _process_next_queue(self, _):
        '''Process the next item in the queue, if any.

        With an empty queue the current run is over and its deferred is
        fired with None. Otherwise the next scan is scheduled for the next
        reactor iteration. If that scan fails, the run's deferred is
        errbacked and the scanner is left idle. Items still queued are
        processed by the next run.
        '''
        log_debug("process next in queue (len %d)", len(self._queue))
        if not self._queue:
            # Detach the deferred before firing it. A callback that asks for
            # another scan then starts a fresh run, instead of getting this
            # already-fired deferred with its item left unprocessed.
            finished, self._previous_deferred = self._previous_deferred, None
            finished.callback(None)
            return

        # more to scan
        scan_info = self._queue.pop()
        current = self._previous_deferred

        def abort(failure):
            '''Fail the current run, leaving the scanner ready for a new one.'''
            if self._previous_deferred is current:
                self._previous_deferred = None
            current.errback(failure)

        def safe_scan():
            '''Run the scan, routing both sync and async errors to the run.'''
            try:
                scan_d = self._scan_tree(*scan_info)
            except Exception as e:
                abort(e)
            else:
                # _scan_tree's deferred fails when re_launch can't recover;
                # without this the run's deferred would never fire
                scan_d.addErrback(abort)

        reactor.callLater(0, safe_scan)

    def _get_share_dirs(self, share_id):
        '''Get the metadata info of all the nodes in a share.

        Despite the name, files are included too. Each item is a tuple
        (path, is_dir, stat, changed, node_id).
        '''
        all_share_dirs = []
        for obj in self.fsm.get_mdobjs_by_share_id(share_id):
            changed = self.fsm.changed(mdid=obj.mdid)
            all_share_dirs.append(
                    (obj.path, obj.is_dir, obj.stat, changed, obj.node_id))
        return all_share_dirs

    def _scan_tree(self, all_share_dirs, share_path, path):
        '''Scans a whole tree, using the received path as root.

        Only `path` itself is compared here. Its subdirectories that need
        it are queued, and the queue processing continues afterwards. If
        the transaction was dirty, `path` is queued again. A vanished
        directory is logged and skipped. Any other failure is logged and
        propagated through the returned deferred.
        '''
        log_debug("_scan_tree:  share_path: %r  path: %r", share_path, path)

        def go_deeper(newdirs):
            '''Explore into the subdirs.'''
            for direct in newdirs:
                log_debug("explore subdir: %r", direct)
                self._queue.insert(0, (all_share_dirs, share_path, direct))

        def re_launch(failure):
            '''Explore that directory again.'''
            if failure.check(ScanTransactionDirty):
                reason = failure.getErrorMessage()
                log_debug("re queue, transaction dirty for %r, reason: %s",
                                                                  path, reason)
                self._queue.insert(0, (all_share_dirs, share_path, path))
            elif failure.check(ScanNoDirectory):
                log_error("the directory dissappeared: %s (%s)", failure.type,
                                                                failure.value)
            else:
                log_error("in the scan: %s (%s)\n%s",
                          failure.type, failure.value, failure.getTraceback())
                return failure

        d = defer.succeed((all_share_dirs, share_path, path))
        d.addCallback(self._scan_one_dir)
        d.addCallbacks(go_deeper, re_launch)
        d.addCallback(self._process_next_queue)
        return d

    def _compare(self, dirpath, dirnames, filenames, all_share_dirs, shr_path):
        '''Compare the directories with the info that should be there.

        dirpath is the directory being scanned, and dirnames / filenames
        are its children on disk. all_share_dirs holds the metadata tuples
        of the whole share (see _get_share_dirs), and shr_path is the share
        root.

        Returns (events, to_scan_later). events is a list of
        (event_name, path) to push to EQ. to_scan_later lists the
        subdirectories to explore afterwards. Inconsistent metadata is
        deleted along the way. OSErrors from disk access are propagated.
        '''
        log_debug("comparing directory %r", dirpath)

        # get the share info
        shouldbe = self._paths_filter(all_share_dirs, dirpath, len(shr_path))
        events = []
        to_scan_later = []

        def despair(message, fullname, also_children=False, also_remove=None):
            '''Something went very bad with this node, converge!

            Moves the node aside to "<name>.conflict" and deletes its
            metadata (and its children's, if asked). If asked, it also
            removes the `also_remove` file.
            '''
            log_debug(message, fullname)
            try:
                os.rename(fullname, fullname + ".conflict")
            except OSError as e:
                m = "OSError %s when trying to move to conflict file/dir %r"
                log_warning(m, e, fullname)
            self.fsm.delete_metadata(fullname)

            # if asked, remove metadata for children
            if also_children:
                log_debug("Removing also metadata from %r children", fullname)
                for path, _ in self.fsm.get_paths_starting_with(fullname):
                    self.fsm.delete_metadata(path)

            # if asked, remove also that file
            if also_remove is not None:
                try:
                    os.remove(also_remove)
                except OSError as e:
                    m = "OSError %s when trying to remove file %r"
                    log_warning(m, e, also_remove)

        def check_stat(fullname, statinfo):
            '''Generate event if stats differ.'''
            log_debug("comp yield STAT prv: %s", statinfo)
            newstat = os.stat(fullname)
            log_debug("comp yield STAT new: %s", newstat)
            if statinfo != newstat:
                events.append(('FS_FILE_CLOSE_WRITE', fullname))

        def reset_interrupted_download(fullname):
            '''Handle a node left in SERVER state (download interrupted).

            Sets its server_hash to its local_hash and asks FSM to remove
            its partial.
            '''
            mdobj = self.fsm.get_by_path(fullname)
            self.fsm.set_by_mdid(mdobj.mdid, server_hash=mdobj.local_hash)
            self.fsm.remove_partial(mdobj.node_id, mdobj.share_id)

        def wrong_changed(changed):
            '''Build the despair() message for an unexpected 'changed'.'''
            # %s-formatting (not +) so a non-str value can't blow up here
            return "Wrong 'changed' value for %%r: %s" % (changed,)

        # check all directories
        for dname in dirnames:
            fullname = os.path.join(dirpath, dname)
            if dname not in shouldbe:
                # hey, it's new!
                log_debug("comp yield: directory %r is new!", fullname)
                events.append(('FS_DIR_CREATE', fullname))
                continue

            is_dir, statinfo, changed = shouldbe.pop(dname)
            if not is_dir:
                # it's there, but it's a file!
                log_debug("comp yield: file %r became a dir!", fullname)
                events.append(('FS_FILE_DELETE', fullname))
                events.append(('FS_DIR_CREATE', fullname))
            elif changed == "SERVER":
                # download interrupted
                log_debug("comp yield: dir %r in SERVER", fullname)
                reset_interrupted_download(fullname)
                to_scan_later.append(fullname)
            elif changed == "NONE":
                # it's old, we should scan it later
                log_debug("comp yield: dir %r will be scaned later!",
                                                              fullname)
                to_scan_later.append(fullname)
            else:
                despair(wrong_changed(changed), fullname, also_children=True)

        # check all files
        partial_suffix = ".partial"
        for fname in filenames:
            fullname = os.path.join(dirpath, fname)
            if fname in shouldbe:
                is_dir, statinfo, changed = shouldbe.pop(fname)
                if is_dir:
                    # it's there, but it's a directory!
                    log_debug("comp yield: dir %r became a file!", fullname)
                    events.append(('FS_DIR_DELETE', fullname))
                    events.append(('FS_FILE_CREATE', fullname))
                elif changed == "LOCAL":
                    # upload interrupted
                    log_debug("comp yield: file %r in LOCAL state!",
                                                                fullname)
                    events.append(('FS_FILE_CLOSE_WRITE', fullname))
                elif changed == "NONE":
                    # what about stat info?
                    log_debug("comp yield: file %r was here.. stat?",
                                                                fullname)
                    check_stat(fullname, statinfo)
                elif changed == "SERVER":
                    log_debug("comp yield: file %r in SERVER", fullname)
                    reset_interrupted_download(fullname)
                    check_stat(fullname, statinfo)
                else:
                    despair(wrong_changed(changed), fullname)

            elif fname.endswith(partial_suffix):
                # a partial file! it can be a standard file, or the one
                # inside a dir (which will be deleted in that case)
                realname = fname[:-len(partial_suffix)]
                realfullname = fullname[:-len(partial_suffix)]
                if realname not in shouldbe:
                    # this is the case of a .partial with no md at all!
                    m = "Found a .partial (%r) with no metadata, removing!"
                    log_debug(m, fullname)
                    os.remove(fullname)
                    continue

                is_dir, statinfo, changed = shouldbe.pop(realname)
                if is_dir:
                    m = ".partial of a file that MD says it's a dir: %r"
                    despair(m, realfullname, also_remove=fullname)
                elif changed != "SERVER":
                    m = ".partial of a file that 'changed' != SERVER: %r"
                    despair(m, realfullname, also_remove=fullname)
                else:
                    # download interrupted
                    m = "comp yield: file %r in SERVER state!"
                    log_debug(m, fullname)
                    reset_interrupted_download(realfullname)
                    # NOTE(review): this stats the .partial itself, which
                    # remove_partial may have just deleted; an ENOENT here
                    # ends as ScanTransactionDirty and a rescan. Confirm
                    # whether that is the intended convergence path.
                    check_stat(fullname, statinfo)

            else:
                # hey, it's new!
                log_debug("comp yield: file %r is new!", fullname)
                events.append(('FS_FILE_CREATE', fullname))

                # if it's not empty, tell to hash it and upload
                if os.path.getsize(fullname):
                    events.append(('FS_FILE_CLOSE_WRITE', fullname))

        # all these don't exist anymore
        for name, (is_dir, statinfo, changed) in shouldbe.items():
            fullname = os.path.join(dirpath, name)
            if not is_dir:
                if changed not in ("SERVER", "NONE", "LOCAL"):
                    # bad metadata
                    m = "Bad 'changed': removing MD from file %r"
                    log_debug(m, fullname)
                    self.fsm.delete_metadata(fullname)
                    continue

                log_debug("comp yield: file %r is gone!", fullname)
                events.append(('FS_FILE_DELETE', fullname))
                continue

            if changed not in ("SERVER", "NONE"):
                # bad metadata
                m = "Bad 'changed': removing MD from dir %r and children"
                log_debug(m, fullname)
                for path, _ in self.fsm.get_paths_starting_with(fullname):
                    self.fsm.delete_metadata(path)
                continue

            log_debug("comp yield: directory %r is gone!", fullname)
            # it's a directory, didn't have any info inside?
            base_path = fullname[len(shr_path) + 1:]
            subtree_prefix = base_path + os.path.sep

            # get all the info of the dir itself and what's inside it (by
            # whole path components, so 'foo' doesn't catch 'foobar')
            to_inform = []
            for shrpath, node_is_dir, _, _, _ in all_share_dirs:
                if shrpath == base_path or shrpath.startswith(subtree_prefix):
                    qparts = len(shrpath.split(os.path.sep))
                    to_inform.append((qparts, shrpath, node_is_dir))

            # order everything from more path components to less (this
            # will assure correct upgoing walk in the tree)
            to_inform.sort(reverse=True)

            # inform deletion!
            for _, shrpath, node_is_dir in to_inform:
                gone = os.path.join(shr_path, shrpath)
                if node_is_dir:
                    events.append(('FS_DIR_DELETE', gone))
                else:
                    events.append(('FS_FILE_DELETE', gone))

        return events, to_scan_later

    def _paths_filter(self, all_share_dirs, dirpath, len_shr_path):
        '''Returns the paths that belong to this dir.

        Returns a dict {child_name: (is_dir, statinfo, changed)} for the
        direct children of dirpath. Entries without node_id get their
        metadata deleted and are left out, so they are seen as new.
        '''
        # paths in shares are relative, remove the first slash
        direct = dirpath[len_shr_path + 1:]
        basedir = dirpath[:len_shr_path]

        # build the dict
        filesdirs = {}
        for shrpath, is_dir, statinfo, changed, node_id in all_share_dirs:
            base, fname = os.path.split(shrpath)
            if base != direct:
                continue

            # if without node_id, remove the metadata, and take it as new
            if node_id is None:
                fullname = os.path.join(basedir, shrpath)
                m = "Deleting metadata, because of node_id=None, of %r"
                log_debug(m, fullname)
                self.fsm.delete_metadata(fullname)
                continue

            filesdirs[fname] = is_dir, statinfo, changed
        return filesdirs

    def _scan_one_dir(self, scan_info):
        '''Gets one dir and compares with fsm.

        scan_info is (all_share_dirs, shr_path, dirpath). Adds an inotify
        watch for dirpath and freezes EQ on it while comparing. The
        resulting events go to eq.freeze_commit. If that reports the
        directory dirty, or the comparison fails, the freeze is rolled
        back.

        Returns a deferred that fires with the subdirectories to scan
        later. It fails with ScanNoDirectory if dirpath is gone, with
        ScanTransactionDirty if something changed meanwhile, or with any
        other error raised while comparing.
        '''
        all_share_dirs, shr_path, dirpath = scan_info

        log_debug("Adding watch to %r", dirpath)
        self.eq.inotify_add_watch(dirpath)

        to_later = []
        self.eq.freeze_begin(dirpath)

        def scan():
            '''the scan, really'''
            log_debug("scanning the dir %r", dirpath)
            try:
                listdir = os.listdir(dirpath)
            except OSError as e:
                # whatever happened, don't leave the freeze open
                self.eq.freeze_rollback()
                if e.errno == errno.ENOENT:
                    raise ScanNoDirectory("Directory %r disappeared since last"
                                          " time" % dirpath)
                raise

            # get the info from disk (don't support symlinks yet)
            dnames = []
            fnames = []
            for something in listdir:
                fullname = os.path.join(dirpath, something)
                if os.path.islink(fullname):
                    continue
                if os.path.isdir(fullname):
                    dnames.append(something)
                else:
                    fnames.append(something)

            try:
                events, to_scan_later = self._compare(dirpath, dnames, fnames,
                                                      all_share_dirs, shr_path)
            except Exception as e:
                # whatever happened, don't leave the freeze open
                self.eq.freeze_rollback()
                if isinstance(e, OSError) and e.errno == errno.ENOENT:
                    # something dissapeared from disk, start all over again
                    raise ScanTransactionDirty("The file/dir %r dissapeared" %
                                                                    e.filename)
                raise
            to_later.extend(to_scan_later)
            return events

        def control(dirty):
            '''controls that everything was ok'''
            if dirty:
                self.eq.freeze_rollback()
                raise ScanTransactionDirty("dirty!")
            return to_later

        d = defer.execute(scan)
        d.addCallback(self.eq.freeze_commit)
        d.addCallback(control)
        return d