~juju-qa/ubuntu/yakkety/juju/juju-1.25.8

« back to all changes in this revision

Viewing changes to src/gopkg.in/mgo.v2/txn/txn.go

  • Committer: Nicholas Skaggs
  • Date: 2016-12-02 17:28:37 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161202172837-jkrbdlyjcxtrii2n
Initial commit of 1.25.6

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// The txn package implements support for multi-document transactions.
 
2
//
 
3
// For details check the following blog post:
 
4
//
 
5
//     http://blog.labix.org/2012/08/22/multi-doc-transactions-for-mongodb
 
6
//
 
7
package txn
 
8
 
 
9
import (
 
10
        "encoding/binary"
 
11
        "fmt"
 
12
        "reflect"
 
13
        "sort"
 
14
        "strings"
 
15
        "sync"
 
16
 
 
17
        "gopkg.in/mgo.v2"
 
18
        "gopkg.in/mgo.v2/bson"
 
19
 
 
20
        crand "crypto/rand"
 
21
        mrand "math/rand"
 
22
)
 
23
 
 
24
type state int
 
25
 
 
26
const (
 
27
        tpreparing state = 1 // One or more documents not prepared
 
28
        tprepared  state = 2 // Prepared but not yet ready to run
 
29
        taborting  state = 3 // Assertions failed, cleaning up
 
30
        tapplying  state = 4 // Changes are in progress
 
31
        taborted   state = 5 // Pre-conditions failed, nothing done
 
32
        tapplied   state = 6 // All changes applied
 
33
)
 
34
 
 
35
func (s state) String() string {
 
36
        switch s {
 
37
        case tpreparing:
 
38
                return "preparing"
 
39
        case tprepared:
 
40
                return "prepared"
 
41
        case taborting:
 
42
                return "aborting"
 
43
        case tapplying:
 
44
                return "applying"
 
45
        case taborted:
 
46
                return "aborted"
 
47
        case tapplied:
 
48
                return "applied"
 
49
        }
 
50
        panic(fmt.Errorf("unknown state: %d", s))
 
51
}
 
52
 
 
53
var rand *mrand.Rand
 
54
var randmu sync.Mutex
 
55
 
 
56
func init() {
 
57
        var seed int64
 
58
        err := binary.Read(crand.Reader, binary.BigEndian, &seed)
 
59
        if err != nil {
 
60
                panic(err)
 
61
        }
 
62
        rand = mrand.New(mrand.NewSource(seed))
 
63
}
 
64
 
 
65
type transaction struct {
 
66
        Id     bson.ObjectId `bson:"_id"`
 
67
        State  state         `bson:"s"`
 
68
        Info   interface{}   `bson:"i,omitempty"`
 
69
        Ops    []Op          `bson:"o"`
 
70
        Nonce  string        `bson:"n,omitempty"`
 
71
        Revnos []int64       `bson:"r,omitempty"`
 
72
 
 
73
        docKeysCached docKeys
 
74
}
 
75
 
 
76
func (t *transaction) String() string {
 
77
        if t.Nonce == "" {
 
78
                return t.Id.Hex()
 
79
        }
 
80
        return string(t.token())
 
81
}
 
82
 
 
83
func (t *transaction) done() bool {
 
84
        return t.State == tapplied || t.State == taborted
 
85
}
 
86
 
 
87
func (t *transaction) token() token {
 
88
        if t.Nonce == "" {
 
89
                panic("transaction has no nonce")
 
90
        }
 
91
        return tokenFor(t)
 
92
}
 
93
 
 
94
func (t *transaction) docKeys() docKeys {
 
95
        if t.docKeysCached != nil {
 
96
                return t.docKeysCached
 
97
        }
 
98
        dkeys := make(docKeys, 0, len(t.Ops))
 
99
NextOp:
 
100
        for _, op := range t.Ops {
 
101
                dkey := op.docKey()
 
102
                for i := range dkeys {
 
103
                        if dkey == dkeys[i] {
 
104
                                continue NextOp
 
105
                        }
 
106
                }
 
107
                dkeys = append(dkeys, dkey)
 
108
        }
 
109
        sort.Sort(dkeys)
 
110
        t.docKeysCached = dkeys
 
111
        return dkeys
 
112
}
 
