~david-goetz/swift/wal_again

« back to all changes in this revision

Viewing changes to bin/st

  • Committer: Tarmac
  • Author(s): gholt
  • Date: 2011-06-16 21:12:04 UTC
  • mfrom: (291.19.6 postcopy)
  • mto: This revision was merged to the branch mainline in revision 294.
  • Revision ID: tarmac-20110616211204-s5slh4h8nt9mrd2v
You can specify X-Newest: true on GETs and HEADs to indicate you want Swift to query all backend copies and return the newest version retrieved.
Object COPY requests now always copy the newest object they can find.
Object POSTs are implemented as COPYs now by default (you can revert to previous implementation with conf object_post_as_copy = false)
Account and container GETs and HEADs now shuffle the nodes they use to balance load.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/python -u
2
 
# Copyright (c) 2010-2011 OpenStack, LLC.
3
 
#
4
 
# Licensed under the Apache License, Version 2.0 (the "License");
5
 
# you may not use this file except in compliance with the License.
6
 
# You may obtain a copy of the License at
7
 
#
8
 
#    http://www.apache.org/licenses/LICENSE-2.0
9
 
#
10
 
# Unless required by applicable law or agreed to in writing, software
11
 
# distributed under the License is distributed on an "AS IS" BASIS,
12
 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13
 
# implied.
14
 
# See the License for the specific language governing permissions and
15
 
# limitations under the License.
16
 
 
17
 
from errno import EEXIST, ENOENT
18
 
from hashlib import md5
19
 
from optparse import OptionParser
20
 
from os import environ, listdir, makedirs, utime
21
 
from os.path import basename, dirname, getmtime, getsize, isdir, join
22
 
from Queue import Empty, Queue
23
 
from sys import argv, exit, stderr, stdout
24
 
from threading import enumerate as threading_enumerate, Thread
25
 
from time import sleep
26
 
 
27
 
 
28
 
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
29
 
# Inclusion of swift.common.client for convenience of single file distribution
30
 
 
31
 
import socket
32
 
from cStringIO import StringIO
33
 
from httplib import HTTPException, HTTPSConnection
34
 
from re import compile, DOTALL
35
 
from tokenize import generate_tokens, STRING, NAME, OP
36
 
from urllib import quote as _quote, unquote
37
 
from urlparse import urlparse, urlunparse
38
 
 
39
 
try:
40
 
    from eventlet import sleep
41
 
except Exception:
42
 
    from time import sleep
43
 
 
44
 
try:
45
 
    from swift.common.bufferedhttp \
46
 
        import BufferedHTTPConnection as HTTPConnection
47
 
except Exception:
48
 
    from httplib import HTTPConnection
49
 
 
50
 
 
51
 
def quote(value, safe='/'):
52
 
    """
53
 
    Patched version of urllib.quote that encodes utf8 strings before quoting
54
 
    """
55
 
    if isinstance(value, unicode):
56
 
        value = value.encode('utf8')
57
 
    return _quote(value, safe)
58
 
 
59
 
 
60
 
# look for a real json parser first
61
 
try:
62
 
    # simplejson is popular and pretty good
63
 
    from simplejson import loads as json_loads
64
 
except ImportError:
65
 
    try:
66
 
        # 2.6 will have a json module in the stdlib
67
 
        from json import loads as json_loads
68
 
    except ImportError:
69
 
        # fall back on local parser otherwise
70
 
        comments = compile(r'/\*.*\*/|//[^\r\n]*', DOTALL)
71
 
 
72
 
        def json_loads(string):
73
 
            '''
74
 
            Fairly competent json parser exploiting the python tokenizer and
75
 
            eval(). -- From python-cloudfiles
76
 
 
77
 
            _loads(serialized_json) -> object
78
 
            '''
79
 
            try:
80
 
                res = []
81
 
                consts = {'true': True, 'false': False, 'null': None}
82
 
                string = '(' + comments.sub('', string) + ')'
83
 
                for type, val, _junk, _junk, _junk in \
84
 
                        generate_tokens(StringIO(string).readline):
85
 
                    if (type == OP and val not in '[]{}:,()-') or \
86
 
                            (type == NAME and val not in consts):
87
 
                        raise AttributeError()
88
 
                    elif type == STRING:
89
 
                        res.append('u')
90
 
                        res.append(val.replace('\\/', '/'))
91
 
                    else:
92
 
                        res.append(val)
93
 
                return eval(''.join(res), {}, consts)
94
 
            except Exception:
95
 
                raise AttributeError()
96
 
 
97
 
 
98
 
class ClientException(Exception):
99
 
 
100
 
    def __init__(self, msg, http_scheme='', http_host='', http_port='',
101
 
                 http_path='', http_query='', http_status=0, http_reason='',
102
 
                 http_device=''):
103
 
        Exception.__init__(self, msg)
104
 
        self.msg = msg
105
 
        self.http_scheme = http_scheme
106
 
        self.http_host = http_host
107
 
        self.http_port = http_port
108
 
        self.http_path = http_path
109
 
        self.http_query = http_query
110
 
        self.http_status = http_status
111
 
        self.http_reason = http_reason
112
 
        self.http_device = http_device
113
 
 
114
 
    def __str__(self):
115
 
        a = self.msg
116
 
        b = ''
117
 
        if self.http_scheme:
118
 
            b += '%s://' % self.http_scheme
119
 
        if self.http_host:
120
 
            b += self.http_host
121
 
        if self.http_port:
122
 
            b += ':%s' % self.http_port
123
 
        if self.http_path:
124
 
            b += self.http_path
125
 
        if self.http_query:
126
 
            b += '?%s' % self.http_query
127
 
        if self.http_status:
128
 
            if b:
129
 
                b = '%s %s' % (b, self.http_status)
130
 
            else:
131
 
                b = str(self.http_status)
132
 
        if self.http_reason:
133
 
            if b:
134
 
                b = '%s %s' % (b, self.http_reason)
135
 
            else:
136
 
                b = '- %s' % self.http_reason
137
 
        if self.http_device:
138
 
            if b:
139
 
                b = '%s: device %s' % (b, self.http_device)
140
 
            else:
141
 
                b = 'device %s' % self.http_device
142
 
        return b and '%s: %s' % (a, b) or a
143
 
 
144
 
 
145
 
def http_connection(url):
146
 
    """
147
 
    Make an HTTPConnection or HTTPSConnection
148
 
 
149
 
    :param url: url to connect to
150
 
    :returns: tuple of (parsed url, connection object)
151
 
    :raises ClientException: Unable to handle protocol scheme
152
 
    """
153
 
    parsed = urlparse(url)
154
 
    if parsed.scheme == 'http':
155
 
        conn = HTTPConnection(parsed.netloc)
156
 
    elif parsed.scheme == 'https':
157
 
        conn = HTTPSConnection(parsed.netloc)
158
 
    else:
159
 
        raise ClientException('Cannot handle protocol scheme %s for url %s' %
160
 
                              (parsed.scheme, repr(url)))
161
 
    return parsed, conn
162
 
 
163
 
 
164
 
def get_auth(url, user, key, snet=False):
165
 
    """
166
 
    Get authentication/authorization credentials.
167
 
 
168
 
    The snet parameter is used for Rackspace's ServiceNet internal network
169
 
    implementation. In this function, it simply adds *snet-* to the beginning
170
 
    of the host name for the returned storage URL. With Rackspace Cloud Files,
171
 
    use of this network path causes no bandwidth charges but requires the
172
 
    client to be running on Rackspace's ServiceNet network.
173
 
 
174
 
    :param url: authentication/authorization URL
175
 
    :param user: user to authenticate as
176
 
    :param key: key or password for authorization
177
 
    :param snet: use SERVICENET internal network (see above), default is False
178
 
    :returns: tuple of (storage URL, auth token)
179
 
    :raises ClientException: HTTP GET request to auth URL failed
180
 
    """
181
 
    parsed, conn = http_connection(url)
182
 
    conn.request('GET', parsed.path, '',
183
 
                 {'X-Auth-User': user, 'X-Auth-Key': key})
184
 
    resp = conn.getresponse()
185
 
    resp.read()
186
 
    if resp.status < 200 or resp.status >= 300:
187
 
        raise ClientException('Auth GET failed', http_scheme=parsed.scheme,
188
 
                http_host=conn.host, http_port=conn.port,
189
 
                http_path=parsed.path, http_status=resp.status,
190
 
                http_reason=resp.reason)
191
 
    url = resp.getheader('x-storage-url')
192
 
    if snet:
193
 
        parsed = list(urlparse(url))
194
 
        # Second item in the list is the netloc
195
 
        parsed[1] = 'snet-' + parsed[1]
196
 
        url = urlunparse(parsed)
197
 
    return url, resp.getheader('x-storage-token',
198
 
                                                resp.getheader('x-auth-token'))
199
 
 
200
 
 
201
 
def get_account(url, token, marker=None, limit=None, prefix=None,
202
 
                http_conn=None, full_listing=False):
203
 
    """
204
 
    Get a listing of containers for the account.
205
 
 
206
 
    :param url: storage URL
207
 
    :param token: auth token
208
 
    :param marker: marker query
209
 
    :param limit: limit query
210
 
    :param prefix: prefix query
211
 
    :param http_conn: HTTP connection object (If None, it will create the
212
 
                      conn object)
213
 
    :param full_listing: if True, return a full listing, else returns a max
214
 
                         of 10000 listings
215
 
    :returns: a tuple of (response headers, a list of containers) The response
216
 
              headers will be a dict and all header names will be lowercase.
217
 
    :raises ClientException: HTTP GET request failed
218
 
    """
