1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright (c) 2011 Citrix Systems, Inc.
4
# Copyright 2011 OpenStack LLC.
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may
7
# not use this file except in compliance with the License. You may obtain
8
# a copy of the License at
10
# http://www.apache.org/licenses/LICENSE-2.0
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15
# License for the specific language governing permissions and limitations
19
"""
Utility classes for the time-saving transfer of data from the reader to the
writer, using a LightQueue as a pipe between the reader and the writer.
"""
23
from eventlet import event
24
from eventlet import greenthread
25
from eventlet.queue import LightQueue
27
from glance import client
29
from nova import exception
30
from nova import log as logging
32
# Module-level logger, registered under this module's dotted name.
LOG = logging.getLogger("nova.virt.vmwareapi.io_util")

# Value IOThread passes to greenthread.sleep() between chunk writes.
IO_THREAD_SLEEP_TIME = 0.01
# Value GlanceWriteThread passes to greenthread.sleep() between status polls
# while the image is still reported as "saving" or "queued".
GLANCE_POLL_INTERVAL = 5
38
class ThreadSafePipe(LightQueue):
    """The pipe to hold the data which the reader writes to and the writer
    reads from.

    Items are queued with write() and handed back one per call by read().
    The pipe adds up the length of every item it hands out and stops taking
    items from the queue once that total reaches ``transfer_size``.
    """

    # NOTE(review): this class was rebuilt from a damaged copy of the file.
    # The initialisation of ``transferred``, the return values of read() and
    # the bodies of write()/close() were missing and have been filled in from
    # context -- confirm against the upstream source.

    def __init__(self, maxsize, transfer_size):
        """Create the pipe.

        :param maxsize: handed unchanged to LightQueue.__init__
        :param transfer_size: total length of data after which read() stops
                              taking items from the queue
        """
        LightQueue.__init__(self, maxsize)
        self.transfer_size = transfer_size
        # Sum of len() of the items returned by read() so far. It has to
        # exist before the first read() compares it with transfer_size.
        self.transferred = 0

    def read(self, chunk_size):
        """Read data from the pipe. Chunksize is ignored for we have ensured
        that the data chunks written to the pipe by readers is the same as the
        chunks asked for by the Writer.

        :param chunk_size: accepted for file-like compatibility; unused
        :returns: the next queued item, or "" once ``transfer_size`` worth of
                  data has already been handed out
        """
        if self.transferred >= self.transfer_size:
            # Everything expected has been delivered; do not touch the queue
            # again, a get() on an empty queue would wait for more data.
            return ""
        data_item = self.get()
        self.transferred += len(data_item)
        return data_item

    def write(self, data):
        """Put a data item in the pipe."""
        self.put(data)

    def close(self):
        """A place-holder to maintain consistency."""
        pass
