~ubuntu-branches/ubuntu/oneiric/ubuntuone-client/oneiric

« back to all changes in this revision

Viewing changes to ubuntuone/platform/linux/tools.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2010-12-14 14:57:03 UTC
  • mto: This revision was merged to the branch mainline in revision 61.
  • Revision ID: james.westby@ubuntu.com-20101214145703-6rqksi59jbeb4xpy
Tags: upstream-1.5.1
ImportĀ upstreamĀ versionĀ 1.5.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# ubuntuone.syncdaemon.tools - tools for SyncDaemon
 
2
#
 
3
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
 
4
#
 
5
# Copyright 2009 Canonical Ltd.
 
6
#
 
7
# This program is free software: you can redistribute it and/or modify it
 
8
# under the terms of the GNU General Public License version 3, as published
 
9
# by the Free Software Foundation.
 
10
#
 
11
# This program is distributed in the hope that it will be useful, but
 
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
14
# PURPOSE.  See the GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License along
 
17
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
""" SyncDaemon Tools """
 
19
import dbus
 
20
import logging
 
21
import time
 
22
import sys
 
23
 
 
24
from ubuntuone.platform.linux.dbus_interface import (
 
25
    DBUS_IFACE_NAME,
 
26
    DBUS_IFACE_STATUS_NAME,
 
27
    DBUS_IFACE_SHARES_NAME,
 
28
    DBUS_IFACE_FOLDERS_NAME,
 
29
    DBUS_IFACE_SYNC_NAME,
 
30
    DBUS_IFACE_FS_NAME,
 
31
    DBUS_IFACE_PUBLIC_FILES_NAME,
 
32
    DBUS_IFACE_CONFIG_NAME,
 
33
)
 
34
from ubuntuone.syncdaemon.config import get_user_config
 
35
from dbus.lowlevel import SignalMessage, MethodCallMessage, ErrorMessage
 
36
from dbus.exceptions import DBusException
 
37
from twisted.internet import defer, reactor
 
38
from twisted.python.failure import Failure
 
39
 
 
40
 
 
41
def is_running(bus=None):
 
42
    """Check if there is a syncdaemon instance running or if
 
43
    the name is registered in the bus.
 
44
    """
 
45
    if bus is None:
 
46
        bus = dbus.SessionBus()
 
47
    return DBUS_IFACE_NAME in bus.list_names()
 
48
 
 
49
class ErrorSignal(Exception):
 
50
    pass
 
51
 
 
52
 
 
53
class DBusClient(object):
 
54
    """ Low level dbus client. To help testing the DBus interface. """
 
55
 
 
56
    def __init__(self, bus, path, interface, destination=DBUS_IFACE_NAME):
 
57
        """ create the instance """
 
58
        self.bus = bus
 
59
        self.path = path
 
60
        self.interface = interface
 
61
        self.destination = destination
 
62
 
 
63
    def send_signal(self, signal, *args):
 
64
        """ Send method with *args """
 
65
        msg = SignalMessage(self.path, self.interface,
 
66
                            signal)
 
67
        msg.set_no_reply(True)
 
68
        msg.append(*args)
 
69
        self.bus.send_message(msg)
 
70
 
 
71
    def call_method(self, method, *args, **kwargs):
 
72
        """ Call method with *args and **kwargs over dbus"""
 
73
        msg = MethodCallMessage(self.destination, self.path, self.interface,
 
74
                                method)
 
75
        msg.set_no_reply(True)
 
76
        # get the signature
 
77
        signature = kwargs.get('signature', None)
 
78
        if signature is not None:
 
79
            msg.append(signature=signature, *args)
 
80
        else:
 
81
            msg.append(*args)
 
82
        #gbet the reply/error handlers
 
83
        reply_handler = kwargs.get('reply_handler', None)
 
84
        error_handler = kwargs.get('error_handler', None)
 
85
        assert error_handler != None
 
86
 
 
87
        def parse_reply(message):
 
88
            """ handle the reply message"""
 
89
            if isinstance(message, ErrorMessage):
 
90
                return error_handler(DBusException(
 
91
                                    name=message.get_error_name(),
 
92
                                    *message.get_args_list()))
 
93
            args_list = message.get_args_list(utf8_strings=False,
 
94
                                                  byte_arrays=False)
 
95
            if reply_handler:
 
96
                if len(args_list) == 0:
 
97
                    reply_handler(None)
 
98
                elif len(args_list) == 1:
 
99
                    return reply_handler(args_list[0])
 
100
                else:
 
101
                    return reply_handler(tuple(args_list))
 
102
        return self.bus.send_message_with_reply(msg,
 
103
                                                reply_handler=parse_reply)
 
104
 
 
105
 
 
106
class SyncDaemonTool(object):
 
107
    """ Various utility methods to test/play with the SyncDaemon. """
 
108
 
 
109
    def __init__(self, bus):
 
110
        self.bus = bus
 
111
        self.last_event = 0
 
112
        self.delayed_call = None
 
113
        self.log = logging.getLogger('ubuntuone.SyncDaemon.SDTool')
 
114
 
 
115
    def _get_dict(self, a_dict):
 
