~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/provider/ec2/ebs.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2015 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package ec2
 
5
 
 
6
import (
 
7
        "regexp"
 
8
        "sync"
 
9
        "time"
 
10
 
 
11
        "github.com/juju/errors"
 
12
        "github.com/juju/schema"
 
13
        "github.com/juju/utils"
 
14
        "github.com/juju/utils/set"
 
15
        "gopkg.in/amz.v3/ec2"
 
16
 
 
17
        "github.com/juju/juju/constraints"
 
18
        "github.com/juju/juju/environs/tags"
 
19
        "github.com/juju/juju/instance"
 
20
        "github.com/juju/juju/provider/common"
 
21
        "github.com/juju/juju/storage"
 
22
)
 
23
 
 
24
const (
 
25
        EBS_ProviderType = storage.ProviderType("ebs")
 
26
 
 
27
        // Config attributes
 
28
 
 
29
        // The volume type (default standard):
 
30
        //   "gp2" for General Purpose (SSD) volumes
 
31
        //   "io1" for Provisioned IOPS (SSD) volumes,
 
32
        //   "standard" for Magnetic volumes.
 
33
        EBS_VolumeType = "volume-type"
 
34
 
 
35
        // The number of I/O operations per second (IOPS) per GiB
 
36
        // to provision for the volume. Only valid for Provisioned
 
37
        // IOPS (SSD) volumes.
 
38
        EBS_IOPS = "iops"
 
39
 
 
40
        // Specifies whether the volume should be encrypted.
 
41
        EBS_Encrypted = "encrypted"
 
42
 
 
43
        volumeTypeMagnetic        = "magnetic"         // standard
 
44
        volumeTypeSsd             = "ssd"              // gp2
 
45
        volumeTypeProvisionedIops = "provisioned-iops" // io1
 
46
        volumeTypeStandard        = "standard"
 
47
        volumeTypeGp2             = "gp2"
 
48
        volumeTypeIo1             = "io1"
 
49
 
 
50
        rootDiskDeviceName = "/dev/sda1"
 
51
)
 
52
 
 
53
// AWS error codes
 
54
const (
 
55
        deviceInUse        = "InvalidDevice.InUse"
 
56
        attachmentNotFound = "InvalidAttachment.NotFound"
 
57
        volumeNotFound     = "InvalidVolume.NotFound"
 
58
)
 
59
 
 
60
const (
 
61
        volumeStatusAvailable = "available"
 
62
        volumeStatusInUse     = "in-use"
 
63
        volumeStatusCreating  = "creating"
 
64
 
 
65
        attachmentStatusAttaching = "attaching"
 
66
        attachmentStatusAttached  = "attached"
 
67
 
 
68
        instanceStateShuttingDown = "shutting-down"
 
69
        instanceStateTerminated   = "terminated"
 
70
)
 
71
 
 
72
// Limits for volume parameters. See:
 
73
//   http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSVolumeTypes.html
 
74
const (
 
75
        // minMagneticVolumeSizeGiB is the minimum size for magnetic volumes in GiB.
 
76
        minMagneticVolumeSizeGiB = 1
 
77
 
 
78
        // maxMagneticVolumeSizeGiB is the maximum size for magnetic volumes in GiB.
 
79
        maxMagneticVolumeSizeGiB = 1024
 
80
 
 
81
        // minSsdVolumeSizeGiB is the minimum size for SSD volumes in GiB.
 
82
        minSsdVolumeSizeGiB = 1
 
83
 
 
84
        // maxSsdVolumeSizeGiB is the maximum size for SSD volumes in GiB.
 
85
        maxSsdVolumeSizeGiB = 16 * 1024
 
86
 
 
87
        // minProvisionedIopsVolumeSizeGiB is the minimum size of provisioned IOPS
 
88
        // volumes in GiB.
 
89
        minProvisionedIopsVolumeSizeGiB = 4
 
90
 
 
91
        // maxProvisionedIopsVolumeSizeGiB is the maximum size of provisioned IOPS
 
92
        // volumes in GiB.
 
93
        maxProvisionedIopsVolumeSizeGiB = 16 * 1024
 
94
 
 
95
        // maxProvisionedIopsSizeRatio is the maximum allowed ratio of IOPS to
 
96
        // size (in GiB), for provisioend IOPS volumes.
 
97
        maxProvisionedIopsSizeRatio = 30
 
98
 
 
99
        // maxProvisionedIops is the maximum allowed IOPS in total for provisioned IOPS
 
100
        // volumes. We take the minimum of volumeSize*maxProvisionedIopsSizeRatio and
 
101
        // maxProvisionedIops.
 
102
        maxProvisionedIops = 20000
 
103
)
 
104
 
 
105
const (
 
106
        // devicePrefix is the prefix for device names specified when creating volumes.
 
107
        devicePrefix = "/dev/sd"
 
108
 
 
109
        // renamedDevicePrefix is the prefix for device names after they have
 
110
        // been renamed. This should replace "devicePrefix" in the device name
 
111
        // when recording the block device info in state.
 
112
        renamedDevicePrefix = "xvd"
 
113
)
 
114
 
 
115
var deviceInUseRegexp = regexp.MustCompile(".*Attachment point .* is already in use")
 
