~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/lxc/lxd/lxd/storage.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
package main
 
2
 
 
3
import (
 
4
        "encoding/json"
 
5
        "fmt"
 
6
        "os"
 
7
        "os/exec"
 
8
        "path/filepath"
 
9
        "reflect"
 
10
        "syscall"
 
11
 
 
12
        "github.com/gorilla/websocket"
 
13
 
 
14
        "github.com/lxc/lxd/shared"
 
15
        "github.com/lxc/lxd/shared/logging"
 
16
 
 
17
        log "gopkg.in/inconshreveable/log15.v2"
 
18
)
 
19
 
 
20
/* Some interesting filesystems */
 
21
const (
 
22
        filesystemSuperMagicTmpfs = 0x01021994
 
23
        filesystemSuperMagicExt4  = 0xEF53
 
24
        filesystemSuperMagicXfs   = 0x58465342
 
25
        filesystemSuperMagicNfs   = 0x6969
 
26
        filesystemSuperMagicZfs   = 0x2fc12fc1
 
27
)
 
28
 
 
29
/*
 
30
 * filesystemDetect returns the filesystem on which
 
31
 * the passed-in path sits
 
32
 */
 
33
func filesystemDetect(path string) (string, error) {
 
34
        fs := syscall.Statfs_t{}
 
35
 
 
36
        err := syscall.Statfs(path, &fs)
 
37
        if err != nil {
 
38
                return "", err
 
39
        }
 
40
 
 
41
        switch fs.Type {
 
42
        case filesystemSuperMagicBtrfs:
 
43
                return "btrfs", nil
 
44
        case filesystemSuperMagicZfs:
 
45
                return "zfs", nil
 
46
        case filesystemSuperMagicTmpfs:
 
47
                return "tmpfs", nil
 
48
        case filesystemSuperMagicExt4:
 
49
                return "ext4", nil
 
50
        case filesystemSuperMagicXfs:
 
51
                return "xfs", nil
 
52
        case filesystemSuperMagicNfs:
 
53
                return "nfs", nil
 
54
        default:
 
55
                shared.Debugf("Unknown backing filesystem type: 0x%x", fs.Type)
 
56
                return string(fs.Type), nil
 
57
        }
 
58
}
 
59
 
 
60
// storageRsyncCopy copies a directory using rsync (with the --devices option).
 
61
func storageRsyncCopy(source string, dest string) (string, error) {
 
62
        if err := os.MkdirAll(dest, 0755); err != nil {
 
63
                return "", err
 
64
        }
 
65
 
 
66
        rsyncVerbosity := "-q"
 
67
        if debug {
 
68
                rsyncVerbosity = "-vi"
 
69
        }
 
70
 
 
71
        output, err := exec.Command(
 
72
                "rsync",
 
73
                "-a",
 
74
                "-HAX",
 
75
                "--devices",
 
76
                "--delete",
 
77
                "--checksum",
 
78
                "--numeric-ids",
 
79
                rsyncVerbosity,
 
80
                shared.AddSlash(source),
 
81
                dest).CombinedOutput()
 
82
 
 
83
        return string(output), err
 
84
}
 
85
 
 
86
// storageType defines the type of a storage
 
87
type storageType int
 
88
 
 
89
const (
 
90
        storageTypeBtrfs storageType = iota
 
91
        storageTypeZfs
 
92
        storageTypeLvm
 
93
        storageTypeDir
 
94
        storageTypeMock
 
95
)
 
96
 
 
97
func storageTypeToString(sType storageType) string {
 
98
        switch sType {
 
99
        case storageTypeBtrfs:
 
100
                return "btrfs"
 
101
        case storageTypeZfs:
 
102
                return "zfs"
 
103
        case storageTypeLvm:
 
104
                return "lvm"
 
105
        case storageTypeMock:
 
106
                return "mock"
 
107
        }
 
108
 
 
109
        return "dir"
 
110
}
 
