~facundo/magicicada-client/changes-for-xenial

« back to all changes in this revision

Viewing changes to ubuntuone/utils/tests/test_txsecrets.py

  • Committer: Magicicada Bot
  • Author(s): Natalia
  • Date: 2016-05-30 15:43:30 UTC
  • mfrom: (1418.1.21 no-sso-client)
  • Revision ID: magicicada_bot-20160530154330-b4his4s3wlucu7zv
[r=facundo] - Decouple client code from ubuntu-sso-client code. Copied and made an initial cleanup on the networkstate, utils and keyring modules.
- Removed completely dependencies with oauthlibs.
- Moved tests/ folder to inside ubuntuone/ proper folders.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
#
 
3
# Copyright 2010-2012 Canonical Ltd.
 
4
# Copyright 2015-2016 Chicharreros (https://launchpad.net/~chicharreros)
 
5
#
 
6
# This program is free software: you can redistribute it and/or modify it
 
7
# under the terms of the GNU General Public License version 3, as published
 
8
# by the Free Software Foundation.
 
9
#
 
10
# This program is distributed in the hope that it will be useful, but
 
11
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
12
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
13
# PURPOSE.  See the GNU General Public License for more details.
 
14
#
 
15
# You should have received a copy of the GNU General Public License along
 
16
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
17
#
 
18
# In addition, as a special exception, the copyright holders give
 
19
# permission to link the code of portions of this program with the
 
20
# OpenSSL library under certain conditions as described in each
 
21
# individual source file, and distribute linked combinations
 
22
# including the two.
 
23
# You must obey the GNU General Public License in all respects
 
24
# for all of the code used other than OpenSSL.  If you modify
 
25
# file(s) with this exception, you may extend this exception to your
 
26
# version of the file(s), but you are not obligated to do so.  If you
 
27
# do not wish to do so, delete this exception statement from your
 
28
# version.  If you delete this exception statement from all source
 
29
# files in the program, then also delete it here.
 
30
 
 
31
"""Tests for txsecrets."""
 
32
 
 
33
import logging
 
34
import uuid
 
35
 
 
36
import dbus.service
 
37
 
 
38
from twisted.internet.defer import inlineCallbacks, returnValue
 
39
from ubuntuone.devtools.testcases.dbus import DBusTestCase
 
40
 
 
41
from ubuntuone.utils import txsecrets
 
42
 
 
43
KEY_TYPE_ATTR = {"key-type": "Foo credentials"}
 
44
ERROR_CREATE_BUT_LOCKED = "Cannot create an item in a locked collection"
 
45
PROMPT_BASE_PATH = "/org/freedesktop/secrets/prompt"
 
46
SESSION_BASE_PATH = "/org/freedesktop/secrets/session"
 
47
COLLECTION_BASE_PATH = "/org/freedesktop/secrets/collection/"
 
48
SAMPLE_CONTENT_TYPE = "text/plain; charset=utf8"
 
49
 
 
50
 
 
51
class InvalidProperty(Exception):
    """Raised by the mocks' ``Get`` methods for an unsupported property."""
 
53
 
 
54
 
 
55
class SampleMiscException(Exception):
    """Generic error raised by the mocks; it reaches clients as a DBus error."""
 
57
 
 
58
 
 
59
class ItemMock(dbus.service.Object):
    """Mock of an item: a secret value with a label and lookup attributes.

    The class-level flags are switches that tests may flip to change how
    the exported methods behave.
    """

    # When True, GetSecret raises SampleMiscException.
    get_secret_fail = False
    # When True, Delete raises SampleMiscException.
    delete_fail = False
    # When True, Delete publishes a prompt and returns it instead of "/".
    delete_prompt = False
    # Value given as ``dismissed`` to the prompt published by Delete.
    dismissed = False

    def __init__(self, collection, label, attributes, value, *args, **kwargs):
        """Keep the owning collection and the data of this item.

        Extra positional and keyword arguments go to dbus.service.Object.
        """
        super(ItemMock, self).__init__(*args, **kwargs)
        self.value = value
        self.attributes = attributes
        self.label = label
        self.collection = collection

    @dbus.service.method(dbus_interface=txsecrets.ITEM_IFACE,
                         out_signature="o")
    def Delete(self):
        """Remove this item from its collection.

        Returns "/" when no prompt is needed, or a freshly published
        PromptMock when ``delete_prompt`` is set. Raises
        SampleMiscException when ``delete_fail`` is set.
        """
        if self.delete_fail:
            raise SampleMiscException()
        self.collection.items.remove(self)
        if not self.delete_prompt:
            return "/"
        return self.dbus_publish(create_object_path(PROMPT_BASE_PATH),
                                 PromptMock,
                                 result="",
                                 dismissed=self.dismissed)

    @dbus.service.method(dbus_interface=txsecrets.ITEM_IFACE,
                         in_signature="o", out_signature="(oayay)")
    def GetSecret(self, session):
        """Return the secret as a (session, parameters, value) struct.

        Raises SampleMiscException when ``get_secret_fail`` is set.
        """
        if self.get_secret_fail:
            raise SampleMiscException()
        return (session, "", self.value)

    def matches(self, search_attr):
        """Tell whether every searched attribute is present and equal here."""
        mismatch = any(
            name not in self.attributes or self.attributes[name] != wanted
            for name, wanted in search_attr.items())
        return not mismatch
 