116
        """ Converts a dict returned by dbus to a dict of strings. """
 
117
        str_dict = {}
 
118
        for key in a_dict:
 
119
            str_dict[key] = unicode(a_dict[key])
 
120
        return str_dict
 
121
 
 
122
    def wait_connected(self):
 
123
        """ Wait until syncdaemon is connected to the server. """
 
124
        self.log.debug('wait_connected')
 
125
        d = defer.Deferred()
 
126
        def check_connection_status():
 
127
            """ Check if the daemon is up and running. """
 
128
            # check if the syncdaemon is running
 
129
            # catch all errors, pylint: disable-msg=W0703
 
130
            try:
 
131
                self.bus.get_object(DBUS_IFACE_NAME, '/',
 
132
                                    follow_name_owner_changes=True)
 
133
                self.log.debug('wait_connected: Done!')
 
134
                d.callback(True)
 
135
            except Exception, e:
 
136
                self.log.debug('Not connected: %s', e)
 
137
                d.errback()
 
138
 
 
139
        reactor.callLater(.5, check_connection_status)
 
140
        return d
 
141
 
 
142
    def get_current_downloads(self):
 
143
        """ Return a deferred that 'll be called with the list
 
144
        of current downloads
 
145
        """
 
146
        d = defer.Deferred()
 
147
        def current_downloads():
 
148
            """ Call Status.current_downloads """
 
149
            status_client = DBusClient(self.bus, '/status',
 
150
                                       DBUS_IFACE_STATUS_NAME)
 
151
            status_client.call_method('current_downloads',
 
152
                                      reply_handler=reply_handler,
 
153
                                      error_handler=d.errback)
 
154
 
 
155
        def reply_handler(downloads):
 
156
            """ current downloads callback """
 
157
            downloads_str = []
 
158
            for download in downloads:
 
159
                downloads_str.append(self._get_dict(download))
 
160
            d.callback(downloads_str)
 
161
 
 
162
        reactor.callLater(0, current_downloads)
 
163
        return d
 
164
 
 
165
    def wait_all_downloads(self, verbose=False):
 
166
        """ Wait until there is no more pending downloads """
 
167
        self.log.debug('wait_all_downloads')
 
168
        d = self.get_current_downloads()
 
169
        def reply_handler(downloads):
 
170
            """ Check if the are downloads in progress, and reschelude a
 
171
            new check if there is at least one.
 
172
            """
 
173
            if verbose:
 
174
                sys.stdout.write(', %s' % str(len(downloads)))
 
175
                sys.stdout.flush()
 
176
            if len(downloads) > 0:
 
177
                self.log.debug('wait_all_downloads: %d', len(downloads))
 
178
                return self.get_current_downloads()
 
179
            else:
 
180
                self.log.debug('wait_all_downloads: No more downloads')
 
181
                return True
 
182
 
 
183
        if verbose:
 
184
            sys.stdout.write('\nchecking current downloads')
 
185
            sys.stdout.flush()
 
186
        d.addCallback(reply_handler)
 
187
        return d
 
188
 
 
189
    def get_current_uploads(self):
 
190
        """ Return a deferred that 'll be called with the list
 
191
        of current uploads
 
192
        """
 
193
        d = defer.Deferred()
 
194
        def current_uploads():
 
195
            """ Call Status.current_uploads """
 
196
            status_client = DBusClient(self.bus, '/status',
 
197
                                       DBUS_IFACE_STATUS_NAME)
 
198
            status_client.call_method('current_uploads',
 
199
                                      reply_handler=reply_handler,
 
200
                                      error_handler=d.errback)
 
201
 
 
202
        def reply_handler(uploads):
 
203
            """ reply handler """
 
204
            uploads_str = []
 
205
            for upload in uploads:
 
206
                uploads_str.append(self._get_dict(upload))
 
207
            d.callback(uploads_str)
 
208
 
 
209
        reactor.callLater(0, current_uploads)
 
210
        return d
 
211
 
 
212
    def wait_all_uploads(self, verbose=False):
 
213
        """ Wait until there is no more pending uploads """
 
214
        self.log.debug('wait_all_uploads')
 
215
        d = self.get_current_uploads()
 
216
 
 
217
        def reply_handler(uploads):
 
218
            """ Check if the are downloads in progress, and reschelude a
 
219
            new check if there is at least one.
 
220
            """
 
221
            if verbose:
 
222
                sys.stdout.write(', %s' % str(len(uploads)))
 
223
                sys.stdout.flush()
 
224
            if len(uploads) > 0:
 
225
                self.log.debug('wait_all_uploads: %d', len(uploads))
 
226
                return self.get_current_uploads()
 
227
            else:
 
228
                self.log.debug('wait_all_uploads: No more uploads')
 
229
                return True
 
230
 
 
231
        if verbose:
 
232
            sys.stdout.write('\nchecking current uploads')
 
233
            sys.stdout.flush()
 
234
 
 
235
        d.addCallback(reply_handler)
 
236
        return d
 
237
 
 
238
    def wait_no_more_events(self, last_event_interval, verbose=False):
 
