~ubuntu-branches/ubuntu/saucy/apt-xapian-index/saucy

« back to all changes in this revision

Viewing changes to test/test_indexer.py

  • Committer: Bazaar Package Importer
  • Author(s): Enrico Zini
  • Date: 2010-05-16 09:33:58 UTC
  • mfrom: (4.1.5 squeeze)
  • mto: This revision was merged to the branch mainline in revision 12.
  • Revision ID: james.westby@ubuntu.com-20100516093358-xvbshas89apqnvgj
Tags: 0.35
* Tolerate (and if --verbose, report) .desktop file with invalid popcon
  fields
* Added missing import. Closes: #581736
* Run update-python-modules -p before updating the index in postinst.
  Closes: #581811

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
import unittest
 
3
import sys, os.path
 
4
import axi
 
5
import axi.indexer
 
6
import shutil
 
7
import subprocess
 
8
import tools
 
9
 
 
10
def smallcache(pkglist=["apt", "debtags", "gedit"]):
 
11
    class sc(object):
 
12
        def __init__(self, cache):
 
13
            self._pkgs = pkglist
 
14
            self._cache = cache
 
15
 
 
16
        def has_key(self, name):
 
17
            return name in self._pkgs
 
18
 
 
19
        def __len__(self):
 
20
            return len(self._pkgs)
 
21
 
 
22
        def __iter__(self):
 
23
            for p in self._pkgs:
 
24
                yield self._cache[p]
 
25
 
 
26
        def __getitem__(self, name):
 
27
            if name not in self._pkgs:
 
28
                raise KeyError, "`%s' not in wrapped cache" % name
 
29
            return self._cache[name]
 
30
 
 
31
    return sc
 
32
 
 
33
class TestIndexer(tools.AxiTestBase):
 
34
    def setUp(self):
 
35
        # Remove the text index if it exists
 
36
        if os.path.exists(axi.XAPIANDBPATH): shutil.rmtree(axi.XAPIANDBPATH)
 
37
        # Prepare a quiet indexer
 
38
        progress = axi.indexer.SilentProgress()
 
39
        self.indexer = axi.indexer.Indexer(progress, True)
 
40
 
 
41
    def tearDown(self):
 
42
        # Explicitly set indexer to none, otherwise in the next setUp we rmtree
 
43
        # testdb before the indexer had a chance to delete its lock
 
44
        self.indexer = None
 
45
 
 
46
    def testAptRebuild(self):
 
47
        self.indexer._test_wrap_apt_cache(smallcache())
 
48
 
 
49
        # No other indexers are running, ensure lock succeeds
 
50
        self.assert_(self.indexer.lock())
 
51
 
 
52
        # No index exists, so the indexer should decide it needs to run
 
53
        self.assert_(self.indexer.setupIndexing())
 
54
 
 
55
        # Perform a rebuild
 
56
        self.indexer.rebuild()
 
57
 
 
58
        # Close the indexer
 
59
        self.indexer = None
 
60
 
 
61
        # Ensure that we have an index
 
62
        self.assertCleanIndex()
 
63
 
 
64
    def testDeb822Rebuild(self):
 
65
        pkgfile = os.path.join(axi.XAPIANDBPATH, "packages")
 
66
        subprocess.check_call("apt-cache show apt debtags gedit > " + pkgfile, shell=True)
 
67
 
 
68
        # No other indexers are running, ensure lock succeeds
 
69
        self.assert_(self.indexer.lock())
 
70
 
 
71
        # No index exists, so the indexer should decide it needs to run
 
72
        self.assert_(self.indexer.setupIndexing())
 
73
 
 
74
        # Perform a rebuild
 
75
        self.indexer.rebuild(pkgfile)
 
76
 
 
77
        # Close the indexer
 
78
        self.indexer = None
 
79
 
 
80
        # Ensure that we have an index
 
81
        self.assertCleanIndex()
 
82
 
 
83
    def testIncrementalRebuild(self):
 
84
        # Perform the initial indexing
 
85
        progress = axi.indexer.SilentProgress()
 
