~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/instancepoller/aggregate_test.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2014 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package instancepoller
 
5
 
 
6
import (
 
7
        "sync"
 
8
        "sync/atomic"
 
9
        "time"
 
10
 
 
11
        "github.com/juju/errors"
 
12
        jc "github.com/juju/testing/checkers"
 
13
        gc "gopkg.in/check.v1"
 
14
 
 
15
        "github.com/juju/juju/environs"
 
16
        "github.com/juju/juju/instance"
 
17
        "github.com/juju/juju/network"
 
18
        "github.com/juju/juju/status"
 
19
        "github.com/juju/juju/testing"
 
20
        "github.com/juju/juju/worker/workertest"
 
21
)
 
22
 
 
23
type aggregateSuite struct {
 
24
        testing.BaseSuite
 
25
}
 
26
 
 
27
var _ = gc.Suite(&aggregateSuite{})
 
28
 
 
29
type testInstance struct {
 
30
        instance.Instance
 
31
        id        instance.Id
 
32
        addresses []network.Address
 
33
        status    string
 
34
        err       error
 
35
}
 
36
 
 
37
var _ instance.Instance = (*testInstance)(nil)
 
38
 
 
39
func (t *testInstance) Id() instance.Id {
 
40
        return t.id
 
41
}
 
42
 
 
43
func (t *testInstance) Addresses() ([]network.Address, error) {
 
44
        if t.err != nil {
 
45
                return nil, t.err
 
46
        }
 
47
        return t.addresses, nil
 
48
}
 
49
 
 
50
func (t *testInstance) Status() instance.InstanceStatus {
 
51
        return instance.InstanceStatus{Status: status.StatusUnknown, Message: t.status}
 
52
}
 
53
 
 
54
type testInstanceGetter struct {
 
55
        sync.RWMutex
 
56
        // ids is set when the Instances method is called.
 
57
        ids     []instance.Id
 
58
        results map[instance.Id]instance.Instance
 
59
        err     error
 
60
        counter int32
 
61
}
 
62
 
 
63
func (tig *testInstanceGetter) Instances(ids []instance.Id) (result []instance.Instance, err error) {
 
64
        tig.ids = ids
 
65
        atomic.AddInt32(&tig.counter, 1)
 
66
        results := make([]instance.Instance, len(ids))
 
67
        for i, id := range ids {
 
68
                // We don't check 'ok' here, because we want the Instance{nil}
 
69
                // response for those
 
70
                results[i] = tig.results[id]
 
71
        }
 
72
        return results, tig.err
 
73
}
 
74
 
 
75
func (tig *testInstanceGetter) newTestInstance(id instance.Id, status string, addresses []string) *testInstance {
 
76
        if tig.results == nil {
 
77
                tig.results = make(map[instance.Id]instance.Instance)
 
78
        }
 
79
        thisInstance := &testInstance{
 
80
                id:        id,
 
81
                status:    status,
 
82
                addresses: network.NewAddresses(addresses...),
 
83
        }
 
84
        tig.results[thisInstance.Id()] = thisInstance
 
85
        return thisInstance
 
86
}
 
87
 
 
88
// Test that one request gets sent after suitable delay.
 