106
 
 
107
 
 
108
class PromptMock(dbus.service.Object):
    """Mock of the prompt that some operations need in order to complete."""

    def __init__(self, dismissed=True,
                 result=dbus.String("", variant_level=1), *args, **kwargs):
        """Remember what the Completed signal will carry.

        ``dismissed`` and ``result`` are stored untouched and emitted as-is
        by Prompt; everything else is passed to dbus.service.Object.
        """
        super(PromptMock, self).__init__(*args, **kwargs)
        self.result = result
        self.dismissed = dismissed

    @dbus.service.method(dbus_interface=txsecrets.PROMPT_IFACE,
                         in_signature="s")
    def Prompt(self, window_id):
        """Answer right away by emitting Completed with the stored values."""
        self.Completed(self.dismissed, self.result)

    @dbus.service.signal(dbus_interface=txsecrets.PROMPT_IFACE,
                         signature="bv")
    def Completed(self, dismissed, result):
        """Signal that the prompt, and so the operation, has finished."""
 
129
 
 
130
 
 
131
class BaseCollectionMock(dbus.service.Object):
    """Shared behaviour of the collection mocks (items holding secrets)."""

    # dbus-python switches; presumably needed so the same collection can
    # also be exported on the default collection path -- TODO confirm.
    SUPPORTS_MULTIPLE_OBJECT_PATHS = True
    SUPPORTS_MULTIPLE_CONNECTIONS = True
    # When True, _create_item answers with a prompt instead of the item.
    create_item_prompt = False
    # Value given as ``dismissed`` to the prompt published by _create_item.
    dismissed = False
    # When True, _create_item raises SampleMiscException.
    create_item_fail = False
    # When True, _create_item refuses to add items.
    locked = False
    # Read by SecretServiceMock.Unlock to decide whether to prompt.
    unlock_prompts = False
    # Class instantiated for every new item.
    item_mock_class = ItemMock

    item_attrs_property = txsecrets.ITEM_ATTRIBUTES_PROPERTY_OLD
    item_label_property = txsecrets.ITEM_LABEL_PROPERTY_OLD
    clxn_label_property = txsecrets.CLXN_LABEL_PROPERTY_OLD

    def __init__(self, label, *args, **kwargs):
        """Start with no items and remember the label of the collection."""
        super(BaseCollectionMock, self).__init__(*args, **kwargs)
        self.label = label
        self.items = []

    def _create_item(self, properties, secret, replace):
        """Publish a new item built from the properties and the secret.

        The attributes and the label are read from ``properties`` and the
        value is the third field of ``secret``. ``replace`` is accepted but
        not used by this mock.

        Returns an (item, "/") pair, or ("/", prompt) when
        ``create_item_prompt`` is set. Raises SampleMiscException when
        ``create_item_fail`` is set or the collection is locked.
        """
        if self.create_item_fail:
            raise SampleMiscException()
        if self.locked:
            raise SampleMiscException(ERROR_CREATE_BUT_LOCKED)
        attributes = properties[self.item_attrs_property]
        item_label = properties[self.item_label_property]
        value = secret[2]
        new_item = self.dbus_publish(
            create_object_path(make_coll_path(self.label)),
            self.item_mock_class, self, item_label, attributes, value)
        self.items.append(new_item)
        if not self.create_item_prompt:
            return new_item, "/"
        prompt = self.dbus_publish(create_object_path(PROMPT_BASE_PATH),
                                   PromptMock,
                                   result=new_item,
                                   dismissed=self.dismissed)
        return "/", prompt

    @dbus.service.method(dbus_interface=txsecrets.PROPERTIES_IFACE,
                         in_signature="ss", out_signature="v")
    def Get(self, interface, propname):
        """Return the label, the only property this mock implements.

        Raises InvalidProperty for any other interface/property pair.
        """
        asks_for_label = (interface == txsecrets.COLLECTION_IFACE and
                          propname == self.clxn_label_property)
        if not asks_for_label:
            raise InvalidProperty("Invalid property: {}".format(propname))
        return dbus.String(self.label)
 
186
 
 
187
 
 
188
class CollectionMock(BaseCollectionMock):
    """Collection mock whose secrets are three-field structs."""

    @dbus.service.method(dbus_interface=txsecrets.COLLECTION_IFACE,
                         in_signature="a{sv}(oayay)b", out_signature="oo",
                         byte_arrays=True)
    def CreateItem(self, properties, secret, replace):
        """DBus entry point that delegates to _create_item."""
        # The declared signature carries (session, parameters, value).
        assert len(secret) == 3
        return self._create_item(properties, secret, replace)
 
198
 
 
199
 
 
200
class SessionMock(dbus.service.Object):
    """Mock of the session shared by the service and a client application."""

    @dbus.service.method(dbus_interface=txsecrets.SESSION_IFACE)
    def Close(self):
        """Close this session; the mock has nothing to release."""
 
206
 
 
207
 
 
208
def make_coll_path(label):
    """Return the object path of the collection that has the given label."""
    return "".join((COLLECTION_BASE_PATH, label))
 
