1
# -*- coding: utf-8 -*-
10
# Build a small fake apt cache restricted to the packages in `pkglist`,
# so index tests run against three packages instead of the whole archive.
# NOTE(review): this fragment is garbled by extraction -- the interleaved
# bare integers are original source line numbers and several lines are
# missing (e.g. the `__init__` body and the `__len__` header); restore
# from upstream before editing logic here.
# NOTE(review): mutable default argument `pkglist=[...]` -- appears to be
# read-only here, but confirm once the missing `__init__` body is restored.
def smallcache(pkglist=["apt", "debtags", "gedit"]):
12
def __init__(self, cache):
16
# Dict-style membership test limited to the wrapped package names.
def has_key(self, name):
17
return name in self._pkgs
20
return len(self._pkgs)
26
# Look up a package record, refusing names outside the small package set.
def __getitem__(self, name):
27
if name not in self._pkgs:
28
# Python 2 raise syntax; would need KeyError(...) under Python 3.
raise KeyError, "`%s' not in wrapped cache" % name
29
return self._cache[name]
33
# Tests for the apt-xapian-index Indexer: full rebuilds (apt cache and
# Deb822 file) and incremental updates.
# NOTE(review): extraction artifact -- the bare integers interleaved below
# are original source line numbers, and the `def setUp`/`def tearDown`
# headers around these statements are missing from this fragment.
class TestIndexer(tools.AxiTestBase):
35
# Remove the text index if it exists
36
if os.path.exists(axi.XAPIANDBPATH): shutil.rmtree(axi.XAPIANDBPATH)
37
# Prepare a quiet indexer
38
progress = axi.indexer.SilentProgress()
39
# Second argument True presumably selects a test/quiet mode -- confirm
# against axi.indexer.Indexer's signature.
self.indexer = axi.indexer.Indexer(progress, True)
42
# Explicitly set indexer to none, otherwise in the next setUp we rmtree
# testdb before the indexer had a chance to delete its lock
43
46
# Full index rebuild driven by a (wrapped, three-package) apt cache.
# NOTE(review): interleaved bare integers are original line numbers;
# original lines 48, 51, 54-55 and 57-60 are missing from this fragment.
def testAptRebuild(self):
47
self.indexer._test_wrap_apt_cache(smallcache())
49
# No other indexers are running, ensure lock succeeds
50
self.assert_(self.indexer.lock())
52
# No index exists, so the indexer should decide it needs to run
53
self.assert_(self.indexer.setupIndexing())
56
self.indexer.rebuild()
61
# Ensure that we have an index
62
self.assertCleanIndex()
64
# Full index rebuild from a Deb822-format packages file produced by
# `apt-cache show`.
# NOTE(review): interleaved bare integers are original line numbers;
# some original lines (67, 70, 73-74, 76-79) are missing here.
def testDeb822Rebuild(self):
65
pkgfile = os.path.join(axi.XAPIANDBPATH, "packages")
66
# shell=True is needed for the `>` redirection; the command string is
# fixed (no untrusted input), so this is acceptable in a test.
subprocess.check_call("apt-cache show apt debtags gedit > " + pkgfile, shell=True)
68
# No other indexers are running, ensure lock succeeds
69
self.assert_(self.indexer.lock())
71
# No index exists, so the indexer should decide it needs to run
72
self.assert_(self.indexer.setupIndexing())
75
self.indexer.rebuild(pkgfile)
80
# Ensure that we have an index
81
self.assertCleanIndex()
83
# Build an index with one package set, then incrementally update with a
# changed set (coreutils replaces debtags) and check the same physical
# index is reused rather than recreated.
# NOTE(review): interleaved bare integers are original line numbers;
# the statement that actually runs pre_indexer's rebuild (original lines
# 90-91) is missing from this fragment -- confirm against upstream.
def testIncrementalRebuild(self):
84
# Perform the initial indexing
85
progress = axi.indexer.SilentProgress()
86
pre_indexer = axi.indexer.Indexer(progress, True)
87
pre_indexer._test_wrap_apt_cache(smallcache(["apt", "debtags", "gedit"]))
88
self.assert_(pre_indexer.lock())
89
self.assert_(pre_indexer.setupIndexing())
92
# Remember which index directory the first build produced.
curidx = open(axi.XAPIANINDEX).read()
94
# Ensure that we have an index
95
self.assertCleanIndex()
97
# Prepare an incremental update
98
self.indexer._test_wrap_apt_cache(smallcache(["apt", "coreutils", "gedit"]))
100
# No other indexers are running, ensure lock succeeds
101
self.assert_(self.indexer.lock())
103
# An index exists and the plugin modification timestamps are the same, so
# we need to force the indexer to run
104
105
self.assert_(not self.indexer.setupIndexing())
106
self.assert_(self.indexer.setupIndexing(force=True))
109
self.indexer.incrementalUpdate()
114
# Ensure that we have an index
115
self.assertCleanIndex()
117
# Ensure that we did not create a new index
118
self.assertEqual(open(axi.XAPIANINDEX).read(), curidx)
120
# Incremental update with no pre-existing index: it should fall back to a
# full rebuild and still leave a clean index behind.
# NOTE(review): interleaved bare integers are original line numbers;
# original lines 123, 126, 129, 131 and 133-136 are missing here.
def testIncrementalRebuildFromEmpty(self):
121
# Prepare an incremental update
122
self.indexer._test_wrap_apt_cache(smallcache())
124
# No other indexers are running, ensure lock succeeds
125
self.assert_(self.indexer.lock())
127
# No index exists, so the indexer should decide it needs to run
128
self.assert_(self.indexer.setupIndexing())
130
# Perform an incremental rebuild, which should fall back on a normal
# full rebuild since there is no index to update
132
self.indexer.incrementalUpdate()
137
# Ensure that we have an index
138
self.assertCleanIndex()
140
# def test_url(self):
141
# """ Environ: URL building """
142
# request.bind({'HTTP_HOST':'example.com'}, None)
143
# self.assertEqual('http://example.com/', request.url)
144
# request.bind({'SERVER_NAME':'example.com'}, None)
145
# self.assertEqual('http://example.com/', request.url)
146
# request.bind({'SERVER_NAME':'example.com', 'SERVER_PORT':'81'}, None)
147
# self.assertEqual('http://example.com:81/', request.url)
148
# request.bind({'wsgi.url_scheme':'https', 'SERVER_NAME':'example.com'}, None)
149
# self.assertEqual('https://example.com:80/', request.url)
150
# request.bind({'HTTP_HOST':'example.com', 'PATH_INFO':'/path', 'QUERY_STRING':'1=b&c=d', 'SCRIPT_NAME':'/sp'}, None)
151
# self.assertEqual('http://example.com/sp/path?1=b&c=d', request.url)
153
# def test_dict_access(self):
154
# """ Environ: request objects are environment dicts """
156
# wsgiref.util.setup_testing_defaults(e)
157
# request.bind(e, None)
158
# for k, v in e.iteritems():
159
# self.assertTrue(k in request)
160
# self.assertTrue(request[k] == v)
162
# def test_header_access(self):
163
# """ Environ: Request objects decode headers """
165
# wsgiref.util.setup_testing_defaults(e)
166
# e['HTTP_SOME_HEADER'] = 'some value'
167
# request.bind(e, None)
168
# request['HTTP_SOME_OTHER_HEADER'] = 'some other value'
169
# self.assertTrue('Some-Header' in request.header)
170
# self.assertTrue(request.header['Some-Header'] == 'some value')
171
# self.assertTrue(request.header['Some-Other-Header'] == 'some other value')
174
# def test_cookie(self):
175
# """ Environ: COOKIES """
177
# t['a=a'] = {'a': 'a'}
178
# t['a=a; b=b'] = {'a': 'a', 'b':'b'}
179
# t['a=a; a=b'] = {'a': 'b'}
180
# for k, v in t.iteritems():
181
# request.bind({'HTTP_COOKIE': k}, None)
182
# self.assertEqual(v, request.COOKIES)
184
# def test_get(self):
185
# """ Environ: GET data """
187
# e['QUERY_STRING'] = 'a=a&a=1&b=b&c=c'
188
# request.bind(e, None)
189
# self.assertTrue('a' in request.GET)
190
# self.assertTrue('b' in request.GET)
191
# self.assertEqual(['a','1'], request.GET.getall('a'))
192
# self.assertEqual(['b'], request.GET.getall('b'))
193
# self.assertEqual('1', request.GET['a'])
194
# self.assertEqual('b', request.GET['b'])
196
# def test_post(self):
197
# """ Environ: POST data """
198
# sq = u'a=a&a=1&b=b&c=c'.encode('utf8')
200
# wsgiref.util.setup_testing_defaults(e)
201
# e['wsgi.input'].write(sq)
202
# e['wsgi.input'].seek(0)
203
# e['CONTENT_LENGTH'] = str(len(sq))
204
# e['REQUEST_METHOD'] = "POST"
205
# request.bind(e, None)
206
# self.assertTrue('a' in request.POST)
207
# self.assertTrue('b' in request.POST)
208
# self.assertEqual(['a','1'], request.POST.getall('a'))
209
# self.assertEqual(['b'], request.POST.getall('b'))
210
# self.assertEqual('1', request.POST['a'])
211
# self.assertEqual('b', request.POST['b'])
213
# def test_params(self):
214
# """ Environ: GET and POST are combined in request.param """
216
# wsgiref.util.setup_testing_defaults(e)
217
# e['wsgi.input'].write(tob('b=b&c=p'))
218
# e['wsgi.input'].seek(0)
219
# e['CONTENT_LENGTH'] = '7'
220
# e['QUERY_STRING'] = 'a=a&c=g'
221
# e['REQUEST_METHOD'] = "POST"
222
# request.bind(e, None)
223
# self.assertEqual(['a','b','c'], sorted(request.params.keys()))
224
# self.assertEqual('p', request.params['c'])
226
# def test_getpostleak(self):
227
#     """ Environ: GET and POST should not leak into each other """
229
# wsgiref.util.setup_testing_defaults(e)
230
# e['wsgi.input'].write(u'b=b'.encode('utf8'))
231
# e['wsgi.input'].seek(0)
232
# e['CONTENT_LENGTH'] = '3'
233
# e['QUERY_STRING'] = 'a=a'
234
# e['REQUEST_METHOD'] = "POST"
235
# request.bind(e, None)
236
# self.assertEqual(['a'], request.GET.keys())
237
# self.assertEqual(['b'], request.POST.keys())
239
# def test_body(self):
240
# """ Environ: Request.body should behave like a file object factory """
242
# wsgiref.util.setup_testing_defaults(e)
243
# e['wsgi.input'].write(u'abc'.encode('utf8'))
244
# e['wsgi.input'].seek(0)
245
# e['CONTENT_LENGTH'] = str(3)
246
# request.bind(e, None)
247
# self.assertEqual(u'abc'.encode('utf8'), request.body.read())
248
# self.assertEqual(u'abc'.encode('utf8'), request.body.read(3))
249
# self.assertEqual(u'abc'.encode('utf8'), request.body.readline())
250
# self.assertEqual(u'abc'.encode('utf8'), request.body.readline(3))
252
# def test_bigbody(self):
253
# """ Environ: Request.body should handle big uploads using files """
255
# wsgiref.util.setup_testing_defaults(e)
256
# e['wsgi.input'].write((u'x'*1024*1000).encode('utf8'))
257
# e['wsgi.input'].seek(0)
258
# e['CONTENT_LENGTH'] = str(1024*1000)
259
# request.bind(e, None)
260
# self.assertTrue(hasattr(request.body, 'fileno'))
261
# self.assertEqual(1024*1000, len(request.body.read()))
262
# self.assertEqual(1024, len(request.body.read(1024)))
263
# self.assertEqual(1024*1000, len(request.body.readline()))
264
# self.assertEqual(1024, len(request.body.readline(1024)))
266
# def test_tobigbody(self):
267
# """ Environ: Request.body should truncate to Content-Length bytes """
269
# wsgiref.util.setup_testing_defaults(e)
270
# e['wsgi.input'].write((u'x'*1024).encode('utf8'))
271
# e['wsgi.input'].seek(0)
272
# e['CONTENT_LENGTH'] = '42'
273
# request.bind(e, None)
274
# self.assertEqual(42, len(request.body.read()))
275
# self.assertEqual(42, len(request.body.read(1024)))
276
# self.assertEqual(42, len(request.body.readline()))
277
# self.assertEqual(42, len(request.body.readline(1024)))
279
#class TestMultipart(unittest.TestCase):
280
# def test_multipart(self):
281
#     """ Environ: POST (multipart files and multiple values per key) """
282
# fields = [('field1','value1'), ('field2','value2'), ('field2','value3')]
283
# files = [('file1','filename1.txt','content1'), ('file2','filename2.py',u'äöü')]
284
# e = tools.multipart_environ(fields=fields, files=files)
285
# request.bind(e, None)
287
# self.assertTrue('file1' in request.POST)
288
# self.assertEqual('content1', request.POST['file1'].file.read())
289
# # File name and meta data
290
# self.assertTrue('file2' in request.POST)
291
# self.assertEqual('filename2.py', request.POST['file2'].filename)
293
# x = request.POST['file2'].file.read()
294
# if sys.version_info >= (3,0,0):
295
# x = x.encode('ISO-8859-1')
296
# self.assertEqual(u'äöü'.encode('utf8'), x)
298
# self.assertTrue('file3' not in request.POST)
300
# self.assertEqual('value1', request.POST['field1'])
302
# self.assertEqual(2, len(request.POST.getall('field2')))
303
# self.assertEqual(['value2', 'value3'], request.POST.getall('field2'))
305
if __name__ == '__main__':