219
 
    if not http_conn:
220
 
        http_conn = http_connection(url)
221
 
    if full_listing:
222
 
        rv = get_account(url, token, marker, limit, prefix, http_conn)
223
 
        listing = rv[1]
224
 
        while listing:
225
 
            marker = listing[-1]['name']
226
 
            listing = \
227
 
                get_account(url, token, marker, limit, prefix, http_conn)[1]
228
 
            if listing:
229
 
                rv.extend(listing)
230
 
        return rv
231
 
    parsed, conn = http_conn
232
 
    qs = 'format=json'
233
 
    if marker:
234
 
        qs += '&marker=%s' % quote(marker)
235
 
    if limit:
236
 
        qs += '&limit=%d' % limit
237
 
    if prefix:
238
 
        qs += '&prefix=%s' % quote(prefix)
239
 
    conn.request('GET', '%s?%s' % (parsed.path, qs), '',
240
 
                 {'X-Auth-Token': token})
241
 
    resp = conn.getresponse()
242
 
    resp_headers = {}
243
 
    for header, value in resp.getheaders():
244
 
        resp_headers[header.lower()] = value
245
 
    if resp.status < 200 or resp.status >= 300:
246
 
        resp.read()
247
 
        raise ClientException('Account GET failed', http_scheme=parsed.scheme,
248
 
                http_host=conn.host, http_port=conn.port,
249
 
                http_path=parsed.path, http_query=qs, http_status=resp.status,
250
 
                http_reason=resp.reason)
251
 
    if resp.status == 204:
252
 
        resp.read()
253
 
        return resp_headers, []
254
 
    return resp_headers, json_loads(resp.read())
255
 
 
256
 
 
257
 
def head_account(url, token, http_conn=None):
258
 
    """
259
 
    Get account stats.
260
 
 
261
 
    :param url: storage URL
262
 
    :param token: auth token
263
 
    :param http_conn: HTTP connection object (If None, it will create the
264
 
                      conn object)
265
 
    :returns: a dict containing the response's headers (all header names will
266
 
              be lowercase)
267
 
    :raises ClientException: HTTP HEAD request failed
268
 
    """
269
 
    if http_conn:
270
 
        parsed, conn = http_conn
271
 
    else:
272
 
        parsed, conn = http_connection(url)
273
 
    conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
274
 
    resp = conn.getresponse()
275
 
    resp.read()
276
 
    if resp.status < 200 or resp.status >= 300:
277
 
        raise ClientException('Account HEAD failed', http_scheme=parsed.scheme,
278
 
                http_host=conn.host, http_port=conn.port,
279
 
                http_path=parsed.path, http_status=resp.status,
280
 
                http_reason=resp.reason)
281
 
    resp_headers = {}
282
 
    for header, value in resp.getheaders():
283
 
        resp_headers[header.lower()] = value
284
 
    return resp_headers
285
 
 
286
 
 
287
 
def post_account(url, token, headers, http_conn=None):
288
 
    """
289
 
    Update an account's metadata.
290
 
 
291
 
    :param url: storage URL
292
 
    :param token: auth token
293
 
    :param headers: additional headers to include in the request
294
 
    :param http_conn: HTTP connection object (If None, it will create the
295
 
                      conn object)
296
 
    :raises ClientException: HTTP POST request failed
297
 
    """
298
 
    if http_conn:
299
 
        parsed, conn = http_conn
300
 
    else:
301
 
        parsed, conn = http_connection(url)
302
 
    headers['X-Auth-Token'] = token
303
 
    conn.request('POST', parsed.path, '', headers)
304
 
    resp = conn.getresponse()
305
 
    resp.read()
306
 
    if resp.status < 200 or resp.status >= 300:
307
 
        raise ClientException('Account POST failed',
308
 
                http_scheme=parsed.scheme, http_host=conn.host,
309
 
                http_port=conn.port, http_path=path, http_status=resp.status,
310
 
                http_reason=resp.reason)
311
 
 
312
 
 
313
 
def get_container(url, token, container, marker=None, limit=None,
314
 
                  prefix=None, delimiter=None, http_conn=None,
315
 
                  full_listing=False):
316
 
    """
317
 
    Get a listing of objects for the container.
318
 
 
319
 
    :param url: storage URL
320
 
    :param token: auth token
321
 
    :param container: container name to get a listing for
322
 
    :param marker: marker query
323
 
    :param limit: limit query
324
 
    :param prefix: prefix query
325
 
    :param delimeter: string to delimit the queries on
326
 
    :param http_conn: HTTP connection object (If None, it will create the
327
 
                      conn object)
328
 
    :param full_listing: if True, return a full listing, else returns a max
329
 
                         of 10000 listings
330
 
    :returns: a tuple of (response headers, a list of objects) The response
331
 
              headers will be a dict and all header names will be lowercase.
332
 
    :raises ClientException: HTTP GET request failed
333
 
    """
334
 
    if not http_conn:
335
 
        http_conn = http_connection(url)
336
 
    if full_listing:
337
 
        rv = get_container(url, token, container, marker, limit, prefix,
338
 
                           delimiter, http_conn)
339
 
        listing = rv[1]
340
 
        while listing:
341
 
            if not delimiter:
342
 
                marker = listing[-1]['name']
343
 
            else:
344
 
                marker = listing[-1].get('name', listing[-1].get('subdir'))
345
 
            listing = get_container(url, token, container, marker, limit,
346
 
                                    prefix, delimiter, http_conn)[1]
347
 
            if listing:
348
 
                rv[1].extend(listing)
349
 
        return rv
350
 
    parsed, conn = http_conn
351
 
    path = '%s/%s' % (parsed.path, quote(container))
352
 
    qs = 'format=json'
353
 
    if marker:
354
 
        qs += '&marker=%s' % quote(marker)
355
 
    if limit:
356
 
        qs += '&limit=%d' % limit
357
 
    if prefix:
358
 
        qs += '&prefix=%s' % quote(prefix)
359
 
    if delimiter:
360
 
        qs += '&delimiter=%s' % quote(delimiter)
361
 
    conn.request('GET', '%s?%s' % (path, qs), '', {'X-Auth-Token': token})
362
 
    resp = conn.getresponse()
363
 
    if resp.status < 200 or resp.status >= 300:
364
 
        resp.read()
365
 
        raise ClientException('Container GET failed',
366
 
                http_scheme=parsed.scheme, http_host=conn.host,
367
 
                http_port=conn.port, http_path=path, http_query=qs,
368
 
                http_status=resp.status, http_reason=resp.reason)
369
 
    resp_headers = {}
370
 
    for header, value in resp.getheaders():
371
 
        resp_headers[header.lower()] = value
372
 
    if resp.status == 204:
373
 
        resp.read()
374
 
        return resp_headers, []
375
 
    return resp_headers, json_loads(resp.read())
376
 
 
377
 
 
378
 
def head_container(url, token, container, http_conn=None):
379
 
    """
380
 
    Get container stats.
381
 
 
382
 
    :param url: storage URL
383
 
    :param token: auth token
384
 
    :param container: container name to get stats for
385
 
    :param http_conn: HTTP connection object (If None, it will create the
386
 
                      conn object)
387
 
    :returns: a dict containing the response's headers (all header names will
388
 
              be lowercase)
389
 
    :raises ClientException: HTTP HEAD request failed
390
 
    """
391
 
    if http_conn:
392
 
        parsed, conn = http_conn
393
 
    else:
394
 
        parsed, conn = http_connection(url)
395
 
    path = '%s/%s' % (parsed.path, quote(container))
396
 
    conn.request('HEAD', path, '', {'X-Auth-Token': token})
397
 
    resp = conn.getresponse()
398
 
    resp.read()
399
 
    if resp.status < 200 or resp.status >= 300:
400
 
        raise ClientException('Container HEAD failed',
401
 
                http_scheme=parsed.scheme, http_host=conn.host,
402
 
                http_port=conn.port, http_path=path, http_status=resp.status,
403
 
                http_reason=resp.reason)
404
 
    resp_headers = {}
405
 
    for header, value in resp.getheaders():
406
 
        resp_headers[header.lower()] = value
407
 
    return resp_headers
408
 
 
409
 
 
410
 
def put_container(url, token, container, headers=None, http_conn=None):
411
 
    """
412
 
    Create a container
413
 
 
414
 
    :param url: storage URL
415
 
    :param token: auth token
416
 
    :param container: container name to create
417
 
    :param headers: additional headers to include in the request
418
 
    :param http_conn: HTTP connection object (If None, it will create the
419
 
                      conn object)
420
 
    :raises ClientException: HTTP PUT request failed
421
 
    """
422
 
    if http_conn:
423
 
        parsed, conn = http_conn
424
 
    else:
425
 
        parsed, conn = http_connection(url)
426
 
    path = '%s/%s' % (parsed.path, quote(container))
427
 
    if not headers:
428
 
        headers = {}
429
 
    headers['X-Auth-Token'] = token
430
 
    conn.request('PUT', path, '', headers)
431
 
    resp = conn.getresponse()
432
 
    resp.read()
433
 
    if resp.status < 200 or resp.status >= 300:
434
 
        raise ClientException('Container PUT failed',
435
 
                http_scheme=parsed.scheme, http_host=conn.host,
436
 
                http_port=conn.port, http_path=path, http_status=resp.status,
437
 
                http_reason=resp.reason)
438
 
 
439
 
 
440
 
