~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/storageprovisioner/volume_events.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2015 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package storageprovisioner
 
5
 
 
6
import (
 
7
        "github.com/juju/errors"
 
8
        "gopkg.in/juju/names.v2"
 
9
 
 
10
        "github.com/juju/juju/apiserver/params"
 
11
        "github.com/juju/juju/instance"
 
12
        "github.com/juju/juju/storage"
 
13
        "github.com/juju/juju/watcher"
 
14
)
 
15
 
 
16
// volumesChanged is called when the lifecycle states of the volumes
 
17
// with the provided IDs have been seen to have changed.
 
18
func volumesChanged(ctx *context, changes []string) error {
 
19
        tags := make([]names.Tag, len(changes))
 
20
        for i, change := range changes {
 
21
                tags[i] = names.NewVolumeTag(change)
 
22
        }
 
23
        alive, dying, dead, err := storageEntityLife(ctx, tags)
 
24
        if err != nil {
 
25
                return errors.Trace(err)
 
26
        }
 
27
        logger.Debugf("volumes alive: %v, dying: %v, dead: %v", alive, dying, dead)
 
28
        if err := processDyingVolumes(ctx, dying); err != nil {
 
29
                return errors.Annotate(err, "processing dying volumes")
 
30
        }
 
31
        if len(alive)+len(dead) == 0 {
 
32
                return nil
 
33
        }
 
34
 
 
35
        // Get volume information for alive and dead volumes, so
 
36
        // we can provision/deprovision.
 
37
        volumeTags := make([]names.VolumeTag, 0, len(alive)+len(dead))
 
38
        for _, tag := range alive {
 
39
                volumeTags = append(volumeTags, tag.(names.VolumeTag))
 
40
        }
 
41
        for _, tag := range dead {
 
42
                volumeTags = append(volumeTags, tag.(names.VolumeTag))
 
43
        }
 
44
        volumeResults, err := ctx.config.Volumes.Volumes(volumeTags)
 
45
        if err != nil {
 
46
                return errors.Annotatef(err, "getting volume information")
 
47
        }
 
48
        if err := processDeadVolumes(ctx, volumeTags[len(alive):], volumeResults[len(alive):]); err != nil {
 
49
                return errors.Annotate(err, "deprovisioning volumes")
 
50
        }
 
51
        if err := processAliveVolumes(ctx, alive, volumeResults[:len(alive)]); err != nil {
 
52
                return errors.Annotate(err, "provisioning volumes")
 
53
        }
 
54
        return nil
 
55
}
 
56
 
 
57
// volumeAttachmentsChanged is called when the lifecycle states of the volume
 
58
// attachments with the provided IDs have been seen to have changed.
 
59
func volumeAttachmentsChanged(ctx *context, watcherIds []watcher.MachineStorageId) error {
 
60
        ids := copyMachineStorageIds(watcherIds)
 
61
        alive, dying, dead, err := attachmentLife(ctx, ids)
 
62
        if err != nil {
 
63
                return errors.Trace(err)
 
64
        }
 
65
        logger.Debugf("volume attachments alive: %v, dying: %v, dead: %v", alive, dying, dead)
 
66
        if len(dead) != 0 {
 
67
                // We should not see dead volume attachments;
 
68
                // attachments go directly from Dying to removed.
 
69
                logger.Warningf("unexpected dead volume attachments: %v", dead)
 
70
        }
 
71
        if len(alive)+len(dying) == 0 {
 
72
                return nil
 
73
        }
 
74
 
 
75
        // Get volume information for alive and dying volume attachments, so
 
76
        // we can attach/detach.
 
77
        ids = append(alive, dying...)
 
78
        volumeAttachmentResults, err := ctx.config.Volumes.VolumeAttachments(ids)
 
79
        if err != nil {
 
80
                return errors.Annotatef(err, "getting volume attachment information")
 
81
        }
 
82
 
 
83
        // Deprovision Dying volume attachments.
 
84
        dyingVolumeAttachmentResults := volumeAttachmentResults[len(alive):]
 
85
        if err := processDyingVolumeAttachments(ctx, dying, dyingVolumeAttachmentResults); err != nil {
 
86
                return errors.Annotate(err, "deprovisioning volume attachments")
 
87
        }
 
88
 
 
89
        // Provision Alive volume attachments.
 
90
        aliveVolumeAttachmentResults := volumeAttachmentResults[:len(alive)]
 
91
        if err := processAliveVolumeAttachments(ctx, alive, aliveVolumeAttachmentResults); err != nil {
 
92
                return errors.Annotate(err, "provisioning volumes")
 
93
        }
 
94
 
 
95
        return nil
 
96
}
 
