~didrocks/ubuntuone-client/dont-suffer-zg-crash

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/sync.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2009-06-30 12:00:00 UTC
  • Revision ID: james.westby@ubuntu.com-20090630120000-by806ovmw3193qe8
Tags: upstream-0.90.3
Import upstream version 0.90.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# ubuntuone.syncdaemon.sync - sync module
 
2
#
 
3
# Author: Lucio Torre <lucio.torre@canonical.com>
 
4
#
 
5
# Copyright 2009 Canonical Ltd.
 
6
#
 
7
# This program is free software: you can redistribute it and/or modify it
 
8
# under the terms of the GNU General Public License version 3, as published
 
9
# by the Free Software Foundation.
 
10
#
 
11
# This program is distributed in the hope that it will be useful, but
 
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
14
# PURPOSE.  See the GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License along
 
17
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
"""This is the magic"""
 
19
from __future__ import with_statement
 
20
 
 
21
import errno
import logging
import os
import sys
from cStringIO import StringIO

from ubuntuone.storageprotocol import dircontent, hash
from ubuntuone.storageprotocol.dircontent_pb2 import DIRECTORY
from ubuntuone.syncdaemon import u1fsfsm
from ubuntuone.syncdaemon.filesystem_manager import \
    InconsistencyError
from ubuntuone.syncdaemon.fsm.fsm import \
    StateMachineRunner, StateMachine
from ubuntuone.syncdaemon.interfaces import IMarker
from ubuntuone.syncdaemon.marker import MDMarker
 
35
# Content hash of empty content; note that "hash" here is the imported
# storageprotocol hash module (it shadows the builtin in this file).
empty_hash = hash.content_hash_factory().content_hash()
 
36
 
 
37
class FSKey(object):
    """Encapsulate access to a node's metadata under different key schemes.

    A node can be addressed by path, by mdid, or by (share_id, node_id);
    this class resolves whichever combination was given into the single
    mdid used by the filesystem manager, and exposes convenience helpers
    for the state machine.
    """

    def __init__(self, fs, **keys):
        """Create the key over filesystem manager *fs* with lookup *keys*."""
        self.fs = fs
        self.keys = keys

    def get_mdid(self):
        """Get the metadata id.

        Raises KeyError when the key combination is not one of the
        supported ones, or when no metadata can be found.
        """
        if len(self.keys) == 1 and "path" in self.keys:
            mdid = self.fs._idx_path[self.keys["path"]]
        elif len(self.keys) == 1 and "mdid" in self.keys:
            mdid = self.keys["mdid"]
        elif len(self.keys) == 2 and "node_id" in self.keys \
                    and "share_id" in self.keys:
            mdid = self.fs._idx_node_id[self.keys["share_id"],
                                                        self.keys["node_id"]]
        else:
            raise KeyError("Incorrect keys: %s" % self.keys)
        if mdid is None:
            raise KeyError("cant find mdid")
        return mdid

    def get(self, key):
        """Get the value for key; some values are derived on the fly."""
        mdid = self.get_mdid()
        if key == 'path':
            mdobj = self.fs.get_by_mdid(mdid)
            return self.fs.get_abspath(mdobj.share_id, mdobj.path)
        elif key == 'node_id':
            mdobj = self.fs.get_by_mdid(mdid)
            if mdobj.node_id is None:
                # no server id yet: hand out a marker instead
                return MDMarker(mdid)
            else:
                return mdobj.node_id
        elif key == 'parent_id':
            mdobj = self.fs.get_by_mdid(mdid)
            path = self.fs.get_abspath(mdobj.share_id, mdobj.path)
            parent_path = os.path.dirname(path)
            parent = self.fs.get_by_path(parent_path)
            return parent.node_id or MDMarker(parent.mdid)
        else:
            return getattr(self.fs.get_by_mdid(mdid), key, None)

    def __getitem__(self, key):
        """Get the value for key."""
        return self.get(key)

    def set(self, **kwargs):
        """Set the given metadata values for this node."""
        mdid = self.get_mdid()
        self.fs.set_by_mdid(mdid, **kwargs)

    def has_metadata(self):
        """The State Machine value version of has_metadata.

        Returns 'T', 'F' or 'NA' (metadata not reachable).
        """
        try:
            return str(self.fs.has_metadata(**self.keys))[0]
        # bug fix: "except KeyError, TypeError:" only caught KeyError
        # (binding it to the name TypeError); both must map to 'NA'
        except (KeyError, TypeError):
            return 'NA'

    def is_dir(self):
        """The State Machine value version of is_dir ('T'/'F'/'NA')."""
        try:
            return str(self.fs.is_dir(**self.keys))[0]
        except KeyError:
            return 'NA'

    def changed(self):
        """The State Machine value version of changed."""
        try:
            return self.fs.changed(**self.keys)
        except KeyError:
            return 'NA'

    def open_file(self):
        """Get the file object for reading."""
        mdid = self.get_mdid()
        try:
            fo = self.fs.open_file(mdid)
        except IOError:
            # this is a HUGE cheat
            # the state expects to start a download
            # but the file is gone. so to keep the transitions correct
            # we return an empty file. we will later receive the FS_DELETE
            return StringIO()
        return fo

    def upload_finished(self, server_hash):
        """Signal that we have uploaded the file."""
        mdid = self.get_mdid()
        self.fs.upload_finished(mdid, server_hash)

    def delete_file(self):
        """Delete the file and its metadata."""
        path = self["path"]
        self.fs.delete_file(path)

    def delete_metadata(self):
        """Delete only the metadata."""
        path = self["path"]
        self.fs.delete_metadata(path)

    def move_file(self, new_share_id, new_parent_id, new_name):
        """Move the file to its new share/parent/name."""
        source_path = self['path']
        parent_path = self.fs.get_by_node_id(new_share_id, new_parent_id).path
        dest_path = os.path.join(
            self.fs.get_abspath(new_share_id, parent_path),
            new_name)
        self.fs.move_file(new_share_id, source_path, dest_path)

    def moved(self, new_share_id, path_to):
        """Change the metadata of a moved file."""
        self.fs.moved(new_share_id, self['path'], path_to)
        if "path" in self.keys:
            # keep our own lookup key pointing at the new location
            self.keys["path"] = path_to

    def remove_partial(self):
        """Remove a partial file, tolerating its absence."""
        # pylint: disable-msg=W0704
        try:
            self.fs.remove_partial(self["node_id"], self["share_id"])
        except ValueError:
            # we had no partial, ignore
            pass

    def move_to_conflict(self):
        """Move file to conflict."""
        self.fs.move_to_conflict(self.get_mdid())

    def refresh_stat(self):
        """Refresh the stat, tolerating a missing file."""
        path = self["path"]
        # pylint: disable-msg=W0704
        try:
            self.fs.refresh_stat(path)
        except OSError:
            # no file to stat, nothing to do
            pass

    def safe_get(self, key, default='^_^'):
        """Safe version of self.get, to be used in the FileLogger."""
        # catch all errors as we are here to help logging
        # pylint: disable-msg=W0703
        try:
            return self.get(key)
        except Exception:
            return default

    def make_file(self):
        """Create the local empty file."""
        self.fs.create_file(self.get_mdid())
 