111
 
 
112
type MigrationStorageSourceDriver interface {
 
113
        /* snapshots for this container, if any */
 
114
        Snapshots() []container
 
115
 
 
116
        /* send any bits of the container/snapshots that are possible while the
 
117
         * container is still running.
 
118
         */
 
119
        SendWhileRunning(conn *websocket.Conn) error
 
120
 
 
121
        /* send the final bits (e.g. a final delta snapshot for zfs, btrfs, or
 
122
         * do a final rsync) of the fs after the container has been
 
123
         * checkpointed. This will only be called when a container is actually
 
124
         * being live migrated.
 
125
         */
 
126
        SendAfterCheckpoint(conn *websocket.Conn) error
 
127
 
 
128
        /* Called after either success or failure of a migration, can be used
 
129
         * to clean up any temporary snapshots, etc.
 
130
         */
 
131
        Cleanup()
 
132
}
 
133
 
 
134
type storage interface {
 
135
        Init(config map[string]interface{}) (storage, error)
 
136
 
 
137
        GetStorageType() storageType
 
138
        GetStorageTypeName() string
 
139
        GetStorageTypeVersion() string
 
140
 
 
141
        // ContainerCreate creates an empty container (no rootfs/metadata.yaml)
 
142
        ContainerCreate(container container) error
 
143
 
 
144
        // ContainerCreateFromImage creates a container from a image.
 
145
        ContainerCreateFromImage(container container, imageFingerprint string) error
 
146
 
 
147
        ContainerCanRestore(container container, sourceContainer container) error
 
148
        ContainerDelete(container container) error
 
149
        ContainerCopy(container container, sourceContainer container) error
 
150
        ContainerStart(container container) error
 
151
        ContainerStop(container container) error
 
152
        ContainerRename(container container, newName string) error
 
153
        ContainerRestore(container container, sourceContainer container) error
 
154
        ContainerSetQuota(container container, size int64) error
 
155
        ContainerGetUsage(container container) (int64, error)
 
156
 
 
157
        ContainerSnapshotCreate(
 
158
                snapshotContainer container, sourceContainer container) error
 
159
        ContainerSnapshotDelete(snapshotContainer container) error
 
160
        ContainerSnapshotRename(snapshotContainer container, newName string) error
 
161
        ContainerSnapshotStart(container container) error
 
162
        ContainerSnapshotStop(container container) error
 
163
 
 
164
        /* for use in migrating snapshots */
 
165
        ContainerSnapshotCreateEmpty(snapshotContainer container) error
 
166
 
 
167
        ImageCreate(fingerprint string) error
 
168
        ImageDelete(fingerprint string) error
 
169
 
 
170
        MigrationType() MigrationFSType
 
171
 
 
172
        // Get the pieces required to migrate the source. This contains a list
 
173
        // of the "object" (i.e. container or snapshot, depending on whether or
 
174
        // not it is a snapshot name) to be migrated in order, and a channel
 
175
        // for arguments of the specific migration command. We use a channel
 
176
        // here so we don't have to invoke `zfs send` or `rsync` or whatever
 
177
        // and keep its stdin/stdout open for each snapshot during the course
 
178
        // of migration, we can do it lazily.
 
179
        //
 
180
        // N.B. that the order here important: e.g. in btrfs/zfs, snapshots
 
181
        // which are parents of other snapshots should be sent first, to save
 
182
        // as much transfer as possible. However, the base container is always
 
183
        // sent as the first object, since that is the grandparent of every
 
184
        // snapshot.
 
185
        //
 
186
        // We leave sending containers which are snapshots of other containers
 
187
        // already present on the target instance as an exercise for the
 
188
        // enterprising developer.
 
189
        MigrationSource(container container) (MigrationStorageSourceDriver, error)
 
190
        MigrationSink(live bool, container container, objects []container, conn *websocket.Conn) error
 
191
}
 
