1
# -*- coding: utf-8 -*-
3
# Copyright (C) 2012 The Python Software Foundation.
4
# See LICENSE.txt and CONTRIBUTORS.txt.
6
"""Utility functions for copying and archiving files and directory trees.
8
XXX The functions here don't copy the resource fork or other metadata on Mac.
15
from os.path import abspath
25
_BZ2_SUPPORTED = False
28
from pwd import getpwnam
33
from grp import getgrnam
37
__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
38
"copytree", "move", "rmtree", "Error", "SpecialFileError",
39
"ExecError", "make_archive", "get_archive_formats",
40
"register_archive_format", "unregister_archive_format",
41
"get_unpack_formats", "register_unpack_format",
42
"unregister_unpack_format", "unpack_archive", "ignore_patterns"]
44
class Error(EnvironmentError):
    """Generic error raised by the copy and move helpers of this module
    (e.g. when source and destination are the same file)"""
47
class SpecialFileError(EnvironmentError):
    """Raised when trying to do a kind of operation (e.g. copying) which is
    not supported on a special file (e.g. a named pipe)"""
51
class ExecError(EnvironmentError):
    """Raised when a command could not be executed"""
54
class ReadError(EnvironmentError):
    """Raised when an archive cannot be read"""
57
class RegistryError(Exception):
    """Raised when a registry operation with the archiving
    and unpacking registries fails"""
67
def copyfileobj(fsrc, fdst, length=16*1024):
    """Copy data from file-like object fsrc to file-like object fdst.

    The data is transferred in chunks of at most `length` bytes, until
    fsrc.read() returns an empty result.
    """
    while True:
        buf = fsrc.read(length)
        if not buf:
            # read() returned nothing: end of the source stream
            break
        fdst.write(buf)
75
def _samefile(src, dst):
    """Return True if src and dst refer to the same file.

    Uses os.path.samefile() where the platform provides it (a path that
    cannot be examined counts as "not the same file"); otherwise falls back
    to comparing the normalized absolute pathnames.
    """
    # Macintosh, Unix.
    if hasattr(os.path, 'samefile'):
        try:
            return os.path.samefile(src, dst)
        except OSError:
            return False

    # All other platforms: check for same pathname.
    return (os.path.normcase(os.path.abspath(src)) ==
            os.path.normcase(os.path.abspath(dst)))
87
def copyfile(src, dst):
    """Copy data from src to dst.

    Raises Error if src and dst are the same file, and SpecialFileError if
    either of them is a named pipe.
    """
    if _samefile(src, dst):
        raise Error("`%s` and `%s` are the same file" % (src, dst))

    for fn in [src, dst]:
        try:
            st = os.stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                raise SpecialFileError("`%s` is a named pipe" % fn)

    with open(src, 'rb') as fsrc:
        with open(dst, 'wb') as fdst:
            copyfileobj(fsrc, fdst)
107
def copymode(src, dst):
    """Copy mode bits from src to dst.

    Does nothing on platforms where os.chmod is not available.
    """
    if hasattr(os, 'chmod'):
        st = os.stat(src)
        mode = stat.S_IMODE(st.st_mode)
        os.chmod(dst, mode)
114
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each piece is only copied when the platform provides the matching os
    function. A failure to set the flags is ignored when the error is
    EOPNOTSUPP; any other OSError is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            if (not hasattr(errno, 'EOPNOTSUPP') or
                    why.errno != errno.EOPNOTSUPP):
                raise
131
"""Copy data and mode bits ("cp src dst").
133
The destination may be a directory.
136
if os.path.isdir(dst):
137
dst = os.path.join(dst, os.path.basename(src))
142
"""Copy data and all stat info ("cp -p src dst").
144
The destination may be a directory.
147
if os.path.isdir(dst):
148
dst = os.path.join(dst, os.path.basename(src))
152
def ignore_patterns(*patterns):
    """Function that can be used as copytree() ignore parameter.

    Patterns is a sequence of glob-style patterns
    that are used to exclude files.

    Returns a callable taking (path, names) that returns the set of the
    given names matching at least one of the patterns.
    """
    def _ignore_patterns(path, names):
        ignored_names = []
        for pattern in patterns:
            ignored_names.extend(fnmatch.filter(names, pattern))
        return set(ignored_names)
    return _ignore_patterns
164
def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
             ignore_dangling_symlinks=False):
    """Recursively copy a directory tree.

    The destination directory must not already exist.
    If exception(s) occur, an Error is raised with a list of reasons.

    If the optional symlinks flag is true, symbolic links in the
    source tree result in symbolic links in the destination tree; if
    it is false, the contents of the files pointed to by symbolic
    links are copied. If the file pointed by the symlink doesn't
    exist, an exception will be added in the list of errors raised in
    an Error exception at the end of the copy process.

    You can set the optional ignore_dangling_symlinks flag to true if you
    want to silence this exception. Notice that this has no effect on
    platforms that don't support os.symlink.

    The optional ignore argument is a callable. If given, it
    is called with the `src` parameter, which is the directory
    being visited by copytree(), and `names` which is the list of
    `src` contents, as returned by os.listdir():

        callable(src, names) -> ignored_names

    Since copytree() is called recursively, the callable will be
    called once for each directory that is copied. It returns a
    list of names relative to the `src` directory that should
    not be copied.

    The optional copy_function argument is a callable that will be used
    to copy each file. It will be called with the source path and the
    destination path as arguments. By default, copy2() is used, but any
    function that supports the same signature (like copy()) can be used.
    """
    names = os.listdir(src)
    if ignore is not None:
        ignored_names = ignore(src, names)
    else:
        ignored_names = set()

    os.makedirs(dst)
    # Every entry is a (srcname, dstname, reason) triple.
    errors = []
    for name in names:
        if name in ignored_names:
            continue
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if os.path.islink(srcname):
                linkto = os.readlink(srcname)
                if symlinks:
                    os.symlink(linkto, dstname)
                else:
                    # ignore dangling symlink if the flag is on
                    if not os.path.exists(linkto) and ignore_dangling_symlinks:
                        continue
                    # otherwise let the copy occur. copy2 will raise an error
                    copy_function(srcname, dstname)
            elif os.path.isdir(srcname):
                # Forward ignore_dangling_symlinks too, so that the flag
                # also applies to nested directories.
                copytree(srcname, dstname, symlinks, ignore, copy_function,
                         ignore_dangling_symlinks)
            else:
                # Will raise a SpecialFileError for unsupported file types
                copy_function(srcname, dstname)
        # catch the Error from the recursive copytree so that we can
        # continue with other files
        except Error as err:
            errors.extend(err.args[0])
        except EnvironmentError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        if WindowsError is not None and isinstance(why, WindowsError):
            # Copying file access times may fail on Windows
            pass
        else:
            # append (not extend): keep the triple together instead of
            # flattening it into three unrelated strings.
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)
246
def rmtree(path, ignore_errors=False, onerror=None):
    """Recursively delete a directory tree.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is os.listdir, os.remove, or os.rmdir;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info().  If ignore_errors
    is false and onerror is None, an exception is raised.

    """
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            # Called from inside an except clause: re-raise that exception.
            raise
    try:
        if os.path.islink(path):
            # symlinks to directories are forbidden, see bug #1669
            raise OSError("Cannot call rmtree on a symbolic link")
    except OSError:
        onerror(os.path.islink, path, sys.exc_info())
        # can't continue even if onerror hook returns
        return
    names = []
    try:
        names = os.listdir(path)
    except os.error:
        onerror(os.listdir, path, sys.exc_info())
    for name in names:
        fullname = os.path.join(path, name)
        try:
            mode = os.lstat(fullname).st_mode
        except os.error:
            # Cannot stat the entry: treat it as a non-directory below.
            mode = 0
        if stat.S_ISDIR(mode):
            rmtree(fullname, ignore_errors, onerror)
        else:
            try:
                os.remove(fullname)
            except os.error:
                onerror(os.remove, fullname, sys.exc_info())
    try:
        os.rmdir(path)
    except os.error:
        onerror(os.rmdir, path, sys.exc_info())
296
# A basename() variant which first strips the trailing slash, if present.
297
# Thus we always get the last component of the path, even for directories.
298
return os.path.basename(path.rstrip(os.path.sep))
301
"""Recursively move a file or directory to another location. This is
302
similar to the Unix "mv" command.
304
If the destination is a directory or a symlink to a directory, the source
305
is moved inside the directory. The destination path must not already
308
If the destination already exists but is not a directory, it may be
309
overwritten depending on os.rename() semantics.
311
If the destination is on our current filesystem, then rename() is used.
312
Otherwise, src is copied to the destination and then removed.
313
A lot more could be done here... A look at a mv.c shows a lot of
314
the issues this implementation glosses over.
318
if os.path.isdir(dst):
319
if _samefile(src, dst):
320
# We might be on a case insensitive filesystem,
321
# perform the rename anyway.
325
real_dst = os.path.join(dst, _basename(src))
326
if os.path.exists(real_dst):
327
raise Error("Destination path '%s' already exists" % real_dst)
329
os.rename(src, real_dst)
331
if os.path.isdir(src):
332
if _destinsrc(src, dst):
333
raise Error("Cannot move a directory '%s' into itself '%s'." % (src, dst))
334
copytree(src, real_dst, symlinks=True)
340
def _destinsrc(src, dst):
    """Return True if dst is src itself or is located inside src.

    Both paths are made absolute and terminated with a separator first, so
    that '/a/bc' is not mistaken for a child of '/a/b'.
    """
    src = abspath(src)
    dst = abspath(dst)
    if not src.endswith(os.path.sep):
        src += os.path.sep
    if not dst.endswith(os.path.sep):
        dst += os.path.sep
    return dst.startswith(src)
