~ubuntu-branches/debian/stretch/python-pyeclib/stretch

« back to all changes in this revision

Viewing changes to .pc/big-endian.patch/test/test_pyeclib_api.py

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2015-09-09 13:32:08 UTC
  • Revision ID: package-import@ubuntu.com-20150909133208-r00d9e1xoqynjwtp
Tags: 1.0.8-3
* Team upload.
* Drop BD's on gf-complete and jerasure dev packages; not required
  due to the runtime nature of backend loading in liberasurecode.
* d/p/big-endian.patch: Cherry pick fix from upstream VCS to resolve
  issues on big endian architectures.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2013, Kevin Greenan (kmgreen2@gmail.com)
 
2
# All rights reserved.
 
3
#
 
4
# Redistribution and use in source and binary forms, with or without
 
5
# modification, are permitted provided that the following conditions are met:
 
6
#
 
7
# Redistributions of source code must retain the above copyright notice, this
 
8
# list of conditions and the following disclaimer.
 
9
#
 
10
# Redistributions in binary form must reproduce the above copyright notice,
 
11
# this list of conditions and the following disclaimer in the documentation
 
12
# and/or other materials provided with the distribution.  THIS SOFTWARE IS
 
13
# PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS
 
14
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
15
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN
 
16
# NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY
 
17
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 
18
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
19
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 
20
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
21
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 
22
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
23
 
 
24
import random
 
25
from string import ascii_letters, ascii_uppercase, digits
 
26
import sys
 
27
import tempfile
 
28
import unittest
 
29
from pyeclib.ec_iface import ECBackendNotSupported
 
30
from pyeclib.ec_iface import ECDriverError
 
31
from pyeclib.ec_iface import ECInsufficientFragments
 
32
 
 
33
from pyeclib.ec_iface import ECDriver, PyECLib_EC_Types
 
34
from .test_pyeclib_c import _available_backends
 
35
 
 
36
if sys.version < '3':
 
37
    def b2i(b):
 
38
        return ord(b)
 
39
else:
 
40
    def b2i(b):
 
41
        return b
 
42
 
 
43
 
 
44
class TestNullDriver(unittest.TestCase):
 
45
 
 
46
    def setUp(self):
 
47
        self.null_driver = ECDriver(
 
48
            library_import_str="pyeclib.core.ECNullDriver", k=8, m=2)
 
49
 
 
50
    def tearDown(self):
 
51
        pass
 
52
 
 
53
    def test_null_driver(self):
 
54
        self.null_driver.encode('')
 
55
        self.null_driver.decode([])
 
56
 
 
57
 
 
58
class TestStripeDriver(unittest.TestCase):
 
59
 
 
60
    def setUp(self):
 
61
        self.stripe_driver = ECDriver(
 
62
            library_import_str="pyeclib.core.ECStripingDriver", k=8, m=0)
 
63
 
 
64
    def tearDown(self):
 
65
        pass
 
66
 
 
67
 
 
68
class TestPyECLibDriver(unittest.TestCase):
 
69
 
 
70
    def __init__(self, *args):
 
71
        # Create the temp files needed for testing
 
72
        self.file_sizes = ["100-K"]
 
73
        self.files = {}
 
74
        self.num_iterations = 100
 
75
        self._create_tmp_files()
 
76
 
 
77
        unittest.TestCase.__init__(self, *args)
 
78
 
 
79
    def _create_tmp_files(self):
 
80
        """
 
81
        Create the temporary files needed for testing.  Use the tempfile
 
82
        package so that the files will be automatically removed during
 
83
        garbage collection.
 
84
        """
 
85
        for size_str in self.file_sizes:
 
86
            # Determine the size of the file to create
 
87
            size_desc = size_str.split("-")
 
88
            size = int(size_desc[0])
 
89
            if size_desc[1] == 'M':
 
90
                size *= 1000000
 
91
            elif size_desc[1] == 'K':
 
92
                size *= 1000
 
93
 
 
94
            # Create the dictionary of files to test with
 
95
            buf = ''.join(random.choice(ascii_letters) for i in range(size))
 
96
            if sys.version_info >= (3,):
 
97
                buf = buf.encode('ascii')
 
98
            tmp_file = tempfile.NamedTemporaryFile()
 
99
            tmp_file.write(buf)
 
100
            self.files[size_str] = tmp_file
 
101
 
 
102
    def setUp(self):
 
