~notmyname/swift/saio_reference

« back to all changes in this revision

Viewing changes to bin/swift

  • Committer: John Dickinson
  • Date: 2011-06-14 16:04:06 UTC
  • mto: This revision was merged to the branch mainline in revision 311.
  • Revision ID: john.dickinson@rackspace.com-20110614160406-ce8i4ahvpseccvj4
renamed st to swift

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python -u
 
2
# Copyright (c) 2010-2011 OpenStack, LLC.
 
3
#
 
4
# Licensed under the Apache License, Version 2.0 (the "License");
 
5
# you may not use this file except in compliance with the License.
 
6
# You may obtain a copy of the License at
 
7
#
 
8
#    http://www.apache.org/licenses/LICENSE-2.0
 
9
#
 
10
# Unless required by applicable law or agreed to in writing, software
 
11
# distributed under the License is distributed on an "AS IS" BASIS,
 
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
13
# implied.
 
14
# See the License for the specific language governing permissions and
 
15
# limitations under the License.
 
16
 
 
17
from errno import EEXIST, ENOENT
 
18
from hashlib import md5
 
19
from optparse import OptionParser
 
20
from os import environ, listdir, makedirs, utime
 
21
from os.path import basename, dirname, getmtime, getsize, isdir, join
 
22
from Queue import Empty, Queue
 
23
from sys import argv, exc_info, exit, stderr, stdout
 
24
from threading import enumerate as threading_enumerate, Thread
 
25
from time import sleep
 
26
from traceback import format_exception
 
27
 
 
28
 
 
29
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
 
30
# Inclusion of swift.common.client for convenience of single file distribution
 
31
 
 
32
import socket
 
33
from cStringIO import StringIO
 
34
from re import compile, DOTALL
 
35
from tokenize import generate_tokens, STRING, NAME, OP
 
36
from urllib import quote as _quote, unquote
 
37
from urlparse import urlparse, urlunparse
 
38
 
 
39
try:
 
40
    from eventlet.green.httplib import HTTPException, HTTPSConnection
 
41
except ImportError:
 
42
    from httplib import HTTPException, HTTPSConnection
 
43
 
 
44
try:
 
45
    from eventlet import sleep
 
46
except ImportError:
 
47
    from time import sleep
 
48
 
 
49
try:
 
50
    from swift.common.bufferedhttp \
 
51
        import BufferedHTTPConnection as HTTPConnection
 
52
except ImportError:
 
53
    try:
 
54
        from eventlet.green.httplib import HTTPConnection
 
55
    except ImportError:
 
56
        from httplib import HTTPConnection
 
57
 
 
58
 
 
59
def quote(value, safe='/'):
 
60
    """
 
61
    Patched version of urllib.quote that encodes utf8 strings before quoting
 
62
    """
 
63
    if isinstance(value, unicode):
 
64
        value = value.encode('utf8')
 
65
    return _quote(value, safe)
 
66
 
 
67
 
 
68
# look for a real json parser first
 
69
try:
 
70
    # simplejson is popular and pretty good
 
71
    from simplejson import loads as json_loads
 
72
except ImportError:
 
73
    try:
 
74
        # 2.6 will have a json module in the stdlib
 
75
        from json import loads as json_loads
 
76
    except ImportError:
 
77
        # fall back on local parser otherwise
 
78
        comments = compile(r'/\*.*\*/|//[^\r\n]*', DOTALL)
 
79
 
 
80
        def json_loads(string):
 
81
            '''
 
82
            Fairly competent json parser exploiting the python tokenizer and
 
83
            eval(). -- From python-cloudfiles
 
84
 
 
85
            _loads(serialized_json) -> object
 
86
            '''
 
87
            try:
 
88
                res = []
 
89
                consts = {'true': True, 'false': False, 'null': None}
 
90
                string = '(' + comments.sub('', string) + ')'
 
91
                for type, val, _junk, _junk, _junk in \
 
92
                        generate_tokens(StringIO(string).readline):
 
93
                    if (type == OP and val not in '[]{}:,()-') or \
 
94
                            (type == NAME and val not in consts):
 
95
                        raise AttributeError()
 
96
                    elif type == STRING:
 
97
                        res.append('u')
 
98
                        res.append(val.replace('\\/', '/'))
 
99
                    else:
 
100
                        res.append(val)
 
101
                return eval(''.join(res), {}, consts)
 
102
            except Exception:
 
103
                raise AttributeError()
 
104
 
 
105
 
 
106
class ClientException(Exception):
 
107
 
 
108
    def __init__(self, msg, http_scheme='', http_host='', http_port='',
 
109
                 http_path='', http_query='', http_status=0, http_reason='',
 
110
                 http_device=''):
 
111
        Exception.__init__(self, msg)
 
112
        self.msg = msg
 
113
        self.http_scheme = http_scheme
 
114
        self.http_host = http_host
 
115
        self.http_port = http_port
 
116
        self.http_path = http_path
 
117
        self.http_query = http_query
 
118
        self.http_status = http_status
 
119
        self.http_reason = http_reason
 
120
        self.http_device = http_device
 
121
 
 
122
    def __str__(self):
 
123
        a = self.msg
 
124
        b = ''
 
125
        if self.http_scheme:
 
126
            b += '%s://' % self.http_scheme
 
127
        if self.http_host:
 
128
            b += self.http_host
 
129
        if self.http_port:
 
130
            b += ':%s' % self.http_port
 
131
        if self.http_path:
 
132
            b += self.http_path
 
133
        if self.http_query:
 
134
            b += '?%s' % self.http_query
 
135
        if self.http_status:
 
136
            if b:
 
137
                b = '%s %s' % (b, self.http_status)
 
138
            else:
 
139
                b = str(self.http_status)
 
140
        if self.http_reason:
 
141
            if b:
 
142
                b = '%s %s' % (b, self.http_reason)
 
143
            else:
 
144
                b = '- %s' % self.http_reason
 
145
        if self.http_device:
 
146
            if b:
 
147
                b = '%s: device %s' % (b, self.http_device)
 
148
            else:
 
149
                b = 'device %s' % self.http_device
 
150
        return b and '%s: %s' % (a, b) or a
 
151
 
 
152
 
 
153
def http_connection(url):
 
154
    """
 
155
    Make an HTTPConnection or HTTPSConnection
 
156
 
 
157
    :param url: url to connect to
 
158
    :returns: tuple of (parsed url, connection object)
 
159
    :raises ClientException: Unable to handle protocol scheme
 
160
    """
 
161
    parsed = urlparse(url)
 
162
    if parsed.scheme == 'http':
 
163
        conn = HTTPConnection(parsed.netloc)
 
164
    elif parsed.scheme == 'https':
 
165
        conn = HTTPSConnection(parsed.netloc)
 
166
    else:
 
167
        raise ClientException('Cannot handle protocol scheme %s for url %s' %
 
168
                              (parsed.scheme, repr(url)))
 
169
    return parsed, conn
 
170
 
 
171
 
 
172
def get_auth(url, user, key, snet=False):
 
173
    """
 
174
    Get authentication/authorization credentials.
 
175
 
 
176
    The snet parameter is used for Rackspace's ServiceNet internal network
 
177
    implementation. In this function, it simply adds *snet-* to the beginning
 
178
    of the host name for the returned storage URL. With Rackspace Cloud Files,
 
179
    use of this network path causes no bandwidth charges but requires the
 
180
    client to be running on Rackspace's ServiceNet network.
 
181
 
 
182
    :param url: authentication/authorization URL
 
183
    :param user: user to authenticate as
 
184
    :param key: key or password for authorization
 
185
    :param snet: use SERVICENET internal network (see above), default is False
 
186
    :returns: tuple of (storage URL, auth token)
 
187
    :raises ClientException: HTTP GET request to auth URL failed
 
188
    """
 
189
    parsed, conn = http_connection(url)
 
190
    conn.request('GET', parsed.path, '',
 
191
                 {'X-Auth-User': user, 'X-Auth-Key': key})
 
192
    resp = conn.getresponse()
 
193
    resp.read()
 
194
    if resp.status < 200 or resp.status >= 300:
 
195
        raise ClientException('Auth GET failed', http_scheme=parsed.scheme,
 
196
                http_host=conn.host, http_port=conn.port,
 
197
                http_path=parsed.path, http_status=resp.status,
 
198
                http_reason=resp.reason)
 
199
    url = resp.getheader('x-storage-url')
 
200
    if snet:
 
201
        parsed = list(urlparse(url))
 
202
        # Second item in the list is the netloc
 
203
        parsed[1] = 'snet-' + parsed[1]
 
204
        url = urlunparse(parsed)
 
205
    return url, resp.getheader('x-storage-token',
 
206
                                                resp.getheader('x-auth-token'))
 
207
 
 
208
 
 
209
def get_account(url, token, marker=None, limit=None, prefix=None,
 
210
                http_conn=None, full_listing=False):
 
211
    """
 
212
    Get a listing of containers for the account.
 
213
 
 
214
    :param url: storage URL
 
215
    :param token: auth token
 
216
    :param marker: marker query
 
217
    :param limit: limit query
 
218
    :param prefix: prefix query
 
219
    :param http_conn: HTTP connection object (If None, it will create the
 
220
                      conn object)
 
221
    :param full_listing: if True, return a full listing, else returns a max
 
222
                         of 10000 listings
 
223
    :returns: a tuple of (response headers, a list of containers) The response
 
224
              headers will be a dict and all header names will be lowercase.
 
225
    :raises ClientException: HTTP GET request failed
 
226
    """
 
227
    if not http_conn:
 
228
        http_conn = http_connection(url)
 
229
    if full_listing:
 
230
        rv = get_account(url, token, marker, limit, prefix, http_conn)
 
231
        listing = rv[1]
 
232
        while listing:
 
233
            marker = listing[-1]['name']
 
234
            listing = \
 
235
                get_account(url, token, marker, limit, prefix, http_conn)[1]
 
236
            if listing:
 