191
 
 
192
 
 
193
 
 
194
 
 
195
def loglevel(lvl):
    """Return an unbound logging helper fixed at severity *lvl*.

    The returned function is meant to be assigned as a method; it simply
    forwards everything to the instance's log() with the captured level.
    """
    def level_log(self, message, *args, **kwargs):
        """Forward *message* to self.log() at the captured level."""
        self.log(lvl, message, *args, **kwargs)
    return level_log
 
201
 
 
202
 
 
203
class FileLogger(object):
    """A logger that knows about the file and its state.

    Every message is prefixed with the node's state-machine flags
    (has_metadata:changed:is_dir), its mdid, share/node ids and path.
    """

    def __init__(self, logger, key):
        """Wrap *logger* (a stdlib logger) for the node addressed by *key*
        (an FSKey)."""
        self.logger = logger
        self.key = key

    def log(self, lvl, message, *args, **kwargs):
        """Log *message* at *lvl*, prefixed with the node's state.

        lvl == -1 is the special "exception" level: the message is logged
        as an error with exc_info attached.
        """

        # NOTE(review): `format` shadows the builtin; also '%(path)r' is
        # wrapped in literal quotes even though %r already adds its own.
        format = "%(hasmd)s:%(changed)s:%(isdir)s %(mdid)s "\
                 "[%(share_id)s:%(node_id)s] '%(path)r' | %(message)s"
        # NOTE(review): this stores the sys.exc_info *function*, not its
        # result — logging treats any truthy non-tuple exc_info as "fetch
        # the current exception", so this works, but confirm it is meant.
        exc_info = sys.exc_info
        if self.key.has_metadata() == "T":
            # catch all errors as we are logging, pylint: disable-msg=W0703
            try:
                # build "<share basename>/<path relative to the share>"
                base = os.path.split(self.key.fs._get_share(
                    self.key['share_id']).path)[1]
                path = os.path.join(base, self.key.fs._share_relative_path(
                    self.key['share_id'], self.key['path']))
            except Exception:
                # error while getting the path
                self.logger.exception("Error in logger while building the "
                                      "relpath of: %r", self.key['path'])
                path = self.key.safe_get('path')
            extra = dict(message=message,
                         mdid=self.key.safe_get("mdid"),
                         path=path,
                         share_id=self.key.safe_get("share_id") or 'root',
                         node_id=self.key.safe_get("node_id"),
                         hasmd=self.key.has_metadata(),
                         isdir=self.key.is_dir(),
                         changed=self.key.changed())
        else:
            # no metadata: fill the prefix with placeholders, then overlay
            # whatever lookup keys we do have
            extra = dict(message=message, mdid="-",
                         path="-",
                         share_id="-",
                         node_id="-",
                         hasmd="-",
                         isdir="-",
                         changed="-")
            extra.update(self.key.keys)
        message = format % extra
        if lvl == -1:
            # "exception" level: log as error with traceback info
            kwargs.update({'exc_info':exc_info})
            self.logger.error(message, *args, **kwargs)
        else:
            self.logger.log(lvl, message, *args, **kwargs)

    # level-specific helpers built from the shared log() above
    critical = loglevel(logging.CRITICAL)
    error = loglevel(logging.ERROR)
    warning = loglevel(logging.WARNING)
    info = loglevel(logging.INFO)
    debug = loglevel(logging.DEBUG)
    exception = loglevel(-1)
 
