~ubuntu-branches/ubuntu/utopic/exabgp/utopic

« back to all changes in this revision

Viewing changes to lib/exabgp/structure/processes.py

  • Committer: Package Import Robot
  • Author(s): Henry-Nicolas Tourneur
  • Date: 2014-03-08 19:07:00 UTC
  • mfrom: (1.1.8)
  • Revision ID: package-import@ubuntu.com-20140308190700-xjbibpg1g6001c9x
Tags: 3.3.1-1
* New upstream release
* Bump python minimal required version (2.7)
* Closes: #726066 Debian packaging improvements proposed by Vincent Bernat
* Closes: #703774 non-existent rundir (/var/run/exabgp) after reboot

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
"""
2
 
processes.py
3
 
 
4
 
Created by Thomas Mangin on 2011-05-02.
5
 
Copyright (c) 2009-2013 Exa Networks. All rights reserved.
6
 
"""
7
 
 
8
 
import os
9
 
import errno
10
 
import time
11
 
import subprocess
12
 
import select
13
 
import fcntl
14
 
 
15
 
from exabgp.structure.api import Text,JSON
16
 
from exabgp.structure.log import Logger
17
 
 
18
 
class ProcessError (Exception):
        """Signal that data could not be delivered to a helper program."""
20
 
 
21
 
def preexec_helper ():
        """Put the calling process into a new process group of its own.

        Intended to be used as the ``preexec_fn`` of ``subprocess.Popen``, so
        that it runs in the child just before the helper program is executed.
        """
        # A new process group prevents signals sent to our own group from
        # also being delivered to the children.
        # Alternatives that were considered and left disabled:
        #   os.setsid()
        #   signal.signal(signal.SIGINT, signal.SIG_IGN)
        os.setpgrp()
27
 
 
28
 
