~david-goetz/swift/wal_again

« back to all changes in this revision

Viewing changes to bin/swift-dispersion-report

  • Committer: Chuck Thier
  • Date: 2011-05-05 21:02:08 UTC
  • mto: This revision was merged to the branch mainline in revision 287.
  • Revision ID: cthier@gmail.com-20110505210208-b4dbw0w62qo0e1rz
removed test that was incorrectly testing for a bad path

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/env python
2
 
# Copyright (c) 2010-2011 OpenStack, LLC.
3
 
#
4
 
# Licensed under the Apache License, Version 2.0 (the "License");
5
 
# you may not use this file except in compliance with the License.
6
 
# You may obtain a copy of the License at
7
 
#
8
 
#    http://www.apache.org/licenses/LICENSE-2.0
9
 
#
10
 
# Unless required by applicable law or agreed to in writing, software
11
 
# distributed under the License is distributed on an "AS IS" BASIS,
12
 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13
 
# implied.
14
 
# See the License for the specific language governing permissions and
15
 
# limitations under the License.
16
 
 
17
 
import csv
18
 
import os
19
 
import socket
20
 
from ConfigParser import ConfigParser
21
 
from httplib import HTTPException
22
 
from optparse import OptionParser
23
 
from sys import argv, exit, stdout, stderr
24
 
from time import time
25
 
from uuid import uuid4
26
 
 
27
 
from eventlet import GreenPool, hubs, patcher, sleep, Timeout
28
 
from eventlet.pools import Pool
29
 
 
30
 
from swift.common import direct_client
31
 
from swift.common.client import ClientException, Connection, get_auth
32
 
from swift.common.ring import Ring
33
 
from swift.common.utils import compute_eta, get_time_units
34
 
 
35
 
 
36
 
# Every "host:port/device" already reported as unmounted (HTTP 507), so each
# device is only reported once per run.
unmounted = []


def get_error_log(prefix):
    """Build an ``error_log`` callback labelled with ``prefix``.

    :param prefix: label prepended to generic error messages; callers in this
        file pass ``'ip:port/device'`` of the node being queried.
    :returns: a function taking either a message string or an exception.

    The returned callback behaves as follows:

    * If the argument has ``http_status == 507``, it records
      ``http_host:http_port/http_device`` in the module-level ``unmounted``
      list. The first time a given device is seen, it writes an "is
      unmounted" warning to stderr.
    * Arguments with ``http_status`` 404 or 507 are otherwise not logged.
    * Anything else (including plain strings) is written to stderr as
      ``'ERROR: <prefix>: <msg_or_exc>'``.
    """
    def error_log(msg_or_exc):
        # Plain strings and non-HTTP exceptions have no http_status.
        http_status = getattr(msg_or_exc, 'http_status', None)
        if http_status == 507:
            # The identifier must be formatted per device; previously it was
            # the unformatted literal '%s:%s/%s', so after the first unmounted
            # device every other one was silently treated as already reported.
            identifier = '%s:%s/%s' % (msg_or_exc.http_host,
                                       msg_or_exc.http_port,
                                       msg_or_exc.http_device)
            if identifier not in unmounted:
                unmounted.append(identifier)
                stderr.write('ERROR: %s is unmounted -- This will cause '
                             'replicas designated for that device to be '
                             'considered missing until resolved or the ring '
                             'is updated.\n' % identifier)
                stderr.flush()
        if http_status not in (404, 507):
            stderr.write('ERROR: %s: %s\n' % (prefix, msg_or_exc))
            stderr.flush()
    return error_log
58
 
 
59
 
 
60
 
def container_dispersion_report(coropool, connpool, account, container_ring,
61
 
                                retries):
62
 
    with connpool.item() as conn:
63
 
        containers = [c['name'] for c in conn.get_account(prefix='dispersion_',
64
 
                                                         full_listing=True)[1]]
65
 
    containers_listed = len(containers)
66
 
    if not containers_listed:
67
 
        print >>stderr, 'No containers to query. Has ' \
68
 
                        'swift-dispersion-populate been run?'
69
 
        stderr.flush()
70
 
        return
71
 
    retries_done = [0]
72
 
    containers_queried = [0]
73
 
    container_copies_found = [0, 0, 0, 0]
74
 
    begun = time()
75
 
    next_report = [time() + 2]
76
 
 
77
 
    def direct(container, part, nodes):
78
 
        found_count = 0
79
 
        for node in nodes:
80
 
            error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node)
81
 
            try:
82
 
                attempts, _junk = direct_client.retry(
83
 
                                direct_client.direct_head_container, node,
84
 
                                part, account, container, error_log=error_log,
85
 
                                retries=retries)