97
 
 
98
// processDyingVolumes processes the VolumeResults for Dying volumes,
 
99
// removing them from provisioning-pending as necessary.
 
100
func processDyingVolumes(ctx *context, tags []names.Tag) error {
 
101
        for _, tag := range tags {
 
102
                removePendingVolume(ctx, tag.(names.VolumeTag))
 
103
        }
 
104
        return nil
 
105
}
 
106
 
 
107
// updateVolume updates the context with the given volume info.
 
108
func updateVolume(ctx *context, info storage.Volume) {
 
109
        ctx.volumes[info.Tag] = info
 
110
        for id, params := range ctx.incompleteVolumeAttachmentParams {
 
111
                if params.VolumeId == "" && id.AttachmentTag == info.Tag.String() {
 
112
                        params.VolumeId = info.VolumeId
 
113
                        updatePendingVolumeAttachment(ctx, id, params)
 
114
                }
 
115
        }
 
116
}
 
117
 
 
118
// updatePendingVolume adds the given volume params to either the incomplete
 
119
// set or the schedule. If the params are incomplete due to a missing instance
 
120
// ID, updatePendingVolume will request that the machine be watched so its
 
121
// instance ID can be learned.
 
122
func updatePendingVolume(ctx *context, params storage.VolumeParams) {
 
123
        if params.Attachment.InstanceId == "" {
 
124
                watchMachine(ctx, params.Attachment.Machine)
 
125
                ctx.incompleteVolumeParams[params.Tag] = params
 
126
        } else {
 
127
                delete(ctx.incompleteVolumeParams, params.Tag)
 
128
                scheduleOperations(ctx, &createVolumeOp{args: params})
 
129
        }
 
130
}
 
131
 
 
132
// removePendingVolume removes the specified pending volume from the
 
133
// incomplete set and/or the schedule if it exists there.
 
134
func removePendingVolume(ctx *context, tag names.VolumeTag) {
 
135
        delete(ctx.incompleteVolumeParams, tag)
 
136
        ctx.schedule.Remove(tag)
 
137
}
 
138
 
 
139
// updatePendingVolumeAttachment adds the given volume attachment params to
 
140
// either the incomplete set or the schedule. If the params are incomplete
 
141
// due to a missing instance ID, updatePendingVolumeAttachment will request
 
142
// that the machine be watched so its instance ID can be learned.
 
143
func updatePendingVolumeAttachment(
 
144
        ctx *context,
 
145
        id params.MachineStorageId,
 
146
        params storage.VolumeAttachmentParams,
 
147
) {
 
148
        if params.InstanceId == "" {
 
149
                watchMachine(ctx, params.Machine)
 
150
        } else if params.VolumeId != "" {
 
151
                delete(ctx.incompleteVolumeAttachmentParams, id)
 
152
                scheduleOperations(ctx, &attachVolumeOp{args: params})
 
153
                return
 
154
        }
 
155
        ctx.incompleteVolumeAttachmentParams[id] = params
 
156
}
 
157
 
 
158
// removePendingVolumeAttachment removes the specified pending volume
 