113
 
 
114
// tokenFor returns a unique transaction token that
 
115
// is composed by t's id and a nonce. If t already has
 
116
// a nonce assigned to it, it will be used, otherwise
 
117
// a new nonce will be generated.
 
118
func tokenFor(t *transaction) token {
 
119
        nonce := t.Nonce
 
120
        if nonce == "" {
 
121
                nonce = newNonce()
 
122
        }
 
123
        return token(t.Id.Hex() + "_" + nonce)
 
124
}
 
125
 
 
126
func newNonce() string {
 
127
        randmu.Lock()
 
128
        r := rand.Uint32()
 
129
        randmu.Unlock()
 
130
        n := make([]byte, 8)
 
131
        for i := uint(0); i < 8; i++ {
 
132
                n[i] = "0123456789abcdef"[(r>>(4*i))&0xf]
 
133
        }
 
134
        return string(n)
 
135
}
 
136
 
 
137
type token string
 
138
 
 
139
func (tt token) id() bson.ObjectId { return bson.ObjectIdHex(string(tt[:24])) }
 
140
func (tt token) nonce() string     { return string(tt[25:]) }
 
141
 
 
142
// Op represents an operation to a single document that may be
 
143
// applied as part of a transaction with other operations.
 
144
type Op struct {
 
145
        // C and Id identify the collection and document this operation
 
146
        // refers to. Id is matched against the "_id" document field.
 
147
        C  string      `bson:"c"`
 
148
        Id interface{} `bson:"d"`
 
149
 
 
150
        // Assert optionally holds a query document that is used to
 
151
        // test the operation document at the time the transaction is
 
152
        // going to be applied. The assertions for all operations in
 
153
        // a transaction are tested before any changes take place,
 
154
        // and the transaction is entirely aborted if any of them
 
155
        // fails. This is also the only way to prevent a transaction
 
156
        // from being being applied (the transaction continues despite
 
157
        // the outcome of Insert, Update, and Remove).
 
158
        Assert interface{} `bson:"a,omitempty"`
 
159
 
 
160
        // The Insert, Update and Remove fields describe the mutation
 
161
        // intended by the operation. At most one of them may be set
 
162
        // per operation. If none are set, Assert must be set and the
 
163
        // operation becomes a read-only test.
 
164
        //
 
165
        // Insert holds the document to be inserted at the time the
 
166
        // transaction is applied. The Id field will be inserted
 
167
        // into the document automatically as its _id field. The
 
168
        // transaction will continue even if the document already
 
169
        // exists. Use Assert with txn.DocMissing if the insertion is
 
170
        // required.
 
171
        //
 
172
        // Update holds the update document to be applied at the time
 
173
        // the transaction is applied. The transaction will continue
 
174
        // even if a document with Id is missing. Use Assert to
 
175
        // test for the document presence or its contents.
 
176
        //
 
177
        // Remove indicates whether to remove the document with Id.
 
178
        // The transaction continues even if the document doesn't yet
 
179
        // exist at the time the transaction is applied. Use Assert
 
180
        // with txn.DocExists to make sure it will be removed.
 
181
        Insert interface{} `bson:"i,omitempty"`
 
182
        Update interface{} `bson:"u,omitempty"`
 
183
        Remove bool        `bson:"r,omitempty"`
 
184
}
 
185
 
 
186
func (op *Op) isChange() bool {
 
187
        return op.Update != nil || op.Insert != nil || op.Remove
 
188
}
 
189
 
 
190
func (op *Op) docKey() docKey {
 
191
        return docKey{op.C, op.Id}
 
192
}
 
