~ubuntu-branches/ubuntu/trusty/swift/trusty-updates

« back to all changes in this revision

Viewing changes to swift/proxy/controllers/base.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Soren Hansen, Chuck Short
  • Date: 2012-09-07 19:02:36 UTC
  • mfrom: (1.2.12)
  • Revision ID: package-import@ubuntu.com-20120907190236-fqrmbzm7v6zivs8d
Tags: 1.7.0-0ubuntu1
[ Soren Hansen ]
* Update debian/watch to account for symbolically named tarballs and
  use newer URL.
* Run unit tests at build time.
* Fix Launchpad URLs in debian/watch.

[ Chuck Short ]
* New upstream release
* debian/control: Add pubthon-moc as a build dep
* debian/rules: Dont fail if testsuite fails.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2010-2012 OpenStack, LLC.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
#    http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
12
# implied.
 
13
# See the License for the specific language governing permissions and
 
14
# limitations under the License.
 
15
 
 
16
# NOTE: swift_conn
 
17
# You'll see swift_conn passed around a few places in this file. This is the
 
18
# source httplib connection of whatever it is attached to.
 
19
#   It is used when early termination of reading from the connection should
 
20
# happen, such as when a range request is satisfied but there's still more the
 
21
# source connection would like to send. To prevent having to read all the data
 
22
# that could be left, the source connection can be .close() and then reads
 
23
# commence to empty out any buffers.
 
24
#   These shenanigans are to ensure all related objects can be garbage
 
25
# collected. We've seen objects hang around forever otherwise.
 
26
 
 
27
import time
 
28
import functools
 
29
 
 
30
from eventlet import spawn_n, GreenPile, Timeout
 
31
from eventlet.queue import Queue, Empty, Full
 
32
from eventlet.timeout import Timeout
 
33
from webob.exc import status_map
 
34
from webob import Request, Response
 
35
 
 
36
from swift.common.utils import normalize_timestamp, TRUE_VALUES, public
 
37
from swift.common.bufferedhttp import http_connect
 
38
from swift.common.constraints import MAX_ACCOUNT_NAME_LENGTH
 
39
from swift.common.exceptions import ChunkReadTimeout, ConnectionTimeout
 
40
from swift.common.http import is_informational, is_success, is_redirection, \
 
41
    is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
 
42
    HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
 
43
    HTTP_INSUFFICIENT_STORAGE
 
44
 
 
45
 
 
46
def update_headers(response, headers):
 
47
    """
 
48
    Helper function to update headers in the response.
 
49
 
 
50
    :param response: webob.Response object
 
51
    :param headers: dictionary headers
 
52
    """
 
53
    if hasattr(headers, 'items'):
 
54
        headers = headers.items()
 
55
    for name, value in headers:
 
56
        if name == 'etag':
 
57
            response.headers[name] = value.replace('"', '')
 
58
        elif name not in ('date', 'content-length', 'content-type',
 
59
                          'connection', 'x-put-timestamp', 'x-delete-after'):
 
60
            response.headers[name] = value
 
61
 
 
62
 
 
63
def delay_denial(func):
 
64
    """
 
65
    Decorator to declare which methods should have any swift.authorize call
 
66
    delayed. This is so the method can load the Request object up with
 
67
    additional information that may be needed by the authorization system.
 
68
 
 
69
    :param func: function for which authorization will be delayed
 
70
    """
 
71
    func.delay_denial = True
 
72
 
 
73
    @functools.wraps(func)
 
74
    def wrapped(*a, **kw):
 
75
        return func(*a, **kw)
 
76
    return wrapped
 
77
 
 
78
 
 
79
def get_account_memcache_key(account):
 
80
    return 'account/%s' % account
 
81
 
 
82
 
 
83
def get_container_memcache_key(account, container):
 
84
    return 'container/%s/%s' % (account, container)
 
85
 
 
86
 
 
87
class Controller(object):
 
88
    """Base WSGI controller class for the proxy"""
 
89
    server_type = 'Base'
 
90
 
 
91
    # Ensure these are all lowercase
 
92
    pass_through_headers = []
 
93
 
 
94
    def __init__(self, app):
 
95
        self.account_name = None
 
96
        self.app = app
 
97
        self.trans_id = '-'
 
98
 
 
99
    def transfer_headers(self, src_headers, dst_headers):
 