192
 
 
193
func newStorage(d *Daemon, sType storageType) (storage, error) {
 
194
        var nilmap map[string]interface{}
 
195
        return newStorageWithConfig(d, sType, nilmap)
 
196
}
 
197
 
 
198
func newStorageWithConfig(d *Daemon, sType storageType, config map[string]interface{}) (storage, error) {
 
199
        if d.MockMode {
 
200
                return d.Storage, nil
 
201
        }
 
202
 
 
203
        var s storage
 
204
 
 
205
        switch sType {
 
206
        case storageTypeBtrfs:
 
207
                if d.Storage != nil && d.Storage.GetStorageType() == storageTypeBtrfs {
 
208
                        return d.Storage, nil
 
209
                }
 
210
 
 
211
                s = &storageLogWrapper{w: &storageBtrfs{d: d}}
 
212
        case storageTypeZfs:
 
213
                if d.Storage != nil && d.Storage.GetStorageType() == storageTypeZfs {
 
214
                        return d.Storage, nil
 
215
                }
 
216
 
 
217
                s = &storageLogWrapper{w: &storageZfs{d: d}}
 
218
        case storageTypeLvm:
 
219
                if d.Storage != nil && d.Storage.GetStorageType() == storageTypeLvm {
 
220
                        return d.Storage, nil
 
221
                }
 
222
 
 
223
                s = &storageLogWrapper{w: &storageLvm{d: d}}
 
224
        default:
 
225
                if d.Storage != nil && d.Storage.GetStorageType() == storageTypeDir {
 
226
                        return d.Storage, nil
 
227
                }
 
228
 
 
229
                s = &storageLogWrapper{w: &storageDir{d: d}}
 
230
        }
 
231
 
 
232
        return s.Init(config)
 
233
}
 
234
 
 
235
func storageForFilename(d *Daemon, filename string) (storage, error) {
 
236
        config := make(map[string]interface{})
 
237
        storageType := storageTypeDir
 
238
 
 
239
        if d.MockMode {
 
240
                return newStorageWithConfig(d, storageTypeMock, config)
 
241
        }
 
242
 
 
243
        filesystem, err := filesystemDetect(filename)
 
244
        if err != nil {
 
245
                return nil, fmt.Errorf("couldn't detect filesystem for '%s': %v", filename, err)
 
246
        }
 
247
 
 
248
        if shared.PathExists(filename + ".lv") {
 
249
                storageType = storageTypeLvm
 
250
                lvPath, err := os.Readlink(filename + ".lv")
 
251
                if err != nil {
 
252
                        return nil, fmt.Errorf("couldn't read link dest for '%s': %v", filename+".lv", err)
 
253
                }
 
254
                vgname := filepath.Base(filepath.Dir(lvPath))
 
255
                config["vgName"] = vgname
 
256
        } else if shared.PathExists(filename + ".zfs") {
 
257
                storageType = storageTypeZfs
 
258
        } else if shared.PathExists(filename+".btrfs") || filesystem == "btrfs" {
 
259
                storageType = storageTypeBtrfs
 
260
        }
 
261
 
 
262
        return newStorageWithConfig(d, storageType, config)
 
263
}
 
264
 
 
265
func storageForImage(d *Daemon, imgInfo *shared.ImageInfo) (storage, error) {
 
266
        imageFilename := shared.VarPath("images", imgInfo.Fingerprint)
 
267
        return storageForFilename(d, imageFilename)
 
268
}
 
269
 
 
270
type storageShared struct {
 
271
        sType        storageType
 
272
        sTypeName    string
 
273
        sTypeVersion string
 
274
 
 
275
        log shared.Logger
 
276
}
 
277
 
 
278
func (ss *storageShared) initShared() error {
 
279
        ss.log = logging.AddContext(
 
280
                shared.Log,
 
281
                log.Ctx{"driver": fmt.Sprintf("storage/%s", ss.sTypeName)},
 
282
        )
 
283
        return nil
 
284
}
 