239
        """ Wait until no more events are fired by the syncdaemon. """
 
240
        self.log.debug('wait_no_more_events')
 
241
        d = defer.Deferred()
 
242
        def check_last_event():
 
243
            """ Check if the daemon is connected and we didn't received event
 
244
            in the last_event_interval
 
245
            """
 
246
            current_time = time.time()
 
247
            if self.last_event and \
 
248
               current_time - self.last_event < last_event_interval:
 
249
                # keep it running in case this is the last event
 
250
                self.log.debug('rescheduling wait_no_more_events')
 
251
                if not self.delayed_call.active():
 
252
                    self.delayed_call = reactor.callLater(last_event_interval,
 
253
                                                          check_last_event)
 
254
                else:
 
255
                    self.delayed_call.reset(last_event_interval)
 
256
            else:
 
257
                self.log.debug('wait_no_more_events: No more events!')
 
258
                d.callback(True)
 
259
 
 
260
        if verbose:
 
261
            sys.stdout.write("Listening events")
 
262
            sys.stdout.flush()
 
263
        def event_handler(event_dict):
 
264
            """ update last_event and run checks """
 
265
            self.last_event = time.time()
 
266
            self.log.debug('wait_no_more_events - new event: %s - %s',
 
267
                           event_dict['event_name'], str(self.last_event))
 
268
            if verbose:
 
269
                sys.stdout.write('.')
 
270
                sys.stdout.flush()
 
271
            if self.delayed_call.active():
 
272
                self.delayed_call.reset(last_event_interval)
 
273
 
 
274
        self.bus.add_signal_receiver(event_handler, signal_name='Event')
 
275
        def cleanup(result):
 
276
            """ remove the signal handler """
 
277
            self.bus.remove_signal_receiver(event_handler,
 
278
                                             signal_name='Event')
 
279
            return result
 
280
        d.addBoth(cleanup)
 
281
        # in case the daemon already reached nirvana
 
282
        self.delayed_call = reactor.callLater(last_event_interval,
 
283
                                              check_last_event)
 
284
        return d
 
285
 
 
286
    def wait_for_nirvana(self, last_event_interval=5, verbose=False):
 
287
        """ Wait until the syncdaemon reachs nirvana. This is when there are:
 
288
            - the syncdaemon is connected
 
289
            - 0 transfers inprogress
 
290
            - no more events are fired in the event queue
 
291
        @param last_event_interval: the seconds to wait to determine that there
 
292
        is no more events in the queue and the daemon reached nirvana
 
293
        """
 
294
        self.log.debug('wait_for_nirvana')
 
295
        sd_client = DBusClient(self.bus, '/', DBUS_IFACE_SYNC_NAME)
 
296
        d = defer.Deferred()
 
297
        sd_client.call_method('wait_for_nirvana', last_event_interval,
 
298
                              reply_handler=d.callback,
 
299
                              error_handler=d.errback)
 
300
        return d
 
301
 
 
302
    def accept_share(self, share_id):
 
303
        """ Accept the share with id: share_id. """
 
304
        self.log.debug('accept_share(%s)', share_id)
 
305
        shares_client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
 
306
        d = self.wait_for_signal('ShareAnswerResponse',
 
307
                                 lambda info: info['volume_id']==share_id)
 
308
        shares_client.call_method('accept_share', share_id,
 
309
                                  reply_handler=lambda _: None,
 
310
                                  error_handler=d.errback)
 
311
        return d
 
312
 
 
313
    def reject_share(self, share_id):
 
314
        """ Reject the share with id: share_id. """
 
315
        self.log.debug('reject_share(%s)', share_id)
 
316
        shares_client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
 
317
        d = self.wait_for_signal('ShareAnswerResponse',
 
318
                                    lambda info: info['volume_id']==share_id)
 
319
        shares_client.call_method('reject_share', share_id,
 
320
                                  reply_handler=lambda _: None,
 
321
                                  error_handler=d.errback)
 
322
        return d
 
323
 
 
324
    def get_shares(self):
 
325
        """ Get the list of shares (accepted or not) """
 
326
        self.log.debug('get_shares')
 
327
        shares_client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
 
328
        d = defer.Deferred()
 
329
        def reply_handler(results):
 
330
            """ get_shares reply handler. """
 
331
            shares = []
 
332
            for result in results:
 
333
                shares.append(self._get_dict(result))
 
334
            self.log.debug('shares: %r', shares)
 
335
            d.callback(shares)
 
336
 
 
337
        shares_client.call_method('get_shares',
 
338
                                  reply_handler=reply_handler,
 
339
                                  error_handler=d.errback)
 
340
        return d
 
341
 
 
342
    def refresh_shares(self):
 
343
        """ Call refresh_shares method via DBus.
 
344
        Request a refresh of share list to the server.
 
345
        """
 
346
        self.log.debug('refresh_shares')
 
347
        shares_client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
 
348
        d = defer.Deferred()
 
349
        shares_client.call_method('refresh_shares',
 
350
                                  reply_handler=d.callback,
 
351
                                  error_handler=d.errback)
 
352
        return d
 
353
 
 
354
    def offer_share(self, path, username, name, access_level):
 
