1
# ubuntuone.syncdaemon.local_rescan - local rescanning
3
# Author: Facundo Batista <facundo@canonical.com>
5
# Copyright 2009 Canonical Ltd.
7
# This program is free software: you can redistribute it and/or modify it
8
# under the terms of the GNU General Public License version 3, as published
9
# by the Free Software Foundation.
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
# PURPOSE. See the GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License along
17
# with this program. If not, see <http://www.gnu.org/licenses/>.
18
'''Module that implements the Local Rescan.'''
25
from twisted.internet import defer, reactor
27
class ScanTransactionDirty(Exception):
    '''The transaction was dirty.

    Raised by the directory scan when the disk changed while the scan was
    running (for instance, a file or directory disappeared half way).  The
    tree scanner reacts to this error by queueing the same directory again.
    '''
30
class ScanNoDirectory(Exception):
    '''The whole directory went away.

    Raised by the directory scan when listing the directory fails with
    ENOENT, i.e. the directory being scanned no longer exists on disk.
    '''
34
# Module-wide logger plus one shortcut per level, so call sites can write
# log_debug("msg %r", arg) instead of lr_logger.log(logging.DEBUG, ...).
lr_logger = logging.getLogger('ubuntuone.SyncDaemon.local_rescan')
log_info = functools.partial(lr_logger.log, logging.INFO)
log_debug = functools.partial(lr_logger.log, logging.DEBUG)
log_error = functools.partial(lr_logger.log, logging.ERROR)
log_warning = functools.partial(lr_logger.log, logging.WARNING)
40
class LocalRescan(object):
43
Compares the real disc with FSM's metadata, and pushes the changes to EQ.
45
def __init__(self, vm, fsm, eq):
50
self._previous_deferred = None
53
'''Start the comparison.'''
54
log_info("start scan all shares")
55
to_scan = self._get_shares()
57
all_share_dirs = self._get_share_dirs(share.id)
58
self._queue.insert(0, (all_share_dirs, share.path, share.path))
59
d = self._queue_scan()
62
def _get_shares(self, access_level="Modify"):
    '''Get all the shares to compare.

    Walks self.vm.shares and yields every share whose access_level is
    equal to the requested one ("Modify" unless told otherwise).
    '''
    for sid in self.vm.shares:
        share = self.vm.shares[sid]
        if share.access_level == access_level:
            # callers iterate over the result, so hand the match out
            yield share
69
def scan_dir(self, direct):
70
'''Compares one directory between metadata and disk.'''
71
log_info("scan dir: %r", direct)
73
# get the share to get only a subset of mdids
74
for share in self._get_shares():
75
if direct.startswith(share.path):
78
# not in RW shares; let's check RO shares, otherwise it's an error
79
for share in self._get_shares("View"):
80
if direct.startswith(share.path):
82
log_error("The received path is not in any share!")
83
raise ValueError("The received path is not in any share!")
85
if not os.path.exists(direct):
86
m = "The path is not in disk: %r" % direct
90
if not os.path.isdir(direct):
91
m = "The path is in disk but it's not a dir: %r" % direct
95
# No, 'share' is surely defined; pylint: disable-msg=W0631
96
all_share_dirs = self._get_share_dirs(share.id)
97
self._queue.insert(0, (all_share_dirs, share.path, direct))
98
return self._queue_scan()
100
def _queue_scan(self):
    '''If there's a scan in progress, queue the new one for later.

    Returns the deferred of the current scan cycle.  When a cycle is
    already running its deferred is handed back untouched; otherwise a
    new deferred is created and the processing of the queue is started.
    '''
    if self._previous_deferred is not None:
        # a cycle is already running, nothing to start here
        return self._previous_deferred

    # Keep our own reference: _process_next_queue may fire the deferred and
    # reset self._previous_deferred to None before it returns, and reading
    # the attribute back at that point would hand None to the caller.
    scan_done = self._previous_deferred = defer.Deferred()
    self._process_next_queue(None)
    return scan_done
