~stewart/drizzle/bug911643

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSS3Protocol.cc

  • Committer: Mark Atwood
  • Date: 2011-12-20 02:32:53 UTC
  • mfrom: (2469.1.1 drizzle-build)
  • Revision ID: me@mark.atwood.name-20111220023253-bvu0kr14kwsdvz7g
merge lp:~brianaker/drizzle/deprecate-pbms

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 *  Created by Barry Leslie on 10/02/09.
20
 
 *
21
 
 */
22
 
#include "CSConfig.h"
23
 
#include <inttypes.h>
24
 
#include <stdlib.h>
25
 
 
26
 
#include <curl/curl.h>
27
 
 
28
 
#include "CSGlobal.h"
29
 
#include "CSString.h"
30
 
#include "CSStrUtil.h"
31
 
#include "CSEncode.h"
32
 
#include "CSS3Protocol.h"
33
 
#include "CSXML.h"
34
 
 
35
 
#ifdef S3_UNIT_TEST
36
 
//#define SHOW_SIGNING
37
 
// Uncomment this line to trace network action during request. Very useful!!
38
 
#define DEBUG_CURL
39
 
#define DUMP_ERRORS
40
 
#endif
41
 
 
42
 
//#define DUMP_ERRORS
43
 
//#define SHOW_SIGNING
44
 
 
45
 
#define HEX_CHECKSUM_VALUE_SIZE (2 *CHECKSUM_VALUE_SIZE)
46
 
 
47
 
#define THROW_CURL_IF(v) { if (v) CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);}
48
 
 
49
 
//-------------------------------
50
 
// S3 error codes (the text of the <Code> element of an S3 error reply) for
// which the request is retried instead of being reported as a failure.
// The list is NULL terminated and is compared case-sensitively.
// Both the strings and the table itself are read-only.
static const char *const retryCodes[] = {
	"ExpiredToken",
	"InternalError",
	"OperationAborted",
	"RequestTimeout",
	"SlowDown",
	NULL
};
58
 
 
59
 
//======================================
60
 
static size_t receive_data(void *ptr, size_t size, size_t nmemb, void *stream);
61
 
static size_t receive_header(void *ptr, size_t size, size_t nmemb, void *stream);
62
 
static size_t send_callback(void *ptr, size_t size, size_t nmemb, void *stream);
63
 
 
64
 
class S3ProtocolCon : CSXMLBuffer, public CSObject {
65
 
 
66
 
        private:
67
 
        
68
 
        virtual bool openNode(char *path, char *value) {
69
 
                if (value && *value && (strcmp(path,"/error/code/") == 0)) {
70
 
                        printf("S3 ERROR Code: %s\n", value);
71
 
                        for (int i = 0; retryCodes[i] && !ms_retry; i++)
72
 
                                ms_retry = (strcmp(value, retryCodes[i]) == 0);
73
 
                                
74
 
                        if (ms_retry && !strcmp("slowdown", value)) 
75
 
                                ms_slowDown = true;
76
 
                } else if (value && *value && (strcmp(path,"/error/message/") == 0)) {
77
 
                        printf("S3 ERROR MESSAGE: %s\n", value);
78
 
                }
79
 
                return true;
80
 
        }
81
 
 
82
 
        virtual bool closeNode(char *path) {
83
 
                (void)path;
84
 
                return true;
85
 
        }
86
 
 
87
 
        virtual bool addAttribute(char *path, char *name, char *value) {
88
 
                (void)path;
89
 
                (void)name;
90
 
                (void)value;
91
 
                return true;
92
 
        }
93
 
        
94
 
        //-------------------------------
95
 
        void parse_s3_error()
96
 
        {
97
 
                enter_();
98
 
 
99
 
                if (!ms_errorReply)
100
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing HTTP reply: possible S3 connection failure.");
101
 
 
102
 
        #ifdef DUMP_ERRORS
103
 
                printf("ms_errorReply:\n===========\n%s\n===========\n", ms_errorReply->getCString());
104
 
        #endif
105
 
                
106
 
                if (!parseData(ms_errorReply->getCString(), ms_errorReply->length(), 0)){
107
 
                        int             err;
108
 
                        char    *msg;
109
 
 
110
 
                        getError(&err, &msg);
111
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
112
 
                }
113
 
                
114
 
                exit_();
115
 
        }
116
 
        
117
 
        public:
118
 
        
119
 
        CSHTTPHeaders   ms_reply_headers;
120
 
        CSStringBuffer  ms_buffer; // A scratch buffer
121
 
 
122
 
        CURL                    *ms_curl;       
123
 
        struct curl_slist       *ms_header_list;        // A curl list of headers to be sent with the next request.
124
 
 
125
 
        CSInputStream   *ms_inputStream;
126
 
        CSOutputStream  *ms_outputStream;
127
 
        
128
 
        CSMd5                   ms_md5;
129
 
        char                    ms_s3Checksum[HEX_CHECKSUM_VALUE_SIZE +1];
130
 
        bool                    ms_calculate_md5;
131
 
        
132
 
        bool                    ms_notFound; // True if the object could not be found
133
 
        bool                    ms_retry; // True if the request failed with a retry error.
134
 
        bool                    ms_slowDown;
135
 
        
136
 
        CSStringBuffer  *ms_errorReply;
137
 
        char                    ms_curl_error[CURL_ERROR_SIZE];
138
 
        
139
 
        off64_t                 ms_data_size;
140
 
        
141
 
        unsigned int    ms_replyStatus;
142
 
        bool                    ms_throw_error; // Gets set if an exception occurs in a callback.
143
 
        bool                    ms_old_libcurl;
144
 
        char                    *ms_safe_url;
145
 
        time_t                  ms_last_modified;
146
 
        
147
 
        S3ProtocolCon():
148
 
                ms_curl(NULL),
149
 
                ms_header_list(NULL),
150
 
                ms_inputStream(NULL),
151
 
                ms_outputStream(NULL),
152
 
                ms_calculate_md5(false),
153
 
                ms_notFound(false),
154
 
                ms_retry(false),
155
 
                ms_slowDown(false),
156
 
                ms_errorReply(NULL),
157
 
                ms_data_size(0),
158
 
                ms_replyStatus(0),
159
 
                ms_throw_error(false),
160
 
                ms_old_libcurl(false),
161
 
                ms_safe_url(NULL),
162
 
                ms_last_modified(0)
163
 
        {
164
 
        
165
 
                ms_curl = curl_easy_init();
166
 
                if (!ms_curl)
167
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_init() failed.");
168
 
 
169
 
                curl_version_info_data *curl_ver = curl_version_info(CURLVERSION_NOW); 
170
 
                
171
 
                // libCurl versions prior to 7.17.0 did not make copies of strings passed into curl_easy_setopt()
172
 
                // If this version requirement is a problem I can do this myself, if I have to, I guess. :(
173
 
                if (curl_ver->version_num < 0X071700 ) {
174
 
                        ms_old_libcurl = true;
175
 
                        
176
 
                        //char msg[200];
177
 
                        //snprintf(msg, 200, "libcurl version %s is too old, require version 7.17.0 or newer.", curl_ver->version);
178
 
                        //CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
179
 
                }
180
 
                
181
 
                if (curl_easy_setopt(ms_curl, CURLOPT_ERRORBUFFER, ms_curl_error))
182
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_setopt(CURLOPT_ERRORBUFFER) failed.");
183
 
                
184
 
#ifdef DEBUG_CURL
185
 
                curl_easy_setopt(ms_curl, CURLOPT_VERBOSE, 1L);
186
 
#endif          
187
 
                //THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_TCP_NODELAY, 1L));
188
 
        
189
 
 
190
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOPROGRESS, 1L));
191
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEFUNCTION, receive_data));
192
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READFUNCTION, send_callback));  
193
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HEADERFUNCTION, receive_header));
194
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEDATA, this));
195
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READDATA, this));
196
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEHEADER, this));
197
 
                
198
 
 
199
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FOLLOWLOCATION, 1L)); // Follow redirects.
200
 
        
201
 
        }