259
 
 
260
class SyncStateMachineRunner(StateMachineRunner):
 
261
    """This is where all the state machine methods are."""
 
262
 
 
263
    def __init__(self, fsm, main, key, logger=None):
        """Create the runner.

        fsm: the state machine definition passed to StateMachineRunner;
        main: the object giving access to fs/action_q/event_q/hash_q;
        key: the FSKey of the node this runner operates on.
        """
        super(SyncStateMachineRunner, self).__init__(fsm, logger)
        self.m = main
        self.key = key
 
268
 
 
269
    def signal_event_with_hash(self, event, hash, *args):
 
270
        """An event that takes a hash ocurred, build the params and signal."""
 
271
        self.on_event(event, self.build_hash_eq(hash), hash, *args)
 
272
 
 
273
    def build_hash_eq(self, hash):
 
274
        """Build the event params."""
 
275
        try:
 
276
            sh = str(self.key["server_hash"] == hash)[0]
 
277
            lh = str(self.key["local_hash"] == hash)[0]
 
278
        except KeyError:
 
279
            sh = lh = "NA"
 
280
        return dict(hash_eq_server_hash=sh, hash_eq_local_hash=lh)
 
281
 
 
282
    def signal_event_with_error_and_hash(self, event, error, hash, *args):
 
283
        """An event that takes a hash ocurred, build the params and signal."""
 
284
        params = self.build_error_eq(error)
 
285
        params.update(self.build_hash_eq(hash))
 
286
        self.on_event(event, params, error, hash, *args)
 
287
 
 
288
    def signal_event_with_error(self, event, error, *args):
 
289
        """An event returned with error."""
 
290
        params = self.build_error_eq(error)
 
291
        self.on_event(event, params, error, *args)
 
292
 
 
293
    def build_error_eq(self, error):
        """Get the error state params for an event.

        NOTE(review): *error* is currently ignored and both flags are
        always 'F' — confirm this is intentional.
        """
        return dict(not_available="F", not_authorized="F")
 
296
 
 
297
    def get_state_values(self):
 
298
        """Get the values for the current state."""
 
299
        return dict(
 
300
            has_metadata=self.key.has_metadata(),
 
301
            changed=self.key.changed(),
 
302
            is_directory=self.key.is_dir(),
 
303
        )
 
304
 
 
305
    # EVENT HANDLERS
 
306
 
 
307
    def nothing(self, event, params, *args):
 
308
        """pass"""
 
309
        pass
 
310
 
 
311
    def new_dir(self, event, params, share_id, node_id, parent_id, name):
 
312
        """create a local file."""
 
313
        mdobj = self.m.fs.get_by_node_id(share_id, parent_id)
 
314
        path = os.path.join(self.m.fs.get_abspath(share_id, mdobj.path), name)
 
315
        self.m.fs.create(path=path, share_id=share_id, is_dir=True)
 
316
        self.m.fs.set_node_id(path, node_id)
 
317
        self.m.action_q.query([(share_id, node_id, "")])
 
318
        # pylint: disable-msg=W0704
 
319
        # this should be provided by FSM, fix!!
 
320
        try:
 
321
            with self.m.fs._enable_share_write(share_id, self.key['path']):
 
322
                os.mkdir(self.key['path'])
 
323
        except OSError, e:
 
324
            if not e.errno == 17: #already exists
 
325
                raise
 
326
        else:
 
327
            try:
 
328
                # just add the watch
 
329
                # we hope the user wont have time to add a file just after
 
330
                # *we* created the directory
 
331
                # this is until we can solve and issue with LR and
 
332
                # new dirs and fast downloads
 
333
                # see bug #373940
 
334
                self.m.event_q.inotify_add_watch(path)
 
335
                #self.m.lr.scan_dir(path)
 
336
            except ValueError:
 
337
                pass #it was gone when lr got it
 
338
 
 
339
 
 
340
    def new_dir_on_server_with_local(self, event, params, share_id,
                                     node_id, parent_id, name):
        """New dir on server and we have a local file.

        The local file is preserved as a conflict copy, then the directory
        is created as usual.
        """
        self.key.move_to_conflict()
        self.key.delete_metadata()
        self.new_dir(event, params, share_id, node_id, parent_id, name)
 
346
 
 
347
    def reget_dir(self, event, params, hash):
        """Reget the directory.

        Cancels the download in flight, drops the partial, and re-queries
        the server starting from the last known local hash.
        """
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.remove_partial()
        self.m.action_q.query([(self.key['share_id'],
                                self.key['node_id'],
                                self.key['local_hash'] or "")])
        self.key.set(server_hash=self.key['local_hash'])
 
356
 
 
357
    def get_dir(self, event, params, hash):
        """Get the directory content from the server.

        Records the new server hash, creates a partial for the node, and
        queues a listdir whose output is written into that partial.
        """
        self.key.set(server_hash=hash)
        self.m.fs.create_partial(node_id=self.key['node_id'],
                                 share_id=self.key['share_id'])
        self.m.action_q.listdir(
            self.key['share_id'], self.key['node_id'], hash,
            lambda : self.m.fs.get_partial_for_writing(
                node_id=self.key['node_id'],
                share_id=self.key['share_id'])
            )
 