67
class GlanceWriteThread(object):
    """Ensures that image data is written to in the glance client and that
    it is in correct ('active') state.

    start() spawns a green thread that uploads the image through
    ``glance_client.update_image`` and then polls the image metadata until
    the image reaches a final state. The outcome is published on the
    ``done`` event, which wait() blocks on.
    """

    # NOTE(review): this class was rebuilt from a damaged copy of the file.
    # The "status" metadata key, the "- %(state)s" tail of the unknown-state
    # message, the send(True) on success, the return value of start() and the
    # close() placeholder were not present in the damaged text and have been
    # filled in from context -- confirm against the upstream source.

    def __init__(self, input, glance_client, image_id, image_meta=None):
        """Store the transfer parameters; nothing is started here.

        :param input: object passed to glance as ``image_data``
        :param glance_client: client exposing update_image/get_image_meta
        :param image_id: id of the glance image to update
        :param image_meta: metadata passed to update_image; defaults to a
                           fresh empty dict for every instance
        """
        self.input = input
        self.glance_client = glance_client
        self.image_id = image_id
        # A literal {} default would be one dict shared by every instance
        # created without explicit metadata.
        self.image_meta = {} if image_meta is None else image_meta
        self._running = False

    def start(self):
        """Spawn the upload/poll green thread.

        :returns: the ``done`` event that carries the result
        """
        self.done = event.Event()

        def _inner():
            """Function to do the image data transfer through an update
            and thereon checks if the state is 'active'."""
            try:
                # The upload sits inside the try block so that a failure is
                # delivered to whoever is blocked in wait() instead of only
                # terminating the green thread.
                self.glance_client.update_image(self.image_id,
                                                image_meta=self.image_meta,
                                                image_data=self.input)
                self._running = True
                while self._running:
                    image_meta = self.glance_client.get_image_meta(
                        self.image_id)
                    image_status = image_meta.get("status")
                    if image_status == "active":
                        self.stop()
                        self.done.send(True)
                    # If the state is killed, then raise an exception.
                    elif image_status == "killed":
                        self.stop()
                        exc_msg = (_("Glance image %s is in killed state") %
                                   self.image_id)
                        # LOG.error rather than LOG.exception: no exception
                        # is being handled at this point.
                        LOG.error(exc_msg)
                        self.done.send_exception(exception.Error(exc_msg))
                    elif image_status in ("saving", "queued"):
                        # Still in progress; poll again later.
                        greenthread.sleep(GLANCE_POLL_INTERVAL)
                    else:
                        self.stop()
                        exc_msg = _("Glance image "
                                    "%(image_id)s is in unknown state "
                                    "- %(state)s") % {
                                        "image_id": self.image_id,
                                        "state": image_status}
                        LOG.error(exc_msg)
                        self.done.send_exception(exception.Error(exc_msg))
            except Exception as exc:
                self.stop()
                self.done.send_exception(exc)

        greenthread.spawn(_inner)
        return self.done

    def stop(self):
        """Clear the running flag so the polling loop exits."""
        self._running = False

    def wait(self):
        """Block on the ``done`` event created by start().

        :returns: whatever ``done.wait()`` returns
        """
        return self.done.wait()

    def close(self):
        """A place-holder to maintain consistency."""
        pass
131
class IOThread(object):
    """Class that reads chunks from the input file and writes them to the
    output file till the transfer is completely done.

    start() spawns a green thread that copies from ``input`` to ``output``;
    the outcome is published on the ``done`` event, which wait() blocks on.
    """

    # NOTE(review): this class was rebuilt from a damaged copy of the file.
    # The end-of-input check (an empty read), the send(True) on completion,
    # the logging in the error path and the return value of start() were not
    # present in the damaged text and have been filled in from context --
    # confirm against the upstream source.

    def __init__(self, input, output):
        """Store the two ends of the transfer; nothing is started here.

        :param input: object whose read() supplies the data
        :param output: object whose write() receives the data
        """
        self.input = input
        self.output = output
        self._running = False
        # Initialised here; nothing in this class updates it afterwards.
        self.got_exception = False

    def start(self):
        """Spawn the copying green thread.

        :returns: the ``done`` event that carries the result
        """
        self.done = event.Event()

        def _inner():
            """Read data from the input and write the same to the output
            until the transfer completes."""
            self._running = True
            while self._running:
                try:
                    data = self.input.read(None)
                    # The chunk is written even when it is empty.
                    # NOTE(review): presumably a consumer waiting on the
                    # output relies on seeing the empty chunk -- confirm.
                    self.output.write(data)
                    if not data:
                        # Signal completion only after the write went
                        # through, so a failing write cannot lead to the
                        # event being signalled a second time below.
                        self.stop()
                        self.done.send(True)
                    else:
                        greenthread.sleep(IO_THREAD_SLEEP_TIME)
                except Exception as exc:
                    self.stop()
                    LOG.exception(exc)
                    self.done.send_exception(exc)

        greenthread.spawn(_inner)
        return self.done

    def stop(self):
        """Clear the running flag so the copy loop exits."""
        self._running = False

    def wait(self):
        """Block on the ``done`` event created by start().

        :returns: whatever ``done.wait()`` returns
        """
        return self.done.wait()