193
 
 
194
func (op *Op) name() string {
 
195
        switch {
 
196
        case op.Update != nil:
 
197
                return "update"
 
198
        case op.Insert != nil:
 
199
                return "insert"
 
200
        case op.Remove:
 
201
                return "remove"
 
202
        case op.Assert != nil:
 
203
                return "assert"
 
204
        }
 
205
        return "none"
 
206
}
 
207
 
 
208
const (
 
209
        // DocExists and DocMissing may be used on an operation's
 
210
        // Assert value to assert that the document with the given
 
211
        // Id exists or does not exist, respectively.
 
212
        DocExists  = "d+"
 
213
        DocMissing = "d-"
 
214
)
 
215
 
 
216
// A Runner applies operations as part of a transaction onto any number
 
217
// of collections within a database. See the Run method for details.
 
218
type Runner struct {
 
219
        tc *mgo.Collection // txns
 
220
        sc *mgo.Collection // stash
 
221
        lc *mgo.Collection // log
 
222
}
 
223
 
 
224
// NewRunner returns a new transaction runner that uses tc to hold its
 
225
// transactions.
 
226
//
 
227
// Multiple transaction collections may exist in a single database, but
 
228
// all collections that are touched by operations in a given transaction
 
229
// collection must be handled exclusively by it.
 
230
//
 
231
// A second collection with the same name of tc but suffixed by ".stash"
 
232
// will be used for implementing the transactional behavior of insert
 
233
// and remove operations.
 
234
func NewRunner(tc *mgo.Collection) *Runner {
 
235
        return &Runner{tc, tc.Database.C(tc.Name + ".stash"), nil}
 
236
}
 
237
 
 
238
var ErrAborted = fmt.Errorf("transaction aborted")
 
239
 
 
240
// Run creates a new transaction with ops and runs it immediately.
 
241
// The id parameter specifies the transaction id, and may be written
 
242
// down ahead of time to later verify the success of the change and
 
243
// resume it, when the procedure is interrupted for any reason. If
 
244
// empty, a random id will be generated.
 
245
// The info parameter, if not nil, is included under the "i"
 
246
// field of the transaction document.
 
247
//
 
248
// Operations across documents are not atomically applied, but are
 
249
// guaranteed to be eventually all applied in the order provided or
 
250
// all aborted, as long as the affected documents are only modified
 
251
// through transactions. If documents are simultaneously modified
 
252
// by transactions and out of transactions the behavior is undefined.
 
253
//
 
254
// If Run returns no errors, all operations were applied successfully.
 
255
// If it returns ErrAborted, one or more operations can't be applied
 
256
// and the transaction was entirely aborted with no changes performed.
 
257
// Otherwise, if the transaction is interrupted while running for any
 
258
// reason, it may be resumed explicitly or by attempting to apply
 
259
// another transaction on any of the documents targeted by ops, as
 
260
// long as the interruption was made after the transaction document
 
261
// itself was inserted. Run Resume with the obtained transaction id
 
262
// to confirm whether the transaction was applied or not.
 
263
//
 
264
// Any number of transactions may be run concurrently, with one
 
265
// runner or many.
 
266
func (r *Runner) Run(ops []Op, id bson.ObjectId, info interface{}) (err error) {
 
267
        const efmt = "error in transaction op %d: %s"
 
268
        for i := range ops {
 
269
                op := &ops[i]
 
270
                if op.C == "" || op.Id == nil {
 
271
                        return fmt.Errorf(efmt, i, "C or Id missing")
 
272
                }
 
273
                changes := 0
 
274
                if op.Insert != nil {
 
275
                        changes++
 
276
                }
 
277
                if op.Update != nil {
 
278
                        changes++
 
279
                }
 
280
                if op.Remove {
 
281
                        changes++
 
282
                }
 
283
                if changes > 1 {
 
284
                        return fmt.Errorf(efmt, i, "more than one of Insert/Update/Remove set")
 
285
                }
 
286
                if changes == 0 && op.Assert == nil {
 
287
                        return fmt.Errorf(efmt, i, "none of Assert/Insert/Update/Remove set")
 
288
                }
 
289
        }
 
290
        if id == "" {
 
291
                id = bson.NewObjectId()
 
292
        }
 
293
 
 
294
        // Insert transaction sooner rather than later, to stay on the safer side.
 
295
        t := transaction{
 
296
                Id:    id,
 
297
                Ops:   ops,
 
298
                State: tpreparing,
 
299
                Info:  info,
 
300
        }
 
301
        if err = r.tc.Insert(&t); err != nil {
 
302
                return err
 
303
        }
 
304
        if err = flush(r, &t); err != nil {
 
305
                return err
 
306
        }
 
307
        if t.State == taborted {
 
308
                return ErrAborted
 
309
        } else if t.State != tapplied {
 
310
                panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State))
 
