1
# Copyright (C) 2009 Google Inc. All rights reserved.
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions are
7
# * Redistributions of source code must retain the above copyright
8
# notice, this list of conditions and the following disclaimer.
9
# * Redistributions in binary form must reproduce the above
10
# copyright notice, this list of conditions and the following disclaimer
11
# in the documentation and/or other materials provided with the
13
# * Neither the name of Google Inc. nor the names of its
14
# contributors may be used to endorse or promote products derived from
15
# this software without specific prior written permission.
17
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35
from webkitpy.common.system import path
38
class MockFileSystem(object):
42
def __init__(self, files=None, dirs=None, cwd='/'):
43
"""Initializes a "mock" filesystem that can be used to completely
44
stub out a filesystem.
47
files: a dict of filenames -> file contents. A file contents
48
value of None is used to indicate that the file should
51
self.files = files or {}
52
self.written_files = {}
53
self.last_tmpdir = None
54
self.current_tmpno = 0
56
self.dirs = set(dirs or [])
60
while not d in self.dirs:
64
def clear_written_files(self):
65
# This function can be used to track what is written between steps in a test.
66
self.written_files = {}
68
def _raise_not_found(self, path):
69
raise IOError(errno.ENOENT, path, os.strerror(errno.ENOENT))
71
def _split(self, path):
72
# This is not quite a full implementation of os.path.split
73
# http://docs.python.org/library/os.path.html#os.path.split
75
return path.rsplit(self.sep, 1)
78
def abspath(self, path):
79
if os.path.isabs(path):
80
return self.normpath(path)
81
return self.abspath(self.join(self.cwd, path))
83
def realpath(self, path):
84
return self.abspath(path)
86
def basename(self, path):
87
return self._split(path)[1]
89
def expanduser(self, path):
92
parts = path.split(self.sep, 1)
93
home_directory = self.sep + "Users" + self.sep + "mock"
96
return home_directory + self.sep + parts[1]
98
def path_to_module(self, module_name):
99
return "/mock-checkout/Tools/Scripts/" + module_name.replace('.', '/') + ".py"
101
def chdir(self, path):
102
path = self.normpath(path)
103
if not self.isdir(path):
104
raise OSError(errno.ENOENT, path, os.strerror(errno.ENOENT))
107
def copyfile(self, source, destination):
108
if not self.exists(source):
109
self._raise_not_found(source)
110
if self.isdir(source):
111
raise IOError(errno.EISDIR, source, os.strerror(errno.EISDIR))
112
if self.isdir(destination):
113
raise IOError(errno.EISDIR, destination, os.strerror(errno.EISDIR))
114
if not self.exists(self.dirname(destination)):
115
raise IOError(errno.ENOENT, destination, os.strerror(errno.ENOENT))
117
self.files[destination] = self.files[source]
118
self.written_files[destination] = self.files[source]
120
def dirname(self, path):
121
return self._split(path)[0]
123
def exists(self, path):
124
return self.isfile(path) or self.isdir(path)
126
def files_under(self, path, dirs_to_skip=[], file_filter=None):
127
def filter_all(fs, dirpath, basename):
130
file_filter = file_filter or filter_all
132
if self.isfile(path):
133
if file_filter(self, self.dirname(path), self.basename(path)) and self.files[path] is not None:
137
if self.basename(path) in dirs_to_skip:
140
if not path.endswith(self.sep):
143
dir_substrings = [self.sep + d + self.sep for d in dirs_to_skip]
144
for filename in self.files:
145
if not filename.startswith(path):
148
suffix = filename[len(path) - 1:]
149
if any(dir_substring in suffix for dir_substring in dir_substrings):
152
dirpath, basename = self._split(filename)
153
if file_filter(self, dirpath, basename) and self.files[filename] is not None:
154
files.append(filename)
161
def glob(self, glob_string):
162
# FIXME: This handles '*', but not '?', '[', or ']'.
163
glob_string = re.escape(glob_string)
164
glob_string = glob_string.replace('\\*', '[^\\/]*') + '$'
165
glob_string = glob_string.replace('\\/', '/')
166
path_filter = lambda path: re.match(glob_string, path)
168
# We could use fnmatch.fnmatch, but that might not do the right thing on windows.
169
existing_files = [path for path, contents in self.files.items() if contents is not None]
170
return filter(path_filter, existing_files) + filter(path_filter, self.dirs)
172
def isabs(self, path):
173
return path.startswith(self.sep)
175
def isfile(self, path):
176
return path in self.files and self.files[path] is not None
178
def isdir(self, path):
179
return self.normpath(path) in self.dirs
181
def _slow_but_correct_join(self, *comps):
182
return re.sub(re.escape(os.path.sep), self.sep, os.path.join(*comps))
184
def join(self, *comps):
185
# This function is called a lot, so we optimize it; there are
186
# unittests to check that we match _slow_but_correct_join(), above.
198
if comps[-1] == '' and path:
200
path = path.replace(sep + sep, sep)
203
def listdir(self, path):
205
if not self.isdir(path):
206
raise OSError("%s is not a directory" % path)
208
if not path.endswith(sep):
214
if self.exists(f) and f.startswith(path):
215
remaining = f[len(path):]
217
dir = remaining[:remaining.index(sep)]
221
files.append(remaining)
224
def mtime(self, path):
225
if self.exists(path):
227
self._raise_not_found(path)
229
def _mktemp(self, suffix='', prefix='tmp', dir=None, **kwargs):
231
dir = self.sep + '__im_tmp'
232
curno = self.current_tmpno
233
self.current_tmpno += 1
234
self.last_tmpdir = self.join(dir, '%s_%u_%s' % (prefix, curno, suffix))
235
return self.last_tmpdir
237
def mkdtemp(self, **kwargs):
238
class TemporaryDirectory(object):
239
def __init__(self, fs, **kwargs):
240
self._kwargs = kwargs
241
self._filesystem = fs
242
self._directory_path = fs._mktemp(**kwargs)
243
fs.maybe_make_directory(self._directory_path)
246
return self._directory_path
249
return self._directory_path
251
def __exit__(self, type, value, traceback):
252
# Only self-delete if necessary.
254
# FIXME: Should we delete non-empty directories?
255
if self._filesystem.exists(self._directory_path):
256
self._filesystem.rmtree(self._directory_path)
258
return TemporaryDirectory(fs=self, **kwargs)
260
def maybe_make_directory(self, *path):
261
norm_path = self.normpath(self.join(*path))
262
while norm_path and not self.isdir(norm_path):
263
self.dirs.add(norm_path)
264
norm_path = self.dirname(norm_path)
266
def move(self, source, destination):
267
if self.files[source] is None:
268
self._raise_not_found(source)
269
self.files[destination] = self.files[source]
270
self.written_files[destination] = self.files[destination]
271
self.files[source] = None
272
self.written_files[source] = None
274
def _slow_but_correct_normpath(self, path):
275
return re.sub(re.escape(os.path.sep), self.sep, os.path.normpath(path))
277
def normpath(self, path):
278
# This function is called a lot, so we try to optimize the common cases
279
# instead of always calling _slow_but_correct_normpath(), above.
280
if '..' in path or '/./' in path:
281
# This doesn't happen very often; don't bother trying to optimize it.
282
return self._slow_but_correct_normpath(path)
289
if path.endswith('/.'):
291
if path.endswith('/'):
295
def open_binary_tempfile(self, suffix=''):
296
path = self._mktemp(suffix)
297
return (WritableBinaryFileObject(self, path), path)
299
def open_binary_file_for_reading(self, path):
300
if self.files[path] is None:
301
self._raise_not_found(path)
302
return ReadableBinaryFileObject(self, path, self.files[path])
304
def read_binary_file(self, path):
305
# Intentionally raises KeyError if we don't recognize the path.
306
if self.files[path] is None:
307
self._raise_not_found(path)
308
return self.files[path]
310
def write_binary_file(self, path, contents):
311
# FIXME: should this assert if dirname(path) doesn't exist?
312
self.maybe_make_directory(self.dirname(path))
313
self.files[path] = contents
314
self.written_files[path] = contents
316
def open_text_file_for_reading(self, path):
317
if self.files[path] is None:
318
self._raise_not_found(path)
319
return ReadableTextFileObject(self, path, self.files[path])
321
def open_text_file_for_writing(self, path):
322
return WritableTextFileObject(self, path)
324
def read_text_file(self, path):
325
return self.read_binary_file(path).decode('utf-8')
327
def write_text_file(self, path, contents):
328
return self.write_binary_file(path, contents.encode('utf-8'))
330
def sha1(self, path):
331
contents = self.read_binary_file(path)
332
return hashlib.sha1(contents).hexdigest()
334
def relpath(self, path, start='.'):
335
# Since os.path.relpath() calls os.path.normpath()
336
# (see http://docs.python.org/library/os.path.html#os.path.abspath )
337
# it also removes trailing slashes and converts forward and backward
338
# slashes to the preferred slash os.sep.
339
start = self.abspath(start)
340
path = self.abspath(path)
342
if not path.lower().startswith(start.lower()):
343
# path is outside the directory given by start; compute path from root
344
return '../' * start.count('/') + path
346
rel_path = path[len(start):]
349
# Then the paths are the same.
351
elif rel_path[0] == self.sep:
352
# It is probably sufficient to remove just the first character
353
# since os.path.normpath() collapses separators, but we use
354
# lstrip() just to be sure.
355
rel_path = rel_path.lstrip(self.sep)
357
# We are in the case typified by the following example:
358
# path = "/tmp/foobar", start = "/tmp/foo" -> rel_path = "bar"
359
# FIXME: We return a less-than-optimal result here.
360
return '../' * start.count('/') + path
364
def remove(self, path):
365
if self.files[path] is None:
366
self._raise_not_found(path)
367
self.files[path] = None
368
self.written_files[path] = None
370
def rmtree(self, path):
371
path = self.normpath(path)
374
if f.startswith(path):
377
self.dirs = set(filter(lambda d: not d.startswith(path), self.dirs))
379
def copytree(self, source, destination):
380
source = self.normpath(source)
381
destination = self.normpath(destination)
383
for source_file in self.files:
384
if source_file.startswith(source):
385
destination_path = self.join(destination, self.relpath(source_file, source))
386
self.maybe_make_directory(self.dirname(destination_path))
387
self.files[destination_path] = self.files[source_file]
389
def split(self, path):
390
idx = path.rfind(self.sep)
393
return (path[:idx], path[(idx + 1):])
395
def splitext(self, path):
396
idx = path.rfind('.')
399
return (path[0:idx], path[idx:])
402
class WritableBinaryFileObject(object):
403
def __init__(self, fs, path):
407
self.fs.files[path] = ""
412
def __exit__(self, type, value, traceback):
418
def write(self, str):
419
self.fs.files[self.path] += str
420
self.fs.written_files[self.path] = self.fs.files[self.path]
423
class WritableTextFileObject(WritableBinaryFileObject):
424
def write(self, str):
425
WritableBinaryFileObject.write(self, str.encode('utf-8'))
428
class ReadableBinaryFileObject(object):
429
def __init__(self, fs, path, data):
439
def __exit__(self, type, value, traceback):
445
def read(self, bytes=None):
447
return self.data[self.offset:]
450
return self.data[start:self.offset]
453
class ReadableTextFileObject(ReadableBinaryFileObject):
454
def __init__(self, fs, path, data):
455
super(ReadableTextFileObject, self).__init__(fs, path, StringIO.StringIO(data))
459
super(ReadableTextFileObject, self).close()
461
def read(self, bytes=-1):
462
return self.data.read(bytes)
464
def readline(self, length=None):
465
return self.data.readline(length)
468
return self.data.__iter__()
471
return self.data.next()
473
def seek(self, offset, whence=os.SEEK_SET):
474
self.data.seek(offset, whence)