~ubuntuone-control-tower/ubuntuone-client/trunk

« back to all changes in this revision

Viewing changes to canonical/ubuntuone/storage/u1sync/client.py

  • Committer: Rodney Dawes
  • Date: 2009-05-12 13:36:05 UTC
  • Revision ID: rodney.dawes@canonical.com-20090512133605-6aqs6e8xnnmp5u1p
        Import the code
        Hook up lint/trial tests in setup.py
        Use icontool now instead of including the render script
        Add missing python-gnome2-desktop to package dependencies
        Update debian/rules to fix the icon cache issue

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# canonical.ubuntuone.storage.u1sync.client
 
2
#
 
3
# Client/protocol end of u1sync
 
4
#
 
5
# Author: Lucio Torre <lucio.torre@canonical.com>
 
6
# Author: Tim Cole <tim.cole@canonical.com>
 
7
#
 
8
# Copyright 2009 Canonical Ltd.
 
9
#
 
10
# This program is free software: you can redistribute it and/or modify it
 
11
# under the terms of the GNU General Public License version 3, as published
 
12
# by the Free Software Foundation.
 
13
#
 
14
# This program is distributed in the hope that it will be useful, but
 
15
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
16
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
17
# PURPOSE.  See the GNU General Public License for more details.
 
18
#
 
19
# You should have received a copy of the GNU General Public License along
 
20
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
21
"""Pretty API for protocol client."""
 
22
 
 
23
from __future__ import with_statement
 
24
 
 
25
import os
 
26
import sys
 
27
import shutil
 
28
from Queue import Queue, Empty
 
29
from threading import Thread, Lock
 
30
import zlib
 
31
import urlparse
 
32
import ConfigParser
 
33
from cStringIO import StringIO
 
34
 
 
35
from twisted.internet import reactor, defer, ssl
 
36
from twisted.internet.defer import inlineCallbacks, returnValue
 
37
from canonical.ubuntuone.storage.protocol.hash import crc32
 
38
from canonical.ubuntuone.oauthdesktop.config import get_config \
 
39
                                                 as get_oauth_config
 
40
from canonical.ubuntuone.oauthdesktop.auth import AuthorisationClient
 
41
from canonical.ubuntuone.storage.u1sync.genericmerge import MergeNode
 
42
from canonical.ubuntuone.storage.u1sync.utils import should_sync
 
43
 
 
44
CONSUMER_KEY = "ubuntuone"
 
45
 
 
46
from canonical.ubuntuone.storage.protocol.oauth import OAuthConsumer
 
47
from canonical.ubuntuone.storage.protocol.client import (
 
48
    StorageClientFactory, StorageClient)
 
49
from canonical.ubuntuone.storage.protocol import request
 
50
from canonical.ubuntuone.storage.protocol.dircontent_pb2 import \
 
51
    DirectoryContent, DIRECTORY
 
52
import uuid
 
53
 
 
54
def share_str(share_uuid):
    """Render a share UUID in the form passed to the protocol.

    @param share_uuid: a share UUID, or None for the user's own volume
    @return: request.ROOT when share_uuid is None, otherwise
        str(share_uuid)
    """
    if share_uuid is None:
        return request.ROOT
    return str(share_uuid)
 
57
 
 
58
 
 
59
class SyncStorageClient(StorageClient):
    """Storage protocol which reports connection events via its factory.

    Relies on the factory exposing a ``current_protocol`` attribute and an
    ``observer`` offering ``connected()`` and ``disconnected(reason)``.
    """

    def connectionMade(self):
        """Become the factory's current protocol and notify the observer.

        If a different protocol is still registered on the factory, its
        transport is asked to close before this one takes over.
        """
        StorageClient.connectionMade(self)
        factory = self.factory
        previous = factory.current_protocol
        if previous not in (None, self):
            previous.transport.loseConnection()
        factory.current_protocol = self
        factory.observer.connected()

    def connectionLost(self, reason=None):
        """Report the lost connection if we are still the current protocol.

        @param reason: passed through unchanged to observer.disconnected()
        """
        factory = self.factory
        if factory.current_protocol is not self:
            # Some other protocol is current (or none is), so this loss
            # is not reported to the observer.
            return
        factory.current_protocol = None
        factory.observer.disconnected(reason)
 
