1
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Support for starting, monitoring, and restarting child processes.
10
from twisted.python import log
11
from twisted.internet import error, protocol, reactor
12
from twisted.application import service
13
from twisted.protocols import basic
19
# Module-level transport object shared by every LineLogger: it is what
# LoggingProtocol.connectionMade passes to LineLogger.makeConnection().
transport = DummyTransport()
21
class LineLogger(basic.LineReceiver):
    """
    A line receiver that writes every line it is handed to the Twisted log,
    prefixed with this instance's C{tag} in square brackets.
    """

    def lineReceived(self, line):
        """
        Log a single received line.

        @param line: The line that was received; it is interpolated into the
            log message unchanged.
        """
        entry = '[%s] %s' % (self.tag, line)
        log.msg(entry)
29
class LoggingProtocol(protocol.ProcessProtocol):
    """
    A process protocol that forwards everything the child process writes on
    stdout or stderr to a L{LineLogger}, which logs it line by line.

    @ivar service: The object to notify when the process ends; its
        C{connectionLost} method is called with C{name}.
    @ivar name: The label of the process, used as the tag of the logger.
    @ivar output: The L{LineLogger} created in L{connectionMade}.
    @ivar empty: True while no unterminated line is pending, that is, when
        nothing has been received yet or the last chunk ended with a newline.
    """

    # Nothing has been received yet, so there is no partial line to flush.
    empty = True

    def connectionMade(self):
        """
        Create the line logger for this process and tag it with our name.
        """
        self.output = LineLogger()
        self.output.tag = self.name
        self.output.makeConnection(transport)

    def outReceived(self, data):
        """
        Pass a chunk of process output on to the line logger and remember
        whether it ended with a newline.

        @param data: The chunk of output received from the process.
        """
        if not data:
            # Nothing to forward, and data[-1] would raise IndexError.
            return
        self.output.dataReceived(data)
        self.empty = data[-1] == '\n'

    # stderr is logged exactly like stdout, through the same line logger.
    errReceived = outReceived

    def processEnded(self, reason):
        """
        Flush any unterminated last line to the log and tell the service
        that the process is gone.

        @param reason: Why the process ended; not used here.
        """
        # Only terminate the line ourselves when the output did not already
        # end with a newline; otherwise an empty line would be logged.
        if not self.empty:
            self.output.dataReceived('\n')
        self.service.connectionLost(self.name)
52
class ProcessMonitor(service.Service):
54
ProcessMonitor runs processes, monitors their progress, and restarts them
57
The ProcessMonitor will not attempt to restart a process that appears to die
58
instantly -- with each "instant" death (less than 1 second, by default), it
59
will delay approximately twice as long before restarting it. A successful
60
run will reset the counter.
62
The primary interface is L{addProcess} and L{removeProcess}. When the service
63
is active (that is, when the application it is attached to is running),
64
adding a process automatically starts it.
66
Each process has a name. This name string must uniquely identify the
67
process. In particular, attempting to add two processes with the same name
68
will result in a C{KeyError}.
70
@type threshold: C{float}
71
@ivar threshold: The minimum time a process has to live for its death not to be
72
considered instant, in seconds. The default value is 1 second.
74
@type killTime: C{float}
75
@ivar killTime: How long a process being killed has to get its affairs in
76
order before it gets killed with an unmaskable signal. The default value
79
@type consistencyDelay: C{float}
80
@ivar consistencyDelay: The time between consistency checks. The default
96
def __getstate__(self):
    """
    Return the state to persist for this service.

    Starts from the state produced by C{service.Service.__getstate__},
    drops the C{active} flag and the pending C{consistency} call, and
    replaces the per-process runtime bookkeeping with empty dictionaries so
    that no live process or timer information is persisted.

    @return: The state dictionary.
    """
    dct = service.Service.__getstate__(self)
    # Runtime-only attributes: remove them if present.
    for k in ('active', 'consistency'):
        dct.pop(k, None)
    # Bookkeeping tied to currently running processes and scheduled calls.
    for k in ('protocols', 'delay', 'timeStarted', 'murder'):
        dct[k] = {}
    return dct
