1
# -*- coding: utf-8 -*-
3
# Author: Alejandro J. Cura <alecu@canonical.com>
5
# Copyright 2010 Canonical Ltd.
7
# This program is free software: you can redistribute it and/or modify it
8
# under the terms of the GNU General Public License version 3, as published
9
# by the Free Software Foundation.
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
# PURPOSE. See the GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License along
17
# with this program. If not, see <http://www.gnu.org/licenses/>.
19
"""
Provides a Twisted interface to access the system keyring via DBus.
Implements the Secrets Service API Draft:
* http://code.confuego.org/secrets-xdg-specs/
"""
26
from dbus.mainloop.glib import DBusGMainLoop
27
import dbus.mainloop.glib
28
from twisted.internet.defer import Deferred
30
# Import-time setup: enable thread support in gobject and in the dbus GLib
# bindings, then install a GLib main loop as the default D-Bus main loop.
# NOTE(review): `gobject` and the bare `dbus` module are used in this file but
# no `import gobject` / `import dbus` line is visible in this extract --
# confirm against the original file.
gobject.threads_init()
31
dbus.mainloop.glib.threads_init()
32
DBusGMainLoop(set_as_default=True)
34
# Bus name that owns the secrets service objects used below.
BUS_NAME = "org.gnome.keyring"
35
# D-Bus interface names passed as `dbus_interface=` when wrapping proxies.
SERVICE_IFACE = "org.freedesktop.Secret.Service"
36
PROMPT_IFACE = "org.freedesktop.Secret.Prompt"
37
SESSION_IFACE = "org.freedesktop.Secret.Session"
38
COLLECTION_IFACE = "org.freedesktop.Secret.Collection"
39
ITEM_IFACE = "org.freedesktop.Secret.Item"
40
# Object paths: the service itself and the alias of the default collection.
SECRETS_SERVICE = "/org/freedesktop/secrets"
41
DEFAULT_COLLECTION = "/org/freedesktop/secrets/aliases/default"
44
# NOTE(review): ALGORITHM and ALGORITHM_PARAMS are referenced further down
# (OpenSession / CreateItem) but are not defined in the visible lines; the
# residual numbering jumps from 41 to 44, so they were presumably defined
# here -- restore from the original file.
# Property keys used when creating collections and items.
LABEL_PROPERTY = "Label"
45
ATTRIBUTES_PROPERTY = "Attributes"
48
class UserCancelled(Exception):
    """The user cancelled a prompt.

    An instance of this exception is passed to ``d.errback`` by the
    prompt-completion handler in ``SecretService.do_prompt``.
    """
56
# Entry point for talking to the secrets service over D-Bus: the methods that
# follow open a session, show prompts, search items and create collections.
# NOTE(review): in this extract the class body has lost its indentation and
# some lines (the residual numbering jumps from 57 to 63); any class-level
# attributes that lived there are not visible -- confirm against the original.
class SecretService(object):
57
"""The Secret Service manages all the sessions and collections."""
63
# Connects to the session bus, wraps the secrets service object in the
# Service interface and asynchronously calls OpenSession on it.
# NOTE(review): `d` is used below but never created in the visible lines, the
# trailing `except` has no visible `try:` or handler body, and no `return` is
# visible -- presumably `d = Deferred()`, `try:`, the handler body and
# `return d` were lost from this extract; restore them from the original file.
def open_session(self, window_id=0):
64
"""Open a unique session for the caller application."""
67
# Kept as a string; do_prompt later passes it to Prompt().
self.window_id = str(window_id)
68
self.bus = dbus.SessionBus()
69
service_object = self.bus.get_object(BUS_NAME, SECRETS_SERVICE)
70
self.service = dbus.Interface(service_object,
71
dbus_interface=SERVICE_IFACE)
73
# Reply handler for OpenSession: receives (result, session) and stores a
# proxy object for the returned session path on self.session.
def session_opened(result, session):
74
"""The session was successfully opened."""
75
self.session = self.bus.get_object(BUS_NAME, session)
78
# The algorithm parameters are sent as a D-Bus string with variant_level=1.
parameters = dbus.String(ALGORITHM_PARAMS, variant_level=1)
79
# Asynchronous call: failures reported by D-Bus are routed to d.errback.
self.service.OpenSession(ALGORITHM, parameters,
80
reply_handler=session_opened,
81
error_handler=d.errback)
82
# NOTE(review): `except X, e:` is Python 2-only syntax.
except dbus.exceptions.DBusException, e:
86
# Builds a proxy for the prompt object at `prompt_path`, listens for its
# "Completed" signal and asks the service to display the prompt.
# NOTE(review): `d` is used below but its creation and the method's return are
# not visible in this extract -- presumably a Deferred created at the top and
# returned at the end; confirm against the original file.
def do_prompt(self, prompt_path):
87
"""Show a prompt given its path."""
89
prompt_object = self.bus.get_object(BUS_NAME, prompt_path)
90
prompt = dbus.Interface(prompt_object, dbus_interface=PROMPT_IFACE)
92
# Handler for the "Completed" signal; receives (dismissed, result).
# NOTE(review): only the UserCancelled errback line survives here; the
# branch on `dismissed` and the success path are presumably missing
# (residual numbering jumps 93 -> 96 -> 100).
def prompt_completed(dismissed, result):
93
"""The prompt was either completed or dismissed."""
96
d.errback(UserCancelled())
100
# The signal match is kept in `sigcompleted`; its later use is not visible.
sigcompleted = prompt.connect_to_signal("Completed", prompt_completed)
101
# NOTE(review): an argument line appears to be missing from this call
# (numbering jumps 101 -> 103), presumably a reply_handler -- confirm.
prompt.Prompt(self.window_id,
103
error_handler=d.errback)
106
def make_item_list(self, object_path_list):
    """Make a list of items given their paths.

    Each entry of ``object_path_list`` is wrapped in an ``Item`` bound to
    this service. The result keeps the order of the input; an empty input
    yields an empty list.
    """
    return [Item(self, object_path) for object_path in object_path_list]
