~notmyname/swift/saio_reference

« back to all changes in this revision

Viewing changes to bin/st

  • Committer: John Dickinson
  • Date: 2011-06-14 16:04:06 UTC
  • mto: This revision was merged to the branch mainline in revision 311.
  • Revision ID: john.dickinson@rackspace.com-20110614160406-ce8i4ahvpseccvj4
renamed st to swift

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/python -u
2
 
# Copyright (c) 2010-2011 OpenStack, LLC.
3
 
#
4
 
# Licensed under the Apache License, Version 2.0 (the "License");
5
 
# you may not use this file except in compliance with the License.
6
 
# You may obtain a copy of the License at
7
 
#
8
 
#    http://www.apache.org/licenses/LICENSE-2.0
9
 
#
10
 
# Unless required by applicable law or agreed to in writing, software
11
 
# distributed under the License is distributed on an "AS IS" BASIS,
12
 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13
 
# implied.
14
 
# See the License for the specific language governing permissions and
15
 
# limitations under the License.
16
 
 
17
 
from errno import EEXIST, ENOENT
18
 
from hashlib import md5
19
 
from optparse import OptionParser
20
 
from os import environ, listdir, makedirs, utime
21
 
from os.path import basename, dirname, getmtime, getsize, isdir, join
22
 
from Queue import Empty, Queue
23
 
from sys import argv, exc_info, exit, stderr, stdout
24
 
from threading import enumerate as threading_enumerate, Thread
25
 
from time import sleep
26
 
from traceback import format_exception
27
 
 
28
 
 
29
 
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
30
 
# Inclusion of swift.common.client for convenience of single file distribution
31
 
 
32
 
import socket
33
 
from cStringIO import StringIO
34
 
from re import compile, DOTALL
35
 
from tokenize import generate_tokens, STRING, NAME, OP
36
 
from urllib import quote as _quote, unquote
37
 
from urlparse import urlparse, urlunparse
38
 
 
39
 
try:
40
 
    from eventlet.green.httplib import HTTPException, HTTPSConnection
41
 
except ImportError:
42
 
    from httplib import HTTPException, HTTPSConnection
43
 
 
44
 
try:
45
 
    from eventlet import sleep
46
 
except ImportError:
47
 
    from time import sleep
48
 
 
49
 
try:
50
 
    from swift.common.bufferedhttp \
51
 
        import BufferedHTTPConnection as HTTPConnection
52
 
except ImportError:
53
 
    try:
54
 
        from eventlet.green.httplib import HTTPConnection
55
 
    except ImportError:
56
 
        from httplib import HTTPConnection
57
 
 
58
 
 
59
 
def quote(value, safe='/'):
60
 
    """
61
 
    Patched version of urllib.quote that encodes utf8 strings before quoting
62
 
    """
63
 
    if isinstance(value, unicode):
64
 
        value = value.encode('utf8')
65
 
    return _quote(value, safe)
66
 
 
67
 
 
68
 
# look for a real json parser first
69
 
try:
70
 
    # simplejson is popular and pretty good
71
 
    from simplejson import loads as json_loads
72
 
except ImportError:
73
 
    try:
74
 
        # 2.6 will have a json module in the stdlib
75
 
        from json import loads as json_loads
76
 
    except ImportError:
77
 
        # fall back on local parser otherwise
78
 
        comments = compile(r'/\*.*\*/|//[^\r\n]*', DOTALL)
79
 
 
80
 
        def json_loads(string):
81
 
            '''
82
 
            Fairly competent json parser exploiting the python tokenizer and
83
 
            eval(). -- From python-cloudfiles
84
 
 
85
 
            _loads(serialized_json) -> object
86
 
            '''
87
 
            try:
88
 
                res = []
89
 
                consts = {'true': True, 'false': False, 'null': None}
90
 
                string = '(' + comments.sub('', string) + ')'
91
 
                for type, val, _junk, _junk, _junk in \
92
 
                        generate_tokens(StringIO(string).readline):
93
 
                    if (type == OP and val not in '[]{}:,()-') or \
94
 
                            (type == NAME and val not in consts):
95
 
                        raise AttributeError()
96
 
                    elif type == STRING:
97
 
                        res.append('u')
98
 
                        res.append(val.replace('\\/', '/'))
99
 
                    else:
100
 
                        res.append(val)
101
 
                return eval(''.join(res), {}, consts)
102
 
            except Exception:
103
 
                raise AttributeError()
104
 
 
105
 
 
106
 
class ClientException(Exception):
107
 
 
108
 
    def __init__(self, msg, http_scheme='', http_host='', http_port='',
109
 
                 http_path='', http_query='', http_status=0, http_reason='',
110
 
                 http_device=''):
111
 
        Exception.__init__(self, msg)
112
 
        self.msg = msg
113
 
        self.http_scheme = http_scheme
114
 
        self.http_host = http_host
115
 
        self.http_port = http_port
116
 
        self.http_path = http_path
117
 
        self.http_query = http_query
118
 
        self.http_status = http_status
119
 
        self.http_reason = http_reason
120
 
        self.http_device = http_device
121
 
 
122
 
    def __str__(self):
123
 
        a = self.msg
124
 
        b = ''
125
 
        if self.http_scheme:
126
 
            b += '%s://' % self.http_scheme
127
 
        if self.http_host:
128
 
            b += self.http_host
129
 
        if self.http_port:
130
 
            b += ':%s' % self.http_port
131
 
        if self.http_path:
132
 
            b += self.http_path
133
 
        if self.http_query:
134
 
            b += '?%s' % self.http_query
135
 
        if self.http_status:
136
 
            if b:
137
 
                b = '%s %s' % (b, self.http_status)
138
 
            else:
139
 
                b = str(self.http_status)
140
 
        if self.http_reason:
141
 
            if b:
142
 
                b = '%s %s' % (b, self.http_reason)
143
 
            else:
144
 
                b = '- %s' % self.http_reason
145
 
        if self.http_device:
146
 
            if b:
147
 
                b = '%s: device %s' % (b, self.http_device)
148
 
            else:
149
 
                b = 'device %s' % self.http_device
150
 
        return b and '%s: %s' % (a, b) or a
151
 
 
152
 
 
153
 
def http_connection(url):
154
 
    """
155
 
    Make an HTTPConnection or HTTPSConnection
156
 
 
157
 
    :param url: url to connect to
158
 
    :returns: tuple of (parsed url, connection object)
159
 
    :raises ClientException: Unable to handle protocol scheme
160
 
    """
161
 
    parsed = urlparse(url)
162
 
    if parsed.scheme == 'http':
163
 
        conn = HTTPConnection(parsed.netloc)
164
 
    elif parsed.scheme == 'https':
165
 
        conn = HTTPSConnection(parsed.netloc)
166
 
    else:
167
 
        raise ClientException('Cannot handle protocol scheme %s for url %s' %
168
 
                              (parsed.scheme, repr(url)))
169
 
    return parsed, conn
170
 
 
171
 
 
172
 
def get_auth(url, user, key, snet=False):
173
 
    """
174
 
    Get authentication/authorization credentials.
175
 
 
176
 
    The snet parameter is used for Rackspace's ServiceNet internal network
177
 
    implementation. In this function, it simply adds *snet-* to the beginning
178
 
    of the host name for the returned storage URL. With Rackspace Cloud Files,
179
 
    use of this network path causes no bandwidth charges but requires the
180
 
    client to be running on Rackspace's ServiceNet network.
181
 
 
182
 
    :param url: authentication/authorization URL
183
 
    :param user: user to authenticate as
184
 
    :param key: key or password for authorization
185
 
    :param snet: use SERVICENET internal network (see above), default is False
186
 
    :returns: tuple of (storage URL, auth token)
187
 
    :raises ClientException: HTTP GET request to auth URL failed
188
 
    """
189
 
    parsed, conn = http_connection(url)
190
 
    conn.request('GET', parsed.path, '',
191
 
                 {'X-Auth-User': user, 'X-Auth-Key': key})
192
 
    resp = conn.getresponse()
193
 
    resp.read()
194
 
    if resp.status < 200 or resp.status >= 300:
195
 
        raise ClientException('Auth GET failed', http_scheme=parsed.scheme,
196
 
                http_host=conn.host, http_port=conn.port,
197
 
                http_path=parsed.path, http_status=resp.status,
198
 
                http_reason=resp.reason)
199
 
    url = resp.getheader('x-storage-url')
200
 
    if snet:
201
 
        parsed = list(urlparse(url))
202
 
        # Second item in the list is the netloc
203
 
        parsed[1] = 'snet-' + parsed[1]
204
 
        url = urlunparse(parsed)
205
 
    return url, resp.getheader('x-storage-token',
206
 
                                                resp.getheader('x-auth-token'))
207
 
 
208
 
 
209
 
def get_account(url, token, marker=None, limit=None, prefix=None,
210
 
                http_conn=None, full_listing=False):
211
 
    """
212
 
    Get a listing of containers for the account.
213
 
 
214
 
    :param url: storage URL
215
 
    :param token: auth token
216
 
    :param marker: marker query
217
 
    :param limit: limit query
218
 
    :param prefix: prefix query
219
 
    :param http_conn: HTTP connection object (If None, it will create the
220
 
                      conn object)
221
 
    :param full_listing: if True, return a full listing, else returns a max
222
 
                         of 10000 listings
223
 
    :returns: a tuple of (response headers, a list of containers) The response
224
 
              headers will be a dict and all header names will be lowercase.
225
 
    :raises ClientException: HTTP GET request failed
226
 
    """
227
 
    if not http_conn:
228
 
        http_conn = http_connection(url)
229
 
    if full_listing:
230
 
        rv = get_account(url, token, marker, limit, prefix, http_conn)
231
 
        listing = rv[1]
232
 
        while listing:
233
 
            marker = listing[-1]['name']
234
 
            listing = \
235
 
                get_account(url, token, marker, limit, prefix, http_conn)[1]
236
 
            if listing:
