4
Created by Thomas Mangin on 2011-05-02.
5
Copyright (c) 2009-2013 Exa Networks. All rights reserved.
15
from exabgp.util.errstr import errstr
17
from exabgp.reactor.api.encoding import Text,JSON
18
from exabgp.configuration.file import formated
19
from exabgp.logger import Logger
21
# Module-specific exception type, derived directly from Exception.
# NOTE(review): the class body and every raise/except site for it fall outside
# the lines visible in this extract.
class ProcessError (Exception):
24
# Hook handed to subprocess.Popen as preexec_fn by Processes._start, so it is
# called in the forked child before the helper program is executed.
# NOTE(review): only the comments of this function are visible in this extract;
# the statements between them are missing, so confirm the body against the full file.
def preexec_helper ():
25
# make this process a new process group
27
# This prevents the signal from being sent to the children (and creates a new process group)
29
#signal.signal(signal.SIGINT, signal.SIG_IGN)
31
# Manages the helper programs described in reactor.configuration.process:
# starting and terminating them, reading their stdout and writing encoded
# events to their stdin.
# NOTE(review): several original lines of this class are missing from this extract.
class Processes (object):
32
# how many times a process can respawn in the time interval
34
# Mask applied to int(time.time()) in _start: it clears the six low bits, so
# respawns are counted per 64-second bucket, and keeps only the low 24 bits.
respawn_timemask = 0xFFFFFF - pow(2,6) + 1 # '0b111111111111111111000000' (around a minute, 63 seconds)
36
# Store the reactor and create the empty bookkeeping tables.
# NOTE(review): original lines 39-44 are missing from this extract; the other
# attributes used by this class (self._process, self._broken, self._respawning,
# self.silence) are presumably initialised there — confirm against the full file.
def __init__ (self,reactor):
37
self.logger = Logger()
38
# reactor.configuration.process is read by _start/start to find each helper's settings
self.reactor = reactor
45
# process name -> encoder object (JSON or Text), filled in by _start
self._api_encoder = {}
46
# neighbor -> list of process names, filled in by _start
self._neighbor_process = {}
50
def _terminate (self,process):
51
self.logger.processes("Terminating process %s" % process)
52
self._process[process].terminate()
53
self._process[process].wait()
54
del self._process[process]
57
# NOTE(review): the `def` line of this method and several of its statements
# (including the try/except headers) are missing from this extract; the
# comments below describe only the visible statements.
# First pass: send each registered process its encoder's shutdown() message.
for process in list(self._process):
60
self.write(process,self._api_encoder[process].shutdown())
65
# Second pass: terminate every process still registered (iterating over a copy
# because _terminate deletes entries from self._process).
for process in list(self._process):
67
self._terminate(process)
69
# we most likely received a SIGTERM signal and our child is already dead
70
self.logger.processes("child process %s was already dead" % process)
74
# Fork the helper program configured under `process` and register it.
# Visible steps: log if it is already running or has no configuration, pick the
# API encoder, spawn the program with piped stdin/stdout, switch its stdout to
# non-blocking, count the respawn, and attach the process to its configured
# neighbor. The exceptions caught at the end mark the process as broken and are logged.
# NOTE(review): this extract omits several original lines (e.g. the `try:` that
# matches the final `except`, and whatever follows each guard and the
# "too many respawn" message), so the control flow is only partly visible.
def _start (self,process):
76
if process in self._process:
77
self.logger.processes("process already running")
79
if not process in self.reactor.configuration.process:
80
self.logger.processes("Can not start process, no configuration for it (anymore ?)")
83
# Prevent some weird termcap data to be created at the start of the PIPE
84
# \x1b[?1034h (no-eol) (esc)
85
os.environ['TERM']='dumb'
87
run = self.reactor.configuration.process[process].get('run','')
89
api = self.reactor.configuration.process[process]['encoder']
90
# any value other than 'json' selects the Text encoder
self._api_encoder[process] = JSON('3.3.0') if api == 'json' else Text('3.3.0')
92
self._process[process] = subprocess.Popen(run,
93
stdin=subprocess.PIPE,
94
stdout=subprocess.PIPE,
95
preexec_fn=preexec_helper
96
# This flag exists for python 2.7.3 in the documentation but not on my MAC
97
# creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
99
# NOTE(review): F_SETFL with a bare os.O_NONBLOCK replaces the descriptor's
# status flags instead of OR-ing with the F_GETFL value — confirm this is intended.
fcntl.fcntl(self._process[process].stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
101
self.logger.processes("Forked process %s" % process)
103
# bucket the current time with the class mask so respawns in the same window share a key
around_now = int(time.time()) & self.respawn_timemask
104
if process in self._respawning:
105
if around_now in self._respawning[process]:
106
self._respawning[process][around_now] += 1
107
# we are respawning too fast
108
if self._respawning[process][around_now] > self.respawn_number:
109
self.logger.processes("Too many respawn for %s (%d) terminating program" % (process,self.respawn_number),'critical')
112
# reset long time since last respawn
113
self._respawning[process] = {around_now: 1}
116
self._respawning[process] = {around_now: 1}
118
neighbor = self.reactor.configuration.process[process]['neighbor']
119
self._neighbor_process.setdefault(neighbor,[]).append(process)
120
# NOTE(review): `except X,e` is Python 2-only syntax.
except (subprocess.CalledProcessError,OSError,ValueError),e:
121
self._broken.append(process)
122
self.logger.processes("Could not start process %s" % process)
123
self.logger.processes("reason: %s" % str(e))
125
# Reconcile the running helpers with the current configuration.
# NOTE(review): some original lines are missing from this extract (including any
# use of `restart` and whatever launches the processes), so only the visible
# terminations are described here.
def start (self,restart=False):
126
for process in self.reactor.configuration.process:
128
self._terminate(process)
130
# iterate over a copy because _terminate deletes entries from self._process
for process in list(self._process):
131
# a running process that is no longer configured gets terminated
if not process in self.reactor.configuration.process:
132
self._terminate(process)
134
# Check self._broken for the wildcard '*' and for the processes registered to `neighbor`.
# NOTE(review): the return statements are missing from this extract; presumably
# a truthy value is returned when either check matches — confirm against the full file.
def broken (self,neighbor):
136
if '*' in self._broken:
138
# a neighbor with no registered process has nothing to check
for process in self._neighbor_process.get(neighbor,[]):
139
if process in self._broken:
144
# NOTE(review): the `def` line of this method is missing from this extract.
# Returns the stdout file object of every registered process.
return [self._process[process].stdout for process in self._process]
147
# NOTE(review): the `def` line and most try/except headers of this method are
# missing from this extract; the comments describe only the visible statements.
# Poll each helper's stdout and yield (process, formated(line)) for every line read.
for process in list(self._process):
149
proc = self._process[process]
150
# zero timeout: poll for readability without blocking
r,_,_ = select.select([proc.stdout,],[],[],0)
153
for line in proc.stdout:
156
self.logger.processes("Command from process %s : %s " % (process,line))
157
yield (process,formated(line))
159
self.logger.processes("The process died, trying to respawn it")
160
self._terminate(process)
164
if e.errno == errno.EINTR: # call interrupted
165
pass # we most likely have data, we will try to read them at the next loop iteration
166
elif e.errno != errno.EAGAIN: # no more data
167
self.logger.processes("unexpected errno received from forked process (%s)" % errstr(e))
168
except (subprocess.CalledProcessError,OSError,ValueError):
169
self.logger.processes("Issue with the process, terminating it and restarting it")
170
self._terminate(process)
173
# Send `string`, followed by a newline, to the stdin of the helper `process`,
# then flush that stdin.
# NOTE(review): the try/except headers and any raise/return statements are
# missing from this extract, so which handler each visible line belongs to
# must be confirmed against the full file.
def write (self,process,string):
176
self._process[process].stdin.write('%s\n' % string)
178
# NOTE(review): `process` is appended to self._broken both here and under the
# EPIPE test below; check whether one failure can record it twice.
self._broken.append(process)
179
if e.errno == errno.EPIPE:
180
self._broken.append(process)
181
self.logger.processes("Issue while sending data to our helper program")
184
# Could it have been caused by a signal ? What to do.
185
self.logger.processes("Error received while SENDING data to helper program, retrying (%s)" % errstr(e))
190
self._process[process].stdin.flush()
192
# AFAIK, the buffer should be flushed at the next attempt.
193
self.logger.processes("Error received while FLUSHING data to helper program, retrying (%s)" % errstr(e))
197
# Select the running processes registered for `neighbor`, then those registered
# under the wildcard key '*'.
# NOTE(review): the statement under each `if` is missing from this extract;
# callers iterate over the result, so it presumably yields `process` — confirm.
# `event` is not referenced in any visible line.
def _notify (self,neighbor,event):
198
for process in self._neighbor_process.get(neighbor,[]):
199
# skip names that are registered but not currently running
if process in self._process:
201
for process in self._neighbor_process.get('*',[]):
202
if process in self._process:
205
def up (self,neighbor):
206
if self.silence: return
207
for process in self._notify(neighbor,'neighbor-changes'):
208
self.write(process,self._api_encoder[process].up(neighbor))
210
def connected (self,neighbor):
211
if self.silence: return
212
for process in self._notify(neighbor,'neighbor-changes'):
213
self.write(process,self._api_encoder[process].connected(neighbor))
215
def down (self,neighbor,reason=''):
216
if self.silence: return
217
for process in self._notify(neighbor,'neighbor-changes'):
218
self.write(process,self._api_encoder[process].down(neighbor))
220
def receive (self,neighbor,category,header,body):
221
if self.silence: return
222
for process in self._notify(neighbor,'receive-packets'):
223
self.write(process,self._api_encoder[process].receive(neighbor,category,header,body))
225
def send (self,neighbor,category,header,body):
226
if self.silence: return
227
for process in self._notify(neighbor,'send-packets'):
228
self.write(process,self._api_encoder[process].send(neighbor,category,header,body))
230
def update (self,neighbor,update):
231
if self.silence: return
232
for process in self._notify(neighbor,'receive-routes'):
233
self.write(process,self._api_encoder[process].update(neighbor,update))
235
def refresh (self,neighbor,refresh):
236
if self.silence: return
237
for process in self._notify(neighbor,'receive-routes'):
238
self.write(process,self._api_encoder[process].refresh(neighbor,refresh))
240
def operational (self,neighbor,what,operational):
241
if self.silence: return
242
for process in self._notify(neighbor,'receive-operational'):
243
self.write(process,self._api_encoder[process].operational(neighbor,what,operational))