~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.9.04.0

« back to all changes in this revision

Viewing changes to landscape/manager/scriptexecution.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
Functionality for running arbitrary shell scripts.
 
3
 
 
4
@var ALL_USERS: A token indicating all users should be allowed.
 
5
"""
 
6
import os
 
7
import pwd
 
8
import tempfile
 
9
import operator
 
10
import shutil
 
11
 
 
12
from twisted.internet.protocol import ProcessProtocol
 
13
from twisted.internet.defer import Deferred, fail
 
14
from twisted.internet.error import ProcessDone
 
15
 
 
16
from landscape.manager.manager import ManagerPlugin, SUCCEEDED, FAILED
 
17
 
 
18
 
 
19
ALL_USERS = object()
 
20
 
 
21
TIMEOUT_RESULT = 102
 
22
PROCESS_FAILED_RESULT = 103
 
23
 
 
24
 
 
25
class ProcessTimeLimitReachedError(Exception):
 
26
    """
 
27
    Raised when a process has been running for too long.
 
28
 
 
29
    @ivar data: The data that the process printed before reaching the time
 
30
        limit.
 
31
    """
 
32
 
 
33
    def __init__(self, data):
 
34
        self.data = data
 
35
 
 
36
 
 
37
class ProcessFailedError(Exception):
 
38
    """Raised when a process exits with a non-0 exit code.
 
39
 
 
40
    @ivar data: The data that the process printed before reaching the time
 
41
        limit.
 
42
    """
 
43
 
 
44
    def __init__(self, data):
 
45
        self.data = data
 
46
 
 
47
 
 
48
class ScriptExecution(ManagerPlugin):
 
49
    """A plugin which allows execution of arbitrary shell scripts.
 
50
 
 
51
    @ivar size_limit: The number of bytes at which to truncate process output.
 
52
    """
 
53
 
 
54
    size_limit = 500000
 
55
 
 
56
    def __init__(self, process_factory=None):
 
57
        """
 
58
        @param process_factory: The L{IReactorProcess} provider to run the
 
59
            process with.
 
60
        """
 
61
        if process_factory is None:
 
62
            from twisted.internet import reactor as process_factory
 
63
        self.process_factory = process_factory
 
64
 
 
65
    def register(self, registry):
 
66
        super(ScriptExecution, self).register(registry)
 
67
        registry.register_message("execute-script", self._handle_execute_script)
 
68
 
 
69
    def _respond(self, status, data, opid, result_code=None):
 
70
        message =  {"type": "operation-result",
 
71
                    "status": status,
 
72
                    "result-text": data,
 
73
                    "operation-id": opid}
 
74
        if result_code:
 
75
            message["result-code"] = result_code
 
76
        return self.registry.broker.send_message(message, True)
 
77
 
 
78
    def _handle_execute_script(self, message):
 
79
        opid = message["operation-id"]
 
80
        try:
 
81
            user = message["username"]
 
82
            allowed_users = self.registry.config.get_allowed_script_users()
 
83
            if allowed_users != ALL_USERS and user not in allowed_users:
 
84
                return self._respond(
 
85
                    FAILED,
 
86
                    u"Scripts cannot be run as user %s." % (user,),
 
87
                    opid)
 
88
 
 
89
            d = self.run_script(message["interpreter"], message["code"],
 
90
                                time_limit=message["time-limit"],
 
91
                                user=user, attachments=message["attachments"])
 
92
            d.addCallback(self._respond_success, opid)
 
93
            d.addErrback(self._respond_failure, opid)
 
94
            return d
 
95
        except Exception, e:
 
96
            self._respond(FAILED, self._format_exception(e), opid)
 
97
            raise
 
98
 
 
99
    def _format_exception(self, e):
 
100
        return u"%s: %s" % (e.__class__.__name__, e)
 
101
 
 
102
    def _respond_success(self, data, opid):
 
103
        return self._respond(SUCCEEDED, data, opid)
 
104
 
 
105
    def _respond_failure(self, failure, opid):
 
106
        code = None
 
107
        if failure.check(ProcessTimeLimitReachedError):
 
108
            code = TIMEOUT_RESULT
 
109
        elif failure.check(ProcessFailedError):
 
110
            code = PROCESS_FAILED_RESULT
 
111
        if code is not None:
 
112
            return self._respond(FAILED, failure.value.data, opid, code)
 
113
        else:
 
114
            return self._respond(FAILED, str(failure), opid)
 
115
 
 
116
    def run_script(self, shell, code, user=None, time_limit=None,
 
117
                   attachments=None):
 
118
        """
 
119
        Run a script based on a shell and the code.
 
120
 
 
121
        A file will be written with #!<shell> as the first line, as executable,
 
122
        and run as the given user.
 
123
 
 
124
        XXX: Handle the 'reboot' and 'killall landscape-client' commands
 
125
        gracefully.
 
126
 
 
127
        @param shell: The interpreter to use.
 
128
        @param code: The code to run.
 
129
        @param user: The username to run the process as.
 
130
        @param time_limit: The number of seconds to allow the process to run
 
131
            before killing it and failing the returned Deferred with a
 
132
            L{ProcessTimeLimitReachedError}.
 
133
        @param attachments: C{dict} of filename/data attached to the script.
 
134
 
 
135
        @return: A deferred that will fire with the data printed by the process
 
136
            or fail with a L{ProcessTimeLimitReachedError}.
 