350
"""Returns a gid, given a group name."""
351
if getgrnam is None or name is None:
354
result = getgrnam(name)
357
if result is not None:
362
"""Returns an uid, given a user name."""
363
if getpwnam is None or name is None:
366
result = getpwnam(name)
369
if result is not None:
373
def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
                  owner=None, group=None, logger=None):
    """Create a (possibly compressed) tar file from all the files under
    'base_dir'.

    'compress' must be "gzip" (the default), "bzip2", or None.

    'owner' and 'group' can be used to define an owner and a group for the
    archive that is being built. If not provided, the current owner and group
    will be used.

    The output tar file will be named 'base_name' +  ".tar", possibly plus
    the appropriate compression extension (".gz", or ".bz2").

    Returns the output filename.
    """
    tar_compression = {'gzip': 'gz', None: ''}
    compress_ext = {'gzip': '.gz'}

    if _BZ2_SUPPORTED:
        tar_compression['bzip2'] = 'bz2'
        compress_ext['bzip2'] = '.bz2'

    # flags for compression program, each element of list will be an argument
    if compress is not None and compress not in compress_ext:
        raise ValueError("bad value for 'compress', or compression format not "
                         "supported : {0}".format(compress))

    archive_name = base_name + '.tar' + compress_ext.get(compress, '')
    archive_dir = os.path.dirname(archive_name)

    # An empty archive_dir means "current directory": nothing to create
    # (and os.makedirs('') would fail).
    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    # creating the tarball
    if logger is not None:
        logger.info('Creating tar archive')

    uid = _get_uid(owner)
    gid = _get_gid(group)

    def _set_uid_gid(tarinfo):
        # Override the ownership only for the ids that could be resolved.
        if gid is not None:
            tarinfo.gid = gid
            tarinfo.gname = group
        if uid is not None:
            tarinfo.uid = uid
            tarinfo.uname = owner
        return tarinfo

    if not dry_run:
        tar = tarfile.open(archive_name, 'w|%s' % tar_compression[compress])
        try:
            tar.add(base_dir, filter=_set_uid_gid)
        finally:
            tar.close()

    return archive_name
