25
26
import swift.common.db
26
27
from swift.container.backend import ContainerBroker, DATADIR
27
28
from swift.common.container_sync_realms import ContainerSyncRealms
28
from swift.common.direct_client import direct_get_object
29
from swift.common.internal_client import delete_object, put_object
29
from swift.common.internal_client import (
30
delete_object, put_object, InternalClient, UnexpectedResponse)
30
31
from swift.common.exceptions import ClientException
31
32
from swift.common.ring import Ring
33
from swift.common.ring.utils import is_local_device
32
34
from swift.common.utils import (
33
35
audit_location_generator, clean_content_type, config_true_value,
34
36
FileLikeIter, get_logger, hash_path, quote, urlparse, validate_sync_to,
36
38
from swift.common.daemon import Daemon
37
39
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND
38
40
from swift.common.storage_policy import POLICIES
41
from swift.common.wsgi import ConfigString
44
# The default internal client config body is to support upgrades without
45
# requiring deployment of the new /etc/swift/internal-client.conf
48
# swift_dir = /etc/swift
50
# You can specify default log routing here if you want:
52
# log_facility = LOG_LOCAL0
54
# log_address = /dev/log
56
# comma separated list of functions to call to setup custom log handlers.
57
# functions get passed: conf, name, log_to_console, log_route, fmt, logger,
59
# log_custom_handlers =
61
# If set, log_udp_host will override log_address
65
# You can enable StatsD logging here:
66
# log_statsd_host = localhost
67
# log_statsd_port = 8125
68
# log_statsd_default_sample_rate = 1.0
69
# log_statsd_sample_rate_factor = 1.0
70
# log_statsd_metric_prefix =
73
pipeline = catch_errors proxy-logging cache proxy-server
77
# See proxy-server.conf-sample for options
80
use = egg:swift#memcache
81
# See proxy-server.conf-sample for options
83
[filter:proxy-logging]
84
use = egg:swift#proxy_logging
87
use = egg:swift#catch_errors
88
# See proxy-server.conf-sample for options
41
92
class ContainerSync(Daemon):
102
153
loaded. This is overridden by unit tests.
105
def __init__(self, conf, container_ring=None):
156
def __init__(self, conf, container_ring=None, logger=None):
106
157
#: The dict of configuration values from the [container-sync] section
107
158
#: of the container-server.conf.
109
160
#: Logger to use for container-sync log lines.
110
self.logger = get_logger(conf, log_route='container-sync')
161
self.logger = logger or get_logger(conf, log_route='container-sync')
111
162
#: Path to the local device mount points.
112
163
self.devices = conf.get('devices', '/srv/node')
113
164
#: Indicates whether mount points should be verified as actual mount
153
204
#: swift.common.ring.Ring for locating containers.
154
205
self.container_ring = container_ring or Ring(self.swift_dir,
155
206
ring_name='container')
156
self._myips = whataremyips()
207
bind_ip = conf.get('bind_ip', '0.0.0.0')
208
self._myips = whataremyips(bind_ip)
157
209
self._myport = int(conf.get('bind_port', 6001))
158
210
swift.common.db.DB_PREALLOCATION = \
159
211
config_true_value(conf.get('db_preallocation', 'f'))
212
self.conn_timeout = float(conf.get('conn_timeout', 5))
213
request_tries = int(conf.get('request_tries') or 3)
215
internal_client_conf_path = conf.get('internal_client_conf_path')
216
if not internal_client_conf_path:
218
_('Configuration option internal_client_conf_path not '
219
'defined. Using default configuration, See '
220
'internal-client.conf-sample for options'))
221
internal_client_conf = ConfigString(ic_conf_body)
223
internal_client_conf = internal_client_conf_path
225
self.swift = InternalClient(
226
internal_client_conf, 'Swift Container Sync', request_tries)
227
except IOError as err:
228
if err.errno != errno.ENOENT:
231
_('Unable to load internal client from config: %r (%s)') %
232
(internal_client_conf_path, err))
161
234
def get_object_ring(self, policy_idx):
239
312
x, nodes = self.container_ring.get_nodes(info['account'],
240
313
info['container'])
241
314
for ordinal, node in enumerate(nodes):
242
if node['ip'] in self._myips and node['port'] == self._myport:
315
if is_local_device(self._myips, self._myport,
316
node['ip'], node['port']):
249
323
sync_point1 = info['x_container_sync_point1']
250
324
sync_point2 = info['x_container_sync_point2']
251
for key, (value, timestamp) in broker.metadata.iteritems():
325
for key, (value, timestamp) in broker.metadata.items():
252
326
if key.lower() == 'x-container-sync-to':
254
328
elif key.lower() == 'x-container-sync-key':
359
433
headers['x-container-sync-key'] = user_key
360
434
delete_object(sync_to, name=row['name'], headers=headers,
361
435
proxy=self.select_http_proxy(),
437
timeout=self.conn_timeout)
363
438
except ClientException as err:
364
439
if err.http_status != HTTP_NOT_FOUND:
376
451
looking_for_timestamp = Timestamp(row['created_at'])
378
453
headers = body = None
379
headers_out = {'X-Backend-Storage-Policy-Index':
454
# look up for the newest one
455
headers_out = {'X-Newest': True,
456
'X-Backend-Storage-Policy-Index':
380
457
str(info['storage_policy_index'])}
383
these_headers, this_body = direct_get_object(
384
node, part, info['account'], info['container'],
385
row['name'], headers=headers_out,
386
resp_chunk_size=65536)
387
this_timestamp = Timestamp(
388
these_headers['x-timestamp'])
389
if this_timestamp > timestamp:
390
timestamp = this_timestamp
391
headers = these_headers
393
except ClientException as err:
394
# If any errors are not 404, make sure we report the
395
# non-404 one. We don't want to mistakenly assume the
396
# object no longer exists just because one says so and
397
# the others errored for some other reason.
398
if not exc or getattr(
399
exc, 'http_status', HTTP_NOT_FOUND) == \
402
except (Exception, Timeout) as err:
459
source_obj_status, source_obj_info, source_obj_iter = \
460
self.swift.get_object(info['account'],
461
info['container'], row['name'],
463
acceptable_statuses=(2, 4))
465
except (Exception, UnexpectedResponse, Timeout) as err:
467
source_obj_iter = None
469
timestamp = Timestamp(source_obj_info.get(
471
headers = source_obj_info
472
body = source_obj_iter
404
473
if timestamp < looking_for_timestamp:
408
_('Unknown exception trying to GET: %(node)r '
477
_('Unknown exception trying to GET: '
409
478
'%(account)r %(container)r %(object)r'),
410
{'node': node, 'part': part,
411
'account': info['account'],
479
{'account': info['account'],
412
480
'container': info['container'],
413
481
'object': row['name']})
414
482
for key in ('date', 'last-modified'):
432
500
headers['x-container-sync-key'] = user_key
433
501
put_object(sync_to, name=row['name'], headers=headers,
434
502
contents=FileLikeIter(body),
435
proxy=self.select_http_proxy(), logger=self.logger)
503
proxy=self.select_http_proxy(), logger=self.logger,
504
timeout=self.conn_timeout)
436
505
self.container_puts += 1
437
506
self.logger.increment('puts')
438
507
self.logger.timing_since('puts.timing', start_time)