355
        """ Offer a share at the specified path to user with id: username. """
 
356
        self.log.debug('offer_share(%s, %s, %s, %s)',
 
357
                   path, username, name, access_level)
 
358
        shares_client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
 
359
        d = defer.Deferred()
 
360
        shares_client.call_method('create_share', path, username,
 
361
                                  name, access_level,
 
362
                                  reply_handler=d.callback,
 
363
                                  error_handler=d.errback)
 
364
        return d
 
365
 
 
366
    def list_shared(self):
 
367
        """ get the list of the shares "shared"/created/offered. """
 
368
        self.log.debug('list_shared')
 
369
        shares_client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
 
370
        d = defer.Deferred()
 
371
        def reply_handler(results):
 
372
            """ get_shares reply handler. """
 
373
            shares = []
 
374
            for result in results:
 
375
                shares.append(self._get_dict(result))
 
376
            self.log.debug('shared: %r', shares)
 
377
            d.callback(shares)
 
378
        shares_client.call_method('get_shared',
 
379
                                  reply_handler=reply_handler,
 
380
                                  error_handler=d.errback)
 
381
        return d
 
382
 
 
383
    def wait_for_signals(self, signal_ok, signal_error,
 
384
                         dbus_iface=DBUS_IFACE_FOLDERS_NAME):
 
385
        """wait for one of the specified DBus signals
 
386
        this returns a deferred
 
387
 
 
388
        @param signal_ok: this will fire the deferred's callback
 
389
        @param signal_error: the will fire the deferred's errback
 
390
        @param dbus_iface: the interface the signal belongs to
 
391
        """
 
392
 
 
393
        d = defer.Deferred()
 
394
        def signal_handler(*args, **kwargs):
 
395
            """Signal handler"""
 
396
            member = kwargs.get('member', None)
 
397
            if member == signal_ok:
 
398
                d.callback(args)
 
399
            elif member == signal_error:
 
400
                d.errback(ErrorSignal(signal_error, args))
 
401
 
 
402
        # register signal handlers for each kind of error
 
403
        match = self.bus.add_signal_receiver(
 
404
            signal_handler, member_keyword='member', dbus_interface=dbus_iface)
 
405
        def remove_signal_receiver(r):
 
406
            # cleanup the signal receivers
 
407
            self.bus.remove_signal_receiver(match, dbus_interface=dbus_iface)
 
408
            return r
 
409
 
 
410
        d.addBoth(remove_signal_receiver)
 
411
        return d
 
412
 
 
413
    def create_folder(self, path):
 
414
        """Create a user defined folder in the specified path."""
 
415
        self.log.debug('create_folder')
 
416
        folders_client = DBusClient(self.bus, '/folders', DBUS_IFACE_FOLDERS_NAME)
 
417
 
 
418
        d = self.wait_for_signals('FolderCreated', 'FolderCreateError')
 
419
        folders_client.call_method('create', path,
 
420
                                  reply_handler=lambda _: None,
 
421
                                  error_handler=d.errback)
 
422
        return d
 
423
 
 
424
    def delete_folder(self, folder_id):
 
425
        """Delete a user defined folder given its id."""
 
426
        self.log.debug('delete_folder')
 
427
        folders_client = DBusClient(self.bus, '/folders', DBUS_IFACE_FOLDERS_NAME)
 
428
 
 
429
        d = self.wait_for_signals('FolderDeleted', 'FolderDeleteError')
 
430
        folders_client.call_method('delete', folder_id,
 
431
                                  reply_handler=lambda _: None,
 
432
                                  error_handler=d.errback)
 
433
        return d
 
434
 
 
435
    def subscribe_folder(self, folder_id):
 
436
        """Subscribe to a user defined folder given its id."""
 
437
        self.log.debug('subscribe_folder')
 
438
        folders_client = DBusClient(self.bus, '/folders', DBUS_IFACE_FOLDERS_NAME)
 
439
 
 
440
        d = self.wait_for_signals('FolderSubscribed', 'FolderSubscribeError')
 
441
        folders_client.call_method('subscribe', folder_id,
 
442
                                  reply_handler=lambda _: None,
 
443
                                  error_handler=d.errback)
 
444
        return d
 
445
 
 
446
    def unsubscribe_folder(self, folder_id):
 
447
        """Unsubscribe from a user defined folder given its id."""
 
448
        self.log.debug('unsubscribe_folder')
 
449
        folders_client = DBusClient(self.bus, '/folders', DBUS_IFACE_FOLDERS_NAME)
 
450
 
 
451
        d = self.wait_for_signals('FolderUnSubscribed', 'FolderUnSubscribeError')
 
452
        folders_client.call_method('unsubscribe', folder_id,
 
453
                                  reply_handler=lambda _: None,
 
454
                                  error_handler=d.errback)
 
455
        return d
 
456
 
 
457
    def get_folders(self):
 
458
        """Return the list of folders (a list of dicts)."""
 
459
        self.log.debug('get_folders')
 
460
        folders_client = DBusClient(self.bus, '/folders', DBUS_IFACE_FOLDERS_NAME)
 