237
                rv[1].extend(listing)
 
238
        return rv
 
239
    parsed, conn = http_conn
 
240
    qs = 'format=json'
 
241
    if marker:
 
242
        qs += '&marker=%s' % quote(marker)
 
243
    if limit:
 
244
        qs += '&limit=%d' % limit
 
245
    if prefix:
 
246
        qs += '&prefix=%s' % quote(prefix)
 
247
    conn.request('GET', '%s?%s' % (parsed.path, qs), '',
 
248
                 {'X-Auth-Token': token})
 
249
    resp = conn.getresponse()
 
250
    resp_headers = {}
 
251
    for header, value in resp.getheaders():
 
252
        resp_headers[header.lower()] = value
 
253
    if resp.status < 200 or resp.status >= 300:
 
254
        resp.read()
 
255
        raise ClientException('Account GET failed', http_scheme=parsed.scheme,
 
256
                http_host=conn.host, http_port=conn.port,
 
257
                http_path=parsed.path, http_query=qs, http_status=resp.status,
 
258
                http_reason=resp.reason)
 
259
    if resp.status == 204:
 
260
        resp.read()
 
261
        return resp_headers, []
 
262
    return resp_headers, json_loads(resp.read())
 
263
 
 
264
 
 
265
def head_account(url, token, http_conn=None):
 
266
    """
 
267
    Get account stats.
 
268
 
 
269
    :param url: storage URL
 
270
    :param token: auth token
 
271
    :param http_conn: HTTP connection object (If None, it will create the
 
272
                      conn object)
 
273
    :returns: a dict containing the response's headers (all header names will
 
274
              be lowercase)
 
275
    :raises ClientException: HTTP HEAD request failed
 
276
    """
 
277
    if http_conn:
 
278
        parsed, conn = http_conn
 
279
    else:
 
280
        parsed, conn = http_connection(url)
 
281
    conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
 
282
    resp = conn.getresponse()
 
283
    resp.read()
 
284
    if resp.status < 200 or resp.status >= 300:
 
285
        raise ClientException('Account HEAD failed', http_scheme=parsed.scheme,
 
286
                http_host=conn.host, http_port=conn.port,
 
287
                http_path=parsed.path, http_status=resp.status,
 
288
                http_reason=resp.reason)
 
289
    resp_headers = {}
 
290
    for header, value in resp.getheaders():
 
291
        resp_headers[header.lower()] = value
 
292
    return resp_headers
 
293
 
 
294
 
 
295
def post_account(url, token, headers, http_conn=None):
 
296
    """
 
297
    Update an account's metadata.
 
298
 
 
299
    :param url: storage URL
 
300
    :param token: auth token
 
301
    :param headers: additional headers to include in the request
 
302
    :param http_conn: HTTP connection object (If None, it will create the
 
303
                      conn object)
 
304
    :raises ClientException: HTTP POST request failed
 
305
    """
 
306
    if http_conn:
 
307
        parsed, conn = http_conn
 
308
    else:
 
309
        parsed, conn = http_connection(url)
 
310
    headers['X-Auth-Token'] = token
 
311
    conn.request('POST', parsed.path, '', headers)
 
312
    resp = conn.getresponse()
 
313
    resp.read()
 
314
    if resp.status < 200 or resp.status >= 300:
 
315
        raise ClientException('Account POST failed',
 
316
                http_scheme=parsed.scheme, http_host=conn.host,
 
317
                http_port=conn.port, http_path=path, http_status=resp.status,
 
318
                http_reason=resp.reason)
 
319
 
 
320
 
 
321
def get_container(url, token, container, marker=None, limit=None,
 
322
                  prefix=None, delimiter=None, http_conn=None,
 
323
                  full_listing=False):
 
324
    """
 
325
    Get a listing of objects for the container.
 
326
 
 
327
    :param url: storage URL
 
328
    :param token: auth token
 
329
    :param container: container name to get a listing for
 
330
    :param marker: marker query
 
331
    :param limit: limit query
 
332
    :param prefix: prefix query
 
333
    :param delimeter: string to delimit the queries on
 
334
    :param http_conn: HTTP connection object (If None, it will create the
 
335
                      conn object)
 
336
    :param full_listing: if True, return a full listing, else returns a max
 
337
                         of 10000 listings
 
338
    :returns: a tuple of (response headers, a list of objects) The response
 
339
              headers will be a dict and all header names will be lowercase.
 
340
    :raises ClientException: HTTP GET request failed
 
341
    """
 
342
    if not http_conn:
 
343
        http_conn = http_connection(url)
 
344
    if full_listing:
 
345
        rv = get_container(url, token, container, marker, limit, prefix,
 
346
                           delimiter, http_conn)
 
347
        listing = rv[1]
 
348
        while listing:
 
349
            if not delimiter:
 
350
                marker = listing[-1]['name']
 
351
            else:
 
352
                marker = listing[-1].get('name', listing[-1].get('subdir'))
 
353
            listing = get_container(url, token, container, marker, limit,
 
354
                                    prefix, delimiter, http_conn)[1]
 
355
            if listing:
 
356
                rv[1].extend(listing)
 
357
        return rv
 
358
    parsed, conn = http_conn
 
359
    path = '%s/%s' % (parsed.path, quote(container))
 
360
    qs = 'format=json'
 
361
    if marker:
 
362
        qs += '&marker=%s' % quote(marker)
 
363
    if limit:
 
364
        qs += '&limit=%d' % limit
 
365
    if prefix:
 
366
        qs += '&prefix=%s' % quote(prefix)
 
367
    if delimiter:
 
368
        qs += '&delimiter=%s' % quote(delimiter)
 
369
    conn.request('GET', '%s?%s' % (path, qs), '', {'X-Auth-Token': token})
 
370
    resp = conn.getresponse()
 
371
    if resp.status < 200 or resp.status >= 300:
 
372
        resp.read()
 
373
        raise ClientException('Container GET failed',
 
374
                http_scheme=parsed.scheme, http_host=conn.host,
 
375
                http_port=conn.port, http_path=path, http_query=qs,
 
376
                http_status=resp.status, http_reason=resp.reason)
 
377
    resp_headers = {}
 
378
    for header, value in resp.getheaders():
 
379
        resp_headers[header.lower()] = value
 
380
    if resp.status == 204:
 
381
        resp.read()
 
382
        return resp_headers, []
 
383
    return resp_headers, json_loads(resp.read())
 
384
 
 
385
 
 
386
def head_container(url, token, container, http_conn=None):
 
387
    """
 
388
    Get container stats.
 
389
 
 
390
    :param url: storage URL
 
391
    :param token: auth token
 
392
    :param container: container name to get stats for
 
393
    :param http_conn: HTTP connection object (If None, it will create the
 
394
                      conn object)
 
395
    :returns: a dict containing the response's headers (all header names will
 
396
              be lowercase)
 
397
    :raises ClientException: HTTP HEAD request failed
 
398
    """
 
399
    if http_conn:
 
400
        parsed, conn = http_conn
 
401
    else:
 
402
        parsed, conn = http_connection(url)
 
403
    path = '%s/%s' % (parsed.path, quote(container))
 
404
    conn.request('HEAD', path, '', {'X-Auth-Token': token})
 
405
    resp = conn.getresponse()
 
406
    resp.read()
 
407
    if resp.status < 200 or resp.status >= 300:
 
408
        raise ClientException('Container HEAD failed',
 
409
                http_scheme=parsed.scheme, http_host=conn.host,
 
410
                http_port=conn.port, http_path=path, http_status=resp.status,
 
411
                http_reason=resp.reason)
 
412
    resp_headers = {}
 
413
    for header, value in resp.getheaders():
 
414
        resp_headers[header.lower()] = value
 
415
    return resp_headers
 
416
 
 
417
 
 
418
def put_container(url, token, container, headers=None, http_conn=None):
 
419
    """
 
420
    Create a container
 
421
 
 
422
    :param url: storage URL
 
423
    :param token: auth token
 
424
    :param container: container name to create
 
425
    :param headers: additional headers to include in the request
 
426
    :param http_conn: HTTP connection object (If None, it will create the
 
427
                      conn object)
 
428
    :raises ClientException: HTTP PUT request failed
 
429
    """
 
430
    if http_conn:
 
431
        parsed, conn = http_conn
 
432
    else:
 
433
        parsed, conn = http_connection(url)
 
434
    path = '%s/%s' % (parsed.path, quote(container))
 
435
    if not headers:
 
436
        headers = {}
 
437
    headers['X-Auth-Token'] = token
 
438
    conn.request('PUT', path, '', headers)
 
439
    resp = conn.getresponse()
 
440
    resp.read()
 
441
    if resp.status < 200 or resp.status >= 300:
 
442
        raise ClientException('Container PUT failed',
 
443
                http_scheme=parsed.scheme, http_host=conn.host,
 
444
                http_port=conn.port, http_path=path, http_status=resp.status,
 
445
                http_reason=resp.reason)
 
446
 
 
447
 
 
448
def post_container(url, token, container, headers, http_conn=None):
 
449
    """
 
450
    Update a container's metadata.
 
451
 
 
452
    :param url: storage URL
 
453
    :param token: auth token
 
454
    :param container: container name to update
 
455
    :param headers: additional headers to include in the request
 
456
    :param http_conn: HTTP connection object (If None, it will create the
 
457
                      conn object)
 
458
    :raises ClientException: HTTP POST request failed
 
459
    """
 
460
    if http_conn:
 
461
        parsed, conn = http_conn
 
462
    else:
 
463
        parsed, conn = http_connection(url)
 
464
    path = '%s/%s' % (parsed.path, quote(container))
 
465
    headers['X-Auth-Token'] = token
 
466
    conn.request('POST', path, '', headers)
 
467
    resp = conn.getresponse()
 
468
    resp.read()
 
469
    if resp.status < 200 or resp.status >= 300:
 
