2
# Copyright (c) 2010-2011 OpenStack, LLC.
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
8
# http://www.apache.org/licenses/LICENSE-2.0
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
17
from errno import EEXIST, ENOENT
18
from hashlib import md5
19
from optparse import OptionParser
20
from os import environ, listdir, makedirs, utime
21
from os.path import basename, dirname, getmtime, getsize, isdir, join
22
from Queue import Empty, Queue
23
from sys import argv, exc_info, exit, stderr, stdout
24
from threading import enumerate as threading_enumerate, Thread
25
from time import sleep
26
from traceback import format_exception
29
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
30
# Inclusion of swift.common.client for convenience of single file distribution
33
from cStringIO import StringIO
34
from re import compile, DOTALL
35
from tokenize import generate_tokens, STRING, NAME, OP
36
from urllib import quote as _quote, unquote
37
from urlparse import urlparse, urlunparse
40
from eventlet.green.httplib import HTTPException, HTTPSConnection
42
from httplib import HTTPException, HTTPSConnection
45
from eventlet import sleep
47
from time import sleep
50
from swift.common.bufferedhttp \
51
import BufferedHTTPConnection as HTTPConnection
54
from eventlet.green.httplib import HTTPConnection
56
from httplib import HTTPConnection
59
def quote(value, safe='/'):
    """
    Patched version of urllib.quote that encodes utf8 strings before quoting

    :param value: str or unicode string to percent-encode
    :param safe: characters that are left unquoted, default is '/'
    :returns: the percent-encoded string
    """
    # urllib.quote cannot handle non-ASCII unicode objects, so hand it bytes.
    if isinstance(value, unicode):
        value = value.encode('utf8')
    return _quote(value, safe)
68
# look for a real json parser first
try:
    # simplejson is popular and pretty good
    from simplejson import loads as json_loads
except ImportError:
    try:
        # 2.6 will have a json module in the stdlib
        from json import loads as json_loads
    except ImportError:
        # fall back on local parser otherwise
        comments = compile(r'/\*.*\*/|//[^\r\n]*', DOTALL)

        def json_loads(string):
            '''
            Fairly competent json parser exploiting the python tokenizer and
            eval(). -- From python-cloudfiles

            _loads(serialized_json) -> object

            :param string: serialized JSON text
            :returns: the decoded Python object
            :raises AttributeError: the text could not be parsed
            '''
            try:
                res = []
                consts = {'true': True, 'false': False, 'null': None}
                string = '(' + comments.sub('', string) + ')'
                for tok_type, val, _junk, _junk, _junk in \
                        generate_tokens(StringIO(string).readline):
                    # Only JSON punctuation and the three JSON constants may
                    # reach eval(); any other operator or name is rejected.
                    if (tok_type == OP and val not in '[]{}:,()-') or \
                            (tok_type == NAME and val not in consts):
                        raise AttributeError()
                    elif tok_type == STRING:
                        # Turn the JSON string into a unicode literal.
                        res.append('u')
                        res.append(val.replace('\\/', '/'))
                    else:
                        res.append(val)
                # NOTE(review): eval() on server-supplied text; the token
                # filter above is the only safeguard. Prefer a real json
                # module whenever one is importable.
                return eval(''.join(res), {}, consts)
            except Exception:
                raise AttributeError()
106
class ClientException(Exception):
    """
    Error raised by the client functions when a request fails.

    Besides the message, it carries whatever is known about the failed HTTP
    exchange so that str() can render a single descriptive line.
    """

    def __init__(self, msg, http_scheme='', http_host='', http_port='',
                 http_path='', http_query='', http_status=0, http_reason='',
                 http_device=''):
        """
        :param msg: short description of what failed
        :param http_scheme: scheme of the request URL
        :param http_host: host the request was sent to
        :param http_port: port the request was sent to
        :param http_path: path of the request
        :param http_query: query string of the request
        :param http_status: HTTP status code of the response
        :param http_reason: HTTP reason phrase of the response
        :param http_device: device associated with the failure
        """
        Exception.__init__(self, msg)
        self.msg = msg
        self.http_scheme = http_scheme
        self.http_host = http_host
        self.http_port = http_port
        self.http_path = http_path
        self.http_query = http_query
        self.http_status = http_status
        self.http_reason = http_reason
        self.http_device = http_device

    def __str__(self):
        """
        Render "<msg>: <url> <status> <reason>: device <device>", leaving
        out every part that is empty.
        """
        a = self.msg
        b = ''
        if self.http_scheme:
            b += '%s://' % self.http_scheme
        if self.http_host:
            b += self.http_host
        if self.http_port:
            b += ':%s' % self.http_port
        if self.http_path:
            b += self.http_path
        if self.http_query:
            b += '?%s' % self.http_query
        if self.http_status:
            if b:
                b = '%s %s' % (b, self.http_status)
            else:
                b = str(self.http_status)
        if self.http_reason:
            if b:
                b = '%s %s' % (b, self.http_reason)
            else:
                b = '- %s' % self.http_reason
        if self.http_device:
            if b:
                b = '%s: device %s' % (b, self.http_device)
            else:
                b = 'device %s' % self.http_device
        if b:
            return '%s: %s' % (a, b)
        return a
153
def http_connection(url):
    """
    Make an HTTPConnection or HTTPSConnection

    :param url: url to connect to
    :returns: tuple of (parsed url, connection object)
    :raises ClientException: Unable to handle protocol scheme
    """
    parsed = urlparse(url)
    if parsed.scheme == 'http':
        conn = HTTPConnection(parsed.netloc)
    elif parsed.scheme == 'https':
        conn = HTTPSConnection(parsed.netloc)
    else:
        raise ClientException('Cannot handle protocol scheme %s for url %s' %
                              (parsed.scheme, repr(url)))
    return parsed, conn
172
def get_auth(url, user, key, snet=False):
    """
    Get authentication/authorization credentials.

    The snet parameter is used for Rackspace's ServiceNet internal network
    implementation. In this function, it simply adds *snet-* to the beginning
    of the host name for the returned storage URL. With Rackspace Cloud Files,
    use of this network path causes no bandwidth charges but requires the
    client to be running on Rackspace's ServiceNet network.

    :param url: authentication/authorization URL
    :param user: user to authenticate as
    :param key: key or password for authorization
    :param snet: use SERVICENET internal network (see above), default is False
    :returns: tuple of (storage URL, auth token)
    :raises ClientException: HTTP GET request to auth URL failed
    """
    parsed, conn = http_connection(url)
    conn.request('GET', parsed.path, '',
                 {'X-Auth-User': user, 'X-Auth-Key': key})
    resp = conn.getresponse()
    # Drain the body so the connection is left in a clean state.
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Auth GET failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=parsed.path, http_status=resp.status,
                              http_reason=resp.reason)
    url = resp.getheader('x-storage-url')
    if snet:
        parsed = list(urlparse(url))
        # Second item in the list is the netloc
        parsed[1] = 'snet-' + parsed[1]
        url = urlunparse(parsed)
    # Prefer x-storage-token and fall back to x-auth-token.
    return url, resp.getheader('x-storage-token',
                               resp.getheader('x-auth-token'))
