~ubuntu-branches/ubuntu/vivid/python-snappy/vivid

« back to all changes in this revision

Viewing changes to test_snappy.py

  • Committer: Package Import Robot
  • Author(s): Shell Xu
  • Date: 2013-06-02 18:59:04 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20130602185904-d91mdohuzqm461c8
Tags: 0.5-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
#
 
3
# Copyright (c) 2011, Andres Moreira <andres@andresmoreira.com>
 
4
# All rights reserved.
 
5
#
 
6
# Redistribution and use in source and binary forms, with or without
 
7
# modification, are permitted provided that the following conditions are met:
 
8
#     * Redistributions of source code must retain the above copyright
 
9
#       notice, this list of conditions and the following disclaimer.
 
10
#     * Redistributions in binary form must reproduce the above copyright
 
11
#       notice, this list of conditions and the following disclaimer in the
 
12
#       documentation and/or other materials provided with the distribution.
 
13
#     * Neither the name of the authors nor the
 
14
#       names of its contributors may be used to endorse or promote products
 
15
#       derived from this software without specific prior written permission.
 
16
#
 
17
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 
18
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 
19
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 
20
# ARE DISCLAIMED. IN NO EVENT SHALL ANDRES MOREIRA BE LIABLE FOR ANY DIRECT,
 
21
# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 
22
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
23
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 
24
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
25
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 
26
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
27
#
 
28
 
 
29
import os
 
30
import sys
 
31
import random
 
32
import snappy
 
33
import struct
 
34
from unittest import TestCase
 
35
 
 
36
 
 
37
class SnappyCompressionTest(TestCase):
 
38
 
 
39
    def test_simple_compress(self):
 
40
        text = "hello world!".encode('utf-8')
 
41
        compressed = snappy.compress(text)
 
42
        self.assertEqual(text, snappy.uncompress(compressed))
 
43
 
 
44
    def test_moredata_compress(self):
 
45
        text = "snappy +" * 1000 + " " + "by " * 1000 + " google"
 
46
        text = text.encode('utf-8')
 
47
        compressed = snappy.compress(text)
 
48
        self.assertEqual(text, snappy.uncompress(compressed))
 
49
 
 
50
    def test_randombytes_compress(self):
 
51
        _bytes = repr(os.urandom(1000)).encode('utf-8')
 
52
        compressed = snappy.compress(_bytes)
 
53
        self.assertEqual(_bytes, snappy.uncompress(compressed))
 
54
 
 
55
    def test_randombytes2_compress(self):
 
56
        _bytes = bytes(os.urandom(10000))
 
57
        compressed = snappy.compress(_bytes)
 
58
        self.assertEqual(_bytes, snappy.uncompress(compressed))
 
59
 
 
60
    def test_uncompress_error(self):
 
61
        self.assertRaises(snappy.UncompressError, snappy.uncompress,
 
62
                          "hoa".encode('utf-8'))
 
63
 
 
64
    if sys.version_info[0] == 2:
 
65
        def test_unicode_compress(self):
 
66
            text = "hello unicode world!".decode('utf-8')
 
67
            compressed = snappy.compress(text)
 
68
            self.assertEqual(text, snappy.uncompress(compressed))
 
69
 
 
70
    def test_decompress(self):
 
71
        # decompress == uncompress, just to support compatibility with zlib
 
72
        text = "hello world!".encode('utf-8')
 
73
        compressed = snappy.compress(text)
 
74
        self.assertEqual(text, snappy.decompress(compressed))
 
75
 
 
76
    def test_big_string(self):
 
77
        text = ('a'*10000000).encode('utf-8')
 
78
        compressed = snappy.compress(text)
 
79
        self.assertEqual(text, snappy.decompress(compressed))
 
80
 
 
81
 
 
82
class SnappyValidBufferTest(TestCase):
 
83
 
 
84
    def test_valid_compressed_buffer(self):
 
85
        text = "hello world!".encode('utf-8')
 
86
        compressed = snappy.compress(text)
 
87
        uncompressed = snappy.uncompress(compressed)
 
88
        self.assertEqual(text == uncompressed,
 
89
                         snappy.isValidCompressed(compressed))
 
90
 
 
91
    def test_invalid_compressed_buffer(self):
 
92
        self.assertFalse(snappy.isValidCompressed(
 
93
                "not compressed".encode('utf-8')))
 
94
 
 
95
 
 
96
class SnappyStreaming(TestCase):
 
97
 
 
98
    def test_random(self):
 
99
        for _ in range(100):
 
100
            compressor = snappy.StreamCompressor()
 
101
            decompressor = snappy.StreamDecompressor()
 
102
            data = b""
 
103
            compressed = b""
 
104
            for _ in range(random.randint(0, 3)):
 
105
                chunk = os.urandom(random.randint(0, snappy._CHUNK_MAX * 2))
 
106
                data += chunk
 
107
                compressed += compressor.add_chunk(
 
108
                        chunk, compress=random.choice([True, False, None]))
 
109
 
 
110
            upper_bound = random.choice([256, snappy._CHUNK_MAX * 2])
 
111
            while compressed:
 
112
                size = random.randint(0, upper_bound)
 
113
                chunk, compressed = compressed[:size], compressed[size:]
 
114
                chunk = decompressor.decompress(chunk)
 
115
                self.assertEqual(data[:len(chunk)], chunk)
 
116
                data = data[len(chunk):]
 
117
 
 
118
            decompressor.flush()
 
119
            self.assertEqual(len(data), 0)
 
120
 
 
121
    def test_compression(self):
 