368
 
 
369
    def file_conflict(self, event, params, hash, crc32, size, stat):
        """This file is in conflict.

        The local copy is set aside as a conflict and an empty file is
        recreated in its place.
        """
        self.key.move_to_conflict()
        self.key.make_file()
 
373
 
 
374
    def local_file_conflict(self, event, params, hash):
        """This file is in conflict with a server change.

        Local content is set aside, the pending upload is cancelled, and a
        download of the server content is started.
        """
        self.key.move_to_conflict()
        self.m.action_q.cancel_upload(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.make_file()
        self.get_file(event, params, hash)
 
381
 
 
382
    def merge_directory(self, event, params, hash):
        """Merge the server directory listing with the local one.

        Diffs the server dircontent (read from the partial) against the
        local metadata, then pushes SV_* events for new, moved and deleted
        children, and finally commits the new local hash.
        """
        new_files = []
        new_dirs = []
        deleted_files = []
        deleted_dirs = []
        moved = set()

        try:
            fd = self.m.fs.get_partial(node_id=self.key['node_id'],
                            share_id=self.key['share_id'])
        except InconsistencyError:
            # partial is broken: clean up and re-query the server
            self.key.remove_partial()
            self.key.set(server_hash=self.key['local_hash'])
            self.m.action_q.query([
                (self.key["share_id"], self.key["node_id"], "")])
            # we dont perform the merge, we try to re get it
            return


        items = dircontent.parse_dir_content(fd)
        server_dir = [ (o.utf8_name, o.node_type == DIRECTORY, o.uuid)
                        for o in items ]
        client_dir = self.m.fs.dir_content(self.key['path'])
        # XXX: lucio.torre: with huge dirs, this could take a while

        share = self.key['share_id']
        # first pass: find children new to us, and children that moved
        for name, isdir, uuid in server_dir:
            # we took the name as bytes already encoded in utf8
            # directly from dircontent!
            try:
                md = self.m.fs.get_by_node_id(share, uuid)
            except KeyError:
                # not there, a new thing
                if isdir:
                    new_dirs.append((share, uuid, name))
                else:
                    new_files.append((share, uuid, name))
                continue
            mdpath = self.m.fs.get_abspath(md.share_id, md.path)
            if mdpath != os.path.join(self.key['path'], name):
                # this was moved
                # mark as moved
                moved.add(uuid)
                # signal moved
                self.m.event_q.push("SV_MOVED",
                    share_id=md.share_id, node_id=uuid,
                    new_share_id=share, new_parent_id=self.key['node_id'],
                    new_name=name)


        # second pass: local children no longer on the server are deletes
        for name, isdir, uuid in client_dir:
            if uuid is None:
                # node without a server id yet: nothing to compare
                continue

            if not (name, isdir, uuid) in server_dir:
                # not there, a its gone on the server
                if uuid in moved:
                    # this was a move, dont delete
                    continue
                if isdir:
                    deleted_dirs.append((share, uuid))
                else:
                    deleted_files.append((share, uuid))


        parent_uuid = self.key['node_id']
        for share, uuid, name in new_files:
            self.m.event_q.push("SV_FILE_NEW", parent_id=parent_uuid,
                                node_id=uuid, share_id=share, name=name)
        for share, uuid, name in new_dirs:
            self.m.event_q.push("SV_DIR_NEW", parent_id=parent_uuid,
                                node_id=uuid, share_id=share, name=name)
        for share, uuid in deleted_files:
            self.m.event_q.push("SV_FILE_DELETED",
                                node_id=uuid, share_id=share)
        # NOTE(review): deleted directories also push "SV_FILE_DELETED" —
        # confirm no dir-specific event is required here.
        for share, uuid in deleted_dirs:
            self.m.event_q.push("SV_FILE_DELETED",
                                node_id=uuid, share_id=share)

        self.key.remove_partial()
        self.key.set(local_hash=hash)
 
464
 
 
465
    def new_file(self, event, params, share_id, node_id, parent_id, name):
        """A new file appeared on the server: create it locally.

        Registers the metadata, creates an empty local file with empty
        hashes, and queries the server for its content.
        """
        mdobj = self.m.fs.get_by_node_id(share_id, parent_id)
        path = os.path.join(self.m.fs.get_abspath(share_id, mdobj.path), name)
        self.m.fs.create(path=path, share_id=share_id, is_dir=False)
        self.m.fs.set_node_id(path, node_id)
        self.key.set(server_hash="")
        self.key.set(local_hash="")
        self.key.make_file()
        self.m.action_q.query([(share_id, node_id, "")])
 
475
 
 
476
    def new_file_on_server_with_local(self, event, params, share_id,
                                      node_id, parent_id, name):
        """Move the local file to conflict and re-create it from scratch."""
        self.key.move_to_conflict()
        self.key.delete_metadata()
        self.new_file(event, params, share_id, node_id, parent_id, name)
 
482
 
 
483
    def get_file(self, event, params, hash):
        """Get the contents for the file.

        Records the new server hash, creates a partial, and queues a
        download that writes into the partial.
        """
        self.key.set(server_hash=hash)
        self.m.fs.create_partial(node_id=self.key['node_id'],
                                 share_id=self.key['share_id'])
        self.m.action_q.download(
            share_id=self.key['share_id'], node_id=self.key['node_id'],
            server_hash=hash,
            fileobj_factory=lambda: self.m.fs.get_partial_for_writing(
                node_id=self.key['node_id'],
                share_id=self.key['share_id'])
            )
 
495
 
 
496
    def reget_file(self, event, params, hash):
        """Cancel the download in flight and start it again."""
        self.key.set(server_hash=hash)
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.remove_partial()
        self.get_file(event, params, hash)
 
503
 
 
504
    def client_moved(self, event, params, path_from, path_to):
        """The client moved a file locally.

        Tells the server about the move, updates the local metadata, and
        resets the old parent so its content gets re-fetched.
        """
        parent_path = os.path.dirname(path_from)
        old_parent = FSKey(self.m.fs, path=parent_path)
        old_parent_id = old_parent['node_id']
        new_path = os.path.dirname(path_to)
        new_name = os.path.basename(path_to)
        new_parent = FSKey(self.m.fs, path=new_path)
        new_parent_id = new_parent['node_id']

        self.m.action_q.move(share_id=self.key['share_id'],
            node_id=self.key['node_id'], old_parent_id=old_parent_id,
            new_parent_id=new_parent_id, new_name=new_name)
        self.key.moved(self.key['share_id'], path_to)

        # this is cheating, we change the state of another node
        # (only when the old parent already has a real server id)
        if not IMarker.providedBy(old_parent_id):
            share_id = self.key['share_id']
            self.m.action_q.cancel_download(share_id, old_parent_id)
            old_parent.remove_partial()
            self.m.fs.set_by_node_id(old_parent_id, share_id,
                                     server_hash="", local_hash="")
            self.m.action_q.query([(share_id, old_parent_id, "")])
        self.m.hash_q.insert(self.key['path'])
 
528
 
 
529
 
 
530
    def server_file_changed_back(self, event, params, hash):
        """Server hash changed back: cancel and do NOT re-download."""
        self.key.set(server_hash=hash)
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.remove_partial()
 
536
 
 
537
    def commit_file(self, event, params, hash):
        """Commit the content downloaded into the partial.

        If the partial is broken (InconsistencyError), clean up and
        re-query the server to get back to a consistent state.
        """
        try:
            self.m.fs.commit_partial(
                            self.key['node_id'], self.key['share_id'], hash)
        except InconsistencyError:
            # someone or something broke out partials.
            # start work to go to a good state
            self.key.remove_partial()
            self.key.set(server_hash=self.key['local_hash'])
            self.m.action_q.query([
                (self.key["share_id"], self.key["node_id"], "")])
 
549
 
 
550
    def new_local_file(self, event, parms, path):
        """A new local file was created.

        Registers the metadata with empty hashes and asks the server to
        create the file, using a marker until the real node id arrives.
        """
        # XXX: lucio.torre: we should use markers here
        parent_path = os.path.dirname(path)
        parent = self.m.fs.get_by_path(parent_path)
        parent_id = parent.node_id or MDMarker(parent.mdid)
        share_id = parent.share_id
        self.m.fs.create(path=path, share_id=share_id, is_dir=False)
        self.key.set(local_hash=empty_hash)
        self.key.set(server_hash=empty_hash)
        name = os.path.basename(path)
        marker = MDMarker(self.key.get_mdid())
        self.m.action_q.make_file(share_id, parent_id, name, marker)
 
563
 
 
564
    def new_local_file_created(self, event, parms, new_id):
 
565
        """we got the server answer for the file creation."""
 
566
        self.m.fs.set_node_id(self.key['path'], new_id)
 
567
 
 
568
 
 
569
    def new_local_dir(self, event, parms, path):
        """A new local dir was created.

        Registers the metadata, asks the server to create the dir (via a
        marker), and scans the new dir for content created meanwhile.
        """
        # XXX: lucio.torre: we should use markers here
        parent_path = os.path.dirname(path)
        parent = self.m.fs.get_by_path(parent_path)
        parent_id = parent.node_id or MDMarker(parent.mdid)
        share_id = parent.share_id
        self.m.fs.create(path=path, share_id=share_id, is_dir=True)
        name = os.path.basename(path)
        marker = MDMarker(self.key.get_mdid())
        self.m.action_q.make_dir(share_id, parent_id, name, marker)
        # pylint: disable-msg=W0704
        try:
            self.m.lr.scan_dir(path)
        except ValueError:
            pass #it was gone when lr got it
 
585
 
 
586
    def new_local_dir_created(self, event, parms, new_id):
 
587
        """we got the server answer for dir creation"""
 
588
        self.m.fs.set_node_id(self.key['path'], new_id)
 
589
 
 
590
    def calculate_hash(self, event, params):
 
591
        """calculate the hash of this."""
 
592
        self.m.hash_q.insert(self.key['path'])
 
593
 
 
594
    def put_file(self, event, params, hash, crc32, size, stat):
        """Upload the file content to the server.

        Records the new local hash and stat, then queues an upload that
        reads from the node's file object.
        """
        previous_hash = self.key['server_hash']
        self.key.set(local_hash=hash)
        self.m.fs.update_stat(self.key.get_mdid(), stat)
        self.m.action_q.upload(share_id=self.key['share_id'],
            node_id=self.key['node_id'], previous_hash=previous_hash,
            hash=hash, crc32=crc32, size=size,
            fileobj_factory=self.key.open_file)
 
603
 
 
604
    def converges_to_server(self, event, params, hash, crc32, size, stat):
        """The local changes now match the server: drop the download."""
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                    node_id=self.key['node_id'])
        self.key.remove_partial()
        self.key.set(local_hash=hash)
        self.m.fs.update_stat(self.key.get_mdid(), stat)
 
