"""Interface to the liblzma compression library.

This module provides a class for reading and writing compressed files,
classes for incremental (de)compression, and convenience functions for
one-shot (de)compression.

These classes and functions support both the XZ and legacy LZMA
container formats, as well as raw compressed data streams.
"""

__all__ = [
    "CHECK_NONE", "CHECK_CRC32", "CHECK_CRC64", "CHECK_SHA256",
    "CHECK_ID_MAX", "CHECK_UNKNOWN",
    "FILTER_LZMA1", "FILTER_LZMA2", "FILTER_DELTA", "FILTER_X86", "FILTER_IA64",
    "FILTER_ARM", "FILTER_ARMTHUMB", "FILTER_POWERPC", "FILTER_SPARC",
    "FORMAT_AUTO", "FORMAT_XZ", "FORMAT_ALONE", "FORMAT_RAW",
    "MF_HC3", "MF_HC4", "MF_BT2", "MF_BT3", "MF_BT4",
    "MODE_FAST", "MODE_NORMAL", "PRESET_DEFAULT", "PRESET_EXTREME",
    "LZMACompressor", "LZMADecompressor", "LZMAFile", "LZMAError",
    "open", "compress", "decompress", "is_check_supported",
]

import builtins
import io

from _lzma import *
from _lzma import _encode_filter_properties, _decode_filter_properties


# Values of LZMAFile._mode; tracks whether the file is closed, open for
# reading (and whether EOF has been reached), or open for writing.
_MODE_CLOSED   = 0
_MODE_READ     = 1
_MODE_READ_EOF = 2
_MODE_WRITE    = 3

# Size of the raw chunks read from the underlying file object.
_BUFFER_SIZE = 8192