def post_container(url, token, container, headers, http_conn=None):
441
 
    """
442
 
    Update a container's metadata.
443
 
 
444
 
    :param url: storage URL
445
 
    :param token: auth token
446
 
    :param container: container name to update
447
 
    :param headers: additional headers to include in the request
448
 
    :param http_conn: HTTP connection object (If None, it will create the
449
 
                      conn object)
450
 
    :raises ClientException: HTTP POST request failed
451
 
    """
452
 
    if http_conn:
453
 
        parsed, conn = http_conn
454
 
    else:
455
 
        parsed, conn = http_connection(url)
456
 
    path = '%s/%s' % (parsed.path, quote(container))
457
 
    headers['X-Auth-Token'] = token
458
 
    conn.request('POST', path, '', headers)
459
 
    resp = conn.getresponse()
460
 
    resp.read()
461
 
    if resp.status < 200 or resp.status >= 300:
462
 
        raise ClientException('Container POST failed',
463
 
                http_scheme=parsed.scheme, http_host=conn.host,
464
 
                http_port=conn.port, http_path=path, http_status=resp.status,
465
 
                http_reason=resp.reason)
466
 
 
467
 
 
468
 
def delete_container(url, token, container, http_conn=None):
469
 
    """
470
 
    Delete a container
471
 
 
472
 
    :param url: storage URL
473
 
    :param token: auth token
474
 
    :param container: container name to delete
475
 
    :param http_conn: HTTP connection object (If None, it will create the
476
 
                      conn object)
477
 
    :raises ClientException: HTTP DELETE request failed
478
 
    """
479
 
    if http_conn:
480
 
        parsed, conn = http_conn
481
 
    else:
482
 
        parsed, conn = http_connection(url)
483
 
    path = '%s/%s' % (parsed.path, quote(container))
484
 
    conn.request('DELETE', path, '', {'X-Auth-Token': token})
485
 
    resp = conn.getresponse()
486
 
    resp.read()
487
 
    if resp.status < 200 or resp.status >= 300:
488
 
        raise ClientException('Container DELETE failed',
489
 
                http_scheme=parsed.scheme, http_host=conn.host,
490
 
                http_port=conn.port, http_path=path, http_status=resp.status,
491
 
                http_reason=resp.reason)
492
 
 
493
 
 
494
 
def get_object(url, token, container, name, http_conn=None,
495
 
               resp_chunk_size=None):
496
 
    """
497
 
    Get an object
498
 
 
499
 
    :param url: storage URL
500
 
    :param token: auth token
501
 
    :param container: container name that the object is in
502
 
    :param name: object name to get
503
 
    :param http_conn: HTTP connection object (If None, it will create the
504
 
                      conn object)
505
 
    :param resp_chunk_size: if defined, chunk size of data to read. NOTE: If
506
 
                            you specify a resp_chunk_size you must fully read
507
 
                            the object's contents before making another
508
 
                            request.
509
 
    :returns: a tuple of (response headers, the object's contents) The response
510
 
              headers will be a dict and all header names will be lowercase.
511
 
    :raises ClientException: HTTP GET request failed
512
 
    """
513
 
    if http_conn:
514
 
        parsed, conn = http_conn
515
 
    else:
516
 
        parsed, conn = http_connection(url)
517
 
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
518
 
    conn.request('GET', path, '', {'X-Auth-Token': token})
519
 
    resp = conn.getresponse()
520
 
    if resp.status < 200 or resp.status >= 300:
521
 
        resp.read()
522
 
        raise ClientException('Object GET failed', http_scheme=parsed.scheme,
523
 
                http_host=conn.host, http_port=conn.port, http_path=path,
524
 
                http_status=resp.status, http_reason=resp.reason)
525
 
    if resp_chunk_size:
526
 
 
527
 
        def _object_body():
528
 
            buf = resp.read(resp_chunk_size)
529
 
            while buf:
530
 
                yield buf
531
 
                buf = resp.read(resp_chunk_size)
532
 
        object_body = _object_body()
533
 
    else:
534
 
        object_body = resp.read()
535
 
    resp_headers = {}
536
 
    for header, value in resp.getheaders():
537
 
        resp_headers[header.lower()] = value
538
 
    return resp_headers, object_body
539
 
 
540
 
 
541
 
def head_object(url, token, container, name, http_conn=None):
542
 
    """
543
 
    Get object info
544
 
 
545
 
    :param url: storage URL
546
 
    :param token: auth token
547
 
    :param container: container name that the object is in
548
 
    :param name: object name to get info for
549
 
    :param http_conn: HTTP connection object (If None, it will create the
550
 
                      conn object)
551
 
    :returns: a dict containing the response's headers (all header names will
552
 
              be lowercase)
553
 
    :raises ClientException: HTTP HEAD request failed
554
 
    """
555
 
    if http_conn:
556
 
        parsed, conn = http_conn
557
 
    else:
558
 
        parsed, conn = http_connection(url)
559
 
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
560
 
    conn.request('HEAD', path, '', {'X-Auth-Token': token})
561
 
    resp = conn.getresponse()
562
 
    resp.read()
563
 
    if resp.status < 200 or resp.status >= 300:
564
 
        raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
565
 
                http_host=conn.host, http_port=conn.port, http_path=path,
566
 
                http_status=resp.status, http_reason=resp.reason)
567
 
    resp_headers = {}
568
 
    for header, value in resp.getheaders():
569
 
        resp_headers[header.lower()] = value
570
 
    return resp_headers
571
 
 
572
 
 
573
 
def put_object(url, token, container, name, contents, content_length=None,
574
 
               etag=None, chunk_size=65536, content_type=None, headers=None,
575
 
               http_conn=None):
576
 
    """
577
 
    Put an object
578
 
 
579
 
    :param url: storage URL
580
 
    :param token: auth token
581
 
    :param container: container name that the object is in
582
 
    :param name: object name to put
583
 
    :param contents: a string or a file like object to read object data from
584
 
    :param content_length: value to send as content-length header; also limits
585
 
                           the amount read from contents
586
 
    :param etag: etag of contents
587
 
    :param chunk_size: chunk size of data to write
588
 
    :param content_type: value to send as content-type header
589
 
    :param headers: additional headers to include in the request
590
 
    :param http_conn: HTTP connection object (If None, it will create the
591
 
                      conn object)
592
 
    :returns: etag from server response
593
 
    :raises ClientException: HTTP PUT request failed
594
 
    """
595
 
    if http_conn:
596
 
        parsed, conn = http_conn
597
 
    else:
598
 
        parsed, conn = http_connection(url)
599
 
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
600
 
    if not headers:
601
 
        headers = {}
602
 
    headers['X-Auth-Token'] = token
603
 
    if etag:
604
 
        headers['ETag'] = etag.strip('"')
605
 
    if content_length is not None:
606
 
        headers['Content-Length'] = str(content_length)
607
 
    if content_type is not None:
608
 
        headers['Content-Type'] = content_type
609
 
    if not contents:
610
 
        headers['Content-Length'] = '0'
611
 
    if hasattr(contents, 'read'):
612
 
        conn.putrequest('PUT', path)
613
 
        for header, value in headers.iteritems():
614
 
            conn.putheader(header, value)
615
 
        if content_length is None:
616
 
            conn.putheader('Transfer-Encoding', 'chunked')
617
 
            conn.endheaders()
618
 
            chunk = contents.read(chunk_size)
619
 
            while chunk:
620
 
                conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
621
 
                chunk = contents.read(chunk_size)
622
 
            conn.send('0\r\n\r\n')
623
 
        else:
624
 
            conn.endheaders()
625
 
            left = content_length
626
 
            while left > 0:
627
 
                size = chunk_size
628
 
                if size > left:
629
 
                    size = left
630
 
                chunk = contents.read(size)
631
 
                conn.send(chunk)
632
 
                left -= len(chunk)
633
 
    else:
634
 
        conn.request('PUT', path, contents, headers)
635
 
    resp = conn.getresponse()
636
 
    resp.read()
637
 
    if resp.status < 200 or resp.status >= 300:
638
 
        raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
639
 
                http_host=conn.host, http_port=conn.port, http_path=path,
640
 
                http_status=resp.status, http_reason=resp.reason)
641
 
    return resp.getheader('etag').strip('"')
642
 
 
643
 
 
644
 
def post_object(url, token, container, name, headers, http_conn=None):
645
 
    """
646
 
    Update object metadata
647
 
 
648
 
    :param url: storage URL
649
 
    :param token: auth token
650
 
    :param container: container name that the object is in
651
 
    :param name: name of the object to update
652
 
    :param headers: additional headers to include in the request
653
 
    :param http_conn: HTTP connection object (If None, it will create the
654
 
                      conn object)
655
 
    :raises ClientException: HTTP POST request failed
656
 
    """
657
 
    if http_conn:
658
 
        parsed, conn = http_conn
659
 
    else:
660
 
        parsed, conn = http_connection(url)
661
 
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
662
 
    headers['X-Auth-Token'] = token
663
 
    conn.request('POST', path, '', headers)
664
 
    resp = conn.getresponse()
665
 
    resp.read()
666
 
    if resp.status < 200 or resp.status >= 300:
667
 
        raise ClientException('Object POST failed', http_scheme=parsed.scheme,
668
 
                http_host=conn.host, http_port=conn.port, http_path=path,
669
 
                http_status=resp.status, http_reason=resp.reason)
670
 
 
671
 
 
672
 