285
 
 
286
func (ss *storageShared) GetStorageType() storageType {
 
287
        return ss.sType
 
288
}
 
289
 
 
290
func (ss *storageShared) GetStorageTypeName() string {
 
291
        return ss.sTypeName
 
292
}
 
293
 
 
294
func (ss *storageShared) GetStorageTypeVersion() string {
 
295
        return ss.sTypeVersion
 
296
}
 
297
 
 
298
func (ss *storageShared) shiftRootfs(c container) error {
 
299
        dpath := c.Path()
 
300
        rpath := c.RootfsPath()
 
301
 
 
302
        shared.Log.Debug("Shifting root filesystem",
 
303
                log.Ctx{"container": c.Name(), "rootfs": rpath})
 
304
 
 
305
        idmapset := c.IdmapSet()
 
306
 
 
307
        if idmapset == nil {
 
308
                return fmt.Errorf("IdmapSet of container '%s' is nil", c.Name())
 
309
        }
 
310
 
 
311
        err := idmapset.ShiftRootfs(rpath)
 
312
        if err != nil {
 
313
                shared.Debugf("Shift of rootfs %s failed: %s", rpath, err)
 
314
                return err
 
315
        }
 
316
 
 
317
        /* Set an acl so the container root can descend the container dir */
 
318
        // TODO: i changed this so it calls ss.setUnprivUserAcl, which does
 
319
        // the acl change only if the container is not privileged, think thats right.
 
320
        return ss.setUnprivUserAcl(c, dpath)
 
321
}
 
322
 
 
323
func (ss *storageShared) setUnprivUserAcl(c container, destPath string) error {
 
324
        idmapset := c.IdmapSet()
 
325
 
 
326
        // Skip for privileged containers
 
327
        if idmapset == nil {
 
328
                return nil
 
329
        }
 
330
 
 
331
        // Make sure the map is valid. Skip if container uid 0 == host uid 0
 
332
        uid, _ := idmapset.ShiftIntoNs(0, 0)
 
333
        switch uid {
 
334
        case -1:
 
335
                return fmt.Errorf("Container doesn't have a uid 0 in its map")
 
336
        case 0:
 
337
                return nil
 
338
        }
 
339
 
 
340
        // Attempt to set a POSIX ACL first. Fallback to chmod if the fs doesn't support it.
 
341
        acl := fmt.Sprintf("%d:rx", uid)
 
342
        _, err := exec.Command("setfacl", "-m", acl, destPath).CombinedOutput()
 
343
        if err != nil {
 
344
                _, err := exec.Command("chmod", "+x", destPath).CombinedOutput()
 
345
                if err != nil {
 
346
                        return fmt.Errorf("Failed to chmod the container path.")
 
347
                }
 
348
        }
 
349
 
 
350
        return nil
 
351
}
 
352
 
 
353
type storageLogWrapper struct {
 
354
        w   storage
 
355
        log shared.Logger
 
356
}
 
357
 
 
358
func (lw *storageLogWrapper) Init(config map[string]interface{}) (storage, error) {
 
359
        _, err := lw.w.Init(config)
 
360
        lw.log = logging.AddContext(
 
361
                shared.Log,
 
362
                log.Ctx{"driver": fmt.Sprintf("storage/%s", lw.w.GetStorageTypeName())},
 
363
        )
 
364
 
 
365
        lw.log.Info("Init")
 
366
        return lw, err
 
367
}
 
368
 
 
369
func (lw *storageLogWrapper) GetStorageType() storageType {
 
370
        return lw.w.GetStorageType()
 
371
}
 
372
 
 
373
func (lw *storageLogWrapper) GetStorageTypeName() string {
 
374
        return lw.w.GetStorageTypeName()
 
375
}
 
