~flexiondotorg/aptdaemon/aptdaemon-lp1623856

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Provides access to PolicyKit privilege mangement using gdefer Deferreds."""
# Copyright (C) 2008-2009 Sebastian Heinlein <devel@glatzor.de>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

__author__ = "Sebastian Heinlein <devel@glatzor.de>"

__all__ = ("check_authorization_by_name", "check_authorization_by_pid",
           "get_pid_from_dbus_name", "get_uid_from_dbus_name",
           "CHECK_AUTH_ALLOW_USER_INTERACTION", "CHECK_AUTH_NONE",
           "PK_ACTION_ADD_REMOVE_VENDOR_KEY", "PK_ACTION_CANCEL_FOREIGN",
           "PK_ACTION_CHANGE_REPOSITORY",
           "PK_ACTION_CHANGE_CONIFG",
           "PK_ACTION_GET_TRUSTED_VENDOR_KEYS",
           "PK_ACTION_INSTALL_FILE",
           "PK_ACTION_INSTALL_OR_REMOVE_PACKAGES",
           "PK_ACTION_INSTALL_PACKAGES_FROM_NEW_REPO",
           "PK_ACTION_INSTALL_PACKAGES_FROM_HIGH_TRUST_REPO",
           "PK_ACTION_INSTALL_PURCHASED_PACKAGES",
           "PK_ACTION_UPDATE_CACHE", "PK_ACTION_UPGRADE_PACKAGES",
           "PK_ACTION_SET_PROXY", "PK_ACTION_CLEAN")

import dbus

from defer import Deferred, inline_callbacks, return_value
from .errors import NotAuthorizedError, AuthorizationFailed

PK_ACTION_INSTALL_OR_REMOVE_PACKAGES = (
    "org.debian.apt.install-or-remove-packages")
PK_ACTION_INSTALL_PURCHASED_PACKAGES = (
    "org.debian.apt.install-purchased-packages")
PK_ACTION_INSTALL_PACKAGES_FROM_NEW_REPO = (
    "org.debian.apt.install-packages-from-new-repo")
PK_ACTION_INSTALL_PACKAGES_FROM_HIGH_TRUST_REPO = (
    "org.debian.apt.install-packages.high-trust-repo")
PK_ACTION_INSTALL_FILE = "org.debian.apt.install-file"
PK_ACTION_UPGRADE_PACKAGES = "org.debian.apt.upgrade-packages"
PK_ACTION_UPDATE_CACHE = "org.debian.apt.update-cache"
PK_ACTION_CANCEL_FOREIGN = "org.debian.apt.cancel-foreign"
PK_ACTION_GET_TRUSTED_VENDOR_KEYS = "org.debian.apt.get-trusted-vendor-keys"
PK_ACTION_CHANGE_REPOSITORY = "org.debian.apt.change-repository"
PK_ACTION_CHANGE_CONFIG = "org.debian.apt.change-config"
PK_ACTION_SET_PROXY = "org.debian.apt.set-proxy"
PK_ACTION_CLEAN = "org.debian.apt.clean"

CHECK_AUTH_NONE = 0
CHECK_AUTH_ALLOW_USER_INTERACTION = 1


def check_authorization_by_name(dbus_name, action_id, timeout=86400, bus=None,
                                flags=None):
    """Check if the given sender is authorized for the specified action.

    If the sender is not authorized raise NotAuthorizedError.

    Keyword arguments:
    dbus_name -- D-Bus name of the subject
    action_id -- the PolicyKit policy name of the action
    timeout -- time in seconds for the user to authenticate
    bus -- the D-Bus connection (defaults to the system bus)
    flags -- optional flags to control the authentication process
    """
    subject = ("system-bus-name", {"name": dbus_name})
    return _check_authorization(subject, action_id, timeout, bus, flags)


def check_authorization_by_pid(pid, action_id, timeout=86400, bus=None,
                               flags=None):
    """Check if the given process is authorized for the specified action.

    If the sender is not authorized raise NotAuthorizedError.

    Keyword arguments:
    pid -- id of the process
    action_id -- the PolicyKit policy name of the action
    timeout -- time in seconds for the user to authenticate
    bus -- the D-Bus connection (defaults to the system bus)
    flags -- optional flags to control the authentication process
    """
    subject = ("unix-process", {"pid": pid})
    return _check_authorization(subject, action_id, timeout, bus, flags)


def _check_authorization(subject, action_id, timeout, bus, flags=None):
    def policykit_done(xxx_todo_changeme):
        (authorized, challenged, auth_details) = xxx_todo_changeme
        if authorized:
            deferred.callback(auth_details)
        elif challenged:
            deferred.errback(AuthorizationFailed(subject, action_id))
        else:
            deferred.errback(NotAuthorizedError(subject, action_id))
    if not bus:
        bus = dbus.SystemBus()
    # Set the default flags
    if flags is None:
        flags = CHECK_AUTH_ALLOW_USER_INTERACTION
    deferred = Deferred()
    pk = bus.get_object("org.freedesktop.PolicyKit1",
                        "/org/freedesktop/PolicyKit1/Authority")
    details = {}
    pk.CheckAuthorization(
        subject, action_id, details, flags, "",
        dbus_interface="org.freedesktop.PolicyKit1.Authority",
        timeout=timeout,
        reply_handler=policykit_done,
        error_handler=deferred.errback)
    return deferred


def get_pid_from_dbus_name(dbus_name, bus=None):
    """Return a deferred that gets the id of process owning the given
    system D-Bus name.
    """
    if not bus:
        bus = dbus.SystemBus()
    deferred = Deferred()
    bus_obj = bus.get_object("org.freedesktop.DBus",
                             "/org/freedesktop/DBus/Bus")
    bus_obj.GetConnectionUnixProcessID(dbus_name,
                                       dbus_interface="org.freedesktop.DBus",
                                       reply_handler=deferred.callback,
                                       error_handler=deferred.errback)
    return deferred


@inline_callbacks
def get_uid_from_dbus_name(dbus_name, bus=None):
    """Return a deferred that gets the uid of the user owning the given
    system D-Bus name.
    """
    if not bus:
        bus = dbus.SystemBus()
    pid = yield get_pid_from_dbus_name(dbus_name, bus)
    with open("/proc/%s/status" % pid) as proc:
        values = [v for v in proc.readlines() if v.startswith("Uid:")]
    uid = int(values[0].split()[1])
    return_value(uid)


@inline_callbacks
def get_proc_info_from_dbus_name(dbus_name, bus=None):
    """Return a deferred that gets the pid, the uid of the user owning the
    given system D-Bus name and its command line.
    """
    if not bus:
        bus = dbus.SystemBus()
    pid = yield get_pid_from_dbus_name(dbus_name, bus)
    with open("/proc/%s/status" % pid) as proc:
        lines = proc.readlines()
        uid_values = [v for v in lines if v.startswith("Uid:")]
        gid_values = [v for v in lines if v.startswith("Gid:")]
    # instead of ", encoding='utf8'" we use the "rb"/decode() here for
    # py2 compatibility
    with open("/proc/%s/cmdline" % pid, "rb") as cmdline_file:
        cmdline = cmdline_file.read().decode("utf-8")
    uid = int(uid_values[0].split()[1])
    gid = int(gid_values[0].split()[1])
    return_value((pid, uid, gid, cmdline))

# vim:ts=4:sw=4:et