~ubuntu-branches/ubuntu/oneiric/hgsubversion/oneiric

« back to all changes in this revision

Viewing changes to .pc/fix-mercurial-1.9-compat-2.diff/tests/test_util.py

  • Committer: Package Import Robot
  • Author(s): Jakub Wilk
  • Date: 2011-09-17 23:30:14 UTC
  • Revision ID: package-import@ubuntu.com-20110917233014-h5jq6ylufdi2kku8
Tags: 1.2.1-2.1
* Non-maintainer upload.
* Convert to dh_python2 (closes: #637399). Thanks to Javi Merino for the bug
  report.
  + Build depend on python (>= 2.6.6-3) instead of python-support.
  + Add ‘--with python2’ to dh call in debian/rules.
  + Bump minimum required version of mercurial to 1.9.1-1, which is the
    first version using dh_python2.
  + Remove python-support from Depends, add ${python:Depends} there.
* Backport two patches from upstream VCS to fix compatibility with Mercurial
  1.9.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import StringIO
 
2
import difflib
 
3
import errno
 
4
import gettext
 
5
import imp
 
6
import os
 
7
import shutil
 
8
import stat
 
9
import subprocess
 
10
import sys
 
11
import tempfile
 
12
import unittest
 
13
import urllib
 
14
 
 
15
_rootdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 
16
sys.path.insert(0, _rootdir)
 
17
 
 
18
from mercurial import cmdutil
 
19
from mercurial import commands
 
20
from mercurial import context
 
21
from mercurial import dispatch
 
22
from mercurial import hg
 
23
from mercurial import i18n
 
24
from mercurial import node
 
25
from mercurial import ui
 
26
from mercurial import util
 
27
from mercurial import extensions
 
28
 
 
29
try:
 
30
    SkipTest = unittest.SkipTest
 
31
except AttributeError:
 
32
    try:
 
33
        from unittest2 import SkipTest
 
34
    except ImportError:
 
35
        try:
 
36
            from nose import SkipTest
 
37
        except ImportError:
 
38
            SkipTest = None
 
39
 
 
40
from hgsubversion import util
 
41
 
 
42
# Documentation for Subprocess.Popen() says:
 
43
#   "Note that on Windows, you cannot set close_fds to true and
 
44
#   also redirect the standard handles by setting stdin, stdout or
 
45
#   stderr."
 
46
canCloseFds='win32' not in sys.platform
 
47
 
 
48
if not 'win32' in sys.platform:
 
49
    def kill_process(popen_obj):
 
50
        os.kill(popen_obj.pid, 9)
 
51
else:
 
52
    import ctypes
 
53
    from ctypes.wintypes import BOOL, DWORD, HANDLE, UINT
 
54
 
 
55
    def win_status_check(result, func, args):
 
56
        if result == 0:
 
57
            raise ctypes.WinError()
 
58
        return args
 
59
 
 
60
    def WINAPI(returns, func, *params):
 
61
        assert len(params) % 2 == 0
 
62
 
 
63
        func.argtypes = tuple(params[0::2])
 
64
        func.resvalue = returns
 
65
        func.errcheck = win_status_check
 
66
 
 
67
        return func
 
68
 
 
69
    # dwDesiredAccess
 
70
    PROCESS_TERMINATE = 0x0001
 
71
 
 
72
    OpenProcess = WINAPI(HANDLE, ctypes.windll.kernel32.OpenProcess,
 
73
        DWORD, 'dwDesiredAccess',
 
74
        BOOL, 'bInheritHandle',
 
75
        DWORD, 'dwProcessId',
 
76
    )
 
77
 
 
78
    CloseHandle =  WINAPI(BOOL, ctypes.windll.kernel32.CloseHandle,
 
79
        HANDLE, 'hObject'
 
80
    )
 
81
 
 
82
    TerminateProcess = WINAPI(BOOL, ctypes.windll.kernel32.TerminateProcess,
 
83
        HANDLE, 'hProcess',
 
84
        UINT, 'uExitCode'
 
85
    )
 
86
 
 
87
    def kill_process(popen_obj):
 
88
        phnd = OpenProcess(PROCESS_TERMINATE, False, popen_obj.pid)
 
89
        TerminateProcess(phnd, 1)
 
90
        CloseHandle(phnd)
 
91
 
 
92
# Fixtures that need to be pulled at a subdirectory of the repo path
 
93
subdir = {'truncatedhistory.svndump': '/project2',
 
94
          'fetch_missing_files_subdir.svndump': '/foo',
 
95
          'empty_dir_in_trunk_not_repo_root.svndump': '/project',
 
96
          'project_root_not_repo_root.svndump': '/dummyproj',
 
97
          'project_name_with_space.svndump': '/project name',
 
98
          'non_ascii_path_1.svndump': '/b\xC3\xB8b',
 
99
          'non_ascii_path_2.svndump': '/b%C3%B8b',
 
100
          }
 
101
 
 
102
FIXTURES = os.path.join(os.path.abspath(os.path.dirname(__file__)),
 
103
                        'fixtures')
 
104
 
 
105
 
 
106
def _makeskip(name, message):
 
107
    if SkipTest:
 
108
        def skip(*args, **kwargs):
 
109
            raise SkipTest(message)
 
110
        skip.__name__ = name
 
111
        return skip
 
112
 
 
113
def requiresmodule(mod):
 
114
    """Skip a test if the specified module is not None."""
 
115
    def decorator(fn):
 
116
        if fn is None:
 
117
            return
 
118
        if mod is not None:
 
119
            return fn
 
120
        return _makeskip(fn.__name__, 'missing required feature')
 
121
    return decorator
 
122
 
 
123
 
 
124
def requiresoption(option):
 
125
    '''Skip a test if commands.clone does not take the specified option.'''
 
126
    def decorator(fn):
 
127
        for entry in cmdutil.findcmd('clone', commands.table)[1][1]:
 
128
            if entry[1] == option:
 
129
                return fn
 
130
        # no match found, so skip
 
131
        if SkipTest:
 
132
            return _makeskip(fn.__name__,
 
133
                             'test requires clone to accept %s' % option)
 
134
        # no skipping support, so erase decorated method
 
135
        return
 
136
    if not isinstance(option, str):
 
137
        raise TypeError('requiresoption takes a string argument')
 
138
    return decorator
 
139
 
 
140
def filtermanifest(manifest):
 
141
    return [f for f in manifest if f not in util.ignoredfiles]
 
142
 
 
143
def fileurl(path):
 
144
    path = os.path.abspath(path).replace(os.sep, '/')
 
145
    drive, path = os.path.splitdrive(path)
 
146
    if drive:
 
147
        drive = '/' + drive
 
148
    url = 'file://%s%s' % (drive, path)
 
149
    return url
 
150
 
 
151
def testui(stupid=False, layout='auto', startrev=0):
 
152
    u = ui.ui()
 
153
    bools = {True: 'true', False: 'false'}
 
154
    u.setconfig('ui', 'quiet', bools[True])
 
155
    u.setconfig('extensions', 'hgsubversion', '')
 
156
    u.setconfig('hgsubversion', 'stupid', bools[stupid])
 
157
    u.setconfig('hgsubversion', 'layout', layout)
 
158
    u.setconfig('hgsubversion', 'startrev', startrev)
 
159
    return u
 
160
 
 
161
def load_svndump_fixture(path, fixture_name):
 
162
    '''Loads an svnadmin dump into a fresh repo at path, which should not
 
163
    already exist.
 
164
    '''
 
165
    if os.path.exists(path): rmtree(path)
 
166
    subprocess.call(['svnadmin', 'create', path,],
 
167
                    stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
 
168
    inp = open(os.path.join(FIXTURES, fixture_name))
 
169
    proc = subprocess.Popen(['svnadmin', 'load', path,], stdin=inp,
 
170
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
 
171
    proc.communicate()
 
172
 
 
173
def load_fixture_and_fetch(fixture_name, repo_path, wc_path, stupid=False,
 
174
                           subdir='', noupdate=True, layout='auto',
 
175
                           startrev=0, externals=None):
 
176
    load_svndump_fixture(repo_path, fixture_name)
 
177
    if subdir:
 
178
        repo_path += '/' + subdir
 
179
 
 
180
    cmd = [
 
181
        'clone',
 
182
        '--layout=%s' % layout,
 
183
        '--startrev=%s' % startrev,
 
184
        fileurl(repo_path),
 
185
        wc_path,
 
186
    ]
 
187
    if stupid:
 
188
        cmd.append('--stupid')
 
189
    if noupdate:
 
190
        cmd.append('--noupdate')
 
191
    if externals:
 
192
        cmd[:0] = ['--config', 'hgsubversion.externals=%s' % externals]
 
193
 
 
194
    dispatch.dispatch(cmd)
 
195
 
 
196
    return hg.repository(testui(), wc_path)
 
197
 
 
198
def rmtree(path):
 
199
    # Read-only files cannot be removed under Windows
 
200
    for root, dirs, files in os.walk(path):
 
201
        for f in files:
 
202
            f = os.path.join(root, f)
 
203
            try:
 
204
                s = os.stat(f)
 
205
            except OSError, e:
 
206
                if e.errno == errno.ENOENT:
 
207
                    continue
 
208
                raise
 
209
            if (s.st_mode & stat.S_IWRITE) == 0:
 
210
                os.chmod(f, s.st_mode | stat.S_IWRITE)
 
211
    shutil.rmtree(path)
 
212
 
 
213
def _verify_our_modules():
 
214
    '''
 
215
    Verify that hgsubversion was imported from the correct location.
 
216
 
 
217
    The correct location is any location within the parent directory of the
 
218
    directory containing this file.
 
219
    '''
 
220
 
 
221
    for modname, module in sys.modules.iteritems():
 
222
        if not module or not modname.startswith('hgsubversion.'):
 
223
            continue
 
224
 
 
225
        modloc = module.__file__
 
226
        cp = os.path.commonprefix((os.path.abspath(__file__), modloc))
 
227
        assert cp.rstrip(os.sep) == _rootdir, (
 
228
            'Module location verification failed: hgsubversion was imported '
 
229
            'from the wrong path!'
 
230
        )
 
231
 
 
232
class TestBase(unittest.TestCase):
 
233
    def setUp(self):
 
234
        _verify_our_modules()
 
235
 
 
236
        self.oldenv = dict([(k, os.environ.get(k, None), ) for k in
 
237
                           ('LANG', 'LC_ALL', 'HGRCPATH', )])
 
238
        self.oldt = i18n.t
 
239
        os.environ['LANG'] = os.environ['LC_ALL'] = 'C'
 
240
        i18n.t = gettext.translation('hg', i18n.localedir, fallback=True)
 
241
 
 
242
        self.oldwd = os.getcwd()
 
243
        self.tmpdir = tempfile.mkdtemp(
 
244
            'svnwrap_test', dir=os.environ.get('HGSUBVERSION_TEST_TEMP', None))
 
245
        self.hgrc = os.path.join(self.tmpdir, '.hgrc')
 
246
        os.environ['HGRCPATH'] = self.hgrc
 
247
        rc = open(self.hgrc, 'w')
 
248
        for l in '[extensions]', 'hgsubversion=':
 
249
            print >> rc, l
 
250
 
 
251
        self.repo_path = '%s/testrepo' % self.tmpdir
 
252
        self.wc_path = '%s/testrepo_wc' % self.tmpdir
 
253
        self.svn_wc = None
 
254
 
 
255
        # Previously, we had a MockUI class that wrapped ui, and giving access
 
256
        # to the stream. The ui.pushbuffer() and ui.popbuffer() can be used
 
257
        # instead. Using the regular UI class, with all stderr redirected to
 
258
        # stdout ensures that the test setup is much more similar to usage
 
259
        # setups.
 
260
        self.patch = (ui.ui.write_err, ui.ui.write)
 
261
        setattr(ui.ui, self.patch[0].func_name, self.patch[1])
 
262
 
 
263
    def tearDown(self):
 
264
        for var, val in self.oldenv.iteritems():
 
265
            if val is None:
 
266
                del os.environ[var]
 
267
            else:
 
268
                os.environ[var] = val
 
269
        i18n.t = self.oldt
 
270
        rmtree(self.tmpdir)
 
271
        os.chdir(self.oldwd)
 
272
        setattr(ui.ui, self.patch[0].func_name, self.patch[0])
 
273
 
 
274
        _verify_our_modules()
 
275
 
 
276
    def ui(self, stupid=False, layout='auto'):
 
277
        return testui(stupid, layout)
 
278
 
 
279
    def _load_fixture_and_fetch(self, fixture_name, subdir=None, stupid=False,
 
280
                                layout='auto', startrev=0, externals=None):
 
281
        if layout == 'single':
 
282
            if subdir is None:
 
283
                subdir = 'trunk'
 
284
        elif subdir is None:
 
285
            subdir = ''
 
286
        return load_fixture_and_fetch(fixture_name, self.repo_path,
 
287
                                      self.wc_path, subdir=subdir,
 
288
                                      stupid=stupid, layout=layout,
 
289
                                      startrev=startrev, externals=externals)
 
290
 
 
291
    def _add_svn_rev(self, changes):
 
292
        '''changes is a dict of filename -> contents'''
 
293
        if self.svn_wc is None:
 
294
            self.svn_wc = os.path.join(self.tmpdir, 'testsvn_wc')
 
295
            subprocess.call([
 
296
                'svn', 'co', '-q', fileurl(self.repo_path),
 
297
                self.svn_wc
 
298
            ],
 
299
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
 
300
 
 
301
        for filename, contents in changes.iteritems():
 
302
            # filenames are / separated
 
303
            filename = filename.replace('/', os.path.sep)
 
304
            filename = os.path.join(self.svn_wc, filename)
 
305
            open(filename, 'w').write(contents)
 
306
            # may be redundant
 
307
            subprocess.call(['svn', 'add', '-q', filename],
 
308
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
 
309
        subprocess.call([
 
310
            'svn', 'commit', '-q', self.svn_wc, '-m', 'test changes'],
 
311
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
 
312
 
 
313
    # define this as a property so that it reloads anytime we need it
 
314
    @property
 
315
    def repo(self):
 
316
        return hg.repository(testui(), self.wc_path)
 
317
 
 
318
    def pushrevisions(self, stupid=False, expected_extra_back=0):
 
319
        before = len(self.repo)
 
320
        self.repo.ui.setconfig('hgsubversion', 'stupid', str(stupid))
 
321
        res = commands.push(self.repo.ui, self.repo)
 
322
        after = len(self.repo)
 
323
        self.assertEqual(expected_extra_back, after - before)
 
324
        return res
 
325
 
 
326
    def svnls(self, path, rev='HEAD'):
 
327
        path = self.repo_path + '/' + path
 
328
        path = util.normalize_url(fileurl(path))
 
329
        args = ['svn', 'ls', '-r', rev, '-R', path]
 
330
        p = subprocess.Popen(args,
 
331
                             stdout=subprocess.PIPE,
 
332
                             stderr=subprocess.STDOUT)
 
333
        stdout, stderr = p.communicate()
 
334
        if p.returncode:
 
335
            raise Exception('svn ls failed on %s: %r' % (path, stderr))
 
336
        entries = [e.strip('/') for e in stdout.splitlines()]
 
337
        entries.sort()
 
338
        return entries
 
339
 
 
340
    def svnco(self, svnpath, rev, path):
 
341
        path = os.path.join(self.wc_path, path)
 
342
        subpath = os.path.dirname(path)
 
343
        if not os.path.isdir(subpath):
 
344
            os.makedirs(subpath)
 
345
        svnpath = fileurl(self.repo_path + '/' + svnpath)
 
346
        args = ['svn', 'co', '-r', rev, svnpath, path]
 
347
        p = subprocess.Popen(args,
 
348
                             stdout=subprocess.PIPE,
 
349
                             stderr=subprocess.STDOUT)
 
350
        stdout, stderr = p.communicate()
 
351
        if p.returncode:
 
352
            raise Exception('svn co failed on %s: %r' % (svnpath, stderr))
 
353
 
 
354
    def commitchanges(self, changes, parent='tip', message='automated test'):
 
355
        """Commit changes to mercurial directory
 
356
 
 
357
        'changes' is a sequence of tuples (source, dest, data). It can look
 
358
        like:
 
359
        - (source, source, data) to set source content to data
 
360
        - (source, dest, None) to set dest content to source one, and mark it as
 
361
        copied from source.
 
362
        - (source, dest, data) to set dest content to data, and mark it as copied
 
363
        from source.
 
364
        - (source, None, None) to remove source.
 
365
        """
 
366
        repo = self.repo
 
367
        parentctx = repo[parent]
 
368
 
 
369
        changed, removed = [], []
 
370
        for source, dest, newdata in changes:
 
371
            if dest is None:
 
372
                removed.append(source)
 
373
            else:
 
374
                changed.append(dest)
 
375
 
 
376
        def filectxfn(repo, memctx, path):
 
377
            if path in removed:
 
378
                raise IOError(errno.ENOENT,
 
379
                              "File \"%s\" no longer exists" % path)
 
380
            entry = [e for e in changes if path == e[1]][0]
 
381
            source, dest, newdata = entry
 
382
            if newdata is None:
 
383
                newdata = parentctx[source].data()
 
384
            copied = None
 
385
            if source != dest:
 
386
                copied = source
 
387
            return context.memfilectx(path=dest,
 
388
                                      data=newdata,
 
389
                                      islink=False,
 
390
                                      isexec=False,
 
391
                                      copied=copied)
 
392
 
 
393
        ctx = context.memctx(repo,
 
394
                             (parentctx.node(), node.nullid),
 
395
                             message,
 
396
                             changed + removed,
 
397
                             filectxfn,
 
398
                             'an_author',
 
399
                             '2008-10-07 20:59:48 -0500')
 
400
        nodeid = repo.commitctx(ctx)
 
401
        repo = self.repo
 
402
        hg.clean(repo, nodeid)
 
403
        return nodeid
 
404
 
 
405
    def assertchanges(self, changes, ctx):
 
406
        """Assert that all 'changes' (as in defined in commitchanged())
 
407
        went into ctx.
 
408
        """
 
409
        for source, dest, data in changes:
 
410
            if dest is None:
 
411
                self.assertTrue(source not in ctx)
 
412
                continue
 
413
            self.assertTrue(dest in ctx)
 
414
            if data is None:
 
415
                data = ctx.parents()[0][source].data()
 
416
            self.assertEqual(ctx[dest].data(), data)
 
417
            if dest != source:
 
418
                copy = ctx[dest].renamed()
 
419
                self.assertEqual(copy[0], source)
 
420
 
 
421
    def assertMultiLineEqual(self, first, second, msg=None):
 
422
        """Assert that two multi-line strings are equal. (Based on Py3k code.)
 
423
        """
 
424
        try:
 
425
            return super(TestBase, self).assertMultiLineEqual(first, second,
 
426
                                                              msg)
 
427
        except AttributeError:
 
428
            pass
 
429
 
 
430
        self.assert_(isinstance(first, str),
 
431
                     ('First argument is not a string'))
 
432
        self.assert_(isinstance(second, str),
 
433
                     ('Second argument is not a string'))
 
434
 
 
435
        if first != second:
 
436
            diff = ''.join(difflib.unified_diff(first.splitlines(True),
 
437
                                                second.splitlines(True),
 
438
                                                fromfile='a',
 
439
                                                tofile='b'))
 
440
            msg = '%s\n%s' % (msg or '', diff)
 
441
            raise self.failureException, msg
 
442
 
 
443
    def draw(self, repo):
 
444
        """Helper function displaying a repository graph, especially
 
445
        useful when debugging comprehensive tests.
 
446
        """
 
447
        # Could be more elegant, but it works with stock hg
 
448
        _ui = ui.ui()
 
449
        _ui.setconfig('extensions', 'graphlog', '')
 
450
        extensions.loadall(_ui)
 
451
        graphlog = extensions.find('graphlog')
 
452
        templ = """\
 
453
changeset: {rev}:{node|short}
 
454
branch:    {branches}
 
455
tags:      {tags}
 
456
summary:   {desc|firstline}
 
457
files:     {files}
 
458
 
 
459
"""
 
460
        graphlog.graphlog(_ui, repo, rev=None, template=templ)