376
 
 
377
func (lw *storageLogWrapper) GetStorageTypeVersion() string {
 
378
        return lw.w.GetStorageTypeVersion()
 
379
}
 
380
 
 
381
func (lw *storageLogWrapper) ContainerCreate(container container) error {
 
382
        lw.log.Debug(
 
383
                "ContainerCreate",
 
384
                log.Ctx{
 
385
                        "name":         container.Name(),
 
386
                        "isPrivileged": container.IsPrivileged()})
 
387
        return lw.w.ContainerCreate(container)
 
388
}
 
389
 
 
390
func (lw *storageLogWrapper) ContainerCreateFromImage(
 
391
        container container, imageFingerprint string) error {
 
392
 
 
393
        lw.log.Debug(
 
394
                "ContainerCreateFromImage",
 
395
                log.Ctx{
 
396
                        "imageFingerprint": imageFingerprint,
 
397
                        "name":             container.Name(),
 
398
                        "isPrivileged":     container.IsPrivileged()})
 
399
        return lw.w.ContainerCreateFromImage(container, imageFingerprint)
 
400
}
 
401
 
 
402
func (lw *storageLogWrapper) ContainerCanRestore(container container, sourceContainer container) error {
 
403
        lw.log.Debug("ContainerCanRestore", log.Ctx{"container": container.Name()})
 
404
        return lw.w.ContainerCanRestore(container, sourceContainer)
 
405
}
 
406
 
 
407
func (lw *storageLogWrapper) ContainerDelete(container container) error {
 
408
        lw.log.Debug("ContainerDelete", log.Ctx{"container": container.Name()})
 
409
        return lw.w.ContainerDelete(container)
 
410
}
 
411
 
 
412
func (lw *storageLogWrapper) ContainerCopy(
 
413
        container container, sourceContainer container) error {
 
414
 
 
415
        lw.log.Debug(
 
416
                "ContainerCopy",
 
417
                log.Ctx{
 
418
                        "container": container.Name(),
 
419
                        "source":    sourceContainer.Name()})
 
420
        return lw.w.ContainerCopy(container, sourceContainer)
 
421
}
 
422
 
 
423
func (lw *storageLogWrapper) ContainerStart(container container) error {
 
424
        lw.log.Debug("ContainerStart", log.Ctx{"container": container.Name()})
 
425
        return lw.w.ContainerStart(container)
 
426
}
 
427
 
 
428
func (lw *storageLogWrapper) ContainerStop(container container) error {
 
429
        lw.log.Debug("ContainerStop", log.Ctx{"container": container.Name()})
 
430
        return lw.w.ContainerStop(container)
 
431
}
 
432
 
 
433
func (lw *storageLogWrapper) ContainerRename(
 
434
        container container, newName string) error {
 
435
 
 
436
        lw.log.Debug(
 
437
                "ContainerRename",
 
438
                log.Ctx{
 
439
                        "container": container.Name(),
 
440
                        "newName":   newName})
 
441
        return lw.w.ContainerRename(container, newName)
 
442
}
 
443
 
 
444
func (lw *storageLogWrapper) ContainerRestore(
 
445
        container container, sourceContainer container) error {
 
446
 
 
447
        lw.log.Debug(
 
448
                "ContainerRestore",
 
449
                log.Ctx{
 
450
                        "container": container.Name(),
 
451
                        "source":    sourceContainer.Name()})
 
452
        return lw.w.ContainerRestore(container, sourceContainer)
 
453
}
 
454
 
 
455
func (lw *storageLogWrapper) ContainerSetQuota(
 
456
        container container, size int64) error {
 
457
 
 
458
        lw.log.Debug(
 
459
                "ContainerSetQuota",
 
460
                log.Ctx{
 
461
                        "container": container.Name(),
 
462
                        "size":      size})
 
463
        return lw.w.ContainerSetQuota(container, size)
 
464
}
 