611
 
 
612
    def reput_file_from_ok(self, event, param, hash):
        """Put the file again, marking the current upload as ok.

        Both hashes are set to *hash* and the path is queued for
        re-hashing.
        """
        self.m.action_q.cancel_upload(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.set(local_hash=hash)
        self.key.set(server_hash=hash)
        self.m.hash_q.insert(self.key['path'])
 
619
 
 
620
 
 
621
    def reput_file(self, event, param, hash, crc32, size, stat):
        """Cancel the upload in flight and put the file again."""
        self.m.action_q.cancel_upload(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        previous_hash = self.key['server_hash']

        self.key.set(local_hash=hash)
        self.m.fs.update_stat(self.key.get_mdid(), stat)
        self.m.action_q.upload(share_id=self.key['share_id'],
            node_id=self.key['node_id'], previous_hash=previous_hash,
            hash=hash, crc32=crc32, size=size,
            fileobj_factory=self.key.open_file)
 
633
 
 
634
    def server_file_now_matches(self, event, params, hash):
        """We got a server hash that matches the local hash.

        The pending upload is no longer needed.
        """
        self.m.action_q.cancel_upload(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.set(server_hash=hash)
 
639
 
 
640
    def commit_upload(self, event, params, hash):
        """Finish an upload by committing the new server hash."""
        self.key.upload_finished(hash)
 
643
 
 
644
    def cancel_and_commit(self, event, params, hash):
        """Cancel the download in flight, then finish the upload."""
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.remove_partial()
        self.key.upload_finished(hash)
 
650
 
 
651
 
 
652
    def delete_file(self, event, params):
 
653
        """server file was deleted."""
 
654
        try:
 
655
            self.key.delete_file()
 
656
        except OSError, e:
 
657
            if e.errno == 39:
 
658
                # if directory not empty
 
659
                self.key.move_to_conflict()
 
660
                self.key.delete_metadata()
 
661
            elif e.errno == 2:
 
662
                # file gone
 
663
                pass
 
664
            else:
 
665
                raise e
 
666
 
 
667
    def conflict_and_delete(self, event, params, *args, **kwargs):
        """Move the local copy to conflict and delete the metadata."""
        self.key.move_to_conflict()
        self.key.delete_metadata()
 
671
 
 
672
    def file_gone_wile_downloading(self, event, params):
        """A file we were downloading is gone.

        NOTE(review): the 'wile' typo in the name is kept — renaming could
        break external references to this handler.
        """
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.remove_partial()
        self.delete_file(event, params)
 
678
 
 
679
    def file_not_created_remove(self, event, params, error):
        """The file could not be created: set it aside and drop metadata."""
        self.key.move_to_conflict()
        self.key.delete_metadata()
 
683
 
 
684
    def delete_on_server(self, event, params, path):
        """Local file was deleted: unlink it on the server."""
        self.m.action_q.unlink(self.key['share_id'],
                               self.key['parent_id'],
                               self.key['node_id'])
        self.key.delete_metadata()
 
690
 
 
691
    def deleted_dir_while_downloading(self, event, params, path):
        """Local dir deleted while downloading: cancel, then unlink it."""
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                                        node_id=self.key['node_id'])
        self.key.remove_partial()
        self.m.action_q.unlink(self.key['share_id'],
                               self.key['parent_id'],
                               self.key['node_id'])
        self.key.delete_metadata()
 
700
 
 
701
    def cancel_download_and_delete_on_server(self, event, params, path):
        """Cancel the download in flight and delete the node on the server."""
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                                        node_id=self.key['node_id'])
        self.key.remove_partial()
        self.m.action_q.unlink(self.key['share_id'],
                               self.key['parent_id'],
                               self.key['node_id'])
        self.key.delete_metadata()
 
710
 
 
711
    def cancel_upload_and_delete_on_server(self, event, params, path):
 
712
        """cancel_download_and_delete_on_server"""
 
713
        self.m.action_q.cancel_upload(share_id=self.key['share_id'],
 
714
                                      node_id=self.key['node_id'])
 
715
        self.m.action_q.unlink(self.key['share_id'],
 
716
                               self.key['parent_id'],
 
717
                               self.key['node_id'])
 
718
        self.key.delete_metadata()
 
719
 
 
720
 
 
721
    def server_moved(self, event, params, share_id, node_id,
 
722
                     new_share_id, new_parent_id, new_name):
 
723
        """file was moved on the server"""
 
724
        self.key.move_file(new_share_id, new_parent_id, new_name)
 
725
 
 
726
    def server_moved_dirty(self, event, params, share_id, node_id,
 
727
                     new_share_id, new_parent_id, new_name):
 
728
        """file was moved on the server while downloading it"""
 
729
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
 
730
                            node_id=self.key['node_id'])
 