435
def _call_external_zip(base_dir, zip_filename, verbose=False, dry_run=False):
    """Create 'zip_filename' from 'base_dir' by spawning an external "zip"
    command.

    Raises ExecError if the command could not be run successfully.
    """
    # XXX see if we want to keep an external call here
    if verbose:
        zipoptions = "-r"
    else:
        zipoptions = "-rq"
    from distutils.errors import DistutilsExecError
    from distutils.spawn import spawn
    try:
        spawn(["zip", zipoptions, zip_filename, base_dir], dry_run=dry_run)
    except DistutilsExecError:
        # XXX really should distinguish between "couldn't find
        # external 'zip' command" and "zip failed".
        # The message must be formatted *before* it is handed to ExecError;
        # applying % to the exception instance raises a TypeError instead.
        raise ExecError(("unable to create zip file '%s': "
                         "could neither import the 'zipfile' module nor "
                         "find a standalone zip utility") % zip_filename)
452
def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):
    """Create a zip file from all the files under 'base_dir'.

    The output zip file will be named 'base_name' + ".zip".  Uses either the
    "zipfile" Python module (if available) or the InfoZIP "zip" utility
    (if installed and found on the default search path).  If neither tool is
    available, raises ExecError.  Returns the name of the output zip
    file.
    """
    zip_filename = base_name + ".zip"
    archive_dir = os.path.dirname(base_name)

    # An empty archive_dir means "current directory": nothing to create
    # (and os.makedirs('') would fail).
    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    # If zipfile module is not available, try spawning an external 'zip'
    # command.
    try:
        import zipfile
    except ImportError:
        zipfile = None

    if zipfile is None:
        _call_external_zip(base_dir, zip_filename, verbose, dry_run)
    else:
        if logger is not None:
            logger.info("creating '%s' and adding '%s' to it",
                        zip_filename, base_dir)

        if not dry_run:
            archive = zipfile.ZipFile(zip_filename, "w",
                                      compression=zipfile.ZIP_DEFLATED)
            # Close the archive even if adding a member fails.
            try:
                for dirpath, dirnames, filenames in os.walk(base_dir):
                    for name in filenames:
                        path = os.path.normpath(os.path.join(dirpath, name))
                        if os.path.isfile(path):
                            archive.write(path, path)
                            if logger is not None:
                                logger.info("adding '%s'", path)
            finally:
                archive.close()

    return zip_filename