311
        }
 
312
        return nil
 
313
}
 
314
 
 
315
// ResumeAll resumes all pending transactions. All ErrAborted errors
 
316
// from individual transactions are ignored.
 
317
func (r *Runner) ResumeAll() (err error) {
 
318
        debugf("Resuming all unfinished transactions")
 
319
        iter := r.tc.Find(bson.D{{"s", bson.D{{"$in", []state{tpreparing, tprepared, tapplying}}}}}).Iter()
 
320
        var t transaction
 
321
        for iter.Next(&t) {
 
322
                if t.State == tapplied || t.State == taborted {
 
323
                        continue
 
324
                }
 
325
                debugf("Resuming %s from %q", t.Id, t.State)
 
326
                if err := flush(r, &t); err != nil {
 
327
                        return err
 
328
                }
 
329
                if !t.done() {
 
330
                        panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State))
 
331
                }
 
332
        }
 
333
        return nil
 
334
}
 
335
 
 
336
// Resume resumes the transaction with id. It returns mgo.ErrNotFound
 
337
// if the transaction is not found. Otherwise, it has the same semantics
 
338
// of the Run method after the transaction is inserted.
 
339
func (r *Runner) Resume(id bson.ObjectId) (err error) {
 
340
        t, err := r.load(id)
 
341
        if err != nil {
 
342
                return err
 
343
        }
 
344
        if !t.done() {
 
345
                debugf("Resuming %s from %q", t, t.State)
 
346
                if err := flush(r, t); err != nil {
 
347
                        return err
 
348
                }
 
349
        }
 
350
        if t.State == taborted {
 
351
                return ErrAborted
 
352
        } else if t.State != tapplied {
 
353
                panic(fmt.Errorf("invalid state for %s after flush: %q", t, t.State))
 
354
        }
 
355
        return nil
 
356
}
 
357
 
 
358
// ChangeLog enables logging of changes to the given collection
 
359
// every time a transaction that modifies content is done being
 
360
// applied.
 
361
//
 
362
// Saved documents are in the format:
 
363
//
 
364
//     {"_id": <txn id>, <collection>: {"d": [<doc id>, ...], "r": [<doc revno>, ...]}}
 
365
//
 
366
// The document revision is the value of the txn-revno field after
 
367
// the change has been applied. Negative values indicate the document
 
368
// was not present in the collection. Revisions will not change when
 
369
// updates or removes are applied to missing documents or inserts are
 
370
// attempted when the document isn't present.
 
371
func (r *Runner) ChangeLog(logc *mgo.Collection) {
 
372
        r.lc = logc
 
373
}
 
374
 
 
375
// PurgeMissing removes from collections any state that refers to transaction
 
376
// documents that for whatever reason have been lost from the system (removed
 
377
// by accident or lost in a hard crash, for example).
 
378
//
 
379
// This method should very rarely be needed, if at all, and should never be
 
380
// used during the normal operation of an application. Its purpose is to put
 
381
// a system that has seen unavoidable corruption back in a working state.
 
