2
# This file is part of Checkbox.
4
# Copyright 2008 Canonical Ltd.
6
# Checkbox is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
11
# Checkbox is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License
17
# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
25
import dbus.mainloop.glib
27
from checkbox.lib.environ import append_path
29
from checkbox.properties import Int, String
32
class PermissionDeniedByPolicy(dbus.DBusException):
33
_dbus_error_name = "com.ubuntu.checkbox.PermissionDeniedByPolicy"
36
class UnknownRegistryException(dbus.DBusException):
37
_dbus_error_name = 'com.ubuntu.DeviceDriver.InvalidDriverDBException'
39
class UnknownTestException(dbus.DBusException):
40
_dbus_error_name = 'com.ubuntu.DeviceDriver.InvalidDriverDBException'
43
class BackendManager(dbus.service.Object):
45
DBUS_INTERFACE_NAME = "com.ubuntu.checkbox"
47
DBUS_BUS_NAME = "com.ubuntu.checkbox"
49
timeout = Int(default=600)
50
path = String(default="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
59
return "<BackendManager>"
61
def register(self, manager):
62
import dbus.mainloop.glib
63
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
64
self.bus = dbus.SystemBus()
65
self.dbus_name = dbus.service.BusName(self.DBUS_BUS_NAME, self.bus)
67
# Set environment which has been reset by dbus
68
for path in self.path.split(":"):
71
self._manager = manager
73
("test-.*", self.test_all),
75
self._manager.reactor.call_on(rt, rh)
77
@dbus.service.method(DBUS_INTERFACE_NAME,
78
in_signature="s", out_signature="s", sender_keyword="sender",
79
connection_keyword="conn")
80
def get_registry(self, name, sender=None, conn=None):
81
self._check_polkit_privilege(sender, conn, "com.ubuntu.checkbox.info")
82
if name not in self._manager.registry:
83
raise UnknownRegistryException, "Registry not found: %s" % name
85
return str(self._manager.registry[name])
87
@dbus.service.method(DBUS_INTERFACE_NAME,
88
in_signature="ss", out_signature="as", sender_keyword="sender",
89
connection_keyword="conn")
90
def get_test_result(self, suite, name, sender=None, conn=None):
91
self._check_polkit_privilege(sender, conn, "com.ubuntu.checkbox.test")
92
if (suite, name) not in self.tests:
93
raise UnknownTestException, \
94
"Suite/test not found: %s/%s" % (suite, name)
96
test = self.tests[(suite, name)]
97
result = test.command()
98
return (result.status, result.data, str(result.duration))
100
@dbus.service.method(DBUS_INTERFACE_NAME,
101
in_signature="ss", out_signature="s", sender_keyword="sender",
102
connection_keyword="conn")
103
def get_test_description(self, suite, name, sender=None, conn=None):
104
self._check_polkit_privilege(sender, conn, "com.ubuntu.checkbox.test")
105
if (suite, name) not in self.tests:
106
raise UnknownTestException, \
107
"Suite/test not found: %s/%s" % (suite, name)
109
test = self.tests[(suite, name)]
110
return test.description()
112
def test_all(self, test):
113
self.tests[(test.suite, test.name)] = test
116
self._manager.reactor.fire("gather")
118
# Delay instantiating the service after register
119
dbus.service.Object.__init__(self, self.bus, "/checkbox")
120
main_loop = gobject.MainLoop()
126
gobject.timeout_add(self.timeout * 1000, _t)
128
# run until we time out
134
def _check_polkit_privilege(self, sender, conn, privilege):
135
"""Verify that sender has a given PolicyKit privilege.
137
sender is the sender's (private) D-BUS name, such as ":1:42"
138
(sender_keyword in @dbus.service.methods). conn is
139
the dbus.Connection object (connection_keyword in
140
@dbus.service.methods). privilege is the PolicyKit privilege string.
142
This method returns if the caller is privileged, and otherwise throws a
143
PermissionDeniedByPolicy exception.
145
if sender is None and conn is None:
146
# called locally, not through D-BUS
150
if self.dbus_info is None:
151
self.dbus_info = dbus.Interface(conn.get_object("org.freedesktop.DBus",
152
"/org/freedesktop/DBus/Bus", False), "org.freedesktop.DBus")
153
pid = self.dbus_info.GetConnectionUnixProcessID(sender)
156
if self.polkit is None:
157
self.polkit = dbus.Interface(dbus.SystemBus().get_object(
158
"org.freedesktop.PolicyKit", "/", False),
159
"org.freedesktop.PolicyKit")
161
res = self.polkit.IsProcessAuthorized(privilege, pid, True)
162
except dbus.DBusException, e:
163
if e._dbus_error_name == "org.freedesktop.DBus.Error.ServiceUnknown":
164
# polkitd timed out, connect again
166
return self._check_polkit_privilege(sender, conn, privilege)
171
logging.debug("_check_polkit_privilege: sender %s "
172
"on connection %s pid %i requested %s: %s",
173
sender, conn, pid, privilege, res)
174
raise PermissionDeniedByPolicy(privilege + " " + res)
177
factory = BackendManager