237
 
                rv[1].extend(listing)
238
 
        return rv
239
 
    parsed, conn = http_conn
240
 
    qs = 'format=json'
241
 
    if marker:
242
 
        qs += '&marker=%s' % quote(marker)
243
 
    if limit:
244
 
        qs += '&limit=%d' % limit
245
 
    if prefix:
246
 
        qs += '&prefix=%s' % quote(prefix)
247
 
    conn.request('GET', '%s?%s' % (parsed.path, qs), '',
248
 
                 {'X-Auth-Token': token})
249
 
    resp = conn.getresponse()
250
 
    resp_headers = {}
251
 
    for header, value in resp.getheaders():
252
 
        resp_headers[header.lower()] = value
253
 
    if resp.status < 200 or resp.status >= 300:
254
 
        resp.read()
255
 
        raise ClientException('Account GET failed', http_scheme=parsed.scheme,
256
 
                http_host=conn.host, http_port=conn.port,
257
 
                http_path=parsed.path, http_query=qs, http_status=resp.status,
258
 
                http_reason=resp.reason)
259
 
    if resp.status == 204:
260
 
        resp.read()
261
 
        return resp_headers, []
262
 
    return resp_headers, json_loads(resp.read())
263
 
 
264
 
 
265
 
def head_account(url, token, http_conn=None):
266
 
    """
267
 
    Get account stats.
268
 
 
269
 
    :param url: storage URL
270
 
    :param token: auth token
271
 
    :param http_conn: HTTP connection object (If None, it will create the
272
 
                      conn object)
273
 
    :returns: a dict containing the response's headers (all header names will
274
 
              be lowercase)
275
 
    :raises ClientException: HTTP HEAD request failed
276
 
    """
277
 
    if http_conn:
278
 
        parsed, conn = http_conn
279
 
    else:
280
 
        parsed, conn = http_connection(url)
281
 
    conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
282
 
    resp = conn.getresponse()
283
 
    resp.read()
284
 
    if resp.status < 200 or resp.status >= 300:
285
 
        raise ClientException('Account HEAD failed', http_scheme=parsed.scheme,
286
 
                http_host=conn.host, http_port=conn.port,
287
 
                http_path=parsed.path, http_status=resp.status,
288
 
                http_reason=resp.reason)
289
 
    resp_headers = {}
290
 
    for header, value in resp.getheaders():
291
 
        resp_headers[header.lower()] = value
292
 
    return resp_headers
293
 
 
294
 
 
295
 
def post_account(url, token, headers, http_conn=None):
296
 
    """
297
 
    Update an account's metadata.
298
 
 
299
 
    :param url: storage URL
300
 
    :param token: auth token
301
 
    :param headers: additional headers to include in the request
302
 
    :param http_conn: HTTP connection object (If None, it will create the
303
 
                      conn object)
304
 
    :raises ClientException: HTTP POST request failed
305
 
    """
306
 
    if http_conn:
307
 
        parsed, conn = http_conn
308
 
    else:
309
 
        parsed, conn = http_connection(url)
310
 
    headers['X-Auth-Token'] = token
311
 
    conn.request('POST', parsed.path, '', headers)
312
 
    resp = conn.getresponse()
313
 
    resp.read()
314
 
    if resp.status < 200 or resp.status >= 300:
315
 
        raise ClientException('Account POST failed',
316
 
                http_scheme=parsed.scheme, http_host=conn.host,
317
 
                http_port=conn.port, http_path=path, http_status=resp.status,
318
 
                http_reason=resp.reason)
319
 
 
320
 
 
321
 
def get_container(url, token, container, marker=None, limit=None,
322
 
                  prefix=None, delimiter=None, http_conn=None,
323
 
                  full_listing=False):
324
 
    """
325
 
    Get a listing of objects for the container.
326
 
 
327
 
    :param url: storage URL
328
 
    :param token: auth token
329
 
    :param container: container name to get a listing for
330
 
    :param marker: marker query
331
 
    :param limit: limit query
332
 
    :param prefix: prefix query
333
 
    :param delimeter: string to delimit the queries on
334
 
    :param http_conn: HTTP connection object (If None, it will create the
335
 
                      conn object)
336
 
    :param full_listing: if True, return a full listing, else returns a max
337
 
                         of 10000 listings
338
 
    :returns: a tuple of (response headers, a list of objects) The response
339
 
              headers will be a dict and all header names will be lowercase.
340
 
    :raises ClientException: HTTP GET request failed
341
 
    """
342
 
    if not http_conn:
343
 
        http_conn = http_connection(url)
344
 
    if full_listing:
345
 
        rv = get_container(url, token, container, marker, limit, prefix,
346
 
                           delimiter, http_conn)
347
 
        listing = rv[1]
348
 
        while listing:
349
 
            if not delimiter:
350
 
                marker = listing[-1]['name']
351
 
            else:
352
 
                marker = listing[-1].get('name', listing[-1].get('subdir'))
353
 
            listing = get_container(url, token, container, marker, limit,
354
 
                                    prefix, delimiter, http_conn)[1]
355
 
            if listing:
356
 
                rv[1].extend(listing)
357
 
        return rv
358
 
    parsed, conn = http_conn
359
 
    path = '%s/%s' % (parsed.path, quote(container))
360
 
    qs = 'format=json'
361
 
    if marker:
362
 
        qs += '&marker=%s' % quote(marker)
363
 
    if limit:
364
 
        qs += '&limit=%d' % limit
365
 
    if prefix:
366
 
        qs += '&prefix=%s' % quote(prefix)
367
 
    if delimiter:
368
 
        qs += '&delimiter=%s' % quote(delimiter)
369
 
    conn.request('GET', '%s?%s' % (path, qs), '', {'X-Auth-Token': token})
370
 
    resp = conn.getresponse()
371
 
    if resp.status < 200 or resp.status >= 300:
372
 
        resp.read()
373
 
        raise ClientException('Container GET failed',
374
 
                http_scheme=parsed.scheme, http_host=conn.host,
375
 
                http_port=conn.port, http_path=path, http_query=qs,
376
 
                http_status=resp.status, http_reason=resp.reason)
377
 
    resp_headers = {}
378
 
    for header, value in resp.getheaders():
379
 
        resp_headers[header.lower()] = value
380
 
    if resp.status == 204:
381
 
        resp.read()
382
 
        return resp_headers, []
383
 
    return resp_headers, json_loads(resp.read())
384
 
 
385
 
 
386
 
def head_container(url, token, container, http_conn=None):
387
 
    """
388
 
    Get container stats.
389
 
 
390
 
    :param url: storage URL
391
 
    :param token: auth token
392
 
    :param container: container name to get stats for
393
 
    :param http_conn: HTTP connection object (If None, it will create the
394
 
                      conn object)
395
 
    :returns: a dict containing the response's headers (all header names will
396
 
              be lowercase)
397
 
    :raises ClientException: HTTP HEAD request failed
398
 
    """
399
 
    if http_conn:
400
 
        parsed, conn = http_conn
401
 
    else:
402
 
        parsed, conn = http_connection(url)
403
 
    path = '%s/%s' % (parsed.path, quote(container))
404
 
    conn.request('HEAD', path, '', {'X-Auth-Token': token})
405
 
    resp = conn.getresponse()
406
 
    resp.read()
407
 
    if resp.status < 200 or resp.status >= 300:
408
 
        raise ClientException('Container HEAD failed',
409
 
                http_scheme=parsed.scheme, http_host=conn.host,
410
 
                http_port=conn.port, http_path=path, http_status=resp.status,
411
 
                http_reason=resp.reason)
412
 
    resp_headers = {}
413
 
    for header, value in resp.getheaders():
414
 
        resp_headers[header.lower()] = value
415
 
    return resp_headers
416
 
 
417
 
 
418
 
def put_container(url, token, container, headers=None, http_conn=None):
419
 
    """
420
 
    Create a container
421
 
 
422
 
    :param url: storage URL
423
 
    :param token: auth token
424
 
    :param container: container name to create
425
 
    :param headers: additional headers to include in the request
426
 
    :param http_conn: HTTP connection object (If None, it will create the
427
 
                      conn object)
428
 
    :raises ClientException: HTTP PUT request failed
429
 
    """
430
 
    if http_conn:
431
 
        parsed, conn = http_conn
432
 
    else:
433
 
        parsed, conn = http_connection(url)
434
 
    path = '%s/%s' % (parsed.path, quote(container))
435
 
    if not headers:
436
 
        headers = {}
437
 
    headers['X-Auth-Token'] = token
438
 
    conn.request('PUT', path, '', headers)
439
 
    resp = conn.getresponse()
440
 
    resp.read()
441
 
    if resp.status < 200 or resp.status >= 300:
442
 
        raise ClientException('Container PUT failed',
443
 
                http_scheme=parsed.scheme, http_host=conn.host,
444
 
                http_port=conn.port, http_path=path, http_status=resp.status,
445
 
                http_reason=resp.reason)
446
 
 
447
 
 
448
 
def post_container(url, token, container, headers, http_conn=None):
449
 
    """
450
 
    Update a container's metadata.
451
 
 
452
 
    :param url: storage URL
453
 
    :param token: auth token
454
 
    :param container: container name to update
455
 
    :param headers: additional headers to include in the request
456
 
    :param http_conn: HTTP connection object (If None, it will create the
457
 
                      conn object)
458
 
    :raises ClientException: HTTP POST request failed
459
 
    """
460
 
    if http_conn:
461
 
        parsed, conn = http_conn
462
 
    else:
463
 
        parsed, conn = http_connection(url)
464
 
    path = '%s/%s' % (parsed.path, quote(container))
465
 
    headers['X-Auth-Token'] = token
466
 
    conn.request('POST', path, '', headers)
467
 
    resp = conn.getresponse()
468
 
    resp.read()
469
 
    if resp.status < 200 or resp.status >= 300:
470
 
        raise ClientException('Container POST failed',
471
 
                http_scheme=parsed.scheme, http_host=conn.host,
472
 
                http_port=conn.port, http_path=path, http_status=resp.status,
473
 
                http_reason=resp.reason)
474
 
 
475
 
 
476
 
def delete_container(url, token, container, http_conn=None):
477
 
    """
478
 
    Delete a container
479
 
 
480
 
    :param url: storage URL
481
 
    :param token: auth token
482
 
    :param container: container name to delete
483
 
    :param http_conn: HTTP connection object (If None, it will create the
484
 
                      conn object)
485
 
    :raises ClientException: HTTP DELETE request failed
486
 
    """
487
 
    if http_conn:
488
 
        parsed, conn = http_conn
489
 
    else:
490
 
        parsed, conn = http_connection(url)
491
 
    path = '%s/%s' % (parsed.path, quote(container))
492
 
    conn.request('DELETE', path, '', {'X-Auth-Token': token})
493
 
    resp = conn.getresponse()
494
 
    resp.read()
495
 
    if resp.status < 200 or resp.status >= 300:
496
 
        raise ClientException('Container DELETE failed',
497
 
                http_scheme=parsed.scheme, http_host=conn.host,
498
 
                http_port=conn.port, http_path=path, http_status=resp.status,
499
 
                http_reason=resp.reason)
500
 
 
501
 
 
502
 
def get_object(url, token, container, name, http_conn=None,
503
 
               resp_chunk_size=None):
504
 
    """
505
 
    Get an object
506
 
 
507
 
    :param url: storage URL
508
 
    :param token: auth token
509
 
    :param container: container name that the object is in
510
 
    :param name: object name to get
511
 
    :param http_conn: HTTP connection object (If None, it will create the
512
 
                      conn object)
513
 
    :param resp_chunk_size: if defined, chunk size of data to read. NOTE: If
514
 
                            you specify a resp_chunk_size you must fully read
515
 
                            the object's contents before making another
516
 
                            request.
517
 
    :returns: a tuple of (response headers, the object's contents) The response
518
 
              headers will be a dict and all header names will be lowercase.
519
 
    :raises ClientException: HTTP GET request failed
520
 
    """
521
 
    if http_conn:
522
 
        parsed, conn = http_conn
523
 
    else:
524
 
        parsed, conn = http_connection(url)
525
 
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
526
 
    conn.request('GET', path, '', {'X-Auth-Token': token})
527
 
    resp = conn.getresponse()
528
 
    if resp.status < 200 or resp.status >= 300:
529
 
        resp.read()
530
 
        raise ClientException('Object GET failed', http_scheme=parsed.scheme,
531
 
                http_host=conn.host, http_port=conn.port, http_path=path,
532
 
                http_status=resp.status, http_reason=resp.reason)
533
 
    if resp_chunk_size:
534
 
 
535
 
        def _object_body():
536
 
            buf = resp.read(resp_chunk_size)
537
 
            while buf:
538
 
                yield buf
539
 
                buf = resp.read(resp_chunk_size)
540
 
        object_body = _object_body()
541
 
    else:
542
 
        object_body = resp.read()
543
 
    resp_headers = {}
544
 
    for header, value in resp.getheaders():
545
 
        resp_headers[header.lower()] = value
546
 
    return resp_headers, object_body
547
 
 
548
 
 
549
 
def head_object(url, token, container, name, http_conn=None):
550
 
    """
551
 
    Get object info
552
 
 
553
 
    :param url: storage URL
554
 
    :param token: auth token
555
 
    :param container: container name that the object is in
556
 
    :param name: object name to get info for
557
 
    :param http_conn: HTTP connection object (If None, it will create the
558
 
                      conn object)
559
 
    :returns: a dict containing the response's headers (all header names will
560
 
              be lowercase)
561
 
    :raises ClientException: HTTP HEAD request failed
562
 
    """
563
 
    if http_conn:
564
 
        parsed, conn = http_conn
565
 
    else:
566
 
        parsed, conn = http_connection(url)
567
 
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
568
 
    conn.request('HEAD', path, '', {'X-Auth-Token': token})
569
 
    resp = conn.getresponse()
570
 
    resp.read()
571
 
    if resp.status < 200 or resp.status >= 300:
572
 
        raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
573
 
                http_host=conn.host, http_port=conn.port, http_path=path,
574
 
                http_status=resp.status, http_reason=resp.reason)
575
 
    resp_headers = {}
576
 
    for header, value in resp.getheaders():
577
 
        resp_headers[header.lower()] = value
578
 
    return resp_headers
579
 
 
580
 
 
581
 
def put_object(url, token, container, name, contents, content_length=None,
582
 
               etag=None, chunk_size=65536, content_type=None, headers=None,
583
 
               http_conn=None):
584
 
    """
585
 
    Put an object
586
 
 
587
 
    :param url: storage URL
588
 
    :param token: auth token
589
 
    :param container: container name that the object is in
590
 
    :param name: object name to put
591
 
    :param contents: a string or a file like object to read object data from
592
 
    :param content_length: value to send as content-length header; also limits
593
 
                           the amount read from contents
594
 
    :param etag: etag of contents
595
 
    :param chunk_size: chunk size of data to write
596
 
    :param content_type: value to send as content-type header
597
 
    :param headers: additional headers to include in the request
598
 
    :param http_conn: HTTP connection object (If None, it will create the
599
 
                      conn object)
600
 
    :returns: etag from server response
601
 
    :raises ClientException: HTTP PUT request failed
602
 
    """
603
 
    if http_conn:
604
 
        parsed, conn = http_conn
605
 
    else:
606
 
        parsed, conn = http_connection(url)
607
 
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
608
 
    if not headers:
609
 
        headers = {}
610
 
    headers['X-Auth-Token'] = token
611
 
    if etag:
612
 
        headers['ETag'] = etag.strip('"')
613
 
    if content_length is not None:
614
 
        headers['Content-Length'] = str(content_length)
615
 
    if content_type is not None:
616
 
        headers['Content-Type'] = content_type
617
 
    if not contents:
618
 
        headers['Content-Length'] = '0'
619
 
    if hasattr(contents, 'read'):
620
 
        conn.putrequest('PUT', path)
621
 
        for header, value in headers.iteritems():
622
 
            conn.putheader(header, value)
623
 
        if content_length is None:
624
 
            conn.putheader('Transfer-Encoding', 'chunked')
625
 
            conn.endheaders()
626
 
            chunk = contents.read(chunk_size)
627
 
            while chunk:
628
 
                conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
629
 
                chunk = contents.read(chunk_size)
630
 
            conn.send('0\r\n\r\n')
631
 
        else:
632
 
            conn.endheaders()
633
 
            left = content_length
634
 
            while left > 0:
635
 
                size = chunk_size
636
 
                if size > left:
637
 
                    size = left
638
 
                chunk = contents.read(size)
639
 
                conn.send(chunk)
640
 
                left -= len(chunk)
641
 
    else:
642
 
        conn.request('PUT', path, contents, headers)
643
 
    resp = conn.getresponse()
644
 
    resp.read()
645
 
    if resp.status < 200 or resp.status >= 300:
646
 
        raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
647
 
                http_host=conn.host, http_port=conn.port, http_path=path,
648
 
                http_status=resp.status, http_reason=resp.reason)
649
 
    return resp.getheader('etag').strip('"')
650
 
 
651
 
 
652
 
def post_object(url, token, container, name, headers, http_conn=None):
653
 
    """
654
 
    Update object metadata
655
 
 
656
 
    :param url: storage URL
657
 
    :param token: auth token
658
 
    :param container: container name that the object is in
659
 
    :param name: name of the object to update
660
 
    :param headers: additional headers to include in the request
661
 
    :param http_conn: HTTP connection object (If None, it will create the
662
 
                      conn object)
663
 
    :raises ClientException: HTTP POST request failed
664
 
    """
665
 
    if http_conn:
666
 
        parsed, conn = http_conn
667
 
    else:
668
 
        parsed, conn = http_connection(url)
669
 
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
670
 
    headers['X-Auth-Token'] = token
671
 
    conn.request('POST', path, '', headers)
672
 
    resp = conn.getresponse()
673
 
    resp.read()
674
 
    if resp.status < 200 or resp.status >= 300:
675
 
        raise ClientException('Object POST failed', http_scheme=parsed.scheme,
676
 
                http_host=conn.host, http_port=conn.port, http_path=path,
677
 
                http_status=resp.status, http_reason=resp.reason)
678
 
 
679
 
 
680
 
def delete_object(url, token, container, name, http_conn=None):
681
 
    """
682
 
    Delete object
683
 
 
684
 
    :param url: storage URL
685
 
    :param token: auth token
686
 
    :param container: container name that the object is in
687
 
    :param name: object name to delete
688
 
    :param http_conn: HTTP connection object (If None, it will create the
689
 
                      conn object)
690
 
    :raises ClientException: HTTP DELETE request failed
691
 
    """
692
 
    if http_conn:
693
 
        parsed, conn = http_conn
694
 
    else:
695
 
        parsed, conn = http_connection(url)
696
 
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
697
 
    conn.request('DELETE', path, '', {'X-Auth-Token': token})
698
 
    resp = conn.getresponse()
699
 
    resp.read()
700
 
    if resp.status < 200 or resp.status >= 300:
701
 
        raise ClientException('Object DELETE failed',
702
 
                http_scheme=parsed.scheme, http_host=conn.host,
703
 
                http_port=conn.port, http_path=path, http_status=resp.status,
704
 
                http_reason=resp.reason)
705
 
 
706
 
 
707
 
class Connection(object):
708
 
    """Convenience class to make requests that will also retry the request"""
709
 
 
710
 
    def __init__(self, authurl, user, key, retries=5, preauthurl=None,
711
 
                 preauthtoken=None, snet=False, starting_backoff=1):
