11
11
# Python 2 idiom: make every class defined in this module new-style by default.
__metaclass__ = type
14
from base64 import b64encode
14
15
from functools import partial
15
16
from getpass import getuser
19
from StringIO import StringIO
18
22
from fixtures import TempDir
24
from maastesting.factory import factory
20
25
from provisioningserver.plugin import (
23
29
ProvisioningServiceMaker,
30
SingleUsernamePasswordChecker,
32
from provisioningserver.testing.fakecobbler import make_fake_cobbler_session
25
33
from testtools import TestCase
34
from testtools.deferredruntest import (
36
AsynchronousDeferredRunTest,
26
38
from testtools.matchers import (
42
from twisted.application.internet import TCPServer
30
43
from twisted.application.service import MultiService
44
from twisted.cred.credentials import UsernamePassword
45
from twisted.cred.error import UnauthorizedLogin
46
from twisted.internet.defer import inlineCallbacks
31
47
from twisted.python.usage import UsageError
48
from twisted.web.guard import HTTPAuthSessionWrapper
49
from twisted.web.resource import IResource
50
from twisted.web.server import NOT_DONE_YET
51
from twisted.web.test.test_web import DummyRequest
165
198
self.assertEqual(
166
199
len(service.namedServices), len(service.services),
167
200
"Not all services are named.")
202
def test_makeService_api_requires_credentials(self):
    """
    The site service's /api resource requires credentials from clients.
    """
    # NOTE(review): `options` was used without ever being bound in this
    # block; an `Options` instance from provisioningserver.plugin is
    # assumed here -- confirm against the plugin module.
    from provisioningserver.plugin import Options

    options = Options()
    options["config-file"] = self.write_config({})
    service_maker = ProvisioningServiceMaker("Harry", "Hill")
    service = service_maker.makeService(options)
    self.assertIsInstance(service, MultiService)
    site_service = service.getServiceNamed("site")
    self.assertIsInstance(site_service, TCPServer)
    # TCPServer keeps its positional arguments as (port, site).
    port, site = site_service.args
    self.assertIn("api", site.resource.listStaticNames())
    api = site.resource.getStaticEntity("api")
    # HTTPAuthSessionWrapper demands credentials from an HTTP request.
    self.assertIsInstance(api, HTTPAuthSessionWrapper)
219
def exercise_api_credentials(self, config_file, username, password):
    """
    Create a new service with :class:`ProvisioningServiceMaker` and
    attempt to access the API with the given credentials.

    :param config_file: Value stored under the "config-file" option.
    :param username: User name sent in the HTTP Basic authorization header.
    :param password: Password sent in the HTTP Basic authorization header.
    :return: A Deferred that fires with the request object once the
        request has signalled that it is finished.
    """
    # NOTE(review): `options` was used without ever being bound in this
    # block; an `Options` instance from provisioningserver.plugin is
    # assumed here -- confirm against the plugin module.
    from provisioningserver.plugin import Options

    options = Options()
    options["config-file"] = config_file
    service_maker = ProvisioningServiceMaker("Morecombe", "Wise")
    # Terminate the service in a fake Cobbler session.
    service_maker._makeCobblerSession = (
        lambda config: make_fake_cobbler_session())
    service = service_maker.makeService(options)
    port, site = service.getServiceNamed("site").args
    api = site.resource.getStaticEntity("api")
    # Create an XML-RPC request carrying the supplied credentials.
    request = DummyRequest([])
    request.method = "POST"
    request.content = StringIO(xmlrpclib.dumps((), "get_nodes"))
    request.prepath = ["api"]
    request.headers["authorization"] = (
        "Basic %s" % b64encode(b"%s:%s" % (username, password)))
    # The credential check and resource rendering is deferred, but
    # NOT_DONE_YET is returned from render(). The request signals
    # completion with the aid of notifyFinish().
    finished = request.notifyFinish()
    self.assertEqual(NOT_DONE_YET, api.render(request))
    return finished.addCallback(lambda ignored: request)
248
# The body yields a Deferred, so it must run under inlineCallbacks;
# undecorated, calling it would only create a generator and the
# assertions below would never execute.
@inlineCallbacks
def test_makeService_api_accepts_valid_credentials(self):
    """
    The site service's /api resource accepts valid credentials.
    """
    config = {"username": "orange", "password": "goblin"}
    request = yield self.exercise_api_credentials(
        self.write_config(config), "orange", "goblin")
    # A valid XML-RPC response has been written.
    self.assertEqual(None, request.responseCode)  # None implies 200.
    # loads() raises if the written body is not a well-formed response.
    xmlrpclib.loads(b"".join(request.written))
260
# The body yields a Deferred, so it must run under inlineCallbacks;
# undecorated, calling it would only create a generator and the
# assertion below would never execute.
@inlineCallbacks
def test_makeService_api_rejects_invalid_credentials(self):
    """
    The site service's /api resource rejects invalid credentials.
    """
    config = {"username": "orange", "password": "goblin"}
    # Present credentials that differ from the configured ones.
    request = yield self.exercise_api_credentials(
        self.write_config(config), "abigail", "williams")
    # The request has not been authorized.
    self.assertEqual(httplib.UNAUTHORIZED, request.responseCode)
271
class TestSingleUsernamePasswordChecker(TestCase):
    """Tests for `SingleUsernamePasswordChecker`."""

    # The tests return or yield Deferreds, so run them with the
    # Deferred-aware runner, with a five second timeout.
    run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=5)

    # Uses `yield`, so it needs inlineCallbacks for the body to execute.
    @inlineCallbacks
    def test_requestAvatarId_okay(self):
        """Matching credentials yield the user name as the avatar ID."""
        credentials = UsernamePassword("frank", "zappa")
        checker = SingleUsernamePasswordChecker("frank", "zappa")
        avatar = yield checker.requestAvatarId(credentials)
        self.assertEqual("frank", avatar)

    def test_requestAvatarId_bad(self):
        """Non-matching credentials fail with `UnauthorizedLogin`."""
        credentials = UsernamePassword("frank", "zappa")
        checker = SingleUsernamePasswordChecker("zap", "franka")
        d = checker.requestAvatarId(credentials)
        return assert_fails_with(d, UnauthorizedLogin)
290
class TestProvisioningRealm(TestCase):
    """Tests for `ProvisioningRealm`."""

    def test_requestAvatar_okay(self):
        """Requesting an `IResource` avatar returns the wrapped resource."""
        # NOTE(review): `resource` was never bound in this block; an opaque
        # placeholder object is assumed -- confirm.
        resource = object()
        realm = ProvisioningRealm(resource)
        avatar = realm.requestAvatar(
            "irrelevant", "also irrelevant", IResource)
        self.assertEqual((IResource, resource, realm.noop), avatar)

    def test_requestAvatar_bad(self):
        # If IResource is not amongst the interfaces passed to requestAvatar,
        # NotImplementedError is raised.
        # NOTE(review): `resource` was never bound in this block; an opaque
        # placeholder object is assumed -- confirm.
        resource = object()
        realm = ProvisioningRealm(resource)
        self.assertRaises(
            NotImplementedError, realm.requestAvatar,
            "irrelevant", "also irrelevant")