~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/gopkg.in/mgo.v2/bulk.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
package mgo
 
2
 
 
3
import (
 
4
        "bytes"
 
5
        "sort"
 
6
 
 
7
        "gopkg.in/mgo.v2/bson"
 
8
)
 
9
 
 
10
// Bulk represents an operation that can be prepared with several
 
11
// orthogonal changes before being delivered to the server.
 
12
//
 
13
// MongoDB servers older than version 2.6 do not have proper support for bulk
 
14
// operations, so the driver attempts to map its API as much as possible into
 
15
// the functionality that works. In particular, in those releases updates and
 
16
// removals are sent individually, and inserts are sent in bulk but have
 
17
// suboptimal error reporting compared to more recent versions of the server.
 
18
// See the documentation of BulkErrorCase for details on that.
 
19
//
 
20
// Relevant documentation:
 
21
//
 
22
//   http://blog.mongodb.org/post/84922794768/mongodbs-new-bulk-api
 
23
//
 
24
type Bulk struct {
 
25
        c       *Collection
 
26
        opcount int
 
27
        actions []bulkAction
 
28
        ordered bool
 
29
}
 
30
 
 
31
type bulkOp int
 
32
 
 
33
const (
 
34
        bulkInsert bulkOp = iota + 1
 
35
        bulkUpdate
 
36
        bulkUpdateAll
 
37
        bulkRemove
 
38
)
 
39
 
 
40
type bulkAction struct {
 
41
        op   bulkOp
 
42
        docs []interface{}
 
43
        idxs []int
 
44
}
 
45
 
 
46
type bulkUpdateOp []interface{}
 
47
type bulkDeleteOp []interface{}
 
48
 
 
49
// BulkResult holds the results for a bulk operation.
 
50
type BulkResult struct {
 
51
        Matched  int
 
52
        Modified int // Available only for MongoDB 2.6+
 
53
 
 
54
        // Be conservative while we understand exactly how to report these
 
55
        // results in a useful and convenient way, and also how to emulate
 
56
        // them with prior servers.
 
57
        private bool
 
58
}
 
59
 
 
60
// BulkError holds an error returned from running a Bulk operation.
 
61
// Individual errors may be obtained and inspected via the Cases method.
 
62
type BulkError struct {
 
63
        ecases []BulkErrorCase
 
64
}
 
65
 
 
66
func (e *BulkError) Error() string {
 
67
        if len(e.ecases) == 0 {
 
68
                return "invalid BulkError instance: no errors"
 
69
        }
 
70
        if len(e.ecases) == 1 {
 
71
                return e.ecases[0].Err.Error()
 
72
        }
 
73
        msgs := make([]string, 0, len(e.ecases))
 
74
        seen := make(map[string]bool)
 
75
        for _, ecase := range e.ecases {
 
76
                msg := ecase.Err.Error()
 
77
                if !seen[msg] {
 
78
                        seen[msg] = true
 
79
                        msgs = append(msgs, msg)
 
80
                }
 
81
        }
 
82
        if len(msgs) == 1 {
 
83
                return msgs[0]
 
84
        }
 
85
        var buf bytes.Buffer
 
86
        buf.WriteString("multiple errors in bulk operation:\n")
 
87
        for _, msg := range msgs {
 
88
                buf.WriteString("  - ")
 
89
                buf.WriteString(msg)
 
90
                buf.WriteByte('\n')
 
91
        }
 
92
        return buf.String()
 
93
}
 
94
 
 
95
type bulkErrorCases []BulkErrorCase
 
96
 
 
97
func (slice bulkErrorCases) Len() int           { return len(slice) }
 
98
func (slice bulkErrorCases) Less(i, j int) bool { return slice[i].Index < slice[j].Index }
 
99
func (slice bulkErrorCases) Swap(i, j int)      { slice[i], slice[j] = slice[j], slice[i] }
 
100
 
 
101
// BulkErrorCase holds an individual error found while attempting a single change
 
102
// within a bulk operation, and the position in which it was enqueued.
 
103
//
 
104
// MongoDB servers older than version 2.6 do not have proper support for bulk
 
105
// operations, so the driver attempts to map its API as much as possible into
 
106
// the functionality that works. In particular, only the last error is reported
 
107
// for bulk inserts and without any positional information, so the Index
 
108
// field is set to -1 in these cases.
 
109
type BulkErrorCase struct {
 
110
        Index int // Position of operation that failed, or -1 if unknown.
 
111
        Err   error
 
112
}
 
113
 
 
114
// Cases returns all individual errors found while attempting the requested changes.
 
115
//
 
116
// See the documentation of BulkErrorCase for limitations in older MongoDB releases.
 
117
func (e *BulkError) Cases() []BulkErrorCase {
 
118
        return e.ecases
 
119
}
 
120
 
 
121
// Bulk returns a value to prepare the execution of a bulk operation.
 
