~juju-qa/ubuntu/xenial/juju/xenial-2.0-beta3

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/resource/state/resource.go

  • Committer: Martin Packman
  • Date: 2016-03-30 19:31:08 UTC
  • mfrom: (1.1.41)
  • Revision ID: martin.packman@canonical.com-20160330193108-h9iz3ak334uk0z5r
Merge new upstream source 2.0~beta3

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
        "time"
13
13
 
14
14
        "github.com/juju/errors"
 
15
        "github.com/juju/names"
15
16
        "github.com/juju/utils"
16
17
        charmresource "gopkg.in/juju/charm.v6-unstable/resource"
17
18
        "gopkg.in/mgo.v2/txn"
48
49
        // SetUnitResource stores the resource info for a unit.
49
50
        SetUnitResource(unitID string, args resource.Resource) error
50
51
 
 
52
        // SetUnitResourceProgress stores the resource info and download
 
53
        // progressfor a unit.
 
54
        SetUnitResourceProgress(unitID string, args resource.Resource, progress int64) error
 
55
 
51
56
        // NewResolvePendingResourceOps generates mongo transaction operations
52
57
        // to set the identified resource as active.
53
58
        NewResolvePendingResourceOps(resID, pendingID string) ([]txn.Op, error)
70
75
        Activate() error
71
76
}
72
77
 
 
78
type rawState interface {
 
79
        // VerifyService ensures that the service is in state.
 
80
        VerifyService(id string) error
 
81
 
 
82
        // Units returns the tags for all units in the service.
 
83
        Units(serviceID string) ([]names.UnitTag, error)
 
84
}
 
85
 
73
86
type resourceStorage interface {
74
87
        // PutAndCheckHash stores the content of the reader into the storage.
75
88
        PutAndCheckHash(path string, r io.Reader, length int64, hash string) error
84
97
 
85
98
type resourceState struct {
86
99
        persist resourcePersistence
 
100
        raw     rawState
87
101
        storage resourceStorage
88
102
 
89
103
        newPendingID     func() (string, error)
94
108
func (st resourceState) ListResources(serviceID string) (resource.ServiceResources, error) {
95
109
        resources, err := st.persist.ListResources(serviceID)
96
110
        if err != nil {
97
 
                return resource.ServiceResources{}, errors.Trace(err)
 
111
                if err := st.raw.VerifyService(serviceID); err != nil {
 
112
                        return resource.ServiceResources{}, errors.Trace(err)
 
113
                }
 
114
                return resource.ServiceResources{}, errors.Trace(err)
 
115
        }
 
116
 
 
117
        unitIDs, err := st.raw.Units(serviceID)
 
118
        if err != nil {
 
119
                return resource.ServiceResources{}, errors.Trace(err)
 
120
        }
 
121
        for _, unitID := range unitIDs {
 
122
                found := false
 
123
                for _, unitRes := range resources.UnitResources {
 
124
                        if unitID.String() == unitRes.Tag.String() {
 
125
                                found = true
 
126
                                break
 
127
                        }
 
128
                }
 
129
                if !found {
 
130
                        unitRes := resource.UnitResources{
 
131
                                Tag: unitID,
 
132
                        }
 
133
                        resources.UnitResources = append(resources.UnitResources, unitRes)
 
134
                }
98
135
        }
99
136
 
100
137
        return resources, nil
105
142
        id := newResourceID(serviceID, name)
106
143
        res, _, err := st.persist.GetResource(id)
107
144
        if err != nil {
 
145
                if err := st.raw.VerifyService(serviceID); err != nil {
 
146
                        return resource.Resource{}, errors.Trace(err)
 
147
                }
108
148
                return res, errors.Trace(err)
109
149
        }
110
150
        return res, nil
116
156
 
117
157
        resources, err := st.persist.ListPendingResources(serviceID)
118
158
        if err != nil {
 
159
                // We do not call VerifyService() here because pending resources
 
160
                // do not have to have an existing service.
119
161
                return res, errors.Trace(err)
120
162
        }
121
163
 
146
188
        if err != nil {
147
189
                return "", errors.Annotate(err, "could not generate resource ID")
148
190
        }
149
 
        logger.Tracef("adding pending resource %q for service %q (ID: %s)", chRes.Name, serviceID, pendingID)
 
191
        logger.Debugf("adding pending resource %q for service %q (ID: %s)", chRes.Name, serviceID, pendingID)
150
192
 
151
193
        if _, err := st.setResource(pendingID, serviceID, userID, chRes, r); err != nil {
152
194
                return "", errors.Trace(err)
238
280
        id := newResourceID(serviceID, name)
239
281
        resourceInfo, storagePath, err := st.persist.GetResource(id)
240
282
        if err != nil {
 
283
                if err := st.raw.VerifyService(serviceID); err != nil {
 
284
                        return resource.Resource{}, nil, errors.Trace(err)
 
285
                }
241
286
                return resource.Resource{}, nil, errors.Annotate(err, "while getting resource info")
242
287
        }
243
288
        if resourceInfo.IsPlaceholder() {
263
308
func (st resourceState) OpenResourceForUniter(unit resource.Unit, name string) (resource.Resource, io.ReadCloser, error) {
264
309
        serviceID := unit.ServiceName()
265
310
 
 
311
        pendingID, err := newPendingID()
 
312
        if err != nil {
 
313
                return resource.Resource{}, nil, errors.Trace(err)
 
314
        }
 
315
 
266
316
        resourceInfo, resourceReader, err := st.OpenResource(serviceID, name)
267
317
        if err != nil {
268
318
                return resource.Resource{}, nil, errors.Trace(err)
269
319
        }
270
320
 
271
 
        resourceReader = unitSetter{
 
321
        pending := resourceInfo // a copy
 
322
        pending.PendingID = pendingID
 
323
 
 
324
        if err := st.persist.SetUnitResourceProgress(unit.Name(), pending, 0); err != nil {
 
325
                resourceReader.Close()
 
326
                return resource.Resource{}, nil, errors.Trace(err)
 
327
        }
 
328
 
 
329
        resourceReader = &unitSetter{
272
330
                ReadCloser: resourceReader,
273
331
                persist:    st.persist,
274
332
                unit:       unit,
 
333
                pending:    pending,
275
334
                resource:   resourceInfo,
276
335
        }
277
336
 
363
422
        io.ReadCloser
364
423
        persist  resourcePersistence
365
424
        unit     resource.Unit
 
425
        pending  resource.Resource
366
426
        resource resource.Resource
 
427
        progress int64
367
428
}
368
429
 
369
430
// Read implements io.Reader.
370
 
func (u unitSetter) Read(p []byte) (n int, err error) {
 
431
func (u *unitSetter) Read(p []byte) (n int, err error) {
371
432
        n, err = u.ReadCloser.Read(p)
372
433
        if err == io.EOF {
373
434
                // record that the unit is now using this version of the resource
375
436
                        msg := "Failed to record that unit %q is using resource %q revision %v"
376
437
                        logger.Errorf(msg, u.unit.Name(), u.resource.Name, u.resource.RevisionString())
377
438
                }
 
439
        } else {
 
440
                u.progress += int64(n)
 
441
                // TODO(ericsnow) Don't do this every time?
 
442
                if err := u.persist.SetUnitResourceProgress(u.unit.Name(), u.pending, u.progress); err != nil {
 
443
                        logger.Errorf("failed to track progress: %v", err)
 
444
                }
378
445
        }
379
446
        return n, err
380
447
}