202
 
        
203
 
        ~S3ProtocolCon()
204
 
        {
205
 
                if (ms_curl) 
206
 
                        curl_easy_cleanup(ms_curl);                     
207
 
                if (ms_header_list) 
208
 
                        curl_slist_free_all(ms_header_list);                    
209
 
                if (ms_inputStream)
210
 
                        ms_inputStream->release();
211
 
                if (ms_outputStream)
212
 
                        ms_outputStream->release();
213
 
                if (ms_errorReply)
214
 
                        ms_errorReply->release();
215
 
                        
216
 
                ms_reply_headers.clearHeaders();
217
 
                
218
 
                if (ms_safe_url)
219
 
                        cs_free(ms_safe_url);
220
 
        }
221
 
 
222
 
        inline void check_reply_status() 
223
 
        {
224
 
                if (ms_replyStatus > 199 && ms_replyStatus < 300)
225
 
                        return;
226
 
                
227
 
                
228
 
                
229
 
                switch (ms_replyStatus) {
230
 
                        case 200:
231
 
                        case 204:       // No Content
232
 
                        //case 301: // Moved Permanently
233
 
                        //case 307: // Temporary Redirect
234
 
                                break;
235
 
                        case 404:       // Not Found
236
 
                        case 403:       // Forbidden (S3 object not found)
237
 
                                ms_notFound = true;
238
 
                                break;
239
 
                        case 500:       
240
 
                                ms_retry = true;
241
 
                                break;
242
 
                        default: {
243
 
                                parse_s3_error();
244
 
                                
245
 
                                
246
 
                                
247
 
                                if (!ms_retry) {
248
 
                                        enter_();
249
 
                                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_errorReply->getCString());
250
 
                                        outer_();
251
 
                                } else if (ms_slowDown) {
252
 
                                        enter_();
253
 
                                        CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 slow down request.");
254
 
                                        self->sleep(10); // sleep for 1/100 second.
255
 
                                        outer_();
256
 
                                }
257
 
                        }
258
 
                }
259
 
                
260
 
        }
261
 
 
262
 
        
263
 
        inline void ms_reset()
264
 
        {
265
 
                // Remove any old headers
266
 
                if (ms_header_list) {
267
 
                        curl_slist_free_all(ms_header_list);
268
 
                        ms_header_list = NULL;
269
 
                }
270
 
 
271
 
                ms_reply_headers.clearHeaders();
272
 
                ms_replyStatus = 0;
273
 
                ms_throw_error = false;
274
 
                if (ms_errorReply)
275
 
                        ms_errorReply->setLength(0);
276
 
                        
277
 
                ms_s3Checksum[0] = 0;
278
 
                ms_notFound = false;
279
 
                ms_retry = false;
280
 
                
281
 
                if (ms_outputStream) {
282
 
                        ms_outputStream->release();
283
 
                        ms_outputStream = NULL;
284
 
                }
285
 
                if (ms_inputStream) {
286
 
                        ms_inputStream->release();
287
 
                        ms_inputStream = NULL;
288
 
                }
289
 
                
290
 
                if (ms_safe_url) {
291
 
                        cs_free(ms_safe_url);
292
 
                        ms_safe_url = NULL;
293
 
                }
294
 
        }
295
 
        
296
 
        inline void ms_setHeader(const char *header)
297
 
        {
298
 
                ms_header_list = curl_slist_append(ms_header_list, header);
299
 
                if (!ms_header_list) 
300
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
301
 
        }
302
 
 
303
 
 
304
 
private:        
305
 
        inline const char *safe_url(const char *url)
306
 
        {
307
 
                if (ms_old_libcurl == false)
308
 
                        return url;
309
 
                        
310
 
                if (ms_safe_url) {
311
 
                        cs_free(ms_safe_url);
312
 
                        ms_safe_url = NULL;
313
 
                }
314
 
                ms_safe_url = cs_strdup(url);
315
 
                return ms_safe_url;
316
 
        }
317
 
        
318
 
public: 
319
 
        inline void ms_setURL(const char *url)
320
 
        {
321
 
                //printf("URL: \"%s\n", url);
322
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, safe_url(url)));
323
 
        }
324
 
        
325
 
        inline void ms_execute_delete_request()
326
 
        {
327
 
                CURLcode rtc;
328
 
                
329
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
330
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_CUSTOMREQUEST, "DELETE"));
331
 
 
332
 
                rtc = curl_easy_perform(ms_curl);
333
 
                
334
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_CUSTOMREQUEST, NULL)); // IMPORTANT: Reset this to it's default value
335
 
 
336
 
                if (rtc && !ms_throw_error)
337
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
338
 
                        
339
 
                if (ms_throw_error) {
340
 
                        enter_();
341
 
                        throw_();
342
 
                        outer_();
343
 
                }
344
 
                
345
 
                check_reply_status();
346
 
        }
347
 
        
348
 
        inline void ms_execute_copy_request()
349
 
        {
350
 
                CURLcode rtc;
351
 
                
352
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
353
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_INFILESIZE_LARGE, 0));
354
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 1L));
355
 
                
356
 
                rtc = curl_easy_perform(ms_curl);
357
 
                
358
 
                if (rtc && !ms_throw_error)
359
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
360
 
                        
361
 
                if (ms_throw_error) {
362
 
                        enter_();
363
 
                        throw_();
364
 
                        outer_();
365
 
                }
366
 
                
367
 
                check_reply_status();
368
 
        }
369
 
        
370
 
        inline void ms_execute_get_request(CSOutputStream *output)
371
 
        {
372
 
                enter_();
373
 
                
374
 
                if (output) {
375
 
                        push_(output);
376
 
                        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPGET, 1L));
377
 
                } else {
378
 
                        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOBODY, 1L));
379
 
                }
380
 
                
381
 
                // 
382
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FILETIME, 1L));
383
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
384
 
    // Ask curl to parse the Last-Modified header.  This is easier than
385
 
    // parsing it ourselves.
386
 
 
387
 
                ms_outputStream = output;       
388
 
                if (curl_easy_perform(ms_curl) && !ms_throw_error) {
389
 
                        ms_outputStream = NULL; 
390
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
391
 
                }
392
 
                ms_outputStream = NULL; 
393
 
                if (output){
394
 
                        release_(output);
395
 
                }
396
 
                
397
 
                if (ms_throw_error) 
398
 
                        throw_();
399
 
                
400
 
                check_reply_status();
401
 
                curl_easy_getinfo(ms_curl, CURLINFO_FILETIME, &ms_last_modified);
402
 
                exit_();                
403
 
        }
404
 
        inline void ms_execute_put_request(CSInputStream *input, off64_t size)
405
 
        {
406
 
                enter_();
407
 
                
408
 
                push_(input);   
409
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
410
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_INFILESIZE_LARGE, size));
411
 
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 1L));
412
 
                //THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POSTFIELDSIZE_LARGE, size));
413
 
                //THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 1L));
414
 
 
415
 
                ms_md5.md5_init();
416
 
                
417
 
                ms_data_size = size;
418
 
                ms_inputStream = input; 
419
 
                if (curl_easy_perform(ms_curl) && !ms_throw_error) {
420
 
                        ms_inputStream = NULL;  
421
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
422
 
                }
423
 
                ms_inputStream = NULL;
424
 
                release_(input);        
425
 
 
426
 
                        
427
 
                if (ms_throw_error)
428
 
                        throw_();
429
 
                
430
 
                check_reply_status();