122
func (c *Collection) Bulk() *Bulk {
 
123
        return &Bulk{c: c, ordered: true}
 
124
}
 
125
 
 
126
// Unordered puts the bulk operation in unordered mode.
 
127
//
 
128
// In unordered mode the indvidual operations may be sent
 
129
// out of order, which means latter operations may proceed
 
130
// even if prior ones have failed.
 
131
func (b *Bulk) Unordered() {
 
132
        b.ordered = false
 
133
}
 
134
 
 
135
func (b *Bulk) action(op bulkOp, opcount int) *bulkAction {
 
136
        var action *bulkAction
 
137
        if len(b.actions) > 0 && b.actions[len(b.actions)-1].op == op {
 
138
                action = &b.actions[len(b.actions)-1]
 
139
        } else if !b.ordered {
 
140
                for i := range b.actions {
 
141
                        if b.actions[i].op == op {
 
142
                                action = &b.actions[i]
 
143
                                break
 
144
                        }
 
145
                }
 
146
        }
 
147
        if action == nil {
 
148
                b.actions = append(b.actions, bulkAction{op: op})
 
149
                action = &b.actions[len(b.actions)-1]
 
150
        }
 
151
        for i := 0; i < opcount; i++ {
 
152
                action.idxs = append(action.idxs, b.opcount)
 
153
                b.opcount++
 
154
        }
 
155
        return action
 
156
}
 
157
 
 
158
// Insert queues up the provided documents for insertion.
 
159
func (b *Bulk) Insert(docs ...interface{}) {
 
160
        action := b.action(bulkInsert, len(docs))
 
161
        action.docs = append(action.docs, docs...)
 
162
}
 
163
 
 
164
// Remove queues up the provided selectors for removing matching documents.
 
165
// Each selector will remove only a single matching document.
 
166
func (b *Bulk) Remove(selectors ...interface{}) {
 
167
        action := b.action(bulkRemove, len(selectors))
 
168
        for _, selector := range selectors {
 
169
                if selector == nil {
 
170
                        selector = bson.D{}
 
171
                }
 
172
                action.docs = append(action.docs, &deleteOp{
 
173
                        Collection: b.c.FullName,
 
174
                        Selector:   selector,
 
175
                        Flags:      1,
 
176
                        Limit:      1,
 
177
                })
 
178
        }
 
179
}
 
180
 
 
181
// RemoveAll queues up the provided selectors for removing all matching documents.
 
182
// Each selector will remove all matching documents.
 
183
func (b *Bulk) RemoveAll(selectors ...interface{}) {
 
184
        action := b.action(bulkRemove, len(selectors))
 
185
        for _, selector := range selectors {
 
186
                if selector == nil {
 
187
                        selector = bson.D{}
 
188
                }
 
189
                action.docs = append(action.docs, &deleteOp{
 
190
                        Collection: b.c.FullName,
 
191
                        Selector:   selector,
 
192
                        Flags:      0,
 
193
                        Limit:      0,
 
194
                })
 
195
        }
 
196
}
 
197
 
 
198
// Update queues up the provided pairs of updating instructions.
 
199
// The first element of each pair selects which documents must be
 
200
// updated, and the second element defines how to update it.
 
201
// Each pair matches exactly one document for updating at most.
 
202
func (b *Bulk) Update(pairs ...interface{}) {
 
203
        if len(pairs)%2 != 0 {
 
204
                panic("Bulk.Update requires an even number of parameters")
 
205
        }
 
206
        action := b.action(bulkUpdate, len(pairs)/2)
 
207
        for i := 0; i < len(pairs); i += 2 {
 
208
                selector := pairs[i]
 
209
                if selector == nil {
 
210
                        selector = bson.D{}
 
211
                }
 
212
                action.docs = append(action.docs, &updateOp{
 
213
                        Collection: b.c.FullName,
 
214
                        Selector:   selector,
 
215
                        Update:     pairs[i+1],
 
216
                })
 
217
        }
 
218
}
 
219
 
 
220
// UpdateAll queues up the provided pairs of updating instructions.
 
221
// The first element of each pair selects which documents must be
 
222
// updated, and the second element defines how to update it.
 
223
// Each pair updates all documents matching the selector.
 
224
func (b *Bulk) UpdateAll(pairs ...interface{}) {
 
225
        if len(pairs)%2 != 0 {
 
226
                panic("Bulk.UpdateAll requires an even number of parameters")
 
227
        }
 
228
        action := b.action(bulkUpdate, len(pairs)/2)
 
229
        for i := 0; i < len(pairs); i += 2 {
 
230
                selector := pairs[i]
 
231
                if selector == nil {
 
232
                        selector = bson.D{}
 
233
                }
 
234
                action.docs = append(action.docs, &updateOp{
 
235
                        Collection: b.c.FullName,
 
236
                        Selector:   selector,
 
237
                        Update:     pairs[i+1],
 
238
                        Flags:      2,
 
239
                        Multi:      true,
 
240
                })
 
241
        }
 
242
}
 