211
 
 
212
 
 
213
class SecretServiceMock(dbus.service.Object):
    """Mock of the Secret Service, owner of sessions and collections.

    The class-level flags are switches that tests may flip to change how
    the exported methods behave.
    """

    # When True, CreateCollection answers with a prompt instead of the
    # collection itself.
    create_collection_prompt = False
    # When True, CreateCollection raises SampleMiscException.
    create_collection_fail = False
    # When True, OpenSession raises SampleMiscException.
    open_session_fail = False
    # Value given as ``dismissed`` to every prompt this service publishes.
    dismissed = False
    # Class instantiated for every new collection.
    collection_mock_class = CollectionMock

    clxn_label_property = txsecrets.CLXN_LABEL_PROPERTY_OLD
    collections_property = txsecrets.COLLECTIONS_PROPERTY_OLD

    def __init__(self, *args, **kwargs):
        """Start with no sessions, no collections and no aliases."""
        super(SecretServiceMock, self).__init__(*args, **kwargs)
        self.sessions = {}
        self.collections = {}
        self.aliases = {}

    @dbus.service.method(dbus_interface=txsecrets.SERVICE_IFACE,
                         in_signature="sv", out_signature="vo")
    def OpenSession(self, algorithm, algorithm_parameters):
        """Publish a new session and return it after a ``True`` output.

        Both arguments are ignored. Raises SampleMiscException when
        ``open_session_fail`` is set.
        """
        if self.open_session_fail:
            raise SampleMiscException()
        path = create_object_path(SESSION_BASE_PATH)
        new_session = self.dbus_publish(path, SessionMock)
        self.sessions[path] = new_session
        return True, new_session

    @dbus.service.method(dbus_interface=txsecrets.SERVICE_IFACE,
                         in_signature="a{sv}", out_signature="oo")
    def CreateCollection(self, properties):
        """Publish a collection labelled as stated in ``properties``.

        Returns a (collection, "/") pair, or ("/", prompt) when
        ``create_collection_prompt`` is set. Raises SampleMiscException
        when ``create_collection_fail`` is set.
        """
        if self.create_collection_fail:
            raise SampleMiscException()
        label = str(properties[self.clxn_label_property])
        new_collection = self.dbus_publish(make_coll_path(label),
                                           self.collection_mock_class,
                                           label)
        self.collections[label] = new_collection

        if not self.create_collection_prompt:
            return new_collection, "/"
        prompt = self.dbus_publish(create_object_path(PROMPT_BASE_PATH),
                                   PromptMock,
                                   result=new_collection,
                                   dismissed=self.dismissed)
        return "/", prompt

    @dbus.service.method(dbus_interface=txsecrets.SERVICE_IFACE,
                         in_signature="a{ss}", out_signature="aoao")
    def SearchItems(self, attributes):
        """Return the matching items of all collections.

        The result is an (unlocked, locked) pair of lists; an item lands
        in one or the other according to the ``locked`` flag of the
        collection that holds it.
        """
        unlocked_items = []
        locked_items = []
        for collection in self.collections.values():
            bucket = locked_items if collection.locked else unlocked_items
            bucket.extend(item for item in collection.items
                          if item.matches(attributes))

        return unlocked_items, locked_items

    def unlock_objects(self, objects):
        """Unlock every collection named in ``objects``.

        A collection is unlocked when its own path, or the path of any of
        its items, is listed.
        """
        for collection in self.collections.values():
            # The path is read from index 1 of each ``locations`` entry.
            if any(place[1] in objects for place in collection.locations):
                collection.locked = False
            for item in collection.items:
                if any(place[1] in objects for place in item.locations):
                    collection.locked = False

    @dbus.service.method(dbus_interface=txsecrets.SERVICE_IFACE,
                         in_signature="ao", out_signature="aoo")
    def Unlock(self, objects):
        """Unlock the given objects, possibly by means of a prompt.

        Requested items living in a collection with ``unlock_prompts`` set
        require a prompt: in that case the result is the list of items
        that did not need it plus the published prompt. Otherwise the
        result is ``objects`` itself plus "/".
        """
        needs_prompt = []
        no_prompt = []
        for collection in self.collections.values():
            for item in collection.items:
                item_path = item.__dbus_object_path__
                if item_path not in objects:
                    continue
                if collection.unlock_prompts:
                    needs_prompt.append(item_path)
                else:
                    no_prompt.append(item_path)
        # The mock unlocks right away on both outcomes.
        self.unlock_objects(objects)
        if not needs_prompt:
            return objects, "/"
        prompt = self.dbus_publish(create_object_path(PROMPT_BASE_PATH),
                                   PromptMock,
                                   result=needs_prompt,
                                   dismissed=self.dismissed)
        return no_prompt, prompt

    @dbus.service.method(dbus_interface=txsecrets.SERVICE_IFACE,
                         in_signature="s", out_signature="o")
    def ReadAlias(self, name):
        """Return what was stored for the alias, or "/" if it is unknown."""
        return self.aliases.get(name, "/")

    @dbus.service.method(dbus_interface=txsecrets.SERVICE_IFACE,
                         in_signature="so")
    def SetAlias(self, name, collection_path):
        """Store ``collection_path`` under the alias ``name``."""
        self.aliases[name] = collection_path

    @dbus.service.method(dbus_interface=txsecrets.PROPERTIES_IFACE,
                         in_signature="ss", out_signature="v")
    def Get(self, interface, propname):
        """Return the collection paths, the only property implemented.

        Raises InvalidProperty for any other interface/property pair.
        """
        asks_for_collections = (interface == txsecrets.SERVICE_IFACE and
                                propname == self.collections_property)
        if not asks_for_collections:
            raise InvalidProperty("Invalid property: {}".format(propname))
        coll_paths = [make_coll_path(label) for label in self.collections]
        return dbus.Array(coll_paths, signature="o", variant_level=1)
 
341
 
 
342
 
 
343
class AltItemMock(ItemMock):
    """Item mock whose secret struct also carries a content type."""

    @dbus.service.method(dbus_interface=txsecrets.ITEM_IFACE,
                         in_signature="o", out_signature="(oayays)")
    def GetSecret2(self, session):
        """Return the secret as a four-field struct.

        The fields are the session, empty parameters, the value and
        SAMPLE_CONTENT_TYPE. Raises SampleMiscException when
        ``get_secret_fail`` is set.
        """
        if self.get_secret_fail:
            raise SampleMiscException()
        return (session, "", self.value, SAMPLE_CONTENT_TYPE)
 