86
        pre_indexer = axi.indexer.Indexer(progress, True)
 
87
        pre_indexer._test_wrap_apt_cache(smallcache(["apt", "debtags", "gedit"]))
 
88
        self.assert_(pre_indexer.lock())
 
89
        self.assert_(pre_indexer.setupIndexing())
 
90
        pre_indexer.rebuild()
 
91
        pre_indexer = None
 
92
        curidx = open(axi.XAPIANINDEX).read()
 
93
 
 
94
        # Ensure that we have an index
 
95
        self.assertCleanIndex()
 
96
 
 
97
        # Prepare an incremental update
 
98
        self.indexer._test_wrap_apt_cache(smallcache(["apt", "coreutils", "gedit"]))
 
99
 
 
100
        # No other indexers are running, ensure lock succeeds
 
101
        self.assert_(self.indexer.lock())
 
102
 
 
103
        # An index exists the plugin modification timestamps are the same, so
 
104
        # we need to force the indexer to run
 
105
        self.assert_(not self.indexer.setupIndexing())
 
106
        self.assert_(self.indexer.setupIndexing(force=True))
 
107
 
 
108
        # Perform a rebuild
 
109
        self.indexer.incrementalUpdate()
 
110
 
 
111
        # Close the indexer
 
112
        self.indexer = None
 
113
 
 
114
        # Ensure that we have an index
 
115
        self.assertCleanIndex()
 
116
 
 
117
        # Ensure that we did not create a new index
 
118
        self.assertEqual(open(axi.XAPIANINDEX).read(), curidx)
 
119
 
 
120
    def testIncrementalRebuildFromEmpty(self):
 
121
        # Prepare an incremental update
 
122
        self.indexer._test_wrap_apt_cache(smallcache())
 
123
 
 
124
        # No other indexers are running, ensure lock succeeds
 
125
        self.assert_(self.indexer.lock())
 
126
 
 
127
        # No index exists, so the indexer should decide it needs to run
 
128
        self.assert_(self.indexer.setupIndexing())
 
129
 
 
130
        # Perform an incremental rebuild, which should fall back on a normal
 
131
        # one
 
132
        self.indexer.incrementalUpdate()
 
133
 
 
134
        # Close the indexer
 
135
        self.indexer = None
 
136
 
 
137
        # Ensure that we have an index
 
138
        self.assertCleanIndex()
 
139
 
 
140
#    def test_url(self):
 
141
#        """ Environ: URL building """
 
142
#        request.bind({'HTTP_HOST':'example.com'}, None)
 
143
#        self.assertEqual('http://example.com/', request.url)
 
144
#        request.bind({'SERVER_NAME':'example.com'}, None)
 
145
#        self.assertEqual('http://example.com/', request.url)
 
146
#        request.bind({'SERVER_NAME':'example.com', 'SERVER_PORT':'81'}, None)
 
147
#        self.assertEqual('http://example.com:81/', request.url)
 
148
#        request.bind({'wsgi.url_scheme':'https', 'SERVER_NAME':'example.com'}, None)
 
149
#        self.assertEqual('https://example.com:80/', request.url)
 
150
#        request.bind({'HTTP_HOST':'example.com', 'PATH_INFO':'/path', 'QUERY_STRING':'1=b&c=d', 'SCRIPT_NAME':'/sp'}, None)
 
151
#        self.assertEqual('http://example.com/sp/path?1=b&c=d', request.url)
 
152
#
 
153
#    def test_dict_access(self):
 
154
#        """ Environ: request objects are environment dicts """
 
155
#        e = {}
 
156
#        wsgiref.util.setup_testing_defaults(e)
 
157
#        request.bind(e, None)
 
158
#        for k, v in e.iteritems():
 
159
#            self.assertTrue(k in request)
 
160
#            self.assertTrue(request[k] == v)
 
161
#
 
162
#    def test_header_access(self):
 
163
#        """ Environ: Request objects decode headers """
 
164
#        e = {}
 
165
#        wsgiref.util.setup_testing_defaults(e)
 
166
#        e['HTTP_SOME_HEADER'] = 'some value'
 
