~ubuntuone/filesync-server/trunk

« back to all changes in this revision

Viewing changes to src/server/server.py

  • Committer: Facundo Batista
  • Date: 2015-08-05 13:10:02 UTC
  • Revision ID: facundo@taniquetil.com.ar-20150805131002-he7b7k704d8o7js6
First released version.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2008-2015 Canonical
 
2
#
 
3
# This program is free software: you can redistribute it and/or modify
 
4
# it under the terms of the GNU Affero General Public License as
 
5
# published by the Free Software Foundation, either version 3 of the
 
6
# License, or (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU Affero General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU Affero General Public License
 
14
# along with this program. If not, see <http://www.gnu.org/licenses/>.
 
15
#
 
16
# For further info, check  http://launchpad.net/filesync-server
 
17
 
 
18
"""The Storage network server.
 
19
 
 
20
Handles all the networking part of the server.
 
21
All code here must be non blocking and the messages should never leave
 
22
the server's scope.
 
23
"""
 
24
 
 
25
import collections
 
26
import inspect
 
27
import logging
 
28
import os
 
29
import re
 
30
import signal
 
31
import sys
 
32
import time
 
33
import urllib
 
34
import weakref
 
35
 
 
36
from functools import wraps
 
37
 
 
38
import psycopg2
 
39
import twisted
 
40
import twisted.web.error
 
41
import oops
 
42
import oops_datedir_repo
 
43
 
 
44
import metrics.services
 
45
import versioninfo
 
46
import timeline
 
47
 
 
48
from twisted.application.service import MultiService, Service
 
49
from twisted.application.internet import TCPServer
 
50
from twisted.internet.defer import maybeDeferred, inlineCallbacks
 
51
from twisted.internet.protocol import Factory
 
52
from twisted.internet import defer, reactor, error, task, stdio
 
53
from twisted.python.failure import Failure
 
54
 
 
55
from s3lib.s3lib import S3, ProducerStopped
 
56
 
 
57
import uuid
 
58
 
 
59
from metrics import get_meter
 
60
from metrics.metricsconnector import MetricsConnector
 
61
from backends.filesync.data import errors as dataerror
 
62
from backends.filesync.notifier import notifier
 
63
from ubuntuone.storage.server.logger import configure_logger, TRACE
 
64
from config import config
 
65
from ubuntuone.monitoring.reactor import ReactorInspector
 
66
from ubuntuone.storage.rpcdb import inthread
 
67
from ubuntuone.storage.server import auth, content, errors, stats
 
68
from ubuntuone.storageprotocol import protocol_pb2, request, sharersp
 
69
from ubuntuone.supervisor import utils as supervisor_utils
 
70
 
 
71
# this is the minimal cap we support (to avoid hardcoding it in the code)
 
72
MIN_CAP = frozenset(["no-content", "account-info", "resumable-uploads",
 
73
                     "fix462230", "volumes", "generations"])
 
74
 
 
75
# these are the capabilities combinations that we support
 
76
SUPPORTED_CAPS = set([MIN_CAP])
 
77
 
 
78
# this is the capabilities combination that we prefer, the latest one
 
79
PREFERRED_CAP = MIN_CAP
 
80
 
 
81
# these is where we suggest to reconnect in case we deny the capabilities
 
82
SUGGESTED_REDIRS = {
 
83
    # frozenset(["example1"]): dict(hostname="fs-3.server.com", port=443)
 
84
    # frozenset(["example2"]): dict(srv_record="_https._tcp.fs.server.com")
 
85
}
 
86
 
 
87
MAX_OOPS_LINE = 300
 
88
 
 
89
 
 
90
def install_signal_handlers():
 
91
    """Install custom SIGUSR2 handler."""
 
92
    reactor.callWhenRunning(signal.signal, signal.SIGUSR2, sigusr2_handler)
 
93
 
 
94
 
 
95
def sigusr2_handler(signum, frame):
 
96
    """Handle SIGUSR2 to reload the config."""
 
97
    import config
 
98
    logger = logging.getLogger("storage.server")
 
99
    logger.info('Reloading config file')
 
100
    config.config = config._Config()
 
101
 
 
102
 
 
103
def loglevel(lvl):
 
104
    """Make a function that logs at lvl log level."""
 
105
    def level_log(self, message, *args, **kwargs):
 
106
        """inner."""
 
107
        self.log(lvl, message, *args, **kwargs)
 
108
    return level_log
 
109
 
 
110
 
 
111
def trace_message(function):
 
112
    """A decorator to trace incoming messages."""
 
113
    def decorator(self, message):
 
114
        """inner."""
 
115
        self.log.trace_message("IN: ", message)
 
116
        function(self, message)
 
117
    return decorator
 
118
 
 
119
 
 
120
def _prettify_traceback(report, context):
 
121
    """Make the traceback nicer."""
 
122
    if 'tb_text' in report:
 
123
        tb = "Traceback (most recent call last):\n"
 
124
        tb += report['tb_text']
 
125
        tb += "%s: '%s'" % (report['type'], report['value'])
 
126
        report['tb_text'] = tb
 
127
 
 
128
 
 
129
def configure_oops():
 
130
    """Configure the oopses."""
 
131
    oops_config = oops.Config()
 
132
    oops_config.on_create.append(_prettify_traceback)
 
133
    vers_info = dict(branch_nick=versioninfo.version_info['branch_nick'],
 
134
                     revno=versioninfo.version_info['revno'])
 
135
    oops_config.template.update(vers_info)
 
136
    datedir_repo = oops_datedir_repo.DateDirRepo(config.oops.path,
 
137
                                                 inherit_id=True)
 
138
 
 
139
    oops_config.publisher = oops.publishers.publish_to_many(
 
140
        datedir_repo.publish)
 
141
    return oops_config
 
142
 
 
143
 
 
144
class StorageLogger(object):
 
145
    """Create logs for the server.
 
146
 
 
147
    The log format is:
 
148
    session_id remote_ip:remote_port username request_type request_id \
 
149
    message
 
150
 
 
151
    Unknown fields are replaced with a '-'.
 
152
 
 
153
    Plus whatever the log handler prepends to the line, normally a timestamp
 
154
    plus level name.
 
155
    """
 
156
 
 
157
    def __init__(self, protocol):
 
158
        """Create the logger."""
 
159
        self.protocol = protocol
 
160
 
 
161
    def log(self, lvl, message, *args, **kwargs):
 
162
        """Log."""
 
163
        if self.protocol.logger.isEnabledFor(lvl):
 
164
            self._log(lvl, message, *args, **kwargs)
 
165
 
 
166
    def _log(self, lvl, message, *args, **kwargs):
 
167
        """Actually do the real log"""
 
168
        msg_format = "%(uuid)s %(remote)s %(userid)s %(message)s"
 
169
        extra = {"message": message}
 
170
        if self.protocol.user is not None:
 
171
            extra["userid"] = urllib.quote(self.protocol.user.username,
 
172
                                           ":~/").replace('%', '%%')
 
173
        else:
 
174
            extra["userid"] = "-"
 
175
 
 
176
        # be robust in case we have no transport
 
177
        if self.protocol.transport is not None:
 
178
            peer = self.protocol.transport.getPeer()
 
179
            extra["remote"] = peer.host + ":" + str(peer.port)
 
180
        else:
 
181
            extra["remote"] = '%s.transport is None' % self.__class__.__name__
 
182
 
 
183
        extra["uuid"] = self.protocol.session_id
 
184
 
 
185
        message = msg_format % extra
 
186
        self.protocol.logger.log(lvl, message, *args, **kwargs)
 
187
 
 
188
    def trace_message(self, text, message):
 
189
        """Log a message with some pre processing."""
 
190
        if self.protocol.logger.isEnabledFor(TRACE):
 
191
            if message.type != protocol_pb2.Message.BYTES:
 
192
                self.trace(text + (str(message).replace("\n", " ")))
 
193
 
 
194
    critical = loglevel(logging.CRITICAL)
 
195
    error = loglevel(logging.ERROR)
 
196
    warning = loglevel(logging.WARNING)
 
197
    info = loglevel(logging.INFO)
 
198
    debug = loglevel(logging.DEBUG)
 
199
    trace = loglevel(TRACE)
 
200
 
 
201
 
 
202
class StorageServerLogger(StorageLogger):
 
203
    """Logger for the server."""
 
204
 
 
205
    def log(self, lvl, message, *args, **kwargs):
 
206
        """Log."""
 
207
        message = "- - " + message
 
208
        super(StorageServerLogger, self).log(lvl, message, *args, **kwargs)
 
209
 
 
210
 
 
211
class StorageRequestLogger(StorageLogger):
 
212
    """Logger for requests."""
 
213
 
 
214
    def __init__(self, protocol, request):
 
215
        """Create the logger."""
 
216
        super(StorageRequestLogger, self).__init__(protocol)
 
217
        self.request_class_name = request.__class__.__name__
 
218
        self.request_id = request.id
 
219
 
 
220
    def log(self, lvl, message, *args, **kwargs):
 
221
        """Log."""
 
222
        message = "%s %s - %s" % (self.request_class_name,
 
223
                                  self.request_id, message)
 
224
        super(StorageRequestLogger, self).log(lvl, message, *args, **kwargs)
 
225
 
 
226
 
 
227
class NotificationRPCTimeoutError(Exception):
 
228
    """RPCTimeoutError but during a notification."""
 
229
 
 
230
 
 
231
class PoisonException(Exception):
 
232
    """An exception class for poison errors."""
 
233
 
 
234
 
 
235
class LoopingPing(object):
 
236
    """Execute Ping requests in a given interval and expect a response.
 
237
 
 
238
    Shutdown the request.RequestHandler if the request takes longer than
 
239
    the specified timeout.
 
240
 
 
241
    @param interval: the seconds between each Ping
 
242
    @param timeout: the seconds to wait for a response before shutting down
 
243
                    the request handler
 
244
    @param request_handler: a request.RequestHandler instance
 
245
    """
 
246
 
 
247
    def __init__(self, interval, timeout, idle_timeout, request_handler):
 
248
        self.interval = interval
 
249
        self.timeout = timeout
 
250
        self.idle_timeout = idle_timeout
 
251
        self.request_handler = request_handler
 
252
        self.shutdown = None
 
253
        self.next_loop = None
 
254
        self.running = False
 
255
        self.pong_count = 0
 
256
 
 
257
    def start(self):
 
258
        """Create the DelayedCall instances."""
 
259
        self.running = True
 
260
 
 
261
        self.shutdown = reactor.callLater(
 
262
            self.timeout, self._shutdown, reason='No Pong response.')
 
263
 
 
264
        self.next_loop = reactor.callLater(self.interval, self.schedule)
 
265
 
 
266
    def reset(self):
 
267
        """Reset pong count and reschedule."""
 
268
        self.pong_count = 0
 
269
        self.reschedule()
 
270
 
 
271
    def reschedule(self):
 
272
        """Reset all delayed calls."""
 
273
        if self.shutdown is not None and self.shutdown.active():
 
274
            self.shutdown.reset(self.timeout)
 
275
        elif self.running:
 
276
            self.shutdown = reactor.callLater(
 
277
                self.timeout, self._shutdown, reason='No Pong response.')
 
278
        if self.next_loop is not None and self.next_loop.active():
 
279
            self.next_loop.reset(self.interval)
 
280
        elif self.running:
 
281
            self.next_loop = reactor.callLater(self.interval, self.schedule)
 
282
 
 
283
    @defer.inlineCallbacks
 
284
    def schedule(self):
 
285
        """Request a Ping and reset the shutdown timeout."""
 
286
        yield self.request_handler.ping()
 
287
        self.pong_count += 1
 
288
        # check if the client is idling for too long
 
289
        # if self.idle_timeout is 0,  do nothing.
 
290
        idle_time = self.pong_count * self.interval
 
291
        if self.idle_timeout != 0 and idle_time >= self.idle_timeout:
 
292
            self._shutdown(reason="idle timeout %s" % (idle_time,))
 
293
        else:
 
294
            self.reschedule()
 
295
 
 
296
    def _shutdown(self, reason):
 
297
        """Shutdown the request_handler."""
 
298
        self.request_handler.log.info("Disconnecting - %s", reason)
 
299
        if not self.request_handler.shutting_down:
 
300
            self.request_handler.shutdown()
 
301
 
 
302
    def stop(self):
 
303
        """Stop all the delayed calls."""
 
304
        self.running = False
 
305
        if self.shutdown is not None and self.shutdown.active():
 
306
            self.shutdown.cancel()
 
307
        if self.next_loop is not None and self.next_loop.active():
 
308
            self.next_loop.cancel()
 
309
        for req in self.request_handler.requests.values():
 
310
            # cleanup stalled Ping requests
 
311
            if req.started and isinstance(req, request.Ping):
 
312
                req.done()
 
313
 
 
314
 
 
315
class StorageServer(request.RequestHandler):
 
316
    """The Storage network server."""
 
317
 
 
318
    # the version the client must have (minimum)
 
319
    VERSION_REQUIRED = 3
 
320
    PING_TIMEOUT = 480
 
321
    PING_INTERVAL = 120
 
322
 
 
323
    def __init__(self):
 
324
        """Create a network server. The factory does this."""
 
325
        request.RequestHandler.__init__(self)
 
326
        self.user = None
 
327
        self.factory = None
 
328
        self.session_id = uuid.uuid4()
 
329
        self.logger = logging.getLogger(config.api_server.logger_name)
 
330
        self.log = StorageServerLogger(self)
 
331
        self.shutting_down = False
 
332
        self.request_locked = False
 
333
        self.pending_requests = collections.deque()
 
334
        self.ping_loop = LoopingPing(StorageServer.PING_INTERVAL,
 
335
                                     StorageServer.PING_TIMEOUT,
 
336
                                     config.api_server.idle_timeout,
 
337
                                     self)
 
338
 
 
339
        # capabilities that the server is working with
 
340
        self.working_caps = MIN_CAP
 
341
        self.poisoned = []
 
342
        self.waiting_on_poison = []
 
343
        self.connection_time = None
 
344
        self._metadata_count = set()
 
345
 
 
346
    def set_user(self, user):
 
347
        """Set user and adjust values that depend on which user it is."""
 
348
        self.user = user
 
349
        if user.username in self.factory.trace_users:
 
350
            # set up the logger to use the hackers' one
 
351
            hackers_logger = config.api_server.logger_name + '.hackers'
 
352
            self.logger = logging.getLogger(hackers_logger)
 
353
 
 
354
    def poison(self, tag):
 
355
        """Inject a failure in the server. Works with check_poison."""
 
356
        self.poisoned.append(tag)
 
357
 
 
358
    def check_poison(self, tag):
 
359
        """Fail if poisoned with tag.
 
360
 
 
361
        Will raise a PoisonException when called and tag matches the poison
 
362
        condition.
 
363
        """
 
364
        if not self.poisoned:
 
365
            return
 
366
 
 
367
        poison = self.poisoned[-1]
 
368
        if poison == "*" or poison == tag:
 
369
            self.poisoned.pop()
 
370
            reactor.callLater(0, self.callback_on_poison, poison)
 
371
            raise PoisonException("Service was poisoned with: %s" % poison)
 
372
 
 
373
    def callback_on_poison(self, poison):
 
374
        """Call all the deferreds waiting on poison."""
 
375
        for d in self.waiting_on_poison:
 
376
            d.callback(poison)
 
377
        self.waiting_on_poison = []
 
378
 
 
379
    def wait_for_poison(self):
 
380
        """Return a deferred that will be called when we are poisoned."""
 
381
        d = defer.Deferred()
 
382
        self.waiting_on_poison.append(d)
 
383
        return d
 
384
 
 
385
    def _log_error_and_oops(self, failure, where, tl=None, exc_info=None):
 
386
        """Auxiliar method to log error and build oops."""
 
387
        self.log.error("Unhandled %s when calling '%s'", failure,
 
388
                       where.__name__, exc_info=exc_info)
 
389
        del exc_info
 
390
 
 
391
        if not isinstance(failure, Failure):
 
392
            failure = Failure(failure)
 
393
        oops = self.build_oops(failure, tl)
 
394
        self.save_oops(oops)
 
395
 
 
396
    def schedule_request(self, request, callback, head=False):
 