209
def get_account(url, token, marker=None, limit=None, prefix=None,
                http_conn=None, full_listing=False):
    """
    Get a listing of containers for the account.

    :param url: storage URL
    :param token: auth token
    :param marker: marker query
    :param limit: limit query
    :param prefix: prefix query
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :param full_listing: if True, return a full listing, else returns a max
                         of 10000 listings
    :returns: a tuple of (response headers, a list of containers) The response
              headers will be a dict and all header names will be lowercase.
    :raises ClientException: HTTP GET request failed
    """
    if not http_conn:
        http_conn = http_connection(url)
    if full_listing:
        # Keep requesting pages, using the last name seen as the next marker,
        # until the server returns an empty page.
        rv = get_account(url, token, marker, limit, prefix, http_conn)
        listing = rv[1]
        while listing:
            marker = listing[-1]['name']
            listing = \
                get_account(url, token, marker, limit, prefix, http_conn)[1]
            if listing:
                rv[1].extend(listing)
        return rv
    parsed, conn = http_conn
    qs = 'format=json'
    if marker:
        qs += '&marker=%s' % quote(marker)
    if limit:
        qs += '&limit=%d' % limit
    if prefix:
        qs += '&prefix=%s' % quote(prefix)
    conn.request('GET', '%s?%s' % (parsed.path, qs), '',
                 {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value
    if resp.status < 200 or resp.status >= 300:
        resp.read()
        raise ClientException('Account GET failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=parsed.path, http_query=qs,
                              http_status=resp.status,
                              http_reason=resp.reason)
    if resp.status == 204:
        # No content: nothing to parse, but still drain the response.
        resp.read()
        return resp_headers, []
    return resp_headers, json_loads(resp.read())
265
def head_account(url, token, http_conn=None):
    """
    Get account stats.

    :param url: storage URL
    :param token: auth token
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :returns: a dict containing the response's headers (all header names will
              be lowercase)
    :raises ClientException: HTTP HEAD request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Account HEAD failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=parsed.path, http_status=resp.status,
                              http_reason=resp.reason)
    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value
    return resp_headers
295
def post_account(url, token, headers, http_conn=None):
    """
    Update an account's metadata.

    :param url: storage URL
    :param token: auth token
    :param headers: additional headers to include in the request
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP POST request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    # Work on a copy so the auth token does not leak into the caller's dict.
    headers = dict(headers or {})
    headers['X-Auth-Token'] = token
    conn.request('POST', parsed.path, '', headers)
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        # The account path is parsed.path; there is no local `path` here.
        raise ClientException('Account POST failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=parsed.path,
                              http_status=resp.status,
                              http_reason=resp.reason)
321
def get_container(url, token, container, marker=None, limit=None,
                  prefix=None, delimiter=None, http_conn=None,
                  full_listing=False):
    """
    Get a listing of objects for the container.

    :param url: storage URL
    :param token: auth token
    :param container: container name to get a listing for
    :param marker: marker query
    :param limit: limit query
    :param prefix: prefix query
    :param delimiter: string to delimit the queries on
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :param full_listing: if True, return a full listing, else returns a max
                         of 10000 listings
    :returns: a tuple of (response headers, a list of objects) The response
              headers will be a dict and all header names will be lowercase.
    :raises ClientException: HTTP GET request failed
    """
    if not http_conn:
        http_conn = http_connection(url)
    if full_listing:
        # Keep requesting pages, using the last entry seen as the next
        # marker, until the server returns an empty page.
        rv = get_container(url, token, container, marker, limit, prefix,
                           delimiter, http_conn)
        listing = rv[1]
        while listing:
            if not delimiter:
                marker = listing[-1]['name']
            else:
                # With a delimiter the last entry may be a 'subdir' record
                # that has no 'name' key.
                marker = listing[-1].get('name', listing[-1].get('subdir'))
            listing = get_container(url, token, container, marker, limit,
                                    prefix, delimiter, http_conn)[1]
            if listing:
                rv[1].extend(listing)
        return rv
    parsed, conn = http_conn
    path = '%s/%s' % (parsed.path, quote(container))
    qs = 'format=json'
    if marker:
        qs += '&marker=%s' % quote(marker)
    if limit:
        qs += '&limit=%d' % limit
    if prefix:
        qs += '&prefix=%s' % quote(prefix)
    if delimiter:
        qs += '&delimiter=%s' % quote(delimiter)
    conn.request('GET', '%s?%s' % (path, qs), '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    if resp.status < 200 or resp.status >= 300:
        resp.read()
        raise ClientException('Container GET failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=path,
                              http_query=qs, http_status=resp.status,
                              http_reason=resp.reason)
    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value
    if resp.status == 204:
        # No content: nothing to parse, but still drain the response.
        resp.read()
        return resp_headers, []
    return resp_headers, json_loads(resp.read())
386
def head_container(url, token, container, http_conn=None):
    """
    Get container stats.

    :param url: storage URL
    :param token: auth token
    :param container: container name to get stats for
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :returns: a dict containing the response's headers (all header names will
              be lowercase)
    :raises ClientException: HTTP HEAD request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s' % (parsed.path, quote(container))
    conn.request('HEAD', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Container HEAD failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=path,
                              http_status=resp.status,
                              http_reason=resp.reason)
    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value
    return resp_headers
418
def put_container(url, token, container, headers=None, http_conn=None):
    """
    Create a container

    :param url: storage URL
    :param token: auth token
    :param container: container name to create
    :param headers: additional headers to include in the request
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP PUT request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s' % (parsed.path, quote(container))
    # Work on a copy so the auth token does not leak into the caller's dict.
    headers = dict(headers or {})
    headers['X-Auth-Token'] = token
    conn.request('PUT', path, '', headers)
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Container PUT failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=path,
                              http_status=resp.status,
                              http_reason=resp.reason)
448
def post_container(url, token, container, headers, http_conn=None):
    """
    Update a container's metadata.

    :param url: storage URL
    :param token: auth token
    :param container: container name to update
    :param headers: additional headers to include in the request
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP POST request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s' % (parsed.path, quote(container))
    # Work on a copy so the auth token does not leak into the caller's dict.
    headers = dict(headers or {})
    headers['X-Auth-Token'] = token
    conn.request('POST', path, '', headers)
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Container POST failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=path,
                              http_status=resp.status,
                              http_reason=resp.reason)
476
def delete_container(url, token, container, http_conn=None):
    """
    Delete a container

    :param url: storage URL
    :param token: auth token
    :param container: container name to delete
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP DELETE request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s' % (parsed.path, quote(container))
    conn.request('DELETE', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Container DELETE failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=path,
                              http_status=resp.status,
                              http_reason=resp.reason)
502
def get_object(url, token, container, name, http_conn=None,
               resp_chunk_size=None):
    """
    Get an object

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: object name to get
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :param resp_chunk_size: if defined, chunk size of data to read. NOTE: If
                            you specify a resp_chunk_size you must fully read
                            the object's contents before making another
                            request.
    :returns: a tuple of (response headers, the object's contents) The response
              headers will be a dict and all header names will be lowercase.
              When resp_chunk_size is given, the contents are a generator
              yielding chunks instead of a single string.
    :raises ClientException: HTTP GET request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    conn.request('GET', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    if resp.status < 200 or resp.status >= 300:
        resp.read()
        raise ClientException('Object GET failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=path, http_status=resp.status,
                              http_reason=resp.reason)
    if resp_chunk_size:

        def _object_body():
            # Lazily stream the body; reading happens as the caller iterates.
            buf = resp.read(resp_chunk_size)
            while buf:
                yield buf
                buf = resp.read(resp_chunk_size)
        object_body = _object_body()
    else:
        object_body = resp.read()
    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value
    return resp_headers, object_body
549
def head_object(url, token, container, name, http_conn=None):
    """
    Get object info

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: object name to get info for
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :returns: a dict containing the response's headers (all header names will
              be lowercase)
    :raises ClientException: HTTP HEAD request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    conn.request('HEAD', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=path, http_status=resp.status,
                              http_reason=resp.reason)
    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value
    return resp_headers
581
def put_object(url, token, container, name, contents, content_length=None,
               etag=None, chunk_size=65536, content_type=None, headers=None,
               http_conn=None):
    """
    Put an object

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: object name to put
    :param contents: a string or a file like object to read object data from
    :param content_length: value to send as content-length header; also limits
                           the amount read from contents
    :param etag: etag of contents
    :param chunk_size: chunk size of data to write
    :param content_type: value to send as content-type header
    :param headers: additional headers to include in the request
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :returns: etag from server response
    :raises ClientException: HTTP PUT request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    # Work on a copy so the token/etag/length headers added here do not leak
    # into the caller's dict.
    headers = dict(headers or {})
    headers['X-Auth-Token'] = token
    if etag:
        headers['ETag'] = etag.strip('"')
    if content_length is not None:
        headers['Content-Length'] = str(content_length)
    if content_type is not None:
        headers['Content-Type'] = content_type
    if not contents:
        headers['Content-Length'] = '0'
    if hasattr(contents, 'read'):
        # File-like contents are streamed by hand so that the whole object
        # never has to be held in memory.
        conn.putrequest('PUT', path)
        for header, value in headers.items():
            conn.putheader(header, value)
        if content_length is None:
            # Unknown length: use HTTP chunked transfer encoding.
            conn.putheader('Transfer-Encoding', 'chunked')
            conn.endheaders()
            chunk = contents.read(chunk_size)
            while chunk:
                conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
                chunk = contents.read(chunk_size)
            conn.send('0\r\n\r\n')
        else:
            conn.endheaders()
            left = content_length
            while left > 0:
                size = min(chunk_size, left)
                chunk = contents.read(size)
                if not chunk:
                    # contents ended before content_length bytes were read;
                    # stop here rather than looping forever on empty reads.
                    break
                conn.send(chunk)
                left -= len(chunk)
    else:
        conn.request('PUT', path, contents, headers)
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=path, http_status=resp.status,
                              http_reason=resp.reason)
    # Tolerate a response without an etag header instead of crashing on None.
    return resp.getheader('etag', '').strip('"')
652
def post_object(url, token, container, name, headers, http_conn=None):
    """
    Update object metadata

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: name of the object to update
    :param headers: additional headers to include in the request
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP POST request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    # Work on a copy so the auth token does not leak into the caller's dict.
    headers = dict(headers or {})
    headers['X-Auth-Token'] = token
    conn.request('POST', path, '', headers)
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Object POST failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=path, http_status=resp.status,
                              http_reason=resp.reason)