116
 
 
117
// StorageProviderTypes implements storage.ProviderRegistry.
 
118
func (env *environ) StorageProviderTypes() []storage.ProviderType {
 
119
        return []storage.ProviderType{EBS_ProviderType}
 
120
}
 
121
 
 
122
// StorageProvider implements storage.ProviderRegistry.
 
123
func (env *environ) StorageProvider(t storage.ProviderType) (storage.Provider, error) {
 
124
        if t == EBS_ProviderType {
 
125
                return &ebsProvider{env}, nil
 
126
        }
 
127
        return nil, errors.NotFoundf("storage provider %q", t)
 
128
}
 
129
 
 
130
// ebsProvider creates volume sources which use AWS EBS volumes.
 
131
type ebsProvider struct {
 
132
        env *environ
 
133
}
 
134
 
 
135
var _ storage.Provider = (*ebsProvider)(nil)
 
136
 
 
137
var ebsConfigFields = schema.Fields{
 
138
        EBS_VolumeType: schema.OneOf(
 
139
                schema.Const(volumeTypeMagnetic),
 
140
                schema.Const(volumeTypeSsd),
 
141
                schema.Const(volumeTypeProvisionedIops),
 
142
                schema.Const(volumeTypeStandard),
 
143
                schema.Const(volumeTypeGp2),
 
144
                schema.Const(volumeTypeIo1),
 
145
        ),
 
146
        EBS_IOPS:      schema.ForceInt(),
 
147
        EBS_Encrypted: schema.Bool(),
 
148
}
 
149
 
 
150
var ebsConfigChecker = schema.FieldMap(
 
151
        ebsConfigFields,
 
152
        schema.Defaults{
 
153
                EBS_VolumeType: volumeTypeMagnetic,
 
154
                EBS_IOPS:       schema.Omit,
 
155
                EBS_Encrypted:  false,
 
156
        },
 
157
)
 
158
 
 
159
type ebsConfig struct {
 
160
        volumeType string
 
161
        iops       int
 
162
        encrypted  bool
 
163
}
 
164
 
 
165
func newEbsConfig(attrs map[string]interface{}) (*ebsConfig, error) {
 
166
        out, err := ebsConfigChecker.Coerce(attrs, nil)
 
167
        if err != nil {
 
168
                return nil, errors.Annotate(err, "validating EBS storage config")
 
169
        }
 
170
        coerced := out.(map[string]interface{})
 
171
        iops, _ := coerced[EBS_IOPS].(int)
 
172
        volumeType := coerced[EBS_VolumeType].(string)
 
173
        ebsConfig := &ebsConfig{
 
174
                volumeType: volumeType,
 
175
                iops:       iops,
 
176
                encrypted:  coerced[EBS_Encrypted].(bool),
 
177
        }
 
178
        switch ebsConfig.volumeType {
 
179
        case volumeTypeMagnetic:
 
180
                ebsConfig.volumeType = volumeTypeStandard
 
181
        case volumeTypeSsd:
 
182
                ebsConfig.volumeType = volumeTypeGp2
 
183
        case volumeTypeProvisionedIops:
 
184
                ebsConfig.volumeType = volumeTypeIo1
 
185
        }
 
186
        if ebsConfig.iops > 0 && ebsConfig.volumeType != volumeTypeIo1 {
 
187
                return nil, errors.Errorf("IOPS specified, but volume type is %q", volumeType)
 
188
        } else if ebsConfig.iops == 0 && ebsConfig.volumeType == volumeTypeIo1 {
 
189
                return nil, errors.Errorf("volume type is %q, IOPS unspecified or zero", volumeTypeIo1)
 
190
        }
 
191
        return ebsConfig, nil
 
192
}
 
193
 
 
194
// ValidateConfig is defined on the Provider interface.
 
195
func (e *ebsProvider) ValidateConfig(cfg *storage.Config) error {
 
196
        _, err := newEbsConfig(cfg.Attrs())
 
197
        return errors.Trace(err)
 
198
}
 
199
 
 
200
// Supports is defined on the Provider interface.
 
201
func (e *ebsProvider) Supports(k storage.StorageKind) bool {
 
202
        return k == storage.StorageKindBlock
 
203
}
 
204
 
 
205
// Scope is defined on the Provider interface.
 
206
func (e *ebsProvider) Scope() storage.Scope {
 
207
        return storage.ScopeEnviron
 
208
}
 
209
 
 
210
// Dynamic is defined on the Provider interface.
 
211
func (e *ebsProvider) Dynamic() bool {
 
212
        return true
 
213
}
 
214
 
 
215
// DefaultPools is defined on the Provider interface.
 
216
func (e *ebsProvider) DefaultPools() []*storage.Config {
 
217
        ssdPool, _ := storage.NewConfig("ebs-ssd", EBS_ProviderType, map[string]interface{}{
 
218
                EBS_VolumeType: volumeTypeSsd,
 
219
        })
 
220
        return []*storage.Config{ssdPool}
 
221
}
 
222
 
 
223
// VolumeSource is defined on the Provider interface.
 
