1
# ubuntuone.syncdaemon.sync - sync module
3
# Author: Lucio Torre <lucio.torre@canonical.com>
5
# Copyright 2009 Canonical Ltd.
7
# This program is free software: you can redistribute it and/or modify it
8
# under the terms of the GNU General Public License version 3, as published
9
# by the Free Software Foundation.
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
# PURPOSE. See the GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License along
17
# with this program. If not, see <http://www.gnu.org/licenses/>.
18
"""This is the magic"""
19
from __future__ import with_statement
23
from cStringIO import StringIO
26
from ubuntuone.syncdaemon.marker import MDMarker
27
from ubuntuone.storageprotocol.dircontent_pb2 import DIRECTORY
28
from ubuntuone.storageprotocol import dircontent, hash
29
from ubuntuone.syncdaemon.fsm.fsm import \
30
StateMachineRunner, StateMachine
31
from ubuntuone.syncdaemon.interfaces import IMarker
32
from ubuntuone.syncdaemon import u1fsfsm
33
from ubuntuone.syncdaemon.filesystem_manager import \
35
empty_hash = hash.content_hash_factory().content_hash()
38
"""This encapsulates the problem of getting the metadata with different
41
def __init__(self, fs, **keys):
47
"""Get the metadata id."""
48
if len(self.keys) == 1 and "path" in self.keys:
49
mdid = self.fs._idx_path[self.keys["path"]]
50
elif len(self.keys) == 1 and "mdid" in self.keys:
51
mdid = self.keys["mdid"]
52
elif len(self.keys) == 2 and "node_id" in self.keys \
53
and "share_id" in self.keys:
54
mdid = self.fs._idx_node_id[self.keys["share_id"],
57
raise KeyError("Incorrect keys: %s" % self.keys)
59
raise KeyError("cant find mdid")
63
"""Get the value for key."""
64
mdid = self.get_mdid()
66
mdobj = self.fs.get_by_mdid(mdid)
67
return self.fs.get_abspath(mdobj.share_id, mdobj.path)
68
elif key == 'node_id':
69
mdobj = self.fs.get_by_mdid(mdid)
70
if mdobj.node_id is None:
74
elif key == 'parent_id':
75
mdobj = self.fs.get_by_mdid(mdid)
76
path = self.fs.get_abspath(mdobj.share_id, mdobj.path)
77
parent_path = os.path.dirname(path)
78
parent = self.fs.get_by_path(parent_path)
79
return parent.node_id or MDMarker(parent.mdid)
81
return getattr(self.fs.get_by_mdid(mdid), key, None)
83
def __getitem__(self, key):
84
"""Get the value for key."""
87
def set(self, **kwargs):
88
"""Set the values for kwargs."""
89
mdid = self.get_mdid()
90
self.fs.set_by_mdid(mdid, **kwargs)
92
def has_metadata(self):
    """The State Machine value version of has_metadata.

    Returns "T" or "F" (first letter of str(True)/str(False)), the
    truth-value encoding the state machine tables expect.
    """
    try:
        return str(self.fs.has_metadata(**self.keys))[0]
    # BUGFIX: the old `except KeyError, TypeError:` (Py2 comma form)
    # caught only KeyError and rebound it to the name TypeError;
    # a tuple is needed to catch both exception types.
    except (KeyError, TypeError):
        # bad or missing keys mean there is no metadata for this node
        return "F"
100
"""The State Machine value version of is_dir."""
102
return str(self.fs.is_dir(**self.keys))[0]
107
"""The State Machine value version of changed."""
109
return self.fs.changed(**self.keys)
114
"""get the file object for reading"""
115
mdid = self.get_mdid()
117
fo = self.fs.open_file(mdid)
119
# this is a HUGE cheat
120
# the state expectes to start a download
121
# but the file is gone. so to keep the transitions correct
122
# we return an empty file. we will later receive the FS_DELETE
126
def upload_finished(self, server_hash):
    """Tell the filesystem manager the upload of this node completed."""
    self.fs.upload_finished(self.get_mdid(), server_hash)
131
def delete_file(self):
132
"""delete the file and metadata"""
134
self.fs.delete_file(path)
136
def delete_metadata(self):
137
"""delete the metadata"""
139
self.fs.delete_metadata(path)
141
def move_file(self, new_share_id, new_parent_id, new_name):
142
"""get the stuff we need to move the file."""
143
source_path = self['path']
144
parent_path = self.fs.get_by_node_id(new_share_id, new_parent_id).path
145
dest_path = os.path.join(
146
self.fs.get_abspath(new_share_id, parent_path),
148
self.fs.move_file(new_share_id, source_path, dest_path)
150
def moved(self, new_share_id, path_to):
    """Update the metadata of a file that was moved."""
    old_path = self['path']
    self.fs.moved(new_share_id, old_path, path_to)
    # keep our lookup key current when this node is addressed by path
    if "path" in self.keys:
        self.keys["path"] = path_to
156
def remove_partial(self):
157
"""remove a partial file"""
158
# pylint: disable-msg=W0704
160
self.fs.remove_partial(self["node_id"], self["share_id"])
162
# we had no partial, ignore
165
def move_to_conflict(self):
    """Send this node to its conflict name on disk."""
    mdid = self.get_mdid()
    self.fs.move_to_conflict(mdid)
169
def refresh_stat(self):
170
"""refresh the stat"""
172
# pylint: disable-msg=W0704
174
self.fs.refresh_stat(path)
176
# no file to stat, nothing to do
179
def safe_get(self, key, default='^_^'):
180
""" safe version of self.get, to be used in the FileLogger. """
181
# catch all errors as we are here to help logging
182
# pylint: disable-msg=W0703
189
"create the local empty file"
190
self.fs.create_file(self.get_mdid())
196
"""Make a function that logs at lvl log level."""
197
def level_log(self, message, *args, **kwargs):
199
self.log(lvl, message, *args, **kwargs)
203
class FileLogger(object):
204
"""A logger that knows about the file and its state."""
206
def __init__(self, logger, key):
207
"""Create a logger for this guy"""
211
def log(self, lvl, message, *args, **kwargs):
214
format = "%(hasmd)s:%(changed)s:%(isdir)s %(mdid)s "\
215
"[%(share_id)s:%(node_id)s] '%(path)r' | %(message)s"
216
exc_info = sys.exc_info
217
if self.key.has_metadata() == "T":
218
# catch all errors as we are logging, pylint: disable-msg=W0703
220
base = os.path.split(self.key.fs._get_share(
221
self.key['share_id']).path)[1]
222
path = os.path.join(base, self.key.fs._share_relative_path(
223
self.key['share_id'], self.key['path']))
225
# error while getting the path
226
self.logger.exception("Error in logger while building the "
227
"relpath of: %r", self.key['path'])
228
path = self.key.safe_get('path')
229
extra = dict(message=message,
230
mdid=self.key.safe_get("mdid"),
232
share_id=self.key.safe_get("share_id") or 'root',
233
node_id=self.key.safe_get("node_id"),
234
hasmd=self.key.has_metadata(),
235
isdir=self.key.is_dir(),
236
changed=self.key.changed())
238
extra = dict(message=message, mdid="-",
245
extra.update(self.key.keys)
246
message = format % extra
248
kwargs.update({'exc_info':exc_info})
249
self.logger.error(message, *args, **kwargs)
251
self.logger.log(lvl, message, *args, **kwargs)
253
critical = loglevel(logging.CRITICAL)
254
error = loglevel(logging.ERROR)
255
warning = loglevel(logging.WARNING)
256
info = loglevel(logging.INFO)
257
debug = loglevel(logging.DEBUG)
258
exception = loglevel(-1)
260
class SyncStateMachineRunner(StateMachineRunner):
261
"""This is where all the state machine methods are."""
263
def __init__(self, fsm, main, key, logger=None):
264
"""Create the runner."""
265
super(SyncStateMachineRunner, self).__init__(fsm, logger)
269
def signal_event_with_hash(self, event, hash, *args):
    """Signal an event that carries a hash, computing the hash params."""
    params = self.build_hash_eq(hash)
    self.on_event(event, params, hash, *args)
273
def build_hash_eq(self, hash):
274
"""Build the event params."""
276
sh = str(self.key["server_hash"] == hash)[0]
277
lh = str(self.key["local_hash"] == hash)[0]
280
return dict(hash_eq_server_hash=sh, hash_eq_local_hash=lh)
282
def signal_event_with_error_and_hash(self, event, error, hash, *args):
    """Signal an event that carries both an error and a hash."""
    event_params = self.build_error_eq(error)
    event_params.update(self.build_hash_eq(hash))
    self.on_event(event, event_params, error, hash, *args)
288
def signal_event_with_error(self, event, error, *args):
    """Signal an event that came back with an error."""
    self.on_event(event, self.build_error_eq(error), error, *args)
293
def build_error_eq(self, error):
    """Map an error to the state-machine variables it affects.

    Currently every error is handled the same way: neither
    'not_available' nor 'not_authorized' is flagged.
    """
    return {"not_available": "F", "not_authorized": "F"}
297
def get_state_values(self):
298
"""Get the values for the current state."""
300
has_metadata=self.key.has_metadata(),
301
changed=self.key.changed(),
302
is_directory=self.key.is_dir(),
307
def nothing(self, event, params, *args):
311
def new_dir(self, event, params, share_id, node_id, parent_id, name):
312
"""create a local file."""
313
mdobj = self.m.fs.get_by_node_id(share_id, parent_id)
314
path = os.path.join(self.m.fs.get_abspath(share_id, mdobj.path), name)
315
self.m.fs.create(path=path, share_id=share_id, is_dir=True)
316
self.m.fs.set_node_id(path, node_id)
317
self.m.action_q.query([(share_id, node_id, "")])
318
# pylint: disable-msg=W0704
319
# this should be provided by FSM, fix!!
321
with self.m.fs._enable_share_write(share_id, self.key['path']):
322
os.mkdir(self.key['path'])
324
if not e.errno == 17: #already exists
329
# we hope the user wont have time to add a file just after
330
# *we* created the directory
331
# this is until we can solve and issue with LR and
332
# new dirs and fast downloads
334
self.m.event_q.inotify_add_watch(path)
335
#self.m.lr.scan_dir(path)
337
pass #it was gone when lr got it
340
def new_dir_on_server_with_local(self, event, params, share_id,
        node_id, parent_id, name):
    """The server created a dir where we already have a local node."""
    # preserve the local content as a conflict, drop its metadata,
    # then treat the server dir as brand new
    self.key.move_to_conflict()
    self.key.delete_metadata()
    self.new_dir(event, params, share_id, node_id, parent_id, name)
347
def reget_dir(self, event, params, hash):
348
"""Reget the directory."""
349
self.m.action_q.cancel_download(share_id=self.key['share_id'],
350
node_id=self.key['node_id'])
351
self.key.remove_partial()
352
self.m.action_q.query([(self.key['share_id'],
354
self.key['local_hash'] or "")])
355
self.key.set(server_hash=self.key['local_hash'])
357
def get_dir(self, event, params, hash):
358
"""Get the directory."""
359
self.key.set(server_hash=hash)
360
self.m.fs.create_partial(node_id=self.key['node_id'],
361
share_id=self.key['share_id'])
362
self.m.action_q.listdir(
363
self.key['share_id'], self.key['node_id'], hash,
364
lambda : self.m.fs.get_partial_for_writing(
365
node_id=self.key['node_id'],
366
share_id=self.key['share_id'])
369
def file_conflict(self, event, params, hash, crc32, size, stat):
370
"""This file is in conflict."""
371
self.key.move_to_conflict()
374
def local_file_conflict(self, event, params, hash):
375
"""This file is in conflict."""
376
self.key.move_to_conflict()
377
self.m.action_q.cancel_upload(share_id=self.key['share_id'],
378
node_id=self.key['node_id'])
380
self.get_file(event, params, hash)
382
def merge_directory(self, event, params, hash):
383
"""Merge the server directory with the local one."""
391
fd = self.m.fs.get_partial(node_id=self.key['node_id'],
392
share_id=self.key['share_id'])
393
except InconsistencyError:
394
self.key.remove_partial()
395
self.key.set(server_hash=self.key['local_hash'])
396
self.m.action_q.query([
397
(self.key["share_id"], self.key["node_id"], "")])
398
# we dont perform the merge, we try to re get it
402
items = dircontent.parse_dir_content(fd)
403
server_dir = [ (o.utf8_name, o.node_type == DIRECTORY, o.uuid)
405
client_dir = self.m.fs.dir_content(self.key['path'])
406
# XXX: lucio.torre: with huge dirs, this could take a while
408
share = self.key['share_id']
409
for name, isdir, uuid in server_dir:
410
# we took the name as bytes already encoded in utf8
411
# directly from dircontent!
413
md = self.m.fs.get_by_node_id(share, uuid)
415
# not there, a new thing
417
new_dirs.append((share, uuid, name))
419
new_files.append((share, uuid, name))
421
mdpath = self.m.fs.get_abspath(md.share_id, md.path)
422
if mdpath != os.path.join(self.key['path'], name):
427
self.m.event_q.push("SV_MOVED",
428
share_id=md.share_id, node_id=uuid,
429
new_share_id=share, new_parent_id=self.key['node_id'],
433
for name, isdir, uuid in client_dir:
437
if not (name, isdir, uuid) in server_dir:
438
# not there, a its gone on the server
440
# this was a move, dont delete
443
deleted_dirs.append((share, uuid))
445
deleted_files.append((share, uuid))
448
parent_uuid = self.key['node_id']
449
for share, uuid, name in new_files:
450
self.m.event_q.push("SV_FILE_NEW", parent_id=parent_uuid,
451
node_id=uuid, share_id=share, name=name)
452
for share, uuid, name in new_dirs:
453
self.m.event_q.push("SV_DIR_NEW", parent_id=parent_uuid,
454
node_id=uuid, share_id=share, name=name)
455
for share, uuid in deleted_files:
456
self.m.event_q.push("SV_FILE_DELETED",
457
node_id=uuid, share_id=share)
458
for share, uuid in deleted_dirs:
459
self.m.event_q.push("SV_FILE_DELETED",
460
node_id=uuid, share_id=share)
462
self.key.remove_partial()
463
self.key.set(local_hash=hash)
465
def new_file(self, event, params, share_id, node_id, parent_id, name):
466
"""create a local file."""
467
mdobj = self.m.fs.get_by_node_id(share_id, parent_id)
468
path = os.path.join(self.m.fs.get_abspath(share_id, mdobj.path), name)
469
self.m.fs.create(path=path, share_id=share_id, is_dir=False)
470
self.m.fs.set_node_id(path, node_id)
471
self.key.set(server_hash="")
472
self.key.set(local_hash="")
474
self.m.action_q.query([(share_id, node_id, "")])
476
def new_file_on_server_with_local(self, event, params, share_id,
        node_id, parent_id, name):
    """Send the local file to conflict and recreate it from the server."""
    self.key.move_to_conflict()
    self.key.delete_metadata()
    self.new_file(event, params, share_id, node_id, parent_id, name)
483
def get_file(self, event, params, hash):
484
"""Get the contents for the file."""
485
self.key.set(server_hash=hash)
486
self.m.fs.create_partial(node_id=self.key['node_id'],
487
share_id=self.key['share_id'])
488
self.m.action_q.download(
489
share_id=self.key['share_id'], node_id=self.key['node_id'],
491
fileobj_factory=lambda: self.m.fs.get_partial_for_writing(
492
node_id=self.key['node_id'],
493
share_id=self.key['share_id'])
496
def reget_file(self, event, params, hash):
    """Cancel the running download and start it over for a new hash."""
    key = self.key
    key.set(server_hash=hash)
    self.m.action_q.cancel_download(share_id=key['share_id'],
                                    node_id=key['node_id'])
    key.remove_partial()
    self.get_file(event, params, hash)
504
def client_moved(self, event, params, path_from, path_to):
505
"""the client moved a file"""
506
parent_path = os.path.dirname(path_from)
507
old_parent = FSKey(self.m.fs, path=parent_path)
508
old_parent_id = old_parent['node_id']
509
new_path = os.path.dirname(path_to)
510
new_name = os.path.basename(path_to)
511
new_parent = FSKey(self.m.fs, path=new_path)
512
new_parent_id = new_parent['node_id']
514
self.m.action_q.move(share_id=self.key['share_id'],
515
node_id=self.key['node_id'], old_parent_id=old_parent_id,
516
new_parent_id=new_parent_id, new_name=new_name)
517
self.key.moved(self.key['share_id'], path_to)
519
# this is cheating, we change the state of another node
520
if not IMarker.providedBy(old_parent_id):
521
share_id = self.key['share_id']
522
self.m.action_q.cancel_download(share_id, old_parent_id)
523
old_parent.remove_partial()
524
self.m.fs.set_by_node_id(old_parent_id, share_id,
525
server_hash="", local_hash="")
526
self.m.action_q.query([(share_id, old_parent_id, "")])
527
self.m.hash_q.insert(self.key['path'])
530
def server_file_changed_back(self, event, params, hash):
    """Cancel the download and do NOT restart it (content reverted)."""
    key = self.key
    key.set(server_hash=hash)
    self.m.action_q.cancel_download(share_id=key['share_id'],
                                    node_id=key['node_id'])
    key.remove_partial()
537
def commit_file(self, event, params, hash):
538
"""commit the new content."""
540
self.m.fs.commit_partial(
541
self.key['node_id'], self.key['share_id'], hash)
542
except InconsistencyError:
543
# someone or something broke out partials.
544
# start work to go to a good state
545
self.key.remove_partial()
546
self.key.set(server_hash=self.key['local_hash'])
547
self.m.action_q.query([
548
(self.key["share_id"], self.key["node_id"], "")])
550
def new_local_file(self, event, parms, path):
    """Handle the creation of a brand new local file."""
    # XXX: lucio.torre: we should use markers here
    dirname = os.path.dirname(path)
    parent = self.m.fs.get_by_path(dirname)
    # fall back to a marker while the parent has no server node id yet
    parent_id = parent.node_id or MDMarker(parent.mdid)
    share_id = parent.share_id
    self.m.fs.create(path=path, share_id=share_id, is_dir=False)
    self.key.set(local_hash=empty_hash)
    self.key.set(server_hash=empty_hash)
    marker = MDMarker(self.key.get_mdid())
    self.m.action_q.make_file(
        share_id, parent_id, os.path.basename(path), marker)
564
def new_local_file_created(self, event, parms, new_id):
    """The server confirmed the file creation; record its node id."""
    path = self.key['path']
    self.m.fs.set_node_id(path, new_id)
569
def new_local_dir(self, event, parms, path):
570
"""a new local dir was created"""
571
# XXX: lucio.torre: we should use markers here
572
parent_path = os.path.dirname(path)
573
parent = self.m.fs.get_by_path(parent_path)
574
parent_id = parent.node_id or MDMarker(parent.mdid)
575
share_id = parent.share_id
576
self.m.fs.create(path=path, share_id=share_id, is_dir=True)
577
name = os.path.basename(path)
578
marker = MDMarker(self.key.get_mdid())
579
self.m.action_q.make_dir(share_id, parent_id, name, marker)
580
# pylint: disable-msg=W0704
582
self.m.lr.scan_dir(path)
584
pass #it was gone when lr got it
586
def new_local_dir_created(self, event, parms, new_id):
    """The server confirmed the dir creation; record its node id."""
    path = self.key['path']
    self.m.fs.set_node_id(path, new_id)
590
def calculate_hash(self, event, params):
    """Queue this node's path so the hasher computes its content hash."""
    path = self.key['path']
    self.m.hash_q.insert(path)
