7
class PolicyKitMechanism(dbus.service.Object):
9
L{PolicyKitMechanism} is a specialised L{dbus.service.Object} which
10
provides PolicyKit authorization checks for a provided DBus bus name and
11
object path. Subclasses must therefore call L{__init__} here with their
12
object path, bus name and an error class to be raised when permission
15
@type object_path: string
16
@param object_path: The object path to register the subclass with.
17
@type bus_name: dbus.service.BusName
18
@param bus_name: The L{BusName} to register the subclass with.
19
@type permission_error: dbus.DBusException
20
@param permission_error: A L{dbus.DBusException} to be raised when
21
PolicyKit authorisation fails for the client.
24
def __init__(self, object_path, bus_name, permission_error,
             bypass=False, conn=None):
    """
    Register the mechanism on DBus and set up its per-instance state.

    @type object_path: string
    @param object_path: The object path to register the subclass with.
    @type bus_name: dbus.service.BusName
    @param bus_name: The L{BusName} to register the subclass with.
    @type permission_error: dbus.DBusException
    @param permission_error: The exception class instantiated and raised
        when PolicyKit authorisation fails for the client.
    @param bypass: If true, authorisation is reported as granted without
        consulting PolicyKit (useful for testing).
    @param conn: Optional connection handed to L{dbus.service.Object}.
    """
    super(PolicyKitMechanism, self).__init__(
        conn, object_path, bus_name)
    self.permission_error = permission_error
    self.bypass = bypass
    # The DBus proxies are created lazily by L{_get_peer_pid} and
    # L{_get_polkit}; both test these attributes against None, so they
    # must exist before the first call.
    self.dbus_info = None
    self.polkit = None
33
def _get_polkit_authorization(self, pid, privilege):
35
Check that the process with id L{pid} is allowed, by policy, to utilise
36
the L{privilege}. If the class was initialised with L{bypass}=True
37
then just say it was authorised without checking (useful for testing).
40
return (True, None, "Bypass")
41
polkit = self._get_polkit()
43
subject = ('unix-process',
44
{'pid': dbus.UInt32(pid, variant_level=1),
45
'start-time': dbus.UInt64(0, variant_level=1)})
47
details = {"": ""} # <- empty strings allow type inference
48
flags = dbus.UInt32(1)
50
return polkit.CheckAuthorization(
57
except dbus.DBusException, err:
58
if (err._dbus_error_name ==
59
'org.freedesktop.DBus.Error.ServiceUnknown'):
60
# This occurs on timeouts, so we retry
62
return self._get_polkit_authorization(pid, privilege)
66
def _get_peer_pid(self, sender, conn):
68
Get the process ID of the L{sender}.
70
if self.dbus_info is None:
71
self.dbus_info = dbus.Interface(
72
conn.get_object('org.freedesktop.DBus',
73
'/org/freedesktop/DBus/Bus', False), 'org.freedesktop.DBus')
74
return self.dbus_info.GetConnectionUnixProcessID(sender)
76
def _get_polkit(self):
78
Connect to polkitd via DBus and return the interface.
80
if self.polkit is None:
81
self.polkit = dbus.Interface(dbus.SystemBus().get_object(
82
'org.freedesktop.PolicyKit1',
83
'/org/freedesktop/PolicyKit1/Authority', False),
84
'org.freedesktop.PolicyKit1.Authority')
87
def _is_local_call(self, sender, conn):
89
Check if this is a local call, implying it is within a secure context.
91
return (sender is None and conn is None)
93
def _is_allowed_by_policy(self, sender, conn, privilege):
95
Check if we are already in a secure context, and if not check if the
96
policy associated with L{privilege} both exists and allows the peer to
97
utilise it. As a side effect, if escalation of privileges is required
98
then this will occur and a challenge will be generated if needs be.
100
if self._is_local_call(sender, conn):
102
peer_pid = self._get_peer_pid(sender, conn)
103
(is_auth, _, details) = self._get_polkit_authorization(peer_pid,
106
raise self.permission_error(privilege)
112
Invoke a L{gobject.MainLoop} to process incoming DBus events.
114
mainloop = gobject.MainLoop()