103
        # Ensure that the file offset is set to the head of the file
 
104
        for _, tmp_file in self.files.items():
 
105
            tmp_file.seek(0, 0)
 
106
 
 
107
    def tearDown(self):
 
108
        pass
 
109
 
 
110
    def test_valid_algo(self):
 
111
        print("")
 
112
        for _type in PyECLib_EC_Types.names():
 
113
            # Check if this algo works
 
114
            if _type not in _available_backends:
 
115
                print("Skipping test for %s backend" % _type)
 
116
                continue
 
117
            try:
 
118
                if _type is 'shss':
 
119
                    _instance = ECDriver(k=10, m=4, ec_type=_type)
 
120
                else:
 
121
                    _instance = ECDriver(k=10, m=5, ec_type=_type)
 
122
            except ECDriverError:
 
123
                self.fail("%p: %s algorithm not supported" % _instance, _type)
 
124
 
 
125
        self.assertRaises(ECBackendNotSupported, ECDriver, k=10, m=5,
 
126
                          ec_type="invalid_algo")
 
127
 
 
128
    def get_pyeclib_testspec(self, csum="none"):
 
129
        pyeclib_drivers = []
 
130
        _type1 = 'jerasure_rs_vand'
 
131
        if _type1 in _available_backends:
 
132
            pyeclib_drivers.append(ECDriver(k=12, m=2, ec_type=_type1,
 
133
                                   chksum_type=csum))
 
134
            pyeclib_drivers.append(ECDriver(k=11, m=2, ec_type=_type1,
 
135
                                   chksum_type=csum))
 
136
            pyeclib_drivers.append(ECDriver(k=10, m=2, ec_type=_type1,
 
137
                                   chksum_type=csum))
 
138
            pyeclib_drivers.append(ECDriver(k=8, m=4, ec_type=_type1,
 
139
                                   chksum_type=csum))
 
140
        _type2 = 'liberasurecode_rs_vand'
 
141
        if _type2 in _available_backends:
 
142
            pyeclib_drivers.append(ECDriver(k=12, m=2, ec_type=_type2,
 
143
                                   chksum_type=csum))
 
144
            pyeclib_drivers.append(ECDriver(k=11, m=2, ec_type=_type2,
 
145
                                   chksum_type=csum))
 
146
            pyeclib_drivers.append(ECDriver(k=10, m=2, ec_type=_type2,
 
147
                                   chksum_type=csum))
 
148
            pyeclib_drivers.append(ECDriver(k=8, m=4, ec_type=_type2,
 
149
                                   chksum_type=csum))
 
150
        _type3 = 'flat_xor_hd'
 
151
        if _type3 in _available_backends:
 
152
            pyeclib_drivers.append(ECDriver(k=12, m=6, ec_type=_type3,
 
153
                                   chksum_type=csum))
 
154
            pyeclib_drivers.append(ECDriver(k=10, m=5, ec_type=_type3,
 
155
                                   chksum_type=csum))
 
156
        return pyeclib_drivers
 
157
 
 
158
    def test_small_encode(self):
 
159
        pyeclib_drivers = self.get_pyeclib_testspec()
 
160
        encode_strs = [b"a", b"hello", b"hellohyhi", b"yo"]
 
161
 
 
162
        for pyeclib_driver in pyeclib_drivers:
 
163
            for encode_str in encode_strs:
 
164
                encoded_fragments = pyeclib_driver.encode(encode_str)
 
165
                decoded_str = pyeclib_driver.decode(encoded_fragments)
 
166
 
 
167
                self.assertTrue(decoded_str == encode_str)
 
168
 
 
169
#    def disabled_test_verify_fragment_algsig_chksum_fail(self):
 
170
#        pyeclib_drivers = []
 
171
#        pyeclib_drivers.append(
 
172
#            ECDriver(k=12, m=2, ec_type="jerasure_rs_vand", chksum_type="algsig"))
 
173
#        pyeclib_drivers.append(
 
174
#            ECDriver(k=12, m=3, ec_type="jerasure_rs_vand", chksum_type="algsig"))
 
175
#        pyeclib_drivers.append(
 
176
#            ECDriver(k=12, m=6, ec_type="flat_xor_hd", chksum_type="algsig"))
 
177
#        pyeclib_drivers.append(
 
178
#            ECDriver(k=10, m=5, ec_type="flat_xor_hd", chksum_type="algsig"))
 
179
#
 