465
 
 
466
func (lw *storageLogWrapper) ContainerGetUsage(
 
467
        container container) (int64, error) {
 
468
 
 
469
        lw.log.Debug(
 
470
                "ContainerGetUsage",
 
471
                log.Ctx{
 
472
                        "container": container.Name()})
 
473
        return lw.w.ContainerGetUsage(container)
 
474
}
 
475
 
 
476
func (lw *storageLogWrapper) ContainerSnapshotCreate(
 
477
        snapshotContainer container, sourceContainer container) error {
 
478
 
 
479
        lw.log.Debug("ContainerSnapshotCreate",
 
480
                log.Ctx{
 
481
                        "snapshotContainer": snapshotContainer.Name(),
 
482
                        "sourceContainer":   sourceContainer.Name()})
 
483
 
 
484
        return lw.w.ContainerSnapshotCreate(snapshotContainer, sourceContainer)
 
485
}
 
486
 
 
487
func (lw *storageLogWrapper) ContainerSnapshotCreateEmpty(snapshotContainer container) error {
 
488
        lw.log.Debug("ContainerSnapshotCreateEmpty",
 
489
                log.Ctx{
 
490
                        "snapshotContainer": snapshotContainer.Name()})
 
491
 
 
492
        return lw.w.ContainerSnapshotCreateEmpty(snapshotContainer)
 
493
}
 
494
 
 
495
func (lw *storageLogWrapper) ContainerSnapshotDelete(
 
496
        snapshotContainer container) error {
 
497
 
 
498
        lw.log.Debug("ContainerSnapshotDelete",
 
499
                log.Ctx{"snapshotContainer": snapshotContainer.Name()})
 
500
        return lw.w.ContainerSnapshotDelete(snapshotContainer)
 
501
}
 
502
 
 
503
func (lw *storageLogWrapper) ContainerSnapshotRename(
 
504
        snapshotContainer container, newName string) error {
 
505
 
 
506
        lw.log.Debug("ContainerSnapshotRename",
 
507
                log.Ctx{
 
508
                        "snapshotContainer": snapshotContainer.Name(),
 
509
                        "newName":           newName})
 
510
        return lw.w.ContainerSnapshotRename(snapshotContainer, newName)
 
511
}
 
512
 
 
513
func (lw *storageLogWrapper) ContainerSnapshotStart(container container) error {
 
514
        lw.log.Debug("ContainerStart", log.Ctx{"container": container.Name()})
 
515
        return lw.w.ContainerSnapshotStart(container)
 
516
}
 
517
 
 
518
func (lw *storageLogWrapper) ContainerSnapshotStop(container container) error {
 
519
        lw.log.Debug("ContainerStop", log.Ctx{"container": container.Name()})
 
520
        return lw.w.ContainerSnapshotStop(container)
 
521
}
 
522
 
 
523
func (lw *storageLogWrapper) ImageCreate(fingerprint string) error {
 
524
        lw.log.Debug(
 
525
                "ImageCreate",
 
526
                log.Ctx{"fingerprint": fingerprint})
 
527
        return lw.w.ImageCreate(fingerprint)
 
528
}
 
529
 
 
530
func (lw *storageLogWrapper) ImageDelete(fingerprint string) error {
 
531
        lw.log.Debug("ImageDelete", log.Ctx{"fingerprint": fingerprint})
 
532
        return lw.w.ImageDelete(fingerprint)
 
533
 
 
534
}
 
535
 
 
536
func (lw *storageLogWrapper) MigrationType() MigrationFSType {
 
537
        return lw.w.MigrationType()
 
538
}
 
539
 
 
540
func (lw *storageLogWrapper) MigrationSource(container container) (MigrationStorageSourceDriver, error) {
 
541
        lw.log.Debug("MigrationSource", log.Ctx{"container": container.Name()})
 
542
        return lw.w.MigrationSource(container)
 
543
}
 