712
 
        """
713
 
        :param authurl: authenitcation URL
714
 
        :param user: user name to authenticate as
715
 
        :param key: key/password to authenticate with
716
 
        :param retries: Number of times to retry the request before failing
717
 
        :param preauthurl: storage URL (if you have already authenticated)
718
 
        :param preauthtoken: authentication token (if you have already
719
 
                             authenticated)
720
 
        :param snet: use SERVICENET internal network default is False
721
 
        """
722
 
        self.authurl = authurl
723
 
        self.user = user
724
 
        self.key = key
725
 
        self.retries = retries
726
 
        self.http_conn = None
727
 
        self.url = preauthurl
728
 
        self.token = preauthtoken
729
 
        self.attempts = 0
730
 
        self.snet = snet
731
 
        self.starting_backoff = starting_backoff
732
 
 
733
 
    def get_auth(self):
734
 
        return get_auth(self.authurl, self.user, self.key, snet=self.snet)
735
 
 
736
 
    def http_connection(self):
737
 
        return http_connection(self.url)
738
 
 
739
 
    def _retry(self, reset_func, func, *args, **kwargs):
740
 
        self.attempts = 0
741
 
        backoff = self.starting_backoff
742
 
        while self.attempts <= self.retries:
743
 
            self.attempts += 1
744
 
            try:
745
 
                if not self.url or not self.token:
746
 
                    self.url, self.token = self.get_auth()
747
 
                    self.http_conn = None
748
 
                if not self.http_conn:
749
 
                    self.http_conn = self.http_connection()
750
 
                kwargs['http_conn'] = self.http_conn
751
 
                rv = func(self.url, self.token, *args, **kwargs)
752
 
                return rv
753
 
            except (socket.error, HTTPException):
754
 
                if self.attempts > self.retries:
755
 
                    raise
756
 
                self.http_conn = None
757
 
            except ClientException, err:
758
 
                if self.attempts > self.retries:
759
 
                    raise
760
 
                if err.http_status == 401:
761
 
                    self.url = self.token = None
762
 
                    if self.attempts > 1:
763
 
                        raise
764
 
                elif err.http_status == 408:
765
 
                    self.http_conn = None
766
 
                elif 500 <= err.http_status <= 599:
767
 
                    pass
768
 
                else:
769
 
                    raise
770
 
            sleep(backoff)
771
 
            backoff *= 2
772
 
            if reset_func:
773
 
                reset_func(func, *args, **kwargs)
774
 
 
775
 
    def head_account(self):
776
 
        """Wrapper for :func:`head_account`"""
777
 
        return self._retry(None, head_account)
778
 
 
779
 
    def get_account(self, marker=None, limit=None, prefix=None,
780
 
                    full_listing=False):
781
 
        """Wrapper for :func:`get_account`"""
782
 
        # TODO(unknown): With full_listing=True this will restart the entire
783
 
        # listing with each retry. Need to make a better version that just
784
 
        # retries where it left off.
785
 
        return self._retry(None, get_account, marker=marker, limit=limit,
786
 
                           prefix=prefix, full_listing=full_listing)
787
 
 
788
 
    def post_account(self, headers):
789
 
        """Wrapper for :func:`post_account`"""
790
 
        return self._retry(None, post_account, headers)
791
 
 
792
 
    def head_container(self, container):
793
 
        """Wrapper for :func:`head_container`"""
794
 
        return self._retry(None, head_container, container)
795
 
 
796
 
    def get_container(self, container, marker=None, limit=None, prefix=None,
797
 
                      delimiter=None, full_listing=False):
798
 
        """Wrapper for :func:`get_container`"""
799
 
        # TODO(unknown): With full_listing=True this will restart the entire
800
 
        # listing with each retry. Need to make a better version that just
801
 
        # retries where it left off.
802
 
        return self._retry(None, get_container, container, marker=marker,
803
 
                           limit=limit, prefix=prefix, delimiter=delimiter,
804
 
                           full_listing=full_listing)
805
 
 
806
 
    def put_container(self, container, headers=None):
807
 
        """Wrapper for :func:`put_container`"""
808
 
        return self._retry(None, put_container, container, headers=headers)
809
 
 
810
 
    def post_container(self, container, headers):
811
 
        """Wrapper for :func:`post_container`"""
812
 
        return self._retry(None, post_container, container, headers)
813
 
 
814
 
    def delete_container(self, container):
815
 
        """Wrapper for :func:`delete_container`"""
816
 
        return self._retry(None, delete_container, container)
817
 
 
818
 
    def head_object(self, container, obj):
819
 
        """Wrapper for :func:`head_object`"""
820
 
        return self._retry(None, head_object, container, obj)
821
 
 
822
 
    def get_object(self, container, obj, resp_chunk_size=None):
823
 
        """Wrapper for :func:`get_object`"""
824
 
        return self._retry(None, get_object, container, obj,
825
 
                           resp_chunk_size=resp_chunk_size)
826
 
 
827
 
    def put_object(self, container, obj, contents, content_length=None,
828
 
                   etag=None, chunk_size=65536, content_type=None,
829
 
                   headers=None):
830
 
        """Wrapper for :func:`put_object`"""
831
 
 
832
 
        def _default_reset(*args, **kwargs):
833
 
            raise ClientException('put_object(%r, %r, ...) failure and no '
834
 
                'ability to reset contents for reupload.' % (container, obj))
835
 
 
836
 
        reset_func = _default_reset
837
 
        tell = getattr(contents, 'tell', None)
838
 
        seek = getattr(contents, 'seek', None)
839
 
        if tell and seek:
840
 
            orig_pos = tell()
841
 
            reset_func = lambda *a, **k: seek(orig_pos)
842
 
        elif not contents:
843
 
            reset_func = lambda *a, **k: None
844
 
 
845
 
        return self._retry(reset_func, put_object, container, obj, contents,
846
 
            content_length=content_length, etag=etag, chunk_size=chunk_size,
847
 
            content_type=content_type, headers=headers)
848
 
 
849
 
    def post_object(self, container, obj, headers):
850
 
        """Wrapper for :func:`post_object`"""
851
 
        return self._retry(None, post_object, container, obj, headers)
852
 
 
853
 
    def delete_object(self, container, obj):
854
 
        """Wrapper for :func:`delete_object`"""
855
 
        return self._retry(None, delete_object, container, obj)
856
 
 
857
 
# End inclusion of swift.common.client
858
 
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
859
 
 
860
 
 
861
 
def mkdirs(path):
862
 
    try:
863
 
        makedirs(path)
864
 
    except OSError, err:
865
 
        if err.errno != EEXIST:
866
 
            raise
867
 
 
868
 
 
869
 
def put_errors_from_threads(threads, error_queue):
870
 
    """
871
 
    Places any errors from the threads into error_queue.
872
 
    :param threads: A list of QueueFunctionThread instances.
873
 
    :param error_queue: A queue to put error strings into.
874
 
    :returns: True if any errors were found.
875
 
    """
876
 
    was_error = False
877
 
    for thread in threads:
878
 
        for info in thread.exc_infos:
879
 
            was_error = True
880
 
            if isinstance(info[1], ClientException):
881
 
                error_queue.put(str(info[1]))
882
 
            else:
883
 
                error_queue.put(''.join(format_exception(*info)))
884
 
    return was_error
885
 
 
886
 
 
887
 
class QueueFunctionThread(Thread):
888
 
 
889
 
    def __init__(self, queue, func, *args, **kwargs):
890
 
        """ Calls func for each item in queue; func is called with a queued
891
 
            item as the first arg followed by *args and **kwargs. Use the abort
892
 
            attribute to have the thread empty the queue (without processing)
893
 
            and exit. """
894
 
        Thread.__init__(self)
895
 
        self.abort = False
896
 
        self.queue = queue
897
 
        self.func = func
898
 
        self.args = args
899
 
        self.kwargs = kwargs
900
 
        self.exc_infos = []
901
 
 
902
 
    def run(self):
903
 
        try:
904
 
            while True:
905
 
                try:
906
 
                    item = self.queue.get_nowait()
907
 
                    if not self.abort:
908
 
                        self.func(item, *self.args, **self.kwargs)
909
 
                    self.queue.task_done()
910
 
                except Empty:
911
 
                    if self.abort:
912
 
                        break
913
 
                    sleep(0.01)
914
 
        except Exception:
915
 
            self.exc_infos.append(exc_info())
916
 
 
917
 
 
918
 
st_delete_help = '''
919
 
delete --all OR delete container [--leave-segments] [object] [object] ...
920
 
    Deletes everything in the account (with --all), or everything in a
921
 
    container, or a list of objects depending on the args given. Segments of
922
 
    manifest objects will be deleted as well, unless you specify the
923
 
    --leave-segments option.'''.strip('\n')
924
 
 
925
 
 
926
 
def st_delete(parser, args, print_queue, error_queue):
927
 
    parser.add_option('-a', '--all', action='store_true', dest='yes_all',
928
 
        default=False, help='Indicates that you really want to delete '
929
 
        'everything in the account')
930
 
    parser.add_option('', '--leave-segments', action='store_true',
931
 
        dest='leave_segments', default=False, help='Indicates that you want '
932
 
        'the segments of manifest objects left alone')
933
 
    (options, args) = parse_args(parser, args)
934
 
    args = args[1:]
935
 
    if (not args and not options.yes_all) or (args and options.yes_all):
936
 
        error_queue.put('Usage: %s [options] %s' %
937
 
                        (basename(argv[0]), st_delete_help))
938
 
        return
939
 
 
940
 
    def _delete_segment((container, obj), conn):
941
 
        conn.delete_object(container, obj)
942
 
        if options.verbose:
943
 
            if conn.attempts > 2:
944
 
                print_queue.put('%s/%s [after %d attempts]' %
945
 
                                (container, obj, conn.attempts))
946
 
            else:
947
 
                print_queue.put('%s/%s' % (container, obj))
948
 
 
949
 
    object_queue = Queue(10000)
950
 
 
951
 
    def _delete_object((container, obj), conn):
952
 
        try:
953
 
            old_manifest = None
954
 
            if not options.leave_segments:
955
 
                try:
956
 
                    old_manifest = conn.head_object(container, obj).get(
957
 
                        'x-object-manifest')
958
 
                except ClientException, err:
959
 
                    if err.http_status != 404:
960
 
                        raise
961
 
            conn.delete_object(container, obj)
962
 
            if old_manifest:
963
 
                segment_queue = Queue(10000)
964
 
                scontainer, sprefix = old_manifest.split('/', 1)
965
 
                for delobj in conn.get_container(scontainer,
966
 
                                                 prefix=sprefix)[1]:
967
 
                    segment_queue.put((scontainer, delobj['name']))
968
 
                if not segment_queue.empty():
969
 
                    segment_threads = [QueueFunctionThread(segment_queue,
970
 
                        _delete_segment, create_connection()) for _junk in
971
 
                        xrange(10)]
972
 
                    for thread in segment_threads:
973
 
                        thread.start()
974
 
                    while not segment_queue.empty():
975
 
                        sleep(0.01)
976
 
                    for thread in segment_threads:
977
 
                        thread.abort = True
978
 
                        while thread.isAlive():
979
 
                            thread.join(0.01)
980
 
                    put_errors_from_threads(segment_threads, error_queue)
981
 
            if options.verbose:
982
 
                path = options.yes_all and join(container, obj) or obj
983
 
                if path[:1] in ('/', '\\'):
984
 
                    path = path[1:]
985
 
                if conn.attempts > 1:
986
 
                    print_queue.put('%s [after %d attempts]' %
987
 
                                    (path, conn.attempts))
988
 
                else:
989
 
                    print_queue.put(path)
990
 
        except ClientException, err:
991
 
            if err.http_status != 404:
992
 
                raise
993
 
            error_queue.put('Object %s not found' %
994
 
                            repr('%s/%s' % (container, obj)))
995
 
 
996
 
    container_queue = Queue(10000)
997
 
 
998
 
    def _delete_container(container, conn):
999
 
        try:
1000
 
            marker = ''
1001
 
            while True:
1002
 
                objects = [o['name'] for o in
1003
 
                           conn.get_container(container, marker=marker)[1]]
1004
 
                if not objects:
1005
 
                    break
1006
 
                for obj in objects:
1007
 
                    object_queue.put((container, obj))
1008
 
                marker = objects[-1]
1009
 
            while not object_queue.empty():
1010
 
                sleep(0.01)
1011
 
            attempts = 1
1012
 
            while True:
1013
 
                try:
1014
 
                    conn.delete_container(container)
1015
 
                    break
1016
 
                except ClientException, err:
1017
 
                    if err.http_status != 409:
1018
 
                        raise
1019
 
                    if attempts > 10:
1020
 
                        raise
1021
 
                    attempts += 1
1022
 
                    sleep(1)
1023
 
        except ClientException, err:
1024
 
            if err.http_status != 404:
1025
 
                raise
1026
 
            error_queue.put('Container %s not found' % repr(container))
1027
 
 
1028
 
    url, token = get_auth(options.auth, options.user, options.key,
1029
 
        snet=options.snet)
