~ubuntu-branches/ubuntu/wily/swift/wily

« back to all changes in this revision

Viewing changes to swift/container/sync.py

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2015-09-07 13:00:03 UTC
  • mfrom: (1.2.36)
  • Revision ID: package-import@ubuntu.com-20150907130003-57kpoea6im6wm1fi
Tags: 2.4.0-0ubuntu1
* New upstream release:
  - d/p/CVE-2015-1856.patch,
    d/p/fix-tempauth-acl-checks.patch,
    d/p/increase-httplib-maxheaders.patch,
    d/p/x-auth-token-should-be-bytestring.patch:
    Dropped, included upstream.
* Align (build-)depends with upstream, wrap-and-sort.
* d/p/pyeclib-version.patch: Specify minimum bound for pyeclib only.
* Install new swift-object-reconstructor and swift-ring-builder-analyzer
  binaries.

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
# See the License for the specific language governing permissions and
14
14
# limitations under the License.
15
15
 
 
16
import errno
16
17
import os
17
18
import uuid
18
19
from swift import gettext_ as _
25
26
import swift.common.db
26
27
from swift.container.backend import ContainerBroker, DATADIR
27
28
from swift.common.container_sync_realms import ContainerSyncRealms
28
 
from swift.common.direct_client import direct_get_object
29
 
from swift.common.internal_client import delete_object, put_object
 
29
from swift.common.internal_client import (
 
30
    delete_object, put_object, InternalClient, UnexpectedResponse)
30
31
from swift.common.exceptions import ClientException
31
32
from swift.common.ring import Ring
 
33
from swift.common.ring.utils import is_local_device
32
34
from swift.common.utils import (
33
35
    audit_location_generator, clean_content_type, config_true_value,
34
36
    FileLikeIter, get_logger, hash_path, quote, urlparse, validate_sync_to,
36
38
from swift.common.daemon import Daemon
37
39
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND
38
40
from swift.common.storage_policy import POLICIES
 
41
from swift.common.wsgi import ConfigString
 
42
 
 
43
 
 
44
# The default internal client config body is to support upgrades without
 
45
# requiring deployment of the new /etc/swift/internal-client.conf
 
46
ic_conf_body = """
 
47
[DEFAULT]
 
48
# swift_dir = /etc/swift
 
49
# user = swift
 
50
# You can specify default log routing here if you want:
 
51
# log_name = swift
 
52
# log_facility = LOG_LOCAL0
 
53
# log_level = INFO
 
54
# log_address = /dev/log
 
55
#
 
56
# comma separated list of functions to call to setup custom log handlers.
 
57
# functions get passed: conf, name, log_to_console, log_route, fmt, logger,
 
58
# adapted_logger
 
59
# log_custom_handlers =
 
60
#
 
61
# If set, log_udp_host will override log_address
 
62
# log_udp_host =
 
63
# log_udp_port = 514
 
64
#
 
65
# You can enable StatsD logging here:
 
66
# log_statsd_host = localhost
 
67
# log_statsd_port = 8125
 
68
# log_statsd_default_sample_rate = 1.0
 
69
# log_statsd_sample_rate_factor = 1.0
 
70
# log_statsd_metric_prefix =
 
71
 
 
72
[pipeline:main]
 
73
pipeline = catch_errors proxy-logging cache proxy-server
 
74
 
 
75
[app:proxy-server]
 
76
use = egg:swift#proxy
 
77
# See proxy-server.conf-sample for options
 
78
 
 
79
[filter:cache]
 
80
use = egg:swift#memcache
 
81
# See proxy-server.conf-sample for options
 
82
 
 
83
[filter:proxy-logging]
 
84
use = egg:swift#proxy_logging
 
85
 
 
86
[filter:catch_errors]
 
87
use = egg:swift#catch_errors
 
88
# See proxy-server.conf-sample for options
 
89
""".lstrip()
39
90
 
40
91
 
41
92
class ContainerSync(Daemon):
102
153
                           loaded. This is overridden by unit tests.
103
154
    """
104
155
 
105
 
    def __init__(self, conf, container_ring=None):
 
156
    def __init__(self, conf, container_ring=None, logger=None):
106
157
        #: The dict of configuration values from the [container-sync] section
107
158
        #: of the container-server.conf.
108
159
        self.conf = conf
109
160
        #: Logger to use for container-sync log lines.
110
 
        self.logger = get_logger(conf, log_route='container-sync')
 
161
        self.logger = logger or get_logger(conf, log_route='container-sync')
111
162
        #: Path to the local device mount points.
112
163
        self.devices = conf.get('devices', '/srv/node')
113
164
        #: Indicates whether mount points should be verified as actual mount
153
204
        #: swift.common.ring.Ring for locating containers.
154
205
        self.container_ring = container_ring or Ring(self.swift_dir,
155
206
                                                     ring_name='container')
156
 
        self._myips = whataremyips()
 
207
        bind_ip = conf.get('bind_ip', '0.0.0.0')
 
208
        self._myips = whataremyips(bind_ip)
157
209
        self._myport = int(conf.get('bind_port', 6001))
158
210
        swift.common.db.DB_PREALLOCATION = \
159
211
            config_true_value(conf.get('db_preallocation', 'f'))
 
212
        self.conn_timeout = float(conf.get('conn_timeout', 5))
 
213
        request_tries = int(conf.get('request_tries') or 3)
 
214
 
 
215
        internal_client_conf_path = conf.get('internal_client_conf_path')
 
216
        if not internal_client_conf_path:
 
217
            self.logger.warning(
 
218
                _('Configuration option internal_client_conf_path not '
 
219
                  'defined. Using default configuration, See '
 
220
                  'internal-client.conf-sample for options'))
 
221
            internal_client_conf = ConfigString(ic_conf_body)
 
222
        else:
 
223
            internal_client_conf = internal_client_conf_path
 
224
        try:
 
225
            self.swift = InternalClient(
 
226
                internal_client_conf, 'Swift Container Sync', request_tries)
 
227
        except IOError as err:
 
228
            if err.errno != errno.ENOENT:
 
229
                raise
 
230
            raise SystemExit(
 
231
                _('Unable to load internal client from config: %r (%s)') %
 
232
                (internal_client_conf_path, err))
160
233
 
161
234
    def get_object_ring(self, policy_idx):
162
235
        """
239
312
            x, nodes = self.container_ring.get_nodes(info['account'],
240
313
                                                     info['container'])
241
314
            for ordinal, node in enumerate(nodes):
242
 
                if node['ip'] in self._myips and node['port'] == self._myport:
 
315
                if is_local_device(self._myips, self._myport,
 
316
                                   node['ip'], node['port']):
243
317
                    break
244
318
            else:
245
319
                return
248
322
                user_key = None
249
323
                sync_point1 = info['x_container_sync_point1']
250
324
                sync_point2 = info['x_container_sync_point2']
251
 
                for key, (value, timestamp) in broker.metadata.iteritems():
 
325
                for key, (value, timestamp) in broker.metadata.items():
252
326
                    if key.lower() == 'x-container-sync-to':
253
327
                        sync_to = value
254
328
                    elif key.lower() == 'x-container-sync-key':
359
433
                        headers['x-container-sync-key'] = user_key
360
434
                    delete_object(sync_to, name=row['name'], headers=headers,
361
435
                                  proxy=self.select_http_proxy(),
362
 
                                  logger=self.logger)
 
436
                                  logger=self.logger,
 
437
                                  timeout=self.conn_timeout)
