2
# Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved
4
# Permission is hereby granted, free of charge, to any person obtaining a
5
# copy of this software and associated documentation files (the
6
# "Software"), to deal in the Software without restriction, including
7
# without limitation the rights to use, copy, modify, merge, publish, dis-
8
# tribute, sublicense, and/or sell copies of the Software, and to permit
9
# persons to whom the Software is furnished to do so, subject to the fol-
12
# The above copyright notice and this permission notice shall be included
13
# in all copies or substantial portions of the Software.
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
24
from Queue import Queue
27
from tests.unit import unittest
28
from tests.unit import AWSMockServiceTestCase
30
from boto.glacier.concurrent import ConcurrentUploader, ConcurrentDownloader
31
from boto.glacier.concurrent import UploadWorkerThread
32
from boto.glacier.concurrent import _END_SENTINEL
35
class FakeThreadedConcurrentUploader(ConcurrentUploader):
    """ConcurrentUploader variant whose thread hooks do no real work.

    ``_start_upload_threads`` only records the arguments it receives and
    ``_wait_for_upload_threads`` fills the chunk list with placeholders,
    so tests can inspect the queues without spawning worker threads.
    """

    def _start_upload_threads(self, results_queue, upload_id,
                              worker_queue, filename):
        # Store what was passed in for later inspection; ``filename`` is
        # not used by this override.
        self.upload_id = upload_id
        self.worker_queue = worker_queue
        self.results_queue = results_queue

    def _wait_for_upload_threads(self, hash_chunks, result_queue, total_parts):
        # Put a placeholder value in the slot of every part.
        for part_index in xrange(total_parts):
            hash_chunks[part_index] = 'foo'
47
class FakeThreadedConcurrentDownloader(ConcurrentDownloader):
48
def _start_download_threads(self, results_queue, worker_queue):
49
self.results_queue = results_queue
50
self.worker_queue = worker_queue
52
def _wait_for_download_threads(self, filename, result_queue, total_parts):
56
class TestConcurrentUploader(unittest.TestCase):
59
super(TestConcurrentUploader, self).setUp()
60
self.stat_patch = mock.patch('os.stat')
61
self.stat_mock = self.stat_patch.start()
62
# Give a default value for tests that don't care
63
# what the file size is.
64
self.stat_mock.return_value.st_size = 1024 * 1024 * 8
67
self.stat_mock = self.stat_patch.start()
69
def test_calculate_required_part_size(self):
70
self.stat_mock.return_value.st_size = 1024 * 1024 * 8
71
uploader = ConcurrentUploader(mock.Mock(), 'vault_name')
72
total_parts, part_size = uploader._calculate_required_part_size(
74
self.assertEqual(total_parts, 2)
75
self.assertEqual(part_size, 4 * 1024 * 1024)
77
def test_calculate_required_part_size_too_small(self):
78
too_small = 1 * 1024 * 1024
79
self.stat_mock.return_value.st_size = 1024 * 1024 * 1024
80
uploader = ConcurrentUploader(mock.Mock(), 'vault_name',
82
total_parts, part_size = uploader._calculate_required_part_size(
84
self.assertEqual(total_parts, 256)
85
# Part size is 4MB, not the passed in 1MB.
86
self.assertEqual(part_size, 4 * 1024 * 1024)
88
def test_work_queue_is_correctly_populated(self):
89
uploader = FakeThreadedConcurrentUploader(mock.MagicMock(),
91
uploader.upload('foofile')
92
q = uploader.worker_queue
93
items = [q.get() for i in xrange(q.qsize())]
94
self.assertEqual(items[0], (0, 4 * 1024 * 1024))
95
self.assertEqual(items[1], (1, 4 * 1024 * 1024))
96
# 2 for the parts, 10 for the end sentinels (10 threads).
97
self.assertEqual(len(items), 12)
99
def test_correct_low_level_api_calls(self):
    """Verify the initiate/complete multipart calls made by ``upload``."""
    four_mb = 4 * 1024 * 1024
    eight_mb = 8 * 1024 * 1024
    glacier_api = mock.MagicMock()
    FakeThreadedConcurrentUploader(glacier_api, 'vault_name').upload(
        'foofile')
    # upload_part is invoked by the worker threads, so only the
    # initiate/complete multipart API calls are checked here.
    glacier_api.initiate_multipart_upload.assert_called_with(
        'vault_name', four_mb, None)
    glacier_api.complete_multipart_upload.assert_called_with(
        'vault_name', mock.ANY, mock.ANY, eight_mb)
110
def test_downloader_work_queue_is_correctly_populated(self):
    """Verify the work items queued by ``download`` for an 8MB archive."""
    four_mb = 4 * 1024 * 1024
    job = mock.MagicMock()
    job.archive_size = 8 * 1024 * 1024
    downloader = FakeThreadedConcurrentDownloader(job)
    downloader.download('foofile')
    # Drain the queue into a list so the entries can be indexed.
    work_queue = downloader.worker_queue
    queued = []
    for _ in xrange(work_queue.qsize()):
        queued.append(work_queue.get())
    self.assertEqual(queued[0], (0, four_mb))
    self.assertEqual(queued[1], (1, four_mb))
    # 2 for the parts, 10 for the end sentinels (10 threads).
    self.assertEqual(len(queued), 12)
123
class TestUploaderThread(unittest.TestCase):
125
self.fileobj = tempfile.NamedTemporaryFile()
126
self.filename = self.fileobj.name
128
def test_fileobj_closed_when_thread_shuts_down(self):
129
thread = UploadWorkerThread(mock.Mock(), 'vault_name',
130
self.filename, 'upload_id',
132
fileobj = thread._fileobj
133
self.assertFalse(fileobj.closed)
134
# By setting should_continue to False, it should immediately
135
# exit, and we can still verify cleanup behavior.
136
thread.should_continue = False
138
self.assertTrue(fileobj.closed)
140
def test_upload_errors_have_exception_messages(self):
143
result_queue = Queue()
144
upload_thread = UploadWorkerThread(
145
api, 'vault_name', self.filename,
146
'upload_id', job_queue, result_queue, num_retries=1,
147
time_between_retries=0)
148
api.upload_part.side_effect = Exception("exception message")
149
job_queue.put((0, 1024))
150
job_queue.put(_END_SENTINEL)
153
result = result_queue.get(timeout=1)
154
self.assertIn("exception message", str(result))
156
def test_num_retries_is_obeyed(self):
157
# total attempts is 1 + num_retries so if I have num_retries of 2,
158
# I'll attempt the upload once, and if that fails I'll retry up to
159
# 2 more times for a total of 3 attempts.
162
result_queue = Queue()
163
upload_thread = UploadWorkerThread(
164
api, 'vault_name', self.filename,
165
'upload_id', job_queue, result_queue, num_retries=2,
166
time_between_retries=0)
167
api.upload_part.side_effect = Exception()
168
job_queue.put((0, 1024))
169
job_queue.put(_END_SENTINEL)
172
self.assertEqual(api.upload_part.call_count, 3)
175
if __name__ == '__main__':