75
 
 
76
 
 
77
class SyncClientFactory(StorageClientFactory):
    """Protocol factory that relays connection events to an observer."""
    # no init: pylint: disable-msg=W0232

    protocol = SyncStorageClient

    def __init__(self, observer):
        """Remember the observer; no protocol is current to begin with.

        @param observer: object notified by this factory through
            connection_failed(reason); the protocol class above also calls
            its connected() and disconnected(reason) methods
        """
        self.current_protocol = None
        self.observer = observer

    def clientConnectionFailed(self, connector, reason):
        """Forget the current protocol and report the failed attempt.

        @param connector: unused here
        @param reason: passed through to observer.connection_failed()
        """
        self.current_protocol = None
        self.observer.connection_failed(reason)
 
92
 
 
93
 
 
94
class UnsupportedOperationError(Exception):
    """Raised when the protocol cannot perform the requested operation.

    In this module it is raised by Client.create_symlink.
    """
 
96
 
 
97
 
 
98
class ConnectionError(Exception):
    """Raised when a connection attempt does not end up connected."""
 
100
 
 
101
 
 
102
class AuthenticationError(Exception):
    """Raised when an OAuth token cannot be obtained or is rejected."""
 
104
 
 
105
 
 
106
class NoSuchShareError(Exception):
    """Raised when no incoming share matches the requested criteria."""
 
108
 
 
109
 
 
110
class Client(object):
 
111
    """U1 storage client facade."""
 
112
 
 
113
    def __init__(self, realm):
 
114
        """Create the instance."""
 
115
 
 
116
        self.thread = Thread(target=self._run)
 
117
        self.thread.setDaemon(True)
 
118
        self.factory = SyncClientFactory(self)
 
119
 
 
120
        self._status_lock = Lock()
 
121
        self._status = "disconnected"
 
122
        self._status_reason = None
 
123
        self._status_waiting = []
 
124
 
 
125
        self.realm = realm
 
126
 
 
127
        oauth_config = get_oauth_config()
 
128
        if oauth_config.has_section(realm):
 
129
            config_section = realm
 
130
        elif self.realm.startswith("http://localhost") and \
 
131
             oauth_config.has_section("http://localhost"):
 
132
            config_section = "http://localhost"
 
133
        else:
 
134
            config_section = "default"
 
135
 
 
136
        def get_oauth_option(option):
 
137
            """Retrieves an option from oauth config."""
 
138
            try:
 
139
                return oauth_config.get(config_section, option)
 
140
            except ConfigParser.NoOptionError:
 
141
                return oauth_config.get("default", option)
 
142
 
 
143
        def get_oauth_url(option):
 
144
            """Retrieves an absolutized URL from the OAuth config."""
 
145
            suffix = get_oauth_option(option)
 
146
            return urlparse.urljoin(realm, suffix)
 
147
 
 
148
        self.consumer_key = CONSUMER_KEY
 
149
        self.consumer_secret = get_oauth_option("consumer_secret")
 
150
 
 
151
        self.request_token_url = get_oauth_url("request_token_url")
 
152
        self.user_authorisation_url = get_oauth_url("user_authorisation_url")
 
153
        self.access_token_url = get_oauth_url("access_token_url")
 
154
 
 
155
    def obtain_oauth_token(self, create_token):
 
156
        """Obtains an oauth token, optionally creating one if requried."""
 
157
        token_result = Queue()
 
158
 
 
159
        def have_token(token):
 
160
            """When a token is available."""
 
161
            token_result.put(token)
 
162
 
 
163
        def no_token():
 