100
        x_remove = 'x-remove-%s-meta-' % self.server_type.lower()
 
101
        x_meta = 'x-%s-meta-' % self.server_type.lower()
 
102
        dst_headers.update((k.lower().replace('-remove', '', 1), '')
 
103
                           for k in src_headers
 
104
                           if k.lower().startswith(x_remove))
 
105
        dst_headers.update((k.lower(), v)
 
106
                           for k, v in src_headers.iteritems()
 
107
                           if k.lower() in self.pass_through_headers or
 
108
                              k.lower().startswith(x_meta))
 
109
 
 
110
    def error_increment(self, node):
 
111
        """
 
112
        Handles incrementing error counts when talking to nodes.
 
113
 
 
114
        :param node: dictionary of node to increment the error count for
 
115
        """
 
116
        node['errors'] = node.get('errors', 0) + 1
 
117
        node['last_error'] = time.time()
 
118
 
 
119
    def error_occurred(self, node, msg):
 
120
        """
 
121
        Handle logging, and handling of errors.
 
122
 
 
123
        :param node: dictionary of node to handle errors for
 
124
        :param msg: error message
 
125
        """
 
126
        self.error_increment(node)
 
127
        self.app.logger.error(_('%(msg)s %(ip)s:%(port)s'),
 
128
            {'msg': msg, 'ip': node['ip'], 'port': node['port']})
 
129
 
 
130
    def exception_occurred(self, node, typ, additional_info):
 
131
        """
 
132
        Handle logging of generic exceptions.
 
133
 
 
134
        :param node: dictionary of node to log the error for
 
135
        :param typ: server type
 
136
        :param additional_info: additional information to log
 
137
        """
 
138
        self.app.logger.exception(
 
139
            _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
 
140
              '%(info)s'),
 
141
            {'type': typ, 'ip': node['ip'], 'port': node['port'],
 
142
             'device': node['device'], 'info': additional_info})
 
143
 
 
144
    def error_limited(self, node):
 
145
        """
 
146
        Check if the node is currently error limited.
 
147
 
 
148
        :param node: dictionary of node to check
 
149
        :returns: True if error limited, False otherwise
 
150
        """
 
151
        now = time.time()
 
152
        if not 'errors' in node:
 
153
            return False
 
154
        if 'last_error' in node and node['last_error'] < \
 
155
                now - self.app.error_suppression_interval:
 
156
            del node['last_error']
 
157
            if 'errors' in node:
 
158
                del node['errors']
 
159
            return False
 
160
        limited = node['errors'] > self.app.error_suppression_limit
 
161
        if limited:
 
