~ubuntu-branches/ubuntu/trusty/swift/trusty

« back to all changes in this revision

Viewing changes to swift/container/server.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, James Page, Chuck Short
  • Date: 2013-08-13 10:37:13 UTC
  • mfrom: (1.2.21)
  • Revision ID: package-import@ubuntu.com-20130813103713-1ctbx4zifyljs2aq
Tags: 1.9.1-0ubuntu1
[ James Page ]
* d/control: Update VCS fields for new branch locations.

[ Chuck Short ]
* New upstream release.

Show diffs side-by-side

added

removed

Lines of Context:
18
18
import os
19
19
import time
20
20
import traceback
21
 
from xml.sax import saxutils
22
21
from datetime import datetime
 
22
from gettext import gettext as _
 
23
from xml.etree.cElementTree import Element, SubElement, tostring
23
24
 
24
25
from eventlet import Timeout
25
26
 
26
27
import swift.common.db
27
28
from swift.common.db import ContainerBroker
28
 
from swift.common.utils import get_logger, get_param, hash_path, public, \
 
29
from swift.common.request_helpers import get_param
 
30
from swift.common.utils import get_logger, hash_path, public, \
29
31
    normalize_timestamp, storage_directory, validate_sync_to, \
30
 
    config_true_value, validate_device_partition, json, timing_stats
 
32
    config_true_value, validate_device_partition, json, timing_stats, \
 
33
    replication, parse_content_type
31
34
from swift.common.constraints import CONTAINER_LISTING_LIMIT, \
32
35
    check_mount, check_float, check_utf8, FORMAT2CONTENT_TYPE
33
36
from swift.common.bufferedhttp import http_connect
37
40
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPConflict, \
38
41
    HTTPCreated, HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \
39
42
    HTTPPreconditionFailed, HTTPMethodNotAllowed, Request, Response, \
40
 
    HTTPInsufficientStorage, HTTPNotAcceptable, HeaderKeyDict
 
43
    HTTPInsufficientStorage, HTTPNotAcceptable, HTTPException, HeaderKeyDict
41
44
 
42
45
DATADIR = 'containers'
43
46
 
56
59
        self.node_timeout = int(conf.get('node_timeout', 3))
57
60
        self.conn_timeout = float(conf.get('conn_timeout', 0.5))
58
61
        replication_server = conf.get('replication_server', None)
59
 
        if replication_server is None:
60
 
            allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
61
 
                               'POST']
62
 
        else:
 
62
        if replication_server is not None:
63
63
            replication_server = config_true_value(replication_server)
64
 
            if replication_server:
65
 
                allowed_methods = ['REPLICATE']
66
 
            else:
67
 
                allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
68
64
        self.replication_server = replication_server
69
 
        self.allowed_methods = allowed_methods
70
65
        self.allowed_sync_hosts = [
71
66
            h.strip()
72
67
            for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',')
81
76
        swift.common.db.DB_PREALLOCATION = \
82
77
            config_true_value(conf.get('db_preallocation', 'f'))
83
78
 
84
 
    def _get_container_broker(self, drive, part, account, container):
 
79
    def _get_container_broker(self, drive, part, account, container, **kwargs):
85
80
        """
86
81
        Get a DB broker for the container.
87
82
 
94
89
        hsh = hash_path(account, container)
95
90
        db_dir = storage_directory(DATADIR, part, hsh)
96
91
        db_path = os.path.join(self.root, drive, db_dir, hsh + '.db')
97
 
        return ContainerBroker(db_path, account=account, container=container,
98
 
                               logger=self.logger)
 
92
        kwargs.setdefault('account', account)
 
93
        kwargs.setdefault('container', container)
 
94
        kwargs.setdefault('logger', self.logger)
 
95
        return ContainerBroker(db_path, **kwargs)
99
96
 
100
97
    def account_update(self, req, account, container, broker):
101
98
        """
303
300
        except ValueError, err:
304
301
            return HTTPBadRequest(body=str(err), content_type='text/plain',
305
302
                                  request=req)
306
 
        try:
307
 
            query_format = get_param(req, 'format')
308
 
        except UnicodeDecodeError:
309
 
            return HTTPBadRequest(body='parameters not utf8',
310
 
                                  content_type='text/plain', request=req)
 
303
        query_format = get_param(req, 'format')
311
304
        if query_format:
312
305
            req.accept = FORMAT2CONTENT_TYPE.get(
313
306
                query_format.lower(), FORMAT2CONTENT_TYPE['plain'])
317
310
            return HTTPNotAcceptable(request=req)
318
311
        if self.mount_check and not check_mount(self.root, drive):
319
312
            return HTTPInsufficientStorage(drive=drive, request=req)
320
 
        broker = self._get_container_broker(drive, part, account, container)
321
 
        broker.pending_timeout = 0.1
322
 
        broker.stale_reads_ok = True
 
313
        broker = self._get_container_broker(drive, part, account, container,
 
314
                                            pending_timeout=0.1,
 
315
                                            stale_reads_ok=True)
323
316
        if broker.is_deleted():
324
317
            return HTTPNotFound(request=req)
325
318
        info = broker.get_info()
337
330
        headers['Content-Type'] = out_content_type
338
331
        return HTTPNoContent(request=req, headers=headers, charset='utf-8')
339
332
 
340
 
    def derive_content_type_metadata(self, content_type, size):
341
 
        """
342
 
        Will check the last parameter and if it starts with 'swift_bytes=' will
343
 
        strip it off. Returns either the passed in content_type and size
344
 
        or the content_type without the swift_bytes param and its value as
345
 
        the new size.
346
 
        :params content_type: Content Type from db
347
 
        :params size: # bytes from db, an int
348
 
        :returns: tuple: content_type, size
349
 
        """
350
 
        if ';' in content_type:
351
 
            new_content_type, param = content_type.rsplit(';', 1)
352
 
            if param.lstrip().startswith('swift_bytes='):
353
 
                key, value = param.split('=')
 
333
    def update_data_record(self, record):
 
334
        """
 
335
        Perform any mutations to container listing records that are common to
 
336
        all serialization formats, and returns it as a dict.
 
337
 
 
338
        Converts created time to iso timestamp.
 
339
        Replaces size with 'swift_bytes' content type parameter.
 
340
 
 
341
        :params record: object entry record
 
342
        :returns: modified record
 
343
        """
 
344
        (name, created, size, content_type, etag) = record
 
345
        if content_type is None:
 
346
            return {'subdir': name}
 
347
        response = {'bytes': size, 'hash': etag, 'name': name}
 
348
        last_modified = datetime.utcfromtimestamp(float(created)).isoformat()
 
349
        # python isoformat() doesn't include msecs when zero
 
350
        if len(last_modified) < len("1970-01-01T00:00:00.000000"):
 
351
            last_modified += ".000000"
 
352
        response['last_modified'] = last_modified + 'Z'
 
353
        content_type, params = parse_content_type(content_type)
 
354
        for key, value in params:
 
355
            if key == 'swift_bytes':
354
356
                try:
355
 
                    return new_content_type, int(value)
 
357
                    response['bytes'] = int(value)
356
358
                except ValueError:
357
359
                    self.logger.exception("Invalid swift_bytes")
358
 
        return content_type, size
 
360
            else:
 
361
                content_type += ';%s=%s' % (key, value)
 
362
        response['content_type'] = content_type
 
363
        return response
359
364
 
360
365
    @public
361
366
    @timing_stats()
367
372
        except ValueError, err:
368
373
            return HTTPBadRequest(body=str(err), content_type='text/plain',
369
374
                                  request=req)
370
 
        try:
371
 
            path = get_param(req, 'path')
372
 
            prefix = get_param(req, 'prefix')
373
 
            delimiter = get_param(req, 'delimiter')
374
 
            if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
375
 
                # delimiters can be made more flexible later
376
 
                return HTTPPreconditionFailed(body='Bad delimiter')
377
 
            marker = get_param(req, 'marker', '')
378
 
            end_marker = get_param(req, 'end_marker')