180
#        filesize = 1024 * 1024 * 3
 
181
#        file_str = ''.join(random.choice(ascii_letters) for i in range(filesize))
 
182
#        file_bytes = file_str.encode('utf-8')
 
183
#
 
184
#        fragment_to_corrupt = random.randint(0, 12)
 
185
#
 
186
#        for pyeclib_driver in pyeclib_drivers:
 
187
#            fragments = pyeclib_driver.encode(file_bytes)
 
188
#            fragment_metadata_list = []
 
189
#
 
190
#            i = 0
 
191
#            for fragment in fragments:
 
192
#                if i == fragment_to_corrupt:
 
193
#                    corrupted_fragment = fragment[:100] +\
 
194
#                        (str(chr((b2i(fragment[100]) + 0x1)
 
195
#                                 % 0xff))).encode('utf-8') + fragment[101:]
 
196
#                    fragment_metadata_list.append(pyeclib_driver.get_metadata(corrupted_fragment))
 
197
#                else:
 
198
#                    fragment_metadata_list.append(pyeclib_driver.get_metadata(fragment))
 
199
#                i += 1
 
200
#
 
201
#            self.assertTrue(pyeclib_driver.verify_stripe_metadata(fragment_metadata_list) != -1)
 
202
#
 
203
#    def disabled_test_verify_fragment_inline_succeed(self):
 
204
#        pyeclib_drivers = []
 
205
#        pyeclib_drivers.append(
 
206
#            ECDriver(k=12, m=2, ec_type="jerasure_rs_vand", chksum_type="algsig"))
 
207
#        pyeclib_drivers.append(
 
208
#            ECDriver(k=12, m=3, ec_type="jerasure_rs_vand", chksum_type="algsig"))
 
209
#        pyeclib_drivers.append(
 
210
#            ECDriver(k=12, m=6, ec_type="flat_xor_hd", chksum_type="algsig"))
 
211
#        pyeclib_drivers.append(
 
212
#            ECDriver(k=10, m=5, ec_type="flat_xor_hd", chksum_type="algsig"))
 
213
#
 
214
#        filesize = 1024 * 1024 * 3
 
215
#        file_str = ''.join(random.choice(ascii_letters) for i in range(filesize))
 
216
#        file_bytes = file_str.encode('utf-8')
 
217
#
 
218
#        for pyeclib_driver in pyeclib_drivers:
 
219
#            fragments = pyeclib_driver.encode(file_bytes)
 
220
#
 
221
#            fragment_metadata_list = []
 
222
#
 
223
#            for fragment in fragments:
 
224
#                fragment_metadata_list.append(
 
225
#                    pyeclib_driver.get_metadata(fragment))
 
226
#
 
227
#            self.assertTrue(
 
228
#                pyeclib_driver.verify_stripe_metadata(fragment_metadata_list) == -1)
 
229
#
 
230
 
 
231
    def check_metadata_formatted(self, k, m, ec_type, chksum_type):
 
232
 
 
233
        if ec_type not in _available_backends:
 
234
            return
 
235
 
 
236
        filesize = 1024 * 1024 * 3
 
237
        file_str = ''.join(random.choice(ascii_letters)
 
238
                           for i in range(filesize))
 
239
        file_bytes = file_str.encode('utf-8')
 
240
 
 
241
        pyeclib_driver = ECDriver(k=k, m=m, ec_type=ec_type,
 
242
                                  chksum_type=chksum_type)
 
243
 
 
244
        fragments = pyeclib_driver.encode(file_bytes)
 
245
 
 
246
        f = 0
 
247
        for fragment in fragments:
 
248
            metadata = pyeclib_driver.get_metadata(fragment, 1)
 
249
            if 'index' in metadata:
 
250
                self.assertEqual(metadata['index'], f)
 
251
            else:
 
252
                self.assertTrue(False)
 
253
 
 
254
            if 'chksum_mismatch' in metadata:
 
255
                self.assertEqual(metadata['chksum_mismatch'], 0)
 
256
            else:
 
257
                self.assertTrue(False)
 
258
 
 
259
            if 'backend_id' in metadata:
 
260
                self.assertEqual(metadata['backend_id'], ec_type)
 
261
            else:
 
262
                self.assertTrue(False)
 
263
 
 
264
            if 'orig_data_size' in metadata:
 
265
                self.assertEqual(metadata['orig_data_size'], 3145728)
 
266
            else:
 