162
            self.app.logger.debug(
 
163
                _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
 
164
        return limited
 
165
 
 
166
    def error_limit(self, node):
 
167
        """
 
168
        Mark a node as error limited.
 
169
 
 
170
        :param node: dictionary of node to error limit
 
171
        """
 
172
        node['errors'] = self.app.error_suppression_limit + 1
 
173
        node['last_error'] = time.time()
 
174
 
 
175
    def account_info(self, account, autocreate=False):
 
176
        """
 
177
        Get account information, and also verify that the account exists.
 
178
 
 
179
        :param account: name of the account to get the info for
 
180
        :returns: tuple of (account partition, account nodes, container_count)
 
181
                  or (None, None, None) if it does not exist
 
182
        """
 
183
        partition, nodes = self.app.account_ring.get_nodes(account)
 
184
        # 0 = no responses, 200 = found, 404 = not found, -1 = mixed responses
 
185
        if self.app.memcache:
 
186
            cache_key = get_account_memcache_key(account)
 
187
            cache_value = self.app.memcache.get(cache_key)
 
188
            if not isinstance(cache_value, dict):
 
189
                result_code = cache_value
 
190
                container_count = 0
 
191
            else:
 
192
                result_code = cache_value['status']
 
193
                container_count = cache_value['container_count']
 
194
            if result_code == HTTP_OK:
 
195
                return partition, nodes, container_count
 
196
            elif result_code == HTTP_NOT_FOUND and not autocreate:
 
197
                return None, None, None
 
198
        result_code = 0
 
199
        container_count = 0
 
200
        attempts_left = len(nodes)
 
201
        path = '/%s' % account
 
202
        headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
 
203
        iternodes = self.iter_nodes(partition, nodes, self.app.account_ring)
 
204
        while attempts_left > 0:
 
205
            try:
 
206
                node = iternodes.next()
 
207
            except StopIteration:
 
208
                break
 
209
            attempts_left -= 1
 
210
            try:
 
211
                with ConnectionTimeout(self.app.conn_timeout):
 
212
                    conn = http_connect(node['ip'], node['port'],
 
213
                            node['device'], partition, 'HEAD', path, headers)
 
214
                with Timeout(self.app.node_timeout):
 
215
                    resp = conn.getresponse()
 
216
                    body = resp.read()
 
217
                    if is_success(resp.status):
 
218
                        result_code = HTTP_OK
 
219
                        container_count = int(
 
220
                            resp.getheader('x-account-container-count') or 0)
 
221
                        break
 
222
                    elif resp.status == HTTP_NOT_FOUND:
 
223
                        if result_code == 0:
 
224
                            result_code = HTTP_NOT_FOUND
 
225
                        elif result_code != HTTP_NOT_FOUND:
 
226
                            result_code = -1
 
227
                    elif resp.status == HTTP_INSUFFICIENT_STORAGE:
 
228
                        self.error_limit(node)
 
229
                        continue
 
230
                    else:
 
231
                        result_code = -1
 
232
            except (Exception, Timeout):
 
233
                self.exception_occurred(node, _('Account'),
 
234
                    _('Trying to get account info for %s') % path)
 
235
        if result_code == HTTP_NOT_FOUND and autocreate:
 
236
            if len(account) > MAX_ACCOUNT_NAME_LENGTH:
 
237
                return None, None, None
 
238
            headers = {'X-Timestamp': normalize_timestamp(time.time()),
 
239
                       'X-Trans-Id': self.trans_id,
 
240
                       'Connection': 'close'}
 
241
            resp = self.make_requests(Request.blank('/v1' + path),
 
242
                self.app.account_ring, partition, 'PUT',
 
243
                path, [headers] * len(nodes))
 
244
            if not is_success(resp.status_int):
 
245
                self.app.logger.warning('Could not autocreate account %r' % \
 
246
                                        path)
 
247
                return None, None, None
 
248
            result_code = HTTP_OK
 
249
        if self.app.memcache and result_code in (HTTP_OK, HTTP_NOT_FOUND):
 
250
            if result_code == HTTP_OK:
 
251
                cache_timeout = self.app.recheck_account_existence
 
252
            else:
 
253
                cache_timeout = self.app.recheck_account_existence * 0.1
 
254
            self.app.memcache.set(cache_key,
 
255
                {'status': result_code, 'container_count': container_count},
 
256
                timeout=cache_timeout)
 
257
        if result_code == HTTP_OK:
 
258
            return partition, nodes, container_count
 
259
        return None, None, None
 
260
 
 
261
    def container_info(self, account, container, account_autocreate=False):
 
262
        """
 
263
        Get container information and thusly verify container existance.
 
264
        This will also make a call to account_info to verify that the
 
265
        account exists.
 
266
 
 
267
        :param account: account name for the container
 
268
        :param container: container name to look up
 
269
        :returns: tuple of (container partition, container nodes, container
 
270
                  read acl, container write acl, container sync key) or (None,
 
271
                  None, None, None, None) if the container does not exist
 
272
        """
 
273
        partition, nodes = self.app.container_ring.get_nodes(
 
274
                account, container)
 
275
        path = '/%s/%s' % (account, container)
 
276
        if self.app.memcache:
 
277
            cache_key = get_container_memcache_key(account, container)
 
278
            cache_value = self.app.memcache.get(cache_key)
 
279
            if isinstance(cache_value, dict):
 
280
                status = cache_value['status']
 
281
                read_acl = cache_value['read_acl']
 
282
                write_acl = cache_value['write_acl']
 
283
                sync_key = cache_value.get('sync_key')
 
284
                versions = cache_value.get('versions')
 
285
                if status == HTTP_OK:
 
286
                    return partition, nodes, read_acl, write_acl, sync_key, \
 
287
                            versions
 
288
                elif status == HTTP_NOT_FOUND:
 
289
                    return None, None, None, None, None, None
 
290
        if not self.account_info(account, autocreate=account_autocreate)[1]:
 
291
            return None, None, None, None, None, None
 
292
        result_code = 0
 
293
        read_acl = None
 
294
        write_acl = None
 
295
        sync_key = None
 
296
        container_size = None
 
297
        versions = None
 
298
        attempts_left = len(nodes)
 
299
        headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
 
300
        iternodes = self.iter_nodes(partition, nodes, self.app.container_ring)
 
301
        while attempts_left > 0:
 
302
            try:
 
303
                node = iternodes.next()
 
304
            except StopIteration:
 
305
                break
 
306
            attempts_left -= 1
 
307
            try:
 
308
                with ConnectionTimeout(self.app.conn_timeout):
 
309
                    conn = http_connect(node['ip'], node['port'],
 
310
                            node['device'], partition, 'HEAD', path, headers)
 
311
                with Timeout(self.app.node_timeout):
 
312
                    resp = conn.getresponse()
 
313
                    body = resp.read()
 
314
                    if is_success(resp.status):
 
315
                        result_code = HTTP_OK
 
316
                        read_acl = resp.getheader('x-container-read')
 
317
                        write_acl = resp.getheader('x-container-write')
 
318
                        sync_key = resp.getheader('x-container-sync-key')
 
319
                        container_size = \
 
320
                            resp.getheader('X-Container-Object-Count')
 
321
                        versions = resp.getheader('x-versions-location')
 
322
                        break
 
323
                    elif resp.status == HTTP_NOT_FOUND:
 
324
                        if result_code == 0:
 
325
                            result_code = HTTP_NOT_FOUND
 
326
                        elif result_code != HTTP_NOT_FOUND:
 
327
                            result_code = -1
 
328
                    elif resp.status == HTTP_INSUFFICIENT_STORAGE:
 
329
                        self.error_limit(node)
 
330
                        continue
 
331
                    else:
 
332
                        result_code = -1
 
333
            except (Exception, Timeout):
 
334
                self.exception_occurred(node, _('Container'),
 
335
                    _('Trying to get container info for %s') % path)
 
336
        if self.app.memcache and result_code in (HTTP_OK, HTTP_NOT_FOUND):
 
337
            if result_code == HTTP_OK:
 
338
                cache_timeout = self.app.recheck_container_existence
 
339
            else:
 
340
                cache_timeout = self.app.recheck_container_existence * 0.1
 
341
            self.app.memcache.set(cache_key,
 
342
                                  {'status': result_code,
 
343
                                   'read_acl': read_acl,
 
344
                                   'write_acl': write_acl,
 
345
                                   'sync_key': sync_key,
 
346
                                   'container_size': container_size,
 
347
                                   'versions': versions},
 
348
                                  timeout=cache_timeout)
 
349
        if result_code == HTTP_OK:
 
350
            return partition, nodes, read_acl, write_acl, sync_key, versions
 
351
        return None, None, None, None, None, None
 
352
 
 
353
    def iter_nodes(self, partition, nodes, ring):
 
354
        """
 
355
        Node iterator that will first iterate over the normal nodes for a
 
356
        partition and then the handoff partitions for the node.
 
357
 
 
358
        :param partition: partition to iterate nodes for
 
359
        :param nodes: list of node dicts from the ring
 
360
        :param ring: ring to get handoff nodes from
 
361
        """
 
362
        for node in nodes:
 
363
            if not self.error_limited(node):
 
364
                yield node
 
365
        handoffs = 0
 
366
        for node in ring.get_more_nodes(partition):
 
367
            if not self.error_limited(node):
 
368
                handoffs += 1
 
369
                if self.app.log_handoffs:
 
370
                    self.app.logger.increment('handoff_count')
 
371
                    self.app.logger.warning(
 
372
                        'Handoff requested (%d)' % handoffs)
 
373
                    if handoffs == len(nodes):
 
374
                        self.app.logger.increment('handoff_all_count')
 
375
                yield node
 
376
 
 
377
    def _make_request(self, nodes, part, method, path, headers, query,
 
378
                      logger_thread_locals):
 
379
        self.app.logger.thread_locals = logger_thread_locals
 
380
        for node in nodes:
 
381
            try:
 
382
                with ConnectionTimeout(self.app.conn_timeout):
 
383
                    conn = http_connect(node['ip'], node['port'],
 
384
                            node['device'], part, method, path,
 
385
                            headers=headers, query_string=query)
 
386
                    conn.node = node
 
387
                with Timeout(self.app.node_timeout):
 
388
                    resp = conn.getresponse()
 
389
                    if not is_informational(resp.status) and \
 
390
                       not is_server_error(resp.status):
 
391
                        return resp.status, resp.reason, resp.read()
 
392
                    elif resp.status == HTTP_INSUFFICIENT_STORAGE:
 
393
                        self.error_limit(node)
 
394
            except (Exception, Timeout):
 
395
                self.exception_occurred(node, self.server_type,
 
396
                    _('Trying to %(method)s %(path)s') %
 
397
                    {'method': method, 'path': path})
 
398
 
 
399
    def make_requests(self, req, ring, part, method, path, headers,
 
400
                    query_string=''):
 
401
        """
 
402
        Sends an HTTP request to multiple nodes and aggregates the results.
 
403
        It attempts the primary nodes concurrently, then iterates over the
 
404
        handoff nodes as needed.
 
405
 
 
406
        :param headers: a list of dicts, where each dict represents one
 
407
                        backend request that should be made.
 
408
        :returns: a webob Response object
 
409
        """
 
410
        start_nodes = ring.get_part_nodes(part)
 
411
        nodes = self.iter_nodes(part, start_nodes, ring)
 
412
        pile = GreenPile(len(start_nodes))
 
413
        for head in headers:
 
414
            pile.spawn(self._make_request, nodes, part, method, path,
 
415
                       head, query_string, self.app.logger.thread_locals)
 
416
        response = [resp for resp in pile if resp]
 
417
        while len(response) < len(start_nodes):
 
418
            response.append((HTTP_SERVICE_UNAVAILABLE, '', ''))
 
419
        statuses, reasons, bodies = zip(*response)
 
420
        return self.best_response(req, statuses, reasons, bodies,
 
421
                  '%s %s' % (self.server_type, req.method))
 
422
 
 
423
    def best_response(self, req, statuses, reasons, bodies, server_type,
 
424
                      etag=None):
 
425
        """
 
426
        Given a list of responses from several servers, choose the best to
 
427
        return to the API.
 
428
 
 
429
        :param req: webob.Request object
 
430
        :param statuses: list of statuses returned
 
431
        :param reasons: list of reasons for each status
 
432
        :param bodies: bodies of each response
 
433
        :param server_type: type of server the responses came from
 
434
        :param etag: etag
 
435
        :returns: webob.Response object with the correct status, body, etc. set
 
436
        """
 
437
        resp = Response(request=req)
 
438
        if len(statuses):
 
439
            for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST):
 
