4
Created by Thomas Mangin on 2011-05-02.
5
Copyright (c) 2009-2013 Exa Networks. All rights reserved.
15
from exabgp.structure.api import Text,JSON
16
from exabgp.structure.log import Logger
18
class ProcessError (Exception):
21
def preexec_helper ():
22
# make this process a new process group
24
# This prevent the signal to be sent to the children (and create a new process group)
26
#signal.signal(signal.SIGINT, signal.SIG_IGN)
28
class Processes (object):
29
def __init__ (self,supervisor):
30
self.logger = Logger()
31
self.supervisor = supervisor
38
self._api_encoder = {}
39
self._neighbor_process = {}
42
def _terminate (self,process):
43
self.logger.processes("Terminating process %s" % process)
44
self._process[process].terminate()
45
self._process[process].wait()
46
del self._process[process]
49
for process in list(self._process):
51
self.write(process,self._api_encoder[process].shutdown())
54
for process in list(self._process):
56
self._terminate(process)
58
# we most likely received a SIGTERM signal and our child is already dead
59
self.logger.processes("child process %s was already dead" % process)
63
def _start (self,process):
65
if process in self._process:
66
self.logger.processes("process already running")
68
if not process in self.supervisor.configuration.process:
69
self.logger.processes("Can not start process, no configuration for it (anymore ?)")
72
# Prevent some weird termcap data to be created at the start of the PIPE
73
# \x1b[?1034h (no-eol) (esc)
74
os.environ['TERM']='dumb'
76
run = self.supervisor.configuration.process[process].get('run','')
78
api = self.supervisor.configuration.process[process]['encoder']
79
self._api_encoder[process] = JSON('2.0') if api == 'json' else Text('1.0')
81
self._process[process] = subprocess.Popen(run,
82
stdin=subprocess.PIPE,
83
stdout=subprocess.PIPE,
84
preexec_fn=preexec_helper
85
# This flags exists for python 2.7.3 in the documentation but on on my MAC
86
# creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
88
fcntl.fcntl(self._process[process].stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
90
self.logger.processes("Forked process %s" % process)
92
neighbor = self.supervisor.configuration.process[process]['neighbor']
93
self._neighbor_process.setdefault(neighbor,[]).append(process)
94
except (subprocess.CalledProcessError,OSError,ValueError),e:
95
self._broken.append(process)
96
self.logger.processes("Could not start process %s" % process)
97
self.logger.processes("reason: %s" % str(e))
100
for process in self.supervisor.configuration.process:
102
for process in list(self._process):
103
if not process in self.supervisor.configuration.process:
104
self._terminate(process)
106
def broken (self,neighbor):
108
if '*' in self._broken:
110
for process in self._neighbor_process.get(neighbor,[]):
111
if process in self._broken:
117
for process in list(self._process):
119
proc = self._process[process]
120
r,_,_ = select.select([proc.stdout,],[],[],0)
124
line = proc.stdout.readline().rstrip()
126
self.logger.processes("Command from process %s : %s " % (process,line))
127
lines.setdefault(process,[]).append(line)
129
self.logger.processes("The process died, trying to respawn it")
130
self._terminate(process)
134
if e.errno == errno.EINTR: # call interrupted
135
pass # we most likely have data, we will try to read them a the next loop iteration
136
elif e.errno != errno.EAGAIN: # no more data
137
self.logger.processes("unexpected errno received from forked process: %d [%s]" % (e.errno,errno.errorcode[e.errno]))
138
except (subprocess.CalledProcessError,OSError,ValueError):
139
self.logger.processes("Issue with the process, terminating it and restarting it")
140
self._terminate(process)
144
def write (self,process,string):
147
self._process[process].stdin.write('%s\r\n' % string)
149
self._broken.append(process)
150
if e.errno == errno.EPIPE:
151
self._broken.append(process)
152
self.logger.processes("Issue while sending data to our helper program")
155
# Could it have been caused by a signal ? What to do.
156
self.logger.processes("REPORT TO DEVELOPERS: IOError received while SENDING data to helper program %s, retrying" % str(e.errno))
161
self._process[process].stdin.flush()
163
# AFAIK, the buffer should be flushed at the next attempt.
164
self.logger.processes("REPORT TO DEVELOPERS: IOError received while FLUSHING data to helper program %s, retrying" % str(e.errno))
168
def _notify (self,neighbor,event):
169
for process in self._neighbor_process.get(neighbor,[]):
170
if process in self._process:
172
for process in self._neighbor_process.get('*',[]):
173
if process in self._process:
176
def up (self,neighbor):
177
if self.silence: return
178
for process in self._notify(neighbor,'neighbor-changes'):
179
self.write(process,self._api_encoder[process].up(neighbor))
181
def connected (self,neighbor):
182
if self.silence: return
183
for process in self._notify(neighbor,'neighbor-changes'):
184
self.write(process,self._api_encoder[process].connected(neighbor))
186
def down (self,neighbor,reason=''):
187
if self.silence: return
188
for process in self._notify(neighbor,'neighbor-changes'):
189
self.write(process,self._api_encoder[process].down(neighbor))
191
def receive (self,neighbor,category,header,body):
192
if self.silence: return
193
for process in self._notify(neighbor,'receive-packets'):
194
self.write(process,self._api_encoder[process].receive(neighbor,category,header,body))
196
def send (self,neighbor,category,header,body):
197
if self.silence: return
198
for process in self._notify(neighbor,'send-packets'):
199
self.write(process,self._api_encoder[process].send(neighbor,category,header,body))
201
def routes (self,neighbor,routes):
202
if self.silence: return
203
for process in self._notify(neighbor,'receive-routes'):
204
self.write(process,self._api_encoder[process].routes(neighbor,routes))