353
 
 
354
 
 
355
class AltCollectionMock(BaseCollectionMock):
    """Collection mock whose secrets have a content_type field."""

    # Items created here expose GetSecret2.
    item_mock_class = AltItemMock

    # Property names without the ``_OLD`` suffix used by the base class.
    item_attrs_property = txsecrets.ITEM_ATTRIBUTES_PROPERTY
    item_label_property = txsecrets.ITEM_LABEL_PROPERTY
    clxn_label_property = txsecrets.CLXN_LABEL_PROPERTY

    @dbus.service.method(dbus_interface=txsecrets.COLLECTION_IFACE,
                         in_signature="a{sv}(oayays)b", out_signature="oo",
                         byte_arrays=True)
    def CreateItem(self, properties, secret, replace):
        """DBus entry point that delegates to _create_item."""
        # The declared signature adds a content type as the fourth field.
        assert len(secret) == 4
        return self._create_item(properties, secret, replace)
 
371
 
 
372
 
 
373
class AltSecretServiceMock(SecretServiceMock):
    """Service mock whose secrets have a content_type field."""

    collection_mock_class = AltCollectionMock

    # Property names without the ``_OLD`` suffix used by the base class.
    clxn_label_property = txsecrets.CLXN_LABEL_PROPERTY
    collections_property = txsecrets.COLLECTIONS_PROPERTY

    @dbus.service.method(dbus_interface=txsecrets.SERVICE_IFACE,
                         in_signature="a{sv}s", out_signature="oo")
    def CreateCollection(self, properties, alias):
        """Create a collection as the base class does and alias it.

        The first element of the base class result is what gets stored
        under ``alias``; the base class result is returned unchanged.
        """
        created = super(AltSecretServiceMock,
                        self).CreateCollection(properties)
        collection, prompt = created
        self.SetAlias(alias, collection)
        return collection, prompt
 
391
    
 
392
 
 
393
 
 
394
def create_object_path(base):
    """Return ``base`` followed by "/" and a random hexadecimal component."""
    return "/".join((base, uuid.uuid4().hex))
 
398
 
 
399
 
 
400
class TextFilter(object):
    """Logging filter that drops the records containing any given text.

    Instances are meant to be handed to ``Logger.addFilter``, which only
    requires the ``filter`` method.
    """

    def __init__(self, *args):
        """Initialize this filter.

        Every positional argument is a text; a record whose message
        contains at least one of them is rejected.
        """
        super(TextFilter, self).__init__()
        self.filtered_strings = args

    def filter(self, record):
        """Return False if the record has to be dropped, True otherwise.

        The unformatted ``record.msg`` is the text that gets inspected.
        """
        message = record.msg
        try:
            return not any(s in message for s in self.filtered_strings)
        except TypeError:
            # logging accepts any object as the message (an exception
            # instance, for example); those do not support ``in``, so
            # look at their string form instead of breaking the log call.
            as_text = str(message)
            return not any(s in as_text for s in self.filtered_strings)
 
411
 
 
412
 
 
413
class BaseTestCase(DBusTestCase):
    """Common ground for the tests that talk to the mocks over DBus."""

    timeout = 10
    # Mock class published as the secrets service during setUp.
    secret_service_class = SecretServiceMock

    @inlineCallbacks
    def setUp(self):
        """Publish the mock service and build the client under test."""
        yield super(BaseTestCase, self).setUp()
        self.session_bus = dbus.SessionBus()
        self.mock_service = self.dbus_publish(txsecrets.SECRETS_SERVICE,
                                              self.secret_service_class)
        self.secretservice = txsecrets.SecretService()
        self.silence_dbus_logging()

    def silence_dbus_logging(self):
        """Hide the dbus warnings that would pollute the test results."""
        dbus_logger = logging.getLogger('dbus.connection')
        noise_filter = TextFilter("Unable to set arguments")
        dbus_logger.addFilter(noise_filter)
        # Undo the change once the test is over.
        self.addCleanup(dbus_logger.removeFilter, noise_filter)

    def dbus_publish(self, object_path, object_class, *args, **kwargs):
        """Instantiate ``object_class`` and export it at ``object_path``.

        Extra arguments go to the class constructor. The returned object
        is removed from the bus on cleanup, and it gets this very method
        as its own ``dbus_publish`` so that mocks can publish more mocks.
        """
        reply = self.session_bus.request_name(
            txsecrets.BUS_NAME, dbus.bus.NAME_FLAG_DO_NOT_QUEUE)
        self.assertNotEqual(reply, dbus.bus.REQUEST_NAME_REPLY_EXISTS)
        published = object_class(*args, object_path=object_path,
                                 conn=self.session_bus, **kwargs)
        self.addCleanup(published.remove_from_connection,
                        connection=self.session_bus,
                        path=object_path)
        published.dbus_publish = self.dbus_publish
        return published

    @inlineCallbacks
    def create_sample_collection(self, label, make_alias=True,
                                 publish_default_path=False):
        """Create a collection through the client and return it.

        With ``make_alias`` the mock service gets a "default" alias that
        points to the collection path. With ``publish_default_path`` the
        mock collection is also exported at txsecrets.DEFAULT_COLLECTION,
        and removed from there on cleanup.
        """
        coll = yield self.secretservice.create_collection(label)
        if make_alias:
            self.mock_service.SetAlias("default", make_coll_path(label))
        if publish_default_path:
            mock_collection = self.mock_service.collections[label]
            mock_collection.add_to_connection(self.session_bus,
                                              txsecrets.DEFAULT_COLLECTION)
            self.addCleanup(mock_collection.remove_from_connection,
                            connection=self.session_bus,
                            path=txsecrets.DEFAULT_COLLECTION)
        returnValue(coll)
 
463
 
 
464
 
 
465
class SecretServiceTestCase(BaseTestCase):
 
466
    """Test the Secret Service class."""
 
467
 
 
468
    @inlineCallbacks
 
469
    def test_open_session(self):
 
470
        """The secret service session is opened."""
 
471
        result = yield self.secretservice.open_session()
 
472
        self.assertEqual(result, self.secretservice)
 
473
 
 
474
    @inlineCallbacks
 