594
def put_file(self, event, params, hash, crc32, size, stat):
    """Start uploading the new local content to the server."""
    key = self.key
    previous_hash = key['server_hash']
    key.set(local_hash=hash)
    self.m.fs.update_stat(key.get_mdid(), stat)
    self.m.action_q.upload(
        share_id=key['share_id'], node_id=key['node_id'],
        previous_hash=previous_hash, hash=hash, crc32=crc32,
        size=size, fileobj_factory=key.open_file)
604
def converges_to_server(self, event, params, hash, crc32, size, stat):
    """Local content now matches the server; stop the download."""
    key = self.key
    self.m.action_q.cancel_download(share_id=key['share_id'],
                                    node_id=key['node_id'])
    key.remove_partial()
    key.set(local_hash=hash)
    self.m.fs.update_stat(key.get_mdid(), stat)
612
def reput_file_from_ok(self, event, param, hash):
    """Re-upload the file, treating the previous upload as ok."""
    key = self.key
    self.m.action_q.cancel_upload(share_id=key['share_id'],
                                  node_id=key['node_id'])
    key.set(local_hash=hash)
    key.set(server_hash=hash)
    # rehash so a fresh upload gets triggered with current content
    self.m.hash_q.insert(key['path'])
621
def reput_file(self, event, param, hash, crc32, size, stat):
622
"""put the file again."""
623
self.m.action_q.cancel_upload(share_id=self.key['share_id'],
624
node_id=self.key['node_id'])
625
previous_hash = self.key['server_hash']
627
self.key.set(local_hash=hash)
628
self.m.fs.update_stat(self.key.get_mdid(), stat)
629
self.m.action_q.upload(share_id=self.key['share_id'],
630
node_id=self.key['node_id'], previous_hash=previous_hash,
631
hash=hash, crc32=crc32, size=size,
632
fileobj_factory=self.key.open_file)
634
def server_file_now_matches(self, event, params, hash):
    """Server hash now equals the local one; abort our upload."""
    key = self.key
    self.m.action_q.cancel_upload(share_id=key['share_id'],
                                  node_id=key['node_id'])
    key.set(server_hash=hash)
