# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
17
from errno import EEXIST, ENOENT
from hashlib import md5
from optparse import OptionParser
from os import environ, listdir, makedirs, utime
from os.path import basename, dirname, getmtime, getsize, isdir, join
from Queue import Empty, Queue
from sys import argv, exit, stderr, stdout
from threading import enumerate as threading_enumerate, Thread
from time import sleep


# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Inclusion of swift.common.client for convenience of single file distribution

import socket
from cStringIO import StringIO
from httplib import HTTPException, HTTPSConnection
from re import compile, DOTALL
from tokenize import generate_tokens, STRING, NAME, OP
from urllib import quote as _quote, unquote
from urlparse import urlparse, urlunparse

# Prefer eventlet's cooperative sleep when eventlet is installed.
try:
    from eventlet import sleep
except Exception:
    from time import sleep

# Prefer swift's buffered connection when swift itself is installed.
try:
    from swift.common.bufferedhttp \
        import BufferedHTTPConnection as HTTPConnection
except Exception:
    from httplib import HTTPConnection
51
def quote(value, safe='/'):
    """
    Patched version of urllib.quote that encodes utf8 strings before quoting

    :param value: byte or unicode string to percent-encode
    :param safe: characters that must be left unescaped, default is '/'
    :returns: the percent-encoded string
    """
    if isinstance(value, unicode):
        value = value.encode('utf8')
    return _quote(value, safe)
60
# look for a real json parser first
try:
    # simplejson is popular and pretty good
    from simplejson import loads as json_loads
except ImportError:
    try:
        # 2.6 will have a json module in the stdlib
        from json import loads as json_loads
    except ImportError:
        # fall back on local parser otherwise
        comments = compile(r'/\*.*\*/|//[^\r\n]*', DOTALL)

        def json_loads(string):
            '''
            Fairly competent json parser exploiting the python tokenizer and
            eval(). -- From python-cloudfiles

            _loads(serialized_json) -> object

            :param string: serialized JSON text
            :returns: the decoded Python object
            :raises AttributeError: the text is not acceptable JSON
            '''
            try:
                res = []
                consts = {'true': True, 'false': False, 'null': None}
                string = '(' + comments.sub('', string) + ')'
                for type, val, _junk, _junk, _junk in \
                        generate_tokens(StringIO(string).readline):
                    # Only JSON punctuation and the three JSON constants may
                    # reach eval(); anything else is rejected up front.
                    if (type == OP and val not in '[]{}:,()-') or \
                            (type == NAME and val not in consts):
                        raise AttributeError()
                    elif type == STRING:
                        res.append('u')
                        res.append(val.replace('\\/', '/'))
                    else:
                        res.append(val)
                # NOTE(review): eval() on server-supplied text; the token
                # whitelist above is the only guard.  Prefer a real parser.
                return eval(''.join(res), {}, consts)
            except Exception:
                raise AttributeError()
98
class ClientException(Exception):
    """
    Error raised by the client functions when a request fails.

    Besides the message, it carries the pieces of the failed request (scheme,
    host, port, path, query) and of the response (status, reason, device) so
    that str() can render them after the message.
    """

    def __init__(self, msg, http_scheme='', http_host='', http_port='',
                 http_path='', http_query='', http_status=0, http_reason='',
                 http_device=''):
        Exception.__init__(self, msg)
        self.msg = msg
        self.http_scheme = http_scheme
        self.http_host = http_host
        self.http_port = http_port
        self.http_path = http_path
        self.http_query = http_query
        self.http_status = http_status
        self.http_reason = http_reason
        self.http_device = http_device

    def __str__(self):
        a = self.msg
        # b accumulates "scheme://host:port/path?query status reason: device"
        # from whichever parts are set.
        b = ''
        if self.http_scheme:
            b += '%s://' % self.http_scheme
        if self.http_host:
            b += self.http_host
        if self.http_port:
            b += ':%s' % self.http_port
        if self.http_path:
            b += self.http_path
        if self.http_query:
            b += '?%s' % self.http_query
        if self.http_status:
            if b:
                b = '%s %s' % (b, self.http_status)
            else:
                b = str(self.http_status)
        if self.http_reason:
            if b:
                b = '%s %s' % (b, self.http_reason)
            else:
                b = '- %s' % self.http_reason
        if self.http_device:
            if b:
                b = '%s: device %s' % (b, self.http_device)
            else:
                b = 'device %s' % self.http_device
        if b:
            return '%s: %s' % (a, b)
        return a
145
def http_connection(url):
    """
    Make an HTTPConnection or HTTPSConnection

    :param url: url to connect to
    :returns: tuple of (parsed url, connection object)
    :raises ClientException: Unable to handle protocol scheme
    """
    parsed = urlparse(url)
    if parsed.scheme == 'http':
        conn = HTTPConnection(parsed.netloc)
    elif parsed.scheme == 'https':
        conn = HTTPSConnection(parsed.netloc)
    else:
        raise ClientException('Cannot handle protocol scheme %s for url %s' %
                              (parsed.scheme, repr(url)))
    return parsed, conn
164
def get_auth(url, user, key, snet=False):
    """
    Get authentication/authorization credentials.

    The snet parameter is used for Rackspace's ServiceNet internal network
    implementation. In this function, it simply adds *snet-* to the beginning
    of the host name for the returned storage URL. With Rackspace Cloud Files,
    use of this network path causes no bandwidth charges but requires the
    client to be running on Rackspace's ServiceNet network.

    :param url: authentication/authorization URL
    :param user: user to authenticate as
    :param key: key or password for authorization
    :param snet: use SERVICENET internal network (see above), default is False
    :returns: tuple of (storage URL, auth token)
    :raises ClientException: HTTP GET request to auth URL failed
    """
    parsed, conn = http_connection(url)
    conn.request('GET', parsed.path, '',
                 {'X-Auth-User': user, 'X-Auth-Key': key})
    resp = conn.getresponse()
    # Drain the body so the connection is left in a clean state.
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Auth GET failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=parsed.path, http_status=resp.status,
                              http_reason=resp.reason)
    url = resp.getheader('x-storage-url')
    if snet:
        parsed = list(urlparse(url))
        # Second item in the list is the netloc
        parsed[1] = 'snet-' + parsed[1]
        url = urlunparse(parsed)
    return url, resp.getheader('x-storage-token',
                               resp.getheader('x-auth-token'))