431
 
                
432
 
                if (ms_calculate_md5) {
433
 
                        // If the data was not sent with an md5 checksum then verify
434
 
                        // the server's md5 value with the one calculated during the send.
435
 
                        char checksum[HEX_CHECKSUM_VALUE_SIZE +1];
436
 
                        Md5Digest digest;
437
 
                        
438
 
                        ms_md5.md5_get_digest(&digest);
439
 
                        cs_bin_to_hex(HEX_CHECKSUM_VALUE_SIZE +1, checksum, CHECKSUM_VALUE_SIZE, digest.val);
440
 
                        
441
 
                        cs_strToUpper(ms_s3Checksum);
442
 
                        if (strcmp(checksum, ms_s3Checksum)) {
443
 
                                // The request should be restarted in this case.
444
 
                                ms_retry = true;
445
 
                                CSException::logException(CS_CONTEXT, CS_ERR_CHECKSUM_ERROR, "Calculated checksum did not match S3 checksum");
446
 
                        }
447
 
                }
448
 
 
449
 
                exit_();                
450
 
        }
451
 
        
452
 
};
453
 
 
454
 
//======================================
455
 
 
456
 
 
457
 
 
458
 
 
459
 
//======================================
460
 
CSS3Protocol::CSS3Protocol():
461
 
        s3_server(NULL),
462
 
        s3_public_key(NULL),
463
 
        s3_private_key(NULL),
464
 
        s3_maxRetries(5),
465
 
        s3_sleepTime(0)
466
 
{
467
 
        new_(s3_server, CSStringBuffer());
468
 
        s3_server->append("s3.amazonaws.com/");
469
 
 
470
 
        s3_public_key = CSString::newString("");
471
 
        s3_private_key = CSString::newString("");
472
 
        
473
 
}
474
 
 
475
 
//------------------
476
 
CSS3Protocol::~CSS3Protocol()
477
 
{
478
 
        if (s3_server)
479
 
                s3_server->release();
480
 
        
481
 
        if (s3_public_key)
482
 
                s3_public_key->release();
483
 
        
484
 
        if (s3_private_key)
485
 
                s3_private_key->release();
486
 
}
487
 
        
488
 
//------------------
489
 
CSString *CSS3Protocol::s3_getSignature(const char *verb, 
490
 
                                                                                const char *md5, 
491
 
                                                                                const char *content_type, 
492
 
                                                                                const char *date, 
493
 
                                                                                const char *bucket, 
494
 
                                                                                const char *key,
495
 
                                                                                CSString *headers 
496
 
                                                                        )
497
 
{
498
 
        CSStringBuffer *s3_buffer;
499
 
        enter_();
500
 
        if (headers)
501
 
                push_(headers);
502
 
        
503
 
        new_(s3_buffer, CSStringBuffer());
504
 
        push_(s3_buffer);
505
 
        
506
 
        s3_buffer->setLength(0);
507
 
        s3_buffer->append(verb);        
508
 
        s3_buffer->append("\n");        
509
 
        if (md5) s3_buffer->append(md5);        
510
 
        s3_buffer->append("\n");        
511
 
        if (content_type) s3_buffer->append(content_type);      
512
 
        s3_buffer->append("\n");        
513
 
        s3_buffer->append(date);
514
 
        if (headers) { 
515
 
                // Note: headers are assumed to be in lower case, sorted, and containing no white space.
516
 
                s3_buffer->append("\n");        
517
 
                s3_buffer->append(headers->getCString());
518
 
        }
519
 
        s3_buffer->append("\n/");
520
 
        s3_buffer->append(bucket);
521
 
        s3_buffer->append("/");
522
 
        s3_buffer->append(key);
523
 
 
524
 
#ifdef SHOW_SIGNING
525
 
printf("signing:\n=================\n%s\n=================\n",  s3_buffer->getCString());
526
 
printf("Public Key:\"%s\"\n",   s3_public_key->getCString());
527
 
printf("Private Key:\"%s\"\n",  s3_private_key->getCString());
528
 
if(0){
529
 
        const char *ptr = s3_buffer->getCString();
530
 
        while (*ptr) {
531
 
                printf("%x ", *ptr); ptr++;
532
 
        }
533
 
        printf("\n");
534
 
}
535
 
#endif
536
 
 
537
 
        CSString *sig = signature(s3_buffer->getCString(), s3_private_key->getCString());
538
 
        release_(s3_buffer);
539
 
        if (headers) 
540
 
                release_(headers);
541
 
 
542
 
        return_(sig);
543
 
}
544
 
//----------------------
545
 
// CURL callback functions:
546
 
////////////////////////////
547
 
//----------------------
548
 
//-----------------
549
 
static bool try_ReadStream(CSThread *self, S3ProtocolCon *con, unsigned char *ptr, size_t buffer_size, size_t *data_sent)
550
 
{
551
 
        volatile bool rtc = true;
552
 
        try_(a) {
553
 
                *data_sent = con->ms_inputStream->read((char*)ptr, buffer_size);
554
 
                if (*data_sent <= con->ms_data_size) {
555
 
                        con->ms_data_size -= *data_sent;
556
 
                        if (*data_sent)
557
 
                                con->ms_md5.md5_append(ptr, *data_sent); // Calculating the checksum for the data sent.
558
 
                } else if (*data_sent > con->ms_data_size) 
559
 
                        CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob larger than expected.");
560
 
                else if (con->ms_data_size && !*data_sent)
561
 
                        CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob smaller than expected.");
562
 
                rtc = false;
563
 
        }
564
 
        
565
 
        catch_(a)
566
 
        cont_(a);
567
 
        return rtc;
568
 
}
569
 
 
570
 
//----------------------
571
 
static size_t send_callback(void *ptr, size_t objs, size_t obj_size, void *v_con)
572
 
{
573
 
        S3ProtocolCon *con = (S3ProtocolCon*) v_con;
574
 
        size_t data_sent, buffer_size = objs * obj_size;
575
 
 
576
 
        if (!con->ms_data_size)
577
 
                return 0;
578
 
                
579
 
        enter_();
580
 
        if (try_ReadStream(self, con, (unsigned char*)ptr, buffer_size, &data_sent)) {
581
 
                con->ms_throw_error = true;
582
 
                data_sent = (size_t)-1;
583
 
        }
584
 
        
585
 
        return_(data_sent);
586
 
}
587
 
 
588
 
//-----------------
589
 
static bool try_WriteStream(CSThread *self, S3ProtocolCon *con, char *ptr, size_t data_len)
590
 
{
591
 
        volatile bool rtc = true;
592
 
        try_(a) {
593
 
                if (con->ms_replyStatus >= 400) { // Collect the error reply.
594
 
                        if (!con->ms_errorReply)
595
 
                                con->ms_errorReply = new CSStringBuffer(50);            
596
 
                        con->ms_errorReply->append(ptr, data_len);
597
 
                } else if (     con->ms_outputStream)
598
 
                        con->ms_outputStream->write(ptr, data_len);
599
 
                rtc = false;
600
 
        }
601
 
        
602
 
        catch_(a)
603
 
        cont_(a);
604
 
        return rtc;
605
 
}
606
 
 
607
 
//----------------------
608
 
static size_t receive_data(void *vptr, size_t objs, size_t obj_size, void *v_con)
609
 
{
610
 
        S3ProtocolCon *con = (S3ProtocolCon*) v_con;
611
 
        size_t data_len = objs * obj_size;
612
 
 
613
 
        enter_();
614
 
        if (try_WriteStream(self, con, (char*)vptr, data_len)) {
615
 
                con->ms_throw_error = true;
616
 
                data_len = (size_t)-1;
617
 
        }
618
 
 
619
 
        return_(data_len);      
620
 
}
621
 
 
622
 
// True if the HTTP status 's' is a redirect (300 to 399). The argument is
// parenthesized so that an expression can safely be passed.
#define IS_REDIRECT(s) (((s) >= 300) && ((s) < 400))
623
 
//----------------------
624
 
static bool try_addHeader(CSThread *self, S3ProtocolCon *con, char *name, uint32_t name_len, char *value, uint32_t value_len)
625
 
