2
Functionality for running arbitrary shell scripts.
4
@var ALL_USERS: A token indicating all users should be allowed.
12
from twisted.internet.protocol import ProcessProtocol
13
from twisted.internet.defer import Deferred, fail
14
from twisted.internet.error import ProcessDone
16
from landscape.manager.manager import ManagerPlugin, SUCCEEDED, FAILED
22
# Result code attached to the operation result when the script fails with a
# ProcessFailedError, i.e. the process exited with a non-zero exit code.
PROCESS_FAILED_RESULT = 103
25
class ProcessTimeLimitReachedError(Exception):
27
Raised when a process has been running for too long.
29
@ivar data: The data that the process printed before reaching the time
33
def __init__(self, data):
37
class ProcessFailedError(Exception):
38
"""Raised when a process exits with a non-0 exit code.
40
@ivar data: The data that the process printed before it exited.
44
def __init__(self, data):
48
class ScriptExecution(ManagerPlugin):
49
"""A plugin which allows execution of arbitrary shell scripts.
51
@ivar size_limit: The number of bytes at which to truncate process output.
56
def __init__(self, process_factory=None):
58
@param process_factory: The L{IReactorProcess} provider to run the
61
if process_factory is None:
62
from twisted.internet import reactor as process_factory
63
self.process_factory = process_factory
65
def register(self, registry):
    """Register the plugin and subscribe to "execute-script" messages.

    @param registry: The registry this plugin is being attached to. After
        the base class registration, "execute-script" messages are routed
        to L{_handle_execute_script}.
    """
    super(ScriptExecution, self).register(registry)
    handler = self._handle_execute_script
    registry.register_message("execute-script", handler)