544
 
 
545
func (lw *storageLogWrapper) MigrationSink(live bool, container container, objects []container, conn *websocket.Conn) error {
 
546
        objNames := []string{}
 
547
        for _, obj := range objects {
 
548
                objNames = append(objNames, obj.Name())
 
549
        }
 
550
 
 
551
        lw.log.Debug("MigrationSink", log.Ctx{
 
552
                "live":      live,
 
553
                "container": container.Name(),
 
554
                "objects":   objNames,
 
555
        })
 
556
 
 
557
        return lw.w.MigrationSink(live, container, objects, conn)
 
558
}
 
559
 
 
560
func ShiftIfNecessary(container container, srcIdmap *shared.IdmapSet) error {
 
561
        dstIdmap := container.IdmapSet()
 
562
        if dstIdmap == nil {
 
563
                dstIdmap = new(shared.IdmapSet)
 
564
        }
 
565
 
 
566
        if !reflect.DeepEqual(srcIdmap, dstIdmap) {
 
567
                var jsonIdmap string
 
568
                if srcIdmap != nil {
 
569
                        idmapBytes, err := json.Marshal(srcIdmap.Idmap)
 
570
                        if err != nil {
 
571
                                return err
 
572
                        }
 
573
                        jsonIdmap = string(idmapBytes)
 
574
                } else {
 
575
                        jsonIdmap = "[]"
 
576
                }
 
577
 
 
578
                err := container.ConfigKeySet("volatile.last_state.idmap", jsonIdmap)
 
579
                if err != nil {
 
580
                        return err
 
581
                }
 
582
        }
 
583
 
 
584
        return nil
 
585
}
 
586
 
 
587
type rsyncStorageSourceDriver struct {
 
588
        container container
 
589
        snapshots []container
 
590
}
 
591
 
 
592
func (s rsyncStorageSourceDriver) Snapshots() []container {
 
593
        return s.snapshots
 
594
}
 
595
 
 
596
func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
 
597
        toSend := append([]container{s.container}, s.snapshots...)
 
598
 
 
599
        for _, send := range toSend {
 
600
                path := send.Path()
 
601
                if err := RsyncSend(shared.AddSlash(path), conn); err != nil {
 
602
                        return err
 
603
                }
 
604
        }
 
605
 
 
606
        return nil
 
607
}
 
608
 
 
609
func (s rsyncStorageSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
 
610
        /* resync anything that changed between our first send and the checkpoint */
 
611
        return RsyncSend(shared.AddSlash(s.container.Path()), conn)
 
612
}
 
613
 
 
614
func (s rsyncStorageSourceDriver) Cleanup() {
 
615
        /* no-op */
 
616
}
 
617
 
 
618
func rsyncMigrationSource(container container) (MigrationStorageSourceDriver, error) {
 
619
        snapshots, err := container.Snapshots()
 
620
        if err != nil {
 
621
                return nil, err
 
622
        }
 
623
 
 
624
        return rsyncStorageSourceDriver{container, snapshots}, nil
 
625
}
 
626
 
 
627
func rsyncMigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
 
628
        /* the first object is the actual container */
 
629
        if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
 
630
                return err
 
631
        }
 
632
 
 
633
        if len(snapshots) > 0 {
 
634
                err := os.MkdirAll(shared.VarPath(fmt.Sprintf("snapshots/%s", container.Name())), 0700)
 
635
                if err != nil {
 
636
                        return err
 
637
                }
 
638
        }
 
639
 
 
640
        for _, snap := range snapshots {
 
641
                if err := RsyncRecv(shared.AddSlash(snap.Path()), conn); err != nil {
 
642
                        return err
 
643
                }
 
644
        }
 
645
 
 
646
        if live {
 
647
                /* now receive the final sync */
 
648
                if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
 
649
                        return err
 
650
                }
 
651
        }
 
652
 
 
653
        return nil
 
654
}