~juju-qa/ubuntu/yakkety/juju/juju-1.25.8

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/state/txns.go

  • Committer: Nicholas Skaggs
  • Date: 2016-12-02 17:28:37 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161202172837-jkrbdlyjcxtrii2n
Initial commit of 1.25.6

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2012-2014 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package state
 
5
 
 
6
import (
 
7
        "reflect"
 
8
 
 
9
        "github.com/juju/errors"
 
10
        jujutxn "github.com/juju/txn"
 
11
        "gopkg.in/mgo.v2/bson"
 
12
        "gopkg.in/mgo.v2/txn"
 
13
)
 
14
 
 
15
// readTxnRevno is a convenience method delegating to the state's Database.
 
16
func (st *State) readTxnRevno(collectionName string, id interface{}) (int64, error) {
 
17
        collection, closer := st.database.GetCollection(collectionName)
 
18
        defer closer()
 
19
        query := collection.FindId(id).Select(bson.D{{"txn-revno", 1}})
 
20
        var result struct {
 
21
                TxnRevno int64 `bson:"txn-revno"`
 
22
        }
 
23
        err := query.One(&result)
 
24
        return result.TxnRevno, errors.Trace(err)
 
25
}
 
26
 
 
27
// runTransaction is a convenience method delegating to the state's Database.
 
28
func (st *State) runTransaction(ops []txn.Op) error {
 
29
        runner, closer := st.database.TransactionRunner()
 
30
        defer closer()
 
31
        return runner.RunTransaction(ops)
 
32
}
 
33
 
 
34
// runRawTransaction is a convenience method that will run a single
 
35
// transaction using a "raw" transaction runner that won't perform
 
36
// environment filtering.
 
37
func (st *State) runRawTransaction(ops []txn.Op) error {
 
38
        runner, closer := st.database.TransactionRunner()
 
39
        defer closer()
 
40
        if multiRunner, ok := runner.(*multiEnvRunner); ok {
 
41
                runner = multiRunner.rawRunner
 
42
        }
 
43
        return runner.RunTransaction(ops)
 
44
}
 
45
 
 
46
// run is a convenience method delegating to the state's Database.
 
47
func (st *State) run(transactions jujutxn.TransactionSource) error {
 
48
        runner, closer := st.database.TransactionRunner()
 
49
        defer closer()
 
50
        return runner.Run(transactions)
 
51
}
 
52
 
 
53
// ResumeTransactions resumes all pending transactions.
 
54
func (st *State) ResumeTransactions() error {
 
55
        runner, closer := st.database.TransactionRunner()
 
56
        defer closer()
 
57
        return runner.ResumeTransactions()
 
58
}
 
59
 
 
60
// MaybePruneTransactions removes data for completed transactions.
 
61
func (st *State) MaybePruneTransactions() error {
 
62
        runner, closer := st.database.TransactionRunner()
 
63
        defer closer()
 
64
        // Prune txns only when txn count has doubled since last prune.
 
65
        return runner.MaybePruneTransactions(2.0)
 
66
}
 
67
 
 
68
type multiEnvRunner struct {
 
69
        rawRunner jujutxn.Runner
 
70
        schema    collectionSchema
 
71
        envUUID   string
 
72
}
 
73
 
 
74
// RunTransaction is part of the jujutxn.Runner interface. Operations
 
75
// that affect multi-environment collections will be modified in-place
 
76
// to ensure correct interaction with these collections.
 
77
func (r *multiEnvRunner) RunTransaction(ops []txn.Op) error {
 
78
        ops, err := r.updateOps(ops)
 
79
        if err != nil {
 
80
                return errors.Trace(err)
 
81
        }
 
82
        return r.rawRunner.RunTransaction(ops)
 
83
}
 
84
 
 
85
// Run is part of the jujutxn.Runner interface. Operations returned by
 
86
// the given "transactions" function that affect multi-environment
 
87
// collections will be modified in-place to ensure correct interaction
 
88
// with these collections.
 