{
626
 
        volatile bool rtc = true;
627
 
        
628
 
        try_(a) {
629
 
                con->ms_reply_headers.addHeader(name, name_len, value, value_len);
630
 
                rtc = false;
631
 
        }
632
 
        
633
 
        catch_(a);
634
 
        cont_(a);
635
 
        return rtc;
636
 
}
637
 
 
638
 
//----------------------
639
 
static size_t receive_header(void *header, size_t objs, size_t obj_size, void *v_con)
640
 
{
641
 
        S3ProtocolCon *con = (S3ProtocolCon*) v_con;
642
 
        size_t size = objs * obj_size;
643
 
        char *end, *ptr = (char*) header, *name, *value = NULL;
644
 
        uint32_t name_len =0, value_len = 0;
645
 
        
646
 
//printf(       "receive_header: %s\n", ptr);
647
 
        end = ptr + size;
648
 
        if (*(end -2) == '\r' && *(end -1) == '\n')
649
 
                end -=2;
650
 
                
651
 
        while ((end != ptr) && (*ptr == ' ')) ptr++;
652
 
        if (end == ptr)
653
 
                return size;
654
 
        
655
 
        // Get the reply status.
656
 
        // Status 100 = Continue
657
 
        if (((!con->ms_replyStatus) || (con->ms_replyStatus == 100) || IS_REDIRECT(con->ms_replyStatus) ) 
658
 
                        && !strncasecmp(ptr, "HTTP", 4)
659
 
                ) {
660
 
                char status[4];
661
 
                while ((end != ptr) && (*ptr != ' ')) ptr++; // skip HTTP stuff
662
 
                while ((end != ptr) && (*ptr == ' ')) ptr++; // find the start of eh status code.
663
 
                if (end == ptr)
664
 
                        return size;
665
 
                        
666
 
                if (end < (ptr +3)) // expecting a 3 digit status code.
667
 
                        return size;
668
 
                        
669
 
                memcpy(status, ptr, 3);
670
 
                status[3] = 0;
671
 
                
672
 
                con->ms_replyStatus = atoi(status);
673
 
        }
674
 
        
675
 
        name = ptr;
676
 
        while ((end != ptr) && (*ptr != ':')) ptr++;
677
 
        if (end == ptr)
678
 
                return size;
679
 
        name_len = ptr - name;
680
 
        
681
 
        ptr++; 
682
 
        while ((end != ptr) && (*ptr == ' ')) ptr++;
683
 
        if (end == ptr)
684
 
                return size;
685
 
        
686
 
        value = ptr;
687
 
        value_len = end - value;
688
 
        
689
 
        while (name[name_len-1] == ' ') name_len--;
690
 
        while (value[value_len-1] == ' ') value_len--;
691
 
        
692
 
        if (!strncasecmp(name, "ETag", 4)) {
693
 
                if (*value == '"') {
694
 
                        value++; value_len -=2; // Strip quotation marks from checksum string.
695
 
                }
696
 
                if (value_len == HEX_CHECKSUM_VALUE_SIZE) {
697
 
                        memcpy(con->ms_s3Checksum, value, value_len);
698
 
                        con->ms_s3Checksum[value_len] = 0;
699
 
                }
700
 
        }
701
 
        
702
 
        enter_();
703
 
        if (try_addHeader(self, con, name, name_len, value, value_len)) {
704
 
                con->ms_throw_error = true;
705
 
                size = (size_t)-1;
706
 
        }
707
 
        return_(size);
708
 
}
709
 
 
710
 
//----------------------
711
 
 
712
 
// Format the time_t variable 't' as an HTTP date ("Thu, 01 Jan 1970
// 00:00:00 GMT") into 'd'.  'd' must be a char array, not a pointer, since
// its size is taken with sizeof().  gmtime_r() is used instead of gmtime()
// because gmtime() returns a pointer to storage shared by all threads.
#define SET_DATE_FROM_TIME(t, d) { \
	struct tm set_date_tm_; \
	strftime((d), sizeof(d), "%a, %d %b %Y %H:%M:%S GMT", gmtime_r(&(t), &set_date_tm_)); \
}

// Format the current time as an HTTP date into the char array 'd'.
#define SET_DATE(d) { time_t set_date_now_ = time(NULL); SET_DATE_FROM_TIME(set_date_now_, d); }
714
 
 
715
 
bool CSS3Protocol::s3_delete(const char *bucket, const char *key)
716
 
{
717
 
        CSStringBuffer *s3_buffer;
718
 
        char date[64];
719
 
        CSString *signed_str;
720
 
        uint32_t retry_count = 0;
721
 
        S3ProtocolCon *con_data;
722
 
 
723
 
        enter_();
724
 
 
725
 
        new_(s3_buffer, CSStringBuffer());
726
 
        push_(s3_buffer);
727
 
 
728
 
        new_(con_data, S3ProtocolCon());
729
 
        push_(con_data);
730
 
 
731
 
retry:
732
 
        // Clear old settings. 
733
 
        con_data->ms_reset();   
734
 
        
735
 
        SET_DATE(date);
736
 
 
737
 
        // Build the URL
738
 
        s3_buffer->setLength(0);
739
 
        s3_buffer->append("http://");
740
 
        s3_buffer->append(bucket);
741
 
        s3_buffer->append("."); 
742
 
        s3_buffer->append(s3_server->getCString());
743
 
        s3_buffer->append(key);
744
 
 
745
 
        con_data->ms_setURL(s3_buffer->getCString());
746
 
        
747
 
        // Add the 'DATE' header
748
 
        s3_buffer->setLength(0);
749
 
        s3_buffer->append("Date: ");    
750
 
        s3_buffer->append(date);
751
 
        con_data->ms_setHeader(s3_buffer->getCString());
752
 
 
753
 
        // Create the authentication signature and add the 'Authorization' header
754
 
        signed_str = s3_getSignature("DELETE", NULL, NULL, date, bucket, key);
755
 
        push_(signed_str);
756
 
        s3_buffer->setLength(0);
757
 
        s3_buffer->append("Authorization: AWS ");       
758
 
        s3_buffer->append(s3_public_key->getCString());
759
 
        s3_buffer->append(":"); 
760
 
        s3_buffer->append(signed_str->getCString());    
761
 
        release_(signed_str); signed_str = NULL;
762
 
        
763
 
        con_data->ms_setHeader(s3_buffer->getCString());
764
 
        
765
 
        con_data->ms_execute_delete_request();
766
 
        
767
 
        if (con_data->ms_retry) {
768
 
                if (retry_count == s3_maxRetries) {
769
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
770
 
                }
771
 
        //printf("RETRY: s3_delete()\n");
772
 
                retry_count++;
773
 
                self->sleep(s3_sleepTime);
774
 
                goto retry;
775
 
        }
776
 
        
777
 
        bool notFound = con_data->ms_notFound;
778
 
        release_(con_data);
779
 
        release_(s3_buffer);
780
 
        
781
 
        return_(!notFound);
782
 
}
783
 
 
784
 
//-------------------------------
785
 
void CSS3Protocol::s3_copy(const char *dest_server, const char *dest_bucket, const char *dest_key, const char *src_bucket, const char *src_key)
786
 