461
        d = defer.Deferred()
 
462
        def reply_handler(results):
 
463
            """ get_folders reply handler. """
 
464
            folders = []
 
465
            for result in results:
 
466
                folders.append(self._get_dict(result))
 
467
            self.log.debug('folders: %r', folders)
 
468
            d.callback(folders)
 
469
        folders_client.call_method('get_folders',
 
470
                                  reply_handler=reply_handler,
 
471
                                  error_handler=d.errback)
 
472
        return d
 
473
 
 
474
    def get_folder_info(self, path):
 
475
        """Call the get_info method for a UDF path."""
 
476
        self.log.debug('get_info')
 
477
        client = DBusClient(self.bus, '/folders', DBUS_IFACE_FOLDERS_NAME)
 
478
        d = defer.Deferred()
 
479
        client.call_method('get_info', path,
 
480
                           reply_handler=d.callback,
 
481
                           error_handler=d.errback)
 
482
        return d
 
483
 
 
484
    def get_metadata(self, path):
 
485
        """ calls the exposed mtehod FileSystem.get_metadata using DBus. """
 
486
        self.log.debug('get_metadata(%s)', path)
 
487
        fs_client = DBusClient(self.bus, '/filesystem', DBUS_IFACE_FS_NAME)
 
488
        d = defer.Deferred()
 
489
        fs_client.call_method('get_metadata', path,
 
490
                              reply_handler=d.callback,
 
491
                              error_handler=d.errback)
 
492
        return d
 
493
 
 
494
    @defer.inlineCallbacks
 
495
    def change_public_access(self, path, is_public):
 
496
        """Change the public access for a given path."""
 
497
        self.log.debug('change_public_access(%s)', path)
 
498
        fs_client = DBusClient(self.bus, '/filesystem', DBUS_IFACE_FS_NAME)
 
499
        d = defer.Deferred()
 
500
        fs_client.call_method('get_metadata', path,
 
501
                              reply_handler=d.callback,
 
502
                              error_handler=d.errback)
 
503
        metadata = yield d
 
504
 
 
505
        pf_client = DBusClient(
 
506
            self.bus, '/publicfiles', DBUS_IFACE_PUBLIC_FILES_NAME)
 
507
        d = self.wait_for_signals(
 
508
            'PublicAccessChanged', 'PublicAccessChangeError',
 
509
            dbus_iface=DBUS_IFACE_PUBLIC_FILES_NAME)
 
510
        pf_client.call_method('change_public_access', metadata['share_id'],
 
511
                              metadata['node_id'], is_public,
 
512
                              reply_handler=lambda _: None,
 
513
                              error_handler=d.errback)
 
514
        (file_info,) = yield d
 
515
        defer.returnValue(file_info)
 
516
 
 
517
    def quit(self):
 
518
        """quit the syncdaemon"""
 
519
        self.log.debug('quit')
 
520
        # avoid triggering dbus activation while calling quit
 
521
        if not is_running(self.bus):
 
522
            return defer.succeed(None)
 
523
        sd_client = DBusClient(self.bus, '/', DBUS_IFACE_SYNC_NAME)
 
524
        d = defer.Deferred()
 
525
        sd_client.call_method('quit',
 
526
                           reply_handler=d.callback,
 
527
                           error_handler=d.errback)
 
528
        def check(r):
 
529
            """wait 0.5 sec to return, to allow syncdaemon to shutdown"""
 
530
            d1 = defer.Deferred()
 
531
            reactor.callLater(0.5, d1.callback, r)
 
532
            return d1
 
533
        d.addCallback(check)
 
534
        return d
 
535
 
 
536
    def wait_for_signal(self, signal_name, filter):
 
537
        """wait for the specified DBus signal (the first received).
 
538
        'filter' is used to fire the deferred callback.
 
539
 
 
540
        @param signal_name: the signal name
 
541
        @param filter: a callable to filter signal, must return True
 
542
        """
 
543
        d = defer.Deferred()
 
544
        def signal_handler(result):
 
545
            """handle the signals and fires the call/errback"""
 
546
            try:
 
547
                if filter(result) and not d.called:
 
548
                    d.callback(result)
 
549
                # catch all exceptions, pylint: disable-msg=W0703
 
550
            except Exception, e:
 
551
                d.errback(Failure(e))
 
552
 
 
553
        match = self.bus.add_signal_receiver(signal_handler,
 
554
                                             signal_name=signal_name)
 
555
        def cleanup(result):
 
556
            """remove the signal receiver from the bus."""
 
557
            self.bus.remove_signal_receiver(match)
 
558
            return result
 
559
        d.addCallback(cleanup)
 
560
        return d
 
561
 
 
562
    def connect(self):
 
563
        """Connect syncdaemon"""
 
564
        sd_client = DBusClient(self.bus, '/', DBUS_IFACE_SYNC_NAME)
 
565
        d = defer.Deferred()
 
566
        sd_client.call_method('connect',
 
567
                              reply_handler=d.callback,
 
568
                              error_handler=d.errback)
 
569
        return d
 
570
 
 
571
    def disconnect(self):
 
