1
// Copyright 2012-2015 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
9
"github.com/juju/errors"
10
jujutxn "github.com/juju/txn"
11
"gopkg.in/juju/names.v2"
15
"github.com/juju/juju/mongo"
18
type SessionCloser func()
20
func dontCloseAnything() {}
22
// Database exposes the mongodb capabilities that most of state should see.
23
type Database interface {
25
// CopySession returns a matching Database with its own session, and a
26
// func that must be called when the Database is no longer needed.
28
// GetCollection and TransactionRunner results from the resulting Database
29
// will all share a session; this does not absolve you of responsibility
30
// for calling those collections' closers.
31
CopySession() (Database, SessionCloser)
33
// GetCollection returns the named Collection, and a func that must be
34
// called when the Collection is no longer needed. The returned Collection
35
// might or might not have its own session, depending on the Database; the
36
// closer must always be called regardless.
38
// If the schema specifies model-filtering for the named collection,
39
// the returned collection will automatically filter queries; for details,
40
// see modelStateCollection.
41
GetCollection(name string) (mongo.Collection, SessionCloser)
43
// TransactionRunner() returns a runner responsible for making changes to
44
// the database, and a func that must be called when the runner is no longer
45
// needed. The returned Runner might or might not have its own session,
46
// depending on the Database; the closer must always be called regardless.
48
// It will reject transactions that reference raw-access (or unknown)
49
// collections; it will automatically rewrite operations that reference
50
// non-global collections; and it will ensure that non-global documents can
51
// only be inserted while the corresponding model is still Alive.
52
TransactionRunner() (jujutxn.Runner, SessionCloser)
54
// Schema returns the schema used to load the database. The returned schema
55
// is not a copy and must not be modified.
56
Schema() collectionSchema
59
// Change represents any mgo/txn-representable change to a Database.
60
type Change interface {
62
// Prepare ensures that db is in a valid base state for applying
63
// the change, and returns mgo/txn operations that will fail any
64
// enclosing transaction if the state has materially changed; or
66
Prepare(db Database) ([]txn.Op, error)
69
// ErrChangeComplete can be returned from Prepare to finish an Apply
70
// attempt and report success without taking any further action.
71
var ErrChangeComplete = errors.New("change complete")
73
// Apply runs the supplied Change against the supplied Database. If it
74
// returns no error, the change succeeded.
75
func Apply(db Database, change Change) error {
76
db, closer := db.CopySession()
79
buildTxn := func(int) ([]txn.Op, error) {
80
ops, err := change.Prepare(db)
81
if errors.Cause(err) == ErrChangeComplete {
82
return nil, jujutxn.ErrNoOperations
85
return nil, errors.Trace(err)
90
runner, closer := db.TransactionRunner()
92
if err := runner.Run(buildTxn); err != nil {
93
return errors.Trace(err)
98
// collectionInfo describes important features of a collection.
99
type collectionInfo struct {
101
// explicitCreate, if non-nil, will cause the collection to be explicitly
102
// Create~d (with the given value) before ensuring indexes.
103
explicitCreate *mgo.CollectionInfo
105
// indexes listed here will be EnsureIndex~ed before state is opened.
108
// global collections will not have model filtering applied. Non-
109
// global collections will have both transactions and reads filtered by
110
// relevant model uuid.
113
// rawAccess collections can be safely accessed as a mongo.WriteCollection.
114
// Direct database access to txn-aware collections is strongly discouraged:
115
// merely writing directly to a field makes it impossible to use that field
116
// with mgo/txn; in the worst case, document deletion can destroy critical
117
// parts of the state distributed by mgo/txn, causing different runners to
118
// choose different global transaction orderings; this can in turn cause
119
// operations to be skipped.
121
// Short explanation follows: two different runners pick different -- but
122
// overlapping -- "next" transactions; they each pick the same txn-id; the
123
// first runner writes to an overlapping document and records the txn-id;
124
// and then the second runner inspects that document, sees that the chosen
125
// txn-id has already been applied, and <splat> skips that operation.
127
// Goodbye consistency. So please don't mix txn and non-txn writes without
128
// very careful analysis; and then, please, just don't do it anyway. If you
129
// need raw mgo, use a rawAccess collection.
133
// collectionSchema defines the set of collections used in juju.
134
type collectionSchema map[string]collectionInfo
136
// Load causes all recorded collections to be created and indexed as specified;
137
// the returned Database will filter queries and transactions according to the
138
// suppplied model UUID.
139
func (schema collectionSchema) Load(db *mgo.Database, modelUUID string) (Database, error) {
140
if !names.IsValidModel(modelUUID) {
141
return nil, errors.New("invalid model UUID")
143
for name, info := range schema {
144
rawCollection := db.C(name)
145
if spec := info.explicitCreate; spec != nil {
146
if err := createCollection(rawCollection, spec); err != nil {
147
message := fmt.Sprintf("cannot create collection %q", name)
148
return nil, maybeUnauthorized(err, message)
151
for _, index := range info.indexes {
152
if err := rawCollection.EnsureIndex(index); err != nil {
153
return nil, maybeUnauthorized(err, "cannot create index")
160
modelUUID: modelUUID,
164
// createCollection swallows collection-already-exists errors.
165
func createCollection(raw *mgo.Collection, spec *mgo.CollectionInfo) error {
166
err := raw.Create(spec)
167
// The lack of error code for this error was reported upstream:
168
// https://jira.mongodb.org/browse/SERVER-6992
169
if err == nil || err.Error() == "collection already exists" {
175
// database implements Database.
176
type database struct {
178
// raw is the underlying mgo Database.
181
// schema specifies how the various collections must be handled.
182
schema collectionSchema
184
// modelUUID is used to automatically filter queries and operations on
185
// certain collections (as defined in .schema).
188
// runner exists for testing purposes; if non-nil, the result of
189
// TransactionRunner will always ultimately use this value to run
190
// all transactions. Setting it renders the database goroutine-unsafe.
191
runner jujutxn.Runner
193
// ownSession is used to avoid copying additional sessions in a database
194
// resulting from CopySession.
198
// CopySession is part of the Database interface.
199
func (db *database) CopySession() (Database, SessionCloser) {
200
session := db.raw.Session.Copy()
202
raw: db.raw.With(session),
204
modelUUID: db.modelUUID,
210
// GetCollection is part of the Database interface.
211
func (db *database) GetCollection(name string) (collection mongo.Collection, closer SessionCloser) {
212
info, found := db.schema[name]
214
logger.Errorf("using unknown collection %q", name)
217
// Copy session if necessary.
219
collection = mongo.WrapCollection(db.raw.C(name))
220
closer = dontCloseAnything
222
collection, closer = mongo.CollectionFromName(db.raw, name)
225
// Apply model filtering.
227
collection = &modelStateCollection{
228
WriteCollection: collection.Writeable(),
229
modelUUID: db.modelUUID,
233
// Prevent layer-breaking.
235
// TODO(fwereade): it would be nice to tweak the mongo.Collection
236
// interface a bit to drop Writeable in this situation, but it's
237
// not convenient yet.
239
return collection, closer
242
// TransactionRunner is part of the Database interface.
243
func (db *database) TransactionRunner() (runner jujutxn.Runner, closer SessionCloser) {
245
closer = dontCloseAnything
249
session := raw.Session.Copy()
250
raw = raw.With(session)
251
closer = session.Close
253
params := jujutxn.RunnerParams{Database: raw}
254
runner = jujutxn.NewRunner(params)
256
return &multiModelRunner{
258
modelUUID: db.modelUUID,
263
// Schema is part of the Database interface.
264
func (db *database) Schema() collectionSchema {