107
def _process_next_queue(self, _):
108
'''Process the next item in the queue, if any.'''
109
log_debug("process next in queue (len %d)", len(self._queue))
111
self._previous_deferred.callback(None)
112
self._previous_deferred = None
116
scan_info = self._queue.pop()
120
self._scan_tree(*scan_info)
122
self._previous_deferred.errback(e)
124
reactor.callLater(0, safe_scan)
126
def _get_share_dirs(self, share_id):
    '''Get all the directories in a share.

    Returns a list with one (path, is_dir, stat, changed, node_id) tuple
    per metadata object that FSM reports for share_id, where 'changed' is
    what self.fsm.changed() answers for the mdid of that object.
    '''
    # start from an empty list so a share with no metadata gives []
    all_share_dirs = []
    for obj in self.fsm.get_mdobjs_by_share_id(share_id):
        changd = self.fsm.changed(mdid=obj.mdid)
        all_share_dirs.append(
            (obj.path, obj.is_dir, obj.stat, changd, obj.node_id))
    return all_share_dirs
135
def _scan_tree(self, all_share_dirs, share_path, path):
136
'''Scans a whole tree, using the received path as root.'''
137
log_debug("_scan_tree: share_path: %r path: %r", share_path, path)
139
def get_start_info():
140
'''Gathers the info to start.'''
142
return (all_share_dirs, share_path)
144
def go_deeper(newdirs):
145
'''Explore into the subdirs.'''
146
for direct in newdirs:
147
log_debug("explore subdir: %r", direct)
148
self._queue.insert(0, (all_share_dirs, share_path, direct))
150
def re_launch(failure):
151
'''Explore that directory again.'''
152
if failure.check(ScanTransactionDirty):
153
reason = failure.getErrorMessage()
154
log_debug("re queue, transaction dirty for %r, reason: %s",
156
self._queue.insert(0, (all_share_dirs, share_path, path))
157
elif failure.check(ScanNoDirectory):
158
log_error("the directory dissappeared: %s (%s)", failure.type,
161
log_error("in the scan: %s (%s)\n%s",
162
failure.type, failure.value, failure.getTraceback())
165
d = defer.succeed((all_share_dirs, share_path, path))
166
d.addCallbacks(self._scan_one_dir)
167
d.addCallbacks(go_deeper, re_launch)
168
d.addCallback(self._process_next_queue)
171
def _compare(self, dirpath, dirnames, filenames, all_share_dirs, shr_path):
172
'''Compare the directories with the info that should be there.'''
173
log_debug("comparing directory %r", dirpath)
176
shouldbe = self._paths_filter(all_share_dirs, dirpath, len(shr_path))
178
def despair(message, fullname, also_children=False, also_remove=None):
179
'''Something went very bad with this node, converge!'''
180
log_debug(message, fullname)
182
os.rename(fullname, fullname + ".conflict")
184
m = "OSError %s when trying to move to conflict file/dir %r"
185
log_warning(m, e, fullname)
186
self.fsm.delete_metadata(fullname)
188
# if asked, remove metadata for children
190
log_debug("Removing also metadata from %r children", fullname)
191
for path, is_dir in self.fsm.get_paths_starting_with(fullname):
192
self.fsm.delete_metadata(path)
194
# if asked, remove also that file
195
if also_remove is not None:
197
os.remove(also_remove)
199
m = "OSError %s when trying to remove file %r"
200
log_warning(m, e, fullname)
202
def check_stat(fullname, statinfo):
203
'''Generate event if stats differ.'''
204
log_debug("comp yield STAT prv: %s", statinfo)
205
newstat = os.stat(fullname)
206
log_debug("comp yield STAT new: %s", newstat)
207
if statinfo != newstat:
208
events.append(('FS_FILE_CLOSE_WRITE', fullname))
210
# check all directories
213
for dname in dirnames:
214
fullname = os.path.join(dirpath, dname)
215
if dname in shouldbe:
216
is_dir, statinfo, changed = shouldbe.pop(dname)
218
# it's there, but it's a file!
219
log_debug("comp yield: file %r became a dir!", fullname)
220
events.append(('FS_FILE_DELETE', fullname))
221
events.append(('FS_DIR_CREATE', fullname))
223
if changed == "SERVER":
224
# download interrupted
225
log_debug("comp yield: dir %r in SERVER", fullname)
226
mdobj = self.fsm.get_by_path(fullname)
227
self.fsm.set_by_mdid(mdobj.mdid,
228
server_hash=mdobj.local_hash)
229
self.fsm.remove_partial(mdobj.node_id, mdobj.share_id)
230
to_scan_later.append(fullname)
231
elif changed == "NONE":
232
# it's old, we should scan it later
233
log_debug("comp yield: dir %r will be scaned later!",
235
to_scan_later.append(fullname)
237
m = "Wrong 'changed' value for %r: " + changed
238
despair(m, fullname, also_children=True)
242
log_debug("comp yield: directory %r is new!", fullname)
243
events.append(('FS_DIR_CREATE', fullname))
246
for fname in filenames:
247
fullname = os.path.join(dirpath, fname)
248
if fname in shouldbe:
249
is_dir, statinfo, changed = shouldbe.pop(fname)
251
log_debug("comp yield: dir %r became a file!", fullname)
252
# it's there, but it's a directory!
253
events.append(('FS_DIR_DELETE', fullname))
254
events.append(('FS_FILE_CREATE', fullname))
256
if changed == "LOCAL":
258
log_debug("comp yield: file %r in LOCAL state!",
260
events.append(('FS_FILE_CLOSE_WRITE', fullname))
261
elif changed == "NONE":
262
# what about stat info?
263
log_debug("comp yield: file %r was here.. stat?",
265
check_stat(fullname, statinfo)
266
elif changed == "SERVER":
267
log_debug("comp yield: file %r in SERVER", fullname)
268
mdobj = self.fsm.get_by_path(fullname)
269
self.fsm.set_by_mdid(mdobj.mdid,
270
server_hash=mdobj.local_hash)
271
self.fsm.remove_partial(mdobj.node_id, mdobj.share_id)
272
check_stat(fullname, statinfo)
274
m = "Wrong 'changed' value for %r: " + changed
278
if fname.endswith(".partial"):
279
# a partial file! it can be a standard file, or the one
280
# inside a dir (which will be deleted in that case)
281
realname = fname[:-8]
282
realfullname = fullname[:-8]
283
if realname not in shouldbe:
284
# this is the case of a .partial with no md at all!
285
m = "Found a .partial (%r) with no metadata, removing!"
286
log_debug(m, fullname)
290
is_dir, statinfo, changed = shouldbe.pop(realname)
292
m = ".partial of a file that MD says it's a dir: %r"
293
despair(m, realfullname, also_remove=fullname)
294
elif changed != "SERVER":
295
m = ".partial of a file that 'changed' != SERVER: %r"
296
despair(m, realfullname, also_remove=fullname)
298
# download interrupted
299
m = "comp yield: file %r in SERVER state!"
300
log_debug(m, fullname)
301
mdobj = self.fsm.get_by_path(realfullname)
302
self.fsm.set_by_mdid(mdobj.mdid,
303
server_hash=mdobj.local_hash)
304
self.fsm.remove_partial(mdobj.node_id, mdobj.share_id)
305
check_stat(fullname, statinfo)
309
log_debug("comp yield: file %r is new!", fullname)
310
events.append(('FS_FILE_CREATE', fullname))
312
# if it's not empty, tell to hash it and upload
313
if os.path.getsize(fullname):
314
events.append(('FS_FILE_CLOSE_WRITE', fullname))
317
# all these don't exist anymore
318
for name, (is_dir, statinfo, changed) in shouldbe.iteritems():
319
fullname = os.path.join(dirpath, name)
321
if changed not in ("SERVER", "NONE"):
323
m = "Bad 'changed': removing MD from dir %r and children"
324
log_debug(m, fullname)
325
children = self.fsm.get_paths_starting_with(fullname)
326
for path, is_dir in children:
327
self.fsm.delete_metadata(path)
330
log_debug("comp yield: directory %r is gone!", fullname)
331
# it's a directory, didn't have any info inside?
332
base_path = fullname[len(shr_path)+1:]
335
# get all the info inside that dir
336
for shrpath, is_dir, statinfo, _, _ in all_share_dirs:
337
if shrpath.startswith(base_path):
338
qparts = len(shrpath.split(os.path.sep))
339
to_inform.append((qparts, shrpath, is_dir))
341
# order everything from more path components to less (this
342
# will assure correct upgoing walk in the tree)
343
to_inform.sort(reverse=True)
346
for (_, name, is_dir) in to_inform:
347
fullname = os.path.join(shr_path, name)
349
events.append(('FS_DIR_DELETE', fullname))
351
events.append(('FS_FILE_DELETE', fullname))
353
if changed not in ("SERVER", "NONE", "LOCAL"):
355
m = "Bad 'changed': removing MD from file %r"
356
log_debug(m, fullname)
357
self.fsm.delete_metadata(fullname)
360
log_debug("comp yield: file %r is gone!", fullname)
361
events.append(('FS_FILE_DELETE', fullname))
362
return events, to_scan_later
364
def _paths_filter(self, all_share_dirs, dirpath, len_shr_path):
365
'''Returns the paths that belong to this dir.'''
366
# paths in shares are relative, remove the first slash
367
direct = dirpath[len_shr_path + 1:]
368
basedir = dirpath[:len_shr_path]
372
for shrpath, is_dir, statinfo, changed, node_id in all_share_dirs:
373
base, fname = os.path.split(shrpath)
375
# if without node_id, remove the metadata, and take it as new
377
fullname = os.path.join(basedir, shrpath)
378
m = "Deleting metadata, because of node_id=None, of %r"
379
log_debug(m, fullname)
380
self.fsm.delete_metadata(fullname)
383
filesdirs[fname] = is_dir, statinfo, changed
386
def _scan_one_dir(self, scan_info):
387
'''Gets one dir and compares with fsm.'''
388
all_share_dirs, shr_path, dirpath = scan_info
390
log_debug("Adding watch to %r", dirpath)
391
self.eq.inotify_add_watch(dirpath)
394
self.eq.freeze_begin(dirpath)
397
'''the scan, really'''
398
log_debug("scanning the dir %r", dirpath)
400
listdir = os.listdir(dirpath)
402
if e.errno == errno.ENOENT:
403
self.eq.freeze_rollback()
404
raise ScanNoDirectory("Directory %r disappeared since last"
408
# don't support symlinks yet
409
no_link = lambda p: not os.path.islink(os.path.join(dirpath, p))
410
listdir = filter(no_link, listdir)
412
# get the info from disk
415
for something in listdir:
416
fullname = os.path.join(dirpath, something)
417
if os.path.isdir(fullname):
418
dnames.append(something)
420
fnames.append(something)
423
events, to_scan_later = self._compare(dirpath, dnames, fnames,
424
all_share_dirs, shr_path)
425
to_later.extend(to_scan_later)
427
if e.errno == errno.ENOENT:
428
# something disappeared from disk, start all over again
429
self.eq.freeze_rollback()
430
raise ScanTransactionDirty("The file/dir %r dissapeared" %
437
'''controls that everything was ok'''
439
self.eq.freeze_rollback()
440
raise ScanTransactionDirty("dirty!")
444
d = defer.execute(scan)
445
d.addCallback(self.eq.freeze_commit)
446
d.addCallback(control)