224
func (e *ebsProvider) VolumeSource(cfg *storage.Config) (storage.VolumeSource, error) {
 
225
        environConfig := e.env.Config()
 
226
        source := &ebsVolumeSource{
 
227
                env:       e.env,
 
228
                envName:   environConfig.Name(),
 
229
                modelUUID: environConfig.UUID(),
 
230
        }
 
231
        return source, nil
 
232
}
 
233
 
 
234
// FilesystemSource is defined on the Provider interface.
 
235
func (e *ebsProvider) FilesystemSource(providerConfig *storage.Config) (storage.FilesystemSource, error) {
 
236
        return nil, errors.NotSupportedf("filesystems")
 
237
}
 
238
 
 
239
type ebsVolumeSource struct {
 
240
        env       *environ
 
241
        envName   string // non-unique, informational only
 
242
        modelUUID string
 
243
}
 
244
 
 
245
var _ storage.VolumeSource = (*ebsVolumeSource)(nil)
 
246
 
 
247
// parseVolumeOptions uses storage volume parameters to make a struct used to create volumes.
 
248
func parseVolumeOptions(size uint64, attrs map[string]interface{}) (_ ec2.CreateVolume, _ error) {
 
249
        ebsConfig, err := newEbsConfig(attrs)
 
250
        if err != nil {
 
251
                return ec2.CreateVolume{}, errors.Trace(err)
 
252
        }
 
253
        if ebsConfig.iops > maxProvisionedIopsSizeRatio {
 
254
                return ec2.CreateVolume{}, errors.Errorf(
 
255
                        "specified IOPS ratio is %d/GiB, maximum is %d/GiB",
 
256
                        ebsConfig.iops, maxProvisionedIopsSizeRatio,
 
257
                )
 
258
        }
 
259
 
 
260
        sizeInGib := mibToGib(size)
 
261
        iops := uint64(ebsConfig.iops) * sizeInGib
 
262
        if iops > maxProvisionedIops {
 
263
                iops = maxProvisionedIops
 
264
        }
 
265
        vol := ec2.CreateVolume{
 
266
                // Juju size is MiB, AWS size is GiB.
 
267
                VolumeSize: int(sizeInGib),
 
268
                VolumeType: ebsConfig.volumeType,
 
269
                Encrypted:  ebsConfig.encrypted,
 
270
                IOPS:       int64(iops),
 
271
        }
 
272
        return vol, nil
 
273
}
 
274
 
 
275
// CreateVolumes is specified on the storage.VolumeSource interface.
 
276
func (v *ebsVolumeSource) CreateVolumes(params []storage.VolumeParams) (_ []storage.CreateVolumesResult, err error) {
 
277
 
 
278
        // First, validate the params before we use them.
 
279
        results := make([]storage.CreateVolumesResult, len(params))
 
280
        instanceIds := set.NewStrings()
 
281
        for i, p := range params {
 
282
                if err := v.ValidateVolumeParams(p); err != nil {
 
283
                        results[i].Error = err
 
284
                        continue
 
285
                }
 
286
                instanceIds.Add(string(p.Attachment.InstanceId))
 
287
        }
 
288
 
 
289
        instances := make(instanceCache)
 
290
        if instanceIds.Size() > 1 {
 
291
                if err := instances.update(v.env.ec2, instanceIds.Values()...); err != nil {
 
292
                        logger.Debugf("querying running instances: %v", err)
 
293
                        // We ignore the error, because we don't want an invalid
 
294
                        // InstanceId reference from one VolumeParams to prevent
 
295
                        // the creation of another volume.
 
296
                }
 
297
        }
 
298
 
 
299
        for i, p := range params {
 
300
                if results[i].Error != nil {
 
301
                        continue
 
302
                }
 
303
                volume, attachment, err := v.createVolume(p, instances)
 
304
                if err != nil {
 
305
                        results[i].Error = err
 
306
                        continue
 
307
                }
 
308
                results[i].Volume = volume
 
309
                results[i].VolumeAttachment = attachment
 
310
        }
 
311
        return results, nil
 
312
}
 
313
 
 
314
func (v *ebsVolumeSource) createVolume(p storage.VolumeParams, instances instanceCache) (_ *storage.Volume, _ *storage.VolumeAttachment, err error) {
 
315
        var volumeId string
 
316
        defer func() {
 
317
                if err == nil || volumeId == "" {
 
318
                        return
 
319
                }
 
320
                if _, err := v.env.ec2.DeleteVolume(volumeId); err != nil {
 
321
                        logger.Errorf("error cleaning up volume %v: %v", volumeId, err)
 
322
                }
 
323
        }()
 
324
 
 
325
        // TODO(axw) if preference is to use ephemeral, use ephemeral
 
326
        // until the instance stores run out. We'll need to know how
 
327
        // many there are and how big each one is. We also need to
 
328
        // unmap ephemeral0 in cloud-init.
 
329
 
 
330
        // Create.
 
331
        instId := string(p.Attachment.InstanceId)
 
332
        if err := instances.update(v.env.ec2, instId); err != nil {
 
333
                return nil, nil, errors.Trace(err)
 
334
        }
 
335
        inst, err := instances.get(instId)
 
336
        if err != nil {
 
337
                // Can't create the volume without the instance,
 
338
                // because we need to know what its AZ is.
 
339
                return nil, nil, errors.Trace(err)
 
340
        }
 
341
        vol, _ := parseVolumeOptions(p.Size, p.Attributes)
 
342
        vol.AvailZone = inst.AvailZone
 
343
        resp, err := v.env.ec2.CreateVolume(vol)
 
344
        if err != nil {
 
345
                return nil, nil, errors.Trace(err)
 
346
        }
 
347
        volumeId = resp.Id
 
348
 
 
349
        // Tag.
 
350
        resourceTags := make(map[string]string)
 
351
        for k, v := range p.ResourceTags {
 
352
                resourceTags[k] = v
 
353
        }
 
354
        resourceTags[tagName] = resourceName(p.Tag, v.envName)
 
355
        if err := tagResources(v.env.ec2, resourceTags, volumeId); err != nil {
 
356
                return nil, nil, errors.Annotate(err, "tagging volume")
 
357
        }
 
358
 
 
359
        volume := storage.Volume{
 
360
                p.Tag,
 
361
                storage.VolumeInfo{
 
362
                        VolumeId:   volumeId,
 
363
                        Size:       gibToMib(uint64(resp.Size)),
 
364
                        Persistent: true,
 
365
                },
 
366
        }
 
367
        return &volume, nil, nil
 
368
}
 
