2
local path implementation.
4
from __future__ import with_statement
6
from contextlib import contextmanager
7
import sys, os, re, atexit, io
9
from py._path import common
10
from py._path.common import iswin32, fspath
11
from stat import S_ISLNK, S_ISDIR, S_ISREG
13
from os.path import abspath, normpath, isabs, exists, isdir, isfile, islink, dirname
15
if sys.version_info > (3,0):
16
def map_as_list(func, iter):
17
return list(map(func, iter))
22
def __getattr__(self, name):
23
return getattr(self._osstatresult, "st_" + name)
25
def __init__(self, path, osstatresult):
27
self._osstatresult = osstatresult
32
raise NotImplementedError("XXX win32")
34
entry = py.error.checked_call(pwd.getpwuid, self.uid)
39
""" return group name of file. """
41
raise NotImplementedError("XXX win32")
43
entry = py.error.checked_call(grp.getgrgid, self.gid)
47
return S_ISDIR(self._osstatresult.st_mode)
50
return S_ISREG(self._osstatresult.st_mode)
53
st = self.path.lstat()
54
return S_ISLNK(self._osstatresult.st_mode)
56
class PosixPath(common.PathBase):
57
def chown(self, user, group, rec=0):
58
""" change ownership to the given user and group.
59
user and group may be specified by a number or
60
by a name. if rec is True change ownership
64
gid = getgroupid(group)
66
for x in self.visit(rec=lambda x: x.check(link=0)):
68
py.error.checked_call(os.chown, str(x), uid, gid)
69
py.error.checked_call(os.chown, str(self), uid, gid)
72
""" return value of a symbolic link. """
73
return py.error.checked_call(os.readlink, self.strpath)
75
def mklinkto(self, oldname):
76
""" posix style hard link to another name. """
77
py.error.checked_call(os.link, str(oldname), str(self))
79
def mksymlinkto(self, value, absolute=1):
80
""" create a symbolic link with the given value (pointing to another name). """
82
py.error.checked_call(os.symlink, str(value), self.strpath)
84
base = self.common(value)
85
# with posix local paths '/' is always a common base
86
relsource = self.__class__(value).relto(base)
87
reldest = self.relto(base)
88
n = reldest.count(self.sep)
89
target = self.sep.join(('..', )*n + (relsource, ))
90
py.error.checked_call(os.symlink, target, self.strpath)
94
if not isinstance(user, int):
95
user = pwd.getpwnam(user)[2]
98
def getgroupid(group):
100
if not isinstance(group, int):
101
group = grp.getgrnam(group)[2]
104
FSBase = not iswin32 and PosixPath or common.PathBase
106
class LocalPath(FSBase):
107
""" object oriented interface to os.path and other local filesystem
110
class ImportMismatchError(ImportError):
111
""" raised on pyimport() if there is a mismatch of __file__'s"""
114
class Checkers(common.Checkers):
117
return self._statcache
118
except AttributeError:
120
self._statcache = self.path.stat()
121
except py.error.ELOOP:
122
self._statcache = self.path.lstat()
123
return self._statcache
126
return S_ISDIR(self._stat().mode)
129
return S_ISREG(self._stat().mode)
135
st = self.path.lstat()
136
return S_ISLNK(st.mode)
138
def __init__(self, path=None, expanduser=False):
139
""" Initialize and return a local Path instance.
141
Path can be relative to the current directory.
142
If path is None it defaults to the current working directory.
143
If expanduser is True, tilde-expansion is performed.
144
Note that Path instances always carry an absolute path.
145
Note also that passing in a local path object will simply return
146
the exact same path object. Use new() to get a new copy.
149
self.strpath = py.error.checked_call(os.getcwd)
154
raise ValueError("can only pass None, Path instances "
155
"or non-empty strings to LocalPath")
157
path = os.path.expanduser(path)
158
self.strpath = abspath(path)
161
return hash(self.strpath)
163
def __eq__(self, other):
173
except AttributeError:
177
def __ne__(self, other):
178
return not (self == other)
180
def __lt__(self, other):
181
return fspath(self) < fspath(other)
183
def __gt__(self, other):
184
return fspath(self) > fspath(other)
186
def samefile(self, other):
187
""" return True if 'other' references the same file as 'self'.
189
other = fspath(other)
191
other = abspath(other)
195
return False # there is no samefile
196
return py.error.checked_call(
197
os.path.samefile, self.strpath, other)
199
def remove(self, rec=1, ignore_errors=False):
200
""" remove a file or directory (or a directory tree if rec=1).
201
if ignore_errors is True, errors while removing directories will
204
if self.check(dir=1, link=0):
206
# force remove of readonly files on windows
208
self.chmod(448, rec=1) # octcal 0700
209
py.error.checked_call(py.std.shutil.rmtree, self.strpath,
210
ignore_errors=ignore_errors)
212
py.error.checked_call(os.rmdir, self.strpath)
215
self.chmod(448) # octcal 0700
216
py.error.checked_call(os.remove, self.strpath)
218
def computehash(self, hashtype="md5", chunksize=524288):
219
""" return hexdigest of hashvalue for this file. """
222
import hashlib as mod
224
if hashtype == "sha1":
226
mod = __import__(hashtype)
227
hash = getattr(mod, hashtype)()
228
except (AttributeError, ImportError):
229
raise ValueError("Don't know how to compute %r hash" %(hashtype,))
233
buf = f.read(chunksize)
235
return hash.hexdigest()
241
""" create a modified version of this path.
242
the following keyword arguments modify various path parts::
244
a:/some/path/to/a/file.ext
246
xxxxxxxxxxxxxxxxx dirname
251
obj = object.__new__(self.__class__)
253
obj.strpath = self.strpath
255
drive, dirname, basename, purebasename,ext = self._getbyspec(
256
"drive,dirname,basename,purebasename,ext")
258
if 'purebasename' in kw or 'ext' in kw:
259
raise ValueError("invalid specification %r" % kw)
261
pb = kw.setdefault('purebasename', purebasename)
267
if ext and not ext.startswith('.'):
269
kw['basename'] = pb + ext
271
if ('dirname' in kw and not kw['dirname']):
272
kw['dirname'] = drive
274
kw.setdefault('dirname', dirname)
275
kw.setdefault('sep', self.sep)
276
obj.strpath = normpath(
277
"%(dirname)s%(sep)s%(basename)s" % kw)
280
def _getbyspec(self, spec):
281
""" see new for what 'spec' can be. """
283
parts = self.strpath.split(self.sep)
285
args = filter(None, spec.split(',') )
290
elif name == 'dirname':
291
append(self.sep.join(parts[:-1]))
294
if name == 'basename':
297
i = basename.rfind('.')
299
purebasename, ext = basename, ''
301
purebasename, ext = basename[:i], basename[i:]
302
if name == 'purebasename':
307
raise ValueError("invalid part specification %r" % name)
310
def dirpath(self, *args, **kwargs):
311
""" return the directory path joined with any given path arguments. """
313
path = object.__new__(self.__class__)
314
path.strpath = dirname(self.strpath)
316
path = path.join(*args)
318
return super(LocalPath, self).dirpath(*args, **kwargs)
320
def join(self, *args, **kwargs):
321
""" return a new path by appending all 'args' as path
322
components. if abs=1 is used restart from root if any
323
of the args is an absolute path.
326
strargs = [fspath(arg) for arg in args]
327
strpath = self.strpath
328
if kwargs.get('abs'):
330
for arg in reversed(strargs):
335
newargs.insert(0, arg)
339
# allow unix style paths even on windows.
341
arg = arg.replace('/', sep)
342
strpath = strpath + sep + arg
343
obj = object.__new__(self.__class__)
344
obj.strpath = normpath(strpath)
347
def open(self, mode='r', ensure=False, encoding=None):
348
""" return an opened file with the given mode.
350
If ensure is True, create parent directories if needed.
353
self.dirpath().ensure(dir=1)
355
return py.error.checked_call(io.open, self.strpath, mode, encoding=encoding)
356
return py.error.checked_call(open, self.strpath, mode)
358
def _fastjoin(self, name):
359
child = object.__new__(self.__class__)
360
child.strpath = self.strpath + self.sep + name
364
return islink(self.strpath)
366
def check(self, **kw):
368
return exists(self.strpath)
371
return not kw["dir"] ^ isdir(self.strpath)
373
return not kw["file"] ^ isfile(self.strpath)
374
return super(LocalPath, self).check(**kw)
376
_patternchars = set("*?[" + os.path.sep)
377
def listdir(self, fil=None, sort=None):
378
""" list directory contents, possibly filter by the given fil func
381
if fil is None and sort is None:
382
names = py.error.checked_call(os.listdir, self.strpath)
383
return map_as_list(self._fastjoin, names)
384
if isinstance(fil, py.builtin._basestring):
385
if not self._patternchars.intersection(fil):
386
child = self._fastjoin(fil)
387
if exists(child.strpath):
390
fil = common.FNMatcher(fil)
391
names = py.error.checked_call(os.listdir, self.strpath)
394
child = self._fastjoin(name)
395
if fil is None or fil(child):
397
self._sortlist(res, sort)
401
""" return size of the underlying file object """
402
return self.stat().size
405
""" return last modification time of the path. """
406
return self.stat().mtime
408
def copy(self, target, mode=False, stat=False):
409
""" copy path to target.
411
If mode is True, will copy copy permission from path to target.
412
If stat is True, copy permission, last modification
413
time, last access time, and flags from path to target.
415
if self.check(file=1):
416
if target.check(dir=1):
417
target = target.join(self.basename)
419
copychunked(self, target)
421
copymode(self.strpath, target.strpath)
423
copystat(self, target)
426
return p.check(link=0)
427
for x in self.visit(rec=rec):
428
relpath = x.relto(self)
429
newx = target.join(relpath)
430
newx.dirpath().ensure(dir=1)
432
newx.mksymlinkto(x.readlink())
434
elif x.check(file=1):
439
copymode(x.strpath, newx.strpath)
443
def rename(self, target):
444
""" rename this path to target. """
445
target = fspath(target)
446
return py.error.checked_call(os.rename, self.strpath, target)
448
def dump(self, obj, bin=1):
449
""" pickle object into path location"""
452
py.error.checked_call(py.std.pickle.dump, obj, f, bin)
456
def mkdir(self, *args):
457
""" create & return the directory joined with args. """
459
py.error.checked_call(os.mkdir, fspath(p))
462
def write_binary(self, data, ensure=False):
463
""" write binary data into path. If ensure is True create
464
missing parent directories.
467
self.dirpath().ensure(dir=1)
468
with self.open('wb') as f:
471
def write_text(self, data, encoding, ensure=False):
472
""" write text data into path using the specified encoding.
473
If ensure is True create missing parent directories.
476
self.dirpath().ensure(dir=1)
477
with self.open('w', encoding=encoding) as f:
480
def write(self, data, mode='w', ensure=False):
481
""" write data into path. If ensure is True create
482
missing parent directories.
485
self.dirpath().ensure(dir=1)
487
if not py.builtin._isbytes(data):
488
raise ValueError("can only process bytes")
490
if not py.builtin._istext(data):
491
if not py.builtin._isbytes(data):
494
data = py.builtin._totext(data, sys.getdefaultencoding())
501
def _ensuredirs(self):
502
parent = self.dirpath()
505
if parent.check(dir=0):
507
if self.check(dir=0):
510
except py.error.EEXIST:
511
# race condition: file/dir created by another thread/process.
512
# complain if it is not a dir
513
if self.check(dir=0):
517
def ensure(self, *args, **kwargs):
518
""" ensure that an args-joined path exists (by default as
519
a file). if you specify a keyword argument 'dir=True'
520
then the path is forced to be a directory path.
523
if kwargs.get('dir', 0):
524
return p._ensuredirs()
526
p.dirpath()._ensuredirs()
527
if not p.check(file=1):
531
def stat(self, raising=True):
532
""" Return an os.stat() tuple. """
534
return Stat(self, py.error.checked_call(os.stat, self.strpath))
536
return Stat(self, os.stat(self.strpath))
537
except KeyboardInterrupt:
543
""" Return an os.lstat() tuple. """
544
return Stat(self, py.error.checked_call(os.lstat, self.strpath))
546
def setmtime(self, mtime=None):
547
""" set modification time for the given path. if 'mtime' is None
548
(the default) then the file's mtime is set to current time.
550
Note that the resolution for 'mtime' is platform dependent.
553
return py.error.checked_call(os.utime, self.strpath, mtime)
555
return py.error.checked_call(os.utime, self.strpath, (-1, mtime))
556
except py.error.EINVAL:
557
return py.error.checked_call(os.utime, self.strpath, (self.atime(), mtime))
560
""" change directory to self and return old current directory """
562
old = self.__class__()
563
except py.error.ENOENT:
565
py.error.checked_call(os.chdir, self.strpath)
571
""" return context manager which changes to current dir during the
572
managed "with" context. On __enter__ it returns the old dir.
581
""" return a new path which contains no symbolic links."""
582
return self.__class__(os.path.realpath(self.strpath))
585
""" return last access time of the path. """
586
return self.stat().atime
589
return 'local(%r)' % self.strpath
592
""" return string representation of the Path. """
595
def chmod(self, mode, rec=0):
596
""" change permissions to the given mode. If mode is an
597
integer it directly encodes the os-specific modes.
598
if rec is True perform recursively.
600
if not isinstance(mode, int):
601
raise TypeError("mode %r must be an integer" % (mode,))
603
for x in self.visit(rec=rec):
604
py.error.checked_call(os.chmod, str(x), mode)
605
py.error.checked_call(os.chmod, self.strpath, mode)
608
""" return the Python package path by looking for the last
609
directory upwards which still contains an __init__.py.
610
Return None if a pkgpath can not be determined.
613
for parent in self.parts(reverse=True):
615
if not parent.join('__init__.py').exists():
617
if not isimportable(parent.basename):
622
def _ensuresyspath(self, ensuremode, path):
625
if ensuremode == "append":
626
if s not in sys.path:
630
sys.path.insert(0, s)
632
def pyimport(self, modname=None, ensuresyspath=True):
633
""" return path as an imported python module.
635
If modname is None, look for the containing package
636
and construct an according module name.
637
The module will be put/looked up in sys.modules.
638
if ensuresyspath is True then the root dir for importing
639
the file (taking __init__.py files into account) will
640
be prepended to sys.path if it isn't there already.
641
If ensuresyspath=="append" the root dir will be appended
642
if it isn't already contained in sys.path.
643
if ensuresyspath is False no modification of syspath happens.
646
raise py.error.ENOENT(self)
650
pkgpath = self.pypkgpath()
651
if pkgpath is not None:
652
pkgroot = pkgpath.dirpath()
653
names = self.new(ext="").relto(pkgroot).split(self.sep)
654
if names[-1] == "__init__":
656
modname = ".".join(names)
658
pkgroot = self.dirpath()
659
modname = self.purebasename
661
self._ensuresyspath(ensuresyspath, pkgroot)
663
mod = sys.modules[modname]
664
if self.basename == "__init__.py":
665
return mod # we don't check anything as we might
666
# we in a namespace package ... too icky to check
667
modfile = mod.__file__
668
if modfile[-4:] in ('.pyc', '.pyo'):
669
modfile = modfile[:-1]
670
elif modfile.endswith('$py.class'):
671
modfile = modfile[:-9] + '.py'
672
if modfile.endswith(os.path.sep + "__init__.py"):
673
if self.basename != "__init__.py":
674
modfile = modfile[:-12]
676
issame = self.samefile(modfile)
677
except py.error.ENOENT:
680
raise self.ImportMismatchError(modname, modfile, self)
684
return sys.modules[modname]
686
# we have a custom modname, do a pseudo-import
687
mod = py.std.types.ModuleType(modname)
688
mod.__file__ = str(self)
689
sys.modules[modname] = mod
691
py.builtin.execfile(str(self), mod.__dict__)
693
del sys.modules[modname]
697
def sysexec(self, *argv, **popen_opts):
698
""" return stdout text from executing a system child process,
699
where the 'self' path points to executable.
700
The process is directly invoked and not through a system shell.
702
from subprocess import Popen, PIPE
703
argv = map_as_list(str, argv)
704
popen_opts['stdout'] = popen_opts['stderr'] = PIPE
705
proc = Popen([str(self)] + argv, **popen_opts)
706
stdout, stderr = proc.communicate()
708
if py.builtin._isbytes(stdout):
709
stdout = py.builtin._totext(stdout, sys.getdefaultencoding())
711
if py.builtin._isbytes(stderr):
712
stderr = py.builtin._totext(stderr, sys.getdefaultencoding())
713
raise py.process.cmdexec.Error(ret, ret, str(self),
717
def sysfind(cls, name, checker=None, paths=None):
718
""" return a path object found by looking at the systems
719
underlying PATH specification. If the checker is not None
720
it will be invoked to filter matching paths. If a binary
721
cannot be found, None is returned
722
Note: This is probably not working on plain win32 systems
723
but may work on cygwin.
726
p = py.path.local(name)
732
paths = py.std.os.environ['Path'].split(';')
733
if '' not in paths and '.' not in paths:
736
systemroot = os.environ['SYSTEMROOT']
740
paths = [re.sub('%SystemRoot%', systemroot, path)
743
paths = py.std.os.environ['PATH'].split(':')
746
tryadd += os.environ['PATHEXT'].split(os.pathsep)
750
for addext in tryadd:
751
p = py.path.local(x).join(name, abs=True) + addext
758
except py.error.EACCES:
761
sysfind = classmethod(sysfind)
763
def _gethomedir(cls):
765
x = os.environ['HOME']
768
x = os.environ["HOMEDRIVE"] + os.environ['HOMEPATH']
772
_gethomedir = classmethod(_gethomedir)
775
#special class constructors for local filesystem paths
777
def get_temproot(cls):
778
""" return the system's temporary directory
779
(where tempfiles are usually created in)
781
return py.path.local(py.std.tempfile.gettempdir())
782
get_temproot = classmethod(get_temproot)
784
def mkdtemp(cls, rootdir=None):
785
""" return a Path object pointing to a fresh new temporary directory
786
(which we created ourself).
790
rootdir = cls.get_temproot()
791
return cls(py.error.checked_call(tempfile.mkdtemp, dir=str(rootdir)))
792
mkdtemp = classmethod(mkdtemp)
794
def make_numbered_dir(cls, prefix='session-', rootdir=None, keep=3,
795
lock_timeout = 172800): # two days
796
""" return unique directory with a number greater than the current
797
maximum one. The number is assumed to start directly after prefix.
798
if keep is true directories with a number less than (maxnum-keep)
802
rootdir = cls.get_temproot()
805
""" parse the number out of a path (if it matches the prefix) """
807
if bn.startswith(prefix):
809
return int(bn[len(prefix):])
813
# compute the maximum number currently in use with the
818
for path in rootdir.listdir():
819
num = parse_num(path)
821
maxnum = max(maxnum, num)
823
# make the new directory
825
udir = rootdir.mkdir(prefix + str(maxnum+1))
826
except py.error.EEXIST:
827
# race condition: another thread/process created the dir
828
# in the meantime. Try counting again
829
if lastmax == maxnum:
835
# put a .lock file in the new directory that will be removed at
838
lockfile = udir.join('.lock')
840
if hasattr(lockfile, 'mksymlinkto'):
841
lockfile.mksymlinkto(str(mypid))
843
lockfile.write(str(mypid))
844
def try_remove_lockfile():
845
# in a fork() situation, only the last process should
846
# remove the .lock, otherwise the other processes run the
847
# risk of seeing their temporary dir disappear. For now
848
# we remove the .lock in the parent only (i.e. we assume
849
# that the children finish before the parent).
850
if os.getpid() != mypid:
854
except py.error.Error:
856
atexit.register(try_remove_lockfile)
858
# prune old directories
860
for path in rootdir.listdir():
861
num = parse_num(path)
862
if num is not None and num <= (maxnum - keep):
863
lf = path.join('.lock')
865
t1 = lf.lstat().mtime
866
t2 = lockfile.lstat().mtime
867
if not lock_timeout or abs(t2-t1) < lock_timeout:
868
continue # skip directories still locked
869
except py.error.Error:
870
pass # assume that it means that there is no 'lf'
873
except KeyboardInterrupt:
875
except: # this might be py.error.Error, WindowsError ...
880
username = os.environ['USER'] #linux, et al
883
username = os.environ['USERNAME'] #windows
888
dest = src[:src.rfind('-')] + '-' + username
894
os.symlink(src, dest)
895
except (OSError, AttributeError, NotImplementedError):
899
make_numbered_dir = classmethod(make_numbered_dir)
901
def copymode(src, dest):
902
""" copy permission from src to dst. """
903
py.std.shutil.copymode(src, dest)
905
def copystat(src, dest):
906
""" copy permission, last modification time, last access time, and flags from src to dst."""
907
py.std.shutil.copystat(str(src), str(dest))
909
def copychunked(src, dest):
910
chunksize = 524288 # half a meg of bytes
911
fsrc = src.open('rb')
913
fdest = dest.open('wb')
916
buf = fsrc.read(chunksize)
925
def isimportable(name):
926
if name and (name[0].isalpha() or name[0] == '_'):
927
name = name.replace("_", '')
928
return not name or name.isalnum()