1
// Copyright 2012-2014 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
9
"github.com/juju/errors"
10
jujutxn "github.com/juju/txn"
11
"gopkg.in/mgo.v2/bson"
15
// readTxnRevno is a convenience method delegating to the state's Database.
// It looks up the document with the given id in the named collection and
// returns the value decoded from its "txn-revno" field. The error from the
// query is passed through errors.Trace.
16
func (st *State) readTxnRevno(collectionName string, id interface{}) (int64, error) {
17
collection, closer := st.database.GetCollection(collectionName)
19
// Select only the txn-revno field; it is the only field read back.
query := collection.FindId(id).Select(bson.D{{"txn-revno", 1}})
21
TxnRevno int64 `bson:"txn-revno"`
23
err := query.One(&result)
24
return result.TxnRevno, errors.Trace(err)
27
// runTransaction is a convenience method delegating to the state's Database.
// It runs ops on the runner obtained from st.database.TransactionRunner and
// returns whatever that runner's RunTransaction returns.
28
func (st *State) runTransaction(ops []txn.Op) error {
29
runner, closer := st.database.TransactionRunner()
31
return runner.RunTransaction(ops)
34
// runRawTransaction is a convenience method that will run a single
35
// transaction using a "raw" transaction runner that won't perform
36
// environment filtering. When the runner obtained from the database is
// a *multiEnvRunner, its rawRunner is used in its place; a runner of any
// other type is used as it is.
37
func (st *State) runRawTransaction(ops []txn.Op) error {
38
runner, closer := st.database.TransactionRunner()
40
// Swap in the wrapped runner so the ops skip the multiEnvRunner methods.
if multiRunner, ok := runner.(*multiEnvRunner); ok {
41
runner = multiRunner.rawRunner
43
return runner.RunTransaction(ops)
46
// run is a convenience method delegating to the state's Database.
// It passes the transactions source to the Run method of the runner
// obtained from st.database.TransactionRunner and returns its result.
47
func (st *State) run(transactions jujutxn.TransactionSource) error {
48
runner, closer := st.database.TransactionRunner()
50
return runner.Run(transactions)
53
// ResumeTransactions resumes all pending transactions.
// It delegates to the ResumeTransactions method of the runner obtained
// from st.database.TransactionRunner.
54
func (st *State) ResumeTransactions() error {
55
runner, closer := st.database.TransactionRunner()
57
return runner.ResumeTransactions()
60
// MaybePruneTransactions removes data for completed transactions.
// It delegates to the MaybePruneTransactions method of the runner
// obtained from st.database.TransactionRunner, with a prune factor of 2.0.
61
func (st *State) MaybePruneTransactions() error {
62
runner, closer := st.database.TransactionRunner()
64
// Prune txns only when txn count has doubled since last prune.
65
return runner.MaybePruneTransactions(2.0)
68
// multiEnvRunner wraps a jujutxn.Runner. Its RunTransaction and Run
// methods pass operations through updateOps before handing them to the
// wrapped runner.
type multiEnvRunner struct {
69
// rawRunner is the wrapped runner that the Runner methods delegate to.
rawRunner jujutxn.Runner
70
// schema is looked up by collection name (txn.Op.C) when operations
// are checked in updateOps.
schema collectionSchema
74
// RunTransaction is part of the jujutxn.Runner interface. Operations
75
// that affect multi-environment collections will be modified in-place
76
// to ensure correct interaction with these collections.
// The ops are passed through updateOps first; its error is returned via
// errors.Trace, and the ops it yields are what rawRunner is asked to run.
77
func (r *multiEnvRunner) RunTransaction(ops []txn.Op) error {
78
ops, err := r.updateOps(ops)
80
return errors.Trace(err)
82
return r.rawRunner.RunTransaction(ops)
85
// Run is part of the jujutxn.Runner interface. Operations returned by
86
// the given "transactions" function that affect multi-environment
87
// collections will be modified in-place to ensure correct interaction
88
// with these collections.
// This is done by handing rawRunner.Run a wrapper function that calls
// "transactions" for each attempt and passes the resulting ops through
// updateOps.
89
func (r *multiEnvRunner) Run(transactions jujutxn.TransactionSource) error {
90
return r.rawRunner.Run(func(attempt int) ([]txn.Op, error) {
91
ops, err := transactions(attempt)
93
// Don't use Trace here as jujutxn doesn't use juju/errors
94
// and won't deal correctly with some returned errors.
97
// Rewrite the ops produced for this attempt.
ops, err = r.updateOps(ops)
99
return nil, errors.Trace(err)
105
// ResumeTransactions is part of the jujutxn.Runner interface.
// The call is forwarded unchanged to rawRunner.
106
func (r *multiEnvRunner) ResumeTransactions() error {
107
return r.rawRunner.ResumeTransactions()
110
// MaybePruneTransactions is part of the jujutxn.Runner interface.
// The call, including pruneFactor, is forwarded unchanged to rawRunner.
111
func (r *multiEnvRunner) MaybePruneTransactions(pruneFactor float32) error {
112
return r.rawRunner.MaybePruneTransactions(pruneFactor)
115
// updateOps modifies the Insert and Update fields in a slice of
116
// txn.Ops to ensure they are multi-environment safe where possible.
118
// Note that the input slice is actually modified in-place (but see
// the TODO in the function body).
//
// Each op's collection (op.C) is looked up in r.schema; an op that
// references an unknown collection or a raw-access collection makes the
// whole call fail with a "forbidden transaction" error. Failures from
// mungeInsert and mungeUpdate are returned annotated with the collection
// name.
120
func (r *multiEnvRunner) updateOps(ops []txn.Op) ([]txn.Op, error) {
121
for i, op := range ops {
122
info, found := r.schema[op.C]
124
return nil, errors.Errorf("forbidden transaction: references unknown collection %q", op.C)
127
return nil, errors.Errorf("forbidden transaction: references raw-access collection %q", op.C)
130
// TODO(fwereade): this interface implies we're returning a copy
131
// of the transactions -- as I think we should be -- rather than
132
// rewriting them in place (which IMO breaks client expectations
133
// pretty hard, not to mention rendering us unable to accept any
134
// structs passed by value, or which lack an env-uuid field).
136
// The counterargument is that it's convenient to use rewritten
137
// docs directly to construct entities; I think that's suboptimal,
138
// because the cost of a DB read to just grab the actual data pales
139
// in the face of the transaction operation itself, and it's a
140
// small price to pay for a safer implementation.
141
var docID interface{}
142
// String ids are passed through ensureEnvUUID together with the
// runner's envUUID.
if id, ok := op.Id.(string); ok {
143
docID = ensureEnvUUID(r.envUUID, id)
148
if op.Insert != nil {
149
newInsert, err := r.mungeInsert(op.Insert, docID)
151
return nil, errors.Annotatef(err, "cannot insert into %q", op.C)
153
// op is a copy of the slice element, so write back through ops[i].
ops[i].Insert = newInsert
155
if op.Update != nil {
156
newUpdate, err := r.mungeUpdate(op.Update, docID)
158
return nil, errors.Annotatef(err, "cannot update %q", op.C)
160
ops[i].Update = newUpdate
164
logger.Tracef("rewrote transaction: %#v", ops)
168
// mungeInsert takes the value of a txn.Op Insert field and modifies
169
// it to be multi-environment safe, returning the modified document.
// The work is dispatched on the dynamic type of doc: to mungeBsonD,
// to mungeBsonM (a map[string]interface{} is converted to bson.M first),
// or to mungeStruct.
170
func (r *multiEnvRunner) mungeInsert(doc interface{}, docID interface{}) (interface{}, error) {
171
switch doc := doc.(type) {
173
return r.mungeBsonD(doc, docID)
175
return doc, r.mungeBsonM(doc, docID)
176
case map[string]interface{}:
177
// The conversion shares the underlying map, so changes made by
// mungeBsonM are visible through doc.
return doc, r.mungeBsonM(bson.M(doc), docID)
179
return doc, r.mungeStruct(doc, docID)
183
// mungeBsonD takes the value of a txn.Op field expressed as a bson.D
184
// and modifies it to be multi-environment safe.
// An element value that fails the comparison against the runner's envUUID
// is rejected with a `bad "env-uuid" value` error. The returned document
// may have "_id" (set to docID) and "env-uuid" (set to the runner's
// envUUID) elements appended.
185
func (r *multiEnvRunner) mungeBsonD(doc bson.D, docID interface{}) (bson.D, error) {
188
for i, elem := range doc {
195
if elem.Value != r.envUUID {
196
return nil, errors.Errorf(`bad "env-uuid" value: expected %s, got %s`, r.envUUID, elem.Value)
201
doc = append(doc, bson.DocElem{"_id", docID})
204
doc = append(doc, bson.DocElem{"env-uuid", r.envUUID})
209
// mungeBsonM takes the value of a txn.Op field expressed as a bson.M
210
// and modifies it to be multi-environment safe. The map is modified
// in-place. A value that fails the comparison against the runner's
// envUUID is rejected with a `bad "env-uuid" value` error.
212
func (r *multiEnvRunner) mungeBsonM(doc bson.M, docID interface{}) error {
215
for key, value := range doc {
222
if value != r.envUUID {
223
return errors.Errorf(`bad "env-uuid" value: expected %s, got %s`, r.envUUID, value)
231
doc["env-uuid"] = r.envUUID
236
// mungeStruct takes the value of a txn.Op field expressed as some
237
// struct and modifies it to be multi-environment safe. The struct is
238
// modified in-place.
//
// The struct's fields are walked with reflection and selected by their
// "bson" struct tag. One field is given docID through mungeStructField
// with overrideField, and one is checked against the runner's envUUID
// with fieldMustMatch. A value whose kind is not reflect.Struct produces
// an "unknown type" error, and an error is also reported for a struct
// that lacks a field tagged bson:"env-uuid".
239
func (r *multiEnvRunner) mungeStruct(doc, docID interface{}) error {
240
v := reflect.ValueOf(doc)
243
if t.Kind() == reflect.Ptr {
248
if t.Kind() != reflect.Struct {
249
return errors.Errorf("unknown type %s", t)
253
for i := 0; i < t.NumField(); i++ {
256
switch f.Tag.Get("bson") {
258
err = r.mungeStructField(v, f.Name, docID, overrideField)
260
err = r.mungeStructField(v, f.Name, r.envUUID, fieldMustMatch)
264
return errors.Trace(err)
268
return errors.Errorf(`struct lacks field with bson:"env-uuid" tag`)
273
// overrideField is the updateType that mungeStruct passes to
// mungeStructField for the field it sets from docID.
const overrideField = "override"
274
// fieldMustMatch is the updateType that makes mungeStructField reject a
// non-empty field value that differs from the expected one.
const fieldMustMatch = "mustmatch"
276
// mungeStructField updates the field of a struct to a new value. If
277
// updateType is fieldMustMatch then the field must
278
// match the value given, if present.
// In that mode a field that differs from newValue, and whose String()
// is non-empty, is reported as a "bad field value" error.
// A "cannot set ... struct passed by value" error is returned for a field
// that cannot be assigned.
279
func (r *multiEnvRunner) mungeStructField(v reflect.Value, name string, newValue interface{}, updateType string) error {
280
fv := v.FieldByName(name)
281
if fv.Interface() != newValue {
282
if updateType == fieldMustMatch && fv.String() != "" {
283
return errors.Errorf("bad %q field value: expected %s, got %s", name, newValue, fv.String())
286
fv.Set(reflect.ValueOf(newValue))
288
return errors.Errorf("cannot set %q field: struct passed by value", name)
294
// mungeUpdate takes the value of a txn.Op Update field and modifies
295
// it to be multi-environment safe, returning the modified document.
// The work is dispatched on the dynamic type of updateDoc to
// mungeBsonDUpdate or mungeBsonMUpdate; a type that is not handled
// produces a "don't know how to handle" error.
296
func (r *multiEnvRunner) mungeUpdate(updateDoc, docID interface{}) (interface{}, error) {
297
switch doc := updateDoc.(type) {
299
return r.mungeBsonDUpdate(doc, docID)
301
return r.mungeBsonMUpdate(doc, docID)
303
return nil, errors.Errorf("don't know how to handle %T", updateDoc)
307
// mungeBsonDUpdate modifies Update field values expressed as a bson.D
308
// and attempts to make them multi-environment safe.
// The result is assembled in a freshly allocated bson.D. The value of a
// "$set" element is passed through mungeStructOnly before being
// appended, and an error from that step is returned via errors.Trace.
309
func (r *multiEnvRunner) mungeBsonDUpdate(updateDoc bson.D, docID interface{}) (bson.D, error) {
310
outDoc := make(bson.D, 0, len(updateDoc))
311
for _, elem := range updateDoc {
312
if elem.Name == "$set" {
313
// TODO(mjs) - only worry about structs for now. This is
314
// enough to fix LP #1474606 and a more extensive change
315
// to simplify the multi-env txn layer and correctly
316
// handle all cases is coming soon.
317
newSetDoc, err := r.mungeStructOnly(elem.Value, docID)
319
return nil, errors.Trace(err)
321
outDoc = append(outDoc, bson.DocElem{elem.Name, newSetDoc})
323
outDoc = append(outDoc, elem)
329
// mungeBsonMUpdate modifies Update field values expressed as a bson.M
330
// and attempts to make them multi-environment safe.
// The result is assembled in a freshly allocated bson.M. Values passed
// through mungeStructOnly are stored under their original key, and an
// error from that step is returned via errors.Trace.
331
func (r *multiEnvRunner) mungeBsonMUpdate(updateDoc bson.M, docID interface{}) (bson.M, error) {
332
outDoc := make(bson.M)
333
for name, elem := range updateDoc {
335
// TODO(mjs) - as in mungeBsonDUpdate: only structs are handled for now.
336
newSetDoc, err := r.mungeStructOnly(elem, docID)
338
return nil, errors.Trace(err)
340
outDoc[name] = newSetDoc
348
// mungeStructOnly modifies the input document to address
349
// multi-environment concerns, but only if it's a struct.
// Values of type bson.D, bson.M and map[string]interface{} are singled
// out by the type switch; a value handed on to mungeStruct is returned
// together with mungeStruct's error.
350
func (r *multiEnvRunner) mungeStructOnly(doc interface{}, docID interface{}) (interface{}, error) {
351
switch doc := doc.(type) {
352
// NOTE(review): these document types are presumably returned without
// modification — confirm.
case bson.D, bson.M, map[string]interface{}:
355
return doc, r.mungeStruct(doc, docID)