28
33
self.logger = Logger()
29
34
self.supervisor = supervisor
36
api = load().api.encoder
38
self.api = JSON(self.write,version,'1.0')
40
self.api = Text(self.write,version,'1.0')
34
44
self._receive_routes = {}
38
def _terminate (self,name):
39
self.logger.processes("Terminating process %s" % name)
40
self._process[name].terminate()
41
self._process[name].wait()
42
del self._process[name]
48
def _terminate (self,process):
49
self.logger.processes("Terminating process %s" % process)
50
self._process[process].terminate()
51
self._process[process].wait()
52
del self._process[process]
44
54
def terminate (self):
45
for name in list(self._process):
46
self.write(name,'shutdown',shutdown=True)
55
for process in list(self._process):
56
self.api.shutdown(process)
57
self.api.silence = True
48
for name in list(self._process):
59
for process in list(self._process):
61
self._terminate(process)
52
63
# we most likely received a SIGTERM signal and our child is already dead
64
self.logger.processes("child process %s was already dead" % process)
56
def _start (self,name):
68
def _start (self,process):
58
if name in self._process:
70
if process in self._process:
59
71
self.logger.processes("process already running")
61
proc = self.supervisor.configuration.process
73
if not process in self.supervisor.configuration.process:
63
74
self.logger.processes("Can not start process, no configuration for it (anymore ?)")
65
76
# Prevent some weird termcap data to be created at the start of the PIPE
66
77
# \x1b[?1034h (no-eol) (esc)
67
78
os.environ['TERM']='dumb'
68
self._receive_routes[name] = proc[name]['receive-routes']
69
self._process[name] = subprocess.Popen(proc[name]['run'],
79
self._receive_routes[process] = self.supervisor.configuration.process[process]['receive-routes']
80
self._process[process] = subprocess.Popen(self.supervisor.configuration.process[process]['run'],
70
81
stdin=subprocess.PIPE,
71
82
stdout=subprocess.PIPE,
72
83
preexec_fn=preexec_helper
73
84
# This flags exists for python 2.7.3 in the documentation but on on my MAC
74
85
# creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
76
neighbor = proc[name]['neighbor']
77
self._notify.setdefault(neighbor,[]).append(name)
78
self.logger.processes("Forked process %s" % name)
87
neighbor = self.supervisor.configuration.process[process]['neighbor']
88
self._notify.setdefault(neighbor,[]).append(process)
89
self.logger.processes("Forked process %s" % process)
79
90
except (subprocess.CalledProcessError,OSError,ValueError),e:
80
self._broken.append(name)
81
self.logger.processes("Could not start process %s" % name)
91
self._broken.append(process)
92
self.logger.processes("Could not start process %s" % process)
82
93
self.logger.processes("reason: %s" % str(e))
85
proc = self.supervisor.configuration.process
88
for name in list(self._process):
96
for process in self.supervisor.configuration.process:
98
for process in list(self._process):
99
if not process in self.supervisor.configuration.process:
100
self._terminate(process)
92
102
def notify (self,neighbor):
93
for name in self._notify.get(neighbor,[]):
95
for name in self._notify.get('*',[]):
103
for process in self._notify.get(neighbor,[]):
105
for process in self._notify.get('*',[]):
98
108
def broken (self,neighbor):
100
for name in self._notify.get(neighbor,[]):
101
if name in self._broken:
110
for process in self._notify.get(neighbor,[]):
111
if process in self._broken:
103
113
if '*' in self._broken:
122
132
# there is not data to read
125
self.logger.processes("Command from process %s : %s " % (name,line))
126
lines.setdefault(name,[]).append(line)
135
self.logger.processes("Command from process %s : %s " % (process,line))
136
lines.setdefault(process,[]).append(line)
127
137
except (subprocess.CalledProcessError,OSError,ValueError):
128
138
self.logger.processes("Issue with the process, terminating it and restarting it")
129
self._terminate(name)
139
self._terminate(process)
133
def write (self,name,string,shutdown=False):
143
def write (self,process,string):
136
self._process[name].stdin.write('%s\r\n' % string)
146
self._process[process].stdin.write('%s\r\n' % string)
137
147
except IOError,e:
138
if shutdown: return True
139
self._broken.append(name)
148
self._broken.append(process)
140
149
if e.errno == errno.EPIPE:
141
self._broken.append(name)
150
self._broken.append(process)
142
151
self.logger.processes("Issue while sending data to our helper program")
143
152
raise ProcessError()
145
# Could it have been caused by a signal ? What to do.
154
# Could it have been caused by a signal ? What to do.
146
155
self.logger.processes("REPORT TO DEVELOPERS: IOError received while SENDING data to helper program %s, retrying" % str(e.errno))
151
self._process[name].stdin.flush()
160
self._process[process].stdin.flush()
152
161
except IOError,e:
153
if shutdown: return True
154
162
# AFAIK, the buffer should be flushed at the next attempt.
155
163
self.logger.processes("REPORT TO DEVELOPERS: IOError received while FLUSHING data to helper program %s, retrying" % str(e.errno))