89
func (r *multiEnvRunner) Run(transactions jujutxn.TransactionSource) error {
 
90
        return r.rawRunner.Run(func(attempt int) ([]txn.Op, error) {
 
91
                ops, err := transactions(attempt)
 
92
                if err != nil {
 
93
                        // Don't use Trace here as jujutxn doens't use juju/errors
 
94
                        // and won't deal correctly with some returned errors.
 
95
                        return nil, err
 
96
                }
 
97
                ops, err = r.updateOps(ops)
 
98
                if err != nil {
 
99
                        return nil, errors.Trace(err)
 
100
                }
 
101
                return ops, nil
 
102
        })
 
103
}
 
104
 
 
105
// ResumeTransactions is part of the jujutxn.Runner interface.
 
106
func (r *multiEnvRunner) ResumeTransactions() error {
 
107
        return r.rawRunner.ResumeTransactions()
 
108
}
 
109
 
 
110
// MaybePruneTransactions is part of the jujutxn.Runner interface.
 
111
func (r *multiEnvRunner) MaybePruneTransactions(pruneFactor float32) error {
 
112
        return r.rawRunner.MaybePruneTransactions(pruneFactor)
 
113
}
 
114
 
 
115
// updateOps modifies the Insert and Update fields in a slice of
 
116
// txn.Ops to ensure they are multi-environment safe where possible.
 
117
//
 
118
// Note that the input slice is actually modified in-place (but see
 
119
// TODO below).
 
120
func (r *multiEnvRunner) updateOps(ops []txn.Op) ([]txn.Op, error) {
 
121
        for i, op := range ops {
 
122
                info, found := r.schema[op.C]
 
123
                if !found {
 
124
                        return nil, errors.Errorf("forbidden transaction: references unknown collection %q", op.C)
 
125
                }
 
126
                if info.rawAccess {
 
127
                        return nil, errors.Errorf("forbidden transaction: references raw-access collection %q", op.C)
 
128
                }
 
129
                if !info.global {
 
130
                        // TODO(fwereade): this interface implies we're returning a copy
 
131
                        // of the transactions -- as I think we should be -- rather than
 
132
                        // rewriting them in place (which IMO breaks client expectations
 
133
                        // pretty hard, not to mention rendering us unable to accept any
 
134
                        // structs passed by value, or which lack an env-uuid field).
 
135
                        //
 
136
                        // The counterargument is that it's convenient to use rewritten
 
137
                        // docs directly to construct entities; I think that's suboptimal,
 
138
                        // because the cost of a DB read to just grab the actual data pales
 
139
                        // in the face of the transaction operation itself, and it's a
 
140
                        // small price to pay for a safer implementation.
 
141
                        var docID interface{}
 
142
                        if id, ok := op.Id.(string); ok {
 
143
                                docID = ensureEnvUUID(r.envUUID, id)
 
144
                                ops[i].Id = docID
 
145
                        } else {
 
146
                                docID = op.Id
 
147
                        }
 
148
                        if op.Insert != nil {
 
149
                                newInsert, err := r.mungeInsert(op.Insert, docID)
 
150
                                if err != nil {
 
151
                                        return nil, errors.Annotatef(err, "cannot insert into %q", op.C)
 
152
                                }
 
153
                                ops[i].Insert = newInsert
 
154
                        }
 
155
                        if op.Update != nil {
 
156
                                newUpdate, err := r.mungeUpdate(op.Update, docID)
 
157
                                if err != nil {
 
158
                                        return nil, errors.Annotatef(err, "cannot update %q", op.C)
 
159
                                }
 
160
                                ops[i].Update = newUpdate
 
161
                        }
 
162
                }
 
163
        }
 
164
        logger.Tracef("rewrote transaction: %#v", ops)
 
165
        return ops, nil
 
166
}
 
167
 
 
168
// mungeInsert takes the value of an txn.Op Insert field and modifies
 
169
// it to be multi-environment safe, returning the modified document.
 
170
func (r *multiEnvRunner) mungeInsert(doc interface{}, docID interface{}) (interface{}, error) {
 
171
        switch doc := doc.(type) {
 
172
        case bson.D:
 
173
                return r.mungeBsonD(doc, docID)
 
174
        case bson.M:
 
175
                return doc, r.mungeBsonM(doc, docID)
 
176
        case map[string]interface{}:
 
177
                return doc, r.mungeBsonM(bson.M(doc), docID)
 
178
        default:
 
179
                return doc, r.mungeStruct(doc, docID)
 
180
        }
 
181
}
 