640
def commit_upload(self, event, params, hash):
    """Wrap up a finished upload."""
    self.key.upload_finished(hash)
644
def cancel_and_commit(self, event, params, hash):
    """Abort the download in progress, then commit the upload."""
    key = self.key
    self.m.action_q.cancel_download(share_id=key['share_id'],
                                    node_id=key['node_id'])
    key.remove_partial()
    key.upload_finished(hash)
652
def delete_file(self, event, params):
653
"""server file was deleted."""
655
self.key.delete_file()
658
# if directory not empty
659
self.key.move_to_conflict()
660
self.key.delete_metadata()
667
def conflict_and_delete(self, event, params, *args, **kwargs):
    """Preserve the local copy as a conflict, then drop the metadata."""
    self.key.move_to_conflict()
    self.key.delete_metadata()
672
def file_gone_wile_downloading(self, event, params):
    """A file we were downloading disappeared; stop and clean up."""
    key = self.key
    self.m.action_q.cancel_download(share_id=key['share_id'],
                                    node_id=key['node_id'])
    key.remove_partial()
    self.delete_file(event, params)
679
def file_not_created_remove(self, event, params, error):
    """The server could not create the file; conflict it and clean up.

    The local copy is preserved under a conflict name so no user data
    is lost, then the now-stale metadata is removed.
    """
    self.key.move_to_conflict()
    self.key.delete_metadata()