680
def delete_object(url, token, container, name, http_conn=None):
    """
    Delete object

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: object name to delete
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP DELETE request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    conn.request('DELETE', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Object DELETE failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=path,
                              http_status=resp.status,
                              http_reason=resp.reason)
707
class Connection(object):
    """Convenience class to make requests that will also retry the request"""

    def __init__(self, authurl, user, key, retries=5, preauthurl=None,
                 preauthtoken=None, snet=False, starting_backoff=1):
        """
        :param authurl: authentication URL
        :param user: user name to authenticate as
        :param key: key/password to authenticate with
        :param retries: Number of times to retry the request before failing
        :param preauthurl: storage URL (if you have already authenticated)
        :param preauthtoken: authentication token (if you have already
                             authenticated)
        :param snet: use SERVICENET internal network default is False
        :param starting_backoff: seconds to sleep before the first retry; the
                                 delay doubles after every failed attempt
        """
        self.authurl = authurl
        self.user = user
        self.key = key
        self.retries = retries
        self.http_conn = None
        self.url = preauthurl
        self.token = preauthtoken
        self.attempts = 0
        self.snet = snet
        self.starting_backoff = starting_backoff

    def get_auth(self):
        """Authenticate and return a tuple of (storage URL, auth token)"""
        return get_auth(self.authurl, self.user, self.key, snet=self.snet)

    def http_connection(self):
        """Open a connection to the current storage URL"""
        return http_connection(self.url)

    def _retry(self, reset_func, func, *args, **kwargs):
        """
        Call func(url, token, *args, http_conn=..., **kwargs), retrying on
        failure.

        Socket/HTTP errors, 408 and 5xx responses are retried with an
        exponentially growing sleep. A 401 forces one re-authentication.
        Any other ClientException is raised at once. self.attempts holds the
        number of attempts made by the most recent call.

        :param reset_func: callable invoked as reset_func(func, *args,
                           **kwargs) before each retry, or None
        :param func: client function to call
        :returns: whatever func returns
        :raises ClientException: the request kept failing or failed in a way
                                 that is not retried
        """
        # Imported here so this method does not depend on a module-level
        # import of socket being present.
        import socket
        self.attempts = 0
        backoff = self.starting_backoff
        while self.attempts <= self.retries:
            self.attempts += 1
            try:
                if not self.url or not self.token:
                    self.url, self.token = self.get_auth()
                    self.http_conn = None
                if not self.http_conn:
                    self.http_conn = self.http_connection()
                kwargs['http_conn'] = self.http_conn
                rv = func(self.url, self.token, *args, **kwargs)
                return rv
            except (socket.error, HTTPException):
                if self.attempts > self.retries:
                    raise
                # The connection is suspect; open a fresh one next time.
                self.http_conn = None
            except ClientException as err:
                if self.attempts > self.retries:
                    raise
                if err.http_status == 401:
                    # Token rejected: drop credentials so the next attempt
                    # re-authenticates, but only allow that once.
                    self.url = self.token = None
                    if self.attempts > 1:
                        raise
                elif err.http_status == 408:
                    self.http_conn = None
                elif 500 <= err.http_status <= 599:
                    pass
                else:
                    raise
            sleep(backoff)
            backoff *= 2
            if reset_func:
                reset_func(func, *args, **kwargs)

    def head_account(self):
        """Wrapper for :func:`head_account`"""
        return self._retry(None, head_account)

    def get_account(self, marker=None, limit=None, prefix=None,
                    full_listing=False):
        """Wrapper for :func:`get_account`"""
        # TODO(unknown): With full_listing=True this will restart the entire
        # listing with each retry. Need to make a better version that just
        # retries where it left off.
        return self._retry(None, get_account, marker=marker, limit=limit,
                           prefix=prefix, full_listing=full_listing)

    def post_account(self, headers):
        """Wrapper for :func:`post_account`"""
        return self._retry(None, post_account, headers)

    def head_container(self, container):
        """Wrapper for :func:`head_container`"""
        return self._retry(None, head_container, container)

    def get_container(self, container, marker=None, limit=None, prefix=None,
                      delimiter=None, full_listing=False):
        """Wrapper for :func:`get_container`"""
        # TODO(unknown): With full_listing=True this will restart the entire
        # listing with each retry. Need to make a better version that just
        # retries where it left off.
        return self._retry(None, get_container, container, marker=marker,
                           limit=limit, prefix=prefix, delimiter=delimiter,
                           full_listing=full_listing)

    def put_container(self, container, headers=None):
        """Wrapper for :func:`put_container`"""
        return self._retry(None, put_container, container, headers=headers)

    def post_container(self, container, headers):
        """Wrapper for :func:`post_container`"""
        return self._retry(None, post_container, container, headers)

    def delete_container(self, container):
        """Wrapper for :func:`delete_container`"""
        return self._retry(None, delete_container, container)

    def head_object(self, container, obj):
        """Wrapper for :func:`head_object`"""
        return self._retry(None, head_object, container, obj)

    def get_object(self, container, obj, resp_chunk_size=None):
        """Wrapper for :func:`get_object`"""
        return self._retry(None, get_object, container, obj,
                           resp_chunk_size=resp_chunk_size)

    def put_object(self, container, obj, contents, content_length=None,
                   etag=None, chunk_size=65536, content_type=None,
                   headers=None):
        """Wrapper for :func:`put_object`"""

        def _default_reset(*args, **kwargs):
            raise ClientException('put_object(%r, %r, ...) failure and no '
                'ability to reset contents for reupload.' % (container, obj))

        # A retry must resend the same bytes: rewind seekable contents,
        # do nothing for empty contents, and otherwise refuse to retry.
        reset_func = _default_reset
        tell = getattr(contents, 'tell', None)
        seek = getattr(contents, 'seek', None)
        if tell and seek:
            orig_pos = tell()
            reset_func = lambda *a, **k: seek(orig_pos)
        elif not contents:
            reset_func = lambda *a, **k: None

        return self._retry(reset_func, put_object, container, obj, contents,
            content_length=content_length, etag=etag, chunk_size=chunk_size,
            content_type=content_type, headers=headers)

    def post_object(self, container, obj, headers):
        """Wrapper for :func:`post_object`"""
        return self._retry(None, post_object, container, obj, headers)

    def delete_object(self, container, obj):
        """Wrapper for :func:`delete_object`"""
        return self._retry(None, delete_object, container, obj)
857
# End inclusion of swift.common.client
858
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
865
if err.errno != EEXIST:
869
def put_errors_from_threads(threads, error_queue):
    """
    Places any errors from the threads into error_queue.
    :param threads: A list of QueueFunctionThread instances.
    :param error_queue: A queue to put error strings into.
    :returns: True if any errors were found.
    """
    was_error = False
    for thread in threads:
        for info in thread.exc_infos:
            was_error = True
            # Client errors already format themselves into one readable line;
            # anything else gets a full traceback.
            if isinstance(info[1], ClientException):
                error_queue.put(str(info[1]))
            else:
                error_queue.put(''.join(format_exception(*info)))
    return was_error
887
class QueueFunctionThread(Thread):
    """Worker thread that feeds queued items to a function."""

    def __init__(self, queue, func, *args, **kwargs):
        """ Calls func for each item in queue; func is called with a queued
            item as the first arg followed by *args and **kwargs. Use the abort
            attribute to have the thread empty the queue (without processing)
            and exit. """
        Thread.__init__(self)
        self.abort = False
        self.queue = queue
        self.func = func
        self.args = args
        self.kwargs = kwargs
        # sys.exc_info() tuples of exceptions raised while running.
        self.exc_infos = []

    def run(self):
        """
        Process queue items until abort is set and the queue is empty.

        An exception raised by func ends the loop and is recorded in
        exc_infos rather than propagated.
        """
        try:
            while True:
                try:
                    item = self.queue.get_nowait()
                    if not self.abort:
                        self.func(item, *self.args, **self.kwargs)
                    self.queue.task_done()
                except Empty:
                    if self.abort:
                        break
                    # Nothing queued yet; poll again shortly.
                    sleep(0.01)
        except Exception:
            self.exc_infos.append(exc_info())
919
delete --all OR delete container [--leave-segments] [object] [object] ...
920
Deletes everything in the account (with --all), or everything in a
921
container, or a list of objects depending on the args given. Segments of
922
manifest objects will be deleted as well, unless you specify the
923
--leave-segments option.'''.strip('\n')
926
def st_delete(parser, args, print_queue, error_queue):
927
parser.add_option('-a', '--all', action='store_true', dest='yes_all',
928
default=False, help='Indicates that you really want to delete '
929
'everything in the account')
930
parser.add_option('', '--leave-segments', action='store_true',
931
dest='leave_segments', default=False, help='Indicates that you want '
932
'the segments of manifest objects left alone')
933
(options, args) = parse_args(parser, args)
935
if (not args and not options.yes_all) or (args and options.yes_all):
936
error_queue.put('Usage: %s [options] %s' %
937
(basename(argv[0]), st_delete_help))
940
def _delete_segment((container, obj), conn):
941
conn.delete_object(container, obj)
943
if conn.attempts > 2:
944
print_queue.put('%s/%s [after %d attempts]' %
945
(container, obj, conn.attempts))
947
print_queue.put('%s/%s' % (container, obj))
949
object_queue = Queue(10000)
951
def _delete_object((container, obj), conn):
954
if not options.leave_segments:
956
old_manifest = conn.head_object(container, obj).get(
958
except ClientException, err:
959
if err.http_status != 404:
961
conn.delete_object(container, obj)
963
segment_queue = Queue(10000)
964
scontainer, sprefix = old_manifest.split('/', 1)
965
for delobj in conn.get_container(scontainer,
967
segment_queue.put((scontainer, delobj['name']))
968
if not segment_queue.empty():
969
segment_threads = [QueueFunctionThread(segment_queue,
970
_delete_segment, create_connection()) for _junk in
972
for thread in segment_threads:
974
while not segment_queue.empty():
976
for thread in segment_threads:
978
while thread.isAlive():
980
put_errors_from_threads(segment_threads, error_queue)
982
path = options.yes_all and join(container, obj) or obj
983
if path[:1] in ('/', '\\'):
985
if conn.attempts > 1:
986
print_queue.put('%s [after %d attempts]' %
987
(path, conn.attempts))
989
print_queue.put(path)
990
except ClientException, err:
991
if err.http_status != 404:
993
error_queue.put('Object %s not found' %
994
repr('%s/%s' % (container, obj)))
996
container_queue = Queue(10000)
998
def _delete_container(container, conn):
1002
objects = [o['name'] for o in
1003
conn.get_container(container, marker=marker)[1]]
1007
object_queue.put((container, obj))
1008
marker = objects[-1]
1009
while not object_queue.empty():
1014
conn.delete_container(container)
1016
except ClientException, err:
1017
if err.http_status != 409:
1023
except ClientException, err:
1024
if err.http_status != 404:
1026
error_queue.put('Container %s not found' % repr(container))
1028
url, token = get_auth(options.auth, options.user, options.key,
1030
create_connection = lambda: Connection(options.auth, options.user,
1031
options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
1032
object_threads = [QueueFunctionThread(object_queue, _delete_object,
1033
create_connection()) for _junk in xrange(10)]
1034
for thread in object_threads:
1036
container_threads = [QueueFunctionThread(container_queue,
1037
_delete_container, create_connection()) for _junk in xrange(10)]
1038
for thread in container_threads:
1041
conn = create_connection()
1046
[c['name'] for c in conn.get_account(marker=marker)[1]]
1049
for container in containers:
1050
container_queue.put(container)
1051
marker = containers[-1]
1052
while not container_queue.empty():
1054
while not object_queue.empty():
1056
except ClientException, err:
1057
if err.http_status != 404:
1059
error_queue.put('Account not found')
1060
elif len(args) == 1:
1062
print >> stderr, 'WARNING: / in container name; you might have ' \
1063
'meant %r instead of %r.' % \
1064
(args[0].replace('/', ' ', 1), args[0])
1065
conn = create_connection()
1066
_delete_container(args[0], conn)
1068
for obj in args[1:]:
1069
object_queue.put((args[0], obj))
1070
while not container_queue.empty():
1072
for thread in container_threads:
1074
while thread.isAlive():
1076
put_errors_from_threads(container_threads, error_queue)
1077
while not object_queue.empty():
1079
for thread in object_threads:
1081
while thread.isAlive():
1083
put_errors_from_threads(object_threads, error_queue)
1086
st_download_help = '''
1087
download --all OR download container [options] [object] [object] ...
1088
Downloads everything in the account (with --all), or everything in a
1089
container, or a list of objects depending on the args given. For a single
1090
object download, you may use the -o [--output] <filename> option to
1091
redirect the output to a specific file or if "-" then just redirect to
1092
stdout.'''.strip('\n')
def st_download(options, args, print_queue, error_queue):
    """
    Download a whole account, one container, or selected objects.

    Adds the download-specific options to the module-level ``parser``,
    re-parses ``args`` and then feeds ten container-listing threads and ten
    object-download threads through two work queues.

    :param options: value handed over by the command dispatcher; it is
                    rebound to the options parsed from ``args``
    :param args: command line arguments, the first one being the command name
    :param print_queue: queue receiving one line per downloaded object when
                        verbose output is enabled
    :param error_queue: queue receiving usage and error messages
    """
    parser.add_option('-a', '--all', action='store_true', dest='yes_all',
        default=False, help='Indicates that you really want to download '
        'everything in the account')
    parser.add_option('-o', '--output', dest='out_file', help='For a single '
        'file download, stream the output to an alternate location ')
    (options, args) = parse_args(parser, args)
    args = args[1:]
    if options.out_file == '-':
        # The object body is written to stdout, so status lines are disabled
        options.verbose = 0
    if options.out_file and len(args) != 2:
        exit('-o option only allowed for single file downloads')
    if (not args and not options.yes_all) or (args and options.yes_all):
        error_queue.put('Usage: %s [options] %s' %
                        (basename(argv[0]), st_download_help))
        return

    object_queue = Queue(10000)

    def _make_dirs(path):
        # Create path including parents; an already existing directory is
        # not an error (another worker thread may have created it first).
        try:
            makedirs(path)
        except OSError as err:
            if err.errno != EEXIST:
                raise

    def _download_object(queue_arg, conn):
        # queue_arg is (container, obj) or (container, obj, out_file)
        if len(queue_arg) == 2:
            container, obj = queue_arg
            out_file = None
        elif len(queue_arg) == 3:
            container, obj, out_file = queue_arg
        else:
            raise Exception("Invalid queue_arg length of %s" % len(queue_arg))
        try:
            headers, body = \
                conn.get_object(container, obj, resp_chunk_size=65536)
            content_type = headers.get('content-type')
            if 'content-length' in headers:
                content_length = int(headers.get('content-length'))
            else:
                content_length = None
            etag = headers.get('etag')
            path = options.yes_all and join(container, obj) or obj
            # Never write to an absolute path derived from an object name
            if path[:1] in ('/', '\\'):
                path = path[1:]
            md5sum = None
            make_dir = out_file != "-"
            if content_type.split(';', 1)[0] == 'text/directory':
                if make_dir and not isdir(path):
                    _make_dirs(path)
                read_length = 0
                # The etag is only compared for objects without a manifest
                # header
                if 'x-object-manifest' not in headers:
                    md5sum = md5()
                for chunk in body:
                    read_length += len(chunk)
                    if md5sum:
                        md5sum.update(chunk)
            else:
                dirpath = dirname(path)
                if make_dir and dirpath and not isdir(dirpath):
                    _make_dirs(dirpath)
                if out_file == "-":
                    fp = stdout
                elif out_file:
                    fp = open(out_file, 'wb')
                else:
                    fp = open(path, 'wb')
                read_length = 0
                if 'x-object-manifest' not in headers:
                    md5sum = md5()
                try:
                    for chunk in body:
                        fp.write(chunk)
                        read_length += len(chunk)
                        if md5sum:
                            md5sum.update(chunk)
                finally:
                    # Release the handle even when reading the body fails
                    fp.close()
            if md5sum and md5sum.hexdigest() != etag:
                error_queue.put('%s: md5sum != etag, %s != %s' %
                                (path, md5sum.hexdigest(), etag))
            if content_length is not None and read_length != content_length:
                error_queue.put('%s: read_length != content_length, %d != %d' %
                                (path, read_length, content_length))
            if 'x-object-meta-mtime' in headers and not options.out_file:
                mtime = float(headers['x-object-meta-mtime'])
                utime(path, (mtime, mtime))
            if options.verbose:
                if conn.attempts > 1:
                    print_queue.put('%s [after %d attempts]' %
                                    (path, conn.attempts))
                else:
                    print_queue.put(path)
        except ClientException as err:
            if err.http_status != 404:
                raise
            error_queue.put('Object %s not found' %
                            repr('%s/%s' % (container, obj)))

    container_queue = Queue(10000)

    def _download_container(container, conn):
        # Page through the container listing, queueing every object found
        try:
            marker = ''
            while True:
                objects = [o['name'] for o in
                           conn.get_container(container, marker=marker)[1]]
                if not objects:
                    break
                for obj in objects:
                    object_queue.put((container, obj))
                marker = objects[-1]
        except ClientException as err:
            if err.http_status != 404:
                raise
            error_queue.put('Container %s not found' % repr(container))

    # Authenticate once and share the URL and token between all connections
    url, token = get_auth(options.auth, options.user, options.key,
                          snet=options.snet)
    create_connection = lambda: Connection(options.auth, options.user,
        options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
    object_threads = [QueueFunctionThread(object_queue, _download_object,
        create_connection()) for _junk in xrange(10)]
    for thread in object_threads:
        thread.start()
    container_threads = [QueueFunctionThread(container_queue,
        _download_container, create_connection()) for _junk in xrange(10)]
    for thread in container_threads:
        thread.start()
    if not args:
        conn = create_connection()
        try:
            marker = ''
            while True:
                containers = [c['name']
                              for c in conn.get_account(marker=marker)[1]]
                if not containers:
                    break
                for container in containers:
                    container_queue.put(container)
                marker = containers[-1]
        except ClientException as err:
            if err.http_status != 404:
                raise
            error_queue.put('Account not found')
    elif len(args) == 1:
        if '/' in args[0]:
            print >> stderr, 'WARNING: / in container name; you might have ' \
                             'meant %r instead of %r.' % \
                             (args[0].replace('/', ' ', 1), args[0])
        _download_container(args[0], create_connection())
    else:
        if len(args) == 2:
            obj = args[1]
            object_queue.put((args[0], obj, options.out_file))
        else:
            for obj in args[1:]:
                object_queue.put((args[0], obj))
    # Containers are drained first because their workers feed object_queue
    while not container_queue.empty():
        sleep(0.01)
    for thread in container_threads:
        thread.abort = True
        while thread.isAlive():
            thread.join(0.01)
    put_errors_from_threads(container_threads, error_queue)
    while not object_queue.empty():
        sleep(0.01)
    for thread in object_threads:
        thread.abort = True
        while thread.isAlive():
            thread.join(0.01)
    put_errors_from_threads(object_threads, error_queue)


# Usage text for the "list" command. st_list() appends it to the usage error
# it reports when more than one argument is given.
st_list_help = '''
list [options] [container]
    Lists the containers for the account or the objects for a container. -p or
    --prefix is an option that will only list items beginning with that prefix.
    -d or --delimiter is option (for container listings only) that will roll up
    items with the given delimiter (see Cloud Files general documentation for
    what this means).
'''.strip('\n')


def st_list(options, args, print_queue, error_queue):
    """
    List the containers of the account or the objects of one container.

    Adds the listing options to the module-level ``parser``, re-parses
    ``args`` and pages through the listing, using the last item of each page
    as the marker for the next request.

    :param options: value handed over by the command dispatcher; it is
                    rebound to the options parsed from ``args``
    :param args: command line arguments, the first one being the command name
    :param print_queue: queue receiving one line per listed item
    :param error_queue: queue receiving usage and "not found" messages
    """
    parser.add_option('-p', '--prefix', dest='prefix', help='Will only list '
        'items beginning with the prefix')
    parser.add_option('-d', '--delimiter', dest='delimiter', help='Will roll '
        'up items with the given delimiter (see Cloud Files general '
        'documentation for what this means)')
    (options, args) = parse_args(parser, args)
    args = args[1:]
    if options.delimiter and not args:
        exit('-d option only allowed for container listings')
    if len(args) > 1:
        error_queue.put('Usage: %s [options] %s' %
                        (basename(argv[0]), st_list_help))
        return
    conn = Connection(options.auth, options.user, options.key,
                      snet=options.snet)
    try:
        marker = ''
        while True:
            if not args:
                items = \
                    conn.get_account(marker=marker, prefix=options.prefix)[1]
            else:
                items = conn.get_container(args[0], marker=marker,
                    prefix=options.prefix, delimiter=options.delimiter)[1]
            if not items:
                break
            for item in items:
                # Entries rolled up by the delimiter carry 'subdir', not 'name'
                print_queue.put(item.get('name', item.get('subdir')))
            marker = items[-1].get('name', items[-1].get('subdir'))
    except ClientException as err:
        if err.http_status != 404:
            raise
        if not args:
            error_queue.put('Account not found')
        else:
            error_queue.put('Container %s not found' % repr(args[0]))


# Usage text for the "stat" command. st_stat() appends it to the usage error
# it reports when more than two arguments are given.
st_stat_help = '''
stat [container] [object]
    Displays information for the account, container, or object depending on the
    args given (if any).'''.strip('\n')


def st_stat(options, args, print_queue, error_queue):
    """
    Show the HEAD information of the account, a container, or an object.

    With no further argument the account is queried, with one argument the
    container, with two the object. The well-known headers are printed as an
    aligned summary, followed by the user metadata headers and then by any
    remaining headers.

    :param options: value handed over by the command dispatcher; it is
                    rebound to the options parsed from ``args``
    :param args: command line arguments, the first one being the command name
    :param print_queue: queue receiving the report lines
    :param error_queue: queue receiving usage and "not found" messages
    """
    (options, args) = parse_args(parser, args)
    args = args[1:]
    conn = Connection(options.auth, options.user, options.key)
    if not args:
        try:
            headers = conn.head_account()
            if options.verbose > 1:
                print_queue.put('''
StorageURL: %s
Auth Token: %s
'''.strip('\n') % (conn.url, conn.token))
            container_count = int(headers.get('x-account-container-count', 0))
            object_count = int(headers.get('x-account-object-count', 0))
            bytes_used = int(headers.get('x-account-bytes-used', 0))
            # Labels are right-aligned to 10 columns, like the '%10s' rows
            print_queue.put('''
   Account: %s
Containers: %d
   Objects: %d
     Bytes: %d'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count,
                                 object_count, bytes_used))
            for key, value in headers.items():
                if key.startswith('x-account-meta-'):
                    print_queue.put('%10s: %s' % ('Meta %s' %
                        key[len('x-account-meta-'):].title(), value))
            for key, value in headers.items():
                if not key.startswith('x-account-meta-') and key not in (
                        'content-length', 'date', 'x-account-container-count',
                        'x-account-object-count', 'x-account-bytes-used'):
                    print_queue.put(
                        '%10s: %s' % (key.title(), value))
        except ClientException as err:
            if err.http_status != 404:
                raise
            error_queue.put('Account not found')
    elif len(args) == 1:
        if '/' in args[0]:
            print >> stderr, 'WARNING: / in container name; you might have ' \
                             'meant %r instead of %r.' % \
                             (args[0].replace('/', ' ', 1), args[0])
        try:
            headers = conn.head_container(args[0])
            object_count = int(headers.get('x-container-object-count', 0))
            bytes_used = int(headers.get('x-container-bytes-used', 0))
            # Labels are right-aligned to 9 columns, like the '%9s' rows
            print_queue.put('''
  Account: %s
Container: %s
  Objects: %d
    Bytes: %d
 Read ACL: %s
Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
                                object_count, bytes_used,
                                headers.get('x-container-read', ''),
                                headers.get('x-container-write', '')))
            for key, value in headers.items():
                if key.startswith('x-container-meta-'):
                    print_queue.put('%9s: %s' % ('Meta %s' %
                        key[len('x-container-meta-'):].title(), value))
            for key, value in headers.items():
                if not key.startswith('x-container-meta-') and key not in (
                        'content-length', 'date', 'x-container-object-count',
                        'x-container-bytes-used', 'x-container-read',
                        'x-container-write'):
                    print_queue.put(
                        '%9s: %s' % (key.title(), value))
        except ClientException as err:
            if err.http_status != 404:
                raise
            error_queue.put('Container %s not found' % repr(args[0]))
    elif len(args) == 2:
        try:
            headers = conn.head_object(args[0], args[1])
            # Labels are right-aligned to 14 columns, like the '%14s' rows
            print_queue.put('''
       Account: %s
     Container: %s
        Object: %s
  Content Type: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
                                     args[1], headers.get('content-type')))
            if 'content-length' in headers:
                print_queue.put('Content Length: %s' %
                                headers['content-length'])
            if 'last-modified' in headers:
                print_queue.put(' Last Modified: %s' %
                                headers['last-modified'])
            if 'etag' in headers:
                print_queue.put('          ETag: %s' % headers['etag'])
            if 'x-object-manifest' in headers:
                print_queue.put('      Manifest: %s' %
                                headers['x-object-manifest'])
            for key, value in headers.items():
                if key.startswith('x-object-meta-'):
                    print_queue.put('%14s: %s' % ('Meta %s' %
                        key[len('x-object-meta-'):].title(), value))
            for key, value in headers.items():
                if not key.startswith('x-object-meta-') and key not in (
                        'content-type', 'content-length', 'last-modified',
                        'etag', 'date', 'x-object-manifest'):
                    print_queue.put(
                        '%14s: %s' % (key.title(), value))
        except ClientException as err:
            if err.http_status != 404:
                raise
            error_queue.put('Object %s not found' %
                            repr('%s/%s' % (args[0], args[1])))
    else:
        error_queue.put('Usage: %s [options] %s' %
                        (basename(argv[0]), st_stat_help))


# Usage text for the "post" command. st_post() appends it to the usage error
# it reports when more than two arguments are given.
st_post_help = '''
post [options] [container] [object]
    Updates meta information for the account, container, or object depending on
    the args given. If the container is not found, it will be created
    automatically; but this is not true for accounts and objects. Containers
    also allow the -r (or --read-acl) and -w (or --write-acl) options. The -m
    or --meta option is allowed on all and used to define the user meta data
    items to set in the form Name:Value. This option can be repeated. Example:
    post -m Color:Blue -m Size:Large'''.strip('\n')


def st_post(options, args, print_queue, error_queue):
    """
    Update the metadata of the account, a container, or an object.

    With no further argument the account is updated, with one argument the
    container (which is created when the POST answers 404), with two the
    object. Metadata comes from the repeatable ``-m name:value`` option;
    containers additionally accept the read and write ACL options.

    :param options: value handed over by the command dispatcher; it is
                    rebound to the options parsed from ``args``
    :param args: command line arguments, the first one being the command name
    :param print_queue: not used by this command
    :param error_queue: queue receiving usage and "not found" messages
    """
    parser.add_option('-r', '--read-acl', dest='read_acl', help='Sets the '
        'Read ACL for containers. Quick summary of ACL syntax: .r:*, '
        '.r:-.example.com, .r:www.example.com, account1, account2:user2')
    parser.add_option('-w', '--write-acl', dest='write_acl', help='Sets the '
        'Write ACL for containers. Quick summary of ACL syntax: account1, '
        'account2:user2')
    parser.add_option('-m', '--meta', action='append', dest='meta', default=[],
        help='Sets a meta data item with the syntax name:value. This option '
        'may be repeated. Example: -m Color:Blue -m Size:Large')
    (options, args) = parse_args(parser, args)
    args = args[1:]
    if (options.read_acl or options.write_acl) and not args:
        exit('-r and -w options only allowed for containers')
    conn = Connection(options.auth, options.user, options.key)

    def _meta_headers(prefix):
        # Turn the -m name:value items into {prefix + name: value}
        headers = {}
        for item in options.meta:
            # Split on the first colon only, so a value may itself contain
            # colons; an item without a colon sets an empty value.
            split_item = item.split(':', 1)
            if len(split_item) > 1:
                value = split_item[1]
            else:
                value = ''
            headers[prefix + split_item[0]] = value
        return headers

    if not args:
        headers = _meta_headers('X-Account-Meta-')
        try:
            conn.post_account(headers=headers)
        except ClientException as err:
            if err.http_status != 404:
                raise
            error_queue.put('Account not found')
    elif len(args) == 1:
        if '/' in args[0]:
            print >> stderr, 'WARNING: / in container name; you might have ' \
                             'meant %r instead of %r.' % \
                             (args[0].replace('/', ' ', 1), args[0])
        headers = _meta_headers('X-Container-Meta-')
        if options.read_acl is not None:
            headers['X-Container-Read'] = options.read_acl
        if options.write_acl is not None:
            headers['X-Container-Write'] = options.write_acl
        try:
            conn.post_container(args[0], headers=headers)
        except ClientException as err:
            if err.http_status != 404:
                raise
            # A missing container is created with the requested headers
            conn.put_container(args[0], headers=headers)
    elif len(args) == 2:
        headers = _meta_headers('X-Object-Meta-')
        try:
            conn.post_object(args[0], args[1], headers=headers)
        except ClientException as err:
            if err.http_status != 404:
                raise
            error_queue.put('Object %s not found' %
                            repr('%s/%s' % (args[0], args[1])))
    else:
        error_queue.put('Usage: %s [options] %s' %
                        (basename(argv[0]), st_post_help))


# Usage text for the "upload" command. st_upload() appends it to the usage
# error it reports for a bad argument count.
st_upload_help = '''
upload [options] container file_or_directory [file_or_directory] [...]
    Uploads to the given container the files and directories specified by the
    remaining args. -c or --changed is an option that will only upload files
    that have changed since the last upload. -S <size> or --segment-size <size>
    and --leave-segments are options as well (see --help for more).
'''.strip('\n')


def st_upload(options, args, print_queue, error_queue):
    """
    Upload files and directory trees to a container.

    Adds the upload-specific options to the module-level ``parser``,
    re-parses ``args`` and feeds one job per file (or empty directory) to ten
    uploader threads. With ``-S`` a file larger than the segment size is
    uploaded as numbered segments into ``<container>_segments`` plus a
    zero-byte manifest object; segments of a replaced manifest are deleted
    unless ``--leave-segments`` is given.

    :param options: value handed over by the command dispatcher; it is
                    rebound to the options parsed from ``args``
    :param args: command line arguments, the first one being the command name
    :param print_queue: queue receiving one line per uploaded object or
                        segment when verbose output is enabled
    :param error_queue: queue receiving usage and error messages
    """
    parser.add_option('-c', '--changed', action='store_true', dest='changed',
        default=False, help='Will only upload files that have changed since '
        'the last upload')
    parser.add_option('-S', '--segment-size', dest='segment_size', help='Will '
        'upload files in segments no larger than <size> and then create a '
        '"manifest" file that will download all the segments as if it were '
        'the original file. The segments will be uploaded to a '
        '<container>_segments container so as to not pollute the main '
        '<container> listings.')
    parser.add_option('', '--leave-segments', action='store_true',
        dest='leave_segments', default=False, help='Indicates that you want '
        'the older segments of manifest objects left alone (in the case of '
        'overwrites)')
    (options, args) = parse_args(parser, args)
    args = args[1:]
    if len(args) < 2:
        error_queue.put('Usage: %s [options] %s' %
                        (basename(argv[0]), st_upload_help))
        return

    object_queue = Queue(10000)

    def _segment_job(job, conn):
        # A job either deletes one old segment or uploads one slice of a file
        if job.get('delete', False):
            conn.delete_object(job['container'], job['obj'])
        else:
            fp = open(job['path'], 'rb')
            try:
                fp.seek(job['segment_start'])
                conn.put_object(job.get('container', args[0] + '_segments'),
                    job['obj'], fp, content_length=job['segment_size'])
            finally:
                fp.close()
        if options.verbose and 'log_line' in job:
            if conn.attempts > 1:
                print_queue.put('%s [after %d attempts]' %
                                (job['log_line'], conn.attempts))
            else:
                print_queue.put(job['log_line'])

    def _object_job(job, conn):
        path = job['path']
        container = job.get('container', args[0])
        dir_marker = job.get('dir_marker', False)
        try:
            obj = path
            if obj.startswith('./') or obj.startswith('.\\'):
                obj = obj[2:]
            put_headers = {'x-object-meta-mtime': str(getmtime(path))}
            if dir_marker:
                if options.changed:
                    try:
                        headers = conn.head_object(container, obj)
                        ct = headers.get('content-type')
                        cl = int(headers.get('content-length'))
                        et = headers.get('etag')
                        mt = headers.get('x-object-meta-mtime')
                        # d41d8cd9... is the MD5 digest of empty content
                        if ct.split(';', 1)[0] == 'text/directory' and \
                                cl == 0 and \
                                et == 'd41d8cd98f00b204e9800998ecf8427e' and \
                                mt == put_headers['x-object-meta-mtime']:
                            return
                    except ClientException as err:
                        if err.http_status != 404:
                            raise
                conn.put_object(container, obj, '', content_length=0,
                                content_type='text/directory',
                                headers=put_headers)
            else:
                # We need to HEAD all objects now in case we're overwriting a
                # manifest object and need to delete the old segments
                # ourselves.
                old_manifest = None
                if options.changed or not options.leave_segments:
                    try:
                        headers = conn.head_object(container, obj)
                        cl = int(headers.get('content-length'))
                        mt = headers.get('x-object-meta-mtime')
                        if options.changed and cl == getsize(path) and \
                                mt == put_headers['x-object-meta-mtime']:
                            return
                        if not options.leave_segments:
                            old_manifest = headers.get('x-object-manifest')
                    except ClientException as err:
                        if err.http_status != 404:
                            raise
                # The option arrives as a string, so convert before comparing;
                # only files larger than one segment are split.
                if options.segment_size and \
                        getsize(path) > int(options.segment_size):
                    full_size = getsize(path)
                    max_segment_size = int(options.segment_size)
                    segment_queue = Queue(10000)
                    segment_threads = [QueueFunctionThread(segment_queue,
                        _segment_job, create_connection()) for _junk in
                        xrange(10)]
                    for thread in segment_threads:
                        thread.start()
                    segment = 0
                    segment_start = 0
                    while segment_start < full_size:
                        segment_size = max_segment_size
                        if segment_start + segment_size > full_size:
                            segment_size = full_size - segment_start
                        segment_queue.put({'path': path,
                            'obj': '%s/%s/%s/%08d' % (obj,
                                put_headers['x-object-meta-mtime'], full_size,
                                segment),
                            'segment_start': segment_start,
                            'segment_size': segment_size,
                            'log_line': '%s segment %s' % (obj, segment)})
                        segment += 1
                        segment_start += segment_size
                    while not segment_queue.empty():
                        sleep(0.01)
                    for thread in segment_threads:
                        thread.abort = True
                        while thread.isAlive():
                            thread.join(0.01)
                    if put_errors_from_threads(segment_threads, error_queue):
                        raise ClientException('Aborting manifest creation '
                            'because not all segments could be uploaded. %s/%s'
                            % (container, obj))
                    new_object_manifest = '%s_segments/%s/%s/%s/' % (
                        container, obj, put_headers['x-object-meta-mtime'],
                        full_size)
                    # Same prefix means the segments just uploaded are the
                    # current ones and must not be deleted below
                    if old_manifest == new_object_manifest:
                        old_manifest = None
                    put_headers['x-object-manifest'] = new_object_manifest
                    conn.put_object(container, obj, '', content_length=0,
                                    headers=put_headers)
                else:
                    fp = open(path, 'rb')
                    try:
                        conn.put_object(container, obj, fp,
                            content_length=getsize(path), headers=put_headers)
                    finally:
                        fp.close()
                if old_manifest:
                    segment_queue = Queue(10000)
                    scontainer, sprefix = old_manifest.split('/', 1)
                    for delobj in conn.get_container(scontainer,
                                                     prefix=sprefix)[1]:
                        segment_queue.put({'delete': True,
                            'container': scontainer, 'obj': delobj['name']})
                    if not segment_queue.empty():
                        segment_threads = [QueueFunctionThread(segment_queue,
                            _segment_job, create_connection()) for _junk in
                            xrange(10)]
                        for thread in segment_threads:
                            thread.start()
                        while not segment_queue.empty():
                            sleep(0.01)
                        for thread in segment_threads:
                            thread.abort = True
                            while thread.isAlive():
                                thread.join(0.01)
                        put_errors_from_threads(segment_threads, error_queue)
            if options.verbose:
                if conn.attempts > 1:
                    print_queue.put(
                        '%s [after %d attempts]' % (obj, conn.attempts))
                else:
                    print_queue.put(obj)
        except OSError as err:
            if err.errno != ENOENT:
                raise
            error_queue.put('Local file %s not found' % repr(path))

    def _upload_dir(path):
        # Queue every file below path; an empty directory is queued as a
        # directory marker so that it exists in the container as well.
        names = listdir(path)
        if not names:
            object_queue.put({'path': path, 'dir_marker': True})
        else:
            for name in names:
                subpath = join(path, name)
                if isdir(subpath):
                    _upload_dir(subpath)
                else:
                    object_queue.put({'path': subpath})

    # Authenticate once and share the URL and token between all connections
    url, token = get_auth(options.auth, options.user, options.key,
                          snet=options.snet)
    create_connection = lambda: Connection(options.auth, options.user,
        options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
    object_threads = [QueueFunctionThread(object_queue, _object_job,
        create_connection()) for _junk in xrange(10)]
    for thread in object_threads:
        thread.start()
    conn = create_connection()
    # Try to create the container, just in case it doesn't exist. If this
    # fails, it might just be because the user doesn't have container PUT
    # permissions, so we'll ignore any error. If there's really a problem,
    # it'll surface on the first object PUT.
    try:
        conn.put_container(args[0])
        if options.segment_size is not None:
            conn.put_container(args[0] + '_segments')
    except Exception:
        pass
    try:
        for arg in args[1:]:
            if isdir(arg):
                _upload_dir(arg)
            else:
                object_queue.put({'path': arg})
        while not object_queue.empty():
            sleep(0.01)
        for thread in object_threads:
            thread.abort = True
            while thread.isAlive():
                thread.join(0.01)
        put_errors_from_threads(object_threads, error_queue)
    except ClientException as err:
        if err.http_status != 404:
            raise
        error_queue.put('Account not found')


def parse_args(parser, args, enforce_requires=True):
    """
    Parse the command line and optionally insist on the auth settings.

    An empty argument list is treated as a request for help.

    :param parser: option parser providing ``parse_args`` and producing
                   ``auth``, ``user`` and ``key`` option values
    :param args: list of command line arguments to parse
    :param enforce_requires: when true, exit with an explanatory message
                             unless auth, user and key are all set
    :returns: tuple of (options, remaining positional args)
    """
    if not args:
        args = ['-h']
    (options, args) = parser.parse_args(args)
    if enforce_requires and \
            not (options.auth and options.user and options.key):
        exit('''
Requires ST_AUTH, ST_USER, and ST_KEY environment variables be set or
overridden with -A, -U, or -K.'''.strip('\n'))
    return options, args


1731
if __name__ == '__main__':
1732
parser = OptionParser(version='%prog 1.0', usage='''
1733
Usage: %%prog <command> [options] [args]
1740
%(st_download_help)s
1744
%%prog -A https://auth.api.rackspacecloud.com/v1.0 -U user -K key stat
1745
'''.strip('\n') % globals())
1746
parser.add_option('-s', '--snet', action='store_true', dest='snet',
1747
default=False, help='Use SERVICENET internal network')
1748
parser.add_option('-v', '--verbose', action='count', dest='verbose',
1749
default=1, help='Print more info')
1750
parser.add_option('-q', '--quiet', action='store_const', dest='verbose',
1751
const=0, default=1, help='Suppress status output')
1752
parser.add_option('-A', '--auth', dest='auth',
1753
default=environ.get('ST_AUTH'),
1754
help='URL for obtaining an auth token')
1755
parser.add_option('-U', '--user', dest='user',
1756
default=environ.get('ST_USER'),
1757
help='User name for obtaining an auth token')
1758
parser.add_option('-K', '--key', dest='key',
1759
default=environ.get('ST_KEY'),
1760
help='Key for obtaining an auth token')
1761
parser.disable_interspersed_args()
1762
(options, args) = parse_args(parser, argv[1:], enforce_requires=False)
1763
parser.enable_interspersed_args()
1765
commands = ('delete', 'download', 'list', 'post', 'stat', 'upload')
1766
if not args or args[0] not in commands:
1767
parser.print_usage()
1769
exit('no such command: %s' % args[0])
1772
print_queue = Queue(10000)
1775
if isinstance(item, unicode):
1776
item = item.encode('utf8')
1779
print_thread = QueueFunctionThread(print_queue, _print)
1780
print_thread.start()
1782
error_queue = Queue(10000)
1785
if isinstance(item, unicode):
1786
item = item.encode('utf8')
1787
print >> stderr, item
1789
error_thread = QueueFunctionThread(error_queue, _error)
1790
error_thread.start()
1793
parser.usage = globals()['st_%s_help' % args[0]]
1795
globals()['st_%s' % args[0]](parser, argv[1:], print_queue,
1797
except (ClientException, HTTPException, socket.error), err:
1798
error_queue.put(str(err))
1799
while not print_queue.empty():
1801
print_thread.abort = True
1802
while print_thread.isAlive():
1803
print_thread.join(0.01)
1804
while not error_queue.empty():
1806
error_thread.abort = True
1807
while error_thread.isAlive():
1808
error_thread.join(0.01)
1809
except (SystemExit, Exception):
1810
for thread in threading_enumerate():