382
func (r *Runner) PurgeMissing(collections ...string) error {
 
383
        type M map[string]interface{}
 
384
        type S []interface{}
 
385
 
 
386
        type TDoc struct {
 
387
                Id       interface{} "_id"
 
388
                TxnQueue []string    "txn-queue"
 
389
        }
 
390
 
 
391
        found := make(map[bson.ObjectId]bool)
 
392
 
 
393
        sort.Strings(collections)
 
394
        for _, collection := range collections {
 
395
                c := r.tc.Database.C(collection)
 
396
                iter := c.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter()
 
397
                var tdoc TDoc
 
398
                for iter.Next(&tdoc) {
 
399
                        for _, txnToken := range tdoc.TxnQueue {
 
400
                                txnId := bson.ObjectIdHex(txnToken[:24])
 
401
                                if found[txnId] {
 
402
                                        continue
 
403
                                }
 
404
                                if r.tc.FindId(txnId).One(nil) == nil {
 
405
                                        found[txnId] = true
 
406
                                        continue
 
407
                                }
 
408
                                logf("WARNING: purging from document %s/%v the missing transaction id %s", collection, tdoc.Id, txnId)
 
409
                                err := c.UpdateId(tdoc.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}})
 
410
                                if err != nil {
 
411
                                        return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err)
 
412
                                }
 
413
                        }
 
414
                }
 
415
                if err := iter.Close(); err != nil {
 
416
                        return fmt.Errorf("transaction queue iteration error for %s: %v", collection, err)
 
417
                }
 
418
        }
 
419
 
 
420
        type StashTDoc struct {
 
421
                Id       docKey   "_id"
 
422
                TxnQueue []string "txn-queue"
 
423
        }
 
424
 
 
425
        iter := r.sc.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter()
 
426
        var stdoc StashTDoc
 
427
        for iter.Next(&stdoc) {
 
428
                for _, txnToken := range stdoc.TxnQueue {
 
429
                        txnId := bson.ObjectIdHex(txnToken[:24])
 
430
                        if found[txnId] {
 
431
                                continue
 
432
                        }
 
433
                        if r.tc.FindId(txnId).One(nil) == nil {
 
434
                                found[txnId] = true
 
435
                                continue
 
436
                        }
 
437
                        logf("WARNING: purging from stash document %s/%v the missing transaction id %s", stdoc.Id.C, stdoc.Id.Id, txnId)
 
438
                        err := r.sc.UpdateId(stdoc.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}})
 
439
                        if err != nil {
 
440
                                return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err)
 
441
                        }
 
442
                }
 
443
        }
 
444
        if err := iter.Close(); err != nil {
 
445
                return fmt.Errorf("transaction stash iteration error: %v", err)
 
446
        }
 
447
 
 
448
        return nil
 
449
}
 
450
 
 
451
func (r *Runner) load(id bson.ObjectId) (*transaction, error) {
 
452
        var t transaction
 
453
        err := r.tc.FindId(id).One(&t)
 
454
        if err == mgo.ErrNotFound {
 
455
                return nil, fmt.Errorf("cannot find transaction %s", id)
 
456
        } else if err != nil {
 
457
                return nil, err
 
458
        }
 
459
        return &t, nil
 
460
}
 
461
 
 
462
type typeNature int
 
463
 
 
464
const (
 
465
        // The order of these values matters. Transactions
 
466
        // from applications using different ordering will
 
467
        // be incompatible with each other.
 
468
        _ typeNature = iota
 
469
        natureString
 
470
        natureInt
 
471
        natureFloat
 
472
        natureBool
 
473
        natureStruct
 
474
)
 
475
 
 
476
func valueNature(v interface{}) (value interface{}, nature typeNature) {
 
477
        rv := reflect.ValueOf(v)
 
478
        switch rv.Kind() {
 
479
        case reflect.String:
 
480
                return rv.String(), natureString
 
481
        case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
 
482
                return rv.Int(), natureInt
 
483
        case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
 
484
                return int64(rv.Uint()), natureInt
 
485
        case reflect.Float32, reflect.Float64:
 
486
                return rv.Float(), natureFloat
 
487
        case reflect.Bool:
 
488
                return rv.Bool(), natureBool
 
489
        case reflect.Struct:
 
490
                return v, natureStruct
 
491
        }
 
492
        panic("document id type unsupported by txn: " + rv.Kind().String())
 
493
}
 