class LZMAFile(io.BufferedIOBase):

    """A file object providing transparent LZMA (de)compression.

    An LZMAFile can act as a wrapper for an existing file object, or
    refer directly to a named file on disk.

    Note that LZMAFile provides a *binary* file interface - data read
    is returned as bytes, and data to be written must be given as bytes.
    """

    def __init__(self, filename=None, mode="r", *,
                 format=None, check=-1, preset=None, filters=None):
        """Open an LZMA-compressed file in binary mode.

        filename can be either an actual file name (given as a str or
        bytes object), in which case the named file is opened, or it can
        be an existing file object to read from or write to.

        mode can be "r" for reading (default), "w" for (over)writing,
        "x" for creating exclusively, or "a" for appending. These can
        equivalently be given as "rb", "wb", "xb" and "ab" respectively.

        format specifies the container format to use for the file.
        If mode is "r", this defaults to FORMAT_AUTO. Otherwise, the
        default is FORMAT_XZ.

        check specifies the integrity check to use. This argument can
        only be used when opening a file for writing. For FORMAT_XZ,
        the default is CHECK_CRC64. FORMAT_ALONE and FORMAT_RAW do not
        support integrity checks - for these formats, check must be
        omitted, or be CHECK_NONE.

        When opening a file for reading, the *preset* argument is not
        meaningful, and should be omitted. The *filters* argument should
        also be omitted, except when format is FORMAT_RAW (in which case
        it is required).

        When opening a file for writing, the settings used by the
        compressor can be specified either as a preset compression
        level (with the *preset* argument), or in detail as a custom
        filter chain (with the *filters* argument). For FORMAT_XZ and
        FORMAT_ALONE, the default is to use the PRESET_DEFAULT preset
        level. For FORMAT_RAW, the caller must always specify a filter
        chain; the raw compressor does not support preset compression
        levels.

        preset (if provided) should be an integer in the range 0-9,
        optionally OR-ed with the constant PRESET_EXTREME.

        filters (if provided) should be a sequence of dicts. Each dict
        should have an entry for "id" indicating ID of the filter, plus
        additional entries for options to the filter.
        """
        self._fp = None
        self._closefp = False
        self._mode = _MODE_CLOSED
        self._pos = 0
        self._size = -1

        if mode in ("r", "rb"):
            if check != -1:
                raise ValueError("Cannot specify an integrity check "
                                 "when opening a file for reading")
            if preset is not None:
                raise ValueError("Cannot specify a preset compression "
                                 "level when opening a file for reading")
            if format is None:
                format = FORMAT_AUTO
            mode_code = _MODE_READ
            # Save the args to pass to the LZMADecompressor initializer.
            # If the file contains multiple compressed streams, each
            # stream will need a separate decompressor object.
            self._init_args = {"format":format, "filters":filters}
            self._decompressor = LZMADecompressor(**self._init_args)
            self._buffer = b""
            self._buffer_offset = 0
        elif mode in ("w", "wb", "a", "ab", "x", "xb"):
            if format is None:
                format = FORMAT_XZ
            mode_code = _MODE_WRITE
            self._compressor = LZMACompressor(format=format, check=check,
                                              preset=preset, filters=filters)
        else:
            raise ValueError("Invalid mode: {!r}".format(mode))

        if isinstance(filename, (str, bytes)):
            if "b" not in mode:
                mode += "b"
            self._fp = builtins.open(filename, mode)
            self._closefp = True
            self._mode = mode_code
        elif hasattr(filename, "read") or hasattr(filename, "write"):
            self._fp = filename
            self._mode = mode_code
        else:
            raise TypeError("filename must be a str or bytes object, or a file")

    def close(self):
        """Flush and close the file.

        May be called more than once without error. Once the file is
        closed, any other operation on it will raise a ValueError.
        """
        if self._mode == _MODE_CLOSED:
            return
        try:
            if self._mode in (_MODE_READ, _MODE_READ_EOF):
                self._decompressor = None
                self._buffer = b""
            elif self._mode == _MODE_WRITE:
                self._fp.write(self._compressor.flush())
                self._compressor = None
        finally:
            try:
                if self._closefp:
                    self._fp.close()
            finally:
                self._fp = None
                self._closefp = False
                self._mode = _MODE_CLOSED

    @property
    def closed(self):
        """True if this file is closed."""
        return self._mode == _MODE_CLOSED

    def fileno(self):
        """Return the file descriptor for the underlying file."""
        self._check_not_closed()
        return self._fp.fileno()

    def seekable(self):
        """Return whether the file supports seeking."""
        return self.readable() and self._fp.seekable()

    def readable(self):
        """Return whether the file was opened for reading."""
        self._check_not_closed()
        return self._mode in (_MODE_READ, _MODE_READ_EOF)

    def writable(self):
        """Return whether the file was opened for writing."""
        self._check_not_closed()
        return self._mode == _MODE_WRITE

    # Mode-checking helper functions.

    def _check_not_closed(self):
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def _check_can_read(self):
        if self._mode not in (_MODE_READ, _MODE_READ_EOF):
            self._check_not_closed()
            raise io.UnsupportedOperation("File not open for reading")

    def _check_can_write(self):
        if self._mode != _MODE_WRITE:
            self._check_not_closed()
            raise io.UnsupportedOperation("File not open for writing")

    def _check_can_seek(self):
        if self._mode not in (_MODE_READ, _MODE_READ_EOF):
            self._check_not_closed()
            raise io.UnsupportedOperation("Seeking is only supported "
                                          "on files open for reading")
        if not self._fp.seekable():
            raise io.UnsupportedOperation("The underlying file object "
                                          "does not support seeking")

    # Fill the readahead buffer if it is empty. Returns False on EOF.
    def _fill_buffer(self):
        if self._mode == _MODE_READ_EOF:
            return False
        # Depending on the input data, our call to the decompressor may not
        # return any data. In this case, try again after reading another block.
        while self._buffer_offset == len(self._buffer):
            rawblock = (self._decompressor.unused_data or
                        self._fp.read(_BUFFER_SIZE))

            if not rawblock:
                if self._decompressor.eof:
                    self._mode = _MODE_READ_EOF
                    self._size = self._pos
                    return False
                else:
                    raise EOFError("Compressed file ended before the "
                                   "end-of-stream marker was reached")

            # Continue to next stream.
            if self._decompressor.eof:
                self._decompressor = LZMADecompressor(**self._init_args)

            self._buffer = self._decompressor.decompress(rawblock)
            self._buffer_offset = 0
        return True

    # Read data until EOF.
    # If return_data is false, consume the data without returning it.
    def _read_all(self, return_data=True):
        # The loop assumes that _buffer_offset is 0. Ensure that this is true.
        self._buffer = self._buffer[self._buffer_offset:]
        self._buffer_offset = 0

        blocks = []
        while self._fill_buffer():
            if return_data:
                blocks.append(self._buffer)
            self._pos += len(self._buffer)
            self._buffer = b""
        if return_data:
            return b"".join(blocks)

    # Read a block of up to n bytes.
    # If return_data is false, consume the data without returning it.
    def _read_block(self, n, return_data=True):
        # If we have enough data buffered, return immediately.
        end = self._buffer_offset + n
        if end <= len(self._buffer):
            data = self._buffer[self._buffer_offset : end]
            self._buffer_offset = end
            self._pos += len(data)
            return data if return_data else None

        # The loop assumes that _buffer_offset is 0. Ensure that this is true.
        self._buffer = self._buffer[self._buffer_offset:]
        self._buffer_offset = 0

        blocks = []
        while n > 0 and self._fill_buffer():
            if n < len(self._buffer):
                data = self._buffer[:n]
                self._buffer_offset = n
            else:
                data = self._buffer
                self._buffer = b""
            if return_data:
                blocks.append(data)
            self._pos += len(data)
            n -= len(data)
        if return_data:
            return b"".join(blocks)

    def peek(self, size=-1):
        """Return buffered data without advancing the file position.

        Always returns at least one byte of data, unless at EOF.
        The exact number of bytes returned is unspecified.
        """
        self._check_can_read()
        if not self._fill_buffer():
            return b""
        return self._buffer[self._buffer_offset:]

    def read(self, size=-1):
        """Read up to size uncompressed bytes from the file.

        If size is negative or omitted, read until EOF is reached.
        Returns b"" if the file is already at EOF.
        """
        self._check_can_read()
        if size == 0:
            return b""
        elif size < 0:
            return self._read_all()
        else:
            return self._read_block(size)

    def read1(self, size=-1):
        """Read up to size uncompressed bytes, while trying to avoid
        making multiple reads from the underlying stream.

        Returns b"" if the file is at EOF.
        """
        # Usually, read1() calls _fp.read() at most once. However, sometimes
        # this does not give enough data for the decompressor to make progress.
        # In this case we make multiple reads, to avoid returning b"".
        self._check_can_read()
        if (size == 0 or
            # Only call _fill_buffer() if the buffer is actually empty.
            # This gives a significant speedup if *size* is small.
            (self._buffer_offset == len(self._buffer) and not self._fill_buffer())):
            return b""
        if size > 0:
            data = self._buffer[self._buffer_offset :
                                self._buffer_offset + size]
            self._buffer_offset += len(data)
        else:
            data = self._buffer[self._buffer_offset:]
            self._buffer = b""
            self._buffer_offset = 0
        self._pos += len(data)
        return data

    def readline(self, size=-1):
        """Read a line of uncompressed bytes from the file.

        The terminating newline (if present) is retained. If size is
        non-negative, no more than size bytes will be read (in which
        case the line may be incomplete). Returns b'' if already at EOF.
        """
        self._check_can_read()
        # Shortcut for the common case - the whole line is in the buffer.
        if size < 0:
            end = self._buffer.find(b"\n", self._buffer_offset) + 1
            if end > 0:
                line = self._buffer[self._buffer_offset : end]
                self._buffer_offset = end
                self._pos += len(line)
                return line
        return io.BufferedIOBase.readline(self, size)

    def write(self, data):
        """Write a bytes object to the file.

        Returns the number of uncompressed bytes written, which is
        always len(data). Note that due to buffering, the file on disk
        may not reflect the data written until close() is called.
        """
        self._check_can_write()
        compressed = self._compressor.compress(data)
        self._fp.write(compressed)
        self._pos += len(data)
        return len(data)

    # Rewind the file to the beginning of the data stream.
    def _rewind(self):
        self._fp.seek(0, 0)
        self._mode = _MODE_READ
        self._pos = 0
        self._decompressor = LZMADecompressor(**self._init_args)
        self._buffer = b""
        self._buffer_offset = 0

    def seek(self, offset, whence=0):
        """Change the file position.

        The new position is specified by offset, relative to the
        position indicated by whence. Possible values for whence are:

            0: start of stream (default): offset must not be negative
            1: current stream position
            2: end of stream; offset must not be positive

        Returns the new file position.

        Note that seeking is emulated, so depending on the parameters,
        this operation may be extremely slow.
        """
        self._check_can_seek()

        # Recalculate offset as an absolute file position.
        if whence == 0:
            pass
        elif whence == 1:
            offset = self._pos + offset
        elif whence == 2:
            # Seeking relative to EOF - we need to know the file's size.
            if self._size < 0:
                self._read_all(return_data=False)
            offset = self._size + offset
        else:
            raise ValueError("Invalid value for whence: {}".format(whence))

        # Make it so that offset is the number of bytes to skip forward.
        if offset < self._pos:
            self._rewind()
        else:
            offset -= self._pos

        # Read and discard data until we reach the desired position.
        self._read_block(offset, return_data=False)

        return self._pos

    def tell(self):
        """Return the current file position."""
        self._check_not_closed()
        return self._pos