class Processes (object):
        """Run the helper programs named in the configuration and talk to them.

        Each helper is a child process started with ``subprocess.Popen``.
        Events are written to its stdin, one message per line, and the lines
        it prints on stdout are collected by ``received()``.

        Bookkeeping (all reset by ``clean()``):
          _process           process name -> Popen object of a running helper
          _api_encoder       process name -> encoder building the messages
          _neighbor_process  neighbor -> names of the processes to notify
          _broken            names of processes that failed to start or to
                             accept data
        """

        def __init__ (self,supervisor):
                """Keep a reference to the supervisor; no process is started here.

                supervisor -- object whose ``configuration.process`` mapping
                              describes the helper programs.
                """
                self.logger = Logger()
                self.supervisor = supervisor
                self.clean()
                self.silence = False

        def clean (self):
                """Forget every process, encoder, neighbor mapping and failure."""
                self._process = {}
                self._api = {}
                self._api_encoder = {}
                self._neighbor_process = {}
                self._broken = []

        def _terminate (self,process):
                """Terminate the named child, wait for it and forget about it.

                Raises KeyError when the process is not running, and lets the
                OSError raised by terminate()/wait() propagate. The entry is
                removed from the bookkeeping even when such an error occurs,
                so a dead child never lingers in ``_process``.
                """
                self.logger.processes("Terminating process %s" % process)
                child = self._process[process]
                try:
                        child.terminate()
                        child.wait()
                finally:
                        del self._process[process]

        def _respawn (self,process):
                """Stop the named child (tolerating an already dead one) and start it again."""
                if process in self._process:
                        try:
                                self._terminate(process)
                        except OSError:
                                # nothing left to kill or reap, starting afresh is all we can do
                                self.logger.processes("child process %s was already dead" % process)
                self._start(process)

        def terminate (self):
                """Tell every helper we are shutting down, then terminate them all.

                Unless already silenced, each helper is first sent the shutdown
                message of its encoder. All notifications are silenced
                afterwards and the bookkeeping is reset.
                """
                if not self.silence:
                        for process in list(self._process):
                                try:
                                        self.write(process,self._api_encoder[process].shutdown())
                                except ProcessError:
                                        # a helper which is already gone must not prevent us
                                        # from terminating the other ones
                                        self.logger.processes("could not send the shutdown message to process %s" % process)
                self.silence = True
                # give the helpers a short moment to act on the shutdown message
                time.sleep(0.1)
                for process in list(self._process):
                        try:
                                self._terminate(process)
                        except OSError:
                                # we most likely received a SIGTERM signal and our child is already dead
                                self.logger.processes("child process %s was already dead" % process)
                self.clean()

        def _start (self,process):
                """Start the named helper and register it for its neighbor.

                Nothing is done when the process is already running or is no
                longer configured. A failure to start is logged and the name
                is recorded in ``_broken``; no exception is raised for it.
                """
                try:
                        if process in self._process:
                                self.logger.processes("process already running")
                                return
                        if process not in self.supervisor.configuration.process:
                                self.logger.processes("Can not start process, no configuration for it (anymore ?)")
                                return

                        # Prevent some weird termcap data to be created at the start of the PIPE
                        # \x1b[?1034h (no-eol) (esc)
                        os.environ['TERM']='dumb'

                        configuration = self.supervisor.configuration.process[process]

                        run = configuration.get('run','')
                        if run:
                                api = configuration['encoder']
                                self._api_encoder[process] = JSON('2.0') if api == 'json' else Text('1.0')

                                # creationflags=subprocess.CREATE_NEW_PROCESS_GROUP is only available
                                # on Windows, the new process group is created by preexec_fn instead
                                self._process[process] = subprocess.Popen(run,
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE,
                                        preexec_fn=preexec_helper
                                )

                                # make reading non-blocking without dropping the flags already set
                                fd = self._process[process].stdout.fileno()
                                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                                fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

                                self.logger.processes("Forked process %s" % process)

                        neighbor = configuration['neighbor']
                        registered = self._neighbor_process.setdefault(neighbor,[])
                        # a respawned process must not be registered (and notified) twice
                        if process not in registered:
                                registered.append(process)
                except (subprocess.CalledProcessError,OSError,ValueError) as e:
                        self._broken.append(process)
                        self.logger.processes("Could not start process %s" % process)
                        self.logger.processes("reason: %s" % str(e))

        def start (self):
                """Start every configured process and stop those no longer configured."""
                for process in self.supervisor.configuration.process:
                        self._start(process)
                for process in list(self._process):
                        if process not in self.supervisor.configuration.process:
                                self._terminate(process)

        def broken (self,neighbor):
                """Return True when a process serving this neighbor is recorded as broken.

                A '*' entry in the list of broken processes makes every
                neighbor report True.
                """
                if not self._broken:
                        return False
                if '*' in self._broken:
                        return True
                for process in self._neighbor_process.get(neighbor,[]):
                        if process in self._broken:
                                return True
                return False

        def received (self):
                """Collect the lines the helpers have printed, without blocking.

                Returns a dict mapping each process name to the list of
                non-empty lines (trailing whitespace removed) read from it.
                A helper whose stdout reached end-of-file, or which can not be
                polled or read, is terminated and started again.
                """
                lines = {}
                for process in list(self._process):
                        try:
                                proc = self._process[process]
                                # zero timeout: only look at what is available right now
                                r,_,_ = select.select([proc.stdout,],[],[],0)
                                if not r:
                                        continue
                                try:
                                        while True:
                                                raw = proc.stdout.readline()
                                                if not raw:
                                                        # only a really empty read means end-of-file,
                                                        # a blank line still carries its newline
                                                        self.logger.processes("The process died, trying to respawn it")
                                                        self._respawn(process)
                                                        break
                                                line = raw.rstrip()
                                                if not line:
                                                        # blank line: nothing to pass on, keep reading
                                                        continue
                                                self.logger.processes("Command from process %s : %s " % (process,line))
                                                lines.setdefault(process,[]).append(line)
                                except IOError as e:
                                        if e.errno == errno.EINTR:  # call interrupted
                                                pass  # we most likely have data, we will try to read them at the next loop iteration
                                        elif e.errno != errno.EAGAIN:  # EAGAIN only means there is no more data
                                                self.logger.processes("unexpected errno received from forked process: %d [%s]" % (e.errno,errno.errorcode.get(e.errno,'unknown')))
                        except (subprocess.CalledProcessError,OSError,ValueError):
                                self.logger.processes("Issue with the process, terminating it and restarting it")
                                self._respawn(process)
                return lines

        def write (self,process,string):
                """Send one message, terminated by CR LF, to the stdin of a helper.

                Returns True once the message has been written. Raises
                ProcessError when the pipe to the helper is closed (EPIPE).
                Any other IOError is logged and the write is attempted again,
                with no limit on the number of attempts. Any IOError gets the
                process recorded once in ``_broken``.
                """
                while True:
                        try:
                                self._process[process].stdin.write('%s\r\n' % string)
                        except IOError as e:
                                # record the failure once, however many times we go round
                                if process not in self._broken:
                                        self._broken.append(process)
                                if e.errno == errno.EPIPE:
                                        self.logger.processes("Issue while sending data to our helper program")
                                        raise ProcessError()
                                # Could it have been caused by a signal ? What to do.
                                self.logger.processes("REPORT TO DEVELOPERS: IOError received while SENDING data to helper program %s, retrying" % str(e.errno))
                                continue
                        break

                try:
                        self._process[process].stdin.flush()
                except IOError as e:
                        # AFAIK, the buffer should be flushed at the next attempt.
                        self.logger.processes("REPORT TO DEVELOPERS: IOError received while FLUSHING data to helper program %s, retrying" % str(e.errno))

                return True

        def _notify (self,neighbor,event):
                """Yield the running processes registered for the neighbor, then for '*'.

                The event name is accepted but not used for filtering.
                """
                for key in (neighbor,'*'):
                        for process in self._neighbor_process.get(key,[]):
                                if process in self._process:
                                        yield process

        def up (self,neighbor):
                """Send the encoder's "up" message for the neighbor to its helpers."""
                if self.silence: return
                for process in self._notify(neighbor,'neighbor-changes'):
                        self.write(process,self._api_encoder[process].up(neighbor))

        def connected (self,neighbor):
                """Send the encoder's "connected" message for the neighbor to its helpers."""
                if self.silence: return
                for process in self._notify(neighbor,'neighbor-changes'):
                        self.write(process,self._api_encoder[process].connected(neighbor))

        def down (self,neighbor,reason=''):
                """Send the encoder's "down" message for the neighbor to its helpers.

                The reason is accepted but not passed on to the encoder.
                """
                if self.silence: return
                for process in self._notify(neighbor,'neighbor-changes'):
                        self.write(process,self._api_encoder[process].down(neighbor))

        def receive (self,neighbor,category,header,body):
                """Send the encoder's "receive" message (category, header, body) to the helpers."""
                if self.silence: return
                for process in self._notify(neighbor,'receive-packets'):
                        self.write(process,self._api_encoder[process].receive(neighbor,category,header,body))

        def send (self,neighbor,category,header,body):
                """Send the encoder's "send" message (category, header, body) to the helpers."""
                if self.silence: return
                for process in self._notify(neighbor,'send-packets'):
                        self.write(process,self._api_encoder[process].send(neighbor,category,header,body))

        def routes (self,neighbor,routes):
                """Send the encoder's "routes" message for the neighbor to its helpers."""
                if self.silence: return
                for process in self._notify(neighbor,'receive-routes'):
                        self.write(process,self._api_encoder[process].routes(neighbor,routes))