1
// Copyright 2015 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
4
package storageprovisioner
7
"github.com/juju/errors"
8
"gopkg.in/juju/names.v2"
10
"github.com/juju/juju/apiserver/params"
11
"github.com/juju/juju/instance"
12
"github.com/juju/juju/storage"
13
"github.com/juju/juju/watcher"
16
// volumesChanged is called when the lifecycle states of the volumes
17
// with the provided IDs have been seen to have changed.
18
func volumesChanged(ctx *context, changes []string) error {
19
tags := make([]names.Tag, len(changes))
20
for i, change := range changes {
21
tags[i] = names.NewVolumeTag(change)
23
alive, dying, dead, err := storageEntityLife(ctx, tags)
25
return errors.Trace(err)
27
logger.Debugf("volumes alive: %v, dying: %v, dead: %v", alive, dying, dead)
28
if err := processDyingVolumes(ctx, dying); err != nil {
29
return errors.Annotate(err, "processing dying volumes")
31
if len(alive)+len(dead) == 0 {
35
// Get volume information for alive and dead volumes, so
36
// we can provision/deprovision.
37
volumeTags := make([]names.VolumeTag, 0, len(alive)+len(dead))
38
for _, tag := range alive {
39
volumeTags = append(volumeTags, tag.(names.VolumeTag))
41
for _, tag := range dead {
42
volumeTags = append(volumeTags, tag.(names.VolumeTag))
44
volumeResults, err := ctx.config.Volumes.Volumes(volumeTags)
46
return errors.Annotatef(err, "getting volume information")
48
if err := processDeadVolumes(ctx, volumeTags[len(alive):], volumeResults[len(alive):]); err != nil {
49
return errors.Annotate(err, "deprovisioning volumes")
51
if err := processAliveVolumes(ctx, alive, volumeResults[:len(alive)]); err != nil {
52
return errors.Annotate(err, "provisioning volumes")
57
// volumeAttachmentsChanged is called when the lifecycle states of the volume
58
// attachments with the provided IDs have been seen to have changed.
59
func volumeAttachmentsChanged(ctx *context, watcherIds []watcher.MachineStorageId) error {
60
ids := copyMachineStorageIds(watcherIds)
61
alive, dying, dead, err := attachmentLife(ctx, ids)
63
return errors.Trace(err)
65
logger.Debugf("volume attachments alive: %v, dying: %v, dead: %v", alive, dying, dead)
67
// We should not see dead volume attachments;
68
// attachments go directly from Dying to removed.
69
logger.Warningf("unexpected dead volume attachments: %v", dead)
71
if len(alive)+len(dying) == 0 {
75
// Get volume information for alive and dying volume attachments, so
76
// we can attach/detach.
77
ids = append(alive, dying...)
78
volumeAttachmentResults, err := ctx.config.Volumes.VolumeAttachments(ids)
80
return errors.Annotatef(err, "getting volume attachment information")
83
// Deprovision Dying volume attachments.
84
dyingVolumeAttachmentResults := volumeAttachmentResults[len(alive):]
85
if err := processDyingVolumeAttachments(ctx, dying, dyingVolumeAttachmentResults); err != nil {
86
return errors.Annotate(err, "deprovisioning volume attachments")
89
// Provision Alive volume attachments.
90
aliveVolumeAttachmentResults := volumeAttachmentResults[:len(alive)]
91
if err := processAliveVolumeAttachments(ctx, alive, aliveVolumeAttachmentResults); err != nil {
92
return errors.Annotate(err, "provisioning volumes")
98
// processDyingVolumes processes the VolumeResults for Dying volumes,
99
// removing them from provisioning-pending as necessary.
100
func processDyingVolumes(ctx *context, tags []names.Tag) error {
101
for _, tag := range tags {
102
removePendingVolume(ctx, tag.(names.VolumeTag))
107
// updateVolume updates the context with the given volume info.
108
func updateVolume(ctx *context, info storage.Volume) {
109
ctx.volumes[info.Tag] = info
110
for id, params := range ctx.incompleteVolumeAttachmentParams {
111
if params.VolumeId == "" && id.AttachmentTag == info.Tag.String() {
112
params.VolumeId = info.VolumeId
113
updatePendingVolumeAttachment(ctx, id, params)
118
// updatePendingVolume adds the given volume params to either the incomplete
119
// set or the schedule. If the params are incomplete due to a missing instance
120
// ID, updatePendingVolume will request that the machine be watched so its
121
// instance ID can be learned.
122
func updatePendingVolume(ctx *context, params storage.VolumeParams) {
123
if params.Attachment.InstanceId == "" {
124
watchMachine(ctx, params.Attachment.Machine)
125
ctx.incompleteVolumeParams[params.Tag] = params
127
delete(ctx.incompleteVolumeParams, params.Tag)
128
scheduleOperations(ctx, &createVolumeOp{args: params})
132
// removePendingVolume removes the specified pending volume from the
133
// incomplete set and/or the schedule if it exists there.
134
func removePendingVolume(ctx *context, tag names.VolumeTag) {
135
delete(ctx.incompleteVolumeParams, tag)
136
ctx.schedule.Remove(tag)
139
// updatePendingVolumeAttachment adds the given volume attachment params to
140
// either the incomplete set or the schedule. If the params are incomplete
141
// due to a missing instance ID, updatePendingVolumeAttachment will request
142
// that the machine be watched so its instance ID can be learned.
143
func updatePendingVolumeAttachment(
145
id params.MachineStorageId,
146
params storage.VolumeAttachmentParams,
148
if params.InstanceId == "" {
149
watchMachine(ctx, params.Machine)
150
} else if params.VolumeId != "" {
151
delete(ctx.incompleteVolumeAttachmentParams, id)
152
scheduleOperations(ctx, &attachVolumeOp{args: params})
155
ctx.incompleteVolumeAttachmentParams[id] = params
158
// removePendingVolumeAttachment removes the specified pending volume
159
// attachment from the incomplete set and/or the schedule if it exists
161
func removePendingVolumeAttachment(ctx *context, id params.MachineStorageId) {
162
delete(ctx.incompleteVolumeAttachmentParams, id)
163
ctx.schedule.Remove(id)
166
// processDeadVolumes processes the VolumeResults for Dead volumes,
167
// deprovisioning volumes and removing from state as necessary.
168
func processDeadVolumes(ctx *context, tags []names.VolumeTag, volumeResults []params.VolumeResult) error {
169
for _, tag := range tags {
170
removePendingVolume(ctx, tag)
172
var destroy []names.VolumeTag
173
var remove []names.Tag
174
for i, result := range volumeResults {
176
if result.Error == nil {
177
logger.Debugf("volume %s is provisioned, queuing for deprovisioning", tag.Id())
178
volume, err := volumeFromParams(result.Result)
180
return errors.Annotate(err, "getting volume info")
182
updateVolume(ctx, volume)
183
destroy = append(destroy, tag)
186
if params.IsCodeNotProvisioned(result.Error) {
187
logger.Debugf("volume %s is not provisioned, queuing for removal", tag.Id())
188
remove = append(remove, tag)
191
return errors.Annotatef(result.Error, "getting volume information for volume %s", tag.Id())
193
if len(destroy) > 0 {
194
ops := make([]scheduleOp, len(destroy))
195
for i, tag := range destroy {
196
ops[i] = &destroyVolumeOp{tag: tag}
198
scheduleOperations(ctx, ops...)
200
if err := removeEntities(ctx, remove); err != nil {
201
return errors.Annotate(err, "removing volumes from state")
206
// processDyingVolumeAttachments processes the VolumeAttachmentResults for
207
// Dying volume attachments, detaching volumes and updating state as necessary.
208
func processDyingVolumeAttachments(
210
ids []params.MachineStorageId,
211
volumeAttachmentResults []params.VolumeAttachmentResult,
213
for _, id := range ids {
214
removePendingVolumeAttachment(ctx, id)
216
detach := make([]params.MachineStorageId, 0, len(ids))
217
remove := make([]params.MachineStorageId, 0, len(ids))
218
for i, result := range volumeAttachmentResults {
220
if result.Error == nil {
221
detach = append(detach, id)
224
if params.IsCodeNotProvisioned(result.Error) {
225
remove = append(remove, id)
228
return errors.Annotatef(result.Error, "getting information for volume attachment %v", id)
231
attachmentParams, err := volumeAttachmentParams(ctx, detach)
233
return errors.Trace(err)
235
ops := make([]scheduleOp, len(attachmentParams))
236
for i, p := range attachmentParams {
237
ops[i] = &detachVolumeOp{args: p}
239
scheduleOperations(ctx, ops...)
241
if err := removeAttachments(ctx, remove); err != nil {
242
return errors.Annotate(err, "removing attachments from state")
244
for _, id := range remove {
245
delete(ctx.volumeAttachments, id)
250
// processAliveVolumes processes the VolumeResults for Alive volumes,
251
// provisioning volumes and setting the info in state as necessary.
252
func processAliveVolumes(ctx *context, tags []names.Tag, volumeResults []params.VolumeResult) error {
253
// Filter out the already-provisioned volumes.
254
pending := make([]names.VolumeTag, 0, len(tags))
255
for i, result := range volumeResults {
256
volumeTag := tags[i].(names.VolumeTag)
257
if result.Error == nil {
258
// Volume is already provisioned: skip.
259
logger.Debugf("volume %q is already provisioned, nothing to do", tags[i].Id())
260
volume, err := volumeFromParams(result.Result)
262
return errors.Annotate(err, "getting volume info")
264
updateVolume(ctx, volume)
265
removePendingVolume(ctx, volumeTag)
268
if !params.IsCodeNotProvisioned(result.Error) {
269
return errors.Annotatef(
270
result.Error, "getting volume information for volume %q", tags[i].Id(),
273
// The volume has not yet been provisioned, so record its tag
274
// to enquire about parameters below.
275
pending = append(pending, volumeTag)
277
if len(pending) == 0 {
280
volumeParams, err := volumeParams(ctx, pending)
282
return errors.Annotate(err, "getting volume params")
284
for _, params := range volumeParams {
285
updatePendingVolume(ctx, params)
290
// processAliveVolumeAttachments processes the VolumeAttachmentResults
291
// for Alive volume attachments, attaching volumes and setting the info
292
// in state as necessary.
293
func processAliveVolumeAttachments(
295
ids []params.MachineStorageId,
296
volumeAttachmentResults []params.VolumeAttachmentResult,
298
// Filter out the already-attached.
299
pending := make([]params.MachineStorageId, 0, len(ids))
300
for i, result := range volumeAttachmentResults {
301
if result.Error == nil {
302
// Volume attachment is already provisioned: if we
303
// didn't (re)attach in this session, then we must
305
action := "nothing to do"
306
if _, ok := ctx.volumeAttachments[ids[i]]; !ok {
307
// Not yet (re)attached in this session.
308
pending = append(pending, ids[i])
309
action = "will reattach"
312
"%s is already attached to %s, %s",
313
ids[i].AttachmentTag, ids[i].MachineTag, action,
315
removePendingVolumeAttachment(ctx, ids[i])
318
if !params.IsCodeNotProvisioned(result.Error) {
319
return errors.Annotatef(
320
result.Error, "getting information for attachment %v", ids[i],
323
// The volume has not yet been provisioned, so record its tag
324
// to enquire about parameters below.
325
pending = append(pending, ids[i])
327
if len(pending) == 0 {
330
params, err := volumeAttachmentParams(ctx, pending)
332
return errors.Trace(err)
334
for i, params := range params {
335
if volume, ok := ctx.volumes[params.Volume]; ok {
336
params.VolumeId = volume.VolumeId
338
updatePendingVolumeAttachment(ctx, pending[i], params)
343
// volumeAttachmentParams obtains the specified attachments' parameters.
344
func volumeAttachmentParams(
345
ctx *context, ids []params.MachineStorageId,
346
) ([]storage.VolumeAttachmentParams, error) {
347
paramsResults, err := ctx.config.Volumes.VolumeAttachmentParams(ids)
349
return nil, errors.Annotate(err, "getting volume attachment params")
351
attachmentParams := make([]storage.VolumeAttachmentParams, len(ids))
352
for i, result := range paramsResults {
353
if result.Error != nil {
354
return nil, errors.Annotate(result.Error, "getting volume attachment parameters")
356
params, err := volumeAttachmentParamsFromParams(result.Result)
358
return nil, errors.Annotate(err, "getting volume attachment parameters")
360
attachmentParams[i] = params
362
return attachmentParams, nil
365
// volumeParams obtains the specified volumes' parameters.
366
func volumeParams(ctx *context, tags []names.VolumeTag) ([]storage.VolumeParams, error) {
367
paramsResults, err := ctx.config.Volumes.VolumeParams(tags)
369
return nil, errors.Annotate(err, "getting volume params")
371
allParams := make([]storage.VolumeParams, len(tags))
372
for i, result := range paramsResults {
373
if result.Error != nil {
374
return nil, errors.Annotate(result.Error, "getting volume parameters")
376
params, err := volumeParamsFromParams(result.Result)
378
return nil, errors.Annotate(err, "getting volume parameters")
380
allParams[i] = params
382
return allParams, nil
385
func volumesFromStorage(in []storage.Volume) []params.Volume {
386
out := make([]params.Volume, len(in))
387
for i, v := range in {
388
out[i] = params.Volume{
401
func volumeAttachmentsFromStorage(in []storage.VolumeAttachment) []params.VolumeAttachment {
402
out := make([]params.VolumeAttachment, len(in))
403
for i, v := range in {
404
out[i] = params.VolumeAttachment{
407
params.VolumeAttachmentInfo{
418
func volumeFromParams(in params.Volume) (storage.Volume, error) {
419
volumeTag, err := names.ParseVolumeTag(in.VolumeTag)
421
return storage.Volume{}, errors.Trace(err)
423
return storage.Volume{
434
func volumeParamsFromParams(in params.VolumeParams) (storage.VolumeParams, error) {
435
volumeTag, err := names.ParseVolumeTag(in.VolumeTag)
437
return storage.VolumeParams{}, errors.Trace(err)
439
providerType := storage.ProviderType(in.Provider)
441
var attachment *storage.VolumeAttachmentParams
442
if in.Attachment != nil {
443
if in.Attachment.Provider != in.Provider {
444
return storage.VolumeParams{}, errors.Errorf(
445
"storage provider mismatch: volume (%q), attachment (%q)",
446
in.Provider, in.Attachment.Provider,
449
if in.Attachment.VolumeTag != in.VolumeTag {
450
return storage.VolumeParams{}, errors.Errorf(
451
"volume tag mismatch: volume (%q), attachment (%q)",
452
in.VolumeTag, in.Attachment.VolumeTag,
455
machineTag, err := names.ParseMachineTag(in.Attachment.MachineTag)
457
return storage.VolumeParams{}, errors.Annotate(
458
err, "parsing attachment machine tag",
461
attachment = &storage.VolumeAttachmentParams{
462
AttachmentParams: storage.AttachmentParams{
463
Provider: providerType,
465
InstanceId: instance.Id(in.Attachment.InstanceId),
466
ReadOnly: in.Attachment.ReadOnly,
471
return storage.VolumeParams{
481
func volumeAttachmentParamsFromParams(in params.VolumeAttachmentParams) (storage.VolumeAttachmentParams, error) {
482
machineTag, err := names.ParseMachineTag(in.MachineTag)
484
return storage.VolumeAttachmentParams{}, errors.Trace(err)
486
volumeTag, err := names.ParseVolumeTag(in.VolumeTag)
488
return storage.VolumeAttachmentParams{}, errors.Trace(err)
490
return storage.VolumeAttachmentParams{
491
AttachmentParams: storage.AttachmentParams{
492
Provider: storage.ProviderType(in.Provider),
494
InstanceId: instance.Id(in.InstanceId),
495
ReadOnly: in.ReadOnly,
498
VolumeId: in.VolumeId,