267
                self.assertTrue(False)
 
268
 
 
269
            if 'chksum_type' in metadata:
 
270
                self.assertEqual(metadata['chksum_type'], 'crc32')
 
271
            else:
 
272
                self.assertTrue(False)
 
273
 
 
274
            if 'backend_version' not in metadata:
 
275
                self.assertTrue(False)
 
276
 
 
277
            if 'chksum' not in metadata:
 
278
                self.assertTrue(False)
 
279
 
 
280
            if 'size' not in metadata:
 
281
                self.assertTrue(False)
 
282
 
 
283
            f += 1
 
284
 
 
285
    def test_get_metadata_formatted(self):
 
286
        self.check_metadata_formatted(12, 2, "jerasure_rs_vand",
 
287
                                      "inline_crc32")
 
288
        self.check_metadata_formatted(12, 2, "liberasurecode_rs_vand",
 
289
                                      "inline_crc32")
 
290
        self.check_metadata_formatted(8, 4, "liberasurecode_rs_vand",
 
291
                                      "inline_crc32")
 
292
 
 
293
    def test_verify_fragment_inline_chksum_fail(self):
 
294
        pyeclib_drivers = self.get_pyeclib_testspec("inline_crc32")
 
295
        filesize = 1024 * 1024 * 3
 
296
        file_str = ''.join(random.choice(ascii_letters) for i in range(filesize))
 
297
        file_bytes = file_str.encode('utf-8')
 
298
 
 
299
        for pyeclib_driver in pyeclib_drivers:
 
300
            fragments = pyeclib_driver.encode(file_bytes)
 
301
 
 
302
            fragment_metadata_list = []
 
303
 
 
304
            first_fragment_to_corrupt = random.randint(0, len(fragments))
 
305
            num_to_corrupt = 2
 
306
            fragments_to_corrupt = [
 
307
              (first_fragment_to_corrupt + i) % len(fragments) for i in range(num_to_corrupt + 1)
 
308
            ]
 
309
            fragments_to_corrupt.sort()
 
310
 
 
311
            i = 0
 
312
            for fragment in fragments:
 
313
                if i in fragments_to_corrupt:
 
314
                    corrupted_fragment = fragment[:100] +\
 
315
                        (str(chr((b2i(fragment[100]) + 0x1)
 
316
                                 % 128))).encode('utf-8') + fragment[101:]
 
317
                    fragment_metadata_list.append(
 
318
                        pyeclib_driver.get_metadata(corrupted_fragment))
 
319
                else:
 
320
                    fragment_metadata_list.append(
 
321
                        pyeclib_driver.get_metadata(fragment))
 
322
                i += 1
 
323
 
 
324
            expected_ret_value = {"status": -206, 
 
325
                                  "reason": "Bad checksum", 
 
326
                                  "bad_fragments": fragments_to_corrupt}
 
327
            self.assertEqual(
 
328
                pyeclib_driver.verify_stripe_metadata(fragment_metadata_list),
 
329
                expected_ret_value)
 
330
 
 
331
    def test_verify_fragment_inline_chksum_succeed(self):
 
332
        pyeclib_drivers = self.get_pyeclib_testspec("inline_crc32")
 
333
 
 
334
        filesize = 1024 * 1024 * 3
 
335
        file_str = ''.join(random.choice(ascii_letters) for i in range(filesize))
 
336
        file_bytes = file_str.encode('utf-8')
 
337
 
 
338
        for pyeclib_driver in pyeclib_drivers:
 
339
            fragments = pyeclib_driver.encode(file_bytes)
 
340
 
 
341
            fragment_metadata_list = []
 
342
 
 
343
            for fragment in fragments:
 
344
                fragment_metadata_list.append(
 
345
                    pyeclib_driver.get_metadata(fragment))
 
346
 
 
347
            expected_ret_value = {"status": 0 }
 
348
 
 
349
            self.assertTrue(
 
350
                pyeclib_driver.verify_stripe_metadata(fragment_metadata_list) == expected_ret_value)
 
351
 
 
352
    def test_get_segment_byterange_info(self):
 
353
        pyeclib_drivers = self.get_pyeclib_testspec()
 
354
 
 
355
        file_size = 1024 * 1024
 
356
        segment_size = 3 * 1024
 
357
 
 
358
        ranges = [(0, 1), (1, 12), (10, 1000), (0, segment_size-1), (1, segment_size+1), (segment_size-1, 2*segment_size)]
 