397
        """Schedule this request to run."""
 
398
        tl = getattr(request, "timeline", None)
 
399
        if head:
 
400
            self.pending_requests.appendleft((request, callback))
 
401
        else:
 
402
            self.pending_requests.append((request, callback))
 
403
        # we do care if it fails, means no one handled the failure before
 
404
        if not isinstance(request, Action):
 
405
            request.deferred.addErrback(self._log_error_and_oops,
 
406
                                        request.__class__, tl=tl)
 
407
        if not self.request_locked:
 
408
            self.execute_next_request()
 
409
 
 
410
    def release(self, request):
 
411
        """Request 'request' finished, others may start."""
 
412
        if self.request_locked is not request:
 
413
            # a release from someone not holding the lock does nothing
 
414
            return
 
415
 
 
416
        self.request_locked = False
 
417
        if self.pending_requests and not self.shutting_down:
 
418
            self.execute_next_request()
 
419
 
 
420
    def execute_next_request(self):
 
421
        """Read the queue and execute a request."""
 
422
        request, callback = self.pending_requests.popleft()
 
423
        tl = getattr(request, "timeline", None)
 
424
 
 
425
        if tl is not None:
 
426
            class_name = request.__class__.__name__
 
427
            tl.start("REQ-" + class_name,
 
428
                     "EXECUTE %s[%s]" % (class_name, request.id)).finish()
 
429
 
 
430
        try:
 
431
            self.request_locked = request
 
432
            callback()
 
433
        except Exception as e:
 
434
            self._log_error_and_oops(e, request.__class__, tl=tl,
 
435
                                     exc_info=sys.exc_info())
 
436
            self.release(request)
 
437
 
 
438
    def connectionLost(self, reason=None):
 
439
        """Unregister this connection and potentially remove user binding."""
 
440
        if not self.shutting_down:
 
441
            self.shutdown()
 
442
        if self.user is not None:
 
443
            self.user.unregister_protocol(self)
 
444
 
 
445
        self.factory.protocols.remove(self)
 
446
        self.log.info("Connection Lost: %s", reason.value)
 
447
 
 
448
    def connectionMade(self):
 
449
        """Called when a connection is made."""
 
450
        request.RequestHandler.connectionMade(self)
 
451
        self.factory.protocols.append(self)
 
452
        self.log.info("Connection Made")
 
453
        self.transport.write("%d filesync server revision %s.\r\n" %
 
454
                             (self.PROTOCOL_VERSION,
 
455
                              versioninfo.version_info['revno']))
 
456
        self.ping_loop.start()
 
457
        self.factory.metrics.meter("connection_made", 1)
 
458
        self.factory.metrics.increment("connections_active")
 
459
        self.connection_time = time.time()
 
460
 
 
461
    def shutdown(self):
 
462
        """Lose connection and abort requests."""
 
463
        self.log.debug("Shutdown Request")
 
464
        self.shutting_down = True
 
465
        self.ping_loop.stop()
 
466
        self.transport.loseConnection()
 
467
        self.pending_requests.clear()
 
468
 
 
469
        # stop all the pending requests
 
470
        for req in self.requests.values():
 
471
            req.stop()
 
472
 
 
473
        if self.connection_time is not None:
 
474
            delta = time.time() - self.connection_time
 
475
            self.factory.metrics.timing("connection_closed", delta)
 
476
            self.factory.metrics.decrement("connections_active")
 
477
 
 
478
    def wait_for_shutdown(self):
 
479
        """Wait until the server has stopped serving this client."""
 
480
        if not self.factory.graceful_shutdown:
 
481
            return defer.succeed(True)
 
482
        d = defer.Deferred()
 
483
 
 
484
        def wait_for_shutdown_worker():
 
485
            """Check, wait and recurse."""
 
486
            if self.requests:
 
487
                reactor.callLater(0.1, wait_for_shutdown_worker)
 
488
            else:
 
489
                d.callback(True)
 
490
 
 
491
        wait_for_shutdown_worker()
 
492
        return d
 
493
 
 
494
    def dataReceived(self, data):
 
495
        """Handle new data."""
 
496
        try:
 
497
            self.buildMessage(data)
 
498
        except Exception as e:
 
499
            # here we handle and should log all errors
 
500
            self.shutdown()
 
501
 
 
502
            if isinstance(e, request.StorageProtocolErrorSizeTooBig):
 
503
                self.log.warning("---- garbage in, garbage out")
 
504
                return
 
505
 
 
506
            self._log_error_and_oops(e, self.dataReceived,
 
507
                                     exc_info=sys.exc_info())
 
508
 
 
509
    def build_oops(self, failure, tl=None):
 
510
        """Create an oops entry to log the failure."""
 
511
        context = {"exc_info": (failure.type, failure.value, failure.tb)}
 
512
        del failure
 
513
 
 
514
        if tl is not None:
 
515
            context["timeline"] = tl
 
516
 
 
517
        report = self.factory.oops_config.create(context)
 
518
        del context
 
519
 
 
520
        if self.user is not None:
 
521
            # The 'username' key is parsed as (login, user_id, display_name).
 
522
            report["username"] = "%s,%s,%s" % (self.user.id, self.user.id,
 
523
                                               self.user.username)
 
524
 
 
525
        return report
 
526
 
 
527
    def save_oops(self, report):
 
528
        """save the oops entry."""
 
529
        self.factory.oops_config.publish(report)
 
530
 
 
531
    def processMessage(self, message):
 
532
        """Log errors from requests created by incoming messages."""
 
533
        # reset the ping loop if the message isn't a PING or a PONG
 
534
        if message.type not in (protocol_pb2.Message.PING,
 
535
                                protocol_pb2.Message.PONG):
 
536
            self.ping_loop.reset()
 
537
        try:
 
538
            result = request.RequestHandler.processMessage(self, message)
 
539
        except Exception as e:
 
540
            self._log_error_and_oops(e, self.processMessage,
 
541
                                     exc_info=sys.exc_info())
 
542
            return
 
543
 
 
544
        if isinstance(result, defer.Deferred):
 
545
            result.addErrback(self._log_error_and_oops, result.__class__)
 
546
        elif isinstance(result, request.Request):
 
547
            tl = getattr(result, "timeline", None)
 
548
            result.deferred.addErrback(self._log_error_and_oops,
 
549
                                       result.__class__, tl=tl)
 
550
 
 
551
    def handle_PROTOCOL_VERSION(self, message):
 
552
        """Handle PROTOCOL_VERSION message.
 
553
 
 
554
        If the version is less or more than whats allowed, we send an error
 
555
        and drop the connection.
 
556
        """
 
557
        version = message.protocol.version
 
558
        response = protocol_pb2.Message()
 
559
        response.id = message.id
 
560
        if self.VERSION_REQUIRED <= version <= self.PROTOCOL_VERSION:
 
561
            response.type = protocol_pb2.Message.PROTOCOL_VERSION
 
562
            response.protocol.version = self.PROTOCOL_VERSION
 
563
            self.sendMessage(response)
 
564
            self.log.debug("client requested protocol version %d", version)
 
565
        else:
 
566
            msg = "client requested invalid version %s" % (version,)
 
567
            response.type = protocol_pb2.Message.ERROR
 
568
            response.error.type = protocol_pb2.Error.UNSUPPORTED_VERSION
 
569
            response.error.comment = msg
 
570
            self.sendMessage(response)
 
571
 
 
572
            # wrong protocol version is no longer an error.
 
573
            self.log.info(msg)
 
574
            self.shutdown()
 
575
 
 
576
    def handle_PING(self, message):
 
577
        """handle an incoming ping message."""
 
578
        self.check_poison("ping")
 
579
        response = protocol_pb2.Message()
 
580
        response.id = message.id
 
581
        response.type = protocol_pb2.Message.PONG
 
582
        self.log.trace("ping pong")
 
583
        self.sendMessage(response)
 
584
 
 
585
    def handle_AUTH_REQUEST(self, message):
 
586
        """Handle AUTH_REQUEST message.
 
587
 
 
588
        If already logged in, send an error.
 
589
 
 
590
        """
 
591
        request = AuthenticateResponse(self, message)
 
592
        request.start()
 
593
 
 
594
    def handle_MAKE_DIR(self, message):
 
595
        """Handle MAKE_DIR message."""
 
596
        request = MakeResponse(self, message)
 
597
        request.start()
 
598
 
 
599
    def handle_MAKE_FILE(self, message):
 
600
        """Handle MAKE_FILE message."""
 
601
        request = MakeResponse(self, message)
 
602
        request.start()
 
603
 
 
604
    def handle_GET_DELTA(self, message):
 
605
        """Handle GET_DELTA message."""
 
606
        if message.get_delta.from_scratch:
 
607
            request = RescanFromScratchResponse(self, message)
 
608
        else:
 
609
            request = GetDeltaResponse(self, message)
 
610
        request.start()
 
611
 
 
612
    def handle_GET_CONTENT(self, message):
 
613
        """Handle GET_CONTENT message."""
 
614
        request = GetContentResponse(self, message)
 
615
        request.start()
 
616
 
 
617
    def handle_MOVE(self, message):
 
618
        """Handle MOVE message."""
 
619
        request = MoveResponse(self, message)
 
620
        request.start()
 
621
 
 
622
    def handle_PUT_CONTENT(self, message):
 
623
        """Handle PUT_CONTENT message."""
 
624
        request = PutContentResponse(self, message)
 
625
        request.start()
 
626
 
 
627
    def handle_UNLINK(self, message):
 
628
        """Handle UNLINK message."""
 
629
        request = Unlink(self, message)
 
630
        request.start()
 
631
 
 
632
    def handle_CREATE_UDF(self, message):
 
633
        """Handle CREATE_UDF message."""
 
634
        request = CreateUDF(self, message)
 
635
        request.start()
 
636
 
 
637
    def handle_DELETE_VOLUME(self, message):
 
638
        """Handle DELETE_VOLUME message."""
 
639
        request = DeleteVolume(self, message)
 
640
        request.start()
 
641
 
 
642
    def handle_LIST_VOLUMES(self, message):
 
643
        """Handle LIST_VOLUMES message."""
 
644
        request = ListVolumes(self, message)
 
645
        request.start()
 
646
 
 
647
    def handle_CREATE_SHARE(self, message):
 
648
        """Handle CREATE_SHARE message."""
 
649
        request = CreateShare(self, message)
 
650
        request.start()
 
651
 
 
652
    def handle_SHARE_ACCEPTED(self, message):
 
653
        """Handle SHARE_ACCEPTED message."""
 
654
        request = ShareAccepted(self, message)
 
655
        request.start()
 
656
 
 
657
    def handle_LIST_SHARES(self, message):
 
658
        """Handle LIST_SHARES message."""
 
659
        request = ListShares(self, message)
 
660
        request.start()
 
661
 
 
662
    def handle_DELETE_SHARE(self, message):
 
663
        """Handle DELETE_SHARE message."""
 
664
        request = DeleteShare(self, message)
 
665
        request.start()
 
666
 
 
667
    def handle_CANCEL_REQUEST(self, message):
 
668
        """Ignores this misreceived cancel."""
 
669
        # if this arrives to this Request Handler, it means that what
 
670
        # we're trying to cancel already finished (and the client will
 
671
        # receive or already received the ok for that), so we ignore it
 
672
 
 
673
    def handle_BYTES(self, message):
 
674
        """Ignores this misreceived message."""
 
675
        # if this arrives to this Request Handler, it means that what
 
676
        # we're receiving messages that hit the network before
 
677
        # everything is properly cancelled, so we ignore it
 
678
    handle_EOF = handle_BYTES
 
679
 
 
680
    def handle_QUERY_CAPS(self, message):
 
681
        """Handle QUERY_CAPS message."""
 
682
        request = QuerySetCapsResponse(self, message)
 
683
        request.start()
 
684
 
 
685
    def handle_SET_CAPS(self, message):
 
686
        """Handle SET_CAPS message."""
 
687
        request = QuerySetCapsResponse(self, message, set_mode=True)
 
688
        request.start()
 
689
 
 
690
    def handle_FREE_SPACE_INQUIRY(self, message):
 
691
        """Handle FREE_SPACE_INQUIRY message."""
 
692
        request = FreeSpaceResponse(self, message)
 
693
        request.start()
 
694
 
 
695
    def handle_ACCOUNT_INQUIRY(self, message):
 
696
        """Handle account inquiry message."""
 
697
        request = AccountResponse(self, message)
 
698
        request.start()
 
699
 
 
700
 
 
701
class BaseRequestResponse(request.RequestResponse):
 
702
    """Base RequestResponse class.
 
703
 
 
704
    It keeps a weak reference of the protocol instead of the real ref.
 
705
    """
 
706
 
 
707
    __slots__ = ('use_protocol_weakref', '_protocol_ref', 'timeline')
 
708
 
 
709
    def __init__(self, protocol, message):
 
710
        """Create the request response."""
 
711
        self.use_protocol_weakref = config.api_server.protocol_weakref
 
712
        self._protocol_ref = None
 
713
        self.timeline = timeline.Timeline(format_stack=None)
 
714
        super(BaseRequestResponse, self).__init__(protocol, message)
 
715
 
 
716
    def context(self):
 
717
        """Get the context of this request, to be passed down the RPC layer."""
 
718
        return {"timeline": self.timeline}
 
719
 
 
720
    def _get_protocol(self):
 
721
        """Return the protocol instance."""
 
722
        if self.use_protocol_weakref:
 
723
            protocol = self._protocol_ref()
 
724
            if protocol is None:
 
725
                # protocol reference gone,
 
726
                raise errors.ProtocolReferenceError(str(self._protocol_ref))
 
727
            return protocol
 
728
        else:
 
729
            return self._protocol_ref
 
730
 
 
731
    def _set_protocol(self, protocol):
 
732
        """Set the weak ref. to the protocol instance."""
 
733
        if self.use_protocol_weakref:
 
734
            self._protocol_ref = weakref.ref(protocol)
 
735
        else:
 
736
            self._protocol_ref = protocol
 
737
 
 
738
    protocol = property(fget=_get_protocol, fset=_set_protocol)
 
739
 
 
740
    def _start(self):
 
741
        """Override this method to start the request."""
 
742
        raise NotImplementedError("request needs to do something")
 
743
 
 
744
 
 
745
class StorageServerRequestResponse(BaseRequestResponse):
 
746
    """Base class for all server request responses."""
 
747
 
 
748
    __slots__ = ('_id', 'log', 'start_time', 'last_good_state_ts',
 
749
                 'length', 'operation_data')
 
750
 
 
751
    def __init__(self, protocol, message):
 
752
        """Create the request response. Setup logger."""
 
753
        self._id = None
 
754
        super(StorageServerRequestResponse, self).__init__(protocol, message)
 
755
        self.log = StorageRequestLogger(self.protocol, self)
 
756
        self.start_time = None
 
757
        self.last_good_state_ts = None
 
758
        self.length = 1
 
759
        self.operation_data = None
 
760
 
 
761
    def _get_id(self):
 
762
        """Return this request id."""
 
763
        return self._id
 
764
 
 
765
    def _set_id(self, id):
 
766
        """Set this request id."""
 
767
        # check if self.log is defined and isn't None
 
768
        if getattr(self, 'log', None) is not None:
 
769
            # update the request_id in self.log
 
770
            self.log.request_id = id
 
771
        self._id = id
 
772
 
 
773
    # override self.id with a property
 
774
    id = property(fget=_get_id, fset=_set_id)
 
775
 
 
776
    def _get_node_info(self):
 
777
        """Return node info from the message.
 
778
 
 
779
        This should be overwritten by our children, but it's not mandatory.
 
780
        """
 
781
 
 
782
    def start(self):
 
783
        """Schedule the request start for later.
 
784
 
 
785
        It will be executed when the server has no further pending requests.
 
786
 
 
787
        """
 
788
        def _scheduled_start():
 
789
            """The scheduled call."""
 
790
            self.start_time = time.time()
 
791
            self.last_good_state_ts = time.time()
 
792
            super(StorageServerRequestResponse, self).start()
 
793
            self.protocol.check_poison("request_start")
 
794
 
 
795
        self.protocol.addProducer(self)
 
796
 
 
797
        # we add the id to the request here, even if it happens again on the
 