182
 
 
183
// mungeBsonD takes the value of a txn.Op field expressed as a bson.D
 
184
// and modifies it to be multi-environment safe.
 
185
func (r *multiEnvRunner) mungeBsonD(doc bson.D, docID interface{}) (bson.D, error) {
 
186
        idSeen := false
 
187
        envUUIDSeen := false
 
188
        for i, elem := range doc {
 
189
                switch elem.Name {
 
190
                case "_id":
 
191
                        idSeen = true
 
192
                        doc[i].Value = docID
 
193
                case "env-uuid":
 
194
                        envUUIDSeen = true
 
195
                        if elem.Value != r.envUUID {
 
196
                                return nil, errors.Errorf(`bad "env-uuid" value: expected %s, got %s`, r.envUUID, elem.Value)
 
197
                        }
 
198
                }
 
199
        }
 
200
        if !idSeen {
 
201
                doc = append(doc, bson.DocElem{"_id", docID})
 
202
        }
 
203
        if !envUUIDSeen {
 
204
                doc = append(doc, bson.DocElem{"env-uuid", r.envUUID})
 
205
        }
 
206
        return doc, nil
 
207
}
 
208
 
 
209
// mungeBsonM takes the value of a txn.Op field expressed as a bson.M
 
210
// and modifies it to be multi-environment safe. The map is modified
 
211
// in-place.
 
212
func (r *multiEnvRunner) mungeBsonM(doc bson.M, docID interface{}) error {
 
213
        idSeen := false
 
214
        envUUIDSeen := false
 
215
        for key, value := range doc {
 
216
                switch key {
 
217
                case "_id":
 
218
                        idSeen = true
 
219
                        doc[key] = docID
 
220
                case "env-uuid":
 
221
                        envUUIDSeen = true
 
222
                        if value != r.envUUID {
 
223
                                return errors.Errorf(`bad "env-uuid" value: expected %s, got %s`, r.envUUID, value)
 
224
                        }
 
225
                }
 
226
        }
 
227
        if !idSeen {
 
228
                doc["_id"] = docID
 
229
        }
 
230
        if !envUUIDSeen {
 
231
                doc["env-uuid"] = r.envUUID
 
232
        }
 
233
        return nil
 
234
}
 
235
 
 
236
// mungeStruct takes the value of a txn.Op field expressed as some
 
237
// struct and modifies it to be multi-environment safe. The struct is
 
238
// modified in-place.
 
239
func (r *multiEnvRunner) mungeStruct(doc, docID interface{}) error {
 
240
        v := reflect.ValueOf(doc)
 
241
        t := v.Type()
 
242
 
 
243
        if t.Kind() == reflect.Ptr {
 
244
                v = v.Elem()
 
245
                t = v.Type()
 
246
        }
 
247
 
 
248
        if t.Kind() != reflect.Struct {
 
249
                return errors.Errorf("unknown type %s", t)
 
250
        }
 
251
 
 
252
        envUUIDSeen := false
 
253
        for i := 0; i < t.NumField(); i++ {
 
254
                f := t.Field(i)
 
255
                var err error
 
256
                switch f.Tag.Get("bson") {
 
257
                case "_id":
 
258
                        err = r.mungeStructField(v, f.Name, docID, overrideField)
 
259
                case "env-uuid":
 
260
                        err = r.mungeStructField(v, f.Name, r.envUUID, fieldMustMatch)
 
261
                        envUUIDSeen = true
 
262
                }
 
263
                if err != nil {
 
264
                        return errors.Trace(err)
 
265
                }
 
266
        }
 
267
        if !envUUIDSeen {
 
268
                return errors.Errorf(`struct lacks field with bson:"env-uuid" tag`)
 
269
        }
 
270
        return nil
 
271
}
 
272
 
 
273
const overrideField = "override"
 
274
const fieldMustMatch = "mustmatch"
 