475
    def test_open_session_throws_dbus_error_as_failure(self):
 
476
        """The secret service open session throws a dbus error as a failure."""
 
477
        d = self.secretservice.open_session()
 
478
        self.mock_service.open_session_fail = True
 
479
        yield self.assertFailure(d, dbus.exceptions.DBusException)
 
480
 
 
481
    @inlineCallbacks
 
482
    def test_open_session_fails_before_opening_as_failure(self):
 
483
        """A dbus error before opening the session is thrown as a failure."""
 
484
 
 
485
        def fail(*args, **kwargs):
 
486
            """Throw a DBus exception."""
 
487
            raise dbus.exceptions.DBusException()
 
488
 
 
489
        self.patch(txsecrets.dbus, "SessionBus", fail)
 
490
        d = self.secretservice.open_session()
 
491
        self.mock_service.open_session_fail = True
 
492
        yield self.assertFailure(d, dbus.exceptions.DBusException)
 
493
 
 
494
    @inlineCallbacks
 
495
    def test_create_collection(self):
 
496
        """The secret service creates a collection."""
 
497
        yield self.secretservice.open_session()
 
498
        collection_label = "sample_keyring"
 
499
        yield self.create_sample_collection(collection_label)
 
500
        self.assertIn(collection_label, self.mock_service.collections)
 
501
 
 
502
    @inlineCallbacks
 
503
    def test_create_collection_prompt(self):
 
504
        """The secret service creates a collection after a prompt."""
 
505
        yield self.secretservice.open_session()
 
506
        self.mock_service.create_collection_prompt = True
 
507
        collection_label = "sample_keyring"
 
508
        yield self.create_sample_collection(collection_label)
 
509
        self.assertIn(collection_label, self.mock_service.collections)
 
510
 
 
511
    @inlineCallbacks
 
512
    def test_create_collection_prompt_dismissed(self):
 
513
        """The service fails to create collection when prompt dismissed."""
 
514
        yield self.secretservice.open_session()
 
515
        self.mock_service.create_collection_prompt = True
 
516
        self.mock_service.dismissed = True
 
517
        collection_label = "sample_keyring"
 
518
        yield self.assertFailure(
 
519
            self.create_sample_collection(collection_label),
 
520
            txsecrets.UserCancelled)
 
521
 
 
522
    @inlineCallbacks
 
523
    def test_create_collection_throws_dbus_error(self):
 
524
        """The service fails to create collection on a DBus error."""
 
525
        yield self.secretservice.open_session()
 
526
        self.mock_service.create_collection_fail = True
 
527
        collection_label = "sample_keyring"
 
528
        yield self.assertFailure(
 
529
            self.create_sample_collection(collection_label),
 
530
            dbus.exceptions.DBusException)
 
531
 
 
532
    @inlineCallbacks
 
533
    def test_prompt_accepted(self):
 
534
        """A prompt is accepted."""
 
535
        yield self.secretservice.open_session()
 
536
        expected_result = "hello world"
 
537
        prompt_path = "/prompt"
 
538
        self.dbus_publish(prompt_path, PromptMock, result=expected_result,
 
539
                          dismissed=False)
 
540
        result = yield self.secretservice.do_prompt(prompt_path)
 
541
        self.assertEqual(result, expected_result)
 
542
 
 
543
    @inlineCallbacks
 
544
    def test_prompt_dismissed(self):
 
545
        """A prompt is dismissed with a UserCancelled failure."""
 
546
        yield self.secretservice.open_session()
 
547
        expected_result = "hello world2"
 
548
        prompt_path = "/prompt"
 
549
        self.dbus_publish(prompt_path, PromptMock, result=expected_result,
 
550
                          dismissed=True)
 
551
        d = self.secretservice.do_prompt(prompt_path)
 
552
        self.assertFailure(d, txsecrets.UserCancelled)
 
553
 
 
554
    @inlineCallbacks
 
555
    def test_search_unlocked_items(self):
 
556
        """The secret service searchs for unlocked items."""
 
557
        yield self.secretservice.open_session()
 
558
        coll = yield self.create_sample_collection("sample_keyring")
 
559
        attr = KEY_TYPE_ATTR
 
560
        sample_secret = "secret83!"
 
561
        yield coll.create_item("Cucaracha", attr, sample_secret)
 
562
        items = yield self.secretservice.search_items(attr)
 
563
        self.assertEqual(len(items), 1)
 
564
        value = yield items[0].get_value()
 
565
        self.assertEqual(value, sample_secret)
 
566
 
 
567
    @inlineCallbacks
 
568
    def test_search_locked_items(self):
 
569
        """The secret service searchs for locked items."""
 
570
        yield self.secretservice.open_session()
 
571
        collection_name = "sample_keyring"
 
572
        coll = yield self.create_sample_collection(collection_name)
 
573
        mock_collection = self.mock_service.collections[collection_name]
 
574
        attr = KEY_TYPE_ATTR
 
575
        sample_secret = "secret99!"
 
576
        yield coll.create_item("Cucaracha", attr, sample_secret)
 
577
        mock_collection.locked = True
 
578
 
 
579
        items = yield self.secretservice.search_items(attr)
 
580
        self.assertEqual(len(items), 1)
 
581
        value = yield items[0].get_value()
 
582
        self.assertEqual(value, sample_secret)
 
583
 
 
584
    @inlineCallbacks
 
585
    def test_search_locked_items_prompts(self):
 
586
        """The secret service searchs for locked items after a prompt."""
 
587
        yield self.secretservice.open_session()
 
588
        collection_name = "sample_keyring"
 
589
        coll = yield self.create_sample_collection(collection_name)
 
590
        mock_collection = self.mock_service.collections[collection_name]
 
591
        attr = KEY_TYPE_ATTR
 
592
        sample_secret = "secret99!"
 