86
 
                retries_done[0] += attempts - 1
87
 
                found_count += 1
88
 
            except ClientException, err:
89
 
                if err.http_status not in (404, 507):
90
 
                    error_log('Giving up on /%s/%s/%s: %s' % (part, account,
91
 
                              container, err))
92
 
            except (Exception, Timeout), err:
93
 
                error_log('Giving up on /%s/%s/%s: %s' % (part, account,
94
 
                          container, err))
95
 
        container_copies_found[found_count] += 1
96
 
        containers_queried[0] += 1
97
 
        if time() >= next_report[0]:
98
 
            next_report[0] = time() + 5
99
 
            eta, eta_unit = compute_eta(begun, containers_queried[0],
100
 
                                        containers_listed)
101
 
            print '\r\x1B[KQuerying containers: %d of %d, %d%s left, %d ' \
102
 
                  'retries' % (containers_queried[0], containers_listed,
103
 
                  round(eta), eta_unit, retries_done[0]),
104
 
            stdout.flush()
105
 
    container_parts = {}
106
 
    for container in containers:
107
 
        part, nodes = container_ring.get_nodes(account, container)
108
 
        if part not in container_parts:
109
 
            container_parts[part] = part
110
 
            coropool.spawn(direct, container, part, nodes)
111
 
    coropool.waitall()
112
 
    distinct_partitions = len(container_parts)
113
 
    copies_expected = distinct_partitions * container_ring.replica_count
114
 
    copies_found = sum(a * b for a, b in enumerate(container_copies_found))
115
 
    value = 100.0 * copies_found / copies_expected
116
 
    elapsed, elapsed_unit = get_time_units(time() - begun)
117
 
    print '\r\x1B[KQueried %d containers for dispersion reporting, ' \
118
 
          '%d%s, %d retries' % (containers_listed, round(elapsed),
119
 
          elapsed_unit, retries_done[0])
120
 
    if containers_listed - distinct_partitions:
121
 
        print 'There were %d overlapping partitions' % (
122
 
              containers_listed - distinct_partitions)
123
 
    if container_copies_found[2]:
124
 
        print 'There were %d partitions missing one copy.' % \
125
 
              container_copies_found[2]
126
 
    if container_copies_found[1]:
127
 
        print '! There were %d partitions missing two copies.' % \
128
 
              container_copies_found[1]
129
 
    if container_copies_found[0]:
130
 
        print '!!! There were %d partitions missing all copies.' % \
131
 
              container_copies_found[0]
132
 
    print '%.02f%% of container copies found (%d of %d)' % (
133
 
          value, copies_found, copies_expected)
134
 
    print 'Sample represents %.02f%% of the container partition space' % (
135
 
          100.0 * distinct_partitions / container_ring.partition_count)
136
 
    stdout.flush()
137
 
 
138
 
 
139
 
def object_dispersion_report(coropool, connpool, account, object_ring,
140
 
                             retries):
141
 
    container = 'dispersion_objects'
142
 
    with connpool.item() as conn:
143
 
        try:
144
 
            objects = [o['name'] for o in conn.get_container(container,
145
 
                                   prefix='dispersion_', full_listing=True)[1]]
146
 
        except ClientException, err:
147
 
            if err.http_status != 404:
148
 
                raise
149
 
            print >>stderr, 'No objects to query. Has ' \
150
 
                            'swift-dispersion-populate been run?'
151
 
            stderr.flush()
152
 
            return
153
 
    objects_listed = len(objects)
154
 
    if not objects_listed:
155
 
        print >>stderr, 'No objects to query. Has swift-dispersion-populate ' \
156
 
                        'been run?'
157
 
        stderr.flush()
158
 
        return
159
 
    retries_done = [0]
160
 
    objects_queried = [0]
161
 
    object_copies_found = [0, 0, 0, 0]
162
 
    begun = time()
163
 
    next_report = [time() + 2]
164
 
 
165
 
    def direct(obj, part, nodes):
166
 
        found_count = 0
167
 
        for node in nodes:
168
 
            error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node)
169
 
            try:
170
 
                attempts, _junk = direct_client.retry(
171
 
                                direct_client.direct_head_object, node, part,
172
 
                                account, container, obj, error_log=error_log,
173
 
                                retries=retries)
174
 
                retries_done[0] += attempts - 1
175
 
                found_count += 1
176
 
            except ClientException, err:
177
 
                if err.http_status not in (404, 507):