798
        # super() call in _scheduled_start(), but it's good to have the id
 
799
        # while scheduled, before really started
 
800
        self.id = self.source_message.id
 
801
        self.protocol.requests[self.source_message.id] = self
 
802
 
 
803
        class_name = self.__class__.__name__
 
804
        self.timeline.start(
 
805
            "REQ-" + class_name,
 
806
            "SCHEDULE %s[%s:%s] CAPS %r WITH %r" % (
 
807
                class_name, self.protocol.session_id,
 
808
                self.id, self.protocol.working_caps,
 
809
                str(self.source_message)[:MAX_OOPS_LINE])).finish()
 
810
 
 
811
        self.log.info("Request being scheduled")
 
812
        self.log.trace_message("IN: ", self.source_message)
 
813
        self.protocol.factory.metrics.increment("request_instances.%s" %
 
814
                                                (self.__class__.__name__,))
 
815
        self.protocol.schedule_request(self, _scheduled_start)
 
816
        self.protocol.check_poison("request_schedule")
 
817
 
 
818
    def stop(self):
 
819
        """Stop the request."""
 
820
        if self.started:
 
821
            # cancel the request
 
822
            self.cancel()
 
823
        else:
 
824
            # not even started! just log and cleanup
 
825
            self.log.debug("Request being released before start")
 
826
            self.cleanup()
 
827
 
 
828
    def cleanup(self):
 
829
        """remove the reference to self from the request handler"""
 
830
        super(StorageServerRequestResponse, self).cleanup()
 
831
        self.protocol.factory.metrics.decrement("request_instances.%s" %
 
832
                                                (self.__class__.__name__,))
 
833
        self.protocol.release(self)
 
834
        self.timeline = None
 
835
 
 
836
    def error(self, failure):
 
837
        """Overrided error to add logging. Never fail, always log."""
 
838
        class_name = self.__class__.__name__
 
839
        try:
 
840
            if isinstance(failure, Exception):
 
841
                failure = Failure(failure)
 
842
            self.log.error("Request error",
 
843
                           exc_info=(failure.type, failure.value, None))
 
844
            o = self.protocol.build_oops(failure, self.timeline)
 
845
            self.protocol.save_oops(o)
 
846
            request.RequestResponse.error(self, failure)
 
847
        except Exception as e:
 
848
            msg = 'error() crashed when processing %s: %s'
 
849
            # we shouldn't create an oops here since we just failed
 
850
            self.log.error(msg, failure, e, exc_info=sys.exc_info())
 
851
        else:
 
852
            self.protocol.factory.sli_metrics.sli_error(class_name)
 
853
            if self.start_time is not None:
 
854
                delta = time.time() - self.start_time
 
855
                self.protocol.factory.metrics.timing(
 
856
                    "%s.request_error" % (class_name,), delta)
 
857
                self.protocol.factory.metrics.timing("request_error", delta)
 
858
 
 
859
            transferred = getattr(self, 'transferred', None)
 
860
            if transferred is not None:
 
861
                msg = "%s.transferred" % (class_name,)
 
862
                self.protocol.factory.metrics.gauge(msg, transferred)
 
863
 
 
864
    def done(self, _=None):
 
865
        """Overrided done to add logging.
 
866
 
 
867
        Added an ignored parameter so we can hook this to the defered chain.
 
868
        """
 
869
        try:
 
870
            if self.operation_data is None:
 
871
                self.log.info("Request done")
 
872
            else:
 
873
                self.log.info("Request done: %s", self.operation_data)
 
874
            request.RequestResponse.done(self)
 
875
        except Exception as e:
 
876
            self.error(Failure(e))
 
877
        else:
 
878
            class_name = self.__class__.__name__
 
879
            factory = self.protocol.factory
 
880
            transferred = getattr(self, 'transferred', None)
 
881
 
 
882
            if self.start_time is not None:
 
883
                delta = time.time() - self.start_time
 
884
                factory.metrics.timing("%s.request_finished" % (class_name,),
 
885
                                       delta)
 
886
                factory.metrics.timing("request_finished", delta)
 
887
 
 
888
                # inform SLI metric here if operation has a valid length (some
 
889
                # operations change it, default is 1 for other operations, and
 
890
                # Put/GetContentResponse set it to None to not inform *here*)
 
891
                if self.length is not None:
 
892
                    self.protocol.factory.sli_metrics.sli(class_name, delta,
 
893
                                                          self.length)
 
894
 
 
895
            if transferred is not None:
 
896
                msg = "%s.transferred" % (class_name,)
 
897
                factory.metrics.gauge(msg, transferred)
 
898
 
 
899
            # measure specific "user's client" activity
 
900
            user_activity = getattr(self, 'user_activity', None)
 
901
            if user_activity is not None:
 
902
                user_id = getattr(self.protocol.user, 'id', '')
 
903
                factory.user_metrics.report(user_activity, str(user_id), 'd')
 
904
 
 
905
    def sendMessage(self, message):
 
906
        """Send a message and trace it."""
 
907
        self.log.trace_message("OUT: ", message)
 
908
        request.RequestResponse.sendMessage(self, message)
 
909
 
 
910
    def _processMessage(self, message):
 
911
        """Locally overridable part of processMessage."""
 
912
 
 
913
    def processMessage(self, message):
 
914
        """Process the received messages.
 
915
 
 
916
        If the request already started, really process the message rightaway;
 
917
        if not, just schedule the processing for later.
 
918
        """
 
919
        result = None
 
920
        if self.started:
 
921
            result = self._processMessage(message)
 
922
        else:
 
923
            def _process_later():
 
924
                """Deferred call to process the message."""
 
925
                self._processMessage(message)
 
926
            self.protocol.schedule_request(self, _process_later)
 
927
        return result
 
928
 
 
929
    def convert_share_id(self, share_id):
 
930
        """Convert a share_id from a message to a UUID or None.
 
931
 
 
932
        @param share_id: a str representation of a share id
 
933
        @return: uuid.UUID or None
 
934
        """
 
935
        if share_id != request.ROOT:
 
936
            return uuid.UUID(share_id)
 
937
        else:
 
938
            return None
 
939
 
 
940
    def queue_action(self, callable_action, head=True):
 
941
        """Queue a callable action.
 
942
 
 
943
        Return a deferred that will be fired when the action finish.
 
944
        """
 
945
        action = Action(self, callable_action)
 
946
        action.start(head=head)
 
947
        return action.deferred
 
948
 
 
949
    def _get_extension(self, path):
 
950
        """Return an extension from the name/path."""
 
951
        parts = path.rsplit(".", 1)
 
952
        if len(parts) == 2:
 
953
            endpart = parts[1]
 
954
            if len(endpart) <= 4:
 
955
                return endpart
 
956
 
 
957
 
 
958
class SimpleRequestResponse(StorageServerRequestResponse):
 
959
    """Common request/response bits."""
 
960
 
 
961
    __slots__ = ()
 
962
 
 
963
    authentication_required = True
 
964
 
 
965
    # a list of foreign (non protocol) errors to be filled by heirs
 
966
    expected_foreign_errors = []
 
967
 
 
968
    # a list of foreing errors that will always be handled by this super class
 
969
    # because all operations could generate them (RetryLimitReached because of
 
970
    # access to the DB, and DoesNotExist because user account becoming
 
971
    # disabled while connected)
 
972
    generic_foreign_errors = [
 
973
        dataerror.RetryLimitReached,
 
974
        dataerror.DoesNotExist,
 
975
        dataerror.LockedUserError,
 
976
        psycopg2.DataError,
 
977
    ]
 
978
 
 
979
    # convert "foreign" errors directly to protocol error codes;
 
980
    # StorageServerError and TryAgain (and heirs) will be handled automatically
 
981
    # because Failure.check compares exceptions using isinstance().
 
982
    protocol_errors = {
 
983
        dataerror.NotEmpty: protocol_pb2.Error.NOT_EMPTY,
 
984
        dataerror.NotADirectory: protocol_pb2.Error.NOT_A_DIRECTORY,
 
985
        dataerror.NoPermission: protocol_pb2.Error.NO_PERMISSION,
 
986
        dataerror.DoesNotExist: protocol_pb2.Error.DOES_NOT_EXIST,
 
987
        dataerror.AlreadyExists: protocol_pb2.Error.ALREADY_EXISTS,
 
988
        dataerror.QuotaExceeded: protocol_pb2.Error.QUOTA_EXCEEDED,
 
989
        dataerror.InvalidFilename: protocol_pb2.Error.INVALID_FILENAME,
 
990
        psycopg2.DataError: protocol_pb2.Error.INVALID_FILENAME,
 
991
 
 
992
        # violation of primary key, foreign key, or unique constraints
 
993
        dataerror.IntegrityError: protocol_pb2.Error.ALREADY_EXISTS,
 
994
    }
 
995
    # These get converted to TRY_AGAIN
 
996
    try_again_errors = [
 
997
        dataerror.RetryLimitReached,
 
998
        error.TCPTimedOutError,
 
999
    ]
 
1000
 
 
1001
    auth_required_error = 'Authentication required and the user is None.'
 
1002
    upload_not_accepted_error = (
 
1003
        'This upload has not been accepted yet content is being sent.')
 
1004
 
 
1005
    def _process(self):
 
1006
        """Process the request."""
 
1007
 
 
1008
    def _meter_s3_timeout(self, failure):
 
1009
        """Send metrics on S3 timeouts."""
 
1010
        if failure.check(errors.S3Error):
 
1011
            factory = self.protocol.factory
 
1012
            last_responsive = factory.reactor_inspector.last_responsive_ts
 
1013
            last_good = self.last_good_state_ts
 
1014
            # Unresponsive server or client not reading/writing fast enough?
 
1015
            reason = "server" if last_responsive <= last_good else "client"
 
1016
 
 
1017
            class_name = self.__class__.__name__
 
1018
            metric_name = "%s.request_error.s3_timeout.%s"
 
1019
            factory.metrics.meter(metric_name % (class_name, reason), 1)
 
1020
 
 
1021
    def _send_protocol_error(self, failure):
 
1022
        """Convert the failure to a protocol error.
 
1023
 
 
1024
        Send it if possible; otherwise, continue propagating it.
 
1025
 
 
1026
        """
 
1027
        # Convert try_again_errors that weren't converted at lower levels
 
1028
        if failure.check(*self.try_again_errors):
 
1029
            failure = Failure(errors.TryAgain(failure.value))
 
1030
 
 
1031
        # Send metrics if we have an S3 error
 
1032
        self._meter_s3_timeout(failure)
 
1033
 
 
1034
        foreign_errors = (self.generic_foreign_errors +
 
1035
                          self.expected_foreign_errors)
 
1036
        error = failure.check(errors.StorageServerError, *foreign_errors)
 
1037
        comment = failure.getErrorMessage().decode('utf8')
 
1038
        if failure.check(errors.TryAgain):
 
1039
            orig_error_name = failure.value.orig_error.__class__.__name__
 
1040
            comment = u"%s: %s" % (orig_error_name, comment)
 
1041
            msg = 'Sending TRY_AGAIN to client because we got: %s', comment
 
1042
            self.log.warning(msg, Failure(failure.value.orig_error))
 
1043
            comment = u"TryAgain (%s)" % comment
 
1044
 
 
1045
            metric_name = "TRY_AGAIN.%s" % orig_error_name
 
1046
            self.protocol.factory.metrics.meter(metric_name, 1)
 
1047
 
 
1048
        if error == dataerror.QuotaExceeded:
 
1049
            # handle the case of QuotaExceeded
 
1050
            # XXX: check Bug #605847
 
1051
            protocol_error = self.protocol_errors[error]
 
1052
            free_bytes = failure.value.free_bytes
 
1053
            volume_id = str(failure.value.volume_id or '')
 
1054
            free_space_info = {'share_id': volume_id, 'free_bytes': free_bytes}
 
1055
            self.sendError(protocol_error, comment=comment,
 
1056
                           free_space_info=free_space_info)
 
1057
            return  # the error has been consumed
 
1058
        elif error in self.protocol_errors:
 
1059
            # an error we expect and can convert to a protocol error
 
1060
            protocol_error = self.protocol_errors[error]
 
1061
            self.sendError(protocol_error, comment=comment)
 
1062
            return  # the error has been consumed
 
1063
        elif (error == errors.StorageServerError and
 
1064
                not failure.check(errors.InternalError)):
 
1065
            # an error which corresponds directly to a protocol error
 
1066
            self.sendError(failure.value.errno, comment=comment)
 
1067
            return  # the error has been consumed
 
1068
        elif error == dataerror.LockedUserError:
 
1069
            self.log.warning("Shutting down protocol: user locked")
 
1070
            if not self.protocol.shutting_down:
 
1071
                self.protocol.shutdown()
 
1072
            return  # the error has been consumed
 
1073
        else:
 
1074
            # an unexpected or unconvertable error
 
1075
            self.sendError(protocol_pb2.Error.INTERNAL_ERROR,
 
1076
                           comment=comment)
 
1077
            return failure  # propagate the error
 
1078
 
 
1079
    def internal_error(self, failure):
 
1080
        """Handle a failure that caused an INTERNAL_ERROR."""
 
1081
        # only call self.error, if I'm not finished
 
1082
        if not self.finished:
 
1083
            self.error(failure)
 
1084
        # only shutdown the protocol if isn't already doing the shutdown
 
1085
        if not self.protocol.shutting_down:
 
1086
            self.protocol.shutdown()
 
1087
 
 
1088
    def _log_start(self):
 
1089
        """Log that the request started."""
 
1090
        txt = "Request being started"
 
1091
        on_what = self._get_node_info()
 
1092
        if on_what:
 
1093
            txt += ", working on: " + str(on_what)
 
1094
        self.log.debug(txt)
 
1095
 
 
1096
    def _start(self):
 
1097
        """Kick off request processing."""
 
1098
        self._log_start()
 
1099
 
 
1100
        if self.authentication_required and self.protocol.user is None:
 
1101
            self.sendError(protocol_pb2.Error.AUTHENTICATION_REQUIRED,
 
1102
                           comment=self.auth_required_error)
 
1103
            return self.done()
 
1104
        else:
 
1105
            d = maybeDeferred(self._process)
 
1106
            d.addErrback(self._send_protocol_error)
 
1107
            d.addCallbacks(self.done, self.internal_error)
 
1108
 
 
1109
 
 
1110
class ListShares(SimpleRequestResponse):
 
1111
    """LIST_SHARES Request Response."""
 
1112
 
 
1113
    __slots__ = ()
 
1114
 
 
1115
    @inlineCallbacks
 
1116
    def _process(self):
 
1117
        """List shares for the client."""
 
1118
        user = self.protocol.user
 
1119
        shared_by, shared_to = yield user.list_shares()
 
1120
        self.length = len(shared_by) + len(shared_to)
 
1121
 
 
1122
        for share in shared_by:
 
1123
            # get the volume_id of the shared node.
 
1124
            volume_id = yield user.get_volume_id(share['root_id'])
 
1125
            volume_id = str(volume_id) if volume_id else ''
 
1126
            if volume_id == user.root_volume_id:
 
1127
                # return the protocol root instead of users real root
 
1128
                volume_id = request.ROOT
 
1129
            share_resp = sharersp.ShareResponse.from_params(
 
1130
                share['id'], "from_me", share['root_id'], share['name'],
 
1131
                share['shared_to_username'] or u"",
 
1132
                share['shared_to_visible_name'] or u"",
 
1133
                share['accepted'], share['access'],
 
1134
                subtree_volume_id=volume_id)
 
1135
            response = protocol_pb2.Message()
 
1136
            response.type = protocol_pb2.Message.SHARES_INFO
 
1137
            share_resp.dump_to_msg(response.shares)
 
1138
            self.sendMessage(response)
 
1139
 
 
1140
        for share in shared_to:
 
1141
            share_resp = sharersp.ShareResponse.from_params(
 
1142
                share['id'], "to_me", share['root_id'], share['name'],
 
1143
                share['shared_by_username'] or u"",
 
1144
                share['shared_by_visible_name'] or u"",
 
1145
                share['accepted'], share['access'])
 
1146
            response = protocol_pb2.Message()
 
1147
            response.type = protocol_pb2.Message.SHARES_INFO
 