593
        yield coll.create_item("Cucaracha", attr, sample_secret)
 
594
        mock_collection.locked = True
 
595
        mock_collection.unlock_prompts = True
 
596
 
 
597
        items = yield self.secretservice.search_items(attr)
 
598
        self.assertEqual(len(items), 1)
 
599
        value = yield items[0].get_value()
 
600
        self.assertEqual(value, sample_secret)
 
601
 
 
602
    @inlineCallbacks
 
603
    def test_search_locked_items_prompts_dismissed(self):
 
604
        """Service fails search for locked items after dismissed prompt."""
 
605
        yield self.secretservice.open_session()
 
606
        collection_name = "sample_keyring"
 
607
        coll = yield self.create_sample_collection(collection_name)
 
608
        mock_collection = self.mock_service.collections[collection_name]
 
609
        self.mock_service.dismissed = True
 
610
        attr = KEY_TYPE_ATTR
 
611
        sample_secret = "secret99!"
 
612
        yield coll.create_item("Cucaracha", attr, sample_secret)
 
613
        mock_collection.locked = True
 
614
        mock_collection.unlock_prompts = True
 
615
 
 
616
        d = self.secretservice.search_items(attr)
 
617
        yield self.assertFailure(d, txsecrets.UserCancelled)
 
618
 
 
619
    @inlineCallbacks
 
620
    def test_search_items_merges_unlocked_and_locked_items(self):
 
621
        """search_items merges unlocked and locked items."""
 
622
        yield self.secretservice.open_session()
 
623
        attr = KEY_TYPE_ATTR
 
624
 
 
625
        collection_name = "coll1"
 
626
        coll = yield self.create_sample_collection(collection_name)
 
627
        mock_coll1 = self.mock_service.collections[collection_name]
 
628
        unlocked_secret = "coll 1 secret!"
 
629
        yield coll.create_item("Cucaracha", attr, unlocked_secret)
 
630
        mock_coll1.locked = False
 
631
        mock_coll1.unlock_prompts = False
 
632
 
 
633
        collection_name = "coll2"
 
634
        coll = yield self.create_sample_collection(collection_name)
 
635
        mock_coll2 = self.mock_service.collections[collection_name]
 
636
        locked_secret = "coll 2 secret!"
 
637
        yield coll.create_item("Cucaracha", attr, locked_secret)
 
638
        mock_coll2.locked = True
 
639
        mock_coll2.unlock_prompts = False
 
640
 
 
641
        result = yield self.secretservice.search_items(attr)
 
642
        self.assertEqual(len(result), 2)
 
643
 
 
644
    @inlineCallbacks
 
645
    def test_search_items_merges_unlocked_locked_and_prompt_items(self):
 
646
        """search_items merges unlocked, locked and prompt items."""
 
647
        yield self.secretservice.open_session()
 
648
        attr = KEY_TYPE_ATTR
 
649
 
 
650
        collection_name = "coll1"
 
651
        coll = yield self.create_sample_collection(collection_name)
 
652
        mock_coll1 = self.mock_service.collections[collection_name]
 
653
        unlocked_secret = "coll 1 secret!"
 
654
        yield coll.create_item("Cucaracha", attr, unlocked_secret)
 
655
        mock_coll1.locked = False
 
656
        mock_coll1.unlock_prompts = False
 
657
 
 
658
        collection_name = "coll2"
 
659
        coll = yield self.create_sample_collection(collection_name)
 
660
        mock_coll2 = self.mock_service.collections[collection_name]
 
661
        locked_secret = "coll 2 secret!"
 
662
        yield coll.create_item("Cucaracha", attr, locked_secret)
 
663
        mock_coll2.locked = True
 
664
        mock_coll2.unlock_prompts = False
 
665
 
 
666
        collection_name = "coll3"
 
667
        coll = yield self.create_sample_collection(collection_name)
 
668
        mock_coll3 = self.mock_service.collections[collection_name]
 
669
        locked_secret = "coll 3 secret!"
 
670
        yield coll.create_item("Cucaracha", attr, locked_secret)
 
671
        mock_coll3.locked = True
 
672
        mock_coll3.unlock_prompts = True
 
673
 
 
674
        result = yield self.secretservice.search_items(attr)
 
675
        self.assertEqual(len(result), 3)
 
676
 
 
677
    @inlineCallbacks
 
678
    def test_get_collections(self):
 
679
        """The list of all collections is returned."""
 
680
        collection_names = ["collection1", "collection2"]
 
681
 
 
682
        yield self.secretservice.open_session()
 
683
        for name in collection_names:
 
684
            yield self.create_sample_collection(name)
 
685
        collections = yield self.secretservice.get_collections()
 
686
        self.assertEqual(len(collections), len(collection_names))
 
687
 
 
688
    @inlineCallbacks
 
689
    def test_get_default_collection_honours_default_path(self):
 
690
        """The default collection is returned from the default path."""
 
691
        yield self.secretservice.open_session()
 
692
        collection_name = "sample_default_keyring"
 
693
        yield self.create_sample_collection(collection_name, make_alias=False,
 
694
                                            publish_default_path=True)
 
695
        self.assertEqual(len(self.mock_service.collections), 1)
 
696
        yield self.secretservice.get_default_collection()
 
697
        self.assertEqual(len(self.mock_service.collections), 1)
 
698
 
 
699
    @inlineCallbacks
 
700
    def test_get_default_collection_honours_readalias(self):
 
701
        """The default collection is returned if default alias set."""
 
702
        yield self.secretservice.open_session()
 
703
        collection_name = "sample_default_keyring"
 
704
        yield self.create_sample_collection(collection_name)
 
705
        self.assertEqual(len(self.mock_service.collections), 1)
 
706
        yield self.secretservice.get_default_collection()
 