def delete_object(url, token, container, name, http_conn=None):
673
 
    """
674
 
    Delete object
675
 
 
676
 
    :param url: storage URL
677
 
    :param token: auth token
678
 
    :param container: container name that the object is in
679
 
    :param name: object name to delete
680
 
    :param http_conn: HTTP connection object (If None, it will create the
681
 
                      conn object)
682
 
    :raises ClientException: HTTP DELETE request failed
683
 
    """
684
 
    if http_conn:
685
 
        parsed, conn = http_conn
686
 
    else:
687
 
        parsed, conn = http_connection(url)
688
 
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
689
 
    conn.request('DELETE', path, '', {'X-Auth-Token': token})
690
 
    resp = conn.getresponse()
691
 
    resp.read()
692
 
    if resp.status < 200 or resp.status >= 300:
693
 
        raise ClientException('Object DELETE failed',
694
 
                http_scheme=parsed.scheme, http_host=conn.host,
695
 
                http_port=conn.port, http_path=path, http_status=resp.status,
696
 
                http_reason=resp.reason)
697
 
 
698
 
 
699
 
class Connection(object):
700
 
    """Convenience class to make requests that will also retry the request"""
701
 
 
702
 
    def __init__(self, authurl, user, key, retries=5, preauthurl=None,
703
 
                 preauthtoken=None, snet=False):
704
 
        """
705
 
        :param authurl: authenitcation URL
706
 
        :param user: user name to authenticate as
707
 
        :param key: key/password to authenticate with
708
 
        :param retries: Number of times to retry the request before failing
709
 
        :param preauthurl: storage URL (if you have already authenticated)
710
 
        :param preauthtoken: authentication token (if you have already
711
 
                             authenticated)
712
 
        :param snet: use SERVICENET internal network default is False
713
 
        """
714
 
        self.authurl = authurl
715
 
        self.user = user
716
 
        self.key = key
717
 
        self.retries = retries
718
 
        self.http_conn = None
719
 
        self.url = preauthurl
720
 
        self.token = preauthtoken
721
 
        self.attempts = 0
722
 
        self.snet = snet
723
 
 
724
 
    def get_auth(self):
725
 
        return get_auth(self.authurl, self.user, self.key, snet=self.snet)
726
 
 
727
 
    def http_connection(self):
728
 
        return http_connection(self.url)
729
 
 
730
 
    def _retry(self, func, *args, **kwargs):
731
 
        self.attempts = 0
732
 
        backoff = 1
733
 
        while self.attempts <= self.retries:
734
 
            self.attempts += 1
735
 
            try:
736
 
                if not self.url or not self.token:
737
 
                    self.url, self.token = self.get_auth()
738
 
                    self.http_conn = None
739
 
                if not self.http_conn:
740
 
                    self.http_conn = self.http_connection()
741
 
                kwargs['http_conn'] = self.http_conn
742
 
                rv = func(self.url, self.token, *args, **kwargs)
743
 
                return rv
744
 
            except (socket.error, HTTPException):
745
 
                if self.attempts > self.retries:
746
 
                    raise
747
 
                self.http_conn = None
748
 
            except ClientException, err:
749
 
                if self.attempts > self.retries:
750
 
                    raise
751
 
                if err.http_status == 401:
752
 
                    self.url = self.token = None
753
 
                    if self.attempts > 1:
754
 
                        raise
755
 
                elif 500 <= err.http_status <= 599:
756
 
                    pass
757
 
                else:
758
 
                    raise
759
 
            sleep(backoff)
760
 
            backoff *= 2
761
 
 
762
 
    def head_account(self):
763
 
        """Wrapper for :func:`head_account`"""
764
 
        return self._retry(head_account)
765
 
 
766
 
    def get_account(self, marker=None, limit=None, prefix=None,
767
 
                    full_listing=False):
768
 
        """Wrapper for :func:`get_account`"""
769
 
        # TODO(unknown): With full_listing=True this will restart the entire
770
 
        # listing with each retry. Need to make a better version that just
771
 
        # retries where it left off.
772
 
        return self._retry(get_account, marker=marker, limit=limit,
773
 
                           prefix=prefix, full_listing=full_listing)
774
 
 
775
 
    def post_account(self, headers):
776
 
        """Wrapper for :func:`post_account`"""
777
 
        return self._retry(post_account, headers)
778
 
 
779
 
    def head_container(self, container):
780
 
        """Wrapper for :func:`head_container`"""
781
 
        return self._retry(head_container, container)
782
 
 
783
 
    def get_container(self, container, marker=None, limit=None, prefix=None,
784
 
                      delimiter=None, full_listing=False):
785
 
        """Wrapper for :func:`get_container`"""
786
 
        # TODO(unknown): With full_listing=True this will restart the entire
787
 
        # listing with each retry. Need to make a better version that just
788
 
        # retries where it left off.
789
 
        return self._retry(get_container, container, marker=marker,
790
 
                           limit=limit, prefix=prefix, delimiter=delimiter,
791
 
                           full_listing=full_listing)
792
 
 
793
 
    def put_container(self, container, headers=None):
794
 
        """Wrapper for :func:`put_container`"""
795
 
        return self._retry(put_container, container, headers=headers)
796
 
 
797
 
    def post_container(self, container, headers):
798
 
        """Wrapper for :func:`post_container`"""
799
 
        return self._retry(post_container, container, headers)
800
 
 
801
 
    def delete_container(self, container):
802
 
        """Wrapper for :func:`delete_container`"""
803
 
        return self._retry(delete_container, container)
804
 
 
805
 
    def head_object(self, container, obj):
806
 
        """Wrapper for :func:`head_object`"""
807
 
        return self._retry(head_object, container, obj)
808
 
 
809
 
    def get_object(self, container, obj, resp_chunk_size=None):
810
 
        """Wrapper for :func:`get_object`"""
811
 
        return self._retry(get_object, container, obj,
812
 
                           resp_chunk_size=resp_chunk_size)
813
 
 
814
 
    def put_object(self, container, obj, contents, content_length=None,
815
 
                   etag=None, chunk_size=65536, content_type=None,
816
 
                   headers=None):
817
 
        """Wrapper for :func:`put_object`"""
818
 
        return self._retry(put_object, container, obj, contents,
819
 
            content_length=content_length, etag=etag, chunk_size=chunk_size,
820
 
            content_type=content_type, headers=headers)
821
 
 
822
 
    def post_object(self, container, obj, headers):
823
 
        """Wrapper for :func:`post_object`"""
824
 
        return self._retry(post_object, container, obj, headers)
825
 
 
826
 
    def delete_object(self, container, obj):
827
 
        """Wrapper for :func:`delete_object`"""
828
 
        return self._retry(delete_object, container, obj)
829
 
 
830
 
# End inclusion of swift.common.client
831
 
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
832
 
 
833
 
 
834
 
def mkdirs(path):
835
 
    try:
836
 
        makedirs(path)
837
 
    except OSError, err:
838
 
        if err.errno != EEXIST:
839
 
            raise
840
 
 
841
 
 
842
 
class QueueFunctionThread(Thread):
843
 
 
844
 
    def __init__(self, queue, func, *args, **kwargs):
845
 
        """ Calls func for each item in queue; func is called with a queued
846
 
            item as the first arg followed by *args and **kwargs. Use the abort
847
 
            attribute to have the thread empty the queue (without processing)
848
 
            and exit. """
849
 
        Thread.__init__(self)
850
 
        self.abort = False
851
 
        self.queue = queue
852
 
        self.func = func
853
 
        self.args = args
854
 
        self.kwargs = kwargs
855
 
 
856
 
    def run(self):
857
 
        while True:
858
 
            try:
859
 
                item = self.queue.get_nowait()
860
 
                if not self.abort:
861
 
                    self.func(item, *self.args, **self.kwargs)
862
 
                self.queue.task_done()
863
 
            except Empty:
864
 
                if self.abort:
865
 
                    break
866
 
                sleep(0.01)
867
 
 
868
 
 
869
 
st_delete_help = '''
870
 
delete --all OR delete container [--leave-segments] [object] [object] ...
871
 
    Deletes everything in the account (with --all), or everything in a
872
 
    container, or a list of objects depending on the args given. Segments of
873
 
    manifest objects will be deleted as well, unless you specify the
874
 
    --leave-segments option.'''.strip('\n')
875
 
 
876
 
 
877
 
def st_delete(parser, args, print_queue, error_queue):
878
 
    parser.add_option('-a', '--all', action='store_true', dest='yes_all',
879
 
        default=False, help='Indicates that you really want to delete '
880
 
        'everything in the account')
881
 
    parser.add_option('', '--leave-segments', action='store_true',
882
 
        dest='leave_segments', default=False, help='Indicates that you want '
883
 
        'the segments of manifest objects left alone')
884
 
    (options, args) = parse_args(parser, args)
885
 
    args = args[1:]
886
 
    if (not args and not options.yes_all) or (args and options.yes_all):
887
 
        error_queue.put('Usage: %s [options] %s' %
888
 
                        (basename(argv[0]), st_delete_help))
889
 
        return
890
 
 
891
 
    def _delete_segment((container, obj), conn):
892
 
        conn.delete_object(container, obj)
893
 
        if options.verbose:
894
 
            print_queue.put('%s/%s' % (container, obj))
895
 
 
896
 
    object_queue = Queue(10000)
897
 
 
898
 
    def _delete_object((container, obj), conn):
899
 
        try:
900
 
            old_manifest = None
901
 
            if not options.leave_segments:
902
 
                try:
903
 
                    old_manifest = conn.head_object(container, obj).get(
904
 
                        'x-object-manifest')
905
 
                except ClientException, err:
906
 
                    if err.http_status != 404:
907
 
                        raise
908
 
            conn.delete_object(container, obj)
909
 
            if old_manifest:
910
 
                segment_queue = Queue(10000)
911
 
                scontainer, sprefix = old_manifest.split('/', 1)
912
 
                for delobj in conn.get_container(scontainer,
913
 
                                                 prefix=sprefix)[1]:
914
 
                    segment_queue.put((scontainer, delobj['name']))
915
 
                if not segment_queue.empty():
916
 
                    segment_threads = [QueueFunctionThread(segment_queue,
917
 
                        _delete_segment, create_connection()) for _junk in
918
 
                        xrange(10)]
919
 
                    for thread in segment_threads:
920
 
                        thread.start()
921
 
                    while not segment_queue.empty():
922
 
                        sleep(0.01)
923
 
                    for thread in segment_threads:
924
 
                        thread.abort = True
925
 
                        while thread.isAlive():
926
 
                            thread.join(0.01)
927
 
            if options.verbose:
928
 
                path = options.yes_all and join(container, obj) or obj
929
 
                if path[:1] in ('/', '\\'):
930
 
                    path = path[1:]
931
 
                print_queue.put(path)
932
 
        except ClientException, err:
933
 
            if err.http_status != 404:
934
 
                raise
935
 
            error_queue.put('Object %s not found' %
936
 
                            repr('%s/%s' % (container, obj)))
937
 
 
938
 
    container_queue = Queue(10000)
939
 
 
940
 
    def _delete_container(container, conn):
941
 
        try:
942
 
            marker = ''
943
 
            while True:
944
 
                objects = [o['name'] for o in
945
 
                           conn.get_container(container, marker=marker)[1]]
946
 
                if not objects:
947
 
                    break
948
 
                for obj in objects:
949
 
                    object_queue.put((container, obj))
950
 
                marker = objects[-1]
951
 
            while not object_queue.empty():
952
 
                sleep(0.01)
953
 
            attempts = 1
954
 
            while True:
955
 
                try:
956
 
                    conn.delete_container(container)
957
 
                    break
958
 
                except ClientException, err:
959
 
                    if err.http_status != 409:
960
 
                        raise
961
 
                    if attempts > 10:
962
 
                        raise
963
 
                    attempts += 1
964
 
                    sleep(1)
965
 
        except ClientException, err:
966
 
            if err.http_status != 404:
967
 
                raise
968
 
            error_queue.put('Container %s not found' % repr(container))
969
 
 
970
 
    url, token = get_auth(options.auth, options.user, options.key,
971
 
        snet=options.snet)
