~ubuntu-branches/ubuntu/natty/landscape-client/natty-updates

« back to all changes in this revision

Viewing changes to landscape/ui/lib/polkit.py

  • Committer: Package Import Robot
  • Author(s): Andreas Hasenack
  • Date: 2012-03-19 09:33:34 UTC
  • mto: (1.2.10)
  • mto: This revision was merged to the branch mainline in revision 39.
  • Revision ID: package-import@ubuntu.com-20120319093334-oxjttz163vvfgq8s
Import upstream version 12.04

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import dbus
 
2
import dbus.service
 
3
import dbus.glib
 
4
 
 
5
from gi.repository import GObject
 
6
 
 
7
 
 
8
# Action identifier string.  It is not referenced elsewhere in this module;
# presumably callers pass it as the `privilege` to be checked -- TODO confirm.
POLICY_NAME = "com.canonical.LandscapeClientSettings.configure"
 
9
 
 
10
 
 
11
class PolicyKitMechanism(dbus.service.Object):
    """
    L{PolicyKitMechanism} is a specialised L{dbus.service.Object} which
    provides PolicyKit authorization checks for a provided DBus bus name and
    object path.  Subclasses must therefore call L{__init__} here with their
    object path, bus name and an error class to be raised when permission
    escalation fails.

    @type object_path: string
    @param object_path: The object path to register the subclass with.
    @type bus_name: dbus.service.BusName
    @param bus_name: The L{BusName} to register the subclass with.
    @type permission_error: dbus.DBusException
    @param permission_error: A L{dbus.DBusException} to be raised when
        PolicyKit authorisation fails for the client.
    @type bypass: bool
    @param bypass: When true, L{_get_polkit_authorization} reports success
        without contacting PolicyKit (useful for testing).
    @param conn: Passed unchanged as the connection argument of
        L{dbus.service.Object.__init__}; defaults to C{None}.
    """

    def __init__(self, object_path, bus_name, permission_error,
                 bypass=False, conn=None):
        super(PolicyKitMechanism, self).__init__(
            conn, object_path, bus_name)
        self.permission_error = permission_error
        # Lazily created proxy for the bus daemon, see L{_get_peer_pid}.
        self.dbus_info = None
        # Initialised here but not assigned anywhere else in this class.
        self.polkit = None
        self.bypass = bypass

    def _get_polkit_authorization(self, pid, privilege):
        """
        Check that the process with id L{pid} is allowed, by policy, to
        utilise the L{privilege}.  If the class was initialised with
        L{bypass}=True then just say it was authorised without checking
        (useful for testing).

        @param pid: Process ID of the peer requesting the privilege.
        @param privilege: The PolicyKit action id to check.
        @return: The result of PolicyKit's C{CheckAuthorization} call, or
            C{(True, None, "Bypass")} when bypassing.  Callers unpack it as
            a 3-tuple whose first element is the authorization flag.
        """
        if self.bypass:
            return (True, None, "Bypass")
        # Introspection is disabled (third argument False), so argument
        # types are inferred from the Python values passed below.
        polkit = dbus.Interface(dbus.SystemBus().get_object(
                'org.freedesktop.PolicyKit1',
                '/org/freedesktop/PolicyKit1/Authority', False),
                'org.freedesktop.PolicyKit1.Authority')
        subject = ('unix-process',
                   {'pid': dbus.UInt32(pid, variant_level=1),
                    'start-time': dbus.UInt64(0, variant_level=1)})
        action_id = privilege
        details = {"": ""}  # <- empty strings allow type inference
        # 1 is AllowUserInteraction in the PolicyKit Authority API, which is
        # what permits the authentication challenge.
        flags = dbus.UInt32(1)
        cancellation_id = ""
        return polkit.CheckAuthorization(
            subject,
            action_id,
            details,
            flags,
            cancellation_id,
            timeout=15)

    def _get_peer_pid(self, sender, conn):
        """
        Get the process ID of the L{sender}.

        The proxy for the bus daemon is created on first use from L{conn}
        and cached in C{self.dbus_info} for subsequent calls.
        """
        if self.dbus_info is None:
            self.dbus_info = dbus.Interface(
                conn.get_object('org.freedesktop.DBus',
                                '/org/freedesktop/DBus/Bus', False),
                'org.freedesktop.DBus')
        return self.dbus_info.GetConnectionUnixProcessID(sender)

    def _is_local_call(self, sender, conn):
        """
        Check if this is a local call, implying it is within a secure context.

        A call is treated as local only when both L{sender} and L{conn} are
        C{None}.
        """
        return (sender is None and conn is None)

    def _is_allowed_by_policy(self, sender, conn, privilege):
        """
        Check if we are already in a secure context, and if not check if the
        policy associated with L{privilege} both exists and allows the peer to
        utilise it.  As a side effect, if escalation of privileges is required
        then this will occur and a challenge will be generated if needs be.

        @return: C{True} when the call is local or the peer is authorised.
        @raise permission_error: The error class given to L{__init__},
            instantiated with L{privilege}, when authorisation is refused.
        """
        if self._is_local_call(sender, conn):
            return True
        peer_pid = self._get_peer_pid(sender, conn)
        # Only the authorization flag is needed; the other two elements of
        # the reply are unpacked solely to keep the 3-tuple shape check.
        (is_auth, _, _) = self._get_polkit_authorization(peer_pid,
                                                         privilege)
        if not is_auth:
            raise self.permission_error(privilege)
        return True
 
95
 
 
96
 
 
97
def listen():
    """
    Create a L{GObject.MainLoop} and run it to process incoming DBus events.
    """
    GObject.MainLoop().run()