707
        self.assertEqual(len(self.mock_service.collections), 1)
 
708
 
 
709
    @inlineCallbacks
 
710
    def test_get_default_collection_created_if_no_default(self):
 
711
        """The default collection is created if there's no default."""
 
712
        yield self.secretservice.open_session()
 
713
        collection_name = "sample_nondefault_keyring"
 
714
        yield self.create_sample_collection(collection_name, make_alias=False)
 
715
        self.assertEqual(len(self.mock_service.collections), 1)
 
716
        yield self.secretservice.get_default_collection()
 
717
        self.assertEqual(len(self.mock_service.collections), 2)
 
718
 
 
719
    @inlineCallbacks
 
720
    def test_get_default_collection_created_if_nonexistent(self):
 
721
        """The default collection is created if it doesn't exist yet."""
 
722
        yield self.secretservice.open_session()
 
723
        self.assertEqual(len(self.mock_service.collections), 0)
 
724
        yield self.secretservice.get_default_collection()
 
725
        self.assertEqual(len(self.mock_service.collections), 1)
 
726
 
 
727
    @inlineCallbacks
 
728
    def test_get_default_collection_set_as_default_if_nonexistent(self):
 
729
        """The default collection is set as default if it doesn't exist yet."""
 
730
        yield self.secretservice.open_session()
 
731
        yield self.secretservice.get_default_collection()
 
732
        self.assertIn(txsecrets.DEFAULT_LABEL, self.mock_service.aliases)
 
733
 
 
734
    @inlineCallbacks
 
735
    def test_get_default_collection_is_unlocked_default_path(self):
 
736
        """The default collection is unlocked before being returned."""
 
737
        yield self.secretservice.open_session()
 
738
        collection_name = "sample_keyring"
 
739
        self.assertEqual(len(self.mock_service.collections), 0)
 
740
        coll = yield self.create_sample_collection(collection_name,
 
741
                                                   make_alias=False,
 
742
                                                   publish_default_path=True)
 
743
        self.assertEqual(len(self.mock_service.collections), 1)
 
744
        mock_collection = self.mock_service.collections[collection_name]
 
745
        mock_collection.locked = True
 
746
        yield self.secretservice.get_default_collection()
 
747
        attr = KEY_TYPE_ATTR
 
748
        sample_secret = "secret!"
 
749
        yield coll.create_item("Cucaracha", attr, sample_secret)
 
750
        self.assertEqual(len(mock_collection.items), 1)
 
751
 
 
752
    @inlineCallbacks
 
753
    def test_get_default_collection_is_unlocked_readalias(self):
 
754
        """The default collection is unlocked before being returned."""
 
755
        yield self.secretservice.open_session()
 
756
        collection_name = "sample_keyring"
 
757
        self.assertEqual(len(self.mock_service.collections), 0)
 
758
        coll = yield self.create_sample_collection(collection_name)
 
759
        self.assertEqual(len(self.mock_service.collections), 1)
 
760
        mock_collection = self.mock_service.collections[collection_name]
 
761
        mock_collection.locked = True
 
762
        yield self.secretservice.get_default_collection()
 
763
        attr = KEY_TYPE_ATTR
 
764
        sample_secret = "secret!"
 
765
        yield coll.create_item("Cucaracha", attr, sample_secret)
 
766
        self.assertEqual(len(mock_collection.items), 1)
 
767
 
 
768
 
 
769
class CollectionTestCase(BaseTestCase):
    """Tests exercising the Collection class through the mock service."""

    @inlineCallbacks
    def test_get_label(self):
        """get_label returns the label the sample collection was given."""
        yield self.secretservice.open_session()
        label = "sample_keyring"
        yield self.create_sample_collection(label)

        default_coll = yield self.secretservice.get_default_collection()
        reported = yield default_coll.get_label()
        self.assertEqual(reported, label)

    @inlineCallbacks
    def test_create_item(self):
        """create_item stores a single item holding the given secret."""
        yield self.secretservice.open_session()
        label = "sample_keyring"
        yield self.create_sample_collection(label)
        default_coll = yield self.secretservice.get_default_collection()
        backend = self.mock_service.collections[label]

        secret = "secret!"
        yield default_coll.create_item("Cucaracha", KEY_TYPE_ATTR, secret)

        self.assertEqual(len(backend.items), 1)
        self.assertEqual(backend.items[0].value, secret)

    @inlineCallbacks
    def test_create_item_prompt(self):
        """create_item still stores the item with create_item_prompt set."""
        yield self.secretservice.open_session()
        label = "sample_keyring"
        yield self.create_sample_collection(label)
        default_coll = yield self.secretservice.get_default_collection()
        backend = self.mock_service.collections[label]
        backend.create_item_prompt = True

        secret = "secret2!"
        yield default_coll.create_item("Cucaracha", KEY_TYPE_ATTR, secret)

        self.assertEqual(len(backend.items), 1)
        self.assertEqual(backend.items[0].value, secret)

    @inlineCallbacks
    def test_create_item_prompt_dismissed(self):
        """create_item fails with UserCancelled when the mock is dismissed."""
        yield self.secretservice.open_session()
        label = "sample_keyring"
        yield self.create_sample_collection(label)
        default_coll = yield self.secretservice.get_default_collection()
        backend = self.mock_service.collections[label]
        backend.create_item_prompt = True
        backend.dismissed = True

        pending = default_coll.create_item("Cuca", KEY_TYPE_ATTR, "secret3!")
        yield self.assertFailure(pending, txsecrets.UserCancelled)

    @inlineCallbacks
    def test_create_item_throws_dbus_error(self):
        """create_item fails with DBusException when create_item_fail is set."""
        yield self.secretservice.open_session()
        label = "sample_keyring"
        yield self.create_sample_collection(label)
        default_coll = yield self.secretservice.get_default_collection()
        backend = self.mock_service.collections[label]
        backend.create_item_fail = True

        pending = default_coll.create_item("Cuca", KEY_TYPE_ATTR, "secret4!")
        yield self.assertFailure(pending, dbus.exceptions.DBusException)
 