107
def _checkConsistency(self):
    """
    Check that every process we have a protocol for can still be
    signalled, restart the ones that cannot, and schedule the next check
    C{consistencyDelay} seconds from now.
    """
    # Iterate over a snapshot: entries are deleted and re-added (by
    # startProcess) inside the loop.
    for name, proto in list(self.protocols.items()):
        proc = proto.transport
        try:
            # Signal 0 is used purely as a probe for the process.
            proc.signalProcess(0)
        except (OSError, error.ProcessExitedAlready):
            log.msg("Lost process %r somehow, restarting." % name)
            del self.protocols[name]
            self.startProcess(name)
    self.consistency = reactor.callLater(self.consistencyDelay,
                                         self._checkConsistency)
120
def addProcess(self, name, args, uid=None, gid=None, env={}):
    """
    Add a new process to launch, monitor, and restart when necessary.

    Note that args are passed to the system call, not to the shell. If
    running the shell is desired, the common idiom is to use
    C{.addProcess("name", ['/bin/sh', '-c', shell_script])}

    The process is started immediately only when the service is active;
    otherwise it is merely registered.

    See L{removeProcess} for removing processes from the monitor.

    @param name: A label for this process. This value must be unique
        across all processes added to this monitor.

    @param args: The argv sequence for the process to launch.

    @param uid: The user ID to use to run the process. It is handed
        unchanged to C{reactor.spawnProcess} as its C{uid} argument.

    @param gid: The group ID to use to run the process. It is handed
        unchanged to C{reactor.spawnProcess} as its C{gid} argument.

    @param env: The environment to give to the launched process. See
        L{IReactorProcess.spawnProcess}'s C{env} parameter. The default
        dictionary is shared between calls; it is stored but never
        modified here.

    @raise KeyError: If a process with the same name was already added.
    """
    if name in self.processes:
        raise KeyError("remove %s first" % name)
    self.processes[name] = args, uid, gid, env
    # Only launch right away when the service is running.
    if self.active:
        self.startProcess(name)
151
def removeProcess(self, name):
    """
    Forget a process and, if it is currently running, stop it. Once
    removed it is no longer restarted.

    See L{addProcess} for adding processes to the monitor.

    @param name: The string that uniquely identifies the process.

    @raise KeyError: If no process with this name is registered.
    """
    # Unregister first so that the death of the process does not trigger
    # a restart.
    self.processes.pop(name)
    self.stopProcess(name)
164
def startService(self):
    """
    Start the service: mark the monitor active, schedule the launch of
    every registered process, and schedule the first consistency check.
    """
    service.Service.startService(self)
    # connectionLost only reschedules a dead process while this is true.
    self.active = 1
    for name in self.processes:
        reactor.callLater(0, self.startProcess, name)
    self.consistency = reactor.callLater(self.consistencyDelay,
                                         self._checkConsistency)
172
def stopService(self):
    """
    Stop the service: mark the monitor inactive, stop every registered
    process, and cancel the pending consistency check, if any.
    """
    service.Service.stopService(self)
    # Lower the flag before stopping anything so that connectionLost does
    # not reschedule the processes we are about to terminate.
    self.active = 0
    for name in list(self.processes):
        self.stopProcess(name)
    # The check may never have been scheduled, or may no longer be
    # pending; cancelling such a call would raise.
    pending = getattr(self, 'consistency', None)
    if pending is not None and pending.active():
        pending.cancel()
    self.consistency = None
179
def connectionLost(self, name):
    """
    Called when the process named C{name} has ended.

    Cancels the pending forced kill for that process, forgets its
    protocol, updates its restart delay, and, if the service is active
    and the process is still registered, schedules its restart.

    A process that died less than C{threshold} seconds after it was
    started gets its delay roughly doubled (capped at 3600 seconds);
    otherwise the delay is reset to 0.

    @param name: The string that uniquely identifies the process.
    """
    killer = self.murder.pop(name, None)
    # The kill timer may already have fired; cancelling it then would
    # raise, so only cancel a call that is still pending.
    if killer is not None and killer.active():
        killer.cancel()
    self.protocols.pop(name, None)
    if time.time() - self.timeStarted[name] < self.threshold:
        delay = self.delay[name] = min(1 + 2 * self.delay.get(name, 0), 3600)
    else:
        delay = self.delay[name] = 0
    if self.active and name in self.processes:
        reactor.callLater(delay, self.startProcess, name)