1148
            share_resp.dump_to_msg(response.shares)
 
1149
            self.sendMessage(response)
 
1150
 
 
1151
        # we're done!
 
1152
        response = protocol_pb2.Message()
 
1153
        response.type = protocol_pb2.Message.SHARES_END
 
1154
        self.sendMessage(response)
 
1155
 
 
1156
        # save data to be logged on operation end
 
1157
        self.operation_data = "shared_by=%d shared_to=%d" % (
 
1158
            len(shared_by), len(shared_to))
 
1159
 
 
1160
 
 
1161
class ShareAccepted(SimpleRequestResponse):
 
1162
    """SHARE_ACCEPTED Request Response."""
 
1163
 
 
1164
    __slots__ = ()
 
1165
 
 
1166
    # these are the valid access levels and their translation from the
 
1167
    # protocol message
 
1168
    _answer_prot2nice = {
 
1169
        protocol_pb2.ShareAccepted.YES: "Yes",
 
1170
        protocol_pb2.ShareAccepted.NO: "No",
 
1171
    }
 
1172
 
 
1173
    def _get_node_info(self):
 
1174
        """Return node info from the message."""
 
1175
        share_id = self.source_message.share_accepted.share_id
 
1176
        return "share: %r" % (share_id,)
 
1177
 
 
1178
    @inlineCallbacks
 
1179
    def _process(self):
 
1180
        """Mark the share as accepted."""
 
1181
        mes = self.source_message.share_accepted
 
1182
        answer = self._answer_prot2nice[mes.answer]
 
1183
 
 
1184
        accept_share = self.protocol.user.share_accepted
 
1185
        share_id = self.convert_share_id(mes.share_id)
 
1186
        yield accept_share(share_id, answer)
 
1187
 
 
1188
        # send the ok to the requesting client
 
1189
        response = protocol_pb2.Message()
 
1190
        response.type = protocol_pb2.Message.OK
 
1191
        self.sendMessage(response)
 
1192
 
 
1193
        # save data to be logged on operation end
 
1194
        self.operation_data = "vol_id=%s answer=%s" % (share_id, answer)
 
1195
 
 
1196
 
 
1197
class CreateShare(SimpleRequestResponse):
 
1198
    """CREATE_SHARE Request Response."""
 
1199
 
 
1200
    __slots__ = ()
 
1201
 
 
1202
    expected_foreign_errors = [
 
1203
        dataerror.IntegrityError,
 
1204
        dataerror.NotADirectory,
 
1205
        dataerror.NoPermission,
 
1206
    ]
 
1207
 
 
1208
    # these are the valid access levels and their translation from the
 
1209
    # protocol message
 
1210
    _valid_access_levels = {
 
1211
        protocol_pb2.CreateShare.VIEW: "View",
 
1212
        protocol_pb2.CreateShare.MODIFY: "Modify",
 
1213
    }
 
1214
 
 
1215
    user_activity = 'create_share'
 
1216
 
 
1217
    @inlineCallbacks
 
1218
    def _process(self):
 
1219
        """Create the share."""
 
1220
        mes = self.source_message.create_share
 
1221
        access_level = self._valid_access_levels[mes.access_level]
 
1222
 
 
1223
        create_share = self.protocol.user.create_share
 
1224
        share_id = yield create_share(mes.node, mes.share_to,
 
1225
                                      mes.name, access_level)
 
1226
 
 
1227
        # send the ok to the requesting client
 
1228
        response = protocol_pb2.Message()
 
1229
        response.type = protocol_pb2.Message.SHARE_CREATED
 
1230
        response.share_created.share_id = str(share_id)
 
1231
        self.sendMessage(response)
 
1232
 
 
1233
        # save data to be logged on operation end
 
1234
        self.operation_data = "vol_id=%s access_level=%s" % (
 
1235
            share_id, access_level)
 
1236
 
 
1237
 
 
1238
class DeleteShare(SimpleRequestResponse):
 
1239
    """Deletes a share we had offered."""
 
1240
 
 
1241
    __slots__ = ()
 
1242
 
 
1243
    expected_foreign_errors = [
 
1244
        dataerror.IntegrityError,
 
1245
        dataerror.NoPermission,
 
1246
    ]
 
1247
 
 
1248
    def _get_node_info(self):
 
1249
        """Return node info from the message."""
 
1250
        share_id = self.source_message.delete_share.share_id
 
1251
        return "share: %r" % (share_id,)
 
1252
 
 
1253
    @inlineCallbacks
 
1254
    def _process(self):
 
1255
        """Delete the share."""
 
1256
        share_id = self.convert_share_id(
 
1257
            self.source_message.delete_share.share_id)
 
1258
        yield self.protocol.user.delete_share(share_id)
 
1259
 
 
1260
        # send the ok to the requesting client
 
1261
        response = protocol_pb2.Message()
 
1262
        response.type = protocol_pb2.Message.OK
 
1263
        self.sendMessage(response)
 
1264
 
 
1265
        # save data to be logged on operation end
 
1266
        self.operation_data = "vol_id=%s" % (share_id,)
 
1267
 
 
1268
 
 
1269
class CreateUDF(SimpleRequestResponse):
 
1270
    """CREATE_UDF Request Response."""
 
1271
 
 
1272
    __slots__ = ()
 
1273
 
 
1274
    expected_foreign_errors = [dataerror.NoPermission]
 
1275
 
 
1276
    user_activity = 'sync_activity'
 
1277
 
 
1278
    @inlineCallbacks
 
1279
    def _process(self):
 
1280
        """Create the share."""
 
1281
        mes = self.source_message.create_udf
 
1282
        data = yield self.protocol.user.create_udf(
 
1283
            mes.path, mes.name, self.protocol.session_id)
 
1284
        udf_id, udf_rootid, udf_path = data
 
1285
 
 
1286
        # send the ok to the requesting client
 
1287
        response = protocol_pb2.Message()
 
1288
        response.type = protocol_pb2.Message.VOLUME_CREATED
 
1289
        response.volume_created.type = protocol_pb2.Volumes.UDF
 
1290
        response.volume_created.udf.volume = str(udf_id)
 
1291
        response.volume_created.udf.node = str(udf_rootid)
 
1292
        response.volume_created.udf.suggested_path = udf_path
 
1293
        self.sendMessage(response)
 
1294
 
 
1295
        # save data to be logged on operation end
 
1296
        self.operation_data = "vol_id=%s" % (udf_id,)
 
1297
 
 
1298
 
 
1299
class DeleteVolume(SimpleRequestResponse):
 
1300
    """DELETE_VOLUME Request Response."""
 
1301
 
 
1302
    __slots__ = ()
 
1303
 
 
1304
    expected_foreign_errors = [dataerror.NoPermission]
 
1305
 
 
1306
    user_activity = 'sync_activity'
 
1307
 
 
1308
    def _get_node_info(self):
 
1309
        """Return node info from the message."""
 
1310
        volume_id = self.source_message.delete_volume.volume
 
1311
        return "volume: %r" % (volume_id,)
 
1312
 
 
1313
    @inlineCallbacks
 
1314
    def _process(self):
 
1315
        """Delete the volume."""
 
1316
        volume_id = self.source_message.delete_volume.volume
 
1317
        if volume_id == request.ROOT:
 
1318
            raise dataerror.NoPermission("Root volume can't be deleted.")
 
1319
        try:
 
1320
            vol_id = uuid.UUID(volume_id)
 
1321
        except ValueError:
 
1322
            raise dataerror.DoesNotExist("No such volume %r" % volume_id)
 
1323
        yield self.protocol.user.delete_volume(
 
1324
            vol_id, self.protocol.session_id)
 
1325
 
 
1326
        # send the ok to the requesting client
 
1327
        response = protocol_pb2.Message()
 
1328
        response.type = protocol_pb2.Message.OK
 
1329
        self.sendMessage(response)
 
1330
 
 
1331
        # save data to be logged on operation end
 
1332
        self.operation_data = "vol_id=%s" % (vol_id,)
 
1333
 
 
1334
 
 
1335
class ListVolumes(SimpleRequestResponse):
 
1336
    """LIST_VOLUMES Request Response."""
 
1337
 
 
1338
    __slots__ = ()
 
1339
 
 
1340
    @inlineCallbacks
 
1341
    def _process(self):
 
1342
        """List volumes for the client."""
 
1343
        user = self.protocol.user
 
1344
        result = yield user.list_volumes()
 
1345
        root_info, shares, udfs, free_bytes = result
 
1346
        self.length = 1 + len(shares) + len(udfs)
 
1347
 
 
1348
        # send root
 
1349
        response = protocol_pb2.Message()
 
1350
        response.type = protocol_pb2.Message.VOLUMES_INFO
 
1351
        response.list_volumes.type = protocol_pb2.Volumes.ROOT
 
1352
        response.list_volumes.root.node = str(root_info['root_id'])
 
1353
        response.list_volumes.root.generation = root_info['generation']
 
1354
        response.list_volumes.root.free_bytes = free_bytes
 
1355
        self.sendMessage(response)
 
1356
 
 
1357
        # send shares
 
1358
        for share in shares:
 
1359
            direction = "to_me"
 
1360
            share_resp = sharersp.ShareResponse.from_params(
 
1361
                share['id'], direction, share['root_id'], share['name'],
 
1362
                share['shared_by_username'] or u"",
 
1363
                share['shared_by_visible_name'] or u"",
 
1364
                share['accepted'], share['access'])
 
1365
            response = protocol_pb2.Message()
 
1366
            response.type = protocol_pb2.Message.VOLUMES_INFO
 
1367
            response.list_volumes.type = protocol_pb2.Volumes.SHARE
 
1368
            share_resp.dump_to_msg(response.list_volumes.share)
 
1369
            response.list_volumes.share.generation = share['generation']
 
1370
            response.list_volumes.share.free_bytes = share['free_bytes']
 
1371
            self.sendMessage(response)
 
1372
 
 
1373
        # send udfs
 
1374
        for udf in udfs:
 
1375
            response = protocol_pb2.Message()
 
1376
            response.type = protocol_pb2.Message.VOLUMES_INFO
 
1377
            response.list_volumes.type = protocol_pb2.Volumes.UDF
 
1378
            response.list_volumes.udf.volume = str(udf['id'])
 
1379
            response.list_volumes.udf.node = str(udf['root_id'])
 
1380
            response.list_volumes.udf.suggested_path = udf['path']
 
1381
            response.list_volumes.udf.generation = udf['generation']
 
1382
            response.list_volumes.udf.free_bytes = free_bytes
 
1383
            self.sendMessage(response)
 
1384
 
 
1385
        # we're done!
 
1386
        response = protocol_pb2.Message()
 
1387
        response.type = protocol_pb2.Message.VOLUMES_END
 
1388
        self.sendMessage(response)
 
1389
 
 
1390
        # save data to be logged on operation end
 
1391
        self.operation_data = "root=%s shares=%d udfs=%d" % (
 
1392
            root_info['root_id'], len(shares), len(udfs))
 
1393
 
 
1394
 
 
1395
class Unlink(SimpleRequestResponse):
 
1396
    """UNLINK Request Response."""
 
1397
 
 
1398
    __slots__ = ()
 
1399
 
 
1400
    expected_foreign_errors = [dataerror.NotEmpty, dataerror.NoPermission]
 
1401
 
 
1402
    user_activity = 'sync_activity'
 
1403
 
 
1404
    def _get_node_info(self):
 
1405
        """Return node info from the message."""
 
1406
        node_id = self.source_message.unlink.node
 
1407
        return "node: %r" % (node_id,)
 
1408
 
 
1409
    @inlineCallbacks
 
1410
    def _process(self):
 
1411
        """Unlink a node."""
 
1412
        share_id = self.convert_share_id(self.source_message.unlink.share)
 
1413
        node_id = self.source_message.unlink.node
 
1414
        generation, kind, name, mime = yield self.protocol.user.unlink_node(
 
1415
            share_id, node_id, session_id=self.protocol.session_id)
 
1416
 
 
1417
        # answer the ok to original user
 
1418
        response = protocol_pb2.Message()
 
1419
        response.new_generation = generation
 
1420
        response.type = protocol_pb2.Message.OK
 
1421
        self.sendMessage(response)
 
1422
 
 
1423
        # save data to be logged on operation end
 
1424
        extension = self._get_extension(name)
 
1425
        self.operation_data = "vol_id=%s node_id=%s type=%s mime=%r ext=%r" % (
 
1426
            share_id, node_id, kind, mime, extension)
 
1427
 
 
1428
 
 
1429
class BytesMessageProducer(object):
 
1430
    """Adapt a bytes producer to produce BYTES messages."""
 
1431
 
 
1432
    payload_size = request.MAX_PAYLOAD_SIZE
 
1433
 
 
1434
    def __init__(self, bytes_producer, request):
 
1435
        """Create a BytesMessageProducer."""
 
1436
        self.producer = bytes_producer
 
1437
        bytes_producer.consumer = self
 
1438
        self.request = request
 
1439
        self.logger = request.log
 
1440
 
 
1441
    def resumeProducing(self):
 
1442
        """IPushProducer interface."""
 
1443
        self.logger.trace("BytesMessageProducer resumed, http producer: %s",
 
1444
                          self.producer)
 
1445
        if self.producer:
 
1446
            self.producer.resumeProducing()
 
1447
 
 
1448
    def stopProducing(self):
 
1449
        """IPushProducer interface."""
 
1450
        self.logger.trace("BytesMessageProducer stopped, http producer: %s",
 
1451
                          self.producer)
 
1452
        if self.producer:
 
1453
            self.producer.stopProducing()
 
1454
 
 
1455
    def pauseProducing(self):
 
1456
        """IPushProducer interface."""
 
1457
        self.logger.trace("BytesMessageProducer paused, http producer: %s",
 
1458
                          self.producer)
 
1459
        if self.producer:
 
1460
            self.producer.pauseProducing()
 
1461
 
 
1462
    def write(self, content):
 
1463
        """Part of IConsumer."""
 
1464
        p = 0
 
1465
        part = content[p:p + self.payload_size]
 
1466
        while part:
 
1467
            if self.request.cancelled:
 
1468
                # stop generating messages
 
1469
                return
 
1470
            response = protocol_pb2.Message()
 
1471
            response.type = protocol_pb2.Message.BYTES
 
1472
            response.bytes.bytes = part
 
1473
            self.request.transferred += len(part)
 
1474
            self.request.sendMessage(response)
 
1475
            p += self.payload_size
 
1476
            part = content[p:p + self.payload_size]
 
1477
        self.request.last_good_state_ts = time.time()
 
1478
 
 
1479
 
 
1480
def cancel_filter(function):
 
1481
    """Raises RequestCancelledError if the request is cancelled.
 
1482
 
 
1483
    This methods exists to be used in a addCallback sequence to assure
 
1484
    that it does not continue if the request is cancelled, like:
 
1485
 
 
1486
    >>> d.addCallback(cancel_filter(foo))
 
1487
    >>> d.addCallbacks(done_callback, error_errback)
 
1488
 
 
1489
    Note that you may receive RequestCancelledError in your
 
1490
    'error_errback' func.
 
1491
    """
 
1492
    @wraps(function)
 
1493
    def f(self, *args, **kwargs):
 
1494
        '''Function to be called from twisted when its time arrives.'''
 
1495
        if self.cancelled:
 
1496
            raise request.RequestCancelledError(
 
1497
                "The request id=%d "
 
1498
                "is cancelled! (before calling %r)" % (self.id, function))
 
1499
        return function(self, *args, **kwargs)
 
1500
    return f
 
1501
 
 
1502
 
 
1503
class GetContentResponse(SimpleRequestResponse):
 
1504
    """GET_CONTENT Request Response."""
 
1505
 
 
1506
    __slots__ = ('cancel_message', 'message_producer',
 
1507
                 'transferred', 'init_time')
 
1508
 
 
1509
    def __init__(self, protocol, message):
 
1510
        super(GetContentResponse, self).__init__(protocol, message)
 
1511
        self.cancel_message = None
 
1512
        self.message_producer = None
 
1513
        self.transferred = 0
 
1514
        self.init_time = 0
 
1515
        self.length = None  # to not inform automatically on done()
 