684
def delete_on_server(self, event, params, path):
685
"""local file was deleted."""
686
self.m.action_q.unlink(self.key['share_id'],
687
self.key['parent_id'],
689
self.key.delete_metadata()
691
def deleted_dir_while_downloading(self, event, params, path):
693
self.m.action_q.cancel_download(share_id=self.key['share_id'],
694
node_id=self.key['node_id'])
695
self.key.remove_partial()
696
self.m.action_q.unlink(self.key['share_id'],
697
self.key['parent_id'],
699
self.key.delete_metadata()
701
def cancel_download_and_delete_on_server(self, event, params, path):
702
"""cancel_download_and_delete_on_server"""
703
self.m.action_q.cancel_download(share_id=self.key['share_id'],
704
node_id=self.key['node_id'])
705
self.key.remove_partial()
706
self.m.action_q.unlink(self.key['share_id'],
707
self.key['parent_id'],
709
self.key.delete_metadata()
711
def cancel_upload_and_delete_on_server(self, event, params, path):
712
"""cancel_download_and_delete_on_server"""
713
self.m.action_q.cancel_upload(share_id=self.key['share_id'],
714
node_id=self.key['node_id'])
715
self.m.action_q.unlink(self.key['share_id'],
716
self.key['parent_id'],
718
self.key.delete_metadata()
721
def server_moved(self, event, params, share_id, node_id,
        new_share_id, new_parent_id, new_name):
    """Apply a server-side move to the local node."""
    self.key.move_file(new_share_id, new_parent_id, new_name)
726
def server_moved_dirty(self, event, params, share_id, node_id,
        new_share_id, new_parent_id, new_name):
    """The server moved the file while we were downloading it."""
    key = self.key
    self.m.action_q.cancel_download(share_id=key['share_id'],
                                    node_id=key['node_id'])
    key.remove_partial()
    key.move_file(new_share_id, new_parent_id, new_name)
    # restart the download at the new location
    self.get_file(event, params, key['server_hash'])
735
def moved_dirty_local(self, event, params, path_from, path_to):
    """The file was moved locally while an upload was in flight."""
    key = self.key
    self.m.action_q.cancel_upload(share_id=key['share_id'],
                                  node_id=key['node_id'])
    key.set(local_hash=key['server_hash'])
    self.client_moved(event, params, path_from, path_to)
    # rehash at the new path so the upload is retried
    self.m.hash_q.insert(key['path'])
744
def moved_dirty_server(self, event, params, path_from, path_to):
745
"""file was moved while downloading it"""
746
self.client_moved(event, params, path_from, path_to)
748
self.m.action_q.cancel_download(share_id=self.key['share_id'],
749
node_id=self.key['node_id'])
750
self.key.remove_partial()
751
self.key.set(server_hash=self.key['local_hash'])
752
self.m.action_q.query([(self.key['share_id'],
754
self.key['local_hash'] or "")])
756
def DESPAIR(self, event, params, *args, **kwargs):
    """Catch-all for transitions that should never happen."""
    self.log.error("DESPAIR on event=%s params=%s args=%s kwargs=%s",
                   event, params, args, kwargs)
761
def save_stat(self, event, params, hash, crc32, size, stat):
    """Persist the fresh stat info for this node (content unchanged)."""
    self.m.fs.update_stat(self.key.get_mdid(), stat)
766
"""Translates from EQ events into state machine events."""
768
# this will need some refactoring once we handle more events
770
def __init__(self, main):
772
self.logger = logging.getLogger('ubuntuone.SyncDaemon.sync')
773
self.fsm = StateMachine(u1fsfsm.state_machine)
775
self.m.event_q.subscribe(self)
777
def handle_SV_HASH_NEW(self, share_id, node_id, hash):
    """on SV_HASH_NEW: the server reports new content for a node."""
    key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
    log = FileLogger(self.logger, key)
    ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
    ssmr.signal_event_with_hash("SV_HASH_NEW", hash)
784
def handle_SV_FILE_NEW(self, share_id, node_id, parent_id, name):
    """on SV_FILE_NEW: the server created a file under parent_id."""
    parent = FSKey(self.m.fs, share_id=share_id, node_id=parent_id)
    path = os.path.join(parent["path"], name)
    key = FSKey(self.m.fs, path=path)
    log = FileLogger(self.logger, key)
    ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
    ssmr.on_event("SV_FILE_NEW", {}, share_id, node_id, parent_id, name)
793
def handle_SV_DIR_NEW(self, share_id, node_id, parent_id, name):
    """on SV_DIR_NEW: the server created a dir under parent_id."""
    parent = FSKey(self.m.fs, share_id=share_id, node_id=parent_id)
    path = os.path.join(parent["path"], name)
    key = FSKey(self.m.fs, path=path)
    log = FileLogger(self.logger, key)
    ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
    ssmr.on_event("SV_DIR_NEW", {}, share_id, node_id, parent_id, name)
802
def handle_SV_FILE_DELETED(self, share_id, node_id):
    """on SV_FILE_DELETED"""
    node_key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
    runner = SyncStateMachineRunner(self.fsm, self.m, node_key,
                                    FileLogger(self.logger, node_key))
    runner.on_event("SV_FILE_DELETED", {})
809
def handle_AQ_DOWNLOAD_FINISHED(self, share_id, node_id, server_hash):
    """on AQ_DOWNLOAD_FINISHED"""
    node_key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
    runner = SyncStateMachineRunner(self.fsm, self.m, node_key,
                                    FileLogger(self.logger, node_key))
    runner.signal_event_with_hash("AQ_DOWNLOAD_FINISHED", server_hash)
816
def handle_AQ_DOWNLOAD_ERROR(self, *args, **kwargs):
    """on AQ_DOWNLOAD_ERROR: deliberately ignored for now.

    Later this will feed an error event into the state machine.
    """
821
def handle_FS_FILE_CREATE(self, path):
    """on FS_FILE_CREATE"""
    node_key = FSKey(self.m.fs, path=path)
    runner = SyncStateMachineRunner(self.fsm, self.m, node_key,
                                    FileLogger(self.logger, node_key))
    runner.on_event("FS_FILE_CREATE", {}, path)
828
def handle_FS_DIR_CREATE(self, path):
    """on FS_DIR_CREATE"""
    node_key = FSKey(self.m.fs, path=path)
    runner = SyncStateMachineRunner(self.fsm, self.m, node_key,
                                    FileLogger(self.logger, node_key))
    runner.on_event("FS_DIR_CREATE", {}, path)
835
def handle_FS_FILE_DELETE(self, path):
    """on FS_FILE_DELETE"""
    node_key = FSKey(self.m.fs, path=path)
    runner = SyncStateMachineRunner(self.fsm, self.m, node_key,
                                    FileLogger(self.logger, node_key))
    runner.on_event("FS_FILE_DELETE", {}, path)
842
def handle_FS_DIR_DELETE(self, path):
    """on FS_DIR_DELETE"""
    node_key = FSKey(self.m.fs, path=path)
    runner = SyncStateMachineRunner(self.fsm, self.m, node_key,
                                    FileLogger(self.logger, node_key))
    runner.on_event("FS_DIR_DELETE", {}, path)
849
def handle_FS_FILE_MOVE(self, path_from, path_to):
    """on FS_FILE_MOVE"""
    node_key = FSKey(self.m.fs, path=path_from)
    runner = SyncStateMachineRunner(self.fsm, self.m, node_key,
                                    FileLogger(self.logger, node_key))
    runner.on_event("FS_FILE_MOVE", {}, path_from, path_to)

# directory moves are handled exactly like file moves
handle_FS_DIR_MOVE = handle_FS_FILE_MOVE
857
def handle_AQ_FILE_NEW_OK(self, marker, new_id):
    """on AQ_FILE_NEW_OK"""
    node_key = FSKey(self.m.fs, mdid=marker)
    runner = SyncStateMachineRunner(self.fsm, self.m, node_key,
                                    FileLogger(self.logger, node_key))
    runner.on_event("AQ_FILE_NEW_OK", {}, new_id)
864
def handle_AQ_FILE_NEW_ERROR(self, marker, error):
    """on AQ_FILE_NEW_ERROR"""
    node_key = FSKey(self.m.fs, mdid=marker)
    runner = SyncStateMachineRunner(self.fsm, self.m, node_key,
                                    FileLogger(self.logger, node_key))
    runner.signal_event_with_error("AQ_FILE_NEW_ERROR", error)
871
def handle_AQ_DIR_NEW_ERROR(self, marker, error):
    """on AQ_DIR_NEW_ERROR"""
    node_key = FSKey(self.m.fs, mdid=marker)
    runner = SyncStateMachineRunner(self.fsm, self.m, node_key,
                                    FileLogger(self.logger, node_key))
    runner.signal_event_with_error("AQ_DIR_NEW_ERROR", error)
878
def handle_AQ_DIR_NEW_OK(self, marker, new_id):
    """on AQ_DIR_NEW_OK"""
    node_key = FSKey(self.m.fs, mdid=marker)
    runner = SyncStateMachineRunner(self.fsm, self.m, node_key,
                                    FileLogger(self.logger, node_key))
    runner.on_event("AQ_DIR_NEW_OK", {}, new_id)
885
def handle_FS_FILE_CLOSE_WRITE(self, path):
    """on FS_FILE_CLOSE_WRITE"""
    node_key = FSKey(self.m.fs, path=path)
    runner = SyncStateMachineRunner(self.fsm, self.m, node_key,
                                    FileLogger(self.logger, node_key))
    runner.on_event('FS_FILE_CLOSE_WRITE', {})
892
def handle_HQ_HASH_NEW(self, path, hash, crc32, size, stat):
    """on HQ_HASH_NEW: the hash queue finished hashing a path."""
    key = FSKey(self.m.fs, path=path)
    log = FileLogger(self.logger, key)
    ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
    ssmr.signal_event_with_hash("HQ_HASH_NEW", hash, crc32, size, stat)
899
def handle_AQ_UPLOAD_FINISHED(self, share_id, node_id, hash):
    """on AQ_UPLOAD_FINISHED"""
    node_key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
    runner = SyncStateMachineRunner(self.fsm, self.m, node_key,
                                    FileLogger(self.logger, node_key))
    runner.signal_event_with_hash("AQ_UPLOAD_FINISHED", hash)
906
def handle_AQ_UPLOAD_ERROR(self, share_id, node_id, error, hash):
    """on AQ_UPLOAD_ERROR"""
    # BUGFIX: the docstring wrongly said "on AQ_UPLOAD_FINISHED"
    # (copy-paste from the handler above); this handles upload errors.
    key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
    log = FileLogger(self.logger, key)
    ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
    ssmr.signal_event_with_error_and_hash("AQ_UPLOAD_ERROR", error, hash)
913
def handle_SV_MOVED(self, share_id, node_id, new_share_id, new_parent_id,
916
key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
917
log = FileLogger(self.logger, key)
918
ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
919
ssmr.on_event("SV_MOVED", {}, share_id, node_id, new_share_id,
920
new_parent_id, new_name)