~ubuntu-branches/ubuntu/raring/exabgp/raring

« back to all changes in this revision

Viewing changes to lib/exabgp/structure/processes.py

  • Committer: Package Import Robot
  • Author(s): Henry-Nicolas Tourneur
  • Date: 2013-01-02 11:42:00 UTC
  • mfrom: (1.1.6)
  • Revision ID: package-import@ubuntu.com-20130102114200-heowva4yrqiiukc5
Tags: 3.1.1-1

* New upstream release
* Closes: #687134 debconf abuse: the notice of software behavior now takes place
  under NEWS.Debian, not in a debconf screen; information moved accordingly.
* Closes: #693338 Japanese translation removed since debconf template is out.
* Closes: #697178 Russian translation removed since debconf template is out.
* Closes: #689533 UCF-based conf added. 
  No more manual changes to exabgp.env, thanks to Federico Ceratto.
          

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
import subprocess
12
12
import select
13
13
 
 
14
from exabgp.api import Text,JSON
 
15
 
 
16
from exabgp.version import version
 
17
from exabgp.structure.environment import load
 
18
 
14
19
from exabgp.structure.log import Logger
15
20
 
16
21
class ProcessError (Exception):
28
33
                self.logger = Logger()
29
34
                self.supervisor = supervisor
30
35
                self.clean()
31
 
        
 
36
                api = load().api.encoder
 
37
                if api == 'json':
 
38
                        self.api = JSON(self.write,version,'1.0')
 
39
                else:
 
40
                        self.api = Text(self.write,version,'1.0')
 
41
 
32
42
        def clean (self):
33
43
                self._process = {}
34
44
                self._receive_routes = {}
35
45
                self._notify = {}
36
46
                self._broken = []
37
47
 
38
 
        def _terminate (self,name):
39
 
                self.logger.processes("Terminating process %s" % name)
40
 
                self._process[name].terminate()
41
 
                self._process[name].wait()
42
 
                del self._process[name]
 
48
        def _terminate (self,process):
 
49
                self.logger.processes("Terminating process %s" % process)
 
50
                self._process[process].terminate()
 
51
                self._process[process].wait()
 
52
                del self._process[process]
43
53
 
44
54
        def terminate (self):
45
 
                for name in list(self._process):
46
 
                        self.write(name,'shutdown',shutdown=True)
 
55
                for process in list(self._process):
 
56
                        self.api.shutdown(process)
 
57
                        self.api.silence = True
47
58
                time.sleep(0.1)
48
 
                for name in list(self._process):
 
59
                for process in list(self._process):
49
60
                        try:
50
 
                                self._terminate(name)
 
61
                                self._terminate(process)
51
62
                        except OSError:
52
63
                                # we most likely received a SIGTERM signal and our child is already dead
 
64
                                self.logger.processes("child process %s was already dead" % process)
53
65
                                pass
54
66
                self.clean()
55
67
 
56
 
        def _start (self,name):
 
68
        def _start (self,process):
57
69
                try:
58
 
                        if name in self._process:
 
70
                        if process in self._process:
59
71
                                self.logger.processes("process already running")
60
72
                                return
61
 
                        proc = self.supervisor.configuration.process
62
 
                        if not name in proc:
 
73
                        if not process in self.supervisor.configuration.process:
63
74
                                self.logger.processes("Can not start process, no configuration for it (anymore ?)")
64
75
                                return
65
76
                        # Prevent some weird termcap data to be created at the start of the PIPE
66
77
                        # \x1b[?1034h (no-eol) (esc)
67
78
                        os.environ['TERM']='dumb'
68
 
                        self._receive_routes[name] = proc[name]['receive-routes']