731
        self.key.remove_partial()
 
732
        self.key.move_file(new_share_id, new_parent_id, new_name)
 
733
        self.get_file(event, params, self.key['server_hash'])
 
734
 
 
735
    def moved_dirty_local(self, event, params, path_from, path_to):
 
736
        """file was moved while uploading it"""
 
737
        self.m.action_q.cancel_upload(share_id=self.key['share_id'],
 
738
                            node_id=self.key['node_id'])
 
739
        self.key.set(local_hash=self.key['server_hash'])
 
740
        self.client_moved(event, params, path_from, path_to)
 
741
        self.m.hash_q.insert(self.key['path'])
 
742
 
 
743
 
 
744
    def moved_dirty_server(self, event, params, path_from, path_to):
 
745
        """file was moved while downloading it"""
 
746
        self.client_moved(event, params, path_from, path_to)
 
747
 
 
748
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
 
749
                            node_id=self.key['node_id'])
 
750
        self.key.remove_partial()
 
751
        self.key.set(server_hash=self.key['local_hash'])
 
752
        self.m.action_q.query([(self.key['share_id'],
 
753
                                self.key['node_id'],
 
754
                                self.key['local_hash'] or "")])
 
755
 
 
756
    def DESPAIR(self, event, params, *args, **kwargs):
 