369
 
 
370
// ListVolumes is specified on the storage.VolumeSource interface.
 
371
func (v *ebsVolumeSource) ListVolumes() ([]string, error) {
 
372
        filter := ec2.NewFilter()
 
373
        filter.Add("tag:"+tags.JujuModel, v.modelUUID)
 
374
        return listVolumes(v.env.ec2, filter)
 
375
}
 
376
 
 
377
func listVolumes(client *ec2.EC2, filter *ec2.Filter) ([]string, error) {
 
378
        resp, err := client.Volumes(nil, filter)
 
379
        if err != nil {
 
380
                return nil, err
 
381
        }
 
382
        volumeIds := make([]string, 0, len(resp.Volumes))
 
383
        for _, vol := range resp.Volumes {
 
384
                var isRootDisk bool
 
385
                for _, att := range vol.Attachments {
 
386
                        if att.Device == rootDiskDeviceName {
 
387
                                isRootDisk = true
 
388
                                break
 
389
                        }
 
390
                }
 
391
                if isRootDisk {
 
392
                        // We don't want to list root disks in the output.
 
393
                        // These are managed by the instance provisioning
 
394
                        // code; they will be created and destroyed with
 
395
                        // instances.
 
396
                        continue
 
397
                }
 
398
                volumeIds = append(volumeIds, vol.Id)
 
399
        }
 
400
        return volumeIds, nil
 
401
}
 
402
 
 
403
// DescribeVolumes is specified on the storage.VolumeSource interface.
 
404
func (v *ebsVolumeSource) DescribeVolumes(volIds []string) ([]storage.DescribeVolumesResult, error) {
 
405
        // TODO(axw) invalid volIds here should not cause the whole
 
406
        // operation to fail. If we get an invalid volume ID response,
 
407
        // fall back to querying each volume individually. That should
 
408
        // be rare.
 
409
        resp, err := v.env.ec2.Volumes(volIds, nil)
 
410
        if err != nil {
 
411
                return nil, err
 
412
        }
 
413
        byId := make(map[string]ec2.Volume)
 
414
        for _, vol := range resp.Volumes {
 
415
                byId[vol.Id] = vol
 
416
        }
 
417
        results := make([]storage.DescribeVolumesResult, len(volIds))
 
418
        for i, volId := range volIds {
 
419
                vol, ok := byId[volId]
 
420
                if !ok {
 
421
                        results[i].Error = errors.NotFoundf("%s", volId)
 
422
                        continue
 
423
                }
 
424
                results[i].VolumeInfo = &storage.VolumeInfo{
 
425
                        Size:       gibToMib(uint64(vol.Size)),
 
426
                        VolumeId:   vol.Id,
 
427
                        Persistent: true,
 
428
                }
 
429
                for _, attachment := range vol.Attachments {
 
430
                        if attachment.DeleteOnTermination {
 
431
                                results[i].VolumeInfo.Persistent = false
 
432
                                break
 
433
                        }
 
434
                }
 
435
        }
 
436
        return results, nil
 
437
}
 
438
 
 
439
// DestroyVolumes is specified on the storage.VolumeSource interface.
 
440
func (v *ebsVolumeSource) DestroyVolumes(volIds []string) ([]error, error) {
 
441
        return destroyVolumes(v.env.ec2, volIds), nil
 
442
}
 
443
 
 
444
func destroyVolumes(client *ec2.EC2, volIds []string) []error {
 
445
        var wg sync.WaitGroup
 
446
        wg.Add(len(volIds))
 
447
        results := make([]error, len(volIds))
 
448
        for i, volumeId := range volIds {
 
449
                go func(i int, volumeId string) {
 
450
                        defer wg.Done()
 
451
                        results[i] = destroyVolume(client, volumeId)
 
452
                }(i, volumeId)
 
453
        }
 
454
        wg.Wait()
 
455
        return results
 
456
}
 
457
 
 
458
var destroyVolumeAttempt = utils.AttemptStrategy{
 
459
        Total: 5 * time.Minute,
 
460
        Delay: 5 * time.Second,
 
461
}
 