494
 
 
495
type docKey struct {
 
496
        C  string
 
497
        Id interface{}
 
498
}
 
499
 
 
500
type docKeys []docKey
 
501
 
 
502
func (ks docKeys) Len() int      { return len(ks) }
 
503
func (ks docKeys) Swap(i, j int) { ks[i], ks[j] = ks[j], ks[i] }
 
504
func (ks docKeys) Less(i, j int) bool {
 
505
        a, b := ks[i], ks[j]
 
506
        if a.C != b.C {
 
507
                return a.C < b.C
 
508
        }
 
509
        return valuecmp(a.Id, b.Id) == -1
 
510
}
 
511
 
 
512
func valuecmp(a, b interface{}) int {
 
513
        av, an := valueNature(a)
 
514
        bv, bn := valueNature(b)
 
515
        if an < bn {
 
516
                return -1
 
517
        }
 
518
        if an > bn {
 
519
                return 1
 
520
        }
 
521
 
 
522
        if av == bv {
 
523
                return 0
 
524
        }
 
525
        var less bool
 
526
        switch an {
 
527
        case natureString:
 
528
                less = av.(string) < bv.(string)
 
529
        case natureInt:
 
530
                less = av.(int64) < bv.(int64)
 
531
        case natureFloat:
 
532
                less = av.(float64) < bv.(float64)
 
533
        case natureBool:
 
534
                less = !av.(bool) && bv.(bool)
 
535
        case natureStruct:
 
536
                less = structcmp(av, bv) == -1
 
537
        default:
 
538
                panic("unreachable")
 
539
        }
 
540
        if less {
 
541
                return -1
 
542
        }
 
543
        return 1
 
544
}
 
545
 
 
546
func structcmp(a, b interface{}) int {
 
547
        av := reflect.ValueOf(a)
 
548
        bv := reflect.ValueOf(b)
 
549
 
 
550
        var ai, bi = 0, 0
 
551
        var an, bn = av.NumField(), bv.NumField()
 
552
        var avi, bvi interface{}
 
553
        var af, bf reflect.StructField
 
554
        for {
 
555
                for ai < an {
 
556
                        af = av.Type().Field(ai)
 
557
                        if isExported(af.Name) {
 
558
                                avi = av.Field(ai).Interface()
 
559
                                ai++
 
560
                                break
 
561
                        }
 
562
                        ai++
 
563
                }
 
564
                for bi < bn {
 
565
                        bf = bv.Type().Field(bi)
 
566
                        if isExported(bf.Name) {
 
567
                                bvi = bv.Field(bi).Interface()
 
568
                                bi++
 
569
                                break
 
570
                        }
 
571
                        bi++
 
572
                }
 
573
                if n := valuecmp(avi, bvi); n != 0 {
 
574
                        return n
 
575
                }
 
576
                nameA := getFieldName(af)
 
577
                nameB := getFieldName(bf)
 
578
                if nameA < nameB {
 
579
                        return -1
 
580
                }
 
581
                if nameA > nameB {
 
582
                        return 1
 
583
                }
 
584
                if ai == an && bi == bn {
 
585
                        return 0
 
586
                }
 
587
                if ai == an || bi == bn {
 
588
                        if ai == bn {
 
589
                                return -1
 
590
                        }
 
591
                        return 1
 
592
                }
 
593
        }
 
594
        panic("unreachable")
 
595
}
 
596
 
 
597
func isExported(name string) bool {
 
598
        a := name[0]
 
599
        return a >= 'A' && a <= 'Z'
 
600
}
 
601
 
 
602
func getFieldName(f reflect.StructField) string {
 
603
        name := f.Tag.Get("bson")
 
604
        if i := strings.Index(name, ","); i >= 0 {
 
605
                name = name[:i]
 
606
        }
 
607
        if name == "" {
 
608
                name = strings.ToLower(f.Name)
 
609
        }
 
610
        return name
 
611
}