1516
 
 
1517
    def _get_node_info(self):
 
1518
        """Return node info from the message."""
 
1519
        node_id = self.source_message.get_content.node
 
1520
        return "node: %r" % (node_id,)
 
1521
 
 
1522
    def _send_protocol_error(self, failure):
 
1523
        """Convert the failure to a protocol error.
 
1524
 
 
1525
        Send it if possible; otherwise, continue propagating it.
 
1526
 
 
1527
        """
 
1528
        is_cancelled = (failure.check(ProducerStopped) and self.cancelled)
 
1529
 
 
1530
        if failure.check(request.RequestCancelledError) or is_cancelled:
 
1531
            # the normal sequence was interrupted by a cancel
 
1532
            if self.cancel_message is not None:
 
1533
                response = protocol_pb2.Message()
 
1534
                response.id = self.cancel_message.id
 
1535
                response.type = protocol_pb2.Message.CANCELLED
 
1536
                self.sendMessage(response)
 
1537
            else:
 
1538
                msg = 'Got cancelling failure %s but cancel_message is None.'
 
1539
                self.log.warning(msg, failure)
 
1540
            return  # the error has been consumed
 
1541
 
 
1542
        else:
 
1543
            # handle all the TRY_AGAIN cases
 
1544
            if failure.check(ProducerStopped, error.ConnectionLost):
 
1545
                failure = Failure(errors.TryAgain(failure.value))
 
1546
            return SimpleRequestResponse._send_protocol_error(self, failure)
 
1547
 
 
1548
    def _start(self):
 
1549
        """Get node content and send it."""
 
1550
        self.init_time = time.time()
 
1551
        self._log_start()
 
1552
        share_id = self.convert_share_id(self.source_message.get_content.share)
 
1553
 
 
1554
        def done(result):
 
1555
            """Send EOF message."""
 
1556
            response = protocol_pb2.Message()
 
1557
            response.type = protocol_pb2.Message.EOF
 
1558
            self.sendMessage(response)
 
1559
 
 
1560
        def get_content(node):
 
1561
            """Get the content for node."""
 
1562
            d = node.get_content(
 
1563
                user=self.protocol.user,
 
1564
                previous_hash=self.source_message.get_content.hash,
 
1565
                start=self.source_message.get_content.offset)
 
1566
            return d
 
1567
 
 
1568
        def stop_if_cancelled(producer):
 
1569
            """Stop the producer if cancelled."""
 
1570
            # let's forget ProducerStopped if we're cancelled
 
1571
            def ignore_producer_stopped(failure):
 
1572
                """Like failure trap with extras."""
 
1573
                if failure.check(ProducerStopped) and self.cancelled:
 
1574
                    # I know, I know, I cancelled you!
 
1575
                    return
 
1576
                return failure
 
1577
 
 
1578
            producer.deferred.addErrback(ignore_producer_stopped)
 
1579
 
 
1580
            if self.cancelled:
 
1581
                producer.stopProducing()
 
1582
            return producer
 
1583
 
 
1584
        def get_node_attrs(node):
 
1585
            """Get attributes for 'node' and send them."""
 
1586
            r = protocol_pb2.Message()
 
1587
            r.type = protocol_pb2.Message.NODE_ATTR
 
1588
            r.node_attr.deflated_size = node.deflated_size
 
1589
            r.node_attr.size = node.size
 
1590
            r.node_attr.hash = node.content_hash
 
1591
            r.node_attr.crc32 = node.crc32
 
1592
            self.sendMessage(r)
 
1593
 
 
1594
            # save data to be logged on operation end
 
1595
            self.operation_data = "vol_id=%s node_id=%s hash=%s size=%s" % (
 
1596
                share_id, self.source_message.get_content.node,
 
1597
                node.content_hash, node.size)
 
1598
            return node
 
1599
 
 
1600
        if self.protocol.user is None:
 
1601
            self.sendError(protocol_pb2.Error.AUTHENTICATION_REQUIRED,
 
1602
                           comment=self.auth_required_error)
 
1603
            self.done()
 
1604
        else:
 
1605
            # get node
 
1606
            d = self.protocol.user.get_node(
 
1607
                share_id,
 
1608
                self.source_message.get_content.node,
 
1609
                self.source_message.get_content.hash)
 
1610
            d.addCallback(self.cancel_filter(get_node_attrs))
 
1611
            # get content and validate hash
 
1612
            d.addCallback(self.cancel_filter(get_content))
 
1613
            # stop producer if cancelled
 
1614
            d.addCallback(stop_if_cancelled)
 
1615
            # send content
 
1616
            d.addCallback(self.cancel_filter(self.send))
 
1617
            # send eof
 
1618
            d.addCallback(self.cancel_filter(done))
 
1619
            d.addErrback(self._send_protocol_error)
 
1620
            d.addCallbacks(self.done, self.internal_error)
 
1621
 
 
1622
    def send(self, producer):
 
1623
        """Send node to the client."""
 
1624
        delta = time.time() - self.init_time
 
1625
        self.protocol.factory.sli_metrics.sli('GetContentResponseInit', delta)
 
1626
 
 
1627
        message_producer = BytesMessageProducer(producer, self)
 
1628
        self.message_producer = message_producer
 
1629
        self.registerProducer(message_producer, streaming=True)
 
1630
 
 
1631
        # release this request early
 
1632
        self.protocol.release(self)
 
1633
 
 
1634
        return producer.deferred
 
1635
 
 
1636
    def _cancel(self):
 
1637
        """Cancel the request.
 
1638
 
 
1639
        This method is called if the request was not cancelled before
 
1640
        (check self.cancel).
 
1641
        """
 
1642
        producer = self.message_producer
 
1643
        if producer is not None:
 
1644
            self.unregisterProducer()
 
1645
            producer.stopProducing()
 
1646
 
 
1647
    def _processMessage(self, message):
 
1648
        """Process new messages from the client inside this request."""
 
1649
        if message.type == protocol_pb2.Message.CANCEL_REQUEST:
 
1650
            self.cancel_message = message
 
1651
            self.cancel()
 
1652
            self.protocol.release(self)
 
1653
        else:
 
1654
            self.error(request.StorageProtocolProtocolError(message))
 
1655
 
 
1656
 
 
1657
class PutContentResponse(SimpleRequestResponse):
 
1658
    """PUT_CONTENT Request Response."""
 
1659
 
 
1660
    __slots__ = ('cancel_message', 'upload_job', 'transferred',
 
1661
                 'state', 'init_time')
 
1662
 
 
1663
    expected_foreign_errors = [dataerror.NoPermission, dataerror.QuotaExceeded]
 
1664
 
 
1665
    user_activity = 'sync_activity'
 
1666
 
 
1667
    # indicators for internal fsm
 
1668
    states = "INIT UPLOADING COMMITING CANCELING ERROR DONE"
 
1669
    states = collections.namedtuple("States", states.lower())(*states.split())
 
1670
 
 
1671
    # will send TRY_AGAIN on all these errors
 
1672
    _try_again_errors = (
 
1673
        ProducerStopped,
 
1674
        dataerror.RetryLimitReached,
 
1675
        errors.BufferLimit,
 
1676
    )
 
1677
 
 
1678
    def __init__(self, protocol, message):
 
1679
        super(PutContentResponse, self).__init__(protocol, message)
 
1680
        self.cancel_message = None
 
1681
        self.upload_job = None
 
1682
        self.transferred = 0
 
1683
        self.state = self.states.init
 
1684
        self.init_time = 0
 
1685
        self.length = None  # to not inform automatically on done()
 
1686
 
 
1687
    def done(self):
 
1688
        """Override done to skip and log 'double calls'."""
 
1689
        # Just a hack to avoid the KeyError flood until we find the
 
1690
        # reason of the double done calls.
 
1691
        if self.finished:
 
1692
            # build a call chain of 3 levels
 
1693
            curframe = inspect.currentframe()
 
1694
            calframe1 = inspect.getouterframes(curframe, 2)
 
1695
            callers = [calframe1[i][3] for i in range(1, 4)]
 
1696
            call_chain = ' -> '.join(reversed(callers))
 
1697
            self.log.warning("%s: called done() finished=%s",
 
1698
                             call_chain, self.finished)
 
1699
        else:
 
1700
            SimpleRequestResponse.done(self)
 
1701
 
 
1702
    def _get_node_info(self):
 
1703
        """Return node info from the message."""
 
1704
        node_id = self.source_message.put_content.node
 
1705
        return "node: %r" % (node_id,)
 
1706
 
 
1707
    def _start(self):
 
1708
        """Create upload reservation and start receiving."""
 
1709
        self.init_time = time.time()
 
1710
        self._log_start()
 
1711
 
 
1712
        if self.protocol.user is None:
 
1713
            self.sendError(protocol_pb2.Error.AUTHENTICATION_REQUIRED,
 
1714
                           comment=self.auth_required_error)
 
1715
            self.state = self.states.done
 
1716
            self.done()
 
1717
        else:
 
1718
            d = self._start_upload()
 
1719
            d.addErrback(self._generic_error)
 
1720
 
 
1721
    def _log_exception(self, exc):
 
1722
        """Log an exception before it is handled by the parent request."""
 
1723
        if self.upload_job is not None:
 
1724
            size_hint = self.upload_job.inflated_size_hint
 
1725
        else:
 
1726
            size_hint = 0
 
1727
        if isinstance(exc, errors.TryAgain):
 
1728
            m = "Upload failed with TryAgain error: %s, size_hint: %s"
 
1729
            self.log.debug(m, exc.orig_error.__class__.__name__, size_hint)
 
1730
        else:
 
1731
            m = "Upload failed with error: %s, size_hint: %s"
 
1732
            self.log.debug(m, exc.__class__.__name__, size_hint)
 
1733
 
 
1734
    @defer.inlineCallbacks
 
1735
    def _generic_error(self, failure):
 
1736
        """Process all possible errors."""
 
1737
        # it can be an exception, let's work with a failure from now on
 
1738
        if not isinstance(failure, Failure):
 
1739
            failure = Failure(failure)
 
1740
        exc = failure.value
 
1741
 
 
1742
        if failure.check(request.RequestCancelledError):
 
1743
            if self.state == self.states.canceling:
 
1744
                # special error while canceling
 
1745
                self.log.debug("Request cancelled: %s", exc)
 
1746
                return
 
1747
 
 
1748
        self.log.warning("Error while in %s: %s (%s)", self.state,
 
1749
                         exc.__class__.__name__, exc)
 
1750
        if self.state in (self.states.error, self.states.done):
 
1751
            # if already in error/done state, just ignore it (after logging)
 
1752
            return
 
1753
        self.state = self.states.error
 
1754
 
 
1755
        # on any error, we just stop the upload job
 
1756
        if self.upload_job is not None:
 
1757
            self.log.debug("Stoping the upload job after an error")
 
1758
            yield self.upload_job.stop()
 
1759
            # and unregister the transport from the upload_job
 
1760
            self.upload_job.unregisterProducer()
 
1761
 
 
1762
        # handle all the TRY_AGAIN cases
 
1763
        if failure.check(*self._try_again_errors):
 
1764
            exc = errors.TryAgain(exc)
 
1765
            failure = Failure(exc)
 
1766
 
 
1767
        # generic error handler, and done (and really fail if we get
 
1768
        # a problem while doing that)
 
1769
        try:
 
1770
            self._log_exception(exc)
 
1771
            yield self._send_protocol_error(failure)
 
1772
            yield self.done()
 
1773
        except:
 
1774
            yield self.internal_error(Failure())
 
1775
 
 
1776
    @trace_message
 
1777
    def _processMessage(self, message):
 
1778
        """Receive the content for the upload."""
 
1779
        if self.state == self.states.uploading:
 
1780
            try:
 
1781
                self._process_while_uploading(message)
 
1782
            except Exception as e:
 
1783
                self._generic_error(e)
 
1784
            return
 
1785
 
 
1786
        if self.state == self.states.init:
 
1787
            if message.type == protocol_pb2.Message.CANCEL_REQUEST:
 
1788
                try:
 
1789
                    self.state = self.states.canceling
 
1790
                    self.cancel_message = message
 
1791
                    self.cancel()
 
1792
                except Exception as e:
 
1793
                    self._generic_error(e)
 
1794
                return
 
1795
 
 
1796
        if self.state == self.states.error:
 
1797
            # just ignore the message
 
1798
            return
 
1799
 
 
1800
        self.log.warning("Received out-of-order message: state=%s  message=%s",
 
1801
                         self.state, message.type)
 
1802
 
 
1803
    def _process_while_uploading(self, message):
 
1804
        """Receive any messages while in UPLOADING."""
 
1805
        if message.type == protocol_pb2.Message.CANCEL_REQUEST:
 
1806
            self.state = self.states.canceling
 
1807
            self.cancel_message = message
 
1808
            self.cancel()
 
1809
 
 
1810
        elif message.type == protocol_pb2.Message.EOF:
 
1811
            self.state = self.states.commiting
 
1812
            self.upload_job.deferred.addCallback(self._commit_uploadjob)
 
1813
            self.upload_job.deferred.addErrback(self._generic_error)
 
1814
 
 
1815
        elif message.type == protocol_pb2.Message.BYTES:
 
1816
            # process BYTES, stay here in UPLOADING
 
1817
            received_bytes = message.bytes.bytes
 
1818
            self.transferred += len(received_bytes)
 
1819
            self.upload_job.add_data(received_bytes)
 
1820
            self.last_good_state_ts = time.time()
 
1821
 
 
1822
        else:
 
1823
            self.log.error("Received unknown message: %s", message.type)
 
1824
 
 
1825
    @defer.inlineCallbacks
 
1826
    def _cancel(self):
 
1827
        """Cancel put content.
 
1828
 
 
1829
        This is called from request.Request.cancel(), after a
 
1830
        client's CANCEL_REQUEST or a request.stop().
 
1831
        """
 
1832
        cancelled_by_user = self.state == self.states.canceling
 
1833
        self.log.debug("Request canceled (in %s)", self.state)
 
1834
        self.state = self.states.canceling
 
1835
 
 
1836
        if cancelled_by_user:
 
1837
            # cancel the upload job and answer back
 
1838
            if self.upload_job is not None:
 
1839
                self.log.debug("Canceling the upload job")
 
1840
                yield self.upload_job.cancel()
 
1841
            response = protocol_pb2.Message()
 
1842
            response.id = self.cancel_message.id
 
1843
            response.type = protocol_pb2.Message.CANCELLED
 
1844
            self.sendMessage(response)
 
1845
        else:
 
1846
            # just stop the upload job
 
1847
            if self.upload_job is not None:
 
1848
                self.log.debug("Stoping the upload job after a cancel")
 
1849
                yield self.upload_job.stop()
 
1850
 
 
1851
        # done
 
1852
        self.state = self.states.done
 
1853
        self.done()
 
1854
 
 
1855
    @defer.inlineCallbacks
 
1856
    def _commit_uploadjob(self, result):
 
1857
        """Callback for uploadjob when it's complete."""
 
1858
        commit_time = time.time()
 
1859
        if self.state != self.states.commiting:
 
1860
            # if we are not in committing state, bail out
 
1861
            return
 
1862
 
 
1863
        def commit():
 
1864
            """The callable to queue."""
 
1865
            if self.state != self.states.commiting:
 
1866
                # Request has been canceled since we last checked
 
1867
                return defer.succeed(None)
 
1868
            return self.upload_job.commit(result)
 
1869
 
 
1870
        # queue the commit work
 
1871
        new_generation = yield self.queue_action(commit)
 
1872
 
 
1873
        if self.state != self.states.commiting:
 
1874
            # if we are not in committing state, bail out
 
1875
            return
 
1876
 
 
1877
        # when commit's done, send the ok
 
1878
        self.protocol.factory.sli_metrics.sli('PutContentResponseCommit',
 
1879
                                              time.time() - commit_time)
 
1880
        response = protocol_pb2.Message()
 
1881
        response.type = protocol_pb2.Message.OK
 
1882
        response.new_generation = new_generation
 
1883
        self.sendMessage(response)
 
1884
 
 
1885
        # done
 
1886
        self.state = self.states.done
 
1887
        self.done()
 
1888
 
 
1889
    @defer.inlineCallbacks
 