110
# Asynchronously calls SearchItems with `attributes`, then Unlock on the
# locked results; the value eventually delivered through `d` is converted to
# Item objects by make_item_list.
# NOTE(review): `d` is used below but its creation and the method's return are
# not visible, and the handlers have lost lines (see the jumps in the residual
# numbering) -- restore from the original file before relying on this.
def search_items(self, attributes):
111
"""Find items in any collection."""
112
# TODO: check if the lists of unlocked items found should be merged
115
# Reply handler for Unlock; receives (unlocked, prompt).
# NOTE(review): only the prompt path is visible; the branch that delivers
# `unlocked` directly is presumably missing.
def unlock_handler(unlocked, prompt):
116
"""The items were unlocked, or a prompt should be shown first."""
118
# The prompt's deferred result (or failure) is forwarded into `d`.
self.do_prompt(prompt).chainDeferred(d)
122
# Reply handler for SearchItems; receives (unlocked, locked) and asks the
# service to unlock the locked ones.
def items_found(unlocked, locked):
123
"""Called with two lists of found items."""
125
self.service.Unlock(locked,
126
reply_handler=unlock_handler,
127
error_handler=d.errback)
131
self.service.SearchItems(attributes,
132
reply_handler=items_found,
133
error_handler=d.errback)
134
# Whatever reaches `d` is treated as a list of object paths.
d.addCallback(self.make_item_list)
137
# Asynchronously calls CreateCollection with a properties dict holding the
# label; the value delivered through `d` is wrapped in a Collection.
# NOTE(review): `d` is used below but its creation and the method's return are
# not visible in this extract -- restore from the original file.
def create_collection(self, label):
138
"""Create a new collection with the specified properties."""
141
# Reply handler for CreateCollection; receives (collection, prompt).
# NOTE(review): both the prompt path and the direct callback are visible
# with no condition between them; presumably an if/else on `prompt` was
# lost (numbering jumps 142 -> 144 -> 146).
def createcollection_handler(collection, prompt):
142
"""A collection was created, or a prompt should be shown first."""
144
self.do_prompt(prompt).chainDeferred(d)
146
d.callback(collection)
148
# The label is sent as a D-Bus string with variant_level=1.
properties = {LABEL_PROPERTY: dbus.String(label, variant_level=1)}
149
self.service.CreateCollection(properties,
150
reply_handler=createcollection_handler,
151
error_handler=d.errback)
153
# The value reaching `d` is used as the new collection's object path.
d.addCallback(lambda p: Collection(self, p))
156
def get_default_collection(self):
    """The collection where default items should be created.

    Returns a new ``Collection`` bound to this service and to the
    ``DEFAULT_COLLECTION`` object path.
    """
    return Collection(self, DEFAULT_COLLECTION)
