# ubuntuone.syncdaemon.interfaces - ActionQueue interface
#
# Authors: Manuel de la Pena <manuel@canonical.com>
#
# Copyright 2011 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""Interfaces used to interact with the sync daemon."""
23
from ubuntuone.syncdaemon import config
24
from ubuntuone.syncdaemon.interfaces import IMarker
25
from ubuntuone.syncdaemon.action_queue import Download, Upload
27
logger = logging.getLogger("ubuntuone.SyncDaemon.InteractionInterfaces")
31
"""Return a string value that can be converted back to bool."""
32
return 'True' if value else ''
35
def get_share_dict(share):
36
"""Get a dict with all the attributes of: share."""
37
share_dict = share.__dict__.copy()
38
if 'subscribed' not in share_dict:
39
share_dict['subscribed'] = share.subscribed
40
for k, v in share_dict.items():
42
share_dict[unicode(k)] = ''
44
share_dict[unicode(k)] = v.decode('utf-8')
45
elif k == 'accepted' or k == 'subscribed':
46
share_dict[unicode(k)] = bool_str(v)
48
share_dict[unicode(k)] = unicode(v)
52
def get_udf_dict(udf):
53
"""Get a dict with all the attributes of: udf."""
54
udf_dict = udf.__dict__.copy()
55
for k, v in udf_dict.items():
57
udf_dict[unicode(k)] = ''
58
elif k == 'subscribed':
59
udf_dict[unicode(k)] = bool_str(v)
61
udf_dict[unicode(k)] = v.decode('utf-8')
62
elif k == 'suggested_path' and isinstance(v, str):
63
udf_dict[unicode(k)] = v.decode('utf-8')
65
udf_dict[unicode(k)] = unicode(v)
69
class SyncdaemonStatus(object):
70
"""Represent the status of the syncdaemon."""
72
def __init__(self, main, action_queue, fs_manager):
73
"""Creates the instance."""
74
super(SyncdaemonStatus, self).__init__()
76
self.action_queue = action_queue
77
self.fs_manager = fs_manager
79
def _get_current_state(self):
80
"""Get the current status of the system."""
81
state = self.main.state_manager.state
82
connection = self.main.state_manager.connection.state
83
queues = self.main.state_manager.queues.state.name
86
'description': state.description,
87
'is_error': bool_str(state.is_error),
88
'is_connected': bool_str(state.is_connected),
89
'is_online': bool_str(state.is_online),
91
'connection': connection,
95
def current_status(self):
96
"""Return the current status of the system, one of: local_rescan,
97
offline, trying_to_connect, server_rescan or online.
99
logger.debug('called current_status')
100
return self._get_current_state()
102
def current_downloads(self):
103
"""Return a list of files with a download in progress."""
104
logger.debug('called current_downloads')
105
current_downloads = []
106
for cmd in self.action_queue.queue.waiting:
107
if isinstance(cmd, Download) and cmd.running:
110
'share_id': cmd.share_id,
111
'node_id': cmd.node_id,
112
'n_bytes_read': str(cmd.n_bytes_read),
114
if cmd.deflated_size is not None:
115
entry['deflated_size'] = str(cmd.deflated_size)
116
current_downloads.append(entry)
117
return current_downloads
119
def _get_command_path(self, cmd):
120
"""Return the path on which the command applies."""
122
if IMarker.providedBy(cmd.node_id):
123
# it's a marker! so it's the mdid :)
124
relpath = self.fs_manager.get_by_mdid(cmd.node_id).path
126
relpath = self.fs_manager.get_by_node_id(cmd.share_id,
128
path = self.fs_manager.get_abspath(cmd.share_id, relpath)
130
# probably in the trash (normal case for Unlink)
132
key = cmd.share_id, cmd.node_id
133
if key in self.fs_manager.trash:
134
node_data = self.fs_manager.trash[key]
135
if len(node_data) == 3:
141
def waiting_metadata(self):
142
"""Return a list of the operations in the meta-queue.
144
As we don't have meta-queue anymore, this is faked.
146
logger.debug('called waiting_metadata')
147
waiting_metadata = []
148
for cmd in self.action_queue.queue.waiting:
149
if not isinstance(cmd, (Upload, Download)):
150
operation = cmd.__class__.__name__
152
if 'path' not in data and hasattr(cmd, 'share_id') \
153
and hasattr(cmd, 'node_id'):
154
data['path'] = self._get_command_path(cmd)
155
waiting_metadata.append((operation, data))
156
return waiting_metadata
158
def waiting_content(self):
159
"""Return a list of files that are waiting to be up- or downloaded.
161
As we don't have content-queue anymore, this is faked.
163
logger.debug('called waiting_content')
165
for cmd in self.action_queue.queue.waiting:
166
if isinstance(cmd, (Upload, Download)):
167
path = self._get_command_path(cmd)
168
data = dict(path=path, share=cmd.share_id, node=cmd.node_id,
169
operation=cmd.__class__.__name__)
170
waiting_content.append(data)
171
return waiting_content
173
def schedule_next(self, share_id, node_id):
175
Make the command on the given share and node be next in the
176
queue of waiting commands.
178
logger.debug('called schedule_next')
179
self.action_queue.content_queue.schedule_next(share_id, node_id)
181
def current_uploads(self):
182
"""return a list of files with a upload in progress"""
183
logger.debug('called current_uploads')
185
for cmd in self.action_queue.queue.waiting:
186
if isinstance(cmd, Upload) and cmd.running:
189
'share_id': cmd.share_id,
190
'node_id': cmd.node_id,
191
'n_bytes_written': str(cmd.n_bytes_written),
193
if cmd.deflated_size is not None:
194
entry['deflated_size'] = str(cmd.deflated_size)
195
current_uploads.append(entry)
196
return current_uploads
198
class SyncdaemonFileSystem(object):
199
"""An interface to the FileSystem Manager."""
201
def __init__(self, fs_manager, action_queue):
    """Create the instance, keeping both managers at hand."""
    super(SyncdaemonFileSystem, self).__init__()
    self.fs_manager = fs_manager
    self.action_queue = action_queue
207
def get_metadata(self, path):
208
"""Return the metadata (as a dict) for the specified path."""
209
logger.debug('get_metadata by path: %r', path)
210
real_path = os.path.realpath(path.encode('utf-8'))
211
mdobj = self.fs_manager.get_by_path(real_path)
212
md_dict = self._mdobj_dict(mdobj)
213
md_dict['path'] = path
216
def get_metadata_by_node(self, share_id, node_id):
217
"""Return the metadata (as a dict) for the specified share/node."""
218
logger.debug('get_metadata by share: %r node: %r', share_id, node_id)
219
mdobj = self.fs_manager.get_by_node_id(share_id, node_id)
220
md_dict = self._mdobj_dict(mdobj)
221
path = self.fs_manager.get_abspath(mdobj.share_id, mdobj.path)
222
md_dict['path'] = path
225
def get_metadata_and_quick_tree_synced(self, path):
226
"""Return the dict with the attributes of the metadata for
227
the specified path, including the quick subtree status.
229
logger.debug('get_metadata_and_quick_tree_synced: %r', path)
230
real_path = os.path.realpath(path.encode('utf-8'))
231
mdobj = self.fs_manager.get_by_path(real_path)
232
md_dict = self._mdobj_dict(mdobj)
233
md_dict['path'] = path
234
if self._path_in_queue(real_path):
235
md_dict['quick_tree_synced'] = ''
237
md_dict['quick_tree_synced'] = 'synced'
240
def _path_in_queue(self, path):
241
"""Return whether there are queued commands pertaining to the path."""
242
for cmd in self.action_queue.queue.waiting:
243
share_id = getattr(cmd, 'share_id', None)
244
node_id = getattr(cmd, 'node_id', None)
245
if share_id is not None and node_id is not None:
246
# XXX: nested try/excepts in a loop are probably a
247
# sign that I'm doing something wrong - or that
251
node_md = self.fs_manager.get_by_node_id(share_id, node_id)
253
# maybe it's actually the mdid?
255
node_md = self.fs_manager.get_by_mdid(node_id)
257
# hm, nope. Dunno what to do then
260
this_path = self.fs_manager.get_abspath(share_id,
263
this_path = self.fs_manager.get_abspath(share_id,
265
if this_path.startswith(path):
269
def _mdobj_dict(self, mdobj):
270
"""Returns a dict from a MDObject."""
272
for k, v in mdobj.__dict__.items():
276
md_dict[str(k)] = v.decode('utf-8')
278
md_dict[str(k)] = str(v)
279
if mdobj.__dict__.get('info', None):
280
for k, v in mdobj.info.__dict__.items():
281
md_dict['info_' + str(k)] = str(v)
284
def get_dirty_nodes(self):
285
"""Rerturn a list of dirty nodes."""
286
mdobjs = self.fs_manager.get_dirty_nodes()
289
dirty_nodes.append(self._mdobj_dict(mdobj))
292
class SyncdaemonShares(object):
293
"""An interface to interact with shares."""
295
def __init__(self, fs_manager, volume_manager):
    """Create the instance, keeping the two managers at hand."""
    super(SyncdaemonShares, self).__init__()
    self.fs_manager = fs_manager
    self.vm = volume_manager
301
def get_volume(self, share_id):
    """Look up, in the volume manager, the volume backing share_id."""
    return self.vm.get_volume(share_id)
305
def get_create_error_share_info(self, share_info):
306
"""Get the share info used for errors."""
307
path = self.fs_manager.get_by_mdid(str(share_info['marker'])).path
308
share_info.update(dict(path=path))
311
def get_shares(self):
312
"""Return a list of dicts, each dict represents a share."""
313
logger.debug('called get_shares')
315
for share_id, share in self.vm.shares.items():
318
share_dict = get_share_dict(share)
319
shares.append(share_dict)
322
def accept_share(self, share_id, reply_handler=None, error_handler=None):
325
A ShareAnswerOk|Error signal will be fired in the future as a
326
success/failure indicator.
329
logger.debug('accept_share: %r', share_id)
330
if str(share_id) in self.vm.shares:
331
self.vm.accept_share(str(share_id), True)
334
error_handler(ValueError("The share with id: %s don't exists" % \
337
def reject_share(self, share_id, reply_handler=None, error_handler=None):
338
"""Reject a share."""
339
logger.debug('reject_share: %r', share_id)
340
if str(share_id) in self.vm.shares:
341
self.vm.accept_share(str(share_id), False)
344
error_handler(ValueError("The share with id: %s don't exists" % \
347
def delete_share(self, share_id):
348
"""Delete a Share, both kinds: "to me" and "from me"."""
349
from ubuntuone.syncdaemon.volume_manager import VolumeDoesNotExist
350
logger.debug('delete_share: %r', share_id)
352
self.vm.delete_volume(str(share_id))
353
except VolumeDoesNotExist:
354
# isn't a volume! it might be a "share from me (a.k.a shared)"
355
self.vm.delete_share(str(share_id))
357
def subscribe(self, share_id):
    """Ask the volume manager to subscribe to the given share.

    Failures of the returned deferred are logged, not propagated.
    """
    logger.debug('Shares.subscribe: %r', share_id)
    deferred = self.vm.subscribe_share(str(share_id))

    def log_failure(failure):
        """Record the subscription failure in the log."""
        logger.error('subscribe_share for id %r failed with %r',
                     share_id, failure)

    deferred.addErrback(log_failure)
364
def unsubscribe(self, share_id):
    """Drop the subscription for the given share."""
    logger.debug('Shares.unsubscribe: %r', share_id)
    self.vm.unsubscribe_share(str(share_id))
369
def create_share(self, path, username, name, access_level):
370
"""Share a subtree to the user identified by username.
372
@param path: that path to share (the root of the subtree)
373
@param username: the username to offer the share to
374
@param name: the name of the share
375
@param access_level: 'View' or 'Modify'
377
logger.debug('create share: %r, %r, %r, %r',
378
path, username, name, access_level)
379
path = path.encode("utf8")
380
username = unicode(username)
382
access_level = str(access_level)
384
self.fs_manager.get_by_path(path)
386
raise ValueError("path '%r' does not exist" % path)
387
self.vm.create_share(path, username, name, access_level)
389
def create_shares(self, path, usernames, name, access_level):
390
"""Share a subtree with several users at once.
392
@param path: that path to share (the root of the subtree)
393
@param usernames: the user names to offer the share to
394
@param name: the name of the share
395
@param access_level: 'View' or 'Modify'
397
logger.debug('create shares: %r, %r, %r, %r',
398
path, usernames, name, access_level)
399
for user in usernames:
400
self.create_share(path, user, name, access_level)
402
def refresh_shares(self):
    """Ask the server for an up-to-date list of shares."""
    self.vm.refresh_shares()
406
def get_shared(self):
407
"""Returns a list of dicts, each dict represents a shared share.
408
A share might not have the path set, as we might be still fetching the
409
nodes from the server. In this cases the path is ''
411
logger.debug('called get_shared')
413
for share_id, share in self.vm.shared.items():
416
share_dict = get_share_dict(share)
417
shares.append(share_dict)
420
class SyncdaemonConfig(object):
421
"""The Syncdaemon config/settings dbus interface."""
423
def __init__(self, main, action_queue):
424
"""Creates the instance.
426
@param bus: the BusName of this DBusExposedObject.
428
super(SyncdaemonConfig, self).__init__()
430
self.action_queue = action_queue
432
def get_throttling_limits(self, reply_handler=None, error_handler=None):
433
"""Get the read/write limit from AQ and return a dict.
434
Returns a dict(download=int, upload=int), if int is -1 the value isn't
436
The values are bytes/second
438
logger.debug("called get_throttling_limits")
440
aq = self.action_queue
443
if aq.readLimit is not None:
444
download = aq.readLimit
445
if aq.writeLimit is not None:
446
upload = aq.writeLimit
447
info = dict(download=download,
453
# pylint: disable-msg=W0703
460
def set_throttling_limits(self, download, upload,
461
reply_handler=None, error_handler=None):
462
"""Set the read and write limits. The expected values are bytes/sec."""
463
logger.debug("called set_throttling_limits")
465
# modify and save the config file
466
user_config = config.get_user_config()
467
user_config.set_throttling_read_limit(download)
468
user_config.set_throttling_write_limit(upload)
471
aq = self.action_queue
476
aq.readLimit = download
477
aq.writeLimit = upload
480
# pylint: disable-msg=W0703
487
def enable_bandwidth_throttling(self, reply_handler=None,
489
"""Enable bandwidth throttling."""
491
self._set_throttling_enabled(True)
494
# pylint: disable-msg=W0703
501
def disable_bandwidth_throttling(self, reply_handler=None,
503
"""Disable bandwidth throttling."""
505
self._set_throttling_enabled(False)
508
# pylint: disable-msg=W0703
515
def _set_throttling_enabled(self, enabled):
516
"""Set throttling enabled value and save the config"""
517
# modify and save the config file
518
user_config = config.get_user_config()
519
user_config.set_throttling(enabled)
523
self.action_queue.enable_throttling()
525
self.action_queue.disable_throttling()
527
def bandwidth_throttling_enabled(self, reply_handler=None,
529
"""Returns True (actually 1) if bandwidth throttling is enabled and
532
enabled = self.action_queue.throttling_enabled
534
reply_handler(enabled)
538
def udf_autosubscribe_enabled(self):
    """Tell whether new UDFs are automatically subscribed."""
    user_config = config.get_user_config()
    return user_config.get_udf_autosubscribe()
542
def enable_udf_autosubscribe(self):
543
"""Enable UDF autosubscribe."""
544
user_config = config.get_user_config()
545
user_config.set_udf_autosubscribe(True)
548
def disable_udf_autosubscribe(self):
549
"""Enable UDF autosubscribe."""
550
user_config = config.get_user_config()
551
user_config.set_udf_autosubscribe(False)
554
def share_autosubscribe_enabled(self):
    """Tell whether new shares are automatically subscribed."""
    user_config = config.get_user_config()
    return user_config.get_share_autosubscribe()
558
def enable_share_autosubscribe(self):
559
"""Enable UDF autosubscribe."""
560
user_config = config.get_user_config()
561
user_config.set_share_autosubscribe(True)
564
def disable_share_autosubscribe(self):
565
"""Enable UDF autosubscribe."""
566
user_config = config.get_user_config()
567
user_config.set_share_autosubscribe(False)
570
def set_files_sync_enabled(self, enabled):
571
"""Enable/disable file sync service."""
572
logger.debug('called set_files_sync_enabled %d', enabled)
573
user_config = config.get_user_config()
574
user_config.set_files_sync_enabled(bool(int(enabled)))
577
def files_sync_enabled(self):
    """Tell whether the file sync service is enabled."""
    logger.debug('called files_sync_enabled')
    user_config = config.get_user_config()
    return user_config.get_files_sync_enabled()
582
def autoconnect_enabled(self):
    """Tell whether syncdaemon connects automatically on start."""
    user_config = config.get_user_config()
    return user_config.get_autoconnect()
586
def set_autoconnect_enabled(self, enabled):
587
"""Enable syncdaemon autoconnect."""
588
user_config = config.get_user_config()
589
user_config.set_autoconnect(enabled)
592
def show_all_notifications_enabled(self):
    """Tell whether all notifications are being shown."""
    user_config = config.get_user_config()
    return user_config.get_show_all_notifications()
596
def enable_show_all_notifications(self):
597
"""Enable showing all notifications."""
598
user_config = config.get_user_config()
599
user_config.set_show_all_notifications(True)
601
self.main.status_listener.show_all_notifications = True
603
def disable_show_all_notifications(self):
604
"""Disable showing all notifications."""
605
user_config = config.get_user_config()
606
user_config.set_show_all_notifications(False)
608
self.main.status_listener.show_all_notifications = False
611
class SyncdaemonFolders(object):
612
"""A dbus interface to interact with User Defined Folders"""
614
def __init__(self, volume_manager, fs_manager):
615
"""Create the instance."""
616
super(SyncdaemonFolders, self).__init__()
617
self.vm = volume_manager
620
def create(self, path):
    """Create a user defined folder rooted at the given path."""
    logger.debug('Folders.create: %r', path)
    normalized = os.path.normpath(path)
    self.vm.create_udf(normalized.encode('utf-8'))
626
def delete(self, folder_id):
    """Delete the user defined folder identified by folder_id."""
    logger.debug('Folders.delete: %r', folder_id)
    self.vm.delete_volume(str(folder_id))
631
def get_folders(self):
    """Return the list of user defined folders, one dict per folder."""
    logger.debug('Folders.get_folders')
    udfs = self.vm.udfs.values()
    return [get_udf_dict(udf) for udf in udfs]
636
def subscribe(self, folder_id):
637
"""Subscribe to the specified folder"""
638
logger.debug('Folders.subscribe: %r', folder_id)
640
self.vm.subscribe_udf(str(folder_id))
642
logger.exception('Error while subscribing udf: %r', folder_id)
645
def unsubscribe(self, folder_id):
646
"""Unsubscribe from the specified folder"""
647
logger.debug('Folders.unsubscribe: %r', folder_id)
649
self.vm.unsubscribe_udf(str(folder_id))
651
logger.exception('Error while unsubscribing udf: %r', folder_id)
654
def get_info(self, path):
655
"""Return a dict containing the folder information."""
656
logger.debug('Folders.get_info: %r', path)
657
mdobj = self.fs.get_by_path(path.encode('utf-8'))
658
udf = self.vm.udfs.get(mdobj.share_id, None)
662
return get_udf_dict(udf)
664
def refresh_volumes(self):
    """Ask the server for an up-to-date list of volumes."""
    self.vm.refresh_volumes()
668
class SyncdaemonPublicFiles(object):
669
"""A DBus interface for handling public files."""
671
def __init__(self, fs_manager, action_queue):
672
super(SyncdaemonPublicFiles, self).__init__()
674
self.aq = action_queue
676
def get_path(self, share_id, node_id):
677
"""Get the path of the public file with the given ids."""
678
share_id = str(share_id) if share_id else ''
679
node_id = str(node_id)
681
relpath = self.fs.get_by_node_id(share_id,
686
path=self.fs.get_abspath(share_id, relpath)
689
def change_public_access(self, share_id, node_id, is_public):
690
"""Change the public access of a file."""
691
logger.debug('PublicFiles.change_public_access: %r, %r, %r',
692
share_id, node_id, is_public)
694
share_id = uuid.UUID(share_id)
697
node_id = uuid.UUID(node_id)
698
self.aq.change_public_access(share_id, node_id, is_public)
700
def get_public_files(self):
701
"""Request the list of public files to the server.
703
The result will be send in a PublicFilesList signal.
705
return self.aq.get_public_files()
708
class SyncdaemonEvents(object):
709
"""The events of the system translated to IPC signals.
711
@param bus_name: the BusName of this DBusExposedObject.
712
@param event_queue: the Event Queue
714
def __init__(self, event_queue):
715
super(SyncdaemonEvents, self).__init__()
716
self.event_queue = event_queue
718
def push_event(self, event_name, args):
    """Coerce the event name and all args to str, then queue the event."""
    logger.debug('push_event: %r with %r', event_name, args)
    stringified = {}
    for key, value in args.items():
        stringified[str(key)] = str(value)
    self.event_queue.push(str(event_name), **stringified)
724
class SyncdaemonService(object):
725
""" The Daemon dbus interface. """
727
def __init__(self, service, main, volume_manager, action_queue):
728
""" Creates the instance.
730
@param bus: the BusName of this DBusExposedObject.
732
super(SyncdaemonService, self).__init__()
733
self.service = service
735
self.volume_manager = volume_manager
736
self.action_queue = action_queue
739
""" Connect to the server. """
740
logger.debug('connect requested')
741
self.service.connect()
743
def disconnect(self):
    """Drop the connection to the server."""
    logger.debug('disconnect requested')
    self.service.disconnect()
748
def get_rootdir(self):
    """Return the root directory (the mount point)."""
    logger.debug('called get_rootdir')
    rootdir = self.main.get_rootdir()
    return rootdir
753
def get_sharesdir(self):
    """Return the shares directory (the mount point)."""
    logger.debug('called get_sharesdir')
    sharesdir = self.main.get_sharesdir()
    return sharesdir
758
def get_sharesdir_link(self):
    """Return the link pointing at the shares directory."""
    logger.debug('called get_sharesdir_link')
    link = self.main.get_sharesdir_link()
    return link
763
def wait_for_nirvana(self, last_event_interval,
764
reply_handler=None, error_handler=None):
765
""" call the reply handler when there are no more
768
logger.debug('called wait_for_nirvana')
769
d = self.main.wait_for_nirvana(last_event_interval)
770
d.addCallbacks(reply_handler, error_handler)
773
def quit(self, reply_handler=None, error_handler=None):
774
""" shutdown the syncdaemon. """
775
logger.debug('Quit requested')
780
def rescan_from_scratch(self, volume_id):
    """Request a rescan from scratch of the volume with volume_id."""
    # looking the volume up also validates that it exists
    volume = self.volume_manager.get_volume(str(volume_id))
    self.action_queue.rescan_from_scratch(volume.volume_id)
787
class SyncdaemonEventListener(object):
788
"""An Event Queue Listener."""
790
def __init__(self, interact_interface):
791
"""The interact interface that contains all the exposed methods."""
792
super(SyncdaemonEventListener, self).__init__()
793
self.interface = interact_interface
795
def handle_AQ_DOWNLOAD_STARTED(self, share_id, node_id, server_hash):
796
"""Handle AQ_DOWNLOAD_STARTED."""
798
mdobj = self.interface.fs_manager.get_by_node_id(share_id, node_id)
801
path = self.interface.fs_manager.get_abspath(share_id, mdobj.path)
802
self.interface.status.emit_download_started(path)
804
args = dict(message='The md is gone before sending '
805
'DownloadStarted signal',
807
share_id=str(share_id),
808
node_id=str(node_id))
809
self.interface.status.emit_signal_error('DownloadStarted', args)
811
def handle_AQ_DOWNLOAD_FILE_PROGRESS(self, share_id, node_id,
812
n_bytes_read, deflated_size):
813
"""Handle AQ_DOWNLOAD_FILE_PROGRESS."""
815
mdobj = self.interface.fs_manager.get_by_node_id(share_id, node_id)
817
args = dict(message='The md is gone before sending '
818
'DownloadFileProgress signal',
820
share_id=str(share_id),
821
node_id=str(node_id))
822
self.interface.status.emit_signal_error('DownloadFileProgress',
825
path = self.interface.fs_manager.get_abspath(share_id, mdobj.path)
826
self.interface.status.emit_download_file_progress(path,
827
n_bytes_read=n_bytes_read,
828
deflated_size=deflated_size
831
def handle_AQ_DOWNLOAD_FINISHED(self, share_id, node_id, server_hash):
832
"""Handle AQ_DOWNLOAD_FINISHED."""
834
mdobj = self.interface.fs_manager.get_by_node_id(share_id, node_id)
837
path = self.interface.fs_manager.get_abspath(share_id, mdobj.path)
838
self.interface.status.emit_download_finished(path)
840
# file is gone before we got this
841
args = dict(message='The md is gone before sending '
842
'DownloadFinished signal',
844
share_id=str(share_id),
845
node_id=str(node_id))
846
self.interface.status.emit_signal_error('DownloadFinished', args)
848
def handle_AQ_DOWNLOAD_CANCELLED(self, share_id, node_id, server_hash):
    """Treat a cancelled download as a download error of kind CANCELLED."""
    self.handle_AQ_DOWNLOAD_ERROR(share_id, node_id, server_hash,
                                  'CANCELLED', 'AQ_DOWNLOAD_CANCELLED')
853
def handle_AQ_DOWNLOAD_ERROR(self, share_id, node_id, server_hash, error,
854
event='AQ_DOWNLOAD_ERROR'):
855
"""Handle AQ_DOWNLOAD_ERROR."""
857
mdobj = self.interface.fs_manager.get_by_node_id(share_id, node_id)
860
path = self.interface.fs_manager.get_abspath(share_id, mdobj.path)
861
self.interface.status.emit_download_finished(path, error=error)
863
# file is gone before we got this
864
args = dict(message='The md is gone before sending '
865
'DownloadFinished signal',
867
share_id=str(share_id),
868
node_id=str(node_id),
869
download_error=str(error))
870
self.interface.status.emit_signal_error('DownloadFinished', args)
872
def handle_AQ_UPLOAD_STARTED(self, share_id, node_id, hash):
873
"""Handle AQ_UPLOAD_STARTED."""
875
mdobj = self.interface.fs_manager.get_by_node_id(share_id, node_id)
878
path = self.interface.fs_manager.get_abspath(share_id, mdobj.path)
879
self.interface.status.emit_upload_started(path)
881
args = dict(message='The md is gone before sending '
882
'UploadStarted signal',
884
share_id=str(share_id),
885
node_id=str(node_id))
886
self.interface.status.emit_signal_error('UploadStarted', args)
888
def handle_AQ_UPLOAD_FILE_PROGRESS(self, share_id, node_id,
889
n_bytes_written, deflated_size):
890
"""Handle AQ_UPLOAD_FILE_PROGRESS."""
892
mdobj = self.interface.fs_manager.get_by_node_id(share_id, node_id)
894
args = dict(message='The md is gone before sending '
895
'UploadFileProgress signal',
897
share_id=str(share_id),
898
node_id=str(node_id))
899
self.interface.status.emit_signal_error('UploadFileProgress',
902
path = self.interface.fs_manager.get_abspath(share_id, mdobj.path)
903
self.interface.status.emit_upload_file_progress(path,
904
n_bytes_written=n_bytes_written,
905
deflated_size=deflated_size
908
def handle_AQ_UPLOAD_FINISHED(self, share_id, node_id, hash,
910
"""Handle AQ_UPLOAD_FINISHED."""
912
mdobj = self.interface.fs_manager.get_by_node_id(share_id,
916
path = self.interface.fs_manager.get_abspath(share_id, mdobj.path)
917
self.interface.status.emit_upload_finished(path)
919
# file is gone before we got this
920
args = dict(message='The metadata is gone before sending '
921
'UploadFinished signal',
923
share_id=str(share_id),
924
node_id=str(node_id))
925
self.interface.status.emit_signal_error('UploadFinished', args)
927
def handle_SV_ACCOUNT_CHANGED(self, account_info):
    """Relay SV_ACCOUNT_CHANGED to the status emitter."""
    emit = self.interface.status.emit_account_changed
    emit(account_info)
931
def handle_AQ_UPLOAD_ERROR(self, share_id, node_id, error, hash):
932
"""Handle AQ_UPLOAD_ERROR."""
934
mdobj = self.interface.fs_manager.get_by_node_id(share_id, node_id)
937
path = self.interface.fs_manager.get_abspath(share_id, mdobj.path)
938
self.interface.status.emit_upload_finished(path, error=error)
940
# file is gone before we got this
941
args = dict(message='The metadata is gone before sending '
942
'UploadFinished signal',
944
share_id=str(share_id),
945
node_id=str(node_id),
946
upload_error=str(error))
947
self.interface.status.emit_signal_error('UploadFinished', args)
949
def handle_FS_INVALID_NAME(self, dirname, filename):
    """Relay FS_INVALID_NAME to the status emitter."""
    emit = self.interface.status.emit_invalid_name
    emit(dirname, filename)
953
def handle_SYS_BROKEN_NODE(self, volume_id, node_id, mdid, path):
    """Relay SYS_BROKEN_NODE to the status emitter."""
    emit = self.interface.status.emit_broken_node
    emit(volume_id, node_id, mdid, path)
957
def handle_SYS_STATE_CHANGED(self, state):
    """Relay SYS_STATE_CHANGED to the status emitter."""
    emit = self.interface.status.emit_status_changed
    emit(state)
961
def handle_SV_FREE_SPACE(self, share_id, free_bytes):
    """Relay SV_FREE_SPACE as a ShareChanged-style free space emit."""
    emit = self.interface.shares.emit_free_space
    emit(share_id, free_bytes)
965
def handle_AQ_CREATE_SHARE_OK(self, share_id, marker):
966
"""Handle AQ_CREATE_SHARE_OK event, emit ShareCreated signal."""
967
share = self.interface.volume_manager.shared.get(str(share_id))
970
# pylint: disable-msg=W0212
971
share_dict.update(get_share_dict(share))
973
share_dict.update(dict(volume_id=str(share_id)))
974
self.interface.shares.emit_share_created(share_dict)
976
def handle_AQ_CREATE_SHARE_ERROR(self, marker, error):
977
"""Handle AQ_CREATE_SHARE_ERROR event, emit ShareCreateError signal."""
978
self.interface.shares.emit_share_create_error(dict(marker=marker),
981
def handle_AQ_ANSWER_SHARE_OK(self, share_id, answer):
    """Relay AQ_ANSWER_SHARE_OK, stringifying the share id."""
    emit = self.interface.shares.emit_share_answer_response
    emit(str(share_id), answer)
985
def handle_AQ_ANSWER_SHARE_ERROR(self, share_id, answer, error):
986
"""Handle AQ_ANSWER_SHARE_ERROR event, emit ShareAnswerError signal."""
987
self.interface.shares.emit_share_answer_response(str(share_id), answer,
989
def handle_VM_UDF_SUBSCRIBED(self, udf):
    """Relay VM_UDF_SUBSCRIBED to the folders emitter."""
    emit = self.interface.folders.emit_folder_subscribed
    emit(udf)
993
def handle_VM_UDF_SUBSCRIBE_ERROR(self, udf_id, error):
    """Relay VM_UDF_SUBSCRIBE_ERROR to the folders emitter."""
    emit = self.interface.folders.emit_folder_subscribe_error
    emit(udf_id, error)
997
def handle_VM_UDF_UNSUBSCRIBED(self, udf):
    """Relay VM_UDF_UNSUBSCRIBED to the folders emitter."""
    emit = self.interface.folders.emit_folder_unsubscribed
    emit(udf)
1001
def handle_VM_UDF_UNSUBSCRIBE_ERROR(self, udf_id, error):
    """Relay VM_UDF_UNSUBSCRIBE_ERROR to the folders emitter."""
    emit = self.interface.folders.emit_folder_unsubscribe_error
    emit(udf_id, error)
1005
def handle_VM_UDF_CREATED(self, udf):
    """Relay VM_UDF_CREATED to the folders emitter."""
    emit = self.interface.folders.emit_folder_created
    emit(udf)
1009
def handle_VM_UDF_CREATE_ERROR(self, path, error):
    """Relay VM_UDF_CREATE_ERROR to the folders emitter."""
    emit = self.interface.folders.emit_folder_create_error
    emit(path, error)
1013
def handle_VM_SHARE_SUBSCRIBED(self, share):
    """Relay VM_SHARE_SUBSCRIBED to the shares emitter."""
    emit = self.interface.shares.emit_share_subscribed
    emit(share)
1017
def handle_VM_SHARE_SUBSCRIBE_ERROR(self, share_id, error):
    """Relay VM_SHARE_SUBSCRIBE_ERROR to the shares emitter."""
    emit = self.interface.shares.emit_share_subscribe_error
    emit(share_id, error)
1021
def handle_VM_SHARE_UNSUBSCRIBED(self, share):
    """Relay VM_SHARE_UNSUBSCRIBED to the shares emitter."""
    emit = self.interface.shares.emit_share_unsubscribed
    emit(share)
1025
def handle_VM_SHARE_UNSUBSCRIBE_ERROR(self, share_id, error):
    """Relay VM_SHARE_UNSUBSCRIBE_ERROR to the shares emitter."""
    emit = self.interface.shares.emit_share_unsubscribe_error
    emit(share_id, error)
1029
def handle_VM_SHARE_CREATED(self, share_id):
    """Relay VM_SHARE_CREATED as a NewShare emit."""
    emit = self.interface.shares.emit_new_share
    emit(share_id)
1033
def handle_VM_SHARE_DELETED(self, share):
    """Relay VM_SHARE_DELETED as a ShareChanged('deleted') emit."""
    emit = self.interface.shares.emit_share_changed
    emit('deleted', share)
1037
def handle_VM_SHARE_DELETE_ERROR(self, share_id, error):
    """Relay VM_SHARE_DELETE_ERROR as a ShareDeleteError signal."""
    # NOTE(review): sibling handlers go through emit_* helpers; this one
    # invokes the ShareDeleteError attribute directly -- confirm intended.
    self.interface.shares.ShareDeleteError(dict(volume_id=share_id), error)
1041
def handle_VM_VOLUME_DELETED(self, volume):
1042
"""Handle VM_VOLUME_DELETED event.
1044
Emits FolderDeleted or ShareChanged signal.
1047
from ubuntuone.syncdaemon.volume_manager import Share, UDF
1049
if isinstance(volume, Share):
1050
self.interface.shares.emit_share_changed('deleted', volume)
1051
elif isinstance(volume, UDF):
1052
self.interface.folders.emit_folder_deleted(volume)
1054
logger.error("Unable to handle VM_VOLUME_DELETE for "
1055
"volume_id=%r as it's not a share or UDF", volume.id)
1057
def handle_VM_VOLUME_DELETE_ERROR(self, volume_id, error):
1058
"""Handle VM_VOLUME_DELETE_ERROR event, emit ShareDeleted event."""
1059
from ubuntuone.syncdaemon.volume_manager import Share, UDF, \
1063
volume = self.interface.volume_manager.get_volume(volume_id)
1064
except VolumeDoesNotExist:
1065
logger.error("Unable to handle VM_VOLUME_DELETE_ERROR for "
1066
"volume_id=%r, no such volume.", volume_id)
1068
if isinstance(volume, Share):
1069
self.interface.shares.emit_share_delete_error(volume, error)
1070
elif isinstance(volume, UDF):
1071
self.interface.folders.emit_folder_delete_error(volume, error)
1073
logger.error("Unable to handle VM_VOLUME_DELETE_ERROR for "
1074
"volume_id=%r as it's not a share or UDF", volume_id)
1076
def handle_VM_SHARE_CHANGED(self, share_id):
    """Look the share up and relay a ShareChanged('changed') emit."""
    share = self.interface.volume_manager.shares.get(share_id)
    emit = self.interface.shares.emit_share_changed
    emit('changed', share)
1081
def handle_AQ_CHANGE_PUBLIC_ACCESS_OK(self, share_id, node_id,
                                      is_public, public_url):
    """Relay AQ_CHANGE_PUBLIC_ACCESS_OK to the public files emitter."""
    emit = self.interface.public_files.emit_public_access_changed
    emit(share_id, node_id, is_public, public_url)
1087
def handle_AQ_CHANGE_PUBLIC_ACCESS_ERROR(self, share_id, node_id, error):
    """Relay AQ_CHANGE_PUBLIC_ACCESS_ERROR to the public files emitter."""
    emit = self.interface.public_files.emit_public_access_change_error
    emit(share_id, node_id, error)
1092
def handle_SYS_ROOT_MISMATCH(self, root_id, new_root_id):
    """Relay SYS_ROOT_MISMATCH to the sync emitter."""
    emit = self.interface.sync.emit_root_mismatch
    emit(root_id, new_root_id)
1096
def handle_AQ_PUBLIC_FILES_LIST_OK(self, public_files):
    """Relay AQ_PUBLIC_FILES_LIST_OK to the public files emitter."""
    emit = self.interface.public_files.emit_public_files_list
    emit(public_files)
1100
def handle_AQ_PUBLIC_FILES_LIST_ERROR(self, error):
    """Relay AQ_PUBLIC_FILES_LIST_ERROR to the public files emitter."""
    emit = self.interface.public_files.emit_public_files_list_error
    emit(error)
1104
def handle_SYS_QUOTA_EXCEEDED(self, volume_id, free_bytes):
1105
"""Handle the SYS_QUOTA_EXCEEDED event."""
1106
from ubuntuone.syncdaemon.volume_manager import UDF
1108
volume = self.interface.volume_manager.get_volume(str(volume_id))
1111
if isinstance(volume, UDF):
1112
volume_dict = get_udf_dict(volume)
1114
# either a Share or Root
1115
volume_dict = get_share_dict(volume)
1117
# be sure that the volume has the most updated free bytes info
1118
volume_dict['free_bytes'] = str(free_bytes)
1120
self.interface.sync.emit_quota_exceeded(volume_dict)
1122
def handle_SYS_QUEUE_ADDED(self, command):
1123
"""Handle SYS_QUEUE_ADDED."""
1124
if isinstance(command, (Upload, Download)):
1125
self.interface.status.emit_content_queue_changed()
1127
self.interface.status.emit_metaqueue_changed()
1129
def handle_SYS_QUEUE_REMOVED(self, command):
    """Handle SYS_QUEUE_REMOVED: emits the same signals as queue-added."""
    self.handle_SYS_QUEUE_ADDED(command)