167
#        request.bind(e, None)
 
168
#        request['HTTP_SOME_OTHER_HEADER'] = 'some other value'
 
169
#        self.assertTrue('Some-Header' in request.header)
 
170
#        self.assertTrue(request.header['Some-Header'] == 'some value')
 
171
#        self.assertTrue(request.header['Some-Other-Header'] == 'some other value')
 
172
#
 
173
#
 
174
#    def test_cookie(self):
 
175
#        """ Environ: COOKIES """ 
 
176
#        t = dict()
 
177
#        t['a=a'] = {'a': 'a'}
 
178
#        t['a=a; b=b'] = {'a': 'a', 'b':'b'}
 
179
#        t['a=a; a=b'] = {'a': 'b'}
 
180
#        for k, v in t.iteritems():
 
181
#            request.bind({'HTTP_COOKIE': k}, None)
 
182
#            self.assertEqual(v, request.COOKIES)
 
183
#
 
184
#    def test_get(self):
 
185
#        """ Environ: GET data """ 
 
186
#        e = {}
 
187
#        e['QUERY_STRING'] = 'a=a&a=1&b=b&c=c'
 
188
#        request.bind(e, None)
 
189
#        self.assertTrue('a' in request.GET)
 
190
#        self.assertTrue('b' in request.GET)
 
191
#        self.assertEqual(['a','1'], request.GET.getall('a'))
 
192
#        self.assertEqual(['b'], request.GET.getall('b'))
 
193
#        self.assertEqual('1', request.GET['a'])
 
194
#        self.assertEqual('b', request.GET['b'])
 
195
#        
 
196
#    def test_post(self):
 
197
#        """ Environ: POST data """ 
 
198
#        sq = u'a=a&a=1&b=b&c=c'.encode('utf8')
 
199
#        e = {}
 
200
#        wsgiref.util.setup_testing_defaults(e)
 
201
#        e['wsgi.input'].write(sq)
 
202
#        e['wsgi.input'].seek(0)
 
203
#        e['CONTENT_LENGTH'] = str(len(sq))
 
204
#        e['REQUEST_METHOD'] = "POST"
 
205
#        request.bind(e, None)
 
206
#        self.assertTrue('a' in request.POST)
 
207
#        self.assertTrue('b' in request.POST)
 
208
#        self.assertEqual(['a','1'], request.POST.getall('a'))
 
209
#        self.assertEqual(['b'], request.POST.getall('b'))
 
210
#        self.assertEqual('1', request.POST['a'])
 
211
#        self.assertEqual('b', request.POST['b'])
 
212
#
 
213
#    def test_params(self):
 
214
#        """ Environ: GET and POST are combined in request.param """ 
 
215
#        e = {}
 
216
#        wsgiref.util.setup_testing_defaults(e)
 
217
#        e['wsgi.input'].write(tob('b=b&c=p'))
 
218
#        e['wsgi.input'].seek(0)
 
219
#        e['CONTENT_LENGTH'] = '7'
 
220
#        e['QUERY_STRING'] = 'a=a&c=g'
 
221
#        e['REQUEST_METHOD'] = "POST"
 
222
#        request.bind(e, None)
 
223
#        self.assertEqual(['a','b','c'], sorted(request.params.keys()))
 
224
#        self.assertEqual('p', request.params['c'])
 
225
#
 
226
#    def test_getpostleak(self):
 
227
#        """ Environ: GET and POST sh0uld not leak into each other """ 
 
228
#        e = {}
 
229
#        wsgiref.util.setup_testing_defaults(e)
 
230
#        e['wsgi.input'].write(u'b=b'.encode('utf8'))
 
231
#        e['wsgi.input'].seek(0)
 
232
#        e['CONTENT_LENGTH'] = '3'
 
233
#        e['QUERY_STRING'] = 'a=a'
 
234
#        e['REQUEST_METHOD'] = "POST"
 
235
#        request.bind(e, None)
 
236
#        self.assertEqual(['a'], request.GET.keys())
 
237
#        self.assertEqual(['b'], request.POST.keys())
 
238
#
 