{
787
 
        CSStringBuffer *s3_buffer;
788
 
        char date[64];
789
 
        CSString *signed_str;
790
 
        uint32_t retry_count = 0;
791
 
        S3ProtocolCon *con_data;
792
 
 
793
 
        enter_();
794
 
 
795
 
        new_(s3_buffer, CSStringBuffer());
796
 
        push_(s3_buffer);
797
 
 
798
 
        new_(con_data, S3ProtocolCon());
799
 
        push_(con_data);
800
 
        
801
 
        if (!dest_server)
802
 
                dest_server = s3_server->getCString();
803
 
 
804
 
retry:
805
 
        // Clear old settings. 
806
 
        con_data->ms_reset();   
807
 
        
808
 
        SET_DATE(date);
809
 
 
810
 
        // Build the URL
811
 
        s3_buffer->setLength(0);
812
 
        s3_buffer->append("http://");
813
 
        s3_buffer->append(dest_bucket);
814
 
        s3_buffer->append("."); 
815
 
        s3_buffer->append(s3_server->getCString());
816
 
        s3_buffer->append(dest_key);
817
 
 
818
 
        con_data->ms_setURL(s3_buffer->getCString());
819
 
        
820
 
        // Add the destination location
821
 
        s3_buffer->setLength(0);
822
 
        s3_buffer->append("Host: ");    
823
 
        s3_buffer->append(dest_bucket);
824
 
        s3_buffer->append("."); 
825
 
        s3_buffer->append(dest_server);
826
 
        s3_buffer->setLength(s3_buffer->length() -1); // trim the '/'
827
 
        con_data->ms_setHeader(s3_buffer->getCString());
828
 
        
829
 
        // Add the source location
830
 
        s3_buffer->setLength(0);
831
 
        s3_buffer->append("x-amz-copy-source:");        
832
 
        s3_buffer->append(src_bucket);
833
 
        s3_buffer->append("/");
834
 
        s3_buffer->append(src_key);
835
 
        con_data->ms_setHeader(s3_buffer->getCString());
836
 
        
837
 
        // Create the authentication signature and add the 'Authorization' header
838
 
        signed_str = s3_getSignature("PUT", NULL, NULL, date, dest_bucket, dest_key, CSString::newString(s3_buffer->getCString()));
839
 
        push_(signed_str);
840
 
 
841
 
        // Add the 'DATE' header
842
 
        s3_buffer->setLength(0);
843
 
        s3_buffer->append("Date: ");    
844
 
        s3_buffer->append(date);
845
 
        con_data->ms_setHeader(s3_buffer->getCString());
846
 
 
847
 
        // Add the signature
848
 
        s3_buffer->setLength(0);
849
 
        s3_buffer->append("Authorization: AWS ");       
850
 
        s3_buffer->append(s3_public_key->getCString());
851
 
        s3_buffer->append(":"); 
852
 
        s3_buffer->append(signed_str->getCString());    
853
 
        release_(signed_str); signed_str = NULL;
854
 
        con_data->ms_setHeader(s3_buffer->getCString());
855
 
        
856
 
        con_data->ms_execute_copy_request();
857
 
        
858
 
        if (con_data->ms_notFound) {
859
 
                s3_buffer->setLength(0);
860
 
                s3_buffer->append("Cloud copy failed, object not found: ");
861
 
                s3_buffer->append(src_bucket);
862
 
                s3_buffer->append(" ");
863
 
                s3_buffer->append(src_key);
864
 
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, s3_buffer->getCString());
865
 
        }
866
 
        
867
 
        if (con_data->ms_retry) {
868
 
                if (retry_count == s3_maxRetries) {
869
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
870
 
                }
871
 
        //printf("RETRY: s3_copy()\n");
872
 
                retry_count++;
873
 
                self->sleep(s3_sleepTime);
874
 
                goto retry;
875
 
        }
876
 
        
877
 
        release_(con_data);
878
 
        release_(s3_buffer);
879
 
        
880
 
        exit_();
881
 
}
882
 
 
883
 
 
884
 
//-------------------------------
885
 
CSVector *CSS3Protocol::s3_receive(CSOutputStream *output, const char *bucket, const char *key, bool *found, S3RangePtr range, time_t *last_modified)
886
 
{
887
 
        CSStringBuffer *s3_buffer;
888
 
    char date[64];
889
 
        CSString *signed_str;
890
 
        uint32_t retry_count = 0;
891
 
        S3ProtocolCon *con_data;
892
 
        CSVector *replyHeaders;
893
 
        CSString *range_header = NULL;
894
 
        const char *http_op;
895
 
 
896
 
        enter_();
897
 
 
898
 
        if (output) {
899
 
                push_(output);
900
 
                http_op = "GET";
901
 
        } else
902
 
                http_op = "HEAD";
903
 
 
904
 
        new_(s3_buffer, CSStringBuffer());
905
 
        push_(s3_buffer);
906
 
 
907
 
        new_(con_data, S3ProtocolCon());
908
 
        push_(con_data);
909
 
 
910
 
retry:
911
 
        // Clear old settings. 
912
 
        con_data->ms_reset();   
913
 
        
914
 
        SET_DATE(date);
915
 
 
916
 
        // Build the URL
917
 
        s3_buffer->setLength(0);
918
 
        s3_buffer->append("http://");
919
 
        s3_buffer->append(bucket);
920
 
        s3_buffer->append("."); 
921
 
        s3_buffer->append(s3_server->getCString());
922
 
        s3_buffer->append(key);
923
 
 
924
 
        con_data->ms_setURL(s3_buffer->getCString());
925
 
        
926
 
        // Add the 'DATE' header
927
 
        s3_buffer->setLength(0);
928
 
        s3_buffer->append("Date: ");    
929
 
        s3_buffer->append(date);
930
 
        con_data->ms_setHeader(s3_buffer->getCString());
931
 
 
932
 
        if (range) {
933
 
                char buffer[80];
934
 
                snprintf(buffer, 80,"Range: bytes=%"PRIu64"-%"PRIu64, range->startByte, range->endByte);
935
 
 
936
 
                range_header = CSString::newString(buffer);
937
 
        }
938
 
        // Create the authentication signature and add the 'Authorization' header
939
 
        if (range_header)
940
 
                con_data->ms_setHeader(range_header->getCString());
941
 
        signed_str = s3_getSignature(http_op, NULL, NULL, date, bucket, key, NULL);
942
 
        push_(signed_str);
943
 
        s3_buffer->setLength(0);
944
 
        s3_buffer->append("Authorization: AWS ");       
945
 
        s3_buffer->append(s3_public_key->getCString());
946
 
        s3_buffer->append(":"); 
947
 
        s3_buffer->append(signed_str->getCString());    
948
 
        release_(signed_str); signed_str = NULL;
949
 
        con_data->ms_setHeader(s3_buffer->getCString());
950
 
        
951
 
        if (output) output->retain();
952
 
        con_data->ms_execute_get_request(output);
953
 
        
954
 
        if (con_data->ms_retry) {
955
 
                if (retry_count == s3_maxRetries) {
956
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
957
 
                }
958
 
        //printf("RETRY: s3_receive()\n");
959
 
                retry_count++;
960
 
                output->reset();
961
 
                self->sleep(s3_sleepTime);
962
 
                goto retry;
963
 
        }
964
 
        
965
 
        if (last_modified)
966
 
                *last_modified = con_data->ms_last_modified;
967
 
        *found = !con_data->ms_notFound;
968
 
        replyHeaders = con_data->ms_reply_headers.takeHeaders();
969
 
        release_(con_data);
970
 
        release_(s3_buffer);
971
 
        if (output)
972
 
                release_(output);
973
 
        
974
 
        return_(replyHeaders);
975
 
}
976
 
 
977
 
class S3ListParser : public CSXMLBuffer {
978
 
 
979
 
        CSVector *list;
980
 
        public:
981
 
 
982
 
 
983
 
        bool parseListData(const char *data, size_t len, CSVector *keys)
984
 
        {
985
 
                list = keys;
986
 
                return parseData(data, len, 0);
987
 
        }
988
 
 
989
 
        private:
990
 
        virtual bool openNode(char *path, char *value) {
991
 
                if (value && *value && (strcmp(path,"/listbucketresult/contents/key/") == 0))
992
 
                        list->add(CSString::newString(value));
993
 
                return true;
994
 
        }
995
 
 
996
 
        virtual bool closeNode(char *path) {
997
 
                (void)path;
998
 
                return true;
999
 
        }
1000
 
 
1001
 
        virtual bool addAttribute(char *path, char *name, char *value) {
1002
 
                (void)path;
1003
 
                (void)name;
1004
 
                (void)value;
1005
 
                return true;
1006
 
        }
1007
 
 
1008
 
};
1009
 
 
1010
 