470
        raise ClientException('Container POST failed',
 
471
                http_scheme=parsed.scheme, http_host=conn.host,
 
472
                http_port=conn.port, http_path=path, http_status=resp.status,
 
473
                http_reason=resp.reason)
 
474
 
 
475
 
 
476
def delete_container(url, token, container, http_conn=None):
 
477
    """
 
478
    Delete a container
 
479
 
 
480
    :param url: storage URL
 
481
    :param token: auth token
 
482
    :param container: container name to delete
 
483
    :param http_conn: HTTP connection object (If None, it will create the
 
484
                      conn object)
 
485
    :raises ClientException: HTTP DELETE request failed
 
486
    """
 
487
    if http_conn:
 
488
        parsed, conn = http_conn
 
489
    else:
 
490
        parsed, conn = http_connection(url)
 
491
    path = '%s/%s' % (parsed.path, quote(container))
 
492
    conn.request('DELETE', path, '', {'X-Auth-Token': token})
 
493
    resp = conn.getresponse()
 
494
    resp.read()
 
495
    if resp.status < 200 or resp.status >= 300:
 
496
        raise ClientException('Container DELETE failed',
 
497
                http_scheme=parsed.scheme, http_host=conn.host,
 
498
                http_port=conn.port, http_path=path, http_status=resp.status,
 
499
                http_reason=resp.reason)
 
500
 
 
501
 
 
502
def get_object(url, token, container, name, http_conn=None,
 
503
               resp_chunk_size=None):
 
504
    """
 
505
    Get an object
 
506
 
 
507
    :param url: storage URL
 
508
    :param token: auth token
 
509
    :param container: container name that the object is in
 
510
    :param name: object name to get
 
511
    :param http_conn: HTTP connection object (If None, it will create the
 
512
                      conn object)
 
513
    :param resp_chunk_size: if defined, chunk size of data to read. NOTE: If
 
514
                            you specify a resp_chunk_size you must fully read
 
515
                            the object's contents before making another
 
516
                            request.
 
517
    :returns: a tuple of (response headers, the object's contents) The response
 
518
              headers will be a dict and all header names will be lowercase.
 
519
    :raises ClientException: HTTP GET request failed
 
520
    """
 
521
    if http_conn:
 
522
        parsed, conn = http_conn
 
523
    else:
 
524
        parsed, conn = http_connection(url)
 
525
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
 
526
    conn.request('GET', path, '', {'X-Auth-Token': token})
 
527
    resp = conn.getresponse()
 
528
    if resp.status < 200 or resp.status >= 300:
 
529
        resp.read()
 
530
        raise ClientException('Object GET failed', http_scheme=parsed.scheme,
 
531
                http_host=conn.host, http_port=conn.port, http_path=path,
 
532
                http_status=resp.status, http_reason=resp.reason)
 
533
    if resp_chunk_size:
 
534
 
 
535
        def _object_body():
 
536
            buf = resp.read(resp_chunk_size)
 
537
            while buf:
 
538
                yield buf
 
539
                buf = resp.read(resp_chunk_size)
 
540
        object_body = _object_body()
 
541
    else:
 
542
        object_body = resp.read()
 
543
    resp_headers = {}
 
544
    for header, value in resp.getheaders():
 
545
        resp_headers[header.lower()] = value
 
546
    return resp_headers, object_body
 
547
 
 
548
 
 
549
def head_object(url, token, container, name, http_conn=None):
 
550
    """
 
551
    Get object info
 
552
 
 
553
    :param url: storage URL
 
554
    :param token: auth token
 
555
    :param container: container name that the object is in
 
556
    :param name: object name to get info for
 
557
    :param http_conn: HTTP connection object (If None, it will create the
 
558
                      conn object)
 
559
    :returns: a dict containing the response's headers (all header names will
 
560
              be lowercase)
 
561
    :raises ClientException: HTTP HEAD request failed
 
562
    """
 
563
    if http_conn:
 
564
        parsed, conn = http_conn
 
565
    else:
 
566
        parsed, conn = http_connection(url)
 
567
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
 
568
    conn.request('HEAD', path, '', {'X-Auth-Token': token})
 
569
    resp = conn.getresponse()
 
570
    resp.read()
 
571
    if resp.status < 200 or resp.status >= 300:
 
572
        raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
 
573
                http_host=conn.host, http_port=conn.port, http_path=path,
 
574
                http_status=resp.status, http_reason=resp.reason)
 
575
    resp_headers = {}
 
576
    for header, value in resp.getheaders():
 
577
        resp_headers[header.lower()] = value
 
578
    return resp_headers
 
579
 
 
580
 
 
581
def put_object(url, token, container, name, contents, content_length=None,
 
582
               etag=None, chunk_size=65536, content_type=None, headers=None,
 
583
               http_conn=None):
 
584
    """
 
585
    Put an object
 
586
 
 
587
    :param url: storage URL
 
588
    :param token: auth token
 
589
    :param container: container name that the object is in
 
590
    :param name: object name to put
 
591
    :param contents: a string or a file like object to read object data from
 
592
    :param content_length: value to send as content-length header; also limits
 
593
                           the amount read from contents
 
594
    :param etag: etag of contents
 
595
    :param chunk_size: chunk size of data to write
 
596
    :param content_type: value to send as content-type header
 
597
    :param headers: additional headers to include in the request
 
598
    :param http_conn: HTTP connection object (If None, it will create the
 
599
                      conn object)
 
600
    :returns: etag from server response
 
601
    :raises ClientException: HTTP PUT request failed
 
602
    """
 
603
    if http_conn:
 
604
        parsed, conn = http_conn
 
605
    else:
 
606
        parsed, conn = http_connection(url)
 
607
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
 
608
    if not headers:
 
609
        headers = {}
 
610
    headers['X-Auth-Token'] = token
 
611
    if etag:
 
612
        headers['ETag'] = etag.strip('"')
 
613
    if content_length is not None:
 
614
        headers['Content-Length'] = str(content_length)
 
615
    if content_type is not None:
 
616
        headers['Content-Type'] = content_type
 
617
    if not contents:
 
618
        headers['Content-Length'] = '0'
 
619
    if hasattr(contents, 'read'):
 
620
        conn.putrequest('PUT', path)
 
621
        for header, value in headers.iteritems():
 
622
            conn.putheader(header, value)
 
623
        if content_length is None:
 
624
            conn.putheader('Transfer-Encoding', 'chunked')
 
625
            conn.endheaders()
 
626
            chunk = contents.read(chunk_size)
 
627
            while chunk:
 
628
                conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
 
629
                chunk = contents.read(chunk_size)
 
630
            conn.send('0\r\n\r\n')
 
631
        else:
 
632
            conn.endheaders()
 
633
            left = content_length
 
634
            while left > 0:
 
635
                size = chunk_size
 
636
                if size > left:
 
637
                    size = left
 
638
                chunk = contents.read(size)
 
639
                conn.send(chunk)
 
640
                left -= len(chunk)
 
641
    else:
 
642
        conn.request('PUT', path, contents, headers)
 
643
    resp = conn.getresponse()
 
644
    resp.read()
 
645
    if resp.status < 200 or resp.status >= 300:
 
646
        raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
 
647
                http_host=conn.host, http_port=conn.port, http_path=path,
 
648
                http_status=resp.status, http_reason=resp.reason)
 
649
    return resp.getheader('etag').strip('"')
 
650
 
 
651
 
 
652
def post_object(url, token, container, name, headers, http_conn=None):
 
653
    """
 
654
    Update object metadata
 
655
 
 
656
    :param url: storage URL
 
657
    :param token: auth token
 
658
    :param container: container name that the object is in
 
659
    :param name: name of the object to update
 
660
    :param headers: additional headers to include in the request
 
661
    :param http_conn: HTTP connection object (If None, it will create the
 
662
                      conn object)
 
663
    :raises ClientException: HTTP POST request failed
 
664
    """
 
665
    if http_conn:
 
666
        parsed, conn = http_conn
 
667
    else:
 
668
        parsed, conn = http_connection(url)
 
669
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
 
670
    headers['X-Auth-Token'] = token
 
671
    conn.request('POST', path, '', headers)
 
672
    resp = conn.getresponse()
 
673
    resp.read()
 
674
    if resp.status < 200 or resp.status >= 300:
 
675
        raise ClientException('Object POST failed', http_scheme=parsed.scheme,
 
676
                http_host=conn.host, http_port=conn.port, http_path=path,
 
677
                http_status=resp.status, http_reason=resp.reason)
 
678
 
 
679
 
 
680
def delete_object(url, token, container, name, http_conn=None):
 
681
    """
 
682
    Delete object
 
683
 
 
684
    :param url: storage URL
 
685
    :param token: auth token
 
686
    :param container: container name that the object is in
 
687
    :param name: object name to delete
 
688
    :param http_conn: HTTP connection object (If None, it will create the
 
689
                      conn object)
 
690
    :raises ClientException: HTTP DELETE request failed
 
691
    """
 
692
    if http_conn:
 
693
        parsed, conn = http_conn
 
694
    else:
 
695
        parsed, conn = http_connection(url)
 
696
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
 
697
    conn.request('DELETE', path, '', {'X-Auth-Token': token})
 
698
    resp = conn.getresponse()
 
699
    resp.read()
 
700
    if resp.status < 200 or resp.status >= 300:
 
701
        raise ClientException('Object DELETE failed',
 
702
                http_scheme=parsed.scheme, http_host=conn.host,
 
703
                http_port=conn.port, http_path=path, http_status=resp.status,
 
704
                http_reason=resp.reason)
 
705
 
 
706
 
 
707
class Connection(object):
 
708
    """Convenience class to make requests that will also retry the request"""
 
709
 
 
710
    def __init__(self, authurl, user, key, retries=5, preauthurl=None,
 
711
                 preauthtoken=None, snet=False, starting_backoff=1):
 
