~franciscosouza/txaws/txaws-bundled-vpc

« back to all changes in this revision

Viewing changes to txaws/s3/client.py

  • Committer: Drew Smathers
  • Date: 2012-05-16 02:19:56 UTC
  • mfrom: (133.2.11 newagent-767205)
  • Revision ID: drew.smathers@gmail.com-20120516021956-rstg41891qkb60zp
Merged newagent-767205 [r=oubiwann][f=767205]

This changes the txaws client to use twisted.web.client.Agent and adds support
for plugging in custom IBodyProducer implementations and receiver factories for requests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
74
74
class S3Client(BaseClient):
75
75
    """A client for S3."""
76
76
 
77
 
    def __init__(self, creds=None, endpoint=None, query_factory=None):
 
77
    def __init__(self, creds=None, endpoint=None, query_factory=None,
 
78
                 receiver_factory=None):
78
79
        if query_factory is None:
79
80
            query_factory = Query
80
 
        super(S3Client, self).__init__(creds, endpoint, query_factory)
 
81
        super(S3Client, self).__init__(creds, endpoint, query_factory,
 
82
                                       receiver_factory=receiver_factory)
81
83
 
82
84
    def list_buckets(self):
83
85
        """
87
89
        the request.
88
90
        """
89
91
        query = self.query_factory(
90
 
            action="GET", creds=self.creds, endpoint=self.endpoint)
 
92
            action="GET", creds=self.creds, endpoint=self.endpoint,
 
93
            receiver_factory=self.receiver_factory)
91
94
        d = query.submit()
92
95
        return d.addCallback(self._parse_list_buckets)
93
96
 
131
134
        """
132
135
        query = self.query_factory(
133
136
            action="GET", creds=self.creds, endpoint=self.endpoint,
134
 
            bucket=bucket)
 
137
            bucket=bucket, receiver_factory=self.receiver_factory)
135
138
        d = query.submit()
136
139
        return d.addCallback(self._parse_get_bucket)
137
140
 
174
177
        """
175
178
        query = self.query_factory(action="GET", creds=self.creds,
176
179
                                   endpoint=self.endpoint, bucket=bucket,
177
 
                                   object_name="?location")
 
180
                                   object_name="?location",
 
181
                                   receiver_factory=self.receiver_factory)
178
182
        d = query.submit()
179
183
        return d.addCallback(self._parse_bucket_location)
180
184
 
193
197
        """
194
198
        query = self.query_factory(
195
199
            action='GET', creds=self.creds, endpoint=self.endpoint,
196
 
            bucket=bucket, object_name='?lifecycle')
 
200
            bucket=bucket, object_name='?lifecycle',
 
201
            receiver_factory=self.receiver_factory)
197
202
        return query.submit().addCallback(self._parse_lifecycle_config)
198
203
 
199
204
    def _parse_lifecycle_config(self, xml_bytes):
221
226
        """
222
227
        query = self.query_factory(
223
228
            action='GET', creds=self.creds, endpoint=self.endpoint,
224
 
            bucket=bucket, object_name='?website')
 
229
            bucket=bucket, object_name='?website',
 
230
            receiver_factory=self.receiver_factory)
225
231
        return query.submit().addCallback(self._parse_website_config)
226
232
 
227
233
    def _parse_website_config(self, xml_bytes):
242
248
        """
243
249
        query = self.query_factory(
244
250
            action='GET', creds=self.creds, endpoint=self.endpoint,
245
 
            bucket=bucket, object_name='?notification')
 
251
            bucket=bucket, object_name='?notification',
 
252
            receiver_factory=self.receiver_factory)
246
253
        return query.submit().addCallback(self._parse_notification_config)
247
254
 
248
255
    def _parse_notification_config(self, xml_bytes):
262
269
        """
263
270
        query = self.query_factory(
264
271
            action='GET', creds=self.creds, endpoint=self.endpoint,
265
 
            bucket=bucket, object_name='?versioning')
 
272
            bucket=bucket, object_name='?versioning',
 
273
            receiver_factory=self.receiver_factory)
266
274
        return query.submit().addCallback(self._parse_versioning_config)
267
275
 
268
276
    def _parse_versioning_config(self, xml_bytes):
279
287
        """
280
288
        query = self.query_factory(
281
289
            action='GET', creds=self.creds, endpoint=self.endpoint,
282
 
            bucket=bucket, object_name='?acl')
 
290
            bucket=bucket, object_name='?acl',
 
291
            receiver_factory=self.receiver_factory)
283
292
        return query.submit().addCallback(self._parse_acl)
284
293
 
285
294
    def put_bucket_acl(self, bucket, access_control_policy):
289
298
        data = access_control_policy.to_xml()
290
299
        query = self.query_factory(
291
300
            action='PUT', creds=self.creds, endpoint=self.endpoint,
292
 
            bucket=bucket, object_name='?acl', data=data)
 
301
            bucket=bucket, object_name='?acl', data=data,
 
302
            receiver_factory=self.receiver_factory)
293
303
        return query.submit().addCallback(self._parse_acl)
294
304
 
295
305
    def _parse_acl(self, xml_bytes):
299
309
        """
300
310
        return AccessControlPolicy.from_xml(xml_bytes)
301
311
 
302
 
    def put_object(self, bucket, object_name, data, content_type=None,
303
 
                   metadata={}, amz_headers={}):
 