164
            """When no token is available."""
 
165
            token_result.put(None)
 
166
 
 
167
        oauth_client = AuthorisationClient(realm=self.realm,
 
168
                                           request_token_url=
 
169
                                           self.request_token_url,
 
170
                                           user_authorisation_url=
 
171
                                           self.user_authorisation_url,
 
172
                                           access_token_url=
 
173
                                           self.access_token_url,
 
174
                                           consumer_key=self.consumer_key,
 
175
                                           consumer_secret=
 
176
                                           self.consumer_secret,
 
177
                                           callback_parent=have_token,
 
178
                                           callback_denied=no_token,
 
179
                                           do_login=create_token)
 
180
 
 
181
        def _obtain_token():
 
182
            """Obtains or creates a token."""
 
183
            if create_token:
 
184
                oauth_client.clear_token()
 
185
            oauth_client.ensure_access_token()
 
186
 
 
187
        reactor.callFromThread(_obtain_token)
 
188
        token = token_result.get()
 
189
        if token is None:
 
190
            raise AuthenticationError("Unable to obtain OAuth token.")
 
191
        return token
 
192
 
 
193
    def _change_status(self, status, reason=None):
 
194
        """Changes the client status.  Usually called from the reactor
 
195
        thread.
 
196
 
 
197
        """
 
198
        with self._status_lock:
 
199
            self._status = status
 
200
            self._status_reason = reason
 
201
            waiting = self._status_waiting
 
202
            if len(waiting) > 0:
 
203
                self._status_waiting = []
 
204
                for waiter in waiting:
 
205
                    waiter.put((status, reason))
 
206
 
 
207
    def _await_status_not(self, *ignore_statuses):
 
208
        """Blocks until the client status changes, returning the new status.
 
209
        Should never be called from the reactor thread.
 
210
 
 
211
        """
 
212
        with self._status_lock:
 
213
            status = self._status
 
214
            reason = self._status_reason
 
215
            while status in ignore_statuses:
 
216
                waiter = Queue()
 
217
                self._status_waiting.append(waiter)
 
218
                self._status_lock.release()
 
219
                try:
 
220
                    status, reason = waiter.get()
 
221
                finally:
 
222
                    self._status_lock.acquire()
 
223
            return (status, reason)
 
224
 
 
225
    def connection_failed(self, reason):
 
226
        """Notification that connection failed."""
 
227
        self._change_status("disconnected", reason)
 
228
 
 
229
    def connected(self):
 
230
        """Notification that connection succeeded."""
 
231
        self._change_status("connected")
 
232
 
 
233
    def disconnected(self, reason):
 
234
        """Notification that we were disconnected."""
 
235
        self._change_status("disconnected", reason)
 
236
 
 
237
    def _run(self):
 
238
        """Run the reactor in bg."""
 
239
        reactor.run(installSignalHandlers=False)
 
240
 
 
241
    def start(self):
 
242
        """Start the reactor thread."""
 
243
        self.thread.start()
 
244
 
 
245
    def stop(self):
 
246
        """Shut down the reactor."""
 
247
        reactor.callWhenRunning(reactor.stop)
 
248
        self.thread.join(1.0)
 
249
 
 
250
    def defer_from_thread(self, function, *args, **kwargs):
 
251
        """Do twisted defer magic to get results and show exceptions."""
 
252
 
 
253
        queue = Queue()
 
254
        def runner():
 
255
            """inner."""
 
256
            # we do want to catch all
 
257
            # no init: pylint: disable-msg=W0703
 
258
            try:
 
259
                d = function(*args, **kwargs)
 
260
                if isinstance(d, defer.Deferred):
 
261
                    d.addCallbacks(lambda r: queue.put((r, None, None)),
 
262
                                   lambda f: queue.put((None, None, f)))
 
263
                else:
 
264
                    queue.put((d, None, None))
 
265
            except Exception, e:
 
266
                queue.put((None, sys.exc_info(), None))
 
