1
# -*- coding: utf-8 -*-
3
# Author: Alejandro J. Cura <alecu@canonical.com>
5
# Copyright 2010 Canonical Ltd.
7
# This program is free software: you can redistribute it and/or modify it
8
# under the terms of the GNU General Public License version 3, as published
9
# by the Free Software Foundation.
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
# PURPOSE. See the GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License along
17
# with this program. If not, see <http://www.gnu.org/licenses/>.
19
Provides a Twisted interface to access the system keyring via D-Bus.
20
Implements the Secrets Service API Draft:
21
* http://code.confuego.org/secrets-xdg-specs/
26
from dbus.mainloop.glib import DBusGMainLoop
27
import dbus.mainloop.glib
28
from twisted.internet.defer import Deferred
30
# Initialise thread support in gobject and in the dbus GLib bindings, then
# make the GLib main loop the default one for dbus connections. These calls
# run as import-time side effects of this module.
# NOTE(review): no `import gobject` is visible in this excerpt -- confirm it
# is imported above.
gobject.threads_init()
31
dbus.mainloop.glib.threads_init()
32
DBusGMainLoop(set_as_default=True)
34
# Bus name of the secrets service; passed to bus.get_object() everywhere a
# proxy object is created in this module.
BUS_NAME = "org.freedesktop.secrets"
35
# D-Bus interface names, used as `dbus_interface=` when wrapping proxies.
SERVICE_IFACE = "org.freedesktop.Secret.Service"
36
PROMPT_IFACE = "org.freedesktop.Secret.Prompt"
37
# NOTE(review): SESSION_IFACE is not referenced in the visible code.
SESSION_IFACE = "org.freedesktop.Secret.Session"
38
COLLECTION_IFACE = "org.freedesktop.Secret.Collection"
39
ITEM_IFACE = "org.freedesktop.Secret.Item"
40
PROPERTIES_IFACE = "org.freedesktop.DBus.Properties"
41
# Object paths: the service object itself and the alias of the default
# collection.
SECRETS_SERVICE = "/org/freedesktop/secrets"
42
DEFAULT_COLLECTION = "/org/freedesktop/secrets/aliases/default"
45
# Property names: keys of the dictionaries sent to CreateCollection and
# CreateItem, and the service property read by get_default_collection().
LABEL_PROPERTY = "Label"
46
ATTRIBUTES_PROPERTY = "Attributes"
47
COLLECTIONS_PROPERTY = "Collections"
48
# Label passed to create_collection() by get_default_collection().
DEFAULT_LABEL = "default"
51
# Instances are passed to d.errback() inside do_prompt's prompt_completed
# handler.
class UserCancelled(Exception):
52
"""The user cancelled a prompt."""
59
class SecretService(object):
60
"""The Secret Service manages all the sessions and collections."""
67
# Connects to the session bus, builds proxies for the Secret Service object
# (service and properties interfaces) and asks the service to open a session.
# NOTE(review): several lines of this method are absent from this excerpt:
# `d` is never assigned here (presumably a Deferred -- confirm), no `try:`
# precedes the `except` at the end, the except body is missing, and no
# return statement is visible. ALGORITHM and ALGORITHM_PARAMS are not defined
# in the visible lines either.
def open_session(self, window_id=0):
68
"""Open a unique session for the caller application."""
71
# Stored as a string; do_prompt() later passes it to Prompt().
self.window_id = str(window_id)
72
self.bus = dbus.SessionBus()
73
# One proxy object, wrapped twice: once for the service interface and once
# for the standard properties interface.
service_object = self.bus.get_object(BUS_NAME, SECRETS_SERVICE)
74
self.service = dbus.Interface(service_object,
75
dbus_interface=SERVICE_IFACE)
76
self.properties = dbus.Interface(service_object,
77
dbus_interface=PROPERTIES_IFACE)
79
# Reply handler for OpenSession. `result` is not used in the visible lines.
def session_opened(result, session):
80
"""The session was successfully opened."""
81
# Keep a proxy for the session path; Collection.create_item and the
# GetSecret call read it back as `service.session`.
self.session = self.bus.get_object(BUS_NAME, session)
84
# The algorithm parameters are sent wrapped as a variant (variant_level=1).
parameters = dbus.String(ALGORITHM_PARAMS, variant_level=1)
85
# Called with reply/error handlers: the reply goes to session_opened and
# failures go to d.errback.
self.service.OpenSession(ALGORITHM, parameters,
86
reply_handler=session_opened,
87
error_handler=d.errback)
88
# Python 2-only `except X, e` syntax; the handler body is not visible here.
except dbus.exceptions.DBusException, e:
92
# Builds a proxy for the prompt object at `prompt_path`, listens for its
# "Completed" signal and then calls Prompt() with the stored window id.
# NOTE(review): lines are missing from this excerpt: `d` is not assigned here
# (presumably a Deferred -- confirm), the branch that reports a successful
# result is not visible, and neither is a return statement. Callers in this
# file use the return value as a Deferred (chainDeferred / addCallback).
def do_prompt(self, prompt_path):
93
"""Show a prompt given its path."""
95
prompt_object = self.bus.get_object(BUS_NAME, prompt_path)
96
prompt = dbus.Interface(prompt_object, dbus_interface=PROMPT_IFACE)
98
# Handler for the prompt's "Completed" signal.
def prompt_completed(dismissed, result):
99
"""The prompt was either completed or dismissed."""
100
# Drop the signal connection made below (sigcompleted is bound later in
# the enclosing scope, before the signal can fire).
sigcompleted.remove()
102
# NOTE(review): presumably guarded by `if dismissed:` -- the condition is
# not visible in this excerpt.
d.errback(UserCancelled())
106
sigcompleted = prompt.connect_to_signal("Completed", prompt_completed)
107
# Call failures are routed to d.errback; the line between these two
# (original line 108) is not visible.
prompt.Prompt(self.window_id,
109
error_handler=d.errback)
112
def make_item_list(self, object_path_list):
    """Wrap each object path of the given list in an Item of this service."""
    items = []
    for path in object_path_list:
        items.append(Item(self, path))
    return items
