~ubuntu-branches/ubuntu/raring/maas/raring

« back to all changes in this revision

Viewing changes to src/maasserver/tests/test_zeroconfservice.py

  • Committer: Package Import Robot
  • Author(s): Andres Rodriguez, Dave Walker (Daviey), Andres Rodriguez, Scott Moser
  • Date: 2012-03-22 10:56:58 UTC
  • mfrom: (1.1.6)
  • Revision ID: package-import@ubuntu.com-20120322105658-cc42wllftxl8qpxm
Tags: 0.1+bzr338+dfsg-0ubuntu1
[ Dave Walker (Daviey) ]
* d/patches/03-txlongpoll-config.patch: Resolve typo error from prior patch.
  - LP: #961031

[ Andres Rodriguez ]
* debian/maas.config: Don't allow reconfigure.
* debian/maas.postinst: Don't allow reconfigure. Run sync/migrate db on
  all upgrades.

[ Scott Moser ]
* add rsylog config for logging node boots (LP: 960149)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# Copyright 2012 Canonical Ltd.  This software is licensed under the
2
2
# GNU Affero General Public License version 3 (see the file LICENSE).
3
3
 
4
 
"""Tests for the ZeroconfService class"""
 
4
"""Tests for `zeroconfservice`."""
5
5
 
6
6
from __future__ import (
7
7
    print_function,
11
11
__metaclass__ = type
12
12
__all__ = []
13
13
 
14
 
import random
15
 
import select
 
14
import itertools
16
15
import subprocess
17
 
import time
18
16
 
 
17
from maasserver.zeroconfservice import ZeroconfService
 
18
from maastesting.factory import factory
19
19
from maastesting.testcase import TestCase
20
 
from maasserver.zeroconfservice import ZeroconfService
21
 
 
22
 
 
23
 
# These tests will actually inject data in the system Avahi system.
24
 
# Would be nice to isolate it from the system Avahi service, but I didn't
25
 
# feel like writing a private DBus session with a mock Avahi service on it.
 
20
from testtools.content import text_content
 
21
 
 
22
 
26
23
class TestZeroconfService(TestCase):
 
24
    """Test :class:`ZeroconfService`.
 
25
 
 
26
    These tests will actually inject data in the system Avahi service. It
 
27
    would be nice to isolate it from the system Avahi service, but there's a
 
28
    lot of work involved in writing a private DBus session with a mock Avahi
 
29
    service on it, probably more than it's worth.
 
30
    """
27
31
 
28
32
    STYPE = '_maas_zeroconftest._tcp'
29
33
 
30
 
    count = 0
 
34
    count = itertools.count(1)
31
35
 
32
36
    def avahi_browse(self, service_type, timeout=3):
33
37
        """Return the list of published Avahi service through avahi-browse."""
35
39
        # running a glib mainloop. And stopping one is hard. Much easier to
36
40
        # kill an external process. This slows test, and could be fragile,
37
41
        # but it's the best I've come with.
 
42
        command = (
 
43
            'avahi-browse', '--no-db-lookup', '--parsable',
 
44
            '--terminate', service_type)
38
45
        browser = subprocess.Popen(
39
 
            ['avahi-browse', '-k', '-p', service_type],
40
 
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
41
 
        until = time.time() + timeout;
42
 
        while time.time() < until:
43
 
            # Busy loop until there is some input on stdout,
44
 
            # or we give up.
45
 
            ready = select.select([browser.stdout], [], [], 0.10)
46
 
            if ready[0] or browser.poll():
47
 
                break
48
 
        browser.terminate()
 
46
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
47
        stdout, stderr = browser.communicate()
 
48
        self.addDetail("stdout", text_content(stdout))
 
49
        self.addDetail("stderr", text_content(stderr))
49
50
        names = []
50
 
        for record in browser.stdout.readlines():
 
51
        for record in stdout.splitlines():
51
52
            fields = record.split(';')
52
53
            names.append(fields[3])
53
54
        return names
54
55
 
55
 
    @classmethod
56
56
    def getUniqueServiceNameAndPort(self):
57
57
        # getUniqueString() generates an invalid service name
58
 
        name = 'My-Test-Service-%d' % self.count
59
 
        self.count += 1
60
 
        port = random.randint(30000, 40000)
 
58
        name = 'My-Test-Service-%d' % next(self.count)
 
59
        port = factory.getRandomPort()
61
60
        return name, port
62
61
 
63
62
    def test_publish(self):
67
66
        service = ZeroconfService(name, port, self.STYPE)
68
67
        service.publish()
69
68
        # This will unregister the published name from Avahi.
70
 
        self.addCleanup(service.group.Reset)
 
69
        self.addCleanup(service.unpublish)
71
70
        services = self.avahi_browse(self.STYPE)
72
71
        self.assertIn(name, services)
73
72