~jamesh/storage-provider-webdav/landing-20161118

« back to all changes in this revision

Viewing changes to tests/utils/fake-online-accounts-daemon.py

  • Committer: Tarmac
  • Author(s): James Henstridge
  • Date: 2016-09-23 05:03:11 UTC
  • mfrom: (7.1.13 provider-test-fixture)
  • Revision ID: tarmac-20160923050311-bb3m6zal58cpccqu
Add test infrastructure for testing DavProvider.

Approved by Michi Henning, unity-api-1-bot.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python3
 
2
# Copyright (C) 2016 Canonical Ltd
 
3
#
 
4
# This program is free software: you can redistribute it and/or modify
 
5
# it under the terms of the GNU Lesser General Public License version 3 as
 
6
# published by the Free Software Foundation.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU Lesser General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU Lesser General Public License
 
14
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
15
#
 
16
# Authors: James Henstridge <james.henstridge@canonical.com>
 
17
 
 
18
"""A fake version of the OnlineAccounts D-Bus service."""
 
19
 
 
20
import os
 
21
import sys
 
22
 
 
23
import dbus.service
 
24
import dbus.mainloop.glib
 
25
from gi.repository import GLib
 
26
 
 
27
BUS_NAME = "com.ubuntu.OnlineAccounts.Manager"
 
28
OBJECT_PATH = "/com/ubuntu/OnlineAccounts/Manager"
 
29
OA_IFACE = "com.ubuntu.OnlineAccounts.Manager"
 
30
 
 
31
INTERFACE_XML = os.path.join(os.path.dirname(__file__),
 
32
                             "com.ubuntu.OnlineAccounts.Manager.xml")
 
33
 
 
34
AUTH_OAUTH1 = 1
 
35
AUTH_OAUTH2 = 2
 
36
AUTH_PASSWORD = 3
 
37
AUTH_SASL = 4
 
38
 
 
39
class OAuth1:
 
40
    method = AUTH_OAUTH1
 
41
    def __init__(self, consumer_key, consumer_secret, token, token_secret, signature_method="HMAC-SHA1"):
 
42
        self.consumer_key = consumer_key
 
43
        self.consumer_secret = consumer_secret
 
44
        self.token = token
 
45
        self.token_secret = token_secret
 
46
        self.signature_method = signature_method
 
47
 
 
48
    def serialise(self):
 
49
        return dbus.Dictionary({
 
50
            "ConsumerKey": dbus.String(self.consumer_key),
 
51
            "ConsumerSecret": dbus.String(self.consumer_secret),
 
52
            "Token": dbus.String(self.token),
 
53
            "TokenSecret": dbus.String(self.token_secret),
 
54
            "SignatureMethod": dbus.String(self.signature_method),
 
55
        }, signature="sv")
 
56
 
 
57
class OAuth2:
 
58
    method = AUTH_OAUTH2
 
59
    def __init__(self, access_token, expires_in=0, granted_scopes=[]):
 
60
        self.access_token = access_token
 
61
        self.expires_in = expires_in
 
62
        self.granted_scopes = granted_scopes
 
63
 
 
64
    def serialise(self):
 
65
        return dbus.Dictionary({
 
66
            "AccessToken": dbus.String(self.access_token),
 
67
            "ExpiresIn": dbus.Int32(self.expires_in),
 
68
            "GrantedScopes": dbus.Array(self.granted_scopes, signature="s"),
 
69
        }, signature="sv")
 
70
 
 
71
class Password:
 
72
    method = AUTH_PASSWORD
 
73
    def __init__(self, username, password):
 
74
        self.username = username
 
75
        self.password = password
 
76
 
 
77
    def serialise(self):
 
78
        return dbus.Dictionary({
 
79
            "Username": dbus.String(self.username),
 
80
            "Password": dbus.String(self.password),
 
81
        }, signature="sv")
 
82
 
 
83
class Account:
 
84
    def __init__(self, account_id, display_name, service_id, credentials):
 
85
        self.account_id = account_id
 
86
        self.display_name = display_name
 
87
        self.service_id = service_id
 
88
        self.credentials = credentials
 