1890
    def _start_upload(self):
 
1891
        """Setup the upload and tell the client to start uploading."""
 
1892
        self.upload_job = yield self._get_upload_job()
 
1893
        self.upload_job.deferred.addErrback(self._generic_error)
 
1894
        if self.state in (self.states.done, self.states.canceling):
 
1895
            # Manually canceling the upload because we were canceled
 
1896
            # while getting the upload
 
1897
            self.log.debug("Manually canceling the upload job (in %s)",
 
1898
                           self.state)
 
1899
            yield self.upload_job.cancel()
 
1900
            return
 
1901
        yield self.upload_job.connect()
 
1902
        # register the client transport as the producer
 
1903
        self.upload_job.registerProducer(self.protocol.transport)
 
1904
        self.protocol.release(self)
 
1905
        yield self._send_begin()
 
1906
        self.state = self.states.uploading
 
1907
 
 
1908
    @defer.inlineCallbacks
 
1909
    @cancel_filter
 
1910
    def _get_upload_job(self):
 
1911
        """Get the uploadjob."""
 
1912
        share_id = self.convert_share_id(self.source_message.put_content.share)
 
1913
        if config.api_server.magic_upload_active:
 
1914
            magic_hash = self.source_message.put_content.magic_hash or None
 
1915
        else:
 
1916
            magic_hash = None
 
1917
 
 
1918
        # save data to be logged on operation end
 
1919
        self.operation_data = "vol_id=%s node_id=%s hash=%s size=%s" % (
 
1920
            share_id, self.source_message.put_content.node,
 
1921
            self.source_message.put_content.hash,
 
1922
            self.source_message.put_content.size)
 
1923
 
 
1924
        # create upload reservation
 
1925
        uploadjob = yield self.protocol.user.get_upload_job(
 
1926
            share_id,
 
1927
            self.source_message.put_content.node,
 
1928
            self.source_message.put_content.previous_hash,
 
1929
            self.source_message.put_content.hash,
 
1930
            self.source_message.put_content.crc32,
 
1931
            self.source_message.put_content.size,
 
1932
            self.source_message.put_content.deflated_size,
 
1933
            session_id=self.protocol.session_id,
 
1934
            magic_hash=magic_hash,
 
1935
            upload_id=self.source_message.put_content.upload_id or None)
 
1936
        defer.returnValue(uploadjob)
 
1937
 
 
1938
    @cancel_filter
 
1939
    def _send_begin(self):
 
1940
        """Notify the client that he can start sending data."""
 
1941
        delta = time.time() - self.init_time
 
1942
        self.protocol.factory.sli_metrics.sli('PutContentResponseInit', delta)
 
1943
        upload_type = self.upload_job.__class__.__name__
 
1944
        offset = self.upload_job.offset
 
1945
        response = protocol_pb2.Message()
 
1946
        response.type = protocol_pb2.Message.BEGIN_CONTENT
 
1947
        response.begin_content.offset = offset
 
1948
        # only send the upload id for a new put content
 
1949
        upload_id = self.upload_job.upload_id
 
1950
        upload_id = '' if upload_id is None else str(upload_id)
 
1951
        if self.source_message.put_content.upload_id != upload_id:
 
1952
            response.begin_content.upload_id = upload_id
 
1953
        self.sendMessage(response)
 
1954
 
 
1955
        factory = self.protocol.factory
 
1956
        factory.metrics.meter("%s.upload.begin" % (upload_type,), 1)
 
1957
        factory.metrics.gauge("%s.upload" % (upload_type,), offset)
 
1958
        self.log.debug("%s begin content from offset %d", upload_type, offset)
 
1959
 
 
1960
 
 
1961
class GetDeltaResponse(SimpleRequestResponse):
 
1962
    """GetDelta Request Response."""
 
1963
 
 
1964
    __slots__ = ()
 
1965
 
 
1966
    def _get_node_info(self):
 
1967
        """Return node info from the message."""
 
1968
        volume_id = self.source_message.get_delta.share
 
1969
        return "volume: %r" % (volume_id,)
 
1970
 
 
1971
    @inlineCallbacks
 
1972
    def _process(self):
 
1973
        """Get the deltas and send messages."""
 
1974
        msg = self.source_message
 
1975
        share_id = self.convert_share_id(msg.get_delta.share)
 
1976
        from_generation = msg.get_delta.from_generation
 
1977
        delta_max_size = config.api_server.delta_max_size
 
1978
        delta_info = yield self.protocol.user.get_delta(
 
1979
            share_id, from_generation, limit=delta_max_size)
 
1980
        nodes, vol_generation, free_bytes = delta_info
 
1981
        yield self.send_delta_info(nodes, msg.get_delta.share)
 
1982
        self.length = len(nodes)
 
1983
 
 
1984
        # now send the DELTA_END
 
1985
        response = protocol_pb2.Message()
 
1986
        response.type = protocol_pb2.Message.DELTA_END
 
1987
        full = True
 
1988
        if nodes:
 
1989
            full = vol_generation == nodes[-1].generation
 
1990
        response.delta_end.full = full
 
1991
        if full:
 
1992
            response.delta_end.generation = vol_generation
 
1993
        else:
 
1994
            response.delta_end.generation = nodes[-1].generation
 
1995
        response.delta_end.free_bytes = free_bytes
 
1996
        self.sendMessage(response)
 
1997
 
 
1998
        # save data to be logged on operation end
 
1999
        t = "vol_id=%s from_gen=%s current_gen=%s nodes=%d free_bytes=%s" % (
 
2000
            share_id, from_generation, vol_generation, self.length, free_bytes)
 
2001
        self.operation_data = t
 
2002
 
 
2003
    def send_delta_info(self, nodes, share_id):
 
2004
        """Build and send the DELTA_INFO for each node."""
 
2005
        return task.cooperate(
 
2006
            self._send_delta_info(nodes, share_id)).whenDone()
 
2007
 
 
2008
    def _send_delta_info(self, nodes, share_id):
 
2009
        """Build and send the DELTA_INFO for each node."""
 
2010
        count = 0
 
2011
        for node in nodes:
 
2012
            if count == config.api_server.max_delta_info:
 
2013
                count = 0
 
2014
                yield
 
2015
            message = protocol_pb2.Message()
 
2016
            message.type = protocol_pb2.Message.DELTA_INFO
 
2017
            message.delta_info.type = protocol_pb2.DeltaInfo.FILE_INFO
 
2018
            delta_info = message.delta_info
 
2019
            delta_info.generation = node.generation
 
2020
            delta_info.is_live = node.is_live
 
2021
            if node.is_file:
 
2022
                delta_info.file_info.type = protocol_pb2.FileInfo.FILE
 
2023
            else:
 
2024
                delta_info.file_info.type = protocol_pb2.FileInfo.DIRECTORY
 
2025
            delta_info.file_info.name = node.name
 
2026
            delta_info.file_info.share = share_id
 
2027
            delta_info.file_info.node = str(node.id)
 
2028
            if node.parent_id is None:
 
2029
                delta_info.file_info.parent = ''
 
2030
            else:
 
2031
                delta_info.file_info.parent = str(node.parent_id)
 
2032
            delta_info.file_info.is_public = node.is_public
 
2033
            if node.content_hash is None:
 
2034
                delta_info.file_info.content_hash = ''
 
2035
            else:
 
2036
                delta_info.file_info.content_hash = str(node.content_hash)
 
2037
            delta_info.file_info.crc32 = node.crc32
 
2038
            delta_info.file_info.size = node.size
 
2039
            delta_info.file_info.last_modified = node.last_modified
 
2040
            self.sendMessage(message)
 
2041
            count += 1
 
2042
 
 
2043
 
 
2044
class RescanFromScratchResponse(GetDeltaResponse):
 
2045
    """RescanFromScratch Request Response."""
 
2046
 
 
2047
    __slots__ = ()
 
2048
 
 
2049
    @inlineCallbacks
 
2050
    def _process(self):
 
2051
        """Get all the live nodes and send DeltaInfos."""
 
2052
        msg = self.source_message
 
2053
        limit = config.api_server.get_from_scratch_limit
 
2054
        share_id = self.convert_share_id(msg.get_delta.share)
 
2055
        # get the first chunk
 
2056
        delta_info = yield self.protocol.user.get_from_scratch(
 
2057
            share_id, limit=limit)
 
2058
        # keep vol_generation and free_bytes for later
 
2059
        nodes, vol_generation, free_bytes = delta_info
 
2060
        self.length = len(nodes)
 
2061
        while nodes:
 
2062
            # while it keep returning nodes, send the current nodes
 
2063
            yield self.send_delta_info(nodes, msg.get_delta.share)
 
2064
            # and request more
 
2065
            last_path = (nodes[-1].path, nodes[-1].name)
 
2066
            delta_info = yield self.protocol.user.get_from_scratch(
 
2067
                share_id, start_from_path=last_path, limit=limit,
 
2068
                max_generation=vol_generation)
 
2069
            # but don't overwrite vol_generation or free_bytes in case
 
2070
            # something changes while fetching nodes
 
2071
            nodes, _, _ = delta_info
 
2072
            self.length += len(nodes)
 
2073
 
 
2074
        # now send the DELTA_END
 
2075
        response = protocol_pb2.Message()
 
2076
        response.type = protocol_pb2.Message.DELTA_END
 
2077
        response.delta_end.full = True
 
2078
        response.delta_end.generation = vol_generation
 
2079
        response.delta_end.free_bytes = free_bytes
 
2080
        self.sendMessage(response)
 
2081
 
 
2082
        # save data to be logged on operation end
 
2083
        t = "vol_id=%s current_gen=%s nodes=%d free_bytes=%s" % (
 
2084
            share_id, vol_generation, self.length, free_bytes)
 
2085
        self.operation_data = t
 
2086
 
 
2087
 
 
2088
class QuerySetCapsResponse(SimpleRequestResponse):
 
2089
    """QUERY_CAPS and SET_CAPS Request Response."""
 
2090
 
 
2091
    __slots__ = ('set_mode',)
 
2092
 
 
2093
    authentication_required = False
 
2094
 
 
2095
    def __init__(self, protocol, message, set_mode=False):
 
2096
        """Stores the mode."""
 
2097
        self.set_mode = set_mode
 
2098
        super(QuerySetCapsResponse, self).__init__(protocol, message)
 
2099
 
 
2100
    def _process(self):
 
2101
        """Validate the list of capabilities passed to us."""
 
2102
        if self.set_mode:
 
2103
            msg_caps = self.source_message.set_caps
 
2104
        else:
 
2105
            msg_caps = self.source_message.query_caps
 
2106
        caps = frozenset(q.capability for q in msg_caps)
 
2107
 
 
2108
        # after one succesful, no more allowed
 
2109
        if self.protocol.working_caps == MIN_CAP:
 
2110
            accepted = caps in SUPPORTED_CAPS
 
2111
            if self.set_mode:
 
2112
                if accepted:
 
2113
                    self.protocol.working_caps = caps
 
2114
 
 
2115
            # get the redirecting info
 
2116
            redirect = SUGGESTED_REDIRS.get(caps, {})
 
2117
            red_host, red_port, red_srvr = [redirect.get(x, "")
 
2118
                                            for x in "hostname", "port",
 
2119
                                            "srvrecord"]
 
2120
        else:
 
2121
            accepted = False
 
2122
            red_host = red_port = red_srvr = ""
 
2123
 
 
2124
        # log what we just decided
 
2125
        action = "Set" if self.set_mode else "Query"
 
2126
        result = "Accepted" if accepted else "Rejected"
 
2127
        self.log.info("Capabilities %s %s: %s", action, result, caps)
 
2128
 
 
2129
        response = protocol_pb2.Message()
 
2130
        response.type = protocol_pb2.Message.ACCEPT_CAPS
 
2131
        response.accept_caps.accepted = accepted
 
2132
        response.accept_caps.redirect_hostname = red_host
 
2133
        response.accept_caps.redirect_port = red_port
 
2134
        response.accept_caps.redirect_srvrecord = red_srvr
 
2135
        self.sendMessage(response)
 
2136
 
 
2137
 
 
2138
class MoveResponse(SimpleRequestResponse):
 
2139
    """Move Request Response."""
 
2140
 
 
2141
    __slots__ = ()
 
2142
 
 
2143
    expected_foreign_errors = [
 
2144
        dataerror.InvalidFilename,
 
2145
        dataerror.NoPermission,
 
2146
        dataerror.AlreadyExists,
 
2147
        dataerror.NotADirectory,
 
2148
    ]
 
2149
 
 
2150
    user_activity = 'sync_activity'
 
2151
 
 
2152
    def _get_node_info(self):
 
2153
        """Return node info from the message."""
 
2154
        node_id = self.source_message.move.node
 
2155
        return "node: %r" % (node_id,)
 
2156
 
 
2157
    @inlineCallbacks
 
2158
    def _process(self):
 
2159
        """Move the node."""
 
2160
        share_id = self.convert_share_id(self.source_message.move.share)
 
2161
        node_id = self.source_message.move.node
 
2162
        new_parent_id = self.source_message.move.new_parent_node
 
2163
        new_name = self.source_message.move.new_name
 
2164
 
 
2165
        generation, mimetype = yield self.protocol.user.move(
 
2166
            share_id, node_id, new_parent_id, new_name,
 
2167
            session_id=self.protocol.session_id)
 
2168
 
 
2169
        response = protocol_pb2.Message()
 
2170
        response.new_generation = generation
 
2171
        response.type = protocol_pb2.Message.OK
 
2172
        self.sendMessage(response)
 
2173
 
 
2174
        # save data to be logged on operation end
 
2175
        extension = self._get_extension(new_name)
 
2176
        self.operation_data = "vol_id=%s node_id=%s mime=%r ext=%r" % (
 
2177
            share_id, node_id, mimetype, extension)
 
2178
 
 
2179
 
 
2180
class MakeResponse(SimpleRequestResponse):
 
2181
    """MAKE_[DIR|FILE] Request Response."""
 
2182
 
 
2183
    __slots__ = ()
 
2184
 
 
2185
    expected_foreign_errors = [
 
2186
        dataerror.InvalidFilename,
 
2187
        dataerror.NoPermission,
 
2188
        dataerror.AlreadyExists,
 
2189
        dataerror.NotADirectory,
 
2190
    ]
 
2191
 
 
2192
    user_activity = 'sync_activity'
 
2193
 
 
2194
    def _get_node_info(self):
 
2195
        """Return node info from the message."""
 
2196
        parent_id = self.source_message.make.parent_node
 
2197
        return "parent: %r" % (parent_id,)
 
2198
 
 
2199
    @inlineCallbacks
 
2200
    def _process(self):
 
2201
        """Create the file/directory."""
 
2202
        if self.source_message.type == protocol_pb2.Message.MAKE_DIR:
 
2203
            response_type = protocol_pb2.Message.NEW_DIR
 
2204
            create_method_name = "make_dir"
 
2205
            node_type = "Directory"
 
2206
        elif self.source_message.type == protocol_pb2.Message.MAKE_FILE:
 
2207
            response_type = protocol_pb2.Message.NEW_FILE
 
2208
            create_method_name = "make_file"
 
2209
            node_type = "File"
 
2210
        else:
 
2211
            raise request.StorageProtocolError(
 
2212
                "Can not create from message "
 
2213
                "(type %s)." % self.source_message.type)
 
2214
 
 
2215
        share_id = self.convert_share_id(self.source_message.make.share)
 
2216
        parent_id = self.source_message.make.parent_node
 
2217
        name = self.source_message.make.name
 
2218
        create_method = getattr(self.protocol.user, create_method_name)
 
2219
        d = create_method(share_id, parent_id, name,
 
2220
                          session_id=self.protocol.session_id)
 
2221
        node_id, generation, mimetype = yield d
 
2222
        response = protocol_pb2.Message()
 
2223
        response.type = response_type
 
2224
        response.new_generation = generation
 
2225
        response.new.node = str(node_id)
 
2226
        response.new.parent_node = parent_id
 
2227
        response.new.name = name
 
2228
        self.sendMessage(response)
 
2229
 
 
2230
        # save data to be logged on operation end
 
2231
        extension = self._get_extension(name)
 
