3
# Copyright (c) 2011, Andres Moreira <andres@andresmoreira.com>
6
# Redistribution and use in source and binary forms, with or without
7
# modification, are permitted provided that the following conditions are met:
8
# * Redistributions of source code must retain the above copyright
9
# notice, this list of conditions and the following disclaimer.
10
# * Redistributions in binary form must reproduce the above copyright
11
# notice, this list of conditions and the following disclaimer in the
12
# documentation and/or other materials provided with the distribution.
13
# * Neither the name of the authors nor the
14
# names of its contributors may be used to endorse or promote products
15
# derived from this software without specific prior written permission.
17
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
# ARE DISCLAIMED. IN NO EVENT SHALL ANDRES MOREIRA BE LIABLE FOR ANY DIRECT,
21
# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
22
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
23
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
24
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34
from unittest import TestCase
37
class SnappyCompressionTest(TestCase):
39
def test_simple_compress(self):
40
text = "hello world!".encode('utf-8')
41
compressed = snappy.compress(text)
42
self.assertEqual(text, snappy.uncompress(compressed))
44
def test_moredata_compress(self):
45
text = "snappy +" * 1000 + " " + "by " * 1000 + " google"
46
text = text.encode('utf-8')
47
compressed = snappy.compress(text)
48
self.assertEqual(text, snappy.uncompress(compressed))
50
def test_randombytes_compress(self):
51
_bytes = repr(os.urandom(1000)).encode('utf-8')
52
compressed = snappy.compress(_bytes)
53
self.assertEqual(_bytes, snappy.uncompress(compressed))
55
def test_randombytes2_compress(self):
56
_bytes = bytes(os.urandom(10000))
57
compressed = snappy.compress(_bytes)
58
self.assertEqual(_bytes, snappy.uncompress(compressed))
60
def test_uncompress_error(self):
61
self.assertRaises(snappy.UncompressError, snappy.uncompress,
62
"hoa".encode('utf-8'))
64
if sys.version_info[0] == 2:
65
def test_unicode_compress(self):
66
text = "hello unicode world!".decode('utf-8')
67
compressed = snappy.compress(text)
68
self.assertEqual(text, snappy.uncompress(compressed))
70
def test_decompress(self):
71
# decompress == uncompress, just to support compatibility with zlib
72
text = "hello world!".encode('utf-8')
73
compressed = snappy.compress(text)
74
self.assertEqual(text, snappy.decompress(compressed))
76
def test_big_string(self):
77
text = ('a'*10000000).encode('utf-8')
78
compressed = snappy.compress(text)
79
self.assertEqual(text, snappy.decompress(compressed))
82
class SnappyValidBufferTest(TestCase):
84
def test_valid_compressed_buffer(self):
85
text = "hello world!".encode('utf-8')
86
compressed = snappy.compress(text)
87
uncompressed = snappy.uncompress(compressed)
88
self.assertEqual(text == uncompressed,
89
snappy.isValidCompressed(compressed))
91
def test_invalid_compressed_buffer(self):
92
self.assertFalse(snappy.isValidCompressed(
93
"not compressed".encode('utf-8')))
96
class SnappyStreaming(TestCase):
98
def test_random(self):
100
compressor = snappy.StreamCompressor()
101
decompressor = snappy.StreamDecompressor()
104
for _ in range(random.randint(0, 3)):
105
chunk = os.urandom(random.randint(0, snappy._CHUNK_MAX * 2))
107
compressed += compressor.add_chunk(
108
chunk, compress=random.choice([True, False, None]))
110
upper_bound = random.choice([256, snappy._CHUNK_MAX * 2])
112
size = random.randint(0, upper_bound)
113
chunk, compressed = compressed[:size], compressed[size:]
114
chunk = decompressor.decompress(chunk)
115
self.assertEqual(data[:len(chunk)], chunk)
116
data = data[len(chunk):]
119
self.assertEqual(len(data), 0)
121
def test_compression(self):
122
# test that we can add compressed chunks
123
compressor = snappy.StreamCompressor()
125
compressed_data = snappy.compress(data)
126
crc = struct.pack("<L", snappy._masked_crc32c(data))
127
self.assertEqual(crc, b"\x8f)H\xbd")
128
self.assertEqual(len(compressed_data), 6)
129
self.assertEqual(compressor.add_chunk(data, compress=True),
130
b"\xff\x06\x00\x00sNaPpY"
131
b"\x00\x0a\x00\x00" + crc + compressed_data)
133
# test that we can add uncompressed chunks
135
crc = struct.pack("<L", snappy._masked_crc32c(data))
136
self.assertEqual(crc, b"\xb2\x14)\x8a")
137
self.assertEqual(compressor.add_chunk(data, compress=False),
138
b"\x01\x36\x00\x00" + crc + data)
140
# test that we can add more data than will fit in one chunk
141
data = b"\x01" * (snappy._CHUNK_MAX * 2 - 5)
142
crc1 = struct.pack("<L",
143
snappy._masked_crc32c(data[:snappy._CHUNK_MAX]))
144
self.assertEqual(crc1, b"h#6\x8e")
145
crc2 = struct.pack("<L",
146
snappy._masked_crc32c(data[snappy._CHUNK_MAX:]))
147
self.assertEqual(crc2, b"q\x8foE")
148
self.assertEqual(compressor.add_chunk(data, compress=False),
149
b"\x01\x04\x00\x01" + crc1 + data[:snappy._CHUNK_MAX] +
150
b"\x01\xff\xff\x00" + crc2 + data[snappy._CHUNK_MAX:])
152
def test_decompression(self):
153
# test that we check for the initial stream identifier
155
self.assertRaises(snappy.UncompressError,
156
snappy.StreamDecompressor().decompress,
158
struct.pack("<L", snappy._masked_crc32c(data)) + data)
160
snappy.StreamDecompressor().decompress(
161
b"\xff\x06\x00\x00sNaPpY"
162
b"\x01\x36\x00\x00" +
163
struct.pack("<L", snappy._masked_crc32c(data)) + data),
165
decompressor = snappy.StreamDecompressor()
166
decompressor.decompress(b"\xff\x06\x00\x00sNaPpY")
168
decompressor.copy().decompress(
169
b"\x01\x36\x00\x00" +
170
struct.pack("<L", snappy._masked_crc32c(data)) + data),
173
# test that we throw errors for unknown unskippable chunks
174
self.assertRaises(snappy.UncompressError,
175
decompressor.copy().decompress, b"\x03\x01\x00\x00")
177
# test that we skip unknown skippable chunks
178
self.assertEqual(b"",
179
decompressor.copy().decompress(b"\xfe\x01\x00\x00"))
181
# test that we check CRCs
182
compressed_data = snappy.compress(data)
183
real_crc = struct.pack("<L", snappy._masked_crc32c(data))
184
fake_crc = os.urandom(4)
185
self.assertRaises(snappy.UncompressError,
186
decompressor.copy().decompress,
187
b"\x00\x0a\x00\x00" + fake_crc + compressed_data)
189
decompressor.copy().decompress(
190
b"\x00\x0a\x00\x00" + real_crc + compressed_data),
193
# test that we buffer when we don't have enough
194
uncompressed_data = os.urandom(100)
195
compressor = snappy.StreamCompressor()
196
compressed_data = (compressor.compress(uncompressed_data[:50]) +
197
compressor.compress(uncompressed_data[50:]))
198
for split1 in range(len(compressed_data) - 1):
199
for split2 in range(split1, len(compressed_data)):
200
decompressor = snappy.StreamDecompressor()
202
(decompressor.decompress(compressed_data[:split1]) +
203
decompressor.decompress(compressed_data[split1:split2]) +
204
decompressor.decompress(compressed_data[split2:])),
207
def test_concatenation(self):
208
data1 = os.urandom(snappy._CHUNK_MAX * 2)
209
data2 = os.urandom(4096)
210
decompressor = snappy.StreamDecompressor()
212
decompressor.decompress(
213
snappy.StreamCompressor().compress(data1) +
214
snappy.StreamCompressor().compress(data2)),
218
if __name__ == "__main__":