~nick-craig-wood/goamz/list-buckets2

« back to all changes in this revision

Viewing changes to s3/multi.go

  • Committer: Gustavo Niemeyer
  • Date: 2013-02-11 17:38:51 UTC
  • mfrom: (27.2.6 multipart-uploads)
  • Revision ID: gustavo@niemeyer.net-20130211173851-l89c3gxj13hd74gt
s3: add support for multipart uploads

R=jameinel, rog
CC=
https://codereview.appspot.com/7237068

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
package s3
 
2
 
 
3
import (
 
4
        "bytes"
 
5
        "crypto/md5"
 
6
        "encoding/base64"
 
7
        "encoding/xml"
 
8
        "errors"
 
9
        "io"
 
10
        "sort"
 
11
        "strconv"
 
12
)
 
13
 
 
14
// Multi represents an unfinished multipart upload.
 
15
//
 
16
// Multipart uploads allow sending big objects in smaller chunks.
 
17
// After all parts have been sent, the upload must be explicitly
 
18
// completed by calling Complete with the list of parts.
 
19
//
 
20
// See http://goo.gl/vJfTG for an overview of multipart uploads.
 
21
type Multi struct {
 
22
        Bucket   *Bucket
 
23
        Key      string
 
24
        UploadId string
 
25
}
 
26
 
 
27
var listMultiMax = 1000
 
28
 
 
29
type listMultiResp struct {
 
30
        NextKeyMarker      string
 
31
        NextUploadIdMarker string
 
32
        IsTruncated        bool
 
33
        Upload             []Multi
 
34
        CommonPrefixes     []string `xml:"CommonPrefixes>Prefix"`
 
35
}
 
36
 
 
37
// ListMulti returns the list of unfinished multipart uploads in b.
 
38
//
 
39
// The prefix parameter limits the response to keys that begin with the
 
40
// specified prefix. You can use prefixes to separate a bucket into different
 
41
// groupings of keys (to get the feeling of folders, for example).
 
42
//
 
43
// The delim parameter causes the response to group all of the keys that
 
44
// share a common prefix up to the next delimiter in a single entry within
 
45
// the CommonPrefixes field. You can use delimiters to separate a bucket
 
46
// into different groupings of keys, similar to how folders would work.
 
47
//
 
48
// See http://goo.gl/ePioY for details.
 
49
func (b *Bucket) ListMulti(prefix, delim string) (multis []*Multi, prefixes []string, err error) {
 
50
        params := map[string][]string{
 
51
                "uploads":     {""},
 
52
                "max-uploads": {strconv.FormatInt(int64(listMultiMax), 10)},
 
53
                "prefix":      {prefix},
 
54
                "delimiter":   {delim},
 
55
        }
 
56
        for attempt := attempts.Start(); attempt.Next(); {
 
57
                req := &request{
 
58
                        method: "GET",
 
59
                        bucket: b.Name,
 
60
                        params: params,
 
61
                }
 
62
                var resp listMultiResp
 
63
                err := b.S3.query(req, &resp)
 
64
                if shouldRetry(err) && attempt.HasNext() {
 
65
                        continue
 
66
                }
 
67
                if err != nil {
 
68
                        return nil, nil, err
 
69
                }
 
70
                for i := range resp.Upload {
 
71
                        multi := &resp.Upload[i]
 
72
                        multi.Bucket = b
 
73
                        multis = append(multis, multi)
 
74
                }
 
75
                prefixes = append(prefixes, resp.CommonPrefixes...)
 
76
                if !resp.IsTruncated {
 
77
                        return multis, prefixes, nil
 
78
                }
 
79
                params["key-marker"] = []string{resp.NextKeyMarker}
 
80
                params["upload-id-marker"] = []string{resp.NextUploadIdMarker}
 
81
                attempt = attempts.Start() // Last request worked.
 
82
        }
 
83
        panic("unreachable")
 
84
}
 
85
 
 
86
// InitMulti initializes a new multipart upload at the provided
 
87
// key inside b and returns a value for manipulating it.
 
88
//
 
89
// See http://goo.gl/XP8kL for details.
 