159
// attachment from the incomplete set and/or the schedule if it exists
 
160
// there.
 
161
func removePendingVolumeAttachment(ctx *context, id params.MachineStorageId) {
 
162
        delete(ctx.incompleteVolumeAttachmentParams, id)
 
163
        ctx.schedule.Remove(id)
 
164
}
 
165
 
 
166
// processDeadVolumes processes the VolumeResults for Dead volumes,
 
167
// deprovisioning volumes and removing from state as necessary.
 
168
func processDeadVolumes(ctx *context, tags []names.VolumeTag, volumeResults []params.VolumeResult) error {
 
169
        for _, tag := range tags {
 
170
                removePendingVolume(ctx, tag)
 
171
        }
 
172
        var destroy []names.VolumeTag
 
173
        var remove []names.Tag
 
174
        for i, result := range volumeResults {
 
175
                tag := tags[i]
 
176
                if result.Error == nil {
 
177
                        logger.Debugf("volume %s is provisioned, queuing for deprovisioning", tag.Id())
 
178
                        volume, err := volumeFromParams(result.Result)
 
179
                        if err != nil {
 
180
                                return errors.Annotate(err, "getting volume info")
 
181
                        }
 
182
                        updateVolume(ctx, volume)
 
183
                        destroy = append(destroy, tag)
 
184
                        continue
 
185
                }
 
186
                if params.IsCodeNotProvisioned(result.Error) {
 
187
                        logger.Debugf("volume %s is not provisioned, queuing for removal", tag.Id())
 
188
                        remove = append(remove, tag)
 
189
                        continue
 
190
                }
 
191
                return errors.Annotatef(result.Error, "getting volume information for volume %s", tag.Id())
 
192
        }
 
193
        if len(destroy) > 0 {
 
194
                ops := make([]scheduleOp, len(destroy))
 
195
                for i, tag := range destroy {
 
196
                        ops[i] = &destroyVolumeOp{tag: tag}
 
197
                }
 
198
                scheduleOperations(ctx, ops...)
 
199
        }
 
200
        if err := removeEntities(ctx, remove); err != nil {
 
201
                return errors.Annotate(err, "removing volumes from state")
 
202
        }
 
203
        return nil
 
204
}
 
205
 
 
206
// processDyingVolumeAttachments processes the VolumeAttachmentResults for
 
207
// Dying volume attachments, detaching volumes and updating state as necessary.
 
208
func processDyingVolumeAttachments(
 
209
        ctx *context,
 
210
        ids []params.MachineStorageId,
 
211
        volumeAttachmentResults []params.VolumeAttachmentResult,
 
212
) error {
 
213
        for _, id := range ids {
 
214
                removePendingVolumeAttachment(ctx, id)
 
215
        }
 
216
        detach := make([]params.MachineStorageId, 0, len(ids))
 
217
        remove := make([]params.MachineStorageId, 0, len(ids))
 
218
        for i, result := range volumeAttachmentResults {
 
219
                id := ids[i]
 
220
                if result.Error == nil {
 
221
                        detach = append(detach, id)
 
222
                        continue
 
223
                }
 
224
                if params.IsCodeNotProvisioned(result.Error) {
 
225
                        remove = append(remove, id)
 
226
                        continue
 
227
                }
 
228
                return errors.Annotatef(result.Error, "getting information for volume attachment %v", id)
 
229
        }
 
230
        if len(detach) > 0 {
 
231
                attachmentParams, err := volumeAttachmentParams(ctx, detach)
 
232
                if err != nil {
 
233
                        return errors.Trace(err)
 
234
                }
 
235
                ops := make([]scheduleOp, len(attachmentParams))
 
236
                for i, p := range attachmentParams {
 
237
                        ops[i] = &detachVolumeOp{args: p}
 
238
                }
 
239
                scheduleOperations(ctx, ops...)
 
240
        }
 
241
        if err := removeAttachments(ctx, remove); err != nil {
 
242
                return errors.Annotate(err, "removing attachments from state")
 
243
        }
 
244
        for _, id := range remove {
 
245
                delete(ctx.volumeAttachments, id)
 
246
        }
 
247
        return nil
 
248
}
 