161
# Wraps one collection object of the secrets service; instances are created
# with a service object and the collection's D-Bus object path.
# NOTE(review): the class body below has lost its indentation in this extract.
class Collection(object):
162
"""A collection of items containing secrets."""
164
def __init__(self, service, object_path):
    """Initialize a new collection.

    ``service`` is kept on ``self.service`` and must expose a ``bus``
    attribute, which is used to look up the D-Bus object at
    ``object_path``. That object is wrapped in the Collection interface
    and stored on ``self.collection_iface``.
    """
    self.service = service
    collection_object = service.bus.get_object(BUS_NAME, object_path)
    self.collection_iface = dbus.Interface(
        collection_object, dbus_interface=COLLECTION_IFACE)
171
# Builds the item properties (label plus an "ss" attributes dictionary inside
# an "sv" dictionary) and a secret tuple of (service session, parameters as
# bytes, value as bytes), then asynchronously calls CreateItem with
# (properties, secret, replace). createitem_handler receives (item, prompt)
# as the reply; D-Bus errors are routed to d.errback.
# NOTE(review): the closing quotes of the method docstring are missing in this
# extract, `d` is never created in the visible lines, only the prompt branch
# of createitem_handler is visible, and no return is visible -- restore these
# from the original file.
def create_item(self, label, attr, value, replace=True):
172
"""Create an item with the given attributes, secret and label.
174
If replace is set, then it replaces an item already present with the
175
same values for the attributes.
179
def createitem_handler(item, prompt):
180
"""An item was created, or a prompt should be shown first."""
182
self.service.do_prompt(prompt).chainDeferred(d)
186
properties = dbus.Dictionary(signature="sv")
187
properties[LABEL_PROPERTY] = label
188
attributes = dbus.Dictionary(attr, signature="ss")
189
properties[ATTRIBUTES_PROPERTY] = attributes
190
parameters = dbus.ByteArray(ALGORITHM_PARAMS)
191
value_bytes = dbus.ByteArray(value)
192
secret = (self.service.session, parameters, value_bytes)
193
self.collection_iface.CreateItem(properties, secret, replace,
194
reply_handler=createitem_handler,
195
error_handler=d.errback)
200
# NOTE(review): this is a class docstring whose `class` header line is not
# visible in this extract. make_item_list instantiates `Item(self, path)`, so
# the missing line is presumably `class Item(object):` -- confirm and restore.
"""An item contains a secret, lookup attributes and has a label."""
202
def __init__(self, service, object_path):
    """Initialize this new Item.

    ``service`` is kept on ``self.service`` and must expose a ``bus``
    attribute, which is used to look up the D-Bus object at
    ``object_path``. That object is wrapped in the Item interface and
    stored on ``self.item_iface``.
    """
    self.service = service
    item_object = service.bus.get_object(BUS_NAME, object_path)
    self.item_iface = dbus.Interface(
        item_object, dbus_interface=ITEM_IFACE)
210
# NOTE(review): the `def` line of this method is not visible in this extract,
# nor are the creation of `d` and the method's return -- restore from the
# original file.
"""Retrieve the secret for this item."""
213
# Reply handler for GetSecret; the secret arrives as a 3-part structure.
# NOTE(review): what is done with `value` afterwards is not visible
# (residual numbering jumps 216 -> 219).
def getsecret_handler(secret):
214
"""The secret for this item was found."""
215
# pylint: disable=W0612
216
session, parameters, value = secret
219
# Asynchronous call made with the service's session; byte_arrays=True is
# passed through to dbus. Errors are routed to d.errback.
self.item_iface.GetSecret(self.service.session, byte_arrays=True,
220
reply_handler=getsecret_handler,
221
error_handler=d.errback)
225
# NOTE(review): the `def` line of this method is not visible in this extract,
# nor are the creation of `d` and the method's return -- restore from the
# original file.
"""Delete this item."""
228
# Reply handler for Delete; receives a prompt path.
# NOTE(review): only the prompt branch is visible; the branch taken when no
# prompt is needed is presumably missing (numbering jumps 229 -> 231 -> 235).
def delete_handler(prompt):
229
"""The item was deleted, or a prompt should be shown first."""
231
# The prompt's deferred result (or failure) is forwarded into `d`.
self.service.do_prompt(prompt).chainDeferred(d)
235
# Asynchronous call; D-Bus errors are routed to d.errback.
self.item_iface.Delete(reply_handler=delete_handler,
236
error_handler=d.errback)