359
 
 
360
        expected_results = {}
 
361
 
 
362
        expected_results[(0, 1)] = {0: (0, 1)}
 
363
        expected_results[(1, 12)] = {0: (1, 12)}
 
364
        expected_results[(10, 1000)] = {0: (10, 1000)}
 
365
        expected_results[(0, segment_size-1)] = {0: (0, segment_size-1)}
 
366
        expected_results[(1, segment_size+1)] = {0: (1, segment_size-1), 1: (0, 1)}
 
367
        expected_results[(segment_size-1, 2*segment_size)] = {0: (segment_size-1, segment_size-1), 1: (0, segment_size-1), 2: (0, 0)}
 
368
 
 
369
        results = pyeclib_drivers[0].get_segment_info_byterange(ranges, file_size, segment_size)
 
370
 
 
371
        for exp_result_key in expected_results:
 
372
            self.assertIn(exp_result_key, results)
 
373
            self.assertTrue(len(results[exp_result_key]) == len(expected_results[exp_result_key]))
 
374
            exp_result_map = expected_results[exp_result_key]
 
375
            for segment_key in exp_result_map:
 
376
                self.assertIn(segment_key, results[exp_result_key])
 
377
                self.assertTrue(results[exp_result_key][segment_key] == expected_results[exp_result_key][segment_key])
 
378
 
 
379
    def test_get_segment_info(self):
 
380
        pyeclib_drivers = self.get_pyeclib_testspec()
 
381
 
 
382
        file_sizes = [
 
383
            1024 * 1024,
 
384
            2 * 1024 * 1024,
 
385
            10 * 1024 * 1024,
 
386
            10 * 1024 * 1024 + 7]
 
387
        segment_sizes = [3 * 1024, 1024 * 1024]
 
388
        segment_strings = {}
 
389
 
 
390
        #
 
391
        # Generate some test segments for each segment size.
 
392
        # Use 2 * segment size, because last segment may be
 
393
        # greater than segment_size
 
394
        #
 
395
        char_set = ascii_uppercase + digits
 
396
        for segment_size in segment_sizes:
 
397
            segment_strings[segment_size] = \
 
398
                ''.join(random.choice(char_set) for i in range(segment_size * 2))
 
399
 
 
400
        for pyeclib_driver in pyeclib_drivers:
 
401
            for file_size in file_sizes:
 
402
                for segment_size in segment_sizes:
 
403
                    #
 
404
                    # Compute the segment info
 
405
                    #
 
406
                    segment_info = pyeclib_driver.get_segment_info(
 
407
                        file_size,
 
408
                        segment_size)
 
409
 
 
410
                    num_segments = segment_info['num_segments']
 
411
                    segment_size = segment_info['segment_size']
 
412
                    fragment_size = segment_info['fragment_size']
 
413
                    last_segment_size = segment_info['last_segment_size']
 
414
                    last_fragment_size = segment_info['last_fragment_size']
 
415
 
 
416
                    computed_file_size = (
 
417
                        (num_segments - 1) * segment_size) + last_segment_size
 
418
 
 
419
                    #
 
420
                    # Verify that the segment sizes add up
 
421
                    #
 
422
                    self.assertTrue(computed_file_size == file_size)
 
423
 
 
424
                    encoded_fragments = pyeclib_driver.encode(
 
425
                        (segment_strings[segment_size][
 
426
                            :segment_size]).encode('utf-8'))
 
427
 
 
428
                    #
 
429
                    # Verify the fragment size
 
430
                    #
 
431
                    self.assertTrue(fragment_size == len(encoded_fragments[0]))
 
432
 
 
433
                    if last_segment_size > 0:
 
434
                        encoded_fragments = pyeclib_driver.encode(
 
435
                            (segment_strings[segment_size][
 
436
                                :last_segment_size]).encode('utf-8'))
 
437
 
 
438
                        #
 
439
                        # Verify the last fragment size, if there is one
 
440
                        #
 
441
                        self.assertTrue(
 
442
                            last_fragment_size == len(
 
443
                                encoded_fragments[0]))
 
444
 
 
445
    def test_rs(self):
 
446
        pyeclib_drivers = self.get_pyeclib_testspec()
 
447
 
 
448
        for pyeclib_driver in pyeclib_drivers:
 
449
            for file_size in self.file_sizes:
 
450
                tmp_file = self.files[file_size]
 
