1
# Copyright (c) 2014 VMware, Inc.
4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
5
# not use this file except in compliance with the License. You may obtain
6
# a copy of the License at
8
# http://www.apache.org/licenses/LICENSE-2.0
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13
# License for the specific language governing permissions and limitations
17
Unit tests for functions and classes for image transfer.
24
from oslo.vmware import exceptions
25
from oslo.vmware import image_transfer
26
from tests import base
29
class BlockingQueueTest(base.TestCase):
30
"""Tests for BlockingQueue."""
35
max_transfer_size = 30
36
queue = image_transfer.BlockingQueue(max_size, max_transfer_size)
38
def get_side_effect():
39
return [1] * chunk_size
41
queue.get = mock.Mock(side_effect=get_side_effect)
43
data_item = queue.read(chunk_size)
47
self.assertEqual(max_transfer_size, queue._transferred)
48
exp_calls = [mock.call()] * int(math.ceil(float(max_transfer_size) /
50
self.assertEqual(exp_calls, queue.get.call_args_list)
53
queue = image_transfer.BlockingQueue(10, 30)
54
queue.put = mock.Mock()
56
for _ in range(0, write_count):
58
exp_calls = [mock.call([1])] * write_count
59
self.assertEqual(exp_calls, queue.put.call_args_list)
62
max_transfer_size = 30
63
queue = image_transfer.BlockingQueue(10, 30)
64
self.assertEqual(max_transfer_size, queue.tell())
67
class ImageWriterTest(base.TestCase):
68
"""Tests for ImageWriter class."""
70
def _create_image_writer(self):
    """Return an ImageWriter built from four fresh mocks.

    The mocks are kept on the test case as ``image_service``,
    ``context``, ``input_file`` and ``image_id`` so the calling test can
    program them and inspect the calls made on them afterwards.
    """
    # Create the collaborators in the same order as before; each one is
    # an independent Mock instance.
    for attr in ('image_service', 'context', 'input_file', 'image_id'):
        setattr(self, attr, mock.Mock())
    ctor_args = (self.context, self.input_file,
                 self.image_service, self.image_id)
    return image_transfer.ImageWriter(*ctor_args)
79
writer = self._create_image_writer()
80
status_list = ['queued', 'saving', 'active']
82
def image_service_show_side_effect(context, image_id):
83
status = status_list.pop(0)
84
return {'status': status}
86
self.image_service.show.side_effect = image_service_show_side_effect
87
exp_calls = [mock.call(self.context, self.image_id)] * len(status_list)
88
with mock.patch.object(image_transfer,
89
'IMAGE_SERVICE_POLL_INTERVAL', 0):
91
self.assertTrue(writer.wait())
92
self.image_service.update.assert_called_once_with(self.context,
95
self.assertEqual(exp_calls, self.image_service.show.call_args_list)
97
def test_start_with_killed_status(self):
# Scenario: the image service reports the image status as 'killed' and the
# test expects exceptions.ImageTransferException.
# NOTE(review): the bare integers between statements look like residue of a
# numbered listing, and the numbering has gaps -- some statements of this
# test appear to be missing from this view. Confirm against the original.
98
writer = self._create_image_writer()
100
# Stub for image_service.show: ignores its arguments and always reports
# the 'killed' status.
def image_service_show_side_effect(_context, _image_id):
101
return {'status': 'killed'}
103
self.image_service.show.side_effect = image_service_show_side_effect
105
# NOTE(review): the callable handed to assertRaises is not visible here,
# and the two assert_called_once_with calls below are cut short (their
# remaining arguments are missing) -- verify against the original file.
self.assertRaises(exceptions.ImageTransferException,
107
self.image_service.update.assert_called_once_with(self.context,
109
data=self.input_file)
110
self.image_service.show.assert_called_once_with(self.context,
113
def test_start_with_unknown_status(self):
# Scenario: the image service reports the image status as 'unknown' and
# the test expects exceptions.ImageTransferException.
# NOTE(review): the bare integers between statements look like residue of a
# numbered listing, and the numbering has gaps -- some statements of this
# test appear to be missing from this view. Confirm against the original.
114
writer = self._create_image_writer()
116
# Stub for image_service.show: ignores its arguments and always reports
# the 'unknown' status.
def image_service_show_side_effect(_context, _image_id):
117
return {'status': 'unknown'}
119
self.image_service.show.side_effect = image_service_show_side_effect
121
# NOTE(review): the callable handed to assertRaises is not visible here,
# and the two assert_called_once_with calls below are cut short (their
# remaining arguments are missing) -- verify against the original file.
self.assertRaises(exceptions.ImageTransferException,
123
self.image_service.update.assert_called_once_with(self.context,
125
data=self.input_file)
126
self.image_service.show.assert_called_once_with(self.context,
129
def test_start_with_image_service_show_exception(self):
# Scenario: image_service.show itself raises RuntimeError; writer.wait is
# expected to raise exceptions.ImageTransferException.
# NOTE(review): the bare integers between statements look like residue of a
# numbered listing, and the numbering has gaps -- some statements of this
# test appear to be missing from this view. Confirm against the original.
130
writer = self._create_image_writer()
131
# Every call to show() on the mocked image service raises RuntimeError.
self.image_service.show.side_effect = RuntimeError()
133
self.assertRaises(exceptions.ImageTransferException, writer.wait)
134
# NOTE(review): both assert_called_once_with calls below are cut short
# (their remaining arguments are missing) -- verify against the original.
self.image_service.update.assert_called_once_with(self.context,
136
data=self.input_file)
137
self.image_service.show.assert_called_once_with(self.context,
141
class FileReadWriteTaskTest(base.TestCase):
142
"""Tests for FileReadWriteTask class."""
144
def test_start(self):
# Scenario: a FileReadWriteTask is built over a mocked input and output
# file; the test checks that wait() is truthy and that the read, write
# and update_progress mocks were called as expected.
# NOTE(review): the bare integers between statements look like residue of a
# numbered listing, and the numbering has gaps -- some statements of this
# test appear to be missing from this view. Confirm against the original.
145
# Chunks the read stub hands out, in order; the last one is empty.
data_items = [[1] * 10, [1] * 20, [1] * 5, []]
147
# Stub for input_file.read: asserts that its argument is falsy, then
# selects the next chunk using a counter kept as a function attribute.
# NOTE(review): the statement returning `data` is not visible here.
def input_file_read_side_effect(arg):
148
self.assertFalse(arg)
149
data = data_items[input_file_read_side_effect.i]
150
input_file_read_side_effect.i += 1
153
# Start the stub's counter at the first chunk.
input_file_read_side_effect.i = 0
154
input_file = mock.Mock()
155
input_file.read.side_effect = input_file_read_side_effect
156
output_file = mock.Mock()
157
rw_task = image_transfer.FileReadWriteTask(input_file, output_file)
159
self.assertTrue(rw_task.wait())
160
# One read call is expected per entry in data_items.
self.assertEqual(len(data_items), input_file.read.call_count)
163
# Expected write calls: one per entry of data_items, in the same order.
# NOTE(review): the initialisation of `exp_calls` is not visible here.
for i in range(0, len(data_items)):
164
exp_calls.append(mock.call(data_items[i]))
165
self.assertEqual(exp_calls, output_file.write.call_args_list)
167
# update_progress is expected once per entry on both files.
self.assertEqual(len(data_items),
168
input_file.update_progress.call_count)
169
self.assertEqual(len(data_items),
170
output_file.update_progress.call_count)
172
def test_start_with_read_exception(self):
# Scenario: input_file.read raises RuntimeError; rw_task.wait is expected
# to raise exceptions.ImageTransferException.
# NOTE(review): the bare integers between statements look like residue of a
# numbered listing, and the numbering has a gap before the assertion -- a
# statement may be missing from this view. Confirm against the original.
173
input_file = mock.Mock()
174
# Every call to read() on the mocked input file raises RuntimeError.
input_file.read.side_effect = RuntimeError()
175
output_file = mock.Mock()
176
rw_task = image_transfer.FileReadWriteTask(input_file, output_file)
178
self.assertRaises(exceptions.ImageTransferException, rw_task.wait)
179
# read() must have been called exactly once, with None as its argument.
input_file.read.assert_called_once_with(None)