972
 
    create_connection = lambda: Connection(options.auth, options.user,
973
 
        options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
974
 
    object_threads = [QueueFunctionThread(object_queue, _delete_object,
975
 
        create_connection()) for _junk in xrange(10)]
976
 
    for thread in object_threads:
977
 
        thread.start()
978
 
    container_threads = [QueueFunctionThread(container_queue,
979
 
        _delete_container, create_connection()) for _junk in xrange(10)]
980
 
    for thread in container_threads:
981
 
        thread.start()
982
 
    if not args:
983
 
        conn = create_connection()
984
 
        try:
985
 
            marker = ''
986
 
            while True:
987
 
                containers = \
988
 
                    [c['name'] for c in conn.get_account(marker=marker)[1]]
989
 
                if not containers:
990
 
                    break
991
 
                for container in containers:
992
 
                    container_queue.put(container)
993
 
                marker = containers[-1]
994
 
            while not container_queue.empty():
995
 
                sleep(0.01)
996
 
            while not object_queue.empty():
997
 
                sleep(0.01)
998
 
        except ClientException, err:
999
 
            if err.http_status != 404:
1000
 
                raise
1001
 
            error_queue.put('Account not found')
1002
 
    elif len(args) == 1:
1003
 
        if '/' in args[0]:
1004
 
            print >> stderr, 'WARNING: / in container name; you might have ' \
1005
 
                             'meant %r instead of %r.' % \
1006
 
                             (args[0].replace('/', ' ', 1), args[0])
1007
 
        conn = create_connection()
1008
 
        _delete_container(args[0], conn)
1009
 
    else:
1010
 
        for obj in args[1:]:
1011
 
            object_queue.put((args[0], obj))
1012
 
    while not container_queue.empty():
1013
 
        sleep(0.01)
1014
 
    for thread in container_threads:
1015
 
        thread.abort = True
1016
 
        while thread.isAlive():
1017
 
            thread.join(0.01)
1018
 
    while not object_queue.empty():
1019
 
        sleep(0.01)
1020
 
    for thread in object_threads:
1021
 
        thread.abort = True
1022
 
        while thread.isAlive():
1023
 
            thread.join(0.01)
1024
 
 
1025
 
 
1026
 
st_download_help = '''
1027
 
download --all OR download container [options] [object] [object] ...
1028
 
    Downloads everything in the account (with --all), or everything in a
1029
 
    container, or a list of objects depending on the args given. For a single
1030
 
    object download, you may use the -o [--output] <filename> option to
1031
 
    redirect the output to a specific file or if "-" then just redirect to
1032
 
    stdout.'''.strip('\n')
1033
 
 
1034
 
 
1035
 
def st_download(options, args, print_queue, error_queue):
1036
 
    parser.add_option('-a', '--all', action='store_true', dest='yes_all',
1037
 
        default=False, help='Indicates that you really want to download '
1038
 
        'everything in the account')
1039
 
    parser.add_option('-o', '--output', dest='out_file', help='For a single '
1040
 
        'file download, stream the output to an alternate location ')
1041
 
    (options, args) = parse_args(parser, args)
1042
 
    args = args[1:]
1043
 
    if options.out_file == '-':
1044
 
        options.verbose = 0
1045
 
    if options.out_file and len(args) != 2:
1046
 
        exit('-o option only allowed for single file downloads')
1047
 
    if (not args and not options.yes_all) or (args and options.yes_all):
1048
 
        error_queue.put('Usage: %s [options] %s' %
1049
 
                        (basename(argv[0]), st_download_help))
1050
 
        return
1051
 
 
1052
 
    object_queue = Queue(10000)
1053
 
 
1054
 
    def _download_object(queue_arg, conn):
1055
 
        if len(queue_arg) == 2:
1056
 
            container, obj = queue_arg
1057
 
            out_file = None
1058
 
        elif len(queue_arg) == 3:
1059
 
            container, obj, out_file = queue_arg
1060
 
        else:
1061
 
            raise Exception("Invalid queue_arg length of %s" % len(queue_arg))
1062
 
        try:
1063
 
            headers, body = \
1064
 
                conn.get_object(container, obj, resp_chunk_size=65536)
1065
 
            content_type = headers.get('content-type')
1066
 
            if 'content-length' in headers:
1067
 
                content_length = int(headers.get('content-length'))
1068
 
            else:
1069
 
                content_length = None
1070
 
            etag = headers.get('etag')
1071
 
            path = options.yes_all and join(container, obj) or obj
1072
 
            if path[:1] in ('/', '\\'):
1073
 
                path = path[1:]
1074
 
            md5sum = None
1075
 
            make_dir = out_file != "-"
1076
 
            if content_type.split(';', 1)[0] == 'text/directory':
1077
 
                if make_dir and not isdir(path):
1078
 
                    mkdirs(path)
1079
 
                read_length = 0
1080
 
                if 'x-object-manifest' not in headers:
1081
 
                    md5sum = md5()
1082
 
                for chunk in body:
1083
 
                    read_length += len(chunk)
1084
 
                    if md5sum:
1085
 
                        md5sum.update(chunk)
1086
 
            else:
1087
 
                dirpath = dirname(path)
1088
 
                if make_dir and dirpath and not isdir(dirpath):
1089
 
                    mkdirs(dirpath)
1090
 
                if out_file == "-":
1091
 
                    fp = stdout
1092
 
                elif out_file:
1093
 
                    fp = open(out_file, 'wb')
1094
 
                else:
1095
 
                    fp = open(path, 'wb')
1096
 
                read_length = 0
1097
 
                if 'x-object-manifest' not in headers:
1098
 
                    md5sum = md5()
1099
 
                for chunk in body:
1100
 
                    fp.write(chunk)
1101
 
                    read_length += len(chunk)
1102
 
                    if md5sum:
1103
 
                        md5sum.update(chunk)
1104
 
                fp.close()
1105
 
            if md5sum and md5sum.hexdigest() != etag:
1106
 
                error_queue.put('%s: md5sum != etag, %s != %s' %
1107
 
                                (path, md5sum.hexdigest(), etag))
1108
 
            if content_length is not None and read_length != content_length:
1109
 
                error_queue.put('%s: read_length != content_length, %d != %d' %
1110
 
                                (path, read_length, content_length))
1111
 
            if 'x-object-meta-mtime' in headers and not options.out_file:
1112
 
                mtime = float(headers['x-object-meta-mtime'])
1113
 
                utime(path, (mtime, mtime))
1114
 
            if options.verbose:
1115
 
                print_queue.put(path)
1116
 
        except ClientException, err:
1117
 
            if err.http_status != 404:
1118
 
                raise
1119
 
            error_queue.put('Object %s not found' %
1120
 
                            repr('%s/%s' % (container, obj)))
1121
 
 
1122
 
    container_queue = Queue(10000)
1123
 
 
1124
 
    def _download_container(container, conn):
1125
 
        try:
1126
 
            marker = ''
1127
 
            while True:
1128
 
                objects = [o['name'] for o in
1129
 
                           conn.get_container(container, marker=marker)[1]]
1130
 
                if not objects:
1131
 
                    break
1132
 
                for obj in objects:
1133
 
                    object_queue.put((container, obj))
1134
 
                marker = objects[-1]
1135
 
        except ClientException, err:
1136
 
            if err.http_status != 404:
1137
 
                raise
1138
 
            error_queue.put('Container %s not found' % repr(container))
1139
 
 
1140
 
    url, token = get_auth(options.auth, options.user, options.key,
1141
 
        snet=options.snet)
1142
 
    create_connection = lambda: Connection(options.auth, options.user,
1143
 
        options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
1144
 
    object_threads = [QueueFunctionThread(object_queue, _download_object,
1145
 
        create_connection()) for _junk in xrange(10)]
1146
 
    for thread in object_threads:
1147
 
        thread.start()
1148
 
    container_threads = [QueueFunctionThread(container_queue,
1149
 
        _download_container, create_connection()) for _junk in xrange(10)]
1150
 
    for thread in container_threads:
1151
 
        thread.start()
1152
 
    if not args:
1153
 
        conn = create_connection()
1154
 
        try:
1155
 
            marker = ''
1156
 
            while True:
1157
 
                containers = [c['name']
1158
 
                              for c in conn.get_account(marker=marker)[1]]
1159
 
                if not containers:
1160
 
                    break
1161
 
                for container in containers:
1162
 
                    container_queue.put(container)
1163
 
                marker = containers[-1]
1164
 
        except ClientException, err:
1165
 
            if err.http_status != 404:
1166
 
                raise
1167
 
            error_queue.put('Account not found')
1168
 
    elif len(args) == 1:
1169
 
        if '/' in args[0]:
1170
 
            print >> stderr, 'WARNING: / in container name; you might have ' \
1171
 
                             'meant %r instead of %r.' % \
1172
 
                             (args[0].replace('/', ' ', 1), args[0])
1173
 
        _download_container(args[0], create_connection())
1174
 
    else:
1175
 
        if len(args) == 2:
1176
 
            obj = args[1]
1177
 
            object_queue.put((args[0], obj, options.out_file))
1178
 
        else:
1179
 
            for obj in args[1:]:
1180
 
                object_queue.put((args[0], obj))
1181
 
    while not container_queue.empty():
1182
 
        sleep(0.01)
1183
 
    for thread in container_threads:
1184
 
        thread.abort = True
1185
 
        while thread.isAlive():
1186
 
            thread.join(0.01)
1187
 
    while not object_queue.empty():
1188
 
        sleep(0.01)
1189
 
    for thread in object_threads:
1190
 
        thread.abort = True
1191
 
        while thread.isAlive():
1192
 
            thread.join(0.01)
1193
 
 
1194
 
 
1195
 
st_list_help = '''
1196
 
list [options] [container]
1197
 
    Lists the containers for the account or the objects for a container. -p or
1198
 
    --prefix is an option that will only list items beginning with that prefix.
1199
 
    -d or --delimiter is option (for container listings only) that will roll up
1200
 
    items with the given delimiter (see Cloud Files general documentation for
1201
 
    what this means).
1202
 
'''.strip('\n')
1203
 
 
1204
 
 
1205
 
def st_list(options, args, print_queue, error_queue):
1206
 
    parser.add_option('-p', '--prefix', dest='prefix', help='Will only list '
1207
 
        'items beginning with the prefix')
1208
 
    parser.add_option('-d', '--delimiter', dest='delimiter', help='Will roll '
1209
 
        'up items with the given delimiter (see Cloud Files general '
1210
 
        'documentation for what this means)')
1211
 
    (options, args) = parse_args(parser, args)
1212
 
    args = args[1:]
1213
 
    if options.delimiter and not args:
1214
 
        exit('-d option only allowed for container listings')
1215
 
    if len(args) > 1:
1216
 
        error_queue.put('Usage: %s [options] %s' %
1217
 
                        (basename(argv[0]), st_list_help))
1218
 
        return
1219
 
    conn = Connection(options.auth, options.user, options.key,
1220
 
        snet=options.snet)
1221
 
    try:
1222
 
        marker = ''
1223
 
        while True:
1224
 
            if not args:
1225
 
                items = \
1226
 
                    conn.get_account(marker=marker, prefix=options.prefix)[1]
1227
 
            else:
1228
 
                items = conn.get_container(args[0], marker=marker,
1229
 
                    prefix=options.prefix, delimiter=options.delimiter)[1]
1230
 
            if not items:
1231
 
                break
1232
 
            for item in items:
1233
 
                print_queue.put(item.get('name', item.get('subdir')))
1234
 
            marker = items[-1].get('name', items[-1].get('subdir'))
1235
 
    except ClientException, err:
1236
 
        if err.http_status != 404:
1237
 
            raise
1238
 
        if not args:
1239
 
            error_queue.put('Account not found')
1240
 
        else:
1241
 
            error_queue.put('Container %s not found' % repr(args[0]))
1242
 
 
1243
 
 
1244
 
st_stat_help = '''
1245
 
stat [container] [object]
1246
 
    Displays information for the account, container, or object depending on the
1247
 
    args given (if any).'''.strip('\n')
1248
 
 
1249
 
 
1250
 
def st_stat(options, args, print_queue, error_queue):
1251
 
    (options, args) = parse_args(parser, args)
1252
 
    args = args[1:]
1253
 
    conn = Connection(options.auth, options.user, options.key)
1254
 
    if not args:
1255
 
        try:
1256
 
            headers = conn.head_account()
1257
 
            if options.verbose > 1:
1258
 
                print_queue.put('''
1259
 
StorageURL: %s
1260
 
Auth Token: %s
1261
 
'''.strip('\n') % (conn.url, conn.token))
1262
 
            container_count = int(headers.get('x-account-container-count', 0))
1263
 
            object_count = int(headers.get('x-account-object-count', 0))
1264
 
            bytes_used = int(headers.get('x-account-bytes-used', 0))
1265
 
            print_queue.put('''
1266
 
   Account: %s
1267
 
Containers: %d
1268
 
   Objects: %d
1269
 
     Bytes: %d'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count,
1270
 
                                 object_count, bytes_used))
1271
 
            for key, value in headers.items():
1272
 
                if key.startswith('x-account-meta-'):
1273
 
                    print_queue.put('%10s: %s' % ('Meta %s' %
1274
 
                        key[len('x-account-meta-'):].title(), value))
1275
 
            for key, value in headers.items():
1276
 
                if not key.startswith('x-account-meta-') and key not in (
1277
 
                        'content-length', 'date', 'x-account-container-count',
1278
 
                        'x-account-object-count', 'x-account-bytes-used'):
1279
 
                    print_queue.put(
1280
 
                        '%10s: %s' % (key.title(), value))
1281
 
        except ClientException, err:
1282
 
            if err.http_status != 404:
1283
 
                raise
1284
 
            error_queue.put('Account not found')
1285
 
    elif len(args) == 1:
1286
 
        if '/' in args[0]:
1287
 
            print >> stderr, 'WARNING: / in container name; you might have ' \
1288
 
                             'meant %r instead of %r.' % \
1289
 
                             (args[0].replace('/', ' ', 1), args[0])
1290
 
        try:
1291
 
            headers = conn.head_container(args[0])
1292
 
            object_count = int(headers.get('x-container-object-count', 0))
1293
 
            bytes_used = int(headers.get('x-container-bytes-used', 0))
1294
 
            print_queue.put('''
1295
 
  Account: %s
1296
 
Container: %s
1297
 
  Objects: %d
1298
 
    Bytes: %d
1299
 
 Read ACL: %s
1300
 
Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
1301
 
                                object_count, bytes_used,
1302
 
                                headers.get('x-container-read', ''),
1303
 
                                headers.get('x-container-write', '')))
1304
 
            for key, value in headers.items():
1305
 
                if key.startswith('x-container-meta-'):
1306
 
                    print_queue.put('%9s: %s' % ('Meta %s' %
1307
 
                        key[len('x-container-meta-'):].title(), value))
1308
 
            for key, value in headers.items():
1309
 
                if not key.startswith('x-container-meta-') and key not in (
1310
 
                        'content-length', 'date', 'x-container-object-count',
1311
 
                        'x-container-bytes-used', 'x-container-read',
1312
 
                        'x-container-write'):
1313
 
                    print_queue.put(
1314
 
                        '%9s: %s' % (key.title(), value))
1315
 
        except ClientException, err:
1316
 
            if err.http_status != 404:
1317
 
                raise
1318
 
            error_queue.put('Container %s not found' % repr(args[0]))
1319
 
    elif len(args) == 2:
1320
 
        try:
1321
 
            headers = conn.head_object(args[0], args[1])
1322
 
            print_queue.put('''
1323
 
       Account: %s
1324
 
     Container: %s
1325
 
        Object: %s
1326
 
  Content Type: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
1327
 
                                     args[1], headers.get('content-type')))
1328
 
            if 'content-length' in headers:
1329
 
                print_queue.put('Content Length: %s' %
1330
 
                                headers['content-length'])
1331
 
            if 'last-modified' in headers:
1332
 
                print_queue.put(' Last Modified: %s' %
1333
 
                                headers['last-modified'])
1334
 
            if 'etag' in headers:
1335
 
                print_queue.put('          ETag: %s' % headers['etag'])
1336
 
            if 'x-object-manifest' in headers:
1337
 
                print_queue.put('      Manifest: %s' %
1338
 
                                headers['x-object-manifest'])
1339
 
            for key, value in headers.items():
1340
 
                if key.startswith('x-object-meta-'):
1341
 
                    print_queue.put('%14s: %s' % ('Meta %s' %
1342
 
                        key[len('x-object-meta-'):].title(), value))
1343
 
            for key, value in headers.items():
1344
 
                if not key.startswith('x-object-meta-') and key not in (
1345
 
                        'content-type', 'content-length', 'last-modified',
1346
 
                        'etag', 'date', 'x-object-manifest'):
1347
 
                    print_queue.put(
1348
 
                        '%14s: %s' % (key.title(), value))
1349
 
        except ClientException, err:
1350
 
            if err.http_status != 404:
1351
 
                raise
1352
 
            error_queue.put('Object %s not found' %
1353
 
                            repr('%s/%s' % (args[0], args[1])))
1354
 
    else:
1355
 
        error_queue.put('Usage: %s [options] %s' %
1356
 
                        (basename(argv[0]), st_stat_help))
1357
 
 
1358
 
 
1359
 
st_post_help = '''
1360
 
post [options] [container] [object]
1361
 
    Updates meta information for the account, container, or object depending on
1362
 
    the args given. If the container is not found, it will be created
1363
 
    automatically; but this is not true for accounts and objects. Containers
1364
 
    also allow the -r (or --read-acl) and -w (or --write-acl) options. The -m
1365
 
    or --meta option is allowed on all and used to define the user meta data
1366
 
    items to set in the form Name:Value. This option can be repeated. Example:
1367
 
    post -m Color:Blue -m Size:Large'''.strip('\n')
1368
 
 
1369
 
 
1370
 
def st_post(options, args, print_queue, error_queue):
1371
 
    parser.add_option('-r', '--read-acl', dest='read_acl', help='Sets the '
1372
 
        'Read ACL for containers. Quick summary of ACL syntax: .r:*, '
1373
 
        '.r:-.example.com, .r:www.example.com, account1, account2:user2')
1374
 
    parser.add_option('-w', '--write-acl', dest='write_acl', help='Sets the '
1375
 
        'Write ACL for containers. Quick summary of ACL syntax: account1, '
1376
 
        'account2:user2')
1377
 
    parser.add_option('-m', '--meta', action='append', dest='meta', default=[],
1378
 
        help='Sets a meta data item with the syntax name:value. This option '
1379
 
        'may be repeated. Example: -m Color:Blue -m Size:Large')
1380
 
    (options, args) = parse_args(parser, args)
1381
 
    args = args[1:]
1382
 
    if (options.read_acl or options.write_acl) and not args:
1383
 
        exit('-r and -w options only allowed for containers')
1384
 
    conn = Connection(options.auth, options.user, options.key)
1385
 
    if not args:
1386
 
        headers = {}
1387
 
        for item in options.meta:
1388
 
            split_item = item.split(':')
1389
 
            headers['X-Account-Meta-' + split_item[0]] = \
1390
 
                len(split_item) > 1 and split_item[1]
1391
 
        try:
1392
 
            conn.post_account(headers=headers)
1393
 
        except ClientException, err:
1394
 
            if err.http_status != 404:
1395
 
                raise
1396
 
            error_queue.put('Account not found')
1397
 
    elif len(args) == 1:
1398
 
        if '/' in args[0]:
1399
 
            print >> stderr, 'WARNING: / in container name; you might have ' \
1400
 
                             'meant %r instead of %r.' % \
1401
 
                             (args[0].replace('/', ' ', 1), args[0])
1402
 
        headers = {}
1403
 
        for item in options.meta:
1404
 
            split_item = item.split(':')
1405
 
            headers['X-Container-Meta-' + split_item[0]] = \
1406
 
                len(split_item) > 1 and split_item[1]
1407
 
        if options.read_acl is not None:
1408
 
            headers['X-Container-Read'] = options.read_acl
1409
 
        if options.write_acl is not None:
1410
 
            headers['X-Container-Write'] = options.write_acl
1411
 
        try:
1412
 
            conn.post_container(args[0], headers=headers)
1413
 
        except ClientException, err:
1414
 
            if err.http_status != 404:
1415
 
                raise
1416
 
            conn.put_container(args[0], headers=headers)
1417
 
    elif len(args) == 2:
1418
 
        headers = {}
1419
 
        for item in options.meta:
1420
 
            split_item = item.split(':')
1421
 
            headers['X-Object-Meta-' + split_item[0]] = \
1422
 
                len(split_item) > 1 and split_item[1]
1423
 
        try:
1424
 
            conn.post_object(args[0], args[1], headers=headers)
1425
 
        except ClientException, err:
1426
 
            if err.http_status != 404:
1427
 
                raise
1428
 
            error_queue.put('Object %s not found' %
1429
 
                            repr('%s/%s' % (args[0], args[1])))
1430
 
    else:
1431
 
        error_queue.put('Usage: %s [options] %s' %
1432
 
                        (basename(argv[0]), st_post_help))
1433
 
 
1434
 
 
1435
 
st_upload_help = '''
1436
 
upload [options] container file_or_directory [file_or_directory] [...]
1437
 
    Uploads to the given container the files and directories specified by the
1438
 
    remaining args. -c or --changed is an option that will only upload files
1439
 
    that have changed since the last upload. -S <size> or --segment-size <size>
1440
 
    and --leave-segments are options as well (see --help for more).
1441
 
'''.strip('\n')
1442
 
 
1443
 
 
1444
 
def st_upload(options, args, print_queue, error_queue):
1445
 
    parser.add_option('-c', '--changed', action='store_true', dest='changed',
1446
 
        default=False, help='Will only upload files that have changed since '
1447
 
        'the last upload')
1448
 
    parser.add_option('-S', '--segment-size', dest='segment_size', help='Will '
1449
 
        'upload files in segments no larger than <size> and then create a '
1450
 
        '"manifest" file that will download all the segments as if it were '
1451
 
        'the original file. The segments will be uploaded to a '
1452
 
        '<container>_segments container so as to not pollute the main '
1453
 
        '<container> listings.')
1454
 
    parser.add_option('', '--leave-segments', action='store_true',
1455
 
        dest='leave_segments', default=False, help='Indicates that you want '
1456
 
        'the older segments of manifest objects left alone (in the case of '
1457
 
        'overwrites)')
1458
 
    (options, args) = parse_args(parser, args)
1459
 
    args = args[1:]
1460
 
    if len(args) < 2:
1461
 
        error_queue.put('Usage: %s [options] %s' %
1462
 
                        (basename(argv[0]), st_upload_help))
1463
 
        return
1464
 
    object_queue = Queue(10000)
1465
 
 
1466
 
    def _segment_job(job, conn):
1467
 
        if job.get('delete', False):
1468
 
            conn.delete_object(job['container'], job['obj'])
1469
 
        else:
1470
 
            fp = open(job['path'], 'rb')
1471
 
            fp.seek(job['segment_start'])
1472
 
            conn.put_object(job.get('container', args[0] + '_segments'),
1473
 
                job['obj'], fp, content_length=job['segment_size'])
1474
 
        if options.verbose and 'log_line' in job:
1475
 
            print_queue.put(job['log_line'])
1476
 
 
1477
 
    def _object_job(job, conn):
1478
 
        path = job['path']
1479
 
        container = job.get('container', args[0])
1480
 
        dir_marker = job.get('dir_marker', False)
1481
 
        try:
1482
 
            obj = path
1483
 
            if obj.startswith('./') or obj.startswith('.\\'):
1484
 
                obj = obj[2:]
1485
 
            put_headers = {'x-object-meta-mtime': str(getmtime(path))}
1486
 
            if dir_marker:
1487
 
                if options.changed:
1488
 
                    try:
1489
 
                        headers = conn.head_object(container, obj)
1490
 
                        ct = headers.get('content-type')
1491
 
                        cl = int(headers.get('content-length'))
1492
 
                        et = headers.get('etag')
1493
 
                        mt = headers.get('x-object-meta-mtime')
1494
 
                        if ct.split(';', 1)[0] == 'text/directory' and \
1495
 
                                cl == 0 and \
1496
 
                                et == 'd41d8cd98f00b204e9800998ecf8427e' and \
1497
 
                                mt == put_headers['x-object-meta-mtime']:
1498
 
                            return
1499
 
                    except ClientException, err:
1500
 
                        if err.http_status != 404:
1501
 
                            raise
1502
 
                conn.put_object(container, obj, '', content_length=0,
1503
 
                                content_type='text/directory',
1504
 
                                headers=put_headers)
1505
 
            else:
1506
 
                # We need to HEAD all objects now in case we're overwriting a
1507
 
                # manifest object and need to delete the old segments
1508
 
                # ourselves.
1509
 
                old_manifest = None
1510
 
                if options.changed or not options.leave_segments:
1511
 
                    try:
1512
 
                        headers = conn.head_object(container, obj)
1513
 
                        cl = int(headers.get('content-length'))
1514
 
                        mt = headers.get('x-object-meta-mtime')
1515
 
                        if options.changed and cl == getsize(path) and \
1516
 
                                mt == put_headers['x-object-meta-mtime']:
1517
 
                            return
1518
 
                        if not options.leave_segments:
1519
 
                            old_manifest = headers.get('x-object-manifest')
1520
 
                    except ClientException, err:
1521
 
                        if err.http_status != 404:
1522
 
                            raise
1523
 
                if options.segment_size and \
1524
 
                        getsize(path) < options.segment_size:
1525
 
                    full_size = getsize(path)
1526
 
                    segment_queue = Queue(10000)
1527
 
                    segment_threads = [QueueFunctionThread(segment_queue,
1528
 
                        _segment_job, create_connection()) for _junk in
1529
 
                        xrange(10)]
1530
 
                    for thread in segment_threads:
1531
 
                        thread.start()
1532
 
                    segment = 0
1533
 
                    segment_start = 0
1534
 
                    while segment_start < full_size:
1535
 
                        segment_size = int(options.segment_size)
1536
 
                        if segment_start + segment_size > full_size:
1537
 
                            segment_size = full_size - segment_start
1538
 
                        segment_queue.put({'path': path,
1539
 
                            'obj': '%s/%s/%s/%08d' % (obj,
1540
 
                                put_headers['x-object-meta-mtime'], full_size,
1541
 
                                segment),
1542
 
                            'segment_start': segment_start,
1543
 
                            'segment_size': segment_size,
1544
 
                            'log_line': '%s segment %s' % (obj, segment)})
1545
 
                        segment += 1
1546
 
                        segment_start += segment_size
1547
 
                    while not segment_queue.empty():
1548
 
                        sleep(0.01)
1549
 
                    for thread in segment_threads:
1550
 
                        thread.abort = True
1551
 
                        while thread.isAlive():
1552
 
                            thread.join(0.01)
1553
 
                    new_object_manifest = '%s_segments/%s/%s/%s/' % (
1554
 
                        container, obj, put_headers['x-object-meta-mtime'],
1555
 
                        full_size)
1556
 
                    if old_manifest == new_object_manifest:
1557
 
                        old_manifest = None
1558
 
                    put_headers['x-object-manifest'] = new_object_manifest
1559
 
                    conn.put_object(container, obj, '', content_length=0,
1560
 
                                    headers=put_headers)
1561
 
                else:
1562
 
                    conn.put_object(container, obj, open(path, 'rb'),
1563
 
                        content_length=getsize(path), headers=put_headers)
1564
 
                if old_manifest:
1565
 
                    segment_queue = Queue(10000)
1566
 
                    scontainer, sprefix = old_manifest.split('/', 1)
1567
 
                    for delobj in conn.get_container(scontainer,
1568
 
                                                     prefix=sprefix)[1]:
1569
 
                        segment_queue.put({'delete': True,
1570
 
                            'container': scontainer, 'obj': delobj['name']})
1571
 
                    if not segment_queue.empty():
1572
 
                        segment_threads = [QueueFunctionThread(segment_queue,
1573
 
                            _segment_job, create_connection()) for _junk in
1574
 
                            xrange(10)]
1575
 
                        for thread in segment_threads:
1576
 
                            thread.start()
1577
 
                        while not segment_queue.empty():
1578
 
                            sleep(0.01)
1579
 
                        for thread in segment_threads:
1580
 
                            thread.abort = True
1581
 
                            while thread.isAlive():
1582
 
                                thread.join(0.01)
1583
 
            if options.verbose:
1584
 
                print_queue.put(obj)
1585
 
        except OSError, err:
1586
 
            if err.errno != ENOENT:
1587
 
                raise
1588
 
            error_queue.put('Local file %s not found' % repr(path))
1589
 
 
1590
 
    def _upload_dir(path):
1591
 
        names = listdir(path)
1592
 
        if not names:
1593
 
            object_queue.put({'path': path, 'dir_marker': True})
1594
 
        else:
1595
 
            for name in listdir(path):
1596
 
                subpath = join(path, name)
1597
 
                if isdir(subpath):
1598
 
                    _upload_dir(subpath)
1599
 
                else:
1600
 
                    object_queue.put({'path': subpath})
1601
 
 
1602
 
    url, token = get_auth(options.auth, options.user, options.key,
1603
 
        snet=options.snet)
1604
 
    create_connection = lambda: Connection(options.auth, options.user,
1605
 
        options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
1606
 
    object_threads = [QueueFunctionThread(object_queue, _object_job,
1607
 
        create_connection()) for _junk in xrange(10)]
1608
 
    for thread in object_threads:
1609
 
        thread.start()
1610
 
    conn = create_connection()
1611
 
    # Try to create the container, just in case it doesn't exist. If this
1612
 
    # fails, it might just be because the user doesn't have container PUT
1613
 
    # permissions, so we'll ignore any error. If there's really a problem,
1614
 
    # it'll surface on the first object PUT.
1615
 
    try:
1616
 
        conn.put_container(args[0])
1617
 
        if options.segment_size is not None:
1618
 
            conn.put_container(args[0] + '_segments')
1619
 
    except Exception:
1620
 
        pass
1621
 
    try:
1622
 
        for arg in args[1:]:
1623
 
            if isdir(arg):
1624
 
                _upload_dir(arg)
1625
 
            else:
1626
 
                object_queue.put({'path': arg})
1627
 
        while not object_queue.empty():
1628
 
            sleep(0.01)
1629
 
        for thread in object_threads:
1630
 
            thread.abort = True
1631
 
            while thread.isAlive():
1632
 
                thread.join(0.01)
1633
 
    except ClientException, err:
1634
 
        if err.http_status != 404:
1635
 
            raise
1636
 
        error_queue.put('Account not found')
1637
 
 
1638
 
 
1639
 
def parse_args(parser, args, enforce_requires=True):
1640
 
    if not args:
1641
 
        args = ['-h']
1642
 
    (options, args) = parser.parse_args(args)
1643
 
    if enforce_requires and \
1644
 
            not (options.auth and options.user and options.key):
1645
 
        exit('''
1646
 
Requires ST_AUTH, ST_USER, and ST_KEY environment variables be set or
1647
 
overridden with -A, -U, or -K.'''.strip('\n'))
1648
 
    return options, args
1649
 
 
1650
 
 
1651
 
if __name__ == '__main__':
1652
 
    parser = OptionParser(version='%prog 1.0', usage='''
1653
 
Usage: %%prog <command> [options] [args]
1654
 
 
1655
 
Commands:
1656
 
  %(st_stat_help)s
1657
 
  %(st_list_help)s
1658
 
  %(st_upload_help)s
1659
 
  %(st_post_help)s
1660
 
  %(st_download_help)s
1661
 
  %(st_delete_help)s
1662
 
 
1663
 
Example:
1664
 
  %%prog -A https://auth.api.rackspacecloud.com/v1.0 -U user -K key stat
1665
 
'''.strip('\n') % globals())
1666
 
    parser.add_option('-s', '--snet', action='store_true', dest='snet',
1667
 
                      default=False, help='Use SERVICENET internal network')
1668
 
    parser.add_option('-v', '--verbose', action='count', dest='verbose',
1669
 
                      default=1, help='Print more info')
1670
 
    parser.add_option('-q', '--quiet', action='store_const', dest='verbose',
1671
 
                      const=0, default=1, help='Suppress status output')
1672
 
    parser.add_option('-A', '--auth', dest='auth',
1673
 
                      default=environ.get('ST_AUTH'),
1674
 
                      help='URL for obtaining an auth token')
1675
 
    parser.add_option('-U', '--user', dest='user',
1676
 
                      default=environ.get('ST_USER'),
1677
 
                      help='User name for obtaining an auth token')
1678
 
    parser.add_option('-K', '--key', dest='key',
1679
 
                      default=environ.get('ST_KEY'),
1680
 
                      help='Key for obtaining an auth token')
1681
 
    parser.disable_interspersed_args()
1682
 
    (options, args) = parse_args(parser, argv[1:], enforce_requires=False)
1683
 
    parser.enable_interspersed_args()
1684
 
 
1685
 
    commands = ('delete', 'download', 'list', 'post', 'stat', 'upload')
1686
 
    if not args or args[0] not in commands:
1687
 
        parser.print_usage()
1688
 
        if args:
1689
 
            exit('no such command: %s' % args[0])
1690
 
        exit()
1691
 
 
1692
 
    print_queue = Queue(10000)
1693
 
 
1694
 
    def _print(item):
1695
 
        if isinstance(item, unicode):
1696
 
            item = item.encode('utf8')
1697
 
        print item
1698
 
 
1699
 
    print_thread = QueueFunctionThread(print_queue, _print)
1700
 
    print_thread.start()
1701
 
 
1702
 
    error_queue = Queue(10000)
1703
 
 
1704
 
    def _error(item):
1705
 
        if isinstance(item, unicode):
1706
 
            item = item.encode('utf8')
1707
 
        print >> stderr, item
1708
 
 
1709
 
    error_thread = QueueFunctionThread(error_queue, _error)
1710
 
    error_thread.start()
1711
 
 
1712
 
    try:
1713
 
        parser.usage = globals()['st_%s_help' % args[0]]
1714
 
        globals()['st_%s' % args[0]](parser, argv[1:], print_queue,
1715
 
                                     error_queue)
1716
 
        while not print_queue.empty():
1717
 
            sleep(0.01)
1718
 
        print_thread.abort = True
1719
 
        while print_thread.isAlive():
1720
 
            print_thread.join(0.01)
1721
 
        while not error_queue.empty():
1722
 
            sleep(0.01)
1723
 
        error_thread.abort = True
1724
 
        while error_thread.isAlive():
1725
 
            error_thread.join(0.01)
1726
 
    except (SystemExit, Exception):
1727
 
        for thread in threading_enumerate():
1728
 
            thread.abort = True
1729
 
        raise