15
_rootdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
16
sys.path.insert(0, _rootdir)
18
from mercurial import cmdutil
19
from mercurial import commands
20
from mercurial import context
21
from mercurial import dispatch
22
from mercurial import hg
23
from mercurial import i18n
24
from mercurial import node
25
from mercurial import ui
26
from mercurial import util
27
from mercurial import extensions
30
SkipTest = unittest.SkipTest
31
except AttributeError:
33
from unittest2 import SkipTest
36
from nose import SkipTest
40
from hgsubversion import util
42
# Documentation for subprocess.Popen() says:
43
# "Note that on Windows, you cannot set close_fds to true and
44
# also redirect the standard handles by setting stdin, stdout or
46
canCloseFds='win32' not in sys.platform
48
if not 'win32' in sys.platform:
49
def kill_process(popen_obj):
50
os.kill(popen_obj.pid, 9)
53
from ctypes.wintypes import BOOL, DWORD, HANDLE, UINT
55
def win_status_check(result, func, args):
57
raise ctypes.WinError()
60
def WINAPI(returns, func, *params):
61
assert len(params) % 2 == 0
63
func.argtypes = tuple(params[0::2])
64
func.resvalue = returns
65
func.errcheck = win_status_check
70
PROCESS_TERMINATE = 0x0001
72
OpenProcess = WINAPI(HANDLE, ctypes.windll.kernel32.OpenProcess,
73
DWORD, 'dwDesiredAccess',
74
BOOL, 'bInheritHandle',
78
CloseHandle = WINAPI(BOOL, ctypes.windll.kernel32.CloseHandle,
82
TerminateProcess = WINAPI(BOOL, ctypes.windll.kernel32.TerminateProcess,
87
def kill_process(popen_obj):
88
phnd = OpenProcess(PROCESS_TERMINATE, False, popen_obj.pid)
89
TerminateProcess(phnd, 1)
92
# Fixtures that need to be pulled at a subdirectory of the repo path
93
subdir = {'truncatedhistory.svndump': '/project2',
94
'fetch_missing_files_subdir.svndump': '/foo',
95
'empty_dir_in_trunk_not_repo_root.svndump': '/project',
96
'project_root_not_repo_root.svndump': '/dummyproj',
97
'project_name_with_space.svndump': '/project name',
98
'non_ascii_path_1.svndump': '/b\xC3\xB8b',
99
'non_ascii_path_2.svndump': '/b%C3%B8b',
102
FIXTURES = os.path.join(os.path.abspath(os.path.dirname(__file__)),
106
def _makeskip(name, message):
108
def skip(*args, **kwargs):
109
raise SkipTest(message)
113
def requiresmodule(mod):
114
"""Skip a test if the specified module is None (i.e. not available)."""
120
return _makeskip(fn.__name__, 'missing required feature')
124
def requiresoption(option):
125
'''Skip a test if commands.clone does not take the specified option.'''
127
for entry in cmdutil.findcmd('clone', commands.table)[1][1]:
128
if entry[1] == option:
130
# no match found, so skip
132
return _makeskip(fn.__name__,
133
'test requires clone to accept %s' % option)
134
# no skipping support, so erase decorated method
136
if not isinstance(option, str):
137
raise TypeError('requiresoption takes a string argument')
140
def filtermanifest(manifest):
    """Return the entries of ``manifest`` not listed in ``util.ignoredfiles``.

    ``manifest`` may be any iterable of file names. The result is always a
    new list, in the original iteration order.
    """
    kept = []
    for name in manifest:
        if name in util.ignoredfiles:
            # Listed in util.ignoredfiles: leave it out of the result.
            continue
        kept.append(name)
    return kept
144
path = os.path.abspath(path).replace(os.sep, '/')
145
drive, path = os.path.splitdrive(path)
148
url = 'file://%s%s' % (drive, path)
151
def testui(stupid=False, layout='auto', startrev=0):
153
bools = {True: 'true', False: 'false'}
154
u.setconfig('ui', 'quiet', bools[True])
155
u.setconfig('extensions', 'hgsubversion', '')
156
u.setconfig('hgsubversion', 'stupid', bools[stupid])
157
u.setconfig('hgsubversion', 'layout', layout)
158
u.setconfig('hgsubversion', 'startrev', startrev)
161
def load_svndump_fixture(path, fixture_name):
162
'''Loads an svnadmin dump into a fresh repo at path, which should not
165
if os.path.exists(path): rmtree(path)
166
subprocess.call(['svnadmin', 'create', path,],
167
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
168
inp = open(os.path.join(FIXTURES, fixture_name))
169
proc = subprocess.Popen(['svnadmin', 'load', path,], stdin=inp,
170
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
173
def load_fixture_and_fetch(fixture_name, repo_path, wc_path, stupid=False,
174
subdir='', noupdate=True, layout='auto',
175
startrev=0, externals=None):
176
load_svndump_fixture(repo_path, fixture_name)
178
repo_path += '/' + subdir
182
'--layout=%s' % layout,
183
'--startrev=%s' % startrev,
188
cmd.append('--stupid')
190
cmd.append('--noupdate')
192
cmd[:0] = ['--config', 'hgsubversion.externals=%s' % externals]
194
dispatch.dispatch(cmd)
196
return hg.repository(testui(), wc_path)
199
# Read-only files cannot be removed under Windows
200
for root, dirs, files in os.walk(path):
202
f = os.path.join(root, f)
206
if e.errno == errno.ENOENT:
209
if (s.st_mode & stat.S_IWRITE) == 0:
210
os.chmod(f, s.st_mode | stat.S_IWRITE)
213
def _verify_our_modules():
215
Verify that hgsubversion was imported from the correct location.
217
The correct location is any location within the parent directory of the
218
directory containing this file.
221
for modname, module in sys.modules.iteritems():
222
if not module or not modname.startswith('hgsubversion.'):
225
modloc = module.__file__
226
cp = os.path.commonprefix((os.path.abspath(__file__), modloc))
227
assert cp.rstrip(os.sep) == _rootdir, (
228
'Module location verification failed: hgsubversion was imported '
229
'from the wrong path!'
232
class TestBase(unittest.TestCase):
234
_verify_our_modules()
236
self.oldenv = dict([(k, os.environ.get(k, None), ) for k in
237
('LANG', 'LC_ALL', 'HGRCPATH', )])
239
os.environ['LANG'] = os.environ['LC_ALL'] = 'C'
240
i18n.t = gettext.translation('hg', i18n.localedir, fallback=True)
242
self.oldwd = os.getcwd()
243
self.tmpdir = tempfile.mkdtemp(
244
'svnwrap_test', dir=os.environ.get('HGSUBVERSION_TEST_TEMP', None))
245
self.hgrc = os.path.join(self.tmpdir, '.hgrc')
246
os.environ['HGRCPATH'] = self.hgrc
247
rc = open(self.hgrc, 'w')
248
for l in '[extensions]', 'hgsubversion=':
251
self.repo_path = '%s/testrepo' % self.tmpdir
252
self.wc_path = '%s/testrepo_wc' % self.tmpdir
255
# Previously, we had a MockUI class that wrapped ui and gave access
256
# to the stream. The ui.pushbuffer() and ui.popbuffer() can be used
257
# instead. Using the regular UI class, with all stderr redirected to
258
# stdout ensures that the test setup is much more similar to usage
260
self.patch = (ui.ui.write_err, ui.ui.write)
261
setattr(ui.ui, self.patch[0].func_name, self.patch[1])
264
for var, val in self.oldenv.iteritems():
268
os.environ[var] = val
272
setattr(ui.ui, self.patch[0].func_name, self.patch[0])
274
_verify_our_modules()
276
def ui(self, stupid=False, layout='auto'):
    """Return the object produced by testui() for the given settings.

    ``stupid`` and ``layout`` are forwarded unchanged to the module-level
    testui() helper; its ``startrev`` argument keeps its default.
    """
    configured = testui(stupid=stupid, layout=layout)
    return configured
279
def _load_fixture_and_fetch(self, fixture_name, subdir=None, stupid=False,
280
layout='auto', startrev=0, externals=None):
281
if layout == 'single':
286
return load_fixture_and_fetch(fixture_name, self.repo_path,
287
self.wc_path, subdir=subdir,
288
stupid=stupid, layout=layout,
289
startrev=startrev, externals=externals)
291
def _add_svn_rev(self, changes):
292
'''changes is a dict of filename -> contents'''
293
if self.svn_wc is None:
294
self.svn_wc = os.path.join(self.tmpdir, 'testsvn_wc')
296
'svn', 'co', '-q', fileurl(self.repo_path),
299
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
301
for filename, contents in changes.iteritems():
302
# filenames are / separated
303
filename = filename.replace('/', os.path.sep)
304
filename = os.path.join(self.svn_wc, filename)
305
open(filename, 'w').write(contents)
307
subprocess.call(['svn', 'add', '-q', filename],
308
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
310
'svn', 'commit', '-q', self.svn_wc, '-m', 'test changes'],
311
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
313
# define this as a property so that it reloads anytime we need it
316
return hg.repository(testui(), self.wc_path)
318
def pushrevisions(self, stupid=False, expected_extra_back=0):
319
before = len(self.repo)
320
self.repo.ui.setconfig('hgsubversion', 'stupid', str(stupid))
321
res = commands.push(self.repo.ui, self.repo)
322
after = len(self.repo)
323
self.assertEqual(expected_extra_back, after - before)
326
def svnls(self, path, rev='HEAD'):
327
path = self.repo_path + '/' + path
328
path = util.normalize_url(fileurl(path))
329
args = ['svn', 'ls', '-r', rev, '-R', path]
330
p = subprocess.Popen(args,
331
stdout=subprocess.PIPE,
332
stderr=subprocess.STDOUT)
333
stdout, stderr = p.communicate()
335
raise Exception('svn ls failed on %s: %r' % (path, stderr))
336
entries = [e.strip('/') for e in stdout.splitlines()]
340
def svnco(self, svnpath, rev, path):
341
path = os.path.join(self.wc_path, path)
342
subpath = os.path.dirname(path)
343
if not os.path.isdir(subpath):
345
svnpath = fileurl(self.repo_path + '/' + svnpath)
346
args = ['svn', 'co', '-r', rev, svnpath, path]
347
p = subprocess.Popen(args,
348
stdout=subprocess.PIPE,
349
stderr=subprocess.STDOUT)
350
stdout, stderr = p.communicate()
352
raise Exception('svn co failed on %s: %r' % (svnpath, stderr))
354
def commitchanges(self, changes, parent='tip', message='automated test'):
355
"""Commit changes to mercurial directory
357
'changes' is a sequence of tuples (source, dest, data). It can look
359
- (source, source, data) to set source content to data
360
- (source, dest, None) to set dest content to source one, and mark it as
362
- (source, dest, data) to set dest content to data, and mark it as copied
364
- (source, None, None) to remove source.
367
parentctx = repo[parent]
369
changed, removed = [], []
370
for source, dest, newdata in changes:
372
removed.append(source)
376
def filectxfn(repo, memctx, path):
378
raise IOError(errno.ENOENT,
379
"File \"%s\" no longer exists" % path)
380
entry = [e for e in changes if path == e[1]][0]
381
source, dest, newdata = entry
383
newdata = parentctx[source].data()
387
return context.memfilectx(path=dest,
393
ctx = context.memctx(repo,
394
(parentctx.node(), node.nullid),
399
'2008-10-07 20:59:48 -0500')
400
nodeid = repo.commitctx(ctx)
402
hg.clean(repo, nodeid)
405
def assertchanges(self, changes, ctx):
406
"""Assert that all 'changes' (as defined in commitchanges())
409
for source, dest, data in changes:
411
self.assertTrue(source not in ctx)
413
self.assertTrue(dest in ctx)
415
data = ctx.parents()[0][source].data()
416
self.assertEqual(ctx[dest].data(), data)
418
copy = ctx[dest].renamed()
419
self.assertEqual(copy[0], source)
421
def assertMultiLineEqual(self, first, second, msg=None):
422
"""Assert that two multi-line strings are equal. (Based on Py3k code.)
425
return super(TestBase, self).assertMultiLineEqual(first, second,
427
except AttributeError:
430
self.assert_(isinstance(first, str),
431
('First argument is not a string'))
432
self.assert_(isinstance(second, str),
433
('Second argument is not a string'))
436
diff = ''.join(difflib.unified_diff(first.splitlines(True),
437
second.splitlines(True),
440
msg = '%s\n%s' % (msg or '', diff)
441
raise self.failureException, msg
443
def draw(self, repo):
444
"""Helper function displaying a repository graph, especially
445
useful when debugging comprehensive tests.
447
# Could be more elegant, but it works with stock hg
449
_ui.setconfig('extensions', 'graphlog', '')
450
extensions.loadall(_ui)
451
graphlog = extensions.find('graphlog')
453
changeset: {rev}:{node|short}
456
summary: {desc|firstline}
460
graphlog.graphlog(_ui, repo, rev=None, template=templ)