572
        """Disconnect syncdaemon"""
 
573
        sd_client = DBusClient(self.bus, '/', DBUS_IFACE_SYNC_NAME)
 
574
        d = defer.Deferred()
 
575
        sd_client.call_method('disconnect',
 
576
                              reply_handler=d.callback,
 
577
                              error_handler=d.errback)
 
578
        return d
 
579
 
 
580
    def get_status(self):
 
581
        """Get the current_status dict"""
 
582
        d = defer.Deferred()
 
583
        status_client = DBusClient(self.bus, '/status',
 
584
                                   DBUS_IFACE_STATUS_NAME)
 
585
        def reply_handler(status):
 
586
            """The reply handler"""
 
587
            state_dict = self._get_dict(status)
 
588
            state_dict['is_connected'] = bool(state_dict['is_connected'])
 
589
            state_dict['is_online'] = bool(state_dict['is_online'])
 
590
            state_dict['is_error'] = bool(state_dict['is_error'])
 
591
            d.callback(state_dict)
 
592
        status_client.call_method('current_status',
 
593
                                  reply_handler=reply_handler,
 
594
                                  error_handler=d.errback)
 
595
        return d
 
596
 
 
597
    def waiting_metadata(self):
 
598
        """Return a description of the waiting metadata queue elements."""
 
599
        d = defer.Deferred()
 
600
        status_client = DBusClient(self.bus, '/status',
 
601
                                   DBUS_IFACE_STATUS_NAME)
 
602
        status_client.call_method('waiting_metadata',
 
603
                                  reply_handler=d.callback,
 
604
                                  error_handler=d.errback)
 
605
        return d
 
606
 
 
607
    def waiting_content(self):
 
608
        """Returns the waiting content queue elements."""
 
609
        d = defer.Deferred()
 
610
        status_client = DBusClient(self.bus, '/status',
 
611
                                   DBUS_IFACE_STATUS_NAME)
 
612
        status_client.call_method('waiting_content',
 
613
                                  reply_handler=d.callback,
 
614
                                  error_handler=d.errback)
 
615
        return d
 
616
 
 
617
    def schedule_next(self, share_id, node_id):
 
618
        """Make the command on the given share_id and node_id be next in the
 
619
        queue of waiting commands.
 
620
        """
 
621
        d = defer.Deferred()
 
622
        status_client = DBusClient(self.bus, '/status',
 
623
                                   DBUS_IFACE_STATUS_NAME)
 
624
        status_client.call_method('schedule_next', share_id, node_id,
 
625
                                  reply_handler=d.callback,
 
626
                                  error_handler=d.errback)
 
627
        return d
 
628
 
 
629
    def start(self):
 
630
        """Start syncdaemon using the StartServiceByName method
 
631
        if it's not running.
 
632
        """
 
633
        if not is_running(self.bus):
 
634
            wait_d = self.wait_for_signal('StatusChanged', lambda x: x)
 
635
            d = defer.Deferred()
 
636
            bus_client = DBusClient(self.bus, dbus.bus.BUS_DAEMON_PATH,
 
637
                                    dbus.bus.BUS_DAEMON_IFACE,
 
638
                                    dbus.bus.BUS_DAEMON_NAME)
 
639
            bus_client.call_method('StartServiceByName',
 
640
                                   DBUS_IFACE_NAME, 0,
 
641
                                   signature='su',
 
642
                                   reply_handler=d.callback,
 
643
                                   error_handler=d.errback)
 
644
            d.addCallback(lambda _: wait_d)
 
645
            return d
 
646
        else:
 
647
            return defer.succeed(None)
 
648
 
 
649
    def get_throttling_limits(self):
 
650
        """Return a dict with the read and write limits."""
 
651
        d = defer.Deferred()
 
652
        config_client = DBusClient(self.bus, '/config',
 
653
                                   DBUS_IFACE_CONFIG_NAME)
 
654
        config_client.call_method('get_throttling_limits',
 
655
                                  reply_handler=d.callback,
 
656
                                  error_handler=d.errback)
 
657
        return d
 
658
 
 
659
    def set_throttling_limits(self, read_limit, write_limit):
 
660
        """Set the read and write limits."""
 
661
        d = defer.Deferred()
 
662
        config_client = DBusClient(self.bus, '/config',
 
663
                                   DBUS_IFACE_CONFIG_NAME)
 
664
        config_client.call_method('set_throttling_limits',
 
665
                                  read_limit, write_limit,
 
666
                                  reply_handler=d.callback,
 
667
                                  error_handler=d.errback)
 
668
        return d
 
669
 
 
670
    def is_throttling_enabled(self):
 
671
        """Check if throttling is enabled."""
 
672
        d = defer.Deferred()
 
673
        config_client = DBusClient(self.bus, '/config',
 
674
                                   DBUS_IFACE_CONFIG_NAME)
 
675
        config_client.call_method('bandwidth_throttling_enabled',
 
676
                                  reply_handler=d.callback,
 
677
                                  error_handler=d.errback)
 
678
        return d
 
679
 
 
680
 
 
681
    def enable_throttling(self, enabled):
 
682
        """Enable/disable throttling."""
 