757
        """if we got here, we are in trouble"""
 
758
        self.log.error("DESPAIR on event=%s params=%s args=%s kwargs=%s",
 
759
                                                event, params, args, kwargs)
 
760
 
 
761
    def save_stat(self, event, params, hash, crc32, size, stat):
        """Persist *stat* into this node's metadata.

        The handler receives the full HQ_HASH_NEW payload (hash, crc32,
        size, stat) but only stores the stat info here.
        """
        self.m.fs.update_stat(self.key.get_mdid(), stat)
 
763
 
 
764
 
 
765
class Sync(object):
 
766
    """Translates from EQ events into state machine events."""
 
767
    # XXX: lucio.torre:
 
768
    # this will need some refactoring once we handle more events
 
769
 
 
770
    def __init__(self, main):
 
771
        """create"""
 
772
        self.logger = logging.getLogger('ubuntuone.SyncDaemon.sync')
 
773
        self.fsm = StateMachine(u1fsfsm.state_machine)
 
774
        self.m = main
 
775
        self.m.event_q.subscribe(self)
 
776
 
 
777
    def handle_SV_HASH_NEW(self, share_id, node_id, hash):
 
778
        """on SV_HASH_NEW"""
 
779
        key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
 
780
        log = FileLogger(self.logger, key)
 
781
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
782
        ssmr.signal_event_with_hash("SV_HASH_NEW", hash)
 
783
 
 
784
    def handle_SV_FILE_NEW(self, share_id, node_id, parent_id, name):
 
785
        """on SV_FILE_NEW"""
 
786
        parent = FSKey(self.m.fs, share_id=share_id, node_id=parent_id)
 
787
        path = os.path.join(parent["path"], name)
 
788
        key = FSKey(self.m.fs, path=path)
 
789
        log = FileLogger(self.logger, key)
 
790
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
791
        ssmr.on_event("SV_FILE_NEW", {}, share_id, node_id, parent_id, name)
 
792
 
 
793
    def handle_SV_DIR_NEW(self, share_id, node_id, parent_id, name):
 
794
        """on SV_DIR_NEW"""
 
795
        parent = FSKey(self.m.fs, share_id=share_id, node_id=parent_id)
 
796
        path = os.path.join(parent["path"], name)
 
797
        key = FSKey(self.m.fs, path=path)
 
798
        log = FileLogger(self.logger, key)
 
799
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
800
        ssmr.on_event("SV_DIR_NEW", {}, share_id, node_id, parent_id, name)
 
801
 
 
802
    def handle_SV_FILE_DELETED(self, share_id, node_id):
 
803
        """on SV_FILE_DELETED"""
 
804
        key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
 
805
        log = FileLogger(self.logger, key)
 
806
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
807
        ssmr.on_event("SV_FILE_DELETED", {})
 
808
 
 
809
    def handle_AQ_DOWNLOAD_FINISHED(self, share_id, node_id, server_hash):
 
810
        """on AQ_DOWNLOAD_FINISHED"""
 
811
        key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
 
812
        log = FileLogger(self.logger, key)
 
813
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
814
        ssmr.signal_event_with_hash("AQ_DOWNLOAD_FINISHED", server_hash)
 
815
 
 
816
    def handle_AQ_DOWNLOAD_ERROR(self, *args, **kwargs):
 
817
        """on AQ_DOWNLOAD_ERROR"""
 
818
        # for now we pass. later we will feed this into the state machine
 
819
        pass
 
820
 
 
821
    def handle_FS_FILE_CREATE(self, path):
 