379
 
            limit = CONTAINER_LISTING_LIMIT
380
 
            given_limit = get_param(req, 'limit')
381
 
            if given_limit and given_limit.isdigit():
382
 
                limit = int(given_limit)
383
 
                if limit > CONTAINER_LISTING_LIMIT:
384
 
                    return HTTPPreconditionFailed(
385
 
                        request=req,
386
 
                        body='Maximum limit is %d' % CONTAINER_LISTING_LIMIT)
387
 
            query_format = get_param(req, 'format')
388
 
        except UnicodeDecodeError, err:
389
 
            return HTTPBadRequest(body='parameters not utf8',
390
 
                                  content_type='text/plain', request=req)
 
375
        path = get_param(req, 'path')
 
376
        prefix = get_param(req, 'prefix')
 
377
        delimiter = get_param(req, 'delimiter')
 
378
        if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
 
379
            # delimiters can be made more flexible later
 
380
            return HTTPPreconditionFailed(body='Bad delimiter')
 
381
        marker = get_param(req, 'marker', '')
 
382
        end_marker = get_param(req, 'end_marker')
 
383
        limit = CONTAINER_LISTING_LIMIT
 
384
        given_limit = get_param(req, 'limit')
 
385
        if given_limit and given_limit.isdigit():
 
386
            limit = int(given_limit)
 
387
            if limit > CONTAINER_LISTING_LIMIT:
 
388
                return HTTPPreconditionFailed(
 
389
                    request=req,
 
390
                    body='Maximum limit is %d' % CONTAINER_LISTING_LIMIT)
 
391
        query_format = get_param(req, 'format')
391
392
        if query_format:
392
393
            req.accept = FORMAT2CONTENT_TYPE.get(query_format.lower(),
393
394
                                                 FORMAT2CONTENT_TYPE['plain'])
397
398
            return HTTPNotAcceptable(request=req)
398
399
        if self.mount_check and not check_mount(self.root, drive):
399
400
            return HTTPInsufficientStorage(drive=drive, request=req)
400
 
        broker = self._get_container_broker(drive, part, account, container)
401
 
        broker.pending_timeout = 0.1
402
 
        broker.stale_reads_ok = True
 
401
        broker = self._get_container_broker(drive, part, account, container,
 
402
                                            pending_timeout=0.1,
 
403
                                            stale_reads_ok=True)
403
404
        if broker.is_deleted():
404
405
            return HTTPNotFound(request=req)
405
406
        info = broker.get_info()
409
410
            'X-Timestamp': info['created_at'],
410
411
            'X-PUT-Timestamp': info['put_timestamp'],
411
412
        }
412
 
        resp_headers.update(
413
 
            (key, value)
414
 
            for key, (value, timestamp) in broker.metadata.iteritems()
415
 
            if value != '' and (key.lower() in self.save_headers or
416
 
                                key.lower().startswith('x-container-meta-')))
 
413
        for key, (value, timestamp) in broker.metadata.iteritems():
 
414
            if value and (key.lower() in self.save_headers or
 
415
                          key.lower().startswith('x-container-meta-')):
 
416
                resp_headers[key] = value
 
417
        ret = Response(request=req, headers=resp_headers,
 
418
                       content_type=out_content_type, charset='utf-8')
417
419
        container_list = broker.list_objects_iter(limit, marker, end_marker,
418
420
                                                  prefix, delimiter, path)
419
421
        if out_content_type == 'application/json':
420
 
            data = []
421
 
            for (name, created_at, size, content_type, etag) in container_list:
422
 
                if content_type is None:
423
 
                    data.append({"subdir": name})
424
 
                else:
425
 
                    created_at = datetime.utcfromtimestamp(
426
 
                        float(created_at)).isoformat()
427
 
                    # python isoformat() doesn't include msecs when zero
428
 
                    if len(created_at) < len("1970-01-01T00:00:00.000000"):
429
 
                        created_at += ".000000"
430
 
                    content_type, size = self.derive_content_type_metadata(
431
 
                        content_type, size)