90
func (b *Bucket) InitMulti(key string, contType string, perm ACL) (*Multi, error) {
 
91
        headers := map[string][]string{
 
92
                "Content-Type":   {contType},
 
93
                "Content-Length": {"0"},
 
94
                "x-amz-acl":      {string(perm)},
 
95
        }
 
96
        params := map[string][]string{
 
97
                "uploads": {""},
 
98
        }
 
99
        req := &request{
 
100
                method:  "POST",
 
101
                bucket:  b.Name,
 
102
                path:    key,
 
103
                headers: headers,
 
104
                params:  params,
 
105
        }
 
106
        var err error
 
107
        var resp struct {
 
108
                UploadId string `xml:"UploadId"`
 
109
        }
 
110
        for attempt := attempts.Start(); attempt.Next(); {
 
111
                err = b.S3.query(req, &resp)
 
112
                if !shouldRetry(err) {
 
113
                        break
 
114
                }
 
115
        }
 
116
        if err != nil {
 
117
                return nil, err
 
118
        }
 
119
        return &Multi{Bucket: b, Key: key, UploadId: resp.UploadId}, nil
 
120
}
 
121
 
 
122
// PutPart sends part n of the multipart upload, reading all the content from r.
 
123
// Each part, except for the last one, must be at least 5MB in size.
 
124
//
 
125
// See http://goo.gl/pqZer for details.
 
126
func (m *Multi) PutPart(n int, r io.ReadSeeker) (Part, error) {
 
127
        length, b64md5, err := seekerInfo(r)
 
128
        if err != nil {
 
129
                return Part{}, err
 
130
        }
 
131
        headers := map[string][]string{
 
132
                "Content-Length": {strconv.FormatInt(length, 10)},
 
133
                "Content-MD5":    {b64md5},
 
134
        }
 
135
        params := map[string][]string{
 
136
                "uploadId":   {m.UploadId},
 
137
                "partNumber": {strconv.FormatInt(int64(n), 10)},
 
138
        }
 
139
        for attempt := attempts.Start(); attempt.Next(); {
 
140
                _, err := r.Seek(0, 0)
 
141
                if err != nil {
 
142
                        return Part{}, err
 
143
                }
 
144
                req := &request{
 
145
                        method:  "PUT",
 
146
                        bucket:  m.Bucket.Name,
 
147
                        path:    m.Key,
 
148
                        headers: headers,
 
149
                        params:  params,
 
150
                        payload: r,
 
151
                }
 
152
                err = m.Bucket.S3.prepare(req)
 
153
                if err != nil {
 
154
                        return Part{}, err
 
155
                }
 
156
                resp, err := m.Bucket.S3.run(req, nil)
 
157
                if shouldRetry(err) && attempt.HasNext() {
 
158
                        continue
 
159
                }
 
160
                if err != nil {
 
161
                        return Part{}, err
 
162
                }
 
163
                etag := resp.Header.Get("ETag")
 
164
                if etag == "" {
 
165
                        return Part{}, errors.New("part upload succeeded with no ETag")
 
166
                }
 
167
                return Part{n, etag, length}, nil
 
168
        }
 
169
        panic("unreachable")
 
170
}
 
171
 
 
172
func seekerInfo(r io.ReadSeeker) (length int64, b64md5 string, err error) {
 
173
        _, err = r.Seek(0, 0)
 
174
        if err != nil {
 
175
                return 0, "", err
 
176
        }
 
177
        digest := md5.New()
 
178
        length, err = io.Copy(digest, r)
 
179
        if err != nil {
 
180
                return 0, "", err
 
181
        }
 
182
        b64md5 = base64.StdEncoding.EncodeToString(digest.Sum(nil))
 
183
        return length, b64md5, nil
 
184
}
 
185
 
 
186
type Part struct {
 
187
        N    int `xml:"PartNumber"`
 
188
        ETag string
 
189
        Size int64
 
190
}
 
191
 
 
192
type listPartsResp struct {
 
193
        NextPartNumberMarker string
 
194
        IsTruncated          bool
 
195
        Part                 []Part
 
196
}
 
197
 
 
198
var listPartsMax = 1000
 
199
 
 
200
// ListParts returns the list of previously uploaded parts in m.
 
201
//
 
202
// See http://goo.gl/ePioY for details.
 
