1
# Copyright 2010 Canonical Ltd. This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
10
from django.conf import settings
11
from django.core.handlers.wsgi import WSGIHandler
12
from django.core.management import call_command
13
from wsgi_intercept import (urllib2_intercept, add_wsgi_intercept,
14
remove_wsgi_intercept)
16
from identityprovider.readonly import readonly_manager
17
from identityprovider.tests.utils import SQLCachedTestCase
20
# Test case exercising the 'readonly' management command via call_command.
# NOTE(review): this listing looks like a flattened extract -- indentation is
# missing, bare integers sit between the statements, and the header of the
# method that owns the fixture statements below is not visible. Confirm
# against the real file before relying on structure.
class ReadonlyCommandTestCase(SQLCachedTestCase):
22
# Remember the current sys.stdout / sys.stderr objects, then replace both
# with fresh StringIO buffers.
self._stdout = sys.stdout
23
self._stderr = sys.stderr
24
sys.stdout = StringIO.StringIO()
25
sys.stderr = StringIO.StringIO()
27
# Two "host:port" entries; they are installed as settings.APP_SERVERS and
# split on ':' further down when the WSGI intercepts are registered.
self.servers = ['localhost:8000', 'localhost:8001']
28
self._APP_SERVERS = settings.APP_SERVERS
29
settings.APP_SERVERS = self.servers
30
# Save the current DBFAILOVER_FLAG_DIR setting (via getattr, so a fallback
# value is supplied -- NOTE(review): the continuation line holding that
# fallback is not visible here) and point the setting at a new temp directory.
self._DBFAILOVER_FLAG_DIR = getattr(settings, 'DBFAILOVER_FLAG_DIR',
32
settings.DBFAILOVER_FLAG_DIR = tempfile.mkdtemp()
33
# NOTE(review): this stores a reference to the settings list, not a copy.
# The append() three statements below mutates that same list object, so the
# saved value will also contain the extra 'backup' entry -- confirm whether
# that is intended.
self._DB_CONNECTIONS = settings.DB_CONNECTIONS
34
# Add a second connection entry: a copy.copy() of the first entry with its
# 'DATABASE_ID' key overwritten to 'backup'.
backup_db = copy.copy(settings.DB_CONNECTIONS[0])
35
backup_db['DATABASE_ID'] = 'backup'
36
settings.DB_CONNECTIONS.append(backup_db)
38
# Set up the wsgi_intercept mechanism to simulate a WSGI server: install the
# urllib2 opener, then register an intercept for every entry in self.servers.
39
urllib2_intercept.install_opener()
40
for server in self.servers:
41
host, port = server.split(':')
42
# The port is converted to int; the WSGIHandler class itself (not an
# instance) is passed as the third argument.
add_wsgi_intercept(host, int(port), WSGIHandler)
45
# Make sure everything is cleared up before leaving: call clear_readonly()
# and clear_failed('master') on the readonly manager.
# NOTE(review): the header of the method these statements belong to is not
# visible in this listing (presumably the test case's cleanup hook -- confirm).
46
readonly_manager.clear_readonly()
47
readonly_manager.clear_failed('master')
49
# Remove the wsgi_intercept hook for every "host:port" entry in self.servers,
# then uninstall the urllib2 opener.
50
for server in self.servers:
51
host, port = server.split(':')
52
remove_wsgi_intercept(host, int(port))
53
urllib2_intercept.uninstall_opener()
55
# Put back the settings values saved on self.
settings.APP_SERVERS = self._APP_SERVERS
56
# Delete the directory currently named by DBFAILOVER_FLAG_DIR before the
# setting is restored; the second positional argument of shutil.rmtree is
# ignore_errors, so removal failures are ignored.
shutil.rmtree(settings.DBFAILOVER_FLAG_DIR, True)
57
settings.DBFAILOVER_FLAG_DIR = self._DBFAILOVER_FLAG_DIR
58
# NOTE(review): this rebinds the setting to whatever object is held in
# self._DB_CONNECTIONS. If that is the same list object that had an entry
# appended to it earlier, the extra entry is not removed by this
# assignment -- confirm.
settings.DB_CONNECTIONS = self._DB_CONNECTIONS
60
# Restore the sys.stdout / sys.stderr objects saved on self.
sys.stdout = self._stdout
61
sys.stderr = self._stderr
64
# Run the 'readonly' command with list_servers=True, then read from
# sys.stdout into `output`.
# NOTE(review): the enclosing method header and any statements between or
# after these two lines are not visible in this listing. Other tests call
# self.get_status() and search the returned text, so these lines are
# presumably part of that helper -- confirm. Reading from sys.stdout only
# yields text if it has been replaced by a readable buffer positioned before
# the command's output -- verify against the full method.
call_command('readonly', list_servers=True)
66
output = sys.stdout.read()
69
def test_readonly(self):
    """Invoking the command with no further arguments raises SystemExit."""
    command_name = 'readonly'
    self.assertRaises(SystemExit, call_command, command_name)
72
def test_readonly_list_all(self):
    """The status text returned by get_status() mentions the first server."""
    status_text = self.get_status()
    first_server = self.servers[0]
    self.assertTrue(first_server in status_text)
76
def test_readonly_set(self):
    """After action='set' on the first server, is_readonly() is true."""
    target_server = self.servers[0]
    call_command('readonly', target_server, action='set')

    # check_readonly() is called before the state is queried.
    readonly_manager.check_readonly()
    in_readonly_mode = readonly_manager.is_readonly()
    self.assertTrue(in_readonly_mode)
81
def test_readonly_clear(self):
    """Check that action='clear' takes the first server out of readonly mode.

    The server is put into readonly mode first (and that is asserted), so
    the final assertion cannot pass merely because the process was never in
    readonly mode to begin with. This mirrors test_readonly_enable, which
    disables before enabling.
    """
    server = self.servers[0]

    # Precondition: readonly mode is actually switched on.
    call_command('readonly', server, action='set')
    readonly_manager.check_readonly()
    self.assertTrue(readonly_manager.is_readonly())

    # Clearing must bring the manager back out of readonly mode.
    call_command('readonly', server, action='clear')
    readonly_manager.check_readonly()
    self.assertFalse(readonly_manager.is_readonly())
86
# Issues action='disable' and then action='enable' for the first server, and
# finally asserts that is_failed('master') is false.
# NOTE(review): both call_command(...) lines end with a trailing comma; their
# continuation lines (and possibly other statements between them) are not
# visible in this listing, so the remaining keyword arguments are unknown here.
def test_readonly_enable(self):
88
call_command('readonly', self.servers[0], action='disable',
91
call_command('readonly', self.servers[0], action='enable',
93
self.assertFalse(readonly_manager.is_failed('master'))
95
# Issues action='disable' for the first server and asserts that
# is_failed('master') is true afterwards.
# NOTE(review): the call_command(...) line ends with a trailing comma; its
# continuation line is not visible in this listing.
def test_readonly_disable(self):
96
call_command('readonly', self.servers[0], action='disable',
98
self.assertTrue(readonly_manager.is_failed('master'))
100
def test_readonly_set_all(self):
    """action='set' with all_servers=True is reported for every app server."""
    call_command('readonly', action='set', all_servers=True)
    status_text = self.get_status()
    # Every entry of settings.APP_SERVERS must have its own status line.
    for app_server in settings.APP_SERVERS:
        expected_line = "%s -- In readonly mode" % app_server
        self.assertTrue(expected_line in status_text)
106
# Issues action='clear' and then checks that the status text contains an
# "<server> -- Operating normally" line for every entry in
# settings.APP_SERVERS.
# NOTE(review): the call_command(...) line ends with a trailing comma; its
# continuation line is not visible in this listing. Unlike
# test_readonly_set_all, a server is also passed positionally here --
# confirm that is intended.
def test_readonly_clear_all(self):
107
call_command('readonly', self.servers[0], action='clear',
109
output = self.get_status()
110
for server in settings.APP_SERVERS:
111
self.assertTrue("%s -- Operating normally" % server in output)
113
# Issues action='enable' for database='master' and then checks that the
# status text contains 'master: OK' exactly once per entry in
# settings.APP_SERVERS.
# NOTE(review): the call_command(...) line ends with a trailing comma; its
# continuation line is not visible in this listing.
def test_readonly_enable_all(self):
114
call_command('readonly', action='enable', database='master',
116
output = self.get_status()
117
self.assertEqual(len(settings.APP_SERVERS),
118
output.count('master: OK'))
120
def test_readonly_disable_all(self):
    """'disable' on master with all_servers=True shows up once per server."""
    # NOTE(review): a server is passed positionally together with
    # all_servers=True, unlike test_readonly_set_all -- confirm intended.
    call_command(
        'readonly', self.servers[0],
        action='disable', database='master', all_servers=True)
    status_text = self.get_status()
    expected_reports = len(settings.APP_SERVERS)
    failed_reports = status_text.count('master: Failed')
    self.assertEqual(expected_reports, failed_reports)