//-------------------------------
1011
 
static CSVector *parse_s3_list(CSMemoryOutputStream *output)
1012
 
{
1013
 
        S3ListParser s3ListParser;
1014
 
        const char *data;
1015
 
        CSVector *vector;
1016
 
        size_t len;
1017
 
        
1018
 
        enter_();
1019
 
 
1020
 
        push_(output);
1021
 
        
1022
 
        new_(vector, CSVector(10));     
1023
 
        push_(vector);  
1024
 
 
1025
 
        data = (const char *) output->getMemory(&len);
1026
 
        if (!s3ListParser.parseListData(data, len, vector)) {
1027
 
                int             err;
1028
 
                char    *msg;
1029
 
 
1030
 
                s3ListParser.getError(&err, &msg);
1031
 
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
1032
 
        }
1033
 
 
1034
 
        pop_(vector);
1035
 
        release_(output);
1036
 
        return_(vector);
1037
 
}
1038
 
 
1039
 
 
1040
 
//-------------------------------
1041
 
CSVector *CSS3Protocol::s3_list(const char *bucket, const char *key_prefix, uint32_t max)
1042
 
{
1043
 
        CSStringBuffer *s3_buffer;
1044
 
    char date[64];
1045
 
        CSString *signed_str;
1046
 
        CSMemoryOutputStream *output;
1047
 
        uint32_t retry_count = 0;
1048
 
        S3ProtocolCon *con_data;
1049
 
        enter_();
1050
 
 
1051
 
        new_(s3_buffer, CSStringBuffer());
1052
 
        push_(s3_buffer);
1053
 
 
1054
 
        output = CSMemoryOutputStream::newStream(1024, 1024);
1055
 
        push_(output);
1056
 
        
1057
 
        new_(con_data, S3ProtocolCon());
1058
 
        push_(con_data);
1059
 
 
1060
 
retry:
1061
 
 
1062
 
        // Clear old settings. 
1063
 
        con_data->ms_reset();   
1064
 
        
1065
 
        SET_DATE(date);
1066
 
 
1067
 
        // Build the URL
1068
 
        s3_buffer->setLength(0);
1069
 
        s3_buffer->append("http://");
1070
 
        s3_buffer->append(bucket);
1071
 
        s3_buffer->append("."); 
1072
 
        s3_buffer->append(s3_server->getCString());
1073
 
//s3_buffer->append("/");       
1074
 
//s3_buffer->append(bucket);
1075
 
        if (key_prefix) {
1076
 
                s3_buffer->append("?prefix=");
1077
 
                s3_buffer->append(key_prefix);
1078
 
        }
1079
 
        
1080
 
        if (max) {
1081
 
                if (key_prefix)
1082
 
                        s3_buffer->append("&max-keys=");
1083
 
                else
1084
 
                        s3_buffer->append("?max-keys=");
1085
 
                s3_buffer->append(max);
1086
 
        }
1087
 
 
1088
 
        con_data->ms_setURL(s3_buffer->getCString());
1089
 
        
1090
 
        // Add the 'DATE' header
1091
 
        s3_buffer->setLength(0);
1092
 
        s3_buffer->append("Date: ");    
1093
 
        s3_buffer->append(date);
1094
 
        con_data->ms_setHeader(s3_buffer->getCString());
1095
 
 
1096
 
        // Create the authentication signature and add the 'Authorization' header
1097
 
        signed_str = s3_getSignature("GET", NULL, NULL, date, bucket, "");
1098
 
        push_(signed_str);
1099
 
        s3_buffer->setLength(0);
1100
 
        s3_buffer->append("Authorization: AWS ");       
1101
 
        s3_buffer->append(s3_public_key->getCString());
1102
 
        s3_buffer->append(":"); 
1103
 
        s3_buffer->append(signed_str->getCString());    
1104
 
        release_(signed_str); signed_str = NULL;
1105
 
        con_data->ms_setHeader(s3_buffer->getCString());
1106
 
        
1107
 
        con_data->ms_execute_get_request(RETAIN(output));
1108
 
        
1109
 
        if (con_data->ms_retry) {
1110
 
                if (retry_count == s3_maxRetries) {
1111
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
1112
 
                }
1113
 
        //printf("RETRY: s3_list()\n");
1114
 
                retry_count++;
1115
 
                output->reset();
1116
 
                self->sleep(s3_sleepTime);
1117
 
                goto retry;
1118
 
        }
1119
 
        
1120
 
        release_(con_data);
1121
 
        pop_(output);
1122
 
        release_(s3_buffer);
1123
 
        return_(parse_s3_list(output));
1124
 
}
1125
 
 
1126
 
//-------------------------------
1127
 
CSString *CSS3Protocol::s3_getAuthorization(const char *bucket, const char *key, const char *content_type, uint32_t *s3AuthorizationTime)
1128
 
{
1129
 
    char date[64];
1130
 
        CSString *signed_str;
1131
 
        time_t sys_time;
1132
 
 
1133
 
        enter_();
1134
 
 
1135
 
        if (!content_type)
1136
 
                content_type = "binary/octet-stream";
1137
 
                
1138
 
        sys_time = time(NULL);
1139
 
        
1140
 
        *s3AuthorizationTime = (uint32_t)sys_time;
1141
 
        
1142
 
        SET_DATE_FROM_TIME(sys_time, date);
1143
 
        signed_str = s3_getSignature("PUT", NULL, content_type, date, bucket, key);
1144
 
        return_(signed_str);
1145
 
}
1146
 
 
1147
 
//-------------------------------
1148
 
CSVector *CSS3Protocol::s3_send(CSInputStream *input, const char *bucket, const char *key, off64_t size, const char *content_type, Md5Digest *digest, const char *s3Authorization, time_t s3AuthorizationTime)
1149
 