462
 
 
463
func destroyVolume(client *ec2.EC2, volumeId string) (err error) {
 
464
        defer func() {
 
465
                if err != nil {
 
466
                        if ec2ErrCode(err) == volumeNotFound || errors.IsNotFound(err) {
 
467
                                // Either the volume isn't found, or we queried the
 
468
                                // instance corresponding to a DeleteOnTermination
 
469
                                // attachment; in either case, the volume is or will
 
470
                                // be destroyed.
 
471
                                logger.Tracef("Ignoring error destroying volume %q: %v", volumeId, err)
 
472
                                err = nil
 
473
                        }
 
474
                }
 
475
        }()
 
476
 
 
477
        logger.Debugf("destroying %q", volumeId)
 
478
        // Volumes must not be in-use when destroying. A volume may
 
479
        // still be in-use when the instance it is attached to is
 
480
        // in the process of being terminated.
 
481
        volume, err := waitVolume(client, volumeId, destroyVolumeAttempt, func(volume *ec2.Volume) (bool, error) {
 
482
                if volume.Status != volumeStatusInUse {
 
483
                        // Volume is not in use, it should be OK to destroy now.
 
484
                        return true, nil
 
485
                }
 
486
                if len(volume.Attachments) == 0 {
 
487
                        // There are no attachments remaining now; keep querying
 
488
                        // until volume transitions out of in-use.
 
489
                        return false, nil
 
490
                }
 
491
                var deleteOnTermination []string
 
492
                var args []storage.VolumeAttachmentParams
 
493
                for _, a := range volume.Attachments {
 
494
                        switch a.Status {
 
495
                        case attachmentStatusAttaching, attachmentStatusAttached:
 
496
                                // The volume is attaching or attached to an
 
497
                                // instance, we need for it to be detached
 
498
                                // before we can destroy it.
 
499
                                args = append(args, storage.VolumeAttachmentParams{
 
500
                                        AttachmentParams: storage.AttachmentParams{
 
501
                                                InstanceId: instance.Id(a.InstanceId),
 
502
                                        },
 
503
                                        VolumeId: volumeId,
 
504
                                })
 
505
                                if a.DeleteOnTermination {
 
506
                                        // The volume is still attached, and the
 
507
                                        // attachment is "delete on termination";
 
508
                                        // check if the related instance is being
 
509
                                        // terminated, in which case we can stop
 
510
                                        // waiting and skip destroying the volume.
 
511
                                        //
 
512
                                        // Note: we still accrue in "args" above
 
513
                                        // in case the instance is not terminating;
 
514
                                        // in that case we detach and destroy as
 
515
                                        // usual.
 
516
                                        deleteOnTermination = append(
 
517
                                                deleteOnTermination, a.InstanceId,
 
518
                                        )
 
519
                                }
 
520
                        }
 
521
                }
 
522
                if len(deleteOnTermination) > 0 {
 
523
                        result, err := client.Instances(deleteOnTermination, nil)
 
524
                        if err != nil {
 
525
                                return false, errors.Trace(err)
 
526
                        }
 
527
                        for _, reservation := range result.Reservations {
 
528
                                for _, instance := range reservation.Instances {
 
529
                                        switch instance.State.Name {
 
530
                                        case instanceStateShuttingDown, instanceStateTerminated:
 
531
                                                // The instance is or will be terminated,
 
532
                                                // and so the volume will be deleted by
 
533
                                                // virtue of delete-on-termination.
 
534
                                                return true, nil
 
535
                                        }
 
536
                                }
 
537
                        }
 
538
                }
 
539
                if len(args) == 0 {
 
540
                        return false, nil
 
541
                }
 
542
                results, err := detachVolumes(client, args)
 
543
                if err != nil {
 
544
                        return false, errors.Trace(err)
 
545
                }
 
546
                for _, err := range results {
 
547
                        if err != nil {
 
548
                                return false, errors.Trace(err)
 
549
                        }
 
550
                }
 
551
                return false, nil
 
552
        })
 
553
        if err != nil {
 
554
                if err == errWaitVolumeTimeout {
 
555
                        return errors.Errorf("timed out waiting for volume %v to not be in-use", volumeId)
 
556
                }
 
557
                return errors.Trace(err)
 
558
        }
 
559
        if volume.Status == volumeStatusInUse {
 
560
                // If the volume is in-use, that means it will be
 
561
                // handled by delete-on-termination and we have
 
562
                // nothing more to do.
 
563
                return nil
 
564
        }
 
565
        if _, err := client.DeleteVolume(volumeId); err != nil {
 
566
                return errors.Annotatef(err, "destroying %q", volumeId)
 
567
        }
 
568
        return nil
 
569
}
 
570
 
 
571
// ValidateVolumeParams is specified on the storage.VolumeSource interface.
 
