101
97
report['tb_text'] = tb
104
def configure_oops():
105
"""Configure the oopses."""
106
oops_config = oops.Config()
107
oops_config.on_create.append(_prettify_traceback)
108
vers_info = dict(branch_nick=version_info['branch_nick'],
109
revno=version_info['revno'])
110
oops_config.template.update(vers_info)
111
datedir_repo = oops_datedir_repo.DateDirRepo(settings.OOPS_PATH,
114
oops_config.publisher = oops.publishers.publish_to_many(
115
datedir_repo.publish)
119
100
class StorageLogger(object):
120
101
"""Create logs for the server.
349
330
self.waiting_on_poison.append(d)
352
def _log_error_and_oops(self, failure, where, tl=None, exc_info=None):
353
"""Auxiliar method to log error and build oops."""
333
def _log_error(self, failure, where, exc_info=None):
334
"""Auxiliar method to log error."""
354
335
self.log.error('Unhandled %s when calling %r', failure,
355
336
where.__name__, exc_info=exc_info)
358
if not isinstance(failure, Failure):
359
failure = Failure(failure)
360
oops = self.build_oops(failure, tl)
363
338
def schedule_request(self, request, callback, head=False):
364
339
"""Schedule this request to run."""
365
tl = getattr(request, 'timeline', None)
367
341
self.pending_requests.appendleft((request, callback))
369
343
self.pending_requests.append((request, callback))
370
344
# we do care if it fails, means no one handled the failure before
371
345
if not isinstance(request, Action):
372
request.deferred.addErrback(self._log_error_and_oops,
373
request.__class__, tl=tl)
346
request.deferred.addErrback(self._log_error, request.__class__)
374
347
if not self.request_locked:
375
348
self.execute_next_request()
387
360
def execute_next_request(self):
388
361
"""Read the queue and execute a request."""
389
362
request, callback = self.pending_requests.popleft()
390
tl = getattr(request, 'timeline', None)
393
class_name = request.__class__.__name__
394
tl.start('REQ-' + class_name,
395
'EXECUTE %s[%s]' % (class_name, request.id)).finish()
398
364
self.request_locked = request
400
366
except Exception as e:
401
self._log_error_and_oops(e, request.__class__, tl=tl,
402
exc_info=sys.exc_info())
367
self._log_error(e, request.__class__, exc_info=sys.exc_info())
403
368
self.release(request)
405
370
def connectionLost(self, reason=None):
470
435
self.log.warning('---- garbage in, garbage out')
473
self._log_error_and_oops(e, self.dataReceived,
474
exc_info=sys.exc_info())
476
def build_oops(self, failure, tl=None):
477
"""Create an oops entry to log the failure."""
478
context = {'exc_info': (failure.type, failure.value, failure.tb)}
482
context['timeline'] = tl
484
report = self.factory.oops_config.create(context)
487
if self.user is not None:
488
# The 'username' key is parsed as (login, user_id, display_name).
489
report['username'] = '%s,%s,%s' % (self.user.id, self.user.id,
494
def save_oops(self, report):
495
"""save the oops entry."""
496
self.factory.oops_config.publish(report)
438
self._log_error(e, self.dataReceived, exc_info=sys.exc_info())
498
440
def processMessage(self, message):
499
441
"""Log errors from requests created by incoming messages."""
505
447
result = request.RequestHandler.processMessage(self, message)
506
448
except Exception as e:
507
self._log_error_and_oops(e, self.processMessage,
508
exc_info=sys.exc_info())
449
self._log_error(e, self.processMessage, exc_info=sys.exc_info())
511
452
if isinstance(result, defer.Deferred):
512
result.addErrback(self._log_error_and_oops, result.__class__)
453
result.addErrback(self._log_error, result.__class__)
513
454
elif isinstance(result, request.Request):
514
tl = getattr(result, 'timeline', None)
515
result.deferred.addErrback(self._log_error_and_oops,
516
result.__class__, tl=tl)
455
result.deferred.addErrback(self._log_error, result.__class__)
518
457
def handle_PROTOCOL_VERSION(self, message):
519
458
"""Handle PROTOCOL_VERSION message.
671
610
It keeps a weak reference of the protocol instead of the real ref.
674
__slots__ = ('use_protocol_weakref', '_protocol_ref', 'timeline')
613
__slots__ = ('use_protocol_weakref', '_protocol_ref')
676
615
def __init__(self, protocol, message):
677
616
"""Create the request response."""
678
617
self.use_protocol_weakref = settings.api_server.PROTOCOL_WEAKREF
679
618
self._protocol_ref = None
680
self.timeline = timeline.Timeline(format_stack=None)
681
619
super(BaseRequestResponse, self).__init__(protocol, message)
684
"""Get the context of this request, to be passed down the RPC layer."""
685
return {'timeline': self.timeline}
687
621
def _get_protocol(self):
688
622
"""Return the protocol instance."""
689
623
if self.use_protocol_weakref:
767
701
self.id = self.source_message.id
768
702
self.protocol.requests[self.source_message.id] = self
770
class_name = self.__class__.__name__
773
'SCHEDULE %s[%s:%s] CAPS %r WITH %r' % (
774
class_name, self.protocol.session_id,
775
self.id, self.protocol.working_caps,
776
str(self.source_message)[:MAX_OOPS_LINE])).finish()
778
704
self.log.info('Request being scheduled')
779
705
self.log.trace_message('IN: ', self.source_message)
780
self.protocol.factory.metrics.increment('request_instances.%s' %
781
(self.__class__.__name__,))
706
self.protocol.factory.metrics.increment(
707
'request_instances.%s' % self.__class__.__name__)
782
708
self.protocol.schedule_request(self, _scheduled_start)
783
709
self.protocol.check_poison('request_schedule')
798
724
self.protocol.factory.metrics.decrement('request_instances.%s' %
799
725
(self.__class__.__name__,))
800
726
self.protocol.release(self)
803
728
def error(self, failure):
804
729
"""Overrided error to add logging. Never fail, always log."""
808
733
failure = Failure(failure)
809
734
self.log.error('Request error',
810
735
exc_info=(failure.type, failure.value, None))
811
o = self.protocol.build_oops(failure, self.timeline)
812
self.protocol.save_oops(o)
813
736
request.RequestResponse.error(self, failure)
814
737
except Exception as e:
815
738
msg = 'error() crashed when processing %s: %s'
816
# we shouldn't create an oops here since we just failed
817
739
self.log.error(msg, failure, e, exc_info=sys.exc_info())
819
741
self.protocol.factory.sli_metrics.report('sli_error', class_name)
2338
2260
def __init__(self, auth_provider_class=auth.DummyAuthProvider,
2339
2261
content_class=content.ContentManager,
2340
reactor=reactor, oops_config=None,
2341
servername=None, reactor_inspector=None):
2262
reactor=reactor, servername=None, reactor_inspector=None):
2342
2263
"""Create a StorageServerFactory."""
2343
2264
self.auth_provider = auth_provider_class(self)
2344
2265
self.content = content_class(self)
2380
2301
self.reactor = reactor
2381
2302
self.trace_users = set(settings.api_server.TRACE_USERS)
2383
# oops and log observer
2384
self.oops_config = oops_config
2385
2304
twisted.python.log.addObserver(self._deferror_handler)
2387
2306
def _deferror_handler(self, data):
2403
2322
logger.error('Unhandled error in deferred! %s', msg)
2405
def build_notification_oops(self, failure, notif, tl=None):
2406
"""Create an oops entry to log the notification failure."""
2407
context = {'exc_info': (failure.type, failure.value, failure.tb)}
2411
context['timeline'] = tl
2413
report = self.oops_config.create(context)
2418
2324
def event_callback_handler(self, func):
2419
2325
"""Wrap the event callback in an error handler."""
2421
2327
def wrapper(notif, **kwargs):
2422
2328
"""The wrapper."""
2423
tl = timeline.Timeline(format_stack=None)
2424
action = tl.start('EVENT-%s' % notif.event_type, '')
2426
2330
def notification_error_handler(failure):
2427
2331
"""Handle error while processing a Notification."""
2428
action.detail = 'NOTIFY WITH %r (%s)' % (notif, kwargs)
2431
oops = self.build_notification_oops(failure, notif, tl)
2432
self.oops_config.publish(oops)
2434
2333
'%s in notification %r while calling %s(**%r)',
2435
2334
failure.value, notif, func.__name__, kwargs)
2645
2544
"""Wrap the whole TCP StorageServer mess as a single twisted serv."""
2647
2546
def __init__(self, port, auth_provider_class=None,
2648
oops_config=None, status_port=0, heartbeat_interval=None):
2547
status_port=0, heartbeat_interval=None):
2649
2548
"""Create a StorageServerService.
2651
2550
@param port: the port to listen on without ssl.
2674
2573
self.factory = StorageServerFactory(
2675
2574
auth_provider_class=auth_provider_class,
2676
oops_config=oops_config, servername=self.servername,
2575
servername=self.servername,
2677
2576
reactor_inspector=self._reactor_inspector)
2679
2578
self.tcp_service = TCPServer(port, self.factory)
2736
2635
def create_service(status_port=None,
2737
auth_provider_class=auth.DummyAuthProvider,
2636
auth_provider_class=auth.DummyAuthProvider):
2739
2637
"""Start the StorageServer service."""
2740
2638
# turn on heapy if must to
2741
2639
if os.getenv('USE_HEAPY'):
2761
2659
# create the service
2762
2660
service = StorageServerService(
2763
settings.api_server.TCP_PORT, auth_provider_class,
2764
oops_config, status_port)
2661
settings.api_server.TCP_PORT, auth_provider_class, status_port)