249
 
 
250
// processAliveVolumes processes the VolumeResults for Alive volumes,
 
251
// provisioning volumes and setting the info in state as necessary.
 
252
func processAliveVolumes(ctx *context, tags []names.Tag, volumeResults []params.VolumeResult) error {
 
253
        // Filter out the already-provisioned volumes.
 
254
        pending := make([]names.VolumeTag, 0, len(tags))
 
255
        for i, result := range volumeResults {
 
256
                volumeTag := tags[i].(names.VolumeTag)
 
257
                if result.Error == nil {
 
258
                        // Volume is already provisioned: skip.
 
259
                        logger.Debugf("volume %q is already provisioned, nothing to do", tags[i].Id())
 
260
                        volume, err := volumeFromParams(result.Result)
 
261
                        if err != nil {
 
262
                                return errors.Annotate(err, "getting volume info")
 
263
                        }
 
264
                        updateVolume(ctx, volume)
 
265
                        removePendingVolume(ctx, volumeTag)
 
266
                        continue
 
267
                }
 
268
                if !params.IsCodeNotProvisioned(result.Error) {
 
269
                        return errors.Annotatef(
 
270
                                result.Error, "getting volume information for volume %q", tags[i].Id(),
 
271
                        )
 
272
                }
 
273
                // The volume has not yet been provisioned, so record its tag
 
274
                // to enquire about parameters below.
 
275
                pending = append(pending, volumeTag)
 
276
        }
 
277
        if len(pending) == 0 {
 
278
                return nil
 
279
        }
 
280
        volumeParams, err := volumeParams(ctx, pending)
 
281
        if err != nil {
 
282
                return errors.Annotate(err, "getting volume params")
 
283
        }
 
284
        for _, params := range volumeParams {
 
285
                updatePendingVolume(ctx, params)
 
286
        }
 
287
        return nil
 
288
}
 
289
 
 
290
// processAliveVolumeAttachments processes the VolumeAttachmentResults
 
291
// for Alive volume attachments, attaching volumes and setting the info
 
292
// in state as necessary.
 
293
func processAliveVolumeAttachments(
 
294
        ctx *context,
 
295
        ids []params.MachineStorageId,
 
296
        volumeAttachmentResults []params.VolumeAttachmentResult,
 
297
) error {
 
298
        // Filter out the already-attached.
 
299
        pending := make([]params.MachineStorageId, 0, len(ids))
 
300
        for i, result := range volumeAttachmentResults {
 
301
                if result.Error == nil {
 
302
                        // Volume attachment is already provisioned: if we
 
303
                        // didn't (re)attach in this session, then we must
 
304
                        // do so now.
 
305
                        action := "nothing to do"
 
306
                        if _, ok := ctx.volumeAttachments[ids[i]]; !ok {
 
307
                                // Not yet (re)attached in this session.
 
308
                                pending = append(pending, ids[i])
 
309
                                action = "will reattach"
 
310
                        }
 
311
                        logger.Debugf(
 
312
                                "%s is already attached to %s, %s",
 
313
                                ids[i].AttachmentTag, ids[i].MachineTag, action,
 
314
                        )
 
315
                        removePendingVolumeAttachment(ctx, ids[i])
 
316
                        continue
 
317
                }
 
318
                if !params.IsCodeNotProvisioned(result.Error) {
 
319
                        return errors.Annotatef(
 
320
                                result.Error, "getting information for attachment %v", ids[i],
 
321
                        )
 
322
                }
 
323
                // The volume has not yet been provisioned, so record its tag
 
324
                // to enquire about parameters below.
 
325
                pending = append(pending, ids[i])
 
326
        }
 
327
        if len(pending) == 0 {
 
328
                return nil
 
329
        }
 
330
        params, err := volumeAttachmentParams(ctx, pending)
 
331
        if err != nil {
 
332
                return errors.Trace(err)
 
333
        }
 
334
        for i, params := range params {
 
335
                if volume, ok := ctx.volumes[params.Volume]; ok {
 
336
                        params.VolumeId = volume.VolumeId
 
337
                }
 
338
                updatePendingVolumeAttachment(ctx, pending[i], params)
 
339
        }
 
340
        return nil
 
341
}
 