267
 
 
268
        reactor.callFromThread(runner)
 
269
        while True:
 
270
            try:
 
271
                # poll with a timeout so that interrupts are still serviced
 
272
                result, exc_info, failure = queue.get(True, 1)
 
273
                break
 
274
            except Empty: # pylint: disable-msg=W0704
 
275
                pass
 
276
        if exc_info:
 
277
            raise exc_info[1], None, exc_info[2]
 
278
        elif failure:
 
279
            failure.raiseException()
 
280
        else:
 
281
            return result
 
282
 
 
283
    def connect(self, host, port):
 
284
        """Connect to host/port."""
 
285
        def _connect():
 
286
            """Deferred part."""
 
287
            reactor.connectTCP(host, port, self.factory)
 
288
        self._connect_inner(_connect)
 
289
 
 
290
    def connect_ssl(self, host, port):
 
291
        """Connect to host/port using ssl."""
 
292
        def _connect():
 
293
            """deferred part."""
 
294
            reactor.connectSSL(host, port, self.factory,
 
295
                               ssl.ClientContextFactory())
 
296
        self._connect_inner(_connect)
 
297
 
 
298
    def _connect_inner(self, _connect):
 
299
        """Helper function for connecting."""
 
300
        self._change_status("connecting")
 
301
        reactor.callFromThread(_connect)
 
302
        status, reason = self._await_status_not("connecting")
 
303
        if status != "connected":
 
304
            raise ConnectionError(reason.value)
 
305
 
 
306
    def disconnect(self):
 
307
        """Disconnect."""
 
308
        if self.factory.current_protocol is not None:
 
309
            reactor.callFromThread(
 
310
                self.factory.current_protocol.transport.loseConnection)
 
311
        self._await_status_not("connecting", "connected", "authenticated")
 
312
 
 
313
    def oauth_from_token(self, token):
 
314
        """Perform OAuth authorisation using an existing token."""
 
315
 
 
316
        consumer = OAuthConsumer(self.consumer_key, self.consumer_secret)
 
317
 
 
318
        def _auth_successful(value):
 
319
            """Callback for successful auth.  Changes status to
 
320
            authenticated."""
 
321
            self._change_status("authenticated")
 
322
            return value
 
323
 
 
324
        def _auth_failed(value):
 
325
            """Callback for failed auth.  Disconnects."""
 
326
            self.factory.current_protocol.transport.loseConnection()
 
327
            return value
 
328
 
 
329
        def _wrapped_authenticate():
 
330
            """Wrapped authenticate."""
 
331
            d = self.factory.current_protocol.oauth_authenticate(consumer,
 
332
                                                                 token)
 
333
            d.addCallbacks(_auth_successful, _auth_failed)
 
334
            return d
 
335
 
 
336
        try:
 
337
            self.defer_from_thread(_wrapped_authenticate)
 
338
        except request.StorageProtocolError, e:
 
339
            raise AuthenticationError(e)
 
340
        status, reason = self._await_status_not("connected")
 
341
        if status != "authenticated":
 
342
            raise AuthenticationError(reason.value)
 
343
 
 
344
    def get_root_info(self, share_uuid):
 
345
        """Returns the UUID of the applicable share root."""
 
346
        if share_uuid is None:
 
347
            _get_root = self.factory.current_protocol.get_root
 
348
            root = self.defer_from_thread(_get_root)
 
349
            return (uuid.UUID(root), True)
 
350
        else:
 
351
            str_share_uuid = str(share_uuid)
 
352
            share = self._match_share(lambda s: str(s.id) == str_share_uuid)
 
353
            return (uuid.UUID(str(share.subtree)),
 
354
                    share.access_level == "Modify")
 
355
 
 
356
    def find_share(self, share_spec):
 
357
        """Finds a share matching the given UUID.  Looks at both share UUIDs
 
358
        and root node UUIDs."""
 
