1
# Copyright (c) 2010-2012 OpenStack, LLC.
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
17
# You'll see swift_conn passed around a few places in this file. This is the
18
# source httplib connection of whatever it is attached to.
19
# It is used when early termination of reading from the connection should
20
# happen, such as when a range request is satisfied but there's still more the
21
# source connection would like to send. To prevent having to read all the data
22
# that could be left, the source connection can be closed via .close(), and then reads
23
# commence to empty out any buffers.
24
# These shenanigans are to ensure all related objects can be garbage
25
# collected. We've seen objects hang around forever otherwise.
30
from eventlet import spawn_n, GreenPile, Timeout
31
from eventlet.queue import Queue, Empty, Full
32
from eventlet.timeout import Timeout
33
from webob.exc import status_map
34
from webob import Request, Response
36
from swift.common.utils import normalize_timestamp, TRUE_VALUES, public
37
from swift.common.bufferedhttp import http_connect
38
from swift.common.constraints import MAX_ACCOUNT_NAME_LENGTH
39
from swift.common.exceptions import ChunkReadTimeout, ConnectionTimeout
40
from swift.common.http import is_informational, is_success, is_redirection, \
41
is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
42
HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
43
HTTP_INSUFFICIENT_STORAGE
46
# Copy headers onto a webob Response. `headers` may be a mapping (its
# .items() are used) or an iterable of (name, value) pairs.
# NOTE(review): this copy is damaged -- the embedded original line numbers
# skip 47, 49, 52 and 56, so the docstring quotes and the `if` that pairs
# with the `elif` below are missing. The quote-stripping assignment
# presumably belongs to that missing branch; restore from upstream.
def update_headers(response, headers):
48
Helper function to update headers in the response.
50
:param response: webob.Response object
51
:param headers: dictionary headers
53
if hasattr(headers, 'items'):
54
headers = headers.items()
55
for name, value in headers:
57
response.headers[name] = value.replace('"', '')
58
# Names in this tuple are not copied by this branch.
elif name not in ('date', 'content-length', 'content-type',
59
'connection', 'x-put-timestamp', 'x-delete-after'):
60
response.headers[name] = value
63
# Decorator: marks `func` with a `delay_denial = True` attribute and wraps
# it with functools.wraps so the wrapper keeps func's name and docstring.
# NOTE(review): the embedded line numbers jump from 74 to 79, so the body
# of `wrapped` and the decorator's `return` are missing from this copy.
def delay_denial(func):
65
Decorator to declare which methods should have any swift.authorize call
66
delayed. This is so the method can load the Request object up with
67
additional information that may be needed by the authorization system.
69
:param func: function for which authorization will be delayed
71
func.delay_denial = True
73
@functools.wraps(func)
74
def wrapped(*a, **kw):
def get_account_memcache_key(account):
    """
    Return the memcache key used to cache info for an account.

    :param account: account name
    :returns: the string 'account/<account>'
    """
    return 'account/%s' % account
def get_container_memcache_key(account, container):
    """
    Return the memcache key used to cache info for a container.

    :param account: account name
    :param container: container name
    :returns: the string 'container/<account>/<container>'
    """
    return 'container/%s/%s' % (account, container)
87
# Base class for the proxy's per-resource controllers.
# NOTE(review): methods below read self.app, self.server_type and
# self.trans_id, none of which are assigned in the visible lines --
# presumably set by the missing __init__ lines or by subclasses; confirm.
class Controller(object):
88
"""Base WSGI controller class for the proxy"""
91
# Ensure these are all lowercase
# (transfer_headers matches k.lower() against these names, so mixed-case
# entries would never match.)
92
pass_through_headers = []
94
# NOTE(review): the embedded line numbers jump from 95 to 99, so part of
# __init__ is missing here; since other methods read self.app, an
# assignment from `app` is presumably among the missing lines -- confirm.
def __init__(self, app):
95
self.account_name = None
99
# Copy metadata headers from src_headers into dst_headers, where <type> is
# self.server_type lowercased:
#   - 'x-remove-<type>-meta-*' headers become 'x-<type>-meta-*' with an
#     empty value (the first '-remove' is stripped);
#   - 'x-<type>-meta-*' headers and any name listed in
#     pass_through_headers are copied with lowercased names.
def transfer_headers(self, src_headers, dst_headers):
100
x_remove = 'x-remove-%s-meta-' % self.server_type.lower()
101
x_meta = 'x-%s-meta-' % self.server_type.lower()
102
# NOTE(review): original line 103 (the `for` clause of the generator
# below) is missing from this copy.
dst_headers.update((k.lower().replace('-remove', '', 1), '')
104
if k.lower().startswith(x_remove))
105
dst_headers.update((k.lower(), v)
106
for k, v in src_headers.iteritems()
107
if k.lower() in self.pass_through_headers or
108
k.lower().startswith(x_meta))
110
# Record one more error against `node`: bump node['errors'] (treating a
# missing entry as 0) and stamp node['last_error'] with time.time().
def error_increment(self, node):
112
Handles incrementing error counts when talking to nodes.
114
:param node: dictionary of node to increment the error count for
116
node['errors'] = node.get('errors', 0) + 1
117
node['last_error'] = time.time()
119
# Count an error against `node` (via error_increment), then log `msg`
# together with the node's ip:port at error level.
def error_occurred(self, node, msg):
121
Handle logging, and handling of errors.
123
:param node: dictionary of node to handle errors for
124
:param msg: error message
126
self.error_increment(node)
127
self.app.logger.error(_('%(msg)s %(ip)s:%(port)s'),
128
{'msg': msg, 'ip': node['ip'], 'port': node['port']})
130
# Log the current exception (logger.exception) with the server type, the
# node's ip:port/device and `additional_info`. In the visible lines this
# only logs; it does not change the node's error count.
# NOTE(review): original line 140 (the rest of the format string, which
# presumably consumes %(info)s) is missing from this copy.
def exception_occurred(self, node, typ, additional_info):
132
Handle logging of generic exceptions.
134
:param node: dictionary of node to log the error for
135
:param typ: server type
136
:param additional_info: additional information to log
138
self.app.logger.exception(
139
_('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
141
{'type': typ, 'ip': node['ip'], 'port': node['port'],
142
'device': node['device'], 'info': additional_info})
144
# Decide whether `node` should currently be skipped. Visible logic:
#   1. check for an 'errors' entry;
#   2. drop a 'last_error' that is older than error_suppression_interval;
#   3. compute limited = node['errors'] > error_suppression_limit and log
#      at debug level.
# NOTE(review): lines 145, 147, 150-151, 153, 157-159, 161 and 164-165 are
# missing from this copy. `now` is used but never assigned in the visible
# lines, and no return statement survives. Presumably the missing lines
# hold:
#   - `now = time.time()`;
#   - the early return for nodes without errors;
#   - the `if limited:` guard on the debug log;
#   - `return limited`.
# Restore from upstream before use.
def error_limited(self, node):
146
Check if the node is currently error limited.
148
:param node: dictionary of node to check
149
:returns: True if error limited, False otherwise
152
if not 'errors' in node:
154
if 'last_error' in node and node['last_error'] < \
155
now - self.app.error_suppression_interval:
156
del node['last_error']
160
limited = node['errors'] > self.app.error_suppression_limit
162
self.app.logger.debug(
163
_('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
166
# Force `node` into the error-limited state. Its error count is set to one
# more than error_suppression_limit because error_limited compares with a
# strict `>`. last_error is stamped with the current time.
def error_limit(self, node):
168
Mark a node as error limited.
170
:param node: dictionary of node to error limit
172
node['errors'] = self.app.error_suppression_limit + 1
173
node['last_error'] = time.time()
175
# Look up an account, trying memcache first and then HEAD requests to the
# account's nodes. When it is not found and autocreate is set, optionally
# create it with a PUT.
def account_info(self, account, autocreate=False):
177
Get account information, and also verify that the account exists.
179
:param account: name of the account to get the info for
180
:returns: tuple of (account partition, account nodes, container_count)
181
or (None, None, None) if it does not exist
183
partition, nodes = self.app.account_ring.get_nodes(account)
184
# 0 = no responses, 200 = found, 404 = not found, -1 = mixed responses
185
# Fast path: a cached 200 is returned directly; a cached 404 returns
# (None, None, None) unless autocreate is requested.
if self.app.memcache:
186
cache_key = get_account_memcache_key(account)
187
cache_value = self.app.memcache.get(cache_key)
188
if not isinstance(cache_value, dict):
189
result_code = cache_value
# NOTE(review): lines 190-191 are missing here (presumably the `else:`
# header for the dict case below) -- restore from upstream.
192
result_code = cache_value['status']
193
container_count = cache_value['container_count']
194
if result_code == HTTP_OK:
195
return partition, nodes, container_count
196
elif result_code == HTTP_NOT_FOUND and not autocreate:
197
return None, None, None
# NOTE(review): lines 198-199, 205 and 208-210 are missing. Presumably they
# hold:
#   - the initialisation of result_code and container_count;
#   - the `try:` around iternodes.next();
#   - the per-attempt bookkeeping.
# Slow path: HEAD the account on up to len(nodes) nodes drawn from
# iter_nodes (primary nodes first, then handoffs).
200
attempts_left = len(nodes)
201
path = '/%s' % account
202
headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
203
iternodes = self.iter_nodes(partition, nodes, self.app.account_ring)
204
while attempts_left > 0:
206
node = iternodes.next()
207
except StopIteration:
211
with ConnectionTimeout(self.app.conn_timeout):
212
conn = http_connect(node['ip'], node['port'],
213
node['device'], partition, 'HEAD', path, headers)
214
with Timeout(self.app.node_timeout):
215
resp = conn.getresponse()
217
if is_success(resp.status):
218
result_code = HTTP_OK
219
container_count = int(
220
resp.getheader('x-account-container-count') or 0)
# NOTE(review): lines 221, 223 and 226 are missing. Per the legend above,
# line 226 presumably sets result_code = -1 for mixed answers.
222
elif resp.status == HTTP_NOT_FOUND:
224
result_code = HTTP_NOT_FOUND
225
elif result_code != HTTP_NOT_FOUND:
227
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
# 507: take this node out of rotation via error_limit.
228
self.error_limit(node)
232
except (Exception, Timeout):
233
self.exception_occurred(node, _('Account'),
234
_('Trying to get account info for %s') % path)
# Auto-create: when the lookup concluded 404 and autocreate is set, PUT
# the account with one request per node (via make_requests). Names longer
# than MAX_ACCOUNT_NAME_LENGTH are refused.
235
if result_code == HTTP_NOT_FOUND and autocreate:
236
if len(account) > MAX_ACCOUNT_NAME_LENGTH:
237
return None, None, None
238
headers = {'X-Timestamp': normalize_timestamp(time.time()),
239
'X-Trans-Id': self.trans_id,
240
'Connection': 'close'}
241
resp = self.make_requests(Request.blank('/v1' + path),
242
self.app.account_ring, partition, 'PUT',
243
path, [headers] * len(nodes))
244
if not is_success(resp.status_int):
# NOTE(review): line 246 (the operand of `%` in the warning below) is
# missing from this copy.
245
self.app.logger.warning('Could not autocreate account %r' % \
247
return None, None, None
248
result_code = HTTP_OK
# Cache only definitive answers (200 or 404). The 0.1 factor gives the
# other case (presumably 404; its `else:` line 252 is missing) a shorter
# lifetime.
249
if self.app.memcache and result_code in (HTTP_OK, HTTP_NOT_FOUND):
250
if result_code == HTTP_OK:
251
cache_timeout = self.app.recheck_account_existence
253
cache_timeout = self.app.recheck_account_existence * 0.1
254
self.app.memcache.set(cache_key,
255
{'status': result_code, 'container_count': container_count},
256
timeout=cache_timeout)
257
if result_code == HTTP_OK:
258
return partition, nodes, container_count
259
return None, None, None
261
# Look up a container (memcache first, then HEAD requests against the
# container ring) and return its partition, nodes and metadata.
# NOTE(review): the docstring below describes a 5-tuple, but every return
# in this method yields six values. The sixth is `versions`, taken from the
# x-versions-location header.
def container_info(self, account, container, account_autocreate=False):
263
Get container information and thusly verify container existance.
264
This will also make a call to account_info to verify that the
267
:param account: account name for the container
268
:param container: container name to look up
269
:returns: tuple of (container partition, container nodes, container
270
read acl, container write acl, container sync key) or (None,
271
None, None, None, None) if the container does not exist
# NOTE(review): line 274 (the rest of the get_nodes call below) is missing.
273
partition, nodes = self.app.container_ring.get_nodes(
275
path = '/%s/%s' % (account, container)
# Fast path: a cached 200 returns the cached ACLs/sync key/versions; a
# cached 404 returns all Nones.
276
if self.app.memcache:
277
cache_key = get_container_memcache_key(account, container)
278
cache_value = self.app.memcache.get(cache_key)
279
if isinstance(cache_value, dict):
280
status = cache_value['status']
281
read_acl = cache_value['read_acl']
282
write_acl = cache_value['write_acl']
283
sync_key = cache_value.get('sync_key')
284
versions = cache_value.get('versions')
285
if status == HTTP_OK:
286
return partition, nodes, read_acl, write_acl, sync_key, \
288
elif status == HTTP_NOT_FOUND:
289
return None, None, None, None, None, None
# The container can only exist if its account does. account_info()[1] is
# the account's node list, which is None when the account is missing.
290
if not self.account_info(account, autocreate=account_autocreate)[1]:
291
return None, None, None, None, None, None
# NOTE(review): lines 292-295 and 297 are missing. result_code, read_acl,
# write_acl, sync_key and versions are read below but are not initialised
# in the visible lines.
296
container_size = None
298
attempts_left = len(nodes)
299
headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
300
iternodes = self.iter_nodes(partition, nodes, self.app.container_ring)
301
while attempts_left > 0:
303
node = iternodes.next()
304
except StopIteration:
308
with ConnectionTimeout(self.app.conn_timeout):
309
conn = http_connect(node['ip'], node['port'],
310
node['device'], partition, 'HEAD', path, headers)
311
with Timeout(self.app.node_timeout):
312
resp = conn.getresponse()
314
if is_success(resp.status):
315
result_code = HTTP_OK
316
read_acl = resp.getheader('x-container-read')
317
write_acl = resp.getheader('x-container-write')
318
sync_key = resp.getheader('x-container-sync-key')
# NOTE(review): line 319 is missing; presumably it assigns the object
# count read below to container_size -- confirm.
320
resp.getheader('X-Container-Object-Count')
321
versions = resp.getheader('x-versions-location')
323
elif resp.status == HTTP_NOT_FOUND:
325
result_code = HTTP_NOT_FOUND
# NOTE(review): lines 324 and 327 are missing (the branch structure around
# these result_code updates).
326
elif result_code != HTTP_NOT_FOUND:
328
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
# 507: take this node out of rotation via error_limit.
329
self.error_limit(node)
333
except (Exception, Timeout):
334
self.exception_occurred(node, _('Container'),
335
_('Trying to get container info for %s') % path)
# Cache only definitive answers (200 or 404). The 0.1 factor gives the
# other case (presumably 404; its `else:` line 339 is missing) a shorter
# lifetime.
336
if self.app.memcache and result_code in (HTTP_OK, HTTP_NOT_FOUND):
337
if result_code == HTTP_OK:
338
cache_timeout = self.app.recheck_container_existence
340
cache_timeout = self.app.recheck_container_existence * 0.1
341
self.app.memcache.set(cache_key,
342
{'status': result_code,
343
'read_acl': read_acl,
344
'write_acl': write_acl,
345
'sync_key': sync_key,
346
'container_size': container_size,
347
'versions': versions},
348
timeout=cache_timeout)
349
if result_code == HTTP_OK:
350
return partition, nodes, read_acl, write_acl, sync_key, versions
351
return None, None, None, None, None, None
353
# Generator over candidate nodes. It yields the given primary `nodes`
# first, then handoff nodes from ring.get_more_nodes(partition), skipping
# any node that error_limited() reports as limited. When app.log_handoffs
# is set:
#   - each handoff used is counted ('handoff_count') and logged;
#   - 'handoff_all_count' is incremented once the handoff count equals
#     len(nodes).
# NOTE(review): lines 354, 357, 361-362, 364-365, 368 and 375 are missing
# from this copy. No `yield` and no assignment to `handoffs` are visible;
# presumably they are among the missing lines, along with the loop over
# `nodes`.
def iter_nodes(self, partition, nodes, ring):
355
Node iterator that will first iterate over the normal nodes for a
356
partition and then the handoff partitions for the node.
358
:param partition: partition to iterate nodes for
359
:param nodes: list of node dicts from the ring
360
:param ring: ring to get handoff nodes from
363
if not self.error_limited(node):
366
for node in ring.get_more_nodes(partition):
367
if not self.error_limited(node):
369
if self.app.log_handoffs:
370
self.app.logger.increment('handoff_count')
371
self.app.logger.warning(
372
'Handoff requested (%d)' % handoffs)
373
if handoffs == len(nodes):
374
self.app.logger.increment('handoff_all_count')
377
# Worker run in a greenthread by make_requests. It sends one backend
# request and returns (status, reason, body) for the first response that
# is neither 1xx nor 5xx. Other outcomes:
#   - a 507 marks the node error-limited;
#   - exceptions and timeouts are logged via exception_occurred;
#   - falling off the end returns None, which make_requests filters out
#     with `if resp`.
# logger_thread_locals is re-applied so log lines from this greenthread
# keep the caller's transaction context.
# NOTE(review): lines 380-381 are missing from this copy. Presumably they
# are the loop drawing `node` from `nodes` and the `try:` for the
# `except` below.
def _make_request(self, nodes, part, method, path, headers, query,
378
logger_thread_locals):
379
self.app.logger.thread_locals = logger_thread_locals
382
with ConnectionTimeout(self.app.conn_timeout):
383
conn = http_connect(node['ip'], node['port'],
384
node['device'], part, method, path,
385
headers=headers, query_string=query)
387
with Timeout(self.app.node_timeout):
388
resp = conn.getresponse()
389
if not is_informational(resp.status) and \
390
not is_server_error(resp.status):
391
return resp.status, resp.reason, resp.read()
392
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
393
self.error_limit(node)
394
except (Exception, Timeout):
395
self.exception_occurred(node, self.server_type,
396
_('Trying to %(method)s %(path)s') %
397
{'method': method, 'path': path})
399
# Fan a request out to a partition's nodes concurrently, then reduce the
# results to one Response via best_response.
# NOTE(review): lines 400-401, 405, 409 and 413 are missing. The rest of
# the signature (which presumably declares `query_string`) and the loop
# that binds `head` from `headers` are not in this copy.
def make_requests(self, req, ring, part, method, path, headers,
402
Sends an HTTP request to multiple nodes and aggregates the results.
403
It attempts the primary nodes concurrently, then iterates over the
404
handoff nodes as needed.
406
:param headers: a list of dicts, where each dict represents one
407
backend request that should be made.
408
:returns: a webob Response object
410
start_nodes = ring.get_part_nodes(part)
# One shared iterator is handed to every greenthread, so each backend
# request draws the next node from it (primaries first, then handoffs).
411
nodes = self.iter_nodes(part, start_nodes, ring)
412
pile = GreenPile(len(start_nodes))
414
pile.spawn(self._make_request, nodes, part, method, path,
415
head, query_string, self.app.logger.thread_locals)
416
response = [resp for resp in pile if resp]
# Every primary node must be represented: missing answers count as 503.
417
while len(response) < len(start_nodes):
418
response.append((HTTP_SERVICE_UNAVAILABLE, '', ''))
419
statuses, reasons, bodies = zip(*response)
420
return self.best_response(req, statuses, reasons, bodies,
421
'%s %s' % (self.server_type, req.method))
423
# Pick the response to send back. The status classes 2xx, 3xx and 4xx are
# checked in that order; if a strict majority of `statuses` falls in a
# class, the answer uses that class's highest status with its reason and
# body. Otherwise an error is logged and the answer is 503.
# NOTE(review): lines 424-425, 427-428, 434, 436, 438, 440, 448, 450 and
# 454 are missing. The following are absent from this copy:
#   - the rest of the signature (presumably an `etag` parameter);
#   - the `hstatuses =` assignment target;
#   - the guard before the etag line;
#   - the return statements.
def best_response(self, req, statuses, reasons, bodies, server_type,
426
Given a list of responses from several servers, choose the best to
429
:param req: webob.Request object
430
:param statuses: list of statuses returned
431
:param reasons: list of reasons for each status
432
:param bodies: bodies of each response
433
:param server_type: type of server the responses came from
435
:returns: webob.Response object with the correct status, body, etc. set
437
resp = Response(request=req)
439
for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST):
441
[s for s in statuses if hundred <= s < hundred + 100]
442
if len(hstatuses) > len(statuses) / 2:
443
status = max(hstatuses)
444
status_index = statuses.index(status)
445
resp.status = '%s %s' % (status, reasons[status_index])
446
resp.body = bodies[status_index]
447
resp.content_type = 'text/html'
449
resp.headers['etag'] = etag.strip('"')
451
self.app.logger.error(_('%(type)s returning 503 for %(statuses)s'),
452
{'type': server_type, 'statuses': statuses})
# NOTE(review): the reason phrase below says "Internal Server Error", but
# the standard phrase for 503 is "Service Unavailable". It is left as-is
# here because it is emitted at runtime.
453
resp.status = '503 Internal Server Error'
458
# NOTE(review): the `def` line for this handler (and anything from original
# lines 454-457) is missing from this copy; only the docstring and body
# survive. It delegates to self.GETorHEAD(req), which is not defined in
# the visible lines -- presumably provided by subclasses; confirm.
"""Handler for HTTP GET requests."""
459
return self.GETorHEAD(req)
463
# NOTE(review): the `def` line for this handler (original lines 460-462)
# is missing from this copy; only the docstring and body survive. Like the
# GET handler, it delegates to self.GETorHEAD(req).
"""Handler for HTTP HEAD requests."""
464
return self.GETorHEAD(req)
466
# Producer side of a streamed GET body; _make_app_iter spawns this with
# spawn_n. It reads object_chunk_size chunks from `source`, each read
# bounded by ChunkReadTimeout(node_timeout), and puts them on `queue`,
# waiting at most client_timeout per put. Error handling:
#   - a client that does not drain the queue in time is logged as a warning
#     and counted in 'client_timeouts';
#   - other errors go to exception_occurred.
# Finally the backend connection is closed via close_swift_conn.
# NOTE(review): lines 467, 471, 478-479, 481-484, 487-488, 490, 495,
# 499-500 and 502-503 are missing. Absent from this copy:
#   - the read loop;
#   - the `except` clause the warning presumably belongs to (Full is
#     imported);
#   - the terminator `queue.put` that the comment below refers to.
def _make_app_iter_reader(self, node, source, queue, logger_thread_locals):
468
Reads from the source and places data in the queue. It expects
469
something else be reading from the queue and, if nothing does within
470
self.app.client_timeout seconds, the process will be aborted.
472
:param node: The node dict that the source is connected to, for
473
logging/error-limiting purposes.
474
:param source: The httplib.Response object to read from.
475
:param queue: The eventlet.queue.Queue to place read source data into.
476
:param logger_thread_locals: The thread local values to be set on the
477
self.app.logger to retain transaction
480
self.app.logger.thread_locals = logger_thread_locals
485
with ChunkReadTimeout(self.app.node_timeout):
486
chunk = source.read(self.app.object_chunk_size)
489
queue.put(chunk, timeout=self.app.client_timeout)
491
self.app.logger.warn(
492
_('Client did not read from queue within %ss') %
493
self.app.client_timeout)
494
self.app.logger.increment('client_timeouts')
496
except (Exception, Timeout):
497
self.exception_occurred(node, _('Object'),
498
_('Trying to read during GET'))
501
# Ensure the queue getter gets a terminator.
504
# Close-out the connection as best as possible.
505
if getattr(source, 'swift_conn', None):
506
self.close_swift_conn(source)
508
# Consumer side of a streamed GET body. It spawns _make_app_iter_reader
# and takes chunks from the shared queue, waiting at most node_timeout for
# each. A bool on the queue is the reader's terminator. Error handling:
#   - client disconnects (GeneratorExit/Timeout) are logged as warnings;
#   - other errors are logged via logger.exception.
# It then sets response.app_iter = None so the response stops referencing
# this generator.
# NOTE(review): lines 509, 513, 517, 519-521, 525, 528-529, 532-533,
# 535-538, 542, 544-545 and 547 are missing. Absent from this copy:
#   - the queue construction;
#   - the loop and the `yield`;
#   - the `try:`/`finally:` lines;
#   - the rest of the 'Failed to read all data' message.
def _make_app_iter(self, node, source, response):
510
Returns an iterator over the contents of the source (via its read
511
func). There is also quite a bit of cleanup to ensure garbage
512
collection works and the underlying socket of the source is closed.
514
:param response: The webob.Response object this iterator should be
515
assigned to via response.app_iter.
516
:param source: The httplib.Response object this iterator should read
518
:param node: The node the source is reading from, for logging purposes.
522
# Spawn reader to read from the source and place in the queue.
523
# We then drop any reference to the source or node, for garbage
524
# collection purposes.
526
spawn_n(self._make_app_iter_reader, node, source, queue,
527
self.app.logger.thread_locals)
530
chunk = queue.get(timeout=self.app.node_timeout)
531
if isinstance(chunk, bool): # terminator
534
raise Exception(_('Failed to read all data'
539
raise ChunkReadTimeout()
540
except (GeneratorExit, Timeout):
541
self.app.logger.warn(_('Client disconnected on read'))
543
self.app.logger.exception(_('Trying to send to client'))
546
response.app_iter = None
548
# Close the backend connection attached to `src` (see the swift_conn note
# at the top of the file) and drop the reference to it. Then drain
# whatever `src` still returns in object_chunk_size reads.
# NOTE(review): lines 549, 551-552, 554 and 556-563 are missing. The
# try/except wrappers around these steps and the drain loop's body are not
# in this copy.
def close_swift_conn(self, src):
550
src.swift_conn.close()
553
src.swift_conn = None
555
while src.read(self.app.object_chunk_size):
564
# Try backend nodes one at a time until `attempts` statuses have been
# collected, then build the client response:
#   - when X-Newest is requested, sources are presumably ordered so the one
#     with the highest x-put-timestamp/x-timestamp comes first;
#   - a GET answered 200/206 is streamed via _make_app_iter;
#   - other 2xx/3xx answers are rebuilt from status_map;
#   - otherwise best_response picks the reply.
# NOTE(review): many lines are missing from this copy (e.g. 565-566,
# 576-581, 585-586, 588, 590-591, 616-617, 619-621, 652, 654-655, 665).
# Absent are:
#   - the rest of the signature (presumably `attempts`);
#   - the initialisation of statuses, reasons, bodies and sources;
#   - the node iteration and `try:` lines;
#   - the `if newest:` guard;
#   - the returns.
def GETorHEAD_base(self, req, server_type, partition, nodes, path,
567
Base handler for HTTP GET or HEAD requests.
569
:param req: webob.Request object
570
:param server_type: server type
571
:param partition: partition
573
:param path: path for the request
574
:param attempts: number of attempts to try
575
:returns: webob.Response object
582
# X-Newest (any TRUE_VALUES spelling) asks for timestamps to be compared
# across sources instead of taking the first good answer.
newest = req.headers.get('x-newest', 'f').lower() in TRUE_VALUES
584
while len(statuses) < attempts:
587
except StopIteration:
589
if self.error_limited(node):
592
with ConnectionTimeout(self.app.conn_timeout):
593
headers = dict(req.headers)
594
headers['Connection'] = 'close'
595
conn = http_connect(node['ip'], node['port'],
596
node['device'], partition, req.method, path,
598
query_string=req.query_string)
599
with Timeout(self.app.node_timeout):
600
possible_source = conn.getresponse()
601
# See NOTE: swift_conn at top of file about this.
602
possible_source.swift_conn = conn
603
except (Exception, Timeout):
604
self.exception_occurred(node, server_type,
605
_('Trying to %(method)s %(path)s') %
606
{'method': req.method, 'path': req.path})
# 507 Insufficient Storage: error-limit this node.
608
if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
609
self.error_limit(node)
611
if is_success(possible_source.status) or \
612
is_redirection(possible_source.status):
613
# 404 if we know we don't have a synced copy
614
if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
615
statuses.append(HTTP_NOT_FOUND)
618
possible_source.read()
# Timestamp comparison: the candidate is inserted at the front of
# `sources` or appended, depending on the (partly missing) comparison
# against the current best.
622
ts = float(source.getheader('x-put-timestamp') or
623
source.getheader('x-timestamp') or 0)
625
possible_source.getheader('x-put-timestamp') or
626
possible_source.getheader('x-timestamp') or 0)
628
sources.insert(0, possible_source)
630
sources.append(possible_source)
632
sources.insert(0, possible_source)
634
statuses.append(source.status)
635
reasons.append(source.reason)
639
source = possible_source
641
statuses.append(possible_source.status)
642
reasons.append(possible_source.reason)
643
bodies.append(possible_source.read())
# 5xx responses count against the node; only the first 1024 characters of
# the body are logged.
644
if is_server_error(possible_source.status):
645
self.error_occurred(node, _('ERROR %(status)d %(body)s ' \
646
'From %(type)s Server') %
647
{'status': possible_source.status,
648
'body': bodies[-1][:1024], 'type': server_type})
# Streaming case: a GET answered 200/206 gets a Response whose app_iter
# reads from the chosen source.
650
if req.method == 'GET' and \
651
source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
653
# we need to close all hanging swift_conns
# NOTE(review): lines 654-655 (presumably the loop over the other sources
# that binds `src`) are missing.
656
self.close_swift_conn(src)
658
res = Response(request=req, conditional_response=True)
659
res.app_iter = self._make_app_iter(node, source, res)
660
# See NOTE: swift_conn at top of file about this.
661
res.swift_conn = source.swift_conn
662
update_headers(res, source.getheaders())
663
# Used by container sync feature
# NOTE(review): line 665 is missing. As shown, the statement after this
# `if` would index into None; presumably line 665 assigns a fresh dict to
# res.environ.
664
if res.environ is None:
666
res.environ['swift_x_timestamp'] = \
667
source.getheader('x-timestamp')
668
update_headers(res, {'accept-ranges': 'bytes'})
669
res.status = source.status
670
res.content_length = source.getheader('Content-Length')
671
if source.getheader('Content-Type'):
673
res.content_type = source.getheader('Content-Type')
# Non-streamed success/redirect: rebuild the response from status_map and
# copy the source's headers.
675
elif is_success(source.status) or is_redirection(source.status):
676
res = status_map[source.status](request=req)
677
update_headers(res, source.getheaders())
678
# Used by container sync feature
# NOTE(review): line 680 is missing here too (same res.environ concern as
# above).
679
if res.environ is None:
681
res.environ['swift_x_timestamp'] = \
682
source.getheader('x-timestamp')
683
update_headers(res, {'accept-ranges': 'bytes'})
684
res.content_length = source.getheader('Content-Length')
685
if source.getheader('Content-Type'):
687
res.content_type = source.getheader('Content-Type')
689
return self.best_response(req, statuses, reasons, bodies,
690
'%s %s' % (server_type, req.method))