201
def get_account(url, token, marker=None, limit=None, prefix=None,
                http_conn=None, full_listing=False):
    """
    Get a listing of containers for the account.

    :param url: storage URL
    :param token: auth token
    :param marker: marker query
    :param limit: limit query
    :param prefix: prefix query
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :param full_listing: if True, return a full listing, else returns a max
                         of one page of results
    :returns: a tuple of (response headers, a list of containers) The response
              headers will be a dict and all header names will be lowercase.
    :raises ClientException: HTTP GET request failed
    """
    if not http_conn:
        http_conn = http_connection(url)
    if full_listing:
        # Keep requesting pages, using the last name seen as the next marker,
        # until an empty page comes back.
        rv = get_account(url, token, marker, limit, prefix, http_conn)
        listing = rv[1]
        while listing:
            marker = listing[-1]['name']
            listing = \
                get_account(url, token, marker, limit, prefix, http_conn)[1]
            if listing:
                rv[1].extend(listing)
        return rv
    parsed, conn = http_conn
    qs = 'format=json'
    if marker:
        qs += '&marker=%s' % quote(marker)
    if limit:
        qs += '&limit=%d' % limit
    if prefix:
        qs += '&prefix=%s' % quote(prefix)
    conn.request('GET', '%s?%s' % (parsed.path, qs), '',
                 {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value
    if resp.status < 200 or resp.status >= 300:
        resp.read()
        raise ClientException('Account GET failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=parsed.path, http_query=qs,
                              http_status=resp.status,
                              http_reason=resp.reason)
    if resp.status == 204:
        # No content: nothing to parse, but still drain the response.
        resp.read()
        return resp_headers, []
    return resp_headers, json_loads(resp.read())
257
def head_account(url, token, http_conn=None):
    """
    Get account stats.

    :param url: storage URL
    :param token: auth token
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :returns: a dict containing the response's headers (all header names will
              be lowercase)
    :raises ClientException: HTTP HEAD request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Account HEAD failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=parsed.path, http_status=resp.status,
                              http_reason=resp.reason)
    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value
    return resp_headers
287
def post_account(url, token, headers, http_conn=None):
    """
    Update an account's metadata.

    :param url: storage URL
    :param token: auth token
    :param headers: additional headers to include in the request
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP POST request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    # Work on a copy so the caller's dict is never modified.
    headers = dict(headers or {})
    headers['X-Auth-Token'] = token
    conn.request('POST', parsed.path, '', headers)
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        # The request path here is parsed.path; there is no local ``path``.
        raise ClientException('Account POST failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=parsed.path,
                              http_status=resp.status,
                              http_reason=resp.reason)
313
def get_container(url, token, container, marker=None, limit=None,
                  prefix=None, delimiter=None, http_conn=None,
                  full_listing=False):
    """
    Get a listing of objects for the container.

    :param url: storage URL
    :param token: auth token
    :param container: container name to get a listing for
    :param marker: marker query
    :param limit: limit query
    :param prefix: prefix query
    :param delimiter: string to delimit the queries on
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :param full_listing: if True, return a full listing, else returns a max
                         of one page of results
    :returns: a tuple of (response headers, a list of objects) The response
              headers will be a dict and all header names will be lowercase.
    :raises ClientException: HTTP GET request failed
    """
    if not http_conn:
        http_conn = http_connection(url)
    if full_listing:
        rv = get_container(url, token, container, marker, limit, prefix,
                           delimiter, http_conn)
        listing = rv[1]
        while listing:
            if not delimiter:
                marker = listing[-1]['name']
            else:
                # With a delimiter the last entry may be a rolled-up
                # 'subdir' entry that has no 'name' key.
                marker = listing[-1].get('name', listing[-1].get('subdir'))
            listing = get_container(url, token, container, marker, limit,
                                    prefix, delimiter, http_conn)[1]
            if listing:
                rv[1].extend(listing)
        return rv
    parsed, conn = http_conn
    path = '%s/%s' % (parsed.path, quote(container))
    qs = 'format=json'
    if marker:
        qs += '&marker=%s' % quote(marker)
    if limit:
        qs += '&limit=%d' % limit
    if prefix:
        qs += '&prefix=%s' % quote(prefix)
    if delimiter:
        qs += '&delimiter=%s' % quote(delimiter)
    conn.request('GET', '%s?%s' % (path, qs), '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    if resp.status < 200 or resp.status >= 300:
        resp.read()
        raise ClientException('Container GET failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=path,
                              http_query=qs, http_status=resp.status,
                              http_reason=resp.reason)
    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value
    if resp.status == 204:
        resp.read()
        return resp_headers, []
    return resp_headers, json_loads(resp.read())
378
def head_container(url, token, container, http_conn=None):
    """
    Get container stats.

    :param url: storage URL
    :param token: auth token
    :param container: container name to get stats for
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :returns: a dict containing the response's headers (all header names will
              be lowercase)
    :raises ClientException: HTTP HEAD request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s' % (parsed.path, quote(container))
    conn.request('HEAD', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Container HEAD failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=path,
                              http_status=resp.status,
                              http_reason=resp.reason)
    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value
    return resp_headers
410
def put_container(url, token, container, headers=None, http_conn=None):
    """
    Create a container

    :param url: storage URL
    :param token: auth token
    :param container: container name to create
    :param headers: additional headers to include in the request
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP PUT request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s' % (parsed.path, quote(container))
    # Work on a copy so the caller's dict is never modified.
    headers = dict(headers or {})
    headers['X-Auth-Token'] = token
    conn.request('PUT', path, '', headers)
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Container PUT failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=path,
                              http_status=resp.status,
                              http_reason=resp.reason)
440
def post_container(url, token, container, headers, http_conn=None):
    """
    Update a container's metadata.

    :param url: storage URL
    :param token: auth token
    :param container: container name to update
    :param headers: additional headers to include in the request
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP POST request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s' % (parsed.path, quote(container))
    # Work on a copy so the caller's dict is never modified.
    headers = dict(headers or {})
    headers['X-Auth-Token'] = token
    conn.request('POST', path, '', headers)
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Container POST failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=path,
                              http_status=resp.status,
                              http_reason=resp.reason)
468
def delete_container(url, token, container, http_conn=None):
    """
    Delete a container

    :param url: storage URL
    :param token: auth token
    :param container: container name to delete
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP DELETE request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s' % (parsed.path, quote(container))
    conn.request('DELETE', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Container DELETE failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=path,
                              http_status=resp.status,
                              http_reason=resp.reason)
494
def get_object(url, token, container, name, http_conn=None,
               resp_chunk_size=None):
    """
    Get an object

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: object name to get
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :param resp_chunk_size: if defined, chunk size of data to read. NOTE: If
                            you specify a resp_chunk_size you must fully read
                            the object's contents before making another
                            request.
    :returns: a tuple of (response headers, the object's contents) The response
              headers will be a dict and all header names will be lowercase.
              With resp_chunk_size set, the contents are a generator that
              yields chunks; otherwise they are the whole body.
    :raises ClientException: HTTP GET request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    conn.request('GET', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    if resp.status < 200 or resp.status >= 300:
        resp.read()
        raise ClientException('Object GET failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=path, http_status=resp.status,
                              http_reason=resp.reason)
    if resp_chunk_size:

        def _object_body():
            # Lazily stream the body; nothing is read until iterated.
            buf = resp.read(resp_chunk_size)
            while buf:
                yield buf
                buf = resp.read(resp_chunk_size)
        object_body = _object_body()
    else:
        object_body = resp.read()
    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value
    return resp_headers, object_body
541
def head_object(url, token, container, name, http_conn=None):
    """
    Get object info

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: object name to get info for
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :returns: a dict containing the response's headers (all header names will
              be lowercase)
    :raises ClientException: HTTP HEAD request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    conn.request('HEAD', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=path, http_status=resp.status,
                              http_reason=resp.reason)
    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value
    return resp_headers
573
def put_object(url, token, container, name, contents, content_length=None,
               etag=None, chunk_size=65536, content_type=None, headers=None,
               http_conn=None):
    """
    Put an object

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: object name to put
    :param contents: a string or a file like object to read object data from
    :param content_length: value to send as content-length header; also limits
                           the amount read from contents
    :param etag: etag of contents
    :param chunk_size: chunk size of data to write
    :param content_type: value to send as content-type header
    :param headers: additional headers to include in the request
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :returns: etag from server response ('' if the server sent none)
    :raises ClientException: HTTP PUT request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    # Work on a copy so the caller's dict is never modified.
    headers = dict(headers or {})
    headers['X-Auth-Token'] = token
    if etag:
        headers['ETag'] = etag.strip('"')
    if content_length is not None:
        headers['Content-Length'] = str(content_length)
    if content_type is not None:
        headers['Content-Type'] = content_type
    if not contents:
        headers['Content-Length'] = '0'
    if hasattr(contents, 'read'):
        conn.putrequest('PUT', path)
        for header, value in headers.items():
            conn.putheader(header, value)
        if content_length is None:
            # Unknown length: stream using chunked transfer encoding.
            conn.putheader('Transfer-Encoding', 'chunked')
            conn.endheaders()
            chunk = contents.read(chunk_size)
            while chunk:
                conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
                chunk = contents.read(chunk_size)
            conn.send('0\r\n\r\n')
        else:
            conn.endheaders()
            left = content_length
            while left > 0:
                chunk = contents.read(min(chunk_size, left))
                if not chunk:
                    # Source ended early; stop instead of spinning forever.
                    break
                conn.send(chunk)
                left -= len(chunk)
    else:
        conn.request('PUT', path, contents, headers)
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=path, http_status=resp.status,
                              http_reason=resp.reason)
    return (resp.getheader('etag') or '').strip('"')
644
def post_object(url, token, container, name, headers, http_conn=None):
    """
    Update object metadata

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: name of the object to update
    :param headers: additional headers to include in the request
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP POST request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    # Work on a copy so the caller's dict is never modified.
    headers = dict(headers or {})
    headers['X-Auth-Token'] = token
    conn.request('POST', path, '', headers)
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Object POST failed', http_scheme=parsed.scheme,
                              http_host=conn.host, http_port=conn.port,
                              http_path=path, http_status=resp.status,
                              http_reason=resp.reason)