822
        """on FS_FILE_CREATE"""
 
823
        key = FSKey(self.m.fs, path=path)
 
824
        log = FileLogger(self.logger, key)
 
825
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
826
        ssmr.on_event("FS_FILE_CREATE", {}, path)
 
827
 
 
828
    def handle_FS_DIR_CREATE(self, path):
 
829
        """on FS_DIR_CREATE"""
 
830
        key = FSKey(self.m.fs, path=path)
 
831
        log = FileLogger(self.logger, key)
 
832
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
833
        ssmr.on_event("FS_DIR_CREATE", {}, path)
 
834
 
 
835
    def handle_FS_FILE_DELETE(self, path):
 
836
        """on FS_FILE_DELETE"""
 
837
        key = FSKey(self.m.fs, path=path)
 
838
        log = FileLogger(self.logger, key)
 
839
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
840
        ssmr.on_event("FS_FILE_DELETE", {}, path)
 
841
 
 
842
    def handle_FS_DIR_DELETE(self, path):
 
843
        """on FS_DIR_DELETE"""
 
844
        key = FSKey(self.m.fs, path=path)
 
845
        log = FileLogger(self.logger, key)
 
846
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
847
        ssmr.on_event("FS_DIR_DELETE", {}, path)
 
848
 
 
849
    def handle_FS_FILE_MOVE(self, path_from, path_to):
 
850
        """on FS_FILE_MOVE"""
 
851
        key = FSKey(self.m.fs, path=path_from)
 
852
        log = FileLogger(self.logger, key)
 
853
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
854
        ssmr.on_event("FS_FILE_MOVE", {}, path_from, path_to)
 
855
    handle_FS_DIR_MOVE = handle_FS_FILE_MOVE
 
856
 
 
857
    def handle_AQ_FILE_NEW_OK(self, marker, new_id):
 
858
        """on AQ_FILE_NEW_OK"""
 
859
        key = FSKey(self.m.fs, mdid=marker)
 
860
        log = FileLogger(self.logger, key)
 
861
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
862
        ssmr.on_event("AQ_FILE_NEW_OK", {}, new_id)
 
863
 
 
864
    def handle_AQ_FILE_NEW_ERROR(self, marker, error):
 
865
        """on AQ_FILE_NEW_ERROR"""
 
866
        key = FSKey(self.m.fs, mdid=marker)
 
867
        log = FileLogger(self.logger, key)
 
868
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
869
        ssmr.signal_event_with_error("AQ_FILE_NEW_ERROR", error)
 
870
 
 
871
    def handle_AQ_DIR_NEW_ERROR(self, marker, error):
 
872
        """on AQ_DIR_NEW_ERROR"""
 
873
        key = FSKey(self.m.fs, mdid=marker)
 
874
        log = FileLogger(self.logger, key)
 
875
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
876
        ssmr.signal_event_with_error("AQ_DIR_NEW_ERROR", error)
 
877
 
 
878
    def handle_AQ_DIR_NEW_OK(self, marker, new_id):
 
879
        """on AQ_DIR_NEW_OK"""
 
880
        key = FSKey(self.m.fs, mdid=marker)
 
881
        log = FileLogger(self.logger, key)
 
882
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
883
        ssmr.on_event("AQ_DIR_NEW_OK", {}, new_id)
 
884
 
 
885
    def handle_FS_FILE_CLOSE_WRITE(self, path):
 
886
        """on FS_FILE_CLOSE_WRITE"""
 
887
        key = FSKey(self.m.fs, path=path)
 
888
        log = FileLogger(self.logger, key)
 
889
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
890
        ssmr.on_event('FS_FILE_CLOSE_WRITE', {})
 
891
 
 
892
    def handle_HQ_HASH_NEW(self, path, hash, crc32, size, stat):
 
893
        """on HQ_HASH_NEW"""
 
894
        key = FSKey(self.m.fs, path=path)
 
895
        log = FileLogger(self.logger, key)
 
896
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
897
        ssmr.signal_event_with_hash("HQ_HASH_NEW", hash, crc32, size, stat)
 
898
 
 
899
    def handle_AQ_UPLOAD_FINISHED(self, share_id, node_id, hash):
 
900
        """on AQ_UPLOAD_FINISHED"""
 
901
        key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
 
902
        log = FileLogger(self.logger, key)
 
903
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
904
        ssmr.signal_event_with_hash("AQ_UPLOAD_FINISHED", hash)
 
905
 
 
906
    def handle_AQ_UPLOAD_ERROR(self, share_id, node_id, error, hash):
 
907
        """on AQ_UPLOAD_FINISHED"""
 
908
        key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
 
909
        log = FileLogger(self.logger, key)
 
910
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
911
        ssmr.signal_event_with_error_and_hash("AQ_UPLOAD_ERROR", error, hash)
 
912
 
 
913
    def handle_SV_MOVED(self, share_id, node_id, new_share_id, new_parent_id,
 
914
                        new_name):
 
915
        """on SV_MOVED"""
 
916
        key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
 
917
        log = FileLogger(self.logger, key)
 
918
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
 
919
        ssmr.on_event("SV_MOVED", {}, share_id, node_id, new_share_id,
 
920
                      new_parent_id, new_name)