342
 
 
343
// volumeAttachmentParams obtains the specified attachments' parameters.
 
344
func volumeAttachmentParams(
 
345
        ctx *context, ids []params.MachineStorageId,
 
346
) ([]storage.VolumeAttachmentParams, error) {
 
347
        paramsResults, err := ctx.config.Volumes.VolumeAttachmentParams(ids)
 
348
        if err != nil {
 
349
                return nil, errors.Annotate(err, "getting volume attachment params")
 
350
        }
 
351
        attachmentParams := make([]storage.VolumeAttachmentParams, len(ids))
 
352
        for i, result := range paramsResults {
 
353
                if result.Error != nil {
 
354
                        return nil, errors.Annotate(result.Error, "getting volume attachment parameters")
 
355
                }
 
356
                params, err := volumeAttachmentParamsFromParams(result.Result)
 
357
                if err != nil {
 
358
                        return nil, errors.Annotate(err, "getting volume attachment parameters")
 
359
                }
 
360
                attachmentParams[i] = params
 
361
        }
 
362
        return attachmentParams, nil
 
363
}
 
364
 
 
365
// volumeParams obtains the specified volumes' parameters.
 
366
func volumeParams(ctx *context, tags []names.VolumeTag) ([]storage.VolumeParams, error) {
 
367
        paramsResults, err := ctx.config.Volumes.VolumeParams(tags)
 
368
        if err != nil {
 
369
                return nil, errors.Annotate(err, "getting volume params")
 
370
        }
 
371
        allParams := make([]storage.VolumeParams, len(tags))
 
372
        for i, result := range paramsResults {
 
373
                if result.Error != nil {
 
374
                        return nil, errors.Annotate(result.Error, "getting volume parameters")
 
375
                }
 
376
                params, err := volumeParamsFromParams(result.Result)
 
377
                if err != nil {
 
378
                        return nil, errors.Annotate(err, "getting volume parameters")
 
379
                }
 
380
                allParams[i] = params
 
381
        }
 
382
        return allParams, nil
 
383
}
 
384
 
 
385
func volumesFromStorage(in []storage.Volume) []params.Volume {
 
386
        out := make([]params.Volume, len(in))
 
387
        for i, v := range in {
 
388
                out[i] = params.Volume{
 
389
                        v.Tag.String(),
 
390
                        params.VolumeInfo{
 
391
                                v.VolumeId,
 
392
                                v.HardwareId,
 
393
                                v.Size,
 
394
                                v.Persistent,
 
395
                        },
 
396
                }
 
397
        }
 
398
        return out
 
399
}
 
400
 
 
401
func volumeAttachmentsFromStorage(in []storage.VolumeAttachment) []params.VolumeAttachment {
 
402
        out := make([]params.VolumeAttachment, len(in))
 
403
        for i, v := range in {
 
404
                out[i] = params.VolumeAttachment{
 
405
                        v.Volume.String(),
 
406
                        v.Machine.String(),
 
407
                        params.VolumeAttachmentInfo{
 
408
                                v.DeviceName,
 
409
                                v.DeviceLink,
 
410
                                v.BusAddress,
 
411
                                v.ReadOnly,
 
412
                        },
 
413
                }
 
414
        }
 
415
        return out
 
416
}
 