{
1150
 
        CSStringBuffer *s3_buffer;
1151
 
    char date[64];
1152
 
        CSString *signed_str;
1153
 
        uint32_t retry_count = 0;
1154
 
        S3ProtocolCon *con_data;
1155
 
        CSVector *replyHeaders;
1156
 
        char checksum[32], *md5 = NULL;
1157
 
 
1158
 
        enter_();
1159
 
        push_(input);
1160
 
 
1161
 
        new_(s3_buffer, CSStringBuffer());
1162
 
        push_(s3_buffer);
1163
 
 
1164
 
        new_(con_data, S3ProtocolCon());
1165
 
        push_(con_data);
1166
 
                
1167
 
        if (!content_type)
1168
 
                content_type = "binary/octet-stream";
1169
 
                
1170
 
retry:
1171
 
 
1172
 
        // Clear old settings. 
1173
 
        con_data->ms_reset();   
1174
 
        
1175
 
        if (s3Authorization) {
1176
 
                SET_DATE_FROM_TIME(s3AuthorizationTime, date);
1177
 
        } else {
1178
 
                SET_DATE(date);
1179
 
        }
1180
 
        
1181
 
        // Build the URL
1182
 
        s3_buffer->setLength(0);
1183
 
        s3_buffer->append("http://");
1184
 
        s3_buffer->append(bucket);
1185
 
        s3_buffer->append("."); 
1186
 
        s3_buffer->append(s3_server->getCString());
1187
 
        s3_buffer->append(key);
1188
 
 
1189
 
        con_data->ms_setURL(s3_buffer->getCString());
1190
 
        
1191
 
        // Add the 'DATE' header
1192
 
        s3_buffer->setLength(0);
1193
 
        s3_buffer->append("Date: ");    
1194
 
        s3_buffer->append(date);
1195
 
        con_data->ms_setHeader(s3_buffer->getCString());
1196
 
        
1197
 
        // Add the 'Content-Type' header
1198
 
        s3_buffer->setLength(0);
1199
 
        s3_buffer->append("Content-Type: ");    
1200
 
        s3_buffer->append(content_type);
1201
 
        con_data->ms_setHeader(s3_buffer->getCString());
1202
 
                
1203
 
        if (digest) {
1204
 
                // Add the Md5 checksum header
1205
 
                md5 = checksum;
1206
 
                memset(checksum, 0, 32);
1207
 
                base64Encode(digest->val, 16, checksum, 32);
1208
 
                
1209
 
                s3_buffer->setLength(0);
1210
 
                s3_buffer->append("Content-MD5: ");     
1211
 
                s3_buffer->append(checksum);
1212
 
                con_data->ms_setHeader(s3_buffer->getCString());                
1213
 
                con_data->ms_calculate_md5 = false;
1214
 
        } else 
1215
 
                con_data->ms_calculate_md5 = true;
1216
 
        
1217
 
 
1218
 
        // Create the authentication signature and add the 'Authorization' header
1219
 
        if (!s3Authorization)
1220
 
                signed_str = s3_getSignature("PUT", md5, content_type, date, bucket, key);
1221
 
        else
1222
 
                signed_str = CSString::newString(s3Authorization);
1223
 
        push_(signed_str);
1224
 
        s3_buffer->setLength(0);
1225
 
        s3_buffer->append("Authorization: AWS ");       
1226
 
        s3_buffer->append(s3_public_key->getCString());
1227
 
        s3_buffer->append(":"); 
1228
 
        s3_buffer->append(signed_str->getCString());    
1229
 
        release_(signed_str); signed_str = NULL;
1230
 
        con_data->ms_setHeader(s3_buffer->getCString());
1231
 
        
1232
 
        con_data->ms_execute_put_request(RETAIN(input), size);
1233
 
        
1234
 
        if (con_data->ms_retry) {
1235
 
                if (retry_count == s3_maxRetries) {
1236
 
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
1237
 
                }
1238
 
        //printf("RETRY: s3_send()\n");
1239
 
                retry_count++;
1240
 
                input->reset();
1241
 
                self->sleep(s3_sleepTime);
1242
 
                goto retry;
1243
 
        }
1244
 
        
1245
 
        replyHeaders = con_data->ms_reply_headers.takeHeaders();
1246
 
 
1247
 
        release_(con_data);
1248
 
        release_(s3_buffer);
1249
 
        release_(input);
1250
 
        return_(replyHeaders);
1251
 
}
1252
 
 
1253
 
//-------------------------------
1254
 
CSString *CSS3Protocol::s3_getDataURL(const char *bucket, const char *key, uint32_t keep_alive)
1255
 
{
1256
 
        CSStringBuffer *s3_buffer;
1257
 
        char timeout[32];
1258
 
        CSString *signed_str;
1259
 
        enter_();
1260
 
        
1261
 
        new_(s3_buffer, CSStringBuffer());
1262
 
        push_(s3_buffer);
1263
 
 
1264
 
        snprintf(timeout, 32, "%"PRId32"", ((uint32_t)time(NULL)) + keep_alive);
1265
 
        
1266
 
        signed_str = s3_getSignature("GET", NULL, NULL, timeout, bucket, key);
1267
 
//printf("Unsafe: \"%s\"\n", signed_str->getCString());
1268
 
        signed_str = urlEncode(signed_str); // Because the signature is in the URL it must be URL encoded.
1269
 
//printf("  Safe: \"%s\"\n", signed_str->getCString());
1270
 
        push_(signed_str);
1271
 
        
1272
 
        s3_buffer->setLength(0);        
1273
 
        s3_buffer->append("http://");
1274
 
        s3_buffer->append(bucket);
1275
 
        s3_buffer->append("."); 
1276
 
        s3_buffer->append(s3_server->getCString());
1277
 
        s3_buffer->append(key);
1278
 
 
1279
 
        s3_buffer->append("?AWSAccessKeyId=");
1280
 
        s3_buffer->append(s3_public_key->getCString());
1281
 
        s3_buffer->append("&Expires=");
1282
 
        s3_buffer->append(timeout);
1283
 
        s3_buffer->append("&Signature=");
1284
 
        s3_buffer->append(signed_str->getCString());
1285
 
        
1286
 
        release_(signed_str);
1287
 
        
1288
 
        pop_(s3_buffer);
1289
 
        CSString *str = CSString::newString(s3_buffer);
1290
 
        return_(str);   
1291
 
}
1292
 
 
1293
 
//#define S3_UNIT_TEST
1294
 
#ifdef S3_UNIT_TEST
1295
 
// Print the usage of every unit test command to stdout.
// 'cmd' is the program name shown at the start of each example.
static void show_help_info(const char *cmd)
{
	printf("Get authenticated query string:\n\t%s q <bucket> <object_key> <timeout>\n", cmd);
	printf("Delete object:\n\t%s d <bucket> <object_key>\n", cmd);
	printf("Delete all object with a given prefix:\n\t%s D <bucket> <object_prefix>\n", cmd);
	// 'g' takes an optional byte range and 'h' takes no further argument;
	// neither of them accepts a timeout.
	printf("Get object, data will be written to 'prottest.out':\n\t%s g <bucket> <object_key> [<start_byte> <end_byte>]\n", cmd);
	printf("Get object header only:\n\t%s h <bucket> <object_key>\n", cmd);
	printf("Put (Upload) an object:\n\t%s p <bucket> <object_key> <file>\n", cmd);
	printf("List objects in the bucket:\n\t%s l <bucket> [<object_prefix> [max_list_size]]\n", cmd);
	printf("Copy object:\n\t%s c <src_bucket> <src_object_key> <dst_bucket> <dst_object_key> \n", cmd);
	printf("Copy all object with a given prefix:\n\t%s C <src_bucket> <object_key_prefix> <dst_bucket> \n", cmd);
}
1307
 
 
1308
 
void dump_headers(CSVector *header_array)
1309
 
{
1310
 
        CSHTTPHeaders headers;
1311
 
        
1312
 
        headers.setHeaders(header_array);
1313
 
        printf("Reply Headers:\n");
1314
 
        printf("--------------\n");
1315
 
        
1316
 
        for (uint32_t i = 0; i < headers.numHeaders(); i++) {
1317
 
                CSHeader *h = headers.getHeader(i);
1318
 
                
1319
 
                printf("%s : %s\n", h->getNameCString(), h->getValueCString());
1320
 
                h->release();
1321
 
        }
1322
 
        printf("--------------\n");
1323
 
        headers.clearHeaders();
1324
 
}
1325
 
 
1326
 
int main(int argc, char **argv)
1327
 