2232
        self.operation_data = "type=%s vol_id=%s node_id=%s mime=%r ext=%r" % (
 
2233
            node_type, share_id, node_id, mimetype, extension)
 
2234
 
 
2235
 
 
2236
class FreeSpaceResponse(SimpleRequestResponse):
 
2237
    """Implements FREE_SPACE_INQUIRY."""
 
2238
 
 
2239
    __slots__ = ()
 
2240
 
 
2241
    expected_foreign_errors = [dataerror.NoPermission]
 
2242
 
 
2243
    @inlineCallbacks
 
2244
    def _process(self):
 
2245
        """Reports available space for the given share (or the user's own
 
2246
        storage).
 
2247
 
 
2248
        """
 
2249
        share_id = self.convert_share_id(
 
2250
            self.source_message.free_space_inquiry.share_id)
 
2251
        free_bytes = yield self.protocol.user.get_free_bytes(share_id)
 
2252
        response = protocol_pb2.Message()
 
2253
        response.type = protocol_pb2.Message.FREE_SPACE_INFO
 
2254
        response.free_space_info.free_bytes = free_bytes
 
2255
        self.sendMessage(response)
 
2256
 
 
2257
        # save data to be logged on operation end
 
2258
        self.operation_data = "vol_id=%s free_bytes=%s" % (
 
2259
            share_id, free_bytes)
 
2260
 
 
2261
 
 
2262
class AccountResponse(SimpleRequestResponse):
 
2263
    """Implements ACCOUNT_INQUIRY."""
 
2264
 
 
2265
    __slots__ = ()
 
2266
 
 
2267
    @inlineCallbacks
 
2268
    def _process(self):
 
2269
        """Reports user account information."""
 
2270
        (purchased, used) = yield self.protocol.user.get_storage_byte_quota()
 
2271
        response = protocol_pb2.Message()
 
2272
        response.type = protocol_pb2.Message.ACCOUNT_INFO
 
2273
        response.account_info.purchased_bytes = purchased
 
2274
        self.sendMessage(response)
 
2275
 
 
2276
 
 
2277
class AuthenticateResponse(SimpleRequestResponse):
 
2278
    """AUTH_REQUEST Request Response."""
 
2279
 
 
2280
    __slots__ = ()
 
2281
 
 
2282
    authentication_required = False
 
2283
    not_allowed_re = re.compile("[^\w_]")
 
2284
 
 
2285
    user_activity = 'connected'
 
2286
 
 
2287
    @inlineCallbacks
 
2288
    def _process(self):
 
2289
        """Authenticate the user."""
 
2290
        # check that its not already logged in
 
2291
        if self.protocol.user is not None:
 
2292
            raise errors.AuthenticationError("User already logged in.")
 
2293
 
 
2294
        self.protocol.check_poison("authenticate_start")
 
2295
        # get metadata and send stats
 
2296
        metadata = dict(
 
2297
            (m.key, m.value) for m in self.source_message.metadata)
 
2298
        if metadata:
 
2299
            self.log.info("Client metadata: %s", metadata)
 
2300
            for key in ("platform", "version"):
 
2301
                if key in metadata:
 
2302
                    value = self.not_allowed_re.sub("_", metadata[key])
 
2303
                    self.protocol.factory.metrics.meter("client.%s.%s" %
 
2304
                                                        (key, value), 1)
 
2305
        # do auth
 
2306
        auth_parameters = dict(
 
2307
            (param.name, param.value)
 
2308
            for param in self.source_message.auth_parameters)
 
2309
        authenticate = self.protocol.factory.auth_provider.authenticate
 
2310
        user = yield authenticate(auth_parameters, self.protocol)
 
2311
        self.protocol.check_poison("authenticate_continue")
 
2312
        if user is None:
 
2313
            raise errors.AuthenticationError("Authentication failed.")
 
2314
 
 
2315
        # confirm authentication
 
2316
        response = protocol_pb2.Message()
 
2317
        response.type = protocol_pb2.Message.AUTH_AUTHENTICATED
 
2318
        # include the session_id
 
2319
        response.session_id = str(self.protocol.session_id)
 
2320
        self.sendMessage(response)
 
2321
 
 
2322
        # set up user for this session
 
2323
        self.protocol.set_user(user)
 
2324
 
 
2325
        # let the client know the root
 
2326
        root_id, _ = yield user.get_root()
 
2327
        response = protocol_pb2.Message()
 
2328
        response.type = protocol_pb2.Message.ROOT
 
2329
        response.root.node = str(root_id)
 
2330
        self.sendMessage(response)
 
2331
 
 
2332
 
 
2333
class Action(BaseRequestResponse):
 
2334
    """This is an internal action, that behaves like a request.
 
2335
 
 
2336
    We can use this Action class to queue work in the pending_requests without
 
2337
    need to worry of handling errors in the request itself.
 
2338
    """
 
2339
 
 
2340
    __slots__ = ('_callable', 'log')
 
2341
 
 
2342
    def __init__(self, request, action):
 
2343
        """Create the request response. Setup logger."""
 
2344
        self._callable = action
 
2345
        # keep the request logger around
 
2346
        self.log = request.log
 
2347
        super(Action, self).__init__(request.protocol, request.source_message)
 
2348
 
 
2349
    def start(self, head=True):
 
2350
        """Schedule the action start for later.
 
2351
 
 
2352
        It will be executed when the server has no further pending requests.
 
2353
 
 
2354
        """
 
2355
        def _scheduled_start():
 
2356
            """The scheduled call."""
 
2357
            super(Action, self).start()
 
2358
 
 
2359
        self.log.debug("Action being scheduled (%s)", self._callable)
 
2360
        self.protocol.factory.metrics.increment("action_instances.%s" %
 
2361
                                                (self.__class__.__name__,))
 
2362
        self.protocol.schedule_request(self, _scheduled_start, head=head)
 
2363
 
 
2364
    def _start(self):
 
2365
        """Kick off action processing."""
 
2366
        self.log.debug("Action being started, working on: %s", self._callable)
 
2367
        d = maybeDeferred(self._callable)
 
2368
        d.addCallbacks(self.done, self.error)
 
2369
 
 
2370
    def cleanup(self):
 
2371
        """Remove the reference to self from the request handler"""
 
2372
        self.finished = True
 
2373
        self.started = False
 
2374
        self.protocol.factory.metrics.decrement("action_instances.%s" %
 
2375
                                                (self.__class__.__name__,))
 
2376
        self.protocol.release(self)
 
2377
 
 
2378
    def done(self, result):
 
2379
        """Overrided done to add logging and to use the callable result.
 
2380
 
 
2381
        Added an ignored parameter so we can hook this to the defered chain.
 
2382
        """
 
2383
        try:
 
2384
            self.log.debug("Action done (%s)", self._callable)
 
2385
            self.cleanup()
 
2386
            self.deferred.callback(result)
 
2387
        except Exception as e:
 
2388
            self.error(Failure(e))
 
2389
 
 
2390
    def error(self, failure):
 
2391
        """Rall this to signal that the action finished with failure
 
2392
 
 
2393
        @param failure: the failure instance
 
2394
        """
 
2395
        self.cleanup()
 
2396
        self.deferred.errback(failure)
 
2397
 
 
2398
 
 
2399
class StorageServerFactory(Factory):
 
2400
    """The Storage Server Factory.
 
2401
 
 
2402
    @cvar protocol: The class of the server.
 
2403
    """
 
2404
    protocol = StorageServer
 
2405
    graceful_shutdown = config.api_server.graceful_shutdown
 
2406
 
 
2407
    def __init__(self, s3_host, s3_port, s3_ssl, s3_key, s3_secret,
 
2408
                 s3_proxy_host=None, s3_proxy_port=None,
 
2409
                 auth_provider_class=auth.DummyAuthProvider,
 
2410
                 s3_class=S3, content_class=content.ContentManager,
 
2411
                 reactor=reactor, oops_config=None,
 
2412
                 servername=None, reactor_inspector=None):
 
2413
        """Create a StorageServerFactory."""
 
2414
        self.auth_provider = auth_provider_class(self)
 
2415
        self.content = content_class(self)
 
2416
        self.s3_class = s3_class
 
2417
        self.s3_host = s3_host
 
2418
        self.s3_port = s3_port
 
2419
        self.s3_ssl = s3_ssl
 
2420
        self.s3_key = s3_key
 
2421
        self.s3_secret = s3_secret
 
2422
        self.s3_proxy_host = s3_proxy_host
 
2423
        self.s3_proxy_port = s3_proxy_port
 
2424
        self.logger = logging.getLogger("storage.server")
 
2425
 
 
2426
        self.metrics = MetricsConnector.get_metrics("root")
 
2427
        self.user_metrics = MetricsConnector.get_metrics("user")
 
2428
        self.sli_metrics = MetricsConnector.get_metrics("sli")
 
2429
 
 
2430
        self.servername = servername
 
2431
        self.reactor_inspector = reactor_inspector
 
2432
 
 
2433
        # note that this relies on the notifier calling the
 
2434
        # callback from the reactor thread
 
2435
        notif = notifier.get_notifier()
 
2436
        notif.set_event_callback(
 
2437
            notifier.UDFCreate,
 
2438
            self.event_callback_handler(self.deliver_udf_create))
 
2439
        notif.set_event_callback(
 
2440
            notifier.UDFDelete,
 
2441
            self.event_callback_handler(self.deliver_udf_delete))
 
2442
        notif.set_event_callback(
 
2443
            notifier.ShareCreated,
 
2444
            self.event_callback_handler(self.deliver_share_created))
 
2445
        notif.set_event_callback(
 
2446
            notifier.ShareAccepted,
 
2447
            self.event_callback_handler(self.deliver_share_accepted))
 
2448
        notif.set_event_callback(
 
2449
            notifier.ShareDeclined,
 
2450
            self.event_callback_handler(self.deliver_share_declined))
 
2451
        notif.set_event_callback(
 
2452
            notifier.ShareDeleted,
 
2453
            self.event_callback_handler(self.deliver_share_deleted))
 
2454
        notif.set_event_callback(
 
2455
            notifier.VolumeNewGeneration,
 
2456
            self.event_callback_handler(self.deliver_volume_new_generation))
 
2457
 
 
2458
        self.protocols = []
 
2459
        self.reactor = reactor
 
2460
        self.trace_users = set(config.api_server.trace_users)
 
2461
 
 
2462
        # oops and log observer
 
2463
        self.oops_config = oops_config
 
2464
        twisted.python.log.addObserver(self._deferror_handler)
 
2465
 
 
2466
    def _deferror_handler(self, data):
 
2467
        """Deferred error handler.
 
2468
 
 
2469
        We receive all stuff here, filter the errors and use correct info.
 
2470
        """
 
2471
        if not data.get('isError', None):
 
2472
            return
 
2473
        try:
 
2474
            failure = data['failure']
 
2475
        except KeyError:
 
2476
            msg = data['message']
 
2477
            failure = None
 
2478
        else:
 
2479
            msg = failure.getTraceback()
 
2480
 
 
2481
        # log
 
2482
        self.logger.error("Unhandled error in deferred! %s", msg)
 
2483
 
 
2484
    def build_notification_oops(self, failure, notif, tl=None):
 
2485
        """Create an oops entry to log the notification failure."""
 
2486
        context = {"exc_info": (failure.type, failure.value, failure.tb)}
 
2487
        del failure
 
2488
 
 
2489
        if tl is not None:
 
2490
            context["timeline"] = tl
 
2491
 
 
2492
        report = self.oops_config.create(context)
 
2493
        del context
 
2494
 
 
2495
        return report
 
2496
 
 
2497
    def event_callback_handler(self, func):
 
2498
        """Wrap the event callback in an error handler."""
 
2499
        @wraps(func)
 
2500
        def wrapper(notif, **kwargs):
 
2501
            """The wrapper."""
 
2502
            tl = timeline.Timeline(format_stack=None)
 
2503
            action = tl.start("EVENT-%s" % notif.event_type, "")
 
2504
 
 
2505
            def notification_error_handler(failure):
 
2506
                """Handle error while processing a Notification."""
 
2507
                action.detail = "NOTIFY WITH %r (%s)" % (notif, kwargs)
 
2508
                action.finish()
 
2509
 
 
2510
                oops = self.build_notification_oops(failure, notif, tl)
 
2511
                oops_id = self.oops_config.publish(oops)
 
2512
                self.logger.error(" %s in notification %r logged in OOPS: %s",
 
2513
                                  failure.value, notif, oops_id)
 
2514
 
 
2515
            d = defer.maybeDeferred(func, notif, **kwargs)
 
2516
            d.addErrback(notification_error_handler)
 
2517
            return d
 
2518
        return wrapper
 
2519
 
 
2520
    def s3(self):
 
2521
        """Get an s3lib instance to do s3 operations."""
 
2522
        return self.s3_class(self.s3_host, self.s3_port, self.s3_key,
 
2523
                             self.s3_secret, use_ssl=self.s3_ssl,
 
2524
                             proxy_host=self.s3_proxy_host,
 
2525
                             proxy_port=self.s3_proxy_port)
 
2526
 
 
2527
    @inlineCallbacks
 
2528
    def deliver_udf_create(self, udf_create):
 
2529
        """Handle UDF creation notification."""
 
2530
 
 
2531
        def notif_filter(protocol):
 
2532
            """Return True if the client should receive the notification."""
 
2533
            return protocol.session_id != udf_create.source_session
 
2534
 
 
2535
        user = yield self.content.get_user_by_id(udf_create.owner_id)
 
2536
        if user:  # connected instances for this user?
 
2537
            resp = protocol_pb2.Message()
 
2538
            resp.type = protocol_pb2.Message.VOLUME_CREATED
 
2539
            resp.volume_created.type = protocol_pb2.Volumes.UDF
 
2540
            resp.volume_created.udf.volume = str(udf_create.udf_id)
 
2541
            resp.volume_created.udf.node = str(udf_create.root_id)
 
2542
            resp.volume_created.udf.suggested_path = udf_create.suggested_path
 
2543
            user.broadcast(resp, filter=notif_filter)
 
2544
 
 
2545
    @inlineCallbacks
 
2546
    def deliver_share_created(self, share_notif):
 
2547
        """Handle Share creation notification."""
 
2548
 
 
2549
        def notif_filter(protocol):
 
2550
            """Return True if the client should receive the notification."""
 
2551
            return protocol.session_id != share_notif.source_session
 
2552
 
 
2553
        to_user = yield self.content.get_user_by_id(share_notif.shared_to_id)
 
2554
        if to_user is None:
 
2555
            return
 
2556
        share_resp = sharersp.NotifyShareHolder.from_params(
 
2557
            share_notif.share_id, share_notif.root_id, share_notif.name,
 
2558
            to_user.username, to_user.visible_name, share_notif.access)
 
2559
        proto_msg = protocol_pb2.Message()
 
2560
        proto_msg.type = protocol_pb2.Message.NOTIFY_SHARE
 
2561
        share_resp.dump_to_msg(proto_msg.notify_share)
 
2562
        to_user.broadcast(proto_msg, filter=notif_filter)
 
2563
 
 
2564
    @inlineCallbacks
 
2565
    def deliver_share_accepted(self, share_notif, recipient_id):
 
2566
        """Handle Share accepted notification."""
 
2567
 
 
2568
        def notif_filter(protocol):
 
2569
            """Return True if the client should receive the notification."""
 
2570
            return protocol.session_id != share_notif.source_session
 
2571
 
 
2572
        def to_notif_filter(protocol):
 
2573
            """Return True if the client should recieve the notification."""
 
2574
            return protocol.session_id != share_notif.source_session
 
2575
 
 
2576
        by_user = yield self.content.get_user_by_id(share_notif.shared_by_id)
 
2577
        to_user = yield self.content.get_user_by_id(share_notif.shared_to_id)
 
2578
 
 
2579
        if by_user and recipient_id == by_user.id:
 
2580
            proto_msg = protocol_pb2.Message()
 
2581
            proto_msg.type = protocol_pb2.Message.SHARE_ACCEPTED
 
2582
            proto_msg.share_accepted.share_id = str(share_notif.share_id)
 
2583
            proto_msg.share_accepted.answer = protocol_pb2.ShareAccepted.YES
 
2584
            by_user.broadcast(proto_msg, filter=notif_filter)
 