572
func (v *ebsVolumeSource) ValidateVolumeParams(params storage.VolumeParams) error {
 
573
        vol, err := parseVolumeOptions(params.Size, params.Attributes)
 
574
        if err != nil {
 
575
                return err
 
576
        }
 
577
        var minVolumeSize, maxVolumeSize int
 
578
        switch vol.VolumeType {
 
579
        case volumeTypeStandard:
 
580
                minVolumeSize = minMagneticVolumeSizeGiB
 
581
                maxVolumeSize = maxMagneticVolumeSizeGiB
 
582
        case volumeTypeGp2:
 
583
                minVolumeSize = minSsdVolumeSizeGiB
 
584
                maxVolumeSize = maxSsdVolumeSizeGiB
 
585
        case volumeTypeIo1:
 
586
                minVolumeSize = minProvisionedIopsVolumeSizeGiB
 
587
                maxVolumeSize = maxProvisionedIopsVolumeSizeGiB
 
588
        }
 
589
        if vol.VolumeSize < minVolumeSize {
 
590
                return errors.Errorf(
 
591
                        "volume size is %d GiB, must be at least %d GiB",
 
592
                        vol.VolumeSize, minVolumeSize,
 
593
                )
 
594
        }
 
595
        if vol.VolumeSize > maxVolumeSize {
 
596
                return errors.Errorf(
 
597
                        "volume size %d GiB exceeds the maximum of %d GiB",
 
598
                        vol.VolumeSize, maxVolumeSize,
 
599
                )
 
600
        }
 
601
        return nil
 
602
}
 
603
 
 
604
// AttachVolumes is specified on the storage.VolumeSource interface.
 
605
func (v *ebsVolumeSource) AttachVolumes(attachParams []storage.VolumeAttachmentParams) ([]storage.AttachVolumesResult, error) {
 
606
        // We need the virtualisation types for each instance we are
 
607
        // attaching to so we can determine the device name.
 
608
        instIds := set.NewStrings()
 
609
        for _, p := range attachParams {
 
610
                instIds.Add(string(p.InstanceId))
 
611
        }
 
612
        instances := make(instanceCache)
 
613
        if instIds.Size() > 1 {
 
614
                if err := instances.update(v.env.ec2, instIds.Values()...); err != nil {
 
615
                        logger.Debugf("querying running instances: %v", err)
 
616
                        // We ignore the error, because we don't want an invalid
 
617
                        // InstanceId reference from one VolumeParams to prevent
 
618
                        // the creation of another volume.
 
619
                }
 
620
        }
 
621
 
 
622
        results := make([]storage.AttachVolumesResult, len(attachParams))
 
623
        for i, params := range attachParams {
 
624
                instId := string(params.InstanceId)
 
625
                // By default we should allocate device names without the
 
626
                // trailing number. Block devices with a trailing number are
 
627
                // not liked by some applications, e.g. Ceph, which want full
 
628
                // disks.
 
629
                //
 
630
                // TODO(axw) introduce a configuration option if and when
 
631
                // someone asks for it to enable use of numbers. This option
 
632
                // must error if used with an "hvm" instance type.
 
633
                const numbers = false
 
634
                nextDeviceName := blockDeviceNamer(numbers)
 
635
                _, deviceName, err := v.attachOneVolume(nextDeviceName, params.VolumeId, instId)
 
636
                if err != nil {
 
637
                        results[i].Error = err
 
638
                        continue
 
639
                }
 
640
                results[i].VolumeAttachment = &storage.VolumeAttachment{
 
641
                        params.Volume,
 
642
                        params.Machine,
 
643
                        storage.VolumeAttachmentInfo{
 
644
                                DeviceName: deviceName,
 
645
                        },
 
646
                }
 
647
        }
 
648
        return results, nil
 
649
}
 
650
 
 
651
func (v *ebsVolumeSource) attachOneVolume(
 
652
        nextDeviceName func() (string, string, error),
 
653
        volumeId, instId string,
 
654
) (string, string, error) {
 
655
        // Wait for the volume to move out of "creating".
 
656
        volume, err := v.waitVolumeCreated(volumeId)
 
657
        if err != nil {
 
658
                return "", "", errors.Trace(err)
 
659
        }
 
660
 
 
661
        // Possible statuses:
 
662
        //    creating | available | in-use | deleting | deleted | error
 
663
        switch volume.Status {
 
664
        default:
 
665
                return "", "", errors.Errorf("cannot attach to volume with status %q", volume.Status)
 
666
 
 
667
        case volumeStatusInUse:
 
668
                // Volume is already attached; see if it's attached to the
 
669
                // instance requested.
 
670
                attachments := volume.Attachments
 
671
                if len(attachments) != 1 {
 
672
                        return "", "", errors.Annotatef(err, "volume %v has unexpected attachment count: %v", volumeId, len(attachments))
 
673
                }
 
674
                if attachments[0].InstanceId != instId {
 
675
                        return "", "", errors.Annotatef(err, "volume %v is attached to %v", volumeId, attachments[0].InstanceId)
 
676
                }
 
677
                requestDeviceName := attachments[0].Device
 
678
                actualDeviceName := renamedDevicePrefix + requestDeviceName[len(devicePrefix):]
 
679
                return requestDeviceName, actualDeviceName, nil
 
680
 
 
681
        case volumeStatusAvailable:
 
682
                // Attempt to attach below.
 
683
                break
 
684
        }
 
685
 
 
686
        for {
 
687
                requestDeviceName, actualDeviceName, err := nextDeviceName()
 
688
                if err != nil {
 
689
                        // Can't attach any more volumes.
 
690
                        return "", "", err
 
691
                }
 
692
                _, err = v.env.ec2.AttachVolume(volumeId, instId, requestDeviceName)
 
693
                if ec2Err, ok := err.(*ec2.Error); ok {
 
694
                        switch ec2Err.Code {
 
695
                        case invalidParameterValue:
 
696
                                // InvalidParameterValue is returned by AttachVolume
 
697
                                // rather than InvalidDevice.InUse as the docs would
 
698
                                // suggest.
 
699
                                if !deviceInUseRegexp.MatchString(ec2Err.Message) {
 
700
                                        break
 
701
                                }
 
702
                                fallthrough
 
703
 
 
704
                        case deviceInUse:
 
705
                                // deviceInUse means that the requested device name
 
706
                                // is in use already. Try again with the next name.
 
707
                                continue
 
708
                        }
 
709
                }
 
710
                if err != nil {
 
711
                        return "", "", errors.Annotate(err, "attaching volume")
 
712
                }
 
713
                return requestDeviceName, actualDeviceName, nil
 
714
        }
 
715
}
 