89
func (s *aggregateSuite) TestSingleRequest(c *gc.C) {
 
90
        // We setup a couple variables here so that we can use them locally without
 
91
        // type assertions. Then we use them in the aggregatorConfig.
 
92
        testGetter := new(testInstanceGetter)
 
93
        clock := testing.NewClock(time.Now())
 
94
        delay := time.Minute
 
95
        cfg := aggregatorConfig{
 
96
                Clock:   clock,
 
97
                Delay:   delay,
 
98
                Environ: testGetter,
 
99
        }
 
100
 
 
101
        // Add a new test instance.
 
102
        testGetter.newTestInstance("foo", "foobar", []string{"127.0.0.1", "192.168.1.1"})
 
103
 
 
104
        aggregator, err := newAggregator(cfg)
 
105
        c.Check(err, jc.ErrorIsNil)
 
106
 
 
107
        // Ensure the worker is killed and cleaned up if the test exits early.
 
108
        defer workertest.CleanKill(c, aggregator)
 
109
 
 
110
        // Create a test in a goroutine and make sure we wait for it to finish.
 
111
        var wg sync.WaitGroup
 
112
        wg.Add(1)
 
113
        go func() {
 
114
                defer wg.Done()
 
115
                info, err := aggregator.instanceInfo("foo")
 
116
                c.Check(err, jc.ErrorIsNil)
 
117
                c.Check(info.status.Message, gc.DeepEquals, "foobar")
 
118
        }()
 
119
 
 
120
        // Unwind the test clock
 
121
        waitAlarms(c, clock, 1)
 
122
        clock.Advance(delay)
 
123
 
 
124
        wg.Wait()
 
125
 
 
126
        // Ensure we kill the worker before looking at our testInstanceGetter to
 
127
        // ensure there's no possibility of a race.
 
128
        workertest.CleanKill(c, aggregator)
 
129
 
 
130
        ids := testGetter.ids
 
131
        c.Assert(ids, gc.DeepEquals, []instance.Id{"foo"})
 
132
}
 
133
 
 
134
// Test several requests in a short space of time get batched.
 
135
func (s *aggregateSuite) TestMultipleResponseHandling(c *gc.C) {
 
136
        // We setup a couple variables here so that we can use them locally without
 
137
        // type assertions. Then we use them in the aggregatorConfig.
 
138
        testGetter := new(testInstanceGetter)
 
139
        clock := testing.NewClock(time.Now())
 
140
        delay := time.Minute
 
141
        cfg := aggregatorConfig{
 
142
                Clock:   clock,
 
143
                Delay:   delay,
 
144
                Environ: testGetter,
 
145
        }
 
146
 
 
147
        // Setup multiple instances to batch
 
148
        testGetter.newTestInstance("foo", "foobar", []string{"127.0.0.1", "192.168.1.1"})
 
149
        testGetter.newTestInstance("foo2", "not foobar", []string{"192.168.1.2"})
 
150
        testGetter.newTestInstance("foo3", "ok-ish", []string{"192.168.1.3"})
 
151
 
 
152
        aggregator, err := newAggregator(cfg)
 
153
        c.Check(err, jc.ErrorIsNil)
 
154
 
 
155
        // Ensure the worker is killed and cleaned up if the test exits early.
 
156
        defer workertest.CleanKill(c, aggregator)
 
157
 
 
158
        // Create a closure for tests we can launch in goroutines.
 
159
        var wg sync.WaitGroup
 
160
        checkInfo := func(id instance.Id, expectStatus string) {
 
161
                defer wg.Done()
 
162
                info, err := aggregator.instanceInfo(id)
 
163
                c.Check(err, jc.ErrorIsNil)
 
164
                c.Check(info.status.Message, gc.Equals, expectStatus)
 
165
        }
 
166
 
 
167
        // Launch and wait for these
 
168
        wg.Add(2)
 
169
        go checkInfo("foo2", "not foobar")
 
170
        go checkInfo("foo3", "ok-ish")
 
171
 
 
172
        // Unwind the testing clock to let our requests through.
 
173
        waitAlarms(c, clock, 2)
 
174
        clock.Advance(delay)
 
175
 
 
176
        // Check we're still alive.
 
177
        workertest.CheckAlive(c, aggregator)
 
178
 
 
179
        // Wait until the tests pass.
 
180
        wg.Wait()
 
181
 
 
182
        // Ensure we kill the worker before looking at our testInstanceGetter to
 
183
        // ensure there's no possibility of a race.
 
184
        workertest.CleanKill(c, aggregator)
 
185
 
 
186
        // Ensure we got our list back with the expected contents.
 
187
        c.Assert(testGetter.ids, jc.SameContents, []instance.Id{"foo2", "foo3"})
 
188
 
 
189
        // Ensure we called instances once and have no errors there.
 
190
        c.Assert(testGetter.err, jc.ErrorIsNil)
 
191
        c.Assert(testGetter.counter, gc.DeepEquals, int32(1))
 
192
}
 
