1
# -*- coding: utf-8 -*-
3
# Copyright 2011-2012 Canonical Ltd.
5
# This program is free software: you can redistribute it and/or modify it
6
# under the terms of the GNU General Public License version 3, as published
7
# by the Free Software Foundation.
9
# This program is distributed in the hope that it will be useful, but
10
# WITHOUT ANY WARRANTY; without even the implied warranties of
11
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
12
# PURPOSE. See the GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License along
15
# with this program. If not, see <http://www.gnu.org/licenses/>.
17
# In addition, as a special exception, the copyright holders give
18
# permission to link the code of portions of this program with the
19
# OpenSSL library under certain conditions as described in each
20
# individual source file, and distribute linked combinations
22
# You must obey the GNU General Public License in all respects
23
# for all of the code used other than OpenSSL. If you modify
24
# file(s) with this exception, you may extend this exception to your
25
# version of the file(s), but you are not obligated to do so. If you
26
# do not wish to do so, delete this exception statement from your
27
# version. If you delete this exception statement from all source
28
# files in the program, then also delete it here.
29
"""Magicicada credentials management IPC service."""
32
from ubuntu_sso.main import get_sso_client
33
from twisted.internet import defer
35
from ubuntuone.platform.credentials import (
43
class RemovableSignal(object):
44
"""A signal that can be removed."""
46
def __init__(self, proxy, signal_name, callback):
47
"""Initialize this instance."""
49
self.signal_name = signal_name
50
self.callback = callback
51
setattr(self.proxy, signal_name, self)
53
def __call__(self, *args, **kwargs):
54
"""Call this instance."""
55
app_name = args[0] if len(args) > 0 else None
56
logger.debug('Handling signal_name: %r, app_name: %r.',
57
self.signal_name, app_name)
59
if app_name != APP_NAME:
60
# This fixed bug #818190: filter signals not related to APP_NAME
61
logger.info('Received %r but app_name %r does not match %r, '
62
'exiting.', self.signal_name, app_name, APP_NAME)
65
if self.callback is not None:
66
# drop the app name, callers do not care about it
68
logger.debug('Calling %r with %d args and %d kwargs.',
69
self.callback, len(args), len(kwargs))
70
return self.callback(*args, **kwargs)
73
"""Remove this signal."""
74
if getattr(self.proxy, self.signal_name, False):
75
setattr(self.proxy, self.signal_name, None)
78
class CredentialsManagement(object):
79
"""Object that manages Magicicada credentials."""
81
_SIGNAL_TO_CALLBACK_MAPPING = {
82
'AuthorizationDenied': 'on_authorization_denied_cb',
83
'CredentialsCleared': 'on_credentials_cleared_cb',
84
'CredentialsError': 'on_credentials_error_cb',
85
'CredentialsFound': 'on_credentials_found_cb',
86
'CredentialsNotFound': 'on_credentials_not_found_cb',
87
'CredentialsStored': 'on_credentials_stored_cb',
90
def __init__(self, proxy, *args, **kwargs):
91
super(CredentialsManagement, self).__init__(*args, **kwargs)
92
self.sso_proxy = proxy
94
def connect_to_signal(self, signal_name, callback):
95
"""Register 'callback' to be called when 'signal_name' is emitted."""
96
cb_name = self._SIGNAL_TO_CALLBACK_MAPPING[signal_name]
97
match = RemovableSignal(self.sso_proxy, cb_name, callback)
100
def find_credentials(self, reply_handler=NO_OP, error_handler=NO_OP):
101
"""Ask the Magicicada credentials."""
102
d = self.sso_proxy.find_credentials(APP_NAME, {})
103
d.addCallbacks(lambda _: reply_handler(), error_handler)
105
def clear_credentials(self, reply_handler=NO_OP, error_handler=NO_OP):
106
"""Clear the Magicicada credentials."""
107
d = self.sso_proxy.clear_credentials(APP_NAME, {})
108
d.addCallbacks(lambda _: reply_handler(), error_handler)
110
def store_credentials(self, credentials,
111
reply_handler=NO_OP, error_handler=NO_OP):
112
"""Store the token for Magicicada application."""
113
d = self.sso_proxy.store_credentials(APP_NAME, credentials)
114
d.addCallbacks(lambda _: reply_handler(), error_handler)
116
def register(self, args, reply_handler=NO_OP, error_handler=NO_OP):
117
"""Get credentials if found else prompt to register to Magicicada."""
118
params = dict(UI_PARAMS)
120
d = self.sso_proxy.register(APP_NAME, params)
121
d.addCallbacks(lambda _: reply_handler(), error_handler)
123
def login(self, args, reply_handler=NO_OP, error_handler=NO_OP):
124
"""Get credentials if found else prompt to login to Magicicada."""
125
params = dict(UI_PARAMS)
127
d = self.sso_proxy.login(APP_NAME, params)
128
d.addCallbacks(lambda _: reply_handler(), error_handler)
130
def login_email_password(self, args,
131
reply_handler=NO_OP, error_handler=NO_OP):
132
"""Get credentials if found else login to Magicicada."""
133
params = dict(UI_PARAMS)
135
d = self.sso_proxy.login_email_password(APP_NAME, params)
136
d.addCallbacks(lambda _: reply_handler(), error_handler)
138
def register_to_credentials_stored(self, callback):
139
"""Register to the CredentialsStored dbus signal."""
140
return RemovableSignal(self.sso_proxy, "on_credentials_stored_cb",
143
def register_to_credentials_cleared(self, callback):
144
"""Register to the CredentialsCleared dbus signal."""
145
return RemovableSignal(self.sso_proxy, "on_credentials_cleared_cb",
148
def register_to_credentials_found(self, callback):
149
"""Register to the CredentialsFound dbus signal."""
150
return RemovableSignal(self.sso_proxy, "on_credentials_found_cb",
153
def register_to_credentials_not_found(self, callback):
154
"""Register to the CredentialsFound dbus signal."""
155
return RemovableSignal(self.sso_proxy, "on_credentials_not_found_cb",
158
def register_to_authorization_denied(self, callback):
159
"""Register to the AuthorizationDenied dbus signal."""
160
return RemovableSignal(self.sso_proxy, "on_authorization_denied_cb",
163
def register_to_credentials_error(self, callback):
164
"""Register to the CredentialsError dbus signal."""
165
return RemovableSignal(self.sso_proxy, "on_credentials_error_cb",
169
@defer.inlineCallbacks
170
def get_creds_proxy():
171
"""Get the CredentialsManagement proxy."""
172
client = yield get_sso_client()
173
result = CredentialsManagement(client.cred_manager)
174
defer.returnValue(result)