672
def delete_object(url, token, container, name, http_conn=None):
    """
    Delete object

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: object name to delete
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :raises ClientException: HTTP DELETE request failed
    """
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    conn.request('DELETE', path, '', {'X-Auth-Token': token})
    resp = conn.getresponse()
    resp.read()
    if resp.status < 200 or resp.status >= 300:
        raise ClientException('Object DELETE failed',
                              http_scheme=parsed.scheme, http_host=conn.host,
                              http_port=conn.port, http_path=path,
                              http_status=resp.status,
                              http_reason=resp.reason)
699
class Connection(object):
    """Convenience class to make requests that will also retry the request"""

    def __init__(self, authurl, user, key, retries=5, preauthurl=None,
                 preauthtoken=None, snet=False):
        """
        :param authurl: authentication URL
        :param user: user name to authenticate as
        :param key: key/password to authenticate with
        :param retries: Number of times to retry the request before failing
        :param preauthurl: storage URL (if you have already authenticated)
        :param preauthtoken: authentication token (if you have already
                             authenticated)
        :param snet: use SERVICENET internal network default is False
        """
        self.authurl = authurl
        self.user = user
        self.key = key
        self.retries = retries
        self.http_conn = None
        self.url = preauthurl
        self.token = preauthtoken
        self.attempts = 0
        self.snet = snet

    def get_auth(self):
        """Wrapper for :func:`get_auth` using the stored credentials"""
        return get_auth(self.authurl, self.user, self.key, snet=self.snet)

    def http_connection(self):
        """Wrapper for :func:`http_connection` on the current storage URL"""
        return http_connection(self.url)

    def _retry(self, func, *args, **kwargs):
        """
        Call func(url, token, *args, http_conn=..., **kwargs), retrying on
        failure with an exponentially growing sleep between attempts.

        Socket/HTTP errors drop the cached connection and retry.  A 401
        forces re-authentication and is retried once; 5xx responses are
        retried; any other ClientException is re-raised immediately.  Once
        more than ``retries`` attempts have failed the last error is raised.
        """
        self.attempts = 0
        backoff = 1
        while self.attempts <= self.retries:
            self.attempts += 1
            try:
                if not self.url or not self.token:
                    self.url, self.token = self.get_auth()
                    self.http_conn = None
                if not self.http_conn:
                    self.http_conn = self.http_connection()
                kwargs['http_conn'] = self.http_conn
                rv = func(self.url, self.token, *args, **kwargs)
                return rv
            except (socket.error, HTTPException):
                if self.attempts > self.retries:
                    raise
                self.http_conn = None
            except ClientException as err:
                if self.attempts > self.retries:
                    raise
                if err.http_status == 401:
                    # Token rejected: forget it so the next pass re-auths,
                    # but only give that a single chance.
                    self.url = self.token = None
                    if self.attempts > 1:
                        raise
                elif 500 <= err.http_status <= 599:
                    pass
                else:
                    raise
            sleep(backoff)
            backoff *= 2

    def head_account(self):
        """Wrapper for :func:`head_account`"""
        return self._retry(head_account)

    def get_account(self, marker=None, limit=None, prefix=None,
                    full_listing=False):
        """Wrapper for :func:`get_account`"""
        # TODO(unknown): With full_listing=True this will restart the entire
        # listing with each retry. Need to make a better version that just
        # retries where it left off.
        return self._retry(get_account, marker=marker, limit=limit,
                           prefix=prefix, full_listing=full_listing)

    def post_account(self, headers):
        """Wrapper for :func:`post_account`"""
        return self._retry(post_account, headers)

    def head_container(self, container):
        """Wrapper for :func:`head_container`"""
        return self._retry(head_container, container)

    def get_container(self, container, marker=None, limit=None, prefix=None,
                      delimiter=None, full_listing=False):
        """Wrapper for :func:`get_container`"""
        # TODO(unknown): With full_listing=True this will restart the entire
        # listing with each retry. Need to make a better version that just
        # retries where it left off.
        return self._retry(get_container, container, marker=marker,
                           limit=limit, prefix=prefix, delimiter=delimiter,
                           full_listing=full_listing)

    def put_container(self, container, headers=None):
        """Wrapper for :func:`put_container`"""
        return self._retry(put_container, container, headers=headers)

    def post_container(self, container, headers):
        """Wrapper for :func:`post_container`"""
        return self._retry(post_container, container, headers)

    def delete_container(self, container):
        """Wrapper for :func:`delete_container`"""
        return self._retry(delete_container, container)

    def head_object(self, container, obj):
        """Wrapper for :func:`head_object`"""
        return self._retry(head_object, container, obj)

    def get_object(self, container, obj, resp_chunk_size=None):
        """Wrapper for :func:`get_object`"""
        return self._retry(get_object, container, obj,
                           resp_chunk_size=resp_chunk_size)

    def put_object(self, container, obj, contents, content_length=None,
                   etag=None, chunk_size=65536, content_type=None,
                   headers=None):
        """Wrapper for :func:`put_object`"""
        return self._retry(put_object, container, obj, contents,
            content_length=content_length, etag=etag, chunk_size=chunk_size,
            content_type=content_type, headers=headers)

    def post_object(self, container, obj, headers):
        """Wrapper for :func:`post_object`"""
        return self._retry(post_object, container, obj, headers)

    def delete_object(self, container, obj):
        """Wrapper for :func:`delete_object`"""
        return self._retry(delete_object, container, obj)
830
# End inclusion of swift.common.client
831
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
838
def mkdirs(path):
    """
    Create directory ``path`` and any missing parents; an already existing
    path is not an error.

    :param path: directory path to create
    :raises OSError: creation failed for a reason other than EEXIST
    """
    try:
        makedirs(path)
    except OSError as err:
        if err.errno != EEXIST:
            raise