440
                hstatuses = \
 
441
                    [s for s in statuses if hundred <= s < hundred + 100]
 
442
                if len(hstatuses) > len(statuses) / 2:
 
443
                    status = max(hstatuses)
 
444
                    status_index = statuses.index(status)
 
445
                    resp.status = '%s %s' % (status, reasons[status_index])
 
446
                    resp.body = bodies[status_index]
 
447
                    resp.content_type = 'text/html'
 
448
                    if etag:
 
449
                        resp.headers['etag'] = etag.strip('"')
 
450
                    return resp
 
451
        self.app.logger.error(_('%(type)s returning 503 for %(statuses)s'),
 
452
                              {'type': server_type, 'statuses': statuses})
 
453
        resp.status = '503 Internal Server Error'
 
454
        return resp
 
455
 
 
456
    @public
 
457
    def GET(self, req):
 
458
        """Handler for HTTP GET requests."""
 
459
        return self.GETorHEAD(req)
 
460
 
 
461
    @public
 
462
    def HEAD(self, req):
 
463
        """Handler for HTTP HEAD requests."""
 
464
        return self.GETorHEAD(req)
 
465
 
 
466
    def _make_app_iter_reader(self, node, source, queue, logger_thread_locals):
 
467
        """
 
468
        Reads from the source and places data in the queue. It expects
 
469
        something else be reading from the queue and, if nothing does within
 
470
        self.app.client_timeout seconds, the process will be aborted.
 
471
 
 
472
        :param node: The node dict that the source is connected to, for
 
473
                     logging/error-limiting purposes.
 
474
        :param source: The httplib.Response object to read from.
 
475
        :param queue: The eventlet.queue.Queue to place read source data into.
 
476
        :param logger_thread_locals: The thread local values to be set on the
 
477
                                     self.app.logger to retain transaction
 
478
                                     logging information.
 
479
        """
 