2585
        if to_user and recipient_id == to_user.id:
 
2586
            if by_user is None:
 
2587
                by_user = yield self.content.get_user_by_id(
 
2588
                    share_notif.shared_by_id, required=True)
 
2589
            share_resp = sharersp.ShareResponse.from_params(
 
2590
                str(share_notif.share_id), "to_me", share_notif.root_id,
 
2591
                share_notif.name, by_user.username or u"",
 
2592
                by_user.visible_name or u"", protocol_pb2.ShareAccepted.YES,
 
2593
                share_notif.access)
 
2594
            resp = protocol_pb2.Message()
 
2595
            resp.type = protocol_pb2.Message.VOLUME_CREATED
 
2596
            resp.volume_created.type = protocol_pb2.Volumes.SHARE
 
2597
            share_resp.dump_to_msg(resp.volume_created.share)
 
2598
            to_user.broadcast(resp, filter=to_notif_filter)
 
2599
 
 
2600
    @inlineCallbacks
 
2601
    def deliver_share_declined(self, share_notif):
 
2602
        """Handle Share declined notification."""
 
2603
 
 
2604
        def notif_filter(protocol):
 
2605
            """Return True if the client should receive the notification."""
 
2606
            return protocol.session_id != share_notif.source_session
 
2607
 
 
2608
        by_user = yield self.content.get_user_by_id(share_notif.shared_by_id)
 
2609
        if by_user:
 
2610
            proto_msg = protocol_pb2.Message()
 
2611
            proto_msg.type = protocol_pb2.Message.SHARE_ACCEPTED
 
2612
            proto_msg.share_accepted.share_id = str(share_notif.share_id)
 
2613
            proto_msg.share_accepted.answer = protocol_pb2.ShareAccepted.NO
 
2614
            by_user.broadcast(proto_msg, filter=notif_filter)
 
2615
 
 
2616
    @inlineCallbacks
 
2617
    def deliver_share_deleted(self, share_notif):
 
2618
        """Handle Share deletion notification."""
 
2619
        def notif_filter(protocol):
 
2620
            """Return True if the client should receive the notification."""
 
2621
            return protocol.session_id != share_notif.source_session
 
2622
 
 
2623
        to_user = yield self.content.get_user_by_id(share_notif.shared_to_id)
 
2624
        #by_user = yield self.content.get_user_by_id(share_notif.shared_by_id)
 
2625
        if to_user is None:
 
2626
            return
 
2627
        proto_msg = protocol_pb2.Message()
 
2628
        proto_msg.type = protocol_pb2.Message.SHARE_DELETED
 
2629
        proto_msg.share_deleted.share_id = str(share_notif.share_id)
 
2630
        if to_user:
 
2631
            to_user.broadcast(proto_msg, filter=notif_filter)
 
2632
        #if by_user:
 
2633
        #    by_user.broadcast(proto_msg, filter=notif_filter)
 
2634
 
 
2635
    @inlineCallbacks
 
2636
    def deliver_udf_delete(self, udf_delete):
 
2637
        """Handle UDF deletion notification."""
 
2638
 
 
2639
        def notif_filter(protocol):
 
2640
            """Return True if the client should receive the notification."""
 
2641
            return protocol.session_id != udf_delete.source_session
 
2642
 
 
2643
        user = yield self.content.get_user_by_id(udf_delete.owner_id)
 
2644
        if user:  # connected instances for this user?
 
2645
            resp = protocol_pb2.Message()
 
2646
            resp.type = protocol_pb2.Message.VOLUME_DELETED
 
2647
            resp.volume_deleted.volume = str(udf_delete.udf_id)
 
2648
            user.broadcast(resp, filter=notif_filter)
 
2649
 
 
2650
    @inlineCallbacks
 
2651
    def deliver_volume_new_generation(self, notif):
 
2652
        """Handle the notification of a new generation for a volume."""
 
2653
 
 
2654
        def notif_filter(protocol):
 
2655
            """Return True if the client should receive the notification."""
 
2656
            return protocol.session_id != notif.source_session
 
2657
 
 
2658
        user = yield self.content.get_user_by_id(notif.user_id)
 
2659
        if user:  # connected instances for this user?
 
2660
            resp = protocol_pb2.Message()
 
2661
            resp.type = protocol_pb2.Message.VOLUME_NEW_GENERATION
 
2662
            if notif.client_volume_id is None:
 
2663
                resp.volume_new_generation.volume = ''
 
2664
            else:
 
2665
                resp.volume_new_generation.volume = str(notif.client_volume_id)
 
2666
            resp.volume_new_generation.generation = notif.new_generation
 
2667
            user.broadcast(resp, filter=notif_filter)
 
2668
 
 
2669
    def wait_for_shutdown(self):
 
2670
        """Wait until all the current servers have finished serving."""
 
2671
        if not self.protocols or not self.graceful_shutdown:
 
2672
            return defer.succeed(None)
 
2673
 
 
2674
        wait_d = [protocol.wait_for_shutdown() for protocol in self.protocols]
 
2675
        done_with_current = self.wait_for_current_shutdown()
 
2676
        d = defer.DeferredList(wait_d + [done_with_current])
 
2677
        return d
 
2678
 
 
2679
    def wait_for_current_shutdown(self):
 
2680
        """wait for the current protocols to end."""
 
2681
        d = defer.Deferred()
 
2682
 
 
2683
        def _wait():
 
2684
            "do the waiting"
 
2685
            if self.protocols:
 
2686
                self.reactor.callLater(0.1, _wait)
 
2687
            else:
 
2688
                d.callback(True)
 
2689
        _wait()
 
2690
        return d
 
2691
 
 
2692
 
 
2693
class OrderedMultiService(MultiService):
 
2694
    """A container which starts and shuts down the services in strict order."""
 
2695
 
 
2696
    def startService(self):
 
2697
        """Start the services sequentually.  Returns a deferred which
 
2698
        signals completion.
 
2699
        """
 
2700
        # deliberately skip MultiService's implementation
 
2701
        Service.startService(self)
 
2702
        d = defer.succeed(None)
 
2703
        for service in self:
 
2704
            d.addCallback(lambda _, s: s.startService(), service)
 
2705
        return d
 
2706
 
 
2707
    def stopService(self):
 
2708
        """Stop the services sequentially (in the opposite order they
 
2709
        are started).  Returns a deferred which signals completion.
 
2710
        """
 
2711
        # deliberately skip MultiService's implementation
 
2712
        Service.stopService(self)
 
2713
        d = defer.succeed(None)
 
2714
        services = list(self)
 
2715
        services.reverse()
 
2716
        for service in services:
 
2717
            d.addBoth(lambda _, s: s.stopService(), service)
 
2718
        return d
 
2719
 
 
2720
 
 
2721
def get_service_port(service):
 
2722
    """Returns the actual port a simple service is bound to."""
 
2723
    # we have to query this rather than simply using the port number the
 
2724
    # service was passed at creation time -- if the given port was 0,
 
2725
    # the real port number will have been dynamically assigned
 
2726
    return service._port.getHost().port
 
2727
 
 
2728
 
 
2729
class StorageServerService(OrderedMultiService):
 
2730
    """Wrap the whole TCP StorageServer mess as a single twisted serv."""
 
2731
 
 
2732
    def __init__(self, port, s3_host, s3_port, s3_ssl, s3_key, s3_secret,
 
2733
                 s3_proxy_host=None, s3_proxy_port=None,
 
2734
                 auth_provider_class=None,
 
2735
                 oops_config=None, status_port=0, heartbeat_interval=None):
 
2736
        """Create a StorageServerService.
 
2737
 
 
2738
        @param port: the port to listen on without ssl.
 
2739
        @param s3_host: the S3 server hostname.
 
2740
        @param s3_port: the S3 server port to connect to.
 
2741
        @param s3_key: the S3 key.
 
2742
        @param s3_secret: the S3 secret.
 
2743
        @param auth_provider_class: the authentication provider.
 
2744
        """
 
2745
        OrderedMultiService.__init__(self)
 
2746
        self.heartbeat_writer = None
 
2747
        if heartbeat_interval is None:
 
2748
            heartbeat_interval = float(config.api_server.heartbeat_interval)
 
2749
        self.heartbeat_interval = heartbeat_interval
 
2750
        self.rpcdal_client = None
 
2751
        self.rpcauth_client = None
 
2752
        self.logger = logging.getLogger("storage.server")
 
2753
        self.servername = config.api_server.servername
 
2754
        self.logger.info("Starting %s", self.servername)
 
2755
        self.logger.info(
 
2756
            "protocol buffers implementation: %s",
 
2757
            os.environ.get("PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION", "None"))
 
2758
 
 
2759
        namespace = config.api_server.metrics_namespace
 
2760
        # Register all server metrics components
 
2761
        MetricsConnector.register_metrics("root", namespace)
 
2762
        # Important:  User activity is in a global namespace!
 
2763
        environment = config.general.environment_name
 
2764
        user_namespace = environment + ".storage.user_activity"
 
2765
        MetricsConnector.register_metrics("user", user_namespace)
 
2766
        MetricsConnector.register_metrics("reactor_inspector",
 
2767
                                          namespace + ".reactor_inspector")
 
2768
        sli_metric_namespace = config.api_server.sli_metric_namespace
 
2769
        MetricsConnector.register_metrics('sli', sli_metric_namespace)
 
2770
 
 
2771
        self.metrics = get_meter(scope='service')
 
2772
 
 
2773
        listeners = MultiService()
 
2774
        listeners.setName("Listeners")
 
2775
        listeners.setServiceParent(self)
 
2776
 
 
2777
        self._reactor_inspector = ReactorInspector(self.logger,
 
2778
                                                   reactor.callFromThread)
 
2779
 
 
2780
        self.factory = StorageServerFactory(
 
2781
            s3_host, s3_port, s3_ssl, s3_key, s3_secret,
 
2782
            s3_proxy_host=s3_proxy_host, s3_proxy_port=s3_proxy_port,
 
2783
            auth_provider_class=auth_provider_class,
 
2784
            oops_config=oops_config, servername=self.servername,
 
2785
            reactor_inspector=self._reactor_inspector)
 
2786
 
 
2787
        self.tcp_service = TCPServer(port, self.factory)
 
2788
        self.tcp_service.setName("TCP")
 
2789
        self.tcp_service.setServiceParent(listeners)
 
2790
 
 
2791
        # setup the status service
 
2792
        self.status_service = stats.create_status_service(
 
2793
            self, listeners, status_port)
 
2794
 
 
2795
        self.next_log_loop = None
 
2796
        stats_log_interval = config.api_server.stats_log_interval
 
2797
        self.stats_worker = stats.StatsWorker(self, stats_log_interval,
 
2798
                                              self.servername)
 
2799
 
 
2800
    @property
 
2801
    def port(self):
 
2802
        """The port without ssl."""
 
2803
        return get_service_port(self.tcp_service)
 
2804
 
 
2805
    @property
 
2806
    def status_port(self):
 
2807
        """The status service port."""
 
2808
        return get_service_port(self.status_service)
 
2809
 
 
2810
    def start_rpc_client(self):
 
2811
        """Setup the rpc client."""
 
2812
        self.logger.info("Starting the RPC clients.")
 
2813
        self.rpcdal_client = inthread.ThreadedNonRPC(inthread.DAL_BACKEND)
 
2814
        self.rpcauth_client = inthread.ThreadedNonRPC(inthread.AUTH_BACKEND)
 
2815
 
 
2816
    @inlineCallbacks
 
2817
    def startService(self):
 
2818
        """Start listening on two ports."""
 
2819
        self.logger.info("- - - - - SERVER STARTING")
 
2820
        yield OrderedMultiService.startService(self)
 
2821
        yield defer.maybeDeferred(self.start_rpc_client)
 
2822
        self.factory.content.rpcdal_client = self.rpcdal_client
 
2823
        self.factory.rpcauth_client = self.rpcauth_client
 
2824
        self.stats_worker.start()
 
2825
        self.metrics.meter('server_start')
 
2826
        self.metrics.increment('services_active')
 
2827
        metrics.services.revno()
 
2828
 
 
2829
        self._reactor_inspector.start()
 
2830
        # only start the HeartbeatWriter if the interval is > 0
 
2831
        if self.heartbeat_interval > 0:
 
2832
            self.heartbeat_writer = stdio.StandardIO(
 
2833
                supervisor_utils.HeartbeatWriter(self.heartbeat_interval,
 
2834
                                                 self.logger))
 
2835
 
 
2836
    @inlineCallbacks
 
2837
    def stopService(self):
 
2838
        """Stop listening on both ports."""
 
2839
        self.logger.info("- - - - - SERVER STOPPING")
 
2840
        yield self.stats_worker.stop()
 
2841
        yield OrderedMultiService.stopService(self)
 
2842
        yield self.factory.wait_for_shutdown()
 
2843
        self.metrics.meter('server_stop')
 
2844
        self.metrics.decrement('services_active')
 
2845
        if self.metrics.connection:
 
2846
            self.metrics.connection.disconnect()
 
2847
        if self.factory.metrics.connection:
 
2848
            self.factory.metrics.connection.disconnect()
 
2849
        if self.factory.user_metrics.connection:
 
2850
            self.factory.user_metrics.connection.disconnect()
 
2851
        self._reactor_inspector.stop()
 
2852
        if self.heartbeat_writer:
 
2853
            self.heartbeat_writer.loseConnection()
 
2854
            self.heartbeat_writer = None
 
2855
        for metrics_key in ["reactor_inspector", "sli"]:
 
2856
            metrics = MetricsConnector.get_metrics(metrics_key)
 
2857
            if metrics.connection:
 
2858
                metrics.connection.disconnect()
 
2859
        self.logger.info("- - - - - SERVER STOPPED")
 
2860
 
 
2861
 
 
2862
def create_service(s3_host, s3_port, s3_ssl, s3_key, s3_secret,
 
2863
                   s3_proxy_host=None, s3_proxy_port=None,
 
2864
                   status_port=None,
 
2865
                   auth_provider_class=auth.DummyAuthProvider,
 
2866
                   oops_config=None):
 
2867
    """Start the StorageServer service."""
 
2868
 
 
2869
    # configure logs
 
2870
    logger = logging.getLogger(config.api_server.logger_name)
 
2871
    handler = configure_logger(logger=logger, propagate=False,
 
2872
                               filename=config.api_server.log_filename,
 
2873
                               start_observer=True)
 
2874
 
 
2875
    # set up s3
 
2876
    s3_logger = logging.getLogger('s3lib')
 
2877
    s3_logger.setLevel(config.general.log_level)
 
2878
    s3_logger.addHandler(handler)
 
2879
 
 
2880
    # set up the hacker's logger always in TRACE
 
2881
    h_logger = logging.getLogger(config.api_server.logger_name + ".hackers")
 
2882
    h_logger.setLevel(TRACE)
 
2883
    h_logger.propagate = False
 
2884
    h_logger.addHandler(handler)
 
2885
 
 
2886
    logger.debug('S3 host:%s port:%s', s3_host, s3_port)
 
2887
 
 
2888
    # turn on heapy if must to
 
2889
    if os.getenv('USE_HEAPY'):
 
2890
        logger.debug('importing heapy')
 
2891
        try:
 
2892
            import guppy.heapy.RM
 
2893
        except ImportError:
 
2894
            logger.warning('guppy-pe/heapy not available, remote monitor '
 
2895
                           'thread not started')
 
2896
        else:
 
2897
            guppy.heapy.RM.on()
 
2898
            logger.debug('activated heapy remote monitor')
 
2899
 
 
2900
    # set GC's debug
 
2901
    if config.api_server.gc_debug:
 
2902
        import gc
 
2903
        gc.set_debug(gc.DEBUG_UNCOLLECTABLE)
 
2904
        logger.debug("set gc debug on")
 
2905
 
 
2906
    if status_port is None:
 
2907
        status_port = config.api_server.status_port
 
2908
 
 
2909
    # create the service
 
2910
    service = StorageServerService(
 
2911
        config.api_server.tcp_port, s3_host, s3_port, s3_ssl, s3_key,
 
2912
        s3_secret, s3_proxy_host, s3_proxy_port, auth_provider_class,
 
2913
        oops_config, status_port)
 
2914
    return service