842
class QueueFunctionThread(Thread):

    def __init__(self, queue, func, *args, **kwargs):
        """ Calls func for each item in queue; func is called with a queued
            item as the first arg followed by *args and **kwargs. Use the abort
            attribute to have the thread empty the queue (without processing)
            and exit. """
        Thread.__init__(self)
        self.abort = False
        self.queue = queue
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Poll the queue until ``abort`` is set and the queue is empty."""
        while True:
            try:
                item = self.queue.get_nowait()
                # When aborting, items are still dequeued and marked done so
                # that anything waiting on the queue can finish.
                if not self.abort:
                    self.func(item, *self.args, **self.kwargs)
                self.queue.task_done()
            except Empty:
                if self.abort:
                    break
                sleep(0.01)
870
# Usage text for the "delete" command, shown when its arguments are invalid.
st_delete_help = '''
delete --all OR delete container [--leave-segments] [object] [object] ...
    Deletes everything in the account (with --all), or everything in a
    container, or a list of objects depending on the args given. Segments of
    manifest objects will be deleted as well, unless you specify the
    --leave-segments option.'''.strip('\n')
877
def st_delete(parser, args, print_queue, error_queue):
    """
    Implement the "delete" command: delete the whole account (--all), one
    container, or a list of objects in a container.

    Work is spread over queue-fed worker threads, each with its own
    Connection.

    :param parser: OptionParser to register this command's options on
    :param args: command line arguments; the first one is the command name
    :param print_queue: queue that receives the names of deleted items
    :param error_queue: queue that receives error messages
    """
    parser.add_option('-a', '--all', action='store_true', dest='yes_all',
        default=False, help='Indicates that you really want to delete '
        'everything in the account')
    parser.add_option('', '--leave-segments', action='store_true',
        dest='leave_segments', default=False, help='Indicates that you want '
        'the segments of manifest objects left alone')
    (options, args) = parse_args(parser, args)
    args = args[1:]
    if (not args and not options.yes_all) or (args and options.yes_all):
        error_queue.put('Usage: %s [options] %s' %
                        (basename(argv[0]), st_delete_help))
        return

    def _drain_and_stop(queue, threads):
        # Wait for the queue to empty, then tell every worker to exit and
        # wait until it has.
        while not queue.empty():
            sleep(0.01)
        for thread in threads:
            thread.abort = True
            while thread.isAlive():
                thread.join(0.01)

    def _delete_segment(item, conn):
        container, obj = item
        conn.delete_object(container, obj)
        if options.verbose:
            print_queue.put('%s/%s' % (container, obj))

    object_queue = Queue(10000)

    def _delete_object(item, conn):
        container, obj = item
        try:
            old_manifest = None
            if not options.leave_segments:
                # Remember the manifest before deleting the object so its
                # segments can be removed afterwards.
                try:
                    old_manifest = conn.head_object(container, obj).get(
                        'x-object-manifest')
                except ClientException as err:
                    if err.http_status != 404:
                        raise
            conn.delete_object(container, obj)
            if old_manifest:
                segment_queue = Queue(10000)
                scontainer, sprefix = old_manifest.split('/', 1)
                for delobj in conn.get_container(scontainer,
                                                 prefix=sprefix)[1]:
                    segment_queue.put((scontainer, delobj['name']))
                if not segment_queue.empty():
                    segment_threads = [QueueFunctionThread(segment_queue,
                        _delete_segment, create_connection()) for _junk in
                        xrange(10)]
                    for thread in segment_threads:
                        thread.start()
                    _drain_and_stop(segment_queue, segment_threads)
            if options.verbose:
                if options.yes_all:
                    path = join(container, obj)
                else:
                    path = obj
                if path[:1] in ('/', '\\'):
                    path = path[1:]
                print_queue.put(path)
        except ClientException as err:
            if err.http_status != 404:
                raise
            error_queue.put('Object %s not found' %
                            repr('%s/%s' % (container, obj)))

    container_queue = Queue(10000)

    def _delete_container(container, conn):
        try:
            marker = ''
            while True:
                objects = [o['name'] for o in
                           conn.get_container(container, marker=marker)[1]]
                if not objects:
                    break
                for obj in objects:
                    object_queue.put((container, obj))
                marker = objects[-1]
            while not object_queue.empty():
                sleep(0.01)
            # Object deletes may still be in flight, so a 409 (conflict) is
            # retried a limited number of times.
            attempts = 1
            while True:
                try:
                    conn.delete_container(container)
                    break
                except ClientException as err:
                    if err.http_status != 409:
                        raise
                    if attempts > 10:
                        raise
                    attempts += 1
                    sleep(1)
        except ClientException as err:
            if err.http_status != 404:
                raise
            error_queue.put('Container %s not found' % repr(container))

    url, token = get_auth(options.auth, options.user, options.key,
        snet=options.snet)
    create_connection = lambda: Connection(options.auth, options.user,
        options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
    object_threads = [QueueFunctionThread(object_queue, _delete_object,
        create_connection()) for _junk in xrange(10)]
    for thread in object_threads:
        thread.start()
    container_threads = [QueueFunctionThread(container_queue,
        _delete_container, create_connection()) for _junk in xrange(10)]
    for thread in container_threads:
        thread.start()
    if not args:
        conn = create_connection()
        try:
            marker = ''
            while True:
                containers = \
                    [c['name'] for c in conn.get_account(marker=marker)[1]]
                if not containers:
                    break
                for container in containers:
                    container_queue.put(container)
                marker = containers[-1]
            while not container_queue.empty():
                sleep(0.01)
            while not object_queue.empty():
                sleep(0.01)
        except ClientException as err:
            if err.http_status != 404:
                raise
            error_queue.put('Account not found')
    elif len(args) == 1:
        if '/' in args[0]:
            stderr.write('WARNING: / in container name; you might have '
                         'meant %r instead of %r.\n' %
                         (args[0].replace('/', ' ', 1), args[0]))
        conn = create_connection()
        _delete_container(args[0], conn)
    else:
        for obj in args[1:]:
            object_queue.put((args[0], obj))
    _drain_and_stop(container_queue, container_threads)
    _drain_and_stop(object_queue, object_threads)
1026
# Usage text for the "download" command, shown when its arguments are invalid.
st_download_help = '''
download --all OR download container [options] [object] [object] ...
    Downloads everything in the account (with --all), or everything in a
    container, or a list of objects depending on the args given. For a single
    object download, you may use the -o [--output] <filename> option to
    redirect the output to a specific file or if "-" then just redirect to
    stdout.'''.strip('\n')
1035
def st_download(options, args, print_queue, error_queue):
1036
parser.add_option('-a', '--all', action='store_true', dest='yes_all',
1037
default=False, help='Indicates that you really want to download '
1038
'everything in the account')
1039
parser.add_option('-o', '--output', dest='out_file', help='For a single '
1040
'file download, stream the output to an alternate location ')
1041
(options, args) = parse_args(parser, args)
1043
if options.out_file == '-':
1045
if options.out_file and len(args) != 2:
1046
exit('-o option only allowed for single file downloads')
1047
if (not args and not options.yes_all) or (args and options.yes_all):
1048
error_queue.put('Usage: %s [options] %s' %
1049
(basename(argv[0]), st_download_help))
1052
object_queue = Queue(10000)
1054
def _download_object(queue_arg, conn):
1055
if len(queue_arg) == 2:
1056
container, obj = queue_arg
1058
elif len(queue_arg) == 3:
1059
container, obj, out_file = queue_arg
1061
raise Exception("Invalid queue_arg length of %s" % len(queue_arg))
1064
conn.get_object(container, obj, resp_chunk_size=65536)
1065
content_type = headers.get('content-type')
1066
if 'content-length' in headers:
1067
content_length = int(headers.get('content-length'))
1069
content_length = None
1070
etag = headers.get('etag')
1071
path = options.yes_all and join(container, obj) or obj
1072
if path[:1] in ('/', '\\'):
1075
make_dir = out_file != "-"
1076
if content_type.split(';', 1)[0] == 'text/directory':
1077
if make_dir and not isdir(path):
1080
if 'x-object-manifest' not in headers:
1083
read_length += len(chunk)
1085
md5sum.update(chunk)
1087
dirpath = dirname(path)
1088
if make_dir and dirpath and not isdir(dirpath):
1093
fp = open(out_file, 'wb')
1095
fp = open(path, 'wb')
1097
if 'x-object-manifest' not in headers:
1101
read_length += len(chunk)
1103
md5sum.update(chunk)
1105
if md5sum and md5sum.hexdigest() != etag:
1106
error_queue.put('%s: md5sum != etag, %s != %s' %
1107
(path, md5sum.hexdigest(), etag))
1108
if content_length is not None and read_length != content_length:
1109
error_queue.put('%s: read_length != content_length, %d != %d' %
1110
(path, read_length, content_length))
1111
if 'x-object-meta-mtime' in headers and not options.out_file:
1112
mtime = float(headers['x-object-meta-mtime'])
1113
utime(path, (mtime, mtime))
1115
print_queue.put(path)
1116
except ClientException, err:
1117
if err.http_status != 404:
1119
error_queue.put('Object %s not found' %
1120
repr('%s/%s' % (container, obj)))
1122
container_queue = Queue(10000)
1124
# Queue worker: lists the object names of one container with
# conn.get_container() and puts a (container, obj) tuple on object_queue for
# the object workers.  `marker` is advanced to the last name returned, which
# is then passed as marker= to get_container().
#
# container -- name of the container to list.
# conn      -- connection object whose get_container() is called.
#
# NOTE(review): lines are missing from this copy (the bare numbers are the
# original line numbers; 1125-1127 and 1130-1132 are absent).  The initial
# value of `marker`, the loop that repeats the listing, its exit test, the
# loop that binds `obj` and the `try:` are therefore not visible -- restore
# from a pristine copy before editing.
def _download_container(container, conn):
1128
objects = [o['name'] for o in
1129
conn.get_container(container, marker=marker)[1]]
1133
object_queue.put((container, obj))
1134
marker = objects[-1]
1135
except ClientException, err:
1136
# The line under this status test is missing (presumably a re-raise for
# anything other than 404 -- confirm).
if err.http_status != 404:
1138
error_queue.put('Container %s not found' % repr(container))
1140
url, token = get_auth(options.auth, options.user, options.key,
1142
create_connection = lambda: Connection(options.auth, options.user,
1143
options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
1144
object_threads = [QueueFunctionThread(object_queue, _download_object,
1145
create_connection()) for _junk in xrange(10)]
1146
for thread in object_threads:
1148
container_threads = [QueueFunctionThread(container_queue,
1149
_download_container, create_connection()) for _junk in xrange(10)]
1150
for thread in container_threads:
1153
conn = create_connection()
1157
containers = [c['name']
1158
for c in conn.get_account(marker=marker)[1]]
1161
for container in containers:
1162
container_queue.put(container)
1163
marker = containers[-1]
1164
except ClientException, err:
1165
if err.http_status != 404:
1167
error_queue.put('Account not found')
1168
elif len(args) == 1:
1170
print >> stderr, 'WARNING: / in container name; you might have ' \
1171
'meant %r instead of %r.' % \
1172
(args[0].replace('/', ' ', 1), args[0])
1173
_download_container(args[0], create_connection())
1177
object_queue.put((args[0], obj, options.out_file))
1179
for obj in args[1:]:
1180
object_queue.put((args[0], obj))
1181
while not container_queue.empty():
1183
for thread in container_threads:
1185
while thread.isAlive():
1187
while not object_queue.empty():
1189
for thread in object_threads:
1191
while thread.isAlive():
1196
list [options] [container]
1197
Lists the containers for the account or the objects for a container. -p or
1198
--prefix is an option that will only list items beginning with that prefix.
1199
-d or --delimiter is option (for container listings only) that will roll up
1200
items with the given delimiter (see Cloud Files general documentation for
1205
# "list" command: queues the names in an account listing
# (conn.get_account) or in the listing of container args[0]
# (conn.get_container) on print_queue.
#
# Adds the -p/--prefix and -d/--delimiter options, re-parses the command
# line with parse_args(), and exits if a delimiter is given without any
# argument.  Each listed item contributes its 'name', or its 'subdir' when
# it has no 'name'; the same value of the last item becomes the next marker.
# "Account not found" / "Container ... not found" go to error_queue.
#
# NOTE(review): the first parameter is called `options`, but the dispatcher
# at the bottom of the file passes the OptionParser in that position; the
# body uses the module-level `parser` and rebinds `options` from
# parse_args().
#
# NOTE(review): lines are missing from this copy (the bare numbers are the
# original line numbers; e.g. 1220-1225, 1230-1232 and 1237-1238 are
# absent).  The end of the Connection(...) call, the initial marker, the
# listing loop and the branch headers are not visible -- restore from a
# pristine copy before editing.
def st_list(options, args, print_queue, error_queue):
1206
parser.add_option('-p', '--prefix', dest='prefix', help='Will only list '
1207
'items beginning with the prefix')
1208
parser.add_option('-d', '--delimiter', dest='delimiter', help='Will roll '
1209
'up items with the given delimiter (see Cloud Files general '
1210
'documentation for what this means)')
1211
(options, args) = parse_args(parser, args)
1213
if options.delimiter and not args:
1214
exit('-d option only allowed for container listings')
1216
error_queue.put('Usage: %s [options] %s' %
1217
(basename(argv[0]), st_list_help))
1219
conn = Connection(options.auth, options.user, options.key,
1226
conn.get_account(marker=marker, prefix=options.prefix)[1]
1228
items = conn.get_container(args[0], marker=marker,
1229
prefix=options.prefix, delimiter=options.delimiter)[1]
1233
# Items without a 'name' key fall back to their 'subdir' value.
print_queue.put(item.get('name', item.get('subdir')))
1234
marker = items[-1].get('name', items[-1].get('subdir'))
1235
except ClientException, err:
1236
if err.http_status != 404:
1239
error_queue.put('Account not found')
1241
error_queue.put('Container %s not found' % repr(args[0]))
1245
stat [container] [object]
1246
Displays information for the account, container, or object depending on the
1247
args given (if any).'''.strip('\n')
1250
# "stat" command: issues a HEAD request and queues a text report on
# print_queue.  Three targets are visible:
#   * the account        -- conn.head_account()
#   * a container        -- len(args) == 1, conn.head_container(args[0])
#   * an object          -- len(args) == 2, conn.head_object(args[0], args[1])
#
# For each target the report is built from the response headers: first the
# fixed fields (counts, bytes used, ACLs, content type, ...), then every
# x-<kind>-meta-* header labelled "Meta <Name>", then the remaining headers
# formatted with key.title(), leaving out the ones already shown.
# A ClientException is tested for status 404 and a "... not found" message
# is put on error_queue; a usage message is queued at the end (presumably
# the branch for any other argument count -- its `else:` is not visible).
# A container name containing '/' triggers a warning on stderr.
#
# NOTE(review): the first parameter is called `options`, but the dispatcher
# at the bottom of the file passes the OptionParser in that position; the
# body uses the module-level `parser` and rebinds `options` from
# parse_args().
#
# NOTE(review): this copy is damaged.  The bare numbers are the original
# line numbers and their gaps (1254-1255, 1258-1260, 1265-1268, 1294-1299,
# 1322-1325, ...) mark missing lines: the condition of the account branch,
# every `try:`, and the opening lines of the multi-line ''' report strings.
# Only the closing part of each ''' literal survives, so comments are kept
# above the def to stay clear of string content.  Restore from a pristine
# copy before editing.
def st_stat(options, args, print_queue, error_queue):
1251
(options, args) = parse_args(parser, args)
1253
conn = Connection(options.auth, options.user, options.key)
1256
headers = conn.head_account()
1257
if options.verbose > 1:
1261
'''.strip('\n') % (conn.url, conn.token))
1262
container_count = int(headers.get('x-account-container-count', 0))
1263
object_count = int(headers.get('x-account-object-count', 0))
1264
bytes_used = int(headers.get('x-account-bytes-used', 0))
1269
Bytes: %d'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count,
1270
object_count, bytes_used))
1271
for key, value in headers.items():
1272
if key.startswith('x-account-meta-'):
1273
print_queue.put('%10s: %s' % ('Meta %s' %
1274
key[len('x-account-meta-'):].title(), value))
1275
for key, value in headers.items():
1276
if not key.startswith('x-account-meta-') and key not in (
1277
'content-length', 'date', 'x-account-container-count',
1278
'x-account-object-count', 'x-account-bytes-used'):
1280
'%10s: %s' % (key.title(), value))
1281
except ClientException, err:
1282
if err.http_status != 404:
1284
error_queue.put('Account not found')
1285
elif len(args) == 1:
1287
print >> stderr, 'WARNING: / in container name; you might have ' \
1288
'meant %r instead of %r.' % \
1289
(args[0].replace('/', ' ', 1), args[0])
1291
headers = conn.head_container(args[0])
1292
object_count = int(headers.get('x-container-object-count', 0))
1293
bytes_used = int(headers.get('x-container-bytes-used', 0))
1300
Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
1301
object_count, bytes_used,
1302
headers.get('x-container-read', ''),
1303
headers.get('x-container-write', '')))
1304
for key, value in headers.items():
1305
if key.startswith('x-container-meta-'):
1306
print_queue.put('%9s: %s' % ('Meta %s' %
1307
key[len('x-container-meta-'):].title(), value))
1308
for key, value in headers.items():
1309
if not key.startswith('x-container-meta-') and key not in (
1310
'content-length', 'date', 'x-container-object-count',
1311
'x-container-bytes-used', 'x-container-read',
1312
'x-container-write'):
1314
'%9s: %s' % (key.title(), value))
1315
except ClientException, err:
1316
if err.http_status != 404:
1318
error_queue.put('Container %s not found' % repr(args[0]))
1319
elif len(args) == 2:
1321
headers = conn.head_object(args[0], args[1])
1326
Content Type: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
1327
args[1], headers.get('content-type')))
1328
if 'content-length' in headers:
1329
print_queue.put('Content Length: %s' %
1330
headers['content-length'])
1331
if 'last-modified' in headers:
1332
print_queue.put(' Last Modified: %s' %
1333
headers['last-modified'])
1334
if 'etag' in headers:
1335
print_queue.put(' ETag: %s' % headers['etag'])
1336
if 'x-object-manifest' in headers:
1337
print_queue.put(' Manifest: %s' %
1338
headers['x-object-manifest'])
1339
for key, value in headers.items():
1340
if key.startswith('x-object-meta-'):
1341
print_queue.put('%14s: %s' % ('Meta %s' %
1342
key[len('x-object-meta-'):].title(), value))
1343
for key, value in headers.items():
1344
if not key.startswith('x-object-meta-') and key not in (
1345
'content-type', 'content-length', 'last-modified',
1346
'etag', 'date', 'x-object-manifest'):
1348
'%14s: %s' % (key.title(), value))
1349
except ClientException, err:
1350
if err.http_status != 404:
1352
error_queue.put('Object %s not found' %
1353
repr('%s/%s' % (args[0], args[1])))
1355
error_queue.put('Usage: %s [options] %s' %
1356
(basename(argv[0]), st_stat_help))
1360
post [options] [container] [object]
1361
Updates meta information for the account, container, or object depending on
1362
the args given. If the container is not found, it will be created
1363
automatically; but this is not true for accounts and objects. Containers
1364
also allow the -r (or --read-acl) and -w (or --write-acl) options. The -m
1365
or --meta option is allowed on all and used to define the user meta data
1366
items to set in the form Name:Value. This option can be repeated. Example:
1367
post -m Color:Blue -m Size:Large'''.strip('\n')
1370
# "post" command: sends metadata headers for the account
# (conn.post_account), for container args[0] (conn.post_container) or for
# object args[0]/args[1] (conn.post_object).
#
# Adds the -r/--read-acl, -w/--write-acl and -m/--meta options, re-parses
# the command line with parse_args(), and exits if an ACL option is given
# without any argument.  Every -m item becomes an
# X-Account-Meta-/X-Container-Meta-/X-Object-Meta-<name> header; for a
# container the ACL options additionally set X-Container-Read and
# X-Container-Write.  In the container branch the exception handler goes on
# to conn.put_container() with the same headers, i.e. the container is
# created when the POST did not find it.  The other branches queue
# "Account not found" / "Object ... not found" on error_queue.
#
# NOTE(review): the first parameter is called `options`, but the dispatcher
# at the bottom of the file passes the OptionParser in that position; the
# body uses the module-level `parser` and rebinds `options` from
# parse_args().
#
# NOTE(review): lines are missing from this copy (the bare numbers are the
# original line numbers; e.g. 1376, 1385-1386, 1391, 1395 are absent).  The
# end of the -w help text, the account-branch condition, the creation of
# `headers`, every `try:` and the lines under the 404 tests are not visible
# -- restore from a pristine copy before editing.
def st_post(options, args, print_queue, error_queue):
1371
parser.add_option('-r', '--read-acl', dest='read_acl', help='Sets the '
1372
'Read ACL for containers. Quick summary of ACL syntax: .r:*, '
1373
'.r:-.example.com, .r:www.example.com, account1, account2:user2')
1374
parser.add_option('-w', '--write-acl', dest='write_acl', help='Sets the '
1375
'Write ACL for containers. Quick summary of ACL syntax: account1, '
1377
parser.add_option('-m', '--meta', action='append', dest='meta', default=[],
1378
help='Sets a meta data item with the syntax name:value. This option '
1379
'may be repeated. Example: -m Color:Blue -m Size:Large')
1380
(options, args) = parse_args(parser, args)
1382
if (options.read_acl or options.write_acl) and not args:
1383
exit('-r and -w options only allowed for containers')
1384
conn = Connection(options.auth, options.user, options.key)
1387
for item in options.meta:
1388
# NOTE(review): split(':') has no maxsplit and only element [1] is kept, so
# a value that itself contains ':' is cut at its first colon, and an item
# without any ':' stores False (the result of the `and`) as the header
# value.  The container and object branches below repeat this pattern.
split_item = item.split(':')
1389
headers['X-Account-Meta-' + split_item[0]] = \
1390
len(split_item) > 1 and split_item[1]
1392
conn.post_account(headers=headers)
1393
except ClientException, err:
1394
if err.http_status != 404:
1396
error_queue.put('Account not found')
1397
elif len(args) == 1:
1399
print >> stderr, 'WARNING: / in container name; you might have ' \
1400
'meant %r instead of %r.' % \
1401
(args[0].replace('/', ' ', 1), args[0])
1403
for item in options.meta:
1404
split_item = item.split(':')
1405
headers['X-Container-Meta-' + split_item[0]] = \
1406
len(split_item) > 1 and split_item[1]
1407
# `is not None` lets an empty string through, so an ACL can be set to ''.
if options.read_acl is not None:
1408
headers['X-Container-Read'] = options.read_acl
1409
if options.write_acl is not None:
1410
headers['X-Container-Write'] = options.write_acl
1412
conn.post_container(args[0], headers=headers)
1413
except ClientException, err:
1414
if err.http_status != 404:
1416
# Reached from the exception handler: create the container with the same
# headers instead of reporting it as missing.
conn.put_container(args[0], headers=headers)
1417
elif len(args) == 2:
1419
for item in options.meta:
1420
split_item = item.split(':')
1421
headers['X-Object-Meta-' + split_item[0]] = \
1422
len(split_item) > 1 and split_item[1]
1424
conn.post_object(args[0], args[1], headers=headers)
1425
except ClientException, err:
1426
if err.http_status != 404:
1428
error_queue.put('Object %s not found' %
1429
repr('%s/%s' % (args[0], args[1])))
1431
error_queue.put('Usage: %s [options] %s' %
1432
(basename(argv[0]), st_post_help))
1435
st_upload_help = '''
1436
upload [options] container file_or_directory [file_or_directory] [...]
1437
Uploads to the given container the files and directories specified by the
1438
remaining args. -c or --changed is an option that will only upload files
1439
that have changed since the last upload. -S <size> or --segment-size <size>
1440
and --leave-segments are options as well (see --help for more).
1444
# "upload" command, option setup: adds -c/--changed, -S/--segment-size and
# --leave-segments, re-parses the command line with parse_args(), queues
# the usage text on error_queue, and creates the bounded object_queue that
# the nested job functions and the driver code further down share.
#
# NOTE(review): the first parameter is called `options`, but the dispatcher
# at the bottom of the file passes the OptionParser in that position; the
# body uses the module-level `parser` and rebinds `options` from
# parse_args().
#
# NOTE(review): -S is declared without type=, so optparse stores
# options.segment_size as a string.
#
# NOTE(review): lines are missing from this copy (the bare numbers are the
# original line numbers; 1447, 1457, 1459-1460 and 1463 are absent).  The
# ends of two help strings and the condition guarding the usage message
# are not visible -- restore from a pristine copy before editing.
def st_upload(options, args, print_queue, error_queue):
1445
parser.add_option('-c', '--changed', action='store_true', dest='changed',
1446
default=False, help='Will only upload files that have changed since '
1448
parser.add_option('-S', '--segment-size', dest='segment_size', help='Will '
1449
'upload files in segments no larger than <size> and then create a '
1450
'"manifest" file that will download all the segments as if it were '
1451
'the original file. The segments will be uploaded to a '
1452
'<container>_segments container so as to not pollute the main '
1453
'<container> listings.')
1454
parser.add_option('', '--leave-segments', action='store_true',
1455
dest='leave_segments', default=False, help='Indicates that you want '
1456
'the older segments of manifest objects left alone (in the case of '
1458
(options, args) = parse_args(parser, args)
1461
error_queue.put('Usage: %s [options] %s' %
1462
(basename(argv[0]), st_upload_help))
1464
object_queue = Queue(10000)
1466
def _segment_job(job, conn):
    """
    Worker for the segment queue: uploads one segment of a local file, or
    deletes one stale segment object.

    :param job: dict describing the work.  A delete job carries ``delete``
                (true), ``container`` and ``obj``.  An upload job carries
                ``path``, ``obj``, ``segment_start`` and ``segment_size``,
                plus optionally ``container`` (defaults to
                ``args[0] + '_segments'``) and ``log_line``.
    :param conn: connection used for delete_object() / put_object().

    ``args``, ``options`` and ``print_queue`` come from the enclosing
    scope.
    """
    if job.get('delete', False):
        conn.delete_object(job['container'], job['obj'])
    else:
        # Delete jobs carry no 'path', so the upload has to be the
        # alternative branch rather than run unconditionally.
        fp = open(job['path'], 'rb')
        try:
            fp.seek(job['segment_start'])
            conn.put_object(job.get('container', args[0] + '_segments'),
                job['obj'], fp, content_length=job['segment_size'])
        finally:
            # Close explicitly, also when put_object() raises, instead of
            # leaving the descriptor to garbage collection.
            fp.close()
    if options.verbose and 'log_line' in job:
        print_queue.put(job['log_line'])
1477
# Worker for the object queue: uploads one local path to the container.
#
# job  -- dict; 'container' (default args[0]) and 'dir_marker' (default
#         False) are read here, and the driver code enqueues a 'path' key.
# conn -- connection used for head_object(), put_object() and
#         get_container().
#
# What the visible code does:
#   * stamps every upload with an x-object-meta-mtime header taken from
#     getmtime(path);
#   * for a directory marker, HEADs the object, compares it with an empty
#     text/directory object of the same mtime, and PUTs an empty
#     text/directory object;
#   * for a file, HEADs the object when options.changed is set or
#     options.leave_segments is not, compares size and mtime, and remembers
#     the old x-object-manifest;
#   * for a segmented upload, feeds segment jobs to _segment_job workers,
#     then PUTs an empty object carrying x-object-manifest; otherwise PUTs
#     the file itself;
#   * enqueues delete jobs for the objects listed under the old manifest;
#   * puts the object name on print_queue, and "Local file ... not found"
#     on error_queue from the OSError handler.
#
# NOTE(review): this copy is damaged.  The bare numbers are the original
# line numbers and their gaps (1478, 1481-1482, 1484, 1486-1488, 1495,
# 1498, ...) mark missing lines: the bindings of `path`, `obj`, `segment`,
# `segment_start` and the initial `old_manifest`, every `try:`/`else:`,
# and the statements under several `if`/`while` headers.  Restore from a
# pristine copy before changing any logic.
def _object_job(job, conn):
1479
container = job.get('container', args[0])
1480
dir_marker = job.get('dir_marker', False)
1483
if obj.startswith('./') or obj.startswith('.\\'):
1485
put_headers = {'x-object-meta-mtime': str(getmtime(path))}
1489
headers = conn.head_object(container, obj)
1490
ct = headers.get('content-type')
1491
cl = int(headers.get('content-length'))
1492
et = headers.get('etag')
1493
mt = headers.get('x-object-meta-mtime')
1494
# d41d8cd98f00b204e9800998ecf8427e is the hex MD5 digest of empty content.
if ct.split(';', 1)[0] == 'text/directory' and \
1496
et == 'd41d8cd98f00b204e9800998ecf8427e' and \
1497
mt == put_headers['x-object-meta-mtime']:
1499
except ClientException, err:
1500
if err.http_status != 404:
1502
conn.put_object(container, obj, '', content_length=0,
1503
content_type='text/directory',
1504
headers=put_headers)
1506
# We need to HEAD all objects now in case we're overwriting a
1507
# manifest object and need to delete the old segments
1510
if options.changed or not options.leave_segments:
1512
headers = conn.head_object(container, obj)
1513
cl = int(headers.get('content-length'))
1514
mt = headers.get('x-object-meta-mtime')
1515
if options.changed and cl == getsize(path) and \
1516
mt == put_headers['x-object-meta-mtime']:
1518
if not options.leave_segments:
1519
old_manifest = headers.get('x-object-manifest')
1520
except ClientException, err:
1521
if err.http_status != 404:
1523
# NOTE(review): -S/--segment-size is declared without type=, so
# options.segment_size is a string here (it is only int()-converted inside
# the loop below).  This test compares getsize()'s number with that string,
# and its `<` reads as "segment the file when it is smaller than the
# segment size".  Both look wrong -- confirm against a pristine copy.
if options.segment_size and \
1524
getsize(path) < options.segment_size:
1525
full_size = getsize(path)
1526
segment_queue = Queue(10000)
1527
segment_threads = [QueueFunctionThread(segment_queue,
1528
_segment_job, create_connection()) for _junk in
1530
for thread in segment_threads:
1534
while segment_start < full_size:
1535
segment_size = int(options.segment_size)
1536
# The last segment is shortened so it ends exactly at the end of the file.
if segment_start + segment_size > full_size:
1537
segment_size = full_size - segment_start
1538
segment_queue.put({'path': path,
1539
'obj': '%s/%s/%s/%08d' % (obj,
1540
put_headers['x-object-meta-mtime'], full_size,
1542
'segment_start': segment_start,
1543
'segment_size': segment_size,
1544
'log_line': '%s segment %s' % (obj, segment)})
1546
segment_start += segment_size
1547
while not segment_queue.empty():
1549
for thread in segment_threads:
1551
while thread.isAlive():
1553
new_object_manifest = '%s_segments/%s/%s/%s/' % (
1554
container, obj, put_headers['x-object-meta-mtime'],
1556
if old_manifest == new_object_manifest:
1558
put_headers['x-object-manifest'] = new_object_manifest
1559
conn.put_object(container, obj, '', content_length=0,
1560
headers=put_headers)
1562
# NOTE(review): the file object opened in this call is never bound to a
# name, so nothing visible closes it explicitly.
conn.put_object(container, obj, open(path, 'rb'),
1563
content_length=getsize(path), headers=put_headers)
1565
segment_queue = Queue(10000)
1566
# The old manifest value is split into "<container>/<prefix>".
scontainer, sprefix = old_manifest.split('/', 1)
1567
for delobj in conn.get_container(scontainer,
1569
segment_queue.put({'delete': True,
1570
'container': scontainer, 'obj': delobj['name']})
1571
if not segment_queue.empty():
1572
segment_threads = [QueueFunctionThread(segment_queue,
1573
_segment_job, create_connection()) for _junk in
1575
for thread in segment_threads:
1577
while not segment_queue.empty():
1579
for thread in segment_threads:
1581
while thread.isAlive():
1584
print_queue.put(obj)
1585
except OSError, err:
1586
if err.errno != ENOENT:
1588
error_queue.put('Local file %s not found' % repr(path))
1590
# Walks a local directory and fills object_queue with upload jobs: a
# {'path': ..., 'dir_marker': True} job for the directory itself, a
# {'path': subpath} job for entries, and a recursive call for
# sub-directories.
#
# path -- local directory to enqueue.
#
# NOTE(review): lines are missing from this copy (the bare numbers are the
# original line numbers; 1592, 1594, 1597 and 1599 are absent).  The
# conditions that choose between the directory marker and the loop, and
# between recursing and enqueueing a file, are not visible -- presumably
# tests on `names` and on isdir(subpath); confirm against a pristine copy.
# `names` is bound once but the loop calls listdir(path) a second time.
def _upload_dir(path):
1591
names = listdir(path)
1593
object_queue.put({'path': path, 'dir_marker': True})
1595
for name in listdir(path):
1596
subpath = join(path, name)
1598
_upload_dir(subpath)
1600
object_queue.put({'path': subpath})
1602
# Driver part of the upload command (runs after the nested job functions
# are defined): authenticates once with get_auth(), builds connections
# that reuse the resulting url/token, starts ten _object_job worker
# threads, makes sure the target container(s) exist, enqueues every
# remaining argument, then waits for the queue and the workers.
#
# NOTE(review): lines are missing from this copy (the bare numbers are the
# original line numbers; 1603, 1609, 1615, 1619-1621, 1623-1625, ... are
# absent).  The end of the get_auth(...) call, the thread start, the
# `try:` lines and the bodies of the wait loops are not visible -- restore
# from a pristine copy before editing.
url, token = get_auth(options.auth, options.user, options.key,
1604
# Every worker gets its own Connection, pre-authenticated with the token
# obtained above.
create_connection = lambda: Connection(options.auth, options.user,
1605
options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
1606
object_threads = [QueueFunctionThread(object_queue, _object_job,
1607
create_connection()) for _junk in xrange(10)]
1608
for thread in object_threads:
1610
conn = create_connection()
1611
# Try to create the container, just in case it doesn't exist. If this
1612
# fails, it might just be because the user doesn't have container PUT
1613
# permissions, so we'll ignore any error. If there's really a problem,
1614
# it'll surface on the first object PUT.
1616
conn.put_container(args[0])
1617
# The companion "<container>_segments" container is only created when a
# segment size was given.
if options.segment_size is not None:
1618
conn.put_container(args[0] + '_segments')
1622
for arg in args[1:]:
1626
object_queue.put({'path': arg})
1627
while not object_queue.empty():
1629
for thread in object_threads:
1631
while thread.isAlive():
1633
except ClientException, err:
1634
if err.http_status != 404:
1636
error_queue.put('Account not found')
1639
# Runs parser.parse_args(args) and returns the resulting (options, args)
# pair.
#
# parser           -- option parser; must define the auth, user and key
#                     options read below.
# args             -- argument list handed to parser.parse_args().
# enforce_requires -- when true, options.auth, options.user and options.key
#                     must all be truthy; the surviving message text says
#                     they come from ST_AUTH, ST_USER and ST_KEY or from
#                     -A, -U and -K.
#
# NOTE(review): lines are missing from this copy (the bare numbers are the
# original line numbers; 1640-1641 and 1645 are absent).  Whatever ran
# before parser.parse_args() and the opening of the call that receives the
# ''' message (presumably exit(), as used elsewhere in this file) are not
# visible.  Comments are kept above the def to stay clear of the string
# content.  Restore from a pristine copy before editing.
def parse_args(parser, args, enforce_requires=True):
1642
(options, args) = parser.parse_args(args)
1643
if enforce_requires and \
1644
not (options.auth and options.user and options.key):
1646
Requires ST_AUTH, ST_USER, and ST_KEY environment variables be set or
1647
overridden with -A, -U, or -K.'''.strip('\n'))
1648
return options, args
1651
if __name__ == '__main__':
1652
parser = OptionParser(version='%prog 1.0', usage='''
1653
Usage: %%prog <command> [options] [args]
1660
%(st_download_help)s
1664
%%prog -A https://auth.api.rackspacecloud.com/v1.0 -U user -K key stat
1665
'''.strip('\n') % globals())
1666
parser.add_option('-s', '--snet', action='store_true', dest='snet',
1667
default=False, help='Use SERVICENET internal network')
1668
parser.add_option('-v', '--verbose', action='count', dest='verbose',
1669
default=1, help='Print more info')
1670
parser.add_option('-q', '--quiet', action='store_const', dest='verbose',
1671
const=0, default=1, help='Suppress status output')
1672
parser.add_option('-A', '--auth', dest='auth',
1673
default=environ.get('ST_AUTH'),
1674
help='URL for obtaining an auth token')
1675
parser.add_option('-U', '--user', dest='user',
1676
default=environ.get('ST_USER'),
1677
help='User name for obtaining an auth token')
1678
parser.add_option('-K', '--key', dest='key',
1679
default=environ.get('ST_KEY'),
1680
help='Key for obtaining an auth token')
1681
parser.disable_interspersed_args()
1682
(options, args) = parse_args(parser, argv[1:], enforce_requires=False)
1683
parser.enable_interspersed_args()
1685
commands = ('delete', 'download', 'list', 'post', 'stat', 'upload')
1686
if not args or args[0] not in commands:
1687
parser.print_usage()
1689
exit('no such command: %s' % args[0])
1692
print_queue = Queue(10000)
1695
if isinstance(item, unicode):
1696
item = item.encode('utf8')
1699
print_thread = QueueFunctionThread(print_queue, _print)
1700
print_thread.start()
1702
error_queue = Queue(10000)
1705
if isinstance(item, unicode):
1706
item = item.encode('utf8')
1707
print >> stderr, item
1709
error_thread = QueueFunctionThread(error_queue, _error)
1710
error_thread.start()
1713
parser.usage = globals()['st_%s_help' % args[0]]
1714
globals()['st_%s' % args[0]](parser, argv[1:], print_queue,
1716
while not print_queue.empty():
1718
print_thread.abort = True
1719
while print_thread.isAlive():
1720
print_thread.join(0.01)
1721
while not error_queue.empty():
1723
error_thread.abort = True
1724
while error_thread.isAlive():
1725
error_thread.join(0.01)
1726
except (SystemExit, Exception):
1727
for thread in threading_enumerate():