480
        self.app.logger.thread_locals = logger_thread_locals
 
481
        success = True
 
482
        try:
 
483
            try:
 
484
                while True:
 
485
                    with ChunkReadTimeout(self.app.node_timeout):
 
486
                        chunk = source.read(self.app.object_chunk_size)
 
487
                    if not chunk:
 
488
                        break
 
489
                    queue.put(chunk, timeout=self.app.client_timeout)
 
490
            except Full:
 
491
                self.app.logger.warn(
 
492
                    _('Client did not read from queue within %ss') %
 
493
                    self.app.client_timeout)
 
494
                self.app.logger.increment('client_timeouts')
 
495
                success = False
 
496
            except (Exception, Timeout):
 
497
                self.exception_occurred(node, _('Object'),
 
498
                   _('Trying to read during GET'))
 
499
                success = False
 
500
        finally:
 
501
            # Ensure the queue getter gets a terminator.
 
502
            queue.resize(2)
 
503
            queue.put(success)
 
504
            # Close-out the connection as best as possible.
 
505
            if getattr(source, 'swift_conn', None):
 
506
                self.close_swift_conn(source)
 
507
 
 
508
    def _make_app_iter(self, node, source, response):
 
509
        """
 
510
        Returns an iterator over the contents of the source (via its read
 
511
        func).  There is also quite a bit of cleanup to ensure garbage
 
512
        collection works and the underlying socket of the source is closed.
 
513
 
 
514
        :param response: The webob.Response object this iterator should be
 
515
                         assigned to via response.app_iter.
 
516
        :param source: The httplib.Response object this iterator should read
 
517
                       from.
 
518
        :param node: The node the source is reading from, for logging purposes.
 
519
        """
 
520
        try:
 
521
            try:
 
522
                # Spawn reader to read from the source and place in the queue.
 
523
                # We then drop any reference to the source or node, for garbage
 
524
                # collection purposes.
 
525
                queue = Queue(1)
 
526
                spawn_n(self._make_app_iter_reader, node, source, queue,
 
527
                        self.app.logger.thread_locals)
 
528
                source = node = None
 
529
                while True:
 
530
                    chunk = queue.get(timeout=self.app.node_timeout)
 
531
                    if isinstance(chunk, bool):  # terminator
 
532
                        success = chunk
 
533
                        if not success:
 
534
                            raise Exception(_('Failed to read all data'
 
535
                                              ' from the source'))
 
536
                        break
 
537
                    yield chunk
 
538
            except Empty:
 
539
                raise ChunkReadTimeout()
 
540
            except (GeneratorExit, Timeout):
 
541
                self.app.logger.warn(_('Client disconnected on read'))
 
542
            except Exception:
 
543
                self.app.logger.exception(_('Trying to send to client'))
 
544
                raise
 
545
        finally:
 
546
            response.app_iter = None
 
547
 
 
548
    def close_swift_conn(self, src):
 
549
        try:
 
550
            src.swift_conn.close()
 
551
        except Exception:
 
552
            pass
 
553
        src.swift_conn = None
 
554
        try:
 
555
            while src.read(self.app.object_chunk_size):
 
556
                pass
 
557
        except Exception:
 
558
            pass
 
559
        try:
 
560
            src.close()
 
561
        except Exception:
 
562
            pass
 
563
 
 
564
    def GETorHEAD_base(self, req, server_type, partition, nodes, path,
 
565
                       attempts):
 
566
        """
 
567
        Base handler for HTTP GET or HEAD requests.
 
568
 
 
569
        :param req: webob.Request object
 
570
        :param server_type: server type
 
571
        :param partition: partition
 
572
        :param nodes: nodes
 
573
        :param path: path for the request
 
574
        :param attempts: number of attempts to try
 
575
        :returns: webob.Response object
 
576
        """
 
577
        statuses = []
 
578
        reasons = []
 
579
        bodies = []
 
580
        source = None
 
581
        sources = []
 
582
        newest = req.headers.get('x-newest', 'f').lower() in TRUE_VALUES
 
583
        nodes = iter(nodes)
 
584
        while len(statuses) < attempts:
 
585
            try:
 
586
                node = nodes.next()
 
587
            except StopIteration:
 
588
                break
 
589
            if self.error_limited(node):
 
590
                continue
 
591
            try:
 
592
                with ConnectionTimeout(self.app.conn_timeout):
 
593
                    headers = dict(req.headers)
 
594
                    headers['Connection'] = 'close'
 
