12
6
Various API-incompatible changes are planned in order to expose missing
13
7
functionality in this wrapper.
10
from epsilon.extime import Time
12
from twisted.web.client import getPage
17
13
from twisted.web.http import datetimeToString
19
from dateutil.parser import parse as parseTime
21
from txaws.client.base import BaseClient, BaseQuery, error_wrapper
22
from txaws.s3.acls import AccessControlPolicy
23
from txaws.s3.model import (
24
Bucket, BucketItem, BucketListing, ItemOwner, LifecycleConfiguration,
25
LifecycleConfigurationRule, NotificationConfiguration, RequestPayment,
26
VersioningConfiguration, WebsiteConfiguration, MultipartInitiationResponse,
27
MultipartCompletionResponse)
28
from txaws.s3.exception import S3Error
29
from txaws.service import AWSServiceEndpoint, S3_ENDPOINT
15
from txaws.service import AWSServiceEndpoint
30
16
from txaws.util import XML, calculate_md5
33
def s3_error_wrapper(error):
    """
    Errback that hands the failure to C{error_wrapper} together with
    L{S3Error} as the error class to use.

    @param error: The failure object received from the errback chain.
    """
    error_wrapper(error, S3Error)
37
class URLContext(object):
39
The hosts and the paths that form an S3 endpoint change depending upon the
40
context in which they are called. While S3 supports bucket names in the
41
host name, we use the convention of providing it in the path so that
42
using IP addresses and alternative implementations of S3 actually works
45
def __init__(self, service_endpoint, bucket="", object_name=""):
46
self.endpoint = service_endpoint
48
self.object_name = object_name
51
return self.endpoint.get_host()
55
if self.bucket is not None:
57
if self.bucket is not None and self.object_name:
58
if not self.object_name.startswith("/"):
60
path += self.object_name
61
elif self.bucket is not None and not path.endswith("/"):
66
if self.endpoint.port is not None:
67
return "%s://%s:%d%s" % (
68
self.endpoint.scheme, self.get_host(), self.endpoint.port,
71
return "%s://%s%s" % (
72
self.endpoint.scheme, self.get_host(), self.get_path())
75
class S3Client(BaseClient):
76
"""A client for S3."""
78
def __init__(self, creds=None, endpoint=None, query_factory=None,
79
receiver_factory=None):
80
if query_factory is None:
82
super(S3Client, self).__init__(creds, endpoint, query_factory,
83
receiver_factory=receiver_factory)
85
def list_buckets(self):
89
Returns a list of all the buckets owned by the authenticated sender of
92
query = self.query_factory(
93
action="GET", creds=self.creds, endpoint=self.endpoint,
94
receiver_factory=self.receiver_factory)
96
return d.addCallback(self._parse_list_buckets)
98
def _parse_list_buckets(self, xml_bytes):
100
Parse XML bucket list response.
102
root = XML(xml_bytes)
104
for bucket_data in root.find("Buckets"):
105
name = bucket_data.findtext("Name")
106
date_text = bucket_data.findtext("CreationDate")
107
date_time = parseTime(date_text)
108
bucket = Bucket(name, date_time)
109
buckets.append(bucket)
112
def create_bucket(self, bucket):
116
query = self.query_factory(
117
action="PUT", creds=self.creds, endpoint=self.endpoint,
119
return query.submit()
121
def delete_bucket(self, bucket):
125
The bucket must be empty before it can be deleted.
127
query = self.query_factory(
128
action="DELETE", creds=self.creds, endpoint=self.endpoint,
130
return query.submit()
132
def get_bucket(self, bucket):
134
Get a list of all the objects in a bucket.
136
query = self.query_factory(
137
action="GET", creds=self.creds, endpoint=self.endpoint,
138
bucket=bucket, receiver_factory=self.receiver_factory)
140
return d.addCallback(self._parse_get_bucket)
142
def _parse_get_bucket(self, xml_bytes):
143
root = XML(xml_bytes)
144
name = root.findtext("Name")
145
prefix = root.findtext("Prefix")
146
marker = root.findtext("Marker")
147
max_keys = root.findtext("MaxKeys")
148
is_truncated = root.findtext("IsTruncated")
151
for content_data in root.findall("Contents"):
152
key = content_data.findtext("Key")
153
date_text = content_data.findtext("LastModified")
154
modification_date = parseTime(date_text)
155
etag = content_data.findtext("ETag")
156
size = content_data.findtext("Size")
157
storage_class = content_data.findtext("StorageClass")
158
owner_id = content_data.findtext("Owner/ID")
159
owner_display_name = content_data.findtext("Owner/DisplayName")
160
owner = ItemOwner(owner_id, owner_display_name)
161
content_item = BucketItem(key, modification_date, etag, size,
162
storage_class, owner)
163
contents.append(content_item)
166
for prefix_data in root.findall("CommonPrefixes"):
167
common_prefixes.append(prefix_data.text)
169
return BucketListing(name, prefix, marker, max_keys, is_truncated,
170
contents, common_prefixes)
172
def get_bucket_location(self, bucket):
174
Get the location (region) of a bucket.
176
@param bucket: The name of the bucket.
177
@return: A C{Deferred} that will fire with the bucket's region.
179
query = self.query_factory(action="GET", creds=self.creds,
180
endpoint=self.endpoint, bucket=bucket,
181
object_name="?location",
182
receiver_factory=self.receiver_factory)
184
return d.addCallback(self._parse_bucket_location)
186
def _parse_bucket_location(self, xml_bytes):
    """
    Parse a C{LocationConstraint} XML document.

    @param xml_bytes: The raw XML response body.
    @return: The text of the document's root element, or C{""} when the
        root element carries no text.
    """
    root = XML(xml_bytes)
    return root.text or ""
191
def get_bucket_lifecycle(self, bucket):
    """
    Get the lifecycle configuration of a bucket.

    @param bucket: The name of the bucket.
    @return: A C{Deferred} that will fire with the bucket's lifecycle
        configuration, as produced by C{_parse_lifecycle_config}.
    """
    query = self.query_factory(
        action='GET', creds=self.creds, endpoint=self.endpoint,
        bucket=bucket, object_name='?lifecycle',
        receiver_factory=self.receiver_factory)
    return query.submit().addCallback(self._parse_lifecycle_config)
205
def _parse_lifecycle_config(self, xml_bytes):
206
"""Parse a C{LifecycleConfiguration} XML document."""
207
root = XML(xml_bytes)
210
for content_data in root.findall("Rule"):
211
id = content_data.findtext("ID")
212
prefix = content_data.findtext("Prefix")
213
status = content_data.findtext("Status")
214
expiration = int(content_data.findtext("Expiration/Days"))
216
LifecycleConfigurationRule(id, prefix, status, expiration))
218
return LifecycleConfiguration(rules)
220
def get_bucket_website_config(self, bucket):
    """
    Get the website configuration of a bucket.

    @param bucket: The name of the bucket.
    @return: A C{Deferred} that will fire with the bucket's website
        configuration, as produced by C{_parse_website_config}.
    """
    query = self.query_factory(
        action='GET', creds=self.creds, endpoint=self.endpoint,
        bucket=bucket, object_name='?website',
        receiver_factory=self.receiver_factory)
    return query.submit().addCallback(self._parse_website_config)
234
def _parse_website_config(self, xml_bytes):
    """
    Parse a C{WebsiteConfiguration} XML document.

    @param xml_bytes: The raw XML response body.
    @return: A L{WebsiteConfiguration} built from the text of the
        C{IndexDocument/Suffix} and C{ErrorDocument/Key} elements.
    """
    root = XML(xml_bytes)
    index_suffix = root.findtext("IndexDocument/Suffix")
    error_key = root.findtext("ErrorDocument/Key")

    return WebsiteConfiguration(index_suffix, error_key)
242
def get_bucket_notification_config(self, bucket):
    """
    Get the notification configuration of a bucket.

    @param bucket: The name of the bucket.
    @return: A C{Deferred} that will request the bucket's notification
        configuration and fire with the result of
        C{_parse_notification_config}.
    """
    query = self.query_factory(
        action='GET', creds=self.creds, endpoint=self.endpoint,
        bucket=bucket, object_name='?notification',
        receiver_factory=self.receiver_factory)
    return query.submit().addCallback(self._parse_notification_config)
256
def _parse_notification_config(self, xml_bytes):
    """
    Parse a C{NotificationConfiguration} XML document.

    @param xml_bytes: The raw XML response body.
    @return: A L{NotificationConfiguration} built from the text of the
        C{TopicConfiguration/Topic} and C{TopicConfiguration/Event}
        elements.
    """
    root = XML(xml_bytes)
    topic = root.findtext("TopicConfiguration/Topic")
    event = root.findtext("TopicConfiguration/Event")

    return NotificationConfiguration(topic, event)
264
def get_bucket_versioning_config(self, bucket):
    """
    Get the versioning configuration of a bucket.

    @param bucket: The name of the bucket.
    @return: A C{Deferred} that will request the bucket's versioning
        configuration and fire with the result of
        C{_parse_versioning_config}.
    """
    query = self.query_factory(
        action='GET', creds=self.creds, endpoint=self.endpoint,
        bucket=bucket, object_name='?versioning',
        receiver_factory=self.receiver_factory)
    return query.submit().addCallback(self._parse_versioning_config)
277
def _parse_versioning_config(self, xml_bytes):
    """
    Parse a C{VersioningConfiguration} XML document.

    @param xml_bytes: The raw XML response body.
    @return: A L{VersioningConfiguration} built from the text of the
        C{MfaDelete} and C{Status} elements.
    """
    root = XML(xml_bytes)
    mfa_delete = root.findtext("MfaDelete")
    status = root.findtext("Status")

    return VersioningConfiguration(mfa_delete=mfa_delete, status=status)
285
def get_bucket_acl(self, bucket):
    """
    Get the access control policy for a bucket.

    @param bucket: The name of the bucket.
    @return: A C{Deferred} that will fire with the result of C{_parse_acl}
        applied to the response.
    """
    query = self.query_factory(
        action='GET', creds=self.creds, endpoint=self.endpoint,
        bucket=bucket, object_name='?acl',
        receiver_factory=self.receiver_factory)
    return query.submit().addCallback(self._parse_acl)
295
def put_bucket_acl(self, bucket, access_control_policy):
    """
    Set access control policy on a bucket.

    @param bucket: The name of the bucket.
    @param access_control_policy: An object whose C{to_xml()} result is
        sent as the request body.
    @return: A C{Deferred} that will fire with the result of C{_parse_acl}
        applied to the response.
    """
    data = access_control_policy.to_xml()
    query = self.query_factory(
        action='PUT', creds=self.creds, endpoint=self.endpoint,
        bucket=bucket, object_name='?acl', data=data,
        receiver_factory=self.receiver_factory)
    return query.submit().addCallback(self._parse_acl)
306
def _parse_acl(self, xml_bytes):
    """
    Parse an C{AccessControlPolicy} XML document and convert it into an
    L{AccessControlPolicy} instance.

    @param xml_bytes: The raw XML response body.
    @return: The value returned by C{AccessControlPolicy.from_xml}.
    """
    return AccessControlPolicy.from_xml(xml_bytes)
313
def put_object(self, bucket, object_name, data=None, content_type=None,
               metadata=None, amz_headers=None, body_producer=None):
    """
    Put an object in a bucket.

    An existing object with the same name will be replaced.

    @param bucket: The name of the bucket.
    @param object_name: The name of the object.
    @param data: The data to write.
    @param content_type: The type of data being written.
    @param metadata: A C{dict} used to build C{x-amz-meta-*} headers.
        Defaults to a new empty C{dict}.
    @param amz_headers: A C{dict} used to build C{x-amz-*} headers.
        Defaults to a new empty C{dict}.
    @param body_producer: Passed through unchanged to the query factory.
    @return: A C{Deferred} that will fire with the result of request.
    """
    # A literal {} default is created once and shared by every call (and
    # by every query that stores it); build a fresh dict per call instead.
    if metadata is None:
        metadata = {}
    if amz_headers is None:
        amz_headers = {}
    query = self.query_factory(
        action="PUT", creds=self.creds, endpoint=self.endpoint,
        bucket=bucket, object_name=object_name, data=data,
        content_type=content_type, metadata=metadata,
        amz_headers=amz_headers, body_producer=body_producer,
        receiver_factory=self.receiver_factory)
    return query.submit()
336
def copy_object(self, source_bucket, source_object_name, dest_bucket=None,
                dest_object_name=None, metadata=None, amz_headers=None):
    """
    Copy an object stored in S3 from a source bucket to a destination
    bucket.

    @param source_bucket: The S3 bucket to copy the object from.
    @param source_object_name: The name of the object to copy.
    @param dest_bucket: Optionally, the S3 bucket to copy the object to.
        Defaults to C{source_bucket}.
    @param dest_object_name: Optionally, the name of the new object.
        Defaults to C{source_object_name}.
    @param metadata: A C{dict} used to build C{x-amz-meta-*} headers.
    @param amz_headers: A C{dict} used to build C{x-amz-*} headers.  The
        caller's C{dict} is not modified; a C{copy-source} entry is added
        to a copy of it.
    @return: A C{Deferred} that will fire with the result of request.
    """
    dest_bucket = dest_bucket or source_bucket
    dest_object_name = dest_object_name or source_object_name
    if metadata is None:
        metadata = {}
    # Work on a copy: writing "copy-source" into the caller's dict (or into
    # a shared default dict) would leak the header into unrelated requests.
    amz_headers = dict(amz_headers or {})
    amz_headers["copy-source"] = "/%s/%s" % (source_bucket,
                                             source_object_name)
    query = self.query_factory(
        action="PUT", creds=self.creds, endpoint=self.endpoint,
        bucket=dest_bucket, object_name=dest_object_name,
        metadata=metadata, amz_headers=amz_headers,
        receiver_factory=self.receiver_factory)
    return query.submit()
363
def get_object(self, bucket, object_name):
    """
    Get an object from a bucket.

    @param bucket: The name of the bucket.
    @param object_name: The name of the object.
    @return: The result of submitting the C{GET} query.
    """
    query = self.query_factory(
        action="GET", creds=self.creds, endpoint=self.endpoint,
        bucket=bucket, object_name=object_name,
        receiver_factory=self.receiver_factory)
    return query.submit()
373
def head_object(self, bucket, object_name):
375
Retrieve object metadata only.
377
query = self.query_factory(
378
action="HEAD", creds=self.creds, endpoint=self.endpoint,
379
bucket=bucket, object_name=object_name)
381
return d.addCallback(query.get_response_headers)
383
def delete_object(self, bucket, object_name):
    """
    Delete an object from a bucket.

    Once deleted, there is no method to restore or undelete an object.

    @param bucket: The name of the bucket.
    @param object_name: The name of the object.
    @return: The result of submitting the C{DELETE} query.
    """
    query = self.query_factory(
        action="DELETE", creds=self.creds, endpoint=self.endpoint,
        bucket=bucket, object_name=object_name)
    return query.submit()
394
def put_object_acl(self, bucket, object_name, access_control_policy):
    """
    Set access control policy on an object.

    @param bucket: The name of the bucket.
    @param object_name: The name of the object; C{?acl} is appended to it
        to form the query's object name.
    @param access_control_policy: An object whose C{to_xml()} result is
        sent as the request body.
    @return: A C{Deferred} that will fire with the result of C{_parse_acl}
        applied to the response.
    """
    data = access_control_policy.to_xml()
    query = self.query_factory(
        action='PUT', creds=self.creds, endpoint=self.endpoint,
        bucket=bucket, object_name='%s?acl' % object_name, data=data,
        receiver_factory=self.receiver_factory)
    return query.submit().addCallback(self._parse_acl)
405
def get_object_acl(self, bucket, object_name):
    """
    Get the access control policy for an object.

    @param bucket: The name of the bucket.
    @param object_name: The name of the object; C{?acl} is appended to it
        to form the query's object name.
    @return: A C{Deferred} that will fire with the result of C{_parse_acl}
        applied to the response.
    """
    query = self.query_factory(
        action='GET', creds=self.creds, endpoint=self.endpoint,
        bucket=bucket, object_name='%s?acl' % object_name,
        receiver_factory=self.receiver_factory)
    return query.submit().addCallback(self._parse_acl)
415
def put_request_payment(self, bucket, payer):
    """
    Set request payment configuration on bucket to payer.

    @param bucket: The name of the bucket.
    @param payer: The name of the payer.
    @return: A C{Deferred} that will fire with the result of the request.
    """
    data = RequestPayment(payer).to_xml()
    query = self.query_factory(
        action="PUT", creds=self.creds, endpoint=self.endpoint,
        bucket=bucket, object_name="?requestPayment", data=data,
        receiver_factory=self.receiver_factory)
    return query.submit()
430
def get_request_payment(self, bucket):
    """
    Get the request payment configuration on a bucket.

    @param bucket: The name of the bucket.
    @return: A C{Deferred} that will fire with the name of the payer.
    """
    query = self.query_factory(
        action="GET", creds=self.creds, endpoint=self.endpoint,
        bucket=bucket, object_name="?requestPayment",
        receiver_factory=self.receiver_factory)
    return query.submit().addCallback(self._parse_get_request_payment)
443
def _parse_get_request_payment(self, xml_bytes):
    """
    Parse a C{RequestPaymentConfiguration} XML document and extract the
    payer.

    @param xml_bytes: The raw XML response body.
    @return: The C{payer} attribute of the L{RequestPayment} built by
        C{RequestPayment.from_xml}.
    """
    return RequestPayment.from_xml(xml_bytes).payer
450
def init_multipart_upload(self, bucket, object_name, content_type=None,
451
amz_headers={}, metadata={}):
453
Initiate a multipart upload to a bucket.
455
@param bucket: The name of the bucket
456
@param object_name: The object name
457
@param content_type: The Content-Type for the object
458
@param metadata: C{dict} containing additional metadata
459
@param amz_headers: A C{dict} used to build C{x-amz-*} headers.
460
@return: C{str} upload_id
462
objectname_plus = '%s?uploads' % object_name
463
query = self.query_factory(
464
action="POST", creds=self.creds, endpoint=self.endpoint,
465
bucket=bucket, object_name=objectname_plus, data='',
466
content_type=content_type, amz_headers=amz_headers,
469
return d.addCallback(MultipartInitiationResponse.from_xml)
471
def upload_part(self, bucket, object_name, upload_id, part_number,
472
data=None, content_type=None, metadata={},
475
Upload a part of data corresponding to a multipart upload.
477
@param bucket: The bucket name
478
@param object_name: The object name
479
@param upload_id: The multipart upload id
480
@param part_number: The part number
481
@param data: Data (optional, requires body_producer if not specified)
482
@param content_type: The Content-Type
483
@param metadata: Additional metadata
484
@param body_producer: an C{IBodyProducer} (optional, requires data if
486
@return: the C{Deferred} from underlying query.submit() call
488
parms = 'partNumber=%s&uploadId=%s' % (str(part_number), upload_id)
489
objectname_plus = '%s?%s' % (object_name, parms)
490
query = self.query_factory(
491
action="PUT", creds=self.creds, endpoint=self.endpoint,
492
bucket=bucket, object_name=objectname_plus, data=data,
493
content_type=content_type, metadata=metadata,
494
body_producer=body_producer, receiver_factory=self.receiver_factory)
496
return d.addCallback(query.get_response_headers)
498
def complete_multipart_upload(self, bucket, object_name, upload_id,
499
parts_list, content_type=None, metadata={}):
501
Complete a multipart upload.
503
N.B. This can possibly be a slow operation.
505
@param bucket: The bucket name
506
@param object_name: The object name
507
@param upload_id: The multipart upload id
508
@param parts_list: A List of all the parts
509
(2-tuples of part sequence number and etag)
510
@param content_type: The Content-Type of the object
511
@param metadata: C{dict} containing additional metadata
512
@return: a C{Deferred} that fires after request is complete
514
data = self._build_complete_multipart_upload_xml(parts_list)
515
objectname_plus = '%s?uploadId=%s' % (object_name, upload_id)
516
query = self.query_factory(
517
action="POST", creds=self.creds, endpoint=self.endpoint,
518
bucket=bucket, object_name=objectname_plus, data=data,
519
content_type=content_type, metadata=metadata)
521
# TODO - handle error responses
522
return d.addCallback(MultipartCompletionResponse.from_xml)
524
def _build_complete_multipart_upload_xml(self, parts_list):
526
parts_list.sort(key=lambda p: int(p[0]))
527
xml.append('<CompleteMultipartUpload>')
528
for pt in parts_list:
530
xml.append('<PartNumber>%s</PartNumber>' % pt[0])
531
xml.append('<ETag>%s</ETag>' % pt[1])
532
xml.append('</Part>')
533
xml.append('</CompleteMultipartUpload>')
534
return '\n'.join(xml)
537
class Query(BaseQuery):
538
"""A query for submission to the S3 service."""
540
def __init__(self, bucket=None, object_name=None, data="",
541
content_type=None, metadata={}, amz_headers={},
542
body_producer=None, *args, **kwargs):
543
super(Query, self).__init__(*args, **kwargs)
19
class S3Request(object):
21
def __init__(self, verb, bucket=None, object_name=None, data="",
22
content_type=None, metadata={}, creds=None, endpoint=None):
544
24
self.bucket = bucket
545
25
self.object_name = object_name
547
self.body_producer = body_producer
548
27
self.content_type = content_type
549
28
self.metadata = metadata
550
self.amz_headers = amz_headers
30
self.endpoint = endpoint
551
31
self.date = datetimeToString()
552
if not self.endpoint or not self.endpoint.host:
553
self.endpoint = AWSServiceEndpoint(S3_ENDPOINT)
554
self.endpoint.set_method(self.action)
556
def set_content_type(self):
558
Set the content type based on the file extension used in the object
561
if self.object_name and not self.content_type:
562
# XXX nothing is currently done with the encoding... we may
563
# need to in the future
564
self.content_type, encoding = mimetypes.guess_type(
565
self.object_name, strict=False)
35
if self.bucket is not None:
37
if self.object_name is not None:
38
path += "/" + self.object_name
42
if self.endpoint is None:
43
self.endpoint = AWSServiceEndpoint()
44
self.endpoint.set_path(self.get_path())
45
return self.endpoint.get_uri()
567
47
def get_headers(self):
569
Build the list of headers needed in order to perform S3 operations.
571
if self.body_producer:
572
content_length = self.body_producer.length
574
content_length = len(self.data)
575
headers = {"Content-Length": content_length,
48
headers = {"Content-Length": len(self.data),
49
"Content-MD5": calculate_md5(self.data),
576
50
"Date": self.date}
577
if self.body_producer is None:
578
headers["Content-MD5"] = calculate_md5(self.data)
579
52
for key, value in self.metadata.iteritems():
580
53
headers["x-amz-meta-" + key] = value
581
for key, value in self.amz_headers.iteritems():
582
headers["x-amz-" + key] = value
583
# Before we check if the content type is set, let's see if we can set
584
# it by guessing the mimetype.
585
self.set_content_type()
586
55
if self.content_type is not None:
587
56
headers["Content-Type"] = self.content_type
588
58
if self.creds is not None:
589
signature = self.sign(headers)
59
signature = self.get_signature(headers)
590
60
headers["Authorization"] = "AWS %s:%s" % (
591
61
self.creds.access_key, signature)
64
def get_canonicalized_resource(self):
65
return self.get_path()
594
67
def get_canonicalized_amz_headers(self, headers):
596
Get the headers defined by Amazon S3.
599
(name.lower(), value) for name, value in headers.iteritems()
69
headers = [(name.lower(), value) for name, value in headers.iteritems()
600
70
if name.lower().startswith("x-amz-")]
602
# XXX missing spec implementation:
603
# 1) txAWS doesn't currently combine headers with the same name
604
# 2) txAWS doesn't currently unfold long headers
605
72
return "".join("%s:%s\n" % (name, value) for name, value in headers)
607
def get_canonicalized_resource(self):
609
Get an S3 resource path.
612
if self.bucket is not None:
614
if self.bucket is not None and self.object_name:
615
if not self.object_name.startswith("/"):
617
path += self.object_name
618
elif self.bucket is not None and not path.endswith("/"):
622
def sign(self, headers):
623
"""Sign this query using its built in credentials."""
624
text = (self.action + "\n" +
74
def get_signature(self, headers):
75
text = (self.verb + "\n" +
625
76
headers.get("Content-MD5", "") + "\n" +
626
77
headers.get("Content-Type", "") + "\n" +
627
78
headers.get("Date", "") + "\n" +
628
79
self.get_canonicalized_amz_headers(headers) +
629
80
self.get_canonicalized_resource())
630
return self.creds.sign(text, hash_type="sha1")
632
def submit(self, url_context=None):
633
"""Submit this query.
635
@return: A deferred from get_page
638
url_context = URLContext(
639
self.endpoint, self.bucket, self.object_name)
641
url_context.get_url(), method=self.action, postdata=self.data,
642
headers=self.get_headers(), body_producer=self.body_producer,
643
receiver_factory=self.receiver_factory)
644
return d.addErrback(s3_error_wrapper)
81
return self.creds.sign(text)
84
return self.get_page(url=self.get_uri(), method=self.verb,
85
postdata=self.data, headers=self.get_headers())
87
def get_page(self, *a, **kw):
    """
    Forward all positional and keyword arguments to C{getPage} and return
    its result.
    """
    return getPage(*a, **kw)
93
request_factory = S3Request
95
def __init__(self, creds, endpoint):
97
self.endpoint = endpoint
99
def make_request(self, *a, **kw):
101
Create a request with the arguments passed in.
103
This uses the request_factory attribute, adding the creds and endpoint
104
to the arguments passed in.
106
return self.request_factory(creds=self.creds, endpoint=self.endpoint,
109
def _parse_bucket_list(self, response):
111
Parse XML bucket list response.
114
for bucket in root.find("Buckets"):
115
timeText = bucket.findtext("CreationDate")
117
"name": bucket.findtext("Name"),
118
"created": Time.fromISO8601TimeAndDate(timeText),
121
def list_buckets(self):
125
Returns a list of all the buckets owned by the authenticated sender of
128
deferred = self.make_request("GET").submit()
129
deferred.addCallback(self._parse_bucket_list)
132
def create_bucket(self, bucket):
    """
    Submit a C{PUT} request for C{bucket}.

    @param bucket: The name of the bucket.
    @return: The result of submitting the request.
    """
    return self.make_request("PUT", bucket).submit()
138
def delete_bucket(self, bucket):
    """
    Submit a C{DELETE} request for C{bucket}.

    The bucket must be empty before it can be deleted.

    @param bucket: The name of the bucket.
    @return: The result of submitting the request.
    """
    return self.make_request("DELETE", bucket).submit()
146
def put_object(self, bucket, object_name, data, content_type=None,
149
Put an object in a bucket.
151
Any existing object of the same name will be replaced.
153
return self.make_request("PUT", bucket, object_name, data,
154
content_type, metadata).submit()
156
def get_object(self, bucket, object_name):
    """
    Get an object from a bucket.

    @param bucket: The name of the bucket.
    @param object_name: The name of the object.
    @return: The result of submitting the C{GET} request.
    """
    return self.make_request("GET", bucket, object_name).submit()
162
def head_object(self, bucket, object_name):
    """
    Retrieve object metadata only.

    This is like get_object, but the object's content is not retrieved.
    Currently the metadata is not returned to the caller either, so this
    method is mostly useless, and only provided for completeness.

    @param bucket: The name of the bucket.
    @param object_name: The name of the object.
    @return: The result of submitting the C{HEAD} request.
    """
    return self.make_request("HEAD", bucket, object_name).submit()
172
def delete_object(self, bucket, object_name):
    """
    Delete an object from a bucket.

    Once deleted, there is no method to restore or undelete an object.

    @param bucket: The name of the bucket.
    @param object_name: The name of the object.
    @return: The result of submitting the C{DELETE} request.
    """
    return self.make_request("DELETE", bucket, object_name).submit()