1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################################################
#
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
# The refactoring about the OpenSSL support come from Tryton
# Copyright (C) 2007-2009 Cédric Krier.
# Copyright (C) 2007-2009 Bertrand Chenal.
# Copyright (C) 2008 B2CK SPRL.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
import errno
import logging
import logging.handlers
import os
import socket
import sys
import threading
import time
import release
from pprint import pformat
import warnings
import heapq
# TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
from loglevels import *
import tools
class Service(object):
""" Base class for *Local* services
Functionality here is trusted, no authentication.
"""
_services = {}
def __init__(self, name, audience=''):
Service._services[name] = self
self.__name = name
self._methods = {}
def joinGroup(self, name):
raise Exception("No group for local services")
#GROUPS.setdefault(name, {})[self.__name] = self
@classmethod
def exists(cls, name):
return name in cls._services
@classmethod
def remove(cls, name):
if cls.exists(name):
cls._services.pop(name)
def exportMethod(self, method):
if callable(method):
self._methods[method.__name__] = method
def abortResponse(self, error, description, origin, details):
if not tools.config['debug_mode']:
raise Exception("%s -- %s\n\n%s"%(origin, description, details))
else:
raise
class LocalService(object):
""" Proxy for local services.
Any instance of this class will behave like the single instance
of Service(name)
"""
__logger = logging.getLogger('service')
def __init__(self, name):
self.__name = name
try:
self._service = Service._services[name]
for method_name, method_definition in self._service._methods.items():
setattr(self, method_name, method_definition)
except KeyError, keyError:
self.__logger.error('This service does not exist: %s' % (str(keyError),) )
raise
def __call__(self, method, *params):
return getattr(self, method)(*params)
class ExportService(object):
""" Proxy for exported services.
All methods here should take an AuthProxy as their first parameter. It
will be appended by the calling framework.
Note that this class has no direct proxy, capable of calling
eservice.method(). Rather, the proxy should call
dispatch(method,auth,params)
"""
_services = {}
_groups = {}
_logger = logging.getLogger('web-services')
def __init__(self, name, audience=''):
ExportService._services[name] = self
self.__name = name
self._logger.debug("Registered an exported service: %s" % name)
def joinGroup(self, name):
ExportService._groups.setdefault(name, {})[self.__name] = self
@classmethod
def getService(cls,name):
return cls._services[name]
def dispatch(self, method, auth, params):
raise Exception("stub dispatch at %s" % self.__name)
def new_dispatch(self,method,auth,params):
raise Exception("stub dispatch at %s" % self.__name)
def abortResponse(self, error, description, origin, details):
if not tools.config['debug_mode']:
raise Exception("%s -- %s\n\n%s"%(origin, description, details))
else:
raise
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, _NOTHING, DEFAULT = range(10)
#The background is set with 40 plus the number of the color, and the foreground with 30
#These are the sequences need to get colored ouput
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
BOLD_SEQ = "\033[1m"
COLOR_PATTERN = "%s%s%%s%s" % (COLOR_SEQ, COLOR_SEQ, RESET_SEQ)
LEVEL_COLOR_MAPPING = {
logging.DEBUG_SQL: (WHITE, MAGENTA),
logging.DEBUG_RPC: (BLUE, WHITE),
logging.DEBUG_RPC_ANSWER: (BLUE, WHITE),
logging.DEBUG: (BLUE, DEFAULT),
logging.INFO: (GREEN, DEFAULT),
logging.TEST: (WHITE, BLUE),
logging.WARNING: (YELLOW, DEFAULT),
logging.ERROR: (RED, DEFAULT),
logging.CRITICAL: (WHITE, RED),
}
class DBFormatter(logging.Formatter):
def format(self, record):
record.dbname = getattr(threading.currentThread(), 'dbname', '?')
return logging.Formatter.format(self, record)
class ColoredFormatter(DBFormatter):
def format(self, record):
fg_color, bg_color = LEVEL_COLOR_MAPPING[record.levelno]
record.levelname = COLOR_PATTERN % (30 + fg_color, 40 + bg_color, record.levelname)
return DBFormatter.format(self, record)
def init_logger():
import os
from tools.translate import resetlocale
resetlocale()
# create a format for log messages and dates
format = '[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s'
if tools.config['syslog']:
# SysLog Handler
if os.name == 'nt':
handler = logging.handlers.NTEventLogHandler("%s %s" % (release.description, release.version))
else:
handler = logging.handlers.SysLogHandler('/dev/log')
format = '%s %s' % (release.description, release.version) \
+ ':%(dbname)s:%(levelname)s:%(name)s:%(message)s'
elif tools.config['logfile']:
# LogFile Handler
logf = tools.config['logfile']
try:
dirname = os.path.dirname(logf)
if dirname and not os.path.isdir(dirname):
os.makedirs(dirname)
if tools.config['logrotate'] is not False:
handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
elif os.name == 'posix':
handler = logging.handlers.WatchedFileHandler(logf)
else:
handler = logging.handlers.FileHandler(logf)
except Exception:
sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
handler = logging.StreamHandler(sys.stdout)
else:
# Normal Handler on standard output
handler = logging.StreamHandler(sys.stdout)
if isinstance(handler, logging.StreamHandler) and os.isatty(handler.stream.fileno()):
formatter = ColoredFormatter(format)
else:
formatter = DBFormatter(format)
handler.setFormatter(formatter)
# add the handler to the root logger
logger = logging.getLogger()
logger.handlers = []
logger.addHandler(handler)
logger.setLevel(int(tools.config['log_level'] or '0'))
class Agent(object):
"""Singleton that keeps track of cancellable tasks to run at a given
timestamp.
The tasks are caracterised by:
* a timestamp
* the database on which the task run
* the function to call
* the arguments and keyword arguments to pass to the function
Implementation details:
Tasks are stored as list, allowing the cancellation by setting
the timestamp to 0.
A heapq is used to store tasks, so we don't need to sort
tasks ourself.
"""
__tasks = []
__tasks_by_db = {}
_logger = logging.getLogger('netsvc.agent')
@classmethod
def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
task = [timestamp, db_name, function, args, kwargs]
heapq.heappush(cls.__tasks, task)
cls.__tasks_by_db.setdefault(db_name, []).append(task)
@classmethod
def cancel(cls, db_name):
"""Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
cls._logger.debug("Cancel timers for %s db", db_name or 'all')
if db_name is None:
cls.__tasks, cls.__tasks_by_db = [], {}
else:
if db_name in cls.__tasks_by_db:
for task in cls.__tasks_by_db[db_name]:
task[0] = 0
@classmethod
def quit(cls):
cls.cancel(None)
@classmethod
def runner(cls):
"""Neverending function (intended to be ran in a dedicated thread) that
checks every 60 seconds tasks to run. TODO: make configurable
"""
current_thread = threading.currentThread()
while True:
while cls.__tasks and cls.__tasks[0][0] < time.time():
task = heapq.heappop(cls.__tasks)
timestamp, dbname, function, args, kwargs = task
cls.__tasks_by_db[dbname].remove(task)
if not timestamp:
# null timestamp -> cancelled task
continue
current_thread.dbname = dbname # hack hack
cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
delattr(current_thread, 'dbname')
task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
task_thread.setDaemon(False)
task_thread.start()
time.sleep(1)
time.sleep(60)
agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
# the agent runner is a typical daemon thread, that will never quit and must be
# terminated when the main process exits - with no consequence (the processing
# threads it spawns are not marked daemon)
agent_runner.setDaemon(True)
agent_runner.start()
import traceback
class Server:
""" Generic interface for all servers with an event loop etc.
Override this to impement http, net-rpc etc. servers.
Servers here must have threaded behaviour. start() must not block,
there is no run().
"""
__is_started = False
__servers = []
__starter_threads = []
# we don't want blocking server calls (think select()) to
# wait forever and possibly prevent exiting the process,
# but instead we want a form of polling/busy_wait pattern, where
# _server_timeout should be used as the default timeout for
# all I/O blocking operations
_busywait_timeout = 0.5
__logger = logging.getLogger('server')
def __init__(self):
Server.__servers.append(self)
if Server.__is_started:
# raise Exception('All instances of servers must be inited before the startAll()')
# Since the startAll() won't be called again, allow this server to
# init and then start it after 1sec (hopefully). Register that
# timer thread in a list, so that we can abort the start if quitAll
# is called in the meantime
t = threading.Timer(1.0, self._late_start)
t.name = 'Late start timer for %s' % str(self.__class__)
Server.__starter_threads.append(t)
t.start()
def start(self):
self.__logger.debug("called stub Server.start")
def _late_start(self):
self.start()
for thr in Server.__starter_threads:
if thr.finished.is_set():
Server.__starter_threads.remove(thr)
def stop(self):
self.__logger.debug("called stub Server.stop")
def stats(self):
""" This function should return statistics about the server """
return "%s: No statistics" % str(self.__class__)
@classmethod
def startAll(cls):
if cls.__is_started:
return
cls.__logger.info("Starting %d services" % len(cls.__servers))
for srv in cls.__servers:
srv.start()
cls.__is_started = True
@classmethod
def quitAll(cls):
if not cls.__is_started:
return
cls.__logger.info("Stopping %d services" % len(cls.__servers))
for thr in cls.__starter_threads:
if not thr.finished.is_set():
thr.cancel()
cls.__starter_threads.remove(thr)
for srv in cls.__servers:
srv.stop()
cls.__is_started = False
@classmethod
def allStats(cls):
res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]]
res.extend(srv.stats() for srv in cls.__servers)
return '\n'.join(res)
def _close_socket(self):
if os.name != 'nt':
try:
self.socket.shutdown(getattr(socket, 'SHUT_RDWR', 2))
except socket.error, e:
if e.errno != errno.ENOTCONN: raise
# OSX, socket shutdowns both sides if any side closes it
# causing an error 57 'Socket is not connected' on shutdown
# of the other side (or something), see
# http://bugs.python.org/issue4397
self.__logger.debug(
'"%s" when shutting down server socket, '
'this is normal under OS X', e)
self.socket.close()
class OpenERPDispatcherException(Exception):
def __init__(self, exception, traceback):
self.exception = exception
self.traceback = traceback
def replace_request_password(args):
# password is always 3rd argument in a request, we replace it in RPC logs
# so it's easier to forward logs for diagnostics/debugging purposes...
args = list(args)
if len(args) > 2:
args[2] = '*'
return args
class OpenERPDispatcher:
def log(self, title, msg, channel=logging.DEBUG_RPC, depth=None):
logger = logging.getLogger(title)
if logger.isEnabledFor(channel):
for line in pformat(msg, depth=depth).split('\n'):
logger.log(channel, line)
def dispatch(self, service_name, method, params):
try:
logger = logging.getLogger('result')
self.log('service', service_name)
self.log('method', method)
self.log('params', replace_request_password(params), depth=(None if logger.isEnabledFor(logging.DEBUG_RPC_ANSWER) else 1))
auth = getattr(self, 'auth_provider', None)
result = ExportService.getService(service_name).dispatch(method, auth, params)
self.log('result', result, channel=logging.DEBUG_RPC_ANSWER)
return result
except Exception, e:
self.log('exception', tools.exception_to_unicode(e))
tb = getattr(e, 'traceback', sys.exc_info())
tb_s = "".join(traceback.format_exception(*tb))
if tools.config['debug_mode']:
import pdb
pdb.post_mortem(tb[2])
raise OpenERPDispatcherException(e, tb_s)
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|