193
 
 
194
// Test that advancing delay-time.Nanosecond and then killing causes all
 
195
// pending reqs to fail.
 
196
func (s *aggregateSuite) TestKillingWorkerKillsPendinReqs(c *gc.C) {
 
197
        // Setup local variables.
 
198
        testGetter := new(testInstanceGetter)
 
199
        clock := testing.NewClock(time.Now())
 
200
        delay := time.Minute
 
201
        cfg := aggregatorConfig{
 
202
                Clock:   clock,
 
203
                Delay:   delay,
 
204
                Environ: testGetter,
 
205
        }
 
206
 
 
207
        testGetter.newTestInstance("foo", "foobar", []string{"127.0.0.1", "192.168.1.1"})
 
208
        testGetter.newTestInstance("foo2", "not foobar", []string{"192.168.1.2"})
 
209
        testGetter.newTestInstance("foo3", "ok-ish", []string{"192.168.1.3"})
 
210
 
 
211
        aggregator, err := newAggregator(cfg)
 
212
        c.Check(err, jc.ErrorIsNil)
 
213
 
 
214
        defer workertest.CleanKill(c, aggregator)
 
215
 
 
216
        // Set up a couple tests we can launch.
 
217
        var wg sync.WaitGroup
 
218
        checkInfo := func(id instance.Id) {
 
219
                defer wg.Done()
 
220
                info, err := aggregator.instanceInfo(id)
 
221
                c.Check(err.Error(), gc.Equals, "instanceInfo call aborted")
 
222
                c.Check(info.status.Message, gc.Equals, "")
 
223
        }
 
224
 
 
225
        // Launch a couple tests.
 
226
        wg.Add(2)
 
227
        go checkInfo("foo2")
 
228
 
 
229
        // Advance the clock and kill the worker.
 
230
        waitAlarms(c, clock, 1)
 
231
        clock.Advance(delay - time.Nanosecond)
 
232
        aggregator.Kill()
 
233
 
 
234
        go checkInfo("foo3")
 
235
 
 
236
        // Make sure we're dead.
 
237
        workertest.CheckKilled(c, aggregator)
 
238
        wg.Wait()
 
239
 
 
240
        // Make sure we have no ids, since we're dead.
 
241
        c.Assert(len(testGetter.ids), gc.DeepEquals, 0)
 
242
 
 
243
        // Ensure we called instances once and have no errors there.
 
244
        c.Assert(testGetter.err, jc.ErrorIsNil)
 
245
        c.Assert(testGetter.counter, gc.DeepEquals, int32(0))
 
246
 
 
247
}
 
248
 
 
249
// Test that having sent/advanced/received one batch, you can
 
250
// send/advance/receive again and that works, too.
 