716
 
 
717
func (v *ebsVolumeSource) waitVolumeCreated(volumeId string) (*ec2.Volume, error) {
 
718
        var attempt = utils.AttemptStrategy{
 
719
                Total: 5 * time.Second,
 
720
                Delay: 200 * time.Millisecond,
 
721
        }
 
722
        var lastStatus string
 
723
        volume, err := waitVolume(v.env.ec2, volumeId, attempt, func(volume *ec2.Volume) (bool, error) {
 
724
                lastStatus = volume.Status
 
725
                return volume.Status != volumeStatusCreating, nil
 
726
        })
 
727
        if err == errWaitVolumeTimeout {
 
728
                return nil, errors.Errorf(
 
729
                        "timed out waiting for volume %v to become available (%v)",
 
730
                        volumeId, lastStatus,
 
731
                )
 
732
        } else if err != nil {
 
733
                return nil, errors.Trace(err)
 
734
        }
 
735
        return volume, nil
 
736
}
 
737
 
 
738
var errWaitVolumeTimeout = errors.New("timed out")
 
739
 
 
740
func waitVolume(
 
741
        client *ec2.EC2,
 
742
        volumeId string,
 
743
        attempt utils.AttemptStrategy,
 
744
        pred func(v *ec2.Volume) (bool, error),
 
745
) (*ec2.Volume, error) {
 
746
        for a := attempt.Start(); a.Next(); {
 
747
                volume, err := describeVolume(client, volumeId)
 
748
                if err != nil {
 
749
                        return nil, errors.Trace(err)
 
750
                }
 
751
                ok, err := pred(volume)
 
752
                if err != nil {
 
753
                        return nil, errors.Trace(err)
 
754
                }
 
755
                if ok {
 
756
                        return volume, nil
 
757
                }
 
758
        }
 
759
        return nil, errWaitVolumeTimeout
 
760
}
 
761
 
 
762
func describeVolume(client *ec2.EC2, volumeId string) (*ec2.Volume, error) {
 
763
        resp, err := client.Volumes([]string{volumeId}, nil)
 
764
        if err != nil {
 
765
                return nil, errors.Annotate(err, "querying volume")
 
766
        }
 
767
        if len(resp.Volumes) == 0 {
 
768
                return nil, errors.NotFoundf("%v", volumeId)
 
769
        } else if len(resp.Volumes) != 1 {
 
770
                return nil, errors.Errorf("expected one volume, got %d", len(resp.Volumes))
 
771
        }
 
772
        return &resp.Volumes[0], nil
 
773
}
 
774
 
 
775
type instanceCache map[string]ec2.Instance
 
776
 
 
777
func (c instanceCache) update(ec2client *ec2.EC2, ids ...string) error {
 
778
        if len(ids) == 1 {
 
779
                if _, ok := c[ids[0]]; ok {
 
780
                        return nil
 
781
                }
 
782
        }
 
783
        filter := ec2.NewFilter()
 
784
        filter.Add("instance-state-name", "running")
 
785
        resp, err := ec2client.Instances(ids, filter)
 
786
        if err != nil {
 
787
                return errors.Annotate(err, "querying instance details")
 
788
        }
 
789
        for j := range resp.Reservations {
 
790
                r := &resp.Reservations[j]
 
791
                for _, inst := range r.Instances {
 
792
                        c[inst.InstanceId] = inst
 
793
                }
 
794
        }
 
795
        return nil
 
796
}
 
797
 
 
798
func (c instanceCache) get(id string) (ec2.Instance, error) {
 
799
        inst, ok := c[id]
 
800
        if !ok {
 
801
                return ec2.Instance{}, errors.Errorf("cannot attach to non-running instance %v", id)
 
802
        }
 
803
        return inst, nil
 
804
}
 
805
 
 
806
// DetachVolumes is specified on the storage.VolumeSource interface.
 
807
func (v *ebsVolumeSource) DetachVolumes(attachParams []storage.VolumeAttachmentParams) ([]error, error) {
 
808
        return detachVolumes(v.env.ec2, attachParams)
 
809
}
 
