1
# -*- coding: utf-8 -*-
3
# Copyright 2011-2012 Canonical Ltd.
5
# This program is free software: you can redistribute it and/or modify it
6
# under the terms of the GNU General Public License version 3, as published
7
# by the Free Software Foundation.
9
# This program is distributed in the hope that it will be useful, but
10
# WITHOUT ANY WARRANTY; without even the implied warranties of
11
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
12
# PURPOSE. See the GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License along
15
# with this program. If not, see <http://www.gnu.org/licenses/>.
16
"""Tests for the timestamp sync classes."""
18
from twisted.application import internet, service
19
from twisted.internet import defer
20
from twisted.trial.unittest import TestCase
21
from twisted.web import server, resource
23
from ubuntu_sso.utils.webclient import timestamp, webclient_factory
26
class FakedError(Exception):
27
"""Stub to replace Request.error."""
30
class RootResource(resource.Resource):
31
"""A root resource that logs the number of calls."""
35
def __init__(self, *args, **kwargs):
36
"""Initialize this fake instance."""
37
resource.Resource.__init__(self, *args, **kwargs)
39
self.request_headers = []
41
# pylint: disable=C0103
42
def render_HEAD(self, request):
43
"""Increase the counter on each render."""
44
self.request_headers.append(request.requestHeaders)
49
class MockWebServer(object):
50
"""A mock webserver for testing."""
52
# pylint: disable=E1101
54
"""Start up this instance."""
55
self.root = RootResource()
56
site = server.Site(self.root)
57
application = service.Application('web')
58
self.service_collection = service.IServiceCollection(application)
59
self.tcpserver = internet.TCPServer(0, site)
60
self.tcpserver.setServiceParent(self.service_collection)
61
self.service_collection.startService()
64
"""Build the url for this mock server."""
65
# pylint: disable=W0212
66
port_num = self.tcpserver._port.getHost().port
67
return u"http://localhost:%d/" % port_num
71
self.service_collection.stopService()
74
class TimestampCheckerTestCase(TestCase):
75
"""Tests for the timestamp checker."""
77
@defer.inlineCallbacks
79
yield super(TimestampCheckerTestCase, self).setUp()
80
self.ws = MockWebServer()
81
self.addCleanup(self.ws.stop)
82
self.wc = webclient_factory()
83
self.addCleanup(self.wc.shutdown)
84
self.patch(timestamp.TimestampChecker, "SERVER_IRI", self.ws.get_iri())
86
@defer.inlineCallbacks
87
def test_returned_value_is_int(self):
88
"""The returned value is an integer."""
89
checker = timestamp.TimestampChecker(self.wc)
90
result = yield checker.get_faithful_time()
91
self.assertEqual(type(result), int)
93
@defer.inlineCallbacks
94
def test_first_call_does_head(self):
95
"""The first call gets the clock from our web."""
96
checker = timestamp.TimestampChecker(self.wc)
97
yield checker.get_faithful_time()
98
self.assertEqual(self.ws.root.count, 1)
100
@defer.inlineCallbacks
101
def test_second_call_is_cached(self):
102
"""For the second call, the time is cached."""
103
checker = timestamp.TimestampChecker(self.wc)
104
yield checker.get_faithful_time()
105
yield checker.get_faithful_time()
106
self.assertEqual(self.ws.root.count, 1)
108
@defer.inlineCallbacks
109
def test_after_timeout_cache_expires(self):
110
"""After some time, the cache expires."""
112
self.patch(timestamp.time, "time", lambda: fake_timestamp)
113
checker = timestamp.TimestampChecker(self.wc)
114
yield checker.get_faithful_time()
115
fake_timestamp += timestamp.TimestampChecker.CHECKING_INTERVAL
116
yield checker.get_faithful_time()
117
self.assertEqual(self.ws.root.count, 2)
119
@defer.inlineCallbacks
120
def test_server_error_means_skew_not_updated(self):
121
"""When server can't be reached, the skew is not updated."""
123
self.patch(timestamp.time, "time", lambda: fake_timestamp)
124
checker = timestamp.TimestampChecker(self.wc)
125
failing_get_server_time = lambda _: defer.fail(FakedError())
126
self.patch(checker, "get_server_time", failing_get_server_time)
127
yield checker.get_faithful_time()
128
self.assertEqual(checker.skew, 0)
129
self.assertEqual(checker.next_check,
130
fake_timestamp + timestamp.TimestampChecker.ERROR_INTERVAL)
132
@defer.inlineCallbacks
133
def test_server_date_sends_nocache_headers(self):
134
"""Getting the server date sends the no-cache headers."""
135
checker = timestamp.TimestampChecker(self.wc)
136
yield checker.get_server_date_header(self.ws.get_iri())
137
self.assertEqual(len(self.ws.root.request_headers), 1)
138
headers = self.ws.root.request_headers[0]
139
result = headers.getRawHeaders("Cache-Control")
140
self.assertEqual(result, ["no-cache"])