712
        """
 
713
        :param authurl: authenitcation URL
 
714
        :param user: user name to authenticate as
 
715
        :param key: key/password to authenticate with
 
716
        :param retries: Number of times to retry the request before failing
 
717
        :param preauthurl: storage URL (if you have already authenticated)
 
718
        :param preauthtoken: authentication token (if you have already
 
719
                             authenticated)
 
720
        :param snet: use SERVICENET internal network default is False
 
721
        """
 
722
        self.authurl = authurl
 
723
        self.user = user
 
724
        self.key = key
 
725
        self.retries = retries
 
726
        self.http_conn = None
 
727
        self.url = preauthurl
 
728
        self.token = preauthtoken
 
729
        self.attempts = 0
 
730
        self.snet = snet
 
731
        self.starting_backoff = starting_backoff
 
732
 
 
733
    def get_auth(self):
 
734
        return get_auth(self.authurl, self.user, self.key, snet=self.snet)
 
735
 
 
736
    def http_connection(self):
 
737
        return http_connection(self.url)
 
738
 
 
739
    def _retry(self, reset_func, func, *args, **kwargs):
 
740
        self.attempts = 0
 
741
        backoff = self.starting_backoff
 
742
        while self.attempts <= self.retries:
 
743
            self.attempts += 1
 
744
            try:
 
745
                if not self.url or not self.token:
 
746
                    self.url, self.token = self.get_auth()
 
747
                    self.http_conn = None
 
748
                if not self.http_conn:
 
749
                    self.http_conn = self.http_connection()
 
750
                kwargs['http_conn'] = self.http_conn
 
751
                rv = func(self.url, self.token, *args, **kwargs)
 
752
                return rv
 
753
            except (socket.error, HTTPException):
 
754
                if self.attempts > self.retries:
 
755
                    raise
 
756
                self.http_conn = None
 
757
            except ClientException, err:
 
758
                if self.attempts > self.retries:
 
759
                    raise
 
760
                if err.http_status == 401:
 
761
                    self.url = self.token = None
 
762
                    if self.attempts > 1:
 
763
                        raise
 
764
                elif err.http_status == 408:
 
765
                    self.http_conn = None
 
766
                elif 500 <= err.http_status <= 599:
 
767
                    pass
 
768
                else:
 
769
                    raise
 
770
            sleep(backoff)
 
771
            backoff *= 2
 
772
            if reset_func:
 
773
                reset_func(func, *args, **kwargs)
 
774
 
 
775
    def head_account(self):
 
776
        """Wrapper for :func:`head_account`"""
 
777
        return self._retry(None, head_account)
 
778
 
 
779
    def get_account(self, marker=None, limit=None, prefix=None,
 
780
                    full_listing=False):
 
781
        """Wrapper for :func:`get_account`"""
 
782
        # TODO(unknown): With full_listing=True this will restart the entire
 
783
        # listing with each retry. Need to make a better version that just
 
784
        # retries where it left off.
 
785
        return self._retry(None, get_account, marker=marker, limit=limit,
 
786
                           prefix=prefix, full_listing=full_listing)
 
787
 
 
788
    def post_account(self, headers):
 
789
        """Wrapper for :func:`post_account`"""
 
790
        return self._retry(None, post_account, headers)
 
791
 
 
792
    def head_container(self, container):
 
793
        """Wrapper for :func:`head_container`"""
 
794
        return self._retry(None, head_container, container)
 
795
 
 
796
    def get_container(self, container, marker=None, limit=None, prefix=None,
 
797
                      delimiter=None, full_listing=False):
 
798
        """Wrapper for :func:`get_container`"""
 
799
        # TODO(unknown): With full_listing=True this will restart the entire
 
800
        # listing with each retry. Need to make a better version that just
 
801
        # retries where it left off.
 
802
        return self._retry(None, get_container, container, marker=marker,
 
803
                           limit=limit, prefix=prefix, delimiter=delimiter,
 
804
                           full_listing=full_listing)
 
805
 
 
806
    def put_container(self, container, headers=None):
 
807
        """Wrapper for :func:`put_container`"""
 
808
        return self._retry(None, put_container, container, headers=headers)
 
809
 
 
810
    def post_container(self, container, headers):
 
811
        """Wrapper for :func:`post_container`"""
 
812
        return self._retry(None, post_container, container, headers)
 
813
 
 
814
    def delete_container(self, container):
 
815
        """Wrapper for :func:`delete_container`"""
 
816
        return self._retry(None, delete_container, container)
 
817
 
 
818
    def head_object(self, container, obj):
 
819
        """Wrapper for :func:`head_object`"""
 
820
        return self._retry(None, head_object, container, obj)
 
821
 
 
822
    def get_object(self, container, obj, resp_chunk_size=None):
 
823
        """Wrapper for :func:`get_object`"""
 
824
        return self._retry(None, get_object, container, obj,
 
825
                           resp_chunk_size=resp_chunk_size)
 
826
 
 
827
    def put_object(self, container, obj, contents, content_length=None,
 
828
                   etag=None, chunk_size=65536, content_type=None,
 
829
                   headers=None):
 
830
        """Wrapper for :func:`put_object`"""
 
831
 
 
832
        def _default_reset(*args, **kwargs):
 
833
            raise ClientException('put_object(%r, %r, ...) failure and no '
 
834
                'ability to reset contents for reupload.' % (container, obj))
 
835
 
 
836
        reset_func = _default_reset
 
837
        tell = getattr(contents, 'tell', None)
 
838
        seek = getattr(contents, 'seek', None)
 
839
        if tell and seek:
 
840
            orig_pos = tell()
 
841
            reset_func = lambda *a, **k: seek(orig_pos)
 
842
        elif not contents:
 
843
            reset_func = lambda *a, **k: None
 
844
 
 
845
        return self._retry(reset_func, put_object, container, obj, contents,
 
846
            content_length=content_length, etag=etag, chunk_size=chunk_size,
 
847
            content_type=content_type, headers=headers)
 
848
 
 
849
    def post_object(self, container, obj, headers):
 
850
        """Wrapper for :func:`post_object`"""
 
851
        return self._retry(None, post_object, container, obj, headers)
 
852
 
 
853
    def delete_object(self, container, obj):
 
854
        """Wrapper for :func:`delete_object`"""
 
855
        return self._retry(None, delete_object, container, obj)
 
856
 
 
857
# End inclusion of swift.common.client
 
858
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
 
859
 
 
860
 
 
861
def mkdirs(path):
 
862
    try:
 
863
        makedirs(path)
 
864
    except OSError, err:
 
865
        if err.errno != EEXIST:
 
866
            raise
 
867
 
 
868
 
 
869
def put_errors_from_threads(threads, error_queue):
 
870
    """
 
871
    Places any errors from the threads into error_queue.
 
872
    :param threads: A list of QueueFunctionThread instances.
 
873
    :param error_queue: A queue to put error strings into.
 
874
    :returns: True if any errors were found.
 
875
    """
 
876
    was_error = False
 
877
    for thread in threads:
 
878
        for info in thread.exc_infos:
 
879
            was_error = True
 
880
            if isinstance(info[1], ClientException):
 
881
                error_queue.put(str(info[1]))
 
882
            else:
 
883
                error_queue.put(''.join(format_exception(*info)))
 
884
    return was_error
 
885
 
 
886
 
 
887
class QueueFunctionThread(Thread):
 
888
 
 
889
    def __init__(self, queue, func, *args, **kwargs):
 
890
        """ Calls func for each item in queue; func is called with a queued
 
891
            item as the first arg followed by *args and **kwargs. Use the abort
 
892
            attribute to have the thread empty the queue (without processing)
 
893
            and exit. """
 
894
        Thread.__init__(self)
 
895
        self.abort = False
 
896
        self.queue = queue
 
897
        self.func = func
 
898
        self.args = args
 
899
        self.kwargs = kwargs
 
900
        self.exc_infos = []
 
901
 
 
902
    def run(self):
 
903
        try:
 
904
            while True:
 
905
                try:
 
906
                    item = self.queue.get_nowait()
 
907
                    if not self.abort:
 
908
                        self.func(item, *self.args, **self.kwargs)
 
909
                    self.queue.task_done()
 
910
                except Empty:
 
911
                    if self.abort:
 
912
                        break
 
913
                    sleep(0.01)
 
914
        except Exception:
 
915
            self.exc_infos.append(exc_info())
 
916
 
 
917
 
 
918
st_delete_help = '''
 
919
delete --all OR delete container [--leave-segments] [object] [object] ...
 
920
    Deletes everything in the account (with --all), or everything in a
 
921
    container, or a list of objects depending on the args given. Segments of
 
922
    manifest objects will be deleted as well, unless you specify the
 
923
    --leave-segments option.'''.strip('\n')
 
924
 
 
925
 
 
926
def st_delete(parser, args, print_queue, error_queue):
 
927
    parser.add_option('-a', '--all', action='store_true', dest='yes_all',
 
928
        default=False, help='Indicates that you really want to delete '
 
929
        'everything in the account')
 
930
    parser.add_option('', '--leave-segments', action='store_true',
 
931
        dest='leave_segments', default=False, help='Indicates that you want '
 
932
        'the segments of manifest objects left alone')
 
933
    (options, args) = parse_args(parser, args)
 
934
    args = args[1:]
 
935
    if (not args and not options.yes_all) or (args and options.yes_all):
 
936
        error_queue.put('Usage: %s [options] %s' %
 
937
                        (basename(argv[0]), st_delete_help))
 
938
        return
 
939
 
 
940
    def _delete_segment((container, obj), conn):
 
941
        conn.delete_object(container, obj)
 
942
        if options.verbose:
 
943
            if conn.attempts > 2:
 
944
                print_queue.put('%s/%s [after %d attempts]' %
 
945
                                (container, obj, conn.attempts))
 
946
            else:
 
947
                print_queue.put('%s/%s' % (container, obj))
 
948
 
 
949
    object_queue = Queue(10000)
 
950
 
 
951
    def _delete_object((container, obj), conn):
 
952
        try:
 
953
            old_manifest = None
 
