# Copyright 2008-2015 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# For further info, check http://launchpad.net/filesync-server

"""The Storage network server.

Handles all the networking part of the server.
All code here must be non blocking and the messages should never leave
the reactor thread.
"""

import collections
import inspect
import logging
import signal
import sys
import time
import urllib
import uuid
import weakref

from functools import wraps

import oops
import oops_datedir_repo
import psycopg2
import timeline
import twisted.web.error
import versioninfo

import metrics.services

from twisted.application.service import MultiService, Service
from twisted.application.internet import TCPServer
from twisted.internet.defer import maybeDeferred, inlineCallbacks
from twisted.internet.protocol import Factory
from twisted.internet import defer, reactor, error, task, stdio
from twisted.python.failure import Failure

from s3lib.s3lib import S3, ProducerStopped

from metrics import get_meter
from metrics.metricsconnector import MetricsConnector
from backends.filesync.data import errors as dataerror
from backends.filesync.notifier import notifier
from ubuntuone.storage.server.logger import configure_logger, TRACE
from config import config
from ubuntuone.monitoring.reactor import ReactorInspector
from ubuntuone.storage.rpcdb import inthread
from ubuntuone.storage.server import auth, content, errors, stats
from ubuntuone.storageprotocol import protocol_pb2, request, sharersp
from ubuntuone.supervisor import utils as supervisor_utils

# this is the minimal cap we support (to avoid hardcoding it in the code)
MIN_CAP = frozenset(["no-content", "account-info", "resumable-uploads",
                     "fix462230", "volumes", "generations"])

# these are the capabilities combinations that we support
SUPPORTED_CAPS = set([MIN_CAP])

# this is the capabilities combination that we prefer, the latest one
PREFERRED_CAP = MIN_CAP

# this is where we suggest to reconnect in case we deny the capabilities,
# for example:
# frozenset(["example1"]): dict(hostname="fs-3.server.com", port=443)
# frozenset(["example2"]): dict(srv_record="_https._tcp.fs.server.com")

def install_signal_handlers():
    """Install custom SIGUSR2 handler."""
    reactor.callWhenRunning(signal.signal, signal.SIGUSR2, sigusr2_handler)


def sigusr2_handler(signum, frame):
    """Handle SIGUSR2 to reload the config."""
    logger = logging.getLogger("storage.server")
    logger.info('Reloading config file')
    config.config = config._Config()


def loglevel(lvl):
    """Make a function that logs at lvl log level."""
    def level_log(self, message, *args, **kwargs):
        """Log the message at the given level."""
        self.log(lvl, message, *args, **kwargs)
    return level_log


def trace_message(function):
    """A decorator to trace incoming messages."""
    @wraps(function)
    def decorator(self, message):
        """Trace the message, then handle it."""
        self.log.trace_message("IN: ", message)
        function(self, message)
    return decorator


def _prettify_traceback(report, context):
    """Make the traceback nicer."""
    if 'tb_text' in report:
        tb = "Traceback (most recent call last):\n"
        tb += report['tb_text']
        tb += "%s: '%s'" % (report['type'], report['value'])
        report['tb_text'] = tb


def configure_oops():
    """Configure the oopses."""
    oops_config = oops.Config()
    oops_config.on_create.append(_prettify_traceback)
    vers_info = dict(branch_nick=versioninfo.version_info['branch_nick'],
                     revno=versioninfo.version_info['revno'])
    oops_config.template.update(vers_info)
    datedir_repo = oops_datedir_repo.DateDirRepo(config.oops.path)
    oops_config.publisher = oops.publishers.publish_to_many(
        datedir_repo.publish)
    return oops_config


class StorageLogger(object):
    """Create logs for the server.

    The log format is:

    session_id remote_ip:remote_port username request_type request_id \
        message

    Unknown fields are replaced with a '-'.

    Plus whatever the log handler prepends to the line, normally a timestamp.
    """

    def __init__(self, protocol):
        """Create the logger."""
        self.protocol = protocol

    def log(self, lvl, message, *args, **kwargs):
        """Log only if the level is enabled."""
        if self.protocol.logger.isEnabledFor(lvl):
            self._log(lvl, message, *args, **kwargs)

    def _log(self, lvl, message, *args, **kwargs):
        """Actually do the real log."""
        msg_format = "%(uuid)s %(remote)s %(userid)s %(message)s"
        extra = {"message": message}
        if self.protocol.user is not None:
            extra["userid"] = urllib.quote(self.protocol.user.username,
                                           ":~/").replace('%', '%%')
        else:
            extra["userid"] = "-"

        # be robust in case we have no transport
        if self.protocol.transport is not None:
            peer = self.protocol.transport.getPeer()
            extra["remote"] = peer.host + ":" + str(peer.port)
        else:
            extra["remote"] = '%s.transport is None' % self.__class__.__name__

        extra["uuid"] = self.protocol.session_id

        message = msg_format % extra
        self.protocol.logger.log(lvl, message, *args, **kwargs)

    def trace_message(self, text, message):
        """Log a message with some pre processing."""
        if self.protocol.logger.isEnabledFor(TRACE):
            if message.type != protocol_pb2.Message.BYTES:
                self.trace(text + (str(message).replace("\n", " ")))

    critical = loglevel(logging.CRITICAL)
    error = loglevel(logging.ERROR)
    warning = loglevel(logging.WARNING)
    info = loglevel(logging.INFO)
    debug = loglevel(logging.DEBUG)
    trace = loglevel(TRACE)
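    # Illustrative example (hypothetical values): a line logged through the
    # helpers above renders as
    #     <session-uuid> 10.1.2.3:48212 jdoe GetContentResponse 17 - Request done
    # with '-' standing in for any field unknown at logging time.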

class StorageServerLogger(StorageLogger):
    """Logger for the server."""

    def log(self, lvl, message, *args, **kwargs):
        """Log, filling the request fields with '-'."""
        message = "- - " + message
        super(StorageServerLogger, self).log(lvl, message, *args, **kwargs)


class StorageRequestLogger(StorageLogger):
    """Logger for requests."""

    def __init__(self, protocol, request):
        """Create the logger."""
        super(StorageRequestLogger, self).__init__(protocol)
        self.request_class_name = request.__class__.__name__
        self.request_id = request.id

    def log(self, lvl, message, *args, **kwargs):
        """Log, including the request class name and id."""
        message = "%s %s - %s" % (self.request_class_name,
                                  self.request_id, message)
        super(StorageRequestLogger, self).log(lvl, message, *args, **kwargs)


class NotificationRPCTimeoutError(Exception):
    """RPCTimeoutError but during a notification."""


class PoisonException(Exception):
    """An exception class for poison errors."""


class LoopingPing(object):
    """Execute Ping requests in a given interval and expect a response.

    Shutdown the request.RequestHandler if the request takes longer than
    the specified timeout.

    @param interval: the seconds between each Ping
    @param timeout: the seconds to wait for a response before shutting down
    @param idle_timeout: the seconds of client idleness before disconnecting
    @param request_handler: a request.RequestHandler instance
    """

    def __init__(self, interval, timeout, idle_timeout, request_handler):
        self.interval = interval
        self.timeout = timeout
        self.idle_timeout = idle_timeout
        self.request_handler = request_handler
        self.shutdown = None
        self.next_loop = None
        self.pong_count = 0

    def start(self):
        """Create the DelayedCall instances."""
        self.shutdown = reactor.callLater(
            self.timeout, self._shutdown, reason='No Pong response.')
        self.next_loop = reactor.callLater(self.interval, self.schedule)

    def reset(self):
        """Reset pong count and reschedule."""
        self.pong_count = 0
        self.reschedule()

    def reschedule(self):
        """Reset all delayed calls."""
        if self.shutdown is not None and self.shutdown.active():
            self.shutdown.reset(self.timeout)
        else:
            self.shutdown = reactor.callLater(
                self.timeout, self._shutdown, reason='No Pong response.')
        if self.next_loop is not None and self.next_loop.active():
            self.next_loop.reset(self.interval)
        else:
            self.next_loop = reactor.callLater(self.interval, self.schedule)

    @defer.inlineCallbacks
    def schedule(self):
        """Request a Ping and reset the shutdown timeout."""
        yield self.request_handler.ping()
        self.pong_count += 1

        # check if the client is idling for too long
        # if self.idle_timeout is 0, do nothing.
        idle_time = self.pong_count * self.interval
        if self.idle_timeout != 0 and idle_time >= self.idle_timeout:
            self._shutdown(reason="idle timeout %s" % (idle_time,))
        else:
            self.reschedule()

    def _shutdown(self, reason):
        """Shutdown the request_handler."""
        self.request_handler.log.info("Disconnecting - %s", reason)
        if not self.request_handler.shutting_down:
            self.request_handler.shutdown()

    def stop(self):
        """Stop all the delayed calls."""
        if self.shutdown is not None and self.shutdown.active():
            self.shutdown.cancel()
        if self.next_loop is not None and self.next_loop.active():
            self.next_loop.cancel()
        for req in self.request_handler.requests.values():
            # cleanup stalled Ping requests
            if req.started and isinstance(req, request.Ping):
                req.done()
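
# Illustrative timing (hypothetical values): with interval=30s, timeout=300s
# and idle_timeout=3600s, a client that only answers pings accumulates
# pong_count until 120 * 30s >= 3600s and is disconnected for idling, while
# a client that stops answering pongs is dropped 300s after the last reset
# of the shutdown timer.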


class StorageServer(request.RequestHandler):
    """The Storage network server."""

    # the version the client must have (minimum)

    def __init__(self):
        """Create a network server. The factory does this."""
        request.RequestHandler.__init__(self)
        self.user = None
        self.session_id = uuid.uuid4()
        self.logger = logging.getLogger(config.api_server.logger_name)
        self.log = StorageServerLogger(self)
        self.shutting_down = False
        self.request_locked = False
        self.pending_requests = collections.deque()
        self.ping_loop = LoopingPing(StorageServer.PING_INTERVAL,
                                     StorageServer.PING_TIMEOUT,
                                     config.api_server.idle_timeout,
                                     self)

        # capabilities that the server is working with
        self.working_caps = MIN_CAP
        self.poisoned = []
        self.waiting_on_poison = []
        self.connection_time = None
        self._metadata_count = set()

    def set_user(self, user):
        """Set user and adjust values that depend on which user it is."""
        self.user = user
        if user.username in self.factory.trace_users:
            # set up the logger to use the hackers' one
            hackers_logger = config.api_server.logger_name + '.hackers'
            self.logger = logging.getLogger(hackers_logger)

    def poison(self, tag):
        """Inject a failure in the server. Works with check_poison."""
        self.poisoned.append(tag)

    def check_poison(self, tag):
        """Fail if poisoned with tag.

        Will raise a PoisonException when called and tag matches the poison
        tag.
        """
        if not self.poisoned:
            return
        poison = self.poisoned[-1]
        if poison == "*" or poison == tag:
            reactor.callLater(0, self.callback_on_poison, poison)
            raise PoisonException("Service was poisoned with: %s" % poison)

    def callback_on_poison(self, poison):
        """Call all the deferreds waiting on poison."""
        for d in self.waiting_on_poison:
            d.callback(poison)
        self.waiting_on_poison = []

    def wait_for_poison(self):
        """Return a deferred that will be called when we are poisoned."""
        d = defer.Deferred()
        self.waiting_on_poison.append(d)
        return d
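    # Illustrative note: poisoning is a test/failure-injection hook, e.g.
    # calling poison("request_start") makes the next
    # check_poison("request_start") raise PoisonException and fires any
    # deferred previously returned by wait_for_poison().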

    def _log_error_and_oops(self, failure, where, tl=None, exc_info=None):
        """Auxiliary method to log an error and build the oops."""
        self.log.error("Unhandled %s when calling '%s'", failure,
                       where.__name__, exc_info=exc_info)
        if not isinstance(failure, Failure):
            failure = Failure(failure)
        oops = self.build_oops(failure, tl)
        self.save_oops(oops)

    def schedule_request(self, request, callback, head=False):
        """Schedule this request to run."""
        tl = getattr(request, "timeline", None)
        if head:
            self.pending_requests.appendleft((request, callback))
        else:
            self.pending_requests.append((request, callback))
        # we do care if it fails, means no one handled the failure before
        if not isinstance(request, Action):
            request.deferred.addErrback(self._log_error_and_oops,
                                        request.__class__, tl=tl)
        if not self.request_locked:
            self.execute_next_request()

    def release(self, request):
        """Request 'request' finished, others may start."""
        if self.request_locked is not request:
            # a release from someone not holding the lock does nothing
            return
        self.request_locked = False
        if self.pending_requests and not self.shutting_down:
            self.execute_next_request()
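    # Illustrative note: requests run one at a time per connection;
    # schedule_request() queues (request, callback) pairs,
    # execute_next_request() runs one while holding request_locked, and
    # release() hands the lock to the next pending request (long transfers
    # call release() early to unblock the queue).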

    def execute_next_request(self):
        """Read the queue and execute a request."""
        request, callback = self.pending_requests.popleft()
        tl = getattr(request, "timeline", None)
        if tl is not None:
            class_name = request.__class__.__name__
            tl.start("REQ-" + class_name,
                     "EXECUTE %s[%s]" % (class_name, request.id)).finish()
        try:
            self.request_locked = request
            callback()
        except Exception as e:
            self._log_error_and_oops(e, request.__class__, tl=tl,
                                     exc_info=sys.exc_info())
            self.release(request)

    def connectionLost(self, reason=None):
        """Unregister this connection and potentially remove user binding."""
        if not self.shutting_down:
            self.shutdown()
        if self.user is not None:
            self.user.unregister_protocol(self)

        self.factory.protocols.remove(self)
        self.log.info("Connection Lost: %s", reason.value)

    def connectionMade(self):
        """Called when a connection is made."""
        request.RequestHandler.connectionMade(self)
        self.factory.protocols.append(self)
        self.log.info("Connection Made")
        self.transport.write("%d filesync server revision %s.\r\n" %
                             (self.PROTOCOL_VERSION,
                              versioninfo.version_info['revno']))
        self.ping_loop.start()
        self.factory.metrics.meter("connection_made", 1)
        self.factory.metrics.increment("connections_active")
        self.connection_time = time.time()

    def shutdown(self):
        """Lose connection and abort requests."""
        self.log.debug("Shutdown Request")
        self.shutting_down = True
        self.ping_loop.stop()
        self.transport.loseConnection()
        self.pending_requests.clear()

        # stop all the pending requests
        for req in self.requests.values():
            req.stop()

        if self.connection_time is not None:
            delta = time.time() - self.connection_time
            self.factory.metrics.timing("connection_closed", delta)
            self.factory.metrics.decrement("connections_active")

    def wait_for_shutdown(self):
        """Wait until the server has stopped serving this client."""
        if not self.factory.graceful_shutdown:
            return defer.succeed(True)
        d = defer.Deferred()

        def wait_for_shutdown_worker():
            """Check, wait and recurse."""
            if self.requests or self.pending_requests:
                reactor.callLater(0.1, wait_for_shutdown_worker)
            else:
                d.callback(True)

        wait_for_shutdown_worker()
        return d

    def dataReceived(self, data):
        """Handle new data."""
        try:
            self.buildMessage(data)
        except Exception as e:
            # here we handle and should log all errors
            if isinstance(e, request.StorageProtocolErrorSizeTooBig):
                self.log.warning("---- garbage in, garbage out")
            self._log_error_and_oops(e, self.dataReceived,
                                     exc_info=sys.exc_info())

    def build_oops(self, failure, tl=None):
        """Create an oops entry to log the failure."""
        context = {"exc_info": (failure.type, failure.value, failure.tb)}
        if tl is not None:
            context["timeline"] = tl
        report = self.factory.oops_config.create(context)
        if self.user is not None:
            # The 'username' key is parsed as (login, user_id, display_name).
            report["username"] = "%s,%s,%s" % (self.user.id, self.user.id,
                                               self.user.username)
        return report

    def save_oops(self, report):
        """Save the oops entry."""
        self.factory.oops_config.publish(report)

    def processMessage(self, message):
        """Log errors from requests created by incoming messages."""
        # reset the ping loop if the message isn't a PING or a PONG
        if message.type not in (protocol_pb2.Message.PING,
                                protocol_pb2.Message.PONG):
            self.ping_loop.reset()
        try:
            result = request.RequestHandler.processMessage(self, message)
        except Exception as e:
            self._log_error_and_oops(e, self.processMessage,
                                     exc_info=sys.exc_info())
            return

        if isinstance(result, defer.Deferred):
            result.addErrback(self._log_error_and_oops, result.__class__)
        elif isinstance(result, request.Request):
            tl = getattr(result, "timeline", None)
            result.deferred.addErrback(self._log_error_and_oops,
                                       result.__class__, tl=tl)

    def handle_PROTOCOL_VERSION(self, message):
        """Handle PROTOCOL_VERSION message.

        If the version is less or more than what's allowed, we send an error
        and drop the connection.
        """
        version = message.protocol.version
        response = protocol_pb2.Message()
        response.id = message.id
        if self.VERSION_REQUIRED <= version <= self.PROTOCOL_VERSION:
            response.type = protocol_pb2.Message.PROTOCOL_VERSION
            response.protocol.version = self.PROTOCOL_VERSION
            self.sendMessage(response)
            self.log.debug("client requested protocol version %d", version)
        else:
            msg = "client requested invalid version %s" % (version,)
            response.type = protocol_pb2.Message.ERROR
            response.error.type = protocol_pb2.Error.UNSUPPORTED_VERSION
            response.error.comment = msg
            self.sendMessage(response)

            # wrong protocol version is no longer an error.
            self.shutdown()

    def handle_PING(self, message):
        """Handle an incoming ping message."""
        self.check_poison("ping")
        response = protocol_pb2.Message()
        response.id = message.id
        response.type = protocol_pb2.Message.PONG
        self.log.trace("ping pong")
        self.sendMessage(response)

    def handle_AUTH_REQUEST(self, message):
        """Handle AUTH_REQUEST message.

        If already logged in, send an error.
        """
        request = AuthenticateResponse(self, message)
        request.start()

    def handle_MAKE_DIR(self, message):
        """Handle MAKE_DIR message."""
        request = MakeResponse(self, message)
        request.start()

    def handle_MAKE_FILE(self, message):
        """Handle MAKE_FILE message."""
        request = MakeResponse(self, message)
        request.start()

    def handle_GET_DELTA(self, message):
        """Handle GET_DELTA message."""
        if message.get_delta.from_scratch:
            request = RescanFromScratchResponse(self, message)
        else:
            request = GetDeltaResponse(self, message)
        request.start()

    def handle_GET_CONTENT(self, message):
        """Handle GET_CONTENT message."""
        request = GetContentResponse(self, message)
        request.start()

    def handle_MOVE(self, message):
        """Handle MOVE message."""
        request = MoveResponse(self, message)
        request.start()

    def handle_PUT_CONTENT(self, message):
        """Handle PUT_CONTENT message."""
        request = PutContentResponse(self, message)
        request.start()

    def handle_UNLINK(self, message):
        """Handle UNLINK message."""
        request = Unlink(self, message)
        request.start()

    def handle_CREATE_UDF(self, message):
        """Handle CREATE_UDF message."""
        request = CreateUDF(self, message)
        request.start()

    def handle_DELETE_VOLUME(self, message):
        """Handle DELETE_VOLUME message."""
        request = DeleteVolume(self, message)
        request.start()

    def handle_LIST_VOLUMES(self, message):
        """Handle LIST_VOLUMES message."""
        request = ListVolumes(self, message)
        request.start()

    def handle_CREATE_SHARE(self, message):
        """Handle CREATE_SHARE message."""
        request = CreateShare(self, message)
        request.start()

    def handle_SHARE_ACCEPTED(self, message):
        """Handle SHARE_ACCEPTED message."""
        request = ShareAccepted(self, message)
        request.start()

    def handle_LIST_SHARES(self, message):
        """Handle LIST_SHARES message."""
        request = ListShares(self, message)
        request.start()

    def handle_DELETE_SHARE(self, message):
        """Handle DELETE_SHARE message."""
        request = DeleteShare(self, message)
        request.start()

    def handle_CANCEL_REQUEST(self, message):
        """Ignore this misreceived cancel."""
        # if this arrives to this Request Handler, it means that what
        # we're trying to cancel already finished (and the client will
        # receive or already received the ok for that), so we ignore it

    def handle_BYTES(self, message):
        """Ignore this misreceived message."""
        # if this arrives to this Request Handler, it means we're
        # receiving messages that hit the network before everything
        # was properly cancelled, so we ignore it
    handle_EOF = handle_BYTES

    def handle_QUERY_CAPS(self, message):
        """Handle QUERY_CAPS message."""
        request = QuerySetCapsResponse(self, message)
        request.start()

    def handle_SET_CAPS(self, message):
        """Handle SET_CAPS message."""
        request = QuerySetCapsResponse(self, message, set_mode=True)
        request.start()

    def handle_FREE_SPACE_INQUIRY(self, message):
        """Handle FREE_SPACE_INQUIRY message."""
        request = FreeSpaceResponse(self, message)
        request.start()

    def handle_ACCOUNT_INQUIRY(self, message):
        """Handle account inquiry message."""
        request = AccountResponse(self, message)
        request.start()
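    # Illustrative note: every handle_<TYPE> above follows the same dispatch
    # pattern, e.g. an incoming MAKE_DIR message becomes
    # MakeResponse(self, message).start(), which schedules itself through
    # schedule_request() and answers the client when executed.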


class BaseRequestResponse(request.RequestResponse):
    """Base RequestResponse class.

    It keeps a weak reference of the protocol instead of the real ref.
    """

    __slots__ = ('use_protocol_weakref', '_protocol_ref', 'timeline')

    def __init__(self, protocol, message):
        """Create the request response."""
        self.use_protocol_weakref = config.api_server.protocol_weakref
        self._protocol_ref = None
        self.timeline = timeline.Timeline(format_stack=None)
        super(BaseRequestResponse, self).__init__(protocol, message)

    def context(self):
        """Get the context of this request, to be passed down the RPC layer."""
        return {"timeline": self.timeline}

    def _get_protocol(self):
        """Return the protocol instance."""
        if self.use_protocol_weakref:
            protocol = self._protocol_ref()
            if protocol is None:
                # protocol reference gone
                raise errors.ProtocolReferenceError(str(self._protocol_ref))
            return protocol
        return self._protocol_ref

    def _set_protocol(self, protocol):
        """Set the weak ref. to the protocol instance."""
        if self.use_protocol_weakref:
            self._protocol_ref = weakref.ref(protocol)
        else:
            self._protocol_ref = protocol

    protocol = property(fget=_get_protocol, fset=_set_protocol)
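    # Illustrative note: with protocol_weakref enabled, every access to
    # self.protocol dereferences the weak reference; if the connection was
    # already garbage collected the access raises ProtocolReferenceError,
    # so request objects never keep dead connections alive.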

    def _start(self):
        """Override this method to start the request."""
        raise NotImplementedError("request needs to do something")


class StorageServerRequestResponse(BaseRequestResponse):
    """Base class for all server request responses."""

    __slots__ = ('_id', 'log', 'start_time', 'last_good_state_ts',
                 'length', 'operation_data')

    def __init__(self, protocol, message):
        """Create the request response. Setup logger."""
        self._id = None
        super(StorageServerRequestResponse, self).__init__(protocol, message)
        self.log = StorageRequestLogger(self.protocol, self)
        self.start_time = None
        self.last_good_state_ts = None
        self.length = 1
        self.operation_data = None

    def _get_id(self):
        """Return this request id."""
        return self._id

    def _set_id(self, id):
        """Set this request id."""
        # check if self.log is defined and isn't None
        if getattr(self, 'log', None) is not None:
            # update the request_id in self.log
            self.log.request_id = id
        self._id = id

    # override self.id with a property
    id = property(fget=_get_id, fset=_set_id)

    def _get_node_info(self):
        """Return node info from the message.

        This should be overwritten by our children, but it's not mandatory.
        """

    def start(self):
        """Schedule the request start for later.

        It will be executed when the server has no further pending requests.
        """
        def _scheduled_start():
            """The scheduled call."""
            self.start_time = time.time()
            self.last_good_state_ts = time.time()
            super(StorageServerRequestResponse, self).start()
            self.protocol.check_poison("request_start")
            self.protocol.addProducer(self)

        # we add the id to the request here, even if it happens again on the
        # super() call in _scheduled_start(), but it's good to have the id
        # while scheduled, before really started
        self.id = self.source_message.id
        self.protocol.requests[self.source_message.id] = self

        class_name = self.__class__.__name__
        self.timeline.start(
            "REQ-" + class_name,
            "SCHEDULE %s[%s:%s] CAPS %r WITH %r" % (
                class_name, self.protocol.session_id,
                self.id, self.protocol.working_caps,
                str(self.source_message)[:MAX_OOPS_LINE])).finish()

        self.log.info("Request being scheduled")
        self.log.trace_message("IN: ", self.source_message)
        self.protocol.factory.metrics.increment("request_instances.%s" %
                                                (self.__class__.__name__,))
        self.protocol.schedule_request(self, _scheduled_start)
        self.protocol.check_poison("request_schedule")

    def stop(self):
        """Stop the request."""
        if self.started:
            super(StorageServerRequestResponse, self).stop()
        else:
            # not even started! just log and cleanup
            self.log.debug("Request being released before start")
            self.cleanup()

    def cleanup(self):
        """Remove the reference to self from the request handler."""
        super(StorageServerRequestResponse, self).cleanup()
        self.protocol.factory.metrics.decrement("request_instances.%s" %
                                                (self.__class__.__name__,))
        self.protocol.release(self)

    def error(self, failure):
        """Overridden error to add logging. Never fail, always log."""
        class_name = self.__class__.__name__
        try:
            if isinstance(failure, Exception):
                failure = Failure(failure)
            self.log.error("Request error",
                           exc_info=(failure.type, failure.value, None))
            o = self.protocol.build_oops(failure, self.timeline)
            self.protocol.save_oops(o)
            request.RequestResponse.error(self, failure)
        except Exception as e:
            msg = 'error() crashed when processing %s: %s'
            # we shouldn't create an oops here since we just failed
            self.log.error(msg, failure, e, exc_info=sys.exc_info())

        self.protocol.factory.sli_metrics.sli_error(class_name)
        if self.start_time is not None:
            delta = time.time() - self.start_time
            self.protocol.factory.metrics.timing(
                "%s.request_error" % (class_name,), delta)
            self.protocol.factory.metrics.timing("request_error", delta)

        transferred = getattr(self, 'transferred', None)
        if transferred is not None:
            msg = "%s.transferred" % (class_name,)
            self.protocol.factory.metrics.gauge(msg, transferred)

    def done(self, _=None):
        """Overridden done to add logging.

        Added an ignored parameter so we can hook this to the deferred chain.
        """
        try:
            if self.operation_data is None:
                self.log.info("Request done")
            else:
                self.log.info("Request done: %s", self.operation_data)
            request.RequestResponse.done(self)
        except Exception as e:
            self.error(Failure(e))

        class_name = self.__class__.__name__
        factory = self.protocol.factory
        transferred = getattr(self, 'transferred', None)

        if self.start_time is not None:
            delta = time.time() - self.start_time
            factory.metrics.timing("%s.request_finished" % (class_name,),
                                   delta)
            factory.metrics.timing("request_finished", delta)

            # inform SLI metric here if operation has a valid length (some
            # operations change it, default is 1 for other operations, and
            # Put/GetContentResponse set it to None to not inform *here*)
            if self.length is not None:
                self.protocol.factory.sli_metrics.sli(class_name, delta,
                                                      self.length)

        if transferred is not None:
            msg = "%s.transferred" % (class_name,)
            factory.metrics.gauge(msg, transferred)

        # measure specific "user's client" activity
        user_activity = getattr(self, 'user_activity', None)
        if user_activity is not None:
            user_id = getattr(self.protocol.user, 'id', '')
            factory.user_metrics.report(user_activity, str(user_id), 'd')

    def sendMessage(self, message):
        """Send a message and trace it."""
        self.log.trace_message("OUT: ", message)
        request.RequestResponse.sendMessage(self, message)

    def _processMessage(self, message):
        """Locally overridable part of processMessage."""

    def processMessage(self, message):
        """Process the received messages.

        If the request already started, really process the message right
        away; if not, just schedule the processing for later.
        """
        if self.started:
            result = self._processMessage(message)
        else:
            def _process_later():
                """Deferred call to process the message."""
                self._processMessage(message)
            self.protocol.schedule_request(self, _process_later)

    def convert_share_id(self, share_id):
        """Convert a share_id from a message to a UUID or None.

        @param share_id: a str representation of a share id
        @return: uuid.UUID or None
        """
        if share_id != request.ROOT:
            return uuid.UUID(share_id)
        else:
            return None

    def queue_action(self, callable_action, head=True):
        """Queue a callable action.

        Return a deferred that will be fired when the action finishes.
        """
        action = Action(self, callable_action)
        action.start(head=head)
        return action.deferred
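    # Illustrative note: queue_action() serializes arbitrary work through
    # the protocol's request queue; e.g. PutContentResponse._commit_uploadjob
    # below wraps the final commit in a callable and waits on the returned
    # deferred for the new generation.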

    def _get_extension(self, path):
        """Return an extension from the name/path."""
        parts = path.rsplit(".", 1)
        if len(parts) == 2:
            endpart = parts[1]
            if len(endpart) <= 4:
                return endpart
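    # Illustrative examples: "photo.jpeg" -> "jpeg" and "foo.tar.gz" -> "gz",
    # while "README" (no dot) and "data.backup" (extension longer than four
    # chars) fall through and return None.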


class SimpleRequestResponse(StorageServerRequestResponse):
    """Common request/response bits."""

    authentication_required = True

    # a list of foreign (non protocol) errors to be filled by heirs
    expected_foreign_errors = []

    # a list of foreign errors that will always be handled by this super class
    # because all operations could generate them (RetryLimitReached because of
    # access to the DB, and DoesNotExist because user account becoming
    # disabled while connected)
    generic_foreign_errors = [
        dataerror.RetryLimitReached,
        dataerror.DoesNotExist,
        dataerror.LockedUserError,
    ]

    # convert "foreign" errors directly to protocol error codes;
    # StorageServerError and TryAgain (and heirs) will be handled automatically
    # because Failure.check compares exceptions using isinstance().
    protocol_errors = {
        dataerror.NotEmpty: protocol_pb2.Error.NOT_EMPTY,
        dataerror.NotADirectory: protocol_pb2.Error.NOT_A_DIRECTORY,
        dataerror.NoPermission: protocol_pb2.Error.NO_PERMISSION,
        dataerror.DoesNotExist: protocol_pb2.Error.DOES_NOT_EXIST,
        dataerror.AlreadyExists: protocol_pb2.Error.ALREADY_EXISTS,
        dataerror.QuotaExceeded: protocol_pb2.Error.QUOTA_EXCEEDED,
        dataerror.InvalidFilename: protocol_pb2.Error.INVALID_FILENAME,
        psycopg2.DataError: protocol_pb2.Error.INVALID_FILENAME,

        # violation of primary key, foreign key, or unique constraints
        dataerror.IntegrityError: protocol_pb2.Error.ALREADY_EXISTS,
    }

    # These get converted to TRY_AGAIN
    try_again_errors = [
        dataerror.RetryLimitReached,
        error.TCPTimedOutError,
    ]

    auth_required_error = 'Authentication required and the user is None.'
    upload_not_accepted_error = (
        'This upload has not been accepted, yet content is being sent.')

    def _process(self):
        """Process the request."""

    def _meter_s3_timeout(self, failure):
        """Send metrics on S3 timeouts."""
        if failure.check(errors.S3Error):
            factory = self.protocol.factory
            last_responsive = factory.reactor_inspector.last_responsive_ts
            last_good = self.last_good_state_ts
            # Unresponsive server or client not reading/writing fast enough?
            reason = "server" if last_responsive <= last_good else "client"

            class_name = self.__class__.__name__
            metric_name = "%s.request_error.s3_timeout.%s"
            factory.metrics.meter(metric_name % (class_name, reason), 1)

    def _send_protocol_error(self, failure):
        """Convert the failure to a protocol error.

        Send it if possible; otherwise, continue propagating it.
        """
        # Convert try_again_errors that weren't converted at lower levels
        if failure.check(*self.try_again_errors):
            failure = Failure(errors.TryAgain(failure.value))

        # Send metrics if we have an S3 error
        self._meter_s3_timeout(failure)

        foreign_errors = (self.generic_foreign_errors +
                          self.expected_foreign_errors)
        error = failure.check(errors.StorageServerError, *foreign_errors)
        comment = failure.getErrorMessage().decode('utf8')
        if failure.check(errors.TryAgain):
            orig_error_name = failure.value.orig_error.__class__.__name__
            comment = u"%s: %s" % (orig_error_name, comment)
            msg = 'Sending TRY_AGAIN to client because we got: %s'
            self.log.warning(msg, Failure(failure.value.orig_error))
            comment = u"TryAgain (%s)" % comment

            metric_name = "TRY_AGAIN.%s" % orig_error_name
            self.protocol.factory.metrics.meter(metric_name, 1)

        if error == dataerror.QuotaExceeded:
            # handle the case of QuotaExceeded
            # XXX: check Bug #605847
            protocol_error = self.protocol_errors[error]
            free_bytes = failure.value.free_bytes
            volume_id = str(failure.value.volume_id or '')
            free_space_info = {'share_id': volume_id, 'free_bytes': free_bytes}
            self.sendError(protocol_error, comment=comment,
                           free_space_info=free_space_info)
            return  # the error has been consumed
        elif error in self.protocol_errors:
            # an error we expect and can convert to a protocol error
            protocol_error = self.protocol_errors[error]
            self.sendError(protocol_error, comment=comment)
            return  # the error has been consumed
        elif (error == errors.StorageServerError and
                not failure.check(errors.InternalError)):
            # an error which corresponds directly to a protocol error
            self.sendError(failure.value.errno, comment=comment)
            return  # the error has been consumed
        elif error == dataerror.LockedUserError:
            self.log.warning("Shutting down protocol: user locked")
            if not self.protocol.shutting_down:
                self.protocol.shutdown()
            return  # the error has been consumed
        else:
            # an unexpected or unconvertable error
            self.sendError(protocol_pb2.Error.INTERNAL_ERROR,
                           comment=comment)
            return failure  # propagate the error
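    # Illustrative note: with the ladder above, a dataerror.QuotaExceeded
    # from the backend reaches the client as QUOTA_EXCEEDED plus
    # free_space_info, while an unexpected exception becomes INTERNAL_ERROR
    # and keeps propagating so it is also logged and oopsed.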

    def internal_error(self, failure):
        """Handle a failure that caused an INTERNAL_ERROR."""
        # only call self.error, if I'm not finished
        if not self.finished:
            self.error(failure)
        # only shutdown the protocol if isn't already doing the shutdown
        if not self.protocol.shutting_down:
            self.protocol.shutdown()

    def _log_start(self):
        """Log that the request started."""
        txt = "Request being started"
        on_what = self._get_node_info()
        if on_what:
            txt += ", working on: " + str(on_what)
        self.log.info(txt)

    def _start(self):
        """Kick off request processing."""
        self._log_start()
        if self.authentication_required and self.protocol.user is None:
            self.sendError(protocol_pb2.Error.AUTHENTICATION_REQUIRED,
                           comment=self.auth_required_error)
            self.done()
        else:
            d = maybeDeferred(self._process)
            d.addErrback(self._send_protocol_error)
            d.addCallbacks(self.done, self.internal_error)


class ListShares(SimpleRequestResponse):
    """LIST_SHARES Request Response."""

    @inlineCallbacks
    def _process(self):
        """List shares for the client."""
        user = self.protocol.user
        shared_by, shared_to = yield user.list_shares()
        self.length = len(shared_by) + len(shared_to)

        for share in shared_by:
            # get the volume_id of the shared node.
            volume_id = yield user.get_volume_id(share['root_id'])
            volume_id = str(volume_id) if volume_id else ''
            if volume_id == user.root_volume_id:
                # return the protocol root instead of users real root
                volume_id = request.ROOT
            share_resp = sharersp.ShareResponse.from_params(
                share['id'], "from_me", share['root_id'], share['name'],
                share['shared_to_username'] or u"",
                share['shared_to_visible_name'] or u"",
                share['accepted'], share['access'],
                subtree_volume_id=volume_id)
            response = protocol_pb2.Message()
            response.type = protocol_pb2.Message.SHARES_INFO
            share_resp.dump_to_msg(response.shares)
            self.sendMessage(response)

        for share in shared_to:
            share_resp = sharersp.ShareResponse.from_params(
                share['id'], "to_me", share['root_id'], share['name'],
                share['shared_by_username'] or u"",
                share['shared_by_visible_name'] or u"",
                share['accepted'], share['access'])
            response = protocol_pb2.Message()
            response.type = protocol_pb2.Message.SHARES_INFO
            share_resp.dump_to_msg(response.shares)
            self.sendMessage(response)

        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.SHARES_END
        self.sendMessage(response)

        # save data to be logged on operation end
        self.operation_data = "shared_by=%d shared_to=%d" % (
            len(shared_by), len(shared_to))


class ShareAccepted(SimpleRequestResponse):
    """SHARE_ACCEPTED Request Response."""

    # these are the valid answers and their translation from the
    # protocol to nice values
    _answer_prot2nice = {
        protocol_pb2.ShareAccepted.YES: "Yes",
        protocol_pb2.ShareAccepted.NO: "No",
    }

    def _get_node_info(self):
        """Return node info from the message."""
        share_id = self.source_message.share_accepted.share_id
        return "share: %r" % (share_id,)

    @inlineCallbacks
    def _process(self):
        """Mark the share as accepted."""
        mes = self.source_message.share_accepted
        answer = self._answer_prot2nice[mes.answer]

        accept_share = self.protocol.user.share_accepted
        share_id = self.convert_share_id(mes.share_id)
        yield accept_share(share_id, answer)

        # send the ok to the requesting client
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.OK
        self.sendMessage(response)

        # save data to be logged on operation end
        self.operation_data = "vol_id=%s answer=%s" % (share_id, answer)


class CreateShare(SimpleRequestResponse):
    """CREATE_SHARE Request Response."""

    expected_foreign_errors = [
        dataerror.IntegrityError,
        dataerror.NotADirectory,
        dataerror.NoPermission,
    ]

    # these are the valid access levels and their translation from the
    # protocol to nice values
    _valid_access_levels = {
        protocol_pb2.CreateShare.VIEW: "View",
        protocol_pb2.CreateShare.MODIFY: "Modify",
    }

    user_activity = 'create_share'

    @inlineCallbacks
    def _process(self):
        """Create the share."""
        mes = self.source_message.create_share
        access_level = self._valid_access_levels[mes.access_level]

        create_share = self.protocol.user.create_share
        share_id = yield create_share(mes.node, mes.share_to,
                                      mes.name, access_level)

        # send the ok to the requesting client
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.SHARE_CREATED
        response.share_created.share_id = str(share_id)
        self.sendMessage(response)

        # save data to be logged on operation end
        self.operation_data = "vol_id=%s access_level=%s" % (
            share_id, access_level)


class DeleteShare(SimpleRequestResponse):
    """Deletes a share we had offered."""

    expected_foreign_errors = [
        dataerror.IntegrityError,
        dataerror.NoPermission,
    ]

    def _get_node_info(self):
        """Return node info from the message."""
        share_id = self.source_message.delete_share.share_id
        return "share: %r" % (share_id,)

    @inlineCallbacks
    def _process(self):
        """Delete the share."""
        share_id = self.convert_share_id(
            self.source_message.delete_share.share_id)
        yield self.protocol.user.delete_share(share_id)

        # send the ok to the requesting client
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.OK
        self.sendMessage(response)

        # save data to be logged on operation end
        self.operation_data = "vol_id=%s" % (share_id,)


class CreateUDF(SimpleRequestResponse):
    """CREATE_UDF Request Response."""

    expected_foreign_errors = [dataerror.NoPermission]

    user_activity = 'sync_activity'

    @inlineCallbacks
    def _process(self):
        """Create the UDF."""
        mes = self.source_message.create_udf
        data = yield self.protocol.user.create_udf(
            mes.path, mes.name, self.protocol.session_id)
        udf_id, udf_rootid, udf_path = data

        # send the ok to the requesting client
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.VOLUME_CREATED
        response.volume_created.type = protocol_pb2.Volumes.UDF
        response.volume_created.udf.volume = str(udf_id)
        response.volume_created.udf.node = str(udf_rootid)
        response.volume_created.udf.suggested_path = udf_path
        self.sendMessage(response)

        # save data to be logged on operation end
        self.operation_data = "vol_id=%s" % (udf_id,)


class DeleteVolume(SimpleRequestResponse):
    """DELETE_VOLUME Request Response."""

    expected_foreign_errors = [dataerror.NoPermission]

    user_activity = 'sync_activity'

    def _get_node_info(self):
        """Return node info from the message."""
        volume_id = self.source_message.delete_volume.volume
        return "volume: %r" % (volume_id,)

    @inlineCallbacks
    def _process(self):
        """Delete the volume."""
        volume_id = self.source_message.delete_volume.volume
        if volume_id == request.ROOT:
            raise dataerror.NoPermission("Root volume can't be deleted.")
        try:
            vol_id = uuid.UUID(volume_id)
        except ValueError:
            raise dataerror.DoesNotExist("No such volume %r" % volume_id)
        yield self.protocol.user.delete_volume(
            vol_id, self.protocol.session_id)

        # send the ok to the requesting client
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.OK
        self.sendMessage(response)

        # save data to be logged on operation end
        self.operation_data = "vol_id=%s" % (vol_id,)


class ListVolumes(SimpleRequestResponse):
    """LIST_VOLUMES Request Response."""

    @inlineCallbacks
    def _process(self):
        """List volumes for the client."""
        user = self.protocol.user
        result = yield user.list_volumes()
        root_info, shares, udfs, free_bytes = result
        self.length = 1 + len(shares) + len(udfs)

        # send the root
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.VOLUMES_INFO
        response.list_volumes.type = protocol_pb2.Volumes.ROOT
        response.list_volumes.root.node = str(root_info['root_id'])
        response.list_volumes.root.generation = root_info['generation']
        response.list_volumes.root.free_bytes = free_bytes
        self.sendMessage(response)

        # send the shares
        for share in shares:
            direction = "to_me"
            share_resp = sharersp.ShareResponse.from_params(
                share['id'], direction, share['root_id'], share['name'],
                share['shared_by_username'] or u"",
                share['shared_by_visible_name'] or u"",
                share['accepted'], share['access'])
            response = protocol_pb2.Message()
            response.type = protocol_pb2.Message.VOLUMES_INFO
            response.list_volumes.type = protocol_pb2.Volumes.SHARE
            share_resp.dump_to_msg(response.list_volumes.share)
            response.list_volumes.share.generation = share['generation']
            response.list_volumes.share.free_bytes = share['free_bytes']
            self.sendMessage(response)

        # send the UDFs
        for udf in udfs:
            response = protocol_pb2.Message()
            response.type = protocol_pb2.Message.VOLUMES_INFO
            response.list_volumes.type = protocol_pb2.Volumes.UDF
            response.list_volumes.udf.volume = str(udf['id'])
            response.list_volumes.udf.node = str(udf['root_id'])
            response.list_volumes.udf.suggested_path = udf['path']
            response.list_volumes.udf.generation = udf['generation']
            response.list_volumes.udf.free_bytes = free_bytes
            self.sendMessage(response)

        # send the end
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.VOLUMES_END
        self.sendMessage(response)

        # save data to be logged on operation end
        self.operation_data = "root=%s shares=%d udfs=%d" % (
            root_info['root_id'], len(shares), len(udfs))


class Unlink(SimpleRequestResponse):
    """UNLINK Request Response."""

    expected_foreign_errors = [dataerror.NotEmpty, dataerror.NoPermission]

    user_activity = 'sync_activity'

    def _get_node_info(self):
        """Return node info from the message."""
        node_id = self.source_message.unlink.node
        return "node: %r" % (node_id,)

    @inlineCallbacks
    def _process(self):
        """Unlink a node."""
        share_id = self.convert_share_id(self.source_message.unlink.share)
        node_id = self.source_message.unlink.node
        generation, kind, name, mime = yield self.protocol.user.unlink_node(
            share_id, node_id, session_id=self.protocol.session_id)

        # answer the ok to original user
        response = protocol_pb2.Message()
        response.new_generation = generation
        response.type = protocol_pb2.Message.OK
        self.sendMessage(response)

        # save data to be logged on operation end
        extension = self._get_extension(name)
        self.operation_data = "vol_id=%s node_id=%s type=%s mime=%r ext=%r" % (
            share_id, node_id, kind, mime, extension)


class BytesMessageProducer(object):
    """Adapt a bytes producer to produce BYTES messages."""

    payload_size = request.MAX_PAYLOAD_SIZE

    def __init__(self, bytes_producer, request):
        """Create a BytesMessageProducer."""
        self.producer = bytes_producer
        bytes_producer.consumer = self
        self.request = request
        self.logger = request.log

    def resumeProducing(self):
        """IPushProducer interface."""
        self.logger.trace("BytesMessageProducer resumed, http producer: %s",
                          self.producer)
        if self.producer:
            self.producer.resumeProducing()

    def stopProducing(self):
        """IPushProducer interface."""
        self.logger.trace("BytesMessageProducer stopped, http producer: %s",
                          self.producer)
        if self.producer:
            self.producer.stopProducing()

    def pauseProducing(self):
        """IPushProducer interface."""
        self.logger.trace("BytesMessageProducer paused, http producer: %s",
                          self.producer)
        if self.producer:
            self.producer.pauseProducing()

    def write(self, content):
        """Part of IConsumer."""
        p = 0
        part = content[p:p + self.payload_size]
        while part:
            if self.request.cancelled:
                # stop generating messages
                break
            response = protocol_pb2.Message()
            response.type = protocol_pb2.Message.BYTES
            response.bytes.bytes = part
            self.request.transferred += len(part)
            self.request.sendMessage(response)
            p += self.payload_size
            part = content[p:p + self.payload_size]
        self.request.last_good_state_ts = time.time()
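    # Illustrative arithmetic (hypothetical payload size): if payload_size
    # were 65536 bytes, a 150000-byte write() would emit three BYTES
    # messages of 65536, 65536 and 18928 bytes, growing transferred by
    # 150000.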


def cancel_filter(function):
    """Raise RequestCancelledError if the request is cancelled.

    This method exists to be used in an addCallback sequence to assure
    that it does not continue if the request is cancelled, like:

    >>> d.addCallback(cancel_filter(foo))
    >>> d.addCallbacks(done_callback, error_errback)

    Note that you may receive RequestCancelledError in your
    'error_errback' func.
    """
    @wraps(function)
    def f(self, *args, **kwargs):
        '''Function to be called from twisted when its time arrives.'''
        if self.cancelled:
            raise request.RequestCancelledError(
                "The request id=%d "
                "is cancelled! (before calling %r)" % (self.id, function))
        return function(self, *args, **kwargs)
    return f


class GetContentResponse(SimpleRequestResponse):
    """GET_CONTENT Request Response."""

    __slots__ = ('cancel_message', 'message_producer',
                 'transferred', 'init_time')

    def __init__(self, protocol, message):
        super(GetContentResponse, self).__init__(protocol, message)
        self.cancel_message = None
        self.message_producer = None
        self.transferred = 0
        self.init_time = 0
        self.length = None  # to not inform automatically on done()

    def _get_node_info(self):
        """Return node info from the message."""
        node_id = self.source_message.get_content.node
        return "node: %r" % (node_id,)

    def _send_protocol_error(self, failure):
        """Convert the failure to a protocol error.

        Send it if possible; otherwise, continue propagating it.
        """
        is_cancelled = (failure.check(ProducerStopped) and self.cancelled)
        if failure.check(request.RequestCancelledError) or is_cancelled:
            # the normal sequence was interrupted by a cancel
            if self.cancel_message is not None:
                response = protocol_pb2.Message()
                response.id = self.cancel_message.id
                response.type = protocol_pb2.Message.CANCELLED
                self.sendMessage(response)
            else:
                msg = 'Got cancelling failure %s but cancel_message is None.'
                self.log.warning(msg, failure)
            return  # the error has been consumed

        # handle all the TRY_AGAIN cases
        if failure.check(ProducerStopped, error.ConnectionLost):
            failure = Failure(errors.TryAgain(failure.value))
        return SimpleRequestResponse._send_protocol_error(self, failure)

    def _process(self):
        """Get node content and send it."""
        self.init_time = time.time()
        share_id = self.convert_share_id(self.source_message.get_content.share)

        def done(_):
            """Send EOF message."""
            response = protocol_pb2.Message()
            response.type = protocol_pb2.Message.EOF
            self.sendMessage(response)

        def get_content(node):
            """Get the content for node."""
            d = node.get_content(
                user=self.protocol.user,
                previous_hash=self.source_message.get_content.hash,
                start=self.source_message.get_content.offset)
            return d

        def stop_if_cancelled(producer):
            """Stop the producer if cancelled."""
            # let's forget ProducerStopped if we're cancelled
            def ignore_producer_stopped(failure):
                """Like failure trap with extras."""
                if failure.check(ProducerStopped) and self.cancelled:
                    # I know, I know, I cancelled you!
                    return None
                return failure

            producer.deferred.addErrback(ignore_producer_stopped)
            if self.cancelled:
                producer.stopProducing()
            return producer

        def get_node_attrs(node):
            """Get attributes for 'node' and send them."""
            r = protocol_pb2.Message()
            r.type = protocol_pb2.Message.NODE_ATTR
            r.node_attr.deflated_size = node.deflated_size
            r.node_attr.size = node.size
            r.node_attr.hash = node.content_hash
            r.node_attr.crc32 = node.crc32
            self.sendMessage(r)

            # save data to be logged on operation end
            self.operation_data = "vol_id=%s node_id=%s hash=%s size=%s" % (
                share_id, self.source_message.get_content.node,
                node.content_hash, node.size)
            return node

        if self.protocol.user is None:
            self.sendError(protocol_pb2.Error.AUTHENTICATION_REQUIRED,
                           comment=self.auth_required_error)
            return

        d = self.protocol.user.get_node(
            share_id,
            self.source_message.get_content.node,
            self.source_message.get_content.hash)
        d.addCallback(self.cancel_filter(get_node_attrs))
        # get content and validate hash
        d.addCallback(self.cancel_filter(get_content))
        # stop producer if cancelled
        d.addCallback(stop_if_cancelled)
        # send the content
        d.addCallback(self.cancel_filter(self.send))
        # send EOF
        d.addCallback(self.cancel_filter(done))
        d.addErrback(self._send_protocol_error)
        d.addCallbacks(self.done, self.internal_error)

    def send(self, producer):
        """Send node to the client."""
        delta = time.time() - self.init_time
        self.protocol.factory.sli_metrics.sli('GetContentResponseInit', delta)

        message_producer = BytesMessageProducer(producer, self)
        self.message_producer = message_producer
        self.registerProducer(message_producer, streaming=True)

        # release this request early
        self.protocol.release(self)

        return producer.deferred

    def _cancel(self):
        """Cancel the request.

        This method is called if the request was not cancelled before
        (check self.cancel).
        """
        producer = self.message_producer
        if producer is not None:
            self.unregisterProducer()
            producer.stopProducing()

    def _processMessage(self, message):
        """Process new messages from the client inside this request."""
        if message.type == protocol_pb2.Message.CANCEL_REQUEST:
            self.cancel_message = message
            self.cancel()
            self.protocol.release(self)
        else:
            self.error(request.StorageProtocolProtocolError(message))


class PutContentResponse(SimpleRequestResponse):
    """PUT_CONTENT Request Response."""

    __slots__ = ('cancel_message', 'upload_job', 'transferred',
                 'state', 'init_time')

    expected_foreign_errors = [dataerror.NoPermission, dataerror.QuotaExceeded]

    user_activity = 'sync_activity'

    # indicators for internal fsm
    states = "INIT UPLOADING COMMITING CANCELING ERROR DONE"
    states = collections.namedtuple("States", states.lower())(*states.split())
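    # Illustrative note: the usual transitions are
    #     INIT -> UPLOADING -> COMMITING -> DONE
    # with CANCELING reachable from INIT or UPLOADING via CANCEL_REQUEST,
    # and ERROR reachable from any state through _generic_error().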
1671
# will send TRY_AGAIN on all these errors
1672
_try_again_errors = (
1674
dataerror.RetryLimitReached,
1678
def __init__(self, protocol, message):
1679
super(PutContentResponse, self).__init__(protocol, message)
1680
self.cancel_message = None
1681
self.upload_job = None
1682
self.transferred = 0
1683
self.state = self.states.init
1685
self.length = None # to not inform automatically on done()
1688
"""Override done to skip and log 'double calls'."""
1689
# Just a hack to avoid the KeyError flood until we find the
1690
# reason of the double done calls.
1692
# build a call chain of 3 levels
1693
curframe = inspect.currentframe()
1694
calframe1 = inspect.getouterframes(curframe, 2)
1695
callers = [calframe1[i][3] for i in range(1, 4)]
1696
call_chain = ' -> '.join(reversed(callers))
1697
self.log.warning("%s: called done() finished=%s",
1698
call_chain, self.finished)
1700
SimpleRequestResponse.done(self)
1702
def _get_node_info(self):
1703
"""Return node info from the message."""
1704
node_id = self.source_message.put_content.node
1705
return "node: %r" % (node_id,)
1708
"""Create upload reservation and start receiving."""
1709
self.init_time = time.time()
1712
if self.protocol.user is None:
1713
self.sendError(protocol_pb2.Error.AUTHENTICATION_REQUIRED,
1714
comment=self.auth_required_error)
1715
self.state = self.states.done
1718
d = self._start_upload()
1719
d.addErrback(self._generic_error)
1721
def _log_exception(self, exc):
1722
"""Log an exception before it is handled by the parent request."""
1723
if self.upload_job is not None:
1724
size_hint = self.upload_job.inflated_size_hint
1727
if isinstance(exc, errors.TryAgain):
1728
m = "Upload failed with TryAgain error: %s, size_hint: %s"
1729
self.log.debug(m, exc.orig_error.__class__.__name__, size_hint)
1731
m = "Upload failed with error: %s, size_hint: %s"
1732
self.log.debug(m, exc.__class__.__name__, size_hint)
1734
@defer.inlineCallbacks
1735
def _generic_error(self, failure):
1736
"""Process all possible errors."""
1737
# it can be an exception, let's work with a failure from now on
1738
if not isinstance(failure, Failure):
1739
failure = Failure(failure)
1742
if failure.check(request.RequestCancelledError):
1743
if self.state == self.states.canceling:
1744
# special error while canceling
1745
self.log.debug("Request cancelled: %s", exc)
1748
self.log.warning("Error while in %s: %s (%s)", self.state,
1749
exc.__class__.__name__, exc)
1750
if self.state in (self.states.error, self.states.done):
1751
# if already in error/done state, just ignore it (after logging)
1753
self.state = self.states.error
1755
# on any error, we just stop the upload job
1756
if self.upload_job is not None:
1757
self.log.debug("Stoping the upload job after an error")
1758
yield self.upload_job.stop()
1759
# and unregister the transport from the upload_job
1760
self.upload_job.unregisterProducer()
1762
# handle all the TRY_AGAIN cases
1763
if failure.check(*self._try_again_errors):
1764
exc = errors.TryAgain(exc)
1765
failure = Failure(exc)
1767
# generic error handler, and done (and really fail if we get
1768
# a problem while doing that)
1770
self._log_exception(exc)
1771
yield self._send_protocol_error(failure)
1774
yield self.internal_error(Failure())
1777
def _processMessage(self, message):
1778
"""Receive the content for the upload."""
1779
if self.state == self.states.uploading:
1781
self._process_while_uploading(message)
1782
except Exception as e:
1783
self._generic_error(e)
1786
if self.state == self.states.init:
1787
if message.type == protocol_pb2.Message.CANCEL_REQUEST:
1789
self.state = self.states.canceling
1790
self.cancel_message = message
1792
except Exception as e:
1793
self._generic_error(e)
1796
if self.state == self.states.error:
1797
# just ignore the message
1800
self.log.warning("Received out-of-order message: state=%s message=%s",
1801
self.state, message.type)
1803
def _process_while_uploading(self, message):
1804
"""Receive any messages while in UPLOADING."""
1805
if message.type == protocol_pb2.Message.CANCEL_REQUEST:
1806
self.state = self.states.canceling
1807
self.cancel_message = message
1810
elif message.type == protocol_pb2.Message.EOF:
1811
self.state = self.states.commiting
1812
self.upload_job.deferred.addCallback(self._commit_uploadjob)
1813
self.upload_job.deferred.addErrback(self._generic_error)
1815
elif message.type == protocol_pb2.Message.BYTES:
1816
# process BYTES, stay here in UPLOADING
1817
received_bytes = message.bytes.bytes
1818
self.transferred += len(received_bytes)
1819
self.upload_job.add_data(received_bytes)
1820
self.last_good_state_ts = time.time()
1823
self.log.error("Received unknown message: %s", message.type)
1825
@defer.inlineCallbacks
1827
"""Cancel put content.
1829
This is called from request.Request.cancel(), after a
1830
client's CANCEL_REQUEST or a request.stop().
1832
cancelled_by_user = self.state == self.states.canceling
1833
self.log.debug("Request canceled (in %s)", self.state)
1834
self.state = self.states.canceling
1836
if cancelled_by_user:
1837
# cancel the upload job and answer back
1838
if self.upload_job is not None:
1839
self.log.debug("Canceling the upload job")
1840
yield self.upload_job.cancel()
1841
response = protocol_pb2.Message()
1842
response.id = self.cancel_message.id
1843
response.type = protocol_pb2.Message.CANCELLED
1844
self.sendMessage(response)
1846
# just stop the upload job
1847
if self.upload_job is not None:
1848
self.log.debug("Stoping the upload job after a cancel")
1849
yield self.upload_job.stop()
1852
self.state = self.states.done
1855
@defer.inlineCallbacks
1856
def _commit_uploadjob(self, result):
1857
"""Callback for uploadjob when it's complete."""
1858
commit_time = time.time()
1859
if self.state != self.states.commiting:
1860
# if we are not in committing state, bail out
1864
"""The callable to queue."""
1865
if self.state != self.states.commiting:
1866
# Request has been canceled since we last checked
1867
return defer.succeed(None)
1868
return self.upload_job.commit(result)
1870
# queue the commit work
1871
new_generation = yield self.queue_action(commit)
1873
if self.state != self.states.commiting:
1874
# if we are not in committing state, bail out
1877
# when commit's done, send the ok
1878
self.protocol.factory.sli_metrics.sli('PutContentResponseCommit',
1879
time.time() - commit_time)
1880
response = protocol_pb2.Message()
1881
response.type = protocol_pb2.Message.OK
1882
response.new_generation = new_generation
1883
self.sendMessage(response)
1886
self.state = self.states.done

    @defer.inlineCallbacks
    def _start_upload(self):
        """Set up the upload and tell the client to start uploading."""
        self.upload_job = yield self._get_upload_job()
        self.upload_job.deferred.addErrback(self._generic_error)
        if self.state in (self.states.done, self.states.canceling):
            # Manually canceling the upload because we were canceled
            # while getting the upload
            self.log.debug("Manually canceling the upload job (in %s)",
                           self.state)
            yield self.upload_job.cancel()
            return
        yield self.upload_job.connect()
        # register the client transport as the producer
        self.upload_job.registerProducer(self.protocol.transport)
        self.protocol.release(self)
        yield self._send_begin()
        self.state = self.states.uploading

    @defer.inlineCallbacks
    def _get_upload_job(self):
        """Get the uploadjob."""
        share_id = self.convert_share_id(self.source_message.put_content.share)
        if config.api_server.magic_upload_active:
            magic_hash = self.source_message.put_content.magic_hash or None
        else:
            magic_hash = None

        # save data to be logged on operation end
        self.operation_data = "vol_id=%s node_id=%s hash=%s size=%s" % (
            share_id, self.source_message.put_content.node,
            self.source_message.put_content.hash,
            self.source_message.put_content.size)

        # create upload reservation
        uploadjob = yield self.protocol.user.get_upload_job(
            share_id,
            self.source_message.put_content.node,
            self.source_message.put_content.previous_hash,
            self.source_message.put_content.hash,
            self.source_message.put_content.crc32,
            self.source_message.put_content.size,
            self.source_message.put_content.deflated_size,
            session_id=self.protocol.session_id,
            magic_hash=magic_hash,
            upload_id=self.source_message.put_content.upload_id or None)
        defer.returnValue(uploadjob)

    def _send_begin(self):
        """Notify the client that it can start sending data."""
        delta = time.time() - self.init_time
        self.protocol.factory.sli_metrics.sli('PutContentResponseInit', delta)
        upload_type = self.upload_job.__class__.__name__
        offset = self.upload_job.offset
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.BEGIN_CONTENT
        response.begin_content.offset = offset
        # only send the upload id for a new put content
        upload_id = self.upload_job.upload_id
        upload_id = '' if upload_id is None else str(upload_id)
        if self.source_message.put_content.upload_id != upload_id:
            response.begin_content.upload_id = upload_id
        self.sendMessage(response)

        factory = self.protocol.factory
        factory.metrics.meter("%s.upload.begin" % (upload_type,), 1)
        factory.metrics.gauge("%s.upload" % (upload_type,), offset)
        self.log.debug("%s begin content from offset %d", upload_type, offset)
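
# A hedged sketch of how a client resumes an interrupted upload with the
# fields sent by _send_begin() (field names match protocol_pb2; the flow
# itself is an assumption, not confirmed client code):
#
#     put = protocol_pb2.Message()
#     put.type = protocol_pb2.Message.PUT_CONTENT
#     put.put_content.upload_id = saved_upload_id  # from an earlier BEGIN_CONTENT
#     ...
#     # on BEGIN_CONTENT, resume sending BYTES starting at
#     # response.begin_content.offset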


class GetDeltaResponse(SimpleRequestResponse):
    """GetDelta Request Response."""

    __slots__ = ()

    def _get_node_info(self):
        """Return node info from the message."""
        volume_id = self.source_message.get_delta.share
        return "volume: %r" % (volume_id,)

    @defer.inlineCallbacks
    def _process(self):
        """Get the deltas and send messages."""
        msg = self.source_message
        share_id = self.convert_share_id(msg.get_delta.share)
        from_generation = msg.get_delta.from_generation
        delta_max_size = config.api_server.delta_max_size
        delta_info = yield self.protocol.user.get_delta(
            share_id, from_generation, limit=delta_max_size)
        nodes, vol_generation, free_bytes = delta_info
        yield self.send_delta_info(nodes, msg.get_delta.share)
        self.length = len(nodes)

        # now send the DELTA_END
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.DELTA_END
        if nodes:
            full = vol_generation == nodes[-1].generation
        else:
            full = True
        response.delta_end.full = full
        if full:
            response.delta_end.generation = vol_generation
        else:
            response.delta_end.generation = nodes[-1].generation
        response.delta_end.free_bytes = free_bytes
        self.sendMessage(response)

        # save data to be logged on operation end
        t = "vol_id=%s from_gen=%s current_gen=%s nodes=%d free_bytes=%s" % (
            share_id, from_generation, vol_generation, self.length, free_bytes)
        self.operation_data = t

    def send_delta_info(self, nodes, share_id):
        """Build and send the DELTA_INFO for each node."""
        return task.cooperate(
            self._send_delta_info(nodes, share_id)).whenDone()

    def _send_delta_info(self, nodes, share_id):
        """Build and send the DELTA_INFO for each node."""
        count = 0
        for node in nodes:
            if count == config.api_server.max_delta_info:
                count = 0
                yield
            message = protocol_pb2.Message()
            message.type = protocol_pb2.Message.DELTA_INFO
            message.delta_info.type = protocol_pb2.DeltaInfo.FILE_INFO
            delta_info = message.delta_info
            delta_info.generation = node.generation
            delta_info.is_live = node.is_live
            if node.is_file:
                delta_info.file_info.type = protocol_pb2.FileInfo.FILE
            else:
                delta_info.file_info.type = protocol_pb2.FileInfo.DIRECTORY
            delta_info.file_info.name = node.name
            delta_info.file_info.share = share_id
            delta_info.file_info.node = str(node.id)
            if node.parent_id is None:
                delta_info.file_info.parent = ''
            else:
                delta_info.file_info.parent = str(node.parent_id)
            delta_info.file_info.is_public = node.is_public
            if node.content_hash is None:
                delta_info.file_info.content_hash = ''
            else:
                delta_info.file_info.content_hash = str(node.content_hash)
            delta_info.file_info.crc32 = node.crc32
            delta_info.file_info.size = node.size
            delta_info.file_info.last_modified = node.last_modified
            self.sendMessage(message)
            count += 1
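
# The generator above yields every max_delta_info nodes; task.cooperate()
# resumes it in reactor-friendly slices so a huge delta cannot monopolize
# the event loop. A minimal standalone sketch of the same pattern
# (hypothetical handle() function):
#
#     from twisted.internet import task
#
#     def produce(items):
#         for i, item in enumerate(items):
#             handle(item)          # per-item work between yields
#             if i % 10 == 9:
#                 yield             # give the reactor a chance to run
#
#     d = task.cooperate(produce(range(100))).whenDone()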


class RescanFromScratchResponse(GetDeltaResponse):
    """RescanFromScratch Request Response."""

    __slots__ = ()

    @defer.inlineCallbacks
    def _process(self):
        """Get all the live nodes and send DeltaInfos."""
        msg = self.source_message
        limit = config.api_server.get_from_scratch_limit
        share_id = self.convert_share_id(msg.get_delta.share)
        # get the first chunk
        delta_info = yield self.protocol.user.get_from_scratch(
            share_id, limit=limit)
        # keep vol_generation and free_bytes for later
        nodes, vol_generation, free_bytes = delta_info
        self.length = len(nodes)

        # while it keeps returning nodes, send the current nodes
        while nodes:
            yield self.send_delta_info(nodes, msg.get_delta.share)
            last_path = (nodes[-1].path, nodes[-1].name)
            delta_info = yield self.protocol.user.get_from_scratch(
                share_id, start_from_path=last_path, limit=limit,
                max_generation=vol_generation)
            # but don't overwrite vol_generation or free_bytes in case
            # something changes while fetching nodes
            nodes, _, _ = delta_info
            self.length += len(nodes)

        # now send the DELTA_END
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.DELTA_END
        response.delta_end.full = True
        response.delta_end.generation = vol_generation
        response.delta_end.free_bytes = free_bytes
        self.sendMessage(response)

        # save data to be logged on operation end
        t = "vol_id=%s current_gen=%s nodes=%d free_bytes=%s" % (
            share_id, vol_generation, self.length, free_bytes)
        self.operation_data = t
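
# The loop above pages with a (path, name) keyset cursor rather than an
# offset: each chunk starts strictly after the last row seen, so inserts
# and deletes during the scan cannot shift the window, and max_generation
# pins every chunk to the generation observed at the start. A hedged
# standalone sketch of the same pattern (hypothetical fetch function):
#
#     def iter_all(fetch, limit):
#         chunk = fetch(start_from_path=None, limit=limit)
#         while chunk:
#             for row in chunk:
#                 yield row
#             cursor = (chunk[-1].path, chunk[-1].name)
#             chunk = fetch(start_from_path=cursor, limit=limit)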


class QuerySetCapsResponse(SimpleRequestResponse):
    """QUERY_CAPS and SET_CAPS Request Response."""

    __slots__ = ('set_mode',)

    authentication_required = False

    def __init__(self, protocol, message, set_mode=False):
        """Store the mode."""
        self.set_mode = set_mode
        super(QuerySetCapsResponse, self).__init__(protocol, message)

    def _process(self):
        """Validate the list of capabilities passed to us."""
        if self.set_mode:
            msg_caps = self.source_message.set_caps
        else:
            msg_caps = self.source_message.query_caps
        caps = frozenset(q.capability for q in msg_caps)

        # after one successful set, no more changes are allowed
        if self.protocol.working_caps == MIN_CAP:
            accepted = caps in SUPPORTED_CAPS
            if accepted and self.set_mode:
                self.protocol.working_caps = caps
        else:
            accepted = False

        if not accepted:
            # get the redirecting info
            redirect = SUGGESTED_REDIRS.get(caps, {})
            red_host, red_port, red_srvr = [redirect.get(x, "")
                                            for x in ("hostname", "port",
                                                      "srv_record")]
        else:
            red_host = red_port = red_srvr = ""

        # log what we just decided
        action = "Set" if self.set_mode else "Query"
        result = "Accepted" if accepted else "Rejected"
        self.log.info("Capabilities %s %s: %s", action, result, caps)

        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.ACCEPT_CAPS
        response.accept_caps.accepted = accepted
        response.accept_caps.redirect_hostname = red_host
        response.accept_caps.redirect_port = red_port
        response.accept_caps.redirect_srvrecord = red_srvr
        self.sendMessage(response)
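
# A hedged sketch of the client side of this negotiation (message fields
# match protocol_pb2; the helper itself is hypothetical):
#
#     def build_caps_message(caps, set_mode=False):
#         message = protocol_pb2.Message()
#         if set_mode:
#             message.type = protocol_pb2.Message.SET_CAPS
#             field = message.set_caps
#         else:
#             message.type = protocol_pb2.Message.QUERY_CAPS
#             field = message.query_caps
#         for cap in caps:
#             field.add().capability = cap
#         return message
#
# The server answers with ACCEPT_CAPS; if accepted is False, the redirect
# fields above tell the client where to reconnect.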


class MoveResponse(SimpleRequestResponse):
    """Move Request Response."""

    __slots__ = ()

    expected_foreign_errors = [
        dataerror.InvalidFilename,
        dataerror.NoPermission,
        dataerror.AlreadyExists,
        dataerror.NotADirectory,
    ]

    user_activity = 'sync_activity'

    def _get_node_info(self):
        """Return node info from the message."""
        node_id = self.source_message.move.node
        return "node: %r" % (node_id,)

    @defer.inlineCallbacks
    def _process(self):
        """Move the node."""
        share_id = self.convert_share_id(self.source_message.move.share)
        node_id = self.source_message.move.node
        new_parent_id = self.source_message.move.new_parent_node
        new_name = self.source_message.move.new_name

        generation, mimetype = yield self.protocol.user.move(
            share_id, node_id, new_parent_id, new_name,
            session_id=self.protocol.session_id)

        response = protocol_pb2.Message()
        response.new_generation = generation
        response.type = protocol_pb2.Message.OK
        self.sendMessage(response)

        # save data to be logged on operation end
        extension = self._get_extension(new_name)
        self.operation_data = "vol_id=%s node_id=%s mime=%r ext=%r" % (
            share_id, node_id, mimetype, extension)


class MakeResponse(SimpleRequestResponse):
    """MAKE_[DIR|FILE] Request Response."""

    __slots__ = ()

    expected_foreign_errors = [
        dataerror.InvalidFilename,
        dataerror.NoPermission,
        dataerror.AlreadyExists,
        dataerror.NotADirectory,
    ]

    user_activity = 'sync_activity'

    def _get_node_info(self):
        """Return node info from the message."""
        parent_id = self.source_message.make.parent_node
        return "parent: %r" % (parent_id,)

    @defer.inlineCallbacks
    def _process(self):
        """Create the file/directory."""
        if self.source_message.type == protocol_pb2.Message.MAKE_DIR:
            response_type = protocol_pb2.Message.NEW_DIR
            create_method_name = "make_dir"
            node_type = "Directory"
        elif self.source_message.type == protocol_pb2.Message.MAKE_FILE:
            response_type = protocol_pb2.Message.NEW_FILE
            create_method_name = "make_file"
            node_type = "File"
        else:
            raise request.StorageProtocolError(
                "Can not create from message "
                "(type %s)." % self.source_message.type)

        share_id = self.convert_share_id(self.source_message.make.share)
        parent_id = self.source_message.make.parent_node
        name = self.source_message.make.name
        create_method = getattr(self.protocol.user, create_method_name)
        d = create_method(share_id, parent_id, name,
                          session_id=self.protocol.session_id)
        node_id, generation, mimetype = yield d
        response = protocol_pb2.Message()
        response.type = response_type
        response.new_generation = generation
        response.new.node = str(node_id)
        response.new.parent_node = parent_id
        response.new.name = name
        self.sendMessage(response)

        # save data to be logged on operation end
        extension = self._get_extension(name)
        self.operation_data = "type=%s vol_id=%s node_id=%s mime=%r ext=%r" % (
            node_type, share_id, node_id, mimetype, extension)


class FreeSpaceResponse(SimpleRequestResponse):
    """Implements FREE_SPACE_INQUIRY."""

    __slots__ = ()

    expected_foreign_errors = [dataerror.NoPermission]

    @defer.inlineCallbacks
    def _process(self):
        """Report available space for the given share (or the user's own
        space)."""
        share_id = self.convert_share_id(
            self.source_message.free_space_inquiry.share_id)
        free_bytes = yield self.protocol.user.get_free_bytes(share_id)
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.FREE_SPACE_INFO
        response.free_space_info.free_bytes = free_bytes
        self.sendMessage(response)

        # save data to be logged on operation end
        self.operation_data = "vol_id=%s free_bytes=%s" % (
            share_id, free_bytes)


class AccountResponse(SimpleRequestResponse):
    """Implements ACCOUNT_INQUIRY."""

    __slots__ = ()

    @defer.inlineCallbacks
    def _process(self):
        """Report user account information."""
        (purchased, used) = yield self.protocol.user.get_storage_byte_quota()
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.ACCOUNT_INFO
        response.account_info.purchased_bytes = purchased
        self.sendMessage(response)


class AuthenticateResponse(SimpleRequestResponse):
    """AUTH_REQUEST Request Response."""

    __slots__ = ()

    authentication_required = False
    not_allowed_re = re.compile(r"[^\w_]")

    user_activity = 'connected'

    @defer.inlineCallbacks
    def _process(self):
        """Authenticate the user."""
        # check that it's not already logged in
        if self.protocol.user is not None:
            raise errors.AuthenticationError("User already logged in.")

        self.protocol.check_poison("authenticate_start")
        # get metadata and send stats
        metadata = dict(
            (m.key, m.value) for m in self.source_message.metadata)
        if metadata:
            self.log.info("Client metadata: %s", metadata)
            for key in ("platform", "version"):
                if key in metadata:
                    value = self.not_allowed_re.sub("_", metadata[key])
                    self.protocol.factory.metrics.meter("client.%s.%s" %
                                                        (key, value), 1)

        auth_parameters = dict(
            (param.name, param.value)
            for param in self.source_message.auth_parameters)
        authenticate = self.protocol.factory.auth_provider.authenticate
        user = yield authenticate(auth_parameters, self.protocol)
        self.protocol.check_poison("authenticate_continue")
        if user is None:
            raise errors.AuthenticationError("Authentication failed.")

        # confirm authentication
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.AUTH_AUTHENTICATED
        # include the session_id
        response.session_id = str(self.protocol.session_id)
        self.sendMessage(response)

        # set up user for this session
        self.protocol.set_user(user)

        # let the client know the root
        root_id, _ = yield user.get_root()
        response = protocol_pb2.Message()
        response.type = protocol_pb2.Message.ROOT
        response.root.node = str(root_id)
        self.sendMessage(response)
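
# A hedged sketch of the AUTH_REQUEST exchange this handler implements
# (illustrative; the parameter names depend on the configured
# auth_provider, so 'dummy_token' is an assumption):
#
#     msg = protocol_pb2.Message()
#     msg.type = protocol_pb2.Message.AUTH_REQUEST
#     param = msg.auth_parameters.add()
#     param.name = 'dummy_token'
#     param.value = 'open sesame'
#
# On success the client receives AUTH_AUTHENTICATED (carrying the
# session_id) followed by a ROOT message with the root node id.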


class Action(BaseRequestResponse):
    """This is an internal action that behaves like a request.

    We can use this Action class to queue work in the pending_requests
    without needing to worry about handling errors in the request itself.
    """

    __slots__ = ('_callable', 'log')

    def __init__(self, request, action):
        """Create the request response. Set up the logger."""
        self._callable = action
        # keep the request logger around
        self.log = request.log
        super(Action, self).__init__(request.protocol, request.source_message)

    def start(self, head=True):
        """Schedule the action start for later.

        It will be executed when the server has no further pending requests.
        """
        def _scheduled_start():
            """The scheduled call."""
            super(Action, self).start()

        self.log.debug("Action being scheduled (%s)", self._callable)
        self.protocol.factory.metrics.increment("action_instances.%s" %
                                                (self.__class__.__name__,))
        self.protocol.schedule_request(self, _scheduled_start, head=head)

    def _start(self):
        """Kick off action processing."""
        self.log.debug("Action being started, working on: %s", self._callable)
        d = maybeDeferred(self._callable)
        d.addCallbacks(self.done, self.error)

    def cleanup(self):
        """Remove the reference to self from the request handler."""
        self.finished = True
        self.started = False
        self.protocol.factory.metrics.decrement("action_instances.%s" %
                                                (self.__class__.__name__,))
        self.protocol.release(self)

    def done(self, result):
        """Overridden done: adds logging and uses the callable's result.

        The parameter is accepted (and passed through to the deferred) so
        this can be hooked directly into a deferred chain.
        """
        try:
            self.log.debug("Action done (%s)", self._callable)
            self.cleanup()
            self.deferred.callback(result)
        except Exception as e:
            self.error(Failure(e))

    def error(self, failure):
        """Call this to signal that the action finished with failure.

        @param failure: the failure instance
        """
        self.cleanup()
        self.deferred.errback(failure)
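
# A hedged sketch of how an Action wraps arbitrary work so it flows
# through the protocol's pending-request queue (illustrative; 'req'
# stands for any in-flight request with a logger, and the callables are
# hypothetical):
#
#     def work():
#         return some_deferred_returning_callable()
#
#     action = Action(req, work)
#     action.start()                             # queued via schedule_request()
#     action.deferred.addCallback(handle_result)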


class StorageServerFactory(Factory):
    """The Storage Server Factory.

    @cvar protocol: The class of the server.
    """

    protocol = StorageServer
    graceful_shutdown = config.api_server.graceful_shutdown

    def __init__(self, s3_host, s3_port, s3_ssl, s3_key, s3_secret,
                 s3_proxy_host=None, s3_proxy_port=None,
                 auth_provider_class=auth.DummyAuthProvider,
                 s3_class=S3, content_class=content.ContentManager,
                 reactor=reactor, oops_config=None,
                 servername=None, reactor_inspector=None):
        """Create a StorageServerFactory."""
        self.auth_provider = auth_provider_class(self)
        self.content = content_class(self)
        self.s3_class = s3_class
        self.s3_host = s3_host
        self.s3_port = s3_port
        self.s3_ssl = s3_ssl
        self.s3_key = s3_key
        self.s3_secret = s3_secret
        self.s3_proxy_host = s3_proxy_host
        self.s3_proxy_port = s3_proxy_port
        self.logger = logging.getLogger("storage.server")

        self.metrics = MetricsConnector.get_metrics("root")
        self.user_metrics = MetricsConnector.get_metrics("user")
        self.sli_metrics = MetricsConnector.get_metrics("sli")

        self.servername = servername
        self.reactor_inspector = reactor_inspector

        # note that this relies on the notifier calling the
        # callback from the reactor thread
        notif = notifier.get_notifier()
        notif.set_event_callback(
            notifier.UDFCreate,
            self.event_callback_handler(self.deliver_udf_create))
        notif.set_event_callback(
            notifier.UDFDelete,
            self.event_callback_handler(self.deliver_udf_delete))
        notif.set_event_callback(
            notifier.ShareCreated,
            self.event_callback_handler(self.deliver_share_created))
        notif.set_event_callback(
            notifier.ShareAccepted,
            self.event_callback_handler(self.deliver_share_accepted))
        notif.set_event_callback(
            notifier.ShareDeclined,
            self.event_callback_handler(self.deliver_share_declined))
        notif.set_event_callback(
            notifier.ShareDeleted,
            self.event_callback_handler(self.deliver_share_deleted))
        notif.set_event_callback(
            notifier.VolumeNewGeneration,
            self.event_callback_handler(self.deliver_volume_new_generation))

        self.protocols = []
        self.reactor = reactor
        self.trace_users = set(config.api_server.trace_users)

        # oops and log observer
        self.oops_config = oops_config
        twisted.python.log.addObserver(self._deferror_handler)

    def _deferror_handler(self, data):
        """Deferred error handler.

        We receive all log events here; filter out everything but the
        errors and log them with the right info.
        """
        if not data.get('isError', None):
            return
        try:
            failure = data['failure']
        except KeyError:
            msg = data['message']
        else:
            msg = failure.getTraceback()
        self.logger.error("Unhandled error in deferred! %s", msg)

    def build_notification_oops(self, failure, notif, tl=None):
        """Create an oops entry to log the notification failure."""
        context = {"exc_info": (failure.type, failure.value, failure.tb)}
        if tl is not None:
            context["timeline"] = tl
        report = self.oops_config.create(context)
        _prettify_traceback(report, context)
        return report

    def event_callback_handler(self, func):
        """Wrap the event callback in an error handler."""
        @wraps(func)
        def wrapper(notif, **kwargs):
            """The wrapped event callback."""
            tl = timeline.Timeline(format_stack=None)
            action = tl.start("EVENT-%s" % notif.event_type, "")

            def notification_error_handler(failure):
                """Handle error while processing a Notification."""
                action.detail = "NOTIFY WITH %r (%s)" % (notif, kwargs)
                oops = self.build_notification_oops(failure, notif, tl)
                oops_id = self.oops_config.publish(oops)
                self.logger.error("%s in notification %r logged in OOPS: %s",
                                  failure.value, notif, oops_id)

            d = defer.maybeDeferred(func, notif, **kwargs)
            d.addErrback(notification_error_handler)
            return d

        return wrapper

    def s3(self):
        """Get an s3lib instance to do s3 operations."""
        return self.s3_class(self.s3_host, self.s3_port, self.s3_key,
                             self.s3_secret, use_ssl=self.s3_ssl,
                             proxy_host=self.s3_proxy_host,
                             proxy_port=self.s3_proxy_port)

    @defer.inlineCallbacks
    def deliver_udf_create(self, udf_create):
        """Handle UDF creation notification."""
        def notif_filter(protocol):
            """Return True if the client should receive the notification."""
            return protocol.session_id != udf_create.source_session

        user = yield self.content.get_user_by_id(udf_create.owner_id)
        if user:  # connected instances for this user?
            resp = protocol_pb2.Message()
            resp.type = protocol_pb2.Message.VOLUME_CREATED
            resp.volume_created.type = protocol_pb2.Volumes.UDF
            resp.volume_created.udf.volume = str(udf_create.udf_id)
            resp.volume_created.udf.node = str(udf_create.root_id)
            resp.volume_created.udf.suggested_path = udf_create.suggested_path
            user.broadcast(resp, filter=notif_filter)

    @defer.inlineCallbacks
    def deliver_share_created(self, share_notif):
        """Handle Share creation notification."""
        def notif_filter(protocol):
            """Return True if the client should receive the notification."""
            return protocol.session_id != share_notif.source_session

        to_user = yield self.content.get_user_by_id(share_notif.shared_to_id)
        if to_user is None:
            # the recipient is not connected, nothing to deliver
            return
        share_resp = sharersp.NotifyShareHolder.from_params(
            share_notif.share_id, share_notif.root_id, share_notif.name,
            to_user.username, to_user.visible_name, share_notif.access)
        proto_msg = protocol_pb2.Message()
        proto_msg.type = protocol_pb2.Message.NOTIFY_SHARE
        share_resp.dump_to_msg(proto_msg.notify_share)
        to_user.broadcast(proto_msg, filter=notif_filter)

    @defer.inlineCallbacks
    def deliver_share_accepted(self, share_notif, recipient_id):
        """Handle Share accepted notification."""
        def notif_filter(protocol):
            """Return True if the client should receive the notification."""
            return protocol.session_id != share_notif.source_session

        def to_notif_filter(protocol):
            """Return True if the client should receive the notification."""
            return protocol.session_id != share_notif.source_session

        by_user = yield self.content.get_user_by_id(share_notif.shared_by_id)
        to_user = yield self.content.get_user_by_id(share_notif.shared_to_id)

        if by_user and recipient_id == by_user.id:
            proto_msg = protocol_pb2.Message()
            proto_msg.type = protocol_pb2.Message.SHARE_ACCEPTED
            proto_msg.share_accepted.share_id = str(share_notif.share_id)
            proto_msg.share_accepted.answer = protocol_pb2.ShareAccepted.YES
            by_user.broadcast(proto_msg, filter=notif_filter)
        if to_user and recipient_id == to_user.id:
            # fetch the sharing user's info even if not connected
            by_user = yield self.content.get_user_by_id(
                share_notif.shared_by_id, required=True)
            share_resp = sharersp.ShareResponse.from_params(
                str(share_notif.share_id), "to_me", share_notif.root_id,
                share_notif.name, by_user.username or u"",
                by_user.visible_name or u"", protocol_pb2.ShareAccepted.YES,
                share_notif.access)
            resp = protocol_pb2.Message()
            resp.type = protocol_pb2.Message.VOLUME_CREATED
            resp.volume_created.type = protocol_pb2.Volumes.SHARE
            share_resp.dump_to_msg(resp.volume_created.share)
            to_user.broadcast(resp, filter=to_notif_filter)

    @defer.inlineCallbacks
    def deliver_share_declined(self, share_notif):
        """Handle Share declined notification."""
        def notif_filter(protocol):
            """Return True if the client should receive the notification."""
            return protocol.session_id != share_notif.source_session

        by_user = yield self.content.get_user_by_id(share_notif.shared_by_id)
        if by_user:  # connected instances for this user?
            proto_msg = protocol_pb2.Message()
            proto_msg.type = protocol_pb2.Message.SHARE_ACCEPTED
            proto_msg.share_accepted.share_id = str(share_notif.share_id)
            proto_msg.share_accepted.answer = protocol_pb2.ShareAccepted.NO
            by_user.broadcast(proto_msg, filter=notif_filter)

    @defer.inlineCallbacks
    def deliver_share_deleted(self, share_notif):
        """Handle Share deletion notification."""
        def notif_filter(protocol):
            """Return True if the client should receive the notification."""
            return protocol.session_id != share_notif.source_session

        to_user = yield self.content.get_user_by_id(share_notif.shared_to_id)
        # by_user = yield self.content.get_user_by_id(share_notif.shared_by_id)
        if to_user is None:
            # the recipient is not connected, nothing to deliver
            return
        proto_msg = protocol_pb2.Message()
        proto_msg.type = protocol_pb2.Message.SHARE_DELETED
        proto_msg.share_deleted.share_id = str(share_notif.share_id)
        to_user.broadcast(proto_msg, filter=notif_filter)
        # by_user.broadcast(proto_msg, filter=notif_filter)

    @defer.inlineCallbacks
    def deliver_udf_delete(self, udf_delete):
        """Handle UDF deletion notification."""
        def notif_filter(protocol):
            """Return True if the client should receive the notification."""
            return protocol.session_id != udf_delete.source_session

        user = yield self.content.get_user_by_id(udf_delete.owner_id)
        if user:  # connected instances for this user?
            resp = protocol_pb2.Message()
            resp.type = protocol_pb2.Message.VOLUME_DELETED
            resp.volume_deleted.volume = str(udf_delete.udf_id)
            user.broadcast(resp, filter=notif_filter)

    @defer.inlineCallbacks
    def deliver_volume_new_generation(self, notif):
        """Handle the notification of a new generation for a volume."""
        def notif_filter(protocol):
            """Return True if the client should receive the notification."""
            return protocol.session_id != notif.source_session

        user = yield self.content.get_user_by_id(notif.user_id)
        if user:  # connected instances for this user?
            resp = protocol_pb2.Message()
            resp.type = protocol_pb2.Message.VOLUME_NEW_GENERATION
            if notif.client_volume_id is None:
                resp.volume_new_generation.volume = ''
            else:
                resp.volume_new_generation.volume = str(notif.client_volume_id)
            resp.volume_new_generation.generation = notif.new_generation
            user.broadcast(resp, filter=notif_filter)
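
    # All deliver_* callbacks above share one pattern: look up the
    # connected user, build a protobuf message, and broadcast it to every
    # session except the one that caused the change (filtered via
    # source_session). A hedged distilled sketch (hypothetical helper):
    #
    #     def _broadcast_to(user, resp, source_session):
    #         user.broadcast(
    #             resp,
    #             filter=lambda proto: proto.session_id != source_session)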

    def wait_for_shutdown(self):
        """Wait until all the current servers have finished serving."""
        if not self.protocols or not self.graceful_shutdown:
            return defer.succeed(None)

        wait_d = [protocol.wait_for_shutdown() for protocol in self.protocols]
        done_with_current = self.wait_for_current_shutdown()
        d = defer.DeferredList(wait_d + [done_with_current])
        return d

    def wait_for_current_shutdown(self):
        """Wait for the current protocols to end."""
        d = defer.Deferred()

        def _wait():
            """Poll until all the protocols are done."""
            if self.protocols:
                self.reactor.callLater(0.1, _wait)
            else:
                d.callback(True)

        self.reactor.callLater(0.1, _wait)
        return d


class OrderedMultiService(MultiService):
    """A container which starts and shuts down the services in strict order."""

    def startService(self):
        """Start the services sequentially.

        Returns a deferred which signals completion.
        """
        # deliberately skip MultiService's implementation
        Service.startService(self)
        d = defer.succeed(None)
        for service in self:
            d.addCallback(lambda _, s: s.startService(), service)
        return d

    def stopService(self):
        """Stop the services sequentially (in the opposite order they
        are started). Returns a deferred which signals completion.
        """
        # deliberately skip MultiService's implementation
        Service.stopService(self)
        d = defer.succeed(None)
        services = list(self)
        services.reverse()
        for service in services:
            d.addBoth(lambda _, s: s.stopService(), service)
        return d


def get_service_port(service):
    """Return the actual port a simple service is bound to."""
    # we have to query this rather than simply using the port number the
    # service was passed at creation time -- if the given port was 0,
    # the real port number will have been dynamically assigned
    return service._port.getHost().port
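
# A hedged usage sketch of OrderedMultiService (service names are
# assumptions, not part of this module):
#
#     root = OrderedMultiService()
#     db = DBService()          # hypothetical, must be up first
#     web = WebService()        # hypothetical, depends on db
#     db.setServiceParent(root)
#     web.setServiceParent(root)
#     root.startService()       # starts db, then web
#     root.stopService()        # stops web, then db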


class StorageServerService(OrderedMultiService):
    """Wrap the whole TCP StorageServer mess as a single twisted service."""

    def __init__(self, port, s3_host, s3_port, s3_ssl, s3_key, s3_secret,
                 s3_proxy_host=None, s3_proxy_port=None,
                 auth_provider_class=None,
                 oops_config=None, status_port=0, heartbeat_interval=None):
        """Create a StorageServerService.

        @param port: the port to listen on without ssl.
        @param s3_host: the S3 server hostname.
        @param s3_port: the S3 server port to connect to.
        @param s3_key: the S3 key.
        @param s3_secret: the S3 secret.
        @param auth_provider_class: the authentication provider.
        """
        OrderedMultiService.__init__(self)
        self.heartbeat_writer = None
        if heartbeat_interval is None:
            heartbeat_interval = float(config.api_server.heartbeat_interval)
        self.heartbeat_interval = heartbeat_interval
        self.rpcdal_client = None
        self.rpcauth_client = None
        self.logger = logging.getLogger("storage.server")
        self.servername = config.api_server.servername
        self.logger.info("Starting %s", self.servername)
        self.logger.info(
            "protocol buffers implementation: %s",
            os.environ.get("PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION", "None"))

        namespace = config.api_server.metrics_namespace
        # Register all server metrics components
        MetricsConnector.register_metrics("root", namespace)
        # Important: User activity is in a global namespace!
        environment = config.general.environment_name
        user_namespace = environment + ".storage.user_activity"
        MetricsConnector.register_metrics("user", user_namespace)
        MetricsConnector.register_metrics("reactor_inspector",
                                          namespace + ".reactor_inspector")
        sli_metric_namespace = config.api_server.sli_metric_namespace
        MetricsConnector.register_metrics('sli', sli_metric_namespace)

        self.metrics = get_meter(scope='service')

        listeners = MultiService()
        listeners.setName("Listeners")
        listeners.setServiceParent(self)

        self._reactor_inspector = ReactorInspector(self.logger,
                                                   reactor.callFromThread)

        self.factory = StorageServerFactory(
            s3_host, s3_port, s3_ssl, s3_key, s3_secret,
            s3_proxy_host=s3_proxy_host, s3_proxy_port=s3_proxy_port,
            auth_provider_class=auth_provider_class,
            oops_config=oops_config, servername=self.servername,
            reactor_inspector=self._reactor_inspector)

        self.tcp_service = TCPServer(port, self.factory)
        self.tcp_service.setName("TCP")
        self.tcp_service.setServiceParent(listeners)

        # setup the status service
        self.status_service = stats.create_status_service(
            self, listeners, status_port)

        self.next_log_loop = None
        stats_log_interval = config.api_server.stats_log_interval
        self.stats_worker = stats.StatsWorker(self, stats_log_interval)

    @property
    def port(self):
        """The port without ssl."""
        return get_service_port(self.tcp_service)

    @property
    def status_port(self):
        """The status service port."""
        return get_service_port(self.status_service)

    def start_rpc_client(self):
        """Setup the rpc client."""
        self.logger.info("Starting the RPC clients.")
        self.rpcdal_client = inthread.ThreadedNonRPC(inthread.DAL_BACKEND)
        self.rpcauth_client = inthread.ThreadedNonRPC(inthread.AUTH_BACKEND)

    @inlineCallbacks
    def startService(self):
        """Start listening on two ports."""
        self.logger.info("- - - - - SERVER STARTING")
        yield OrderedMultiService.startService(self)
        yield defer.maybeDeferred(self.start_rpc_client)
        self.factory.content.rpcdal_client = self.rpcdal_client
        self.factory.rpcauth_client = self.rpcauth_client
        self.stats_worker.start()
        self.metrics.meter('server_start')
        self.metrics.increment('services_active')
        metrics.services.revno()
        self._reactor_inspector.start()
        # only start the HeartbeatWriter if the interval is > 0
        if self.heartbeat_interval > 0:
            self.heartbeat_writer = stdio.StandardIO(
                supervisor_utils.HeartbeatWriter(self.heartbeat_interval,
                                                 self.logger))

    @inlineCallbacks
    def stopService(self):
        """Stop listening on both ports."""
        self.logger.info("- - - - - SERVER STOPPING")
        yield self.stats_worker.stop()
        yield OrderedMultiService.stopService(self)
        yield self.factory.wait_for_shutdown()
        self.metrics.meter('server_stop')
        self.metrics.decrement('services_active')
        if self.metrics.connection:
            self.metrics.connection.disconnect()
        if self.factory.metrics.connection:
            self.factory.metrics.connection.disconnect()
        if self.factory.user_metrics.connection:
            self.factory.user_metrics.connection.disconnect()
        self._reactor_inspector.stop()
        if self.heartbeat_writer:
            self.heartbeat_writer.loseConnection()
            self.heartbeat_writer = None
        for metrics_key in ["reactor_inspector", "sli"]:
            metrics = MetricsConnector.get_metrics(metrics_key)
            if metrics.connection:
                metrics.connection.disconnect()
        self.logger.info("- - - - - SERVER STOPPED")


def create_service(s3_host, s3_port, s3_ssl, s3_key, s3_secret,
                   s3_proxy_host=None, s3_proxy_port=None,
                   status_port=None,
                   auth_provider_class=auth.DummyAuthProvider,
                   oops_config=None):
    """Start the StorageServer service."""
    # configure the logger
    logger = logging.getLogger(config.api_server.logger_name)
    handler = configure_logger(logger=logger, propagate=False,
                               filename=config.api_server.log_filename,
                               start_observer=True)

    s3_logger = logging.getLogger('s3lib')
    s3_logger.setLevel(config.general.log_level)
    s3_logger.addHandler(handler)

    # set up the hackers' logger always in TRACE
    h_logger = logging.getLogger(config.api_server.logger_name + ".hackers")
    h_logger.setLevel(TRACE)
    h_logger.propagate = False
    h_logger.addHandler(handler)

    logger.debug('S3 host:%s port:%s', s3_host, s3_port)

    # turn on heapy if requested
    if os.getenv('USE_HEAPY'):
        logger.debug('importing heapy')
        try:
            import guppy.heapy.RM  # imported for its side effect
        except ImportError:
            logger.warning('guppy-pe/heapy not available, remote monitor '
                           'thread not started')
        else:
            logger.debug('activated heapy remote monitor')

    if config.api_server.gc_debug:
        import gc
        gc.set_debug(gc.DEBUG_UNCOLLECTABLE)
        logger.debug("set gc debug on")

    if status_port is None:
        status_port = config.api_server.status_port

    # create the service
    service = StorageServerService(
        config.api_server.tcp_port, s3_host, s3_port, s3_ssl, s3_key,
        s3_secret, s3_proxy_host, s3_proxy_port, auth_provider_class,
        oops_config, status_port)