~ubuntu-branches/ubuntu/utopic/exabgp/utopic

« back to all changes in this revision

Viewing changes to lib/exabgp/reactor/api/processes.py

  • Committer: Package Import Robot
  • Author(s): Henry-Nicolas Tourneur
  • Date: 2014-03-08 19:07:00 UTC
  • mfrom: (1.1.8)
  • Revision ID: package-import@ubuntu.com-20140308190700-xjbibpg1g6001c9x
Tags: 3.3.1-1
* New upstream release
* Bump python minimal required version (2.7)
* Closes: #726066 Debian packaging improvements proposed by Vincent Bernat
* Closes: #703774 not existent rundir (/var/run/exabgp) after reboot

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
process.py
 
3
 
 
4
Created by Thomas Mangin on 2011-05-02.
 
5
Copyright (c) 2009-2013 Exa Networks. All rights reserved.
 
6
"""
 
7
 
 
8
import os
 
9
import errno
 
10
import time
 
11
import subprocess
 
12
import select
 
13
import fcntl
 
14
 
 
15
from exabgp.util.errstr import errstr
 
16
 
 
17
from exabgp.reactor.api.encoding import Text,JSON
 
18
from exabgp.configuration.file import formated
 
19
from exabgp.logger import Logger
 
20
 
 
21
class ProcessError (Exception):
 
22
        pass
 
23
 
 
24
def preexec_helper ():
 
25
        # make this process a new process group
 
26
        #os.setsid()
 
27
        # This prevent the signal to be sent to the children (and create a new process group)
 
28
        os.setpgrp()
 
29
        #signal.signal(signal.SIGINT, signal.SIG_IGN)
 
30
 
 
31
class Processes (object):
 
32
        # how many time can a process can respawn in the time interval
 
33
        respawn_number = 5
 
34
        respawn_timemask = 0xFFFFFF - pow(2,6) + 1  # '0b111111111111111111000000' (around a minute, 63 seconds)
 
35
 
 
36
        def __init__ (self,reactor):
 
37
                self.logger = Logger()
 
38
                self.reactor = reactor
 
39
                self.clean()
 
40
                self.silence = False
 
41
 
 
42
        def clean (self):
 
43
                self._process = {}
 
44
                self._api = {}
 
45
                self._api_encoder = {}
 
46
                self._neighbor_process = {}
 
47
                self._broken = []
 
48
                self._respawning = {}
 
49
 
 
50
        def _terminate (self,process):
 
51
                self.logger.processes("Terminating process %s" % process)
 
52
                self._process[process].terminate()
 
53
                self._process[process].wait()
 
54
                del self._process[process]
 
55
 
 
56
        def terminate (self):
 
57
                for process in list(self._process):
 
58
                        if not self.silence:
 
59
                                try:
 
60
                                        self.write(process,self._api_encoder[process].shutdown())
 
61
                                except ProcessError:
 
62
                                        pass
 
63
                self.silence = True
 
64
                time.sleep(0.1)
 
65
                for process in list(self._process):
 
66
                        try:
 
67
                                self._terminate(process)
 
68
                        except OSError:
 
69
                                # we most likely received a SIGTERM signal and our child is already dead
 
70
                                self.logger.processes("child process %s was already dead" % process)
 
71
                                pass
 
72
                self.clean()
 
73
 
 
74
        def _start (self,process):
 
75
                try:
 
76
                        if process in self._process:
 
77
                                self.logger.processes("process already running")
 
78
                                return
 
79
                        if not process in self.reactor.configuration.process:
 
80
                                self.logger.processes("Can not start process, no configuration for it (anymore ?)")
 
81
                                return
 
82
 
 
83
                        # Prevent some weird termcap data to be created at the start of the PIPE
 
84
                        # \x1b[?1034h (no-eol) (esc)
 
85
                        os.environ['TERM']='dumb'
 
86
 
 
87
                        run = self.reactor.configuration.process[process].get('run','')
 
88
                        if run:
 
89
                                api = self.reactor.configuration.process[process]['encoder']
 
90
                                self._api_encoder[process] = JSON('3.3.0') if api == 'json' else Text('3.3.0')
 
91
 
 
92
                                self._process[process] = subprocess.Popen(run,
 
93
                                        stdin=subprocess.PIPE,
 
94
                                        stdout=subprocess.PIPE,
 
95
                                        preexec_fn=preexec_helper
 
96
                                        # This flags exists for python 2.7.3 in the documentation but on on my MAC
 
97
                                        # creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
 
98
                                )
 
99
                                fcntl.fcntl(self._process[process].stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
 
100
 
 
101
                                self.logger.processes("Forked process %s" % process)
 
102
 
 
103
                                around_now = int(time.time()) & self.respawn_timemask
 
104
                                if process in self._respawning:
 
105
                                        if around_now in self._respawning[process]:
 
106
                                                self._respawning[process][around_now] += 1
 
107
                                                # we are respawning too fast
 
108
                                                if self._respawning[process][around_now] > self.respawn_number:
 
109
                                                        self.logger.processes("Too many respawn for %s (%d) terminating program" % (process,self.respawn_number),'critical')
 
110
                                                        raise ProcessError()
 
111
                                        else:
 
112
                                                # reset long time since last respawn
 
113
                                                self._respawning[process] = {around_now: 1}
 
114
                                else:
 
115
                                        # record respawing
 
116
                                        self._respawning[process] = {around_now: 1}
 
117
 
 
118
                        neighbor = self.reactor.configuration.process[process]['neighbor']
 
119
                        self._neighbor_process.setdefault(neighbor,[]).append(process)
 
120
                except (subprocess.CalledProcessError,OSError,ValueError),e:
 
121
                        self._broken.append(process)
 
122
                        self.logger.processes("Could not start process %s" % process)
 
123
                        self.logger.processes("reason: %s" % str(e))
 
124
 
 
125
        def start (self,restart=False):
 
126
                for process in self.reactor.configuration.process:
 
127
                        if restart:
 
128
                                self._terminate(process)
 
129
                        self._start(process)
 
130
                for process in list(self._process):
 
131
                        if not process in self.reactor.configuration.process:
 
132
                                self._terminate(process)
 
133
 
 
134
        def broken (self,neighbor):
 
135
                if self._broken:
 
136
                        if '*' in self._broken:
 
137
                                return True
 
138
                        for process in self._neighbor_process.get(neighbor,[]):
 
139
                                if process in self._broken:
 
140
                                        return True
 
141
                return False
 
142
 
 
143
        def fds (self):
 
144
                return [self._process[process].stdout for process in self._process]
 
145
 
 
146
        def received (self):
 
147
                for process in list(self._process):
 
148
                        try:
 
149
                                proc = self._process[process]
 
150
                                r,_,_ = select.select([proc.stdout,],[],[],0)
 
151
                                if r:
 
152
                                        try:
 
153
                                                for line in proc.stdout:
 
154
                                                        line = line.rstrip()
 
155
                                                        if line:
 
156
                                                                self.logger.processes("Command from process %s : %s " % (process,line))
 
157
                                                                yield (process,formated(line))
 
158
                                                        else:
 
159
                                                                self.logger.processes("The process died, trying to respawn it")
 
160
                                                                self._terminate(process)
 
161
                                                                self._start(process)
 
162
                                                                break
 
163
                                        except IOError,e:
 
164
                                                if e.errno == errno.EINTR:  # call interrupted
 
165
                                                        pass  # we most likely have data, we will try to read them a the next loop iteration
 
166
                                                elif e.errno != errno.EAGAIN:  # no more data
 
167
                                                        self.logger.processes("unexpected errno received from forked process (%s)" % errstr(e))
 
168
                        except (subprocess.CalledProcessError,OSError,ValueError):
 
169
                                self.logger.processes("Issue with the process, terminating it and restarting it")
 
170
                                self._terminate(process)
 
171
                                self._start(process)
 
172
 
 
173
        def write (self,process,string):
 
174
                while True:
 
175
                        try:
 
176
                                self._process[process].stdin.write('%s\n' % string)
 
177
                        except IOError,e:
 
178
                                self._broken.append(process)
 
179
                                if e.errno == errno.EPIPE:
 
180
                                        self._broken.append(process)
 
181
                                        self.logger.processes("Issue while sending data to our helper program")
 
182
                                        raise ProcessError()
 
183
                                else:
 
184
                                        # Could it have been caused by a signal ? What to do.
 
185
                                        self.logger.processes("Error received while SENDING data to helper program, retrying (%s)" % errstr(e))
 
186
                                        continue
 
187
                        break
 
188
 
 
189
                try:
 
190
                        self._process[process].stdin.flush()
 
191
                except IOError,e:
 
192
                        # AFAIK, the buffer should be flushed at the next attempt.
 
193
                        self.logger.processes("Error received while FLUSHING data to helper program, retrying (%s)" % errstr(e))
 
194
 
 
195
                return True
 
196
 
 
197
        def _notify (self,neighbor,event):
 
198
                for process in self._neighbor_process.get(neighbor,[]):
 
199
                        if process in self._process:
 
200
                                yield process
 
201
                for process in self._neighbor_process.get('*',[]):
 
202
                        if process in self._process:
 
203
                                yield process
 
204
 
 
205
        def up (self,neighbor):
 
206
                if self.silence: return
 
207
                for process in self._notify(neighbor,'neighbor-changes'):
 
208
                        self.write(process,self._api_encoder[process].up(neighbor))
 
209
 
 
210
        def connected (self,neighbor):
 
211
                if self.silence: return
 
212
                for process in self._notify(neighbor,'neighbor-changes'):
 
213
                        self.write(process,self._api_encoder[process].connected(neighbor))
 
214
 
 
215
        def down (self,neighbor,reason=''):
 
216
                if self.silence: return
 
217
                for process in self._notify(neighbor,'neighbor-changes'):
 
218
                        self.write(process,self._api_encoder[process].down(neighbor))
 
219
 
 
220
        def receive (self,neighbor,category,header,body):
 
221
                if self.silence: return
 
222
                for process in self._notify(neighbor,'receive-packets'):
 
223
                        self.write(process,self._api_encoder[process].receive(neighbor,category,header,body))
 
224
 
 
225
        def send (self,neighbor,category,header,body):
 
226
                if self.silence: return
 
227
                for process in self._notify(neighbor,'send-packets'):
 
228
                        self.write(process,self._api_encoder[process].send(neighbor,category,header,body))
 
229
 
 
230
        def update (self,neighbor,update):
 
231
                if self.silence: return
 
232
                for process in self._notify(neighbor,'receive-routes'):
 
233
                        self.write(process,self._api_encoder[process].update(neighbor,update))
 
234
 
 
235
        def refresh (self,neighbor,refresh):
 
236
                if self.silence: return
 
237
                for process in self._notify(neighbor,'receive-routes'):
 
238
                        self.write(process,self._api_encoder[process].refresh(neighbor,refresh))
 
239
 
 
240
        def operational (self,neighbor,what,operational):
 
241
                if self.silence: return
 
242
                for process in self._notify(neighbor,'receive-operational'):
 
243
                        self.write(process,self._api_encoder[process].operational(neighbor,what,operational))