def open(filename, mode="rb", *,
         format=None, check=-1, preset=None, filters=None,
         encoding=None, errors=None, newline=None):
    """Open an LZMA-compressed file in binary or text mode.

    filename can be either an actual file name (given as a str or bytes
    object), in which case the named file is opened, or it can be an
    existing file object to read from or write to.

    The mode argument can be "r", "rb" (default), "w", "wb", "x", "xb",
    "a", or "ab" for binary mode, or "rt", "wt", "xt", or "at" for text
    mode.

    The format, check, preset and filters arguments specify the
    compression settings, as for LZMACompressor, LZMADecompressor and
    LZMAFile.

    For binary mode, this function is equivalent to the LZMAFile
    constructor: LZMAFile(filename, mode, ...). In this case, the
    encoding, errors and newline arguments must not be provided.

    For text mode, a LZMAFile object is created, and wrapped in an
    io.TextIOWrapper instance with the specified encoding, error
    handling behavior, and line ending(s).
    """
    if "t" in mode:
        if "b" in mode:
            raise ValueError("Invalid mode: %r" % (mode,))
    else:
        if encoding is not None:
            raise ValueError("Argument 'encoding' not supported in binary mode")
        if errors is not None:
            raise ValueError("Argument 'errors' not supported in binary mode")
        if newline is not None:
            raise ValueError("Argument 'newline' not supported in binary mode")

    lz_mode = mode.replace("t", "")
    binary_file = LZMAFile(filename, lz_mode, format=format, check=check,
                           preset=preset, filters=filters)

    if "t" in mode:
        return io.TextIOWrapper(binary_file, encoding, errors, newline)
    else:
        return binary_file

def compress(data, format=FORMAT_XZ, check=-1, preset=None, filters=None):
    """Compress a block of data.

    Refer to LZMACompressor's docstring for a description of the
    optional arguments *format*, *check*, *preset* and *filters*.

    For incremental compression, use an LZMACompressor instead.
    """
    comp = LZMACompressor(format, check, preset, filters)
    return comp.compress(data) + comp.flush()

def decompress(data, format=FORMAT_AUTO, memlimit=None, filters=None):
    """Decompress a block of data.

    Refer to LZMADecompressor's docstring for a description of the
    optional arguments *format*, *check* and *filters*.

    For incremental decompression, use an LZMADecompressor instead.
    """
    results = []
    while True:
        decomp = LZMADecompressor(format, memlimit, filters)
        results.append(decomp.decompress(data))
        if not decomp.eof:
            raise LZMAError("Compressed data ended before the "
                            "end-of-stream marker was reached")
        if not decomp.unused_data:
            return b"".join(results)
        # There is unused data left over. Proceed to next stream.
        data = decomp.unused_data
