~ubuntuone-control-tower/ubuntu-sso-client/stable-1-0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# -*- coding: utf-8 -*-

# Author: Alejandro J. Cura <alecu@canonical.com>
#
# Copyright 2011 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Tests for the oauth_headers helper function."""

import time

from twisted.application import internet, service
from twisted.internet import defer
from twisted.internet.threads import deferToThread
from twisted.trial.unittest import TestCase
from twisted.web import server, resource

from ubuntu_sso.utils import SyncTimestampChecker


class RootResource(resource.Resource):
    """A root resource that logs the number of calls."""

    isLeaf = True

    def __init__(self, *args, **kwargs):
        """Initialize this fake instance."""
        resource.Resource.__init__(self, *args, **kwargs)
        self.count = 0
        self.request_headers = []

    # pylint: disable=C0103
    def render_HEAD(self, request):
        """Increase the counter on each render."""
        self.count += 1
        self.request_headers.append(request.requestHeaders)
        return ""


class MockWebServer(object):
    """A mock webserver for testing."""

    def __init__(self):
        """Start up this instance."""
        # pylint: disable=E1101
        self.root = RootResource()
        site = server.Site(self.root)
        application = service.Application('web')
        self.service_collection = service.IServiceCollection(application)
        self.tcpserver = internet.TCPServer(0, site)
        self.tcpserver.setServiceParent(self.service_collection)
        self.service_collection.startService()

    def get_url(self):
        """Build the url for this mock server."""
        # pylint: disable=W0212
        port_num = self.tcpserver._port.getHost().port
        return "http://localhost:%d/" % port_num

    def stop(self):
        """Shut it down."""
        # pylint: disable=E1101
        self.service_collection.stopService()


class FakedError(Exception):
    """A mock, test, sample, and fake exception."""


class TimestampCheckerTestCase(TestCase):
    """Tests for the timestamp checker."""

    def setUp(self):
        """Initialize a fake webserver."""
        self.ws = MockWebServer()
        self.addCleanup(self.ws.stop)
        self.patch(SyncTimestampChecker, "SERVER_URL", self.ws.get_url())

    @defer.inlineCallbacks
    def test_returned_value_is_int(self):
        """The returned value is an integer."""
        checker = SyncTimestampChecker()
        timestamp = yield deferToThread(checker.get_faithful_time)
        self.assertEqual(type(timestamp), int)

    @defer.inlineCallbacks
    def test_first_call_does_head(self):
        """The first call gets the clock from our web."""
        checker = SyncTimestampChecker()
        yield deferToThread(checker.get_faithful_time)
        self.assertEqual(self.ws.root.count, 1)

    @defer.inlineCallbacks
    def test_second_call_is_cached(self):
        """For the second call, the time is cached."""
        checker = SyncTimestampChecker()
        yield deferToThread(checker.get_faithful_time)
        yield deferToThread(checker.get_faithful_time)
        self.assertEqual(self.ws.root.count, 1)

    @defer.inlineCallbacks
    def test_after_timeout_cache_expires(self):
        """After some time, the cache expires."""
        fake_timestamp = 1
        self.patch(time, "time", lambda: fake_timestamp)
        checker = SyncTimestampChecker()
        yield deferToThread(checker.get_faithful_time)
        fake_timestamp += SyncTimestampChecker.CHECKING_INTERVAL
        yield deferToThread(checker.get_faithful_time)
        self.assertEqual(self.ws.root.count, 2)

    @defer.inlineCallbacks
    def test_server_date_sends_nocache_headers(self):
        """Getting the server date sends the no-cache headers."""
        checker = SyncTimestampChecker()
        yield deferToThread(checker.get_server_time)
        assert len(self.ws.root.request_headers) == 1
        headers = self.ws.root.request_headers[0]
        result = headers.getRawHeaders("Cache-Control")
        self.assertEqual(result, ["no-cache"])

    @defer.inlineCallbacks
    def test_server_error_means_skew_not_updated(self):
        """When server can't be reached, the skew is not updated."""
        fake_timestamp = 1
        self.patch(time, "time", lambda: fake_timestamp)
        checker = SyncTimestampChecker()

        def failing_get_server_time():
            """Let's fail while retrieving the server time."""
            raise FakedError()

        self.patch(checker, "get_server_time", failing_get_server_time)
        yield deferToThread(checker.get_faithful_time)
        self.assertEqual(checker.skew, 0)
        self.assertEqual(checker.next_check,
                         fake_timestamp + SyncTimestampChecker.ERROR_INTERVAL)