192
def startProcess(self, name):
    """
    Launch the registered process named C{name}, unless a protocol for
    it already exists.

    @param name: The string that uniquely identifies the process.

    @raise KeyError: If no process with this name is registered.
    """
    if name in self.protocols:
        # Already started.
        return
    p = self.protocols[name] = LoggingProtocol()
    # LoggingProtocol tags its log lines with C{name} and reports the end
    # of the process through C{service.connectionLost}.
    p.service = self
    p.name = name
    args, uid, gid, env = self.processes[name]
    self.timeStarted[name] = time.time()
    reactor.spawnProcess(p, args[0], args, uid=uid, gid=gid, env=env)
202
def _forceStopProcess(self, proc):
    """
    Send C{KILL} to a process, ignoring the case where it has already
    exited.

    @param proc: The process transport to signal.
    """
    try:
        proc.signalProcess('KILL')
    except error.ProcessExitedAlready:
        # Nothing left to kill.
        return
208
def stopProcess(self, name):
    """
    Ask the running process named C{name} to terminate.

    The process is sent C{TERM}; if that signal could be delivered, a
    C{KILL} is scheduled C{killTime} seconds later through
    L{_forceStopProcess}. Nothing happens if no protocol exists for the
    name.

    @param name: The string that uniquely identifies the process.
    """
    if name not in self.protocols:
        return
    proc = self.protocols[name].transport
    del self.protocols[name]
    try:
        proc.signalProcess('TERM')
    except error.ProcessExitedAlready:
        # Already gone: no forced kill needs to be scheduled.
        pass
    else:
        self.murder[name] = reactor.callLater(self.killTime,
                                              self._forceStopProcess, proc)
221
def restartAll(self):
    """
    Restart all processes by stopping each registered one; while the
    service is active, L{connectionLost} schedules the restart of every
    process that ends. This is useful for third party management services
    to allow a user to restart servers because of an outside change in
    circumstances.
    """
    for label in list(self.processes.keys()):
        self.stopProcess(label)
234
# Builds one "name(uid:gid): argv" entry per registered process and joins
# them into the textual representation of the monitor.
# NOTE(review): the enclosing def line and the initialisation of `l` and
# `uidgid` are not among the lines visible here -- confirm before editing.
for name, proc in self.processes.items():
236
# proc is the (args, uid, gid, env) tuple stored by addProcess, so proc[1]
# is the uid and proc[2] the gid.
if proc[1] is not None:
237
uidgid = str(proc[1])
238
if proc[2] is not None:
239
uidgid += ':'+str(proc[2])
242
# Wrap the uid/gid part in parentheses.
uidgid = '(' + uidgid + ')'
243
l.append('%r%s: %r' % (name, uidgid, proc[0]))
244
# The result starts with '<' followed by the class name; the rest of the
# expression is not visible here.
return ('<' + self.__class__.__name__ + ' '
249
# Demo set-up: registers four shell commands with a ProcessMonitor and
# schedules a few events on the reactor.
# NOTE(review): the enclosing def line is not among the lines visible
# here -- presumably this is the body of the script entry point; confirm.
from signal import SIGTERM
250
mon = ProcessMonitor()
251
# 'foo' and 'qux' sleep two seconds before printing; 'qux' uses printf, so
# its output has no trailing newline.
mon.addProcess('foo', ['/bin/sh', '-c', 'sleep 2;echo hello'])
252
mon.addProcess('qux', ['/bin/sh', '-c', 'sleep 2;printf pilim'])
253
# 'bar' exits immediately after printing.
mon.addProcess('bar', ['/bin/sh', '-c', 'echo goodbye'])
254
# 'baz' loops forever, printing every five seconds.
mon.addProcess('baz', ['/bin/sh', '-c',
255
'echo welcome;while :;do echo blah;sleep 5;done'])
256
# After 30 seconds, send SIGTERM to 'baz' directly, bypassing the monitor.
reactor.callLater(30, lambda mon=mon:
257
os.kill(mon.protocols['baz'].transport.pid, SIGTERM))
258
# After 60 seconds, restart everything.
reactor.callLater(60, mon.restartAll)
260
# Stop the monitored processes when the reactor shuts down.
reactor.addSystemEventTrigger('before', 'shutdown', mon.stopService)
263
if __name__ == '__main__':