122
        # test that we can add compressed chunks
 
123
        compressor = snappy.StreamCompressor()
 
124
        data = b"\0" * 50
 
125
        compressed_data = snappy.compress(data)
 
126
        crc = struct.pack("<L", snappy._masked_crc32c(data))
 
127
        self.assertEqual(crc, b"\x8f)H\xbd")
 
128
        self.assertEqual(len(compressed_data), 6)
 
129
        self.assertEqual(compressor.add_chunk(data, compress=True),
 
130
                         b"\xff\x06\x00\x00sNaPpY"
 
131
                         b"\x00\x0a\x00\x00" + crc + compressed_data)
 
132
 
 
133
        # test that we can add uncompressed chunks
 
134
        data = b"\x01" * 50
 
135
        crc = struct.pack("<L", snappy._masked_crc32c(data))
 
136
        self.assertEqual(crc, b"\xb2\x14)\x8a")
 
137
        self.assertEqual(compressor.add_chunk(data, compress=False),
 
138
                         b"\x01\x36\x00\x00" + crc + data)
 
139
 
 
140
        # test that we can add more data than will fit in one chunk
 
141
        data = b"\x01" * (snappy._CHUNK_MAX * 2 - 5)
 
142
        crc1 = struct.pack("<L",
 
143
                snappy._masked_crc32c(data[:snappy._CHUNK_MAX]))
 
144
        self.assertEqual(crc1, b"h#6\x8e")
 
145
        crc2 = struct.pack("<L",
 
146
                snappy._masked_crc32c(data[snappy._CHUNK_MAX:]))
 
147
        self.assertEqual(crc2, b"q\x8foE")
 
148
        self.assertEqual(compressor.add_chunk(data, compress=False),
 
149
                b"\x01\x04\x00\x01" + crc1 + data[:snappy._CHUNK_MAX] +
 
150
                b"\x01\xff\xff\x00" + crc2 + data[snappy._CHUNK_MAX:])
 
151
 
 
152
    def test_decompression(self):
 
153
        # test that we check for the initial stream identifier
 
154
        data = b"\x01" * 50
 
155
        self.assertRaises(snappy.UncompressError,
 
156
                snappy.StreamDecompressor().decompress,
 
157
                    b"\x01\x36\x00\00" +
 
158
                    struct.pack("<L", snappy._masked_crc32c(data)) + data)
 
159
        self.assertEqual(
 
160
                snappy.StreamDecompressor().decompress(
 
161
                    b"\xff\x06\x00\x00sNaPpY"
 
162
                    b"\x01\x36\x00\x00" +
 
163
                    struct.pack("<L", snappy._masked_crc32c(data)) + data),
 
164
                data)
 
165
        decompressor = snappy.StreamDecompressor()
 
166
        decompressor.decompress(b"\xff\x06\x00\x00sNaPpY")
 
167
        self.assertEqual(
 
168
                decompressor.copy().decompress(
 
169
                    b"\x01\x36\x00\x00" +
 
170
                    struct.pack("<L", snappy._masked_crc32c(data)) + data),
 
171
                data)
 
172
 
 
173
        # test that we throw errors for unknown unskippable chunks
 
174
        self.assertRaises(snappy.UncompressError,
 
175
                decompressor.copy().decompress, b"\x03\x01\x00\x00")
 
176
 
 
177
        # test that we skip unknown skippable chunks
 
178
        self.assertEqual(b"",
 
179
                         decompressor.copy().decompress(b"\xfe\x01\x00\x00"))
 
180
 
 
181
        # test that we check CRCs
 
182
        compressed_data = snappy.compress(data)
 
183
        real_crc = struct.pack("<L", snappy._masked_crc32c(data))
 
184
        fake_crc = os.urandom(4)
 
185
        self.assertRaises(snappy.UncompressError,
 
186
                decompressor.copy().decompress,
 
187
                    b"\x00\x0a\x00\x00" + fake_crc + compressed_data)
 
188
        self.assertEqual(
 
189
                decompressor.copy().decompress(
 
190
                    b"\x00\x0a\x00\x00" + real_crc + compressed_data),
 
191
                data)
 
192
 
 
193
        # test that we buffer when we don't have enough
 
194
        uncompressed_data = os.urandom(100)
 
195
        compressor = snappy.StreamCompressor()
 
196
        compressed_data = (compressor.compress(uncompressed_data[:50]) +
 
197
                           compressor.compress(uncompressed_data[50:]))
 
198
        for split1 in range(len(compressed_data) - 1):
 
199
            for split2 in range(split1, len(compressed_data)):
 
200
                decompressor = snappy.StreamDecompressor()
 
201
                self.assertEqual(
 
202
                    (decompressor.decompress(compressed_data[:split1]) +
 
203
                     decompressor.decompress(compressed_data[split1:split2]) +
 
204
                     decompressor.decompress(compressed_data[split2:])),
 
205
                    uncompressed_data)
 
206
 
 
207
    def test_concatenation(self):
 
208
        data1 = os.urandom(snappy._CHUNK_MAX * 2)
 
209
        data2 = os.urandom(4096)
 
210
        decompressor = snappy.StreamDecompressor()
 
211
        self.assertEqual(
 
212
                decompressor.decompress(
 
213
                    snappy.StreamCompressor().compress(data1) +
 
214
                    snappy.StreamCompressor().compress(data2)),
 
215
                data1 + data2)
 
216
 
 
217
 
 
218
if __name__ == "__main__":
 
219
    import unittest
 
220
    unittest.main()