116
# Calls SearchItems on the service with `attributes`, asks the service to
# unlock the locked matches, shows a prompt through do_prompt() when needed,
# and accumulates all item paths in `result`. The final callback added to `d`
# turns the paths into Item objects via make_item_list().
# NOTE(review): lines are missing from this excerpt: `d` and `result` are not
# assigned here (presumably a Deferred and a list -- confirm), the points
# where `d` is fired are not visible, the conditions choosing between
# prompting and finishing are not visible, and no return statement is shown.
def search_items(self, attributes):
117
"""Find items in any collection."""
121
# Callback on the prompt Deferred created in unlock_handler.
def prompt_handle(unlocked):
122
"""Merge the items that were just unlocked."""
123
result.extend(unlocked)
126
# Reply handler for Unlock.
def unlock_handler(unlocked, prompt):
127
"""The items were unlocked, or a prompt should be shown first."""
128
result.extend(unlocked)
130
# NOTE(review): presumably only reached when a prompt is required; the
# guarding condition is not visible.
d2 = self.do_prompt(prompt)
131
d2.addCallback(prompt_handle)
136
# Reply handler for SearchItems.
def items_found(unlocked, locked):
137
"""Called with two lists of found items."""
138
result.extend(unlocked)
140
# Ask the service to unlock the locked items; failures go to d.errback.
self.service.Unlock(locked,
141
reply_handler=unlock_handler,
142
error_handler=d.errback)
146
self.service.SearchItems(attributes,
147
reply_handler=items_found,
148
error_handler=d.errback)
149
# Convert the object paths delivered through `d` into Item objects.
d.addCallback(self.make_item_list)
152
# Calls CreateCollection on the service with a properties dictionary holding
# only the label, then maps the resulting object path to a Collection.
# NOTE(review): lines are missing from this excerpt: `d` is not assigned here
# (presumably a Deferred -- confirm), and no return statement is visible.
# get_default_collection() uses the return value as a Deferred.
def create_collection(self, label):
153
"""Create a new collection with the specified properties."""
156
# Reply handler for CreateCollection.
def createcollection_handler(collection, prompt):
157
"""A collection was created, or a prompt should be shown first."""
# NOTE(review): the two statements below are presumably the branches of
# an if/else that is not visible in this excerpt: either prompt and chain
# the prompt's outcome into `d`, or fire `d` with the collection path.
159
self.do_prompt(prompt).chainDeferred(d)
161
d.callback(collection)
163
# The label is sent wrapped as a variant (variant_level=1).
properties = {LABEL_PROPERTY: dbus.String(label, variant_level=1)}
164
self.service.CreateCollection(properties,
165
reply_handler=createcollection_handler,
166
error_handler=d.errback)
168
# Turn the object path delivered through `d` into a Collection.
d.addCallback(lambda p: Collection(self, p))
171
# Reads the service's "Collections" property. When the list is non-empty the
# default-collection alias is wrapped in a Collection; the other visible
# statement creates a collection labelled DEFAULT_LABEL instead.
# NOTE(review): lines are missing from this excerpt: `d` is not assigned here
# (presumably a Deferred -- confirm), the `else:` separating the two outcomes
# is not visible, and no return statement is shown.
def get_default_collection(self):
172
"""The collection where default items should be created."""
175
# Reply handler for the Properties.Get call below.
def propertyget_handler(collection_paths):
176
"""The list of collection paths was retrieved."""
177
if len(collection_paths) > 0:
178
d.callback(Collection(self, DEFAULT_COLLECTION))
180
# NOTE(review): presumably the else-branch (no collections exist yet).
self.create_collection(DEFAULT_LABEL).chainDeferred(d)
182
self.properties.Get(SERVICE_IFACE, COLLECTIONS_PROPERTY,
183
reply_handler=propertyget_handler,
184
error_handler=d.errback)
188
class Collection(object):
189
"""A collection of items containing secrets."""
191
def __init__(self, service, object_path):
    """Bind this collection to a service and to its D-Bus object path."""
    self.service = service
    proxy = service.bus.get_object(BUS_NAME, object_path)
    # Every collection call goes through this interface wrapper.
    self.collection_iface = dbus.Interface(
        proxy, dbus_interface=COLLECTION_IFACE)