137
        """
 
138
        if not os.path.exists(shell.split()[0]):
 
139
            return fail(
 
140
                ProcessFailedError("Unknown interpreter: '%s'" % shell))
 
141
        uid = None
 
142
        gid = None
 
143
        path = None
 
144
        if user is not None:
 
145
            info = pwd.getpwnam(user)
 
146
            uid = info.pw_uid
 
147
            gid = info.pw_gid
 
148
            path = info.pw_dir
 
149
            if not os.path.exists(path):
 
150
                path = "/"
 
151
 
 
152
        fd, filename = tempfile.mkstemp()
 
153
        script_file = os.fdopen(fd, "w")
 
154
        # Chown and chmod it before we write the data in it - the script may
 
155
        # have sensitive content
 
156
        # It would be nice to use fchown(2) and fchmod(2), but they're not
 
157
        # available in python and using it with ctypes is pretty tedious, not
 
158
        # to mention we can't get errno.
 
159
        os.chmod(filename, 0700)
 
160
        if uid is not None:
 
161
            os.chown(filename, uid, gid)
 
162
        script_file.write(
 
163
            "#!%s\n%s" % (shell.encode("utf-8"), code.encode("utf-8")))
 
164
        script_file.close()
 
165
        env = {}
 
166
        attachment_dir = ""
 
167
        old_umask = os.umask(0022)
 
168
        try:
 
169
            if attachments:
 
170
                attachment_dir = tempfile.mkdtemp()
 
171
                env["LANDSCAPE_ATTACHMENTS"] = attachment_dir
 
172
                for attachment_filename, data in attachments.iteritems():
 
173
                    full_filename = os.path.join(
 
174
                        attachment_dir, attachment_filename)
 
175
                    attachment = file(full_filename, "wb")
 
176
                    os.chmod(full_filename, 0600)
 
177
                    if uid is not None:
 
178
                        os.chown(full_filename, uid, gid)
 
179
                    attachment.write(data)
 
180
                    attachment.close()
 
181
                os.chmod(attachment_dir, 0700)
 
182
                if uid is not None:
 
183
                    os.chown(attachment_dir, uid, gid)
 
184
            pp = ProcessAccumulationProtocol(
 
185
                self.registry.reactor, self.size_limit)
 
186
            self.process_factory.spawnProcess(pp, filename, uid=uid, gid=gid,
 
187
                                              path=path, env=env)
 
188
            if time_limit is not None:
 
189
                pp.schedule_cancel(time_limit)
 
190
            result = pp.result_deferred
 
191
            return result.addBoth(self._remove_script, filename, attachment_dir,
 
192
                                  old_umask)
 
193
        except:
 
194
            os.umask(old_umask)
 
195
            raise
 
196
 
 
197
    def _remove_script(self, result, filename, attachment_dir, old_umask):
 
198
        try:
 
199
            os.unlink(filename)
 
200
        except:
 
201
            pass
 
202
        if attachment_dir:
 
203
            try:
 
204
                shutil.rmtree(attachment_dir)
 
205
            except:
 
206
                pass
 
207
        os.umask(old_umask)
 
208
        return result
 
209
 
 
210
 
 
211
class ProcessAccumulationProtocol(ProcessProtocol):
 
212
    """A ProcessProtocol which accumulates output.
 
213
 
 
214
    @ivar size_limit: The number of bytes at which to truncate output.
 
215
    """
 
216
 
 
217
    def __init__(self, reactor, size_limit):
 
218
        self.data = []
 
219
        self.result_deferred = Deferred()
 
220
        self._cancelled = False
 
221
        self.size_limit = size_limit
 
222
        self.reactor = reactor
 
223
        self._scheduled_cancel = None
 
224
 
 
225
    def schedule_cancel(self, time_limit):
 
226
        self._scheduled_cancel = self.reactor.call_later(
 
227
            time_limit, self._cancel)
 
228
 
 
229
    def childDataReceived(self, fd, data):
 
230
        """Some data was received from the child.
 
231
 
 
232
        Add it to our buffer, as long as it doesn't go over L{size_limit}
 
233
        bytes.
 
234
        """
 
235
        current_size = reduce(operator.add, map(len, self.data), 0)
 
236
        self.data.append(data[:self.size_limit - current_size])
 
237
 
 
238
    def processEnded(self, reason):
 
239
        """Fire back the deferred.
 
240
 
 
241
        The deferred will be fired with the string of data received from the
 
242
        subprocess, or if the subprocess was cancelled, a
 
243
        L{ProcessTimeLimitReachedError} will be fired with data accumulated so
 
244
        far.
 
245
        """
 
246
        data = "".join(self.data)
 
247
        if self._cancelled:
 
248
            self.result_deferred.errback(ProcessTimeLimitReachedError(data))
 
249
        else:
 
250
            if self._scheduled_cancel is not None:
 
251
                scheduled = self._scheduled_cancel
 
252
                self._scheduled_cancel = None
 
253
                self.reactor.cancel_call(scheduled)
 
254
 
 
255
            if reason.check(ProcessDone):
 
256
                self.result_deferred.callback(data)
 
257
            else:
 
258
                self.result_deferred.errback(ProcessFailedError(data))
 
259
 
 
260
    def _cancel(self):
 
261
        """
 
262
        Close filedescriptors, kill the process, and indicate that a
 
263
        L{ProcessTimeLimitReachedError} should be fired on the deferred.
 
264
        """
 
265
        # Sometimes children of the shell we're killing won't die unless their
 
266
        # file descriptors are closed! For example, if /bin/sh -c "cat" is the
 
267
        # process, "cat" won't die when we kill its shell. I'm not sure if this
 
268
        # is really sufficient: maybe there's a way we can walk over all
 
269
        # children of the process we started and kill them all.
 
270
        for i in (0, 1, 2):
 
271
            self.transport.closeChildFD(i)
 
272
        self.transport.signalProcess("KILL")
 
273
        self._cancelled = True