359
        share = self._match_share(lambda s: str(s.id) == share_spec or \
 
360
                                            str(s.subtree) == share_spec)
 
361
        return uuid.UUID(str(share.id))
 
362
 
 
363
    def _match_share(self, predicate):
 
364
        """Finds a share matching the given predicate."""
 
365
        _list_shares = self.factory.current_protocol.list_shares
 
366
        r = self.defer_from_thread(_list_shares)
 
367
        for share in r.shares:
 
368
            if predicate(share) and share.direction == "to_me":
 
369
                return share
 
370
        raise NoSuchShareError()
 
371
 
 
372
    def build_tree(self, share_uuid, root_uuid):
 
373
        """Builds and returns a tree representing the metadata for the given
 
374
        subtree in the given share.
 
375
 
 
376
        @param share_uuid: the share UUID or None for the user's volume
 
377
        @param root_uuid: the root UUID of the subtree (must be a directory)
 
378
        @return: a MergeNode tree
 
379
 
 
380
        """
 
381
        root = MergeNode(node_type=DIRECTORY, uuid=root_uuid)
 
382
 
 
383
        @inlineCallbacks
 
384
        def _get_root_content_hash():
 
385
            """Obtain the content hash for the root node."""
 
386
            result = yield self._get_node_hashes(share_uuid, [root_uuid])
 
387
            returnValue(result.get(root_uuid, None))
 
388
 
 
389
        root.content_hash = self.defer_from_thread(_get_root_content_hash)
 
390
 
 
391
        @inlineCallbacks
 
392
        def _get_children(parent_uuid, parent_content_hash):
 
393
            """Obtain a sequence of MergeNodes corresponding to a node's
 
394
            immediate children.
 
395
 
 
396
            """
 
397
            entries = yield self._get_raw_dir_entries(share_uuid,
 
398
                                                      parent_uuid,
 
399
                                                      parent_content_hash)
 
400
            children = {}
 
401
            for entry in entries:
 
402
                if should_sync(entry.name):
 
403
                    child = MergeNode(node_type=entry.node_type,
 
404
                                      uuid=uuid.UUID(entry.node))
 
405
                    children[entry.name] = child
 
406
 
 
407
            child_uuids = [child.uuid for child in children.itervalues()]
 
408
            content_hashes = yield self._get_node_hashes(share_uuid,
 
409
                                                         child_uuids)
 
410
            for child in children.itervalues():
 
411
                child.content_hash = content_hashes.get(child.uuid, None)
 
412
 
 
413
            returnValue(children)
 
414
 
 
415
        need_children = [root]
 
416
        while need_children:
 
417
            node = need_children.pop()
 
418
            if node.content_hash is not None:
 
419
                children = self.defer_from_thread(_get_children, node.uuid,
 
420
                                                  node.content_hash)
 
421
                node.children = children
 
422
                for child in children.itervalues():
 
423
                    if child.node_type == DIRECTORY:
 
424
                        need_children.append(child)
 
425
 
 
426
        return root
 
427
 
 
428
    def _get_raw_dir_entries(self, share_uuid, node_uuid, content_hash):
 
429
        """Gets raw dir entries for the given directory."""
 
430
        d = self.factory.current_protocol.get_content(share_str(share_uuid),
 
431
                                                      str(node_uuid),
 
432
                                                      content_hash)
 
433
        d.addCallback(lambda c: zlib.decompress(c.data))
 
434
 
 
435
        def _parse_content(raw_content):
 
436
            """Parses directory content into a list of entry objects."""
 
437
            unserialized_content = DirectoryContent()
 
438
            unserialized_content.ParseFromString(raw_content)
 
439
            return list(unserialized_content.entries)
 
440
 
 
441
        d.addCallback(_parse_content)
 
442
        return d
 
443
 
 
444
    def download_string(self, share_uuid, node_uuid, content_hash):
 
445
        """Reads a file from the server into a string."""
 
446
        output = StringIO()
 