1030
 
    create_connection = lambda: Connection(options.auth, options.user,
1031
 
        options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
1032
 
    object_threads = [QueueFunctionThread(object_queue, _delete_object,
1033
 
        create_connection()) for _junk in xrange(10)]
1034
 
    for thread in object_threads:
1035
 
        thread.start()
1036
 
    container_threads = [QueueFunctionThread(container_queue,
1037
 
        _delete_container, create_connection()) for _junk in xrange(10)]
1038
 
    for thread in container_threads:
1039
 
        thread.start()
1040
 
    if not args:
1041
 
        conn = create_connection()
1042
 
        try:
1043
 
            marker = ''
1044
 
            while True:
1045
 
                containers = \
1046
 
                    [c['name'] for c in conn.get_account(marker=marker)[1]]
1047
 
                if not containers:
1048
 
                    break
1049
 
                for container in containers:
1050
 
                    container_queue.put(container)
1051
 
                marker = containers[-1]
1052
 
            while not container_queue.empty():
1053
 
                sleep(0.01)
1054
 
            while not object_queue.empty():
1055
 
                sleep(0.01)
1056
 
        except ClientException, err:
1057
 
            if err.http_status != 404:
1058
 
                raise
1059
 
            error_queue.put('Account not found')
1060
 
    elif len(args) == 1:
1061
 
        if '/' in args[0]:
1062
 
            print >> stderr, 'WARNING: / in container name; you might have ' \
1063
 
                             'meant %r instead of %r.' % \
1064
 
                             (args[0].replace('/', ' ', 1), args[0])
1065
 
        conn = create_connection()
1066
 
        _delete_container(args[0], conn)
1067
 
    else:
1068
 
        for obj in args[1:]:
1069
 
            object_queue.put((args[0], obj))
1070
 
    while not container_queue.empty():
1071
 
        sleep(0.01)
1072
 
    for thread in container_threads:
1073
 
        thread.abort = True
1074
 
        while thread.isAlive():
1075
 
            thread.join(0.01)
1076
 
    put_errors_from_threads(container_threads, error_queue)
1077
 
    while not object_queue.empty():
1078
 
        sleep(0.01)
1079
 
    for thread in object_threads:
1080
 
        thread.abort = True
1081
 
        while thread.isAlive():
1082
 
            thread.join(0.01)
1083
 
    put_errors_from_threads(object_threads, error_queue)
1084
 
 
1085
 
 
1086
 
st_download_help = '''
1087
 
download --all OR download container [options] [object] [object] ...
1088
 
    Downloads everything in the account (with --all), or everything in a
1089
 
    container, or a list of objects depending on the args given. For a single
1090
 
    object download, you may use the -o [--output] <filename> option to
1091
 
    redirect the output to a specific file or if "-" then just redirect to
1092
 
    stdout.'''.strip('\n')
1093
 
 
1094
 
 
1095
 
def st_download(options, args, print_queue, error_queue):
1096
 
    parser.add_option('-a', '--all', action='store_true', dest='yes_all',
1097
 
        default=False, help='Indicates that you really want to download '
1098
 
        'everything in the account')
1099
 
    parser.add_option('-o', '--output', dest='out_file', help='For a single '
1100
 
        'file download, stream the output to an alternate location ')
1101
 
    (options, args) = parse_args(parser, args)
1102
 
    args = args[1:]
1103
 
    if options.out_file == '-':
1104
 
        options.verbose = 0
1105
 
    if options.out_file and len(args) != 2:
1106
 
        exit('-o option only allowed for single file downloads')
1107
 
    if (not args and not options.yes_all) or (args and options.yes_all):
1108
 
        error_queue.put('Usage: %s [options] %s' %
1109
 
                        (basename(argv[0]), st_download_help))
1110
 
        return
1111
 
 
1112
 
    object_queue = Queue(10000)
1113
 
 
1114
 
    def _download_object(queue_arg, conn):
1115
 
        if len(queue_arg) == 2:
1116
 
            container, obj = queue_arg
1117
 
            out_file = None
1118
 
        elif len(queue_arg) == 3:
1119
 
            container, obj, out_file = queue_arg
1120
 
        else:
1121
 
            raise Exception("Invalid queue_arg length of %s" % len(queue_arg))
1122
 
        try:
1123
 
            headers, body = \
1124
 
                conn.get_object(container, obj, resp_chunk_size=65536)
1125
 
            content_type = headers.get('content-type')
1126
 
            if 'content-length' in headers:
1127
 
                content_length = int(headers.get('content-length'))
1128
 
            else:
1129
 
                content_length = None
1130
 
            etag = headers.get('etag')
1131
 
            path = options.yes_all and join(container, obj) or obj
1132
 
            if path[:1] in ('/', '\\'):
1133
 
                path = path[1:]
1134
 
            md5sum = None
1135
 
            make_dir = out_file != "-"
1136
 
            if content_type.split(';', 1)[0] == 'text/directory':
1137
 
                if make_dir and not isdir(path):
1138
 
                    mkdirs(path)
1139
 
                read_length = 0
1140
 
                if 'x-object-manifest' not in headers:
1141
 
                    md5sum = md5()
1142
 
                for chunk in body:
1143
 
                    read_length += len(chunk)
1144
 
                    if md5sum:
1145
 
                        md5sum.update(chunk)
1146
 
            else:
1147
 
                dirpath = dirname(path)
1148
 
                if make_dir and dirpath and not isdir(dirpath):
1149
 
                    mkdirs(dirpath)
1150
 
                if out_file == "-":
1151
 
                    fp = stdout
1152
 
                elif out_file:
1153
 
                    fp = open(out_file, 'wb')
1154
 
                else:
1155
 
                    fp = open(path, 'wb')
1156
 
                read_length = 0
1157
 
                if 'x-object-manifest' not in headers:
1158
 
                    md5sum = md5()
1159
 
                for chunk in body:
1160
 
                    fp.write(chunk)
1161
 
                    read_length += len(chunk)
1162
 
                    if md5sum:
1163
 
                        md5sum.update(chunk)
1164
 
                fp.close()
1165
 
            if md5sum and md5sum.hexdigest() != etag:
1166
 
                error_queue.put('%s: md5sum != etag, %s != %s' %
1167
 
                                (path, md5sum.hexdigest(), etag))
1168
 
            if content_length is not None and read_length != content_length:
1169
 
                error_queue.put('%s: read_length != content_length, %d != %d' %
1170
 
                                (path, read_length, content_length))
1171
 
            if 'x-object-meta-mtime' in headers and not options.out_file:
1172
 
                mtime = float(headers['x-object-meta-mtime'])
1173
 
                utime(path, (mtime, mtime))
1174
 
            if options.verbose:
1175
 
                if conn.attempts > 1:
1176
 
                    print_queue.put('%s [after %d attempts' %
1177
 
                                    (path, conn.attempts))
1178
 
                else:
1179
 
                    print_queue.put(path)
1180
 
        except ClientException, err:
1181
 
            if err.http_status != 404:
1182
 
                raise
1183
 
            error_queue.put('Object %s not found' %
1184
 
                            repr('%s/%s' % (container, obj)))
1185
 
 
1186
 
    container_queue = Queue(10000)
1187
 
 
1188
 
    def _download_container(container, conn):
1189
 
        try:
1190
 
            marker = ''
1191
 
            while True:
1192
 
                objects = [o['name'] for o in
1193
 
                           conn.get_container(container, marker=marker)[1]]
1194
 
                if not objects:
1195
 
                    break
1196
 
                for obj in objects:
1197
 
                    object_queue.put((container, obj))
1198
 
                marker = objects[-1]
1199
 
        except ClientException, err:
1200
 
            if err.http_status != 404:
1201
 
                raise
1202
 
            error_queue.put('Container %s not found' % repr(container))
1203
 
 
1204
 
    url, token = get_auth(options.auth, options.user, options.key,
1205
 
        snet=options.snet)
1206
 
    create_connection = lambda: Connection(options.auth, options.user,
1207
 
        options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
1208
 
    object_threads = [QueueFunctionThread(object_queue, _download_object,
1209
 
        create_connection()) for _junk in xrange(10)]
1210
 
    for thread in object_threads:
1211
 
        thread.start()
1212
 
    container_threads = [QueueFunctionThread(container_queue,
1213
 
        _download_container, create_connection()) for _junk in xrange(10)]
1214
 
    for thread in container_threads:
1215
 
        thread.start()
1216
 
    if not args:
1217
 
        conn = create_connection()
1218
 
        try:
1219
 
            marker = ''
1220
 
            while True:
1221
 
                containers = [c['name']
1222
 
                              for c in conn.get_account(marker=marker)[1]]
1223
 
                if not containers:
1224
 
                    break
1225
 
                for container in containers:
1226
 
                    container_queue.put(container)
1227
 
                marker = containers[-1]
1228
 
        except ClientException, err:
1229
 
            if err.http_status != 404:
1230
 
                raise
1231
 
            error_queue.put('Account not found')
1232
 
    elif len(args) == 1:
1233
 
        if '/' in args[0]:
1234
 
            print >> stderr, 'WARNING: / in container name; you might have ' \
1235
 
                             'meant %r instead of %r.' % \
1236
 
                             (args[0].replace('/', ' ', 1), args[0])
1237
 
        _download_container(args[0], create_connection())
1238
 
    else:
1239
 
        if len(args) == 2:
1240
 
            obj = args[1]
1241
 
            object_queue.put((args[0], obj, options.out_file))
1242
 
        else:
1243
 
            for obj in args[1:]:
1244
 
                object_queue.put((args[0], obj))
1245
 
    while not container_queue.empty():
1246
 
        sleep(0.01)
1247
 
    for thread in container_threads:
1248
 
        thread.abort = True
1249
 
        while thread.isAlive():
1250
 
            thread.join(0.01)
1251
 
    put_errors_from_threads(container_threads, error_queue)
1252
 
    while not object_queue.empty():
1253
 
        sleep(0.01)
1254
 
    for thread in object_threads:
1255
 
        thread.abort = True
1256
 
        while thread.isAlive():
1257
 
            thread.join(0.01)
1258
 
    put_errors_from_threads(object_threads, error_queue)
1259
 
 
1260
 
 
1261
 
st_list_help = '''
1262
 
list [options] [container]
1263
 
    Lists the containers for the account or the objects for a container. -p or
1264
 
    --prefix is an option that will only list items beginning with that prefix.
1265
 
    -d or --delimiter is option (for container listings only) that will roll up
1266
 
    items with the given delimiter (see Cloud Files general documentation for
1267
 
    what this means).
1268
 
'''.strip('\n')
1269
 
 
1270
 
 
1271
 
def st_list(options, args, print_queue, error_queue):
1272
 
    parser.add_option('-p', '--prefix', dest='prefix', help='Will only list '
1273
 
        'items beginning with the prefix')
1274
 
    parser.add_option('-d', '--delimiter', dest='delimiter', help='Will roll '
1275
 
        'up items with the given delimiter (see Cloud Files general '
1276
 
        'documentation for what this means)')
1277
 
    (options, args) = parse_args(parser, args)
1278
 
    args = args[1:]
1279
 
    if options.delimiter and not args:
1280
 
        exit('-d option only allowed for container listings')
1281
 
    if len(args) > 1:
1282
 
        error_queue.put('Usage: %s [options] %s' %
1283
 
                        (basename(argv[0]), st_list_help))
1284
 
        return
1285
 
    conn = Connection(options.auth, options.user, options.key,
1286
 
        snet=options.snet)
1287
 
    try:
1288
 
        marker = ''
1289
 
        while True:
1290
 
            if not args:
1291
 
                items = \
1292
 
                    conn.get_account(marker=marker, prefix=options.prefix)[1]
1293
 
            else:
1294
 
                items = conn.get_container(args[0], marker=marker,
1295
 
                    prefix=options.prefix, delimiter=options.delimiter)[1]
1296
 
            if not items:
1297
 
                break
1298
 
            for item in items:
1299
 
                print_queue.put(item.get('name', item.get('subdir')))
1300
 
            marker = items[-1].get('name', items[-1].get('subdir'))
1301
 
    except ClientException, err:
1302
 
        if err.http_status != 404:
1303
 
            raise
1304
 
        if not args:
1305
 
            error_queue.put('Account not found')
1306
 
        else:
1307
 
            error_queue.put('Container %s not found' % repr(args[0]))
1308
 
 
1309
 
 
1310
 
st_stat_help = '''
1311
 
stat [container] [object]
1312
 
    Displays information for the account, container, or object depending on the
1313
 
    args given (if any).'''.strip('\n')
1314
 
 
1315
 
 
1316
 
def st_stat(options, args, print_queue, error_queue):
1317
 
    (options, args) = parse_args(parser, args)
1318
 
    args = args[1:]
1319
 
    conn = Connection(options.auth, options.user, options.key)
1320
 
    if not args:
1321
 
        try:
1322
 
            headers = conn.head_account()
1323
 
            if options.verbose > 1:
1324
 
                print_queue.put('''
1325
 
StorageURL: %s
1326
 
Auth Token: %s
1327
 
'''.strip('\n') % (conn.url, conn.token))
1328
 
            container_count = int(headers.get('x-account-container-count', 0))
1329
 
            object_count = int(headers.get('x-account-object-count', 0))
1330
 
            bytes_used = int(headers.get('x-account-bytes-used', 0))
1331
 
            print_queue.put('''
1332
 
   Account: %s
1333
 
Containers: %d
1334
 
   Objects: %d
1335
 
     Bytes: %d'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count,
1336
 
                                 object_count, bytes_used))
1337
 
            for key, value in headers.items():
1338
 
                if key.startswith('x-account-meta-'):
1339
 
                    print_queue.put('%10s: %s' % ('Meta %s' %
1340
 
                        key[len('x-account-meta-'):].title(), value))
1341
 
            for key, value in headers.items():
1342
 
                if not key.startswith('x-account-meta-') and key not in (
1343
 
                        'content-length', 'date', 'x-account-container-count',
1344
 
                        'x-account-object-count', 'x-account-bytes-used'):
1345
 
                    print_queue.put(
1346
 
                        '%10s: %s' % (key.title(), value))
1347
 
        except ClientException, err:
1348
 
            if err.http_status != 404:
1349
 
                raise
1350
 
            error_queue.put('Account not found')
1351
 
    elif len(args) == 1:
1352
 
        if '/' in args[0]:
1353
 
            print >> stderr, 'WARNING: / in container name; you might have ' \
1354
 
                             'meant %r instead of %r.' % \
1355
 
                             (args[0].replace('/', ' ', 1), args[0])
1356
 
        try:
1357
 
            headers = conn.head_container(args[0])
1358
 
            object_count = int(headers.get('x-container-object-count', 0))
1359
 
            bytes_used = int(headers.get('x-container-bytes-used', 0))
1360
 
            print_queue.put('''
1361
 
  Account: %s
1362
 
Container: %s
1363
 
  Objects: %d
1364
 
    Bytes: %d
1365
 
 Read ACL: %s
1366
 
Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
1367
 
                                object_count, bytes_used,
1368
 
                                headers.get('x-container-read', ''),
1369
 
                                headers.get('x-container-write', '')))
1370
 
            for key, value in headers.items():
1371
 
                if key.startswith('x-container-meta-'):
1372
 
                    print_queue.put('%9s: %s' % ('Meta %s' %
1373
 
                        key[len('x-container-meta-'):].title(), value))
1374
 
            for key, value in headers.items():
1375
 
                if not key.startswith('x-container-meta-') and key not in (
1376
 
                        'content-length', 'date', 'x-container-object-count',
1377
 
                        'x-container-bytes-used', 'x-container-read',
1378
 
                        'x-container-write'):
1379
 
                    print_queue.put(
1380
 
                        '%9s: %s' % (key.title(), value))
1381
 
        except ClientException, err:
1382
 
            if err.http_status != 404:
1383
 
                raise
1384
 
            error_queue.put('Container %s not found' % repr(args[0]))
1385
 
    elif len(args) == 2:
1386
 
        try:
1387
 
            headers = conn.head_object(args[0], args[1])
1388
 
            print_queue.put('''
1389
 
       Account: %s
1390
 
     Container: %s
1391
 
        Object: %s
1392
 
  Content Type: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
1393
 
                                     args[1], headers.get('content-type')))
1394
 
            if 'content-length' in headers:
1395
 
                print_queue.put('Content Length: %s' %
1396
 
                                headers['content-length'])
1397
 
            if 'last-modified' in headers:
1398
 
                print_queue.put(' Last Modified: %s' %
1399
 
                                headers['last-modified'])
1400
 
            if 'etag' in headers:
1401
 
                print_queue.put('          ETag: %s' % headers['etag'])
1402
 
            if 'x-object-manifest' in headers:
1403
 
                print_queue.put('      Manifest: %s' %
1404
 
                                headers['x-object-manifest'])
1405
 
            for key, value in headers.items():
1406
 
                if key.startswith('x-object-meta-'):
1407
 
                    print_queue.put('%14s: %s' % ('Meta %s' %
1408
 
                        key[len('x-object-meta-'):].title(), value))
1409
 
            for key, value in headers.items():
1410
 
                if not key.startswith('x-object-meta-') and key not in (
1411
 
                        'content-type', 'content-length', 'last-modified',
1412
 
                        'etag', 'date', 'x-object-manifest'):
1413
 
                    print_queue.put(
1414
 
                        '%14s: %s' % (key.title(), value))
1415
 
        except ClientException, err:
1416
 
            if err.http_status != 404:
1417
 
                raise
1418
 
            error_queue.put('Object %s not found' %
1419
 
                            repr('%s/%s' % (args[0], args[1])))
1420
 
    else:
1421
 
        error_queue.put('Usage: %s [options] %s' %
1422
 
                        (basename(argv[0]), st_stat_help))
1423
 
 
1424
 
 
1425
 
st_post_help = '''
1426
 
post [options] [container] [object]
1427
 
    Updates meta information for the account, container, or object depending on
1428
 
    the args given. If the container is not found, it will be created
1429
 
    automatically; but this is not true for accounts and objects. Containers
1430
 
    also allow the -r (or --read-acl) and -w (or --write-acl) options. The -m
1431
 
    or --meta option is allowed on all and used to define the user meta data
1432
 
    items to set in the form Name:Value. This option can be repeated. Example:
1433
 
    post -m Color:Blue -m Size:Large'''.strip('\n')
1434
 
 
1435
 
 
1436
 
def st_post(options, args, print_queue, error_queue):
1437
 
    parser.add_option('-r', '--read-acl', dest='read_acl', help='Sets the '
1438
 
        'Read ACL for containers. Quick summary of ACL syntax: .r:*, '
1439
 
        '.r:-.example.com, .r:www.example.com, account1, account2:user2')
1440
 
    parser.add_option('-w', '--write-acl', dest='write_acl', help='Sets the '
1441
 
        'Write ACL for containers. Quick summary of ACL syntax: account1, '
1442
 
        'account2:user2')
1443
 
    parser.add_option('-m', '--meta', action='append', dest='meta', default=[],
1444
 
        help='Sets a meta data item with the syntax name:value. This option '
1445
 
        'may be repeated. Example: -m Color:Blue -m Size:Large')
1446
 
    (options, args) = parse_args(parser, args)
1447
 
    args = args[1:]
1448
 
    if (options.read_acl or options.write_acl) and not args:
1449
 
        exit('-r and -w options only allowed for containers')
1450
 
    conn = Connection(options.auth, options.user, options.key)
1451
 
    if not args:
1452
 
        headers = {}
1453
 
        for item in options.meta:
1454
 
            split_item = item.split(':')
1455
 
            headers['X-Account-Meta-' + split_item[0]] = \
1456
 
                len(split_item) > 1 and split_item[1]
1457
 
        try:
1458
 
            conn.post_account(headers=headers)
1459
 
        except ClientException, err:
1460
 
            if err.http_status != 404:
1461
 
                raise
1462
 
            error_queue.put('Account not found')
1463
 
    elif len(args) == 1:
1464
 
        if '/' in args[0]:
1465
 
            print >> stderr, 'WARNING: / in container name; you might have ' \
1466
 
                             'meant %r instead of %r.' % \
1467
 
                             (args[0].replace('/', ' ', 1), args[0])
1468
 
        headers = {}
1469
 
        for item in options.meta:
1470
 
            split_item = item.split(':')
1471
 
            headers['X-Container-Meta-' + split_item[0]] = \
1472
 
                len(split_item) > 1 and split_item[1]
1473
 
        if options.read_acl is not None:
1474
 
            headers['X-Container-Read'] = options.read_acl
1475
 
        if options.write_acl is not None:
1476
 
            headers['X-Container-Write'] = options.write_acl
1477
 
        try:
1478
 
            conn.post_container(args[0], headers=headers)
1479
 
        except ClientException, err:
1480
 
            if err.http_status != 404:
1481
 
                raise
1482
 
            conn.put_container(args[0], headers=headers)
1483
 
    elif len(args) == 2:
1484
 
        headers = {}
1485
 
        for item in options.meta:
1486
 
            split_item = item.split(':')
1487
 
            headers['X-Object-Meta-' + split_item[0]] = \
1488
 
                len(split_item) > 1 and split_item[1]
1489
 
        try:
1490
 
            conn.post_object(args[0], args[1], headers=headers)
1491
 
        except ClientException, err:
1492
 
            if err.http_status != 404:
1493
 
                raise
1494
 
            error_queue.put('Object %s not found' %
1495
 
                            repr('%s/%s' % (args[0], args[1])))
1496
 
    else:
1497
 
        error_queue.put('Usage: %s [options] %s' %
1498
 
                        (basename(argv[0]), st_post_help))
1499
 
 
1500
 
 
1501
 
st_upload_help = '''
1502
 
upload [options] container file_or_directory [file_or_directory] [...]
1503
 
    Uploads to the given container the files and directories specified by the
1504
 
    remaining args. -c or --changed is an option that will only upload files
1505
 
    that have changed since the last upload. -S <size> or --segment-size <size>
1506
 
    and --leave-segments are options as well (see --help for more).
1507
 
'''.strip('\n')
1508
 
 
1509
 
 
1510
 
def st_upload(options, args, print_queue, error_queue):
1511
 
    parser.add_option('-c', '--changed', action='store_true', dest='changed',
1512
 
        default=False, help='Will only upload files that have changed since '
1513
 
        'the last upload')
1514
 
    parser.add_option('-S', '--segment-size', dest='segment_size', help='Will '
1515
 
        'upload files in segments no larger than <size> and then create a '
1516
 
        '"manifest" file that will download all the segments as if it were '
1517
 
        'the original file. The segments will be uploaded to a '
1518
 
        '<container>_segments container so as to not pollute the main '
1519
 
        '<container> listings.')
1520
 
    parser.add_option('', '--leave-segments', action='store_true',
1521
 
        dest='leave_segments', default=False, help='Indicates that you want '
1522
 
        'the older segments of manifest objects left alone (in the case of '
1523
 
        'overwrites)')
1524
 
    (options, args) = parse_args(parser, args)
1525
 
    args = args[1:]
1526
 
    if len(args) < 2:
1527
 
        error_queue.put('Usage: %s [options] %s' %
1528
 
                        (basename(argv[0]), st_upload_help))
1529
 
        return
1530
 
    object_queue = Queue(10000)
1531
 
 
1532
 
    def _segment_job(job, conn):
1533
 
        if job.get('delete', False):
1534
 
            conn.delete_object(job['container'], job['obj'])
1535
 
        else:
1536
 
            fp = open(job['path'], 'rb')
1537
 
            fp.seek(job['segment_start'])
1538
 
            conn.put_object(job.get('container', args[0] + '_segments'),
1539
 
                job['obj'], fp, content_length=job['segment_size'])
1540
 
        if options.verbose and 'log_line' in job:
1541
 
            if conn.attempts > 1:
1542
 
                print_queue.put('%s [after %d attempts]' %
1543
 
                                (job['log_line'], conn.attempts))
1544
 
            else:
1545
 
                print_queue.put(job['log_line'])
1546
 
 
1547
 
    def _object_job(job, conn):
1548
 
        path = job['path']
1549
 
        container = job.get('container', args[0])
1550
 
        dir_marker = job.get('dir_marker', False)
1551
 
        try:
1552
 
            obj = path
1553
 
            if obj.startswith('./') or obj.startswith('.\\'):
1554
 
                obj = obj[2:]
1555
 
            put_headers = {'x-object-meta-mtime': str(getmtime(path))}
1556
 
            if dir_marker:
1557
 
                if options.changed:
1558
 
                    try:
1559
 
                        headers = conn.head_object(container, obj)
1560
 
                        ct = headers.get('content-type')
1561
 
                        cl = int(headers.get('content-length'))
1562
 
                        et = headers.get('etag')
1563
 
                        mt = headers.get('x-object-meta-mtime')
1564
 
                        if ct.split(';', 1)[0] == 'text/directory' and \
1565
 
                                cl == 0 and \
1566
 
                                et == 'd41d8cd98f00b204e9800998ecf8427e' and \
1567
 
                                mt == put_headers['x-object-meta-mtime']:
1568
 
                            return
1569
 
                    except ClientException, err:
1570
 
                        if err.http_status != 404:
1571
 
                            raise
1572
 
                conn.put_object(container, obj, '', content_length=0,
1573
 
                                content_type='text/directory',
1574
 
                                headers=put_headers)
1575
 
            else:
1576
 
                # We need to HEAD all objects now in case we're overwriting a
1577
 
                # manifest object and need to delete the old segments
1578
 
                # ourselves.
1579
 
                old_manifest = None
1580
 
                if options.changed or not options.leave_segments:
1581
 
                    try:
1582
 
                        headers = conn.head_object(container, obj)
1583
 
                        cl = int(headers.get('content-length'))
1584
 
                        mt = headers.get('x-object-meta-mtime')
1585
 
                        if options.changed and cl == getsize(path) and \
1586
 
                                mt == put_headers['x-object-meta-mtime']:
1587
 
                            return
1588
 
                        if not options.leave_segments:
1589
 
                            old_manifest = headers.get('x-object-manifest')
1590
 
                    except ClientException, err:
1591
 
                        if err.http_status != 404:
1592
 
                            raise
1593
 
                if options.segment_size and \
1594
 
                        getsize(path) < options.segment_size:
1595
 
                    full_size = getsize(path)
1596
 
                    segment_queue = Queue(10000)
1597
 
                    segment_threads = [QueueFunctionThread(segment_queue,
1598
 
                        _segment_job, create_connection()) for _junk in
1599
 
                        xrange(10)]
1600
 
                    for thread in segment_threads:
1601
 
                        thread.start()
1602
 
                    segment = 0
1603
 
                    segment_start = 0
1604
 
                    while segment_start < full_size:
1605
 
                        segment_size = int(options.segment_size)
1606
 
                        if segment_start + segment_size > full_size:
1607
 
                            segment_size = full_size - segment_start
1608
 
                        segment_queue.put({'path': path,
1609
 
                            'obj': '%s/%s/%s/%08d' % (obj,
1610
 
                                put_headers['x-object-meta-mtime'], full_size,
1611
 
                                segment),
1612
 
                            'segment_start': segment_start,
1613
 
                            'segment_size': segment_size,
1614
 
                            'log_line': '%s segment %s' % (obj, segment)})
1615
 
                        segment += 1
1616
 
                        segment_start += segment_size
1617
 
                    while not segment_queue.empty():
1618
 
                        sleep(0.01)
1619
 
                    for thread in segment_threads:
1620
 
                        thread.abort = True
1621
 
                        while thread.isAlive():
1622
 
                            thread.join(0.01)
1623
 
                    if put_errors_from_threads(segment_threads, error_queue):
1624
 
                        raise ClientException('Aborting manifest creation '
1625
 
                            'because not all segments could be uploaded. %s/%s'
1626
 
                            % (container, obj))
1627
 
                    new_object_manifest = '%s_segments/%s/%s/%s/' % (
1628
 
                        container, obj, put_headers['x-object-meta-mtime'],
1629
 
                        full_size)
1630
 
                    if old_manifest == new_object_manifest:
1631
 
                        old_manifest = None
1632
 
                    put_headers['x-object-manifest'] = new_object_manifest
1633
 
                    conn.put_object(container, obj, '', content_length=0,
1634
 
                                    headers=put_headers)
1635
 
                else:
1636
 
                    conn.put_object(container, obj, open(path, 'rb'),
1637
 
                        content_length=getsize(path), headers=put_headers)
1638
 
                if old_manifest:
1639
 
                    segment_queue = Queue(10000)
1640
 
                    scontainer, sprefix = old_manifest.split('/', 1)
1641
 
                    for delobj in conn.get_container(scontainer,
1642
 
                                                     prefix=sprefix)[1]:
1643
 
                        segment_queue.put({'delete': True,
1644
 
                            'container': scontainer, 'obj': delobj['name']})
1645
 
                    if not segment_queue.empty():
1646
 
                        segment_threads = [QueueFunctionThread(segment_queue,
1647
 
                            _segment_job, create_connection()) for _junk in
1648
 
                            xrange(10)]
1649
 
                        for thread in segment_threads:
1650
 
                            thread.start()
1651
 
                        while not segment_queue.empty():
1652
 
                            sleep(0.01)
1653
 
                        for thread in segment_threads:
1654
 
                            thread.abort = True
1655
 
                            while thread.isAlive():
1656
 
                                thread.join(0.01)
1657
 
                        put_errors_from_threads(segment_threads, error_queue)
1658
 
            if options.verbose:
1659
 
                if conn.attempts > 1:
1660
 
                    print_queue.put(
1661
 
                        '%s [after %d attempts]' % (obj, conn.attempts))
1662
 
                else:
1663
 
                    print_queue.put(obj)
1664
 
        except OSError, err:
1665
 
            if err.errno != ENOENT:
1666
 
                raise
1667
 
            error_queue.put('Local file %s not found' % repr(path))
1668
 
 
1669
 
    def _upload_dir(path):
1670
 
        names = listdir(path)
1671
 
        if not names:
1672
 
            object_queue.put({'path': path, 'dir_marker': True})
1673
 
        else:
1674
 
            for name in listdir(path):
1675
 
                subpath = join(path, name)
1676
 
                if isdir(subpath):
1677
 
                    _upload_dir(subpath)
1678
 
                else:
1679
 
                    object_queue.put({'path': subpath})
1680
 
 
1681
 
    url, token = get_auth(options.auth, options.user, options.key,
1682
 
        snet=options.snet)
1683
 
    create_connection = lambda: Connection(options.auth, options.user,
1684
 
        options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
1685
 
    object_threads = [QueueFunctionThread(object_queue, _object_job,
1686
 
        create_connection()) for _junk in xrange(10)]
1687
 
    for thread in object_threads:
1688
 
        thread.start()
1689
 
    conn = create_connection()
1690
 
    # Try to create the container, just in case it doesn't exist. If this
1691
 
    # fails, it might just be because the user doesn't have container PUT
1692
 
    # permissions, so we'll ignore any error. If there's really a problem,
1693
 
    # it'll surface on the first object PUT.
1694
 
    try:
1695
 
        conn.put_container(args[0])
1696
 
        if options.segment_size is not None:
1697
 
            conn.put_container(args[0] + '_segments')
1698
 
    except Exception:
1699
 
        pass
1700
 
    try:
1701
 
        for arg in args[1:]:
1702
 
            if isdir(arg):
1703
 
                _upload_dir(arg)
1704
 
            else:
1705
 
                object_queue.put({'path': arg})
1706
 
        while not object_queue.empty():
1707
 
            sleep(0.01)
1708
 
        for thread in object_threads:
1709
 
            thread.abort = True
1710
 
            while thread.isAlive():
1711
 
                thread.join(0.01)
1712
 
        put_errors_from_threads(object_threads, error_queue)
1713
 
    except ClientException, err:
1714
 
        if err.http_status != 404:
1715
 
            raise
1716
 
        error_queue.put('Account not found')
1717
 
 
1718
 
 
1719
 
def parse_args(parser, args, enforce_requires=True):
1720
 
    if not args:
1721
 
        args = ['-h']
1722
 
    (options, args) = parser.parse_args(args)
1723
 
    if enforce_requires and \
1724
 
            not (options.auth and options.user and options.key):
1725
 
        exit('''
1726
 
Requires ST_AUTH, ST_USER, and ST_KEY environment variables be set or
1727
 
overridden with -A, -U, or -K.'''.strip('\n'))
1728
 
    return options, args
1729
 
 
1730
 
 
1731
 
if __name__ == '__main__':
1732
 
    parser = OptionParser(version='%prog 1.0', usage='''
1733
 
Usage: %%prog <command> [options] [args]
1734
 
 
1735
 
Commands:
1736
 
  %(st_stat_help)s
1737
 
  %(st_list_help)s
1738
 
  %(st_upload_help)s
1739
 
  %(st_post_help)s
1740
 
  %(st_download_help)s
1741
 
  %(st_delete_help)s
1742
 
 
1743
 
Example:
1744
 
  %%prog -A https://auth.api.rackspacecloud.com/v1.0 -U user -K key stat
1745
 
'''.strip('\n') % globals())
1746
 
    parser.add_option('-s', '--snet', action='store_true', dest='snet',
1747
 
                      default=False, help='Use SERVICENET internal network')
1748
 
    parser.add_option('-v', '--verbose', action='count', dest='verbose',
1749
 
                      default=1, help='Print more info')
1750
 
    parser.add_option('-q', '--quiet', action='store_const', dest='verbose',
1751
 
                      const=0, default=1, help='Suppress status output')
1752
 
    parser.add_option('-A', '--auth', dest='auth',
1753
 
                      default=environ.get('ST_AUTH'),
1754
 
                      help='URL for obtaining an auth token')
1755
 
    parser.add_option('-U', '--user', dest='user',
1756
 
                      default=environ.get('ST_USER'),
1757
 
                      help='User name for obtaining an auth token')
1758
 
    parser.add_option('-K', '--key', dest='key',
1759
 
                      default=environ.get('ST_KEY'),
1760
 
                      help='Key for obtaining an auth token')
1761
 
    parser.disable_interspersed_args()
1762
 
    (options, args) = parse_args(parser, argv[1:], enforce_requires=False)
1763
 
    parser.enable_interspersed_args()
1764
 
 
1765
 
    commands = ('delete', 'download', 'list', 'post', 'stat', 'upload')
1766
 
    if not args or args[0] not in commands:
1767
 
        parser.print_usage()
1768
 
        if args:
1769
 
            exit('no such command: %s' % args[0])
1770
 
        exit()
1771
 
 
1772
 
    print_queue = Queue(10000)
1773
 
 
1774
 
    def _print(item):
1775
 
        if isinstance(item, unicode):
1776
 
            item = item.encode('utf8')
1777
 
        print item
1778
 
 
1779
 
    print_thread = QueueFunctionThread(print_queue, _print)
1780
 
    print_thread.start()
1781
 
 
1782
 
    error_queue = Queue(10000)
1783
 
 
1784
 
    def _error(item):
1785
 
        if isinstance(item, unicode):
1786
 
            item = item.encode('utf8')
1787
 
        print >> stderr, item
1788
 
 
1789
 
    error_thread = QueueFunctionThread(error_queue, _error)
1790
 
    error_thread.start()
1791
 
 
1792
 
    try:
1793
 
        parser.usage = globals()['st_%s_help' % args[0]]
1794
 
        try:
1795
 
            globals()['st_%s' % args[0]](parser, argv[1:], print_queue,
1796
 
                                         error_queue)
1797
 
        except (ClientException, HTTPException, socket.error), err:
1798
 
            error_queue.put(str(err))
1799
 
        while not print_queue.empty():
1800
 
            sleep(0.01)
1801
 
        print_thread.abort = True
1802
 
        while print_thread.isAlive():
1803
 
            print_thread.join(0.01)
1804
 
        while not error_queue.empty():
1805
 
            sleep(0.01)
1806
 
        error_thread.abort = True
1807
 
        while error_thread.isAlive():
1808
 
            error_thread.join(0.01)
1809
 
    except (SystemExit, Exception):
1810
 
        for thread in threading_enumerate():
1811
 
            thread.abort = True
1812
 
        raise