275
 
 
276
// mungeStructFIeld updates the field of a struct to a new value. If
 
277
// updateType is overrideField == fieldMustMatch then the field must
 
278
// match the value given, if present.
 
279
func (r *multiEnvRunner) mungeStructField(v reflect.Value, name string, newValue interface{}, updateType string) error {
 
280
        fv := v.FieldByName(name)
 
281
        if fv.Interface() != newValue {
 
282
                if updateType == fieldMustMatch && fv.String() != "" {
 
283
                        return errors.Errorf("bad %q field value: expected %s, got %s", name, newValue, fv.String())
 
284
                }
 
285
                if fv.CanSet() {
 
286
                        fv.Set(reflect.ValueOf(newValue))
 
287
                } else {
 
288
                        return errors.Errorf("cannot set %q field: struct passed by value", name)
 
289
                }
 
290
        }
 
291
        return nil
 
292
}
 
293
 
 
294
// mungeInsert takes the value of an txn.Op Update field and modifies
 
295
// it to be multi-environment safe, returning the modified document.
 
296
func (r *multiEnvRunner) mungeUpdate(updateDoc, docID interface{}) (interface{}, error) {
 
297
        switch doc := updateDoc.(type) {
 
298
        case bson.D:
 
299
                return r.mungeBsonDUpdate(doc, docID)
 
300
        case bson.M:
 
301
                return r.mungeBsonMUpdate(doc, docID)
 
302
        default:
 
303
                return nil, errors.Errorf("don't know how to handle %T", updateDoc)
 
304
        }
 
305
}
 
306
 
 
307
// mungeBsonDUpdate modifies Update field values expressed as a bson.D
 
308
// and attempts to make them multi-environment safe.
 
309
func (r *multiEnvRunner) mungeBsonDUpdate(updateDoc bson.D, docID interface{}) (bson.D, error) {
 
310
        outDoc := make(bson.D, 0, len(updateDoc))
 
311
        for _, elem := range updateDoc {
 
312
                if elem.Name == "$set" {
 
313
                        // TODO(mjs) - only worry about structs for now. This is
 
314
                        // enough to fix LP #1474606 and a more extensive change
 
315
                        // to simplify the multi-env txn layer and correctly
 
316
                        // handle all cases is coming soon.
 
317
                        newSetDoc, err := r.mungeStructOnly(elem.Value, docID)
 
318
                        if err != nil {
 
319
                                return nil, errors.Trace(err)
 
320
                        }
 
321
                        outDoc = append(outDoc, bson.DocElem{elem.Name, newSetDoc})
 
322
                } else {
 
323
                        outDoc = append(outDoc, elem)
 
324
                }
 
325
        }
 
326
        return outDoc, nil
 
327
}
 
328
 
 
329
// mungeBsonMUpdate modifies Update field values expressed as a bson.M
 
330
// and attempts to make them multi-environment safe.
 
331
func (r *multiEnvRunner) mungeBsonMUpdate(updateDoc bson.M, docID interface{}) (bson.M, error) {
 
332
        outDoc := make(bson.M)
 
333
        for name, elem := range updateDoc {
 
334
                if name == "$set" {
 
335
                        // TODO(mjs) - as above.
 
336
                        newSetDoc, err := r.mungeStructOnly(elem, docID)
 
337
                        if err != nil {
 
338
                                return nil, errors.Trace(err)
 
339
                        }
 
340
                        outDoc[name] = newSetDoc
 
341
                } else {
 
342
                        outDoc[name] = elem
 
343
                }
 
344
        }
 
345
        return outDoc, nil
 
346
}
 
347
 
 
348
// mungeStructOnly modifies the input document to address
 
349
// multi-environment concerns, but only if it's a struct.
 
350
func (r *multiEnvRunner) mungeStructOnly(doc interface{}, docID interface{}) (interface{}, error) {
 
351
        switch doc := doc.(type) {
 
352
        case bson.D, bson.M, map[string]interface{}:
 
353
                return doc, nil
 
354
        default:
 
355
                return doc, r.mungeStruct(doc, docID)
 
356
        }
 
357
}