417
 
 
418
func volumeFromParams(in params.Volume) (storage.Volume, error) {
 
419
        volumeTag, err := names.ParseVolumeTag(in.VolumeTag)
 
420
        if err != nil {
 
421
                return storage.Volume{}, errors.Trace(err)
 
422
        }
 
423
        return storage.Volume{
 
424
                volumeTag,
 
425
                storage.VolumeInfo{
 
426
                        in.Info.VolumeId,
 
427
                        in.Info.HardwareId,
 
428
                        in.Info.Size,
 
429
                        in.Info.Persistent,
 
430
                },
 
431
        }, nil
 
432
}
 
433
 
 
434
func volumeParamsFromParams(in params.VolumeParams) (storage.VolumeParams, error) {
 
435
        volumeTag, err := names.ParseVolumeTag(in.VolumeTag)
 
436
        if err != nil {
 
437
                return storage.VolumeParams{}, errors.Trace(err)
 
438
        }
 
439
        providerType := storage.ProviderType(in.Provider)
 
440
 
 
441
        var attachment *storage.VolumeAttachmentParams
 
442
        if in.Attachment != nil {
 
443
                if in.Attachment.Provider != in.Provider {
 
444
                        return storage.VolumeParams{}, errors.Errorf(
 
445
                                "storage provider mismatch: volume (%q), attachment (%q)",
 
446
                                in.Provider, in.Attachment.Provider,
 
447
                        )
 
448
                }
 
449
                if in.Attachment.VolumeTag != in.VolumeTag {
 
450
                        return storage.VolumeParams{}, errors.Errorf(
 
451
                                "volume tag mismatch: volume (%q), attachment (%q)",
 
452
                                in.VolumeTag, in.Attachment.VolumeTag,
 
453
                        )
 
454
                }
 
455
                machineTag, err := names.ParseMachineTag(in.Attachment.MachineTag)
 
456
                if err != nil {
 
457
                        return storage.VolumeParams{}, errors.Annotate(
 
458
                                err, "parsing attachment machine tag",
 
459
                        )
 
460
                }
 
461
                attachment = &storage.VolumeAttachmentParams{
 
462
                        AttachmentParams: storage.AttachmentParams{
 
463
                                Provider:   providerType,
 
464
                                Machine:    machineTag,
 
465
                                InstanceId: instance.Id(in.Attachment.InstanceId),
 
466
                                ReadOnly:   in.Attachment.ReadOnly,
 
467
                        },
 
468
                        Volume: volumeTag,
 
469
                }
 
470
        }
 
471
        return storage.VolumeParams{
 
472
                volumeTag,
 
473
                in.Size,
 
474
                providerType,
 
475
                in.Attributes,
 
476
                in.Tags,
 
477
                attachment,
 
478
        }, nil
 
479
}
 
480
 
 
481
func volumeAttachmentParamsFromParams(in params.VolumeAttachmentParams) (storage.VolumeAttachmentParams, error) {
 
482
        machineTag, err := names.ParseMachineTag(in.MachineTag)
 
483
        if err != nil {
 
484
                return storage.VolumeAttachmentParams{}, errors.Trace(err)
 
485
        }
 
486
        volumeTag, err := names.ParseVolumeTag(in.VolumeTag)
 
487
        if err != nil {
 
488
                return storage.VolumeAttachmentParams{}, errors.Trace(err)
 
489
        }
 
490
        return storage.VolumeAttachmentParams{
 
491
                AttachmentParams: storage.AttachmentParams{
 
492
                        Provider:   storage.ProviderType(in.Provider),
 
493
                        Machine:    machineTag,
 
494
                        InstanceId: instance.Id(in.InstanceId),
 
495
                        ReadOnly:   in.ReadOnly,
 
496
                },
 
497
                Volume:   volumeTag,
 
498
                VolumeId: in.VolumeId,
 
499
        }, nil
 
500
}