954
            if not options.leave_segments:
 
955
                try:
 
956
                    old_manifest = conn.head_object(container, obj).get(
 
957
                        'x-object-manifest')
 
958
                except ClientException, err:
 
959
                    if err.http_status != 404:
 
960
                        raise
 
961
            conn.delete_object(container, obj)
 
962
            if old_manifest:
 
963
                segment_queue = Queue(10000)
 
964
                scontainer, sprefix = old_manifest.split('/', 1)
 
965
                for delobj in conn.get_container(scontainer,
 
966
                                                 prefix=sprefix)[1]:
 
967
                    segment_queue.put((scontainer, delobj['name']))
 
968
                if not segment_queue.empty():
 
969
                    segment_threads = [QueueFunctionThread(segment_queue,
 
970
                        _delete_segment, create_connection()) for _junk in
 
971
                        xrange(10)]
 
972
                    for thread in segment_threads:
 
973
                        thread.start()
 
974
                    while not segment_queue.empty():
 
975
                        sleep(0.01)
 
976
                    for thread in segment_threads:
 
977
                        thread.abort = True
 
978
                        while thread.isAlive():
 
979
                            thread.join(0.01)
 
980
                    put_errors_from_threads(segment_threads, error_queue)
 
981
            if options.verbose:
 
982
                path = options.yes_all and join(container, obj) or obj
 
983
                if path[:1] in ('/', '\\'):
 
984
                    path = path[1:]
 
985
                if conn.attempts > 1:
 
986
                    print_queue.put('%s [after %d attempts]' %
 
987
                                    (path, conn.attempts))
 
988
                else:
 
989
                    print_queue.put(path)
 
990
        except ClientException, err:
 
991
            if err.http_status != 404:
 
992
                raise
 
993
            error_queue.put('Object %s not found' %
 
994
                            repr('%s/%s' % (container, obj)))
 
995
 
 
996
    container_queue = Queue(10000)
 
997
 
 
998
    def _delete_container(container, conn):
 
999
        try:
 
1000
            marker = ''
 
1001
            while True:
 
1002
                objects = [o['name'] for o in
 
1003
                           conn.get_container(container, marker=marker)[1]]
 
1004
                if not objects:
 
1005
                    break
 
1006
                for obj in objects:
 
1007
                    object_queue.put((container, obj))
 
1008
                marker = objects[-1]
 
1009
            while not object_queue.empty():
 
1010
                sleep(0.01)
 
1011
            attempts = 1
 
1012
            while True:
 
1013
                try:
 
1014
                    conn.delete_container(container)
 
1015
                    break
 
1016
                except ClientException, err:
 
1017
                    if err.http_status != 409:
 
1018
                        raise
 
1019
                    if attempts > 10:
 
1020
                        raise
 
1021
                    attempts += 1
 
1022
                    sleep(1)
 
1023
        except ClientException, err:
 
1024
            if err.http_status != 404:
 
1025
                raise
 
1026
            error_queue.put('Container %s not found' % repr(container))
 
1027
 
 
1028
    url, token = get_auth(options.auth, options.user, options.key,
 
1029
        snet=options.snet)
 