198
# Builds an "sv" properties dictionary holding the label and an "ss"
# dictionary of attributes, packs the secret as the tuple
# (service session, parameters bytes, value bytes) and calls CreateItem on
# the collection interface with `replace`. The reply goes to
# createitem_handler, which shows a prompt through the service and chains
# its outcome into `d`; call failures go to d.errback.
# NOTE(review): lines are missing from this excerpt: the closing quotes of
# the docstring, the assignment of `d` (presumably a Deferred -- confirm),
# the non-prompt branch of createitem_handler, and any return statement.
# ALGORITHM_PARAMS is not defined in the visible lines.
def create_item(self, label, attr, value, replace=True):
199
"""Create an item with the given attributes, secret and label.
201
If replace is set, then it replaces an item already present with the
202
same values for the attributes.
206
def createitem_handler(item, prompt):
207
"""An item was created, or a prompt should be shown first."""
209
self.service.do_prompt(prompt).chainDeferred(d)
213
properties = dbus.Dictionary(signature="sv")
214
properties[LABEL_PROPERTY] = label
215
attributes = dbus.Dictionary(attr, signature="ss")
216
properties[ATTRIBUTES_PROPERTY] = attributes
217
parameters = dbus.ByteArray(ALGORITHM_PARAMS)
218
value_bytes = dbus.ByteArray(value)
219
secret = (self.service.session, parameters, value_bytes)
220
self.collection_iface.CreateItem(properties, secret, replace,
221
reply_handler=createitem_handler,
222
error_handler=d.errback)
227
"""An item contains a secret, lookup attributes and has a label."""
229
def __init__(self, service, object_path):
    """Bind this item to a service and to its D-Bus object path."""
    self.service = service
    proxy = service.bus.get_object(BUS_NAME, object_path)
    # Every item call goes through this interface wrapper.
    self.item_iface = dbus.Interface(proxy, dbus_interface=ITEM_IFACE)
237
# NOTE(review): the `def` line of the method owning this docstring is not
# visible in this excerpt, nor is the assignment of `d` (presumably a
# Deferred -- confirm) or what getsecret_handler does with `value`.
# Visible behaviour: GetSecret is called on the item interface with the
# service's session and byte_arrays=True; the reply is unpacked into
# (session, parameters, value) and call failures go to d.errback.
"""Retrieve the secret for this item."""
240
def getsecret_handler(secret):
241
"""The secret for this item was found."""
242
# pylint: disable=W0612
243
session, parameters, value = secret
246
self.item_iface.GetSecret(self.service.session, byte_arrays=True,
247
reply_handler=getsecret_handler,
248
error_handler=d.errback)
252
# NOTE(review): the `def` line of the method owning this docstring is not
# visible in this excerpt, nor is the assignment of `d` (presumably a
# Deferred -- confirm) or the non-prompt branch of delete_handler.
# Visible behaviour: Delete is called on the item interface; the reply
# handler shows the returned prompt through the service and chains its
# outcome into `d`, and call failures go to d.errback.
"""Delete this item."""
255
def delete_handler(prompt):
256
"""The item was deleted, or a prompt should be shown first."""
258
self.service.do_prompt(prompt).chainDeferred(d)
262
self.item_iface.Delete(reply_handler=delete_handler,
263
error_handler=d.errback)