839
 
 
840
 
 
841
class ItemTestCase(BaseTestCase):
    """Tests exercising the Item class through the mock service."""

    @inlineCallbacks
    def test_get_value(self):
        """get_value returns the secret the item was created with."""
        yield self.secretservice.open_session()
        sample_coll = yield self.create_sample_collection("sample_keyring")
        secret = "secret83!"
        yield sample_coll.create_item("Cucaracha", KEY_TYPE_ATTR, secret)

        found = yield self.secretservice.search_items(KEY_TYPE_ATTR)
        self.assertEqual(len(found), 1)

        retrieved = yield found[0].get_value()
        self.assertEqual(retrieved, secret)

    @inlineCallbacks
    def test_get_value_throws_dbus_error(self):
        """get_value fails with DBusException when get_secret_fail is set."""
        yield self.secretservice.open_session()
        label = "sample_keyring"
        sample_coll = yield self.create_sample_collection(label)
        yield sample_coll.create_item("Cucaracha", KEY_TYPE_ATTR, "secret83!")

        found = yield self.secretservice.search_items(KEY_TYPE_ATTR)
        self.assertEqual(len(found), 1)

        # Flip the failure switch on the mock item only after the search.
        backend_item = self.mock_service.collections[label].items[0]
        backend_item.get_secret_fail = True

        pending = found[0].get_value()
        yield self.assertFailure(pending, dbus.exceptions.DBusException)

    @inlineCallbacks
    def test_delete(self):
        """A deleted item no longer shows up in search results."""
        yield self.secretservice.open_session()
        sample_coll = yield self.create_sample_collection("sample_keyring")
        yield sample_coll.create_item("Cucaracha", KEY_TYPE_ATTR, "secret83!")

        found = yield self.secretservice.search_items(KEY_TYPE_ATTR)
        self.assertEqual(len(found), 1)

        yield found[0].delete()

        remaining = yield self.secretservice.search_items(KEY_TYPE_ATTR)
        self.assertEqual(len(remaining), 0)

    @inlineCallbacks
    def test_delete_prompt(self):
        """An item with delete_prompt set is still removed by delete."""
        yield self.secretservice.open_session()
        label = "sample_keyring"
        sample_coll = yield self.create_sample_collection(label)
        yield sample_coll.create_item("Cucaracha", KEY_TYPE_ATTR, "secret83!")

        found = yield self.secretservice.search_items(KEY_TYPE_ATTR)
        self.assertEqual(len(found), 1)

        backend_item = self.mock_service.collections[label].items[0]
        backend_item.delete_prompt = True
        yield found[0].delete()

        remaining = yield self.secretservice.search_items(KEY_TYPE_ATTR)
        self.assertEqual(len(remaining), 0)

    @inlineCallbacks
    def test_delete_prompt_dismissed(self):
        """delete fails with UserCancelled when the mock item is dismissed."""
        yield self.secretservice.open_session()
        label = "sample_keyring"
        sample_coll = yield self.create_sample_collection(label)
        yield sample_coll.create_item("Cucaracha", KEY_TYPE_ATTR, "secret83!")

        found = yield self.secretservice.search_items(KEY_TYPE_ATTR)
        self.assertEqual(len(found), 1)

        backend_item = self.mock_service.collections[label].items[0]
        backend_item.delete_prompt = True
        backend_item.dismissed = True

        pending = found[0].delete()
        yield self.assertFailure(pending, txsecrets.UserCancelled)

    @inlineCallbacks
    def test_delete_throws_dbus_error(self):
        """delete fails with DBusException when delete_fail is set."""
        yield self.secretservice.open_session()
        label = "sample_keyring"
        sample_coll = yield self.create_sample_collection(label)
        yield sample_coll.create_item("Cucaracha", KEY_TYPE_ATTR, "secret83!")

        found = yield self.secretservice.search_items(KEY_TYPE_ATTR)
        self.assertEqual(len(found), 1)

        backend_item = self.mock_service.collections[label].items[0]
        backend_item.delete_fail = True

        pending = found[0].delete()
        yield self.assertFailure(pending, dbus.exceptions.DBusException)
 
935
 
 
936
 
 
937
class AltItemTestCase(BaseTestCase):
    """Item tests run against the mock using a 4-field secret struct."""

    # Swap in the alternative mock service for every test in this class.
    secret_service_class = AltSecretServiceMock

    @inlineCallbacks
    def test_create_item_four_fields_per_secret(self):
        """create_item stores the secret when the struct has 4 fields."""
        yield self.secretservice.open_session()
        label = "sample_keyring"
        yield self.create_sample_collection(label)
        default_coll = yield self.secretservice.get_default_collection()
        backend = self.mock_service.collections[label]

        secret = "secret!"
        yield default_coll.create_item("Cucaracha", KEY_TYPE_ATTR, secret)

        self.assertEqual(len(backend.items), 1)
        self.assertEqual(backend.items[0].value, secret)

    @inlineCallbacks
    def test_get_value_four_fields_per_secret(self):
        """get_value returns the secret when the struct has 4 fields."""
        yield self.secretservice.open_session()
        sample_coll = yield self.create_sample_collection("sample_keyring")
        secret = "secret83!"
        yield sample_coll.create_item("Cucaracha", KEY_TYPE_ATTR, secret)

        found = yield self.secretservice.search_items(KEY_TYPE_ATTR)
        self.assertEqual(len(found), 1)

        retrieved = yield found[0].get_value()
        self.assertEqual(retrieved, secret)