~ubuntu-branches/ubuntu/saucy/juju-core/saucy-proposed

« back to all changes in this revision

Viewing changes to src/launchpad.net/juju-core/worker/firewaller/firewaller.go

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2013-07-11 17:18:27 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20130711171827-vjqkg40r0dlf7ys2
Tags: 1.11.2-0ubuntu1
* New upstream release.
* Make juju-core the default juju (LP: #1190634):
  - d/control: Add virtual package juju -> juju-core.
  - d/juju-core.postinst.in: Bump priority of alternatives over that of
    python juju packages.
* Enable for all architectures (LP: #1172505):
  - d/control: Version BD on golang-go to >= 2:1.1.1 to ensure CGO
    support for non-x86 archs, make juju-core Arch: any.
  - d/README.source: Dropped - no longer required.
* d/watch: Updated for new upstream tarball naming.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2012, 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
1
4
package firewaller
2
5
 
3
6
import (
4
7
        "fmt"
5
8
        "launchpad.net/juju-core/environs"
6
9
        "launchpad.net/juju-core/environs/config"
 
10
        "launchpad.net/juju-core/errors"
 
11
        "launchpad.net/juju-core/instance"
7
12
        "launchpad.net/juju-core/log"
8
13
        "launchpad.net/juju-core/state"
9
 
        "launchpad.net/juju-core/state/api/params"
10
14
        "launchpad.net/juju-core/state/watcher"
11
15
        "launchpad.net/juju-core/worker"
12
16
        "launchpad.net/tomb"
19
23
        st              *state.State
20
24
        environ         environs.Environ
21
25
        environWatcher  *state.EnvironConfigWatcher
22
 
        machinesWatcher *state.LifecycleWatcher
 
26
        machinesWatcher state.StringsWatcher
23
27
        machineds       map[string]*machineData
24
28
        unitsChange     chan *unitsChange
25
29
        unitds          map[string]*unitData
27
31
        serviceds       map[string]*serviceData
28
32
        exposedChange   chan *exposedChange
29
33
        globalMode      bool
30
 
        globalPortRef   map[params.Port]int
 
34
        globalPortRef   map[instance.Port]int
31
35
}
32
36
 
33
37
// NewFirewaller returns a new Firewaller.
35
39
        fw := &Firewaller{
36
40
                st:              st,
37
41
                environWatcher:  st.WatchEnvironConfig(),
38
 
                machinesWatcher: st.WatchMachines(),
 
42
                machinesWatcher: st.WatchEnvironMachines(),
39
43
                machineds:       make(map[string]*machineData),
40
44
                unitsChange:     make(chan *unitsChange),
41
45
                unitds:          make(map[string]*unitData),
62
66
        }
63
67
        if fw.environ.Config().FirewallMode() == config.FwGlobal {
64
68
                fw.globalMode = true
65
 
                fw.globalPortRef = make(map[params.Port]int)
 
69
                fw.globalPortRef = make(map[instance.Port]int)
66
70
        }
67
71
        for {
68
72
                select {
131
135
                fw:     fw,
132
136
                id:     id,
133
137
                unitds: make(map[string]*unitData),
134
 
                ports:  make([]params.Port, 0),
 
138
                ports:  make([]instance.Port, 0),
135
139
        }
136
140
        m, err := machined.machine()
137
 
        if state.IsNotFound(err) {
 
141
        if errors.IsNotFoundError(err) {
138
142
                return nil
139
143
        } else if err != nil {
140
144
                return fmt.Errorf("worker/firewaller: cannot watch machine units: %v", err)
153
157
                err = fw.unitsChanged(&unitsChange{machined, change})
154
158
                if err != nil {
155
159
                        stop("units watcher", unitw)
156
 
                        return fmt.Errorf("worker/firewaller: start watching machine %d faild: %v", id, err)
 
160
                        return fmt.Errorf("worker/firewaller: cannot respond to units changes for machine %q: %v", id, err)
157
161
                }
158
162
        }
159
163
        go machined.watchLoop(unitw)
190
194
        unitd.serviced = fw.serviceds[serviceName]
191
195
        unitd.serviced.unitds[unitName] = unitd
192
196
 
193
 
        ports := make([]params.Port, len(unitd.ports))
 
197
        ports := make([]instance.Port, len(unitd.ports))
194
198
        copy(ports, unitd.ports)
195
199
 
196
200
        go unitd.watchLoop(ports)
219
223
        if err != nil {
220
224
                return err
221
225
        }
222
 
        collector := make(map[params.Port]bool)
 
226
        collector := make(map[instance.Port]bool)
223
227
        for _, unitd := range fw.unitds {
224
228
                if unitd.serviced.exposed {
225
229
                        for _, port := range unitd.ports {
227
231
                        }
228
232
                }
229
233
        }
230
 
        wantedPorts := []params.Port{}
 
234
        wantedPorts := []instance.Port{}
231
235
        for port := range collector {
232
236
                wantedPorts = append(wantedPorts, port)
233
237
        }
257
261
func (fw *Firewaller) reconcileInstances() error {
258
262
        for _, machined := range fw.machineds {
259
263
                m, err := machined.machine()
260
 
                if state.IsNotFound(err) {
 
264
                if errors.IsNotFoundError(err) {
261
265
                        if err := fw.forgetMachine(machined); err != nil {
262
266
                                return err
263
267
                        }
265
269
                } else if err != nil {
266
270
                        return err
267
271
                }
268
 
                instanceId, ok := m.InstanceId()
269
 
                if !ok {
270
 
                        return state.NotFoundf("instance id for %v", m)
 
272
                instanceId, err := m.InstanceId()
 
273
                if err != nil {
 
274
                        return err
271
275
                }
272
 
                instances, err := fw.environ.Instances([]state.InstanceId{instanceId})
 
276
                instances, err := fw.environ.Instances([]instance.Id{instanceId})
273
277
                if err == environs.ErrNoInstances {
274
278
                        return nil
275
279
                } else if err != nil {
309
313
        changed := []*unitData{}
310
314
        for _, name := range change.units {
311
315
                unit, err := fw.st.Unit(name)
312
 
                if err != nil && !state.IsNotFound(err) {
 
316
                if err != nil && !errors.IsNotFoundError(err) {
313
317
                        return err
314
318
                }
315
319
                var machineId string
316
320
                if unit != nil {
317
321
                        machineId, err = unit.AssignedMachineId()
318
 
                        if state.IsNotFound(err) {
 
322
                        if errors.IsNotFoundError(err) {
319
323
                                continue
320
324
                        } else if err != nil && !state.IsNotAssigned(err) {
321
325
                                return err
360
364
// flushMachine opens and closes ports for the passed machine.
361
365
func (fw *Firewaller) flushMachine(machined *machineData) error {
362
366
        // Gather ports to open and close.
363
 
        ports := map[params.Port]bool{}
 
367
        ports := map[instance.Port]bool{}
364
368
        for _, unitd := range machined.unitds {
365
369
                if unitd.serviced.exposed {
366
370
                        for _, port := range unitd.ports {
368
372
                        }
369
373
                }
370
374
        }
371
 
        want := []params.Port{}
 
375
        want := []instance.Port{}
372
376
        for port := range ports {
373
377
                want = append(want, port)
374
378
        }
384
388
// flushGlobalPorts opens and closes global ports in the environment.
385
389
// It keeps a reference count for ports so that only 0-to-1 and 1-to-0 events
386
390
// modify the environment.
387
 
func (fw *Firewaller) flushGlobalPorts(rawOpen, rawClose []params.Port) error {
 
391
func (fw *Firewaller) flushGlobalPorts(rawOpen, rawClose []instance.Port) error {
388
392
        // Filter which ports are really to open or close.
389
 
        var toOpen, toClose []params.Port
 
393
        var toOpen, toClose []instance.Port
390
394
        for _, port := range rawOpen {
391
395
                if fw.globalPortRef[port] == 0 {
392
396
                        toOpen = append(toOpen, port)
421
425
}
422
426
 
423
427
// flushGlobalPorts opens and closes ports global on the machine.
424
 
func (fw *Firewaller) flushInstancePorts(machined *machineData, toOpen, toClose []params.Port) error {
 
428
func (fw *Firewaller) flushInstancePorts(machined *machineData, toOpen, toClose []instance.Port) error {
425
429
        // If there's nothing to do, do nothing.
426
430
        // This is important because when a machine is first created,
427
431
        // it will have no instance id but also no open ports -
430
434
                return nil
431
435
        }
432
436
        m, err := machined.machine()
433
 
        if state.IsNotFound(err) {
 
437
        if errors.IsNotFoundError(err) {
434
438
                return nil
435
439
        }
436
440
        if err != nil {
437
441
                return err
438
442
        }
439
 
        instanceId, ok := m.InstanceId()
440
 
        if !ok {
441
 
                return state.NotFoundf("instance id for %v", m)
 
443
        instanceId, err := m.InstanceId()
 
444
        if err != nil {
 
445
                return err
442
446
        }
443
 
        instances, err := fw.environ.Instances([]state.InstanceId{instanceId})
 
447
        instances, err := fw.environ.Instances([]instance.Id{instanceId})
444
448
        if err != nil {
445
449
                return err
446
450
        }
469
473
// machines that are dying.
470
474
func (fw *Firewaller) machineLifeChanged(id string) error {
471
475
        m, err := fw.st.Machine(id)
472
 
        found := !state.IsNotFound(err)
 
476
        found := !errors.IsNotFoundError(err)
473
477
        if found && err != nil {
474
478
                return err
475
479
        }
550
554
        return fw.tomb.Err()
551
555
}
552
556
 
553
 
// Wait waits for the Firewaller to exit.
 
557
// Kill implements worker.Worker.Kill.
 
558
func (fw *Firewaller) Kill() {
 
559
        fw.tomb.Kill(nil)
 
560
}
 
561
 
 
562
// Wait implements worker.Worker.Wait.
554
563
func (fw *Firewaller) Wait() error {
555
564
        return fw.tomb.Wait()
556
565
}
573
582
        fw     *Firewaller
574
583
        id     string
575
584
        unitds map[string]*unitData
576
 
        ports  []params.Port
 
585
        ports  []instance.Port
577
586
}
578
587
 
579
588
func (md *machineData) machine() (*state.Machine, error) {
581
590
}
582
591
 
583
592
// watchLoop watches the machine for units added or removed.
584
 
func (md *machineData) watchLoop(unitw *state.MachineUnitsWatcher) {
 
593
func (md *machineData) watchLoop(unitw state.StringsWatcher) {
585
594
        defer md.tomb.Done()
586
595
        defer watcher.Stop(unitw, &md.tomb)
587
596
        for {
591
600
                case change, ok := <-unitw.Changes():
592
601
                        if !ok {
593
602
                                _, err := md.machine()
594
 
                                if !state.IsNotFound(err) {
 
603
                                if !errors.IsNotFoundError(err) {
595
604
                                        md.fw.tomb.Kill(watcher.MustErr(unitw))
596
605
                                }
597
606
                                return
614
623
// portsChange contains the changed ports for one specific unit.
615
624
type portsChange struct {
616
625
        unitd *unitData
617
 
        ports []params.Port
 
626
        ports []instance.Port
618
627
}
619
628
 
620
629
// unitData holds unit details and watches port changes.
624
633
        unit     *state.Unit
625
634
        serviced *serviceData
626
635
        machined *machineData
627
 
        ports    []params.Port
 
636
        ports    []instance.Port
628
637
}
629
638
 
630
639
// watchLoop watches the unit for port changes.
631
 
func (ud *unitData) watchLoop(latestPorts []params.Port) {
 
640
func (ud *unitData) watchLoop(latestPorts []instance.Port) {
632
641
        defer ud.tomb.Done()
633
642
        w := ud.unit.Watch()
634
643
        defer watcher.Stop(w, &ud.tomb)
642
651
                                return
643
652
                        }
644
653
                        if err := ud.unit.Refresh(); err != nil {
645
 
                                if !state.IsNotFound(err) {
 
654
                                if !errors.IsNotFoundError(err) {
646
655
                                        ud.fw.tomb.Kill(err)
647
656
                                }
648
657
                                return
663
672
 
664
673
// samePorts returns whether old and new contain the same set of ports.
665
674
// Both old and new must be sorted.
666
 
func samePorts(old, new []params.Port) bool {
 
675
func samePorts(old, new []instance.Port) bool {
667
676
        if len(old) != len(new) {
668
677
                return false
669
678
        }
711
720
                                return
712
721
                        }
713
722
                        if err := sd.service.Refresh(); err != nil {
714
 
                                if !state.IsNotFound(err) {
 
723
                                if !errors.IsNotFoundError(err) {
715
724
                                        sd.fw.tomb.Kill(err)
716
725
                                }
717
726
                                return
737
746
}
738
747
 
739
748
// diff returns all the ports that exist in A but not B.
740
 
func diff(A, B []params.Port) (missing []params.Port) {
 
749
func diff(A, B []instance.Port) (missing []instance.Port) {
741
750
next:
742
751
        for _, a := range A {
743
752
                for _, b := range B {