500
'gztar': (_make_tarball, [('compress', 'gzip')], "gzip'ed tar-file"),
501
'bztar': (_make_tarball, [('compress', 'bzip2')], "bzip2'ed tar-file"),
502
'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"),
503
'zip': (_make_zipfile, [], "ZIP file"),
507
_ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
510
def get_archive_formats():
    """Returns a list of supported formats for archiving and unarchiving.

    Each element of the returned sequence is a tuple (name, description).
    The list is sorted.
    """
    formats = [(name, registry[2]) for name, registry in
               _ARCHIVE_FORMATS.items()]
    formats.sort()
    return formats
520
def register_archive_format(name, function, extra_args=None, description=''):
    """Registers an archive format.

    name is the name of the format. function is the callable that will be
    used to create archives. If provided, extra_args is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_archive_formats() function.

    Raises TypeError if function is not callable, or if extra_args is not a
    sequence of 2-element tuples/lists.
    """
    if extra_args is None:
        extra_args = []
    # Use the callable() builtin: collections.Callable no longer exists on
    # recent Python versions.
    if not callable(function):
        raise TypeError('The %s object is not callable' % function)
    if not isinstance(extra_args, (tuple, list)):
        raise TypeError('extra_args needs to be a sequence')
    for element in extra_args:
        if not isinstance(element, (tuple, list)) or len(element) != 2:
            raise TypeError('extra_args elements are : (arg_name, value)')

    _ARCHIVE_FORMATS[name] = (function, extra_args, description)
541
def unregister_archive_format(name):
    """Removes the archive format `name` from the registry.

    Raises KeyError if no format is registered under that name.
    """
    del _ARCHIVE_FORMATS[name]