810
 
 
811
func detachVolumes(client *ec2.EC2, attachParams []storage.VolumeAttachmentParams) ([]error, error) {
 
812
        results := make([]error, len(attachParams))
 
813
        for i, params := range attachParams {
 
814
                _, err := client.DetachVolume(params.VolumeId, string(params.InstanceId), "", false)
 
815
                // Process aws specific error information.
 
816
                if err != nil {
 
817
                        if ec2Err, ok := err.(*ec2.Error); ok {
 
818
                                switch ec2Err.Code {
 
819
                                // attachment not found means this volume is already detached.
 
820
                                case attachmentNotFound:
 
821
                                        err = nil
 
822
                                }
 
823
                        }
 
824
                }
 
825
                if err != nil {
 
826
                        results[i] = errors.Annotatef(
 
827
                                err, "detaching %v from %v", params.Volume, params.Machine,
 
828
                        )
 
829
                }
 
830
        }
 
831
        return results, nil
 
832
}
 
833
 
 
834
var errTooManyVolumes = errors.New("too many EBS volumes to attach")
 
835
 
 
836
// blockDeviceNamer returns a function that cycles through block device names.
 
837
//
 
838
// The returned function returns the device name that should be used in
 
839
// requests to the EC2 API, and and also the (kernel) device name as it
 
840
// will appear on the machine.
 
841
//
 
842
// See http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/block-device-mapping-concepts.html
 
843
func blockDeviceNamer(numbers bool) func() (requestName, actualName string, err error) {
 
844
        const (
 
845
                // deviceLetterMin is the first letter to use for EBS block device names.
 
846
                deviceLetterMin = 'f'
 
847
                // deviceLetterMax is the last letter to use for EBS block device names.
 
848
                deviceLetterMax = 'p'
 
849
                // deviceNumMax is the maximum value for trailing numbers on block device name.
 
850
                deviceNumMax = 6
 
851
        )
 
852
        var n int
 
853
        letterRepeats := 1
 
854
        if numbers {
 
855
                letterRepeats = deviceNumMax
 
856
        }
 
857
        return func() (string, string, error) {
 
858
                letter := deviceLetterMin + (n / letterRepeats)
 
859
                if letter > deviceLetterMax {
 
860
                        return "", "", errTooManyVolumes
 
861
                }
 
862
                deviceName := devicePrefix + string(letter)
 
863
                if numbers {
 
864
                        deviceName += string('1' + (n % deviceNumMax))
 
865
                }
 
866
                n++
 
867
                realDeviceName := renamedDevicePrefix + deviceName[len(devicePrefix):]
 
868
                return deviceName, realDeviceName, nil
 
869
        }
 
870
}
 
871
 
 
872
func minRootDiskSizeMiB(ser string) uint64 {
 
873
        return gibToMib(common.MinRootDiskSizeGiB(ser))
 
874
}
 
875
 
 
876
// getBlockDeviceMappings translates constraints into BlockDeviceMappings.
 
877
//
 
878
// The first entry is always the root disk mapping, followed by instance
 
879
// stores (ephemeral disks).
 
880
func getBlockDeviceMappings(cons constraints.Value, ser string) []ec2.BlockDeviceMapping {
 
881
        rootDiskSizeMiB := minRootDiskSizeMiB(ser)
 
882
        if cons.RootDisk != nil {
 
883
                if *cons.RootDisk >= minRootDiskSizeMiB(ser) {
 
884
                        rootDiskSizeMiB = *cons.RootDisk
 
885
                } else {
 
886
                        logger.Infof(
 
887
                                "Ignoring root-disk constraint of %dM because it is smaller than the EC2 image size of %dM",
 
888
                                *cons.RootDisk,
 
889
                                minRootDiskSizeMiB(ser),
 
890
                        )
 
891
                }
 
892
        }
 
893
        // The first block device is for the root disk.
 
894
        blockDeviceMappings := []ec2.BlockDeviceMapping{{
 
895
                DeviceName: rootDiskDeviceName,
 
896
                VolumeSize: int64(mibToGib(rootDiskSizeMiB)),
 
897
        }}
 
898
 
 
899
        // Not all machines have this many instance stores.
 
900
        // Instances will be started with as many of the
 
901
        // instance stores as they can support.
 
902
        blockDeviceMappings = append(blockDeviceMappings, []ec2.BlockDeviceMapping{{
 
903
                VirtualName: "ephemeral0",
 
904
                DeviceName:  "/dev/sdb",
 
905
        }, {
 
906
                VirtualName: "ephemeral1",
 
907
                DeviceName:  "/dev/sdc",
 
908
        }, {
 
909
                VirtualName: "ephemeral2",
 
910
                DeviceName:  "/dev/sdd",
 
911
        }, {
 
912
                VirtualName: "ephemeral3",
 
913
                DeviceName:  "/dev/sde",
 
914
        }}...)
 
915
 
 
916
        return blockDeviceMappings
 
917
}
 
918
 
 
919
// mibToGib converts mebibytes to gibibytes.
 
920
// AWS expects GiB, we work in MiB; round up
 
921
// to nearest GiB.
 
922
func mibToGib(m uint64) uint64 {
 
923
        return (m + 1023) / 1024
 
924
}
 
925
 
 
926
// gibToMib converts gibibytes to mebibytes.
 
927
func gibToMib(g uint64) uint64 {
 
928
        return g * 1024
 
929
}