447
        self._download_inner(share_uuid=share_uuid, node_uuid=node_uuid,
 
448
                             content_hash=content_hash, output=output)
 
449
        return output.getValue()
 
450
 
 
451
    def download_file(self, share_uuid, node_uuid, content_hash, filename):
 
452
        """Downloads a file from the server."""
 
453
        partial_filename = "%s.u1partial" % filename
 
454
        output = open(partial_filename, "w")
 
455
 
 
456
        def rename_file():
 
457
            """Renames the temporary file to the final name."""
 
458
            output.close()
 
459
            os.rename(partial_filename, filename)
 
460
 
 
461
        def delete_file():
 
462
            """Deletes the temporary file."""
 
463
            output.close()
 
464
            os.unlink(partial_filename)
 
465
 
 
466
        self._download_inner(share_uuid=share_uuid, node_uuid=node_uuid,
 
467
                             content_hash=content_hash, output=output,
 
468
                             on_success=rename_file, on_failure=delete_file)
 
469
 
 
470
    def _download_inner(self, share_uuid, node_uuid, content_hash, output,
 
471
                        on_success=lambda: None, on_failure=lambda: None):
 
472
        """Helper function for content downloads."""
 
473
        dec = zlib.decompressobj()
 
474
 
 
475
        def write_data(data):
 
476
            """Helper which writes data to the output file."""
 
477
            uncompressed_data = dec.decompress(data)
 
478
            output.write(uncompressed_data)
 
479
 
 
480
        def finish_download(value):
 
481
            """Helper which finishes the download."""
 
482
            uncompressed_data = dec.flush()
 
483
            output.write(uncompressed_data)
 
484
            on_success()
 
485
            return value
 
486
 
 
487
        def abort_download(value):
 
488
            """Helper which aborts the download."""
 
489
            on_failure()
 
490
            return value
 
491
 
 
492
        def _download():
 
493
            """Async helper."""
 
494
            _get_content = self.factory.current_protocol.get_content
 
495
            d = _get_content(share_str(share_uuid), str(node_uuid),
 
496
                             content_hash, callback=write_data)
 
497
            d.addCallbacks(finish_download, abort_download)
 
498
            return d
 
499
 
 
500
        self.defer_from_thread(_download)
 
501
 
 
502
    def create_directory(self, share_uuid, parent_uuid, name):
 
503
        """Creates a directory on the server."""
 
504
        r = self.defer_from_thread(self.factory.current_protocol.make_dir,
 
505
                                   share_str(share_uuid), str(parent_uuid),
 
506
                                   name)
 
507
        return uuid.UUID(r.new_id)
 
508
 
 
509
    def create_file(self, share_uuid, parent_uuid, name):
 
510
        """Creates a file on the server."""
 
511
        r = self.defer_from_thread(self.factory.current_protocol.make_file,
 
512
                                   share_str(share_uuid), str(parent_uuid),
 
513
                                   name)
 
514
        return uuid.UUID(r.new_id)
 
515
 
 
516
    def create_symlink(self, share_uuid, parent_uuid, name, target):
 
517
        """Creates a symlink on the server."""
 
518
        raise UnsupportedOperationError("Protocol does not support symlinks")
 
519
 
 
520
    def upload_string(self, share_uuid, node_uuid, old_content_hash,
 
521
                      content_hash, content):
 
522
        """Uploads a string to the server as file content."""
 
523
        crc32 = crc32(content, 0)
 
524
        compressed_content = zlib.compress(content, 9)
 
525
        compressed = StringIO(compressed_content)
 
526
        self.defer_from_thread(self.factory.current_protocol.put_content,
 
527
                               share_str(share_uuid), str(node_uuid),
 
528
                               old_content_hash, content_hash,
 
529
                               crc32, len(content), len(compressed_content),
 
530
                               compressed)
 
531
 
 
532
    def upload_file(self, share_uuid, node_uuid, old_content_hash,
 
533
                    content_hash, filename):
 