683
        d = defer.Deferred()
 
684
        config_client = DBusClient(self.bus, '/config',
 
685
                                   DBUS_IFACE_CONFIG_NAME)
 
686
        if enabled:
 
687
            meth = 'enable_bandwidth_throttling'
 
688
        else:
 
689
            meth = 'disable_bandwidth_throttling'
 
690
        config_client.call_method(meth,
 
691
                                  reply_handler=d.callback,
 
692
                                  error_handler=d.errback)
 
693
        return d
 
694
 
 
695
    def is_files_sync_enabled(self):
 
696
        """Check if files sync is enabled."""
 
697
        self.log.debug('is_files_sync_enabled')
 
698
        return get_user_config().get_files_sync_enabled()
 
699
 
 
700
    def enable_files_sync(self, enabled):
 
701
        """Enable/disable files sync."""
 
702
        self.log.debug('enable_files_sync %d', enabled)
 
703
        config = get_user_config()
 
704
        if config.get_files_sync_enabled():
 
705
            d = defer.Deferred()
 
706
            config_client = DBusClient(self.bus, '/config',
 
707
                                       DBUS_IFACE_CONFIG_NAME)
 
708
            config_client.call_method('set_files_sync_enabled',
 
709
                                      enabled,
 
710
                                      reply_handler=d.callback,
 
711
                                      error_handler=d.errback)
 
712
            config.set_files_sync_enabled(False)
 
713
            return d
 
714
        else:
 
715
            if enabled:
 
716
                config.set_files_sync_enabled(True)
 
717
                config.save()
 
718
                self.start()
 
719
            return defer.succeed(None)
 
720
 
 
721
    def refresh_volumes(self):
 
722
        """Call refresh_volumes method via DBus.
 
723
 
 
724
        Request the volumes list to the server.
 
725
        """
 
726
        self.log.debug('refresh_volumes')
 
727
        shares_client = DBusClient(self.bus, '/folders', DBUS_IFACE_FOLDERS_NAME)
 
728
        d = defer.Deferred()
 
729
        shares_client.call_method('refresh_volumes',
 
730
                                  reply_handler=d.callback,
 
731
                                  error_handler=d.errback)
 
732
        return d
 
733
 
 
734
    def rescan_from_scratch(self, volume_id):
 
735
        """Call rescan_from_scratch via DBus.
 
736
 
 
737
        Request a rescan from scratch for volume_id.
 
738
        """
 
739
        self.log.debug('rescan_from_scratch %r', volume_id)
 
740
        shares_client = DBusClient(self.bus, '/', DBUS_IFACE_SYNC_NAME)
 
741
        d = defer.Deferred()
 
742
        shares_client.call_method('rescan_from_scratch', volume_id,
 
743
                                  reply_handler=d.callback,
 
744
                                  error_handler=d.errback)
 
745
        return d
 
746
 
 
747
    def get_dirty_nodes(self):
 
748
        """Call get_dirty_nodes via DBus.
 
749
 
 
750
        Return the list of dirty nodes.
 
751
        """
 
752
        self.log.debug('get_dirty_nodes')
 
753
        fs_client = DBusClient(self.bus, '/filesystem', DBUS_IFACE_FS_NAME)
 
754
        d = defer.Deferred()
 
755
        fs_client.call_method('get_dirty_nodes',
 
756
                                  reply_handler=d.callback,
 
757
                                  error_handler=d.errback)
 
758
        return d
 
759
 
 
760
 
 
761
# callbacks used by u1sdtool script
 
762
 
 
763
def show_shared(shares, out):
 
764
    """ Callback that prints the list of shared shares"""
 
765
    if len(shares) == 0:
 
766
        out.write("No shared\n")
 
767
    else:
 
768
        out.write("Shared list:\n")
 
769
    for share in shares:
 
770
        msg_template = '  id=%s name=%s accepted=%s ' + \
 
771
                'access_level=%s to=%s path=%s\n'
 
772
        out.write(msg_template % (share['volume_id'], share['name'],
 
773
                                  bool(share['accepted']), share['access_level'],
 
774
                                  share['other_username'],
 
775
                                  share['path']))
 
776
 
 
777
 
 
778
def show_folders(folders, out):
 
779
    """ Callback that prints the list of user defined folders"""
 
780
    if len(folders) == 0:
 
781
        out.write("No folders\n")
 
782
    else:
 
783
        out.write("Folder list:\n")
 
784
    for folder in folders:
 
785
        msg_template = '  id=%s subscribed=%s path=%s\n'
 
786
        out.write(msg_template % (folder['volume_id'],
 
787
                                  bool(folder['subscribed']),
 
788
                                  folder['path']))
 
789
 
 
790
 
 
791
def show_error(error, out):
 
792
    """Format an error when things go wrong"""
 
793
    try:
 
794
        raise error.value
 
795
    except ErrorSignal:
 
796
        signal, (args, retval) = error.value.args
 
797
        msg_template = u"%s: %s (%s)\n"
 
798
        fmtd_args = u", ".join("%s=%s"%(k, v) for k, v in args.items())
 
