~openerp-community/openobject-server/fix-1130010-toxml-escape-quot

« back to all changes in this revision

Viewing changes to openerp/service/workers.py

  • Committer: Christophe Simonis
  • Date: 2013-06-13 17:39:00 UTC
  • mfrom: (4867.1.20 saas-1)
  • Revision ID: chs@openerp.com-20130613173900-xl7rh321nnw2b04b
[MERGE] forward port of branch saas-1 up to revid 4887 chs@openerp.com-20130612153934-qyp6pb3bc4za4taf

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
import socket
15
15
import sys
16
16
import time
 
17
import subprocess
 
18
import os.path
17
19
 
18
20
import werkzeug.serving
19
21
try:
23
25
 
24
26
import openerp
25
27
import openerp.tools.config as config
 
28
from openerp.tools.misc import stripped_sys_argv
26
29
 
27
30
_logger = logging.getLogger(__name__)
28
31
 
35
38
    def __init__(self, app):
36
39
        # config
37
40
        self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
38
 
        self.long_polling_address = (config['xmlrpc_interface'] or '0.0.0.0', config['longpolling_port'])
39
41
        self.population = config['workers']
40
42
        self.timeout = config['limit_time_real']
41
43
        self.limit_request = config['limit_request']
46
48
        self.socket = None
47
49
        self.workers_http = {}
48
50
        self.workers_cron = {}
49
 
        self.workers_longpolling = {}
50
51
        self.workers = {}
51
52
        self.generation = 0
52
53
        self.queue = []
 
54
        self.long_polling_pid = None
53
55
 
54
56
    def pipe_new(self):
55
57
        pipe = os.pipe()
89
91
            worker.run()
90
92
            sys.exit(0)
91
93
 
 
94
    def long_polling_spawn(self):
 
95
        nargs = stripped_sys_argv('--pidfile')
 
96
        cmd = nargs[0]
 
97
        cmd = os.path.join(os.path.dirname(cmd), "openerp-long-polling")
 
98
        nargs[0] = cmd
 
99
        popen = subprocess.Popen(nargs)
 
100
        self.long_polling_pid = popen.pid
 
101
 
92
102
    def worker_pop(self, pid):
93
103
        if pid in self.workers:
94
104
            _logger.debug("Worker (%s) unregistered",pid)
143
153
            self.worker_spawn(WorkerHTTP, self.workers_http)
144
154
        while len(self.workers_cron) < config['max_cron_threads']:
145
155
            self.worker_spawn(WorkerCron, self.workers_cron)
146
 
        while len(self.workers_longpolling) < 1:
147
 
            self.worker_spawn(WorkerLongPolling, self.workers_longpolling)
 
156
        if not self.long_polling_pid:
 
157
            self.long_polling_spawn()
148
158
 
149
159
    def sleep(self):
150
160
        try:
183
193
        self.socket.setblocking(0)
184
194
        self.socket.bind(self.address)
185
195
        self.socket.listen(8*self.population)
186
 
        # long polling socket
187
 
        self.long_polling_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
188
 
        self.long_polling_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
189
 
        self.long_polling_socket.setblocking(0)
190
 
        self.long_polling_socket.bind(self.long_polling_address)
191
 
        self.long_polling_socket.listen(8)
192
196
 
193
197
    def stop(self, graceful=True):
 
198
        if self.long_polling_pid is not None:
 
199
            self.worker_kill(self.long_polling_pid, signal.SIGTERM)
 
200
            self.long_polling_pid = None
194
201
        if graceful:
195
202
            _logger.info("Stopping gracefully")
196
203
            limit = time.time() + self.timeout
353
360
 
354
361
    def start(self):
355
362
        Worker.start(self)
356
 
        self.multi.long_polling_socket.close()
357
363
        self.server = WorkerBaseWSGIServer(self.multi.app)
358
364
 
359
 
class WorkerLongPolling(Worker):
    """Long polling worker.

    Serves ``multi.app`` on ``multi.long_polling_socket`` through a gevent
    WSGI server, after switching the process to gevent mode.
    """

    def __init__(self, multi):
        super(WorkerLongPolling, self).__init__(multi)
        # This kind of worker runs without the watchdog feature.
        self.watchdog_timeout = None

    def watch_parent(self):
        """Terminate this process once its parent pid changes.

        Compares ``self.ppid`` with ``os.getppid()`` every ``multi.beat``
        seconds (cooperative gevent sleep). As soon as they differ, logs the
        event and sends SIGTERM to the current process.
        """
        import gevent
        # Keep sleeping for as long as the parent is the one we started with.
        while self.ppid == os.getppid():
            gevent.sleep(self.multi.beat)
        _logger.info("WorkerLongPolling (%s) Parent changed", self.pid)
        os.kill(os.getpid(), signal.SIGTERM)

    def start(self):
        """Switch to gevent mode and serve the long-polling socket forever.

        The statement order matters: the monkey patches are applied before
        the registry lock is replaced and before ``Worker.start`` runs.
        """
        openerp.evented = True
        _logger.info('Using gevent mode')

        # Patch the standard library, then psycopg2, for cooperative I/O.
        import gevent.monkey
        gevent.monkey.patch_all()
        import gevent_psycopg2
        gevent_psycopg2.monkey_patch()

        # Replace the registries lock with a gevent lock.
        from openerp.modules.registry import RegistryManager
        from gevent.coros import RLock
        RegistryManager.registries_lock = RLock()

        Worker.start(self)
        # This worker serves long_polling_socket only, so close the other one.
        self.multi.socket.close()

        import gevent
        parent_watcher = gevent.spawn(self.watch_parent)

        # Child logger given a ``write`` attribute so that it can be handed to
        # the WSGI server as its ``log`` object.
        wsgi_log = _logger.getChild(self.__class__.__name__)

        def _write(message):
            return wsgi_log.info(message.strip())

        wsgi_log.write = _write

        from gevent.wsgi import WSGIServer
        self.server = WSGIServer(
            self.multi.long_polling_socket,
            self.multi.app,
            log=wsgi_log,
        )
        self.server.serve_forever()
398
 
 
399
365
class WorkerBaseWSGIServer(werkzeug.serving.BaseWSGIServer):
400
366
    """ werkzeug WSGI Server patched to allow using an external listen socket
401
367
    """
469
435
    def start(self):
470
436
        Worker.start(self)
471
437
        self.multi.socket.close()
472
 
        self.multi.long_polling_socket.close()
473
438
        openerp.service.start_internal()
474
439
 
475
440
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: