25
26
from swift.account.utils import account_listing_response, \
26
27
account_listing_content_type
27
28
from swift.common.db import AccountBroker, DatabaseConnectionError
28
from swift.common.utils import get_logger, get_param, hash_path, public, \
29
from swift.common.request_helpers import get_param
30
from swift.common.utils import get_logger, hash_path, public, \
29
31
normalize_timestamp, storage_directory, config_true_value, \
30
validate_device_partition, json, timing_stats
32
validate_device_partition, json, timing_stats, replication
31
33
from swift.common.constraints import ACCOUNT_LISTING_LIMIT, \
32
34
check_mount, check_float, check_utf8, FORMAT2CONTENT_TYPE
33
35
from swift.common.db_replicator import ReplicatorRpc
35
37
HTTPCreated, HTTPForbidden, HTTPInternalServerError, \
36
38
HTTPMethodNotAllowed, HTTPNoContent, HTTPNotFound, \
37
39
HTTPPreconditionFailed, HTTPConflict, Request, \
38
HTTPInsufficientStorage, HTTPNotAcceptable
40
HTTPInsufficientStorage, HTTPNotAcceptable, HTTPException
41
43
DATADIR = 'accounts'
49
51
self.root = conf.get('devices', '/srv/node')
50
52
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
51
53
replication_server = conf.get('replication_server', None)
52
if replication_server is None:
53
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'REPLICATE',
54
if replication_server is not None:
56
55
replication_server = config_true_value(replication_server)
57
if replication_server:
58
allowed_methods = ['REPLICATE']
60
allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
61
56
self.replication_server = replication_server
62
self.allowed_methods = allowed_methods
63
57
self.replicator_rpc = ReplicatorRpc(self.root, DATADIR, AccountBroker,
65
59
logger=self.logger)
68
62
swift.common.db.DB_PREALLOCATION = \
69
63
config_true_value(conf.get('db_preallocation', 'f'))
71
def _get_account_broker(self, drive, part, account):
65
def _get_account_broker(self, drive, part, account, **kwargs):
72
66
hsh = hash_path(account)
73
67
db_dir = storage_directory(DATADIR, part, hsh)
74
68
db_path = os.path.join(self.root, drive, db_dir, hsh + '.db')
75
return AccountBroker(db_path, account=account, logger=self.logger)
69
kwargs.setdefault('account', account)
70
kwargs.setdefault('logger', self.logger)
71
return AccountBroker(db_path, **kwargs)
77
73
def _deleted_response(self, broker, req, resp, body=''):
78
74
# We are here since either the account does not exist or
123
119
if self.mount_check and not check_mount(self.root, drive):
124
120
return HTTPInsufficientStorage(drive=drive, request=req)
125
broker = self._get_account_broker(drive, part, account)
126
121
if container: # put account container
122
pending_timeout = None
127
123
if 'x-trans-id' in req.headers:
128
broker.pending_timeout = 3
125
broker = self._get_account_broker(drive, part, account,
126
pending_timeout=pending_timeout)
129
127
if account.startswith(self.auto_create_account_prefix) and \
130
128
not os.path.exists(broker.db_file):
182
181
except ValueError, err:
183
182
return HTTPBadRequest(body=str(err), content_type='text/plain',
186
query_format = get_param(req, 'format')
187
except UnicodeDecodeError:
188
return HTTPBadRequest(body='parameters not utf8',
189
content_type='text/plain', request=req)
184
query_format = get_param(req, 'format')
191
186
req.accept = FORMAT2CONTENT_TYPE.get(
192
187
query_format.lower(), FORMAT2CONTENT_TYPE['plain'])
196
191
return HTTPNotAcceptable(request=req)
197
192
if self.mount_check and not check_mount(self.root, drive):
198
193
return HTTPInsufficientStorage(drive=drive, request=req)
199
broker = self._get_account_broker(drive, part, account)
200
broker.pending_timeout = 0.1
201
broker.stale_reads_ok = True
194
broker = self._get_account_broker(drive, part, account,
202
197
if broker.is_deleted():
203
198
return self._deleted_response(broker, req, HTTPNotFound)
204
199
info = broker.get_info()
224
219
except ValueError, err:
225
220
return HTTPBadRequest(body=str(err), content_type='text/plain',
228
prefix = get_param(req, 'prefix')
229
delimiter = get_param(req, 'delimiter')
230
if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
231
# delimiters can be made more flexible later
232
return HTTPPreconditionFailed(body='Bad delimiter')
233
limit = ACCOUNT_LISTING_LIMIT
234
given_limit = get_param(req, 'limit')
235
if given_limit and given_limit.isdigit():
236
limit = int(given_limit)
237
if limit > ACCOUNT_LISTING_LIMIT:
238
return HTTPPreconditionFailed(request=req,
239
body='Maximum limit is %d' %
240
ACCOUNT_LISTING_LIMIT)
241
marker = get_param(req, 'marker', '')
242
end_marker = get_param(req, 'end_marker')
243
except UnicodeDecodeError, err:
244
return HTTPBadRequest(body='parameters not utf8',
245
content_type='text/plain', request=req)
222
prefix = get_param(req, 'prefix')
223
delimiter = get_param(req, 'delimiter')
224
if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
225
# delimiters can be made more flexible later
226
return HTTPPreconditionFailed(body='Bad delimiter')
227
limit = ACCOUNT_LISTING_LIMIT
228
given_limit = get_param(req, 'limit')
229
if given_limit and given_limit.isdigit():
230
limit = int(given_limit)
231
if limit > ACCOUNT_LISTING_LIMIT:
232
return HTTPPreconditionFailed(request=req,
233
body='Maximum limit is %d' %
234
ACCOUNT_LISTING_LIMIT)
235
marker = get_param(req, 'marker', '')
236
end_marker = get_param(req, 'end_marker')
246
237
out_content_type, error = account_listing_content_type(req)
250
241
if self.mount_check and not check_mount(self.root, drive):
251
242
return HTTPInsufficientStorage(drive=drive, request=req)
252
broker = self._get_account_broker(drive, part, account)
253
broker.pending_timeout = 0.1
254
broker.stale_reads_ok = True
243
broker = self._get_account_broker(drive, part, account,
255
246
if broker.is_deleted():
256
247
return self._deleted_response(broker, req, HTTPNotFound)
257
248
return account_listing_response(account, req, out_content_type, broker,
324
316
method = getattr(self, req.method)
325
317
getattr(method, 'publicly_accessible')
326
if req.method not in self.allowed_methods:
318
replication_method = getattr(method, 'replication', False)
319
if (self.replication_server is not None and
320
self.replication_server != replication_method):
327
321
raise AttributeError('Not allowed method.')
328
322
except AttributeError:
329
323
res = HTTPMethodNotAllowed()
331
325
res = method(req)
326
except HTTPException as error_response:
332
328
except (Exception, Timeout):
333
329
self.logger.exception(_('ERROR __call__ error with %(method)s'