799
        out.write( msg_template % (signal, retval, fmtd_args) )
 
800
 
 
801
 
 
802
def show_shares(shares, out):
 
803
    """ Callback that print the list of shares"""
 
804
    if len(shares) == 0:
 
805
        out.write("No shares\n")
 
806
    else:
 
807
        out.write("Shares list:\n")
 
808
    for share in shares:
 
809
        out.write(' id=%s name=%s accepted=%s access_level=%s from=%s\n' % \
 
810
                  (share['volume_id'], share['name'], bool(share['accepted']),
 
811
                   share['access_level'], share['other_username']))
 
812
 
 
813
 
 
814
def show_path_info(result, path, out):
 
815
    """ print the path info to stdout"""
 
816
    out_encoding = out.encoding
 
817
    if out_encoding is None:
 
818
        out_encoding = 'utf-8'
 
819
    out.write(" File: %s\n" % path.decode(out_encoding, 'replace'))
 
820
    keys = list(result.keys())
 
821
    keys.sort()
 
822
    for key in keys:
 
823
        out.write("  %s: %s\n" % (key, result[key]))
 
824
 
 
825
 
 
826
def show_uploads(uploads, out):
 
827
    """ print the uploads to stdout"""
 
828
    if uploads:
 
829
        out.write("Current uploads:\n")
 
830
    else:
 
831
        out.write("Current uploads: 0\n")
 
832
    for upload in uploads:
 
833
        out.write("  path: %s\n" % upload['path'])
 
834
        out.write("    deflated size: %s\n" % \
 
835
                  upload.get('deflated_size', 'N/A'))
 
836
        out.write("    bytes written: %s\n" % upload['n_bytes_written'])
 
837
 
 
838
 
 
839
def show_downloads(downloads, out):
 
840
    """ print the downloas to stdout"""
 
841
    if downloads:
 
842
        out.write("Current downloads:\n")
 
843
    else:
 
844
        out.write("Current downloads: 0\n")
 
845
    for download in downloads:
 
846
        out.write("  path: %s\n" % download['path'])
 
847
        out.write("    deflated size: %s\n" % \
 
848
                  download.get('deflated_size', 'N/A'))
 
849
        out.write("    bytes read: %s\n" % download['n_bytes_read'])
 
850
 
 
851
 
 
852
def show_state(state_dict, out):
 
853
    """Print the state to out."""
 
854
    out.write("State: %s\n" % state_dict.pop('name'))
 
855
    for k, v in sorted(state_dict.items()):
 
856
        out.write("    %s: %s\n" % (k, v))
 
857
    out.write("\n")
 
858
 
 
859
 
 
860
def show_waiting_metadata(waiting_ops, out):
 
861
    """Print the waiting_metadata result.
 
862
 
 
863
    We receive an unordered dict, but always try to show first the
 
864
    share_id, then the node_id, then the path, and the rest in
 
865
    alphabetical order.
 
866
    """
 
867
    for op_name, op_data in waiting_ops:
 
868
        out.write(' ' + op_name)
 
869
        if not op_data:
 
870
            out.write('\n')
 
871
            continue
 
872
 
 
873
        # custom
 
874
        attributes = []
 
875
        for attr in ('share_id', 'node_id', 'path'):
 
876
            if attr in op_data:
 
877
                attributes.append(u"%s='%s'" % (attr, op_data.pop(attr)))
 
878
 
 
879
        # the rest, ordered
 
880
        for attr in sorted(op_data):
 
881
            attributes.append(u"%s='%s'" % (attr, op_data[attr]))
 
882
 
 
883
        out.write("(%s)\n" % (u', '.join(attributes),))
 
884
 
 
885
 
 
886
def show_waiting_content(waiting_ops, out):
 
887
    """Print the waiting_content result"""
 
888
    value_tpl = "operation='%(operation)s' node_id='%(node)s' " + \
 
889
            "share_id='%(share)s' path='%(path)s'"
 
890
    for value in waiting_ops:
 
891
        str_value = value_tpl % value
 
892
        out.write("%s\n" % str_value)
 
893
 
 
894
 
 
895
def show_public_file_info(file_info, out):
 
896
    """Print the public access information for a file."""
 
897
    if file_info['is_public']:
 
898
        out.write("File is published at %s\n" % file_info['public_url'])
 
899
    else:
 
900
        out.write("File is not published\n")
 
901
 
 
902
 
 
903
def show_dirty_nodes(nodes, out):
 
904
    """Print the list of dirty nodes."""
 
905
    if not nodes:
 
906
        out.write(" No dirty nodes.\n")
 
907
        return
 
908
    out_encoding = out.encoding
 
909
    if out_encoding is None:
 
910
        out_encoding = 'utf-8'
 
911
    node_line_tpl = "mdid: %(mdid)s volume_id: %(share_id)s " + \
 
912
            "node_id: %(node_id)s is_dir: %(is_dir)s path: %(path)s\n"
 
913
    out.write(" Dirty nodes:\n")
 
914
    for node in nodes:
 
915
        node['path'] = node['path'].decode(out_encoding, 'replace')
 
916
        out.write(node_line_tpl % node)