5
from gi.repository import GObject
8
# NOTE(review): looks like the PolicyKit action id for configuring the
# Landscape client settings, presumably what callers pass as `privilege`
# to the authorisation checks -- confirm against callers.
POLICY_NAME = "com.canonical.LandscapeClientSettings.configure"
11
class PolicyKitMechanism(dbus.service.Object):
    """
    L{PolicyKitMechanism} is a specialised L{dbus.service.Object} which
    provides PolicyKit authorization checks for a provided DBus bus name and
    object path.  Subclasses must therefore call L{__init__} here with their
    object path, bus name and an error class to be raised when permission
    is not granted.

    @type object_path: string
    @param object_path: The object path to register the subclass with.
    @type bus_name: dbus.service.BusName
    @param bus_name: The L{BusName} to register the subclass with.
    @type permission_error: dbus.DBusException
    @param permission_error: A L{dbus.DBusException} to be raised when
        PolicyKit authorisation fails for the client.
    @type bypass: bool
    @param bypass: When True, L{_get_polkit_authorization} reports success
        without consulting PolicyKit (useful for testing).
    @param conn: The connection handed straight through to
        L{dbus.service.Object.__init__}.
    """

    def __init__(self, object_path, bus_name, permission_error,
                 bypass=False, conn=None):
        super(PolicyKitMechanism, self).__init__(
            conn, object_path, bus_name)
        self.permission_error = permission_error
        # Proxy for the bus driver; created on first use by _get_peer_pid.
        self.dbus_info = None
        self.bypass = bypass

    def _get_polkit_authorization(self, pid, privilege):
        """
        Check that the process with id L{pid} is allowed, by policy, to
        utilise the L{privilege}.  If the class was initialised with
        L{bypass}=True then just say it was authorised without checking
        (useful for testing).

        @param pid: Process id of the peer to be authorised.
        @param privilege: The PolicyKit action id being requested.
        @return: A 3-item result whose first item is the authorisation
            flag; in bypass mode this is C{(True, None, "Bypass")},
            otherwise whatever C{CheckAuthorization} replies.
        """
        if self.bypass:
            return (True, None, "Bypass")
        polkit = dbus.Interface(
            dbus.SystemBus().get_object(
                'org.freedesktop.PolicyKit1',
                '/org/freedesktop/PolicyKit1/Authority', False),
            'org.freedesktop.PolicyKit1.Authority')
        # NOTE(review): with start-time 0 the subject is identified by pid
        # alone; confirm pid reuse is not a concern here, or consider a
        # 'system-bus-name' subject instead.
        subject = ('unix-process',
                   {'pid': dbus.UInt32(pid, variant_level=1),
                    'start-time': dbus.UInt64(0, variant_level=1)})
        details = {"": ""}  # <- empty strings allow type inference
        # 1 is AllowUserInteraction in polkit's CheckAuthorizationFlags.
        flags = dbus.UInt32(1)
        cancellation_id = ""
        # NOTE(review): the reply timeout (seconds) is an assumed value --
        # confirm it leaves enough time for an interactive challenge.
        return polkit.CheckAuthorization(
            subject,
            privilege,
            details,
            flags,
            cancellation_id,
            timeout=15)

    def _get_peer_pid(self, sender, conn):
        """
        Get the process ID of the L{sender}.

        @param sender: Bus name of the calling peer.
        @param conn: Connection used to reach the bus driver the first time
            this is called; the resulting proxy is kept on L{dbus_info}.
        @return: The reply of C{GetConnectionUnixProcessID} for L{sender}.
        """
        if self.dbus_info is None:
            self.dbus_info = dbus.Interface(
                conn.get_object('org.freedesktop.DBus',
                                '/org/freedesktop/DBus/Bus', False),
                'org.freedesktop.DBus')
        return self.dbus_info.GetConnectionUnixProcessID(sender)

    def _is_local_call(self, sender, conn):
        """
        Check if this is a local call, implying it is within a secure context.

        @return: True only when both L{sender} and L{conn} are None.
        """
        return sender is None and conn is None

    def _is_allowed_by_policy(self, sender, conn, privilege):
        """
        Check if we are already in a secure context, and if not check if the
        policy associated with L{privilege} both exists and allows the peer to
        utilise it.  As a side effect, if escalation of privileges is required
        then this will occur and a challenge will be generated if needs be.

        @return: True when the call is local or PolicyKit authorises it.
        @raise permission_error: Instantiated with L{privilege} when
            PolicyKit does not authorise the peer.
        """
        if self._is_local_call(sender, conn):
            return True
        peer_pid = self._get_peer_pid(sender, conn)
        # Only the authorisation flag matters here; the other two items of
        # the result are ignored.
        is_auth, _, _ = self._get_polkit_authorization(peer_pid, privilege)
        if not is_auth:
            raise self.permission_error(privilege)
        return True
99
Invoke a L{gobject.MainLoop} to process incoming DBus events.
101
mainloop = GObject.MainLoop()