178
 
                    error_log('Giving up on /%s/%s/%s/%s: %s' % (part, account,
179
 
                              container, obj, err))
180
 
            except (Exception, Timeout), err:
181
 
                error_log('Giving up on /%s/%s/%s/%s: %s' % (part, account,
182
 
                          container, obj, err))
183
 
        object_copies_found[found_count] += 1
184
 
        objects_queried[0] += 1
185
 
        if time() >= next_report[0]:
186
 
            next_report[0] = time() + 5
187
 
            eta, eta_unit = compute_eta(begun, objects_queried[0],
188
 
                                        objects_listed)
189
 
            print '\r\x1B[KQuerying objects: %d of %d, %d%s left, %d ' \
190
 
                  'retries' % (objects_queried[0], objects_listed, round(eta),
191
 
                  eta_unit, retries_done[0]),
192
 
            stdout.flush()
193
 
    object_parts = {}
194
 
    for obj in objects:
195
 
        part, nodes = object_ring.get_nodes(account, container, obj)
196
 
        if part not in object_parts:
197
 
            object_parts[part] = part
198
 
            coropool.spawn(direct, obj, part, nodes)
199
 
    coropool.waitall()
200
 
    distinct_partitions = len(object_parts)
201
 
    copies_expected = distinct_partitions * object_ring.replica_count
202
 
    copies_found = sum(a * b for a, b in enumerate(object_copies_found))
203
 
    value = 100.0 * copies_found / copies_expected
204
 
    elapsed, elapsed_unit = get_time_units(time() - begun)
205
 
    print '\r\x1B[KQueried %d objects for dispersion reporting, ' \
206
 
          '%d%s, %d retries' % (objects_listed, round(elapsed),
207
 
          elapsed_unit, retries_done[0])
208
 
    if objects_listed - distinct_partitions:
209
 
        print 'There were %d overlapping partitions' % (
210
 
              objects_listed - distinct_partitions)
211
 
    if object_copies_found[2]:
212
 
        print 'There were %d partitions missing one copy.' % \
213
 
              object_copies_found[2]
214
 
    if object_copies_found[1]:
215
 
        print '! There were %d partitions missing two copies.' % \
216
 
              object_copies_found[1]
217
 
    if object_copies_found[0]:
218
 
        print '!!! There were %d partitions missing all copies.' % \
219
 
              object_copies_found[0]
220
 
    print '%.02f%% of object copies found (%d of %d)' % \
221
 
                (value, copies_found, copies_expected)
222
 
    print 'Sample represents %.02f%% of the object partition space' % (
223
 
          100.0 * distinct_partitions / object_ring.partition_count)
224
 
    stdout.flush()
225
 
 
226
 
 
227
 
if __name__ == '__main__':
    # Patch the stdlib for eventlet green threads, and turn off the hub's
    # exception-debugging output.
    patcher.monkey_patch()
    hubs.get_hub().debug_exceptions = False

    # Usage: <script> [conffile]; with no argument the default path is used.
    if len(argv) > 2:
        exit('Syntax: %s [conffile]' % argv[0])
    if len(argv) == 2:
        conffile = argv[1]
    else:
        conffile = '/etc/swift/dispersion.conf'

    parser = ConfigParser()
    if not parser.read(conffile):
        exit('Unable to read config file: %s' % conffile)
    conf = dict(parser.items('dispersion'))
    swift_dir = conf.get('swift_dir', '/etc/swift')
    # Parsed (so a non-integer value still fails here) but not otherwise
    # used by this script.
    dispersion_coverage = int(conf.get('dispersion_coverage', 1))
    retries = int(conf.get('retries', 5))
    concurrency = int(conf.get('concurrency', 25))

    coropool = GreenPool(size=concurrency)

    # Authenticate once; pooled connections reuse the storage URL and token.
    url, token = get_auth(conf['auth_url'], conf['auth_user'],
                          conf['auth_key'])
    # The account name is the last path segment of the storage URL.
    account = url.rsplit('/', 1)[1]

    def _new_connection():
        # Factory the connection pool calls when it needs a new Connection.
        return Connection(conf['auth_url'], conf['auth_user'],
                          conf['auth_key'], retries=retries,
                          preauthurl=url, preauthtoken=token)

    connpool = Pool(max_size=concurrency)
    connpool.create = _new_connection

    container_ring = Ring(os.path.join(swift_dir, 'container.ring.gz'))
    object_ring = Ring(os.path.join(swift_dir, 'object.ring.gz'))

    container_dispersion_report(coropool, connpool, account, container_ring,
                                retries)
    object_dispersion_report(coropool, connpool, account, object_ring, retries)