239
#    def test_body(self):
 
240
#        """ Environ: Request.body should behave like a file object factory """ 
 
241
#        e = {}
 
242
#        wsgiref.util.setup_testing_defaults(e)
 
243
#        e['wsgi.input'].write(u'abc'.encode('utf8'))
 
244
#        e['wsgi.input'].seek(0)
 
245
#        e['CONTENT_LENGTH'] = str(3)
 
246
#        request.bind(e, None)
 
247
#        self.assertEqual(u'abc'.encode('utf8'), request.body.read())
 
248
#        self.assertEqual(u'abc'.encode('utf8'), request.body.read(3))
 
249
#        self.assertEqual(u'abc'.encode('utf8'), request.body.readline())
 
250
#        self.assertEqual(u'abc'.encode('utf8'), request.body.readline(3))
 
251
#
 
252
#    def test_bigbody(self):
 
253
#        """ Environ: Request.body should handle big uploads using files """
 
254
#        e = {}
 
255
#        wsgiref.util.setup_testing_defaults(e)
 
256
#        e['wsgi.input'].write((u'x'*1024*1000).encode('utf8'))
 
257
#        e['wsgi.input'].seek(0)
 
258
#        e['CONTENT_LENGTH'] = str(1024*1000)
 
259
#        request.bind(e, None)
 
260
#        self.assertTrue(hasattr(request.body, 'fileno'))        
 
261
#        self.assertEqual(1024*1000, len(request.body.read()))
 
262
#        self.assertEqual(1024, len(request.body.read(1024)))
 
263
#        self.assertEqual(1024*1000, len(request.body.readline()))
 
264
#        self.assertEqual(1024, len(request.body.readline(1024)))
 
265
#
 
266
#    def test_tobigbody(self):
 
267
#        """ Environ: Request.body should truncate to Content-Length bytes """
 
268
#        e = {}
 
269
#        wsgiref.util.setup_testing_defaults(e)
 
270
#        e['wsgi.input'].write((u'x'*1024).encode('utf8'))
 
271
#        e['wsgi.input'].seek(0)
 
272
#        e['CONTENT_LENGTH'] = '42'
 
273
#        request.bind(e, None)
 
274
#        self.assertEqual(42, len(request.body.read()))
 
275
#        self.assertEqual(42, len(request.body.read(1024)))
 
276
#        self.assertEqual(42, len(request.body.readline()))
 
277
#        self.assertEqual(42, len(request.body.readline(1024)))
 
278
#
 
279
#class TestMultipart(unittest.TestCase):
 
280
#    def test_multipart(self):
 
281
#        """ Environ: POST (multipart files and multible values per key) """
 
282
#        fields = [('field1','value1'), ('field2','value2'), ('field2','value3')]
 
283
#        files = [('file1','filename1.txt','content1'), ('file2','filename2.py',u'äöü')]
 
284
#        e = tools.multipart_environ(fields=fields, files=files)
 
285
#        request.bind(e, None)
 
286
#        # File content
 
287
#        self.assertTrue('file1' in request.POST)
 
288
#        self.assertEqual('content1', request.POST['file1'].file.read())
 
289
#        # File name and meta data
 
290
#        self.assertTrue('file2' in request.POST)
 
291
#        self.assertEqual('filename2.py', request.POST['file2'].filename)
 
292
#        # UTF-8 files
 
293
#        x = request.POST['file2'].file.read()
 
294
#        if sys.version_info >= (3,0,0):
 
295
#            x = x.encode('ISO-8859-1')
 
296
#        self.assertEqual(u'äöü'.encode('utf8'), x)
 
297
#        # No file
 
298
#        self.assertTrue('file3' not in request.POST)
 
299
#        # Field (single)
 
300
#        self.assertEqual('value1', request.POST['field1'])
 
301
#        # Field (multi)
 
302
#        self.assertEqual(2, len(request.POST.getall('field2')))
 
303
#        self.assertEqual(['value2', 'value3'], request.POST.getall('field2'))
 
304
 
 
305
if __name__ == '__main__':
 
306
    unittest.main()