251
func (s *aggregateSuite) TestMultipleBatches(c *gc.C) {
 
252
        // Setup some local variables.
 
253
        testGetter := new(testInstanceGetter)
 
254
        clock := testing.NewClock(time.Now())
 
255
        delay := time.Second
 
256
        cfg := aggregatorConfig{
 
257
                Clock:   clock,
 
258
                Delay:   delay,
 
259
                Environ: testGetter,
 
260
        }
 
261
 
 
262
        testGetter.newTestInstance("foo2", "not foobar", []string{"192.168.1.2"})
 
263
        testGetter.newTestInstance("foo3", "ok-ish", []string{"192.168.1.3"})
 
264
 
 
265
        aggregator, err := newAggregator(cfg)
 
266
        c.Check(err, jc.ErrorIsNil)
 
267
 
 
268
        // Ensure the worker is killed and cleaned up if the test exits early.
 
269
        defer workertest.CleanKill(c, aggregator)
 
270
 
 
271
        // Create a checker we can launch as goroutines
 
272
        var wg sync.WaitGroup
 
273
        checkInfo := func(id instance.Id, expectStatus string) {
 
274
                defer wg.Done()
 
275
                info, err := aggregator.instanceInfo(id)
 
276
                c.Check(err, jc.ErrorIsNil)
 
277
                c.Check(info.status.Message, gc.Equals, expectStatus)
 
278
        }
 
279
 
 
280
        // Launch and wait for these
 
281
        wg.Add(2)
 
282
        go checkInfo("foo2", "not foobar")
 
283
        go checkInfo("foo3", "ok-ish")
 
284
 
 
285
        // Unwind the testing clock to let our requests through.
 
286
        waitAlarms(c, clock, 2)
 
287
        clock.Advance(delay)
 
288
 
 
289
        // Check we're still alive
 
290
        workertest.CheckAlive(c, aggregator)
 
291
 
 
292
        // Wait until the checkers pass
 
293
        // TODO(redir): These could block forever, we should make the effort to be
 
294
        // robust here per http://reviews.vapour.ws/r/4885/
 
295
        wg.Wait()
 
296
 
 
297
        // Ensure we got our list back with the expected length.
 
298
        c.Assert(len(testGetter.ids), gc.DeepEquals, 2)
 
299
 
 
300
        // And then a second batch
 
301
        testGetter.newTestInstance("foo4", "spam", []string{"192.168.1.4"})
 
302
        testGetter.newTestInstance("foo5", "eggs", []string{"192.168.1.5"})
 
303
 
 
304
        // Launch and wait for this second batch
 
305
        wg.Add(2)
 
306
        go checkInfo("foo4", "spam")
 
307
        go checkInfo("foo5", "eggs")
 
308
 
 
309
        for i := 0; i < 2; i++ {
 
310
                // Unwind again to let our next batch through.
 
311
                <-clock.Alarms()
 
312
        }
 
313
        // // Advance the clock again.
 
314
        clock.Advance(delay)
 
315
 
 
316
        // Check we're still alive
 
317
        workertest.CheckAlive(c, aggregator)
 
318
 
 
319
        // Wait until the checkers pass
 
320
        wg.Wait()
 
321
 
 
322
        // Shutdown the worker.
 
323
        workertest.CleanKill(c, aggregator)
 
324
 
 
325
        // Ensure we got our list back with the correct length
 
326
        c.Assert(len(testGetter.ids), gc.DeepEquals, 2)
 
327
 
 
328
        // Ensure we called instances once and have no errors there.
 
329
        c.Assert(testGetter.err, jc.ErrorIsNil)
 
330
        c.Assert(testGetter.counter, gc.Equals, int32(2))
 
331
}
 
332
 
 
333
// Test that things behave as expected when env.Instances errors.
 
334
func (s *aggregateSuite) TestInstancesErrors(c *gc.C) {
 
335
        // Setup local variables.
 
336
        testGetter := new(testInstanceGetter)
 
337
        clock := testing.NewClock(time.Now())
 
338
        delay := time.Millisecond
 
339
        cfg := aggregatorConfig{
 
340
                Clock:   clock,
 
341
                Delay:   delay,
 
342
                Environ: testGetter,
 
343
        }
 
344
 
 
345
        testGetter.newTestInstance("foo", "foobar", []string{"192.168.1.2"})
 
346
        testGetter.err = environs.ErrNoInstances
 
347
        aggregator, err := newAggregator(cfg)
 
348
        c.Check(err, jc.ErrorIsNil)
 
349
 
 
350
        defer workertest.CleanKill(c, aggregator)
 
351
 
 
352
        // Launch test in a goroutine and wait for it.
 
353
        var wg sync.WaitGroup
 
354
        wg.Add(1)
 
355
        go func() {
 
356
                defer wg.Done()
 
357
                _, err = aggregator.instanceInfo("foo")
 
358
                c.Assert(err, gc.Equals, environs.ErrNoInstances)
 
359
        }()
 
360
 
 
361
        // Unwind to let our request through.
 
362
        waitAlarms(c, clock, 1)
 
363
        clock.Advance(delay)
 
364
 
 
365
        wg.Wait()
 
366
 
 
367
        // Kill the worker so we know there is no race checking the erroringTestGetter.
 
368
        workertest.CleanKill(c, aggregator)
 
369
 
 
370
        c.Assert(testGetter.err, gc.Equals, environs.ErrNoInstances)
 
371
        c.Assert(testGetter.counter, gc.Equals, int32(1))
 
372
}
 