534
        """Uploads a file to the server."""
 
535
        parent_dir = os.path.split(filename)[0]
 
536
        unique_filename = os.path.join(parent_dir, "." + str(uuid.uuid4()))
 
537
 
 
538
 
 
539
        class StagingFile(object):
 
540
            """An object which tracks data being compressed for staging."""
 
541
            def __init__(self, stream):
 
542
                """Initialize a compression object."""
 
543
                self.crc32 = 0
 
544
                self.enc = zlib.compressobj(9)
 
545
                self.size = 0
 
546
                self.compressed_size = 0
 
547
                self.stream = stream
 
548
 
 
549
            def write(self, bytes):
 
550
                """Compress bytes, keeping track of length and crc32."""
 
551
                self.size += len(bytes)
 
552
                self.crc32 = crc32(bytes, self.crc32)
 
553
                compressed_bytes = self.enc.compress(bytes)
 
554
                self.compressed_size += len(compressed_bytes)
 
555
                self.stream.write(compressed_bytes)
 
556
 
 
557
            def finish(self):
 
558
                """Finish staging compressed data."""
 
559
                compressed_bytes = self.enc.flush()
 
560
                self.compressed_size += len(compressed_bytes)
 
561
                self.stream.write(compressed_bytes)
 
562
 
 
563
        with open(unique_filename, "w+") as compressed:
 
564
            os.unlink(unique_filename)
 
565
            with open(filename, "r") as original:
 
566
                staging = StagingFile(compressed)
 
567
                shutil.copyfileobj(original, staging)
 
568
            staging.finish()
 
569
            compressed.seek(0)
 
570
            self.defer_from_thread(self.factory.current_protocol.put_content,
 
571
                                   share_str(share_uuid), str(node_uuid),
 
572
                                   old_content_hash, content_hash,
 
573
                                   staging.crc32,
 
574
                                   staging.size, staging.compressed_size,
 
575
                                   compressed)
 
576
 
 
577
    def move(self, share_uuid, parent_uuid, name, node_uuid):
 
578
        """Moves a file on the server."""
 
579
        self.defer_from_thread(self.factory.current_protocol.move,
 
580
                               share_str(share_uuid), str(node_uuid),
 
581
                               str(parent_uuid), name)
 
582
 
 
583
    def unlink(self, share_uuid, node_uuid):
 
584
        """Unlinks a file on the server."""
 
585
        self.defer_from_thread(self.factory.current_protocol.unlink,
 
586
                               share_str(share_uuid), str(node_uuid))
 
587
 
 
588
    def _get_node_hashes(self, share_uuid, node_uuids):
 
589
        """Fetches hashes for the given nodes."""
 
590
        share = share_str(share_uuid)
 
591
        queries = [(share, str(node_uuid), request.UNKNOWN_HASH) \
 
592
                   for node_uuid in node_uuids]
 
593
        d = self.factory.current_protocol.query(queries)
 
594
 
 
595
        def _collect_hashes(multi_result):
 
596
            """Accumulate hashes from query replies."""
 
597
            hashes = {}
 
598
            for (success, value) in multi_result:
 
599
                if success:
 
600
                    for node_state in value.response:
 
601
                        node_uuid = uuid.UUID(node_state.node)
 
602
                        hashes[node_uuid] = node_state.hash
 
603
            return hashes
 
604
 
 
605
        d.addCallback(_collect_hashes)
 
606
        return d
 
607
 
 
608
    def get_incoming_shares(self):
 
609
        """Returns a list of incoming shares as (name, uuid, accepted)
 
610
        tuples.
 
611
 
 
612
        """
 
613
        _list_shares = self.factory.current_protocol.list_shares
 
614
        r = self.defer_from_thread(_list_shares)
 
615
        return [(s.name, s.id, s.other_visible_name,
 
616
                 s.accepted, s.access_level) \
 
617
                for s in r.shares if s.direction == "to_me"]