363
438
                except ClientException as err:
364
439
                    if err.http_status != HTTP_NOT_FOUND:
365
440
                        raise
376
451
                looking_for_timestamp = Timestamp(row['created_at'])
377
452
                timestamp = -1
378
453
                headers = body = None
379
 
                headers_out = {'X-Backend-Storage-Policy-Index':
 
454
                # look up for the newest one
 
455
                headers_out = {'X-Newest': True,
 
456
                               'X-Backend-Storage-Policy-Index':
380
457
                               str(info['storage_policy_index'])}
381
 
                for node in nodes:
382
 
                    try:
383
 
                        these_headers, this_body = direct_get_object(
384
 
                            node, part, info['account'], info['container'],
385
 
                            row['name'], headers=headers_out,
386
 
                            resp_chunk_size=65536)
387
 
                        this_timestamp = Timestamp(
388
 
                            these_headers['x-timestamp'])
389
 
                        if this_timestamp > timestamp:
390
 
                            timestamp = this_timestamp
391
 
                            headers = these_headers
392
 
                            body = this_body
393
 
                    except ClientException as err:
394
 
                        # If any errors are not 404, make sure we report the
395
 
                        # non-404 one. We don't want to mistakenly assume the
396
 
                        # object no longer exists just because one says so and
397
 
                        # the others errored for some other reason.
398
 
                        if not exc or getattr(
399
 
                                exc, 'http_status', HTTP_NOT_FOUND) == \
400
 
                                HTTP_NOT_FOUND:
401
 
                            exc = err
402
 
                    except (Exception, Timeout) as err:
403
 
                        exc = err
 
458
                try:
 
459
                    source_obj_status, source_obj_info, source_obj_iter = \
 
460
                        self.swift.get_object(info['account'],
 
461
                                              info['container'], row['name'],
 
462
                                              headers=headers_out,
 
463
                                              acceptable_statuses=(2, 4))
 
464
 
 
465
                except (Exception, UnexpectedResponse, Timeout) as err:
 
466
                    source_obj_info = {}
 
467
                    source_obj_iter = None
 
468
                    exc = err
 
469
                timestamp = Timestamp(source_obj_info.get(
 
470
                                      'x-timestamp', 0))
 
471
                headers = source_obj_info
 
472
                body = source_obj_iter
404
473
                if timestamp < looking_for_timestamp:
405
474
                    if exc:
406
475
                        raise exc
407
476
                    raise Exception(
408
 
                        _('Unknown exception trying to GET: %(node)r '
 
477
                        _('Unknown exception trying to GET: '
409
478
                          '%(account)r %(container)r %(object)r'),
410
 
                        {'node': node, 'part': part,
411
 
                         'account': info['account'],
 
479
                        {'account': info['account'],
412
480
                         'container': info['container'],
413
481
                         'object': row['name']})
414
482
                for key in ('date', 'last-modified'):
432
500
                    headers['x-container-sync-key'] = user_key
433
501
                put_object(sync_to, name=row['name'], headers=headers,
434
502
                           contents=FileLikeIter(body),
435
 
                           proxy=self.select_http_proxy(), logger=self.logger)
 
503
                           proxy=self.select_http_proxy(), logger=self.logger,
 
504
                           timeout=self.conn_timeout)
436
505
                self.container_puts += 1
437
506
                self.logger.increment('puts')
438
507
                self.logger.timing_since('puts.timing', start_time)