69
def _respond(self, status, data, opid, result_code=None):
70
message = {"type": "operation-result",
75
message["result-code"] = result_code
76
return self.registry.broker.send_message(message, True)
78
def _handle_execute_script(self, message):
79
opid = message["operation-id"]
81
user = message["username"]
82
allowed_users = self.registry.config.get_allowed_script_users()
83
if allowed_users != ALL_USERS and user not in allowed_users:
86
u"Scripts cannot be run as user %s." % (user,),
89
d = self.run_script(message["interpreter"], message["code"],
90
time_limit=message["time-limit"],
91
user=user, attachments=message["attachments"])
92
d.addCallback(self._respond_success, opid)
93
d.addErrback(self._respond_failure, opid)
96
self._respond(FAILED, self._format_exception(e), opid)
99
def _format_exception(self, e):
100
return u"%s: %s" % (e.__class__.__name__, e)
102
def _respond_success(self, data, opid):
    """Report that the operation C{opid} completed successfully.

    @param data: The result text to send back; when used as the callback of
        L{run_script} this is the data printed by the process.
    @param opid: The id of the operation being answered.
    @return: Whatever L{_respond} returns.
    """
    status = SUCCEEDED
    return self._respond(status, data, opid)
105
def _respond_failure(self, failure, opid):
107
if failure.check(ProcessTimeLimitReachedError):
108
code = TIMEOUT_RESULT
109
elif failure.check(ProcessFailedError):
110
code = PROCESS_FAILED_RESULT
112
return self._respond(FAILED, failure.value.data, opid, code)
114
return self._respond(FAILED, str(failure), opid)
116
def run_script(self, shell, code, user=None, time_limit=None,
119
Run a script based on a shell and the code.
121
A file will be written with #!<shell> as the first line, as executable,
122
and run as the given user.
124
XXX: Handle the 'reboot' and 'killall landscape-client' commands
127
@param shell: The interpreter to use.
128
@param code: The code to run.
129
@param user: The username to run the process as.
130
@param time_limit: The number of seconds to allow the process to run
131
before killing it and failing the returned Deferred with a
132
L{ProcessTimeLimitReachedError}.
133
@param attachments: C{dict} of filename/data attached to the script.
135
@return: A deferred that will fire with the data printed by the process
136
or fail with a L{ProcessTimeLimitReachedError}.
138
if not os.path.exists(shell.split()[0]):
140
ProcessFailedError("Unknown interpreter: '%s'" % shell))
145
info = pwd.getpwnam(user)
149
if not os.path.exists(path):
152
fd, filename = tempfile.mkstemp()
153
script_file = os.fdopen(fd, "w")
154
# Chown and chmod it before we write the data in it - the script may
155
# have sensitive content
156
# It would be nice to use fchown(2) and fchmod(2), but they're not
157
# available in python and using it with ctypes is pretty tedious, not
158
# to mention we can't get errno.
159
os.chmod(filename, 0700)
161
os.chown(filename, uid, gid)
163
"#!%s\n%s" % (shell.encode("utf-8"), code.encode("utf-8")))
167
old_umask = os.umask(0022)
170
attachment_dir = tempfile.mkdtemp()
171
env["LANDSCAPE_ATTACHMENTS"] = attachment_dir
172
for attachment_filename, data in attachments.iteritems():
173
full_filename = os.path.join(
174
attachment_dir, attachment_filename)
175
attachment = file(full_filename, "wb")
176
os.chmod(full_filename, 0600)
178
os.chown(full_filename, uid, gid)
179
attachment.write(data)
181
os.chmod(attachment_dir, 0700)
183
os.chown(attachment_dir, uid, gid)
184
pp = ProcessAccumulationProtocol(
185
self.registry.reactor, self.size_limit)
186
self.process_factory.spawnProcess(pp, filename, uid=uid, gid=gid,
188
if time_limit is not None:
189
pp.schedule_cancel(time_limit)
190
result = pp.result_deferred
191
return result.addBoth(self._remove_script, filename, attachment_dir,
197
def _remove_script(self, result, filename, attachment_dir, old_umask):
204
shutil.rmtree(attachment_dir)
211
class ProcessAccumulationProtocol(ProcessProtocol):
212
"""A ProcessProtocol which accumulates output.
214
@ivar size_limit: The number of bytes at which to truncate output.
217
def __init__(self, reactor, size_limit):
219
self.result_deferred = Deferred()
220
self._cancelled = False
221
self.size_limit = size_limit
222
self.reactor = reactor
223
self._scheduled_cancel = None
225
def schedule_cancel(self, time_limit):
    """Schedule the cancellation of the process.

    The pending call is remembered in C{_scheduled_cancel} so it can be
    cancelled again later.

    @param time_limit: The delay handed to the reactor's C{call_later}
        before C{_cancel} is invoked (presumably seconds, as documented
        for C{run_script}).
    """
    pending_call = self.reactor.call_later(time_limit, self._cancel)
    self._scheduled_cancel = pending_call
229
def childDataReceived(self, fd, data):
    """Some data was received from the child.

    Add it to our buffer, truncated so that the total number of buffered
    bytes never goes over L{size_limit}. Once the limit has been reached,
    any further output is discarded.

    @param fd: The child file descriptor the data arrived on. It is not
        used: output from every descriptor goes into the same buffer.
    @param data: The chunk of output received from the child.
    """
    current_size = sum(len(chunk) for chunk in self.data)
    remaining = self.size_limit - current_size
    # Stop appending once the buffer is full: otherwise a chatty process
    # would keep adding empty chunks forever, and a negative slice bound
    # would cut data from the wrong end.
    if remaining > 0:
        self.data.append(data[:remaining])
238
def processEnded(self, reason):
239
"""Fire back the deferred.
241
The deferred will be fired with the string of data received from the
242
subprocess, or if the subprocess was cancelled, a
243
L{ProcessTimeLimitReachedError} will be fired with data accumulated so
246
data = "".join(self.data)
248
self.result_deferred.errback(ProcessTimeLimitReachedError(data))
250
if self._scheduled_cancel is not None:
251
scheduled = self._scheduled_cancel
252
self._scheduled_cancel = None
253
self.reactor.cancel_call(scheduled)
255
if reason.check(ProcessDone):
256
self.result_deferred.callback(data)
258
self.result_deferred.errback(ProcessFailedError(data))
262
Close filedescriptors, kill the process, and indicate that a
263
L{ProcessTimeLimitReachedError} should be fired on the deferred.
265
# Sometimes children of the shell we're killing won't die unless their
266
# file descriptors are closed! For example, if /bin/sh -c "cat" is the
267
# process, "cat" won't die when we kill its shell. I'm not sure if this
268
# is really sufficient: maybe there's a way we can walk over all
269
# children of the process we started and kill them all.
271
self.transport.closeChildFD(i)
272
self.transport.signalProcess("KILL")
273
self._cancelled = True