203
func (m *Multi) ListParts() ([]Part, error) {
 
204
        params := map[string][]string{
 
205
                "uploadId":  {m.UploadId},
 
206
                "max-parts": {strconv.FormatInt(int64(listPartsMax), 10)},
 
207
        }
 
208
        var parts []Part
 
209
        for attempt := attempts.Start(); attempt.Next(); {
 
210
                req := &request{
 
211
                        method: "GET",
 
212
                        bucket: m.Bucket.Name,
 
213
                        path:   m.Key,
 
214
                        params: params,
 
215
                }
 
216
                var resp listPartsResp
 
217
                err := m.Bucket.S3.query(req, &resp)
 
218
                if shouldRetry(err) && attempt.HasNext() {
 
219
                        continue
 
220
                }
 
221
                if err != nil {
 
222
                        return nil, err
 
223
                }
 
224
                parts = append(parts, resp.Part...)
 
225
                if !resp.IsTruncated {
 
226
                        return parts, nil
 
227
                }
 
228
                params["part-number-marker"] = []string{resp.NextPartNumberMarker}
 
229
                attempt = attempts.Start() // Last request worked.
 
230
        }
 
231
        panic("unreachable")
 
232
}
 
233
 
 
234
type completeUpload struct {
 
235
        XMLName xml.Name      `xml:"CompleteMultipartUpload"`
 
236
        Parts   completeParts `xml:"Part"`
 
237
}
 
238
 
 
239
type completePart struct {
 
240
        PartNumber int
 
241
        ETag       string
 
242
}
 
243
 
 
244
type completeParts []completePart
 
245
 
 
246
func (p completeParts) Len() int           { return len(p) }
 
247
func (p completeParts) Less(i, j int) bool { return p[i].PartNumber < p[j].PartNumber }
 
248
func (p completeParts) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
 
249
 
 
250
// Complete assembles the given previously uploaded parts into the
 
251
// final object. This operation may take several minutes.
 
252
//
 
253
// See http://goo.gl/2Z7Tw for details.
 
254
func (m *Multi) Complete(parts []Part) error {
 
255
        params := map[string][]string{
 
256
                "uploadId": {m.UploadId},
 
257
        }
 
258
        c := completeUpload{}
 
259
        for _, p := range parts {
 
260
                c.Parts = append(c.Parts, completePart{p.N, p.ETag})
 
261
        }
 
262
        sort.Sort(c.Parts)
 
263
        data, err := xml.Marshal(&c)
 
264
        if err != nil {
 
265
                return err
 
266
        }
 
267
        for attempt := attempts.Start(); attempt.Next(); {
 
268
                req := &request{
 
269
                        method:  "POST",
 
270
                        bucket:  m.Bucket.Name,
 
271
                        path:    m.Key,
 
272
                        params:  params,
 
273
                        payload: bytes.NewReader(data),
 
274
                }
 
275
                err := m.Bucket.S3.query(req, nil)
 
276
                if shouldRetry(err) && attempt.HasNext() {
 
277
                        continue
 
278
                }
 
279
                return err
 
280
        }
 
281
        panic("unreachable")
 
282
}
 
283
 
 
284
// Abort deletes an unifinished multipart upload and any previously
 
285
// uploaded parts for it.
 
286
//
 
287
// After a multipart upload is aborted, no additional parts can be
 
288
// uploaded using it. However, if any part uploads are currently in
 
289
// progress, those part uploads might or might not succeed. As a result,
 
290
// it might be necessary to abort a given multipart upload multiple
 
291
// times in order to completely free all storage consumed by all parts.
 
292
//
 
293
// NOTE: If the described scenario happens to you, please report back to
 
294
// the goamz authors with details. In the future such retrying should be
 
295
// handled internally, but it's not clear what happens precisely (Is an
 
296
// error returned? Is the issue completely undetectable?).
 
297
//
 
298
// See http://goo.gl/dnyJw for details.
 
299
func (m *Multi) Abort() error {
 
300
        params := map[string][]string{
 
301
                "uploadId":   {m.UploadId},
 
302
        }
 
303
        for attempt := attempts.Start(); attempt.Next(); {
 
304
                req := &request{
 
305
                        method:  "DELETE",
 
306
                        bucket:  m.Bucket.Name,
 
307
                        path:    m.Key,
 
308
                        params:  params,
 
309
                }
 
310
                err := m.Bucket.S3.query(req, nil)
 
311
                if shouldRetry(err) && attempt.HasNext() {
 
312
                        continue
 
313
                }
 
314
                return err
 
315
        }
 
316
        panic("unreachable")
 
317
}