243
 
 
244
// Upsert queues up the provided pairs of upserting instructions.
 
245
// The first element of each pair selects which documents must be
 
246
// updated, and the second element defines how to update it.
 
247
// Each pair matches exactly one document for updating at most.
 
248
func (b *Bulk) Upsert(pairs ...interface{}) {
 
249
        if len(pairs)%2 != 0 {
 
250
                panic("Bulk.Update requires an even number of parameters")
 
251
        }
 
252
        action := b.action(bulkUpdate, len(pairs)/2)
 
253
        for i := 0; i < len(pairs); i += 2 {
 
254
                selector := pairs[i]
 
255
                if selector == nil {
 
256
                        selector = bson.D{}
 
257
                }
 
258
                action.docs = append(action.docs, &updateOp{
 
259
                        Collection: b.c.FullName,
 
260
                        Selector:   selector,
 
261
                        Update:     pairs[i+1],
 
262
                        Flags:      1,
 
263
                        Upsert:     true,
 
264
                })
 
265
        }
 
266
}
 
267
 
 
268
// Run runs all the operations queued up.
 
269
//
 
270
// If an error is reported on an unordered bulk operation, the error value may
 
271
// be an aggregation of all issues observed. As an exception to that, Insert
 
272
// operations running on MongoDB versions prior to 2.6 will report the last
 
273
// error only due to a limitation in the wire protocol.
 
274
func (b *Bulk) Run() (*BulkResult, error) {
 
275
        var result BulkResult
 
276
        var berr BulkError
 
277
        var failed bool
 
278
        for i := range b.actions {
 
279
                action := &b.actions[i]
 
280
                var ok bool
 
281
                switch action.op {
 
282
                case bulkInsert:
 
283
                        ok = b.runInsert(action, &result, &berr)
 
284
                case bulkUpdate:
 
285
                        ok = b.runUpdate(action, &result, &berr)
 
286
                case bulkRemove:
 
287
                        ok = b.runRemove(action, &result, &berr)
 
288
                default:
 
289
                        panic("unknown bulk operation")
 
290
                }
 
291
                if !ok {
 
292
                        failed = true
 
293
                        if b.ordered {
 
294
                                break
 
295
                        }
 
296
                }
 
297
        }
 
298
        if failed {
 
299
                sort.Sort(bulkErrorCases(berr.ecases))
 
300
                return nil, &berr
 
301
        }
 
302
        return &result, nil
 
303
}
 
304
 
 
305
func (b *Bulk) runInsert(action *bulkAction, result *BulkResult, berr *BulkError) bool {
 
306
        op := &insertOp{b.c.FullName, action.docs, 0}
 
307
        if !b.ordered {
 
308
                op.flags = 1 // ContinueOnError
 
309
        }
 
310
        lerr, err := b.c.writeOp(op, b.ordered)
 
311
        return b.checkSuccess(action, berr, lerr, err)
 
312
}
 
313
 
 
314
func (b *Bulk) runUpdate(action *bulkAction, result *BulkResult, berr *BulkError) bool {
 
315
        lerr, err := b.c.writeOp(bulkUpdateOp(action.docs), b.ordered)
 
316
        if lerr != nil {
 
317
                result.Matched += lerr.N
 
318
                result.Modified += lerr.modified
 
319
        }
 
320
        return b.checkSuccess(action, berr, lerr, err)
 
321
}
 
322
 
 
323
func (b *Bulk) runRemove(action *bulkAction, result *BulkResult, berr *BulkError) bool {
 
324
        lerr, err := b.c.writeOp(bulkDeleteOp(action.docs), b.ordered)
 
325
        if lerr != nil {
 
326
                result.Matched += lerr.N
 
327
                result.Modified += lerr.modified
 
328
        }
 
329
        return b.checkSuccess(action, berr, lerr, err)
 
330
}
 
331
 
 
332
func (b *Bulk) checkSuccess(action *bulkAction, berr *BulkError, lerr *LastError, err error) bool {
 
333
        if lerr != nil && len(lerr.ecases) > 0 {
 
334
                for i := 0; i < len(lerr.ecases); i++ {
 
335
                        // Map back from the local error index into the visible one.
 
336
                        ecase := lerr.ecases[i]
 
337
                        idx := ecase.Index
 
338
                        if idx >= 0 {
 
339
                                idx = action.idxs[idx]
 
340
                        }
 
341
                        berr.ecases = append(berr.ecases, BulkErrorCase{idx, ecase.Err})
 
342
                }
 
343
                return false
 
344
        } else if err != nil {
 
345
                for i := 0; i < len(action.idxs); i++ {
 
346
                        berr.ecases = append(berr.ecases, BulkErrorCase{action.idxs[i], err})
 
347
                }
 
348
                return false
 
349
        }
 
350
        return true
 
351
}