373
 
 
374
func (s *aggregateSuite) TestPartialInstanceErrors(c *gc.C) {
 
375
        testGetter := new(testInstanceGetter)
 
376
        clock := testing.NewClock(time.Now())
 
377
        delay := time.Second
 
378
 
 
379
        cfg := aggregatorConfig{
 
380
                Clock:   clock,
 
381
                Delay:   delay,
 
382
                Environ: testGetter,
 
383
        }
 
384
 
 
385
        testGetter.err = environs.ErrPartialInstances
 
386
        testGetter.newTestInstance("foo", "not foobar", []string{"192.168.1.2"})
 
387
 
 
388
        aggregator, err := newAggregator(cfg)
 
389
        c.Check(err, jc.ErrorIsNil)
 
390
 
 
391
        // Ensure the worker is killed and cleaned up if the test exits early.
 
392
        defer workertest.CleanKill(c, aggregator)
 
393
 
 
394
        // // Create a checker we can launch as goroutines
 
395
        var wg sync.WaitGroup
 
396
        checkInfo := func(id instance.Id, expectStatus string, expectedError error) {
 
397
                defer wg.Done()
 
398
                info, err := aggregator.instanceInfo(id)
 
399
                if expectedError == nil {
 
400
                        c.Check(err, jc.ErrorIsNil)
 
401
                } else {
 
402
                        c.Check(err.Error(), gc.Equals, expectedError.Error())
 
403
                }
 
404
                c.Check(info.status.Message, gc.Equals, expectStatus)
 
405
        }
 
406
 
 
407
        // Launch and wait for these
 
408
        wg.Add(2)
 
409
        go checkInfo("foo", "not foobar", nil)
 
410
        go checkInfo("foo2", "", errors.New("instance foo2 not found"))
 
411
 
 
412
        // Unwind the testing clock to let our requests through.
 
413
        waitAlarms(c, clock, 2)
 
414
        clock.Advance(delay)
 
415
 
 
416
        // Check we're still alive.
 
417
        workertest.CheckAlive(c, aggregator)
 
418
 
 
419
        // Wait until the checkers pass.
 
420
        wg.Wait()
 
421
 
 
422
        // Now kill the worker so we don't risk a race in the following assertions.
 
423
        workertest.CleanKill(c, aggregator)
 
424
 
 
425
        // Ensure we got our list back with the correct length.
 
426
        c.Assert(len(testGetter.ids), gc.Equals, 2)
 
427
 
 
428
        // Ensure we called instances once.
 
429
        // TODO(redir): all this stuff is really crying out to be, e.g.
 
430
        // testGetter.CheckOneCall(c, "foo", "foo2") per
 
431
        // http://reviews.vapour.ws/r/4885/
 
432
        c.Assert(testGetter.counter, gc.Equals, int32(1))
 
433
}
 
434
 
 
435
func waitAlarms(c *gc.C, clock *testing.Clock, count int) {
 
436
        timeout := time.After(testing.LongWait)
 
437
        for i := 0; i < count; i++ {
 
438
                select {
 
439
                case <-clock.Alarms():
 
440
                case <-timeout:
 
441
                        c.Fatalf("timed out waiting for %dth alarm set", i)
 
442
                }
 
443
        }
 
444
}