451
                tmp_file.seek(0)
 
452
                whole_file_bytes = tmp_file.read()
 
453
 
 
454
                encode_input = whole_file_bytes
 
455
                orig_fragments = pyeclib_driver.encode(encode_input)
 
456
 
 
457
                for iter in range(self.num_iterations):
 
458
                    num_missing = 2
 
459
                    idxs_to_remove = []
 
460
                    fragments = orig_fragments[:]
 
461
                    for j in range(num_missing):
 
462
                        idx = random.randint(0, (pyeclib_driver.k +
 
463
                                                 pyeclib_driver.m - 1))
 
464
                        if idx not in idxs_to_remove:
 
465
                            idxs_to_remove.append(idx)
 
466
 
 
467
                    # Reverse sort the list, so we can always
 
468
                    # remove from the original index
 
469
                    idxs_to_remove.sort(key=int, reverse=True)
 
470
                    for idx in idxs_to_remove:
 
471
                        fragments.pop(idx)
 
472
 
 
473
                    #
 
474
                    # Test decoder (send copy, because we want to re-use
 
475
                    # fragments for reconstruction)
 
476
                    #
 
477
                    decoded_string = pyeclib_driver.decode(fragments[:])
 
478
 
 
479
                    self.assertTrue(encode_input == decoded_string)
 
480
 
 
481
                    #
 
482
                    # Test reconstructor
 
483
                    #
 
484
                    reconstructed_fragments = pyeclib_driver.reconstruct(
 
485
                        fragments,
 
486
                        idxs_to_remove)
 
487
                    self.assertTrue(
 
488
                        reconstructed_fragments[0] == orig_fragments[
 
489
                            idxs_to_remove[0]])
 
490
 
 
491
                    #
 
492
                    # Test decode with integrity checks
 
493
                    #
 
494
                    first_fragment_to_corrupt = random.randint(0, len(fragments))
 
495
                    num_to_corrupt = min(len(fragments), pyeclib_driver.m + 1)
 
496
                    fragments_to_corrupt = [
 
497
                      (first_fragment_to_corrupt + i) % len(fragments) for i in range(num_to_corrupt)
 
498
                    ]
 
499
 
 
500
                    i = 0
 
501
                    for fragment in fragments:
 
502
                      if i in fragments_to_corrupt:
 
503
                        corrupted_fragment = ("0" * len(fragment)).encode('utf-8')
 
504
                        fragments[i] = corrupted_fragment
 
505
                      i += 1
 
506
 
 
507
                    self.assertRaises(ECInsufficientFragments,
 
508
                                      pyeclib_driver.decode,
 
509
                                      fragments[:], force_metadata_checks=True)
 
510
 
 
511
    def get_available_backend(self, k, m, ec_type, chksum_type="inline_crc32"):
 
512
        if ec_type[:11] == "flat_xor_hd":
 
513
            return ECDriver(k=k, m=m, ec_type="flat_xor_hd",
 
514
                            chksum_type=chksum_type)
 
515
        elif ec_type in _available_backends:
 
516
            return ECDriver(k=k, m=m, ec_type=ec_type, chksum_type=chksum_type)
 
517
        else:
 
518
            return None
 
519
 
 
520
    def test_liberasurecode_insufficient_frags_error(self):
 
521
        file_size = self.file_sizes[0]
 
522
        tmp_file = self.files[file_size]
 
523
        tmp_file.seek(0)
 
524
        whole_file_bytes = tmp_file.read()
 
525
        for ec_type in ['flat_xor_hd_3', 'liberasurecode_rs_vand']:
 
526
            pyeclib_driver = self.get_available_backend(
 
527
                k=10, m=5, ec_type=ec_type)
 
528
            fragments = pyeclib_driver.encode(whole_file_bytes)
 
529
            self.assertRaises(ECInsufficientFragments,
 
530
                              pyeclib_driver.reconstruct,
 
531
                              [fragments[0]], [1, 2, 3, 4, 5, 6])
 
532
 
 
533
    def test_min_parity_fragments_needed(self):
 
534
        pyeclib_drivers = []
 
535
        pyeclib_drivers.append(ECDriver(k=12, m=2, ec_type="liberasurecode_rs_vand"))
 
536
        self.assertTrue(
 
537
            pyeclib_drivers[0].min_parity_fragments_needed() == 1)
 
538
 
 
539
if __name__ == '__main__':
 
540
    unittest.main()