312
    def put_object(self, bucket, object_name, data=None, content_type=None,
 
313
                   metadata={}, amz_headers={}, body_producer=None):
304
314
        """
305
315
        Put an object in a bucket.
306
316
 
318
328
            action="PUT", creds=self.creds, endpoint=self.endpoint,
319
329
            bucket=bucket, object_name=object_name, data=data,
320
330
            content_type=content_type, metadata=metadata,
321
 
            amz_headers=amz_headers)
 
331
            amz_headers=amz_headers, body_producer=body_producer,
 
332
            receiver_factory=self.receiver_factory)
322
333
        return query.submit()
323
334
 
324
335
    def copy_object(self, source_bucket, source_object_name, dest_bucket=None,
344
355
        query = self.query_factory(
345
356
            action="PUT", creds=self.creds, endpoint=self.endpoint,
346
357
            bucket=dest_bucket, object_name=dest_object_name,
347
 
            metadata=metadata, amz_headers=amz_headers)
 
358
            metadata=metadata, amz_headers=amz_headers,
 
359
            receiver_factory=self.receiver_factory)
348
360
        return query.submit()
349
361
 
350
362
    def get_object(self, bucket, object_name):
353
365
        """
354
366
        query = self.query_factory(
355
367
            action="GET", creds=self.creds, endpoint=self.endpoint,
356
 
            bucket=bucket, object_name=object_name)
 
368
            bucket=bucket, object_name=object_name,
 
369
            receiver_factory=self.receiver_factory)
357
370
        return query.submit()
358
371
 
359
372
    def head_object(self, bucket, object_name):
384
397
        data = access_control_policy.to_xml()
385
398
        query = self.query_factory(
386
399
            action='PUT', creds=self.creds, endpoint=self.endpoint,
387
 
            bucket=bucket, object_name='%s?acl' % object_name, data=data)
 
400
            bucket=bucket, object_name='%s?acl' % object_name, data=data,
 
401
            receiver_factory=self.receiver_factory)
388
402
        return query.submit().addCallback(self._parse_acl)
389
403
 
390
404
    def get_object_acl(self, bucket, object_name):
393
407
        """
394
408
        query = self.query_factory(
395
409
            action='GET', creds=self.creds, endpoint=self.endpoint,
396
 
            bucket=bucket, object_name='%s?acl' % object_name)
 
410
            bucket=bucket, object_name='%s?acl' % object_name,
 
411
            receiver_factory=self.receiver_factory)
397
412
        return query.submit().addCallback(self._parse_acl)
398
413
 
399
414
    def put_request_payment(self, bucket, payer):
407
422
        data = RequestPayment(payer).to_xml()
408
423
        query = self.query_factory(
409
424
            action="PUT", creds=self.creds, endpoint=self.endpoint,
410
 
            bucket=bucket, object_name="?requestPayment", data=data)
 
425
            bucket=bucket, object_name="?requestPayment", data=data,
 
426
            receiver_factory=self.receiver_factory)
411
427
        return query.submit()
412
428
 
413
429
    def get_request_payment(self, bucket):
419
435
        """
420
436
        query = self.query_factory(
421
437
            action="GET", creds=self.creds, endpoint=self.endpoint,
422
 
            bucket=bucket, object_name="?requestPayment")
 
438
            bucket=bucket, object_name="?requestPayment",
 
439
            receiver_factory=self.receiver_factory)
423
440
        return query.submit().addCallback(self._parse_get_request_payment)
424
441
 
425
442
    def _parse_get_request_payment(self, xml_bytes):
434
451
    """A query for submission to the S3 service."""
435
452
 
436
453
    def __init__(self, bucket=None, object_name=None, data="",
437
 
                 content_type=None, metadata={}, amz_headers={}, *args,
438
 
                 **kwargs):
 
454
                 content_type=None, metadata={}, amz_headers={},
 
455
                 body_producer=None, *args, **kwargs):
439
456
        super(Query, self).__init__(*args, **kwargs)
440
457
        self.bucket = bucket
441
458
        self.object_name = object_name
442
459
        self.data = data
 
460
        self.body_producer = body_producer
443
461
        self.content_type = content_type
444
462
        self.metadata = metadata
445
463
        self.amz_headers = amz_headers
463
481
        """
464
482
        Build the list of headers needed in order to perform S3 operations.
465
483
        """
466
 
        headers = {"Content-Length": len(self.data),
467
 
                   "Content-MD5": calculate_md5(self.data),
 
484
        if self.body_producer:
 
485
            content_length = self.body_producer.length
 
486
        else:
 
487
            content_length = len(self.data)
 
488
        headers = {"Content-Length": content_length,
468
489
                   "Date": self.date}
 
490
        if self.body_producer is None:
 
491
            headers["Content-MD5"] = calculate_md5(self.data)
469
492
        for key, value in self.metadata.iteritems():
470
493
            headers["x-amz-meta-" + key] = value
471
494
        for key, value in self.amz_headers.iteritems():
529
552
                self.endpoint, self.bucket, self.object_name)
530
553
        d = self.get_page(
531
554
            url_context.get_url(), method=self.action, postdata=self.data,
532
 
            headers=self.get_headers())
 
555
            headers=self.get_headers(), body_producer=self.body_producer,
 
556
            receiver_factory=self.receiver_factory)
533
557
        return d.addErrback(s3_error_wrapper)