{
1328
 
        CSThread *main_thread;
1329
 
        const char *pub_key;
1330
 
        const char *priv_key;
1331
 
        const char *server;
1332
 
        CSS3Protocol *prot = NULL;
1333
 
        
1334
 
        if (argc < 3) {
1335
 
                show_help_info(argv[0]);
1336
 
                return 0;
1337
 
        }
1338
 
        
1339
 
        if (! CSThread::startUp()) {
1340
 
                CSException::throwException(CS_CONTEXT, ENOMEM, "CSThread::startUp() failed.");
1341
 
                return 1;
1342
 
        }
1343
 
        
1344
 
        cs_init_memory();
1345
 
        
1346
 
        main_thread = new CSThread( NULL);
1347
 
        CSThread::setSelf(main_thread);
1348
 
        
1349
 
        enter_();
1350
 
        try_(a) {
1351
 
        
1352
 
                pub_key = getenv("S3_ACCESS_KEY_ID");
1353
 
                priv_key = getenv("S3_SECRET_ACCESS_KEY");
1354
 
                new_(prot, CSS3Protocol());
1355
 
                push_(prot);
1356
 
                
1357
 
                server = getenv("S3_SERVER");
1358
 
                if ((server == NULL) || (*server == 0))
1359
 
                        server = "s3.amazonaws.com/";
1360
 
                prot->s3_setServer(server);
1361
 
                prot->s3_setPublicKey(pub_key);
1362
 
                prot->s3_setPrivateKey(priv_key);
1363
 
                prot->s3_setMaxRetries(0);
1364
 
                
1365
 
                switch (argv[1][0]) {
1366
 
                        case 'q': // Get the query string
1367
 
                                if (argc == 5) {
1368
 
                                        CSString *qstr = prot->s3_getDataURL(argv[2], argv[3], atoi(argv[4]));
1369
 
                                        printf("To test call:\ncurl -L -D - \"%s\"\n", qstr->getCString());
1370
 
                                        qstr->release();
1371
 
                                } else
1372
 
                                        printf("Bad command: q <bucket> <object_key> <timeout>\n");
1373
 
                                
1374
 
                                break;
1375
 
                        case 'd': // Delete the object
1376
 
                                if (argc == 4) {
1377
 
                                        printf("delete %s %s\n", argv[2], argv[3]);
1378
 
                                        if (!prot->s3_delete(argv[2], argv[3]))
1379
 
                                                printf("%s/%s could not be found.\n", argv[2], argv[3]);
1380
 
 
1381
 
                                } else
1382
 
                                        printf("Bad command: d <bucket> <object_key>\n");
1383
 
                                
1384
 
                                break;
1385
 
                        case 'D': // Delete  objects like
1386
 
                                if (argc == 4) {
1387
 
                                        CSVector *list;
1388
 
                                        CSString *key;
1389
 
                                        
1390
 
                                        list = prot->s3_list(argv[2], argv[3]);
1391
 
                                        push_(list);
1392
 
                                        while (key = (CSString*) list->take(0)) {
1393
 
                                                printf("Deleting %s\n", key->getCString());
1394
 
                                                prot->s3_delete(argv[2], key->getCString());
1395
 
                                                key->release();
1396
 
                                        }
1397
 
                                        release_(list);
1398
 
                                        
1399
 
                                } else
1400
 
                                        printf("Bad command: D <bucket> <object_key_prefix>\n");
1401
 
                                
1402
 
                                break;
1403
 
                        case 'g':  // Get the object
1404
 
                                if ((argc == 4) || (argc == 6)) {
1405
 
                                        CSFile *output; 
1406
 
                                        CSVector *headers;
1407
 
                                        bool found;                             
1408
 
                                        S3RangeRec *range_ptr = NULL, range =   {0,0};          
1409
 
                                        
1410
 
                                        if (argc == 6) {
1411
 
                                                range.startByte = atoi(argv[4]);
1412
 
                                                range.endByte = atoi(argv[5]);
1413
 
                                                range_ptr = &range;
1414
 
                                        }
1415
 
                                        
1416
 
                                        output = CSFile::newFile("prottest.out");
1417
 
                                        push_(output);
1418
 
                                        output->open(CSFile::CREATE | CSFile::TRUNCATE);
1419
 
                                        headers = prot->s3_receive(output->getOutputStream(), argv[2], argv[3], &found, range_ptr);
1420
 
                                        if (!found)
1421
 
                                                printf("%s/%s could not be found.\n", argv[2], argv[3]);
1422
 
                                                
1423
 
                                        dump_headers(headers);
1424
 
                                                
1425
 
                                        release_(output);
1426
 
                                } else
1427
 
                                        printf("Bad command: g <bucket> <object_key>\n");
1428
 
                                
1429
 
                                break;
1430
 
                                
1431
 
                        case 'h':  // Get the object header
1432
 
                                if (argc == 4) {
1433
 
                                        CSVector *headers;
1434
 
                                        bool found;     
1435
 
                                        S3RangeRec range =      {0,0};          
1436
 
                                        
1437
 
                                        headers = prot->s3_receive(NULL, argv[2], argv[3], &found);
1438
 
                                        if (!found)
1439
 
                                                printf("%s/%s could not be found.\n", argv[2], argv[3]);
1440
 
                                                
1441
 
                                        dump_headers(headers);
1442
 
                                                
1443
 
                                } else
1444
 
                                        printf("Bad command: h <bucket> <object_key>\n");
1445
 
                                
1446
 
                                break;
1447
 
                                
1448
 
                        case 'p':  // Put (Upload) the object
1449
 
                                if (argc == 5) {
1450
 
                                        CSFile *input;
1451
 
                                        Md5Digest digest;
1452
 
                                        CSVector *headers;
1453
 
                                        
1454
 
                                        input = CSFile::newFile(argv[4]);
1455
 
                                        push_(input);
1456
 
                                        input->open(CSFile::READONLY);
1457
 
                                        input->md5Digest(&digest);
1458
 
                                        headers = prot->s3_send(input->getInputStream(), argv[2], argv[3], input->myFilePath->getSize(), NULL, &digest);
1459
 
                                        dump_headers(headers);
1460
 
                                        release_(input);
1461
 
                                } else
1462
 
                                        printf("Bad command: p <bucket> <object_key> <file> \n");
1463
 
                                
1464
 
                                break;
1465
 
                                
1466
 
                        case 'c':  // Copy the object
1467
 
                                if (argc == 6) {
1468
 
                                        prot->s3_copy(NULL, argv[4], argv[5], argv[2], argv[3]);
1469
 
                                } else
1470
 
                                        printf("Bad command: c <src_bucket> <src_object_key> <dst_bucket> <dst_object_key>\n");
1471
 
                                
1472
 
                                break;
1473
 
                                
1474
 
                        case 'C':  // Copy  objects like
1475
 
                                if (argc == 5) {
1476
 
                                        CSVector *list;
1477
 
                                        CSString *key;
1478
 
                                        
1479
 
                                        list = prot->s3_list(argv[2], argv[3]);
1480
 
                                        push_(list);
1481
 
                                        while (key = (CSString*) list->take(0)) {
1482
 
                                                printf("Copying %s\n", key->getCString());
1483
 
                                                prot->s3_copy(NULL, argv[4], key->getCString(), argv[2], key->getCString());
1484
 
                                                key->release();
1485
 
                                        }
1486
 
                                        release_(list);
1487
 
                                        
1488
 
                                } else
1489
 
                                        printf("Bad command: C <src_bucket> <object_key_prefix> <dst_bucket>\n");
1490
 
                                
1491
 
                                break;
1492
 
                        case 'l':  // List the object
1493
 
                                if ((argc == 3) || (argc == 4) || (argc == 5)) {
1494
 
                                        uint32_t max = 0;
1495
 
                                        char *prefix = NULL;
1496
 
                                        CSVector *list;
1497
 
                                        CSString *key;
1498
 
                                        
1499
 
                                        if (argc > 3) {
1500
 
                                                prefix = argv[3];
1501
 
                                                if (!strlen(prefix))
1502
 
                                                        prefix = NULL;
1503
 
                                        }
1504
 
                                        
1505
 
                                        if (argc == 5) 
1506
 
                                                max = atol(argv[4]);
1507
 
                                                
1508
 
                                        list = prot->s3_list(argv[2], prefix, max);
1509
 
                                        push_(list);
1510
 
                                        while (key = (CSString*) list->take(0)) {
1511
 
                                                printf("%s\n", key->getCString());
1512
 
                                                key->release();
1513
 
                                        }
1514
 
                                        release_(list);
1515
 
                                        
1516
 
                                } else
1517
 
                                        printf("Bad command: l <bucket> [<object_prefix> [max_list_size]] \n");
1518
 
                                
1519
 
                                break;
1520
 
                        default:
1521
 
                                printf("Unknown command.\n");
1522
 
                                show_help_info(argv[0]);
1523
 
                }
1524
 
                
1525
 
                release_(prot);
1526
 
        }
1527
 
        
1528
 
        catch_(a);              
1529
 
        self->logException();
1530
 
        
1531
 
        cont_(a);
1532
 
                
1533
 
        outer_()
1534
 
        main_thread->release();
1535
 
        cs_exit_memory();
1536
 
        CSThread::shutDown();
1537
 
        return 0;
1538
 
}
1539
 
 
1540
 
#endif
1541
 
 
1542