544
def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar).

    'base_name' is the name of the file to create, minus any format-specific
    extension; 'format' is the archive format: one of "zip", "tar", "bztar"
    or "gztar".

    'root_dir' is a directory that will be the root directory of the
    archive; ie. we typically chdir into 'root_dir' before creating the
    archive.  'base_dir' is the directory where we start archiving from;
    ie. 'base_dir' will be the common prefix of all files and
    directories in the archive.  'root_dir' and 'base_dir' both default
    to the current directory.  Returns the name of the archive file.

    'owner' and 'group' are used when creating a tar archive. By default,
    uses the current owner and group.

    Raises ValueError if 'format' is not a registered archive format.
    """
    # Resolve the format before touching the working directory, so that an
    # unknown format cannot leave the process chdir'ed into root_dir.
    try:
        format_info = _ARCHIVE_FORMATS[format]
    except KeyError:
        raise ValueError("unknown archive format '%s'" % format)

    save_cwd = os.getcwd()
    if root_dir is not None:
        if logger is not None:
            logger.debug("changing into '%s'", root_dir)
        # Make base_name absolute while the original cwd is still current.
        base_name = os.path.abspath(base_name)
        if not dry_run:
            os.chdir(root_dir)

    try:
        if base_dir is None:
            base_dir = os.curdir

        kwargs = {'dry_run': dry_run, 'logger': logger}

        func = format_info[0]
        for arg, val in format_info[1]:
            kwargs[arg] = val

        if format != 'zip':
            kwargs['owner'] = owner
            kwargs['group'] = group

        filename = func(base_name, base_dir, **kwargs)
    finally:
        if root_dir is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", save_cwd)
            os.chdir(save_cwd)

    return filename
599
def get_unpack_formats():
    """Returns a list of supported formats for unpacking.

    Each element of the returned sequence is a tuple
    (name, extensions, description). The list is sorted.
    """
    formats = [(name, info[0], info[3]) for name, info in
               _UNPACK_FORMATS.items()]
    formats.sort()
    return formats
610
def _check_unpack_options(extensions, function, extra_args):
    """Checks what gets registered as an unpacker.

    Raises RegistryError if one of the extensions is already handled by a
    registered unpacker, and TypeError if function is not callable.
    """
    # first make sure no other unpacker is registered for this extension
    existing_extensions = {}
    for name, info in _UNPACK_FORMATS.items():
        for ext in info[0]:
            existing_extensions[ext] = name

    for extension in extensions:
        if extension in existing_extensions:
            msg = '%s is already registered for "%s"'
            raise RegistryError(msg % (extension,
                                       existing_extensions[extension]))

    # Use the callable() builtin: collections.Callable no longer exists on
    # recent Python versions.
    if not callable(function):
        raise TypeError('The registered function must be a callable')
628
def register_unpack_format(name, extensions, function, extra_args=None,
                           description=''):
    """Registers an unpack format.

    `name` is the name of the format. `extensions` is a list of extensions
    corresponding to the format.

    `function` is the callable that will be
    used to unpack archives. The callable will receive archives to unpack.
    If it's unable to handle an archive, it needs to raise a ReadError
    exception.

    If provided, `extra_args` is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_unpack_formats() function.
    """
    if extra_args is None:
        extra_args = []
    _check_unpack_options(extensions, function, extra_args)
    _UNPACK_FORMATS[name] = extensions, function, extra_args, description
650
def unregister_unpack_format(name):
    """Removes the unpack format from the registry.

    Raises KeyError if no format is registered under that name.
    """
    del _UNPACK_FORMATS[name]
654
def _ensure_directory(path):
    """Ensure that the parent directory of `path` exists"""
    dirname = os.path.dirname(path)
    # An empty dirname means `path` lives in the current directory: there is
    # nothing to create (and os.makedirs('') would fail).
    if dirname and not os.path.isdir(dirname):
        os.makedirs(dirname)
660
def _unpack_zipfile(filename, extract_dir):
    """Unpack zip `filename` to `extract_dir`

    Members whose name is absolute or contains '..' are skipped.
    Raises ReadError if `filename` is not a zip file.
    """
    try:
        import zipfile
    except ImportError:
        raise ReadError('zlib not supported, cannot unpack this archive.')

    if not zipfile.is_zipfile(filename):
        raise ReadError("%s is not a zip file" % filename)

    archive = zipfile.ZipFile(filename)
    try:
        for info in archive.infolist():
            name = info.filename

            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name:
                continue

            target = os.path.join(extract_dir, *name.split('/'))
            if not target:
                continue

            _ensure_directory(target)
            if not name.endswith('/'):
                # file
                data = archive.read(info.filename)
                with open(target, 'wb') as f:
                    f.write(data)
    finally:
        archive.close()
697
def _unpack_tarfile(filename, extract_dir):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ReadError if `filename` cannot be opened as a tar archive.
    """
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise ReadError(
            "%s is not a compressed or uncompressed tar file" % filename)
    try:
        # NOTE(review): extractall() applies no filtering of member names
        # here -- only use on archives from a trusted source.
        tarobj.extractall(extract_dir)
    finally:
        tarobj.close()
711
'gztar': (['.tar.gz', '.tgz'], _unpack_tarfile, [], "gzip'ed tar-file"),
712
'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
713
'zip': (['.zip'], _unpack_zipfile, [], "ZIP file")
717
_UNPACK_FORMATS['bztar'] = (['.bz2'], _unpack_tarfile, [],
720
def _find_unpack_format(filename):
    """Return the name of a registered unpack format that has an extension
    `filename` ends with, or None if there is no such format."""
    for name, info in _UNPACK_FORMATS.items():
        for extension in info[0]:
            if filename.endswith(extension):
                return name
    return None
727
def unpack_archive(filename, extract_dir=None, format=None):
    """Unpack an archive.

    `filename` is the name of the archive.

    `extract_dir` is the name of the target directory, where the archive
    is unpacked. If not provided, the current working directory is used.

    `format` is the archive format: one of "zip", "tar", or "gztar". Or any
    other registered format. If not provided, unpack_archive will use the
    filename extension and see if an unpacker was registered for that
    extension.

    A ValueError is raised if `format` is given but is not a registered
    format; a ReadError is raised if `format` is not given and no unpacker
    is registered for the extension of `filename`.
    """
    if extract_dir is None:
        extract_dir = os.getcwd()

    if format is not None:
        try:
            format_info = _UNPACK_FORMATS[format]
        except KeyError:
            raise ValueError("Unknown unpack format '{0}'".format(format))

        func = format_info[1]
        func(filename, extract_dir, **dict(format_info[2]))
    else:
        # we need to look at the registered unpackers supported extensions
        format = _find_unpack_format(filename)
        if format is None:
            raise ReadError("Unknown archive format '{0}'".format(filename))

        func = _UNPACK_FORMATS[format][1]
        kwargs = dict(_UNPACK_FORMATS[format][2])
        func(filename, extract_dir, **kwargs)