595
                    conn = http_connect(node['ip'], node['port'],
 
596
                        node['device'], partition, req.method, path,
 
597
                        headers=headers,
 
598
                        query_string=req.query_string)
 
599
                with Timeout(self.app.node_timeout):
 
600
                    possible_source = conn.getresponse()
 
601
                    # See NOTE: swift_conn at top of file about this.
 
602
                    possible_source.swift_conn = conn
 
603
            except (Exception, Timeout):
 
604
                self.exception_occurred(node, server_type,
 
605
                    _('Trying to %(method)s %(path)s') %
 
606
                    {'method': req.method, 'path': req.path})
 
607
                continue
 
608
            if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
 
609
                self.error_limit(node)
 
610
                continue
 
611
            if is_success(possible_source.status) or \
 
612
               is_redirection(possible_source.status):
 
613
                # 404 if we know we don't have a synced copy
 
614
                if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
 
615
                    statuses.append(HTTP_NOT_FOUND)
 
616
                    reasons.append('')
 
617
                    bodies.append('')
 
618
                    possible_source.read()
 
619
                    continue
 
620
                if newest:
 
621
                    if sources:
 
622
                        ts = float(source.getheader('x-put-timestamp') or
 
623
                                   source.getheader('x-timestamp') or 0)
 
624
                        pts = float(
 
625
                            possible_source.getheader('x-put-timestamp') or
 
626
                            possible_source.getheader('x-timestamp') or 0)
 
627
                        if pts > ts:
 
628
                            sources.insert(0, possible_source)
 
629
                        else:
 
630
                            sources.append(possible_source)
 
631
                    else:
 
632
                        sources.insert(0, possible_source)
 
633
                    source = sources[0]
 
634
                    statuses.append(source.status)
 
635
                    reasons.append(source.reason)
 
636
                    bodies.append('')
 
637
                    continue
 
638
                else:
 
639
                    source = possible_source
 
640
                    break
 
641
            statuses.append(possible_source.status)
 
642
            reasons.append(possible_source.reason)
 
643
            bodies.append(possible_source.read())
 
644
            if is_server_error(possible_source.status):
 
645
                self.error_occurred(node, _('ERROR %(status)d %(body)s ' \
 
646
                    'From %(type)s Server') %
 
647
                    {'status': possible_source.status,
 
648
                    'body': bodies[-1][:1024], 'type': server_type})
 
649
        if source:
 
650
            if req.method == 'GET' and \
 
651
               source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
 
652
                if newest:
 
653
                    # we need to close all hanging swift_conns
 
654
                    sources.pop(0)
 
655
                    for src in sources:
 
656
                        self.close_swift_conn(src)
 
657
 
 
658
                res = Response(request=req, conditional_response=True)
 
659
                res.app_iter = self._make_app_iter(node, source, res)
 
660
                # See NOTE: swift_conn at top of file about this.
 
661
                res.swift_conn = source.swift_conn
 
662
                update_headers(res, source.getheaders())
 
663
                # Used by container sync feature
 
664
                if res.environ is None:
 
665
                    res.environ = dict()
 
666
                res.environ['swift_x_timestamp'] = \
 
667
                    source.getheader('x-timestamp')
 
668
                update_headers(res, {'accept-ranges': 'bytes'})
 
669
                res.status = source.status
 
670
                res.content_length = source.getheader('Content-Length')
 
671
                if source.getheader('Content-Type'):
 
672
                    res.charset = None
 
673
                    res.content_type = source.getheader('Content-Type')
 
674
                return res
 
675
            elif is_success(source.status) or is_redirection(source.status):
 
676
                res = status_map[source.status](request=req)
 
677
                update_headers(res, source.getheaders())
 
678
                # Used by container sync feature
 
679
                if res.environ is None:
 
680
                    res.environ = dict()
 
681
                res.environ['swift_x_timestamp'] = \
 
682
                    source.getheader('x-timestamp')
 
683
                update_headers(res, {'accept-ranges': 'bytes'})
 
684
                res.content_length = source.getheader('Content-Length')
 
685
                if source.getheader('Content-Type'):
 
686
                    res.charset = None
 
687
                    res.content_type = source.getheader('Content-Type')
 
688
                return res
 
689
        return self.best_response(req, statuses, reasons, bodies,
 
690
                                  '%s %s' % (server_type, req.method))