432
 
                    data.append({'last_modified': created_at, 'bytes': size,
433
 
                                'content_type': content_type, 'hash': etag,
434
 
                                'name': name})
435
 
            container_list = json.dumps(data)
 
422
            ret.body = json.dumps([self.update_data_record(record)
 
423
                                   for record in container_list])
436
424
        elif out_content_type.endswith('/xml'):
437
 
            xml_output = []
438
 
            for (name, created_at, size, content_type, etag) in container_list:
439
 
                created_at = datetime.utcfromtimestamp(
440
 
                    float(created_at)).isoformat()
441
 
                # python isoformat() doesn't include msecs when zero
442
 
                if len(created_at) < len("1970-01-01T00:00:00.000000"):
443
 
                    created_at += ".000000"
444
 
                if content_type is None:
445
 
                    xml_output.append(
446
 
                        '<subdir name=%s><name>%s</name></subdir>' %
447
 
                        (saxutils.quoteattr(name), saxutils.escape(name)))
 
425
            doc = Element('container', name=container.decode('utf-8'))
 
426
            for obj in container_list:
 
427
                record = self.update_data_record(obj)
 
428
                if 'subdir' in record:
 
429
                    name = record['subdir'].decode('utf-8')
 
430
                    sub = SubElement(doc, 'subdir', name=name)
 
431
                    SubElement(sub, 'name').text = name
448
432
                else:
449
 
                    content_type, size = self.derive_content_type_metadata(
450
 
                        content_type, size)
451
 
                    content_type = saxutils.escape(content_type)
452
 
                    xml_output.append(
453
 
                        '<object><name>%s</name><hash>%s</hash>'
454
 
                        '<bytes>%d</bytes><content_type>%s</content_type>'
455
 
                        '<last_modified>%s</last_modified></object>' %
456
 
                        (saxutils.escape(name), etag, size, content_type,
457
 
                         created_at))
458
 
            container_list = ''.join([
459
 
                '<?xml version="1.0" encoding="UTF-8"?>\n',
460
 
                '<container name=%s>' % saxutils.quoteattr(container),
461
 
                ''.join(xml_output), '</container>'])
 
433
                    obj_element = SubElement(doc, 'object')
 
434
                    for field in ["name", "hash", "bytes", "content_type",
 
435
                                  "last_modified"]:
 
436
                        SubElement(obj_element, field).text = str(
 
437
                            record.pop(field)).decode('utf-8')
 
438
                    for field in sorted(record.keys()):
 
439
                        SubElement(obj_element, field).text = str(
 
440
                            record[field]).decode('utf-8')
 
441
            ret.body = tostring(doc, encoding='UTF-8')
462
442
        else:
463
443
            if not container_list:
464
444
                return HTTPNoContent(request=req, headers=resp_headers)
465
 
            container_list = '\n'.join(r[0] for r in container_list) + '\n'
466
 
        ret = Response(body=container_list, request=req, headers=resp_headers)
467
 
        ret.content_type = out_content_type
468
 
        ret.charset = 'utf-8'
 
445
            ret.body = '\n'.join(rec[0] for rec in container_list) + '\n'
469
446
        return ret
470
447
 
471
448
    @public
 
449
    @replication
472
450
    @timing_stats(sample_rate=0.01)
473
451
    def REPLICATE(self, req):
474
452
        """
542
520
                try:
543
521
                    method = getattr(self, req.method)
544
522
                    getattr(method, 'publicly_accessible')
545
 
                    if req.method not in self.allowed_methods:
 
523
                    replication_method = getattr(method, 'replication', False)
 
524
                    if (self.replication_server is not None and
 
525
                            self.replication_server != replication_method):
546
526
                        raise AttributeError('Not allowed method.')
547
527
                except AttributeError:
548
528
                    res = HTTPMethodNotAllowed()
549
529
                else:
550
530
                    res = method(req)
 
531
            except HTTPException as error_response:
 
532
                res = error_response
551
533
            except (Exception, Timeout):
552
534
                self.logger.exception(_(
553
535
                    'ERROR __call__ error with %(method)s %(path)s '),