1
# ubuntuone.syncdaemon.dbus_interface - DBus Interface
3
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
5
# Copyright 2009 Canonical Ltd.
7
# This program is free software: you can redistribute it and/or modify it
8
# under the terms of the GNU General Public License version 3, as published
9
# by the Free Software Foundation.
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
# PURPOSE. See the GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License along
17
# with this program. If not, see <http://www.gnu.org/licenses/>.
18
""" DBUS interface module """
21
from ubuntuone.syncdaemon.event_queue import EVENTS
22
from ubuntuone.syncdaemon.interfaces import IMarker
25
# Bus name claimed by the daemon; every interface name below is derived from
# it so that they cannot drift apart.
DBUS_IFACE_NAME = 'com.ubuntuone.SyncDaemon'
DBUS_IFACE_SYNC_NAME = DBUS_IFACE_NAME + '.SyncDaemon'
DBUS_IFACE_STATUS_NAME = DBUS_IFACE_NAME + '.Status'
DBUS_IFACE_EVENTS_NAME = DBUS_IFACE_NAME + '.Events'
DBUS_IFACE_FS_NAME = DBUS_IFACE_NAME + '.FileSystem'
DBUS_IFACE_SHARES_NAME = DBUS_IFACE_NAME + '.Shares'
32
# NetworkManager State constants
35
NM_STATE_CONNECTING = 2
NM_STATE_CONNECTED = 3
NM_STATE_DISCONNECTED = 4
# NM state -> events mapping; a state without an entry has no event
NM_STATE_EVENTS = {
    NM_STATE_CONNECTED: 'SYS_NET_CONNECTED',
    NM_STATE_DISCONNECTED: 'SYS_NET_DISCONNECTED',
}
43
class DBusExposedObject(dbus.service.Object):
44
""" Base class that provides some helper methods to
48
def __init__(self, bus_name, path):
    """ Create the instance and export it on the bus.

    @param bus_name: the BusName this object is exported under.
    @param path: the object path asked for by the caller.
        NOTE(review): this argument is never read; the object is exported
        at self.path, so the instance must set self.path before calling
        this method. Confirm that every caller passes path=self.path.
    """
    dbus.service.Object.__init__(self, bus_name=bus_name,
                                 object_path=self.path)
53
def bool_str(self, value):
    """ Return a string that converts back to the same truth value.

    @param value: any object; only its truthiness is used.
    @return: 'True' for a true value, '' (false again under bool())
        for a false one.
    """
    if value:
        return 'True'
    return ''
57
@dbus.service.signal(DBUS_IFACE_SYNC_NAME, signature='sa{ss}')
58
def SignalError(self, signal, extra_args):
59
""" An error ocurred while trying to emit a signal. """
62
def emit_signal_error(self, signal, extra_args):
    """ Emit the SignalError signal.

    @param signal: the name of the signal that could not be emitted.
    @param extra_args: dict of strings with details about the failure.
    """
    self.SignalError(signal, extra_args)
67
class Status(DBusExposedObject):
68
""" Represent the status of the syncdaemon """
70
def __init__(self, bus_name, dbus_iface):
71
""" Creates the instance.
73
@param bus: the BusName of this DBusExposedObject.
75
self.dbus_iface = dbus_iface
76
self.action_queue = dbus_iface.action_queue
77
self.fs_manager = dbus_iface.fs_manager
79
DBusExposedObject.__init__(self, bus_name=bus_name,
82
@dbus.service.method(DBUS_IFACE_STATUS_NAME,
83
in_signature='', out_signature='a{ss}')
84
def current_status(self):
85
""" return the current status of the system, one of: local_rescan,
86
offline, trying_to_connect, server_rescan or online.
88
state = self.dbus_iface.main.state.state
89
self.emit_status_changed(state)
90
state_dict = {'name':state.name,
91
'description':state.description,
92
'is_error':self.bool_str(state.is_error),
93
'is_connected':self.bool_str(state.is_connected),
94
'is_online':self.bool_str(state.is_online)}
97
@dbus.service.method(DBUS_IFACE_STATUS_NAME, out_signature='aa{ss}')
def current_downloads(self):
    """ Return the list of files with a download in progress.

    @return: one dict of strings per entry of action_queue.downloading,
        with the keys 'path', 'share_id', 'node_id', 'deflated_size' and
        'n_bytes_read'; a size that is not in the entry is sent as '-1'.
    """
    downloading = self.action_queue.downloading
    current_downloads = []
    for download in downloading:
        # the entries are keyed by (share_id, node_id)
        share_id, node_id = download
        mdobj = self.fs_manager.get_by_node_id(share_id, node_id)
        path = self.fs_manager.get_abspath(share_id, mdobj.path)
        info = downloading[download]
        current_downloads.append({
            'path': path,
            'share_id': share_id,
            'node_id': node_id,
            'deflated_size': str(info.get('deflated_size', -1)),
            'n_bytes_read': str(info.get('n_bytes_read', -1)),
        })
    return current_downloads
113
@dbus.service.method(DBUS_IFACE_STATUS_NAME,
114
in_signature='', out_signature='aa{ss}')
115
def current_uploads(self):
116
""" return a list of files with a upload in progress """
118
for upload in self.action_queue.uploading:
119
share_id, node_id = upload
120
if IMarker.providedBy(node_id):
122
path = self.fs_manager.get_by_node_id(share_id, node_id).path
123
path = self.fs_manager.get_abspath(share_id, path)
124
info = self.action_queue.uploading[upload]
125
entry = {'path':path,
126
'share_id':upload[0],
128
'deflated_size':str(info.get('deflated_size', -1)),
129
'n_bytes_written':str(info.get('n_bytes_written', -1))}
130
current_uploads.append(entry)
131
return current_uploads
133
@dbus.service.signal(DBUS_IFACE_STATUS_NAME)
134
def DownloadStarted(self, path):
135
""" Fire a D-BUS signal, notifying a download has started. """
138
@dbus.service.signal(DBUS_IFACE_STATUS_NAME)
139
def DownloadFinished(self, path):
140
""" Fire a D-BUS signal, notifying a download has finished. """
143
@dbus.service.signal(DBUS_IFACE_STATUS_NAME)
144
def UploadStarted(self, path):
145
""" Fire a D-BUS signal, notifying an upload has started. """
148
@dbus.service.signal(DBUS_IFACE_STATUS_NAME)
149
def UploadFinished(self, path):
150
""" Fire a D-BUS signal, notifying an upload has finished. """
153
@dbus.service.signal(DBUS_IFACE_STATUS_NAME)
154
def StatusChanged(self, status):
155
""" Fire a D-BUS signal, notifying that the status of the
160
def emit_status_changed(self, state):
    """ Emit the StatusChanged signal for a state.

    @param state: object with the attributes name, description,
        is_error, is_connected and is_online.

    The dict has the same keys as the one built by current_status: name
    and description are sent as they are, the three flags are sent
    through bool_str.
    """
    state_dict = {
        'name': state.name,
        # the description is text, not a flag: bool_str would turn it
        # into 'True' or ''
        'description': state.description,
        'is_error': self.bool_str(state.is_error),
        'is_connected': self.bool_str(state.is_connected),
        'is_online': self.bool_str(state.is_online),
    }
    self.StatusChanged(state_dict)
169
def emit_download_started(self, download):
    """ Emit the DownloadStarted signal.

    @param download: the path of the file being downloaded, passed on
        unchanged as the argument of the signal.
    """
    self.DownloadStarted(download)
173
def emit_download_finished(self, download):
    """ Emit the DownloadFinished signal.

    @param download: the path of the downloaded file, passed on
        unchanged as the argument of the signal.
    """
    self.DownloadFinished(download)
177
def emit_upload_started(self, upload):
    """ Emit the UploadStarted signal.

    @param upload: the path of the file being uploaded, passed on
        unchanged as the argument of the signal.
    """
    self.UploadStarted(upload)
181
def emit_upload_finished(self, upload):
    """ Emit the UploadFinished signal.

    @param upload: the path of the uploaded file, passed on unchanged as
        the argument of the signal.
    """
    self.UploadFinished(upload)
186
class Events(DBusExposedObject):
187
""" The events of the system translated to D-BUS signals """
189
def __init__(self, bus_name, event_queue):
190
""" Creates the instance.
192
@param bus: the BusName of this DBusExposedObject.
194
self.event_queue = event_queue
195
self.path = '/events'
196
DBusExposedObject.__init__(self, bus_name=bus_name,
199
@dbus.service.signal(DBUS_IFACE_EVENTS_NAME,
201
def Event(self, event_dict):
202
""" Fire a D-BUS signal, notifying an event. """
205
def emit_event(self, event):
    """ Emit the Event signal for an event.

    @param event: dict that describes the event; its keys and values can
        be of any type, both are converted with str() before sending.
    """
    # a new dict is built for the signal, the event itself is not changed
    event_dict = dict((str(key), str(value))
                      for key, value in event.items())
    self.Event(event_dict)
212
@dbus.service.method(DBUS_IFACE_EVENTS_NAME, in_signature='sas')
def push_event(self, event_name, args):
    """ Push an event to the event queue.

    @param event_name: the name of the event to push.
    @param args: list with the arguments of the event; each one is
        converted with str() and pushed as a positional argument.
    """
    str_args = [str(arg) for arg in args]
    self.event_queue.push(str(event_name), *str_args)
221
class EventListener(object):
222
""" An Event Queue Listener """
224
def __init__(self, dbus_iface):
225
""" created the instance """
226
self.dbus_iface = dbus_iface
228
def _fire_event(self, event_dict):
    """ Pass event_dict to the emit_event of the exposed Events object.

    The call is made directly, in the thread of the caller.

    @param event_dict: dict that describes the event to emit.
    """
    self.dbus_iface.events.emit_event(event_dict)
232
def handle_default(self, event_name, *args, **kwargs):
    """ Handle any event: build the dict that describes it and emit it.

    The parameter names declared for the event in EVENTS are paired with
    the values received: a name passed as keyword argument takes its
    value from kwargs, the other names take the positional arguments, in
    order.

    @param event_name: the name of the event, a key of EVENTS.
    @param args: the positional arguments of the event.
    @param kwargs: the keyword arguments of the event.
    """
    # names that must be filled from the positional arguments
    positional_names = [name for name in EVENTS[event_name]
                        if name not in kwargs]
    event_dict = {'event_name': event_name}
    for position, name in enumerate(positional_names):
        # indexed (not zipped) so a missing argument still raises
        event_dict[name] = args[position]
    event_dict.update(kwargs)
    self._fire_event(event_dict)
244
def handle_AQ_DOWNLOAD_STARTED(self, share_id, node_id, server_hash):
245
""" handle AQ_DOWNLOAD_STARTED """
246
self.handle_default('AQ_DOWNLOAD_STARTED', share_id, node_id,
249
mdobj = self.dbus_iface.fs_manager.get_by_node_id(share_id, node_id)
250
path = self.dbus_iface.fs_manager.get_abspath(share_id, mdobj.path)
251
self.dbus_iface.status.emit_download_started(path)
253
args = dict(message='The md is gone before sending '
254
'DownloadStarted signal',
256
share_id=str(share_id),
257
node_id=str(node_id))
258
self.dbus_iface.status.emit_signal_error('DownloadStarted', args)
260
def handle_AQ_DOWNLOAD_FINISHED(self, share_id, node_id, server_hash):
261
""" handle AQ_DOWNLOAD_FINISHED """
262
self.handle_default('AQ_DOWNLOAD_FINISHED', share_id,
263
node_id, server_hash)
265
mdobj = self.dbus_iface.fs_manager.get_by_node_id(share_id, node_id)
266
path = self.dbus_iface.fs_manager.get_abspath(share_id, mdobj.path)
267
self.dbus_iface.status.emit_download_finished(path)
269
# file is gone before we got this
270
args = dict(message='The md is gone before sending '
271
'DownloadFinished signal',
273
share_id=str(share_id),
274
node_id=str(node_id))
275
self.dbus_iface.status.emit_signal_error('DownloadFinished', args)
277
def handle_AQ_UPLOAD_STARTED(self, share_id, node_id, hash):
278
""" handle AQ_UPLOAD_STARTED """
279
self.handle_default('AQ_UPLOAD_STARTED', share_id, node_id, hash)
281
mdobj = self.dbus_iface.fs_manager.get_by_node_id(share_id, node_id)
282
path = self.dbus_iface.fs_manager.get_abspath(share_id, mdobj.path)
283
self.dbus_iface.status.emit_upload_started(path)
285
args = dict(message='The md is gone before sending '
286
'UploadStarted signal',
288
share_id=str(share_id),
289
node_id=str(node_id))
290
self.dbus_iface.status.emit_signal_error('UploadStarted', args)
292
def handle_AQ_UPLOAD_FINISHED(self, share_id, node_id, hash):
293
""" handle AQ_UPLOAD_FINISHED """
294
self.handle_default('AQ_UPLOAD_FINISHED', share_id, node_id, hash)
296
mdobj = self.dbus_iface.fs_manager.get_by_node_id(share_id, node_id)
297
path = self.dbus_iface.fs_manager.get_abspath(share_id, mdobj.path)
298
self.dbus_iface.status.emit_upload_finished(path)
300
# file is gone before we got this
301
args = dict(message='The metadata is gone before sending '
302
'UploadFinished signal',
304
share_id=str(share_id),
305
node_id=str(node_id))
306
self.dbus_iface.status.emit_signal_error('UploadFinished', args)
308
def handle_SYS_STATE_CHANGED(self, state):
    """ Handle SYS_STATE_CHANGED.

    The event goes first through handle_default, then the state is
    handed to the emit_status_changed of the exposed Status object.

    @param state: the new state.
    """
    self.handle_default('SYS_STATE_CHANGED', state)
    self.dbus_iface.status.emit_status_changed(state)
313
def handle_SV_SHARE_CHANGED(self, message, share):
    """ Handle SV_SHARE_CHANGED.

    The event goes first through handle_default, then message and share
    are handed to the emit_share_changed of the exposed Shares object.

    @param message: the kind of change that was notified.
    @param share: the share that changed.
    """
    self.handle_default('SV_SHARE_CHANGED', message, share)
    self.dbus_iface.shares.emit_share_changed(message, share)
318
def handle_AQ_CREATE_SHARE_OK(self, share_id, marker):
319
""" handle AQ_CREATE_SHARE_OK event, emit's ShareCreated signal. """
320
self.handle_default('AQ_CREATE_SHARE_OK', share_id, marker)
321
share = self.dbus_iface.volume_manager.shared.get(share_id)
324
share_dict.update(self.dbus_iface.shares._get_share_dict(share))
326
share_dict.update(dict(share_id=share_id))
327
self.dbus_iface.shares.emit_share_created(share_dict)
329
def handle_AQ_CREATE_SHARE_ERROR(self, marker, error):
330
""" handle AQ_CREATE_SHARE_ERROR event, emit's ShareCreateError signal.
332
self.handle_default('AQ_CREATE_SHARE_ERROR', marker, error)
333
self.dbus_iface.shares.emit_share_create_error(dict(marker=marker),
337
class SyncDaemon(DBusExposedObject):
338
""" The Daemon dbus interface. """
340
def __init__(self, bus_name, dbus_iface):
341
""" Creates the instance.
343
@param bus: the BusName of this DBusExposedObject.
345
self.dbus_iface = dbus_iface
347
DBusExposedObject.__init__(self, bus_name=bus_name,
350
@dbus.service.method(DBUS_IFACE_SYNC_NAME,
351
in_signature='', out_signature='')
353
""" Connect to the server. """
354
self.dbus_iface.connect()
356
@dbus.service.method(DBUS_IFACE_SYNC_NAME,
                     in_signature='', out_signature='')
def disconnect(self):
    """ Disconnect from the server.

    The request is delegated to the disconnect of the DBusInterface.
    """
    self.dbus_iface.disconnect()
362
@dbus.service.method(DBUS_IFACE_SYNC_NAME,
                     in_signature='', out_signature='s')
def get_rootdir(self):
    """ Return the root dir/mount point.

    @return: the value of main.get_rootdir(), sent as a string.
    """
    return self.dbus_iface.main.get_rootdir()
368
@dbus.service.method(DBUS_IFACE_SYNC_NAME,
369
in_signature='d', out_signature='b',
370
async_callbacks=('reply_handler', 'error_handler'))
371
def wait_for_nirvana(self, last_event_interval,
372
reply_handler=None, error_handler=None):
373
""" call the reply handler when there are no more
376
d = self.dbus_iface.main.wait_for_nirvana(last_event_interval)
377
d.addCallbacks(reply_handler, error_handler)
380
@dbus.service.method(DBUS_IFACE_SYNC_NAME,
                     in_signature='s', out_signature='')
def query_by_path(self, path):
    """ Request a query of the node identified by path.

    @param path: the path of the node, as text; it is encoded to utf-8
        for the metadata lookup.
    """
    mdobj = self.dbus_iface.fs_manager.get_by_path(path.encode('utf-8'))
    # one item per node; the third field is sent empty
    # NOTE(review): presumably the hash of the node -- confirm against
    # action_queue.query
    items = [(mdobj.share_id, mdobj.node_id, "")]
    self.dbus_iface.action_queue.query(items)
388
@dbus.service.method(DBUS_IFACE_SYNC_NAME,
389
in_signature='', out_signature='',
390
async_callbacks=('reply_handler', 'error_handler'))
391
def quit(self, reply_handler=None, error_handler=None):
392
""" shutdown the syncdaemon. """
393
self.dbus_iface.log.debug('Quit requested')
396
self.dbus_iface.quit()
398
class FileSystem(DBusExposedObject):
399
""" A dbus interface to the FileSystem Manager. """
401
def __init__(self, bus_name, fs_manager):
402
""" Creates the instance. """
403
self.fs_manager = fs_manager
404
self.path = '/filesystem'
405
DBusExposedObject.__init__(self, bus_name=bus_name,
408
@dbus.service.method(DBUS_IFACE_FS_NAME,
409
in_signature='s', out_signature='a{ss}')
410
def get_metadata(self, path):
411
""" returns the dict with the attributes of the metadata
412
for the specified path.
414
mdobj = self.fs_manager.get_by_path(path.encode('utf-8'))
416
for k, v in mdobj.__dict__.items():
419
md_dict[str(k)] = str(v)
420
if mdobj.__dict__.get('info', None):
421
for k, v in mdobj.info.__dict__.items():
422
md_dict['info_'+str(k)] = str(v)
426
class Shares(DBusExposedObject):
427
""" A dbus interface to interact wiht shares """
429
def __init__(self, bus_name, fs_manager, volume_manager):
430
""" Creates the instance. """
431
self.fs_manager = fs_manager
432
self.vm = volume_manager
433
self.path = '/shares'
434
DBusExposedObject.__init__(self, bus_name=bus_name,
437
@dbus.service.method(DBUS_IFACE_SHARES_NAME,
438
in_signature='', out_signature='aa{ss}')
439
def get_shares(self):
440
""" returns a list of dicts, each dict represents a share """
442
for share_id, share in self.vm.shares.items():
445
share_dict = self._get_share_dict(share)
446
shares.append(share_dict)
449
@dbus.service.method(DBUS_IFACE_SHARES_NAME,
450
in_signature='s', out_signature='',
451
async_callbacks=('reply_handler', 'error_handler'))
452
def accept_share(self, share_id, reply_handler=None, error_handler=None):
453
""" Accepts a share. """
454
if str(share_id) in self.vm.shares:
455
d = self.vm.accept_share(str(share_id), True)
456
d.addCallbacks(lambda _: reply_handler(), error_handler)
458
error_handler(ValueError("The share with id: %s don't exists" % \
461
@dbus.service.method(DBUS_IFACE_SHARES_NAME,
462
in_signature='s', out_signature='',
463
async_callbacks=('reply_handler', 'error_handler'))
464
def reject_share(self, share_id, reply_handler=None, error_handler=None):
465
""" Rejects a share. """
466
if str(share_id) in self.vm.shares:
467
d = self.vm.accept_share(str(share_id), False)
468
d.addCallbacks(lambda _: reply_handler(), error_handler)
470
error_handler(ValueError("The share with id: %s don't exists" % \
473
@dbus.service.signal(DBUS_IFACE_SHARES_NAME,
475
def ShareChanged(self, share_dict):
476
""" A share changed, share_dict contains all the share attributes. """
479
@dbus.service.signal(DBUS_IFACE_SHARES_NAME,
481
def ShareDeleted(self, share_dict):
482
""" A share was deleted, share_dict contains all available
483
share attributes. """
486
def emit_share_changed(self, message, share):
487
""" emits ShareChanged or ShareDeleted signal for the share
490
if message == 'deleted':
491
self.ShareDeleted(self._get_share_dict(share))
492
elif message == 'changed':
493
share = self.vm.shares[share.share_id]
494
self.ShareChanged(self._get_share_dict(share))
496
def _get_share_dict(self, share):
497
""" get a dict with all the attributes of: share. """
498
share_dict = share.__dict__.copy()
499
for k, v in share_dict.items():
501
share_dict[unicode(k)] = ''
503
share_dict[unicode(k)] = v.decode('utf-8')
505
share_dict[unicode(k)] = unicode(v)
508
@dbus.service.method(DBUS_IFACE_SHARES_NAME,
509
in_signature='ssss', out_signature='')
510
def create_share(self, path, username, name, access_level):
511
""" Share a subtree to the user identified by username.
513
@param path: that path to share (the root of the subtree)
514
@param username: the username to offer the share to
515
@param name: the name of the share
516
@param access_level: 'View' or 'Modify'
518
path = path.encode("utf8")
519
username = unicode(username)
521
access_level = str(access_level)
523
self.fs_manager.get_by_path(path)
525
raise ValueError("path '%r' does not exist" % path)
526
self.vm.create_share(path, username, name, access_level)
528
@dbus.service.signal(DBUS_IFACE_SHARES_NAME,
530
def ShareCreated(self, share_info):
531
""" The requested share was succesfully created. """
534
@dbus.service.signal(DBUS_IFACE_SHARES_NAME,
536
def ShareCreateError(self, share_info, error):
537
""" An error ocurred while creating the share. """
540
def emit_share_created(self, share_info):
    """ Emit the ShareCreated signal.

    @param share_info: dict with the attributes of the share that was
        created, passed on unchanged as the argument of the signal.
    """
    self.ShareCreated(share_info)
544
def emit_share_create_error(self, share_info, error):
    """ Emit the ShareCreateError signal.

    @param share_info: dict with what is known about the share that
        could not be created.
    @param error: the error to report.
    """
    self.ShareCreateError(share_info, error)
548
@dbus.service.method(DBUS_IFACE_SHARES_NAME,
                     in_signature='', out_signature='')
def refresh_shares(self):
    """ Refresh the share list, requesting it from the server.

    The request is delegated to the refresh_shares of the volume
    manager.
    """
    self.vm.refresh_shares()
554
@dbus.service.method(DBUS_IFACE_SHARES_NAME,
555
in_signature='', out_signature='aa{ss}')
556
def get_shared(self):
557
""" returns a list of dicts, each dict represents a shared share.
558
A share might not have the path set, as we might be still fetching the
559
nodes from the server. In this cases the path is ''
562
for share_id, share in self.vm.shared.items():
565
share_dict = self._get_share_dict(share)
566
shares.append(share_dict)
570
class DBusInterface(object):
571
""" Holder of all DBus exposed objects """
574
def __init__(self, bus, main, system_bus=None):
575
""" Create the instance and add the exposed object to the
578
self.log = logging.getLogger("ubuntuone.SyncDaemon.DBus")
581
self.event_queue = main.event_q
582
self.action_queue = main.action_q
583
self.fs_manager = main.fs
584
self.volume_manager = main.vm
585
self.busName = dbus.service.BusName(DBUS_IFACE_NAME, bus=self.bus)
586
self.status = Status(self.busName, self)
587
self.events = Events(self.busName, self.event_queue)
588
self.event_listener = EventListener(self)
589
self.sync = SyncDaemon(self.busName, self)
590
self.fs = FileSystem(self.busName, self.fs_manager)
591
self.shares = Shares(self.busName, self.fs_manager,
593
if system_bus is None and not DBusInterface.test:
594
self.log.debug('using the real system bus')
595
self.system_bus = self.bus.get_system()
596
elif system_bus is None and DBusInterface.test:
597
# this is just for the case when test_sync instantiates Main for
598
# running its tests, as pqm doesn't have a system bus running
599
self.log.debug('using the session bus as system bus')
600
self.system_bus = self.bus
602
self.system_bus = system_bus
604
self.event_queue.subscribe(self.event_listener)
605
# on initialization, fake a SYS_NET_CONNECTED if appropriate
606
if DBusInterface.test:
607
# testing under sync; just do it
608
self.log.debug('using the fake NetworkManager')
609
self.connection_state_changed(NM_STATE_CONNECTED)
611
def error_handler(error):
613
Handle errors from NM
616
"Error while getting the NetworkManager state %s",
618
# If we get an error back from NetworkManager, we should
619
# just try to connect anyway; it probably means that
620
# NetworkManager is down or broken or something.
621
self.connection_state_changed(NM_STATE_CONNECTED)
623
nm = self.system_bus.get_object(
624
'org.freedesktop.NetworkManager',
625
'/org/freedesktop/NetworkManager')
626
except dbus.DBusException, e:
627
if e.get_dbus_name() == \
628
'org.freedesktop.DBus.Error.ServiceUnknown':
629
# NetworkManager isn't running.
630
self.log.warn("Unable to connect to NetworkManager. "
631
"Assuming we have network.")
632
self.connection_state_changed(NM_STATE_CONNECTED)
636
nm.Get('org.freedesktop.NetworkManager', 'State',
637
reply_handler=self.connection_state_changed,
638
error_handler=error_handler,
639
dbus_interface='org.freedesktop.DBus.Properties')
640
# register a handler to NM StateChanged signal
641
self.system_bus.add_signal_receiver(self.connection_state_changed,
642
signal_name='StateChanged',
643
dbus_interface='org.freedesktop.NetworkManager',
644
path='/org/freedesktop/NetworkManager')
648
""" remove the registered object from the bus and unsubscribe from the
651
self.log.debug('Shuttingdown DBusInterface!')
652
self.status.remove_from_connection()
653
self.events.remove_from_connection()
654
self.sync.remove_from_connection()
655
self.fs.remove_from_connection()
656
self.shares.remove_from_connection()
657
self.event_queue.unsubscribe(self.event_listener)
658
# remove the NM's StateChanged signal receiver
659
self.system_bus.remove_signal_receiver(self.connection_state_changed,
660
signal_name='StateChanged',
661
dbus_interface='org.freedesktop.NetworkManager',
662
path='/org/freedesktop/NetworkManager')
663
self.busName.get_bus().release_name(self.busName.get_name())
665
def connection_state_changed(self, state):
    """ Push the event that matches a connection state to the Event Queue.

    @param state: the NetworkManager state; a state that has no entry in
        NM_STATE_EVENTS is ignored.
    """
    event = NM_STATE_EVENTS.get(state)
    if event is None:
        return
    self.event_queue.push(event)
672
""" Push the SYS_CONNECT event with the tokens in the keyring. """
673
from ubuntuone.syncdaemon.main import NoAccessToken
675
access_token = self.main.get_access_token()
676
self.event_queue.push('SYS_CONNECT', access_token)
677
except NoAccessToken, e:
678
self.log.exception("Can't get the auth token")
680
def disconnect(self):
    """ Push the SYS_DISCONNECT event to the Event Queue. """
    self.event_queue.push('SYS_DISCONNECT')
685
""" calls Main.quit. """
686
self.log.debug('Calling Main.quit')