89
 
 
90
    def serialise(self):
 
91
        return (dbus.UInt32(self.account_id), dbus.Dictionary({
 
92
            "displayName": dbus.String(self.display_name),
 
93
            "serviceId": dbus.String(self.service_id),
 
94
            "authMethod": dbus.Int32(self.credentials.method),
 
95
            }, signature="sv"))
 
96
 
 
97
class Manager(dbus.service.Object):
 
98
    def __init__(self, connection, object_path, accounts):
 
99
        super(Manager, self).__init__(connection, object_path)
 
100
        self.accounts = accounts
 
101
 
 
102
    @dbus.service.method(dbus_interface=OA_IFACE,
 
103
                         in_signature="a{sv}", out_signature="a(ua{sv})")
 
104
    def GetAccounts(self, filters):
 
105
        print("GetAccounts %r" % filters)
 
106
        sys.stdout.flush()
 
107
        return dbus.Array([a.serialise() for a in self.accounts],
 
108
                          signature="a(ua{sv})")
 
109
 
 
110
    @dbus.service.method(dbus_interface=OA_IFACE,
 
111
                         in_signature="usbba{sv}", out_signature="a{sv}")
 
112
    def Authenticate(self, account_id, service_id, interactive, invalidate, parameters):
 
113
        print("Authenticate %r %r %r %r %r" % (account_id, service_id, interactive, invalidate, parameters))
 
114
        sys.stdout.flush()
 
115
        for account in self.accounts:
 
116
            if account.account_id == account_id and account.service_id == service_id:
 
117
                return account.credentials.serialise()
 
118
        else:
 
119
            raise KeyError(repr((account_id, service_id)))
 
120
 
 
121
    @dbus.service.method(dbus_interface=OA_IFACE,
 
122
                         in_signature="sa{sv}", out_signature="(ua{sv})a{sv}")
 
123
    def RequestAccess(self, service_id, parameters):
 
124
        print("RequestAccess %r %r" % (service_id, parameters))
 
125
        sys.stdout.flush()
 
126
        for account in self.accounts:
 
127
            if account.service_id == service_id:
 
128
                return (account.serialise(),
 
129
                        account.credentials.serialise())
 
130
        else:
 
131
            raise KeyError(service_id)
 
132
 
 
133
class Server:
 
134
    def __init__(self, accounts):
 
135
        self.accounts = accounts
 
136
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
 
137
        self.main_loop = GLib.MainLoop()
 
138
        self.connection = dbus.SessionBus()
 
139
        # Quit when the bus disconnectes
 
140
        self.connection.add_signal_receiver(
 
141
            self.main_loop.quit, signal_name="Disconnected",
 
142
            path="/org/freedesktop/DBus/Local",
 
143
            dbus_interface="org.freedesktop.DBus.Local")
 
144
        self.manager = Manager(self.connection, OBJECT_PATH, self.accounts)
 
145
        self.bus_name = dbus.service.BusName(BUS_NAME, self.connection,
 
146
                                             allow_replacement=True,
 
147
                                             replace_existing=True,
 
148
                                             do_not_queue=True)
 
149
 
 
150
    def run(self):
 
151
        try:
 
152
            self.main_loop.run()
 
153
        except KeyboardInterrupt:
 
154
            pass
 
155
 
 
156
if __name__ == "__main__":
 
157
    accounts = [
 
158
        Account(1, "OAuth1 account", "oauth1-service",
 
159
                OAuth1("consumer_key", "consumer_secret", "token", "token_secret")),
 
160
        Account(2, "OAuth2 account", "oauth2-service",
 
161
                OAuth2("access_token", 0, ["scope1", "scope2"])),
 
162
        Account(3, "Password account", "password-service",
 
163
                Password("user", "pass")),
 
164
        Account(42, "Fake google account", "google-drive-scope",
 
165
                OAuth2("fake-google-access-token", 0, [])),
 
166
        Account(99, "Fake mcloud account", "com.canonical.scopes.mcloud_mcloud_mcloud",
 
167
                OAuth2("fake-mcloud-access-token", 0, [])),
 
168
    ]
 
169
    server = Server(accounts)
 
170
    server.run()