69
 
                        self._process[name] = subprocess.Popen(proc[name]['run'],
 
79
                        self._receive_routes[process] = self.supervisor.configuration.process[process]['receive-routes']
 
80
                        self._process[process] = subprocess.Popen(self.supervisor.configuration.process[process]['run'],
70
81
                                stdin=subprocess.PIPE,
71
82
                                stdout=subprocess.PIPE,
72
83
                                preexec_fn=preexec_helper
73
84
                                # This flags exists for python 2.7.3 in the documentation but on on my MAC
74
85
                                # creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
75
86
                        )
76
 
                        neighbor = proc[name]['neighbor']
77
 
                        self._notify.setdefault(neighbor,[]).append(name)
78
 
                        self.logger.processes("Forked process %s" % name)
 
87
                        neighbor = self.supervisor.configuration.process[process]['neighbor']
 
88
                        self._notify.setdefault(neighbor,[]).append(process)
 
89
                        self.logger.processes("Forked process %s" % process)
79
90
                except (subprocess.CalledProcessError,OSError,ValueError),e:
80
 
                        self._broken.append(name)
81
 
                        self.logger.processes("Could not start process %s" % name)
 
91
                        self._broken.append(process)
 
92
                        self.logger.processes("Could not start process %s" % process)
82
93
                        self.logger.processes("reason: %s" % str(e))
83
94
 
84
95
        def start (self):
85
 
                proc = self.supervisor.configuration.process
86
 
                for name in proc:
87
 
                        self._start(name)
88
 
                for name in list(self._process):
89
 
                        if not name in proc:
90
 
                                self._terminate(name)
 
96
                for process in self.supervisor.configuration.process:
 
97
                        self._start(process)
 
98
                for process in list(self._process):
 
99
                        if not process in self.supervisor.configuration.process:
 
100
                                self._terminate(process)
91
101
 
92
102
        def notify (self,neighbor):
93
 
                for name in self._notify.get(neighbor,[]):
94
 
                        yield name
95
 
                for name in self._notify.get('*',[]):
96
 
                        yield name
 
103
                for process in self._notify.get(neighbor,[]):
 
104
                        yield process
 
105
                for process in self._notify.get('*',[]):
 
106
                        yield process
97
107
 
98
108
        def broken (self,neighbor):
99
109
                if self._broken:
100
 
                        for name in self._notify.get(neighbor,[]):
101
 
                                if name in self._broken:
 
110
                        for process in self._notify.get(neighbor,[]):
 
111
                                if process in self._broken:
102
112
                                        return True
103
113
                        if '*' in self._broken:
104
114
                                return True
106
116
 
107
117
        def received (self):
108
118
                lines = {}
109
 
                for name in list(self._process):
 
119
                for process in list(self._process):
110
120
                        try:
111
 
                                proc = self._process[name]
 
121
                                proc = self._process[process]
112
122
                                r = True
113
123
                                while r:
114
124
                                        r,_,_ = select.select([proc.stdout,],[],[],0)
122
132
                                                        # there is not data to read
123
133
                                                        r = False
124
134
                                                else:
125
 
                                                        self.logger.processes("Command from process %s : %s " % (name,line))
126
 
                                                        lines.setdefault(name,[]).append(line)
 
135
                                                        self.logger.processes("Command from process %s : %s " % (process,line))
 
136
                                                        lines.setdefault(process,[]).append(line)
127
137
                        except (subprocess.CalledProcessError,OSError,ValueError):
128
138
                                self.logger.processes("Issue with the process, terminating it and restarting it")
129
 
                                self._terminate(name)
130
 
                                self._start(name)
 
139
                                self._terminate(process)
 
140
                                self._start(process)
131
141
                return lines
132
142
 
133
 
        def write (self,name,string,shutdown=False):
 
143
        def write (self,process,string):
134
144
                while True:
135
145
                        try:
136
 
                                self._process[name].stdin.write('%s\r\n' % string)
 
146
                                self._process[process].stdin.write('%s\r\n' % string)
137
147
                        except IOError,e:
138
 
                                if shutdown: return True
139
 
                                self._broken.append(name)
 
148
                                self._broken.append(process)
140
149
                                if e.errno == errno.EPIPE:
141
 
                                        self._broken.append(name)
 
150
                                        self._broken.append(process)
142
151
                                        self.logger.processes("Issue while sending data to our helper program")
143
152
                                        raise ProcessError()
144
153
                                else:
145
 
                                        # Could it have been caused by a signal ? What to do. 
 
154
                                        # Could it have been caused by a signal ? What to do.
146
155
                                        self.logger.processes("REPORT TO DEVELOPERS: IOError received while SENDING data to helper program %s, retrying" % str(e.errno))
147
156
                                        continue
148
157
                        break
149
158
 
150
159
                try:
151
 
                        self._process[name].stdin.flush()
 
160
                        self._process[process].stdin.flush()
152
161
                except IOError,e:
153
 
                        if shutdown: return True
154
162
                        # AFAIK, the buffer should be flushed at the next attempt.
155
163
                        self.logger.processes("REPORT TO DEVELOPERS: IOError received while FLUSHING data to helper program %s, retrying" % str(e.errno))
156
164
 
158
166
 
159
167
        # return all the process which are interrested in route update notification
160
168
        def receive_routes (self):
161
 
                for name in self._process:
162
 
                        if self._receive_routes[name]:
163
 
                                yield name
 
169
                for process in self._process:
 
170
                        if self._receive_routes[process]:
 
171
                                yield process