1030
    create_connection = lambda: Connection(options.auth, options.user,
 
1031
        options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
 
1032
    object_threads = [QueueFunctionThread(object_queue, _delete_object,
 
1033
        create_connection()) for _junk in xrange(10)]
 
1034
    for thread in object_threads:
 
1035
        thread.start()
 
1036
    container_threads = [QueueFunctionThread(container_queue,
 
1037
        _delete_container, create_connection()) for _junk in xrange(10)]
 
1038
    for thread in container_threads:
 
1039
        thread.start()
 
1040
    if not args:
 
1041
        conn = create_connection()
 
1042
        try:
 
1043
            marker = ''
 
1044
            while True:
 
1045
                containers = \
 
1046
                    [c['name'] for c in conn.get_account(marker=marker)[1]]
 
1047
                if not containers:
 
1048
                    break
 
1049
                for container in containers:
 
1050
                    container_queue.put(container)
 
1051
                marker = containers[-1]
 
1052
            while not container_queue.empty():
 
1053
                sleep(0.01)
 
1054
            while not object_queue.empty():
 
1055
                sleep(0.01)
 
1056
        except ClientException, err:
 
1057
            if err.http_status != 404:
 
1058
                raise
 
1059
            error_queue.put('Account not found')
 
1060
    elif len(args) == 1:
 
1061
        if '/' in args[0]:
 
1062
            print >> stderr, 'WARNING: / in container name; you might have ' \
 
1063
                             'meant %r instead of %r.' % \
 
1064
                             (args[0].replace('/', ' ', 1), args[0])
 
1065
        conn = create_connection()
 
1066
        _delete_container(args[0], conn)
 
1067
    else:
 
1068
        for obj in args[1:]:
 
1069
            object_queue.put((args[0], obj))
 
1070
    while not container_queue.empty():
 
1071
        sleep(0.01)
 
1072
    for thread in container_threads:
 
1073
        thread.abort = True
 
1074
        while thread.isAlive():
 
1075
            thread.join(0.01)
 
1076
    put_errors_from_threads(container_threads, error_queue)
 
1077
    while not object_queue.empty():
 
1078
        sleep(0.01)
 
1079
    for thread in object_threads:
 
1080
        thread.abort = True
 
1081
        while thread.isAlive():
 
1082
            thread.join(0.01)
 
1083
    put_errors_from_threads(object_threads, error_queue)
 
1084
 
 
1085
 
 
1086
st_download_help = '''
 
1087
download --all OR download container [options] [object] [object] ...
 
1088
    Downloads everything in the account (with --all), or everything in a
 
1089
    container, or a list of objects depending on the args given. For a single
 
1090
    object download, you may use the -o [--output] <filename> option to
 
1091
    redirect the output to a specific file or if "-" then just redirect to
 
1092
    stdout.'''.strip('\n')
 
1093
 
 
1094
 
 
1095
def st_download(options, args, print_queue, error_queue):
 
1096
    parser.add_option('-a', '--all', action='store_true', dest='yes_all',
 
1097
        default=False, help='Indicates that you really want to download '
 
1098
        'everything in the account')
 
1099
    parser.add_option('-o', '--output', dest='out_file', help='For a single '
 
1100
        'file download, stream the output to an alternate location ')
 
1101
    (options, args) = parse_args(parser, args)
 
1102
    args = args[1:]
 
1103
    if options.out_file == '-':
 
1104
        options.verbose = 0
 
1105
    if options.out_file and len(args) != 2:
 
1106
        exit('-o option only allowed for single file downloads')
 
1107
    if (not args and not options.yes_all) or (args and options.yes_all):
 
1108
        error_queue.put('Usage: %s [options] %s' %
 
1109
                        (basename(argv[0]), st_download_help))
 
1110
        return
 
1111
 
 
1112
    object_queue = Queue(10000)
 
1113
 
 
1114
    def _download_object(queue_arg, conn):
 
1115
        if len(queue_arg) == 2:
 
1116
            container, obj = queue_arg
 
1117
            out_file = None
 
1118
        elif len(queue_arg) == 3:
 
1119
            container, obj, out_file = queue_arg
 
1120
        else:
 
1121
            raise Exception("Invalid queue_arg length of %s" % len(queue_arg))
 
1122
        try:
 
1123
            headers, body = \
 
1124
                conn.get_object(container, obj, resp_chunk_size=65536)
 
1125
            content_type = headers.get('content-type')
 
1126
            if 'content-length' in headers:
 
1127
                content_length = int(headers.get('content-length'))
 
1128
            else:
 
1129
                content_length = None
 
1130
            etag = headers.get('etag')
 
1131
            path = options.yes_all and join(container, obj) or obj
 
1132
            if path[:1] in ('/', '\\'):
 
1133
                path = path[1:]
 
1134
            md5sum = None
 
1135
            make_dir = out_file != "-"
 
1136
            if content_type.split(';', 1)[0] == 'text/directory':
 
1137
                if make_dir and not isdir(path):
 
1138
                    mkdirs(path)
 
1139
                read_length = 0
 
1140
                if 'x-object-manifest' not in headers:
 
1141
                    md5sum = md5()
 
1142
                for chunk in body:
 
1143
                    read_length += len(chunk)
 
1144
                    if md5sum:
 
1145
                        md5sum.update(chunk)
 
1146
            else:
 
1147
                dirpath = dirname(path)
 
1148
                if make_dir and dirpath and not isdir(dirpath):
 
1149
                    mkdirs(dirpath)
 
1150
                if out_file == "-":
 
1151
                    fp = stdout
 
1152
                elif out_file:
 
1153
                    fp = open(out_file, 'wb')
 
1154
                else:
 
1155
                    fp = open(path, 'wb')
 
1156
                read_length = 0
 
1157
                if 'x-object-manifest' not in headers:
 
1158
                    md5sum = md5()
 
1159
                for chunk in body:
 
1160
                    fp.write(chunk)
 
1161
                    read_length += len(chunk)
 
1162
                    if md5sum:
 
1163
                        md5sum.update(chunk)
 
1164
                fp.close()
 
1165
            if md5sum and md5sum.hexdigest() != etag:
 
1166
                error_queue.put('%s: md5sum != etag, %s != %s' %
 
1167
                                (path, md5sum.hexdigest(), etag))
 
1168
            if content_length is not None and read_length != content_length:
 
1169
                error_queue.put('%s: read_length != content_length, %d != %d' %
 
1170
                                (path, read_length, content_length))
 
1171
            if 'x-object-meta-mtime' in headers and not options.out_file:
 
1172
                mtime = float(headers['x-object-meta-mtime'])
 
1173
                utime(path, (mtime, mtime))
 
1174
            if options.verbose:
 
1175
                if conn.attempts > 1:
 
1176
                    print_queue.put('%s [after %d attempts' %
 
1177
                                    (path, conn.attempts))
 
1178
                else:
 
1179
                    print_queue.put(path)
 
1180
        except ClientException, err:
 
1181
            if err.http_status != 404:
 
1182
                raise
 
1183
            error_queue.put('Object %s not found' %
 
1184
                            repr('%s/%s' % (container, obj)))
 
1185
 
 
1186
    container_queue = Queue(10000)
 
1187
 
 
1188
    def _download_container(container, conn):
 
1189
        try:
 
1190
            marker = ''
 
1191
            while True:
 
1192
                objects = [o['name'] for o in
 
1193
                           conn.get_container(container, marker=marker)[1]]
 
1194
                if not objects:
 
1195
                    break
 
1196
                for obj in objects:
 
1197
                    object_queue.put((container, obj))
 
1198
                marker = objects[-1]
 
1199
        except ClientException, err:
 
1200
            if err.http_status != 404:
 
1201
                raise
 
1202
            error_queue.put('Container %s not found' % repr(container))
 
1203
 
 
1204
    url, token = get_auth(options.auth, options.user, options.key,
 
1205
        snet=options.snet)
 
1206
    create_connection = lambda: Connection(options.auth, options.user,
 
1207
        options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
 
1208
    object_threads = [QueueFunctionThread(object_queue, _download_object,
 
1209
        create_connection()) for _junk in xrange(10)]
 
1210
    for thread in object_threads:
 
1211
        thread.start()
 
1212
    container_threads = [QueueFunctionThread(container_queue,
 
1213
        _download_container, create_connection()) for _junk in xrange(10)]
 
1214
    for thread in container_threads:
 
1215
        thread.start()
 
1216
    if not args:
 
1217
        conn = create_connection()
 
1218
        try:
 
1219
            marker = ''
 
1220
            while True:
 
1221
                containers = [c['name']
 
1222
                              for c in conn.get_account(marker=marker)[1]]
 
1223
                if not containers:
 
1224
                    break
 
1225
                for container in containers:
 
1226
                    container_queue.put(container)
 
1227
                marker = containers[-1]
 
1228
        except ClientException, err:
 
1229
            if err.http_status != 404:
 
1230
                raise
 
1231
            error_queue.put('Account not found')
 
1232
    elif len(args) == 1:
 
1233
        if '/' in args[0]:
 
1234
            print >> stderr, 'WARNING: / in container name; you might have ' \
 
1235
                             'meant %r instead of %r.' % \
 
1236
                             (args[0].replace('/', ' ', 1), args[0])
 
1237
        _download_container(args[0], create_connection())
 
1238
    else:
 
1239
        if len(args) == 2:
 
1240
            obj = args[1]
 
1241
            object_queue.put((args[0], obj, options.out_file))
 
1242
        else:
 
1243
            for obj in args[1:]:
 
1244
                object_queue.put((args[0], obj))
 
1245
    while not container_queue.empty():
 
1246
        sleep(0.01)
 
1247
    for thread in container_threads:
 
1248
        thread.abort = True
 
1249
        while thread.isAlive():
 
1250
            thread.join(0.01)
 
1251
    put_errors_from_threads(container_threads, error_queue)
 
1252
    while not object_queue.empty():
 
1253
        sleep(0.01)
 
1254
    for thread in object_threads:
 
1255
        thread.abort = True
 
1256
        while thread.isAlive():
 
1257
            thread.join(0.01)
 
1258
    put_errors_from_threads(object_threads, error_queue)
 
1259
 
 
1260
 
 
1261
st_list_help = '''
 
1262
list [options] [container]
 
1263
    Lists the containers for the account or the objects for a container. -p or
 
1264
    --prefix is an option that will only list items beginning with that prefix.
 
1265
    -d or --delimiter is option (for container listings only) that will roll up
 
1266
    items with the given delimiter (see Cloud Files general documentation for
 
1267
    what this means).
 
1268
'''.strip('\n')
 
1269
 
 
1270
 
 
1271
def st_list(options, args, print_queue, error_queue):
 
1272
    parser.add_option('-p', '--prefix', dest='prefix', help='Will only list '
 
1273
        'items beginning with the prefix')
 
1274
    parser.add_option('-d', '--delimiter', dest='delimiter', help='Will roll '
 
1275
        'up items with the given delimiter (see Cloud Files general '
 
1276
        'documentation for what this means)')
 
1277
    (options, args) = parse_args(parser, args)
 
1278
    args = args[1:]
 
1279
    if options.delimiter and not args:
 
1280
        exit('-d option only allowed for container listings')
 
1281
    if len(args) > 1:
 
1282
        error_queue.put('Usage: %s [options] %s' %
 
1283
                        (basename(argv[0]), st_list_help))
 
1284
        return
 
1285
    conn = Connection(options.auth, options.user, options.key,
 
1286
        snet=options.snet)
 
1287
    try:
 
1288
        marker = ''
 
1289
        while True:
 
1290
            if not args:
 
1291
                items = \
 
1292
                    conn.get_account(marker=marker, prefix=options.prefix)[1]
 
1293
            else:
 
1294
                items = conn.get_container(args[0], marker=marker,
 
1295
                    prefix=options.prefix, delimiter=options.delimiter)[1]
 
1296
            if not items:
 
1297
                break
 
1298
            for item in items:
 
1299
                print_queue.put(item.get('name', item.get('subdir')))
 
1300
            marker = items[-1].get('name', items[-1].get('subdir'))
 
1301
    except ClientException, err:
 
1302
        if err.http_status != 404:
 
1303
            raise
 
1304
        if not args:
 
1305
            error_queue.put('Account not found')
 
1306
        else:
 
1307
            error_queue.put('Container %s not found' % repr(args[0]))
 
1308
 
 
1309
 
 
1310
st_stat_help = '''
 
1311
stat [container] [object]
 
1312
    Displays information for the account, container, or object depending on the
 
1313
    args given (if any).'''.strip('\n')
 
1314
 
 
1315
 
 
1316
def st_stat(options, args, print_queue, error_queue):
 
1317
    (options, args) = parse_args(parser, args)
 
1318
    args = args[1:]
 
1319
    conn = Connection(options.auth, options.user, options.key)
 
1320
    if not args:
 
1321
        try:
 
1322
            headers = conn.head_account()
 
1323
            if options.verbose > 1:
 
1324
                print_queue.put('''
 
1325
StorageURL: %s
 
1326
Auth Token: %s
 
1327
'''.strip('\n') % (conn.url, conn.token))
 
1328
            container_count = int(headers.get('x-account-container-count', 0))
 
1329
            object_count = int(headers.get('x-account-object-count', 0))
 
1330
            bytes_used = int(headers.get('x-account-bytes-used', 0))
 
1331
            print_queue.put('''
 
1332
   Account: %s
 
1333
Containers: %d
 
1334
   Objects: %d
 
1335
     Bytes: %d'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count,
 
1336
                                 object_count, bytes_used))
 
1337
            for key, value in headers.items():
 
1338
                if key.startswith('x-account-meta-'):
 
1339
                    print_queue.put('%10s: %s' % ('Meta %s' %
 
1340
                        key[len('x-account-meta-'):].title(), value))
 
1341
            for key, value in headers.items():
 
1342
                if not key.startswith('x-account-meta-') and key not in (
 
1343
                        'content-length', 'date', 'x-account-container-count',
 
1344
                        'x-account-object-count', 'x-account-bytes-used'):
 
1345
                    print_queue.put(
 
1346
                        '%10s: %s' % (key.title(), value))
 
1347
        except ClientException, err:
 
1348
            if err.http_status != 404:
 
1349
                raise
 
1350
            error_queue.put('Account not found')
 
1351
    elif len(args) == 1:
 
1352
        if '/' in args[0]:
 
1353
            print >> stderr, 'WARNING: / in container name; you might have ' \
 
1354
                             'meant %r instead of %r.' % \
 
1355
                             (args[0].replace('/', ' ', 1), args[0])
 
1356
        try:
 
1357
            headers = conn.head_container(args[0])
 
1358
            object_count = int(headers.get('x-container-object-count', 0))
 
1359
            bytes_used = int(headers.get('x-container-bytes-used', 0))
 
1360
            print_queue.put('''
 
1361
  Account: %s
 
1362
Container: %s
 
1363
  Objects: %d
 
1364
    Bytes: %d
 
1365
 Read ACL: %s
 
1366
Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
 
1367
                                object_count, bytes_used,
 
1368
                                headers.get('x-container-read', ''),
 
1369
                                headers.get('x-container-write', '')))
 
1370
            for key, value in headers.items():
 
1371
                if key.startswith('x-container-meta-'):
 
1372
                    print_queue.put('%9s: %s' % ('Meta %s' %
 
1373
                        key[len('x-container-meta-'):].title(), value))
 
1374
            for key, value in headers.items():
 
1375
                if not key.startswith('x-container-meta-') and key not in (
 
1376
                        'content-length', 'date', 'x-container-object-count',
 
1377
                        'x-container-bytes-used', 'x-container-read',
 
1378
                        'x-container-write'):
 
1379
                    print_queue.put(
 
1380
                        '%9s: %s' % (key.title(), value))
 
1381
        except ClientException, err:
 
1382
            if err.http_status != 404:
 
1383
                raise
 
1384
            error_queue.put('Container %s not found' % repr(args[0]))
 
1385
    elif len(args) == 2:
 
1386
        try:
 
1387
            headers = conn.head_object(args[0], args[1])
 
1388
            print_queue.put('''
 
1389
       Account: %s
 
1390
     Container: %s
 
1391
        Object: %s
 
1392
  Content Type: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
 
1393
                                     args[1], headers.get('content-type')))
 
1394
            if 'content-length' in headers:
 
1395
                print_queue.put('Content Length: %s' %
 
1396
                                headers['content-length'])
 
1397
            if 'last-modified' in headers:
 
1398
                print_queue.put(' Last Modified: %s' %
 
1399
                                headers['last-modified'])
 
1400
            if 'etag' in headers:
 
1401
                print_queue.put('          ETag: %s' % headers['etag'])
 
1402
            if 'x-object-manifest' in headers:
 
1403
                print_queue.put('      Manifest: %s' %
 
1404
                                headers['x-object-manifest'])
 
1405
            for key, value in headers.items():
 
1406
                if key.startswith('x-object-meta-'):
 
1407
                    print_queue.put('%14s: %s' % ('Meta %s' %
 
1408
                        key[len('x-object-meta-'):].title(), value))
 
1409
            for key, value in headers.items():
 
1410
                if not key.startswith('x-object-meta-') and key not in (
 
1411
                        'content-type', 'content-length', 'last-modified',
 
1412
                        'etag', 'date', 'x-object-manifest'):
 
1413
                    print_queue.put(
 
1414
                        '%14s: %s' % (key.title(), value))
 
1415
        except ClientException, err:
 
1416
            if err.http_status != 404:
 
1417
                raise
 
1418
            error_queue.put('Object %s not found' %
 
1419
                            repr('%s/%s' % (args[0], args[1])))
 
1420
    else:
 
1421
        error_queue.put('Usage: %s [options] %s' %
 
1422
                        (basename(argv[0]), st_stat_help))
 
1423
 
 
1424
 
 
1425
st_post_help = '''
 
1426
post [options] [container] [object]
 
1427
    Updates meta information for the account, container, or object depending on
 
1428
    the args given. If the container is not found, it will be created
 
1429
    automatically; but this is not true for accounts and objects. Containers
 
1430
    also allow the -r (or --read-acl) and -w (or --write-acl) options. The -m
 
1431
    or --meta option is allowed on all and used to define the user meta data
 
1432
    items to set in the form Name:Value. This option can be repeated. Example:
 
1433
    post -m Color:Blue -m Size:Large'''.strip('\n')
 
1434
 
 
1435
 
 
1436
def st_post(options, args, print_queue, error_queue):
 
1437
    parser.add_option('-r', '--read-acl', dest='read_acl', help='Sets the '
 
1438
        'Read ACL for containers. Quick summary of ACL syntax: .r:*, '
 
1439
        '.r:-.example.com, .r:www.example.com, account1, account2:user2')
 
1440
    parser.add_option('-w', '--write-acl', dest='write_acl', help='Sets the '
 
1441
        'Write ACL for containers. Quick summary of ACL syntax: account1, '
 
1442
        'account2:user2')
 
1443
    parser.add_option('-m', '--meta', action='append', dest='meta', default=[],
 
1444
        help='Sets a meta data item with the syntax name:value. This option '
 
1445
        'may be repeated. Example: -m Color:Blue -m Size:Large')
 
1446
    (options, args) = parse_args(parser, args)
 
1447
    args = args[1:]
 
1448
    if (options.read_acl or options.write_acl) and not args:
 
1449
        exit('-r and -w options only allowed for containers')
 
1450
    conn = Connection(options.auth, options.user, options.key)
 
1451
    if not args:
 
1452
        headers = {}
 
1453
        for item in options.meta:
 
1454
            split_item = item.split(':')
 
1455
            headers['X-Account-Meta-' + split_item[0]] = \
 
1456
                len(split_item) > 1 and split_item[1]
 
1457
        try:
 
1458
            conn.post_account(headers=headers)
 
1459
        except ClientException, err:
 
1460
            if err.http_status != 404:
 
1461
                raise
 
1462
            error_queue.put('Account not found')
 
1463
    elif len(args) == 1:
 
1464
        if '/' in args[0]:
 
1465
            print >> stderr, 'WARNING: / in container name; you might have ' \
 
1466
                             'meant %r instead of %r.' % \
 
1467
                             (args[0].replace('/', ' ', 1), args[0])
 
1468
        headers = {}
 
1469
        for item in options.meta:
 
1470
            split_item = item.split(':')
 
1471
            headers['X-Container-Meta-' + split_item[0]] = \
 
1472
                len(split_item) > 1 and split_item[1]
 
1473
        if options.read_acl is not None:
 
1474
            headers['X-Container-Read'] = options.read_acl
 
1475
        if options.write_acl is not None:
 
1476
            headers['X-Container-Write'] = options.write_acl
 
1477
        try:
 
1478
            conn.post_container(args[0], headers=headers)
 
1479
        except ClientException, err:
 
1480
            if err.http_status != 404:
 
1481
                raise
 
1482
            conn.put_container(args[0], headers=headers)
 
1483
    elif len(args) == 2:
 
1484
        headers = {}
 
1485
        for item in options.meta:
 
1486
            split_item = item.split(':')
 
1487
            headers['X-Object-Meta-' + split_item[0]] = \
 
1488
                len(split_item) > 1 and split_item[1]
 
1489
        try:
 
1490
            conn.post_object(args[0], args[1], headers=headers)
 
1491
        except ClientException, err:
 
1492
            if err.http_status != 404:
 
1493
                raise
 
1494
            error_queue.put('Object %s not found' %
 
1495
                            repr('%s/%s' % (args[0], args[1])))
 
1496
    else:
 
1497
        error_queue.put('Usage: %s [options] %s' %
 
1498
                        (basename(argv[0]), st_post_help))
 
1499
 
 
1500
 
 
1501
st_upload_help = '''
 
1502
upload [options] container file_or_directory [file_or_directory] [...]
 
1503
    Uploads to the given container the files and directories specified by the
 
1504
    remaining args. -c or --changed is an option that will only upload files
 
1505
    that have changed since the last upload. -S <size> or --segment-size <size>
 
1506
    and --leave-segments are options as well (see --help for more).
 
1507
'''.strip('\n')
 
1508
 
 
1509
 
 
1510
def st_upload(options, args, print_queue, error_queue):
 
1511
    parser.add_option('-c', '--changed', action='store_true', dest='changed',
 
1512
        default=False, help='Will only upload files that have changed since '
 
1513
        'the last upload')
 
1514
    parser.add_option('-S', '--segment-size', dest='segment_size', help='Will '
 
1515
        'upload files in segments no larger than <size> and then create a '
 
1516
        '"manifest" file that will download all the segments as if it were '
 
1517
        'the original file. The segments will be uploaded to a '
 
1518
        '<container>_segments container so as to not pollute the main '
 
1519
        '<container> listings.')
 
1520
    parser.add_option('', '--leave-segments', action='store_true',
 
1521
        dest='leave_segments', default=False, help='Indicates that you want '
 
1522
        'the older segments of manifest objects left alone (in the case of '
 
1523
        'overwrites)')
 
1524
    (options, args) = parse_args(parser, args)
 
1525
    args = args[1:]
 
1526
    if len(args) < 2:
 
1527
        error_queue.put('Usage: %s [options] %s' %
 
1528
                        (basename(argv[0]), st_upload_help))
 
1529
        return
 
1530
    object_queue = Queue(10000)
 
1531
 
 
1532
    def _segment_job(job, conn):
 
1533
        if job.get('delete', False):
 
1534
            conn.delete_object(job['container'], job['obj'])
 
1535
        else:
 
1536
            fp = open(job['path'], 'rb')
 
1537
            fp.seek(job['segment_start'])
 
1538
            conn.put_object(job.get('container', args[0] + '_segments'),
 
1539
                job['obj'], fp, content_length=job['segment_size'])
 
1540
        if options.verbose and 'log_line' in job:
 
1541
            if conn.attempts > 1:
 
1542
                print_queue.put('%s [after %d attempts]' %
 
1543
                                (job['log_line'], conn.attempts))
 
1544
            else:
 
1545
                print_queue.put(job['log_line'])
 
1546
 
 
1547
    def _object_job(job, conn):
 
1548
        path = job['path']
 
1549
        container = job.get('container', args[0])
 
1550
        dir_marker = job.get('dir_marker', False)
 
1551
        try:
 
1552
            obj = path
 
1553
            if obj.startswith('./') or obj.startswith('.\\'):
 
1554
                obj = obj[2:]
 
1555
            put_headers = {'x-object-meta-mtime': str(getmtime(path))}
 
1556
            if dir_marker:
 
1557
                if options.changed:
 
1558
                    try:
 
1559
                        headers = conn.head_object(container, obj)
 
1560
                        ct = headers.get('content-type')
 
1561
                        cl = int(headers.get('content-length'))
 
1562
                        et = headers.get('etag')
 
1563
                        mt = headers.get('x-object-meta-mtime')
 
1564
                        if ct.split(';', 1)[0] == 'text/directory' and \
 
1565
                                cl == 0 and \
 
1566
                                et == 'd41d8cd98f00b204e9800998ecf8427e' and \
 
1567
                                mt == put_headers['x-object-meta-mtime']:
 
1568
                            return
 
1569
                    except ClientException, err:
 
1570
                        if err.http_status != 404:
 
1571
                            raise
 
1572
                conn.put_object(container, obj, '', content_length=0,
 
1573
                                content_type='text/directory',
 
1574
                                headers=put_headers)
 
1575
            else:
 
1576
                # We need to HEAD all objects now in case we're overwriting a
 
1577
                # manifest object and need to delete the old segments
 
1578
                # ourselves.
 
1579
                old_manifest = None
 
1580
                if options.changed or not options.leave_segments:
 
1581
                    try:
 
1582
                        headers = conn.head_object(container, obj)
 
1583
                        cl = int(headers.get('content-length'))
 
1584
                        mt = headers.get('x-object-meta-mtime')
 
1585
                        if options.changed and cl == getsize(path) and \
 
1586
                                mt == put_headers['x-object-meta-mtime']:
 
1587
                            return
 
1588
                        if not options.leave_segments:
 
1589
                            old_manifest = headers.get('x-object-manifest')
 
1590
                    except ClientException, err:
 
1591
                        if err.http_status != 404:
 
1592
                            raise
 
1593
                if options.segment_size and \
 
1594
                        getsize(path) < options.segment_size:
 
1595
                    full_size = getsize(path)
 
1596
                    segment_queue = Queue(10000)
 
1597
                    segment_threads = [QueueFunctionThread(segment_queue,
 
1598
                        _segment_job, create_connection()) for _junk in
 
1599
                        xrange(10)]
 
1600
                    for thread in segment_threads:
 
1601
                        thread.start()
 
1602
                    segment = 0
 
1603
                    segment_start = 0
 
1604
                    while segment_start < full_size:
 
1605
                        segment_size = int(options.segment_size)
 
1606
                        if segment_start + segment_size > full_size:
 
1607
                            segment_size = full_size - segment_start
 
1608
                        segment_queue.put({'path': path,
 
1609
                            'obj': '%s/%s/%s/%08d' % (obj,
 
1610
                                put_headers['x-object-meta-mtime'], full_size,
 
1611
                                segment),
 
1612
                            'segment_start': segment_start,
 
1613
                            'segment_size': segment_size,
 
1614
                            'log_line': '%s segment %s' % (obj, segment)})
 
1615
                        segment += 1
 
1616
                        segment_start += segment_size
 
1617
                    while not segment_queue.empty():
 
1618
                        sleep(0.01)
 
1619
                    for thread in segment_threads:
 
1620
                        thread.abort = True
 
1621
                        while thread.isAlive():
 
1622
                            thread.join(0.01)
 
1623
                    if put_errors_from_threads(segment_threads, error_queue):
 
1624
                        raise ClientException('Aborting manifest creation '
 
1625
                            'because not all segments could be uploaded. %s/%s'
 
1626
                            % (container, obj))
 
1627
                    new_object_manifest = '%s_segments/%s/%s/%s/' % (
 
1628
                        container, obj, put_headers['x-object-meta-mtime'],
 
1629
                        full_size)
 
1630
                    if old_manifest == new_object_manifest:
 
1631
                        old_manifest = None
 
1632
                    put_headers['x-object-manifest'] = new_object_manifest
 
1633
                    conn.put_object(container, obj, '', content_length=0,
 
1634
                                    headers=put_headers)
 
1635
                else:
 
1636
                    conn.put_object(container, obj, open(path, 'rb'),
 
1637
                        content_length=getsize(path), headers=put_headers)
 
1638
                if old_manifest:
 
1639
                    segment_queue = Queue(10000)
 
1640
                    scontainer, sprefix = old_manifest.split('/', 1)
 
1641
                    for delobj in conn.get_container(scontainer,
 
1642
                                                     prefix=sprefix)[1]:
 
1643
                        segment_queue.put({'delete': True,
 
1644
                            'container': scontainer, 'obj': delobj['name']})
 
1645
                    if not segment_queue.empty():
 
1646
                        segment_threads = [QueueFunctionThread(segment_queue,
 
1647
                            _segment_job, create_connection()) for _junk in
 
1648
                            xrange(10)]
 
1649
                        for thread in segment_threads:
 
1650
                            thread.start()
 
1651
                        while not segment_queue.empty():
 
1652
                            sleep(0.01)
 
1653
                        for thread in segment_threads:
 
1654
                            thread.abort = True
 
1655
                            while thread.isAlive():
 
1656
                                thread.join(0.01)
 
1657
                        put_errors_from_threads(segment_threads, error_queue)
 
1658
            if options.verbose:
 
1659
                if conn.attempts > 1:
 
1660
                    print_queue.put(
 
1661
                        '%s [after %d attempts]' % (obj, conn.attempts))
 
1662
                else:
 
1663
                    print_queue.put(obj)
 
1664
        except OSError, err:
 
1665
            if err.errno != ENOENT:
 
1666
                raise
 
1667
            error_queue.put('Local file %s not found' % repr(path))
 
1668
 
 
1669
    def _upload_dir(path):
 
1670
        names = listdir(path)
 
1671
        if not names:
 
1672
            object_queue.put({'path': path, 'dir_marker': True})
 
1673
        else:
 
1674
            for name in listdir(path):
 
1675
                subpath = join(path, name)
 
1676
                if isdir(subpath):
 
1677
                    _upload_dir(subpath)
 
1678
                else:
 
1679
                    object_queue.put({'path': subpath})
 
1680
 
 
1681
    url, token = get_auth(options.auth, options.user, options.key,
 
1682
        snet=options.snet)
 
1683
    create_connection = lambda: Connection(options.auth, options.user,
 
1684
        options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
 
1685
    object_threads = [QueueFunctionThread(object_queue, _object_job,
 
1686
        create_connection()) for _junk in xrange(10)]
 
1687
    for thread in object_threads:
 
1688
        thread.start()
 
1689
    conn = create_connection()
 
1690
    # Try to create the container, just in case it doesn't exist. If this
 
1691
    # fails, it might just be because the user doesn't have container PUT
 
1692
    # permissions, so we'll ignore any error. If there's really a problem,
 
1693
    # it'll surface on the first object PUT.
 
1694
    try:
 
1695
        conn.put_container(args[0])
 
1696
        if options.segment_size is not None:
 
1697
            conn.put_container(args[0] + '_segments')
 
1698
    except Exception:
 
1699
        pass
 
1700
    try:
 
1701
        for arg in args[1:]:
 
1702
            if isdir(arg):
 
1703
                _upload_dir(arg)
 
1704
            else:
 
1705
                object_queue.put({'path': arg})
 
1706
        while not object_queue.empty():
 
1707
            sleep(0.01)
 
1708
        for thread in object_threads:
 
1709
            thread.abort = True
 
1710
            while thread.isAlive():
 
1711
                thread.join(0.01)
 
1712
        put_errors_from_threads(object_threads, error_queue)
 
1713
    except ClientException, err:
 
1714
        if err.http_status != 404:
 
1715
            raise
 
1716
        error_queue.put('Account not found')
 
1717
 
 
1718
 
 
1719
def parse_args(parser, args, enforce_requires=True):
 
1720
    if not args:
 
1721
        args = ['-h']
 
1722
    (options, args) = parser.parse_args(args)
 
1723
    if enforce_requires and \
 
1724
            not (options.auth and options.user and options.key):
 
1725
        exit('''
 
1726
Requires ST_AUTH, ST_USER, and ST_KEY environment variables be set or
 
1727
overridden with -A, -U, or -K.'''.strip('\n'))
 
1728
    return options, args
 
1729
 
 
1730
 
 
1731
if __name__ == '__main__':
 
1732
    parser = OptionParser(version='%prog 1.0', usage='''
 
1733
Usage: %%prog <command> [options] [args]
 
1734
 
 
1735
Commands:
 
1736
  %(st_stat_help)s
 
1737
  %(st_list_help)s
 
1738
  %(st_upload_help)s
 
1739
  %(st_post_help)s
 
1740
  %(st_download_help)s
 
1741
  %(st_delete_help)s
 
1742
 
 
1743
Example:
 
1744
  %%prog -A https://auth.api.rackspacecloud.com/v1.0 -U user -K key stat
 
1745
'''.strip('\n') % globals())
 
1746
    parser.add_option('-s', '--snet', action='store_true', dest='snet',
 
1747
                      default=False, help='Use SERVICENET internal network')
 
1748
    parser.add_option('-v', '--verbose', action='count', dest='verbose',
 
1749
                      default=1, help='Print more info')
 
1750
    parser.add_option('-q', '--quiet', action='store_const', dest='verbose',
 
1751
                      const=0, default=1, help='Suppress status output')
 
1752
    parser.add_option('-A', '--auth', dest='auth',
 
1753
                      default=environ.get('ST_AUTH'),
 
1754
                      help='URL for obtaining an auth token')
 
1755
    parser.add_option('-U', '--user', dest='user',
 
1756
                      default=environ.get('ST_USER'),
 
1757
                      help='User name for obtaining an auth token')
 
1758
    parser.add_option('-K', '--key', dest='key',
 
1759
                      default=environ.get('ST_KEY'),
 
1760
                      help='Key for obtaining an auth token')
 
1761
    parser.disable_interspersed_args()
 
1762
    (options, args) = parse_args(parser, argv[1:], enforce_requires=False)
 
1763
    parser.enable_interspersed_args()
 
1764
 
 
1765
    commands = ('delete', 'download', 'list', 'post', 'stat', 'upload')
 
1766
    if not args or args[0] not in commands:
 
1767
        parser.print_usage()
 
1768
        if args:
 
1769
            exit('no such command: %s' % args[0])
 
1770
        exit()
 
1771
 
 
1772
    print_queue = Queue(10000)
 
1773
 
 
1774
    def _print(item):
 
1775
        if isinstance(item, unicode):
 
1776
            item = item.encode('utf8')
 
1777
        print item
 
1778
 
 
1779
    print_thread = QueueFunctionThread(print_queue, _print)
 
1780
    print_thread.start()
 
1781
 
 
1782
    error_queue = Queue(10000)
 
1783
 
 
1784
    def _error(item):
 
1785
        if isinstance(item, unicode):
 
1786
            item = item.encode('utf8')
 
1787
        print >> stderr, item
 
1788
 
 
1789
    error_thread = QueueFunctionThread(error_queue, _error)
 
1790
    error_thread.start()
 
1791
 
 
1792
    try:
 
1793
        parser.usage = globals()['st_%s_help' % args[0]]
 
1794
        try:
 
1795
            globals()['st_%s' % args[0]](parser, argv[1:], print_queue,
 
1796
                                         error_queue)
 
1797
        except (ClientException, HTTPException, socket.error), err:
 
1798
            error_queue.put(str(err))
 
1799
        while not print_queue.empty():
 
1800
            sleep(0.01)
 
1801
        print_thread.abort = True
 
1802
        while print_thread.isAlive():
 
1803
            print_thread.join(0.01)
 
1804
        while not error_queue.empty():
 
1805
            sleep(0.01)
 
1806
        error_thread.abort = True
 
1807
        while error_thread.isAlive():
 
1808
            error_thread.join(0.01)
 
1809
    except (SystemExit, Exception):
 
1810
        for thread in threading_enumerate():
 
1811
            thread.abort = True
 
1812
        raise